From 206ea0e91de6e81b5fcd46de7dd089896532b9e1 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Fri, 22 Feb 2019 18:50:35 +0100 Subject: [PATCH] Simplify config --- config.go | 139 +++++++++++++++++++++++++----------------------------- main.go | 64 ++++++++++++++++++------- 2 files changed, 112 insertions(+), 91 deletions(-) diff --git a/config.go b/config.go index b8c15ef..d80ebef 100644 --- a/config.go +++ b/config.go @@ -4,6 +4,7 @@ import ( "bufio" "fmt" "github.com/sirupsen/logrus" + "github.com/spf13/pflag" "github.com/spf13/viper" "io" "os" @@ -26,6 +27,8 @@ var config struct { JobBufferSize int } +var onlineMode bool + const ( ConfServerUrl = "server.url" ConfToken = "server.token" @@ -54,8 +57,56 @@ const ( func prepareConfig() { pf := rootCmd.PersistentFlags() - bind := func(s string) { - if err := viper.BindPFlag(s, pf.Lookup(s)); err != nil { + pf.SortFlags = false + pf.StringVar(&configFile, "config", "", "Config file") + configFile = os.Getenv("OD_CONFIG") + + pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL") + + pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)") + + pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout") + + pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs") + + pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error") + + pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size") + + pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries") + + pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries") + + pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks") + + pf.Uint(ConfWorkers, 4, "Crawler: Connections per server") + + pf.Uint(ConfRetries, 5, "Crawler: Request retries") + + pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout") + + pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout") + + pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent") + + pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size") + + pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval") + + pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval") + + pf.Bool(ConfVerbose, false, "Log: Print every listed dir") + + pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors") + + pf.String(ConfLogFile, "crawler.log", "Log file") + + // Bind all flags to Viper + pf.VisitAll(func(flag *pflag.Flag) { + s := flag.Name + s = strings.TrimLeft(s, "-") + + if err := viper.BindPFlag(s, flag); err != nil { panic(err) } var envKey string @@ -65,71 +116,7 @@ func prepareConfig() { if err := viper.BindEnv(s, envKey); err != nil { panic(err) } - } - - pf.SortFlags = false - pf.StringVar(&configFile, "config", "", "Config file") - configFile = os.Getenv("OD_CONFIG") - - pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL") - bind(ConfServerUrl) - - pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)") - bind(ConfToken) - - pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout") - bind(ConfServerTimeout) - - pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs") - bind(ConfRecheck) - - pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error") - bind(ConfCooldown) - - pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size") - bind(ConfChunkSize) - - pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries") - bind(ConfUploadRetries) - - pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries") - bind(ConfUploadRetryInterval) - - pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks") - bind(ConfTasks) - - pf.Uint(ConfWorkers, 4, "Crawler: Connections per server") - bind(ConfWorkers) - - pf.Uint(ConfRetries, 5, "Crawler: Request retries") - bind(ConfRetries) - - pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout") - bind(ConfDialTimeout) - - pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout") - bind(ConfTimeout) - - pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent") - bind(ConfUserAgent) - - pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size") - bind(ConfJobBufferSize) - - pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval") - bind(ConfCrawlStats) - - pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval") - bind(ConfAllocStats) - - pf.Bool(ConfVerbose, false, "Log: Print every listed dir") - bind(ConfVerbose) - - pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors") - bind(ConfPrintHTTP) - - pf.String(ConfLogFile, "crawler.log", "Log file") - bind(ConfLogFile) + }) } func readConfig() { @@ -157,15 +144,17 @@ func readConfig() { } } - config.ServerUrl = viper.GetString(ConfServerUrl) - if config.ServerUrl == "" { - configMissing(ConfServerUrl) - } - config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") + if onlineMode { + config.ServerUrl = viper.GetString(ConfServerUrl) + if config.ServerUrl == "" { + configMissing(ConfServerUrl) + } + config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") - config.Token = viper.GetString(ConfToken) - if config.Token == "" { - configMissing(ConfToken) + config.Token = viper.GetString(ConfToken) + if config.Token == "" { + configMissing(ConfToken) + } } config.ServerTimeout = viper.GetDuration(ConfServerTimeout) diff --git a/main.go b/main.go index e2d67ac..736d3a6 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "github.com/spf13/viper" "github.com/terorie/od-database-crawler/fasturl" "os" + "os/signal" "strings" "sync/atomic" "time" @@ -17,7 +18,7 @@ var configFile string var rootCmd = cobra.Command { Use: "od-database-crawler", - Version: "1.2.1", + Version: "1.2.2", Short: "OD-Database Go crawler", Long: helpText, PersistentPreRunE: preRun, @@ -61,8 +62,6 @@ func preRun(cmd *cobra.Command, args []string) error { if err := os.MkdirAll("queue", 0755); err != nil { panic(err) } - readConfig() - return nil } @@ -75,25 +74,31 @@ func main() { } func cmdBase(_ *cobra.Command, _ []string) { - // TODO Graceful shutdown - appCtx := context.Background() - forceCtx := context.Background() + onlineMode = true + readConfig() + + appCtx, soft := context.WithCancel(context.Background()) + forceCtx, hard := context.WithCancel(context.Background()) + go hardShutdown(forceCtx) + go listenCtrlC(soft, hard) inRemotes := make(chan *OD) - go Schedule(forceCtx, inRemotes) + go Schedule(appCtx, inRemotes) ticker := time.NewTicker(config.Recheck) defer ticker.Stop() for { select { case <-appCtx.Done(): - return + goto shutdown case <-ticker.C: t, err := FetchTask() if err != nil { logrus.WithError(err). Error("Failed to get new task") - time.Sleep(viper.GetDuration(ConfCooldown)) + if !sleep(viper.GetDuration(ConfCooldown), appCtx) { + goto shutdown + } continue } if t == nil { @@ -109,13 +114,7 @@ func cmdBase(_ *cobra.Command, _ []string) { if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme { // Not an error err = nil - - // Give back task - //err2 := CancelTask(t.WebsiteId) - //if err2 != nil { - // logrus.Error(err2) - //} - + // TODO FTP crawler continue } else if err != nil { logrus.WithError(err). @@ -126,9 +125,15 @@ func cmdBase(_ *cobra.Command, _ []string) { ScheduleTask(inRemotes, t, &baseUri) } } + +shutdown: + globalWait.Wait() } func cmdCrawler(_ *cobra.Command, args []string) error { + onlineMode = false + readConfig() + arg := args[0] // https://github.com/golang/go/issues/19779 if !strings.Contains(arg, "://") { @@ -161,3 +166,30 @@ func cmdCrawler(_ *cobra.Command, args []string) error { return nil } + +func listenCtrlC(soft, hard context.CancelFunc) { + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt) + + <-c + logrus.Info(">>> Shutting down crawler... <<<") + soft() + + <-c + logrus.Warning(">>> Force shutdown! <<<") + hard() +} + +func hardShutdown(c context.Context) { + <-c.Done() + os.Exit(1) +} + +func sleep(d time.Duration, c context.Context) bool { + select { + case <-time.After(d): + return true + case <-c.Done(): + return false + } +}