Merge branch 'rewrite'

This commit is contained in:
Richard Patel 2018-10-27 17:27:52 +02:00
commit 5ac9fc10a1
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
5 changed files with 300 additions and 0 deletions

47
crawler.go Normal file
View File

@ -0,0 +1,47 @@
package main
import (
"fmt"
"net/url"
)
// maxTimeoutRetries is not referenced by any code visible in this commit.
// NOTE(review): judging by the name it is meant to cap how many times a
// request that timed out is retried — confirm once the crawler uses it.
const maxTimeoutRetries = 3
// File is a record describing one entry of a crawled listing.
//
// Name, Size, Mtime and Path are serialized to JSON under the lower-case keys
// given in their tags; IsDir is tagged "-" and is therefore never written.
// The field order is significant: encoding/json emits keys in declaration
// order.
//
// NOTE(review): units are not established anywhere in this commit — Size is
// presumably bytes and Mtime presumably a Unix timestamp in seconds; confirm
// against the code that fills these in.
type File struct {
	Name  string `json:"name"`
	Size  int64  `json:"size"`
	Mtime int    `json:"mtime"`
	Path  string `json:"path"`
	IsDir bool   `json:"-"` // kept in memory only, excluded from JSON
}
// RemoteDir is the abstraction GetRemoteDir hands back for a supported URL
// scheme.
type RemoteDir interface {
	// ListDir is invoked with the path of a directory. It returns nothing.
	// NOTE(review): how results or errors are supposed to be reported is not
	// defined yet; no implementation exists in this commit.
	ListDir(path string)
}
// GetRemoteDir selects a RemoteDir implementation based on the scheme of u.
//
// For "http" and "https" it currently returns (nil, nil): the intended
// HttpDirectory implementation is not wired in yet, so callers must be
// prepared for a nil RemoteDir even when the error is nil. Every other scheme
// yields a nil RemoteDir and an "unsupported scheme" error.
func GetRemoteDir(u url.URL) (RemoteDir, error) {
	isHTTP := u.Scheme == "http" || u.Scheme == "https"
	if !isHTTP {
		return nil, fmt.Errorf("unsupported scheme: %s", u.Scheme)
	}
	// Placeholder until the HTTP implementation exists.
	return nil, nil //&HttpDirectory{}, nil
}
// CrawlResult is the value returned by RemoteDirCrawler.CrawlDir.
//
// NOTE(review): nothing in this commit populates it yet. FileCount is
// presumably the number of files found and Status a textual outcome —
// confirm once CrawlDir is implemented.
type CrawlResult struct {
	FileCount int
	Status    string
}
// RemoteDirCrawler holds the settings for one crawl and is the receiver of
// CrawlDir.
//
// NOTE(review): no visible code reads these fields yet. Url is presumably the
// root to crawl and MaxThreads a concurrency cap; StatusCode is declared as a
// string, unlike TaskResult.StatusCode which is an int — confirm that is
// intended.
type RemoteDirCrawler struct {
	Url        string
	MaxThreads int
	// CrawledPaths (placeholder left by the author; no field exists yet)
	StatusCode string
}
// CrawlDir is a stub: it ignores both the receiver and outFile and returns
// the zero CrawlResult.
// NOTE(review): outFile is presumably where the file list should be written —
// confirm when the crawl is implemented.
func (r *RemoteDirCrawler) CrawlDir(outFile string) CrawlResult {
	var result CrawlResult
	return result
}

23
main.go Normal file
View File

@ -0,0 +1,23 @@
package main
import (
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// Config carries what the worker needs to talk to the task server.
//
// ServerUrl is used as a prefix: the manager methods append paths such as
// "/task/get" directly to it, so it should not end in a slash. Token is sent
// with every request to identify this worker.
type Config struct {
	ServerUrl string
	Token     string
}
// main2 loads the YAML configuration file "config.yml" from the current
// working directory and terminates the process via logrus.Fatal when it
// cannot be read or parsed.
//
// The file is given to viper explicitly with SetConfigFile. The previous
// SetConfigName("config.yml") call could not work: SetConfigName expects a
// name without extension, and no search path was registered with
// AddConfigPath, so viper had no directory to look in.
func main2() {
	viper.SetConfigFile("config.yml")
	viper.SetConfigType("yml")

	if err := viper.ReadInConfig(); err != nil {
		logrus.Fatal(err)
	}
}

182
manager.go Normal file
View File

@ -0,0 +1,182 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/sirupsen/logrus"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
)
// fileListChunkSize is the maximum number of file-list bytes uploadChunks
// copies into a single upload request: 5 MB, counted in decimal megabytes.
const fileListChunkSize int64 = 5 * 1000 * 1000
// serverClient is the HTTP client shared by every request this file sends to
// the task server. It is the process-wide http.DefaultClient.
// NOTE(review): http.DefaultClient has no timeout, so a stalled server blocks
// the caller indefinitely — consider a dedicated client with a Timeout.
var serverClient *http.Client = http.DefaultClient
// FetchTask asks the server for the next task.
//
// It POSTs the JSON object {"token": <c.Token>} to c.ServerUrl + "/task/get"
// and decodes the JSON response body into a Task.
//
// Returns the decoded task and a nil error on success. On any failure —
// building or sending the request, a status other than 200, or an undecodable
// body — the returned task is nil, so callers never see a partially filled
// Task next to an error.
func (c *Config) FetchTask() (t *Task, err error) {
	// Marshal the token so quotes and control characters are escaped properly.
	escToken, err := json.Marshal(c.Token)
	if err != nil {
		return nil, err
	}
	payload := `{"token":` + string(escToken) + `}`

	req, err := http.NewRequest(
		http.MethodPost,
		c.ServerUrl+"/task/get",
		strings.NewReader(payload))
	if err != nil {
		return nil, err
	}
	// NewRequest does not guess a media type; declare the body as JSON.
	req.Header.Set("Content-Type", "application/json")

	res, err := serverClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("http %s", res.Status)
	}

	task := new(Task)
	if err = json.NewDecoder(res.Body).Decode(task); err != nil {
		return nil, err
	}
	return task, nil
}
// PushResult sends the outcome of a finished crawl to the server.
//
// The file list is read from ./crawled/<WebsiteId>.json and uploaded with
// uploadChunks; afterwards the result summary is sent with uploadResult. If
// either step fails the error is logged, the task is cancelled on the server
// through CancelTask (a cancel failure is logged too) and the original error
// is returned.
//
// The file list is removed when the function returns, whether or not the
// upload succeeded. A missing file list is reported as a dedicated
// "does not exist" error.
func (c *Config) PushResult(result *TaskResult) (err error) {
	listName := fmt.Sprintf("%d.json", result.WebsiteId)
	listPath := filepath.Join(".", "crawled", listName)

	// Registered before the open so the removal runs after the deferred Close.
	defer os.Remove(listPath)

	list, err := os.Open(listPath)
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("cannot upload result: %s does not exist", listPath)
		}
		return
	}
	defer list.Close()

	// cancel releases the task on the server after a failed upload step.
	cancel := func() {
		if cancelErr := c.CancelTask(result.WebsiteId); cancelErr != nil {
			logrus.Error(cancelErr)
		}
	}

	if err = c.uploadChunks(result.WebsiteId, list); err != nil {
		logrus.Errorf("Failed to upload file list: %s", err)
		cancel()
		return
	}

	if err = c.uploadResult(result); err != nil {
		logrus.Errorf("Failed to upload result: %s", err)
		cancel()
		return
	}

	return
}
// uploadChunks streams the file list f to the server in parts of at most
// fileListChunkSize bytes.
//
// Each part is a multipart/form-data POST to c.ServerUrl + "/task/upload"
// carrying the fields "token" and "website_id" and the chunk itself as the
// file field "file_list". Parts are numbered from 1 in log and error
// messages. An empty file results in no request at all.
//
// Returns nil once every byte of f has been uploaded, or the first error
// encountered: a read failure, a failure to build or send a request, or a
// response status other than 200.
func (c *Config) uploadChunks(websiteId uint64, f *os.File) (err error) {
	for part := 1; ; part++ {
		// TODO Stream with io.Pipe?
		var b bytes.Buffer
		multi := multipart.NewWriter(&b)

		// Set upload fields
		if err = multi.WriteField("token", c.Token); err != nil {
			return err
		}
		if err = multi.WriteField("website_id", strconv.FormatUint(websiteId, 10)); err != nil {
			return err
		}

		// Copy chunk to file_list
		var formFile io.Writer
		formFile, err = multi.CreateFormFile("file_list", "file_list")
		if err != nil {
			return err
		}

		var copied int64
		copied, err = io.CopyN(formFile, f, fileListChunkSize)

		// io.CopyN reports a short read as io.EOF (never io.ErrUnexpectedEOF),
		// including when some bytes were copied. Those bytes are the final
		// chunk and still have to be sent.
		lastPart := false
		switch {
		case err == io.EOF && copied == 0:
			// Nothing left to send.
			return nil
		case err == io.EOF:
			lastPart = true
		case err != nil:
			return err
		}

		// Close writes the terminating boundary; without it the body is not a
		// complete multipart message.
		if err = multi.Close(); err != nil {
			return err
		}

		var req *http.Request
		req, err = http.NewRequest(
			http.MethodPost,
			c.ServerUrl+"/task/upload",
			&b)
		if err != nil {
			return err
		}
		// The boundary is part of the content type, so it must come from the
		// writer that produced the body.
		req.Header.Set("Content-Type", multi.FormDataContentType())

		var res *http.Response
		res, err = serverClient.Do(req)
		if err != nil {
			return err
		}
		res.Body.Close()

		if res.StatusCode != http.StatusOK {
			return fmt.Errorf("failed to upload list part %d: %s",
				part, res.Status)
		}

		logrus.Infof("Uploading file list part %d: %s",
			part, res.Status)

		if lastPart {
			return nil
		}
	}
}
// uploadResult reports the summary of a finished task to the server.
//
// result is JSON-encoded and sent, together with c.Token, as the form fields
// "result" and "token" in a POST to c.ServerUrl + "/task/complete".
//
// Returns nil when the server answers 200; otherwise the encoding error, the
// transport error, or an error quoting the response status.
func (c *Config) uploadResult(result *TaskResult) (err error) {
	resultEnc, err := json.Marshal(result)
	if err != nil {
		// Report the failure to the caller rather than crashing the process.
		return err
	}

	form := url.Values{
		"token":  {c.Token},
		"result": {string(resultEnc)},
	}

	// PostForm encodes the values and sets the
	// application/x-www-form-urlencoded content type, which a hand-built
	// request does not carry.
	res, err := serverClient.PostForm(c.ServerUrl+"/task/complete", form)
	if err != nil {
		return err
	}
	res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to upload result: %s", res.Status)
	}

	return nil
}
// CancelTask tells the server that this worker gives up on the task for
// websiteId.
//
// It POSTs the form fields "token" and "website_id" to
// c.ServerUrl + "/task/cancel".
//
// Returns nil when the server answers 200; otherwise the transport error or
// an error quoting the response status.
func (c *Config) CancelTask(websiteId uint64) (err error) {
	form := url.Values{
		"token":      {c.Token},
		"website_id": {strconv.FormatUint(websiteId, 10)},
	}

	// PostForm encodes the values and sets the
	// application/x-www-form-urlencoded content type, which a hand-built
	// request does not carry.
	res, err := serverClient.PostForm(c.ServerUrl+"/task/cancel", form)
	if err != nil {
		return err
	}
	res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("failed to cancel task: %s", res.Status)
	}

	return nil
}

32
remote_http.go Normal file
View File

@ -0,0 +1,32 @@
package main
/*import (
"net/http"
"path"
"time"
)
const (
maxRetries = 2
timeout = 25 * time.Second
)
type HttpDirectory struct {
}
func (h *HttpDirectory) ListDir(filePath string) {
dir := path.Base(filePath)
}
func requestFile(url string, baseUrl string) (err error) {
retries := maxRetries
for retries > 0 {
res, err := http.Head(url)
if err != nil { return }
}
}
*/

16
tasks.go Normal file
View File

@ -0,0 +1,16 @@
package main
import "time"
// Task is one unit of work handed out by the server; FetchTask decodes the
// response of "/task/get" into it. The JSON keys are "website_id" and "url".
//
// NOTE(review): WebsiteId is an int here but a uint64 in TaskResult and in
// CancelTask — confirm which type is intended before converting between them.
type Task struct {
	WebsiteId int    `json:"website_id"`
	Url       string `json:"url"`
}
// TaskResult is the summary of a finished task. uploadResult JSON-encodes it
// with the snake_case keys given in the tags and sends it as the "result"
// form field; PushResult uses WebsiteId to locate the file list on disk.
// The field order is significant: encoding/json emits keys in declaration
// order.
//
// NOTE(review): StatusCode is presumably the outcome code of the crawl and
// StartTime/EndTime its wall-clock bounds; nothing in this commit sets them.
type TaskResult struct {
	StatusCode int       `json:"status_code"`
	FileCount  uint64    `json:"file_count"`
	StartTime  time.Time `json:"start_time"`
	EndTime    time.Time `json:"end_time"`
	WebsiteId  uint64    `json:"website_id"`
}