diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..507b440 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM golang:1.11.5 AS builder +ADD . /src +RUN cd /src \ + && go test . \ + && go build -o binary + +FROM alpine +WORKDIR /app +COPY --from=builder /src/binary /app/ +ENTRYPOINT ./binary diff --git a/config.go b/config.go index 113561a..b8c15ef 100644 --- a/config.go +++ b/config.go @@ -4,7 +4,6 @@ import ( "bufio" "fmt" "github.com/sirupsen/logrus" - "github.com/spf13/pflag" "github.com/spf13/viper" "io" "os" @@ -27,8 +26,6 @@ var config struct { JobBufferSize int } -var onlineMode bool - const ( ConfServerUrl = "server.url" ConfToken = "server.token" @@ -46,7 +43,6 @@ const ( ConfDialTimeout = "crawl.dial_timeout" ConfTimeout = "crawl.timeout" ConfJobBufferSize = "crawl.job_buffer" - ConfResume = "crawl.resume" ConfCrawlStats = "output.crawl_stats" ConfAllocStats = "output.resource_stats" @@ -58,58 +54,8 @@ const ( func prepareConfig() { pf := rootCmd.PersistentFlags() - pf.SortFlags = false - pf.StringVar(&configFile, "config", "", "Config file") - configFile = os.Getenv("OD_CONFIG") - - pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL") - - pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)") - - pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout") - - pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs") - - pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error") - - pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size") - - pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries") - - pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries") - - pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks") - - pf.Uint(ConfWorkers, 4, "Crawler: Connections per server") - - pf.Uint(ConfRetries, 5, "Crawler: Request retries") - - pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout") - - pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout") - - pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent") - - pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size") - - pf.Duration(ConfResume, 0, "Crawler: Resume tasks not older than x") - - pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval") - - pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval") - - pf.Bool(ConfVerbose, false, "Log: Print every listed dir") - - pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors") - - pf.String(ConfLogFile, "crawler.log", "Log file") - - // Bind all flags to Viper - pf.VisitAll(func(flag *pflag.Flag) { - s := flag.Name - s = strings.TrimLeft(s, "-") - - if err := viper.BindPFlag(s, flag); err != nil { + bind := func(s string) { + if err := viper.BindPFlag(s, pf.Lookup(s)); err != nil { panic(err) } var envKey string @@ -119,7 +65,71 @@ func prepareConfig() { if err := viper.BindEnv(s, envKey); err != nil { panic(err) } - }) + } + + pf.SortFlags = false + pf.StringVar(&configFile, "config", "", "Config file") + configFile = os.Getenv("OD_CONFIG") + + pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL") + bind(ConfServerUrl) + + pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)") + bind(ConfToken) + + pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout") + bind(ConfServerTimeout) + + 
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs") + bind(ConfRecheck) + + pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error") + bind(ConfCooldown) + + pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size") + bind(ConfChunkSize) + + pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries") + bind(ConfUploadRetries) + + pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries") + bind(ConfUploadRetryInterval) + + pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks") + bind(ConfTasks) + + pf.Uint(ConfWorkers, 4, "Crawler: Connections per server") + bind(ConfWorkers) + + pf.Uint(ConfRetries, 5, "Crawler: Request retries") + bind(ConfRetries) + + pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout") + bind(ConfDialTimeout) + + pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout") + bind(ConfTimeout) + + pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent") + bind(ConfUserAgent) + + pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size") + bind(ConfJobBufferSize) + + pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval") + bind(ConfCrawlStats) + + pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval") + bind(ConfAllocStats) + + pf.Bool(ConfVerbose, false, "Log: Print every listed dir") + bind(ConfVerbose) + + pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors") + bind(ConfPrintHTTP) + + pf.String(ConfLogFile, "crawler.log", "Log file") + bind(ConfLogFile) } func readConfig() { @@ -147,17 +157,15 @@ func readConfig() { } } - if onlineMode { - config.ServerUrl = viper.GetString(ConfServerUrl) - if config.ServerUrl == "" { - configMissing(ConfServerUrl) - } - config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") + config.ServerUrl = viper.GetString(ConfServerUrl) + if config.ServerUrl == "" { + configMissing(ConfServerUrl) + } + config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") - config.Token = viper.GetString(ConfToken) - if config.Token == "" { - configMissing(ConfToken) - } + config.Token = viper.GetString(ConfToken) + if config.Token == "" { + configMissing(ConfToken) } config.ServerTimeout = viper.GetDuration(ConfServerTimeout) diff --git a/config.yml b/config.yml index 0dd96c5..a420c80 100644 --- a/config.yml +++ b/config.yml @@ -82,8 +82,3 @@ crawl: # A negative value will cause all jobs # to be stored in memory. (Don't do this) job_buffer: 5000 - - # Resume jobs with max age - # WARNING: Unstable and buggy feature, - # set to 0 until stable. - resume: 0s diff --git a/ds/redblackhash/redblack.go b/ds/redblackhash/redblack.go index 6b5b1ed..95084c2 100644 --- a/ds/redblackhash/redblack.go +++ b/ds/redblackhash/redblack.go @@ -15,10 +15,7 @@ package redblackhash import ( "bytes" - "encoding/binary" - "encoding/hex" "fmt" - "io" "sync" ) @@ -46,13 +43,6 @@ type Node struct { Parent *Node } -type nodeHeader struct { - Key *Key - Color color -} - -var o = binary.BigEndian - func (k *Key) Compare(o *Key) int { return bytes.Compare(k[:], o[:]) } @@ -243,7 +233,7 @@ func (tree *Tree) String() string { } func (node *Node) String() string { - return hex.EncodeToString(node.Key[:16]) + "..." 
+ return fmt.Sprintf("%v", node.Key) } func output(node *Node, prefix string, isTail bool, str *string) { @@ -491,119 +481,6 @@ func (tree *Tree) deleteCase6(node *Node) { } } -func (tree *Tree) Marshal(w io.Writer) (err error) { - tree.Lock() - defer tree.Unlock() - - err = binary.Write(w, o, uint64(0x617979797979790A)) - if err != nil { return err } - - err = marshal(tree.Root, w) - if err != nil { return err } - - err = binary.Write(w, o, uint64(0x6C6D616F6F6F6F0A)) - if err != nil { return err } - - return nil -} - -func marshal(n *Node, w io.Writer) (err error) { - if n == nil { - err = binary.Write(w, o, uint64(0x796565656565740A)) - return err - } - - err = binary.Write(w, o, uint64(0xF09F85B1EFB88F0A)) - if err != nil { return err } - - _, err = w.Write(n.Key[:]) - if err != nil { return err } - - var colorI uint64 - if n.color { - colorI = 0x7468652D6579657C - } else { - colorI = 0x6865782B7465727C - } - - err = binary.Write(w, o, colorI) - if err != nil { return err } - - err = marshal(n.Left, w) - if err != nil { return err } - - err = marshal(n.Right, w) - if err != nil { return err } - - return nil -} - -func (tree *Tree) Unmarshal(r io.Reader) (err error) { - tree.Lock() - defer tree.Unlock() - - var sof uint64 - err = binary.Read(r, o, &sof) - if err != nil { return err } - if sof != 0x617979797979790A { - return fmt.Errorf("redblack: wrong format") - } - - tree.Root, tree.size, err = unmarshal(r) - if err != nil { return err } - - var eof uint64 - err = binary.Read(r, o, &eof) - if err != nil { return err } - if eof != 0x6C6D616F6F6F6F0A { - return fmt.Errorf("redblack: end of file missing") - } - - return nil -} - -func unmarshal(r io.Reader) (n *Node, size int, err error) { - var head uint64 - err = binary.Read(r, o, &head) - if err != nil { return nil, 0, err } - - size = 1 - - switch head { - case 0x796565656565740A: - return nil, 0, nil - case 0xF09F85B1EFB88F0A: - n = new(Node) - - _, err = io.ReadFull(r, n.Key[:]) - if err != nil { return nil, 0, err } - - var colorInt uint64 - err = binary.Read(r, o, &colorInt) - if err != nil { return nil, 0, err } - switch colorInt { - case 0x7468652D6579657C: - n.color = true - case 0x6865782B7465727C: - n.color = false - default: - return nil, 0, fmt.Errorf("redblack: corrupt node color") - } - default: - return nil, 0, fmt.Errorf("redblack: corrupt node info") - } - - var s2 int - n.Left, s2, err = unmarshal(r) - size += s2 - if err != nil { return nil, 0, err } - n.Right, s2, err = unmarshal(r) - size += s2 - if err != nil { return nil, 0, err } - - return n, size, nil -} - func nodeColor(node *Node) color { if node == nil { return black diff --git a/ds/redblackhash/redblack_test.go b/ds/redblackhash/redblack_test.go deleted file mode 100644 index 9944880..0000000 --- a/ds/redblackhash/redblack_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package redblackhash - -import ( - "bytes" - "math/rand" - "testing" -) - -func TestTree_Marshal(t *testing.T) { - var t1, t2 Tree - - // Generate 1000 random values to insert - for i := 0; i < 1000; i++ { - var key Key - rand.Read(key[:]) - t1.Put(&key) - } - - // Marshal tree - var wr bytes.Buffer - err := t1.Marshal(&wr) - if err != nil { - t.Error(err) - t.FailNow() - } - - buf := wr.Bytes() - rd := bytes.NewBuffer(buf) - - // Unmarshal tree - err = t2.Unmarshal(rd) - if err != nil { - t.Error(err) - t.FailNow() - } - - if !compare(t1.Root, t2.Root) { - t.Error("trees are not equal") - t.FailNow() - } -} - -func compare(n1, n2 *Node) bool { - return n1.Key.Compare(&n2.Key) == 0 && - (n1.Left 
== nil || compare(n1.Left, n2.Left)) && - (n1.Right == nil || compare(n1.Right, n2.Right)) -} diff --git a/go.mod b/go.mod index aaa2fba..9eeb244 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/syndtr/od-database-crawler +module github.com/terorie/od-database-crawler require ( github.com/beeker1121/goque v2.0.1+incompatible @@ -7,7 +7,6 @@ require ( github.com/spf13/cobra v0.0.3 github.com/spf13/viper v1.3.1 github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect - github.com/terorie/od-database-crawler v1.1.1 github.com/valyala/fasthttp v1.1.0 golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 diff --git a/go.sum b/go.sum index 498b38a..8d675d1 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,6 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs= github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= -github.com/terorie/od-database-crawler v1.1.1 h1:Ca+ZqbZX3rVWBR8SDRzvroyxjBtUs75MQXZ9YG0gqGo= -github.com/terorie/od-database-crawler v1.1.1/go.mod h1:vVJ7pLkudrlUNp9qu24JCzQ8N6mFsrOmX1tPXr155DQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= diff --git a/main.go b/main.go index b9b4c96..e2d67ac 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,6 @@ import ( "github.com/spf13/viper" "github.com/terorie/od-database-crawler/fasturl" "os" - "os/signal" "strings" "sync/atomic" "time" @@ -62,6 +61,8 @@ func preRun(cmd *cobra.Command, args []string) error { if err := os.MkdirAll("queue", 0755); err != nil { panic(err) } + readConfig() + return nil } @@ -74,32 +75,25 @@ func main() { } func cmdBase(_ *cobra.Command, _ []string) { - onlineMode = true - readConfig() - - appCtx, soft := context.WithCancel(context.Background()) - forceCtx, hard := context.WithCancel(context.Background()) - go hardShutdown(forceCtx) - go listenCtrlC(soft, hard) + // TODO Graceful shutdown + appCtx := context.Background() + forceCtx := context.Background() inRemotes := make(chan *OD) - go LoadResumeTasks(inRemotes) - go Schedule(appCtx, inRemotes) + go Schedule(forceCtx, inRemotes) ticker := time.NewTicker(config.Recheck) defer ticker.Stop() for { select { case <-appCtx.Done(): - goto shutdown + return case <-ticker.C: t, err := FetchTask() if err != nil { logrus.WithError(err). Error("Failed to get new task") - if !sleep(viper.GetDuration(ConfCooldown), appCtx) { - goto shutdown - } + time.Sleep(viper.GetDuration(ConfCooldown)) continue } if t == nil { @@ -115,7 +109,13 @@ func cmdBase(_ *cobra.Command, _ []string) { if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme { // Not an error err = nil - // TODO FTP crawler + + // Give back task + //err2 := CancelTask(t.WebsiteId) + //if err2 != nil { + // logrus.Error(err2) + //} + continue } else if err != nil { logrus.WithError(err). 
@@ -126,15 +126,9 @@ func cmdBase(_ *cobra.Command, _ []string) { ScheduleTask(inRemotes, t, &baseUri) } } - - shutdown: - globalWait.Wait() } func cmdCrawler(_ *cobra.Command, args []string) error { - onlineMode = false - readConfig() - arg := args[0] // https://github.com/golang/go/issues/19779 if !strings.Contains(arg, "://") { @@ -167,30 +161,3 @@ func cmdCrawler(_ *cobra.Command, args []string) error { return nil } - -func listenCtrlC(soft, hard context.CancelFunc) { - c := make(chan os.Signal) - signal.Notify(c, os.Interrupt) - - <-c - logrus.Info(">>> Shutting down crawler... <<<") - soft() - - <-c - logrus.Warning(">>> Force shutdown! <<<") - hard() -} - -func hardShutdown(c context.Context) { - <-c.Done() - os.Exit(1) -} - -func sleep(d time.Duration, c context.Context) bool { - select { - case <-time.After(d): - return true - case <-c.Done(): - return false - } -} diff --git a/model.go b/model.go index 7a0a907..0b24e91 100644 --- a/model.go +++ b/model.go @@ -3,6 +3,7 @@ package main import ( "github.com/terorie/od-database-crawler/ds/redblackhash" "github.com/terorie/od-database-crawler/fasturl" + "sync" "time" ) @@ -29,19 +30,12 @@ type Job struct { } type OD struct { - Task Task - Result TaskResult - InProgress int64 - BaseUri fasturl.URL - WCtx WorkerContext - Scanned redblackhash.Tree -} - -type PausedOD struct { - Task *Task - Result *TaskResult - BaseUri *fasturl.URL - InProgress int64 + Task Task + Result TaskResult + Wait sync.WaitGroup + BaseUri fasturl.URL + WCtx WorkerContext + Scanned redblackhash.Tree } type File struct { diff --git a/resume.go b/resume.go deleted file mode 100644 index e5da8bf..0000000 --- a/resume.go +++ /dev/null @@ -1,270 +0,0 @@ -package main - -import ( - "bytes" - "encoding/binary" - "encoding/gob" - "fmt" - "github.com/beeker1121/goque" - "github.com/sirupsen/logrus" - "github.com/spf13/viper" - "io" - "os" - "path/filepath" - "strconv" - "sync/atomic" - "time" -) - -func init() { - gob.Register(&PausedOD{}) -} - -func LoadResumeTasks(inRemotes chan<- *OD) { - resumed, err := ResumeTasks() - if err != nil { - logrus.WithError(err). - Error("Failed to resume queued tasks. " + - "/queue is probably corrupt") - err = nil - } - - for _, remote := range resumed { - inRemotes <- remote - } -} - -func ResumeTasks() (tasks []*OD, err error) { - // Get files in /queue - var queueF *os.File - var entries []os.FileInfo - queueF, err = os.Open("queue") - if err != nil { return nil, err } - defer queueF.Close() - entries, err = queueF.Readdir(-1) - if err != nil { return nil, err } - - resumeDur := viper.GetDuration(ConfResume) - - for _, entry := range entries { - if !entry.IsDir() { continue } - - // Check if name is a number - var id uint64 - if id, err = strconv.ParseUint(entry.Name(), 10, 64); err != nil { - continue - } - - // Too old to be resumed - timeDelta := time.Since(entry.ModTime()) - if resumeDur >= 0 && timeDelta > resumeDur { - removeOldQueue(id) - continue - } - - // Load queue - var od *OD - if od, err = resumeQueue(id); err != nil { - logrus.WithError(err). - WithField("id", id). 
- Warning("Failed to load paused task") - continue - } else if od == nil { - removeOldQueue(id) - continue - } - - tasks = append(tasks, od) - } - - return tasks, nil -} - -func SaveTask(od *OD) (err error) { - dir := filepath.Join("queue", - strconv.FormatUint(od.Task.WebsiteId, 10)) - - fPath := filepath.Join(dir, "PAUSED") - - err = os.Mkdir(dir, 0777) - if err != nil { return err } - - // Open pause file - pausedF, err := os.OpenFile(fPath, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0666) - if err != nil { return err } - defer pausedF.Close() - - err = writePauseFile(od, pausedF) - if err != nil { return err } - - return nil -} - -func resumeQueue(id uint64) (od *OD, err error) { - logrus.WithField("id", id). - Info("Found unfinished") - - fPath := filepath.Join("queue", strconv.FormatUint(id, 10)) - - // Try to find pause file - pausedF, err := os.Open(filepath.Join(fPath, "PAUSED")) - if os.IsNotExist(err) { - // No PAUSED file => not paused - // not paused => no error - return nil, nil - } else if err != nil { - return nil, err - } - defer pausedF.Close() - - od = new(OD) - od.WCtx.OD = od - - err = readPauseFile(od, pausedF) - if err != nil { return nil, err } - - // Open queue - bq, err := OpenQueue(fPath) - if err != nil { return nil, err } - - od.WCtx.Queue = bq - - logrus.WithField("id", id). - Info("Resuming task") - - return od, nil -} - -func removeOldQueue(id uint64) { - if id == 0 { - // TODO Make custom crawl less of an ugly hack - return - } - - logrus.WithField("id", id). - Warning("Deleting & returning old task") - - name := strconv.FormatUint(id, 10) - - fPath := filepath.Join("queue", name) - - // Acquire old queue - q, err := goque.OpenQueue(fPath) - if err != nil { - // Queue lock exists, don't delete - logrus.WithField("err", err). - WithField("path", fPath). - Error("Failed to acquire old task") - return - } - - // Delete old queue from disk - err = q.Drop() - if err != nil { - // Queue lock exists, don't delete - logrus.WithField("err", err). - WithField("path", fPath). - Error("Failed to delete old task") - return - } - - // Delete old crawl result from disk - _ = os.Remove(filepath.Join("crawled", name + ".json")) - - // Return task to server - if err := CancelTask(id); err != nil { - // Queue lock exists, don't delete - logrus.WithField("err", err). - WithField("id", id). 
- Warning("Failed to return unfinished task to server") - return - } -} - -func writePauseFile(od *OD, w io.Writer) (err error) { - // Write pause file version - _, err = w.Write([]byte("ODPAUSE-")) - if err != nil { return err } - - // Create save state - paused := PausedOD { - Task: &od.Task, - Result: &od.Result, - BaseUri: &od.BaseUri, - InProgress: atomic.LoadInt64(&od.InProgress), - } - - // Prepare pause settings - var b bytes.Buffer - pauseEnc := gob.NewEncoder(&b) - err = pauseEnc.Encode(&paused) - if err != nil { return err } - - // Write length of pause settings - err = binary.Write(w, binary.LittleEndian, uint64(b.Len())) - if err != nil { return err } - - // Write pause settings - _, err = w.Write(b.Bytes()) - if err != nil { return err } - - // Write pause scan state - err = od.Scanned.Marshal(w) - if err != nil { return err } - - // Save mark - _, err = w.Write([]byte("--------")) - if err != nil { return err } - - return nil -} - -func readPauseFile(od *OD, r io.Reader) (err error) { - // Make the paused struct point to OD fields - // So gob loads values into the OD struct - paused := PausedOD { - Task: &od.Task, - Result: &od.Result, - BaseUri: &od.BaseUri, - } - - var version [8]byte - _, err = io.ReadFull(r, version[:]) - if err != nil { return err } - if !bytes.Equal(version[:], []byte("ODPAUSE-")) { - return fmt.Errorf("unsupported pause file") - } - - // Read pause settings len - var pauseSettingsLen uint64 - err = binary.Read(r, binary.LittleEndian, &pauseSettingsLen) - - // Read pause settings - pauseDec := gob.NewDecoder(io.LimitReader(r, int64(pauseSettingsLen))) - err = pauseDec.Decode(&paused) - if err != nil { return err } - atomic.StoreInt64(&od.InProgress, paused.InProgress) - - err = readPauseStateTree(od, r) - if err != nil { - return fmt.Errorf("failed to read state tree: %s", err) - } - - return nil -} - -func readPauseStateTree(od *OD, r io.Reader) (err error) { - // Read pause scan state - err = od.Scanned.Unmarshal(r) - if err != nil { return err } - - // Check mark - var mark [8]byte - _, err = io.ReadFull(r, mark[:]) - if err != nil { return err } - if !bytes.Equal(mark[:], []byte("--------")) { - return fmt.Errorf("corrupt pause file") - } - - return nil -} diff --git a/resume_test.go b/resume_test.go deleted file mode 100644 index ce737bb..0000000 --- a/resume_test.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -import ( - "bytes" - "github.com/terorie/od-database-crawler/fasturl" - "testing" - "time" -) - -func TestResumeTasks_Empty(t *testing.T) { - start := time.Now().Add(-1 * time.Minute) - od := OD { - Task: Task { - WebsiteId: 213, - Url: "https://the-eye.eu/public/", - }, - Result: TaskResult { - StartTime: start, - StartTimeUnix: start.Unix(), - EndTimeUnix: time.Now().Unix(), - WebsiteId: 213, - }, - InProgress: 0, - BaseUri: fasturl.URL { - Scheme: fasturl.SchemeHTTPS, - Host: "the-eye.eu", - Path: "/public/", - }, - } - od.WCtx.OD = &od - - var b bytes.Buffer - var err error - err = writePauseFile(&od, &b) - if err != nil { - t.Fatal(err) - } - - buf := b.Bytes() - - var od2 OD - - b2 := bytes.NewBuffer(buf) - err = readPauseFile(&od2, b2) - if err != nil { - t.Fatal(err) - } -} diff --git a/scheduler.go b/scheduler.go index d7ab810..9abe491 100644 --- a/scheduler.go +++ b/scheduler.go @@ -21,69 +21,61 @@ var totalQueued int64 func Schedule(c context.Context, remotes <-chan *OD) { go Stats(c) - for { - select { - case remote := <-remotes: - if !scheduleNewTask(c, remote) { + for remote := range remotes { + logrus.WithField("url", 
remote.BaseUri.String()). + Info("Starting crawler") + + // Collect results + results := make(chan File) + + remote.WCtx.OD = remote + + // Get queue path + queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId)) + + // Delete existing queue + if err := os.RemoveAll(queuePath); + err != nil { panic(err) } + + // Start new queue + var err error + remote.WCtx.Queue, err = OpenQueue(queuePath) + if err != nil { panic(err) } + + // Spawn workers + for i := 0; i < config.Workers; i++ { + go remote.WCtx.Worker(results) + } + + // Enqueue initial job + atomic.AddInt32(&numActiveTasks, 1) + remote.WCtx.queueJob(Job{ + Uri: remote.BaseUri, + UriStr: remote.BaseUri.String(), + Fails: 0, + }) + + // Upload result when ready + go remote.Watch(results) + + // Sleep if max number of tasks are active + for atomic.LoadInt32(&numActiveTasks) > config.Tasks { + select { + case <-c.Done(): return + case <-time.After(time.Second): + continue } - case <-c.Done(): - return } } } -func scheduleNewTask(c context.Context, remote *OD) bool { - logrus.WithField("url", remote.BaseUri.String()). - Info("Starting crawler") - - // Collect results - results := make(chan File) - - remote.WCtx.OD = remote - - // Get queue path - queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId)) - - // Delete existing queue - if err := os.RemoveAll(queuePath); - err != nil { panic(err) } - - // Start new queue - var err error - remote.WCtx.Queue, err = OpenQueue(queuePath) - if err != nil { panic(err) } - - // Spawn workers - remote.WCtx.SpawnWorkers(c, results, config.Workers) - - // Enqueue initial job - atomic.AddInt32(&numActiveTasks, 1) - remote.WCtx.queueJob(Job{ - Uri: remote.BaseUri, - UriStr: remote.BaseUri.String(), - Fails: 0, - }) - - // Upload result when ready - globalWait.Add(1) - go remote.Watch(results) - - // Sleep if max number of tasks are active - for atomic.LoadInt32(&numActiveTasks) > config.Tasks { - if !sleep(time.Second, c) { - break - } - } - - return true -} - func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) { if !t.register() { return } + globalWait.Add(1) now := time.Now() od := &OD { Task: *t, @@ -125,7 +117,7 @@ func (o *OD) Watch(results chan File) { // Open crawl results file f, err := os.OpenFile( filePath, - os.O_CREATE | os.O_RDWR | os.O_APPEND, + os.O_CREATE | os.O_RDWR | os.O_TRUNC, 0644, ) if err != nil { @@ -141,9 +133,7 @@ func (o *OD) Watch(results chan File) { // Block until all results are written // (closes results channel) - if !o.handleCollect(results, f, collectErrC) { - return - } + o.handleCollect(results, f, collectErrC) // Exit code of Collect() err = <-collectErrC @@ -163,40 +153,22 @@ func (o *OD) Watch(results chan File) { } } -// Returns if aborted naturally (results ready for upload) -func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) bool { +func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) { // Begin collecting results go o.Task.Collect(results, f, collectErrC) defer close(results) // Wait for all jobs on remote to finish - for { - // Natural finish - if atomic.LoadInt64(&o.InProgress) == 0 { - o.onTaskFinished() - return true - } - // Abort - if atomic.LoadInt32(&o.WCtx.aborted) != 0 { - // Wait for all workers to finish - o.WCtx.workers.Wait() - o.onTaskPaused() - return false - } - - time.Sleep(500 * time.Millisecond) - } -} - -func (o *OD) onTaskFinished() { - defer atomic.AddInt32(&numActiveTasks, -1) + o.Wait.Wait() // Close queue if err := 
o.WCtx.Queue.Close(); err != nil { panic(err) } + atomic.AddInt32(&numActiveTasks, -1) // Log finish + logrus.WithFields(logrus.Fields{ "id": o.Task.WebsiteId, "url": o.BaseUri.String(), @@ -219,37 +191,6 @@ func (o *OD) onTaskFinished() { } } -func (o *OD) onTaskPaused() { - defer atomic.AddInt32(&numActiveTasks, -1) - - // Close queue - if err := o.WCtx.Queue.Close(); err != nil { - panic(err) - } - - // Set current end time - o.Result.EndTimeUnix = time.Now().Unix() - - // Save task metadata - err := SaveTask(o) - if err != nil { - // Log finish - logrus.WithFields(logrus.Fields{ - "err": err.Error(), - "id": o.Task.WebsiteId, - "url": o.BaseUri.String(), - }).Error("Failed to save crawler state") - return - } - - // Log finish - logrus.WithFields(logrus.Fields{ - "id": o.Task.WebsiteId, - "url": o.BaseUri.String(), - "duration": time.Since(o.Result.StartTime), - }).Info("Crawler paused") -} - func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) { err := t.collect(results, f) if err != nil { diff --git a/worker.go b/worker.go index 5c07a15..118e28f 100644 --- a/worker.go +++ b/worker.go @@ -1,7 +1,6 @@ package main import ( - "context" "github.com/beeker1121/goque" "github.com/sirupsen/logrus" "math" @@ -19,29 +18,10 @@ type WorkerContext struct { Queue *BufferedQueue lastRateLimit time.Time numRateLimits int - workers sync.WaitGroup - aborted int32 } -func (w *WorkerContext) SpawnWorkers(c context.Context, results chan<- File, n int) { - w.workers.Add(n) - for i := 0; i < n; i++ { - go w.Worker(c, results) - } -} - -func (w *WorkerContext) Worker(c context.Context, results chan<- File) { - defer w.workers.Done() - +func (w *WorkerContext) Worker(results chan<- File) { for { - select { - case <-c.Done(): - // Not yet done - atomic.StoreInt32(&w.aborted, 1) - return - default: - } - job, err := w.Queue.Dequeue() switch err { case goque.ErrEmpty: @@ -176,7 +156,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) { } func (w *WorkerContext) queueJob(job Job) { - atomic.AddInt64(&w.OD.InProgress, 1) + w.OD.Wait.Add(1) if w.numRateLimits > 0 { if time.Since(w.lastRateLimit) > 5 * time.Second { @@ -193,7 +173,7 @@ func (w *WorkerContext) queueJob(job Job) { } func (w *WorkerContext) finishJob() { - atomic.AddInt64(&w.OD.InProgress, -1) + w.OD.Wait.Done() } func isErrSilent(err error) bool {
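
Note on the concurrency change in this diff: the atomic `InProgress` counter on `OD` is replaced by a `sync.WaitGroup` (`Wait` in model.go). `WorkerContext.queueJob` now calls `Wait.Add(1)`, `finishJob` calls `Wait.Done()`, and `OD.handleCollect` blocks on `Wait.Wait()` instead of polling the counter every 500 ms. Below is a minimal, self-contained sketch of that pattern; the type and function names (`crawl`, `queueJob`, `collect`) are made up for illustration and are not the crawler's actual API.

// Sketch (assumed names): WaitGroup-based job tracking, one Add per queued
// job, one Done per finished job, and a collector that blocks on Wait()
// before closing the results channel - the same shape as queueJob/finishJob
// and handleCollect in this diff.
package main

import (
	"fmt"
	"sync"
)

type crawl struct {
	wait    sync.WaitGroup // incremented when a job is queued, decremented when it finishes
	results chan string
}

// queueJob mirrors WorkerContext.queueJob: register the job before handing it off.
func (c *crawl) queueJob(url string) {
	c.wait.Add(1)
	go func() {
		defer c.wait.Done() // mirrors finishJob
		c.results <- "crawled " + url
	}()
}

// collect mirrors OD.handleCollect: wait for all jobs, then close the results channel.
func (c *crawl) collect() {
	go func() {
		c.wait.Wait()
		close(c.results)
	}()
	for r := range c.results {
		fmt.Println(r)
	}
}

func main() {
	c := &crawl{results: make(chan string)}
	for _, u := range []string{"http://a/", "http://b/", "http://c/"} {
		c.queueJob(u)
	}
	c.collect()
}

As the `// TODO Graceful shutdown` comment in main.go records, this commit also drops the Ctrl-C soft/hard shutdown path, so in the new code `Wait()` is only released by the job queue draining naturally.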