mirror of
https://github.com/simon987/Architeuthis.git
synced 2025-04-20 07:46:41 +00:00
retry on fail, retry on http status codes
This commit is contained in:
parent
6f0637dc3d
commit
e15cab98ef
91
main.go
91
main.go
@ -3,6 +3,7 @@ package main
|
|||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"github.com/elazarl/goproxy"
|
"github.com/elazarl/goproxy"
|
||||||
|
"github.com/pkg/errors"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -77,12 +78,9 @@ func simplifyHost(host string) string {
|
|||||||
return host
|
return host
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Balancer) chooseProxy(host string) *Proxy {
|
func (b *Balancer) chooseProxy() *Proxy {
|
||||||
|
|
||||||
_ = simplifyHost(host)
|
|
||||||
|
|
||||||
sort.Sort(ByConnectionCount(b.proxies))
|
sort.Sort(ByConnectionCount(b.proxies))
|
||||||
|
|
||||||
return b.proxies[0]
|
return b.proxies[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,35 +95,18 @@ func New() *Balancer {
|
|||||||
balancer.server.OnRequest().Do(LogRequestMiddleware(
|
balancer.server.OnRequest().Do(LogRequestMiddleware(
|
||||||
func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
|
func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
|
||||||
|
|
||||||
sHost := simplifyHost(r.Host)
|
p := balancer.chooseProxy()
|
||||||
p := balancer.chooseProxy(sHost)
|
|
||||||
|
|
||||||
p.Connections += 1
|
|
||||||
logrus.WithFields(logrus.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"proxy": p.Name,
|
"proxy": p.Name,
|
||||||
"connexions": p.Connections,
|
"connexions": p.Connections,
|
||||||
}).Trace("Routing request")
|
}).Trace("Routing request")
|
||||||
|
|
||||||
limiter := p.getLimiter(sHost)
|
resp, err := p.processRequest(r)
|
||||||
reservation := limiter.Reserve()
|
|
||||||
if !reservation.OK() {
|
|
||||||
logrus.Warn("Could not reserve")
|
|
||||||
}
|
|
||||||
delay := reservation.Delay()
|
|
||||||
if delay > 0 {
|
|
||||||
logrus.WithFields(logrus.Fields{
|
|
||||||
"time": delay,
|
|
||||||
}).Trace("Sleeping")
|
|
||||||
time.Sleep(delay)
|
|
||||||
}
|
|
||||||
|
|
||||||
proxyReq := preprocessRequest(cloneRequest(r))
|
|
||||||
resp, err := p.HttpClient.Do(proxyReq)
|
|
||||||
p.Connections -= 1
|
|
||||||
|
|
||||||
//TODO: handle err
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
logrus.WithError(err).Trace("Could not complete request")
|
||||||
|
return nil, goproxy.NewResponse(r, "text/plain", 500, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, resp
|
return nil, resp
|
||||||
@ -133,6 +114,63 @@ func New() *Balancer {
|
|||||||
return balancer
|
return balancer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) {
|
||||||
|
|
||||||
|
p.Connections += 1
|
||||||
|
defer func() {
|
||||||
|
p.Connections -= 1
|
||||||
|
}()
|
||||||
|
retries := 1
|
||||||
|
const maxRetries = 5
|
||||||
|
|
||||||
|
p.waitRateLimit(r)
|
||||||
|
proxyReq := preprocessRequest(cloneRequest(r))
|
||||||
|
|
||||||
|
for {
|
||||||
|
|
||||||
|
if retries > maxRetries {
|
||||||
|
return nil, errors.Errorf("giving up after %d retries", maxRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := p.HttpClient.Do(proxyReq)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if isPermanentError(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
wait := waitTime(retries)
|
||||||
|
|
||||||
|
logrus.WithError(err).WithFields(logrus.Fields{
|
||||||
|
"wait": wait,
|
||||||
|
}).Trace("Temporary error during request")
|
||||||
|
time.Sleep(wait)
|
||||||
|
|
||||||
|
retries += 1
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if isHttpSuccessCode(resp.StatusCode) {
|
||||||
|
|
||||||
|
return resp, nil
|
||||||
|
} else if shouldRetryHttpCode(resp.StatusCode) {
|
||||||
|
|
||||||
|
wait := waitTime(retries)
|
||||||
|
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"wait": wait,
|
||||||
|
"status": resp.StatusCode,
|
||||||
|
}).Trace("HTTP error during request")
|
||||||
|
|
||||||
|
time.Sleep(wait)
|
||||||
|
retries += 1
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
return nil, errors.Errorf("HTTP error: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (b *Balancer) Run() {
|
func (b *Balancer) Run() {
|
||||||
|
|
||||||
addr := flag.String("addr", "localhost:5050", "listen address")
|
addr := flag.String("addr", "localhost:5050", "listen address")
|
||||||
@ -143,7 +181,8 @@ func (b *Balancer) Run() {
|
|||||||
"addr": *addr,
|
"addr": *addr,
|
||||||
}).Info("Listening")
|
}).Info("Listening")
|
||||||
|
|
||||||
go logrus.Fatal(http.ListenAndServe(*addr, b.server))
|
err := http.ListenAndServe(*addr, b.server)
|
||||||
|
logrus.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func preprocessRequest(r *http.Request) *http.Request {
|
func preprocessRequest(r *http.Request) *http.Request {
|
||||||
|
127
retry.go
Normal file
127
retry.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"os"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func isPermanentError(err error) bool {
|
||||||
|
|
||||||
|
var opErr *net.OpError
|
||||||
|
|
||||||
|
urlErr, ok := err.(*url.Error)
|
||||||
|
if ok {
|
||||||
|
opErr, ok = urlErr.Err.(*net.OpError)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
netErr, ok := err.(net.Error)
|
||||||
|
if ok {
|
||||||
|
if netErr.Timeout() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
opErr, ok = netErr.(*net.OpError)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//This should not happen...
|
||||||
|
if opErr == nil {
|
||||||
|
logrus.Error("FIXME: isPermanentError; opErr == nil")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if opErr.Op == "proxyconnect" {
|
||||||
|
logrus.Error("Error connecting to the proxy!")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if opErr.Timeout() {
|
||||||
|
// Usually means that there is no route to host
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
switch t := opErr.Err.(type) {
|
||||||
|
case *net.DNSError:
|
||||||
|
return true
|
||||||
|
case *os.SyscallError:
|
||||||
|
|
||||||
|
logrus.Printf("os.SyscallError:%+v", t)
|
||||||
|
|
||||||
|
if errno, ok := t.Err.(syscall.Errno); ok {
|
||||||
|
switch errno {
|
||||||
|
case syscall.ECONNREFUSED:
|
||||||
|
log.Println("connect refused")
|
||||||
|
return true
|
||||||
|
case syscall.ETIMEDOUT:
|
||||||
|
log.Println("timeout")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//todo: handle the other error types
|
||||||
|
fmt.Println("fixme: None of the above")
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitTime(retries int) time.Duration {
|
||||||
|
|
||||||
|
const multiplier = 1.5
|
||||||
|
const wait = int64(5 * time.Second)
|
||||||
|
|
||||||
|
return time.Duration(wait * int64(math.Pow(multiplier, float64(retries))))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Proxy) waitRateLimit(r *http.Request) {
|
||||||
|
|
||||||
|
sHost := simplifyHost(r.Host)
|
||||||
|
|
||||||
|
limiter := p.getLimiter(sHost)
|
||||||
|
reservation := limiter.Reserve()
|
||||||
|
if !reservation.OK() {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"host": sHost,
|
||||||
|
}).Warn("Could not get reservation, make sure that burst is > 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := reservation.Delay()
|
||||||
|
if delay > 0 {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"time": delay,
|
||||||
|
}).Trace("Sleeping")
|
||||||
|
time.Sleep(delay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func isHttpSuccessCode(code int) bool {
|
||||||
|
return code >= 200 && code < 300
|
||||||
|
}
|
||||||
|
|
||||||
|
func shouldRetryHttpCode(code int) bool {
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case code == 403:
|
||||||
|
case code == 408:
|
||||||
|
case code == 429:
|
||||||
|
case code == 444:
|
||||||
|
case code == 499:
|
||||||
|
case code >= 500:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
24
test/web.py
Normal file
24
test/web.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
from flask import Flask, Response
|
||||||
|
import time
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/")
|
||||||
|
def hello():
|
||||||
|
time.sleep(90)
|
||||||
|
return "Hello World!"
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/500")
|
||||||
|
def e500():
|
||||||
|
return Response(status=500)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/404")
|
||||||
|
def e404():
|
||||||
|
return Response(status=404)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
app.run(port=9999)
|
Loading…
x
Reference in New Issue
Block a user