minimum viable

This commit is contained in:
simon987 2019-03-30 09:02:55 -04:00
parent 3470be6086
commit 76bc8293d6
5 changed files with 144 additions and 156 deletions

View File

@ -14,8 +14,10 @@ import (
var config struct { var config struct {
TrackerUrl string TrackerUrl string
TrackerProject string TrackerProject int
Token string TrackerAlias string
WsBucketScheme string
WsBucketHost string
ServerTimeout time.Duration ServerTimeout time.Duration
Recheck time.Duration Recheck time.Duration
ChunkSize int64 ChunkSize int64
@ -33,6 +35,9 @@ var onlineMode bool
const ( const (
ConfTrackerUrl = "server.url" ConfTrackerUrl = "server.url"
ConfTrackerProject = "server.project" ConfTrackerProject = "server.project"
ConfTrackerAlias = "server.alias"
ConfWsBucketScheme = "server.ws_bucket_scheme"
ConfWsBucketHost = "server.ws_bucket_host"
ConfServerTimeout = "server.timeout" ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck" ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown" ConfCooldown = "server.cooldown"
@ -66,6 +71,12 @@ func prepareConfig() {
pf.String(ConfTrackerProject, "3", "task_tracker project id") pf.String(ConfTrackerProject, "3", "task_tracker project id")
pf.String(ConfWsBucketScheme, "ws", "ws_bucket scheme")
pf.String(ConfWsBucketHost, "localhost:3020", "ws_bucket host") //todo def val
pf.String(ConfTrackerAlias, "crawler", "task_tracker worker alias")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout") pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs") pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
@ -152,7 +163,13 @@ func readConfig() {
} }
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/") config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
} }
config.TrackerProject = viper.GetString(ConfTrackerProject) config.TrackerProject = viper.GetInt(ConfTrackerProject)
config.TrackerAlias = viper.GetString(ConfTrackerAlias)
config.WsBucketHost = viper.GetString(ConfWsBucketHost)
config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout) config.ServerTimeout = viper.GetDuration(ConfServerTimeout)

View File

@ -1,11 +1,14 @@
# OD-Database server settings # OD-Database server settings
server: server:
# Connection URL # Connection URL
url: http://localhost/api url: http://localhost:3010
# OD-Database project id (for crawling)
project: 3 project: 3
# Your worker alias
# Server auth token alias: changeme
token: # Websocket bucket host & scheme (ws/wss)
ws_bucket_host: localhost:3020
ws_bucket_scheme: ws
# Request timeout # Request timeout
timeout: 60s timeout: 60s

View File

@ -18,6 +18,8 @@ const (
type Task struct { type Task struct {
WebsiteId uint64 `json:"website_id"` WebsiteId uint64 `json:"website_id"`
Url string `json:"url"` Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
} }
type TaskResult struct { type TaskResult struct {

View File

@ -148,7 +148,7 @@ func (o *OD) Watch(results chan File) {
} }
// Upload results // Upload results
err = PushResult(&o.Result, f) err = PushResult(&o.Task, f)
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed uploading crawl results") Error("Failed uploading crawl results")

258
server.go
View File

@ -7,9 +7,11 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
@ -27,125 +29,57 @@ var serverClient = http.Client{
var serverUserAgent = "od-database-crawler/" + rootCmd.Version var serverUserAgent = "od-database-crawler/" + rootCmd.Version
//TODO: Move those elsewhere? func getOrCreateWorker() {
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
//todo: only keep necessary info
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Name string `json:"name"`
CloneUrl string `json:"clone_url"`
GitRepo string `json:"git_repo"`
Version string `json:"version"`
Motd string `json:"motd"`
Public bool `json:"public"`
Hidden bool `json:"hidden"`
Chain int64 `json:"chain"`
Paused bool `json:"paused"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Assignee int64 `json:"assignee"`
Retries int64 `json:"retries"`
MaxRetries int64 `json:"max_retries"`
Status int64 `json:"status"`
Recipe string `json:"recipe"`
MaxAssignTime int64 `json:"max_assign_time"`
AssignTime int64 `json:"assign_time"`
VerificationCount int64 `json:"verification_count"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}
func getOrCreateWorker() *TrackerWorker {
var worker TrackerWorker
if _, err := os.Stat("worker.json"); os.IsNotExist(err) { if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{ req := CreateTrackerWorkerRequest{
Alias: "crawler", //todo: load from config Alias: config.TrackerAlias,
} }
body, _ := json.Marshal(&req) body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body) buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf) resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
//todo: handle err
fmt.Println(resp.StatusCode)
workerResponse := CreateTrackerWorkerResponse{} workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body) respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse) _ = json.Unmarshal(respBody, &workerResponse)
//todo handle err
fmt.Println(workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker) workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData) _, _ = fp.Write(workerJsonData)
//todo: handle err
} else {
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
//todo: handle err
}
return &worker
}
func FetchTask() (t *Task, err error) {
//todo: this whole block should definitely be extracted elsewhere
if serverWorker == nil {
serverWorker = getOrCreateWorker()
//Request ASSIGN permission //Request ASSIGN permission
//todo: project ID should be stored as a int in the first place serverWorker = &workerResponse.Content.Worker
pid, _ := strconv.Atoi(config.TrackerProject)
accessReq, _ := json.Marshal(WorkerAccessRequest{ accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: int(pid), Project: config.TrackerProject,
Assign: true, Assign: true,
Submit: false, Submit: false,
}) })
buf := bytes.NewBuffer(accessReq) buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf) res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil { if err != nil {
panic(err) panic(err)
} }
fmt.Println(res.StatusCode) logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
serverWorker = &worker
}
} }
res, err := serverClient.Get( func FetchTask() (t *Task, err error) {
config.TrackerUrl + "/task/get/" + config.TrackerProject)
if serverWorker == nil {
getOrCreateWorker()
}
res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))
if err != nil { if err != nil {
return return
@ -155,9 +89,6 @@ func FetchTask() (t *Task, err error) {
switch res.StatusCode { switch res.StatusCode {
case 200: case 200:
break break
//TODO: 404 should not happen.
case 404, 500:
return nil, nil
default: default:
return nil, fmt.Errorf("http %s", res.Status) return nil, fmt.Errorf("http %s", res.Status)
} }
@ -171,12 +102,12 @@ func FetchTask() (t *Task, err error) {
} }
if !jsonResponse.Ok { if !jsonResponse.Ok {
//The tracker will return Ok=false when no tasks are available if jsonResponse.Message == "No task available" {
err = errors.New(jsonResponse.Message) return nil, nil
return }
return nil, errors.New(jsonResponse.Message)
} }
fmt.Println(jsonResponse.Content.Task.Recipe)
task := Task{} task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task) err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok { if _, ok := err.(*json.SyntaxError); ok {
@ -186,12 +117,13 @@ func FetchTask() (t *Task, err error) {
} }
t = &task t = &task
t.TaskId = jsonResponse.Content.Task.Id
return return
} }
func PushResult(result *TaskResult, f *os.File) (err error) { func PushResult(task *Task, f *os.File) (err error) {
if result.WebsiteId == 0 { if task.WebsiteId == 0 {
// Not a real result, don't push // Not a real result, don't push
return nil return nil
} }
@ -202,10 +134,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return return
} }
err = uploadWebsocket(result.WebsiteId, f) err = uploadWebsocket(f, task.UploadToken)
if err != nil { if err != nil {
logrus.Errorf("Failed to upload file list: %s", err) logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId) err2 := releaseTask(task, TR_SKIP)
if err2 != nil { if err2 != nil {
logrus.Error(err2) logrus.Error(err2)
} }
@ -213,36 +145,45 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
} }
// Upload result ignoring errors // Upload result ignoring errors
uploadResult(result) _ = releaseTask(task, TR_OK)
return return
} }
func uploadWebsocket(websiteId uint64, f *os.File) error { func uploadWebsocket(f *os.File, token string) (err error) {
u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"}
header := http.Header{}
header.Add("X-Upload-Token", token)
conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
return
}
conn.EnableWriteCompression(true) //TODO: Is this necessary?
socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
_, _ = io.Copy(socketWriter, f)
err = socketWriter.Close()
if err != nil {
panic(err)
return
}
err = conn.Close()
if err != nil {
return
}
//TODO:
/*
* OD-DB will give you an Upload token when you fetch the task
* Open a WS connection at {ws_bucket_addr}/upload with the 'X-Upload-Token' as header
* Stream whole file as a single WS message
* Close connection
*/
return nil return nil
} }
func uploadResult(result *TaskResult) (err error) { func releaseTask(task *Task, taskResult ResultCode) (err error) {
//TODO:
/*
* When the file has been uploaded, just release the task with the TR_OK code
* Don't bother sending the ODDB-related stuff, You just need the task id
* Probably a good idea to wrap this around a new releaseTask() function
*/
req := releaseTaskRequest{ req := releaseTaskRequest{
TaskId: int64(result.WebsiteId), TaskId: task.TaskId,
ResultCode: result.ResultCode, ResultCode: taskResult,
// TODO What is verification // TODO Will implement verification in a later ODDB update
Verification: 0, Verification: 0,
} }
@ -250,13 +191,12 @@ func uploadResult(result *TaskResult) (err error) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
body := bytes.NewBuffer(resultEnc)
res, err := serverClient.PostForm( res, err := serverClient.Post(
config.TrackerUrl+"/task/release", config.TrackerUrl+"/task/release",
url.Values{ "application/json",
"token": {config.Token}, body,
"result": {string(resultEnc)},
},
) )
if err != nil { if err != nil {
return return
@ -270,12 +210,6 @@ func uploadResult(result *TaskResult) (err error) {
return return
} }
func CancelTask(websiteId uint64) (err error) {
//TODO: Maybe wrap this function around releaseTask(cancel: bool) ?
return
}
type ServerTripper struct{} type ServerTripper struct{}
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) { func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
@ -285,7 +219,7 @@ func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err er
if serverWorker != nil { if serverWorker != nil {
var content []byte var content []byte
if req.Method == "GET" { if req.Method == "GET" {
content = []byte("/task/get/" + config.TrackerProject) //todo: find a less retarded way of doing that content = []byte("/task/get/" + strconv.Itoa(config.TrackerProject))
} else { } else {
//todo: this is retarded and should be moved elsewhere //todo: this is retarded and should be moved elsewhere
buf, _ := ioutil.ReadAll(req.Body) buf, _ := ioutil.ReadAll(req.Body)
@ -316,19 +250,51 @@ const mimeJSON = "application/json"
// https://github.com/simon987/task_tracker/blob/master/api/models.go // https://github.com/simon987/task_tracker/blob/master/api/models.go
type submitTaskRequest struct {
Project int64 `json:"project"`
MaxRetries int64 `json:"max_retries"`
Recipe string `json:"recipe"`
Priority int64 `json:"priority"`
MaxAssignTime int64 `json:"max_assign_time"`
Hash64 int64 `json:"hash_u64"`
UniqueString string `json:"unique_string"`
VerificationCount int64 `json:"verification_count"`
}
type releaseTaskRequest struct { type releaseTaskRequest struct {
TaskId int64 `json:"task_id"` TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"` ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"` Verification int64 `json:"verification"`
} }
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}