From 37fbfe130ec5c0173a9a0e4cea7d31553a1e4708 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 18 Jul 2019 15:37:40 -0400 Subject: [PATCH] Request routing. walker76/stackoversight#7 --- README.md | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++--- config.go | 2 +- config.json | 31 +++++++++++++++++- main.go | 87 +++++++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 198 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index bda7aa1..beb93f6 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,6 @@ ![GitHub](https://img.shields.io/github/license/simon987/Architeuthis.svg) [![Build Status](https://ci.simon987.net/buildStatus/icon?job=architeuthis_builds)](https://ci.simon987.net/job/architeuthis_builds/) -*NOTE: this is very WIP* - HTTP(S) proxy with integrated load-balancing, rate-limiting and error handling. Built for automated web scraping. @@ -13,6 +11,7 @@ and error handling. Built for automated web scraping. * Seamless exponential backoff retries on timeout or error HTTP codes * Requires no additional configuration for integration into existing programs * Configurable per-host behavior +* Proxy routing (Requests can be forced to use a specific proxy with header param) ### Typical use case ![user_case](use_case.png) @@ -20,8 +19,8 @@ and error handling. Built for automated web scraping. ### Usage ```bash -wget https://simon987.net/data/architeuthis/15_architeuthis.tar.gz -tar -xzf 15_architeuthis.tar.gz +wget https://simon987.net/data/architeuthis/16_architeuthis.tar.gz +tar -xzf 16_architeuthis.tar.gz vim config.json # Configure settings here ./architeuthis @@ -50,6 +49,89 @@ level=trace msg=Sleeping wait=433.394361ms ... ``` +### Proxy routing + +To use routing, enable the `routing` parameter in the configuration file. + +**Explicitly choose proxy** + +You can force a request to go through a specific proxy by using the `X-Architeuthis-Proxy` header. +When specified and `routing` is +enabled in the config file, the request will use the proxy with the +matching name. + +Example: + +in `config.json`: +``` +... + routing: true, + "proxies": [ + { + "name": "p0", + "url": "" + }, + { + "name": "p1", + "url": "" + }, + ... + ], +``` + +This request will *always* be routed through the **p0** proxy: +```bash +curl https://google.ca/ -k -H "X-Architeuthis-Proxy: p0" +``` + +Invalid/blank values are silently ignored; the request will be routed +according to the usual load balancer rules. + +**Hashed routing** + +You can also use the `X-Architeuthis-Hash` header to specify an abitrary string. +The string will be hashed and uniformly routed to its corresponding proxy. Unless the number +proxy changes, requests with the same hash value will always be routed to the same proxy. + +Example: + +`X-Architeuthis-Hash: userOne` is guaranteed to always be routed to the same proxy. +`X-Architeuthis-Hash: userTwo` is also guaranteed to always be routed to the same proxy, +but **not necessarily a proxy different than userOne**. + + +**Unique string routing** + +You can use the `X-Architeuthis-Unique` header to specify a unique string that +will be dynamically associated to a single proxy. + +The first time such a request is received, the unique string is bound to a proxy and +will *always* be routed to this proxy. Any other non-empty value for this header will +be routed to another proxy and bound to it. + + This means that you cannot use more unique strings than proxies, +doing so will cause the request to drop and will show the message +`No blank proxies to route this request!`. + +Reloading the configuration or restarting the `architeuthis` instance will clear the +proxy binds. + +Example with configured proxies p0-p3: +``` +msg=Listening addr="localhost:5050" +msg="Bound unique param user1 to p3" +msg="Routing request" conns=0 proxy=p3 url="https://google.ca:443/" +msg="Bound unique param user2 to p2" +msg="Routing request" conns=0 proxy=p2 url="https://google.ca:443/" +msg="Bound unique param user3 to p1" +msg="Routing request" conns=0 proxy=p1 url="https://google.ca:443/" +msg="Bound unique param user4 to p0" +msg="Routing request" conns=0 proxy=p0 url="https://google.ca:443/" +msg="No blank proxies to route this request!" unique param=user5 +``` + +The `X-Architeuthis-*` header *will not* be sent to the remote host. + ### Hot config reload ```bash @@ -114,6 +196,7 @@ Note that having too many rules for one host might negatively impact performance "multiplier": 2.5, "retries": 3, "retries_hard": 6, + "routing": true, "proxies": [ { "name": "squid_P0", diff --git a/config.go b/config.go index 7938e7d..ebd99d5 100644 --- a/config.go +++ b/config.go @@ -82,10 +82,10 @@ var config struct { Wait int64 Timeout time.Duration DefaultConfig *HostConfig + Routing bool } func parseRule(raw *RawHostRule) (*HostRule, error) { - //TODO: for the love of god someone please refactor this func rule := &HostRule{} var err error diff --git a/config.json b/config.json index 947b6b7..68882ff 100644 --- a/config.json +++ b/config.json @@ -5,16 +5,29 @@ "multiplier": 2.5, "retries": 3, "retries_hard": 6, + "routing": true, "proxies": [ { "name": "p0", "url": "" + }, + { + "name": "p1", + "url": "" + }, + { + "name": "p2", + "url": "" + }, + { + "name": "p3", + "url": "" } ], "hosts": [ { "host": "*", - "every": "500ms", + "every": "125ms", "burst": 25, "headers": { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", @@ -26,6 +39,14 @@ {"condition": "response_time>10s", "action": "dont_retry"} ] }, + { + "host": ".i.imgur.com", + "every": "100ms", + "burst": 1, + "headers": { + "User-Agent": "curl/7.65.1" + } + }, { "host": "*.reddit.com", "every": "2s", @@ -50,6 +71,14 @@ "every": "2s", "burst": 3 }, + { + "host": ".ve.media.tumblr.com", + "every": "200ms", + "burst": 30, + "rules": [ + {"condition": "status=403", "action": "dont_retry"} + ] + }, { "host": ".s3.amazonaws.com", "every": "10s", diff --git a/main.go b/main.go index d6dc3fa..dce170e 100644 --- a/main.go +++ b/main.go @@ -2,11 +2,13 @@ package main import ( "fmt" + "github.com/dchest/siphash" "github.com/elazarl/goproxy" "github.com/pkg/errors" "github.com/ryanuber/go-glob" "github.com/sirupsen/logrus" "golang.org/x/time/rate" + "math" "math/rand" "net/http" "net/url" @@ -37,6 +39,7 @@ type Proxy struct { Limiters []*ExpiringLimiter HttpClient *http.Client Connections *int32 + UniqueParam string } type RequestCtx struct { @@ -106,10 +109,64 @@ func simplifyHost(host string) string { return "." + host } -func (b *Balancer) chooseProxy() *Proxy { +func (b *Balancer) chooseProxy(r *http.Request) (*Proxy, error) { if len(b.proxies) == 0 { - return b.proxies[0] + return b.proxies[0], nil + } + + if config.Routing { + routingProxyParam := r.Header.Get("X-Architeuthis-Proxy") + r.Header.Del("X-Architeuthis-Proxy") + + if routingProxyParam != "" { + p := b.getProxyByNameOrNil(routingProxyParam) + if p != nil { + return p, nil + } + } + + routingHashParam := r.Header.Get("X-Architeuthis-Hash") + r.Header.Del("X-Architeuthis-Hash") + + if routingHashParam != "" { + hash := siphash.Hash(1, 2, []byte(routingHashParam)) + if hash == 0 { + hash = 1 + } + + pIdx := int(float64(hash) / (float64(math.MaxUint64) / float64(len(b.proxies)))) + + logrus.WithFields(logrus.Fields{ + "hash": routingHashParam, + }).Trace("Using hash") + + return b.proxies[pIdx], nil + } + + routingUniqueParam := r.Header.Get("X-Architeuthis-Unique") + r.Header.Del("X-Architeuthis-Unique") + + if routingUniqueParam != "" { + + var blankProxy *Proxy + + for _, p := range b.proxies { + if p.UniqueParam == "" { + blankProxy = p + } else if p.UniqueParam == routingUniqueParam { + return p, nil + } + } + if blankProxy != nil { + blankProxy.UniqueParam = routingUniqueParam + logrus.Infof("Bound unique param %s to %s", routingUniqueParam, blankProxy.Name) + return blankProxy, nil + } else { + logrus.WithField("unique param", routingUniqueParam).Error("No blank proxies to route this request!") + return nil, errors.Errorf("No blank proxies to route this request! unique param: %s", routingUniqueParam) + } + } } sort.Sort(ByConnectionCount(b.proxies)) @@ -118,17 +175,29 @@ func (b *Balancer) chooseProxy() *Proxy { proxiesWithSameConnCount := b.getProxiesWithSameConnCountAs(proxyWithLeastConns) if len(proxiesWithSameConnCount) > 1 { - return proxiesWithSameConnCount[rand.Intn(len(proxiesWithSameConnCount))] + return proxiesWithSameConnCount[rand.Intn(len(proxiesWithSameConnCount))], nil } else { - return proxyWithLeastConns + return proxyWithLeastConns, nil } } +func (b *Balancer) getProxyByNameOrNil(routingParam string) *Proxy { + if routingParam != "" { + for _, p := range b.proxies { + if p.Name == routingParam { + return p + } + } + } + + return nil +} + func (b *Balancer) getProxiesWithSameConnCountAs(p0 *Proxy) []*Proxy { proxiesWithSameConnCount := make([]*Proxy, 0) for _, p := range b.proxies { - if p.Connections != p0.Connections { + if *p.Connections != *p0.Connections { break } proxiesWithSameConnCount = append(proxiesWithSameConnCount, p) @@ -149,7 +218,12 @@ func New() *Balancer { func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { balancer.proxyMutex.RLock() - p := balancer.chooseProxy() + defer balancer.proxyMutex.RUnlock() + p, err := balancer.chooseProxy(r) + + if err != nil { + return nil, goproxy.NewResponse(r, "text/plain", 500, err.Error()) + } logrus.WithFields(logrus.Fields{ "proxy": p.Name, @@ -158,7 +232,6 @@ func New() *Balancer { }).Trace("Routing request") resp, err := p.processRequest(r) - balancer.proxyMutex.RUnlock() if err != nil { logrus.WithError(err).Trace("Could not complete request")