From 339175220d91f39aa073d33d67b8d05d2c1e0bb1 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Sun, 18 Nov 2018 00:15:08 +0100 Subject: [PATCH] Refactor uploading & chunk size parameter --- config.go | 12 +++++- config.yml | 3 ++ main.go | 8 +++- scheduler.go | 108 ++++++++++++++++++++++++++++++++++----------------- 4 files changed, 92 insertions(+), 39 deletions(-) diff --git a/config.go b/config.go index d6f81e4..5e669d5 100644 --- a/config.go +++ b/config.go @@ -12,6 +12,7 @@ import ( var config struct { ServerUrl string Token string + ChunkSize uint Retries int Workers int Timeout time.Duration @@ -25,6 +26,7 @@ var config struct { const ( ConfServerUrl = "server.url" ConfToken = "server.token" + ConfChunkSize = "server.upload_chunk" ConfTasks = "crawl.tasks" ConfRetries = "crawl.retries" ConfWorkers = "crawl.connections" @@ -44,6 +46,7 @@ func prepareConfig() { viper.SetDefault(ConfAllocStats, 0) viper.SetDefault(ConfVerbose, false) viper.SetDefault(ConfPrintHTTP, false) + viper.SetDefault(ConfChunkSize, "1 MB") } func readConfig() { @@ -66,6 +69,11 @@ func readConfig() { configMissing(ConfToken) } + config.ChunkSize = viper.GetSizeInBytes(ConfChunkSize) + if config.ChunkSize < 100 { + configOOB(ConfChunkSize, config.ChunkSize) + } + config.Retries = viper.GetInt(ConfRetries) if config.Retries < 0 { config.Retries = 1 << 31 @@ -100,7 +108,7 @@ func configMissing(key string) { os.Exit(1) } -func configOOB(key string, v int) { - fmt.Fprintf(os.Stderr, "config: illegal value %d for %key!\n", v, key) +func configOOB(key string, v interface{}) { + fmt.Fprintf(os.Stderr, "config: illegal value %v for key %s!\n", v, key) os.Exit(1) } diff --git a/config.yml b/config.yml index 8e4b0bd..945a503 100644 --- a/config.yml +++ b/config.yml @@ -4,6 +4,9 @@ server: url: http://od-db.mine.terorie.com/api # Server auth token token: + # Upload chunk size + # If the value is too high, the upload fails. + upload_chunk: 1 MB # Log output settings output: diff --git a/main.go b/main.go index bba62a9..16ae21d 100644 --- a/main.go +++ b/main.go @@ -40,6 +40,12 @@ func main() { go func() { log.Println(http.ListenAndServe("localhost:42069", nil)) }() + + err := os.MkdirAll("crawled", 0755) + if err != nil { + panic(err) + } + app.Run(os.Args) } @@ -69,7 +75,7 @@ func cmdBase(_ *cli.Context) error { } if t == nil { // No new task - if atomic.LoadInt32(&activeTasks) == 0 { + if atomic.LoadInt32(&numActiveTasks) == 0 { logrus.Info("Waiting …") } continue diff --git a/scheduler.go b/scheduler.go index 7eb301b..6f6ed92 100644 --- a/scheduler.go +++ b/scheduler.go @@ -13,7 +13,9 @@ import ( "time" ) -var activeTasks int32 +var activeTasksLock sync.Mutex +var activeTasks = make(map[uint64]bool) +var numActiveTasks int32 var totalBuffered int64 func Schedule(c context.Context, remotes <-chan *OD) { @@ -33,7 +35,7 @@ func Schedule(c context.Context, remotes <-chan *OD) { } // Enqueue initial job - atomic.AddInt32(&activeTasks, 1) + atomic.AddInt32(&numActiveTasks, 1) remote.WCtx.queueJob(Job{ OD: remote, Uri: remote.BaseUri, @@ -45,7 +47,7 @@ func Schedule(c context.Context, remotes <-chan *OD) { go remote.Watch(results) // Sleep if max number of tasks are active - for atomic.LoadInt32(&activeTasks) > config.Tasks { + for atomic.LoadInt32(&numActiveTasks) > config.Tasks { select { case <-c.Done(): return @@ -57,6 +59,10 @@ func Schedule(c context.Context, remotes <-chan *OD) { } func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) { + if !t.register() { + return + } + globalWait.Add(1) now := time.Now() od := &OD { @@ -71,40 +77,87 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) { remotes <- od } +func (t *Task) register() bool { + activeTasksLock.Lock() + defer activeTasksLock.Unlock() + + if _, known := activeTasks[t.WebsiteId]; known { + return false + } else { + activeTasks[t.WebsiteId] = true + return true + } +} + +func (t *Task) unregister() { + activeTasksLock.Lock() + delete(activeTasks, t.WebsiteId) + activeTasksLock.Unlock() +} + func (o *OD) Watch(results chan File) { + // Mark job as completely done + defer globalWait.Done() + defer o.Task.unregister() + filePath := path.Join("crawled", fmt.Sprintf("%d.json", o.Task.WebsiteId)) // Open crawl results file - // TODO Print errors - err := os.MkdirAll("crawled", 0755) - if err != nil { return } f, err := os.OpenFile( filePath, os.O_CREATE | os.O_RDWR | os.O_TRUNC, 0644, ) - - if err != nil { return } + if err != nil { + logrus.WithError(err). + Error("Failed saving crawl results") + return + } defer f.Close() defer os.Remove(filePath) - // Wait for the file to be fully written - var fileLock sync.Mutex - fileLock.Lock() + // Listen for exit code of Collect() + collectErrC := make(chan error) - go o.Task.Collect(results, f, &fileLock) + // Block until all results are written + // (closes results channel) + o.handleCollect(results, f, collectErrC) + + // Exit code of Collect() + err = <-collectErrC + close(collectErrC) + if err != nil { + logrus.WithError(err). + Error("Failed saving crawl results") + return + } + + // Upload results + err = PushResult(&o.Result, f) + if err != nil { + logrus.WithError(err). + Error("Failed uploading crawl results") + return + } +} + +func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) { + // Begin collecting results + go o.Task.Collect(results, f, collectErrC) + defer close(results) // Wait for all jobs on remote to finish o.Wait.Wait() close(o.WCtx.in) - atomic.AddInt32(&activeTasks, -1) + atomic.AddInt32(&numActiveTasks, -1) // Log finish - logrus. - WithField("url", o.BaseUri.String()). - WithField("duration", time.Since(o.Result.StartTime)). - Info("Crawler finished") + logrus.WithFields(logrus.Fields{ + "id": o.Task.WebsiteId, + "url": o.BaseUri.String(), + "duration": time.Since(o.Result.StartTime), + }).Info("Crawler finished") // Set status code now := time.Now() @@ -120,32 +173,15 @@ func (o *OD) Watch(results chan File) { } else { o.Result.StatusCode = "success" } - - // Shut down Collect() - close(results) - - // Wait for results to sync to file - fileLock.Lock() - fileLock.Unlock() - - // Upload results - err = PushResult(&o.Result, f) - if err != nil { - logrus.WithError(err). - Error("Failed uploading result") - } - - // Mark job as completely done - globalWait.Done() } -func (t *Task) Collect(results chan File, f *os.File, done *sync.Mutex) { +func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) { err := t.collect(results, f) if err != nil { logrus.WithError(err). Error("Failed saving crawl results") } - done.Unlock() + errC <- err } func (t *Task) collect(results chan File, f *os.File) error {