diff --git a/config.go b/config.go
new file mode 100644
index 0000000..9cc2a8b
--- /dev/null
+++ b/config.go
@@ -0,0 +1,54 @@
+package main
+
+import (
+	"github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
+)
+
+var config struct {
+	ServerUrl string
+	Token     string
+	Retries   int
+	Workers   int
+}
+
+const (
+	ConfServerUrl = "server_url"
+	ConfToken     = "token"
+	ConfRetries   = "retries"
+	ConfWorkers   = "workers"
+)
+
+func prepareConfig() {
+	viper.SetDefault(ConfRetries, 3)
+	viper.SetDefault(ConfWorkers, 50)
+}
+
+func readConfig() {
+	viper.AddConfigPath(".")
+	viper.SetConfigName("config")
+	err := viper.ReadInConfig()
+	if err != nil {
+		logrus.Fatal(err)
+	}
+
+	config.ServerUrl = viper.GetString(ConfServerUrl)
+	if config.ServerUrl == "" {
+		logrus.Fatal("config: server_url not set!")
+	}
+
+	config.Token = viper.GetString(ConfToken)
+	if config.Token == "" {
+		logrus.Fatal("config: token not set!")
+	}
+
+	config.Retries = viper.GetInt(ConfRetries)
+	if config.Retries < 0 {
+		config.Retries = 1<<31 - 1
+	}
+
+	config.Workers = viper.GetInt(ConfWorkers)
+	if config.Workers <= 0 {
+		logrus.Fatalf("config: illegal value %d for workers!", config.Workers)
+	}
+}
diff --git a/config.yml b/config.yml
new file mode 100644
index 0000000..c66122c
--- /dev/null
+++ b/config.yml
@@ -0,0 +1,2 @@
+server_url: http://localhost:6969
+token: abc
\ No newline at end of file
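A note on the new config loader: viper resolves SetConfigName("config") plus AddConfigPath(".") against the config.yml added above, and SetDefault only fills keys the file leaves unset, so retries and workers stay optional while server_url and token are mandatory. A minimal sketch of layering environment-variable overrides on top; the OD prefix and the prepareEnv helper are illustrative assumptions, not part of this change:

```go
package main

import "github.com/spf13/viper"

// prepareEnv is a hypothetical helper (not in this diff). With it,
// OD_WORKERS=100 in the environment would override the workers key
// from config.yml without editing the file.
func prepareEnv() {
	viper.SetEnvPrefix("od") // looks up OD_* variables
	viper.AutomaticEnv()     // viper.Get* now also consults the environment
}
```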
- Error("Failed getting file") - continue - } - } - } - wait.Done() -} - -func listDir(u url.URL, f *File) (links []string, err error) { - f.IsDir = true - u.Path = path.Clean(u.Path) - // TODO Handle external links - f.Name = path.Base(u.Path) - f.Path = strings.TrimLeft(u.Path, "/") - - req := fasthttp.AcquireRequest() - req.SetRequestURI(u.String()) - - res := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(res) - - err = client.Do(req, res) - fasthttp.ReleaseRequest(req) - - if err != nil { - logrus.Error(err) - return - } - - doc := html.NewTokenizer(bytes.NewReader(res.Body())) - - var linkHref string - var linkTexts []string - for { - tokenType := doc.Next() - token := doc.Token() - if tokenType == html.ErrorToken { - break - } - - switch tokenType { - case html.StartTagToken: - if token.DataAtom == atom.A { - for _, attr := range token.Attr { - if attr.Key == "href" { - linkHref = attr.Val - break - } - } - } - - case html.TextToken: - if linkHref != "" { - linkTexts = append(linkTexts, token.Data) - } - - case html.EndTagToken: - if linkHref != "" && token.DataAtom == atom.A { - // Copy params - href := linkHref - linkText := strings.Join(linkTexts, " ") - - // Reset params - linkHref = "" - linkTexts = nil - - // TODO Optimized decision tree - for _, entry := range urlBlackList { - if href == entry { - goto nextToken - } - } - for _, entry := range urlPartBlackList { - if strings.Contains(href, entry) { - goto nextToken - } - } - for _, entry := range fileNameBlackList { - if strings.Contains(linkText, entry) { - goto nextToken - } - } - - links = append(links, href) - } - } - - nextToken: - } - - atomic.AddInt64(&visited, 1) - - return -} - -func fileInfo(u url.URL, f *File) (err error) { - f.IsDir = false - u.Path = path.Clean(u.Path) - f.Name = path.Base(u.Path) - f.Path = strings.Trim(u.Path, "/") - - req := fasthttp.AcquireRequest() - req.Header.SetMethod("HEAD") - req.SetRequestURI(u.String()) - - res := fasthttp.AcquireResponse() - res.SkipBody = true - defer fasthttp.ReleaseResponse(res) - - err = client.Do(req, res) - fasthttp.ReleaseRequest(req) - - if err != nil { return } - - // TODO Inefficient af - header := res.Header.Header() - f.ParseHeader(header) - - atomic.AddInt64(&visited, 1) - - return nil -} - -func (f *File) ParseHeader(h []byte) { - var k1, k2 int - var v1, v2 int - - // Simple finite state machine - state := 0 - for i, b := range h { - switch state { - case 0: - if b == byte(':') { - state = 1 - k2 = i - } - - case 1: - state = 2 - - case 2: - state = 3 - v1 = i - - case 3: - if b == byte('\r') { - state = 4 - } - - case 4: - state = 0 - v2 = i - 1 - - key := string(h[k1:k2]) - val := string(h[v1:v2]) - k1 = i - - f.applyHeader(key, val) - } - } - -} - -func (f *File) applyHeader(k, v string) { - switch k { - case "content-length": - size, err := strconv.ParseInt(v, 10, 64) - if err != nil { break } - if size < 0 { break } - f.Size = size - - case "last-modified": - var err error - f.MTime, err = time.Parse(time.RFC1123, v) - if err == nil { break } - f.MTime, err = time.Parse(time.RFC850, v) - if err == nil { break } - // TODO Parse asctime - f.MTime, err = time.Parse("2006-01-02", v[:10]) - if err == nil { break } - } -} - -func makeInfinite() (chan<- url.URL, <-chan url.URL) { - in := make(chan url.URL) - out := make(chan url.URL) - // Set up in and out queues - go func() { - var inQueue []url.URL - outCh := func() chan url.URL { - if len(inQueue) == 0 { - return nil - } - return out - } - for len(inQueue) > 0 || in != nil { - if 
diff --git a/crawl_http.go b/crawl_http.go
new file mode 100644
index 0000000..28f2a1f
--- /dev/null
+++ b/crawl_http.go
@@ -0,0 +1,251 @@
+package main
+
+import (
+	"bytes"
+	"github.com/sirupsen/logrus"
+	"github.com/valyala/fasthttp"
+	"golang.org/x/net/html"
+	"golang.org/x/net/html/atom"
+	"net/url"
+	"path"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+var client fasthttp.Client
+
+type RemoteDir struct {
+	Wait    sync.WaitGroup
+	BaseUri url.URL
+	lock    sync.Mutex
+	Files   []File
+}
+
+func NewRemoteDir(u url.URL) *RemoteDir {
+	return &RemoteDir{ BaseUri: u }
+}
+
+func GetDir(u url.URL, f *File) (links []url.URL, err error) {
+	f.IsDir = true
+	u.Path = path.Clean(u.Path)
+	// TODO Handle external links
+	f.Name = path.Base(u.Path)
+	f.Path = strings.TrimLeft(u.Path, "/")
+
+	req := fasthttp.AcquireRequest()
+	req.SetRequestURI(u.String())
+
+	res := fasthttp.AcquireResponse()
+	defer fasthttp.ReleaseResponse(res)
+
+	err = client.Do(req, res)
+	fasthttp.ReleaseRequest(req)
+
+	if err != nil {
+		logrus.Error(err)
+		return
+	}
+
+	doc := html.NewTokenizer(bytes.NewReader(res.Body()))
+
+	var linkHref string
+	var linkTexts []string
+	for {
+		tokenType := doc.Next()
+		token := doc.Token()
+		if tokenType == html.ErrorToken {
+			break
+		}
+
+		switch tokenType {
+		case html.StartTagToken:
+			if token.DataAtom == atom.A {
+				for _, attr := range token.Attr {
+					if attr.Key == "href" {
+						linkHref = attr.Val
+						break
+					}
+				}
+			}
+
+		case html.TextToken:
+			if linkHref != "" {
+				linkTexts = append(linkTexts, token.Data)
+			}
+
+		case html.EndTagToken:
+			if linkHref != "" && token.DataAtom == atom.A {
+				// Copy params
+				href := linkHref
+				linkText := strings.Join(linkTexts, " ")
+
+				// Reset params
+				linkHref = ""
+				linkTexts = nil
+
+				// TODO Optimized decision tree
+				for _, entry := range urlBlackList {
+					if href == entry {
+						goto nextToken
+					}
+				}
+				for _, entry := range urlPartBlackList {
+					if strings.Contains(href, entry) {
+						goto nextToken
+					}
+				}
+				for _, entry := range fileNameBlackList {
+					if strings.Contains(linkText, entry) {
+						goto nextToken
+					}
+				}
+
+				subref, err := url.Parse(href)
+				if err != nil { continue }
+
+				link := *u.ResolveReference(subref)
+
+				if link.Scheme != u.Scheme ||
+					link.Host != u.Host {
+					continue
+				}
+
+				links = append(links, link)
+			}
+		}
+
+	nextToken:
+		continue
+	}
+
+	return
+}
+
+func GetFile(u url.URL, f *File) (err error) {
+	f.IsDir = false
+	u.Path = path.Clean(u.Path)
+	f.Name = path.Base(u.Path)
+	f.Path = strings.Trim(u.Path, "/")
+
+	req := fasthttp.AcquireRequest()
+	req.Header.SetMethod("HEAD")
+	req.SetRequestURI(u.String())
+
+	res := fasthttp.AcquireResponse()
+	res.SkipBody = true
+	defer fasthttp.ReleaseResponse(res)
+
+	err = client.Do(req, res)
+	fasthttp.ReleaseRequest(req)
+
+	if err != nil { return }
+
+	// TODO Inefficient af
+	header := res.Header.Header()
+	f.ParseHeader(header)
+
+	return nil
+}
+
+func (f *File) ParseHeader(h []byte) {
+	var k1, k2 int
+	var v1, v2 int
+
+	// Simple finite state machine
+	state := 0
+	for i, b := range h {
+		switch state {
+		case 0:
+			if b == byte(':') {
+				state = 1
+				k2 = i
+			} else if b == byte('\n') {
+				// Line without a key (e.g. the status line): restart scan
+				k1 = i + 1
+			}
+
+		case 1:
+			state = 2
+
+		case 2:
+			state = 3
+			v1 = i
+
+		case 3:
+			if b == byte('\r') {
+				state = 4
+			}
+
+		case 4:
+			state = 0
+			v2 = i - 1
+
+			key := string(h[k1:k2])
+			val := string(h[v1:v2])
+			k1 = i + 1
+
+			f.applyHeader(key, val)
+		}
+	}
+
+}
+
+func (f *File) applyHeader(k, v string) {
+	switch k {
+	case "Content-Length":
+		size, err := strconv.ParseInt(v, 10, 64)
+		if err != nil { break }
+		if size < 0 { break }
+		f.Size = size
+
+	case "Last-Modified":
+		var err error
+		f.MTime, err = time.Parse(time.RFC1123, v)
+		if err == nil { break }
+		f.MTime, err = time.Parse(time.RFC850, v)
+		if err == nil { break }
+		// TODO Parse asctime
+		if len(v) < 10 { break }
+		f.MTime, err = time.Parse("2006-01-02", v[:10])
+	}
+}
+
+var urlBlackList = [...]string {
+	"",
+	" ",
+	".",
+	"..",
+	"/",
+}
+
+var urlPartBlackList = [...]string {
+	"?C=N&O=D",
+	"?C=M&O=A",
+	"?C=S&O=A",
+	"?C=D&O=A",
+	"?C=N;O=D",
+	"?C=M;O=A",
+	"?C=M&O=D",
+	"?C=S;O=A",
+	"?C=S&O=D",
+	"?C=D;O=A",
+	"?MA",
+	"?SA",
+	"?DA",
+	"?ND",
+	"?C=N&O=A",
+	"?C=N&O=A",
+	"?M=A",
+	"?N=D",
+	"?S=A",
+	"?D=A",
+}
+
+var fileNameBlackList = [...]string {
+	"Parent Directory",
+	" Parent Directory",
+	"../",
+}
+
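GetDir filters each href against the blacklists, resolves it against the page URL, and drops anything that leaves the origin. url.ResolveReference treats relative names, absolute paths and full URLs uniformly, which is what makes the scheme/host comparison sufficient. A self-contained sketch of that rule, with placeholder hosts:

```go
package main

import (
	"fmt"
	"net/url"
)

// resolveExample shows the resolution rule GetDir relies on: hrefs
// from a directory listing are interpreted relative to the page URL,
// and any result that changes scheme or host gets skipped.
func resolveExample() {
	base, _ := url.Parse("http://example.com/pub/")
	for _, href := range []string{
		"file.iso",          // relative: http://example.com/pub/file.iso
		"/other/",           // absolute path: http://example.com/other/
		"http://evil.com/x", // different host: filtered out
	} {
		ref, err := url.Parse(href)
		if err != nil {
			continue
		}
		link := base.ResolveReference(ref)
		fmt.Println(link, link.Scheme == base.Scheme && link.Host == base.Host)
	}
}
```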
diff --git a/crawler.go b/crawler.go
deleted file mode 100644
index 092b3bc..0000000
--- a/crawler.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"net/url"
-)
-
-const (
-	maxTimeoutRetries = 3
-)
-
-type File struct {
-	Name string `json:"name"`
-	Size int64 `json:"size"`
-	Mtime int `json:"mtime"`
-	Path string `json:"path"`
-	IsDir bool `json:"-"`
-}
-
-type RemoteDir interface {
-	ListDir(path string)
-}
-
-func GetRemoteDir(u url.URL) (RemoteDir, error) {
-	switch u.Scheme {
-	case "http", "https":
-		return nil, nil //&HttpDirectory{}, nil
-	default:
-		return nil, fmt.Errorf("unsupported scheme: %s", u.Scheme)
-	}
-}
-
-type CrawlResult struct {
-	FileCount int
-	Status string
-}
-
-type RemoteDirCrawler struct {
-	Url string
-	MaxThreads int
-	// CrawledPaths
-	StatusCode string
-}
-
-func (r *RemoteDirCrawler) CrawlDir(outFile string) CrawlResult {
-	return CrawlResult{}
-}
diff --git a/main.go b/main.go
index 532ca36..ba8a427 100644
--- a/main.go
+++ b/main.go
@@ -1,23 +1,25 @@
 package main
 
 import (
-	"github.com/sirupsen/logrus"
-	"github.com/spf13/viper"
+	"context"
+	"net/url"
 )
 
-type Config struct {
-	ServerUrl string
-	Token string
-}
-
-
-func main2() {
-	var err error
-
-	viper.SetConfigName("config.yml")
-	viper.SetConfigType("yml")
-	err = viper.ReadInConfig()
-	if err != nil {
-		logrus.Fatal(err)
-	}
+func main() {
+	prepareConfig()
+	readConfig()
+
+	c := context.Background()
+
+	remotes := make(chan *RemoteDir)
+	go Schedule(c, remotes)
+
+	u, _ := url.Parse("http://mine.terorie.com:420/")
+	remote := NewRemoteDir(*u)
+
+	globalWait.Add(1)
+	remotes <- remote
+
+	// Wait for all jobs to finish
+	globalWait.Wait()
 }
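main wires everything together: config, the scheduler loop, one hard-coded RemoteDir, and a WaitGroup that holds the process open. Since c is context.Background(), Schedule's <-c.Done() branch can never fire; the sketch below shows one way a cancellable context could be wired in. signalContext is a hypothetical helper, not part of this change:

```go
package main

import (
	"context"
	"os"
	"os/signal"
)

// signalContext is a hypothetical helper (not in this diff): it
// returns a context cancelled on the first Ctrl-C, which main could
// hand to Schedule instead of context.Background().
func signalContext() context.Context {
	c, cancel := context.WithCancel(context.Background())
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	go func() {
		<-sig
		cancel()
	}()
	return c
}
```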
diff --git a/manager.go b/manager.go
index a04aa32..7c86ef9 100644
--- a/manager.go
+++ b/manager.go
@@ -21,13 +21,13 @@
 
 var serverClient = http.DefaultClient
 
-func (c *Config) FetchTask() (t *Task, err error) {
-	escToken, _ := json.Marshal(c.Token)
+func FetchTask() (t *Task, err error) {
+	escToken, _ := json.Marshal(config.Token)
 	payload := `{"token":` + string(escToken) + `}`
 
 	req, err := http.NewRequest(
 		http.MethodPost,
-		c.ServerUrl + "/task/get",
+		config.ServerUrl + "/task/get",
 		strings.NewReader(payload))
 	if err != nil { return }
 
@@ -47,7 +47,7 @@
 	return
 }
 
-func (c *Config) PushResult(result *TaskResult) (err error) {
+func PushResult(result *TaskResult) (err error) {
 	filePath := filepath.Join(
 		".", "crawled",
 		fmt.Sprintf("%d.json", result.WebsiteId))
@@ -63,20 +63,20 @@
 	}
 	defer f.Close()
 
-	err = c.uploadChunks(result.WebsiteId, f)
+	err = uploadChunks(result.WebsiteId, f)
 	if err != nil {
 		logrus.Errorf("Failed to upload file list: %s", err)
-		err2 := c.CancelTask(result.WebsiteId)
+		err2 := CancelTask(result.WebsiteId)
 		if err2 != nil {
 			logrus.Error(err2)
 		}
 		return
 	}
 
-	err = c.uploadResult(result)
+	err = uploadResult(result)
 	if err != nil {
 		logrus.Errorf("Failed to upload result: %s", err)
-		err2 := c.CancelTask(result.WebsiteId)
+		err2 := CancelTask(result.WebsiteId)
 		if err2 != nil {
 			logrus.Error(err2)
 		}
@@ -86,7 +86,7 @@
 	return
 }
 
-func (c *Config) uploadChunks(websiteId uint64, f *os.File) (err error) {
+func uploadChunks(websiteId uint64, f *os.File) (err error) {
 	for iter := 1; iter > 0; iter++ {
 		// TODO Stream with io.Pipe?
 		var b bytes.Buffer
@@ -94,7 +94,7 @@
 		multi := multipart.NewWriter(&b)
 
 		// Set upload fields
-		err = multi.WriteField("token", c.Token)
+		err = multi.WriteField("token", config.Token)
 		if err != nil { return }
 		err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
 		if err != nil { return }
@@ -112,7 +112,7 @@
 
 		req, err := http.NewRequest(
 			http.MethodPost,
-			c.ServerUrl + "/task/upload",
+			config.ServerUrl + "/task/upload",
 			&b)
 		if err != nil { return err }
 
@@ -131,18 +131,18 @@
 	return
 }
 
-func (c *Config) uploadResult(result *TaskResult) (err error) {
+func uploadResult(result *TaskResult) (err error) {
 	resultEnc, err := json.Marshal(result)
 	if err != nil { panic(err) }
 
 	payload := url.Values {
-		"token": {c.Token},
+		"token": {config.Token},
 		"result": {string(resultEnc)},
 	}.Encode()
 
 	req, err := http.NewRequest(
 		http.MethodPost,
-		c.ServerUrl + "/task/complete",
+		config.ServerUrl + "/task/complete",
 		strings.NewReader(payload))
 	if err != nil { return }
 
@@ -157,16 +157,16 @@
 	return
 }
 
-func (c *Config) CancelTask(websiteId uint64) (err error) {
+func CancelTask(websiteId uint64) (err error) {
 	form := url.Values{
-		"token": {c.Token},
+		"token": {config.Token},
 		"website_id": {strconv.FormatUint(websiteId, 10)},
 	}
 	encForm := form.Encode()
 
 	req, err := http.NewRequest(
 		http.MethodPost,
-		c.ServerUrl + "/task/cancel",
+		config.ServerUrl + "/task/cancel",
 		strings.NewReader(encForm))
 	if err != nil { return }
 
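A note on FetchTask: the token is the only variable part of the hand-built `{"token":...}` body, and passing it through json.Marshal first guarantees it arrives quoted and escaped. A sketch of an equivalent construction that produces the same bytes while leaving escaping entirely to the encoder; taskGetPayload is an illustrative name, not part of this diff:

```go
package main

import "encoding/json"

// taskGetPayload marshals a small struct instead of concatenating
// strings: quoting and escaping are handled in one place, and the
// output matches the hand-rolled payload in FetchTask.
func taskGetPayload(token string) ([]byte, error) {
	return json.Marshal(struct {
		Token string `json:"token"`
	}{Token: token})
}
```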
diff --git a/remote_http.go b/remote_http.go
deleted file mode 100644
index ed3bad9..0000000
--- a/remote_http.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package main
-
-/*import (
-	"net/http"
-	"path"
-	"time"
-)
-
-const (
-	maxRetries = 2
-	timeout = 25 * time.Second
-)
-
-type HttpDirectory struct {
-
-}
-
-func (h *HttpDirectory) ListDir(filePath string) {
-	dir := path.Base(filePath)
-
-}
-
-func requestFile(url string, baseUrl string) (err error) {
-	retries := maxRetries
-	for retries > 0 {
-		res, err := http.Head(url)
-		if err != nil { return }
-
-	}
-}
-
-*/
diff --git a/scheduler.go b/scheduler.go
new file mode 100644
index 0000000..853f64b
--- /dev/null
+++ b/scheduler.go
@@ -0,0 +1,111 @@
+package main
+
+import (
+	"context"
+	"github.com/sirupsen/logrus"
+	"net/url"
+	"sync/atomic"
+	"time"
+)
+
+type Job struct {
+	Remote *RemoteDir
+	Uri    url.URL
+	UriStr string
+	Fails  int
+}
+
+func Schedule(c context.Context, remotes <-chan *RemoteDir) {
+	in, out := makeJobBuffer()
+	wCtx := WorkerContext{ in, out }
+	for i := 0; i < config.Workers; i++ {
+		go wCtx.Worker()
+	}
+	go Stats(c)
+
+	for {
+		select {
+		case <-c.Done():
+			close(in)
+			return
+
+		case remote := <-remotes:
+			// Enqueue initial job
+			queueJob(in, Job{
+				Remote: remote,
+				Uri:    remote.BaseUri,
+				UriStr: remote.BaseUri.String(),
+				Fails:  0,
+			})
+			globalWait.Done()
+			// Upload result when ready
+			go remote.Watch()
+		}
+	}
+}
+
+func (r *RemoteDir) Watch() {
+	// Wait for all jobs on remote to finish
+	r.Wait.Wait()
+}
+
+func Stats(c context.Context) {
+	var startedLast uint64 = 0
+	ticker := time.NewTicker(time.Second).C
+	for {
+		select {
+		case <-ticker:
+			startedNow := atomic.LoadUint64(&totalStarted)
+			logrus.WithFields(logrus.Fields{
+				"per_second": startedNow - startedLast,
+				"done":       atomic.LoadUint64(&totalDone),
+				"retries":    atomic.LoadUint64(&totalRetries),
+				"aborted":    atomic.LoadUint64(&totalAborted),
+			}).Info("Stats")
+
+			startedLast = startedNow
+
+		case <-c.Done():
+			return
+		}
+	}
+}
+
+func makeJobBuffer() (chan<- Job, <-chan Job) {
+	in := make(chan Job)
+	out := make(chan Job)
+	go bufferJobs(in, out)
+	return in, out
+}
+
+func bufferJobs(in chan Job, out chan Job) {
+	var inQueue []Job
+	outCh := func() chan Job {
+		if len(inQueue) == 0 {
+			return nil
+		}
+		return out
+	}
+	for len(inQueue) > 0 || in != nil {
+		if len(inQueue) == 0 {
+			v, ok := <-in
+			if !ok {
+				in = nil
+			} else {
+				inQueue = append(inQueue, v)
+			}
+		} else {
+			select {
+			case v, ok := <-in:
+				if !ok {
+					in = nil
+				} else {
+					inQueue = append(inQueue, v)
+				}
+			case outCh() <- inQueue[0]:
+				inQueue = inQueue[1:]
+			}
+		}
+	}
+	close(out)
+}
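bufferJobs is what keeps queueJob from deadlocking: workers both consume from out and publish new jobs into in, so a plain bounded channel could fill up and stall the whole pool. Two details carry the logic: a closed in is set to nil so its receive case blocks forever, and the two-branch shape guarantees inQueue[0] is only evaluated while the queue is non-empty (select evaluates operands even for cases that do not proceed). The same pattern reduced to ints as a self-contained sketch:

```go
package main

// buffer is the bufferJobs pattern with ints: sends into in never
// block, because values queue up until a receiver is ready on out.
//
// Usage:
//	in, out := make(chan int), make(chan int)
//	go buffer(in, out)
//	in <- 1; in <- 2; close(in) // never blocks
//	fmt.Println(<-out, <-out)   // 1 2
func buffer(in chan int, out chan int) {
	var q []int
	for len(q) > 0 || in != nil {
		if len(q) == 0 {
			// Nothing buffered: only receiving can make progress.
			v, ok := <-in
			if !ok {
				in = nil // closed: the loop now only drains q
				continue
			}
			q = append(q, v)
		} else {
			select {
			case v, ok := <-in: // receive on nil channel blocks forever
				if !ok {
					in = nil
				} else {
					q = append(q, v)
				}
			case out <- q[0]: // safe: q is non-empty in this branch
				q = q[1:]
			}
		}
	}
	close(out)
}
```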
+ Errorf("Giving up after %d fails", job.Fails) + } else { + atomic.AddUint64(&totalRetries, 1) + queueJob(w.in, job) + } + return + } + + atomic.AddUint64(&totalDone, 1) + for _, job := range newJobs { + queueJob(w.in, job) + } + + job.Remote.Files = append(job.Remote.Files, f) +} + +func DoJob(job *Job, f *File) (newJobs []Job, err error) { + // File + if strings.HasSuffix(job.Uri.Path, "/") { + // Dir + links, err := GetDir(job.Uri, f) + if err != nil { + logrus.WithError(err). + WithField("url", job.Uri.String()). + Error("Failed getting dir") + return nil, err + } + for _, link := range links { + job.Remote.Wait.Add(1) + newJobs = append(newJobs, Job{ + Remote: job.Remote, + Uri: link, + UriStr: link.String(), + Fails: 0, + }) + } + } else { + err := GetFile(job.Uri, f) + if err != nil { + logrus.WithError(err). + WithField("url", job.Uri.String()). + Error("Failed getting file") + return nil, err + } + } + return +} + +func queueJob(in chan<- Job, job Job) { + job.Remote.Wait.Add(1) + globalWait.Add(1) + in <- job +} + +func finishJob(job *Job) { + job.Remote.Wait.Done() + globalWait.Done() +}