mirror of
https://github.com/simon987/Architeuthis.git
synced 2025-04-10 13:36:41 +00:00
Request routing. walker76/stackoversight#7
This commit is contained in:
parent
3ae1089048
commit
37fbfe130e
91
README.md
91
README.md
@ -4,8 +4,6 @@
|
||||

|
||||
[](https://ci.simon987.net/job/architeuthis_builds/)
|
||||
|
||||
*NOTE: this is very WIP*
|
||||
|
||||
HTTP(S) proxy with integrated load-balancing, rate-limiting
|
||||
and error handling. Built for automated web scraping.
|
||||
|
||||
@ -13,6 +11,7 @@ and error handling. Built for automated web scraping.
|
||||
* Seamless exponential backoff retries on timeout or error HTTP codes
|
||||
* Requires no additional configuration for integration into existing programs
|
||||
* Configurable per-host behavior
|
||||
* Proxy routing (Requests can be forced to use a specific proxy with header param)
|
||||
|
||||
### Typical use case
|
||||

|
||||
@ -20,8 +19,8 @@ and error handling. Built for automated web scraping.
|
||||
### Usage
|
||||
|
||||
```bash
|
||||
wget https://simon987.net/data/architeuthis/15_architeuthis.tar.gz
|
||||
tar -xzf 15_architeuthis.tar.gz
|
||||
wget https://simon987.net/data/architeuthis/16_architeuthis.tar.gz
|
||||
tar -xzf 16_architeuthis.tar.gz
|
||||
|
||||
vim config.json # Configure settings here
|
||||
./architeuthis
|
||||
@ -50,6 +49,89 @@ level=trace msg=Sleeping wait=433.394361ms
|
||||
...
|
||||
```
|
||||
|
||||
### Proxy routing
|
||||
|
||||
To use routing, enable the `routing` parameter in the configuration file.
|
||||
|
||||
**Explicitly choose proxy**
|
||||
|
||||
You can force a request to go through a specific proxy by using the `X-Architeuthis-Proxy` header.
|
||||
When specified and `routing` is
|
||||
enabled in the config file, the request will use the proxy with the
|
||||
matching name.
|
||||
|
||||
Example:
|
||||
|
||||
in `config.json`:
|
||||
```
|
||||
...
|
||||
"routing": true,
|
||||
"proxies": [
|
||||
{
|
||||
"name": "p0",
|
||||
"url": ""
|
||||
},
|
||||
{
|
||||
"name": "p1",
|
||||
"url": ""
|
||||
},
|
||||
...
|
||||
],
|
||||
```
|
||||
|
||||
This request will *always* be routed through the **p0** proxy:
|
||||
```bash
|
||||
curl https://google.ca/ -k -H "X-Architeuthis-Proxy: p0"
|
||||
```
|
||||
|
||||
Invalid/blank values are silently ignored; the request will be routed
|
||||
according to the usual load balancer rules.
|
||||
|
||||
**Hashed routing**
|
||||
|
||||
You can also use the `X-Architeuthis-Hash` header to specify an arbitrary string.
|
||||
The string will be hashed and uniformly routed to its corresponding proxy. Unless the number
|
||||
of proxies changes, requests with the same hash value will always be routed to the same proxy.
|
||||
|
||||
Example:
|
||||
|
||||
`X-Architeuthis-Hash: userOne` is guaranteed to always be routed to the same proxy.
|
||||
`X-Architeuthis-Hash: userTwo` is also guaranteed to always be routed to the same proxy,
|
||||
but **not necessarily to a different proxy than the one used for userOne**.
|
||||
|
||||
|
||||
**Unique string routing**
|
||||
|
||||
You can use the `X-Architeuthis-Unique` header to specify a unique string that
|
||||
will be dynamically associated to a single proxy.
|
||||
|
||||
The first time such a request is received, the unique string is bound to a proxy and
|
||||
will *always* be routed to this proxy. Any other non-empty value for this header will
|
||||
be routed to another proxy and bound to it.
|
||||
|
||||
This means that you cannot use more unique strings than proxies;
|
||||
doing so will cause the request to drop and will show the message
|
||||
`No blank proxies to route this request!`.
|
||||
|
||||
Reloading the configuration or restarting the `architeuthis` instance will clear the
|
||||
proxy binds.
|
||||
|
||||
Example with configured proxies p0-p3:
|
||||
```
|
||||
msg=Listening addr="localhost:5050"
|
||||
msg="Bound unique param user1 to p3"
|
||||
msg="Routing request" conns=0 proxy=p3 url="https://google.ca:443/"
|
||||
msg="Bound unique param user2 to p2"
|
||||
msg="Routing request" conns=0 proxy=p2 url="https://google.ca:443/"
|
||||
msg="Bound unique param user3 to p1"
|
||||
msg="Routing request" conns=0 proxy=p1 url="https://google.ca:443/"
|
||||
msg="Bound unique param user4 to p0"
|
||||
msg="Routing request" conns=0 proxy=p0 url="https://google.ca:443/"
|
||||
msg="No blank proxies to route this request!" unique param=user5
|
||||
```
|
||||
|
||||
The `X-Architeuthis-*` header *will not* be sent to the remote host.
|
||||
|
||||
### Hot config reload
|
||||
|
||||
```bash
|
||||
@ -114,6 +196,7 @@ Note that having too many rules for one host might negatively impact performance
|
||||
"multiplier": 2.5,
|
||||
"retries": 3,
|
||||
"retries_hard": 6,
|
||||
"routing": true,
|
||||
"proxies": [
|
||||
{
|
||||
"name": "squid_P0",
|
||||
|
@ -82,10 +82,10 @@ var config struct {
|
||||
Wait int64
|
||||
Timeout time.Duration
|
||||
DefaultConfig *HostConfig
|
||||
Routing bool
|
||||
}
|
||||
|
||||
func parseRule(raw *RawHostRule) (*HostRule, error) {
|
||||
//TODO: for the love of god someone please refactor this func
|
||||
|
||||
rule := &HostRule{}
|
||||
var err error
|
||||
|
31
config.json
31
config.json
@ -5,16 +5,29 @@
|
||||
"multiplier": 2.5,
|
||||
"retries": 3,
|
||||
"retries_hard": 6,
|
||||
"routing": true,
|
||||
"proxies": [
|
||||
{
|
||||
"name": "p0",
|
||||
"url": ""
|
||||
},
|
||||
{
|
||||
"name": "p1",
|
||||
"url": ""
|
||||
},
|
||||
{
|
||||
"name": "p2",
|
||||
"url": ""
|
||||
},
|
||||
{
|
||||
"name": "p3",
|
||||
"url": ""
|
||||
}
|
||||
],
|
||||
"hosts": [
|
||||
{
|
||||
"host": "*",
|
||||
"every": "500ms",
|
||||
"every": "125ms",
|
||||
"burst": 25,
|
||||
"headers": {
|
||||
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
|
||||
@ -26,6 +39,14 @@
|
||||
{"condition": "response_time>10s", "action": "dont_retry"}
|
||||
]
|
||||
},
|
||||
{
|
||||
"host": ".i.imgur.com",
|
||||
"every": "100ms",
|
||||
"burst": 1,
|
||||
"headers": {
|
||||
"User-Agent": "curl/7.65.1"
|
||||
}
|
||||
},
|
||||
{
|
||||
"host": "*.reddit.com",
|
||||
"every": "2s",
|
||||
@ -50,6 +71,14 @@
|
||||
"every": "2s",
|
||||
"burst": 3
|
||||
},
|
||||
{
|
||||
"host": ".ve.media.tumblr.com",
|
||||
"every": "200ms",
|
||||
"burst": 30,
|
||||
"rules": [
|
||||
{"condition": "status=403", "action": "dont_retry"}
|
||||
]
|
||||
},
|
||||
{
|
||||
"host": ".s3.amazonaws.com",
|
||||
"every": "10s",
|
||||
|
87
main.go
87
main.go
@ -2,11 +2,13 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/dchest/siphash"
|
||||
"github.com/elazarl/goproxy"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/ryanuber/go-glob"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/time/rate"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@ -37,6 +39,7 @@ type Proxy struct {
|
||||
Limiters []*ExpiringLimiter
|
||||
HttpClient *http.Client
|
||||
Connections *int32
|
||||
UniqueParam string
|
||||
}
|
||||
|
||||
type RequestCtx struct {
|
||||
@ -106,10 +109,64 @@ func simplifyHost(host string) string {
|
||||
return "." + host
|
||||
}
|
||||
|
||||
func (b *Balancer) chooseProxy() *Proxy {
|
||||
func (b *Balancer) chooseProxy(r *http.Request) (*Proxy, error) {
|
||||
|
||||
if len(b.proxies) == 0 {
|
||||
return b.proxies[0]
|
||||
return b.proxies[0], nil
|
||||
}
|
||||
|
||||
if config.Routing {
|
||||
routingProxyParam := r.Header.Get("X-Architeuthis-Proxy")
|
||||
r.Header.Del("X-Architeuthis-Proxy")
|
||||
|
||||
if routingProxyParam != "" {
|
||||
p := b.getProxyByNameOrNil(routingProxyParam)
|
||||
if p != nil {
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
routingHashParam := r.Header.Get("X-Architeuthis-Hash")
|
||||
r.Header.Del("X-Architeuthis-Hash")
|
||||
|
||||
if routingHashParam != "" {
|
||||
hash := siphash.Hash(1, 2, []byte(routingHashParam))
|
||||
if hash == 0 {
|
||||
hash = 1
|
||||
}
|
||||
|
||||
pIdx := int(float64(hash) / (float64(math.MaxUint64) / float64(len(b.proxies))))
|
||||
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"hash": routingHashParam,
|
||||
}).Trace("Using hash")
|
||||
|
||||
return b.proxies[pIdx], nil
|
||||
}
|
||||
|
||||
routingUniqueParam := r.Header.Get("X-Architeuthis-Unique")
|
||||
r.Header.Del("X-Architeuthis-Unique")
|
||||
|
||||
if routingUniqueParam != "" {
|
||||
|
||||
var blankProxy *Proxy
|
||||
|
||||
for _, p := range b.proxies {
|
||||
if p.UniqueParam == "" {
|
||||
blankProxy = p
|
||||
} else if p.UniqueParam == routingUniqueParam {
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
if blankProxy != nil {
|
||||
blankProxy.UniqueParam = routingUniqueParam
|
||||
logrus.Infof("Bound unique param %s to %s", routingUniqueParam, blankProxy.Name)
|
||||
return blankProxy, nil
|
||||
} else {
|
||||
logrus.WithField("unique param", routingUniqueParam).Error("No blank proxies to route this request!")
|
||||
return nil, errors.Errorf("No blank proxies to route this request! unique param: %s", routingUniqueParam)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(ByConnectionCount(b.proxies))
|
||||
@ -118,17 +175,29 @@ func (b *Balancer) chooseProxy() *Proxy {
|
||||
proxiesWithSameConnCount := b.getProxiesWithSameConnCountAs(proxyWithLeastConns)
|
||||
|
||||
if len(proxiesWithSameConnCount) > 1 {
|
||||
return proxiesWithSameConnCount[rand.Intn(len(proxiesWithSameConnCount))]
|
||||
return proxiesWithSameConnCount[rand.Intn(len(proxiesWithSameConnCount))], nil
|
||||
} else {
|
||||
return proxyWithLeastConns
|
||||
return proxyWithLeastConns, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Balancer) getProxyByNameOrNil(routingParam string) *Proxy {
|
||||
if routingParam != "" {
|
||||
for _, p := range b.proxies {
|
||||
if p.Name == routingParam {
|
||||
return p
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Balancer) getProxiesWithSameConnCountAs(p0 *Proxy) []*Proxy {
|
||||
|
||||
proxiesWithSameConnCount := make([]*Proxy, 0)
|
||||
for _, p := range b.proxies {
|
||||
if p.Connections != p0.Connections {
|
||||
if *p.Connections != *p0.Connections {
|
||||
break
|
||||
}
|
||||
proxiesWithSameConnCount = append(proxiesWithSameConnCount, p)
|
||||
@ -149,7 +218,12 @@ func New() *Balancer {
|
||||
func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
|
||||
|
||||
balancer.proxyMutex.RLock()
|
||||
p := balancer.chooseProxy()
|
||||
defer balancer.proxyMutex.RUnlock()
|
||||
p, err := balancer.chooseProxy(r)
|
||||
|
||||
if err != nil {
|
||||
return nil, goproxy.NewResponse(r, "text/plain", 500, err.Error())
|
||||
}
|
||||
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"proxy": p.Name,
|
||||
@ -158,7 +232,6 @@ func New() *Balancer {
|
||||
}).Trace("Routing request")
|
||||
|
||||
resp, err := p.processRequest(r)
|
||||
balancer.proxyMutex.RUnlock()
|
||||
|
||||
if err != nil {
|
||||
logrus.WithError(err).Trace("Could not complete request")
|
||||
|
Loading…
x
Reference in New Issue
Block a user