garbage collector for limits, add readme

This commit is contained in:
simon 2019-05-29 17:41:59 -04:00
parent e15cab98ef
commit 4e15ede60f
8 changed files with 150 additions and 29 deletions

View File

@ -1 +1,47 @@
wip
# Architeuthis ?
[![CodeFactor](https://www.codefactor.io/repository/github/simon987/architeuthis/badge)](https://www.codefactor.io/repository/github/simon987/architeuthis)
*NOTE: this is very WIP*
HTTP(S) proxy with integrated load-balancing, rate-limiting
and error handling. Built for automated web scraping.
* Strictly obeys configured rate-limiting for each IP & Host
* Seamless exponential backoff retries on timeout or error HTTP codes
* Requires no additional configuration for integration into existing programs
### Typical use case
![user_case](use_case.png)
### Sample configuration
```json
{
"addr": "localhost:5050",
"proxies": [
{
"name": "squid_P0",
"url": "http://p0.example.com:8080"
},
{
"name": "privoxy_P1",
"url": "http://p1.example.com:8080"
}
],
"hosts": {
"*": {
"every": "750ms",
"burst": 5,
"headers": {}
},
"reddit.com": {
"every": "2s",
"burst": 1,
"headers": {"User-Agent": "mybot_v0.1"}
},
...
}
}
```

View File

@ -42,7 +42,10 @@ func applyConfig(proxy *Proxy) {
for host, conf := range config.Hosts {
duration, err := time.ParseDuration(conf.Every)
handleErr(err)
proxy.Limiters.Store(host, rate.NewLimiter(rate.Every(duration), conf.Burst))
proxy.Limiters[host] = &ExpiringLimiter{
rate.NewLimiter(rate.Every(duration), conf.Burst),
time.Now(),
}
}
}

View File

@ -7,7 +7,7 @@
},
{
"name": "p1",
"url": "http://localhost:3128"
"url": ""
}
],
"hosts": {

65
gc.go Normal file
View File

@ -0,0 +1,65 @@
package main
import (
"github.com/robfig/cron"
"github.com/sirupsen/logrus"
"time"
)
// setupGarbageCollector registers a recurring cron job that prunes
// expired rate limiters from every proxy, then starts the scheduler
// on its own goroutine.
func (b *Balancer) setupGarbageCollector() {
	const gcInterval = 5 * time.Minute

	scheduler := cron.New()
	scheduler.Schedule(cron.Every(gcInterval), cron.FuncJob(b.cleanAllExpiredLimits))
	go scheduler.Run()

	logrus.WithFields(logrus.Fields{
		"every": gcInterval,
	}).Info("Started task cleanup cron")
}
// cleanAllExpiredLimits sweeps every proxy's limiter table and logs
// the total number of limiters that were dropped.
func (b *Balancer) cleanAllExpiredLimits() {
	var total, remaining int
	for _, proxy := range b.proxies {
		total += len(proxy.Limiters)
		cleanExpiredLimits(proxy)
		remaining += len(proxy.Limiters)
	}

	logrus.WithFields(logrus.Fields{
		"removed": total - remaining,
	}).Info("Did limiters cleanup")
}
// cleanExpiredLimits rebuilds proxy.Limiters, keeping only the limiters
// that were read within the last ttl plus every limiter that comes from
// the static configuration (shouldPruneLimiter), which must never be
// removed. Pruned limiters are traced individually.
//
// NOTE(review): proxy.Limiters is a plain map that request goroutines
// read and write (getLimiter/makeNewLimiter) while this GC goroutine
// replaces it — that looks like a data race; confirm and guard with a
// mutex (or go back to sync.Map).
func cleanExpiredLimits(proxy *Proxy) {
	const ttl = time.Second

	// Pre-size to the current count: most limiters usually survive a sweep.
	// (The original `make(map[string]*ExpiringLimiter, 0)` size hint of 0
	// was a no-op and forced repeated growth.)
	kept := make(map[string]*ExpiringLimiter, len(proxy.Limiters))
	now := time.Now()
	for host, limiter := range proxy.Limiters {
		if now.Sub(limiter.LastRead) > ttl && shouldPruneLimiter(host) {
			logrus.WithFields(logrus.Fields{
				"proxy":     proxy.Name,
				"limiter":   host,
				"last_read": now.Sub(limiter.LastRead),
			}).Trace("Pruning limiter")
		} else {
			kept[host] = limiter
		}
	}
	proxy.Limiters = kept
}
// shouldPruneLimiter reports whether the limiter for host may be
// garbage-collected. Limiters declared in the configuration file are
// permanent and are never pruned.
func shouldPruneLimiter(host string) bool {
	_, fromConfig := config.Hosts[host]
	return !fromConfig
}

58
main.go
View File

@ -10,7 +10,6 @@ import (
"net/url"
"sort"
"strings"
"sync"
"time"
)
@ -19,10 +18,15 @@ type Balancer struct {
proxies []*Proxy
}
type ExpiringLimiter struct {
Limiter *rate.Limiter
LastRead time.Time
}
type Proxy struct {
Name string
Url *url.URL
Limiters sync.Map
Limiters map[string]*ExpiringLimiter
HttpClient *http.Client
Connections int
}
@ -41,33 +45,33 @@ func (a ByConnectionCount) Less(i, j int) bool {
return a[i].Connections < a[j].Connections
}
// LogRequestMiddleware wraps a goproxy request handler so that each
// incoming request is traced (upper-cased scheme plus method, tagged
// with the target host) before being handed to the wrapped handler.
func LogRequestMiddleware(h goproxy.FuncReqHandler) goproxy.ReqHandler {
	logged := func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
		scheme := strings.ToUpper(r.URL.Scheme)
		logrus.WithFields(logrus.Fields{
			"host": r.Host,
		}).Trace(scheme + " " + r.Method)
		return h(r, ctx)
	}
	return goproxy.FuncReqHandler(logged)
}
//TODO: expiration ?
func (p *Proxy) getLimiter(host string) *rate.Limiter {
limiter, ok := p.Limiters.Load(host)
expLimit, ok := p.Limiters[host]
if !ok {
every, _ := time.ParseDuration("1ms")
limiter = rate.NewLimiter(rate.Every(every), 1)
p.Limiters.Store(host, limiter)
logrus.WithFields(logrus.Fields{
"host": host,
}).Trace("New limiter")
newExpiringLimiter := p.makeNewLimiter(host)
return newExpiringLimiter.Limiter
}
return limiter.(*rate.Limiter)
expLimit.LastRead = time.Now()
return expLimit.Limiter
}
// makeNewLimiter creates a limiter for host with the default rate
// (one event per millisecond, burst 1), registers it in the proxy's
// limiter table, and returns it.
func (p *Proxy) makeNewLimiter(host string) *ExpiringLimiter {
	// TODO: load the default rate from the configuration instead.
	const defaultEvery = time.Millisecond

	limiter := &ExpiringLimiter{
		Limiter:  rate.NewLimiter(rate.Every(defaultEvery), 1),
		LastRead: time.Now(),
	}
	p.Limiters[host] = limiter

	logrus.WithFields(logrus.Fields{
		"host": host,
	}).Trace("New limiter")
	return limiter
}
func simplifyHost(host string) string {
@ -92,7 +96,7 @@ func New() *Balancer {
balancer.server.OnRequest().HandleConnect(goproxy.AlwaysMitm)
balancer.server.OnRequest().Do(LogRequestMiddleware(
balancer.server.OnRequest().DoFunc(
func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
p := balancer.chooseProxy()
@ -110,7 +114,7 @@ func New() *Balancer {
}
return nil, resp
}))
})
return balancer
}
@ -234,6 +238,7 @@ func NewProxy(name, stringUrl string) (*Proxy, error) {
Name: name,
Url: parsedUrl,
HttpClient: httpClient,
Limiters: make(map[string]*ExpiringLimiter),
}, nil
}
@ -256,5 +261,6 @@ func main() {
}).Info("Proxy")
}
balancer.setupGarbageCollector()
balancer.Run()
}

View File

@ -17,6 +17,7 @@ def e500():
@app.route("/404")
def e404():
    # Test endpoint: waits half a second, then replies with HTTP 404
    # (simulates a slow erroring upstream for the proxy's retry logic).
    time.sleep(0.5)
    return Response(status=404)

BIN
use_case.dia Normal file

Binary file not shown.

BIN
use_case.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB