Initial resume implementation

Richard Patel 2019-02-03 15:02:07 +01:00
parent b18b70f798
commit a83eb0cfd7
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
7 changed files with 464 additions and 122 deletions

config.go (142 lines changed)

@@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"io"
"os"
@@ -26,6 +27,8 @@ var config struct {
JobBufferSize int
}
var onlineMode bool
const (
ConfServerUrl = "server.url"
ConfToken = "server.token"
@@ -43,6 +46,7 @@ const (
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer"
ConfResume = "crawl.resume"
ConfCrawlStats = "output.crawl_stats"
ConfAllocStats = "output.resource_stats"
@@ -54,8 +58,58 @@
func prepareConfig() {
pf := rootCmd.PersistentFlags()
bind := func(s string) {
if err := viper.BindPFlag(s, pf.Lookup(s)); err != nil {
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
pf.Duration(ConfResume, 72 * time.Hour, "Crawler: Resume tasks not older than this duration")
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
pf.String(ConfLogFile, "crawler.log", "Log file")
// Bind all flags to Viper
pf.VisitAll(func(flag *pflag.Flag) {
s := flag.Name
s = strings.TrimLeft(s, "-")
if err := viper.BindPFlag(s, flag); err != nil {
panic(err)
}
var envKey string
@@ -65,71 +119,7 @@ func prepareConfig() {
if err := viper.BindEnv(s, envKey); err != nil {
panic(err)
}
}
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
bind(ConfServerUrl)
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
bind(ConfToken)
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
bind(ConfServerTimeout)
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
bind(ConfRecheck)
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
bind(ConfCooldown)
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
bind(ConfChunkSize)
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
bind(ConfUploadRetries)
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
bind(ConfUploadRetryInterval)
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
bind(ConfTasks)
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
bind(ConfWorkers)
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
bind(ConfRetries)
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
bind(ConfDialTimeout)
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
bind(ConfTimeout)
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
bind(ConfUserAgent)
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
bind(ConfJobBufferSize)
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
bind(ConfCrawlStats)
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
bind(ConfAllocStats)
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
bind(ConfVerbose)
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
bind(ConfPrintHTTP)
pf.String(ConfLogFile, "crawler.log", "Log file")
bind(ConfLogFile)
})
}
func readConfig() {
@@ -157,15 +147,17 @@ func readConfig() {
}
}
config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" {
configMissing(ConfServerUrl)
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
if onlineMode {
config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" {
configMissing(ConfServerUrl)
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
}
}
config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
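The hunk above elides the actual envKey derivation, but the help texts (env OD_CONFIG, OD_SERVER_TOKEN) imply a mapping from flag names to environment keys. A minimal standalone sketch of that mapping, assuming an OD_ prefix and dot/dash-to-underscore translation (envKeyFor is a hypothetical name, not part of this commit):

package main

import (
	"fmt"
	"strings"
)

// envKeyFor sketches the assumed flag-name → env-key mapping: trim leading
// dashes, swap separators for underscores, uppercase, and prefix "OD_".
// Hypothetical; the real derivation is elided by the hunk above.
func envKeyFor(flagName string) string {
	s := strings.TrimLeft(flagName, "-")
	s = strings.NewReplacer(".", "_", "-", "_").Replace(s)
	return "OD_" + strings.ToUpper(s)
}

func main() {
	fmt.Println(envKeyFor("server.token"))       // OD_SERVER_TOKEN
	fmt.Println(envKeyFor("crawl.dial_timeout")) // OD_CRAWL_DIAL_TIMEOUT
}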


@@ -15,7 +15,10 @@ package redblackhash
import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"sync"
)
@@ -43,6 +46,13 @@ type Node struct {
Parent *Node
}
type nodeHeader struct {
Key *Key
Color color
}
var o = binary.BigEndian
func (k *Key) Compare(o *Key) int {
return bytes.Compare(k[:], o[:])
}
@@ -233,7 +243,7 @@ func (tree *Tree) String() string {
}
func (node *Node) String() string {
return fmt.Sprintf("%v", node.Key)
return hex.EncodeToString(node.Key[:16]) + "..."
}
func output(node *Node, prefix string, isTail bool, str *string) {
@@ -481,6 +491,119 @@ func (tree *Tree) deleteCase6(node *Node) {
}
}
func (tree *Tree) Marshal(w io.Writer) (err error) {
tree.Lock()
defer tree.Unlock()
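// 0x617979797979790A is ASCII for "ayyyyyy\n": the start-of-file magic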
err = binary.Write(w, o, uint64(0x617979797979790A))
if err != nil { return err }
err = marshal(tree.Root, w)
if err != nil { return err }
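// 0x6C6D616F6F6F6F0A is ASCII for "lmaoooo\n": the end-of-file magic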
err = binary.Write(w, o, uint64(0x6C6D616F6F6F6F0A))
if err != nil { return err }
return nil
}
func marshal(n *Node, w io.Writer) (err error) {
if n == nil {
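// 0x796565656565740A is ASCII for "yeeeeet\n": sentinel for a nil child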
err = binary.Write(w, o, uint64(0x796565656565740A))
return err
}
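// 0xF09F85B1EFB88F0A is UTF-8 for "🅱️\n": marks a present node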
err = binary.Write(w, o, uint64(0xF09F85B1EFB88F0A))
if err != nil { return err }
_, err = w.Write(n.Key[:])
if err != nil { return err }
var colorI uint64
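// Color sentinels: ASCII "the-eye|" when color is true, "hex+ter|" when false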
if n.color {
colorI = 0x7468652D6579657C
} else {
colorI = 0x6865782B7465727C
}
err = binary.Write(w, o, colorI)
if err != nil { return err }
err = marshal(n.Left, w)
if err != nil { return err }
err = marshal(n.Right, w)
if err != nil { return err }
return nil
}
func (tree *Tree) Unmarshal(r io.Reader) (err error) {
tree.Lock()
defer tree.Unlock()
var sof uint64
err = binary.Read(r, o, &sof)
if err != nil { return err }
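// Expect the "ayyyyyy\n" start marker written by Marshal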
if sof != 0x617979797979790A {
return fmt.Errorf("redblack: wrong format")
}
tree.Root, tree.size, err = unmarshal(r)
if err != nil { return err }
var eof uint64
err = binary.Read(r, o, &eof)
if err != nil { return err }
if eof != 0x6C6D616F6F6F6F0A {
return fmt.Errorf("redblack: end of file missing")
}
return nil
}
func unmarshal(r io.Reader) (n *Node, size int, err error) {
var head uint64
err = binary.Read(r, o, &head)
if err != nil { return nil, 0, err }
size = 1
switch head {
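// Same sentinels as marshal: "yeeeeet\n" means nil child, "🅱️\n" precedes a node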
case 0x796565656565740A:
return nil, 0, nil
case 0xF09F85B1EFB88F0A:
n = new(Node)
_, err = io.ReadFull(r, n.Key[:])
if err != nil { return nil, 0, err }
var colorInt uint64
err = binary.Read(r, o, &colorInt)
if err != nil { return nil, 0, err }
switch colorInt {
case 0x7468652D6579657C:
n.color = true
case 0x6865782B7465727C:
n.color = false
default:
return nil, 0, fmt.Errorf("redblack: corrupt node color")
}
default:
return nil, 0, fmt.Errorf("redblack: corrupt node info")
}
var s2 int
n.Left, s2, err = unmarshal(r)
size += s2
if err != nil { return nil, 0, err }
n.Right, s2, err = unmarshal(r)
size += s2
if err != nil { return nil, 0, err }
return n, size, nil
}
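Taken together, the on-disk format is: an 8-byte start marker, the tree in preorder (per node: an 8-byte presence marker, the raw key bytes, and an 8-byte color sentinel), an 8-byte sentinel in place of each nil child, and an 8-byte end marker. unmarshal mirrors this layout and counts nodes into tree.size as it recurses.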
func nodeColor(node *Node) color {
if node == nil {
return black


@@ -0,0 +1,47 @@
package redblackhash
import (
"bytes"
"math/rand"
"testing"
)
func TestTree_Marshal(t *testing.T) {
var t1, t2 Tree
// Generate 1000 random values to insert
for i := 0; i < 1000; i++ {
var key Key
rand.Read(key[:])
t1.Put(&key)
}
// Marshal tree
var wr bytes.Buffer
err := t1.Marshal(&wr)
if err != nil {
t.Error(err)
t.FailNow()
}
buf := wr.Bytes()
rd := bytes.NewBuffer(buf)
// Unmarshal tree
err = t2.Unmarshal(rd)
if err != nil {
t.Error(err)
t.FailNow()
}
if !compare(t1.Root, t2.Root) {
t.Error("trees are not equal")
t.FailNow()
}
}
func compare(n1, n2 *Node) bool {
if n1 == nil || n2 == nil {
// Subtrees match only if both end here
return n1 == n2
}
return n1.Key.Compare(&n2.Key) == 0 &&
compare(n1.Left, n2.Left) &&
compare(n1.Right, n2.Right)
}


@@ -61,8 +61,6 @@ func preRun(cmd *cobra.Command, args []string) error {
if err := os.MkdirAll("queue", 0755);
err != nil { panic(err) }
readConfig()
return nil
}
@@ -75,11 +73,15 @@ func main() {
}
func cmdBase(_ *cobra.Command, _ []string) {
onlineMode = true
readConfig()
// TODO Graceful shutdown
appCtx := context.Background()
forceCtx := context.Background()
inRemotes := make(chan *OD)
go LoadResumeTasks(inRemotes)
go Schedule(forceCtx, inRemotes)
ticker := time.NewTicker(config.Recheck)
@@ -129,6 +131,9 @@ func cmdBase(_ *cobra.Command, _ []string) {
}
func cmdCrawler(_ *cobra.Command, args []string) error {
onlineMode = false
readConfig()
arg := args[0]
// https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") {


@@ -38,6 +38,12 @@ type OD struct {
Scanned redblackhash.Tree
}
type PausedOD struct {
Task *Task
Result *TaskResult
BaseUri *fasturl.URL
}
type File struct {
Name string `json:"name"`
Size int64 `json:"size"`

resume.go (new file, 161 lines)

@@ -0,0 +1,161 @@
package main
import (
"encoding/gob"
"github.com/beeker1121/goque"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"os"
"path/filepath"
"strconv"
"time"
)
func LoadResumeTasks(inRemotes chan<- *OD) {
resumed, err := ResumeTasks()
if err != nil {
logrus.WithError(err).
Error("Failed to resume queued tasks. " +
"/queue is probably corrupt")
err = nil
}
for _, remote := range resumed {
inRemotes <- remote
}
}
func ResumeTasks() (tasks []*OD, err error) {
// Get files in /queue
var queueF *os.File
var entries []os.FileInfo
queueF, err = os.Open("queue")
if err != nil { return nil, err }
defer queueF.Close()
entries, err = queueF.Readdir(-1)
if err != nil { return nil, err }
resumeDur := viper.GetDuration(ConfResume)
for _, entry := range entries {
if !entry.IsDir() { continue }
// Check if name is a number
var id uint64
if id, err = strconv.ParseUint(entry.Name(), 10, 64); err != nil {
continue
}
// Too old to be resumed
timeDelta := time.Since(entry.ModTime())
if resumeDur >= 0 && timeDelta > resumeDur {
removeOldQueue(id)
continue
}
// Load queue
var od *OD
if od, err = resumeQueue(id); err != nil {
logrus.WithError(err).
WithField("id", id).
Warning("Failed to load paused task")
continue
} else if od == nil {
removeOldQueue(id)
continue
}
tasks = append(tasks, od)
}
return tasks, nil
}
func resumeQueue(id uint64) (od *OD, err error) {
fPath := filepath.Join("queue", strconv.FormatUint(id, 10))
// Try to find pause file
pausedF, err := os.Open(filepath.Join(fPath, "PAUSED"))
if os.IsNotExist(err) {
// No PAUSED file => not paused
// not paused => no error
return nil, nil
} else if err != nil {
return nil, err
}
defer pausedF.Close()
od = new(OD)
od.WCtx.OD = od
// Make the paused struct point to OD fields
// So gob loads values into the OD struct
paused := PausedOD {
Task: &od.Task,
Result: &od.Result,
BaseUri: &od.BaseUri,
}
// Read pause settings
pauseDec := gob.NewDecoder(pausedF)
err = pauseDec.Decode(&paused)
if err != nil { return nil, err }
// Read pause scan state
err = od.Scanned.Unmarshal(pausedF)
if err != nil { return nil, err }
// Open queue
bq, err := OpenQueue(fPath)
if err != nil { return nil, err }
od.WCtx.Queue = bq
return od, nil
}
func removeOldQueue(id uint64) {
if id == 0 {
// TODO Make custom crawl less of an ugly hack
return
}
logrus.WithField("id", id).
Warning("Deleting & returning old task")
name := strconv.FormatUint(id, 10)
fPath := filepath.Join("queue", name)
// Acquire old queue
q, err := goque.OpenQueue(fPath)
if err != nil {
// Queue lock exists, don't delete
logrus.WithField("err", err).
WithField("path", fPath).
Error("Failed to acquire old task")
return
}
// Delete old queue from disk
err = q.Drop()
if err != nil {
// Couldn't drop the queue data; leave it on disk
logrus.WithField("err", err).
WithField("path", fPath).
Error("Failed to delete old task")
return
}
// Delete old crawl result from disk
_ = os.Remove(filepath.Join("crawled", name + ".json"))
// Return task to server
if err := CancelTask(id); err != nil {
// Task was not returned; the server may still consider it assigned
logrus.WithField("err", err).
WithField("id", id).
Warning("Failed to return unfinished task to server")
return
}
}
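This file only implements the reader side; the code that writes the PAUSED file is not part of this excerpt. A minimal counterpart sketch, mirroring the exact order resumeQueue reads data back (savePauseFile and its signature are assumptions):

// savePauseFile is a hypothetical writer counterpart to resumeQueue:
// a gob-encoded PausedOD followed by the marshaled scan state.
func savePauseFile(od *OD, dir string) error {
	f, err := os.Create(filepath.Join(dir, "PAUSED"))
	if err != nil {
		return err
	}
	defer f.Close()
	// Encode the same three fields resumeQueue decodes
	if err := gob.NewEncoder(f).Encode(&PausedOD{
		Task:    &od.Task,
		Result:  &od.Result,
		BaseUri: &od.BaseUri,
	}); err != nil {
		return err
	}
	// Append the Scanned tree in its binary format
	return od.Scanned.Marshal(f)
}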


@@ -22,54 +22,62 @@ func Schedule(c context.Context, remotes <-chan *OD) {
go Stats(c)
for remote := range remotes {
logrus.WithField("url", remote.BaseUri.String()).
Info("Starting crawler")
// Collect results
results := make(chan File)
remote.WCtx.OD = remote
// Get queue path
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
// Spawn workers
for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker(results)
}
// Enqueue initial job
atomic.AddInt32(&numActiveTasks, 1)
remote.WCtx.queueJob(Job{
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
})
// Upload result when ready
go remote.Watch(results)
// Sleep if max number of tasks are active
for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
select {
case <-c.Done():
return
case <-time.After(time.Second):
continue
}
if !scheduleNewTask(c, remote) {
return
}
}
}
func scheduleNewTask(c context.Context, remote *OD) bool {
logrus.WithField("url", remote.BaseUri.String()).
Info("Starting crawler")
// Collect results
results := make(chan File)
remote.WCtx.OD = remote
// Get queue path
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
// Spawn workers
for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker(results)
}
// Enqueue initial job
atomic.AddInt32(&numActiveTasks, 1)
remote.WCtx.queueJob(Job{
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
})
// Upload result when ready
go remote.Watch(results)
// Sleep if max number of tasks are active
for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
select {
case <-c.Done():
return false
case <-time.After(time.Second):
// Fall through and re-check the active task count
}
}
return true
}
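The throttle loop above, extracted as a standalone sketch (waitForSlot, counter, and limit are stand-in names; the real code polls numActiveTasks against config.Tasks):

package main

import (
	"context"
	"sync/atomic"
	"time"
)

// waitForSlot blocks until the counter drops to the limit or the context
// is cancelled, polling once per second like scheduleNewTask does.
func waitForSlot(ctx context.Context, counter *int32, limit int32) bool {
	for atomic.LoadInt32(counter) > limit {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(time.Second):
			// fall through and re-check the counter
		}
	}
	return true
}

func main() {
	var active int32
	_ = waitForSlot(context.Background(), &active, 100) // returns true immediately
}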
func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
if !t.register() {
return