ws_bucket/api/slot.go
2019-04-04 19:41:40 -04:00

260 lines
5.6 KiB
Go

package api
import (
"encoding/json"
"github.com/fasthttp/websocket"
"github.com/sirupsen/logrus"
"github.com/valyala/fasthttp"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
)
const WsBufferSize = 4096
var Mutexes sync.Map
var upgrader = websocket.FastHTTPUpgrader{
ReadBufferSize: WsBufferSize,
WriteBufferSize: WsBufferSize,
EnableCompression: true,
CheckOrigin: func(ctx *fasthttp.RequestCtx) bool {
return true
},
}
func (api *WebApi) AllocateUploadSlot(ctx *fasthttp.RequestCtx) {
err := validateRequest(ctx)
if err != nil {
ctx.Response.Header.SetStatusCode(401)
Json(GenericResponse{
Ok: false,
Message: err.Error(),
}, ctx)
return
}
req := &AllocateUploadSlotRequest{}
err = json.Unmarshal(ctx.Request.Body(), req)
if err != nil {
ctx.Response.Header.SetStatusCode(400)
Json(GenericResponse{
Ok: false,
}, ctx)
return
}
if !req.IsValid() {
ctx.Response.Header.SetStatusCode(400)
Json(GenericResponse{
Ok: false,
}, ctx)
return
}
err = api.allocateUploadSlot(req)
if err == nil {
Json(GenericResponse{
Ok: true,
}, ctx)
} else {
Json(GenericResponse{
Ok: false,
Message: err.Error(),
}, ctx)
}
}
func (api *WebApi) Upload(ctx *fasthttp.RequestCtx) {
token := string(ctx.Request.Header.Peek("X-Upload-Token"))
if token == "" {
token = string(ctx.Request.URI().QueryArgs().Peek("token"))
}
slot := UploadSlot{}
err := api.db.Where("token=?", token).First(&slot).Error
if err != nil {
ctx.Response.Header.SetStatusCode(400)
logrus.WithError(err).WithFields(logrus.Fields{
"token": token,
}).Warning("Upload slot not found")
return
}
logrus.WithFields(logrus.Fields{
"slot": slot,
}).Info("Upgrading connection")
err = upgrader.Upgrade(ctx, func(ws *websocket.Conn) {
defer ws.Close()
mt, reader, err := ws.NextReader()
if err != nil {
logrus.WithFields(logrus.Fields{
"token": token,
}).Warning("Client disconnected before sending the first byte")
return
}
if mt != websocket.BinaryMessage {
return
}
mu, _ := Mutexes.LoadOrStore(slot.Token, &sync.RWMutex{})
mu.(*sync.RWMutex).Lock()
path := filepath.Join(WorkDir, slot.FileName)
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
logrus.WithError(err).Error("Error while opening file for writing")
}
buf := make([]byte, WsBufferSize)
totalRead := int64(0)
for totalRead < slot.MaxSize {
read, err := reader.Read(buf)
var toWrite int
if totalRead+int64(read) > slot.MaxSize {
toWrite = int(slot.MaxSize - totalRead)
} else {
toWrite = read
}
_, _ = fp.Write(buf[:toWrite])
if err == io.EOF {
break
}
totalRead += int64(read)
}
logrus.WithFields(logrus.Fields{
"totalRead": totalRead,
}).Info("Finished reading")
err = fp.Close()
if err != nil {
logrus.WithError(err).Error("Error while closing file")
}
mu.(*sync.RWMutex).Unlock()
mu.(*sync.RWMutex).RLock()
executeUploadHook(slot)
mu.(*sync.RWMutex).RUnlock()
})
if err != nil {
logrus.WithError(err).Error("Error while upgrading connexion")
}
}
func executeUploadHook(slot UploadSlot) {
if slot.UploadHook == "" {
return
}
path := filepath.Join(WorkDir, slot.FileName)
commandStr := strings.Replace(slot.UploadHook, "$1", "\""+path+"\"", -1)
cmd := exec.Command("bash", "-c", commandStr)
output, err := cmd.CombinedOutput()
logrus.WithFields(logrus.Fields{
"output": string(output),
"err": err,
"commandStr": commandStr,
}).Info("Execute upload hook")
}
func (api *WebApi) UploadSlotInfo(ctx *fasthttp.RequestCtx) {
tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token"))
if tokenStr == "" {
tokenStr = string(ctx.Request.URI().QueryArgs().Peek("token"))
}
slot := UploadSlot{}
err := api.db.Where("token=?", tokenStr).First(&slot).Error
if err != nil {
ctx.Response.Header.SetStatusCode(404)
logrus.WithError(err).WithFields(logrus.Fields{
"token": tokenStr,
}).Warning("Upload slot not found")
return
}
Json(GetUploadSlotResponse{
UploadSlot: slot,
}, ctx)
return
}
func (api *WebApi) ReadUploadSlot(ctx *fasthttp.RequestCtx) {
tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token"))
if tokenStr == "" {
tokenStr = string(ctx.Request.URI().QueryArgs().Peek("token"))
}
slot := UploadSlot{}
err := api.db.Where("token=?", tokenStr).First(&slot).Error
if err != nil {
ctx.Response.Header.SetStatusCode(404)
logrus.WithError(err).WithFields(logrus.Fields{
"token": tokenStr,
}).Warning("Upload slot not found")
return
}
logrus.WithFields(logrus.Fields{
"slot": slot,
}).Info("Reading")
path := filepath.Join(WorkDir, slot.FileName)
mu, _ := Mutexes.LoadOrStore(slot.Token, &sync.RWMutex{})
mu.(*sync.RWMutex).RLock()
fp, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
logrus.WithError(err).Error("Error while opening file for reading")
mu.(*sync.RWMutex).RUnlock()
return
}
buf := make([]byte, WsBufferSize)
response := ctx.Response.BodyWriter()
for {
read, err := fp.Read(buf)
_, _ = response.Write(buf[:read])
if err == io.EOF {
break
}
}
err = fp.Close()
if err != nil {
logrus.WithError(err).Error("Error while closing file for reading")
}
mu.(*sync.RWMutex).RUnlock()
}
func (api *WebApi) allocateUploadSlot(req *AllocateUploadSlotRequest) error {
slot := &UploadSlot{
MaxSize: req.MaxSize,
FileName: req.FileName,
Token: req.Token,
ToDisposeDate: req.ToDisposeDate,
UploadHook: req.UploadHook,
}
logrus.WithFields(logrus.Fields{
"slot": slot,
}).Info("Allocated new upload slot")
return api.db.Create(slot).Error
}