From b1c40767e00259f52597c3d7b9105d0a50d4b4ea Mon Sep 17 00:00:00 2001
From: Richard Patel
Date: Sun, 28 Oct 2018 17:07:30 +0100
Subject: [PATCH] Remember scanned URLs

---
 config.go    |  6 ++---
 crawl.go     | 12 ++-------
 main.go      |  7 +++--
 model.go     | 32 +++++++++++++++++++++++
 scheduler.go | 72 ++++++++++++++++++++++++++++------------------------
 stats.go     |  8 ------
 worker.go    | 13 ++++++----
 7 files changed, 89 insertions(+), 61 deletions(-)
 create mode 100644 model.go

diff --git a/config.go b/config.go
index ad63034..bb36103 100644
--- a/config.go
+++ b/config.go
@@ -11,7 +11,7 @@ var config struct {
 	Token         string
 	Retries       int
 	Workers       int
-	Tasks         int
+	Tasks         int32
 	StatsInterval time.Duration
 	Verbose       bool
 }
@@ -62,9 +62,9 @@ func readConfig() {
 		configOOB(ConfWorkers, config.Workers)
 	}
 
-	config.Tasks = viper.GetInt(ConfTasks)
+	config.Tasks = viper.GetInt32(ConfTasks)
 	if config.Tasks <= 0 {
-		configOOB(ConfTasks, config.Tasks)
+		configOOB(ConfTasks, int(config.Tasks))
 	}
 
 	config.StatsInterval = viper.GetDuration(ConfStatsInterval)
diff --git a/crawl.go b/crawl.go
index 30e63d7..c8cdc10 100644
--- a/crawl.go
+++ b/crawl.go
@@ -12,7 +12,6 @@ import (
 	"path"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 )
 
@@ -20,15 +19,8 @@ var client fasthttp.Client
 var ErrRateLimit = errors.New("too many requests")
 var ErrForbidden = errors.New("access denied")
 
-type RemoteDir struct {
-	Wait    sync.WaitGroup
-	BaseUri url.URL
-	lock    sync.Mutex
-	Files   []File
-}
-
-func NewRemoteDir(u url.URL) *RemoteDir {
-	return &RemoteDir{ BaseUri: u }
+func NewRemoteDir(u url.URL) *OD {
+	return &OD{ BaseUri: u }
 }
 
 func GetDir(j *Job, f *File) (links []url.URL, err error) {
diff --git a/main.go b/main.go
index ba8a427..db979b6 100644
--- a/main.go
+++ b/main.go
@@ -5,13 +5,16 @@ import (
 	"net/url"
 )
 
-func main() {
+func init() {
 	prepareConfig()
+}
+
+func main() {
 	readConfig()
 
 	c := context.Background()
 
-	remotes := make(chan *RemoteDir)
+	remotes := make(chan *OD)
 	go Schedule(c, remotes)
 
 	u, _ := url.Parse("http://mine.terorie.com:420/")
diff --git a/model.go b/model.go
new file mode 100644
index 0000000..8bab19e
--- /dev/null
+++ b/model.go
@@ -0,0 +1,32 @@
+package main
+
+import (
+	"net/url"
+	"sync"
+	"time"
+)
+
+type Job struct {
+	OD        *OD
+	Uri       url.URL
+	UriStr    string
+	Fails     int
+	LastError error
+}
+
+type OD struct {
+	Wait    sync.WaitGroup
+	BaseUri url.URL
+	lock    sync.Mutex
+	Files   []File
+	WCtx    WorkerContext
+	Scanned sync.Map
+}
+
+type File struct {
+	Name  string    `json:"name"`
+	Size  int64     `json:"size"`
+	MTime time.Time `json:"mtime"`
+	Path  string    `json:"path"`
+	IsDir bool      `json:"-"`
+}
diff --git a/scheduler.go b/scheduler.go
index e6a9ca3..904677b 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -2,67 +2,68 @@ package main
 
 import (
 	"context"
-	"net/url"
 	"sync/atomic"
+	"time"
 )
 
-type Job struct {
-	Remote    *RemoteDir
-	Uri       url.URL
-	UriStr    string
-	Fails     int
-	LastError error
-}
-
 var activeTasks int32
 
-func Schedule(c context.Context, remotes <-chan *RemoteDir) {
-	in, out := makeJobBuffer()
-	wCtx := WorkerContext{
-		in:  in,
-		out: out,
-	}
-	for i := 0; i < config.Workers; i++ {
-		go wCtx.Worker()
-	}
+func Schedule(c context.Context, remotes <-chan *OD) {
 	go Stats(c)
 
 	for {
 		select {
 		case <-c.Done():
-			close(in)
 			return
 		case remote := <-remotes:
+			for atomic.LoadInt32(&activeTasks) > config.Tasks {
+				select {
+				case <-time.After(time.Second):
+					break
+				case <-c.Done():
+					return
+				}
+			}
+
+			// Spawn workers
+			remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
+			for i := 0; i < config.Workers; i++ {
+				go remote.WCtx.Worker()
+			}
+
 			// Enqueue initial job
 			atomic.AddInt32(&activeTasks, 1)
-			wCtx.queueJob(Job{
-				Remote: remote,
-				Uri: remote.BaseUri,
+			remote.WCtx.queueJob(Job{
+				OD:     remote,
+				Uri:    remote.BaseUri,
 				UriStr: remote.BaseUri.String(),
-				Fails: 0,
+				Fails:  0,
 			})
 			globalWait.Done()
+			// Upload result when ready
 			go remote.Watch()
 		}
 	}
 }
 
-func (r *RemoteDir) Watch() {
+func (r *OD) Watch() {
 	// Wait for all jobs on remote to finish
 	r.Wait.Wait()
+	close(r.WCtx.in)
 	atomic.AddInt32(&activeTasks, -1)
 }
 
-func makeJobBuffer() (chan<- Job, <-chan Job) {
+func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
 	in := make(chan Job)
 	out := make(chan Job)
-	go bufferJobs(in, out)
+	go bufferJobs(c, in, out)
 	return in, out
 }
 
-func bufferJobs(in chan Job, out chan Job) {
+func bufferJobs(c context.Context, in chan Job, out chan Job) {
+	defer close(out)
 	var inQueue []Job
 	outCh := func() chan Job {
 		if len(inQueue) == 0 {
@@ -72,11 +73,15 @@ func bufferJobs(in chan Job, out chan Job) {
 	}
 	for len(inQueue) > 0 || in != nil {
 		if len(inQueue) == 0 {
-			v, ok := <-in
-			if !ok {
-				in = nil
-			} else {
-				inQueue = append(inQueue, v)
+			select {
+			case v, ok := <-in:
+				if !ok {
+					in = nil
+				} else {
+					inQueue = append(inQueue, v)
+				}
+			case <-c.Done():
+				return
 			}
 		} else {
 			select {
@@ -88,8 +93,9 @@ func bufferJobs(in chan Job, out chan Job) {
 				}
 			case outCh() <- inQueue[0]:
 				inQueue = inQueue[1:]
+			case <-c.Done():
+				return
 			}
 		}
 	}
-	close(out)
 }
diff --git a/stats.go b/stats.go
index 1b12c27..f78e557 100644
--- a/stats.go
+++ b/stats.go
@@ -13,14 +13,6 @@ var totalDone uint64
 var totalRetries uint64
 var totalAborted uint64
 
-type File struct {
-	Name  string    `json:"name"`
-	Size  int64     `json:"size"`
-	MTime time.Time `json:"mtime"`
-	Path  string    `json:"path"`
-	IsDir bool      `json:"-"`
-}
-
 func Stats(c context.Context) {
 	var startedLast uint64 = 0
 	ticker := time.NewTicker(config.StatsInterval).C
diff --git a/worker.go b/worker.go
index badee0e..49793b3 100644
--- a/worker.go
+++ b/worker.go
@@ -60,7 +60,7 @@ func (w WorkerContext) step(job Job) {
 		w.queueJob(job)
 	}
 
-	job.Remote.Files = append(job.Remote.Files, f)
+	job.OD.Files = append(job.OD.Files, f)
 }
 
 func DoJob(job *Job, f *File) (newJobs []Job, err error) {
@@ -75,9 +75,12 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 			return nil, err
 		}
 		for _, link := range links {
-			job.Remote.Wait.Add(1)
+			if _, old := job.OD.Scanned.LoadOrStore(link, true); old {
+				continue
+			}
+			job.OD.Wait.Add(1)
 			newJobs = append(newJobs, Job{
-				Remote: job.Remote,
+				OD:     job.OD,
 				Uri:    link,
 				UriStr: link.String(),
 				Fails:  0,
@@ -100,7 +103,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 }
 
 func (w WorkerContext) queueJob(job Job) {
-	job.Remote.Wait.Add(1)
+	job.OD.Wait.Add(1)
 	globalWait.Add(1)
 
 	if w.numRateLimits > 0 {
@@ -117,6 +120,6 @@ func (w WorkerContext) queueJob(job Job) {
 }
 
 func (w WorkerContext) finishJob(job *Job) {
-	job.Remote.Wait.Done()
+	job.OD.Wait.Done()
 	globalWait.Done()
 }
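
Note (illustrative sketch, not part of the patch): the change that gives the commit its name is the new Scanned sync.Map on OD. Before queueing a link it just discovered, DoJob in worker.go now calls LoadOrStore on that map and skips the link if it was already present, so a URL found by several workers at once is crawled at most once per open directory. The snippet below is a minimal, self-contained illustration of that LoadOrStore dedup pattern; the identifiers (seen, shouldQueue) and the use of string keys are assumptions made for this example only — the patch itself keys the map by url.URL values and ties queueing to the per-OD WaitGroup.

	// Illustrative sketch only; not code from this repository.
	package main

	import (
		"fmt"
		"sync"
	)

	// seen records every URL that has been handed to a worker.
	// sync.Map is safe for concurrent use without extra locking.
	var seen sync.Map

	// shouldQueue returns true only for the first goroutine that offers
	// a given URL; LoadOrStore performs the check and the insert atomically.
	func shouldQueue(u string) bool {
		_, loaded := seen.LoadOrStore(u, struct{}{})
		return !loaded // loaded == true means the URL was stored earlier
	}

	func main() {
		var wg sync.WaitGroup
		links := []string{
			"http://example.com/a/",
			"http://example.com/b/",
			"http://example.com/a/", // duplicate found by another worker
		}
		for _, l := range links {
			wg.Add(1)
			go func(l string) {
				defer wg.Done()
				if shouldQueue(l) {
					fmt.Println("queue", l)
				} else {
					fmt.Println("skip duplicate", l)
				}
			}(l)
		}
		wg.Wait()
	}

Because LoadOrStore is atomic, the "have we seen this URL?" check and the marking cannot interleave between goroutines, which is why worker.go can drop a duplicate with a plain continue and no extra locking.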