From 76bc8293d68a892d1fe0bf1d2eff7409d8db78e6 Mon Sep 17 00:00:00 2001 From: simon987 Date: Sat, 30 Mar 2019 09:02:55 -0400 Subject: [PATCH] minimum viable --- config.go | 23 ++++- config.yml | 11 ++- model.go | 6 +- scheduler.go | 2 +- server.go | 258 ++++++++++++++++++++++----------------------------- 5 files changed, 144 insertions(+), 156 deletions(-) diff --git a/config.go b/config.go index 718410b..a459933 100644 --- a/config.go +++ b/config.go @@ -14,8 +14,10 @@ import ( var config struct { TrackerUrl string - TrackerProject string - Token string + TrackerProject int + TrackerAlias string + WsBucketScheme string + WsBucketHost string ServerTimeout time.Duration Recheck time.Duration ChunkSize int64 @@ -33,6 +35,9 @@ var onlineMode bool const ( ConfTrackerUrl = "server.url" ConfTrackerProject = "server.project" + ConfTrackerAlias = "server.alias" + ConfWsBucketScheme = "server.ws_bucket_scheme" + ConfWsBucketHost = "server.ws_bucket_host" ConfServerTimeout = "server.timeout" ConfRecheck = "server.recheck" ConfCooldown = "server.cooldown" @@ -66,6 +71,12 @@ func prepareConfig() { pf.String(ConfTrackerProject, "3", "task_tracker project id") + pf.String(ConfWsBucketScheme, "ws", "ws_bucket scheme") + + pf.String(ConfWsBucketHost, "localhost:3020", "ws_bucket host") //todo def val + + pf.String(ConfTrackerAlias, "crawler", "task_tracker worker alias") + pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout") pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs") @@ -152,7 +163,13 @@ func readConfig() { } config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/") } - config.TrackerProject = viper.GetString(ConfTrackerProject) + config.TrackerProject = viper.GetInt(ConfTrackerProject) + + config.TrackerAlias = viper.GetString(ConfTrackerAlias) + + config.WsBucketHost = viper.GetString(ConfWsBucketHost) + + config.WsBucketScheme = viper.GetString(ConfWsBucketScheme) config.ServerTimeout = viper.GetDuration(ConfServerTimeout) diff --git a/config.yml b/config.yml index 3408c0d..00314d9 100644 --- a/config.yml +++ b/config.yml @@ -1,11 +1,14 @@ # OD-Database server settings server: # Connection URL - url: http://localhost/api + url: http://localhost:3010 + # OD-Database project id (for crawling) project: 3 - - # Server auth token - token: + # Your worker alias + alias: changeme + # Websocket bucket host & scheme (ws/wss) + ws_bucket_host: localhost:3020 + ws_bucket_scheme: ws # Request timeout timeout: 60s diff --git a/model.go b/model.go index 629c4ff..0c0493f 100644 --- a/model.go +++ b/model.go @@ -16,8 +16,10 @@ const ( ) type Task struct { - WebsiteId uint64 `json:"website_id"` - Url string `json:"url"` + WebsiteId uint64 `json:"website_id"` + Url string `json:"url"` + UploadToken string `json:"upload_token"` + TaskId int64 } type TaskResult struct { diff --git a/scheduler.go b/scheduler.go index 716243a..816d1ea 100644 --- a/scheduler.go +++ b/scheduler.go @@ -148,7 +148,7 @@ func (o *OD) Watch(results chan File) { } // Upload results - err = PushResult(&o.Result, f) + err = PushResult(&o.Task, f) if err != nil { logrus.WithError(err). Error("Failed uploading crawl results") diff --git a/server.go b/server.go index 2f2f550..a9ce324 100644 --- a/server.go +++ b/server.go @@ -7,9 +7,11 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/fasthttp/websocket" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/time/rate" + "io" "io/ioutil" "net/http" "net/url" @@ -27,125 +29,57 @@ var serverClient = http.Client{ var serverUserAgent = "od-database-crawler/" + rootCmd.Version -//TODO: Move those elsewhere? -type WorkerAccessRequest struct { - Assign bool `json:"assign"` - Submit bool `json:"submit"` - Project int `json:"project"` -} - -//todo: only keep necessary info -type FetchTaskResponse struct { - Ok bool `json:"ok"` - Message string `json:"message"` - Content struct { - Task struct { - Id int64 `json:"id"` - Priority int64 `json:"priority"` - Project struct { - Id int64 `json:"id"` - Priority int64 `json:"priority"` - Name string `json:"name"` - CloneUrl string `json:"clone_url"` - GitRepo string `json:"git_repo"` - Version string `json:"version"` - Motd string `json:"motd"` - Public bool `json:"public"` - Hidden bool `json:"hidden"` - Chain int64 `json:"chain"` - Paused bool `json:"paused"` - AssignRate rate.Limit `json:"assign_rate"` - SubmitRate rate.Limit `json:"submit_rate"` - } `json:"project"` - Assignee int64 `json:"assignee"` - Retries int64 `json:"retries"` - MaxRetries int64 `json:"max_retries"` - Status int64 `json:"status"` - Recipe string `json:"recipe"` - MaxAssignTime int64 `json:"max_assign_time"` - AssignTime int64 `json:"assign_time"` - VerificationCount int64 `json:"verification_count"` - } `json:"task"` - } `json:"content"` -} - -type TrackerWorker struct { - Alias string `json:"alias"` - Id int `json:"id"` - Secret []byte `json:"secret"` -} - -type CreateTrackerWorkerResponse struct { - Ok bool `json:"ok"` - Message string `json:"message"` - Content struct { - Worker TrackerWorker `json:"worker"` - } `json:"content"` -} - -type CreateTrackerWorkerRequest struct { - Alias string `json:"alias"` -} - -func getOrCreateWorker() *TrackerWorker { - - var worker TrackerWorker +func getOrCreateWorker() { if _, err := os.Stat("worker.json"); os.IsNotExist(err) { req := CreateTrackerWorkerRequest{ - Alias: "crawler", //todo: load from config + Alias: config.TrackerAlias, } body, _ := json.Marshal(&req) buf := bytes.NewBuffer(body) resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf) - //todo: handle err - - fmt.Println(resp.StatusCode) workerResponse := CreateTrackerWorkerResponse{} respBody, _ := ioutil.ReadAll(resp.Body) _ = json.Unmarshal(respBody, &workerResponse) - //todo handle err - fmt.Println(workerResponse) workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker) fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) _, _ = fp.Write(workerJsonData) - //todo: handle err - } else { - fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600) - workerJsonData, _ := ioutil.ReadAll(fp) - _ = json.Unmarshal(workerJsonData, &worker) - //todo: handle err - } - - return &worker -} - -func FetchTask() (t *Task, err error) { - - //todo: this whole block should definitely be extracted elsewhere - if serverWorker == nil { - serverWorker = getOrCreateWorker() //Request ASSIGN permission - //todo: project ID should be stored as a int in the first place - pid, _ := strconv.Atoi(config.TrackerProject) + serverWorker = &workerResponse.Content.Worker accessReq, _ := json.Marshal(WorkerAccessRequest{ - Project: int(pid), + Project: config.TrackerProject, Assign: true, Submit: false, }) - buf := bytes.NewBuffer(accessReq) + buf = bytes.NewBuffer(accessReq) res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf) if err != nil { panic(err) } - fmt.Println(res.StatusCode) + logrus.WithFields(logrus.Fields{ + "response": res.StatusCode, + }).Info("Requested ASSIGN permission") + } else { + var worker TrackerWorker + + fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600) + workerJsonData, _ := ioutil.ReadAll(fp) + _ = json.Unmarshal(workerJsonData, &worker) + + serverWorker = &worker + } +} + +func FetchTask() (t *Task, err error) { + + if serverWorker == nil { + getOrCreateWorker() } - res, err := serverClient.Get( - config.TrackerUrl + "/task/get/" + config.TrackerProject) + res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject)) if err != nil { return @@ -155,9 +89,6 @@ func FetchTask() (t *Task, err error) { switch res.StatusCode { case 200: break - //TODO: 404 should not happen. - case 404, 500: - return nil, nil default: return nil, fmt.Errorf("http %s", res.Status) } @@ -171,12 +102,12 @@ func FetchTask() (t *Task, err error) { } if !jsonResponse.Ok { - //The tracker will return Ok=false when no tasks are available - err = errors.New(jsonResponse.Message) - return + if jsonResponse.Message == "No task available" { + return nil, nil + } + return nil, errors.New(jsonResponse.Message) } - fmt.Println(jsonResponse.Content.Task.Recipe) task := Task{} err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task) if _, ok := err.(*json.SyntaxError); ok { @@ -186,12 +117,13 @@ func FetchTask() (t *Task, err error) { } t = &task + t.TaskId = jsonResponse.Content.Task.Id return } -func PushResult(result *TaskResult, f *os.File) (err error) { - if result.WebsiteId == 0 { +func PushResult(task *Task, f *os.File) (err error) { + if task.WebsiteId == 0 { // Not a real result, don't push return nil } @@ -202,10 +134,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) { return } - err = uploadWebsocket(result.WebsiteId, f) + err = uploadWebsocket(f, task.UploadToken) if err != nil { logrus.Errorf("Failed to upload file list: %s", err) - err2 := CancelTask(result.WebsiteId) + err2 := releaseTask(task, TR_SKIP) if err2 != nil { logrus.Error(err2) } @@ -213,36 +145,45 @@ func PushResult(result *TaskResult, f *os.File) (err error) { } // Upload result ignoring errors - uploadResult(result) + _ = releaseTask(task, TR_OK) return } -func uploadWebsocket(websiteId uint64, f *os.File) error { +func uploadWebsocket(f *os.File, token string) (err error) { + + u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"} + + header := http.Header{} + header.Add("X-Upload-Token", token) + conn, _, err := websocket.DefaultDialer.Dial(u.String(), header) + if err != nil { + return + } + + conn.EnableWriteCompression(true) //TODO: Is this necessary? + + socketWriter, _ := conn.NextWriter(websocket.BinaryMessage) + _, _ = io.Copy(socketWriter, f) + err = socketWriter.Close() + if err != nil { + panic(err) + return + } + err = conn.Close() + if err != nil { + return + } - //TODO: - /* - * OD-DB will give you an Upload token when you fetch the task - * Open a WS connection at {ws_bucket_addr}/upload with the 'X-Upload-Token' as header - * Stream whole file as a single WS message - * Close connection - */ return nil } -func uploadResult(result *TaskResult) (err error) { - - //TODO: - /* - * When the file has been uploaded, just release the task with the TR_OK code - * Don't bother sending the ODDB-related stuff, You just need the task id - * Probably a good idea to wrap this around a new releaseTask() function - */ +func releaseTask(task *Task, taskResult ResultCode) (err error) { req := releaseTaskRequest{ - TaskId: int64(result.WebsiteId), - ResultCode: result.ResultCode, - // TODO What is verification + TaskId: task.TaskId, + ResultCode: taskResult, + // TODO Will implement verification in a later ODDB update Verification: 0, } @@ -250,13 +191,12 @@ func uploadResult(result *TaskResult) (err error) { if err != nil { panic(err) } + body := bytes.NewBuffer(resultEnc) - res, err := serverClient.PostForm( + res, err := serverClient.Post( config.TrackerUrl+"/task/release", - url.Values{ - "token": {config.Token}, - "result": {string(resultEnc)}, - }, + "application/json", + body, ) if err != nil { return @@ -270,12 +210,6 @@ func uploadResult(result *TaskResult) (err error) { return } -func CancelTask(websiteId uint64) (err error) { - - //TODO: Maybe wrap this function around releaseTask(cancel: bool) ? - return -} - type ServerTripper struct{} func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) { @@ -285,7 +219,7 @@ func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err er if serverWorker != nil { var content []byte if req.Method == "GET" { - content = []byte("/task/get/" + config.TrackerProject) //todo: find a less retarded way of doing that + content = []byte("/task/get/" + strconv.Itoa(config.TrackerProject)) } else { //todo: this is retarded and should be moved elsewhere buf, _ := ioutil.ReadAll(req.Body) @@ -316,19 +250,51 @@ const mimeJSON = "application/json" // https://github.com/simon987/task_tracker/blob/master/api/models.go -type submitTaskRequest struct { - Project int64 `json:"project"` - MaxRetries int64 `json:"max_retries"` - Recipe string `json:"recipe"` - Priority int64 `json:"priority"` - MaxAssignTime int64 `json:"max_assign_time"` - Hash64 int64 `json:"hash_u64"` - UniqueString string `json:"unique_string"` - VerificationCount int64 `json:"verification_count"` -} - type releaseTaskRequest struct { TaskId int64 `json:"task_id"` ResultCode ResultCode `json:"result"` Verification int64 `json:"verification"` } + +type WorkerAccessRequest struct { + Assign bool `json:"assign"` + Submit bool `json:"submit"` + Project int `json:"project"` +} + +type FetchTaskResponse struct { + Ok bool `json:"ok"` + Message string `json:"message"` + Content struct { + Task struct { + Id int64 `json:"id"` + Priority int64 `json:"priority"` + Project struct { + Id int64 `json:"id"` + Name string `json:"name"` + Version string `json:"version"` + AssignRate rate.Limit `json:"assign_rate"` + SubmitRate rate.Limit `json:"submit_rate"` + } `json:"project"` + Recipe string `json:"recipe"` + } `json:"task"` + } `json:"content"` +} + +type TrackerWorker struct { + Alias string `json:"alias"` + Id int `json:"id"` + Secret []byte `json:"secret"` +} + +type CreateTrackerWorkerResponse struct { + Ok bool `json:"ok"` + Message string `json:"message"` + Content struct { + Worker TrackerWorker `json:"worker"` + } `json:"content"` +} + +type CreateTrackerWorkerRequest struct { + Alias string `json:"alias"` +}