mirror of
https://github.com/simon987/ws_bucket.git
synced 2025-04-10 14:06:46 +00:00
260 lines
5.6 KiB
Go
260 lines
5.6 KiB
Go
package api
|
|
|
|
import (
|
|
"encoding/json"
|
|
"github.com/fasthttp/websocket"
|
|
"github.com/sirupsen/logrus"
|
|
"github.com/valyala/fasthttp"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
)
|
|
|
|
const WsBufferSize = 4096
|
|
|
|
var Mutexes sync.Map
|
|
var upgrader = websocket.FastHTTPUpgrader{
|
|
ReadBufferSize: WsBufferSize,
|
|
WriteBufferSize: WsBufferSize,
|
|
EnableCompression: true,
|
|
CheckOrigin: func(ctx *fasthttp.RequestCtx) bool {
|
|
return true
|
|
},
|
|
}
|
|
|
|
func (api *WebApi) AllocateUploadSlot(ctx *fasthttp.RequestCtx) {
|
|
|
|
err := validateRequest(ctx)
|
|
if err != nil {
|
|
ctx.Response.Header.SetStatusCode(401)
|
|
Json(GenericResponse{
|
|
Ok: false,
|
|
Message: err.Error(),
|
|
}, ctx)
|
|
return
|
|
}
|
|
|
|
req := &AllocateUploadSlotRequest{}
|
|
err = json.Unmarshal(ctx.Request.Body(), req)
|
|
if err != nil {
|
|
ctx.Response.Header.SetStatusCode(400)
|
|
Json(GenericResponse{
|
|
Ok: false,
|
|
}, ctx)
|
|
return
|
|
}
|
|
|
|
if !req.IsValid() {
|
|
ctx.Response.Header.SetStatusCode(400)
|
|
Json(GenericResponse{
|
|
Ok: false,
|
|
}, ctx)
|
|
return
|
|
}
|
|
|
|
err = api.allocateUploadSlot(req)
|
|
|
|
if err == nil {
|
|
Json(GenericResponse{
|
|
Ok: true,
|
|
}, ctx)
|
|
} else {
|
|
Json(GenericResponse{
|
|
Ok: false,
|
|
Message: err.Error(),
|
|
}, ctx)
|
|
}
|
|
}
|
|
|
|
func (api *WebApi) Upload(ctx *fasthttp.RequestCtx) {
|
|
|
|
token := string(ctx.Request.Header.Peek("X-Upload-Token"))
|
|
|
|
if token == "" {
|
|
token = string(ctx.Request.URI().QueryArgs().Peek("token"))
|
|
}
|
|
|
|
slot := UploadSlot{}
|
|
err := api.db.Where("token=?", token).First(&slot).Error
|
|
if err != nil {
|
|
ctx.Response.Header.SetStatusCode(400)
|
|
logrus.WithError(err).WithFields(logrus.Fields{
|
|
"token": token,
|
|
}).Warning("Upload slot not found")
|
|
return
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"slot": slot,
|
|
}).Info("Upgrading connection")
|
|
|
|
err = upgrader.Upgrade(ctx, func(ws *websocket.Conn) {
|
|
defer ws.Close()
|
|
|
|
mt, reader, err := ws.NextReader()
|
|
if err != nil {
|
|
logrus.WithFields(logrus.Fields{
|
|
"token": token,
|
|
}).Warning("Client disconnected before sending the first byte")
|
|
return
|
|
}
|
|
if mt != websocket.BinaryMessage {
|
|
return
|
|
}
|
|
|
|
mu, _ := Mutexes.LoadOrStore(slot.Token, &sync.RWMutex{})
|
|
mu.(*sync.RWMutex).Lock()
|
|
path := filepath.Join(WorkDir, slot.FileName)
|
|
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Error while opening file for writing")
|
|
}
|
|
|
|
buf := make([]byte, WsBufferSize)
|
|
totalRead := int64(0)
|
|
for totalRead < slot.MaxSize {
|
|
read, err := reader.Read(buf)
|
|
|
|
var toWrite int
|
|
if totalRead+int64(read) > slot.MaxSize {
|
|
toWrite = int(slot.MaxSize - totalRead)
|
|
} else {
|
|
toWrite = read
|
|
}
|
|
|
|
_, _ = fp.Write(buf[:toWrite])
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
totalRead += int64(read)
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"totalRead": totalRead,
|
|
}).Info("Finished reading")
|
|
err = fp.Close()
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Error while closing file")
|
|
}
|
|
mu.(*sync.RWMutex).Unlock()
|
|
mu.(*sync.RWMutex).RLock()
|
|
|
|
executeUploadHook(slot)
|
|
|
|
mu.(*sync.RWMutex).RUnlock()
|
|
})
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Error while upgrading connexion")
|
|
}
|
|
}
|
|
|
|
func executeUploadHook(slot UploadSlot) {
|
|
|
|
if slot.UploadHook == "" {
|
|
return
|
|
}
|
|
|
|
path := filepath.Join(WorkDir, slot.FileName)
|
|
|
|
commandStr := strings.Replace(slot.UploadHook, "$1", "\""+path+"\"", -1)
|
|
cmd := exec.Command("bash", "-c", commandStr)
|
|
output, err := cmd.CombinedOutput()
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"output": string(output),
|
|
"err": err,
|
|
"commandStr": commandStr,
|
|
}).Info("Execute upload hook")
|
|
}
|
|
|
|
func (api *WebApi) UploadSlotInfo(ctx *fasthttp.RequestCtx) {
|
|
tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token"))
|
|
if tokenStr == "" {
|
|
tokenStr = string(ctx.Request.URI().QueryArgs().Peek("token"))
|
|
}
|
|
|
|
slot := UploadSlot{}
|
|
err := api.db.Where("token=?", tokenStr).First(&slot).Error
|
|
|
|
if err != nil {
|
|
ctx.Response.Header.SetStatusCode(404)
|
|
logrus.WithError(err).WithFields(logrus.Fields{
|
|
"token": tokenStr,
|
|
}).Warning("Upload slot not found")
|
|
return
|
|
}
|
|
|
|
Json(GetUploadSlotResponse{
|
|
UploadSlot: slot,
|
|
}, ctx)
|
|
return
|
|
}
|
|
func (api *WebApi) ReadUploadSlot(ctx *fasthttp.RequestCtx) {
|
|
|
|
tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token"))
|
|
if tokenStr == "" {
|
|
tokenStr = string(ctx.Request.URI().QueryArgs().Peek("token"))
|
|
}
|
|
|
|
slot := UploadSlot{}
|
|
err := api.db.Where("token=?", tokenStr).First(&slot).Error
|
|
|
|
if err != nil {
|
|
ctx.Response.Header.SetStatusCode(404)
|
|
logrus.WithError(err).WithFields(logrus.Fields{
|
|
"token": tokenStr,
|
|
}).Warning("Upload slot not found")
|
|
return
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"slot": slot,
|
|
}).Info("Reading")
|
|
|
|
path := filepath.Join(WorkDir, slot.FileName)
|
|
|
|
mu, _ := Mutexes.LoadOrStore(slot.Token, &sync.RWMutex{})
|
|
mu.(*sync.RWMutex).RLock()
|
|
fp, err := os.OpenFile(path, os.O_RDONLY, 0600)
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Error while opening file for reading")
|
|
mu.(*sync.RWMutex).RUnlock()
|
|
return
|
|
}
|
|
|
|
buf := make([]byte, WsBufferSize)
|
|
response := ctx.Response.BodyWriter()
|
|
for {
|
|
read, err := fp.Read(buf)
|
|
_, _ = response.Write(buf[:read])
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
}
|
|
err = fp.Close()
|
|
if err != nil {
|
|
logrus.WithError(err).Error("Error while closing file for reading")
|
|
}
|
|
mu.(*sync.RWMutex).RUnlock()
|
|
}
|
|
|
|
func (api *WebApi) allocateUploadSlot(req *AllocateUploadSlotRequest) error {
|
|
|
|
slot := &UploadSlot{
|
|
MaxSize: req.MaxSize,
|
|
FileName: req.FileName,
|
|
Token: req.Token,
|
|
ToDisposeDate: req.ToDisposeDate,
|
|
UploadHook: req.UploadHook,
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"slot": slot,
|
|
}).Info("Allocated new upload slot")
|
|
|
|
return api.db.Create(slot).Error
|
|
}
|