From e15cab98efa290a2ae8d22a806ebefc983f1dad9 Mon Sep 17 00:00:00 2001
From: simon
Date: Wed, 29 May 2019 15:27:37 -0400
Subject: [PATCH] retry on fail, retry on http status codes

---
 main.go     | 103 +++++++++++++++++++++++++++++----------
 retry.go    | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 test/web.py |  24 +++++++++
 web.py      |  14 -----
 4 files changed, 240 insertions(+), 40 deletions(-)
 create mode 100644 retry.go
 create mode 100644 test/web.py
 delete mode 100644 web.py

diff --git a/main.go b/main.go
index ed5c24a..aa230c8 100644
--- a/main.go
+++ b/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"flag"
 	"github.com/elazarl/goproxy"
+	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/time/rate"
 	"net/http"
@@ -77,12 +78,10 @@ func simplifyHost(host string) string {
 	return host
 }
 
-func (b *Balancer) chooseProxy(host string) *Proxy {
-
-	_ = simplifyHost(host)
+// chooseProxy sorts b.proxies with ByConnectionCount and returns the first one.
+func (b *Balancer) chooseProxy() *Proxy {
 
 	sort.Sort(ByConnectionCount(b.proxies))
-
 	return b.proxies[0]
 }
 
@@ -97,35 +96,18 @@ func New() *Balancer {
 	balancer.server.OnRequest().Do(LogRequestMiddleware(
 		func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
 
-			sHost := simplifyHost(r.Host)
-			p := balancer.chooseProxy(sHost)
+			p := balancer.chooseProxy()
 
-			p.Connections += 1
 			logrus.WithFields(logrus.Fields{
 				"proxy":      p.Name,
 				"connexions": p.Connections,
 			}).Trace("Routing request")
 
-			limiter := p.getLimiter(sHost)
-			reservation := limiter.Reserve()
-			if !reservation.OK() {
-				logrus.Warn("Could not reserve")
-			}
-			delay := reservation.Delay()
-			if delay > 0 {
-				logrus.WithFields(logrus.Fields{
-					"time": delay,
-				}).Trace("Sleeping")
-				time.Sleep(delay)
-			}
+			resp, err := p.processRequest(r)
 
-			proxyReq := preprocessRequest(cloneRequest(r))
-			resp, err := p.HttpClient.Do(proxyReq)
-			p.Connections -= 1
-
-			//TODO: handle err
 			if err != nil {
-				panic(err)
+				logrus.WithError(err).Trace("Could not complete request")
+				return nil, goproxy.NewResponse(r, "text/plain", 500, err.Error())
 			}
 
 			return nil, resp
@@ -133,6 +115,74 @@ func New() *Balancer {
 	return balancer
 }
 
+// processRequest sends r through the proxy's HTTP client, sleeping for
+// waitTime(retries) and retrying on temporary errors and on retryable HTTP
+// status codes. It returns an error on a permanent error, on a status that
+// is neither success nor retryable, or once maxRetries attempts have failed.
+// p.Connections is incremented for the duration of the call.
+func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) {
+
+	p.Connections += 1
+	defer func() {
+		p.Connections -= 1
+	}()
+	retries := 1
+	const maxRetries = 5
+
+	p.waitRateLimit(r)
+	proxyReq := preprocessRequest(cloneRequest(r))
+
+	// NOTE(review): proxyReq is reused for every attempt; a request body
+	// would presumably already be consumed on a retry — confirm.
+	for {
+
+		if retries > maxRetries {
+			return nil, errors.Errorf("giving up after %d retries", maxRetries)
+		}
+
+		resp, err := p.HttpClient.Do(proxyReq)
+
+		if err != nil {
+			if isPermanentError(err) {
+				return nil, err
+			}
+
+			wait := waitTime(retries)
+
+			logrus.WithError(err).WithFields(logrus.Fields{
+				"wait": wait,
+			}).Trace("Temporary error during request")
+			time.Sleep(wait)
+
+			retries += 1
+			continue
+		}
+
+		if isHttpSuccessCode(resp.StatusCode) {
+
+			return resp, nil
+		}
+
+		// Every path below discards resp, so close its body to release the
+		// connection instead of leaking it.
+		resp.Body.Close()
+
+		if !shouldRetryHttpCode(resp.StatusCode) {
+			return nil, errors.Errorf("HTTP error: %d", resp.StatusCode)
+		}
+
+		wait := waitTime(retries)
+
+		logrus.WithFields(logrus.Fields{
+			"wait":   wait,
+			"status": resp.StatusCode,
+		}).Trace("HTTP error during request")
+
+		time.Sleep(wait)
+		retries += 1
+	}
+}
+
 func (b *Balancer) Run() {
 
 	addr := flag.String("addr", "localhost:5050", "listen address")
@@ -143,7 +193,8 @@ func (b *Balancer) Run() {
 		"addr": *addr,
 	}).Info("Listening")
 
-	go logrus.Fatal(http.ListenAndServe(*addr, b.server))
+	err := http.ListenAndServe(*addr, b.server)
+	logrus.Fatal(err)
 }
 
 func preprocessRequest(r *http.Request) *http.Request {
diff --git a/retry.go b/retry.go
new file mode 100644
index 0000000..bb48131
--- /dev/null
+++ b/retry.go
@@ -0,0 +1,139 @@
+package main
+
+import (
+	"fmt"
+	"github.com/sirupsen/logrus"
+	"log"
+	"math"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"syscall"
+	"time"
+)
+
+// isPermanentError reports whether err should abort the request instead of
+// being retried. It returns true for proxy connection failures, timeouts
+// reported by a *net.OpError, DNS errors and refused connections; any error
+// it cannot classify is treated as temporary.
+func isPermanentError(err error) bool {
+
+	var opErr *net.OpError
+
+	urlErr, ok := err.(*url.Error)
+	if ok {
+		opErr, ok = urlErr.Err.(*net.OpError)
+		if !ok {
+			return false
+		}
+	} else {
+		netErr, ok := err.(net.Error)
+		if ok {
+			if netErr.Timeout() {
+				return false
+			}
+
+			opErr, ok = netErr.(*net.OpError)
+			if !ok {
+				return false
+			}
+		}
+	}
+
+	//This should not happen...
+	if opErr == nil {
+		logrus.Error("FIXME: isPermanentError; opErr == nil")
+		return false
+	}
+
+	if opErr.Op == "proxyconnect" {
+		logrus.Error("Error connecting to the proxy!")
+		return true
+	}
+
+	if opErr.Timeout() {
+		// Usually means that there is no route to host
+		return true
+	}
+
+	switch t := opErr.Err.(type) {
+	case *net.DNSError:
+		return true
+	case *os.SyscallError:
+
+		logrus.Printf("os.SyscallError:%+v", t)
+
+		if errno, ok := t.Err.(syscall.Errno); ok {
+			switch errno {
+			case syscall.ECONNREFUSED:
+				log.Println("connect refused")
+				return true
+			case syscall.ETIMEDOUT:
+				log.Println("timeout")
+				return false
+			}
+		}
+	}
+
+	//todo: handle the other error types
+	fmt.Println("fixme: None of the above")
+
+	return false
+}
+
+// waitTime returns how long to sleep before retrying after the given number
+// of attempts: a 5 second base delay multiplied by 1.5^retries.
+func waitTime(retries int) time.Duration {
+
+	const multiplier = 1.5
+	const wait = 5 * time.Second
+
+	// Scale in floating point: converting math.Pow's result to an integer
+	// first would truncate the factor (1.5 -> 1, 2.25 -> 2, ...).
+	return time.Duration(float64(wait) * math.Pow(multiplier, float64(retries)))
+}
+
+// waitRateLimit reserves a slot on the limiter p keeps for the request's
+// simplified host and sleeps for the delay the reservation reports.
+func (p *Proxy) waitRateLimit(r *http.Request) {
+
+	sHost := simplifyHost(r.Host)
+
+	limiter := p.getLimiter(sHost)
+	reservation := limiter.Reserve()
+	if !reservation.OK() {
+		logrus.WithFields(logrus.Fields{
+			"host": sHost,
+		}).Warn("Could not get reservation, make sure that burst is > 0")
+	}
+
+	// NOTE(review): when OK() is false, Delay() presumably returns
+	// rate.InfDuration, so this sleep would never end — confirm.
+	delay := reservation.Delay()
+	if delay > 0 {
+		logrus.WithFields(logrus.Fields{
+			"time": delay,
+		}).Trace("Sleeping")
+		time.Sleep(delay)
+	}
+}
+
+// isHttpSuccessCode reports whether code is a 2xx HTTP status.
+func isHttpSuccessCode(code int) bool {
+	return code >= 200 && code < 300
+}
+
+// shouldRetryHttpCode reports whether a response with the given HTTP status
+// should be retried: 403, 408, 429, 444, 499 and every code >= 500.
+func shouldRetryHttpCode(code int) bool {
+
+	// Go switch cases do not fall through, so the retryable codes must
+	// share one case; as separate empty cases they would all return false.
+	switch code {
+	case 403, 408, 429, 444, 499:
+		return true
+	}
+
+	return code >= 500
+}
diff --git a/test/web.py b/test/web.py
new file mode 100644
index 0000000..cebbdd7
--- /dev/null
+++ b/test/web.py
@@ -0,0 +1,24 @@
+from flask import Flask, Response
+import time
+
+app = Flask(__name__)
+
+
+@app.route("/")
+def hello():
+    time.sleep(90)
+    return "Hello World!"
+
+
+@app.route("/500")
+def e500():
+    return Response(status=500)
+
+
+@app.route("/404")
+def e404():
+    return Response(status=404)
+
+
+if __name__ == "__main__":
+    app.run(port=9999)
diff --git a/web.py b/web.py
deleted file mode 100644
index 310cd69..0000000
--- a/web.py
+++ /dev/null
@@ -1,14 +0,0 @@
-from flask import Flask
-import time
-
-app = Flask(__name__)
-
-
-@app.route("/")
-def hello():
-    time.sleep(3)
-    return "Hello World!"
-
-
-if __name__ == "__main__":
-    app.run(port=9999)