diff --git a/.gitignore b/.gitignore
index fd594d2..ba48acc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,4 @@
 *.iml
 .idea/
+architeuthis
diff --git a/README.md b/README.md
index d00eaef..a96ed18 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,9 @@ and error handling. Built for automated web scraping.
 * Seamless exponential backoff retries on timeout or error HTTP codes
 * Requires no additional configuration for integration into existing programs
 * Configurable per-host behavior
-* Proxy routing (Requests can be forced to use a specific proxy with header param)
+* Monitoring with InfluxDB
+
+![grafana](grafana.png)
 
 ### Typical use case
 ![user_case](use_case.png)
@@ -19,19 +21,30 @@ and error handling. Built for automated web scraping.
 ### Usage
 
 ```bash
-wget https://simon987.net/data/architeuthis/17_architeuthis.tar.gz
-tar -xzf 17_architeuthis.tar.gz
+wget https://simon987.net/data/architeuthis/16_architeuthis.tar.gz
+tar -xzf 16_architeuthis.tar.gz
 vim config.json # Configure settings here
 ./architeuthis
 ```
+
+You can add proxies using the `/add_proxy` API:
+
+```bash
+curl "http://localhost:5050/add_proxy?url=&name="
+```
+
+Or automatically using Proxybroker:
+```bash
+python3 import_from_broker.py
+```
+
 ### Example usage with wget
 
 ```bash
 export http_proxy="http://localhost:5050"
 # --no-check-certificates is necessary for https mitm
 # You don't need to specify user-agent if it's already in your config.json
-wget -m -np -c --no-check-certificate -R index.html* http://ca.releases.ubuntu.com/
+wget -m -np -c --no-check-certificate -R index.html* http http://ca.releases.ubuntu.com/
 ```
 
 With `"every": "500ms"` and a single proxy, you should see
@@ -49,89 +62,6 @@
 level=trace msg=Sleeping wait=433.394361ms
 ...
 ```
 
-### Proxy routing
-
-To use routing, enable the `routing` parameter in the configuration file.
-
-**Explicitly choose proxy**
-
-You can force a request to go through a specific proxy by using the `X-Architeuthis-Proxy` header.
-When specified and `routing` is
-enabled in the config file, the request will use the proxy with the
-matching name.
-
-Example:
-
-in `config.json`:
-```
-...
-  routing: true,
-  "proxies": [
-    {
-      "name": "p0",
-      "url": ""
-    },
-    {
-      "name": "p1",
-      "url": ""
-    },
-    ...
-  ],
-```
-
-This request will *always* be routed through the **p0** proxy:
-```bash
-curl https://google.ca/ -k -H "X-Architeuthis-Proxy: p0"
-```
-
-Invalid/blank values are silently ignored; the request will be routed
-according to the usual load balancer rules.
-
-**Hashed routing**
-
-You can also use the `X-Architeuthis-Hash` header to specify an abitrary string.
-The string will be hashed and uniformly routed to its corresponding proxy. Unless the number
-proxy changes, requests with the same hash value will always be routed to the same proxy.
-
-Example:
-
-`X-Architeuthis-Hash: userOne` is guaranteed to always be routed to the same proxy.
-`X-Architeuthis-Hash: userTwo` is also guaranteed to always be routed to the same proxy,
-but **not necessarily a proxy different than userOne**.
-
-
-**Unique string routing**
-
-You can use the `X-Architeuthis-Unique` header to specify a unique string that
-will be dynamically associated to a single proxy.
-
-The first time such a request is received, the unique string is bound to a proxy and
-will *always* be routed to this proxy. Any other non-empty value for this header will
-be routed to another proxy and bound to it.
- - This means that you cannot use more unique strings than proxies, -doing so will cause the request to drop and will show the message -`No blank proxies to route this request!`. - -Reloading the configuration or restarting the `architeuthis` instance will clear the -proxy binds. - -Example with configured proxies p0-p3: -``` -msg=Listening addr="localhost:5050" -msg="Bound unique param user1 to p3" -msg="Routing request" conns=0 proxy=p3 url="https://google.ca:443/" -msg="Bound unique param user2 to p2" -msg="Routing request" conns=0 proxy=p2 url="https://google.ca:443/" -msg="Bound unique param user3 to p1" -msg="Routing request" conns=0 proxy=p1 url="https://google.ca:443/" -msg="Bound unique param user4 to p0" -msg="Routing request" conns=0 proxy=p0 url="https://google.ca:443/" -msg="No blank proxies to route this request!" unique param=user5 -``` - -The `X-Architeuthis-*` header *will not* be sent to the remote host. - ### Hot config reload ```bash @@ -178,8 +108,6 @@ Actions | should_retry | Override default retry behavior for http errors (by default it retries on 403,408,429,444,499,>500) | force_retry | Always retry (Up to retries_hard times) | dont_retry | Immediately stop retrying -| multiply_every | Multiply the current limiter's 'every' value by `arg` | `1.5`(float) -| set_every | Set the current limiter's 'every' value to `arg` | `10s`(duration) In the event of a temporary network error, `should_retry` is ignored (it will always retry unless `dont_retry` is set) @@ -195,18 +123,6 @@ Note that having too many rules for one host might negatively impact performance "wait": "4s", "multiplier": 2.5, "retries": 3, - "retries_hard": 6, - "routing": true, - "proxies": [ - { - "name": "squid_P0", - "url": "http://user:pass@p0.exemple.com:8080" - }, - { - "name": "privoxy_P1", - "url": "http://p1.exemple.com:8080" - } - ], "hosts": [ { "host": "*", @@ -232,14 +148,6 @@ Note that having too many rules for one host might negatively impact performance "rules": [ {"condition": "status=403", "action": "dont_retry"} ] - }, - { - "host": ".www.instagram.com", - "every": "4500ms", - "burst": 3, - "rules": [ - {"condition": "body=*please try again in a few minutes*", "action": "multiply_every", "arg": "2"} - ] } ] } diff --git a/config.go b/config.go index ebd99d5..10e9865 100644 --- a/config.go +++ b/config.go @@ -7,7 +7,6 @@ import ( "github.com/pkg/errors" "github.com/ryanuber/go-glob" "github.com/sirupsen/logrus" - "golang.org/x/time/rate" "io/ioutil" "os" "reflect" @@ -17,40 +16,10 @@ import ( "time" ) -type HostConfig struct { - Host string `json:"host"` - EveryStr string `json:"every"` - Burst int `json:"burst"` - Headers map[string]string `json:"headers"` - RawRules []*RawHostRule `json:"rules"` - Every time.Duration - Rules []*HostRule -} - -type RawHostRule struct { - Condition string `json:"condition"` - Action string `json:"action"` - Arg string `json:"arg"` -} - -type HostRuleAction int - -const ( - DontRetry HostRuleAction = 0 - MultiplyEvery HostRuleAction = 1 - SetEvery HostRuleAction = 2 - ForceRetry HostRuleAction = 3 - ShouldRetry HostRuleAction = 4 -) - func (a HostRuleAction) String() string { switch a { case DontRetry: return "dont_retry" - case MultiplyEvery: - return "multiply_every" - case SetEvery: - return "set_every" case ForceRetry: return "force_retry" case ShouldRetry: @@ -59,63 +28,21 @@ func (a HostRuleAction) String() string { return "???" 
} -type HostRule struct { - Matches func(r *RequestCtx) bool - Action HostRuleAction - Arg float64 -} - -type ProxyConfig struct { - Name string `json:"name"` - Url string `json:"url"` -} - -var config struct { - Addr string `json:"addr"` - TimeoutStr string `json:"timeout"` - WaitStr string `json:"wait"` - Multiplier float64 `json:"multiplier"` - Retries int `json:"retries"` - RetriesHard int `json:"retries_hard"` - Hosts []*HostConfig `json:"hosts"` - Proxies []ProxyConfig `json:"proxies"` - Wait int64 - Timeout time.Duration - DefaultConfig *HostConfig - Routing bool -} - func parseRule(raw *RawHostRule) (*HostRule, error) { rule := &HostRule{} - var err error switch raw.Action { case "should_retry": rule.Action = ShouldRetry case "dont_retry": rule.Action = DontRetry - case "multiply_every": - rule.Action = MultiplyEvery - rule.Arg, err = strconv.ParseFloat(raw.Arg, 64) - case "set_every": - rule.Action = SetEvery - var duration time.Duration - duration, err = time.ParseDuration(raw.Arg) - if err != nil { - return nil, err - } - rule.Arg = 1 / duration.Seconds() case "force_retry": rule.Action = ForceRetry default: return nil, errors.Errorf("Invalid argument for action: %s", raw.Action) } - if err != nil { - return nil, err - } - switch { case strings.Contains(raw.Condition, "!="): op1Str, op2Str := split(raw.Condition, "!=") @@ -125,12 +52,12 @@ func parseRule(raw *RawHostRule) (*HostRule, error) { } if isGlob(op2Str) { - rule.Matches = func(ctx *RequestCtx) bool { + rule.Matches = func(ctx *ResponseCtx) bool { return !glob.Glob(op2Str, op1Func(ctx)) } } else { op2Str = strings.Replace(op2Str, "\\*", "*", -1) - rule.Matches = func(ctx *RequestCtx) bool { + rule.Matches = func(ctx *ResponseCtx) bool { return op1Func(ctx) != op2Str } } @@ -142,12 +69,12 @@ func parseRule(raw *RawHostRule) (*HostRule, error) { } if isGlob(op2Str) { - rule.Matches = func(ctx *RequestCtx) bool { + rule.Matches = func(ctx *ResponseCtx) bool { return glob.Glob(op2Str, op1Func(ctx)) } } else { op2Str = strings.Replace(op2Str, "\\*", "*", -1) - rule.Matches = func(ctx *RequestCtx) bool { + rule.Matches = func(ctx *ResponseCtx) bool { return op1Func(ctx) == op2Str } } @@ -162,7 +89,7 @@ func parseRule(raw *RawHostRule) (*HostRule, error) { return nil, err } - rule.Matches = func(ctx *RequestCtx) bool { + rule.Matches = func(ctx *ResponseCtx) bool { op1Num, err := strconv.ParseFloat(op1Func(ctx), 64) handleRuleErr(err) return op1Num > op2Num @@ -178,7 +105,7 @@ func parseRule(raw *RawHostRule) (*HostRule, error) { return nil, err } - rule.Matches = func(ctx *RequestCtx) bool { + rule.Matches = func(ctx *ResponseCtx) bool { op1Num, err := strconv.ParseFloat(op1Func(ctx), 64) handleRuleErr(err) return op1Num < op2Num @@ -214,10 +141,10 @@ func parseOperand2(op1, op2 string) (float64, error) { return strconv.ParseFloat(op2, 64) } -func parseOperand1(op string) func(ctx *RequestCtx) string { +func parseOperand1(op string) func(ctx *ResponseCtx) string { switch { case op == "body": - return func(ctx *RequestCtx) string { + return func(ctx *ResponseCtx) string { if ctx.Response == nil { return "" @@ -235,19 +162,19 @@ func parseOperand1(op string) func(ctx *RequestCtx) string { return string(bodyBytes) } case op == "status": - return func(ctx *RequestCtx) string { + return func(ctx *ResponseCtx) string { if ctx.Response == nil { return "" } return strconv.Itoa(ctx.Response.StatusCode) } case op == "response_time": - return func(ctx *RequestCtx) string { - return 
strconv.FormatFloat(time.Now().Sub(ctx.RequestTime).Seconds(), 'f', 6, 64) + return func(ctx *ResponseCtx) string { + return strconv.FormatFloat(ctx.ResponseTime, 'f', 6, 64) } case strings.HasPrefix(op, "header:"): header := op[strings.Index(op, ":")+1:] - return func(ctx *RequestCtx) string { + return func(ctx *ResponseCtx) string { if ctx.Response == nil { return "" } @@ -356,49 +283,8 @@ func validateConfig() { } } -func applyConfig(proxy *Proxy) { - - //Reverse order - for i := len(config.Hosts) - 1; i >= 0; i-- { - - conf := config.Hosts[i] - - proxy.Limiters = append(proxy.Limiters, &ExpiringLimiter{ - HostGlob: conf.Host, - IsGlob: isGlob(conf.Host), - Limiter: rate.NewLimiter(rate.Every(conf.Every), conf.Burst), - LastRead: time.Now(), - CanDelete: false, - }) - } -} - -func (b *Balancer) reloadConfig() { - - b.proxyMutex.Lock() - err := loadConfig() - if err != nil { - panic(err) - } - - if b.proxies != nil { - b.proxies = b.proxies[:0] - } - - for _, proxyConf := range config.Proxies { - proxy, err := NewProxy(proxyConf.Name, proxyConf.Url) - handleErr(err) - b.proxies = append(b.proxies, proxy) - - applyConfig(proxy) - - logrus.WithFields(logrus.Fields{ - "name": proxy.Name, - "url": proxy.Url, - }).Info("Proxy") - } - b.proxyMutex.Unlock() - +func (a *Architeuthis) reloadConfig() { + _ = loadConfig() logrus.Info("Reloaded config") } diff --git a/config.json b/config.json index 68882ff..e0ced5b 100644 --- a/config.json +++ b/config.json @@ -1,47 +1,27 @@ { "addr": "localhost:5050", "timeout": "15s", - "wait": "4s", - "multiplier": 2.5, + "wait": "0.5s", + "multiplier": 1, "retries": 3, - "retries_hard": 6, - "routing": true, - "proxies": [ - { - "name": "p0", - "url": "" - }, - { - "name": "p1", - "url": "" - }, - { - "name": "p2", - "url": "" - }, - { - "name": "p3", - "url": "" - } - ], + "max_error": 0.4, + "redis_url": "localhost:6379", "hosts": [ { "host": "*", - "every": "125ms", - "burst": 25, + "every": "1ms", + "burst": 1, "headers": { - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Cache-Control": "max-age=0", "Connection": "keep-alive", - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0" + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:70.0) Gecko/20100101 Firefox/70.0" }, "rules": [ - {"condition": "response_time>10s", "action": "dont_retry"} ] }, { "host": ".i.imgur.com", - "every": "100ms", + "every": "1s", "burst": 1, "headers": { "User-Agent": "curl/7.65.1" @@ -49,7 +29,7 @@ }, { "host": "*.reddit.com", - "every": "2s", + "every": "1s", "burst": 1 }, { diff --git a/cron.go b/cron.go new file mode 100644 index 0000000..5d716b3 --- /dev/null +++ b/cron.go @@ -0,0 +1,54 @@ +package main + +import ( + "github.com/robfig/cron" + "github.com/sirupsen/logrus" + "sync" + "time" +) + +func (a *Architeuthis) setupProxyReviver() { + + const gcInterval = time.Minute * 10 + + gcCron := cron.New() + gcSchedule := cron.Every(gcInterval) + gcCron.Schedule(gcSchedule, cron.FuncJob(a.reviveProxies)) + + go gcCron.Run() + + logrus.WithFields(logrus.Fields{ + "every": gcInterval, + }).Info("Started proxy revive cron") +} + +func (a *Architeuthis) testUrl(ch chan *Proxy, url string, wg sync.WaitGroup) { + + for p := range ch { + r, _ := p.HttpClient.Get(url) + + if r != nil && isHttpSuccessCode(r.StatusCode) { + a.setAlive(p.Name) + } + } + wg.Done() +} + +func (a *Architeuthis) reviveProxies() { + + wg := sync.WaitGroup{} + const checkers = 50 + wg.Add(checkers) + + ch := make(chan *Proxy, checkers) + + for i := 
0; i < checkers; i++ { + go a.testUrl(ch, "https://google.com/", wg) + } + + for _, p := range a.GetDeadProxies() { + ch <- p + } + + wg.Wait() +} diff --git a/retry.go b/errors.go similarity index 70% rename from retry.go rename to errors.go index e68f9fa..585424c 100644 --- a/retry.go +++ b/errors.go @@ -13,6 +13,33 @@ import ( "time" ) +func shouldBlameProxy(rCtx *ResponseCtx) bool { + + if rCtx.Response != nil { + return shouldBlameProxyHttpCode(rCtx.Response.StatusCode) + } else { + //TODO: don't blame proxy for timeout? + return true + } +} + +func isProxyError(err error) bool { + + urlErr, ok := err.(*url.Error) + if ok { + opErr, ok := urlErr.Err.(*net.OpError) + if ok { + if opErr.Op == "proxyconnect" { + return true + } + if opErr.Op == "local error" { + return true + } + } + } + return false +} + func isPermanentError(err error) bool { var opErr *net.OpError @@ -27,6 +54,11 @@ func isPermanentError(err error) bool { } return false } + + if opErr.Err.Error() == "Internal Privoxy Error" { + return true + } + } else { _, ok := err.(net.Error) if ok { @@ -40,11 +72,6 @@ func isPermanentError(err error) bool { return false } - if opErr.Op == "proxyconnect" { - logrus.Error("Error connecting to the proxy!") - return true - } - if opErr.Timeout() { // Usually means that there is no route to host return true @@ -54,9 +81,6 @@ func isPermanentError(err error) bool { case *net.DNSError: return true case *os.SyscallError: - - logrus.Printf("os.SyscallError:%+v", t) - if errno, ok := t.Err.(syscall.Errno); ok { switch errno { case syscall.ECONNREFUSED: @@ -65,30 +89,28 @@ func isPermanentError(err error) bool { case syscall.ETIMEDOUT: log.Println("timeout") return false + case syscall.ECONNRESET: + log.Println("connection reset by peer") + return false } } } - //todo: handle the other error types fmt.Println("fixme: None of the above") return false } -func waitTime(retries int) time.Duration { - +func getWaitTime(retries int) time.Duration { return time.Duration(config.Wait * int64(math.Pow(config.Multiplier, float64(retries)))) } func (p *Proxy) waitRateLimit(limiter *rate.Limiter) { reservation := limiter.Reserve() - delay := reservation.Delay() + if delay > 0 { - logrus.WithFields(logrus.Fields{ - "wait": delay, - }).Trace("Sleeping") time.Sleep(delay) } } @@ -97,6 +119,16 @@ func isHttpSuccessCode(code int) bool { return code >= 200 && code < 300 } +func shouldBlameProxyHttpCode(code int) bool { + + switch { + case code >= 500: + return false + default: + return true + } +} + func shouldRetryHttpCode(code int) bool { switch { diff --git a/gc.go b/gc.go deleted file mode 100644 index 041a876..0000000 --- a/gc.go +++ /dev/null @@ -1,61 +0,0 @@ -package main - -import ( - "github.com/robfig/cron" - "github.com/sirupsen/logrus" - "time" -) - -func (b *Balancer) setupGarbageCollector() { - - const gcInterval = time.Minute * 5 - - gcCron := cron.New() - gcSchedule := cron.Every(gcInterval) - gcCron.Schedule(gcSchedule, cron.FuncJob(b.cleanAllExpiredLimits)) - - go gcCron.Run() - - logrus.WithFields(logrus.Fields{ - "every": gcInterval, - }).Info("Started task cleanup cron") -} - -func (b *Balancer) cleanAllExpiredLimits() { - before := 0 - after := 0 - - b.proxyMutex.RLock() - for _, p := range b.proxies { - before += len(p.Limiters) - cleanExpiredLimits(p) - after += len(p.Limiters) - } - b.proxyMutex.RUnlock() - - logrus.WithFields(logrus.Fields{ - "removed": before - after, - }).Info("Cleaned up limiters") -} - -func cleanExpiredLimits(proxy *Proxy) { - - const ttl = time.Hour - - var 
limits []*ExpiringLimiter - now := time.Now() - - for host, limiter := range proxy.Limiters { - if now.Sub(limiter.LastRead) > ttl && limiter.CanDelete { - logrus.WithFields(logrus.Fields{ - "proxy": proxy.Name, - "limiter": host, - "last_read": now.Sub(limiter.LastRead), - }).Trace("Pruning limiter") - } else { - limits = append(limits, limiter) - } - } - - proxy.Limiters = limits -} diff --git a/grafana.png b/grafana.png new file mode 100644 index 0000000..94c9fab Binary files /dev/null and b/grafana.png differ diff --git a/grafana/model.json b/grafana/model.json new file mode 100644 index 0000000..a2ddcb6 --- /dev/null +++ b/grafana/model.json @@ -0,0 +1,778 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": 1, + "links": [], + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 14, + "x": 0, + "y": 0 + }, + "id": 6, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "request.count" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "1m" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "request", + "orderByTime": "ASC", + "policy": "default", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "status" + ], + "type": "field" + }, + { + "params": [], + "type": "count" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Requests / minute", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 10, + "x": 14, + "y": 0 + }, + "id": 5, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "connected", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "good", + "color": "#73BF69" + }, + { + "alias": "bad", + "color": "#F2495C", + "lines": false, + "pointradius": 4, + "points": true + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "alias": "good", + "groupBy": [ + { + "params": [ + "1m" + ], + 
"type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "request", + "orderByTime": "ASC", + "policy": "default", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "latency" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [ + { + "key": "ok", + "operator": "=", + "value": "true" + } + ] + }, + { + "alias": "bad", + "groupBy": [ + { + "params": [ + "1m" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "request", + "orderByTime": "ASC", + "policy": "default", + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "latency" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [ + { + "key": "ok", + "operator": "=", + "value": "false" + } + ] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Latency", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 8, + "x": 0, + "y": 9 + }, + "id": 3, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "connected", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "request.sum", + "color": "#A352CC" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "1m" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "request", + "orderByTime": "ASC", + "policy": "default", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "size" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + }, + { + "params": [ + "/60" + ], + "type": "math" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Bandwidth", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "Bps", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 8, + "x": 8, + "y": 9 + }, + "id": 2, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": 
false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "connected", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "count", + "color": "#5794F2" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "alias": "count", + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "add_proxy", + "orderByTime": "ASC", + "policy": "default", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "newCount" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Proxy count", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": true, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 9, + "w": 8, + "x": 16, + "y": 9 + }, + "id": 4, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": false, + "linewidth": 1, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "retry", + "color": "#FF9830" + }, + { + "alias": "rate", + "color": "#5794F2" + } + ], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "alias": "retry", + "groupBy": [ + { + "params": [ + "1m" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "sleep", + "orderByTime": "ASC", + "policy": "default", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "duration" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [ + { + "key": "context", + "operator": "=", + "value": "retry" + } + ] + }, + { + "alias": "rate", + "groupBy": [ + { + "params": [ + "1m" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "measurement": "sleep", + "orderByTime": "ASC", + "policy": "default", + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "duration" + ], + "type": "field" + }, + { + "params": [], + "type": "sum" + } + ] + ], + "tags": [ + { + "key": "context", + "operator": "=", + "value": "rate" + } + ] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Sleep times", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + 
"format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": false + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + } + ], + "refresh": "5s", + "schemaVersion": 20, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-3h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ] + }, + "timezone": "", + "title": "Architeuthis", + "uid": "I2xEOnbZk", + "version": 11 +} \ No newline at end of file diff --git a/import_from_broker.py b/import_from_broker.py new file mode 100644 index 0000000..aa04d11 --- /dev/null +++ b/import_from_broker.py @@ -0,0 +1,31 @@ +import asyncio + +import requests +from proxybroker import Broker, Checker + +ARCHITEUTHIS_URL = "http://localhost:5050" + + +def add_to_architeuthis(name, url): + r = requests.get(ARCHITEUTHIS_URL + "/add_proxy?name=%s&url=%s" % (name, url)) + print("ADD %s <%d>" % (name, r.status_code)) + + +async def add(proxies): + while True: + proxy = await proxies.get() + if proxy is None: + break + + url = "http://%s:%d" % (proxy.host, proxy.port) + name = "%s_%d" % (proxy.host, proxy.port) + + add_to_architeuthis(name, url) + + +proxies = asyncio.Queue() +broker = Broker(proxies) +tasks = asyncio.gather(broker.find(types=['HTTPS'], limit=300), add(proxies)) + +loop = asyncio.get_event_loop() +loop.run_until_complete(tasks) diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..43fbe74 --- /dev/null +++ b/influxdb.go @@ -0,0 +1,111 @@ +package main + +import ( + influx "github.com/influxdata/influxdb1-client/v2" + "github.com/sirupsen/logrus" + "strconv" + "time" +) + +const InfluxDbBufferSize = 100 +const InfluxDbDatabase = "architeuthis" +const InfluxDBRetentionPolicy = "" + +func (a *Architeuthis) asyncWriter(metrics chan *influx.Point) { + + logrus.Trace("Started async influxdb writer") + + bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{ + Database: InfluxDbDatabase, + RetentionPolicy: InfluxDBRetentionPolicy, + }) + + for point := range metrics { + bp.AddPoint(point) + + if len(bp.Points()) >= InfluxDbBufferSize { + flushQueue(a.influxdb, &bp) + } + } + flushQueue(a.influxdb, &bp) +} + +func flushQueue(influxdb influx.Client, bp *influx.BatchPoints) { + + err := influxdb.Write(*bp) + + if err != nil { + logrus.WithError(err).Error("influxdb write") + return + } + + logrus.WithFields(logrus.Fields{ + "size": len((*bp).Points()), + }).Trace("Wrote points") + + *bp, _ = influx.NewBatchPoints(influx.BatchPointsConfig{ + Database: InfluxDbDatabase, + RetentionPolicy: InfluxDBRetentionPolicy, + }) +} + +func (a *Architeuthis) writeMetricProxyCount(newCount int) { + point, _ := influx.NewPoint( + "add_proxy", + nil, + map[string]interface{}{ + "newCount": newCount, + }, + time.Now(), + ) + a.points <- point +} + +func (a *Architeuthis) writeMetricRequest(ctx ResponseCtx) { + + var fields map[string]interface{} + + if ctx.Response != nil { + + size, _ := strconv.ParseInt(ctx.Response.Header.Get("Content-Length"), 10, 64) + + fields = map[string]interface{}{ + "status": ctx.Response.StatusCode, + "latency": ctx.ResponseTime, + "size": size, + } + } else { + fields = map[string]interface{}{} + } + + var ok string + if ctx.Error == nil { + ok = "true" + } else { + ok = "false" + } + + point, _ := influx.NewPoint( + "request", + map[string]string{ + "ok": ok, + }, + fields, + time.Now(), + ) + a.points <- point +} + +func (a *Architeuthis) 
writeMetricSleep(duration time.Duration, tag string) { + point, _ := influx.NewPoint( + "sleep", + map[string]string{ + "context": tag, + }, + map[string]interface{}{ + "duration": duration.Seconds(), + }, + time.Now(), + ) + a.points <- point +} diff --git a/jenkins/Jenkinsfile b/jenkins/Jenkinsfile index 8f85688..8db8173 100644 --- a/jenkins/Jenkinsfile +++ b/jenkins/Jenkinsfile @@ -6,6 +6,7 @@ remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa' remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts' remote.allowAnyHosts = true logLevel = 'FINER' +remote.port = 2299 pipeline { agent none diff --git a/main.go b/main.go index dce170e..e8a9df1 100644 --- a/main.go +++ b/main.go @@ -2,473 +2,292 @@ package main import ( "fmt" - "github.com/dchest/siphash" "github.com/elazarl/goproxy" + "github.com/go-redis/redis" + influx "github.com/influxdata/influxdb1-client/v2" "github.com/pkg/errors" - "github.com/ryanuber/go-glob" "github.com/sirupsen/logrus" - "golang.org/x/time/rate" - "math" - "math/rand" + "html/template" "net/http" - "net/url" - "sort" "strings" - "sync" - "sync/atomic" "time" ) -type Balancer struct { - server *goproxy.ProxyHttpServer - proxies []*Proxy - proxyMutex *sync.RWMutex -} +func New() *Architeuthis { -type ExpiringLimiter struct { - HostGlob string - IsGlob bool - CanDelete bool - Limiter *rate.Limiter - LastRead time.Time -} + a := new(Architeuthis) -type Proxy struct { - Name string - Url *url.URL - Limiters []*ExpiringLimiter - HttpClient *http.Client - Connections *int32 - UniqueParam string -} + a.redis = redis.NewClient(&redis.Options{ + Addr: config.RedisUrl, + Password: "", + DB: 0, + }) -type RequestCtx struct { - RequestTime time.Time - Response *http.Response -} + a.setupProxyReviver() -type ByConnectionCount []*Proxy + var err error + const InfluxDBUrl = "http://localhost:8086" + a.influxdb, err = influx.NewHTTPClient(influx.HTTPConfig{ + Addr: InfluxDBUrl, + }) -func (a ByConnectionCount) Len() int { - return len(a) -} - -func (a ByConnectionCount) Swap(i, j int) { - a[i], a[j] = a[j], a[i] -} - -func (a ByConnectionCount) Less(i, j int) bool { - return *a[i].Connections < *a[j].Connections -} - -func (p *Proxy) getLimiter(host string) *rate.Limiter { - - for _, limiter := range p.Limiters { - if limiter.IsGlob { - if glob.Glob(limiter.HostGlob, host) { - limiter.LastRead = time.Now() - return limiter.Limiter - } - } else if limiter.HostGlob == host { - limiter.LastRead = time.Now() - return limiter.Limiter - } + _, err = http.Post(InfluxDBUrl+"/query", "application/x-www-form-urlencoded", strings.NewReader("q=CREATE DATABASE \"architeuthis\"")) + if err != nil { + panic(err) } - newExpiringLimiter := p.makeNewLimiter(host) - return newExpiringLimiter.Limiter -} + a.points = make(chan *influx.Point, InfluxDbBufferSize) -func (p *Proxy) makeNewLimiter(host string) *ExpiringLimiter { + go a.asyncWriter(a.points) - newExpiringLimiter := &ExpiringLimiter{ - CanDelete: false, - HostGlob: host, - IsGlob: false, - LastRead: time.Now(), - Limiter: rate.NewLimiter(rate.Every(config.DefaultConfig.Every), config.DefaultConfig.Burst), - } + a.server = goproxy.NewProxyHttpServer() + a.server.OnRequest().HandleConnect(goproxy.AlwaysMitm) - p.Limiters = append([]*ExpiringLimiter{newExpiringLimiter}, p.Limiters...) 
- - logrus.WithFields(logrus.Fields{ - "host": host, - "every": config.DefaultConfig.Every, - }).Trace("New limiter") - - return newExpiringLimiter -} - -func simplifyHost(host string) string { - - col := strings.LastIndex(host, ":") - if col > 0 { - host = host[:col] - } - - return "." + host -} - -func (b *Balancer) chooseProxy(r *http.Request) (*Proxy, error) { - - if len(b.proxies) == 0 { - return b.proxies[0], nil - } - - if config.Routing { - routingProxyParam := r.Header.Get("X-Architeuthis-Proxy") - r.Header.Del("X-Architeuthis-Proxy") - - if routingProxyParam != "" { - p := b.getProxyByNameOrNil(routingProxyParam) - if p != nil { - return p, nil - } - } - - routingHashParam := r.Header.Get("X-Architeuthis-Hash") - r.Header.Del("X-Architeuthis-Hash") - - if routingHashParam != "" { - hash := siphash.Hash(1, 2, []byte(routingHashParam)) - if hash == 0 { - hash = 1 - } - - pIdx := int(float64(hash) / (float64(math.MaxUint64) / float64(len(b.proxies)))) - - logrus.WithFields(logrus.Fields{ - "hash": routingHashParam, - }).Trace("Using hash") - - return b.proxies[pIdx], nil - } - - routingUniqueParam := r.Header.Get("X-Architeuthis-Unique") - r.Header.Del("X-Architeuthis-Unique") - - if routingUniqueParam != "" { - - var blankProxy *Proxy - - for _, p := range b.proxies { - if p.UniqueParam == "" { - blankProxy = p - } else if p.UniqueParam == routingUniqueParam { - return p, nil - } - } - if blankProxy != nil { - blankProxy.UniqueParam = routingUniqueParam - logrus.Infof("Bound unique param %s to %s", routingUniqueParam, blankProxy.Name) - return blankProxy, nil - } else { - logrus.WithField("unique param", routingUniqueParam).Error("No blank proxies to route this request!") - return nil, errors.Errorf("No blank proxies to route this request! unique param: %s", routingUniqueParam) - } - } - } - - sort.Sort(ByConnectionCount(b.proxies)) - - proxyWithLeastConns := b.proxies[0] - proxiesWithSameConnCount := b.getProxiesWithSameConnCountAs(proxyWithLeastConns) - - if len(proxiesWithSameConnCount) > 1 { - return proxiesWithSameConnCount[rand.Intn(len(proxiesWithSameConnCount))], nil - } else { - return proxyWithLeastConns, nil - } -} - -func (b *Balancer) getProxyByNameOrNil(routingParam string) *Proxy { - if routingParam != "" { - for _, p := range b.proxies { - if p.Name == routingParam { - return p - } - } - } - - return nil -} - -func (b *Balancer) getProxiesWithSameConnCountAs(p0 *Proxy) []*Proxy { - - proxiesWithSameConnCount := make([]*Proxy, 0) - for _, p := range b.proxies { - if *p.Connections != *p0.Connections { - break - } - proxiesWithSameConnCount = append(proxiesWithSameConnCount, p) - } - return proxiesWithSameConnCount -} - -func New() *Balancer { - - balancer := new(Balancer) - - balancer.proxyMutex = &sync.RWMutex{} - balancer.server = goproxy.NewProxyHttpServer() - - balancer.server.OnRequest().HandleConnect(goproxy.AlwaysMitm) - - balancer.server.OnRequest().DoFunc( + a.server.OnRequest().DoFunc( func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { - balancer.proxyMutex.RLock() - defer balancer.proxyMutex.RUnlock() - p, err := balancer.chooseProxy(r) - - if err != nil { - return nil, goproxy.NewResponse(r, "text/plain", 500, err.Error()) - } - - logrus.WithFields(logrus.Fields{ - "proxy": p.Name, - "conns": *p.Connections, - "url": r.URL, - }).Trace("Routing request") - - resp, err := p.processRequest(r) + resp, err := a.processRequest(r) if err != nil { logrus.WithError(err).Trace("Could not complete request") - return nil, 
goproxy.NewResponse(r, "text/plain", 500, err.Error()) + return nil, goproxy.NewResponse(r, "text/plain", http.StatusInternalServerError, err.Error()) } return nil, resp }) - balancer.server.NonproxyHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mux := http.NewServeMux() + a.server.NonproxyHandler = mux - if r.URL.Path == "/reload" { - balancer.reloadConfig() - _, _ = fmt.Fprint(w, "Reloaded\n") - } else { - w.Header().Set("Content-Type", "application/json") - _, _ = fmt.Fprint(w, "{\"name\":\"Architeuthis\",\"version\":1.0}") - } + mux.HandleFunc("/reload", func(w http.ResponseWriter, r *http.Request) { + a.reloadConfig() + _, _ = fmt.Fprint(w, "Reloaded\n") }) - return balancer + + templ, _ := template.ParseFiles("templates/stats.html") + + mux.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) { + err = templ.Execute(w, a.getStats()) + }) + + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = fmt.Fprint(w, "{\"name\":\"Architeuthis\",\"version\":2.0}") + }) + + mux.HandleFunc("/add_proxy", func(w http.ResponseWriter, r *http.Request) { + name := r.URL.Query().Get("name") + url := r.URL.Query().Get("url") + + if name == "" || url == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + + err := a.AddProxy(name, url) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "name": name, + "url": url, + }).Error("Could not add proxy") + + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + }) + + return a } -func getConfsMatchingRequest(r *http.Request) []*HostConfig { +func (a *Architeuthis) processRequest(r *http.Request) (*http.Response, error) { - sHost := simplifyHost(r.Host) + sHost := normalizeHost(r.Host) + configs := getConfigsMatchingHost(sHost) - configs := make([]*HostConfig, 0) + options := parseOptions(&r.Header) + proxyReq := applyHeaders(cloneRequest(r), configs) - for _, conf := range config.Hosts { - if glob.Glob(conf.Host, sHost) { - configs = append(configs, conf) - } + requestCtx := RequestCtx{ + Request: proxyReq, + Retries: 0, + RequestTime: time.Now(), + options: options, + configs: configs, } - return configs + for { + responseCtx := a.processRequestWithCtx(&requestCtx) + + a.writeMetricRequest(responseCtx) + + if requestCtx.p != nil { + a.UpdateProxy(requestCtx.p) + } + + if responseCtx.Error == nil { + return responseCtx.Response, nil + } + + if !responseCtx.ShouldRetry { + return nil, responseCtx.Error + } + } } -func applyHeaders(r *http.Request, configs []*HostConfig) *http.Request { - - for _, conf := range configs { - for k, v := range conf.Headers { - r.Header.Set(k, v) - } +func (lim *RedisLimiter) waitRateLimit() (time.Duration, error) { + result, err := lim.Limiter.Allow(lim.Key) + if err != nil { + return 0, err } - return r + if result.RetryAfter > 0 { + time.Sleep(result.RetryAfter) + } + return result.RetryAfter, nil } -func computeRules(ctx *RequestCtx, configs []*HostConfig) (dontRetry, forceRetry bool, - limitMultiplier, newLimit float64, shouldRetry bool) { - dontRetry = false - forceRetry = false - shouldRetry = false - limitMultiplier = 1 +func (a *Architeuthis) processRequestWithCtx(rCtx *RequestCtx) ResponseCtx { - for _, conf := range configs { - for _, rule := range conf.Rules { - if rule.Matches(ctx) { - switch rule.Action { - case DontRetry: - dontRetry = true - case MultiplyEvery: - limitMultiplier = rule.Arg - case SetEvery: - newLimit = rule.Arg - case 
ForceRetry: - forceRetry = true - case ShouldRetry: - shouldRetry = true - } - } - } + if !rCtx.LastErrorWasProxyError && rCtx.Retries > config.Retries { + return ResponseCtx{Error: errors.Errorf("Giving up after %d retries", rCtx.Retries)} } + name, err := a.ChooseProxy(rCtx) + if err != nil { + return ResponseCtx{Error: err} + } + + logrus.WithFields(logrus.Fields{ + "proxy": name, + "host": rCtx.Request.Host, + }).Info("Routing request") + + p, err := a.GetProxy(name) + if err != nil { + return ResponseCtx{Error: err} + } + + rCtx.p = p + response, err := a.processRequestWithProxy(rCtx) + + responseCtx := ResponseCtx{ + Response: response, + ResponseTime: time.Now().Sub(rCtx.RequestTime).Seconds(), + Error: err, + } + + p.incrReqTime = responseCtx.ResponseTime + + if response != nil && isHttpSuccessCode(response.StatusCode) { + p.incrGood += 1 + return responseCtx + } + + rCtx.LastFailedProxy = p.Name + + if isProxyError(err) { + a.handleFatalProxyError(p) + rCtx.LastErrorWasProxyError = true + responseCtx.ShouldRetry = true + return responseCtx + } + + if err != nil { + if isPermanentError(err) { + a.handleProxyError(p, &responseCtx) + return responseCtx + } + + a.waitAfterFail(rCtx) + a.handleProxyError(p, &responseCtx) + responseCtx.ShouldRetry = true + } + + dontRetry, forceRetry, shouldRetry := computeRules(rCtx, responseCtx) + + if forceRetry { + responseCtx.ShouldRetry = true + return responseCtx + + } else if dontRetry { + responseCtx.Error = errors.Errorf("Applied dont_retry rule") + return responseCtx + } + + if response == nil { + return responseCtx + } + + // Handle HTTP errors + responseCtx.Error = errors.Errorf("HTTP error: %d", response.StatusCode) + + if shouldRetry || shouldRetryHttpCode(response.StatusCode) { + responseCtx.ShouldRetry = true + } + + return responseCtx +} + +func (a *Architeuthis) waitAfterFail(rCtx *RequestCtx) { + wait := getWaitTime(rCtx.Retries) + time.Sleep(wait) + + a.writeMetricSleep(wait, "retry") + + rCtx.Retries += 1 +} + +func isRemoteProxy(p *Proxy) bool { + return p.HttpClient.Transport != nil +} + +func (a *Architeuthis) handleProxyError(p *Proxy, rCtx *ResponseCtx) { + + if isRemoteProxy(p) && shouldBlameProxy(rCtx) { + p.incrBad += 1 + p.BadRequestCount += 1 + } +} + +func (a *Architeuthis) handleFatalProxyError(p *Proxy) { + a.setDead(p.Name) +} + +func (a *Architeuthis) processRequestWithProxy(rCtx *RequestCtx) (r *http.Response, e error) { + + a.incConns(rCtx.p.Name) + + limiter := a.getLimiter(rCtx) + duration, err := limiter.waitRateLimit() + if err != nil { + return nil, err + } + + if duration > 0 { + a.writeMetricSleep(duration, "rate") + } + + r, e = rCtx.p.HttpClient.Do(rCtx.Request) + return } -func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) { +func (a *Architeuthis) Run() { - atomic.AddInt32(p.Connections, 1) - defer func() { - atomic.AddInt32(p.Connections, -1) - }() - retries := 0 - additionalRetries := 0 - - configs := getConfsMatchingRequest(r) - sHost := simplifyHost(r.Host) - limiter := p.getLimiter(sHost) - - proxyReq := applyHeaders(cloneRequest(r), configs) - - for { - p.waitRateLimit(limiter) - - if retries >= config.Retries+additionalRetries || retries > config.RetriesHard { - return nil, errors.Errorf("giving up after %d retries", retries) - } - - ctx := &RequestCtx{ - RequestTime: time.Now(), - } - var err error - ctx.Response, err = p.HttpClient.Do(proxyReq) - - if err != nil { - if isPermanentError(err) { - return nil, err - } - - dontRetry, forceRetry, limitMultiplier, newLimit, _ := 
computeRules(ctx, configs) - if forceRetry { - additionalRetries += 1 - } else if dontRetry { - return nil, errors.Errorf("Applied dont_retry rule for (%s)", err) - } - p.applyLimiterRules(newLimit, limiter, limitMultiplier) - - wait := waitTime(retries) - logrus.WithError(err).WithFields(logrus.Fields{ - "wait": wait, - }).Trace("Temporary error during request") - time.Sleep(wait) - - retries += 1 - continue - } - - // Compute rules - dontRetry, forceRetry, limitMultiplier, newLimit, shouldRetry := computeRules(ctx, configs) - - if forceRetry { - additionalRetries += 1 - } else if dontRetry { - return nil, errors.Errorf("Applied dont_retry rule") - } - p.applyLimiterRules(newLimit, limiter, limitMultiplier) - - if isHttpSuccessCode(ctx.Response.StatusCode) { - return ctx.Response, nil - - } else if forceRetry || shouldRetry || shouldRetryHttpCode(ctx.Response.StatusCode) { - - wait := waitTime(retries) - - logrus.WithFields(logrus.Fields{ - "wait": wait, - "status": ctx.Response.StatusCode, - }).Trace("HTTP error during request") - - time.Sleep(wait) - retries += 1 - continue - } else { - return nil, errors.Errorf("HTTP error: %d", ctx.Response.StatusCode) - } - } -} - -func (p *Proxy) applyLimiterRules(newLimit float64, limiter *rate.Limiter, limitMultiplier float64) { - if newLimit != 0 { - limiter.SetLimit(rate.Limit(newLimit)) - } else if limitMultiplier != 1 { - limiter.SetLimit(limiter.Limit() * rate.Limit(1/limitMultiplier)) - } -} - -func (b *Balancer) Run() { - - //b.Verbose = true logrus.WithFields(logrus.Fields{ "addr": config.Addr, }).Info("Listening") - err := http.ListenAndServe(config.Addr, b.server) + err := http.ListenAndServe(config.Addr, a.server) logrus.Fatal(err) } -func cloneRequest(r *http.Request) *http.Request { - - proxyReq := &http.Request{ - Method: r.Method, - URL: r.URL, - Proto: "HTTP/1.1", - ProtoMajor: 1, - ProtoMinor: 1, - Header: r.Header, - Body: r.Body, - Host: r.URL.Host, - } - - return proxyReq -} - -func NewProxy(name, stringUrl string) (*Proxy, error) { - - var parsedUrl *url.URL - var err error - if stringUrl != "" { - parsedUrl, err = url.Parse(stringUrl) - if err != nil { - return nil, err - } - } else { - parsedUrl = nil - } - - var httpClient *http.Client - if parsedUrl == nil { - httpClient = &http.Client{} - } else { - httpClient = &http.Client{ - Transport: &http.Transport{ - Proxy: http.ProxyURL(parsedUrl), - }, - } - } - - httpClient.Timeout = config.Timeout - - p := &Proxy{ - Name: name, - Url: parsedUrl, - HttpClient: httpClient, - } - - conns := int32(0) - p.Connections = &conns - return p, nil -} - func main() { logrus.SetLevel(logrus.TraceLevel) balancer := New() balancer.reloadConfig() - balancer.setupGarbageCollector() balancer.Run() } diff --git a/models.go b/models.go new file mode 100644 index 0000000..71843c6 --- /dev/null +++ b/models.go @@ -0,0 +1,212 @@ +package main + +import ( + "github.com/elazarl/goproxy" + redisPackage "github.com/go-redis/redis" + "github.com/go-redis/redis_rate" + influx "github.com/influxdata/influxdb1-client/v2" + "math" + "net/http" + "net/url" + "time" +) + +type Architeuthis struct { + server *goproxy.ProxyHttpServer + redis *redisPackage.Client + influxdb influx.Client + points chan *influx.Point +} + +// Request/Response +type RequestCtx struct { + Request *http.Request + + Retries int + + LastFailedProxy string + p *Proxy + LastErrorWasProxyError bool + + RequestTime time.Time + options RequestOptions + configs []*HostConfig +} + +type ResponseCtx struct { + Response *http.Response + 
ResponseTime float64 + Error error + ShouldRetry bool +} + +type RequestOptions struct { + DoCloudflareBypass bool +} + +// Proxy +type Proxy struct { + Name string + Url *url.URL + + HttpClient *http.Client + + GoodRequestCount int64 + incrGood int64 + + BadRequestCount int64 + incrBad int64 + + TotalRequestTime float64 + incrReqTime float64 + + Connections int64 + + KillOnError bool +} + +func (p *Proxy) AvgLatency() float64 { + return p.TotalRequestTime / float64(p.GoodRequestCount+p.BadRequestCount) +} + +func (p *Proxy) Score() float64 { + + if p.GoodRequestCount+p.BadRequestCount == 0 { + return 1000 + } + + var errorMod float64 + var latencyMod float64 + + if p.BadRequestCount == 0 { + errorMod = 1 + } else { + errorMod = math.Min(float64(p.GoodRequestCount)/float64(p.BadRequestCount), 1) + } + + avgLatency := p.AvgLatency() + + switch { + case avgLatency < 3: + latencyMod = 1 + case avgLatency < 4: + latencyMod = 0.8 + case avgLatency < 5: + latencyMod = 0.7 + case avgLatency < 9: + latencyMod = 0.6 + case avgLatency < 10: + latencyMod = 0.5 + case avgLatency < 15: + latencyMod = 0.3 + case avgLatency < 20: + latencyMod = 0.1 + default: + latencyMod = 0 + } + + return 600*errorMod + 400*latencyMod - 200*(math.Max(float64(p.Connections-1), 0)) +} + +func (p *Proxy) getStats() proxyStat { + return proxyStat{ + Name: p.Name, + Url: p.Url.String(), + GoodRequestCount: p.GoodRequestCount, + BadRequestCount: p.BadRequestCount, + AvgLatency: p.AvgLatency(), + Connections: p.Connections, + Score: int64(p.Score()), + } +} + +type proxyStat struct { + Name string + Url string + + GoodRequestCount int64 + BadRequestCount int64 + AvgLatency float64 + Connections int64 + Score int64 +} + +type statsData struct { + TotalGood int + TotalBad int + Connections int + AvgLatency float64 + AvgScore float64 + + Proxies []proxyStat +} + +type CheckMethod string + +const ( + CheckIp CheckMethod = "check_ip" + HttpOk CheckMethod = "http_ok" +) + +type ProxyJudge struct { + url *url.URL + method CheckMethod +} + +type RedisLimiter struct { + Key string + Limiter *redis_rate.Limiter +} + +// Config +type HostConfig struct { + Host string `json:"host"` + EveryStr string `json:"every"` + Burst int `json:"burst"` + Headers map[string]string `json:"headers"` + RawRules []*RawHostRule `json:"rules"` + IsGlob bool + Every time.Duration + Rules []*HostRule +} + +type RawHostRule struct { + Condition string `json:"condition"` + Action string `json:"action"` + Arg string `json:"arg"` +} + +type HostRuleAction int + +const ( + DontRetry HostRuleAction = 0 + ForceRetry HostRuleAction = 1 + ShouldRetry HostRuleAction = 2 +) + +type HostRule struct { + Matches func(r *ResponseCtx) bool + Action HostRuleAction + Arg float64 +} + +type ProxyConfig struct { + Name string `json:"name"` + Url string `json:"url"` +} + +var config struct { + Addr string `json:"addr"` + TimeoutStr string `json:"timeout"` + WaitStr string `json:"wait"` + Multiplier float64 `json:"multiplier"` + Retries int `json:"retries"` + MaxErrorRatio float64 `json:"max_error"` + Hosts []*HostConfig `json:"hosts"` + Proxies []ProxyConfig `json:"proxies"` + RedisUrl string `json:"redis_url"` + Wait int64 + Timeout time.Duration + DefaultConfig *HostConfig + Routing bool +} diff --git a/redis.go b/redis.go new file mode 100644 index 0000000..30de073 --- /dev/null +++ b/redis.go @@ -0,0 +1,293 @@ +package main + +import ( + "errors" + "github.com/go-redis/redis" + "github.com/go-redis/redis_rate" + "github.com/sirupsen/logrus" + "math" + "math/rand" + 
"net/http" + "net/url" + "strconv" +) + +const KeyProxyList = "proxies" +const KeyDeadProxyList = "deadProxies" +const PrefixProxy = "proxy:" + +const KeyConnectionCount = "conn" +const KeyRequestTime = "reqtime" +const KeyBadRequestCount = "bad" +const KeyGoodRequestCount = "good" +const KeyRevived = "revived" +const KeyUrl = "url" + +func (a *Architeuthis) getLimiter(rCtx *RequestCtx) *RedisLimiter { + + var hostConfig *HostConfig + if len(rCtx.configs) == 0 { + hostConfig = config.DefaultConfig + } else { + hostConfig = rCtx.configs[len(rCtx.configs)-1] + } + + return &RedisLimiter{ + Key: hostConfig.Host + ":" + rCtx.p.Name, + Limiter: redis_rate.NewLimiter(a.redis, &redis_rate.Limit{ + Rate: 1, + Period: hostConfig.Every, + Burst: hostConfig.Burst, + }), + } +} + +func (a *Architeuthis) UpdateProxy(p *Proxy) { + + key := PrefixProxy + p.Name + pipe := a.redis.Pipeline() + + if p.incrBad != 0 { + pipe.HIncrBy(key, KeyBadRequestCount, p.incrBad) + p.BadRequestCount += p.incrBad + } else { + pipe.HIncrBy(key, KeyGoodRequestCount, p.incrGood) + p.GoodRequestCount += p.incrGood + + if p.KillOnError { + pipe.HSet(key, KeyRevived, 0) + } + } + + pipe.HIncrByFloat(key, KeyRequestTime, p.incrReqTime) + p.TotalRequestTime += p.incrReqTime + + pipe.HIncrBy(key, KeyConnectionCount, -1) + + pipe.ZAddXX(KeyProxyList, &redis.Z{ + Score: p.Score(), + Member: p.Name, + }) + + _, _ = pipe.Exec() + + newBadRatio := float64(p.BadRequestCount) / float64(p.GoodRequestCount) + + if p.incrBad > 0 && (p.KillOnError || (newBadRatio > config.MaxErrorRatio && p.BadRequestCount >= 5)) { + a.setDead(p.Name) + } +} + +func (a *Architeuthis) AddProxy(name, stringUrl string) error { + + _, err := url.Parse(stringUrl) + if err != nil { + return err + } + + pipe := a.redis.Pipeline() + + pipe.HMSet(PrefixProxy+name, map[string]interface{}{ + KeyUrl: stringUrl, + KeyRequestTime: 0, + KeyGoodRequestCount: 0, + KeyBadRequestCount: 0, + KeyConnectionCount: 0, + KeyRevived: 0, + }) + + zadd := pipe.ZAdd(KeyProxyList, &redis.Z{ + Score: 1000, + Member: name, + }) + + zcard := pipe.ZCard(KeyProxyList) + + _, _ = pipe.Exec() + + if zadd.Val() != 0 { + logrus.WithFields(logrus.Fields{ + KeyUrl: stringUrl, + }).Info("Add proxy") + + a.writeMetricProxyCount(int(zcard.Val())) + } + + return nil +} + +func (a *Architeuthis) incConns(name string) int64 { + res, _ := a.redis.HIncrBy(PrefixProxy+name, KeyConnectionCount, 1).Result() + return res +} + +func (a *Architeuthis) setDead(name string) { + + pipe := a.redis.Pipeline() + + pipe.ZRem(KeyProxyList, name) + pipe.SAdd(KeyDeadProxyList, name) + count := pipe.ZCard(KeyProxyList) + + _, _ = pipe.Exec() + + logrus.WithFields(logrus.Fields{ + "proxy": name, + }).Trace("dead") + + a.writeMetricProxyCount(int(count.Val())) +} + +func (a *Architeuthis) setAlive(name string) { + + pipe := a.redis.Pipeline() + + pipe.SRem(KeyDeadProxyList, name) + pipe.HMSet(KeyProxyList+name, map[string]interface{}{ + KeyRevived: 1, + KeyRequestTime: 0, + KeyGoodRequestCount: 0, + KeyBadRequestCount: 0, + KeyConnectionCount: 0, + }) + pipe.ZAdd(KeyProxyList, &redis.Z{ + Score: 1000, + Member: name, + }) + count := pipe.ZCard(KeyProxyList) + + _, _ = pipe.Exec() + + logrus.WithFields(logrus.Fields{ + "proxy": name, + }).Trace("revive") + + a.writeMetricProxyCount(int(count.Val())) +} + +func (a *Architeuthis) GetDeadProxies() []*Proxy { + + result, err := a.redis.SMembers(KeyDeadProxyList).Result() + if err != nil { + return nil + } + + return a.getProxies(result) +} + +func (a *Architeuthis) 
GetAliveProxies() []*Proxy {
+
+    result, err := a.redis.ZRange(KeyProxyList, 0, math.MaxInt64).Result()
+    if err != nil {
+        return nil
+    }
+
+    return a.getProxies(result)
+}
+
+func (a *Architeuthis) getProxies(names []string) []*Proxy {
+
+    var proxies []*Proxy
+
+    for _, name := range names {
+        p, _ := a.GetProxy(name)
+        if p != nil {
+            proxies = append(proxies, p)
+        }
+    }
+
+    return proxies
+}
+
+func (a *Architeuthis) getStats() statsData {
+
+    data := statsData{}
+
+    var totalTime float64 = 0
+    var totalScore int64 = 0
+
+    for _, p := range a.GetAliveProxies() {
+        stat := p.getStats()
+        data.Proxies = append(data.Proxies, stat)
+
+        data.TotalBad += int(p.BadRequestCount)
+        data.TotalGood += int(p.GoodRequestCount)
+        data.Connections += int(p.Connections)
+
+        totalTime += p.TotalRequestTime
+        totalScore += stat.Score
+    }
+
+    data.AvgLatency = totalTime / float64(data.TotalGood+data.TotalBad)
+    data.AvgScore = float64(totalScore) / float64(len(data.Proxies))
+
+    return data
+}
+
+func (a *Architeuthis) GetProxy(name string) (*Proxy, error) {
+
+    result, err := a.redis.HGetAll(PrefixProxy + name).Result()
+    if err != nil {
+        return nil, err
+    }
+
+    var parsedUrl *url.URL
+    var httpClient *http.Client
+
+    if result[KeyUrl] == "" {
+        parsedUrl = nil
+        httpClient = &http.Client{
+            Timeout: config.Timeout,
+        }
+    } else {
+        parsedUrl, err = url.Parse(result[KeyUrl])
+        if err != nil {
+            return nil, err
+        }
+
+        httpClient = &http.Client{
+            Transport: &http.Transport{
+                Proxy: http.ProxyURL(parsedUrl),
+            },
+            Timeout: config.Timeout,
+        }
+    }
+
+    conns, _ := strconv.ParseInt(result[KeyConnectionCount], 10, 64)
+    good, _ := strconv.ParseInt(result[KeyGoodRequestCount], 10, 64)
+    bad, _ := strconv.ParseInt(result[KeyBadRequestCount], 10, 64)
+    reqtime, _ := strconv.ParseFloat(result[KeyRequestTime], 64)
+
+    return &Proxy{
+        Name:             name,
+        Url:              parsedUrl,
+        HttpClient:       httpClient,
+        Connections:      conns,
+        GoodRequestCount: good,
+        BadRequestCount:  bad,
+        TotalRequestTime: reqtime,
+    }, nil
+}
+
+func (a *Architeuthis) ChooseProxy(rCtx *RequestCtx) (string, error) {
+    results, err := a.redis.ZRevRangeWithScores(KeyProxyList, 0, 12).Result()
+    if err != nil {
+        return "", err
+    }
+
+    if len(results) == 0 {
+        return "", errors.New("no proxies available")
+    }
+
+    if len(results) == 1 {
+        return results[0].Member.(string), nil
+    }
+
+    for {
+        idx := rand.Intn(len(results))
+
+        if results[idx].Member != rCtx.LastFailedProxy {
+            return results[idx].Member.(string), nil
+        }
+    }
+}
diff --git a/templates/stats.html b/templates/stats.html
new file mode 100644
index 0000000..a4beb01
--- /dev/null
+++ b/templates/stats.html
@@ -0,0 +1,53 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <title>Architeuthis - Stats</title>
+</head>
+<body>
+
+<table>
+    <thead>
+    <tr>
+        <th>Proxy</th>
+        <th>Url</th>
+        <th>Conns</th>
+        <th>Good</th>
+        <th>Bad</th>
+        <th>Latency</th>
+        <th>Score</th>
+    </tr>
+    </thead>
+    <tbody>
+    {{ range .Proxies}}
+    <tr>
+        <td>{{ .Name}}</td>
+        <td>{{ .Url}}</td>
+        <td>{{ .Connections}}</td>
+        <td>{{ .GoodRequestCount}}</td>
+        <td>{{ .BadRequestCount}}</td>
+        <td>{{ printf "%.2f" .AvgLatency}}</td>
+        <td>{{ .Score}}</td>
+    </tr>
+    {{end}}
+    <tr>
+        <td>Total</td>
+        <td></td>
+        <td>{{ .Connections}}</td>
+        <td>{{ .TotalGood}}</td>
+        <td>{{ .TotalBad}}</td>
+        <td>{{ printf "%.2f" .AvgLatency}}</td>
+        <td>{{ printf "%.2f" .AvgScore}}</td>
+    </tr>
+    </tbody>
+</table>
+
+</body>
+</html>
\ No newline at end of file
diff --git a/util.go b/util.go
new file mode 100644
index 0000000..d6bee4f
--- /dev/null
+++ b/util.go
@@ -0,0 +1,93 @@
+package main
+
+import (
+    "github.com/ryanuber/go-glob"
+    "net/http"
+    "strings"
+)
+
+func normalizeHost(host string) string {
+
+    col := strings.LastIndex(host, ":")
+    if col > 0 {
+        host = host[:col]
+    }
+
+    return "." + host
+}
+
+func parseOptions(header *http.Header) RequestOptions {
+
+    opts := RequestOptions{}
+
+    cfParam := header.Get("X-Architeuthis-CF-Bypass")
+    if cfParam != "" {
+        header.Del("X-Architeuthis-CF-Bypass")
+        opts.DoCloudflareBypass = true
+    }
+
+    return opts
+}
+
+func getConfigsMatchingHost(sHost string) []*HostConfig {
+
+    configs := make([]*HostConfig, 0)
+
+    for _, conf := range config.Hosts {
+        if glob.Glob(conf.Host, sHost) {
+            configs = append(configs, conf)
+        }
+    }
+
+    return configs
+}
+
+func applyHeaders(r *http.Request, configs []*HostConfig) *http.Request {
+
+    for _, conf := range configs {
+        for k, v := range conf.Headers {
+            r.Header.Set(k, v)
+        }
+    }
+
+    return r
+}
+
+func computeRules(requestCtx *RequestCtx, responseCtx ResponseCtx) (dontRetry, forceRetry bool, shouldRetry bool) {
+    dontRetry = false
+    forceRetry = false
+    shouldRetry = false
+
+    for _, conf := range requestCtx.configs {
+        for _, rule := range conf.Rules {
+            if rule.Matches(&responseCtx) {
+                switch rule.Action {
+                case DontRetry:
+                    dontRetry = true
+                case ForceRetry:
+                    forceRetry = true
+                case ShouldRetry:
+                    shouldRetry = true
+                }
+            }
+        }
+    }
+
+    return
+}
+
+func cloneRequest(r *http.Request) *http.Request {
+
+    proxyReq := &http.Request{
+        Method:     r.Method,
+        URL:        r.URL,
+        Proto:      "HTTP/1.1",
+        ProtoMajor: 1,
+        ProtoMinor: 1,
+        Header:     r.Header,
+        Body:       r.Body,
+        Host:       r.URL.Host,
+    }
+
+    return proxyReq
+}
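For reference, the management endpoints that `main.go` registers on the non-proxy mux can be exercised directly over HTTP. A minimal sketch, assuming the default `localhost:5050` address from `config.json`; the proxy name and URL passed to `/add_proxy` below are placeholders, not values from this patch:

```bash
# Register a proxy (204 No Content on success, 400 if name or url is missing)
curl -i "http://localhost:5050/add_proxy?name=p0&url=http://127.0.0.1:3128"

# Re-read config.json without restarting
curl "http://localhost:5050/reload"

# Render templates/stats.html with the per-proxy counters stored in redis
curl "http://localhost:5050/stats"

# Any other non-proxy path returns the version banner
curl "http://localhost:5050/"
```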