Architeuthis/main.go
2020-01-03 12:25:24 -05:00

296 lines
6.1 KiB
Go

package main
import (
"fmt"
"github.com/elazarl/goproxy"
"github.com/go-redis/redis/v7"
influx "github.com/influxdata/influxdb1-client/v2"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"html/template"
"net/http"
"time"
)
func New() *Architeuthis {
a := new(Architeuthis)
a.reloadConfig()
a.redis = redis.NewClient(&redis.Options{
Addr: config.RedisUrl,
Password: "",
DB: 0,
})
a.setupProxyReviver()
a.server = goproxy.NewProxyHttpServer()
a.server.OnRequest().HandleConnect(goproxy.AlwaysMitm)
a.server.OnRequest().DoFunc(
func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
resp, err := a.processRequest(r)
if err != nil {
logrus.WithError(err).Trace("Could not complete request")
if resp != nil {
return nil, resp
} else {
return nil, goproxy.NewResponse(r, "text/plain", http.StatusInternalServerError, err.Error())
}
}
return nil, resp
})
mux := http.NewServeMux()
a.server.NonproxyHandler = mux
mux.HandleFunc("/reload", func(w http.ResponseWriter, r *http.Request) {
a.reloadConfig()
_, _ = fmt.Fprint(w, "Reloaded\n")
})
templ, _ := template.ParseFiles("templates/stats.html")
mux.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
_ = templ.Execute(w, a.getStats())
})
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = fmt.Fprint(w, "{\"name\":\"Architeuthis\",\"version\":2.1}")
})
mux.HandleFunc("/add_proxy", func(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("name")
url := r.URL.Query().Get("url")
if name == "" || url == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
err := a.AddProxy(name, url)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"name": name,
"url": url,
}).Error("Could not add proxy")
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
})
return a
}
func (a *Architeuthis) processRequest(r *http.Request) (*http.Response, error) {
sHost := normalizeHost(r.Host)
configs := getConfigsMatchingHost(sHost)
options := parseOptions(&r.Header)
proxyReq := applyHeaders(cloneRequest(r), configs)
requestCtx := RequestCtx{
Request: proxyReq,
Retries: 0,
RequestTime: time.Now(),
options: options,
configs: configs,
}
for {
responseCtx := a.processRequestWithCtx(&requestCtx)
a.writeMetricRequest(responseCtx)
if requestCtx.p != nil {
a.UpdateProxy(requestCtx.p)
}
if responseCtx.Error == nil {
return responseCtx.Response, nil
}
if !responseCtx.ShouldRetry {
return responseCtx.Response, responseCtx.Error
}
}
}
func (lim *RedisLimiter) waitRateLimit() (time.Duration, error) {
result, err := lim.Limiter.Allow(lim.Key, lim.Limit)
if err != nil {
return 0, err
}
if result.RetryAfter > 0 {
time.Sleep(result.RetryAfter)
}
return result.RetryAfter, nil
}
func (a *Architeuthis) processRequestWithCtx(rCtx *RequestCtx) ResponseCtx {
if !rCtx.LastErrorWasProxyError && rCtx.Retries > config.Retries {
return ResponseCtx{Error: errors.Errorf("Giving up after %d retries", rCtx.Retries)}
}
name, err := a.ChooseProxy(rCtx)
if err != nil {
return ResponseCtx{Error: err}
}
logrus.WithFields(logrus.Fields{
"proxy": name,
"host": rCtx.Request.Host,
}).Info("Routing request")
p, err := a.GetProxy(name)
if err != nil {
return ResponseCtx{Error: err}
}
rCtx.p = p
response, err := a.processRequestWithProxy(rCtx)
responseCtx := ResponseCtx{
Response: response,
ResponseTime: time.Now().Sub(rCtx.RequestTime).Seconds(),
Error: err,
}
p.incrReqTime = responseCtx.ResponseTime
if response != nil && isHttpSuccessCode(response.StatusCode) {
p.incrGood += 1
return responseCtx
}
rCtx.LastFailedProxy = p.Name
if isProxyError(err) {
a.handleFatalProxyError(p)
rCtx.LastErrorWasProxyError = true
responseCtx.ShouldRetry = true
return responseCtx
}
if err != nil {
if isPermanentError(err) {
a.handleProxyError(p, &responseCtx)
return responseCtx
}
a.waitAfterFail(rCtx)
a.handleProxyError(p, &responseCtx)
responseCtx.ShouldRetry = true
}
dontRetry, forceRetry, shouldRetry := computeRules(rCtx, responseCtx)
if forceRetry {
responseCtx.ShouldRetry = true
return responseCtx
} else if dontRetry {
responseCtx.Error = errors.Errorf("Applied dont_retry rule")
return responseCtx
}
if response == nil {
return responseCtx
}
// Handle HTTP errors
responseCtx.Error = errors.Errorf("HTTP error: %d", response.StatusCode)
if shouldRetry || shouldRetryHttpCode(response.StatusCode) {
responseCtx.ShouldRetry = true
}
return responseCtx
}
func (a *Architeuthis) waitAfterFail(rCtx *RequestCtx) {
wait := getWaitTime(rCtx.Retries)
time.Sleep(wait)
a.writeMetricSleep(wait, "retry")
rCtx.Retries += 1
}
func isRemoteProxy(p *Proxy) bool {
return p.HttpClient.Transport != nil
}
func (a *Architeuthis) handleProxyError(p *Proxy, rCtx *ResponseCtx) {
if isRemoteProxy(p) && shouldBlameProxy(rCtx) {
p.incrBad += 1
p.BadRequestCount += 1
}
}
func (a *Architeuthis) handleFatalProxyError(p *Proxy) {
a.setDead(p.Name)
}
func (a *Architeuthis) processRequestWithProxy(rCtx *RequestCtx) (r *http.Response, e error) {
a.incConns(rCtx.p.Name)
limiter := a.getLimiter(rCtx)
duration, err := limiter.waitRateLimit()
if err != nil {
return nil, err
}
if duration > 0 {
a.writeMetricSleep(duration, "rate")
}
r, e = rCtx.p.HttpClient.Do(rCtx.Request)
return
}
func (a *Architeuthis) Run() {
logrus.WithFields(logrus.Fields{
"addr": config.Addr,
}).Info("Listening")
err := http.ListenAndServe(config.Addr, a.server)
logrus.Fatal(err)
}
func main() {
logrus.SetLevel(logrus.TraceLevel)
balancer := New()
var err error = nil
balancer.influxdb, err = influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxUrl,
Username: config.InfluxUser,
Password: config.InfluxPass,
})
if err != nil {
panic(err)
}
balancer.points = make(chan *influx.Point, InfluxDbBufferSize)
go balancer.asyncWriter(balancer.points)
balancer.Run()
}