diff --git a/scheduler.go b/scheduler.go index e3cb939..896d84c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -72,11 +72,22 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) { } func (o *OD) Watch(results chan File) { + filePath := path.Join("crawled", fmt.Sprintf("%d.json", o.Task.WebsiteId)) + // Open crawl results file + f, err := os.OpenFile( + filePath, + os.O_CREATE | os.O_RDWR | os.O_TRUNC, + 0644, + ) + if err != nil { return } + defer f.Close() + defer os.Remove(filePath) + // Wait for the file to be fully written var fileLock sync.Mutex fileLock.Lock() - go o.Task.Collect(results, &fileLock) + go o.Task.Collect(results, f, &fileLock) // Wait for all jobs on remote to finish o.Wait.Wait() @@ -113,7 +124,7 @@ func (o *OD) Watch(results chan File) { fileLock.Unlock() // Upload results - err := PushResult(&o.Result) + err = PushResult(&o.Result, f) if err != nil { logrus.WithError(err). Error("Failed uploading result") @@ -123,8 +134,8 @@ func (o *OD) Watch(results chan File) { globalWait.Done() } -func (t *Task) Collect(results chan File, done *sync.Mutex) { - err := t.collect(results) +func (t *Task) Collect(results chan File, f *os.File, done *sync.Mutex) { + err := t.collect(results, f) if err != nil { logrus.WithError(err). Error("Failed saving crawl results") @@ -132,18 +143,10 @@ func (t *Task) Collect(results chan File, done *sync.Mutex) { done.Unlock() } -func (t *Task) collect(results chan File) error { +func (t *Task) collect(results chan File, f *os.File) error { err := os.MkdirAll("crawled", 0755) if err != nil { return err } - f, err := os.OpenFile( - path.Join("crawled", fmt.Sprintf("%d.json", t.WebsiteId)), - os.O_CREATE | os.O_WRONLY | os.O_TRUNC, - 0755, - ) - if err != nil { return err } - defer f.Close() - for result := range results { result.Path = fasturl.PathUnescape(result.Path) result.Name = fasturl.PathUnescape(result.Name) diff --git a/server.go b/server.go index 8430dac..25665b9 100644 --- a/server.go +++ b/server.go @@ -10,7 +10,6 @@ import ( "net/http" "net/url" "os" - "path/filepath" "strconv" ) @@ -43,26 +42,17 @@ func FetchTask() (t *Task, err error) { return } -func PushResult(result *TaskResult) (err error) { +func PushResult(result *TaskResult, f *os.File) (err error) { if result.WebsiteId == 0 { // Not a real result, don't push return nil } - filePath := filepath.Join( - ".", "crawled", - fmt.Sprintf("%d.json", result.WebsiteId)) - - defer os.Remove(filePath) - - f, err := os.Open(filePath) - if os.IsNotExist(err) { - err = fmt.Errorf("cannot upload result: %s does not exist", filePath) - return - } else if err != nil { + // Rewind to the beginning of the file + _, err = f.Seek(0, 0) + if err != nil { return } - defer f.Close() err = uploadChunks(result.WebsiteId, f) if err != nil {