mirror of
				https://github.com/simon987/ws_bucket.git
				synced 2025-10-31 08:26:55 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			260 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			260 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package api
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"github.com/fasthttp/websocket"
 | |
| 	"github.com/sirupsen/logrus"
 | |
| 	"github.com/valyala/fasthttp"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"os/exec"
 | |
| 	"path/filepath"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
| const WsBufferSize = 4096
 | |
| 
 | |
| var Mutexes sync.Map
 | |
| var upgrader = websocket.FastHTTPUpgrader{
 | |
| 	ReadBufferSize:    WsBufferSize,
 | |
| 	WriteBufferSize:   WsBufferSize,
 | |
| 	EnableCompression: true,
 | |
| 	CheckOrigin: func(ctx *fasthttp.RequestCtx) bool {
 | |
| 		return true
 | |
| 	},
 | |
| }
 | |
| 
 | |
| func (api *WebApi) AllocateUploadSlot(ctx *fasthttp.RequestCtx) {
 | |
| 
 | |
| 	err := validateRequest(ctx)
 | |
| 	if err != nil {
 | |
| 		ctx.Response.Header.SetStatusCode(401)
 | |
| 		Json(GenericResponse{
 | |
| 			Ok:      false,
 | |
| 			Message: err.Error(),
 | |
| 		}, ctx)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	req := &AllocateUploadSlotRequest{}
 | |
| 	err = json.Unmarshal(ctx.Request.Body(), req)
 | |
| 	if err != nil {
 | |
| 		ctx.Response.Header.SetStatusCode(400)
 | |
| 		Json(GenericResponse{
 | |
| 			Ok: false,
 | |
| 		}, ctx)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if !req.IsValid() {
 | |
| 		ctx.Response.Header.SetStatusCode(400)
 | |
| 		Json(GenericResponse{
 | |
| 			Ok: false,
 | |
| 		}, ctx)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	err = api.allocateUploadSlot(req)
 | |
| 
 | |
| 	if err == nil {
 | |
| 		Json(GenericResponse{
 | |
| 			Ok: true,
 | |
| 		}, ctx)
 | |
| 	} else {
 | |
| 		Json(GenericResponse{
 | |
| 			Ok:      false,
 | |
| 			Message: err.Error(),
 | |
| 		}, ctx)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (api *WebApi) Upload(ctx *fasthttp.RequestCtx) {
 | |
| 
 | |
| 	token := string(ctx.Request.Header.Peek("X-Upload-Token"))
 | |
| 
 | |
| 	if token == "" {
 | |
| 		token = string(ctx.Request.URI().QueryArgs().Peek("token"))
 | |
| 	}
 | |
| 
 | |
| 	slot := UploadSlot{}
 | |
| 	err := api.db.Where("token=?", token).First(&slot).Error
 | |
| 	if err != nil {
 | |
| 		ctx.Response.Header.SetStatusCode(400)
 | |
| 		logrus.WithError(err).WithFields(logrus.Fields{
 | |
| 			"token": token,
 | |
| 		}).Warning("Upload slot not found")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	logrus.WithFields(logrus.Fields{
 | |
| 		"slot": slot,
 | |
| 	}).Info("Upgrading connection")
 | |
| 
 | |
| 	err = upgrader.Upgrade(ctx, func(ws *websocket.Conn) {
 | |
| 		defer ws.Close()
 | |
| 
 | |
| 		mt, reader, err := ws.NextReader()
 | |
| 		if err != nil {
 | |
| 			logrus.WithFields(logrus.Fields{
 | |
| 				"token": token,
 | |
| 			}).Warning("Client disconnected before sending the first byte")
 | |
| 			return
 | |
| 		}
 | |
| 		if mt != websocket.BinaryMessage {
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		mu, _ := Mutexes.LoadOrStore(slot.Token, &sync.RWMutex{})
 | |
| 		mu.(*sync.RWMutex).Lock()
 | |
| 		path := filepath.Join(WorkDir, slot.FileName)
 | |
| 		fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
 | |
| 		if err != nil {
 | |
| 			logrus.WithError(err).Error("Error while opening file for writing")
 | |
| 		}
 | |
| 
 | |
| 		buf := make([]byte, WsBufferSize)
 | |
| 		totalRead := int64(0)
 | |
| 		for totalRead < slot.MaxSize {
 | |
| 			read, err := reader.Read(buf)
 | |
| 
 | |
| 			var toWrite int
 | |
| 			if totalRead+int64(read) > slot.MaxSize {
 | |
| 				toWrite = int(slot.MaxSize - totalRead)
 | |
| 			} else {
 | |
| 				toWrite = read
 | |
| 			}
 | |
| 
 | |
| 			_, _ = fp.Write(buf[:toWrite])
 | |
| 			if err == io.EOF {
 | |
| 				break
 | |
| 			}
 | |
| 			totalRead += int64(read)
 | |
| 		}
 | |
| 
 | |
| 		logrus.WithFields(logrus.Fields{
 | |
| 			"totalRead": totalRead,
 | |
| 		}).Info("Finished reading")
 | |
| 		err = fp.Close()
 | |
| 		if err != nil {
 | |
| 			logrus.WithError(err).Error("Error while closing file")
 | |
| 		}
 | |
| 		mu.(*sync.RWMutex).Unlock()
 | |
| 		mu.(*sync.RWMutex).RLock()
 | |
| 
 | |
| 		executeUploadHook(slot)
 | |
| 
 | |
| 		mu.(*sync.RWMutex).RUnlock()
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		logrus.WithError(err).Error("Error while upgrading connexion")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func executeUploadHook(slot UploadSlot) {
 | |
| 
 | |
| 	if slot.UploadHook == "" {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	path := filepath.Join(WorkDir, slot.FileName)
 | |
| 
 | |
| 	commandStr := strings.Replace(slot.UploadHook, "$1", "\""+path+"\"", -1)
 | |
| 	cmd := exec.Command("bash", "-c", commandStr)
 | |
| 	output, err := cmd.CombinedOutput()
 | |
| 
 | |
| 	logrus.WithFields(logrus.Fields{
 | |
| 		"output":     string(output),
 | |
| 		"err":        err,
 | |
| 		"commandStr": commandStr,
 | |
| 	}).Info("Execute upload hook")
 | |
| }
 | |
| 
 | |
| func (api *WebApi) UploadSlotInfo(ctx *fasthttp.RequestCtx) {
 | |
| 	tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token"))
 | |
| 	if tokenStr == "" {
 | |
| 		tokenStr = string(ctx.Request.URI().QueryArgs().Peek("token"))
 | |
| 	}
 | |
| 
 | |
| 	slot := UploadSlot{}
 | |
| 	err := api.db.Where("token=?", tokenStr).First(&slot).Error
 | |
| 
 | |
| 	if err != nil {
 | |
| 		ctx.Response.Header.SetStatusCode(404)
 | |
| 		logrus.WithError(err).WithFields(logrus.Fields{
 | |
| 			"token": tokenStr,
 | |
| 		}).Warning("Upload slot not found")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	Json(GetUploadSlotResponse{
 | |
| 		UploadSlot: slot,
 | |
| 	}, ctx)
 | |
| 	return
 | |
| }
 | |
| func (api *WebApi) ReadUploadSlot(ctx *fasthttp.RequestCtx) {
 | |
| 
 | |
| 	tokenStr := string(ctx.Request.Header.Peek("X-Upload-Token"))
 | |
| 	if tokenStr == "" {
 | |
| 		tokenStr = string(ctx.Request.URI().QueryArgs().Peek("token"))
 | |
| 	}
 | |
| 
 | |
| 	slot := UploadSlot{}
 | |
| 	err := api.db.Where("token=?", tokenStr).First(&slot).Error
 | |
| 
 | |
| 	if err != nil {
 | |
| 		ctx.Response.Header.SetStatusCode(404)
 | |
| 		logrus.WithError(err).WithFields(logrus.Fields{
 | |
| 			"token": tokenStr,
 | |
| 		}).Warning("Upload slot not found")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	logrus.WithFields(logrus.Fields{
 | |
| 		"slot": slot,
 | |
| 	}).Info("Reading")
 | |
| 
 | |
| 	path := filepath.Join(WorkDir, slot.FileName)
 | |
| 
 | |
| 	mu, _ := Mutexes.LoadOrStore(slot.Token, &sync.RWMutex{})
 | |
| 	mu.(*sync.RWMutex).RLock()
 | |
| 	fp, err := os.OpenFile(path, os.O_RDONLY, 0600)
 | |
| 	if err != nil {
 | |
| 		logrus.WithError(err).Error("Error while opening file for reading")
 | |
| 		mu.(*sync.RWMutex).RUnlock()
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	buf := make([]byte, WsBufferSize)
 | |
| 	response := ctx.Response.BodyWriter()
 | |
| 	for {
 | |
| 		read, err := fp.Read(buf)
 | |
| 		_, _ = response.Write(buf[:read])
 | |
| 		if err == io.EOF {
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	err = fp.Close()
 | |
| 	if err != nil {
 | |
| 		logrus.WithError(err).Error("Error while closing file for reading")
 | |
| 	}
 | |
| 	mu.(*sync.RWMutex).RUnlock()
 | |
| }
 | |
| 
 | |
| func (api *WebApi) allocateUploadSlot(req *AllocateUploadSlotRequest) error {
 | |
| 
 | |
| 	slot := &UploadSlot{
 | |
| 		MaxSize:       req.MaxSize,
 | |
| 		FileName:      req.FileName,
 | |
| 		Token:         req.Token,
 | |
| 		ToDisposeDate: req.ToDisposeDate,
 | |
| 		UploadHook:    req.UploadHook,
 | |
| 	}
 | |
| 
 | |
| 	logrus.WithFields(logrus.Fields{
 | |
| 		"slot": slot,
 | |
| 	}).Info("Allocated new upload slot")
 | |
| 
 | |
| 	return api.db.Create(slot).Error
 | |
| }
 |