diff --git a/README.md b/README.md index 2ee4ee2..b39d6e8 100644 --- a/README.md +++ b/README.md @@ -20,10 +20,14 @@ and error handling. Built for automated web scraping. ```json { "addr": "localhost:5050", + "timeout": "15s", + "wait": "4s", + "multiplier": 2.5, + "retries": 3, "proxies": [ { "name": "squid_P0", - "url": "http://p0.exemple.com:8080" + "url": "http://user:pass@p0.exemple.com:8080" }, { "name": "privoxy_P1", @@ -38,7 +42,7 @@ and error handling. Built for automated web scraping. }, "reddit.com": { "every": "2s", - "burst": 1, + "burst": 2, "headers": {"User-Agent": "mybot_v0.1"} } } diff --git a/config.go b/config.go index c7aa662..fc0fb10 100644 --- a/config.go +++ b/config.go @@ -2,16 +2,19 @@ package main import ( "encoding/json" + "fmt" "golang.org/x/time/rate" "io/ioutil" "os" + "strings" "time" ) type HostConfig struct { - Every string `json:"every"` - Burst int `json:"burst"` - Headers map[string]string `json:"headers"` + EveryStr string `json:"every"` + Burst int `json:"burst"` + Headers map[string]string `json:"headers"` + Every time.Duration } type ProxyConfig struct { @@ -20,9 +23,15 @@ type ProxyConfig struct { } var config struct { - Addr string `json:"addr"` - Hosts map[string]HostConfig `json:"hosts"` - Proxies []ProxyConfig `json:"proxies"` + Addr string `json:"addr"` + TimeoutStr string `json:"timeout"` + WaitStr string `json:"wait"` + Multiplier float64 `json:"multiplier"` + Retries int `json:"retries"` + Hosts map[string]*HostConfig `json:"hosts"` + Proxies []ProxyConfig `json:"proxies"` + Wait int64 + Timeout time.Duration } func loadConfig() { @@ -35,15 +44,47 @@ func loadConfig() { err = json.Unmarshal(configBytes, &config) handleErr(err) + + validateConfig() + + config.Timeout, err = time.ParseDuration(config.TimeoutStr) + wait, err := time.ParseDuration(config.WaitStr) + config.Wait = int64(wait) + + for _, conf := range config.Hosts { + conf.Every, err = time.ParseDuration(conf.EveryStr) + handleErr(err) + } +} + +func validateConfig() { + + hasDefaultHost := false + + for host, conf := range config.Hosts { + + if host == "*" { + hasDefaultHost = true + } + + for k := range conf.Headers { + if strings.ToLower(k) == "accept-encoding" { + panic(fmt.Sprintf("headers config for '%s':"+ + " Do not set the Accept-Encoding header, it breaks goproxy", host)) + } + } + } + + if !hasDefaultHost { + panic("config.json: You must specify a default host ('*')") + } } func applyConfig(proxy *Proxy) { for host, conf := range config.Hosts { - duration, err := time.ParseDuration(conf.Every) - handleErr(err) proxy.Limiters[host] = &ExpiringLimiter{ - rate.NewLimiter(rate.Every(duration), conf.Burst), + rate.NewLimiter(rate.Every(conf.Every), conf.Burst), time.Now(), } } diff --git a/config.json b/config.json index f115997..b3faf15 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,9 @@ { "addr": "localhost:5050", + "timeout": "15s", + "wait": "4s", + "multiplier": 2.5, + "retries": 3, "proxies": [ { "name": "p0", @@ -12,14 +16,20 @@ ], "hosts": { "*": { - "every": "10s", - "burst": 1, - "headers": {} + "every": "500s", + "burst": 25, + "headers": { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + "Cache-Control": "max-age=0", + "Connection": "keep-alive", + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0" + } }, "reddit.com": { - "every": "100s", - "burst": 1, - "headers": {} + "every": "2s", + "burst": 2, + "headers": { + } } } } \ No newline at end of file diff --git a/gc.go b/gc.go index f274a83..7122851 100644 --- a/gc.go +++ b/gc.go @@ -32,7 +32,7 @@ func (b *Balancer) cleanAllExpiredLimits() { logrus.WithFields(logrus.Fields{ "removed": before - after, - }).Info("Did limiters cleanup") + }).Info("Cleaned up limiters") } func cleanExpiredLimits(proxy *Proxy) { diff --git a/main.go b/main.go index 1a28fc2..9d5ce06 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "flag" "github.com/elazarl/goproxy" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -58,11 +57,12 @@ func (p *Proxy) getLimiter(host string) *rate.Limiter { } func (p *Proxy) makeNewLimiter(host string) *ExpiringLimiter { - every := time.Millisecond //todo load default from conf + + defaultConf := config.Hosts["*"] newExpiringLimiter := &ExpiringLimiter{ LastRead: time.Now(), - Limiter: rate.NewLimiter(rate.Every(every), 1), + Limiter: rate.NewLimiter(rate.Every(defaultConf.Every), defaultConf.Burst), } p.Limiters[host] = newExpiringLimiter @@ -76,7 +76,12 @@ func (p *Proxy) makeNewLimiter(host string) *ExpiringLimiter { func simplifyHost(host string) string { if strings.HasPrefix(host, "www.") { - return host[4:] + host = host[4:] + } + + col := strings.LastIndex(host, ":") + if col > 0 { + host = host[:col] } return host @@ -104,6 +109,7 @@ func New() *Balancer { logrus.WithFields(logrus.Fields{ "proxy": p.Name, "connexions": p.Connections, + "host": r.Host, }).Trace("Routing request") resp, err := p.processRequest(r) @@ -118,22 +124,38 @@ func New() *Balancer { return balancer } +func applyHeaders(r *http.Request) *http.Request { + + if conf, ok := config.Hosts["*"]; ok { + for k, v := range conf.Headers { + r.Header.Set(k, v) + } + } + + sHost := simplifyHost(r.Host) + if conf, ok := config.Hosts[sHost]; ok { + for k, v := range conf.Headers { + r.Header.Set(k, v) + } + } + return r +} + func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) { p.Connections += 1 defer func() { p.Connections -= 1 }() - retries := 1 - const maxRetries = 5 + retries := 0 p.waitRateLimit(r) - proxyReq := preprocessRequest(cloneRequest(r)) + proxyReq := applyHeaders(cloneRequest(r)) for { - if retries > maxRetries { - return nil, errors.Errorf("giving up after %d retries", maxRetries) + if retries >= config.Retries { + return nil, errors.Errorf("giving up after %d retries", config.Retries) } resp, err := p.HttpClient.Do(proxyReq) @@ -177,22 +199,15 @@ func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) { func (b *Balancer) Run() { - addr := flag.String("addr", "localhost:5050", "listen address") - flag.Parse() - //b.Verbose = true logrus.WithFields(logrus.Fields{ - "addr": *addr, + "addr": config.Addr, }).Info("Listening") - err := http.ListenAndServe(*addr, b.server) + err := http.ListenAndServe(config.Addr, b.server) logrus.Fatal(err) } -func preprocessRequest(r *http.Request) *http.Request { - return r -} - func cloneRequest(r *http.Request) *http.Request { proxyReq := &http.Request{ @@ -223,7 +238,6 @@ func NewProxy(name, stringUrl string) (*Proxy, error) { } var httpClient *http.Client - //TODO: setup extra headers & qargs here if parsedUrl == nil { httpClient = &http.Client{} } else { @@ -234,6 +248,8 @@ func NewProxy(name, stringUrl string) (*Proxy, error) { } } + httpClient.Timeout = config.Timeout + return &Proxy{ Name: name, Url: parsedUrl, diff --git a/retry.go b/retry.go index bb48131..ed3528b 100644 --- a/retry.go +++ b/retry.go @@ -21,19 +21,16 @@ func isPermanentError(err error) bool { if ok { opErr, ok = urlErr.Err.(*net.OpError) if !ok { + if urlErr.Err != nil && urlErr.Err.Error() == "Proxy Authentication Required" { + logrus.Warn("Got 'Proxy Authentication Required'. Did you forget to configure the password for a proxy?") + return true + } return false } } else { - netErr, ok := err.(net.Error) + _, ok := err.(net.Error) if ok { - if netErr.Timeout() { - return false - } - - opErr, ok = netErr.(*net.OpError) - if !ok { - return false - } + return false } } @@ -80,10 +77,7 @@ func isPermanentError(err error) bool { func waitTime(retries int) time.Duration { - const multiplier = 1.5 - const wait = int64(5 * time.Second) - - return time.Duration(wait * int64(math.Pow(multiplier, float64(retries)))) + return time.Duration(config.Wait * int64(math.Pow(config.Multiplier, float64(retries)))) } func (p *Proxy) waitRateLimit(r *http.Request) { @@ -101,7 +95,7 @@ func (p *Proxy) waitRateLimit(r *http.Request) { delay := reservation.Delay() if delay > 0 { logrus.WithFields(logrus.Fields{ - "time": delay, + "wait": delay, }).Trace("Sleeping") time.Sleep(delay) } @@ -115,10 +109,15 @@ func shouldRetryHttpCode(code int) bool { switch { case code == 403: + fallthrough case code == 408: + fallthrough case code == 429: + fallthrough case code == 444: + fallthrough case code == 499: + fallthrough case code >= 500: return true } diff --git a/test/web.py b/test/web.py index d63c50c..e009956 100644 --- a/test/web.py +++ b/test/web.py @@ -5,7 +5,7 @@ app = Flask(__name__) @app.route("/") -def hello(): +def slow(): time.sleep(90) return "Hello World!" @@ -17,7 +17,11 @@ def e500(): @app.route("/404") def e404(): - time.sleep(0.5) + return Response(status=404) + + +@app.route("/403") +def e403(): return Response(status=404)