mirror of
https://github.com/terorie/od-database-crawler.git
synced 2025-04-10 05:56:42 +00:00
271 lines
5.7 KiB
Go
271 lines
5.7 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/gob"
|
|
"fmt"
|
|
"github.com/beeker1121/goque"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/spf13/viper"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
func init() {
|
|
gob.Register(&PausedOD{})
|
|
}
|
|
|
|
func LoadResumeTasks(inRemotes chan<- *OD) {
|
|
resumed, err := ResumeTasks()
|
|
if err != nil {
|
|
logrus.WithError(err).
|
|
Error("Failed to resume queued tasks. " +
|
|
"/queue is probably corrupt")
|
|
err = nil
|
|
}
|
|
|
|
for _, remote := range resumed {
|
|
inRemotes <- remote
|
|
}
|
|
}
|
|
|
|
func ResumeTasks() (tasks []*OD, err error) {
|
|
// Get files in /queue
|
|
var queueF *os.File
|
|
var entries []os.FileInfo
|
|
queueF, err = os.Open("queue")
|
|
if err != nil { return nil, err }
|
|
defer queueF.Close()
|
|
entries, err = queueF.Readdir(-1)
|
|
if err != nil { return nil, err }
|
|
|
|
resumeDur := viper.GetDuration(ConfResume)
|
|
|
|
for _, entry := range entries {
|
|
if !entry.IsDir() { continue }
|
|
|
|
// Check if name is a number
|
|
var id uint64
|
|
if id, err = strconv.ParseUint(entry.Name(), 10, 64); err != nil {
|
|
continue
|
|
}
|
|
|
|
// Too old to be resumed
|
|
timeDelta := time.Since(entry.ModTime())
|
|
if resumeDur >= 0 && timeDelta > resumeDur {
|
|
removeOldQueue(id)
|
|
continue
|
|
}
|
|
|
|
// Load queue
|
|
var od *OD
|
|
if od, err = resumeQueue(id); err != nil {
|
|
logrus.WithError(err).
|
|
WithField("id", id).
|
|
Warning("Failed to load paused task")
|
|
continue
|
|
} else if od == nil {
|
|
removeOldQueue(id)
|
|
continue
|
|
}
|
|
|
|
tasks = append(tasks, od)
|
|
}
|
|
|
|
return tasks, nil
|
|
}
|
|
|
|
func SaveTask(od *OD) (err error) {
|
|
dir := filepath.Join("queue",
|
|
strconv.FormatUint(od.Task.WebsiteId, 10))
|
|
|
|
fPath := filepath.Join(dir, "PAUSED")
|
|
|
|
err = os.Mkdir(dir, 0777)
|
|
if err != nil { return err }
|
|
|
|
// Open pause file
|
|
pausedF, err := os.OpenFile(fPath, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0666)
|
|
if err != nil { return err }
|
|
defer pausedF.Close()
|
|
|
|
err = writePauseFile(od, pausedF)
|
|
if err != nil { return err }
|
|
|
|
return nil
|
|
}
|
|
|
|
func resumeQueue(id uint64) (od *OD, err error) {
|
|
logrus.WithField("id", id).
|
|
Info("Found unfinished")
|
|
|
|
fPath := filepath.Join("queue", strconv.FormatUint(id, 10))
|
|
|
|
// Try to find pause file
|
|
pausedF, err := os.Open(filepath.Join(fPath, "PAUSED"))
|
|
if os.IsNotExist(err) {
|
|
// No PAUSED file => not paused
|
|
// not paused => no error
|
|
return nil, nil
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
defer pausedF.Close()
|
|
|
|
od = new(OD)
|
|
od.WCtx.OD = od
|
|
|
|
err = readPauseFile(od, pausedF)
|
|
if err != nil { return nil, err }
|
|
|
|
// Open queue
|
|
bq, err := OpenQueue(fPath)
|
|
if err != nil { return nil, err }
|
|
|
|
od.WCtx.Queue = bq
|
|
|
|
logrus.WithField("id", id).
|
|
Info("Resuming task")
|
|
|
|
return od, nil
|
|
}
|
|
|
|
func removeOldQueue(id uint64) {
|
|
if id == 0 {
|
|
// TODO Make custom crawl less of an ugly hack
|
|
return
|
|
}
|
|
|
|
logrus.WithField("id", id).
|
|
Warning("Deleting & returning old task")
|
|
|
|
name := strconv.FormatUint(id, 10)
|
|
|
|
fPath := filepath.Join("queue", name)
|
|
|
|
// Acquire old queue
|
|
q, err := goque.OpenQueue(fPath)
|
|
if err != nil {
|
|
// Queue lock exists, don't delete
|
|
logrus.WithField("err", err).
|
|
WithField("path", fPath).
|
|
Error("Failed to acquire old task")
|
|
return
|
|
}
|
|
|
|
// Delete old queue from disk
|
|
err = q.Drop()
|
|
if err != nil {
|
|
// Queue lock exists, don't delete
|
|
logrus.WithField("err", err).
|
|
WithField("path", fPath).
|
|
Error("Failed to delete old task")
|
|
return
|
|
}
|
|
|
|
// Delete old crawl result from disk
|
|
_ = os.Remove(filepath.Join("crawled", name + ".json"))
|
|
|
|
// Return task to server
|
|
if err := CancelTask(id); err != nil {
|
|
// Queue lock exists, don't delete
|
|
logrus.WithField("err", err).
|
|
WithField("id", id).
|
|
Warning("Failed to return unfinished task to server")
|
|
return
|
|
}
|
|
}
|
|
|
|
func writePauseFile(od *OD, w io.Writer) (err error) {
|
|
// Write pause file version
|
|
_, err = w.Write([]byte("ODPAUSE-"))
|
|
if err != nil { return err }
|
|
|
|
// Create save state
|
|
paused := PausedOD {
|
|
Task: &od.Task,
|
|
Result: &od.Result,
|
|
BaseUri: &od.BaseUri,
|
|
InProgress: atomic.LoadInt64(&od.InProgress),
|
|
}
|
|
|
|
// Prepare pause settings
|
|
var b bytes.Buffer
|
|
pauseEnc := gob.NewEncoder(&b)
|
|
err = pauseEnc.Encode(&paused)
|
|
if err != nil { return err }
|
|
|
|
// Write length of pause settings
|
|
err = binary.Write(w, binary.LittleEndian, uint64(b.Len()))
|
|
if err != nil { return err }
|
|
|
|
// Write pause settings
|
|
_, err = w.Write(b.Bytes())
|
|
if err != nil { return err }
|
|
|
|
// Write pause scan state
|
|
err = od.Scanned.Marshal(w)
|
|
if err != nil { return err }
|
|
|
|
// Save mark
|
|
_, err = w.Write([]byte("--------"))
|
|
if err != nil { return err }
|
|
|
|
return nil
|
|
}
|
|
|
|
func readPauseFile(od *OD, r io.Reader) (err error) {
|
|
// Make the paused struct point to OD fields
|
|
// So gob loads values into the OD struct
|
|
paused := PausedOD {
|
|
Task: &od.Task,
|
|
Result: &od.Result,
|
|
BaseUri: &od.BaseUri,
|
|
}
|
|
|
|
var version [8]byte
|
|
_, err = io.ReadFull(r, version[:])
|
|
if err != nil { return err }
|
|
if !bytes.Equal(version[:], []byte("ODPAUSE-")) {
|
|
return fmt.Errorf("unsupported pause file")
|
|
}
|
|
|
|
// Read pause settings len
|
|
var pauseSettingsLen uint64
|
|
err = binary.Read(r, binary.LittleEndian, &pauseSettingsLen)
|
|
|
|
// Read pause settings
|
|
pauseDec := gob.NewDecoder(io.LimitReader(r, int64(pauseSettingsLen)))
|
|
err = pauseDec.Decode(&paused)
|
|
if err != nil { return err }
|
|
atomic.StoreInt64(&od.InProgress, paused.InProgress)
|
|
|
|
err = readPauseStateTree(od, r)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read state tree: %s", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func readPauseStateTree(od *OD, r io.Reader) (err error) {
|
|
// Read pause scan state
|
|
err = od.Scanned.Unmarshal(r)
|
|
if err != nil { return err }
|
|
|
|
// Check mark
|
|
var mark [8]byte
|
|
_, err = io.ReadFull(r, mark[:])
|
|
if err != nil { return err }
|
|
if !bytes.Equal(mark[:], []byte("--------")) {
|
|
return fmt.Errorf("corrupt pause file")
|
|
}
|
|
|
|
return nil
|
|
}
|