diff --git a/crawl.go b/crawl.go index 9046540..80dc58e 100644 --- a/crawl.go +++ b/crawl.go @@ -165,22 +165,27 @@ func (f *File) applyContentLength(v string) { f.Size = size } +// TODO Cleanup func (f *File) applyLastModified(v string) { if v == "" { return } + var t time.Time var err error - f.MTime, err = time.Parse(time.RFC1123, v) + t, err = time.Parse(time.RFC1123, v) if err == nil { + f.MTime = t.Unix() return } - f.MTime, err = time.Parse(time.RFC850, v) + t, err = time.Parse(time.RFC850, v) if err == nil { + f.MTime = t.Unix() return } // TODO Parse asctime - f.MTime, err = time.Parse("2006-01-02", v[:10]) + t, err = time.Parse("2006-01-02", v[:10]) if err == nil { + f.MTime = t.Unix() return } } diff --git a/ds/redblackhash/redblack.go b/ds/redblackhash/redblack.go index a198ba6..95084c2 100644 --- a/ds/redblackhash/redblack.go +++ b/ds/redblackhash/redblack.go @@ -16,6 +16,7 @@ package redblackhash import ( "bytes" "fmt" + "sync" ) const ( @@ -28,6 +29,7 @@ type Key [KeySize]byte // Tree holds elements of the red-black tree type Tree struct { + sync.Mutex Root *Node size int } diff --git a/main.go b/main.go index 17e824d..9285d56 100644 --- a/main.go +++ b/main.go @@ -83,11 +83,7 @@ func cmdBase(clic *cli.Context) error { time.Sleep(30 * time.Second) continue } - globalWait.Add(1) - inRemotes <- &OD { - Task: t, - BaseUri: baseUri, - } + ScheduleTask(inRemotes, t, &baseUri) } } @@ -122,14 +118,11 @@ func cmdCrawler(clic *cli.Context) error { ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() - globalWait.Add(1) - inRemotes <- &OD { - Task: &Task{ - WebsiteId: 0, - Url: u.String(), - }, - BaseUri: u, + task := Task { + WebsiteId: 0, + Url: u.String(), } + ScheduleTask(inRemotes, &task, &u) // Wait for all jobs to finish globalWait.Wait() diff --git a/model.go b/model.go index 150d1e3..d0509b6 100644 --- a/model.go +++ b/model.go @@ -7,6 +7,21 @@ import ( "time" ) +type Task struct { + WebsiteId uint64 `json:"website_id"` + Url string 
`json:"url"` +} + +type TaskResult struct { + StatusCode string `json:"status_code"` + FileCount uint64 `json:"file_count"` + ErrorCount uint64 `json:"-"` + StartTime time.Time `json:"-"` + StartTimeUnix int64 `json:"start_time"` + EndTimeUnix int64 `json:"end_time"` + WebsiteId uint64 `json:"website_id"` +} + type Job struct { OD *OD Uri fasturl.URL @@ -16,26 +31,25 @@ type Job struct { } type OD struct { - Task *Task + Task Task + Result TaskResult Wait sync.WaitGroup BaseUri fasturl.URL WCtx WorkerContext Scanned redblackhash.Tree - - lock sync.Mutex } type File struct { - Name string `json:"name"` - Size int64 `json:"size"` - MTime time.Time `json:"mtime"` - Path string `json:"path"` - IsDir bool `json:"-"` + Name string `json:"name"` + Size int64 `json:"size"` + MTime int64 `json:"mtime"` + Path string `json:"path"` + IsDir bool `json:"-"` } func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) { - o.lock.Lock() - defer o.lock.Unlock() + o.Scanned.Lock() + defer o.Scanned.Unlock() exists = o.Scanned.Get(k) if exists { return true } diff --git a/scheduler.go b/scheduler.go index 1d43ea0..e3cb939 100644 --- a/scheduler.go +++ b/scheduler.go @@ -8,6 +8,7 @@ import ( "github.com/terorie/od-database-crawler/fasturl" "os" "path" + "sync" "sync/atomic" "time" ) @@ -55,28 +56,80 @@ func Schedule(c context.Context, remotes <-chan *OD) { } } -func (r *OD) Watch(results chan File) { - go r.Task.Collect(results) - - // Wait for all jobs on remote to finish - r.Wait.Wait() - close(r.WCtx.in) - atomic.AddInt32(&activeTasks, -1) - - logrus.WithField("url", r.BaseUri.String()). 
- Info("Crawler finished") - - globalWait.Done() - - close(results) +func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) { + globalWait.Add(1) + now := time.Now() + od := &OD { + Task: *t, + BaseUri: *u, + Result: TaskResult { + WebsiteId: t.WebsiteId, + StartTime: now, + StartTimeUnix: now.Unix(), + }, + } + remotes <- od } -func (t *Task) Collect(results chan File) { +func (o *OD) Watch(results chan File) { + // Wait for the file to be fully written + var fileLock sync.Mutex + fileLock.Lock() + + go o.Task.Collect(results, &fileLock) + + // Wait for all jobs on remote to finish + o.Wait.Wait() + close(o.WCtx.in) + atomic.AddInt32(&activeTasks, -1) + + // Log finish + + logrus. + WithField("url", o.BaseUri.String()). + WithField("duration", time.Since(o.Result.StartTime)). + Info("Crawler finished") + + // Set status code + now := time.Now() + o.Result.EndTimeUnix = now.Unix() + fileCount := atomic.LoadUint64(&o.Result.FileCount) + if fileCount == 0 { + errorCount := atomic.LoadUint64(&o.Result.ErrorCount) + if errorCount == 0 { + o.Result.StatusCode = "empty" + } else { + o.Result.StatusCode = "directory listing failed" + } + } else { + o.Result.StatusCode = "success" + } + + // Shut down Collect() + close(results) + + // Wait for results to sync to file + fileLock.Lock() + fileLock.Unlock() + + // Upload results + err := PushResult(&o.Result) + if err != nil { + logrus.WithError(err). + Error("Failed uploading result") + } + + // Mark job as completely done + globalWait.Done() +} + +func (t *Task) Collect(results chan File, done *sync.Mutex) { err := t.collect(results) if err != nil { logrus.WithError(err). 
Error("Failed saving crawl results") } + done.Unlock() } func (t *Task) collect(results chan File) error { diff --git a/server.go b/server.go index c3987da..8430dac 100644 --- a/server.go +++ b/server.go @@ -12,7 +12,6 @@ import ( "os" "path/filepath" "strconv" - "strings" ) const ( @@ -31,7 +30,7 @@ func FetchTask() (t *Task, err error) { switch res.StatusCode { case 200: break - case 500: + case 404, 500: return nil, nil default: return nil, fmt.Errorf("http %s", res.Status) @@ -45,6 +44,11 @@ func FetchTask() (t *Task, err error) { } func PushResult(result *TaskResult) (err error) { + if result.WebsiteId == 0 { + // Not a real result, don't push + return nil + } + filePath := filepath.Join( ".", "crawled", fmt.Sprintf("%d.json", result.WebsiteId)) @@ -83,34 +87,41 @@ func PushResult(result *TaskResult) (err error) { return } -func uploadChunks(websiteId uint64, f *os.File) (err error) { - for iter := 1; iter > 0; iter++ { +func uploadChunks(websiteId uint64, f *os.File) error { + eof := false + for iter := 1; !eof; iter++ { // TODO Stream with io.Pipe? 
var b bytes.Buffer multi := multipart.NewWriter(&b) // Set upload fields + var err error err = multi.WriteField("token", config.Token) - if err != nil { return } + if err != nil { return err } err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId)) - if err != nil { return } + if err != nil { return err } // Copy chunk to file_list formFile, err := multi.CreateFormFile("file_list", "file_list") - _, err = io.CopyN(formFile, f, fileListChunkSize) - if err == io.EOF { - break - } else if err == io.ErrUnexpectedEOF { + var n int64 + n, err = io.CopyN(formFile, f, fileListChunkSize) + if err != nil && err != io.EOF { + return err + } + if n < fileListChunkSize { err = nil // Break at end of iteration - iter = -420 + eof = true } + multi.Close() + req, err := http.NewRequest( http.MethodPost, config.ServerUrl + "/task/upload", &b) if err != nil { return err } + req.Header.Set("content-type", multi.FormDataContentType()) res, err := serverClient.Do(req) @@ -125,49 +136,38 @@ logrus.Infof("Uploading file list part %d: %s", iter, res.Status) } - return + return nil } func uploadResult(result *TaskResult) (err error) { resultEnc, err := json.Marshal(result) if err != nil { panic(err) } - payload := url.Values { - "token": {config.Token}, - "result": {string(resultEnc)}, - }.Encode() - - req, err := http.NewRequest( - http.MethodPost, + res, err := serverClient.PostForm( config.ServerUrl + "/task/complete", - strings.NewReader(payload)) - if err != nil { return } - - res, err := serverClient.Do(req) + url.Values { + "token": {config.Token}, + "result": {string(resultEnc)}, + }, + ) if err != nil { return } res.Body.Close() if res.StatusCode != http.StatusOK { - return fmt.Errorf("failed to cancel task: %s", res.Status) + return fmt.Errorf("%s", res.Status) } return } func CancelTask(websiteId uint64) (err error) { - form := url.Values{ - "token": {config.Token}, - "website_id": {strconv.FormatUint(websiteId, 10)}, - 
} - encForm := form.Encode() - - req, err := http.NewRequest( - http.MethodPost, + res, err := serverClient.PostForm( config.ServerUrl + "/task/cancel", - strings.NewReader(encForm)) - if err != nil { return } - - res, err := serverClient.Do(req) + url.Values{ + "token": {config.Token}, + "website_id": {strconv.FormatUint(websiteId, 10)}, + }, + ) if err != nil { return } res.Body.Close() diff --git a/stats.go b/stats.go index 64337e1..73a846e 100644 --- a/stats.go +++ b/stats.go @@ -57,7 +57,7 @@ func Stats(c context.Context) { runtime.ReadMemStats(&mem) logrus.WithFields(logrus.Fields{ - "queue_count": totalBuffered, + "queue_count": atomic.LoadInt64(&totalBuffered), "heap": FormatByteCount(mem.Alloc), "objects": mem.HeapObjects, "num_gc": mem.NumGC, diff --git a/tasks.go b/tasks.go deleted file mode 100644 index a92e35a..0000000 --- a/tasks.go +++ /dev/null @@ -1,16 +0,0 @@ -package main - -import "time" - -type Task struct { - WebsiteId int `json:"website_id"` - Url string `json:"url"` -} - -type TaskResult struct { - StatusCode int `json:"status_code"` - FileCount uint64 `json:"file_count"` - StartTime time.Time `json:"start_time"` - EndTime time.Time `json:"end_time"` - WebsiteId uint64 `json:"website_id"` -} diff --git a/worker.go b/worker.go index 8066875..704d55b 100644 --- a/worker.go +++ b/worker.go @@ -43,6 +43,7 @@ func (w WorkerContext) step(results chan<- File, job Job) { if httpErr, ok := err.(HttpError); ok { switch httpErr.code { case + fasthttp.StatusFound, fasthttp.StatusUnauthorized, fasthttp.StatusForbidden, fasthttp.StatusNotFound: @@ -137,6 +138,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) { Error("Failed getting file") return nil, err } + atomic.AddUint64(&job.OD.Result.FileCount, 1) } return }