mirror of
https://github.com/terorie/od-database-crawler.git
synced 2025-04-19 18:36:43 +00:00
Fix WaitGroup crash
This commit is contained in:
parent
cea6c1658b
commit
dbd787aa81
8
main.go
8
main.go
@ -115,13 +115,7 @@ func cmdBase(_ *cobra.Command, _ []string) {
|
|||||||
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
|
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
|
||||||
// Not an error
|
// Not an error
|
||||||
err = nil
|
err = nil
|
||||||
|
// TODO FTP crawler
|
||||||
// Give back task
|
|
||||||
//err2 := CancelTask(t.WebsiteId)
|
|
||||||
//if err2 != nil {
|
|
||||||
// logrus.Error(err2)
|
|
||||||
//}
|
|
||||||
|
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
logrus.WithError(err).
|
logrus.WithError(err).
|
||||||
|
@ -30,6 +30,8 @@ func LoadResumeTasks(inRemotes chan<- *OD) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, remote := range resumed {
|
for _, remote := range resumed {
|
||||||
|
// TODO Cleanup globalWait management
|
||||||
|
globalWait.Add(1)
|
||||||
inRemotes <- remote
|
inRemotes <- remote
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
11
scheduler.go
11
scheduler.go
@ -141,7 +141,9 @@ func (o *OD) Watch(results chan File) {
|
|||||||
|
|
||||||
// Block until all results are written
|
// Block until all results are written
|
||||||
// (closes results channel)
|
// (closes results channel)
|
||||||
o.handleCollect(results, f, collectErrC)
|
if !o.handleCollect(results, f, collectErrC) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Exit code of Collect()
|
// Exit code of Collect()
|
||||||
err = <-collectErrC
|
err = <-collectErrC
|
||||||
@ -161,7 +163,8 @@ func (o *OD) Watch(results chan File) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) {
|
// Returns if aborted naturally (results ready for upload)
|
||||||
|
func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) bool {
|
||||||
// Begin collecting results
|
// Begin collecting results
|
||||||
go o.Task.Collect(results, f, collectErrC)
|
go o.Task.Collect(results, f, collectErrC)
|
||||||
defer close(results)
|
defer close(results)
|
||||||
@ -171,14 +174,14 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
|
|||||||
// Natural finish
|
// Natural finish
|
||||||
if atomic.LoadInt64(&o.InProgress) == 0 {
|
if atomic.LoadInt64(&o.InProgress) == 0 {
|
||||||
o.onTaskFinished()
|
o.onTaskFinished()
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
// Abort
|
// Abort
|
||||||
if atomic.LoadInt32(&o.WCtx.aborted) != 0 {
|
if atomic.LoadInt32(&o.WCtx.aborted) != 0 {
|
||||||
// Wait for all workers to finish
|
// Wait for all workers to finish
|
||||||
o.WCtx.workers.Wait()
|
o.WCtx.workers.Wait()
|
||||||
o.onTaskPaused()
|
o.onTaskPaused()
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user