Add per project rate limit

This commit is contained in:
simon987
2019-02-24 21:18:55 -05:00
parent 9acf6e27c1
commit 3415f95337
13 changed files with 347 additions and 267 deletions

View File

@@ -9,15 +9,18 @@ import (
"github.com/simon987/task_tracker/config"
"github.com/simon987/task_tracker/storage"
"github.com/valyala/fasthttp"
"sync"
)
type WebAPI struct {
server *fasthttp.Server
router *fasthttprouter.Router
Database *storage.Database
SessionConfig sessions.Config
Session *sessions.Sessions
Cron *cron.Cron
server *fasthttp.Server
router *fasthttprouter.Router
Database *storage.Database
SessionConfig sessions.Config
Session *sessions.Sessions
Cron *cron.Cron
AssignLimiters sync.Map
SubmitLimiters sync.Map
}
type RequestHandler func(*Request)
@@ -98,7 +101,6 @@ func New() *WebAPI {
api.router.POST("/task/submit", LogRequestMiddleware(api.SubmitTask))
api.router.GET("/task/get/:project", LogRequestMiddleware(api.GetTaskFromProject))
api.router.GET("/task/get", LogRequestMiddleware(api.GetTask))
api.router.POST("/task/release", LogRequestMiddleware(api.ReleaseTask))
api.router.POST("/git/receivehook", LogRequestMiddleware(api.ReceiveGitWebHook))

View File

@@ -3,6 +3,7 @@ package api
import (
"encoding/json"
"github.com/simon987/task_tracker/storage"
"golang.org/x/time/rate"
)
const (
@@ -12,9 +13,10 @@ const (
)
type JsonResponse struct {
Ok bool `json:"ok"`
Message string `json:"message,omitempty"`
Content interface{} `json:"content,omitempty"`
Ok bool `json:"ok"`
Message string `json:"message,omitempty"`
RateLimitDelay string `json:"rate_limit_delay,omitempty"`
Content interface{} `json:"content,omitempty"`
}
type GitPayload struct {
@@ -115,15 +117,17 @@ type GetSnapshotsResponse struct {
}
type CreateProjectRequest struct {
Name string `json:"name"`
CloneUrl string `json:"clone_url"`
GitRepo string `json:"git_repo"`
Version string `json:"version"`
Priority int64 `json:"priority"`
Motd string `json:"motd"`
Public bool `json:"public"`
Hidden bool `json:"hidden"`
Chain int64 `json:"chain"`
Name string `json:"name"`
CloneUrl string `json:"clone_url"`
GitRepo string `json:"git_repo"`
Version string `json:"version"`
Priority int64 `json:"priority"`
Motd string `json:"motd"`
Public bool `json:"public"`
Hidden bool `json:"hidden"`
Chain int64 `json:"chain"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
}
func (req *CreateProjectRequest) isValid() bool {
@@ -140,15 +144,17 @@ func (req *CreateProjectRequest) isValid() bool {
}
type UpdateProjectRequest struct {
Name string `json:"name"`
CloneUrl string `json:"clone_url"`
GitRepo string `json:"git_repo"`
Priority int64 `json:"priority"`
Motd string `json:"motd"`
Public bool `json:"public"`
Hidden bool `json:"hidden"`
Chain int64 `json:"chain"`
Paused bool `json:"paused"`
Name string `json:"name"`
CloneUrl string `json:"clone_url"`
GitRepo string `json:"git_repo"`
Priority int64 `json:"priority"`
Motd string `json:"motd"`
Public bool `json:"public"`
Hidden bool `json:"hidden"`
Chain int64 `json:"chain"`
Paused bool `json:"paused"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
}
func (req *UpdateProjectRequest) isValid() bool {

View File

@@ -5,6 +5,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/google/uuid"
"github.com/simon987/task_tracker/storage"
"golang.org/x/time/rate"
"strconv"
)
@@ -62,16 +63,25 @@ func (api *WebAPI) CreateProject(r *Request) {
}, 400)
return
}
if createReq.AssignRate == 0 {
createReq.AssignRate = rate.Inf
}
if createReq.SubmitRate == 0 {
createReq.SubmitRate = rate.Inf
}
project := &storage.Project{
Name: createReq.Name,
Version: createReq.Version,
CloneUrl: createReq.CloneUrl,
GitRepo: createReq.GitRepo,
Priority: createReq.Priority,
Motd: createReq.Motd,
Public: createReq.Public,
Hidden: createReq.Hidden,
Chain: createReq.Chain,
Name: createReq.Name,
Version: createReq.Version,
CloneUrl: createReq.CloneUrl,
GitRepo: createReq.GitRepo,
Priority: createReq.Priority,
Motd: createReq.Motd,
Public: createReq.Public,
Hidden: createReq.Hidden,
Chain: createReq.Chain,
AssignRate: createReq.AssignRate,
SubmitRate: createReq.SubmitRate,
}
if !createReq.isValid() {
@@ -156,16 +166,18 @@ func (api *WebAPI) UpdateProject(r *Request) {
}
project := &storage.Project{
Id: id,
Name: updateReq.Name,
CloneUrl: updateReq.CloneUrl,
GitRepo: updateReq.GitRepo,
Priority: updateReq.Priority,
Motd: updateReq.Motd,
Public: updateReq.Public,
Hidden: updateReq.Hidden,
Chain: updateReq.Chain,
Paused: updateReq.Paused,
Id: id,
Name: updateReq.Name,
CloneUrl: updateReq.CloneUrl,
GitRepo: updateReq.GitRepo,
Priority: updateReq.Priority,
Motd: updateReq.Motd,
Public: updateReq.Public,
Hidden: updateReq.Hidden,
Chain: updateReq.Chain,
Paused: updateReq.Paused,
AssignRate: updateReq.AssignRate,
SubmitRate: updateReq.SubmitRate,
}
sess := api.Session.StartFasthttp(r.Ctx)
manager := sess.Get("manager")

40
api/rate.go Normal file
View File

@@ -0,0 +1,40 @@
package api
import (
"fmt"
"golang.org/x/time/rate"
"time"
)
func (api *WebAPI) ReserveSubmit(pid int64) *rate.Reservation {
limiter, ok := api.SubmitLimiters.Load(pid)
if !ok {
project := api.Database.GetProject(pid)
if project == nil {
return &rate.Reservation{}
}
limiter = rate.NewLimiter(project.SubmitRate, 1)
api.SubmitLimiters.Store(pid, limiter)
}
return limiter.(*rate.Limiter).ReserveN(time.Now(), 1)
}
func (api *WebAPI) ReserveAssign(pid int64) *rate.Reservation {
limiter, ok := api.AssignLimiters.Load(pid)
if !ok {
project := api.Database.GetProject(pid)
if project == nil {
return &rate.Reservation{}
}
limiter = rate.NewLimiter(project.AssignRate, 1)
api.AssignLimiters.Store(pid, limiter)
fmt.Printf("Create")
}
return limiter.(*rate.Limiter).ReserveN(time.Now(), 1)
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/dchest/siphash"
"github.com/simon987/task_tracker/storage"
"strconv"
"time"
)
func (api *WebAPI) SubmitTask(r *Request) {
@@ -53,6 +54,17 @@ func (api *WebAPI) SubmitTask(r *Request) {
return
}
reservation := api.ReserveSubmit(createReq.Project)
delay := reservation.DelayFrom(time.Now()).Seconds()
if delay > 0 {
r.Json(JsonResponse{
Ok: false,
Message: "Too many requests",
RateLimitDelay: strconv.FormatFloat(delay, 'f', -1, 64),
}, 429)
return
}
if createReq.UniqueString != "" {
//TODO: Load key from config
createReq.Hash64 = int64(siphash.Hash(1, 2, []byte(createReq.UniqueString)))
@@ -65,6 +77,7 @@ func (api *WebAPI) SubmitTask(r *Request) {
Ok: false,
Message: err.Error(),
}, 400)
reservation.Cancel()
} else {
r.OkJson(JsonResponse{
Ok: true,
@@ -84,47 +97,33 @@ func (api *WebAPI) GetTaskFromProject(r *Request) {
}
project, err := strconv.ParseInt(r.Ctx.UserValue("project").(string), 10, 64)
handleErr(err, r)
task := api.Database.GetTaskFromProject(worker, project)
if task == nil {
r.OkJson(JsonResponse{
Ok: false,
Message: "No task available",
})
} else {
r.OkJson(JsonResponse{
Ok: true,
Content: GetTaskResponse{
Task: task,
},
})
}
}
func (api *WebAPI) GetTask(r *Request) {
worker, err := api.validateSignature(r)
if err != nil {
if err != nil || project <= 0 {
r.Json(JsonResponse{
Ok: false,
Message: err.Error(),
}, 403)
Message: "Invalid project id",
}, 400)
return
}
task := api.Database.GetTask(worker)
if task == nil {
reservation := api.ReserveAssign(project)
delay := reservation.DelayFrom(time.Now()).Seconds()
if delay > 0 {
r.Json(JsonResponse{
Ok: false,
Message: "Too many requests",
RateLimitDelay: strconv.FormatFloat(delay, 'f', -1, 64),
}, 429)
return
}
task := api.Database.GetTaskFromProject(worker, project)
if task == nil {
r.OkJson(JsonResponse{
Ok: false,
Message: "No task available",
})
reservation.CancelAt(time.Now())
} else {
r.OkJson(JsonResponse{
@@ -134,8 +133,8 @@ func (api *WebAPI) GetTask(r *Request) {
},
})
}
}
}
func (api WebAPI) validateSignature(r *Request) (*storage.Worker, error) {
widStr := string(r.Ctx.Request.Header.Peek("X-Worker-Id"))