diff --git a/api/api.go b/api/api.go index 20deb5d..55a21d7 100644 --- a/api/api.go +++ b/api/api.go @@ -6,10 +6,12 @@ import ( "github.com/fasthttp/websocket" "github.com/jinzhu/gorm" _ "github.com/jinzhu/gorm/dialects/postgres" + "github.com/robfig/cron" "github.com/sirupsen/logrus" "github.com/valyala/fasthttp" "os" "path/filepath" + "time" ) var WorkDir = getWorkDir() @@ -28,6 +30,7 @@ type WebApi struct { server fasthttp.Server db *gorm.DB MotdMessage *websocket.PreparedMessage + Cron *cron.Cron } func Index(ctx *fasthttp.RequestCtx) { @@ -67,6 +70,7 @@ func New(db *gorm.DB) *WebApi { } api := &WebApi{} + api.Cron = cron.New() logrus.SetLevel(getLogLevel()) @@ -90,6 +94,7 @@ func New(db *gorm.DB) *WebApi { func (api *WebApi) Run() { address := GetServerAddress() + api.setupCronJobs() logrus.WithFields(logrus.Fields{ "addr": address, @@ -100,7 +105,20 @@ func (api *WebApi) Run() { logrus.Fatalf("Error in ListenAndServe: %s", err) } } +func (api *WebApi) setupCronJobs() { + duration, _ := time.ParseDuration("5m") + api.Cron = cron.New() + schedule := cron.Every(duration) + api.Cron.Schedule(schedule, cron.FuncJob(api.DisposeStaleUploadSlots)) + api.Cron.Start() + + logrus.WithFields(logrus.Fields{ + "every": duration, + }).Info("Scheduled job for DisposeStaleUploadSlots") +} + +//TODO: Move those to a different file/package func GetServerAddress() string { serverAddress := os.Getenv("WS_BUCKET_ADDR") if serverAddress == "" { diff --git a/api/maintenance.go b/api/maintenance.go new file mode 100644 index 0000000..19314ac --- /dev/null +++ b/api/maintenance.go @@ -0,0 +1,30 @@ +package api + +import ( + "github.com/sirupsen/logrus" + "os" + "path/filepath" + "time" +) + +func (api *WebApi) DisposeStaleUploadSlots() { + + var toDispose []UploadSlot + api.db.Where("? >= to_dispose_date", time.Now().Unix()).Find(&toDispose) + + for _, slot := range toDispose { + path := filepath.Join(WorkDir, slot.FileName) + + err := os.Remove(path) + api.db.Where("token = ?", slot.Token).Delete(UploadSlot{}) + + logrus.WithFields(logrus.Fields{ + "fileName": slot.FileName, + "err": err, + }).Trace("Deleted file") + } + + logrus.WithFields(logrus.Fields{ + "staleUploadSlots": len(toDispose), + }).Info("Disposed stale upload slots") +} diff --git a/api/models.go b/api/models.go index 869da1d..3d17dea 100644 --- a/api/models.go +++ b/api/models.go @@ -12,9 +12,11 @@ type GenericResponse struct { } type AllocateUploadSlotRequest struct { - Token string `json:"token"` - MaxSize int64 `json:"max_size"` - FileName string `json:"file_name"` + Token string `json:"token"` + MaxSize int64 `json:"max_size"` + FileName string `json:"file_name"` + ToDisposeDate int64 `json:"to_dispose_date"` + UploadHook string `json:"upload_hook"` } func (req *AllocateUploadSlotRequest) IsValid() bool { @@ -45,9 +47,11 @@ func (req *AllocateUploadSlotRequest) IsValid() bool { } type UploadSlot struct { - MaxSize int64 `json:"max_size"` - Token string `gorm:"primary_key",json:"token"` - FileName string `json:"file_name"` + MaxSize int64 `json:"max_size"` + Token string `gorm:"primary_key",json:"token"` + FileName string `json:"file_name"` + ToDisposeDate int64 `json:"to_dispose_date"` + UploadHook string `json:"upload_hook"` } type WebsocketMotd struct { diff --git a/api/slot.go b/api/slot.go index ca6adff..4e0379e 100644 --- a/api/slot.go +++ b/api/slot.go @@ -7,7 +7,9 @@ import ( "github.com/valyala/fasthttp" "io" "os" + "os/exec" "path/filepath" + "strings" "sync" ) @@ -133,12 +135,32 @@ func (api *WebApi) Upload(ctx *fasthttp.RequestCtx) { logrus.WithError(err).Error("Error while closing file") } mu.(*sync.RWMutex).Unlock() + mu.(*sync.RWMutex).RLock() + + executeUploadHook(slot) + + mu.(*sync.RWMutex).RUnlock() }) if err != nil { logrus.WithError(err).Error("Error while upgrading connexion") } } +func executeUploadHook(slot UploadSlot) { + + path := filepath.Join(WorkDir, slot.FileName) + + commandStr := strings.Replace(slot.UploadHook, "$1", "\""+path+"\"", -1) + cmd := exec.Command("bash", "-c", commandStr) + output, err := cmd.CombinedOutput() + + logrus.WithFields(logrus.Fields{ + "output": string(output), + "err": err, + "commandStr": commandStr, + }).Info("Execute upload hook") +} + func (api *WebApi) ReadUploadSlot(ctx *fasthttp.RequestCtx) { tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token")) @@ -191,9 +213,11 @@ func (api *WebApi) ReadUploadSlot(ctx *fasthttp.RequestCtx) { func (api *WebApi) allocateUploadSlot(req *AllocateUploadSlotRequest) error { slot := &UploadSlot{ - MaxSize: req.MaxSize, - FileName: req.FileName, - Token: req.Token, + MaxSize: req.MaxSize, + FileName: req.FileName, + Token: req.Token, + ToDisposeDate: req.ToDisposeDate, + UploadHook: req.UploadHook, } logrus.WithFields(logrus.Fields{ diff --git a/jenkins/Jenkinsfile b/jenkins/Jenkinsfile index 7997037..09d5998 100644 --- a/jenkins/Jenkinsfile +++ b/jenkins/Jenkinsfile @@ -61,7 +61,7 @@ pipeline { sshCommand remote: remote, command: "cd ws_bucket && rm -rf ws_bucket deploy.sh" sshPut remote: remote, from: 'ws_bucket', into: 'ws_bucket/ws_bucket' sshPut remote: remote, from: 'jenkins/deploy.sh', into: 'ws_bucket/' - sshCommand remote: remote, command: 'chmod +x ws_bucket/deploy.sh && ./ws_bucket/deploy.sh' + sshCommand remote: remote, command: 'chmod +x ws_bucket/deploy.sh ./ws_bucket/deploy.sh' } } } diff --git a/jenkins/deploy.sh b/jenkins/deploy.sh index b535c3f..9f4cfae 100755 --- a/jenkins/deploy.sh +++ b/jenkins/deploy.sh @@ -4,6 +4,6 @@ export WSBROOT="ws_bucket" screen -S ws_bucket -X quit echo "starting ws_bucket" -screen -S ws_bucket -d -m bash -c "cd ${WSBROOT} && chmod +x ws_bucket && ./ws_bucket" +screen -S ws_bucket -d -m bash -c "cd ${WSBROOT} && chmod +x ws_bucket && export WS_BUCKET_SECRET=${WS_BUCKET_SECRET} && ./ws_bucket" sleep 1 screen -list diff --git a/test/common.go b/test/common.go index a418f55..675083f 100644 --- a/test/common.go +++ b/test/common.go @@ -10,11 +10,17 @@ import ( "github.com/simon987/ws_bucket/api" "io/ioutil" "net/http" + "os" "time" ) func Post(path string, x interface{}) *http.Response { + secret := os.Getenv("WS_BUCKET_SECRET") + if secret == "" { + secret = "default_secret" + } + s := http.Client{} body, err := json.Marshal(x) @@ -24,7 +30,7 @@ func Post(path string, x interface{}) *http.Response { handleErr(err) ts := time.Now().Format(time.RFC1123) - mac := hmac.New(crypto.SHA256.New, []byte("default_secret")) + mac := hmac.New(crypto.SHA256.New, []byte(secret)) mac.Write(body) mac.Write([]byte(ts)) sig := hex.EncodeToString(mac.Sum(nil)) @@ -39,13 +45,18 @@ func Post(path string, x interface{}) *http.Response { func Get(path string, token string) *http.Response { + secret := os.Getenv("WS_BUCKET_SECRET") + if secret == "" { + secret = "default_secret" + } + s := http.Client{} req, err := http.NewRequest("GET", "http://"+api.GetServerAddress()+path, nil) handleErr(err) ts := time.Now().Format(time.RFC1123) - mac := hmac.New(crypto.SHA256.New, []byte("default_secret")) + mac := hmac.New(crypto.SHA256.New, []byte(secret)) mac.Write([]byte(path)) mac.Write([]byte(ts)) sig := hex.EncodeToString(mac.Sum(nil))