Refactor uploading & chunk size parameter

Richard Patel 2018-11-18 00:15:08 +01:00
parent 1e6687c519
commit 339175220d
4 changed files with 92 additions and 39 deletions

View File

@@ -12,6 +12,7 @@ import (
 var config struct {
 	ServerUrl string
 	Token     string
+	ChunkSize uint
 	Retries   int
 	Workers   int
 	Timeout   time.Duration
@ -25,6 +26,7 @@ var config struct {
const ( const (
ConfServerUrl = "server.url" ConfServerUrl = "server.url"
ConfToken = "server.token" ConfToken = "server.token"
ConfChunkSize = "server.upload_chunk"
ConfTasks = "crawl.tasks" ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries" ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections" ConfWorkers = "crawl.connections"
@@ -44,6 +46,7 @@ func prepareConfig() {
 	viper.SetDefault(ConfAllocStats, 0)
 	viper.SetDefault(ConfVerbose, false)
 	viper.SetDefault(ConfPrintHTTP, false)
+	viper.SetDefault(ConfChunkSize, "1 MB")
 }
 
 func readConfig() {
@@ -66,6 +69,11 @@ func readConfig() {
 		configMissing(ConfToken)
 	}
 
+	config.ChunkSize = viper.GetSizeInBytes(ConfChunkSize)
+	if config.ChunkSize < 100 {
+		configOOB(ConfChunkSize, config.ChunkSize)
+	}
+
 	config.Retries = viper.GetInt(ConfRetries)
 	if config.Retries < 0 {
 		config.Retries = 1 << 31
@@ -100,7 +108,7 @@ func configMissing(key string) {
 	os.Exit(1)
 }
 
-func configOOB(key string, v int) {
-	fmt.Fprintf(os.Stderr, "config: illegal value %d for %key!\n", v, key)
+func configOOB(key string, v interface{}) {
+	fmt.Fprintf(os.Stderr, "config: illegal value %v for key %s!\n", v, key)
 	os.Exit(1)
 }
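For reference, the new `ChunkSize` field is read with viper's `GetSizeInBytes`, which parses human-readable size strings, so the "1 MB" default resolves to 1048576 bytes. A minimal standalone sketch of that parsing behavior, outside the crawler's config plumbing:

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	// viper understands KB/MB/GB suffixes in size strings;
	// "1 MB" resolves to 1 << 20 = 1048576 bytes.
	viper.SetDefault("server.upload_chunk", "1 MB")
	fmt.Println(viper.GetSizeInBytes("server.upload_chunk")) // 1048576
}
```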

View File

@@ -4,6 +4,9 @@ server:
   url: http://od-db.mine.terorie.com/api
   # Server auth token
   token:
+  # Upload chunk size
+  # If the value is too high, the upload fails.
+  upload_chunk: 1 MB
 
 # Log output settings
 output:
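The new key pairs with the lower bound enforced in `readConfig` above: any value that parses to fewer than 100 bytes aborts startup. An illustrative inline version of that failure path (the check mirrors the diff; the message formatting is `configOOB`'s):

```go
// Sketch of the startup validation: a config such as
// `upload_chunk: 50` parses to 50 bytes and is rejected.
chunkSize := viper.GetSizeInBytes("server.upload_chunk")
if chunkSize < 100 {
	fmt.Fprintf(os.Stderr, "config: illegal value %v for key %s!\n",
		chunkSize, "server.upload_chunk")
	os.Exit(1)
}
```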

View File

@@ -40,6 +40,12 @@ func main() {
 	go func() {
 		log.Println(http.ListenAndServe("localhost:42069", nil))
 	}()
+
+	err := os.MkdirAll("crawled", 0755)
+	if err != nil {
+		panic(err)
+	}
+
 	app.Run(os.Args)
 }
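Creating the `crawled` directory once at startup replaces the per-task `os.MkdirAll` that `Watch` used to run (and whose error it silently swallowed); a failure now panics immediately. Repeated runs stay safe because `os.MkdirAll` succeeds when the path already exists, as this standalone check shows:

```go
package main

import "os"

func main() {
	// First call creates the directory tree.
	if err := os.MkdirAll("crawled", 0755); err != nil {
		panic(err)
	}
	// Second call also returns nil: os.MkdirAll is a no-op
	// when the path already exists as a directory.
	if err := os.MkdirAll("crawled", 0755); err != nil {
		panic(err)
	}
}
```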
@@ -69,7 +75,7 @@ func cmdBase(_ *cli.Context) error {
 		}
 		if t == nil {
 			// No new task
-			if atomic.LoadInt32(&activeTasks) == 0 {
+			if atomic.LoadInt32(&numActiveTasks) == 0 {
 				logrus.Info("Waiting …")
 			}
 			continue

View File

@@ -13,7 +13,9 @@ import (
 	"time"
 )
 
-var activeTasks int32
+var activeTasksLock sync.Mutex
+var activeTasks = make(map[uint64]bool)
+var numActiveTasks int32
 var totalBuffered int64
 
 func Schedule(c context.Context, remotes <-chan *OD) {
@@ -33,7 +35,7 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 	}
 
 	// Enqueue initial job
-	atomic.AddInt32(&activeTasks, 1)
+	atomic.AddInt32(&numActiveTasks, 1)
 	remote.WCtx.queueJob(Job{
 		OD:  remote,
 		Uri: remote.BaseUri,
@@ -45,7 +47,7 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 	go remote.Watch(results)
 
 	// Sleep if max number of tasks are active
-	for atomic.LoadInt32(&activeTasks) > config.Tasks {
+	for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
 		select {
 		case <-c.Done():
 			return
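The rename does not change the throttle's shape: `Schedule` polls the counter until a slot frees up, bailing out on cancellation. The hunk cuts off inside the `select`; a hedged sketch of the presumed full loop, assuming the second case is simply a timed wait between polls:

```go
// Sketch, not the verbatim source: the wait duration is assumed.
for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
	select {
	case <-c.Done():
		return
	case <-time.After(time.Second):
		// recheck the active-task count on the next iteration
	}
}
```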
@@ -57,6 +59,10 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 }
 
 func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
+	if !t.register() {
+		return
+	}
+
 	globalWait.Add(1)
 	now := time.Now()
 	od := &OD {
@@ -71,40 +77,87 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
 	remotes <- od
 }
 
+func (t *Task) register() bool {
+	activeTasksLock.Lock()
+	defer activeTasksLock.Unlock()
+
+	if _, known := activeTasks[t.WebsiteId]; known {
+		return false
+	} else {
+		activeTasks[t.WebsiteId] = true
+		return true
+	}
+}
+
+func (t *Task) unregister() {
+	activeTasksLock.Lock()
+	delete(activeTasks, t.WebsiteId)
+	activeTasksLock.Unlock()
+}
+
 func (o *OD) Watch(results chan File) {
+	// Mark job as completely done
+	defer globalWait.Done()
+	defer o.Task.unregister()
+
 	filePath := path.Join("crawled", fmt.Sprintf("%d.json", o.Task.WebsiteId))
 
 	// Open crawl results file
-	// TODO Print errors
-	err := os.MkdirAll("crawled", 0755)
-	if err != nil { return }
 	f, err := os.OpenFile(
 		filePath,
 		os.O_CREATE | os.O_RDWR | os.O_TRUNC,
 		0644,
 	)
-	if err != nil { return }
+	if err != nil {
+		logrus.WithError(err).
+			Error("Failed saving crawl results")
+		return
+	}
 	defer f.Close()
 	defer os.Remove(filePath)
 
-	// Wait for the file to be fully written
-	var fileLock sync.Mutex
-	fileLock.Lock()
-	go o.Task.Collect(results, f, &fileLock)
+	// Listen for exit code of Collect()
+	collectErrC := make(chan error)
+
+	// Block until all results are written
+	// (closes results channel)
+	o.handleCollect(results, f, collectErrC)
+
+	// Exit code of Collect()
+	err = <-collectErrC
+	close(collectErrC)
+	if err != nil {
+		logrus.WithError(err).
+			Error("Failed saving crawl results")
+		return
+	}
+
+	// Upload results
+	err = PushResult(&o.Result, f)
+	if err != nil {
+		logrus.WithError(err).
+			Error("Failed uploading crawl results")
+		return
+	}
+}
+
+func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) {
+	// Begin collecting results
+	go o.Task.Collect(results, f, collectErrC)
+	defer close(results)
 
 	// Wait for all jobs on remote to finish
 	o.Wait.Wait()
 	close(o.WCtx.in)
-	atomic.AddInt32(&activeTasks, -1)
+	atomic.AddInt32(&numActiveTasks, -1)
 
 	// Log finish
-	logrus.
-		WithField("url", o.BaseUri.String()).
-		WithField("duration", time.Since(o.Result.StartTime)).
-		Info("Crawler finished")
+	logrus.WithFields(logrus.Fields{
+		"id":       o.Task.WebsiteId,
+		"url":      o.BaseUri.String(),
+		"duration": time.Since(o.Result.StartTime),
+	}).Info("Crawler finished")
 
 	// Set status code
 	now := time.Now()
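The new register/unregister pair is what lets `ScheduleTask` drop duplicate submissions: a mutex-guarded set keyed by `WebsiteId` claims a task before any work starts, and the deferred `unregister` in `Watch` releases it once the crawl finishes. A self-contained sketch of the same guard (the names mirror the diff; the rest is illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

var (
	activeTasksLock sync.Mutex
	activeTasks     = make(map[uint64]bool)
)

// register claims an ID; the second caller for the same ID loses.
func register(id uint64) bool {
	activeTasksLock.Lock()
	defer activeTasksLock.Unlock()
	if activeTasks[id] {
		return false
	}
	activeTasks[id] = true
	return true
}

// unregister releases the ID so the site can be crawled again later.
func unregister(id uint64) {
	activeTasksLock.Lock()
	delete(activeTasks, id)
	activeTasksLock.Unlock()
}

func main() {
	fmt.Println(register(42)) // true: first claim wins
	fmt.Println(register(42)) // false: already active
	unregister(42)
	fmt.Println(register(42)) // true again after unregister
}
```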
@@ -120,32 +173,15 @@ func (o *OD) Watch(results chan File) {
 	} else {
 		o.Result.StatusCode = "success"
 	}
-
-	// Shut down Collect()
-	close(results)
-
-	// Wait for results to sync to file
-	fileLock.Lock()
-	fileLock.Unlock()
-
-	// Upload results
-	err = PushResult(&o.Result, f)
-	if err != nil {
-		logrus.WithError(err).
-			Error("Failed uploading result")
-	}
-
-	// Mark job as completely done
-	globalWait.Done()
 }
 
-func (t *Task) Collect(results chan File, f *os.File, done *sync.Mutex) {
+func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) {
 	err := t.collect(results, f)
 	if err != nil {
 		logrus.WithError(err).
 			Error("Failed saving crawl results")
 	}
-	done.Unlock()
+	errC <- err
 }
 
 func (t *Task) collect(results chan File, f *os.File) error {
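The core of the refactor is swapping the locked-mutex handshake (`fileLock.Lock` in `Watch`, `done.Unlock` in `Collect`) for an error channel, so the collector can report why it stopped rather than merely that it stopped. A minimal standalone sketch of the pattern (the types here stand in for the crawler's own):

```go
package main

import (
	"fmt"
	"os"
)

// collect plays the role of Task.Collect: it drains results into f
// and reports its exit code over errC instead of unlocking a mutex.
func collect(results <-chan string, f *os.File, errC chan<- error) {
	var err error
	for r := range results {
		if _, err = fmt.Fprintln(f, r); err != nil {
			break
		}
	}
	errC <- err
}

func main() {
	f, err := os.CreateTemp("", "crawl-*.json")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	results := make(chan string)
	errC := make(chan error)
	go collect(results, f, errC)

	results <- `{"url":"/index.html"}`
	close(results) // producer is done; collector drains and reports

	if err := <-errC; err != nil {
		fmt.Fprintln(os.Stderr, "collect failed:", err)
	}
}
```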