diff --git a/config.go b/config.go new file mode 100644 index 0000000..f7da327 --- /dev/null +++ b/config.go @@ -0,0 +1,53 @@ +package main + +import ( + "encoding/json" + "golang.org/x/time/rate" + "io/ioutil" + "os" + "time" +) + +type HostConfig struct { + Every string `json:"every"` + Burst int `json:"burst"` + Headers map[string]string `json:"headers"` +} + +type ProxyConfig struct { + Name string `json:"name"` + Url string `json:"url"` +} + +var config struct { + Addr string `json:"addr"` + Hosts map[string]HostConfig `json:"hosts"` + Proxies []ProxyConfig `json:"proxies"` +} + +func loadConfig() { + + configFile, err := os.Open("config.json") + handleErr(err) + + configBytes, err := ioutil.ReadAll(configFile) + handleErr(err) + + err = json.Unmarshal(configBytes, &config) + handleErr(err) +} + +func applyConfig(proxy *Proxy) { + + for host, conf := range config.Hosts { + duration, err := time.ParseDuration(conf.Every) + handleErr(err) + proxy.Limiters.Store(host, rate.NewLimiter(rate.Every(duration), conf.Burst)) + } +} + +func handleErr(err error) { + if err != nil { + panic(err) + } +} diff --git a/config.json b/config.json new file mode 100644 index 0000000..1e7ec8b --- /dev/null +++ b/config.json @@ -0,0 +1,25 @@ +{ + "addr": "localhost:5050", + "proxies": [ + { + "name": "p0", + "url": "" + }, + { + "name": "p1", + "url": "http://localhost:3128" + } + ], + "hosts": { + "*": { + "every": "10s", + "burst": 1, + "headers": {} + }, + "reddit.com": { + "every": "100s", + "burst": 1, + "headers": {} + } + } +} \ No newline at end of file diff --git a/main.go b/main.go index caf5757..ed5c24a 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "golang.org/x/time/rate" "net/http" "net/url" + "sort" "strings" "sync" "time" @@ -18,10 +19,25 @@ type Balancer struct { } type Proxy struct { - Name string - Url *url.URL - Limiters sync.Map - HttpClient *http.Client + Name string + Url *url.URL + Limiters sync.Map + HttpClient *http.Client + Connections int +} + +type ByConnectionCount []*Proxy + +func (a ByConnectionCount) Len() int { + return len(a) +} + +func (a ByConnectionCount) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a ByConnectionCount) Less(i, j int) bool { + return a[i].Connections < a[j].Connections } func LogRequestMiddleware(h goproxy.FuncReqHandler) goproxy.ReqHandler { @@ -41,8 +57,8 @@ func (p *Proxy) getLimiter(host string) *rate.Limiter { limiter, ok := p.Limiters.Load(host) if !ok { - every, _ := time.ParseDuration("100ms") - limiter = rate.NewLimiter(rate.Every(every), 0) + every, _ := time.ParseDuration("1ms") + limiter = rate.NewLimiter(rate.Every(every), 1) p.Limiters.Store(host, limiter) logrus.WithFields(logrus.Fields{ @@ -65,6 +81,8 @@ func (b *Balancer) chooseProxy(host string) *Proxy { _ = simplifyHost(host) + sort.Sort(ByConnectionCount(b.proxies)) + return b.proxies[0] } @@ -79,14 +97,31 @@ func New() *Balancer { balancer.server.OnRequest().Do(LogRequestMiddleware( func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { - p := balancer.chooseProxy(r.Host) + sHost := simplifyHost(r.Host) + p := balancer.chooseProxy(sHost) + p.Connections += 1 logrus.WithFields(logrus.Fields{ - "proxy": p.Name, + "proxy": p.Name, + "connexions": p.Connections, }).Trace("Routing request") + limiter := p.getLimiter(sHost) + reservation := limiter.Reserve() + if !reservation.OK() { + logrus.Warn("Could not reserve") + } + delay := reservation.Delay() + if delay > 0 { + logrus.WithFields(logrus.Fields{ + "time": delay, + }).Trace("Sleeping") + time.Sleep(delay) + } + proxyReq := preprocessRequest(cloneRequest(r)) resp, err := p.HttpClient.Do(proxyReq) + p.Connections -= 1 //TODO: handle err if err != nil { @@ -166,12 +201,21 @@ func NewProxy(name, stringUrl string) (*Proxy, error) { func main() { logrus.SetLevel(logrus.TraceLevel) + loadConfig() balancer := New() - p0, _ := NewProxy("p0", "http://localhost:3128") - //p0, _ := NewProxy("p0", "") + for _, proxyConf := range config.Proxies { + proxy, err := NewProxy(proxyConf.Name, proxyConf.Url) + handleErr(err) + balancer.proxies = append(balancer.proxies, proxy) - balancer.proxies = []*Proxy{p0} + applyConfig(proxy) + + logrus.WithFields(logrus.Fields{ + "name": proxy.Name, + "url": proxy.Url, + }).Info("Proxy") + } balancer.Run() } diff --git a/web.py b/web.py new file mode 100644 index 0000000..310cd69 --- /dev/null +++ b/web.py @@ -0,0 +1,14 @@ +from flask import Flask +import time + +app = Flask(__name__) + + +@app.route("/") +def hello(): + time.sleep(3) + return "Hello World!" + + +if __name__ == "__main__": + app.run(port=9999)