mirror of
https://github.com/simon987/Architeuthis.git
synced 2025-04-04 08:02:59 +00:00
295 lines
6.0 KiB
Go
295 lines
6.0 KiB
Go
package main
|
|
|
|
import (
|
|
"errors"
|
|
"github.com/go-redis/redis/v7"
|
|
"github.com/go-redis/redis_rate/v8"
|
|
"github.com/sirupsen/logrus"
|
|
"math"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
)
|
|
|
|
const KeyProxyList = "proxies"
|
|
const KeyDeadProxyList = "deadProxies"
|
|
const PrefixProxy = "proxy:"
|
|
|
|
const KeyConnectionCount = "conn"
|
|
const KeyRequestTime = "reqtime"
|
|
const KeyBadRequestCount = "bad"
|
|
const KeyGoodRequestCount = "good"
|
|
const KeyRevived = "revived"
|
|
const KeyUrl = "url"
|
|
|
|
func (a *Architeuthis) getLimiter(rCtx *RequestCtx) *RedisLimiter {
|
|
|
|
var hostConfig *HostConfig
|
|
if len(rCtx.configs) == 0 {
|
|
hostConfig = config.DefaultConfig
|
|
} else {
|
|
hostConfig = rCtx.configs[len(rCtx.configs)-1]
|
|
}
|
|
|
|
return &RedisLimiter{
|
|
Key: hostConfig.Host + ":" + rCtx.p.Name,
|
|
Limiter: redis_rate.NewLimiter(a.redis),
|
|
Limit: &redis_rate.Limit{
|
|
Rate: 1,
|
|
Period: hostConfig.Every,
|
|
Burst: hostConfig.Burst,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (a *Architeuthis) UpdateProxy(p *Proxy) {
|
|
|
|
key := PrefixProxy + p.Name
|
|
pipe := a.redis.Pipeline()
|
|
|
|
if p.incrBad != 0 {
|
|
pipe.HIncrBy(key, KeyBadRequestCount, p.incrBad)
|
|
p.BadRequestCount += p.incrBad
|
|
} else {
|
|
pipe.HIncrBy(key, KeyGoodRequestCount, p.incrGood)
|
|
p.GoodRequestCount += p.incrGood
|
|
|
|
if p.KillOnError {
|
|
pipe.HSet(key, KeyRevived, 0)
|
|
}
|
|
}
|
|
|
|
pipe.HIncrByFloat(key, KeyRequestTime, p.incrReqTime)
|
|
p.TotalRequestTime += p.incrReqTime
|
|
|
|
pipe.HIncrBy(key, KeyConnectionCount, -1)
|
|
|
|
pipe.ZAddXX(KeyProxyList, &redis.Z{
|
|
Score: p.Score(),
|
|
Member: p.Name,
|
|
})
|
|
|
|
_, _ = pipe.Exec()
|
|
|
|
newBadRatio := float64(p.BadRequestCount) / float64(p.GoodRequestCount)
|
|
|
|
if p.incrBad > 0 && (p.KillOnError || (newBadRatio > config.MaxErrorRatio && p.BadRequestCount >= 5)) {
|
|
a.setDead(p.Name)
|
|
}
|
|
}
|
|
|
|
func (a *Architeuthis) AddProxy(name, stringUrl string) error {
|
|
|
|
_, err := url.Parse(stringUrl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pipe := a.redis.Pipeline()
|
|
|
|
pipe.HMSet(PrefixProxy+name, map[string]interface{}{
|
|
KeyUrl: stringUrl,
|
|
KeyRequestTime: 0,
|
|
KeyGoodRequestCount: 0,
|
|
KeyBadRequestCount: 0,
|
|
KeyConnectionCount: 0,
|
|
KeyRevived: 0,
|
|
})
|
|
|
|
zadd := pipe.ZAdd(KeyProxyList, &redis.Z{
|
|
Score: 1000,
|
|
Member: name,
|
|
})
|
|
|
|
zcard := pipe.ZCard(KeyProxyList)
|
|
|
|
_, _ = pipe.Exec()
|
|
|
|
if zadd.Val() != 0 {
|
|
logrus.WithFields(logrus.Fields{
|
|
KeyUrl: stringUrl,
|
|
}).Info("Add proxy")
|
|
|
|
a.writeMetricProxyCount(int(zcard.Val()))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *Architeuthis) incConns(name string) int64 {
|
|
res, _ := a.redis.HIncrBy(PrefixProxy+name, KeyConnectionCount, 1).Result()
|
|
return res
|
|
}
|
|
|
|
func (a *Architeuthis) setDead(name string) {
|
|
|
|
pipe := a.redis.Pipeline()
|
|
|
|
pipe.ZRem(KeyProxyList, name)
|
|
pipe.SAdd(KeyDeadProxyList, name)
|
|
count := pipe.ZCard(KeyProxyList)
|
|
|
|
_, _ = pipe.Exec()
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"proxy": name,
|
|
}).Trace("dead")
|
|
|
|
a.writeMetricProxyCount(int(count.Val()))
|
|
}
|
|
|
|
func (a *Architeuthis) setAlive(name string) {
|
|
|
|
pipe := a.redis.Pipeline()
|
|
|
|
pipe.SRem(KeyDeadProxyList, name)
|
|
pipe.HMSet(KeyProxyList+name, map[string]interface{}{
|
|
KeyRevived: 1,
|
|
KeyRequestTime: 0,
|
|
KeyGoodRequestCount: 0,
|
|
KeyBadRequestCount: 0,
|
|
KeyConnectionCount: 0,
|
|
})
|
|
pipe.ZAdd(KeyProxyList, &redis.Z{
|
|
Score: 1000,
|
|
Member: name,
|
|
})
|
|
count := pipe.ZCard(KeyProxyList)
|
|
|
|
_, _ = pipe.Exec()
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"proxy": name,
|
|
}).Trace("revive")
|
|
|
|
a.writeMetricProxyCount(int(count.Val()))
|
|
}
|
|
|
|
func (a *Architeuthis) GetDeadProxies() []*Proxy {
|
|
|
|
result, err := a.redis.SMembers(KeyDeadProxyList).Result()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return a.getProxies(result)
|
|
}
|
|
|
|
func (a *Architeuthis) GetAliveProxies() []*Proxy {
|
|
|
|
result, err := a.redis.ZRange(KeyProxyList, 0, math.MaxInt64).Result()
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return a.getProxies(result)
|
|
}
|
|
|
|
func (a *Architeuthis) getProxies(names []string) []*Proxy {
|
|
|
|
var proxies []*Proxy
|
|
|
|
for _, name := range names {
|
|
p, _ := a.GetProxy(name)
|
|
if p != nil {
|
|
proxies = append(proxies, p)
|
|
}
|
|
}
|
|
|
|
return proxies
|
|
}
|
|
|
|
func (a *Architeuthis) getStats() statsData {
|
|
|
|
data := statsData{}
|
|
|
|
var totalTime float64 = 0
|
|
var totalScore int64 = 0
|
|
|
|
for _, p := range a.GetAliveProxies() {
|
|
stat := p.getStats()
|
|
data.Proxies = append(data.Proxies, stat)
|
|
|
|
data.TotalBad += int(p.BadRequestCount)
|
|
data.TotalGood += int(p.GoodRequestCount)
|
|
data.Connections += int(p.Connections)
|
|
|
|
totalTime += p.TotalRequestTime
|
|
totalScore += stat.Score
|
|
}
|
|
|
|
data.AvgLatency = totalTime / float64(data.TotalGood+data.TotalBad)
|
|
data.AvgScore = float64(totalScore) / float64(len(data.Proxies))
|
|
|
|
return data
|
|
}
|
|
|
|
func (a *Architeuthis) GetProxy(name string) (*Proxy, error) {
|
|
|
|
result, err := a.redis.HGetAll(PrefixProxy + name).Result()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var parsedUrl *url.URL
|
|
var httpClient *http.Client
|
|
|
|
if result[KeyUrl] == "" {
|
|
parsedUrl = nil
|
|
httpClient = &http.Client{
|
|
Timeout: config.Timeout,
|
|
}
|
|
} else {
|
|
parsedUrl, err = url.Parse(result[KeyUrl])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
httpClient = &http.Client{
|
|
Transport: &http.Transport{
|
|
Proxy: http.ProxyURL(parsedUrl),
|
|
},
|
|
Timeout: config.Timeout,
|
|
}
|
|
}
|
|
|
|
conns, _ := strconv.ParseInt(result[KeyConnectionCount], 10, 64)
|
|
good, _ := strconv.ParseInt(result[KeyGoodRequestCount], 10, 64)
|
|
bad, _ := strconv.ParseInt(result[KeyBadRequestCount], 10, 64)
|
|
reqtime, _ := strconv.ParseFloat(result[KeyRequestTime], 64)
|
|
|
|
return &Proxy{
|
|
Name: name,
|
|
Url: parsedUrl,
|
|
HttpClient: httpClient,
|
|
Connections: conns,
|
|
GoodRequestCount: good,
|
|
BadRequestCount: bad,
|
|
TotalRequestTime: reqtime,
|
|
}, nil
|
|
}
|
|
|
|
func (a *Architeuthis) ChooseProxy(rCtx *RequestCtx) (string, error) {
|
|
results, err := a.redis.ZRevRangeWithScores(KeyProxyList, 0, 12).Result()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if len(results) == 0 {
|
|
return "", errors.New("no proxies available")
|
|
}
|
|
|
|
if len(results) == 1 {
|
|
return results[0].Member.(string), nil
|
|
}
|
|
|
|
for {
|
|
idx := rand.Intn(len(results))
|
|
|
|
if results[idx].Member != rCtx.LastFailedProxy {
|
|
return results[idx].Member.(string), nil
|
|
}
|
|
}
|
|
}
|