Simplify config

This commit is contained in:
Richard Patel 2019-02-22 18:50:35 +01:00
parent 8b9d8bfd17
commit 206ea0e91d
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
2 changed files with 112 additions and 91 deletions

139
config.go
View File

@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper" "github.com/spf13/viper"
"io" "io"
"os" "os"
@ -26,6 +27,8 @@ var config struct {
JobBufferSize int JobBufferSize int
} }
var onlineMode bool
const ( const (
ConfServerUrl = "server.url" ConfServerUrl = "server.url"
ConfToken = "server.token" ConfToken = "server.token"
@ -54,8 +57,56 @@ const (
func prepareConfig() { func prepareConfig() {
pf := rootCmd.PersistentFlags() pf := rootCmd.PersistentFlags()
bind := func(s string) { pf.SortFlags = false
if err := viper.BindPFlag(s, pf.Lookup(s)); err != nil { pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
pf.String(ConfLogFile, "crawler.log", "Log file")
// Bind all flags to Viper
pf.VisitAll(func(flag *pflag.Flag) {
s := flag.Name
s = strings.TrimLeft(s, "-")
if err := viper.BindPFlag(s, flag); err != nil {
panic(err) panic(err)
} }
var envKey string var envKey string
@ -65,71 +116,7 @@ func prepareConfig() {
if err := viper.BindEnv(s, envKey); err != nil { if err := viper.BindEnv(s, envKey); err != nil {
panic(err) panic(err)
} }
} })
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
bind(ConfServerUrl)
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
bind(ConfToken)
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
bind(ConfServerTimeout)
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
bind(ConfRecheck)
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
bind(ConfCooldown)
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
bind(ConfChunkSize)
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
bind(ConfUploadRetries)
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
bind(ConfUploadRetryInterval)
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
bind(ConfTasks)
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
bind(ConfWorkers)
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
bind(ConfRetries)
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
bind(ConfDialTimeout)
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
bind(ConfTimeout)
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
bind(ConfUserAgent)
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
bind(ConfJobBufferSize)
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
bind(ConfCrawlStats)
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
bind(ConfAllocStats)
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
bind(ConfVerbose)
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
bind(ConfPrintHTTP)
pf.String(ConfLogFile, "crawler.log", "Log file")
bind(ConfLogFile)
} }
func readConfig() { func readConfig() {
@ -157,15 +144,17 @@ func readConfig() {
} }
} }
config.ServerUrl = viper.GetString(ConfServerUrl) if onlineMode {
if config.ServerUrl == "" { config.ServerUrl = viper.GetString(ConfServerUrl)
configMissing(ConfServerUrl) if config.ServerUrl == "" {
} configMissing(ConfServerUrl)
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") }
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.Token = viper.GetString(ConfToken) config.Token = viper.GetString(ConfToken)
if config.Token == "" { if config.Token == "" {
configMissing(ConfToken) configMissing(ConfToken)
}
} }
config.ServerTimeout = viper.GetDuration(ConfServerTimeout) config.ServerTimeout = viper.GetDuration(ConfServerTimeout)

64
main.go
View File

@ -8,6 +8,7 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/terorie/od-database-crawler/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"os" "os"
"os/signal"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -17,7 +18,7 @@ var configFile string
var rootCmd = cobra.Command { var rootCmd = cobra.Command {
Use: "od-database-crawler", Use: "od-database-crawler",
Version: "1.2.1", Version: "1.2.2",
Short: "OD-Database Go crawler", Short: "OD-Database Go crawler",
Long: helpText, Long: helpText,
PersistentPreRunE: preRun, PersistentPreRunE: preRun,
@ -61,8 +62,6 @@ func preRun(cmd *cobra.Command, args []string) error {
if err := os.MkdirAll("queue", 0755); if err := os.MkdirAll("queue", 0755);
err != nil { panic(err) } err != nil { panic(err) }
readConfig()
return nil return nil
} }
@ -75,25 +74,31 @@ func main() {
} }
func cmdBase(_ *cobra.Command, _ []string) { func cmdBase(_ *cobra.Command, _ []string) {
// TODO Graceful shutdown onlineMode = true
appCtx := context.Background() readConfig()
forceCtx := context.Background()
appCtx, soft := context.WithCancel(context.Background())
forceCtx, hard := context.WithCancel(context.Background())
go hardShutdown(forceCtx)
go listenCtrlC(soft, hard)
inRemotes := make(chan *OD) inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes) go Schedule(appCtx, inRemotes)
ticker := time.NewTicker(config.Recheck) ticker := time.NewTicker(config.Recheck)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-appCtx.Done(): case <-appCtx.Done():
return goto shutdown
case <-ticker.C: case <-ticker.C:
t, err := FetchTask() t, err := FetchTask()
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed to get new task") Error("Failed to get new task")
time.Sleep(viper.GetDuration(ConfCooldown)) if !sleep(viper.GetDuration(ConfCooldown), appCtx) {
goto shutdown
}
continue continue
} }
if t == nil { if t == nil {
@ -109,13 +114,7 @@ func cmdBase(_ *cobra.Command, _ []string) {
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme { if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
// Not an error // Not an error
err = nil err = nil
// TODO FTP crawler
// Give back task
//err2 := CancelTask(t.WebsiteId)
//if err2 != nil {
// logrus.Error(err2)
//}
continue continue
} else if err != nil { } else if err != nil {
logrus.WithError(err). logrus.WithError(err).
@ -126,9 +125,15 @@ func cmdBase(_ *cobra.Command, _ []string) {
ScheduleTask(inRemotes, t, &baseUri) ScheduleTask(inRemotes, t, &baseUri)
} }
} }
shutdown:
globalWait.Wait()
} }
func cmdCrawler(_ *cobra.Command, args []string) error { func cmdCrawler(_ *cobra.Command, args []string) error {
onlineMode = false
readConfig()
arg := args[0] arg := args[0]
// https://github.com/golang/go/issues/19779 // https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") { if !strings.Contains(arg, "://") {
@ -161,3 +166,30 @@ func cmdCrawler(_ *cobra.Command, args []string) error {
return nil return nil
} }
func listenCtrlC(soft, hard context.CancelFunc) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
<-c
logrus.Info(">>> Shutting down crawler... <<<")
soft()
<-c
logrus.Warning(">>> Force shutdown! <<<")
hard()
}
func hardShutdown(c context.Context) {
<-c.Done()
os.Exit(1)
}
func sleep(d time.Duration, c context.Context) bool {
select {
case <-time.After(d):
return true
case <-c.Done():
return false
}
}