mirror of
				https://github.com/simon987/Architeuthis.git
				synced 2025-10-31 06:46:51 +00:00 
			
		
		
		
	least-connected load-balancing, per-host rate-limiting & load from config
This commit is contained in:
		
							parent
							
								
									730616da98
								
							
						
					
					
						commit
						6f0637dc3d
					
				
							
								
								
									
										53
									
								
								config.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								config.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,53 @@ | ||||
| package main | ||||
| 
 | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"golang.org/x/time/rate" | ||||
| 	"io/ioutil" | ||||
| 	"os" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| type HostConfig struct { | ||||
| 	Every   string            `json:"every"` | ||||
| 	Burst   int               `json:"burst"` | ||||
| 	Headers map[string]string `json:"headers"` | ||||
| } | ||||
| 
 | ||||
| type ProxyConfig struct { | ||||
| 	Name string `json:"name"` | ||||
| 	Url  string `json:"url"` | ||||
| } | ||||
| 
 | ||||
| var config struct { | ||||
| 	Addr    string                `json:"addr"` | ||||
| 	Hosts   map[string]HostConfig `json:"hosts"` | ||||
| 	Proxies []ProxyConfig         `json:"proxies"` | ||||
| } | ||||
| 
 | ||||
| func loadConfig() { | ||||
| 
 | ||||
| 	configFile, err := os.Open("config.json") | ||||
| 	handleErr(err) | ||||
| 
 | ||||
| 	configBytes, err := ioutil.ReadAll(configFile) | ||||
| 	handleErr(err) | ||||
| 
 | ||||
| 	err = json.Unmarshal(configBytes, &config) | ||||
| 	handleErr(err) | ||||
| } | ||||
| 
 | ||||
| func applyConfig(proxy *Proxy) { | ||||
| 
 | ||||
| 	for host, conf := range config.Hosts { | ||||
| 		duration, err := time.ParseDuration(conf.Every) | ||||
| 		handleErr(err) | ||||
| 		proxy.Limiters.Store(host, rate.NewLimiter(rate.Every(duration), conf.Burst)) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func handleErr(err error) { | ||||
| 	if err != nil { | ||||
| 		panic(err) | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										25
									
								
								config.json
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								config.json
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,25 @@ | ||||
| { | ||||
|   "addr": "localhost:5050", | ||||
|   "proxies": [ | ||||
|     { | ||||
|       "name": "p0", | ||||
|       "url": "" | ||||
|     }, | ||||
|     { | ||||
|       "name": "p1", | ||||
|       "url": "http://localhost:3128" | ||||
|     } | ||||
|   ], | ||||
|   "hosts": { | ||||
|     "*": { | ||||
|       "every": "10s", | ||||
|       "burst": 1, | ||||
|       "headers": {} | ||||
|     }, | ||||
|     "reddit.com": { | ||||
|       "every": "100s", | ||||
|       "burst": 1, | ||||
|       "headers": {} | ||||
|     } | ||||
|   } | ||||
| } | ||||
							
								
								
									
										66
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										66
									
								
								main.go
									
									
									
									
									
								
							| @ -7,6 +7,7 @@ import ( | ||||
| 	"golang.org/x/time/rate" | ||||
| 	"net/http" | ||||
| 	"net/url" | ||||
| 	"sort" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| @ -18,10 +19,25 @@ type Balancer struct { | ||||
| } | ||||
| 
 | ||||
| type Proxy struct { | ||||
| 	Name       string | ||||
| 	Url        *url.URL | ||||
| 	Limiters   sync.Map | ||||
| 	HttpClient *http.Client | ||||
| 	Name        string | ||||
| 	Url         *url.URL | ||||
| 	Limiters    sync.Map | ||||
| 	HttpClient  *http.Client | ||||
| 	Connections int | ||||
| } | ||||
| 
 | ||||
| type ByConnectionCount []*Proxy | ||||
| 
 | ||||
| func (a ByConnectionCount) Len() int { | ||||
| 	return len(a) | ||||
| } | ||||
| 
 | ||||
| func (a ByConnectionCount) Swap(i, j int) { | ||||
| 	a[i], a[j] = a[j], a[i] | ||||
| } | ||||
| 
 | ||||
| func (a ByConnectionCount) Less(i, j int) bool { | ||||
| 	return a[i].Connections < a[j].Connections | ||||
| } | ||||
| 
 | ||||
| func LogRequestMiddleware(h goproxy.FuncReqHandler) goproxy.ReqHandler { | ||||
| @ -41,8 +57,8 @@ func (p *Proxy) getLimiter(host string) *rate.Limiter { | ||||
| 	limiter, ok := p.Limiters.Load(host) | ||||
| 	if !ok { | ||||
| 
 | ||||
| 		every, _ := time.ParseDuration("100ms") | ||||
| 		limiter = rate.NewLimiter(rate.Every(every), 0) | ||||
| 		every, _ := time.ParseDuration("1ms") | ||||
| 		limiter = rate.NewLimiter(rate.Every(every), 1) | ||||
| 		p.Limiters.Store(host, limiter) | ||||
| 
 | ||||
| 		logrus.WithFields(logrus.Fields{ | ||||
| @ -65,6 +81,8 @@ func (b *Balancer) chooseProxy(host string) *Proxy { | ||||
| 
 | ||||
| 	_ = simplifyHost(host) | ||||
| 
 | ||||
| 	sort.Sort(ByConnectionCount(b.proxies)) | ||||
| 
 | ||||
| 	return b.proxies[0] | ||||
| } | ||||
| 
 | ||||
| @ -79,14 +97,31 @@ func New() *Balancer { | ||||
| 	balancer.server.OnRequest().Do(LogRequestMiddleware( | ||||
| 		func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { | ||||
| 
 | ||||
| 			p := balancer.chooseProxy(r.Host) | ||||
| 			sHost := simplifyHost(r.Host) | ||||
| 			p := balancer.chooseProxy(sHost) | ||||
| 
 | ||||
| 			p.Connections += 1 | ||||
| 			logrus.WithFields(logrus.Fields{ | ||||
| 				"proxy": p.Name, | ||||
| 				"proxy":      p.Name, | ||||
| 				"connexions": p.Connections, | ||||
| 			}).Trace("Routing request") | ||||
| 
 | ||||
| 			limiter := p.getLimiter(sHost) | ||||
| 			reservation := limiter.Reserve() | ||||
| 			if !reservation.OK() { | ||||
| 				logrus.Warn("Could not reserve") | ||||
| 			} | ||||
| 			delay := reservation.Delay() | ||||
| 			if delay > 0 { | ||||
| 				logrus.WithFields(logrus.Fields{ | ||||
| 					"time": delay, | ||||
| 				}).Trace("Sleeping") | ||||
| 				time.Sleep(delay) | ||||
| 			} | ||||
| 
 | ||||
| 			proxyReq := preprocessRequest(cloneRequest(r)) | ||||
| 			resp, err := p.HttpClient.Do(proxyReq) | ||||
| 			p.Connections -= 1 | ||||
| 
 | ||||
| 			//TODO: handle err | ||||
| 			if err != nil { | ||||
| @ -166,12 +201,21 @@ func NewProxy(name, stringUrl string) (*Proxy, error) { | ||||
| func main() { | ||||
| 	logrus.SetLevel(logrus.TraceLevel) | ||||
| 
 | ||||
| 	loadConfig() | ||||
| 	balancer := New() | ||||
| 
 | ||||
| 	p0, _ := NewProxy("p0", "http://localhost:3128") | ||||
| 	//p0, _ := NewProxy("p0", "") | ||||
| 	for _, proxyConf := range config.Proxies { | ||||
| 		proxy, err := NewProxy(proxyConf.Name, proxyConf.Url) | ||||
| 		handleErr(err) | ||||
| 		balancer.proxies = append(balancer.proxies, proxy) | ||||
| 
 | ||||
| 	balancer.proxies = []*Proxy{p0} | ||||
| 		applyConfig(proxy) | ||||
| 
 | ||||
| 		logrus.WithFields(logrus.Fields{ | ||||
| 			"name": proxy.Name, | ||||
| 			"url":  proxy.Url, | ||||
| 		}).Info("Proxy") | ||||
| 	} | ||||
| 
 | ||||
| 	balancer.Run() | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user