Remember scanned URLs

This commit is contained in:
Richard Patel 2018-10-28 17:07:30 +01:00
parent c196b6f20d
commit b1c40767e0
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
7 changed files with 89 additions and 61 deletions

View File

@ -11,7 +11,7 @@ var config struct {
Token string Token string
Retries int Retries int
Workers int Workers int
Tasks int Tasks int32
StatsInterval time.Duration StatsInterval time.Duration
Verbose bool Verbose bool
} }
@ -62,9 +62,9 @@ func readConfig() {
configOOB(ConfWorkers, config.Workers) configOOB(ConfWorkers, config.Workers)
} }
config.Tasks = viper.GetInt(ConfTasks) config.Tasks = viper.GetInt32(ConfTasks)
if config.Tasks <= 0 { if config.Tasks <= 0 {
configOOB(ConfTasks, config.Tasks) configOOB(ConfTasks, int(config.Tasks))
} }
config.StatsInterval = viper.GetDuration(ConfStatsInterval) config.StatsInterval = viper.GetDuration(ConfStatsInterval)

View File

@ -12,7 +12,6 @@ import (
"path" "path"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
) )
@ -20,15 +19,8 @@ var client fasthttp.Client
var ErrRateLimit = errors.New("too many requests") var ErrRateLimit = errors.New("too many requests")
var ErrForbidden = errors.New("access denied") var ErrForbidden = errors.New("access denied")
type RemoteDir struct { func NewRemoteDir(u url.URL) *OD {
Wait sync.WaitGroup return &OD{ BaseUri: u }
BaseUri url.URL
lock sync.Mutex
Files []File
}
func NewRemoteDir(u url.URL) *RemoteDir {
return &RemoteDir{ BaseUri: u }
} }
func GetDir(j *Job, f *File) (links []url.URL, err error) { func GetDir(j *Job, f *File) (links []url.URL, err error) {

View File

@ -5,13 +5,16 @@ import (
"net/url" "net/url"
) )
func main() { func init() {
prepareConfig() prepareConfig()
}
func main() {
readConfig() readConfig()
c := context.Background() c := context.Background()
remotes := make(chan *RemoteDir) remotes := make(chan *OD)
go Schedule(c, remotes) go Schedule(c, remotes)
u, _ := url.Parse("http://mine.terorie.com:420/") u, _ := url.Parse("http://mine.terorie.com:420/")

32
model.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
"net/url"
"sync"
"time"
)
// Job is a single URL-fetch task for a worker, with retry bookkeeping.
// Workers re-queue a Job on transient failure (see queueJob/finishJob).
type Job struct {
OD *OD // open directory this job belongs to; its Wait/Scanned track progress
Uri url.URL // parsed URL to fetch
UriStr string // pre-rendered Uri.String(), cached to avoid re-formatting
Fails int // number of failed attempts so far (starts at 0 when enqueued)
LastError error // error from the most recent failed attempt, nil if none
}
// OD represents one open directory being crawled: its root URL, the
// files discovered so far, and the per-directory worker machinery.
// Contains sync primitives — must not be copied; pass *OD.
type OD struct {
Wait sync.WaitGroup // counts outstanding jobs; Watch() blocks on it until the crawl finishes
BaseUri url.URL // root URL of the open directory
lock sync.Mutex // guards mutable state shared between workers
Files []File // accumulated listing results (appended by workers)
WCtx WorkerContext // per-directory worker context (job in/out channels)
Scanned sync.Map // set of already-scanned URLs (LoadOrStore dedup), so links are visited once
}
// File is one entry discovered in an open directory, serialized to JSON
// for the upload step. IsDir is internal-only and excluded from output.
type File struct {
Name string `json:"name"` // base name of the entry
Size int64 `json:"size"` // size in bytes
MTime time.Time `json:"mtime"` // last-modified time
Path string `json:"path"` // path relative to the directory root
IsDir bool `json:"-"` // true for directories; not serialized
}

View File

@ -2,67 +2,68 @@ package main
import ( import (
"context" "context"
"net/url"
"sync/atomic" "sync/atomic"
"time"
) )
type Job struct {
Remote *RemoteDir
Uri url.URL
UriStr string
Fails int
LastError error
}
var activeTasks int32 var activeTasks int32
func Schedule(c context.Context, remotes <-chan *RemoteDir) { func Schedule(c context.Context, remotes <-chan *OD) {
in, out := makeJobBuffer()
wCtx := WorkerContext{
in: in,
out: out,
}
for i := 0; i < config.Workers; i++ {
go wCtx.Worker()
}
go Stats(c) go Stats(c)
for { for {
select { select {
case <-c.Done(): case <-c.Done():
close(in)
return return
case remote := <-remotes: case remote := <-remotes:
for atomic.LoadInt32(&activeTasks) > config.Tasks {
select {
case <-time.After(time.Second):
break
case <-c.Done():
return
}
}
// Spawn workers
remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker()
}
// Enqueue initial job // Enqueue initial job
atomic.AddInt32(&activeTasks, 1) atomic.AddInt32(&activeTasks, 1)
wCtx.queueJob(Job{ remote.WCtx.queueJob(Job{
Remote: remote, OD: remote,
Uri: remote.BaseUri, Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(), UriStr: remote.BaseUri.String(),
Fails: 0, Fails: 0,
}) })
globalWait.Done() globalWait.Done()
// Upload result when ready // Upload result when ready
go remote.Watch() go remote.Watch()
} }
} }
} }
func (r *RemoteDir) Watch() { func (r *OD) Watch() {
// Wait for all jobs on remote to finish // Wait for all jobs on remote to finish
r.Wait.Wait() r.Wait.Wait()
close(r.WCtx.in)
atomic.AddInt32(&activeTasks, -1) atomic.AddInt32(&activeTasks, -1)
} }
func makeJobBuffer() (chan<- Job, <-chan Job) { func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
in := make(chan Job) in := make(chan Job)
out := make(chan Job) out := make(chan Job)
go bufferJobs(in, out) go bufferJobs(c, in, out)
return in, out return in, out
} }
func bufferJobs(in chan Job, out chan Job) { func bufferJobs(c context.Context, in chan Job, out chan Job) {
defer close(out)
var inQueue []Job var inQueue []Job
outCh := func() chan Job { outCh := func() chan Job {
if len(inQueue) == 0 { if len(inQueue) == 0 {
@ -72,11 +73,15 @@ func bufferJobs(in chan Job, out chan Job) {
} }
for len(inQueue) > 0 || in != nil { for len(inQueue) > 0 || in != nil {
if len(inQueue) == 0 { if len(inQueue) == 0 {
v, ok := <-in select {
if !ok { case v, ok := <-in:
in = nil if !ok {
} else { in = nil
inQueue = append(inQueue, v) } else {
inQueue = append(inQueue, v)
}
case <-c.Done():
return
} }
} else { } else {
select { select {
@ -88,8 +93,9 @@ func bufferJobs(in chan Job, out chan Job) {
} }
case outCh() <- inQueue[0]: case outCh() <- inQueue[0]:
inQueue = inQueue[1:] inQueue = inQueue[1:]
case <-c.Done():
return
} }
} }
} }
close(out)
} }

View File

@ -13,14 +13,6 @@ var totalDone uint64
var totalRetries uint64 var totalRetries uint64
var totalAborted uint64 var totalAborted uint64
type File struct {
Name string `json:"name"`
Size int64 `json:"size"`
MTime time.Time `json:"mtime"`
Path string `json:"path"`
IsDir bool `json:"-"`
}
func Stats(c context.Context) { func Stats(c context.Context) {
var startedLast uint64 = 0 var startedLast uint64 = 0
ticker := time.NewTicker(config.StatsInterval).C ticker := time.NewTicker(config.StatsInterval).C

View File

@ -60,7 +60,7 @@ func (w WorkerContext) step(job Job) {
w.queueJob(job) w.queueJob(job)
} }
job.Remote.Files = append(job.Remote.Files, f) job.OD.Files = append(job.OD.Files, f)
} }
func DoJob(job *Job, f *File) (newJobs []Job, err error) { func DoJob(job *Job, f *File) (newJobs []Job, err error) {
@ -75,9 +75,12 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
return nil, err return nil, err
} }
for _, link := range links { for _, link := range links {
job.Remote.Wait.Add(1) if _, old := job.OD.Scanned.LoadOrStore(link, true); old {
continue
}
job.OD.Wait.Add(1)
newJobs = append(newJobs, Job{ newJobs = append(newJobs, Job{
Remote: job.Remote, OD: job.OD,
Uri: link, Uri: link,
UriStr: link.String(), UriStr: link.String(),
Fails: 0, Fails: 0,
@ -100,7 +103,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
} }
func (w WorkerContext) queueJob(job Job) { func (w WorkerContext) queueJob(job Job) {
job.Remote.Wait.Add(1) job.OD.Wait.Add(1)
globalWait.Add(1) globalWait.Add(1)
if w.numRateLimits > 0 { if w.numRateLimits > 0 {
@ -117,6 +120,6 @@ func (w WorkerContext) queueJob(job Job) {
} }
func (w WorkerContext) finishJob(job *Job) { func (w WorkerContext) finishJob(job *Job) {
job.Remote.Wait.Done() job.OD.Wait.Done()
globalWait.Done() globalWait.Done()
} }