diff --git a/README.md b/README.md index 62b7e0a..7e21e7f 100644 --- a/README.md +++ b/README.md @@ -1 +1,47 @@ -wip \ No newline at end of file +# Architeuthis ? + +[![CodeFactor](https://www.codefactor.io/repository/github/simon987/architeuthis/badge)](https://www.codefactor.io/repository/github/simon987/architeuthis) + +*NOTE: this is very WIP* + +HTTP(S) proxy with integrated load-balancing, rate-limiting +and error handling. Built for automated web scraping. + +* Strictly obeys configured rate-limiting for each IP & Host +* Seamless exponential backoff retries on timeout or error HTTP codes +* Requires no additional configuration for integration into existing programs + +### Typical use case +![user_case](use_case.png) + +### Sample configuration + +```json +{ + "addr": "localhost:5050", + "proxies": [ + { + "name": "squid_P0", + "url": "http://p0.exemple.com:8080" + }, + { + "name": "privoxy_P1", + "url": "http://p1.exemple.com:8080" + } + ], + "hosts": { + "*": { + "every": "750ms", + "burst": 5, + "headers": {} + }, + "reddit.com": { + "every": "2s", + "burst": 1, + "headers": {"User-Agent": "mybot_v0.1"} + }, + ... + } +} +``` + diff --git a/config.go b/config.go index f7da327..c7aa662 100644 --- a/config.go +++ b/config.go @@ -42,7 +42,10 @@ func applyConfig(proxy *Proxy) { for host, conf := range config.Hosts { duration, err := time.ParseDuration(conf.Every) handleErr(err) - proxy.Limiters.Store(host, rate.NewLimiter(rate.Every(duration), conf.Burst)) + proxy.Limiters[host] = &ExpiringLimiter{ + rate.NewLimiter(rate.Every(duration), conf.Burst), + time.Now(), + } } } diff --git a/config.json b/config.json index 1e7ec8b..f115997 100644 --- a/config.json +++ b/config.json @@ -7,7 +7,7 @@ }, { "name": "p1", - "url": "http://localhost:3128" + "url": "" } ], "hosts": { diff --git a/gc.go b/gc.go new file mode 100644 index 0000000..f274a83 --- /dev/null +++ b/gc.go @@ -0,0 +1,65 @@ +package main + +import ( + "github.com/robfig/cron" + "github.com/sirupsen/logrus" + "time" +) + +func (b *Balancer) setupGarbageCollector() { + + const gcInterval = time.Minute * 5 + + gcCron := cron.New() + gcSchedule := cron.Every(gcInterval) + gcCron.Schedule(gcSchedule, cron.FuncJob(b.cleanAllExpiredLimits)) + + go gcCron.Run() + + logrus.WithFields(logrus.Fields{ + "every": gcInterval, + }).Info("Started task cleanup cron") +} + +func (b *Balancer) cleanAllExpiredLimits() { + before := 0 + after := 0 + for _, p := range b.proxies { + before += len(p.Limiters) + cleanExpiredLimits(p) + after += len(p.Limiters) + } + + logrus.WithFields(logrus.Fields{ + "removed": before - after, + }).Info("Did limiters cleanup") +} + +func cleanExpiredLimits(proxy *Proxy) { + + const ttl = time.Second + + limits := make(map[string]*ExpiringLimiter, 0) + now := time.Now() + + for host, limiter := range proxy.Limiters { + if now.Sub(limiter.LastRead) > ttl && shouldPruneLimiter(host) { + logrus.WithFields(logrus.Fields{ + "proxy": proxy.Name, + "limiter": host, + "last_read": now.Sub(limiter.LastRead), + }).Trace("Pruning limiter") + } else { + limits[host] = limiter + } + } + + proxy.Limiters = limits +} + +func shouldPruneLimiter(host string) bool { + + // Don't remove hosts that are coming from the config + _, ok := config.Hosts[host] + return !ok +} diff --git a/main.go b/main.go index aa230c8..1a28fc2 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,6 @@ import ( "net/url" "sort" "strings" - "sync" "time" ) @@ -19,10 +18,15 @@ type Balancer struct { proxies []*Proxy } +type ExpiringLimiter struct { + Limiter *rate.Limiter + LastRead time.Time +} + type Proxy struct { Name string Url *url.URL - Limiters sync.Map + Limiters map[string]*ExpiringLimiter HttpClient *http.Client Connections int } @@ -41,33 +45,33 @@ func (a ByConnectionCount) Less(i, j int) bool { return a[i].Connections < a[j].Connections } -func LogRequestMiddleware(h goproxy.FuncReqHandler) goproxy.ReqHandler { - return goproxy.FuncReqHandler(func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { - - logrus.WithFields(logrus.Fields{ - "host": r.Host, - }).Trace(strings.ToUpper(r.URL.Scheme) + " " + r.Method) - - return h(r, ctx) - }) -} - -//TODO: expiration ? func (p *Proxy) getLimiter(host string) *rate.Limiter { - limiter, ok := p.Limiters.Load(host) + expLimit, ok := p.Limiters[host] if !ok { - - every, _ := time.ParseDuration("1ms") - limiter = rate.NewLimiter(rate.Every(every), 1) - p.Limiters.Store(host, limiter) - - logrus.WithFields(logrus.Fields{ - "host": host, - }).Trace("New limiter") + newExpiringLimiter := p.makeNewLimiter(host) + return newExpiringLimiter.Limiter } - return limiter.(*rate.Limiter) + expLimit.LastRead = time.Now() + return expLimit.Limiter +} + +func (p *Proxy) makeNewLimiter(host string) *ExpiringLimiter { + every := time.Millisecond //todo load default from conf + + newExpiringLimiter := &ExpiringLimiter{ + LastRead: time.Now(), + Limiter: rate.NewLimiter(rate.Every(every), 1), + } + + p.Limiters[host] = newExpiringLimiter + + logrus.WithFields(logrus.Fields{ + "host": host, + }).Trace("New limiter") + + return newExpiringLimiter } func simplifyHost(host string) string { @@ -92,7 +96,7 @@ func New() *Balancer { balancer.server.OnRequest().HandleConnect(goproxy.AlwaysMitm) - balancer.server.OnRequest().Do(LogRequestMiddleware( + balancer.server.OnRequest().DoFunc( func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { p := balancer.chooseProxy() @@ -110,7 +114,7 @@ func New() *Balancer { } return nil, resp - })) + }) return balancer } @@ -234,6 +238,7 @@ func NewProxy(name, stringUrl string) (*Proxy, error) { Name: name, Url: parsedUrl, HttpClient: httpClient, + Limiters: make(map[string]*ExpiringLimiter), }, nil } @@ -256,5 +261,6 @@ func main() { }).Info("Proxy") } + balancer.setupGarbageCollector() balancer.Run() } diff --git a/test/web.py b/test/web.py index cebbdd7..d63c50c 100644 --- a/test/web.py +++ b/test/web.py @@ -17,6 +17,7 @@ def e500(): @app.route("/404") def e404(): + time.sleep(0.5) return Response(status=404) diff --git a/use_case.dia b/use_case.dia new file mode 100644 index 0000000..2575c2b Binary files /dev/null and b/use_case.dia differ diff --git a/use_case.png b/use_case.png new file mode 100644 index 0000000..90aa0f4 Binary files /dev/null and b/use_case.png differ