Getting tasks

Richard Patel 2018-11-16 04:47:08 +01:00
parent 3c39f0d621
commit 3f85cf679b
6 changed files with 77 additions and 65 deletions
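
In short: the crawler no longer takes a list of URLs as CLI arguments (the `crawl` subcommand is removed). Instead, the new `cmdBase` action polls the OD-Database server for tasks every three seconds via `FetchTask`, and `Schedule` throttles intake once the active-task count exceeds the configured limit. Going by their contents, the six diffs below touch the config reader, `config.yml`, `main.go`, the scheduler, the server API client, and the stats logger, in that order.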


@@ -5,6 +5,7 @@ import (
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/viper"
 	"os"
+	"strings"
 	"time"
 )
@@ -52,14 +53,15 @@ func readConfig() {
 	}
 	config.ServerUrl = viper.GetString(ConfServerUrl)
-	//if config.ServerUrl == "" {
-	//	configMissing(ConfServerUrl)
-	//}
+	if config.ServerUrl == "" {
+		configMissing(ConfServerUrl)
+	}
+	config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
 	config.Token = viper.GetString(ConfToken)
-	//if config.Token == "" {
-	//	configMissing(ConfToken)
-	//}
+	if config.Token == "" {
+		configMissing(ConfToken)
+	}
 	config.Retries = viper.GetInt(ConfRetries)
 	if config.Retries < 0 {


@@ -1,7 +1,7 @@
 # OD-Database server settings
 server:
   # Connection URL
-  url: localhost:6969
+  url: http://od-db.mine.terorie.com/api
   # Server auth token
   token:
@@ -10,7 +10,7 @@ output:
   # Crawl statistics
   crawl_stats: 1s
   # CPU/RAM/Job queue stats
-  resource_stats: 1s
+  resource_stats: 10s
   # More output? (Every listed dir)
   verbose: false
@@ -18,9 +18,9 @@ output:
 crawl:
   # Number of sites that can be
   # processed at once
-  tasks: 3
+  tasks: 300
   # Number of connections per site
-  connections: 2
+  connections: 20
   # How often to retry getting data
   # from the site before giving up
   retries: 5
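
These YAML keys feed the `readConfig` routine from the first diff via viper. A self-contained sketch of reading the newly tuned values; the dotted key names are assumed from the YAML layout, since the commit only shows constants such as `ConfRetries`:

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	viper.SetConfigName("config") // expects config.yml in the working dir (assumed)
	viper.AddConfigPath(".")
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}
	// Key names inferred from the YAML structure above:
	fmt.Println(viper.GetInt("crawl.tasks"))                // 300 after this commit
	fmt.Println(viper.GetInt("crawl.connections"))          // 20 after this commit
	fmt.Println(viper.GetDuration("output.resource_stats")) // 10s after this commit
}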

main.go

@@ -9,7 +9,7 @@ import (
 	"net/http"
 	_ "net/http/pprof"
 	"os"
-	"strings"
+	"sync/atomic"
 	"time"
 )
@@ -20,14 +20,7 @@ var app = cli.App {
 	BashComplete: cli.DefaultAppComplete,
 	Writer: os.Stdout,
 	Compiled: buildDate,
-	Commands: []cli.Command{
-		{
-			Name: "crawl",
-			Usage: "Crawl a list of URLs",
-			ArgsUsage: "[site, site, ...]",
-			Action: cmdCrawler,
-		},
-	},
+	Action: cmdBase,
 }
 
 func init() {
@@ -41,50 +34,56 @@ func main() {
 	app.Run(os.Args)
 }
 
-func cmdCrawler(clic *cli.Context) error {
+func cmdBase(clic *cli.Context) error {
 	readConfig()
-	if clic.NArg() == 0 {
-		cli.ShowCommandHelpAndExit(clic, "crawl", 1)
-	}
-	args := clic.Args()
-	remotes := make([]*OD, len(args))
-	for i, arg := range args {
-		// https://github.com/golang/go/issues/19779
-		if !strings.Contains(arg, "://") {
-			arg = "http://" + arg
-		}
-		var u fasturl.URL
-		err := u.Parse(arg)
-		if !strings.HasSuffix(u.Path, "/") {
-			u.Path += "/"
-		}
-		if err != nil { return err }
-		remotes[i] = &OD{
-			Task: &Task{
-				WebsiteId: 0,
-				Url: u.String(),
-			},
-			BaseUri: u,
-		}
-	}
-	c := context.Background()
+	// TODO Graceful shutdown
+	appCtx := context.Background()
+	forceCtx := context.Background()
 
 	inRemotes := make(chan *OD)
-	go Schedule(c, inRemotes)
-	for _, remote := range remotes {
-		globalWait.Add(1)
-		inRemotes <- remote
+	go Schedule(forceCtx, inRemotes)
+
+	ticker := time.NewTicker(3 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-appCtx.Done():
+			return nil
+		case <-ticker.C:
+			t, err := FetchTask()
+			if err != nil {
+				logrus.WithError(err).
+					Error("Failed getting new task")
+				time.Sleep(30 * time.Second)
+				continue
+			}
+			if t == nil {
+				// No new task
+				if atomic.LoadInt32(&activeTasks) == 0 {
+					logrus.Info("Waiting …")
+				}
+				continue
+			}
+			var baseUri fasturl.URL
+			err = baseUri.Parse(t.Url)
+			if err != nil {
+				logrus.WithError(err).
+					Error("Failed getting new task")
+				time.Sleep(30 * time.Second)
+				continue
+			}
+			inRemotes <- &OD{
+				Task: t,
+				BaseUri: baseUri,
+			}
+		}
 	}
 
 	// Wait for all jobs to finish
 	globalWait.Wait()
-	logrus.Info("All dirs processed!")
 	return nil
 }
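
The `// TODO Graceful shutdown` comment flags that `appCtx` is never actually cancelled, so `case <-appCtx.Done()` can't fire yet. One way the TODO could be resolved, purely as a sketch and not part of this commit: cancel `appCtx` on SIGINT so the poll loop stops accepting tasks, while leaving `forceCtx` alive so `Schedule` can drain running crawls.

package main

import (
	"context"
	"os"
	"os/signal"
)

// Hypothetical follow-up, not part of this commit: cancel appCtx on
// SIGINT so the poll loop exits, keep forceCtx alive so tasks drain.
func shutdownContexts() (appCtx, forceCtx context.Context) {
	appCtx, stop := context.WithCancel(context.Background())
	forceCtx = context.Background()

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	go func() {
		<-interrupt
		stop() // the select on appCtx.Done() then returns nil
	}()
	return appCtx, forceCtx
}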


@@ -8,6 +8,7 @@ import (
 	"os"
 	"path"
 	"sync/atomic"
+	"time"
 )
 
 var activeTasks int32
@@ -40,6 +41,15 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 		// Upload result when ready
 		go remote.Watch(results)
+
+		for atomic.LoadInt32(&activeTasks) > config.Tasks {
+			select {
+			case <-c.Done():
+				return
+			case <-time.After(time.Second):
+				continue
+			}
+		}
 	}
 }
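
The new wait loop gives `Schedule` backpressure: once more than `config.Tasks` crawls are active it stops pulling from `remotes`, re-checking once per second and returning early if the context is cancelled. Together with the three-second poll in `cmdBase`, this turns the `tasks: 300` setting into an effective concurrency ceiling.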


@@ -22,22 +22,19 @@ const (
 var serverClient = http.DefaultClient
 
 func FetchTask() (t *Task, err error) {
-	escToken, _ := json.Marshal(config.Token)
-	payload := `{"token":` + string(escToken) + `}`
-	req, err := http.NewRequest(
-		http.MethodPost,
-		config.ServerUrl + "/task/get",
-		strings.NewReader(payload))
-	if err != nil { return }
-	res, err := serverClient.Do(req)
+	res, err := serverClient.PostForm(
+		config.ServerUrl + "/task/get",
+		url.Values{ "token": {config.Token} })
 	if err != nil { return }
 	defer res.Body.Close()
 
-	if res.StatusCode != 200 {
-		err = fmt.Errorf("http %s", res.Status)
-		return
+	switch res.StatusCode {
+	case 200:
+		break
+	case 500:
+		return nil, nil
+	default:
+		return nil, fmt.Errorf("http %s", res.Status)
 	}
 
 	t = new(Task)
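
Two behavioral changes here: the token now travels as a form-encoded body instead of a hand-built JSON payload, and an HTTP 500 from the server is treated as "no task available" (`nil, nil`) rather than an error. A standalone sketch of the request the new code produces, with a placeholder URL and token standing in for the real config values:

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Equivalent of the new FetchTask request: POST with body "token=...",
	// Content-Type application/x-www-form-urlencoded.
	res, err := http.PostForm(
		"http://od-db.example.com/api/task/get", // placeholder URL
		url.Values{"token": {"secret"}})         // placeholder token
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	fmt.Println(res.Status) // 200 = task JSON follows; 500 = no task queued
}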


@@ -39,6 +39,10 @@ func Stats(c context.Context) {
 		perSecond = math.Round(perSecond)
 		perSecond /= 2
 
+		if perSecond <= 0 {
+			continue
+		}
+
 		logrus.WithFields(logrus.Fields{
 			"per_second": perSecond,
 			"done": atomic.LoadUint64(&totalDone),