From abf069f946e028d22c2d94f5343d1d95d195d757 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Sat, 27 Oct 2018 04:10:08 +0200 Subject: [PATCH] Bits of ODDB API --- crawler.go | 47 +++++++++++++ main.go | 23 +++++++ manager.go | 182 +++++++++++++++++++++++++++++++++++++++++++++++++ remote_http.go | 32 +++++++++ tasks.go | 16 +++++ 5 files changed, 300 insertions(+) create mode 100644 crawler.go create mode 100644 main.go create mode 100644 manager.go create mode 100644 remote_http.go create mode 100644 tasks.go diff --git a/crawler.go b/crawler.go new file mode 100644 index 0000000..092b3bc --- /dev/null +++ b/crawler.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "net/url" +) + +const ( + maxTimeoutRetries = 3 +) + +type File struct { + Name string `json:"name"` + Size int64 `json:"size"` + Mtime int `json:"mtime"` + Path string `json:"path"` + IsDir bool `json:"-"` +} + +type RemoteDir interface { + ListDir(path string) +} + +func GetRemoteDir(u url.URL) (RemoteDir, error) { + switch u.Scheme { + case "http", "https": + return nil, nil //&HttpDirectory{}, nil + default: + return nil, fmt.Errorf("unsupported scheme: %s", u.Scheme) + } +} + +type CrawlResult struct { + FileCount int + Status string +} + +type RemoteDirCrawler struct { + Url string + MaxThreads int + // CrawledPaths + StatusCode string +} + +func (r *RemoteDirCrawler) CrawlDir(outFile string) CrawlResult { + return CrawlResult{} +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..532ca36 --- /dev/null +++ b/main.go @@ -0,0 +1,23 @@ +package main + +import ( + "github.com/sirupsen/logrus" + "github.com/spf13/viper" +) + +type Config struct { + ServerUrl string + Token string +} + + +func main2() { + var err error + + viper.SetConfigName("config.yml") + viper.SetConfigType("yml") + err = viper.ReadInConfig() + if err != nil { + logrus.Fatal(err) + } +} diff --git a/manager.go b/manager.go new file mode 100644 index 0000000..a04aa32 --- /dev/null +++ b/manager.go @@ -0,0 +1,182 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/sirupsen/logrus" + "io" + "mime/multipart" + "net/http" + "net/url" + "os" + "path/filepath" + "strconv" + "strings" +) + +const ( + fileListChunkSize int64 = 5000000 // 5 mb +) + +var serverClient = http.DefaultClient + +func (c *Config) FetchTask() (t *Task, err error) { + escToken, _ := json.Marshal(c.Token) + payload := `{"token":` + string(escToken) + `}` + + req, err := http.NewRequest( + http.MethodPost, + c.ServerUrl + "/task/get", + strings.NewReader(payload)) + if err != nil { return } + + res, err := serverClient.Do(req) + if err != nil { return } + defer res.Body.Close() + + if res.StatusCode != 200 { + err = fmt.Errorf("http %s", res.Status) + return + } + + t = new(Task) + err = json.NewDecoder(res.Body).Decode(t) + if err != nil { return } + + return +} + +func (c *Config) PushResult(result *TaskResult) (err error) { + filePath := filepath.Join( + ".", "crawled", + fmt.Sprintf("%d.json", result.WebsiteId)) + + defer os.Remove(filePath) + + f, err := os.Open(filePath) + if os.IsNotExist(err) { + err = fmt.Errorf("cannot upload result: %s does not exist", filePath) + return + } else if err != nil { + return + } + defer f.Close() + + err = c.uploadChunks(result.WebsiteId, f) + if err != nil { + logrus.Errorf("Failed to upload file list: %s", err) + err2 := c.CancelTask(result.WebsiteId) + if err2 != nil { + logrus.Error(err2) + } + return + } + + err = c.uploadResult(result) + if err != nil { + logrus.Errorf("Failed to upload result: %s", err) + err2 := c.CancelTask(result.WebsiteId) + if err2 != nil { + logrus.Error(err2) + } + return + } + + return +} + +func (c *Config) uploadChunks(websiteId uint64, f *os.File) (err error) { + for iter := 1; iter > 0; iter++ { + // TODO Stream with io.Pipe? + var b bytes.Buffer + + multi := multipart.NewWriter(&b) + + // Set upload fields + err = multi.WriteField("token", c.Token) + if err != nil { return } + err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId)) + if err != nil { return } + + // Copy chunk to file_list + formFile, err := multi.CreateFormFile("file_list", "file_list") + _, err = io.CopyN(formFile, f, fileListChunkSize) + if err == io.EOF { + break + } else if err == io.ErrUnexpectedEOF { + err = nil + // Break at end of iteration + iter = -420 + } + + req, err := http.NewRequest( + http.MethodPost, + c.ServerUrl + "/task/upload", + &b) + if err != nil { return err } + + res, err := serverClient.Do(req) + if err != nil { return err } + res.Body.Close() + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("failed to upload list part %d: %s", + iter, res.Status) + } + + logrus.Infof("Uploading file list part %d: %s", + iter, res.Status) + } + return +} + +func (c *Config) uploadResult(result *TaskResult) (err error) { + resultEnc, err := json.Marshal(result) + if err != nil { panic(err) } + + payload := url.Values { + "token": {c.Token}, + "result": {string(resultEnc)}, + }.Encode() + + req, err := http.NewRequest( + http.MethodPost, + c.ServerUrl + "/task/complete", + strings.NewReader(payload)) + if err != nil { return } + + res, err := serverClient.Do(req) + if err != nil { return } + res.Body.Close() + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("failed to cancel task: %s", res.Status) + } + + return +} + +func (c *Config) CancelTask(websiteId uint64) (err error) { + form := url.Values{ + "token": {c.Token}, + "website_id": {strconv.FormatUint(websiteId, 10)}, + } + encForm := form.Encode() + + req, err := http.NewRequest( + http.MethodPost, + c.ServerUrl + "/task/cancel", + strings.NewReader(encForm)) + if err != nil { return } + + res, err := serverClient.Do(req) + if err != nil { return } + res.Body.Close() + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("failed to cancel task: %s", res.Status) + } + + return +} diff --git a/remote_http.go b/remote_http.go new file mode 100644 index 0000000..ed3bad9 --- /dev/null +++ b/remote_http.go @@ -0,0 +1,32 @@ +package main + +/*import ( + "net/http" + "path" + "time" +) + +const ( + maxRetries = 2 + timeout = 25 * time.Second +) + +type HttpDirectory struct { + +} + +func (h *HttpDirectory) ListDir(filePath string) { + dir := path.Base(filePath) + +} + +func requestFile(url string, baseUrl string) (err error) { + retries := maxRetries + for retries > 0 { + res, err := http.Head(url) + if err != nil { return } + + } +} + +*/ diff --git a/tasks.go b/tasks.go new file mode 100644 index 0000000..a92e35a --- /dev/null +++ b/tasks.go @@ -0,0 +1,16 @@ +package main + +import "time" + +type Task struct { + WebsiteId int `json:"website_id"` + Url string `json:"url"` +} + +type TaskResult struct { + StatusCode int `json:"status_code"` + FileCount uint64 `json:"file_count"` + StartTime time.Time `json:"start_time"` + EndTime time.Time `json:"end_time"` + WebsiteId uint64 `json:"website_id"` +}