Big refactor/rewrite: dynamic proxies, kill/revive proxies, all internal state stored in redis

This commit is contained in:
simon 2019-11-19 19:00:10 -05:00
parent c735f3cc87
commit ce41aee843
17 changed files with 1938 additions and 747 deletions

1
.gitignore vendored

@ -14,3 +14,4 @@
*.iml
.idea/
architeuthis

126
README.md

@ -11,7 +11,9 @@ and error handling. Built for automated web scraping.
* Seamless exponential backoff retries on timeout or error HTTP codes
* Requires no additional configuration for integration into existing programs
* Configurable per-host behavior
* Proxy routing (requests can be forced to use a specific proxy with a header parameter)
* Monitoring with InfluxDB
![grafana](grafana.png)
### Typical use case
![user_case](use_case.png)
@ -19,19 +21,30 @@ and error handling. Built for automated web scraping.
### Usage
```bash
wget https://simon987.net/data/architeuthis/17_architeuthis.tar.gz
tar -xzf 17_architeuthis.tar.gz
wget https://simon987.net/data/architeuthis/16_architeuthis.tar.gz
tar -xzf 16_architeuthis.tar.gz
vim config.json # Configure settings here
./architeuthis
```
You can add proxies using the `/add_proxy` API:
```bash
curl "http://localhost:5050/add_proxy?url=<url>&name=<name>"
```
Or automatically using Proxybroker:
```bash
python3 import_from_broker.py
```
### Example usage with wget
```bash
export http_proxy="http://localhost:5050"
# --no-check-certificate is necessary for HTTPS MITM
# You don't need to specify user-agent if it's already in your config.json
wget -m -np -c --no-check-certificate -R index.html* http://ca.releases.ubuntu.com/
```
With `"every": "500ms"` and a single proxy, you should see
@ -49,89 +62,6 @@ level=trace msg=Sleeping wait=433.394361ms
...
```
### Proxy routing
To use routing, enable the `routing` parameter in the configuration file.
**Explicitly choose proxy**
You can force a request to go through a specific proxy by using the `X-Architeuthis-Proxy` header.
When specified and `routing` is
enabled in the config file, the request will use the proxy with the
matching name.
Example:
in `config.json`:
```
...
"routing": true,
"proxies": [
{
"name": "p0",
"url": ""
},
{
"name": "p1",
"url": ""
},
...
],
```
This request will *always* be routed through the **p0** proxy:
```bash
curl https://google.ca/ -k -H "X-Architeuthis-Proxy: p0"
```
Invalid/blank values are silently ignored; the request will be routed
according to the usual load balancer rules.
**Hashed routing**
You can also use the `X-Architeuthis-Hash` header to specify an arbitrary string.
The string will be hashed and uniformly routed to its corresponding proxy. Unless the number
of proxies changes, requests with the same hash value will always be routed to the same proxy.
Example:
`X-Architeuthis-Hash: userOne` is guaranteed to always be routed to the same proxy.
`X-Architeuthis-Hash: userTwo` is also guaranteed to always be routed to the same proxy,
but **not necessarily a different proxy from userOne**.
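The hash-to-proxy mapping can be sketched as below. This is an illustration only: it uses the standard library's FNV-1a hash rather than the SipHash implementation architeuthis uses internally, so actual indices will differ, and `routeIndex` is a hypothetical helper name:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// routeIndex maps an arbitrary routing key to a proxy index in [0, nProxies).
// The same key always yields the same index until nProxies changes.
func routeIndex(key string, nProxies int) int {
	h := fnv.New64a()
	h.Write([]byte(key))
	return int(h.Sum64() % uint64(nProxies))
}

func main() {
	for _, key := range []string{"userOne", "userTwo", "userOne"} {
		fmt.Printf("X-Architeuthis-Hash: %s -> proxy p%d\n", key, routeIndex(key, 4))
	}
}
```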
**Unique string routing**
You can use the `X-Architeuthis-Unique` header to specify a unique string that
will be dynamically associated with a single proxy.
The first time such a request is received, the unique string is bound to a proxy and
will *always* be routed to this proxy. Any other non-empty value for this header will
be routed to another proxy and bound to it.
This means that you cannot use more unique strings than there are proxies;
doing so will cause the request to be dropped with the message
`No blank proxies to route this request!`.
Reloading the configuration or restarting the `architeuthis` instance will clear the
proxy bindings.
Example with configured proxies p0-p3:
```
msg=Listening addr="localhost:5050"
msg="Bound unique param user1 to p3"
msg="Routing request" conns=0 proxy=p3 url="https://google.ca:443/"
msg="Bound unique param user2 to p2"
msg="Routing request" conns=0 proxy=p2 url="https://google.ca:443/"
msg="Bound unique param user3 to p1"
msg="Routing request" conns=0 proxy=p1 url="https://google.ca:443/"
msg="Bound unique param user4 to p0"
msg="Routing request" conns=0 proxy=p0 url="https://google.ca:443/"
msg="No blank proxies to route this request!" unique param=user5
```
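The first-come-first-served binding shown in the log can be sketched as follows. This is a minimal in-memory illustration under stated assumptions: the real implementation stores bindings in Redis, and it may pick blank proxies in a different order than this sketch does; `bindUnique` is a hypothetical helper name:

```go
package main

import (
	"errors"
	"fmt"
)

// bindUnique returns the proxy bound to key, binding the key to the first
// unbound proxy on first sight. Once every proxy is bound, further distinct
// keys fail, mirroring "No blank proxies to route this request!".
func bindUnique(binds map[string]string, proxies []string, key string) (string, error) {
	if p, ok := binds[key]; ok {
		return p, nil
	}
	bound := make(map[string]bool, len(binds))
	for _, p := range binds {
		bound[p] = true
	}
	for _, p := range proxies {
		if !bound[p] {
			binds[key] = p
			return p, nil
		}
	}
	return "", errors.New("no blank proxies to route this request")
}

func main() {
	binds := map[string]string{}
	proxies := []string{"p0", "p1"}
	for _, u := range []string{"user1", "user2", "user1", "user3"} {
		p, err := bindUnique(binds, proxies, u)
		if err != nil {
			fmt.Println(u, "->", err)
			continue
		}
		fmt.Println(u, "->", p)
	}
}
```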
The `X-Architeuthis-*` header *will not* be sent to the remote host.
### Hot config reload
```bash
@ -178,8 +108,6 @@ Actions
| should_retry | Override default retry behavior for http errors (by default it retries on 403,408,429,444,499,>500)
| force_retry | Always retry (Up to retries_hard times)
| dont_retry | Immediately stop retrying
| multiply_every | Multiply the current limiter's 'every' value by `arg` | `1.5`(float)
| set_every | Set the current limiter's 'every' value to `arg` | `10s`(duration)
In the event of a temporary network error, `should_retry` is ignored (it will always retry unless `dont_retry` is set)
@ -195,18 +123,6 @@ Note that having too many rules for one host might negatively impact performance
"wait": "4s",
"multiplier": 2.5,
"retries": 3,
"retries_hard": 6,
"routing": true,
"proxies": [
{
"name": "squid_P0",
"url": "http://user:pass@p0.example.com:8080"
},
{
"name": "privoxy_P1",
"url": "http://p1.example.com:8080"
}
],
"hosts": [
{
"host": "*",
@ -232,14 +148,6 @@ Note that having too many rules for one host might negatively impact performance
"rules": [
{"condition": "status=403", "action": "dont_retry"}
]
},
{
"host": ".www.instagram.com",
"every": "4500ms",
"burst": 3,
"rules": [
{"condition": "body=*please try again in a few minutes*", "action": "multiply_every", "arg": "2"}
]
}
]
}

142
config.go

@ -7,7 +7,6 @@ import (
"github.com/pkg/errors"
"github.com/ryanuber/go-glob"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"io/ioutil"
"os"
"reflect"
@ -17,40 +16,10 @@ import (
"time"
)
type HostConfig struct {
Host string `json:"host"`
EveryStr string `json:"every"`
Burst int `json:"burst"`
Headers map[string]string `json:"headers"`
RawRules []*RawHostRule `json:"rules"`
Every time.Duration
Rules []*HostRule
}
type RawHostRule struct {
Condition string `json:"condition"`
Action string `json:"action"`
Arg string `json:"arg"`
}
type HostRuleAction int
const (
DontRetry HostRuleAction = 0
MultiplyEvery HostRuleAction = 1
SetEvery HostRuleAction = 2
ForceRetry HostRuleAction = 3
ShouldRetry HostRuleAction = 4
)
func (a HostRuleAction) String() string {
switch a {
case DontRetry:
return "dont_retry"
case MultiplyEvery:
return "multiply_every"
case SetEvery:
return "set_every"
case ForceRetry:
return "force_retry"
case ShouldRetry:
@ -59,63 +28,21 @@ func (a HostRuleAction) String() string {
return "???"
}
type HostRule struct {
Matches func(r *RequestCtx) bool
Action HostRuleAction
Arg float64
}
type ProxyConfig struct {
Name string `json:"name"`
Url string `json:"url"`
}
var config struct {
Addr string `json:"addr"`
TimeoutStr string `json:"timeout"`
WaitStr string `json:"wait"`
Multiplier float64 `json:"multiplier"`
Retries int `json:"retries"`
RetriesHard int `json:"retries_hard"`
Hosts []*HostConfig `json:"hosts"`
Proxies []ProxyConfig `json:"proxies"`
Wait int64
Timeout time.Duration
DefaultConfig *HostConfig
Routing bool
}
func parseRule(raw *RawHostRule) (*HostRule, error) {
rule := &HostRule{}
var err error
switch raw.Action {
case "should_retry":
rule.Action = ShouldRetry
case "dont_retry":
rule.Action = DontRetry
case "multiply_every":
rule.Action = MultiplyEvery
rule.Arg, err = strconv.ParseFloat(raw.Arg, 64)
case "set_every":
rule.Action = SetEvery
var duration time.Duration
duration, err = time.ParseDuration(raw.Arg)
if err != nil {
return nil, err
}
rule.Arg = 1 / duration.Seconds()
case "force_retry":
rule.Action = ForceRetry
default:
return nil, errors.Errorf("Invalid argument for action: %s", raw.Action)
}
if err != nil {
return nil, err
}
switch {
case strings.Contains(raw.Condition, "!="):
op1Str, op2Str := split(raw.Condition, "!=")
@ -125,12 +52,12 @@ func parseRule(raw *RawHostRule) (*HostRule, error) {
}
if isGlob(op2Str) {
rule.Matches = func(ctx *RequestCtx) bool {
rule.Matches = func(ctx *ResponseCtx) bool {
return !glob.Glob(op2Str, op1Func(ctx))
}
} else {
op2Str = strings.Replace(op2Str, "\\*", "*", -1)
rule.Matches = func(ctx *RequestCtx) bool {
rule.Matches = func(ctx *ResponseCtx) bool {
return op1Func(ctx) != op2Str
}
}
@ -142,12 +69,12 @@ func parseRule(raw *RawHostRule) (*HostRule, error) {
}
if isGlob(op2Str) {
rule.Matches = func(ctx *RequestCtx) bool {
rule.Matches = func(ctx *ResponseCtx) bool {
return glob.Glob(op2Str, op1Func(ctx))
}
} else {
op2Str = strings.Replace(op2Str, "\\*", "*", -1)
rule.Matches = func(ctx *RequestCtx) bool {
rule.Matches = func(ctx *ResponseCtx) bool {
return op1Func(ctx) == op2Str
}
}
@ -162,7 +89,7 @@ func parseRule(raw *RawHostRule) (*HostRule, error) {
return nil, err
}
rule.Matches = func(ctx *RequestCtx) bool {
rule.Matches = func(ctx *ResponseCtx) bool {
op1Num, err := strconv.ParseFloat(op1Func(ctx), 64)
handleRuleErr(err)
return op1Num > op2Num
@ -178,7 +105,7 @@ func parseRule(raw *RawHostRule) (*HostRule, error) {
return nil, err
}
rule.Matches = func(ctx *RequestCtx) bool {
rule.Matches = func(ctx *ResponseCtx) bool {
op1Num, err := strconv.ParseFloat(op1Func(ctx), 64)
handleRuleErr(err)
return op1Num < op2Num
@ -214,10 +141,10 @@ func parseOperand2(op1, op2 string) (float64, error) {
return strconv.ParseFloat(op2, 64)
}
func parseOperand1(op string) func(ctx *RequestCtx) string {
func parseOperand1(op string) func(ctx *ResponseCtx) string {
switch {
case op == "body":
return func(ctx *RequestCtx) string {
return func(ctx *ResponseCtx) string {
if ctx.Response == nil {
return ""
@ -235,19 +162,19 @@ func parseOperand1(op string) func(ctx *RequestCtx) string {
return string(bodyBytes)
}
case op == "status":
return func(ctx *RequestCtx) string {
return func(ctx *ResponseCtx) string {
if ctx.Response == nil {
return ""
}
return strconv.Itoa(ctx.Response.StatusCode)
}
case op == "response_time":
return func(ctx *RequestCtx) string {
return strconv.FormatFloat(time.Now().Sub(ctx.RequestTime).Seconds(), 'f', 6, 64)
return func(ctx *ResponseCtx) string {
return strconv.FormatFloat(ctx.ResponseTime, 'f', 6, 64)
}
case strings.HasPrefix(op, "header:"):
header := op[strings.Index(op, ":")+1:]
return func(ctx *RequestCtx) string {
return func(ctx *ResponseCtx) string {
if ctx.Response == nil {
return ""
}
@ -356,49 +283,8 @@ func validateConfig() {
}
}
func applyConfig(proxy *Proxy) {
//Reverse order
for i := len(config.Hosts) - 1; i >= 0; i-- {
conf := config.Hosts[i]
proxy.Limiters = append(proxy.Limiters, &ExpiringLimiter{
HostGlob: conf.Host,
IsGlob: isGlob(conf.Host),
Limiter: rate.NewLimiter(rate.Every(conf.Every), conf.Burst),
LastRead: time.Now(),
CanDelete: false,
})
}
}
func (b *Balancer) reloadConfig() {
b.proxyMutex.Lock()
err := loadConfig()
if err != nil {
panic(err)
}
if b.proxies != nil {
b.proxies = b.proxies[:0]
}
for _, proxyConf := range config.Proxies {
proxy, err := NewProxy(proxyConf.Name, proxyConf.Url)
handleErr(err)
b.proxies = append(b.proxies, proxy)
applyConfig(proxy)
logrus.WithFields(logrus.Fields{
"name": proxy.Name,
"url": proxy.Url,
}).Info("Proxy")
}
b.proxyMutex.Unlock()
func (a *Architeuthis) reloadConfig() {
_ = loadConfig()
logrus.Info("Reloaded config")
}


@ -1,47 +1,27 @@
{
"addr": "localhost:5050",
"timeout": "15s",
"wait": "4s",
"multiplier": 2.5,
"wait": "0.5s",
"multiplier": 1,
"retries": 3,
"retries_hard": 6,
"routing": true,
"proxies": [
{
"name": "p0",
"url": ""
},
{
"name": "p1",
"url": ""
},
{
"name": "p2",
"url": ""
},
{
"name": "p3",
"url": ""
}
],
"max_error": 0.4,
"redis_url": "localhost:6379",
"hosts": [
{
"host": "*",
"every": "125ms",
"burst": 25,
"every": "1ms",
"burst": 1,
"headers": {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Cache-Control": "max-age=0",
"Connection": "keep-alive",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:67.0) Gecko/20100101 Firefox/67.0"
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:70.0) Gecko/20100101 Firefox/70.0"
},
"rules": [
{"condition": "response_time>10s", "action": "dont_retry"}
]
},
{
"host": ".i.imgur.com",
"every": "100ms",
"every": "1s",
"burst": 1,
"headers": {
"User-Agent": "curl/7.65.1"
@ -49,7 +29,7 @@
},
{
"host": "*.reddit.com",
"every": "2s",
"every": "1s",
"burst": 1
},
{

54
cron.go Normal file

@ -0,0 +1,54 @@
package main
import (
"github.com/robfig/cron"
"github.com/sirupsen/logrus"
"sync"
"time"
)
func (a *Architeuthis) setupProxyReviver() {
const gcInterval = time.Minute * 10
gcCron := cron.New()
gcSchedule := cron.Every(gcInterval)
gcCron.Schedule(gcSchedule, cron.FuncJob(a.reviveProxies))
go gcCron.Run()
logrus.WithFields(logrus.Fields{
"every": gcInterval,
}).Info("Started proxy revive cron")
}
// testUrl drains ch, marking every proxy that fetches url successfully as alive.
// The WaitGroup is passed by pointer: a copy would make Done() invisible to Wait().
func (a *Architeuthis) testUrl(ch chan *Proxy, url string, wg *sync.WaitGroup) {
defer wg.Done()
for p := range ch {
r, _ := p.HttpClient.Get(url)
if r != nil && isHttpSuccessCode(r.StatusCode) {
a.setAlive(p.Name)
}
}
}
func (a *Architeuthis) reviveProxies() {
wg := sync.WaitGroup{}
const checkers = 50
wg.Add(checkers)
ch := make(chan *Proxy, checkers)
for i := 0; i < checkers; i++ {
go a.testUrl(ch, "https://google.com/", &wg)
}
for _, p := range a.GetDeadProxies() {
ch <- p
}
// Close the channel so the checker goroutines exit and Wait() can return.
close(ch)
wg.Wait()
}


@ -13,6 +13,33 @@ import (
"time"
)
func shouldBlameProxy(rCtx *ResponseCtx) bool {
if rCtx.Response != nil {
return shouldBlameProxyHttpCode(rCtx.Response.StatusCode)
} else {
//TODO: don't blame proxy for timeout?
return true
}
}
func isProxyError(err error) bool {
urlErr, ok := err.(*url.Error)
if ok {
opErr, ok := urlErr.Err.(*net.OpError)
if ok {
if opErr.Op == "proxyconnect" {
return true
}
if opErr.Op == "local error" {
return true
}
}
}
return false
}
func isPermanentError(err error) bool {
var opErr *net.OpError
@ -27,6 +54,11 @@ func isPermanentError(err error) bool {
}
return false
}
if opErr.Err.Error() == "Internal Privoxy Error" {
return true
}
} else {
_, ok := err.(net.Error)
if ok {
@ -40,11 +72,6 @@ func isPermanentError(err error) bool {
return false
}
if opErr.Op == "proxyconnect" {
logrus.Error("Error connecting to the proxy!")
return true
}
if opErr.Timeout() {
// Usually means that there is no route to host
return true
@ -54,9 +81,6 @@ func isPermanentError(err error) bool {
case *net.DNSError:
return true
case *os.SyscallError:
logrus.Printf("os.SyscallError:%+v", t)
if errno, ok := t.Err.(syscall.Errno); ok {
switch errno {
case syscall.ECONNREFUSED:
@ -65,30 +89,28 @@ func isPermanentError(err error) bool {
case syscall.ETIMEDOUT:
log.Println("timeout")
return false
case syscall.ECONNRESET:
log.Println("connection reset by peer")
return false
}
}
}
//todo: handle the other error types
fmt.Println("fixme: None of the above")
return false
}
func waitTime(retries int) time.Duration {
func getWaitTime(retries int) time.Duration {
return time.Duration(config.Wait * int64(math.Pow(config.Multiplier, float64(retries))))
}
func (p *Proxy) waitRateLimit(limiter *rate.Limiter) {
reservation := limiter.Reserve()
delay := reservation.Delay()
if delay > 0 {
logrus.WithFields(logrus.Fields{
"wait": delay,
}).Trace("Sleeping")
time.Sleep(delay)
}
}
@ -97,6 +119,16 @@ func isHttpSuccessCode(code int) bool {
return code >= 200 && code < 300
}
func shouldBlameProxyHttpCode(code int) bool {
switch {
case code >= 500:
return false
default:
return true
}
}
func shouldRetryHttpCode(code int) bool {
switch {

61
gc.go

@ -1,61 +0,0 @@
package main
import (
"github.com/robfig/cron"
"github.com/sirupsen/logrus"
"time"
)
func (b *Balancer) setupGarbageCollector() {
const gcInterval = time.Minute * 5
gcCron := cron.New()
gcSchedule := cron.Every(gcInterval)
gcCron.Schedule(gcSchedule, cron.FuncJob(b.cleanAllExpiredLimits))
go gcCron.Run()
logrus.WithFields(logrus.Fields{
"every": gcInterval,
}).Info("Started task cleanup cron")
}
func (b *Balancer) cleanAllExpiredLimits() {
before := 0
after := 0
b.proxyMutex.RLock()
for _, p := range b.proxies {
before += len(p.Limiters)
cleanExpiredLimits(p)
after += len(p.Limiters)
}
b.proxyMutex.RUnlock()
logrus.WithFields(logrus.Fields{
"removed": before - after,
}).Info("Cleaned up limiters")
}
func cleanExpiredLimits(proxy *Proxy) {
const ttl = time.Hour
var limits []*ExpiringLimiter
now := time.Now()
for host, limiter := range proxy.Limiters {
if now.Sub(limiter.LastRead) > ttl && limiter.CanDelete {
logrus.WithFields(logrus.Fields{
"proxy": proxy.Name,
"limiter": host,
"last_read": now.Sub(limiter.LastRead),
}).Trace("Pruning limiter")
} else {
limits = append(limits, limiter)
}
}
proxy.Limiters = limits
}

BIN
grafana.png Normal file

Width:  |  Height:  |  Size: 204 KiB

778
grafana/model.json Normal file

@ -0,0 +1,778 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": 1,
"links": [],
"panels": [
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 14,
"x": 0,
"y": 0
},
"id": 6,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "request.count"
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"groupBy": [
{
"params": [
"1m"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "request",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"status"
],
"type": "field"
},
{
"params": [],
"type": "count"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Requests / minute",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 10,
"x": 14,
"y": 0
},
"id": 5,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "connected",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "good",
"color": "#73BF69"
},
{
"alias": "bad",
"color": "#F2495C",
"lines": false,
"pointradius": 4,
"points": true
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "good",
"groupBy": [
{
"params": [
"1m"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "request",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"latency"
],
"type": "field"
},
{
"params": [],
"type": "mean"
}
]
],
"tags": [
{
"key": "ok",
"operator": "=",
"value": "true"
}
]
},
{
"alias": "bad",
"groupBy": [
{
"params": [
"1m"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "request",
"orderByTime": "ASC",
"policy": "default",
"refId": "B",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"latency"
],
"type": "field"
},
{
"params": [],
"type": "mean"
}
]
],
"tags": [
{
"key": "ok",
"operator": "=",
"value": "false"
}
]
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Latency",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 8,
"x": 0,
"y": 9
},
"id": 3,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "connected",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "request.sum",
"color": "#A352CC"
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"groupBy": [
{
"params": [
"1m"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "request",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"size"
],
"type": "field"
},
{
"params": [],
"type": "sum"
},
{
"params": [
"/60"
],
"type": "math"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Bandwidth",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "Bps",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 8,
"x": 8,
"y": 9
},
"id": 2,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "connected",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "count",
"color": "#5794F2"
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "count",
"groupBy": [
{
"params": [
"$__interval"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "add_proxy",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"newCount"
],
"type": "field"
},
{
"params": [],
"type": "mean"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Proxy count",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": true,
"dashLength": 10,
"dashes": false,
"datasource": null,
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 8,
"x": 16,
"y": 9
},
"id": 4,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": false,
"linewidth": 1,
"nullPointMode": "null as zero",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "retry",
"color": "#FF9830"
},
{
"alias": "rate",
"color": "#5794F2"
}
],
"spaceLength": 10,
"stack": true,
"steppedLine": false,
"targets": [
{
"alias": "retry",
"groupBy": [
{
"params": [
"1m"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "sleep",
"orderByTime": "ASC",
"policy": "default",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"duration"
],
"type": "field"
},
{
"params": [],
"type": "sum"
}
]
],
"tags": [
{
"key": "context",
"operator": "=",
"value": "retry"
}
]
},
{
"alias": "rate",
"groupBy": [
{
"params": [
"1m"
],
"type": "time"
},
{
"params": [
"null"
],
"type": "fill"
}
],
"measurement": "sleep",
"orderByTime": "ASC",
"policy": "default",
"refId": "B",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"duration"
],
"type": "field"
},
{
"params": [],
"type": "sum"
}
]
],
"tags": [
{
"key": "context",
"operator": "=",
"value": "rate"
}
]
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Sleep times",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": false
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"refresh": "5s",
"schemaVersion": 20,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-3h",
"to": "now"
},
"timepicker": {
"refresh_intervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
]
},
"timezone": "",
"title": "Architeuthis",
"uid": "I2xEOnbZk",
"version": 11
}

31
import_from_broker.py Normal file

@ -0,0 +1,31 @@
import asyncio
import requests
from proxybroker import Broker
ARCHITEUTHIS_URL = "http://localhost:5050"
def add_to_architeuthis(name, url):
r = requests.get(ARCHITEUTHIS_URL + "/add_proxy", params={"name": name, "url": url})
print("ADD %s <%d>" % (name, r.status_code))
async def add(proxies):
while True:
proxy = await proxies.get()
if proxy is None:
break
url = "http://%s:%d" % (proxy.host, proxy.port)
name = "%s_%d" % (proxy.host, proxy.port)
add_to_architeuthis(name, url)
proxies = asyncio.Queue()
broker = Broker(proxies)
tasks = asyncio.gather(broker.find(types=['HTTPS'], limit=300), add(proxies))
loop = asyncio.get_event_loop()
loop.run_until_complete(tasks)

111
influxdb.go Normal file

@ -0,0 +1,111 @@
package main
import (
influx "github.com/influxdata/influxdb1-client/v2"
"github.com/sirupsen/logrus"
"strconv"
"time"
)
const InfluxDbBufferSize = 100
const InfluxDbDatabase = "architeuthis"
const InfluxDBRetentionPolicy = ""
func (a *Architeuthis) asyncWriter(metrics chan *influx.Point) {
logrus.Trace("Started async influxdb writer")
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: InfluxDbDatabase,
RetentionPolicy: InfluxDBRetentionPolicy,
})
for point := range metrics {
bp.AddPoint(point)
if len(bp.Points()) >= InfluxDbBufferSize {
flushQueue(a.influxdb, &bp)
}
}
flushQueue(a.influxdb, &bp)
}
func flushQueue(influxdb influx.Client, bp *influx.BatchPoints) {
err := influxdb.Write(*bp)
if err != nil {
logrus.WithError(err).Error("influxdb write")
return
}
logrus.WithFields(logrus.Fields{
"size": len((*bp).Points()),
}).Trace("Wrote points")
*bp, _ = influx.NewBatchPoints(influx.BatchPointsConfig{
Database: InfluxDbDatabase,
RetentionPolicy: InfluxDBRetentionPolicy,
})
}
func (a *Architeuthis) writeMetricProxyCount(newCount int) {
point, _ := influx.NewPoint(
"add_proxy",
nil,
map[string]interface{}{
"newCount": newCount,
},
time.Now(),
)
a.points <- point
}
func (a *Architeuthis) writeMetricRequest(ctx ResponseCtx) {
var fields map[string]interface{}
if ctx.Response != nil {
size, _ := strconv.ParseInt(ctx.Response.Header.Get("Content-Length"), 10, 64)
fields = map[string]interface{}{
"status": ctx.Response.StatusCode,
"latency": ctx.ResponseTime,
"size": size,
}
} else {
fields = map[string]interface{}{}
}
var ok string
if ctx.Error == nil {
ok = "true"
} else {
ok = "false"
}
point, _ := influx.NewPoint(
"request",
map[string]string{
"ok": ok,
},
fields,
time.Now(),
)
a.points <- point
}
func (a *Architeuthis) writeMetricSleep(duration time.Duration, tag string) {
point, _ := influx.NewPoint(
"sleep",
map[string]string{
"context": tag,
},
map[string]interface{}{
"duration": duration.Seconds(),
},
time.Now(),
)
a.points <- point
}

1
jenkins/Jenkinsfile vendored

@ -6,6 +6,7 @@ remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
logLevel = 'FINER'
remote.port = 2299
pipeline {
agent none

629
main.go

@ -2,473 +2,292 @@ package main
import (
"fmt"
"github.com/dchest/siphash"
"github.com/elazarl/goproxy"
"github.com/go-redis/redis"
influx "github.com/influxdata/influxdb1-client/v2"
"github.com/pkg/errors"
"github.com/ryanuber/go-glob"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"math"
"math/rand"
"html/template"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
)
type Balancer struct {
server *goproxy.ProxyHttpServer
proxies []*Proxy
proxyMutex *sync.RWMutex
}
func New() *Architeuthis {
type ExpiringLimiter struct {
HostGlob string
IsGlob bool
CanDelete bool
Limiter *rate.Limiter
LastRead time.Time
}
a := new(Architeuthis)
type Proxy struct {
Name string
Url *url.URL
Limiters []*ExpiringLimiter
HttpClient *http.Client
Connections *int32
UniqueParam string
}
a.redis = redis.NewClient(&redis.Options{
Addr: config.RedisUrl,
Password: "",
DB: 0,
})
type RequestCtx struct {
RequestTime time.Time
Response *http.Response
}
a.setupProxyReviver()
type ByConnectionCount []*Proxy
var err error
const InfluxDBUrl = "http://localhost:8086"
a.influxdb, err = influx.NewHTTPClient(influx.HTTPConfig{
Addr: InfluxDBUrl,
})
func (a ByConnectionCount) Len() int {
return len(a)
}
func (a ByConnectionCount) Swap(i, j int) {
a[i], a[j] = a[j], a[i]
}
func (a ByConnectionCount) Less(i, j int) bool {
return *a[i].Connections < *a[j].Connections
}
func (p *Proxy) getLimiter(host string) *rate.Limiter {
for _, limiter := range p.Limiters {
if limiter.IsGlob {
if glob.Glob(limiter.HostGlob, host) {
limiter.LastRead = time.Now()
return limiter.Limiter
}
} else if limiter.HostGlob == host {
limiter.LastRead = time.Now()
return limiter.Limiter
}
_, err = http.Post(InfluxDBUrl+"/query", "application/x-www-form-urlencoded", strings.NewReader("q=CREATE DATABASE \"architeuthis\""))
if err != nil {
panic(err)
}
newExpiringLimiter := p.makeNewLimiter(host)
return newExpiringLimiter.Limiter
}
a.points = make(chan *influx.Point, InfluxDbBufferSize)
func (p *Proxy) makeNewLimiter(host string) *ExpiringLimiter {
go a.asyncWriter(a.points)
newExpiringLimiter := &ExpiringLimiter{
CanDelete: false,
HostGlob: host,
IsGlob: false,
LastRead: time.Now(),
Limiter: rate.NewLimiter(rate.Every(config.DefaultConfig.Every), config.DefaultConfig.Burst),
}
a.server = goproxy.NewProxyHttpServer()
a.server.OnRequest().HandleConnect(goproxy.AlwaysMitm)
p.Limiters = append([]*ExpiringLimiter{newExpiringLimiter}, p.Limiters...)
logrus.WithFields(logrus.Fields{
"host": host,
"every": config.DefaultConfig.Every,
}).Trace("New limiter")
return newExpiringLimiter
}
func simplifyHost(host string) string {
col := strings.LastIndex(host, ":")
if col > 0 {
host = host[:col]
}
return "." + host
}
func (b *Balancer) chooseProxy(r *http.Request) (*Proxy, error) {
if len(b.proxies) == 1 {
return b.proxies[0], nil
}
if config.Routing {
routingProxyParam := r.Header.Get("X-Architeuthis-Proxy")
r.Header.Del("X-Architeuthis-Proxy")
if routingProxyParam != "" {
p := b.getProxyByNameOrNil(routingProxyParam)
if p != nil {
return p, nil
}
}
routingHashParam := r.Header.Get("X-Architeuthis-Hash")
r.Header.Del("X-Architeuthis-Hash")
if routingHashParam != "" {
hash := siphash.Hash(1, 2, []byte(routingHashParam))
if hash == 0 {
hash = 1
}
pIdx := int(float64(hash) / (float64(math.MaxUint64) / float64(len(b.proxies))))
logrus.WithFields(logrus.Fields{
"hash": routingHashParam,
}).Trace("Using hash")
return b.proxies[pIdx], nil
}
routingUniqueParam := r.Header.Get("X-Architeuthis-Unique")
r.Header.Del("X-Architeuthis-Unique")
if routingUniqueParam != "" {
var blankProxy *Proxy
for _, p := range b.proxies {
if p.UniqueParam == "" {
blankProxy = p
} else if p.UniqueParam == routingUniqueParam {
return p, nil
}
}
if blankProxy != nil {
blankProxy.UniqueParam = routingUniqueParam
logrus.Infof("Bound unique param %s to %s", routingUniqueParam, blankProxy.Name)
return blankProxy, nil
} else {
logrus.WithField("unique param", routingUniqueParam).Error("No blank proxies to route this request!")
return nil, errors.Errorf("No blank proxies to route this request! unique param: %s", routingUniqueParam)
}
}
}
sort.Sort(ByConnectionCount(b.proxies))
proxyWithLeastConns := b.proxies[0]
proxiesWithSameConnCount := b.getProxiesWithSameConnCountAs(proxyWithLeastConns)
if len(proxiesWithSameConnCount) > 1 {
return proxiesWithSameConnCount[rand.Intn(len(proxiesWithSameConnCount))], nil
} else {
return proxyWithLeastConns, nil
}
}
func (b *Balancer) getProxyByNameOrNil(routingParam string) *Proxy {
if routingParam != "" {
for _, p := range b.proxies {
if p.Name == routingParam {
return p
}
}
}
return nil
}
func (b *Balancer) getProxiesWithSameConnCountAs(p0 *Proxy) []*Proxy {
proxiesWithSameConnCount := make([]*Proxy, 0)
for _, p := range b.proxies {
if *p.Connections != *p0.Connections {
break
}
proxiesWithSameConnCount = append(proxiesWithSameConnCount, p)
}
return proxiesWithSameConnCount
}
func New() *Architeuthis {
	a := new(Architeuthis)
	_, err := http.Post(InfluxDBUrl+"/query", "application/x-www-form-urlencoded", strings.NewReader("q=CREATE DATABASE \"architeuthis\""))
	if err != nil {
		panic(err)
	}
	a.points = make(chan *influx.Point, InfluxDbBufferSize)
	go a.asyncWriter(a.points)
	a.server = goproxy.NewProxyHttpServer()
	a.server.OnRequest().HandleConnect(goproxy.AlwaysMitm)
	a.server.OnRequest().DoFunc(
		func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
			resp, err := a.processRequest(r)
			if err != nil {
				logrus.WithError(err).Trace("Could not complete request")
				return nil, goproxy.NewResponse(r, "text/plain", http.StatusInternalServerError, err.Error())
			}
			return nil, resp
		})
	mux := http.NewServeMux()
	a.server.NonproxyHandler = mux
	mux.HandleFunc("/reload", func(w http.ResponseWriter, r *http.Request) {
		a.reloadConfig()
		_, _ = fmt.Fprint(w, "Reloaded\n")
	})
	templ, _ := template.ParseFiles("templates/stats.html")
	mux.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
		_ = templ.Execute(w, a.getStats())
	})
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_, _ = fmt.Fprint(w, "{\"name\":\"Architeuthis\",\"version\":2.0}")
	})
	mux.HandleFunc("/add_proxy", func(w http.ResponseWriter, r *http.Request) {
		name := r.URL.Query().Get("name")
		url := r.URL.Query().Get("url")
		if name == "" || url == "" {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		err := a.AddProxy(name, url)
		if err != nil {
			logrus.WithError(err).WithFields(logrus.Fields{
				"name": name,
				"url":  url,
			}).Error("Could not add proxy")
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	})
	return a
}
func getConfsMatchingRequest(r *http.Request) []*HostConfig {
	sHost := simplifyHost(r.Host)
	configs := make([]*HostConfig, 0)
	for _, conf := range config.Hosts {
		if glob.Glob(conf.Host, sHost) {
			configs = append(configs, conf)
		}
	}
	return configs
}
func (a *Architeuthis) processRequest(r *http.Request) (*http.Response, error) {
	sHost := normalizeHost(r.Host)
	configs := getConfigsMatchingHost(sHost)
	options := parseOptions(&r.Header)
	proxyReq := applyHeaders(cloneRequest(r), configs)
	requestCtx := RequestCtx{
		Request:     proxyReq,
		Retries:     0,
		RequestTime: time.Now(),
		options:     options,
		configs:     configs,
	}
	for {
		responseCtx := a.processRequestWithCtx(&requestCtx)
		a.writeMetricRequest(responseCtx)
		if requestCtx.p != nil {
			a.UpdateProxy(requestCtx.p)
		}
		if responseCtx.Error == nil {
			return responseCtx.Response, nil
		}
		if !responseCtx.ShouldRetry {
			return nil, responseCtx.Error
		}
	}
}
func (lim *RedisLimiter) waitRateLimit() (time.Duration, error) {
	result, err := lim.Limiter.Allow(lim.Key)
	if err != nil {
		return 0, err
	}
	if result.RetryAfter > 0 {
		time.Sleep(result.RetryAfter)
	}
	return result.RetryAfter, nil
}
func computeRules(ctx *RequestCtx, configs []*HostConfig) (dontRetry, forceRetry bool,
	limitMultiplier, newLimit float64, shouldRetry bool) {
	dontRetry = false
	forceRetry = false
	shouldRetry = false
	limitMultiplier = 1
	for _, conf := range configs {
		for _, rule := range conf.Rules {
			if rule.Matches(ctx) {
				switch rule.Action {
				case DontRetry:
					dontRetry = true
				case MultiplyEvery:
					limitMultiplier = rule.Arg
				case SetEvery:
					newLimit = rule.Arg
				case ForceRetry:
					forceRetry = true
				case ShouldRetry:
					shouldRetry = true
				}
			}
		}
	}
	return
}
func (a *Architeuthis) processRequestWithCtx(rCtx *RequestCtx) ResponseCtx {
	if !rCtx.LastErrorWasProxyError && rCtx.Retries > config.Retries {
		return ResponseCtx{Error: errors.Errorf("Giving up after %d retries", rCtx.Retries)}
	}
	name, err := a.ChooseProxy(rCtx)
	if err != nil {
		return ResponseCtx{Error: err}
	}
	logrus.WithFields(logrus.Fields{
		"proxy": name,
		"host":  rCtx.Request.Host,
	}).Info("Routing request")
	p, err := a.GetProxy(name)
	if err != nil {
		return ResponseCtx{Error: err}
	}
	rCtx.p = p
	response, err := a.processRequestWithProxy(rCtx)
	responseCtx := ResponseCtx{
		Response:     response,
		ResponseTime: time.Now().Sub(rCtx.RequestTime).Seconds(),
		Error:        err,
	}
	p.incrReqTime = responseCtx.ResponseTime
	if response != nil && isHttpSuccessCode(response.StatusCode) {
		p.incrGood += 1
		return responseCtx
	}
	rCtx.LastFailedProxy = p.Name
	if isProxyError(err) {
		a.handleFatalProxyError(p)
		rCtx.LastErrorWasProxyError = true
		responseCtx.ShouldRetry = true
		return responseCtx
	}
	if err != nil {
		if isPermanentError(err) {
			a.handleProxyError(p, &responseCtx)
			return responseCtx
		}
		a.waitAfterFail(rCtx)
		a.handleProxyError(p, &responseCtx)
		responseCtx.ShouldRetry = true
	}
	dontRetry, forceRetry, shouldRetry := computeRules(rCtx, responseCtx)
	if forceRetry {
		responseCtx.ShouldRetry = true
		return responseCtx
	} else if dontRetry {
		responseCtx.Error = errors.Errorf("Applied dont_retry rule")
		return responseCtx
	}
	if response == nil {
		return responseCtx
	}
	// Handle HTTP errors
	responseCtx.Error = errors.Errorf("HTTP error: %d", response.StatusCode)
	if shouldRetry || shouldRetryHttpCode(response.StatusCode) {
		responseCtx.ShouldRetry = true
	}
	return responseCtx
}
func (a *Architeuthis) waitAfterFail(rCtx *RequestCtx) {
wait := getWaitTime(rCtx.Retries)
time.Sleep(wait)
a.writeMetricSleep(wait, "retry")
rCtx.Retries += 1
}
func isRemoteProxy(p *Proxy) bool {
return p.HttpClient.Transport != nil
}
func (a *Architeuthis) handleProxyError(p *Proxy, rCtx *ResponseCtx) {
if isRemoteProxy(p) && shouldBlameProxy(rCtx) {
p.incrBad += 1
p.BadRequestCount += 1
}
}
func (a *Architeuthis) handleFatalProxyError(p *Proxy) {
a.setDead(p.Name)
}
func (a *Architeuthis) processRequestWithProxy(rCtx *RequestCtx) (r *http.Response, e error) {
a.incConns(rCtx.p.Name)
limiter := a.getLimiter(rCtx)
duration, err := limiter.waitRateLimit()
if err != nil {
return nil, err
}
if duration > 0 {
a.writeMetricSleep(duration, "rate")
}
r, e = rCtx.p.HttpClient.Do(rCtx.Request)
return
}
func (p *Proxy) processRequest(r *http.Request) (*http.Response, error) {
atomic.AddInt32(p.Connections, 1)
defer func() {
atomic.AddInt32(p.Connections, -1)
}()
retries := 0
additionalRetries := 0
configs := getConfsMatchingRequest(r)
sHost := simplifyHost(r.Host)
limiter := p.getLimiter(sHost)
proxyReq := applyHeaders(cloneRequest(r), configs)
for {
p.waitRateLimit(limiter)
if retries >= config.Retries+additionalRetries || retries > config.RetriesHard {
return nil, errors.Errorf("giving up after %d retries", retries)
}
ctx := &RequestCtx{
RequestTime: time.Now(),
}
var err error
ctx.Response, err = p.HttpClient.Do(proxyReq)
if err != nil {
if isPermanentError(err) {
return nil, err
}
dontRetry, forceRetry, limitMultiplier, newLimit, _ := computeRules(ctx, configs)
if forceRetry {
additionalRetries += 1
} else if dontRetry {
return nil, errors.Errorf("Applied dont_retry rule for (%s)", err)
}
p.applyLimiterRules(newLimit, limiter, limitMultiplier)
wait := waitTime(retries)
logrus.WithError(err).WithFields(logrus.Fields{
"wait": wait,
}).Trace("Temporary error during request")
time.Sleep(wait)
retries += 1
continue
}
// Compute rules
dontRetry, forceRetry, limitMultiplier, newLimit, shouldRetry := computeRules(ctx, configs)
if forceRetry {
additionalRetries += 1
} else if dontRetry {
return nil, errors.Errorf("Applied dont_retry rule")
}
p.applyLimiterRules(newLimit, limiter, limitMultiplier)
if isHttpSuccessCode(ctx.Response.StatusCode) {
return ctx.Response, nil
} else if forceRetry || shouldRetry || shouldRetryHttpCode(ctx.Response.StatusCode) {
wait := waitTime(retries)
logrus.WithFields(logrus.Fields{
"wait": wait,
"status": ctx.Response.StatusCode,
}).Trace("HTTP error during request")
time.Sleep(wait)
retries += 1
continue
} else {
return nil, errors.Errorf("HTTP error: %d", ctx.Response.StatusCode)
}
}
}
func (p *Proxy) applyLimiterRules(newLimit float64, limiter *rate.Limiter, limitMultiplier float64) {
if newLimit != 0 {
limiter.SetLimit(rate.Limit(newLimit))
} else if limitMultiplier != 1 {
limiter.SetLimit(limiter.Limit() * rate.Limit(1/limitMultiplier))
}
}
func (a *Architeuthis) Run() {
	logrus.WithFields(logrus.Fields{
		"addr": config.Addr,
	}).Info("Listening")
	err := http.ListenAndServe(config.Addr, a.server)
	logrus.Fatal(err)
}
func NewProxy(name, stringUrl string) (*Proxy, error) {
var parsedUrl *url.URL
var err error
if stringUrl != "" {
parsedUrl, err = url.Parse(stringUrl)
if err != nil {
return nil, err
}
} else {
parsedUrl = nil
}
var httpClient *http.Client
if parsedUrl == nil {
httpClient = &http.Client{}
} else {
httpClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(parsedUrl),
},
}
}
httpClient.Timeout = config.Timeout
p := &Proxy{
Name: name,
Url: parsedUrl,
HttpClient: httpClient,
}
conns := int32(0)
p.Connections = &conns
return p, nil
}
func main() {
logrus.SetLevel(logrus.TraceLevel)
balancer := New()
balancer.reloadConfig()
balancer.setupGarbageCollector()
balancer.Run()
}

models.go (new file)
package main
import (
"github.com/elazarl/goproxy"
redisPackage "github.com/go-redis/redis"
"github.com/go-redis/redis_rate"
influx "github.com/influxdata/influxdb1-client/v2"
"math"
"net/http"
"net/url"
"time"
)
type Architeuthis struct {
server *goproxy.ProxyHttpServer
redis *redisPackage.Client
influxdb influx.Client
points chan *influx.Point
}
// Request/Response
type RequestCtx struct {
Request *http.Request
Retries int
LastFailedProxy string
p *Proxy
LastErrorWasProxyError bool
RequestTime time.Time
options RequestOptions
configs []*HostConfig
}
type ResponseCtx struct {
Response *http.Response
ResponseTime float64
Error error
ShouldRetry bool
}
type RequestOptions struct {
DoCloudflareBypass bool
}
// Proxy
type Proxy struct {
Name string
Url *url.URL
HttpClient *http.Client
GoodRequestCount int64
incrGood int64
BadRequestCount int64
incrBad int64
TotalRequestTime float64
incrReqTime float64
Connections int64
KillOnError bool
}
func (p *Proxy) AvgLatency() float64 {
return p.TotalRequestTime / float64(p.GoodRequestCount+p.BadRequestCount)
}
func (p *Proxy) Score() float64 {
if p.GoodRequestCount+p.BadRequestCount == 0 {
return 1000
}
var errorMod float64
var latencyMod float64
if p.BadRequestCount == 0 {
errorMod = 1
} else {
errorMod = math.Min(float64(p.GoodRequestCount)/float64(p.BadRequestCount), 1)
}
avgLatency := p.AvgLatency()
switch {
case avgLatency < 3:
latencyMod = 1
case avgLatency < 4:
latencyMod = 0.8
case avgLatency < 5:
latencyMod = 0.7
case avgLatency < 9:
latencyMod = 0.6
case avgLatency < 10:
latencyMod = 0.5
case avgLatency < 15:
latencyMod = 0.3
case avgLatency < 20:
latencyMod = 0.1
default:
latencyMod = 0
}
return 600*errorMod + 400*latencyMod - 200*(math.Max(float64(p.Connections-1), 0))
}
func (p *Proxy) getStats() proxyStat {
	urlStr := ""
	if p.Url != nil {
		urlStr = p.Url.String()
	}
	return proxyStat{
		Name:             p.Name,
		Url:              urlStr,
		GoodRequestCount: p.GoodRequestCount,
		BadRequestCount:  p.BadRequestCount,
		AvgLatency:       p.AvgLatency(),
		Connections:      p.Connections,
		Score:            int64(p.Score()),
	}
}
type proxyStat struct {
Name string
Url string
GoodRequestCount int64
BadRequestCount int64
AvgLatency float64
Connections int64
Score int64
}
type statsData struct {
TotalGood int
TotalBad int
Connections int
AvgLatency float64
AvgScore float64
Proxies []proxyStat
}
type CheckMethod string
const (
CheckIp CheckMethod = "check_ip"
HttpOk CheckMethod = "http_ok"
)
type ProxyJudge struct {
url *url.URL
method CheckMethod
}
type RedisLimiter struct {
Key string
Limiter *redis_rate.Limiter
}
// Config
type HostConfig struct {
Host string `json:"host"`
EveryStr string `json:"every"`
Burst int `json:"burst"`
Headers map[string]string `json:"headers"`
RawRules []*RawHostRule `json:"rules"`
IsGlob bool
Every time.Duration
Rules []*HostRule
}
type RawHostRule struct {
Condition string `json:"condition"`
Action string `json:"action"`
Arg string `json:"arg"`
}
type HostRuleAction int
const (
DontRetry HostRuleAction = 0
ForceRetry HostRuleAction = 1
ShouldRetry HostRuleAction = 2
)
type HostRule struct {
Matches func(r *ResponseCtx) bool
Action HostRuleAction
Arg float64
}
type ProxyConfig struct {
Name string `json:"name"`
Url string `json:"url"`
}
var config struct {
Addr string `json:"addr"`
TimeoutStr string `json:"timeout"`
WaitStr string `json:"wait"`
Multiplier float64 `json:"multiplier"`
Retries int `json:"retries"`
MaxErrorRatio float64 `json:"max_error"`
Hosts []*HostConfig `json:"hosts"`
Proxies []ProxyConfig `json:"proxies"`
RedisUrl string `json:"redis_url"`
Wait int64
Timeout time.Duration
DefaultConfig *HostConfig
Routing bool
}

redis.go (new file)
package main
import (
"errors"
"github.com/go-redis/redis"
"github.com/go-redis/redis_rate"
"github.com/sirupsen/logrus"
"math"
"math/rand"
"net/http"
"net/url"
"strconv"
)
const KeyProxyList = "proxies"
const KeyDeadProxyList = "deadProxies"
const PrefixProxy = "proxy:"
const KeyConnectionCount = "conn"
const KeyRequestTime = "reqtime"
const KeyBadRequestCount = "bad"
const KeyGoodRequestCount = "good"
const KeyRevived = "revived"
const KeyUrl = "url"
func (a *Architeuthis) getLimiter(rCtx *RequestCtx) *RedisLimiter {
var hostConfig *HostConfig
if len(rCtx.configs) == 0 {
hostConfig = config.DefaultConfig
} else {
hostConfig = rCtx.configs[len(rCtx.configs)-1]
}
return &RedisLimiter{
Key: hostConfig.Host + ":" + rCtx.p.Name,
Limiter: redis_rate.NewLimiter(a.redis, &redis_rate.Limit{
Rate: 1,
Period: hostConfig.Every,
Burst: hostConfig.Burst,
}),
}
}
func (a *Architeuthis) UpdateProxy(p *Proxy) {
key := PrefixProxy + p.Name
pipe := a.redis.Pipeline()
if p.incrBad != 0 {
pipe.HIncrBy(key, KeyBadRequestCount, p.incrBad)
p.BadRequestCount += p.incrBad
} else {
pipe.HIncrBy(key, KeyGoodRequestCount, p.incrGood)
p.GoodRequestCount += p.incrGood
if p.KillOnError {
pipe.HSet(key, KeyRevived, 0)
}
}
pipe.HIncrByFloat(key, KeyRequestTime, p.incrReqTime)
p.TotalRequestTime += p.incrReqTime
pipe.HIncrBy(key, KeyConnectionCount, -1)
pipe.ZAddXX(KeyProxyList, &redis.Z{
Score: p.Score(),
Member: p.Name,
})
_, _ = pipe.Exec()
newBadRatio := float64(p.BadRequestCount) / float64(p.GoodRequestCount)
if p.incrBad > 0 && (p.KillOnError || (newBadRatio > config.MaxErrorRatio && p.BadRequestCount >= 5)) {
a.setDead(p.Name)
}
}
func (a *Architeuthis) AddProxy(name, stringUrl string) error {
_, err := url.Parse(stringUrl)
if err != nil {
return err
}
pipe := a.redis.Pipeline()
pipe.HMSet(PrefixProxy+name, map[string]interface{}{
KeyUrl: stringUrl,
KeyRequestTime: 0,
KeyGoodRequestCount: 0,
KeyBadRequestCount: 0,
KeyConnectionCount: 0,
KeyRevived: 0,
})
zadd := pipe.ZAdd(KeyProxyList, &redis.Z{
Score: 1000,
Member: name,
})
zcard := pipe.ZCard(KeyProxyList)
_, _ = pipe.Exec()
if zadd.Val() != 0 {
logrus.WithFields(logrus.Fields{
KeyUrl: stringUrl,
}).Info("Add proxy")
a.writeMetricProxyCount(int(zcard.Val()))
}
return nil
}
func (a *Architeuthis) incConns(name string) int64 {
res, _ := a.redis.HIncrBy(PrefixProxy+name, KeyConnectionCount, 1).Result()
return res
}
func (a *Architeuthis) setDead(name string) {
pipe := a.redis.Pipeline()
pipe.ZRem(KeyProxyList, name)
pipe.SAdd(KeyDeadProxyList, name)
count := pipe.ZCard(KeyProxyList)
_, _ = pipe.Exec()
logrus.WithFields(logrus.Fields{
"proxy": name,
}).Trace("dead")
a.writeMetricProxyCount(int(count.Val()))
}
func (a *Architeuthis) setAlive(name string) {
pipe := a.redis.Pipeline()
pipe.SRem(KeyDeadProxyList, name)
	pipe.HMSet(PrefixProxy+name, map[string]interface{}{
KeyRevived: 1,
KeyRequestTime: 0,
KeyGoodRequestCount: 0,
KeyBadRequestCount: 0,
KeyConnectionCount: 0,
})
pipe.ZAdd(KeyProxyList, &redis.Z{
Score: 1000,
Member: name,
})
count := pipe.ZCard(KeyProxyList)
_, _ = pipe.Exec()
logrus.WithFields(logrus.Fields{
"proxy": name,
}).Trace("revive")
a.writeMetricProxyCount(int(count.Val()))
}
func (a *Architeuthis) GetDeadProxies() []*Proxy {
result, err := a.redis.SMembers(KeyDeadProxyList).Result()
if err != nil {
return nil
}
return a.getProxies(result)
}
func (a *Architeuthis) GetAliveProxies() []*Proxy {
result, err := a.redis.ZRange(KeyProxyList, 0, math.MaxInt64).Result()
if err != nil {
return nil
}
return a.getProxies(result)
}
func (a *Architeuthis) getProxies(names []string) []*Proxy {
var proxies []*Proxy
for _, name := range names {
p, _ := a.GetProxy(name)
if p != nil {
proxies = append(proxies, p)
}
}
return proxies
}
func (a *Architeuthis) getStats() statsData {
data := statsData{}
var totalTime float64 = 0
var totalScore int64 = 0
for _, p := range a.GetAliveProxies() {
stat := p.getStats()
data.Proxies = append(data.Proxies, stat)
data.TotalBad += int(p.BadRequestCount)
data.TotalGood += int(p.GoodRequestCount)
data.Connections += int(p.Connections)
totalTime += p.TotalRequestTime
totalScore += stat.Score
}
data.AvgLatency = totalTime / float64(data.TotalGood+data.TotalBad)
data.AvgScore = float64(totalScore) / float64(len(data.Proxies))
return data
}
func (a *Architeuthis) GetProxy(name string) (*Proxy, error) {
result, err := a.redis.HGetAll(PrefixProxy + name).Result()
if err != nil {
return nil, err
}
var parsedUrl *url.URL
var httpClient *http.Client
if result[KeyUrl] == "" {
parsedUrl = nil
httpClient = &http.Client{
Timeout: config.Timeout,
}
} else {
parsedUrl, err = url.Parse(result[KeyUrl])
if err != nil {
return nil, err
}
httpClient = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyURL(parsedUrl),
},
Timeout: config.Timeout,
}
}
conns, _ := strconv.ParseInt(result[KeyConnectionCount], 10, 64)
good, _ := strconv.ParseInt(result[KeyGoodRequestCount], 10, 64)
bad, _ := strconv.ParseInt(result[KeyBadRequestCount], 10, 64)
reqtime, _ := strconv.ParseFloat(result[KeyRequestTime], 64)
return &Proxy{
Name: name,
Url: parsedUrl,
HttpClient: httpClient,
Connections: conns,
GoodRequestCount: good,
BadRequestCount: bad,
TotalRequestTime: reqtime,
}, nil
}
func (a *Architeuthis) ChooseProxy(rCtx *RequestCtx) (string, error) {
results, err := a.redis.ZRevRangeWithScores(KeyProxyList, 0, 12).Result()
if err != nil {
return "", err
}
if len(results) == 0 {
return "", errors.New("no proxies available")
}
if len(results) == 1 {
return results[0].Member.(string), nil
}
for {
idx := rand.Intn(len(results))
if results[idx].Member != rCtx.LastFailedProxy {
return results[idx].Member.(string), nil
}
}
}

templates/stats.html (new file)
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Architeuthis - Stats</title>
<style>
tr:nth-child(even) {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<table>
<thead>
<tr>
<th>Proxy</th>
<th>Url</th>
<th>Conns</th>
<th>Good</th>
<th>Bad</th>
<th>Latency</th>
<th>Score</th>
</tr>
</thead>
<tbody>
{{ range .Proxies}}
<tr>
<td>{{ .Name}}</td>
<td>{{ .Url}}</td>
<td>{{ .Connections}}</td>
<td>{{ .GoodRequestCount}}</td>
<td>{{ .BadRequestCount}}</td>
<td>{{ printf "%.2f" .AvgLatency}}</td>
<td>{{ .Score}}</td>
</tr>
{{end}}
</tbody>
<tfoot>
<tr>
<td colspan="2">Total</td>
<td>{{ .Connections}}</td>
<td>{{ .TotalGood}}</td>
<td>{{ .TotalBad}}</td>
<td>{{ printf "%.2f" .AvgLatency}}</td>
<td>{{ printf "%.2f" .AvgScore}}</td>
</tr>
</tfoot>
</table>
</body>
</html>

util.go (new file)
package main
import (
"github.com/ryanuber/go-glob"
"net/http"
"strings"
)
func normalizeHost(host string) string {
col := strings.LastIndex(host, ":")
if col > 0 {
host = host[:col]
}
return "." + host
}
func parseOptions(header *http.Header) RequestOptions {
opts := RequestOptions{}
cfParam := header.Get("X-Architeuthis-CF-Bypass")
if cfParam != "" {
header.Del("X-Architeuthis-CF-Bypass")
opts.DoCloudflareBypass = true
}
return opts
}
func getConfigsMatchingHost(sHost string) []*HostConfig {
configs := make([]*HostConfig, 0)
for _, conf := range config.Hosts {
if glob.Glob(conf.Host, sHost) {
configs = append(configs, conf)
}
}
return configs
}
func applyHeaders(r *http.Request, configs []*HostConfig) *http.Request {
for _, conf := range configs {
for k, v := range conf.Headers {
r.Header.Set(k, v)
}
}
return r
}
func computeRules(requestCtx *RequestCtx, responseCtx ResponseCtx) (dontRetry, forceRetry bool, shouldRetry bool) {
dontRetry = false
forceRetry = false
shouldRetry = false
for _, conf := range requestCtx.configs {
for _, rule := range conf.Rules {
if rule.Matches(&responseCtx) {
switch rule.Action {
case DontRetry:
dontRetry = true
case ForceRetry:
forceRetry = true
case ShouldRetry:
shouldRetry = true
}
}
}
}
return
}
func cloneRequest(r *http.Request) *http.Request {
proxyReq := &http.Request{
Method: r.Method,
URL: r.URL,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: r.Header,
Body: r.Body,
Host: r.URL.Host,
}
return proxyReq
}