Fix a ton of bugs

Richard Patel 2018-11-17 04:18:22 +01:00
parent 0fe97a8058
commit d596882b40
9 changed files with 146 additions and 93 deletions

View File

@@ -165,22 +165,27 @@ func (f *File) applyContentLength(v string) {
 	f.Size = size
 }
 
+// TODO Cleanup
 func (f *File) applyLastModified(v string) {
 	if v == "" {
 		return
 	}
+	var t time.Time
 	var err error
-	f.MTime, err = time.Parse(time.RFC1123, v)
+	t, err = time.Parse(time.RFC1123, v)
 	if err == nil {
+		f.MTime = t.Unix()
 		return
 	}
-	f.MTime, err = time.Parse(time.RFC850, v)
+	t, err = time.Parse(time.RFC850, v)
 	if err == nil {
+		f.MTime = t.Unix()
 		return
 	}
 	// TODO Parse asctime
-	f.MTime, err = time.Parse("2006-01-02", v[:10])
+	t, err = time.Parse("2006-01-02", v[:10])
 	if err == nil {
+		f.MTime = t.Unix()
 		return
 	}
 }
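For reference, the same fallback chain as a standalone, runnable sketch. The `parseMTime` helper and the sample inputs are illustrative, not part of this commit; unlike the patched code, it also guards the `v[:10]` slice, which would panic on a header value shorter than ten bytes:

```go
package main

import (
	"fmt"
	"time"
)

// parseMTime tries the same layouts as applyLastModified, in order,
// and reports whether any of them matched.
func parseMTime(v string) (int64, bool) {
	for _, layout := range []string{time.RFC1123, time.RFC850} {
		if t, err := time.Parse(layout, v); err == nil {
			return t.Unix(), true
		}
	}
	// Last resort: a bare yyyy-mm-dd prefix. Guard the slice first.
	if len(v) >= 10 {
		if t, err := time.Parse("2006-01-02", v[:10]); err == nil {
			return t.Unix(), true
		}
	}
	return 0, false
}

func main() {
	fmt.Println(parseMTime("Sat, 17 Nov 2018 04:18:22 UTC"))
	fmt.Println(parseMTime("2018-11-17T04:18:22"))
}
```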

View File

@@ -16,6 +16,7 @@ package redblackhash
 import (
 	"bytes"
 	"fmt"
+	"sync"
 )
 
 const (
@@ -28,6 +29,7 @@ type Key [KeySize]byte
 // Tree holds elements of the red-black tree
 type Tree struct {
+	sync.Mutex
 	Root *Node
 	size int
 }
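Embedding `sync.Mutex` promotes `Lock`/`Unlock` onto `Tree`, so callers can lock the tree directly instead of keeping a separate mutex beside it. A minimal illustration of the pattern, with the tree body abbreviated:

```go
package main

import (
	"fmt"
	"sync"
)

// Tree gets Lock/Unlock for free via the embedded mutex.
type Tree struct {
	sync.Mutex
	size int
}

func (t *Tree) Insert() {
	t.size++ // caller must hold the lock
}

func main() {
	var t Tree
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.Lock()
			t.Insert()
			t.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(t.size) // always 100
}
```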

main.go
View File

@@ -83,11 +83,7 @@ func cmdBase(clic *cli.Context) error {
 			time.Sleep(30 * time.Second)
 			continue
 		}
-		globalWait.Add(1)
-		inRemotes <- &OD {
-			Task: t,
-			BaseUri: baseUri,
-		}
+		ScheduleTask(inRemotes, t, &baseUri)
 	}
 }
@@ -122,14 +118,11 @@ func cmdCrawler(clic *cli.Context) error {
 	ticker := time.NewTicker(3 * time.Second)
 	defer ticker.Stop()
-	globalWait.Add(1)
-	inRemotes <- &OD {
-		Task: &Task{
-			WebsiteId: 0,
-			Url: u.String(),
-		},
-		BaseUri: u,
-	}
+	task := Task {
+		WebsiteId: 0,
+		Url: u.String(),
+	}
+	ScheduleTask(inRemotes, &task, &u)
 
 	// Wait for all jobs to finish
 	globalWait.Wait()
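Both call sites previously did `globalWait.Add(1)` by hand and built the `OD` inline; `ScheduleTask` (added in the scheduler file below) now owns the `Add`, keeping it paired with the `Done` issued when the task finishes. A small sketch of that pairing, with hypothetical `schedule`/`queue` names:

```go
package main

import (
	"fmt"
	"sync"
)

var globalWait sync.WaitGroup

// schedule owns the Add; the worker owns the matching Done.
func schedule(queue chan<- int, task int) {
	globalWait.Add(1) // must happen before the task can complete
	queue <- task
}

func main() {
	queue := make(chan int, 4)
	go func() {
		for t := range queue {
			fmt.Println("done", t)
			globalWait.Done()
		}
	}()
	for i := 0; i < 4; i++ {
		schedule(queue, i)
	}
	globalWait.Wait() // returns only after every scheduled task finished
}
```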

View File

@@ -7,6 +7,21 @@ import (
 	"time"
 )
 
+type Task struct {
+	WebsiteId uint64 `json:"website_id"`
+	Url       string `json:"url"`
+}
+
+type TaskResult struct {
+	StatusCode    string    `json:"status_code"`
+	FileCount     uint64    `json:"file_count"`
+	ErrorCount    uint64    `json:"-"`
+	StartTime     time.Time `json:"-"`
+	StartTimeUnix int64     `json:"start_time"`
+	EndTimeUnix   int64     `json:"end_time"`
+	WebsiteId     uint64    `json:"website_id"`
+}
+
 type Job struct {
 	OD *OD
 	Uri fasturl.URL
@@ -16,26 +31,25 @@ type Job struct {
 }
 
 type OD struct {
-	Task *Task
+	Task Task
+	Result TaskResult
 	Wait sync.WaitGroup
 	BaseUri fasturl.URL
 	WCtx WorkerContext
 	Scanned redblackhash.Tree
-	lock sync.Mutex
 }
 
 type File struct {
-	Name string `json:"name"`
-	Size int64 `json:"size"`
-	MTime time.Time `json:"mtime"`
-	Path string `json:"path"`
-	IsDir bool `json:"-"`
+	Name  string `json:"name"`
+	Size  int64  `json:"size"`
+	MTime int64  `json:"mtime"`
+	Path  string `json:"path"`
+	IsDir bool   `json:"-"`
 }
 
 func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
-	o.lock.Lock()
-	defer o.lock.Unlock()
+	o.Scanned.Lock()
+	defer o.Scanned.Unlock()
 
 	exists = o.Scanned.Get(k)
 	if exists { return true }
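With `lock sync.Mutex` gone from `OD`, `LoadOrStoreKey` locks the `Scanned` tree itself (via the mutex embedded in the redblackhash hunk above) and runs the get-then-insert as one critical section. A rough equivalent, with a hypothetical `set` type and a map standing in for the tree:

```go
package main

import (
	"fmt"
	"sync"
)

type set struct {
	sync.Mutex
	m map[string]struct{}
}

// LoadOrStore reports whether k was already present, inserting it if not.
// One lock covers the whole check-then-insert, so two goroutines can never
// both observe "not present" for the same key.
func (s *set) LoadOrStore(k string) (exists bool) {
	s.Lock()
	defer s.Unlock()
	if _, ok := s.m[k]; ok {
		return true
	}
	s.m[k] = struct{}{}
	return false
}

func main() {
	s := set{m: make(map[string]struct{})}
	fmt.Println(s.LoadOrStore("/pub/")) // false: first visit
	fmt.Println(s.LoadOrStore("/pub/")) // true: already scanned
}
```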

View File

@@ -8,6 +8,7 @@ import (
 	"github.com/terorie/od-database-crawler/fasturl"
 	"os"
 	"path"
+	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -55,28 +56,80 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 	}
 }
 
-func (r *OD) Watch(results chan File) {
-	go r.Task.Collect(results)
-	// Wait for all jobs on remote to finish
-	r.Wait.Wait()
-	close(r.WCtx.in)
-	atomic.AddInt32(&activeTasks, -1)
-	logrus.WithField("url", r.BaseUri.String()).
-		Info("Crawler finished")
-	globalWait.Done()
-	close(results)
+func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
+	globalWait.Add(1)
+	now := time.Now()
+	od := &OD {
+		Task: *t,
+		BaseUri: *u,
+		Result: TaskResult {
+			WebsiteId: t.WebsiteId,
+			StartTime: now,
+			StartTimeUnix: now.Unix(),
+		},
+	}
+	remotes <- od
 }
 
-func (t *Task) Collect(results chan File) {
+func (o *OD) Watch(results chan File) {
+	// Wait for the file to be fully written
+	var fileLock sync.Mutex
+	fileLock.Lock()
+
+	go o.Task.Collect(results, &fileLock)
+
+	// Wait for all jobs on remote to finish
+	o.Wait.Wait()
+	close(o.WCtx.in)
+	atomic.AddInt32(&activeTasks, -1)
+
+	// Log finish
+	logrus.
+		WithField("url", o.BaseUri.String()).
+		WithField("duration", time.Since(o.Result.StartTime)).
+		Info("Crawler finished")
+
+	// Set status code
+	now := time.Now()
+	o.Result.EndTimeUnix = now.Unix()
+	fileCount := atomic.LoadUint64(&o.Result.FileCount)
+	if fileCount == 0 {
+		errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
+		if errorCount == 0 {
+			o.Result.StatusCode = "empty"
+		} else {
+			o.Result.StatusCode = "directory listing failed"
+		}
+	} else {
+		o.Result.StatusCode = "success"
+	}
+
+	// Shut down Collect()
+	close(results)
+
+	// Wait for results to sync to file
+	fileLock.Lock()
+	fileLock.Unlock()
+
+	// Upload results
+	err := PushResult(&o.Result)
+	if err != nil {
+		logrus.WithError(err).
+			Error("Failed uploading result")
+	}
+
+	// Mark job as completely done
+	globalWait.Done()
+}
+
+func (t *Task) Collect(results chan File, done *sync.Mutex) {
 	err := t.collect(results)
 	if err != nil {
 		logrus.WithError(err).
 			Error("Failed saving crawl results")
 	}
+
+	done.Unlock()
 }
 
 func (t *Task) collect(results chan File) error {
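The rewritten `Watch` uses a pre-locked `sync.Mutex` as a one-shot completion signal: `Collect` unlocks it once results are flushed to disk, and `Watch`'s second `Lock` blocks until then, so `PushResult` never sees a half-written file. A minimal sketch of that handoff, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var fileLock sync.Mutex
	fileLock.Lock() // locked before the writer starts

	go func() {
		time.Sleep(50 * time.Millisecond) // simulate writing results
		fmt.Println("results flushed")
		fileLock.Unlock() // signal: file is complete
	}()

	fileLock.Lock() // blocks until the writer unlocks
	fileLock.Unlock()
	fmt.Println("safe to upload")
}
```

A `chan struct{}` closed by the writer would express the same one-shot signal without locking and unlocking a mutex from different goroutines.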

View File

@@ -12,7 +12,6 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
-	"strings"
 )
 
 const (
@@ -31,7 +30,7 @@ func FetchTask() (t *Task, err error) {
 	switch res.StatusCode {
 	case 200:
 		break
-	case 500:
+	case 404, 500:
 		return nil, nil
 	default:
 		return nil, fmt.Errorf("http %s", res.Status)
@@ -45,6 +44,11 @@ func FetchTask() (t *Task, err error) {
 }
 
 func PushResult(result *TaskResult) (err error) {
+	if result.WebsiteId == 0 {
+		// Not a real result, don't push
+		return nil
+	}
+
 	filePath := filepath.Join(
 		".", "crawled",
 		fmt.Sprintf("%d.json", result.WebsiteId))
@@ -83,34 +87,41 @@ func PushResult(result *TaskResult) (err error) {
 	return
 }
 
-func uploadChunks(websiteId uint64, f *os.File) (err error) {
-	for iter := 1; iter > 0; iter++ {
+func uploadChunks(websiteId uint64, f *os.File) error {
+	eof := false
+	for iter := 1; !eof; iter++ {
 		// TODO Stream with io.Pipe?
 		var b bytes.Buffer
 		multi := multipart.NewWriter(&b)
 
 		// Set upload fields
+		var err error
 		err = multi.WriteField("token", config.Token)
-		if err != nil { return }
+		if err != nil { return err }
 		err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
-		if err != nil { return }
+		if err != nil { return err }
 
 		// Copy chunk to file_list
 		formFile, err := multi.CreateFormFile("file_list", "file_list")
-		_, err = io.CopyN(formFile, f, fileListChunkSize)
-		if err == io.EOF {
-			break
-		} else if err == io.ErrUnexpectedEOF {
+		var n int64
+		n, err = io.CopyN(formFile, f, fileListChunkSize)
+		if err != io.EOF {
+			return err
+		}
+		if n < fileListChunkSize {
 			err = nil
-			// Break at end of iteration
-			iter = -420
+			eof = true
 		}
 		multi.Close()
 
 		req, err := http.NewRequest(
 			http.MethodPost,
 			config.ServerUrl + "/task/upload",
 			&b)
 		req.Header.Set("content-type", multi.FormDataContentType())
 		if err != nil { return err }
 
 		res, err := serverClient.Do(req)
@@ -125,49 +136,38 @@ func uploadChunks(websiteId uint64, f *os.File) (err error) {
 		logrus.Infof("Uploading file list part %d: %s",
 			iter, res.Status)
 	}
-	return
+	return nil
 }
 
 func uploadResult(result *TaskResult) (err error) {
 	resultEnc, err := json.Marshal(result)
 	if err != nil { panic(err) }
 
-	payload := url.Values {
-		"token": {config.Token},
-		"result": {string(resultEnc)},
-	}.Encode()
-
-	req, err := http.NewRequest(
-		http.MethodPost,
+	res, err := serverClient.PostForm(
 		config.ServerUrl + "/task/complete",
-		strings.NewReader(payload))
-	if err != nil { return }
-
-	res, err := serverClient.Do(req)
+		url.Values {
+			"token": {config.Token},
+			"result": {string(resultEnc)},
+		},
+	)
 	if err != nil { return }
 	res.Body.Close()
 
 	if res.StatusCode != http.StatusOK {
-		return fmt.Errorf("failed to cancel task: %s", res.Status)
+		return fmt.Errorf("%s", res.Status)
 	}
 
 	return
 }
 
 func CancelTask(websiteId uint64) (err error) {
-	form := url.Values{
-		"token": {config.Token},
-		"website_id": {strconv.FormatUint(websiteId, 10)},
-	}
-	encForm := form.Encode()
-
-	req, err := http.NewRequest(
-		http.MethodPost,
+	res, err := serverClient.PostForm(
 		config.ServerUrl + "/task/cancel",
-		strings.NewReader(encForm))
-	if err != nil { return }
-
-	res, err := serverClient.Do(req)
+		url.Values{
+			"token": {config.Token},
+			"website_id": {strconv.FormatUint(websiteId, 10)},
		},
+	)
 	if err != nil { return }
 	res.Body.Close()
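`uploadResult` and `CancelTask` previously built a `*http.Request` by hand over an encoded form string; `(*http.Client).PostForm` does the encoding and sets the `application/x-www-form-urlencoded` content type itself, which is also what lets the `strings` import go away. A minimal runnable sketch against a stand-in server (the endpoint and field names mirror the diff; the `complete` helper and test server are illustrative):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

func complete(client *http.Client, serverUrl, token, result string) error {
	// PostForm encodes the values and sets the form content type.
	res, err := client.PostForm(serverUrl+"/task/complete", url.Values{
		"token":  {token},
		"result": {result},
	})
	if err != nil {
		return err
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("%s", res.Status)
	}
	return nil
}

func main() {
	// Stand-in server so the sketch runs without the real API.
	srv := httptest.NewServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			fmt.Println("got token:", r.FormValue("token"))
		}))
	defer srv.Close()
	fmt.Println(complete(srv.Client(), srv.URL, "secret", "{}"))
}
```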

View File

@@ -57,7 +57,7 @@ func Stats(c context.Context) {
 	runtime.ReadMemStats(&mem)
 	logrus.WithFields(logrus.Fields{
-		"queue_count": totalBuffered,
+		"queue_count": atomic.LoadInt64(&totalBuffered),
 		"heap": FormatByteCount(mem.Alloc),
 		"objects": mem.HeapObjects,
 		"num_gc": mem.NumGC,

View File

@@ -1,16 +0,0 @@
-package main
-
-import "time"
-
-type Task struct {
-	WebsiteId int `json:"website_id"`
-	Url string `json:"url"`
-}
-
-type TaskResult struct {
-	StatusCode int `json:"status_code"`
-	FileCount uint64 `json:"file_count"`
-	StartTime time.Time `json:"start_time"`
-	EndTime time.Time `json:"end_time"`
-	WebsiteId uint64 `json:"website_id"`
-}

View File

@@ -43,6 +43,7 @@ func (w WorkerContext) step(results chan<- File, job Job) {
 	if httpErr, ok := err.(HttpError); ok {
 		switch httpErr.code {
 		case
+			fasthttp.StatusFound,
 			fasthttp.StatusUnauthorized,
 			fasthttp.StatusForbidden,
 			fasthttp.StatusNotFound:
@@ -137,6 +138,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 				Error("Failed getting file")
 			return nil, err
 		}
+		atomic.AddUint64(&job.OD.Result.FileCount, 1)
 	}
 	return
 }
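The first hunk adds HTTP 302 to the statuses that simply skip a job; the second counts each fetched file with `atomic.AddUint64`, matching the `atomic.LoadUint64` that `Watch` uses to pick the result's status string. A compressed sketch of that decision, with a hypothetical `result` type mirroring `TaskResult`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type result struct {
	FileCount  uint64
	ErrorCount uint64
}

// statusOf mirrors the branch Watch takes after the crawl drains.
func statusOf(r *result) string {
	files := atomic.LoadUint64(&r.FileCount)
	errs := atomic.LoadUint64(&r.ErrorCount)
	switch {
	case files > 0:
		return "success"
	case errs == 0:
		return "empty"
	default:
		return "directory listing failed"
	}
}

func main() {
	r := &result{}
	atomic.AddUint64(&r.FileCount, 1) // what DoJob does per fetched file
	fmt.Println(statusOf(r))          // success
}
```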