Upload hooks

This commit is contained in:
simon987 2019-03-23 14:28:51 -04:00
parent 532a7fe726
commit 2b41ce4edb
7 changed files with 100 additions and 13 deletions

View File

@ -6,10 +6,12 @@ import (
"github.com/fasthttp/websocket" "github.com/fasthttp/websocket"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/postgres" _ "github.com/jinzhu/gorm/dialects/postgres"
"github.com/robfig/cron"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"os" "os"
"path/filepath" "path/filepath"
"time"
) )
var WorkDir = getWorkDir() var WorkDir = getWorkDir()
@ -28,6 +30,7 @@ type WebApi struct {
server fasthttp.Server server fasthttp.Server
db *gorm.DB db *gorm.DB
MotdMessage *websocket.PreparedMessage MotdMessage *websocket.PreparedMessage
Cron *cron.Cron
} }
func Index(ctx *fasthttp.RequestCtx) { func Index(ctx *fasthttp.RequestCtx) {
@ -67,6 +70,7 @@ func New(db *gorm.DB) *WebApi {
} }
api := &WebApi{} api := &WebApi{}
api.Cron = cron.New()
logrus.SetLevel(getLogLevel()) logrus.SetLevel(getLogLevel())
@ -90,6 +94,7 @@ func New(db *gorm.DB) *WebApi {
func (api *WebApi) Run() { func (api *WebApi) Run() {
address := GetServerAddress() address := GetServerAddress()
api.setupCronJobs()
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"addr": address, "addr": address,
@ -100,7 +105,20 @@ func (api *WebApi) Run() {
logrus.Fatalf("Error in ListenAndServe: %s", err) logrus.Fatalf("Error in ListenAndServe: %s", err)
} }
} }
func (api *WebApi) setupCronJobs() {
duration, _ := time.ParseDuration("5m")
api.Cron = cron.New()
schedule := cron.Every(duration)
api.Cron.Schedule(schedule, cron.FuncJob(api.DisposeStaleUploadSlots))
api.Cron.Start()
logrus.WithFields(logrus.Fields{
"every": duration,
}).Info("Scheduled job for DisposeStaleUploadSlots")
}
//TODO: Move those to a different file/package
func GetServerAddress() string { func GetServerAddress() string {
serverAddress := os.Getenv("WS_BUCKET_ADDR") serverAddress := os.Getenv("WS_BUCKET_ADDR")
if serverAddress == "" { if serverAddress == "" {

30
api/maintenance.go Normal file
View File

@ -0,0 +1,30 @@
package api
import (
"github.com/sirupsen/logrus"
"os"
"path/filepath"
"time"
)
func (api *WebApi) DisposeStaleUploadSlots() {
var toDispose []UploadSlot
api.db.Where("? >= to_dispose_date", time.Now().Unix()).Find(&toDispose)
for _, slot := range toDispose {
path := filepath.Join(WorkDir, slot.FileName)
err := os.Remove(path)
api.db.Where("token = ?", slot.Token).Delete(UploadSlot{})
logrus.WithFields(logrus.Fields{
"fileName": slot.FileName,
"err": err,
}).Trace("Deleted file")
}
logrus.WithFields(logrus.Fields{
"staleUploadSlots": len(toDispose),
}).Info("Disposed stale upload slots")
}

View File

@ -12,9 +12,11 @@ type GenericResponse struct {
} }
type AllocateUploadSlotRequest struct { type AllocateUploadSlotRequest struct {
Token string `json:"token"` Token string `json:"token"`
MaxSize int64 `json:"max_size"` MaxSize int64 `json:"max_size"`
FileName string `json:"file_name"` FileName string `json:"file_name"`
ToDisposeDate int64 `json:"to_dispose_date"`
UploadHook string `json:"upload_hook"`
} }
func (req *AllocateUploadSlotRequest) IsValid() bool { func (req *AllocateUploadSlotRequest) IsValid() bool {
@ -45,9 +47,11 @@ func (req *AllocateUploadSlotRequest) IsValid() bool {
} }
type UploadSlot struct { type UploadSlot struct {
MaxSize int64 `json:"max_size"` MaxSize int64 `json:"max_size"`
Token string `gorm:"primary_key",json:"token"` Token string `gorm:"primary_key",json:"token"`
FileName string `json:"file_name"` FileName string `json:"file_name"`
ToDisposeDate int64 `json:"to_dispose_date"`
UploadHook string `json:"upload_hook"`
} }
type WebsocketMotd struct { type WebsocketMotd struct {

View File

@ -7,7 +7,9 @@ import (
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"io" "io"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
) )
@ -133,12 +135,32 @@ func (api *WebApi) Upload(ctx *fasthttp.RequestCtx) {
logrus.WithError(err).Error("Error while closing file") logrus.WithError(err).Error("Error while closing file")
} }
mu.(*sync.RWMutex).Unlock() mu.(*sync.RWMutex).Unlock()
mu.(*sync.RWMutex).RLock()
executeUploadHook(slot)
mu.(*sync.RWMutex).RUnlock()
}) })
if err != nil { if err != nil {
logrus.WithError(err).Error("Error while upgrading connexion") logrus.WithError(err).Error("Error while upgrading connexion")
} }
} }
func executeUploadHook(slot UploadSlot) {
path := filepath.Join(WorkDir, slot.FileName)
commandStr := strings.Replace(slot.UploadHook, "$1", "\""+path+"\"", -1)
cmd := exec.Command("bash", "-c", commandStr)
output, err := cmd.CombinedOutput()
logrus.WithFields(logrus.Fields{
"output": string(output),
"err": err,
"commandStr": commandStr,
}).Info("Execute upload hook")
}
func (api *WebApi) ReadUploadSlot(ctx *fasthttp.RequestCtx) { func (api *WebApi) ReadUploadSlot(ctx *fasthttp.RequestCtx) {
tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token")) tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token"))
@ -191,9 +213,11 @@ func (api *WebApi) ReadUploadSlot(ctx *fasthttp.RequestCtx) {
func (api *WebApi) allocateUploadSlot(req *AllocateUploadSlotRequest) error { func (api *WebApi) allocateUploadSlot(req *AllocateUploadSlotRequest) error {
slot := &UploadSlot{ slot := &UploadSlot{
MaxSize: req.MaxSize, MaxSize: req.MaxSize,
FileName: req.FileName, FileName: req.FileName,
Token: req.Token, Token: req.Token,
ToDisposeDate: req.ToDisposeDate,
UploadHook: req.UploadHook,
} }
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{

2
jenkins/Jenkinsfile vendored
View File

@ -61,7 +61,7 @@ pipeline {
sshCommand remote: remote, command: "cd ws_bucket && rm -rf ws_bucket deploy.sh" sshCommand remote: remote, command: "cd ws_bucket && rm -rf ws_bucket deploy.sh"
sshPut remote: remote, from: 'ws_bucket', into: 'ws_bucket/ws_bucket' sshPut remote: remote, from: 'ws_bucket', into: 'ws_bucket/ws_bucket'
sshPut remote: remote, from: 'jenkins/deploy.sh', into: 'ws_bucket/' sshPut remote: remote, from: 'jenkins/deploy.sh', into: 'ws_bucket/'
sshCommand remote: remote, command: 'chmod +x ws_bucket/deploy.sh && ./ws_bucket/deploy.sh' sshCommand remote: remote, command: 'chmod +x ws_bucket/deploy.sh ./ws_bucket/deploy.sh'
} }
} }
} }

View File

@ -4,6 +4,6 @@ export WSBROOT="ws_bucket"
screen -S ws_bucket -X quit screen -S ws_bucket -X quit
echo "starting ws_bucket" echo "starting ws_bucket"
screen -S ws_bucket -d -m bash -c "cd ${WSBROOT} && chmod +x ws_bucket && ./ws_bucket" screen -S ws_bucket -d -m bash -c "cd ${WSBROOT} && chmod +x ws_bucket && export WS_BUCKET_SECRET=${WS_BUCKET_SECRET} && ./ws_bucket"
sleep 1 sleep 1
screen -list screen -list

View File

@ -10,11 +10,17 @@ import (
"github.com/simon987/ws_bucket/api" "github.com/simon987/ws_bucket/api"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os"
"time" "time"
) )
func Post(path string, x interface{}) *http.Response { func Post(path string, x interface{}) *http.Response {
secret := os.Getenv("WS_BUCKET_SECRET")
if secret == "" {
secret = "default_secret"
}
s := http.Client{} s := http.Client{}
body, err := json.Marshal(x) body, err := json.Marshal(x)
@ -24,7 +30,7 @@ func Post(path string, x interface{}) *http.Response {
handleErr(err) handleErr(err)
ts := time.Now().Format(time.RFC1123) ts := time.Now().Format(time.RFC1123)
mac := hmac.New(crypto.SHA256.New, []byte("default_secret")) mac := hmac.New(crypto.SHA256.New, []byte(secret))
mac.Write(body) mac.Write(body)
mac.Write([]byte(ts)) mac.Write([]byte(ts))
sig := hex.EncodeToString(mac.Sum(nil)) sig := hex.EncodeToString(mac.Sum(nil))
@ -39,13 +45,18 @@ func Post(path string, x interface{}) *http.Response {
func Get(path string, token string) *http.Response { func Get(path string, token string) *http.Response {
secret := os.Getenv("WS_BUCKET_SECRET")
if secret == "" {
secret = "default_secret"
}
s := http.Client{} s := http.Client{}
req, err := http.NewRequest("GET", "http://"+api.GetServerAddress()+path, nil) req, err := http.NewRequest("GET", "http://"+api.GetServerAddress()+path, nil)
handleErr(err) handleErr(err)
ts := time.Now().Format(time.RFC1123) ts := time.Now().Format(time.RFC1123)
mac := hmac.New(crypto.SHA256.New, []byte("default_secret")) mac := hmac.New(crypto.SHA256.New, []byte(secret))
mac.Write([]byte(path)) mac.Write([]byte(path))
mac.Write([]byte(ts)) mac.Write([]byte(ts))
sig := hex.EncodeToString(mac.Sum(nil)) sig := hex.EncodeToString(mac.Sum(nil))