From ddfdce9d0f3f44c4ed580fbe79b14bc04b445ff3 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Sun, 28 Oct 2018 13:43:45 +0100 Subject: [PATCH] Refactor a bit --- .gitignore | 3 +- crawl.go | 274 ++++++++++++++++++++++++++++++++++++++-- crawl_http.go | 274 ---------------------------------------- scheduler.go | 5 + manager.go => server.go | 0 stats.go | 8 ++ 6 files changed, 280 insertions(+), 284 deletions(-) delete mode 100644 crawl_http.go rename manager.go => server.go (100%) diff --git a/.gitignore b/.gitignore index d827413..ed039da 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /.idea/ -.DS_Store \ No newline at end of file +.DS_Store +/oddb-go \ No newline at end of file diff --git a/crawl.go b/crawl.go index 8e4582c..30e63d7 100644 --- a/crawl.go +++ b/crawl.go @@ -1,18 +1,274 @@ package main import ( + "bytes" + "errors" + "fmt" + "github.com/sirupsen/logrus" + "github.com/valyala/fasthttp" + "golang.org/x/net/html" + "golang.org/x/net/html/atom" + "net/url" + "path" + "strconv" + "strings" + "sync" "time" ) -type File struct { - Name string `json:"name"` - Size int64 `json:"size"` - MTime time.Time `json:"mtime"` - Path string `json:"path"` - IsDir bool `json:"-"` +var client fasthttp.Client +var ErrRateLimit = errors.New("too many requests") +var ErrForbidden = errors.New("access denied") + +type RemoteDir struct { + Wait sync.WaitGroup + BaseUri url.URL + lock sync.Mutex + Files []File } -type CrawlResult struct { - FileCount int - Status string +func NewRemoteDir(u url.URL) *RemoteDir { + return &RemoteDir{ BaseUri: u } } + +func GetDir(j *Job, f *File) (links []url.URL, err error) { + f.IsDir = true + f.Name = path.Base(j.Uri.Path) + + req := fasthttp.AcquireRequest() + req.SetRequestURI(j.Uri.String()) + + res := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(res) + + err = client.Do(req, res) + fasthttp.ReleaseRequest(req) + + if err != nil { + logrus.Error(err) + return + } + + err = checkStatusCode(res.StatusCode()) + if err != nil { return } + + body := res.Body() + doc := html.NewTokenizer(bytes.NewReader(body)) + + var linkHref string + var linkTexts []string + for { + tokenType := doc.Next() + token := doc.Token() + if tokenType == html.ErrorToken { + break + } + + switch tokenType { + case html.StartTagToken: + if token.DataAtom == atom.A { + for _, attr := range token.Attr { + if attr.Key == "href" { + linkHref = attr.Val + break + } + } + } + + case html.TextToken: + if linkHref != "" { + linkTexts = append(linkTexts, token.Data) + } + + case html.EndTagToken: + if linkHref != "" && token.DataAtom == atom.A { + // Copy params + href := linkHref + linkText := strings.Join(linkTexts, " ") + + // Reset params + linkHref = "" + linkTexts = nil + + // TODO Optimized decision tree + for _, entry := range urlBlackList { + if href == entry { + goto nextToken + } + } + for _, entry := range urlPartBlackList { + if strings.Contains(href, entry) { + goto nextToken + } + } + for _, entry := range fileNameBlackList { + if strings.Contains(linkText, entry) { + goto nextToken + } + } + + subref, err := url.Parse(href) + if err != nil { continue } + + link := *j.Uri.ResolveReference(subref) + + if link.Scheme != j.Uri.Scheme || + link.Host != j.Uri.Host || + link.Path == j.Uri.Path || + !strings.HasPrefix(link.Path, j.Uri.Path) { + continue + } + + links = append(links, link) + } + } + + nextToken: + } + + return +} + +func GetFile(u url.URL, f *File) (err error) { + f.IsDir = false + u.Path = path.Clean(u.Path) + f.Name = path.Base(u.Path) + f.Path = strings.Trim(u.Path, "/") + + req := fasthttp.AcquireRequest() + req.Header.SetMethod("HEAD") + req.SetRequestURI(u.String()) + + res := fasthttp.AcquireResponse() + res.SkipBody = true + defer fasthttp.ReleaseResponse(res) + + err = client.Do(req, res) + fasthttp.ReleaseRequest(req) + + if err != nil { return } + + err = checkStatusCode(res.StatusCode()) + if err != nil { return } + + // TODO Inefficient af + header := res.Header.Header() + f.ParseHeader(header) + + return nil +} + +func (f *File) ParseHeader(h []byte) { + var k1, k2 int + var v1, v2 int + + // Simple finite state machine + state := 0 + for i, b := range h { + switch state { + case 0: + if b == byte(':') { + state = 1 + k2 = i + } + + case 1: + state = 2 + + case 2: + state = 3 + v1 = i + + case 3: + if b == byte('\r') { + state = 4 + } + + case 4: + state = 0 + v2 = i - 1 + + key := string(h[k1:k2]) + val := string(h[v1:v2]) + k1 = i + + f.applyHeader(key, val) + } + } + +} + +func (f *File) applyHeader(k, v string) { + switch k { + case "content-length": + size, err := strconv.ParseInt(v, 10, 64) + if err != nil { break } + if size < 0 { break } + f.Size = size + + case "last-modified": + var err error + f.MTime, err = time.Parse(time.RFC1123, v) + if err == nil { break } + f.MTime, err = time.Parse(time.RFC850, v) + if err == nil { break } + // TODO Parse asctime + f.MTime, err = time.Parse("2006-01-02", v[:10]) + if err == nil { break } + } +} + +func checkStatusCode(status int) error { + switch status { + case fasthttp.StatusOK: + return nil + + case fasthttp.StatusTooManyRequests: + return ErrRateLimit + + case fasthttp.StatusForbidden, + fasthttp.StatusUnauthorized: + return ErrForbidden + + default: + return fmt.Errorf("got HTTP status %d", status) + } +} + +var urlBlackList = [...]string { + "", + " ", + ".", + "..", + "/", +} + +var urlPartBlackList = [...]string { + "?C=N&O=D", + "?C=M&O=A", + "?C=S&O=A", + "?C=D&O=A", + "?C=N;O=D", + "?C=M;O=A", + "?C=M&O=D", + "?C=S;O=A", + "?C=S&O=D", + "?C=D;O=A", + "?MA", + "?SA", + "?DA", + "?ND", + "?C=N&O=A", + "?C=N&O=A", + "?M=A", + "?N=D", + "?S=A", + "?D=A", +} + +var fileNameBlackList = [...]string { + "Parent Directory", + " Parent Directory", + "../", +} + diff --git a/crawl_http.go b/crawl_http.go deleted file mode 100644 index 30e63d7..0000000 --- a/crawl_http.go +++ /dev/null @@ -1,274 +0,0 @@ -package main - -import ( - "bytes" - "errors" - "fmt" - "github.com/sirupsen/logrus" - "github.com/valyala/fasthttp" - "golang.org/x/net/html" - "golang.org/x/net/html/atom" - "net/url" - "path" - "strconv" - "strings" - "sync" - "time" -) - -var client fasthttp.Client -var ErrRateLimit = errors.New("too many requests") -var ErrForbidden = errors.New("access denied") - -type RemoteDir struct { - Wait sync.WaitGroup - BaseUri url.URL - lock sync.Mutex - Files []File -} - -func NewRemoteDir(u url.URL) *RemoteDir { - return &RemoteDir{ BaseUri: u } -} - -func GetDir(j *Job, f *File) (links []url.URL, err error) { - f.IsDir = true - f.Name = path.Base(j.Uri.Path) - - req := fasthttp.AcquireRequest() - req.SetRequestURI(j.Uri.String()) - - res := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(res) - - err = client.Do(req, res) - fasthttp.ReleaseRequest(req) - - if err != nil { - logrus.Error(err) - return - } - - err = checkStatusCode(res.StatusCode()) - if err != nil { return } - - body := res.Body() - doc := html.NewTokenizer(bytes.NewReader(body)) - - var linkHref string - var linkTexts []string - for { - tokenType := doc.Next() - token := doc.Token() - if tokenType == html.ErrorToken { - break - } - - switch tokenType { - case html.StartTagToken: - if token.DataAtom == atom.A { - for _, attr := range token.Attr { - if attr.Key == "href" { - linkHref = attr.Val - break - } - } - } - - case html.TextToken: - if linkHref != "" { - linkTexts = append(linkTexts, token.Data) - } - - case html.EndTagToken: - if linkHref != "" && token.DataAtom == atom.A { - // Copy params - href := linkHref - linkText := strings.Join(linkTexts, " ") - - // Reset params - linkHref = "" - linkTexts = nil - - // TODO Optimized decision tree - for _, entry := range urlBlackList { - if href == entry { - goto nextToken - } - } - for _, entry := range urlPartBlackList { - if strings.Contains(href, entry) { - goto nextToken - } - } - for _, entry := range fileNameBlackList { - if strings.Contains(linkText, entry) { - goto nextToken - } - } - - subref, err := url.Parse(href) - if err != nil { continue } - - link := *j.Uri.ResolveReference(subref) - - if link.Scheme != j.Uri.Scheme || - link.Host != j.Uri.Host || - link.Path == j.Uri.Path || - !strings.HasPrefix(link.Path, j.Uri.Path) { - continue - } - - links = append(links, link) - } - } - - nextToken: - } - - return -} - -func GetFile(u url.URL, f *File) (err error) { - f.IsDir = false - u.Path = path.Clean(u.Path) - f.Name = path.Base(u.Path) - f.Path = strings.Trim(u.Path, "/") - - req := fasthttp.AcquireRequest() - req.Header.SetMethod("HEAD") - req.SetRequestURI(u.String()) - - res := fasthttp.AcquireResponse() - res.SkipBody = true - defer fasthttp.ReleaseResponse(res) - - err = client.Do(req, res) - fasthttp.ReleaseRequest(req) - - if err != nil { return } - - err = checkStatusCode(res.StatusCode()) - if err != nil { return } - - // TODO Inefficient af - header := res.Header.Header() - f.ParseHeader(header) - - return nil -} - -func (f *File) ParseHeader(h []byte) { - var k1, k2 int - var v1, v2 int - - // Simple finite state machine - state := 0 - for i, b := range h { - switch state { - case 0: - if b == byte(':') { - state = 1 - k2 = i - } - - case 1: - state = 2 - - case 2: - state = 3 - v1 = i - - case 3: - if b == byte('\r') { - state = 4 - } - - case 4: - state = 0 - v2 = i - 1 - - key := string(h[k1:k2]) - val := string(h[v1:v2]) - k1 = i - - f.applyHeader(key, val) - } - } - -} - -func (f *File) applyHeader(k, v string) { - switch k { - case "content-length": - size, err := strconv.ParseInt(v, 10, 64) - if err != nil { break } - if size < 0 { break } - f.Size = size - - case "last-modified": - var err error - f.MTime, err = time.Parse(time.RFC1123, v) - if err == nil { break } - f.MTime, err = time.Parse(time.RFC850, v) - if err == nil { break } - // TODO Parse asctime - f.MTime, err = time.Parse("2006-01-02", v[:10]) - if err == nil { break } - } -} - -func checkStatusCode(status int) error { - switch status { - case fasthttp.StatusOK: - return nil - - case fasthttp.StatusTooManyRequests: - return ErrRateLimit - - case fasthttp.StatusForbidden, - fasthttp.StatusUnauthorized: - return ErrForbidden - - default: - return fmt.Errorf("got HTTP status %d", status) - } -} - -var urlBlackList = [...]string { - "", - " ", - ".", - "..", - "/", -} - -var urlPartBlackList = [...]string { - "?C=N&O=D", - "?C=M&O=A", - "?C=S&O=A", - "?C=D&O=A", - "?C=N;O=D", - "?C=M;O=A", - "?C=M&O=D", - "?C=S;O=A", - "?C=S&O=D", - "?C=D;O=A", - "?MA", - "?SA", - "?DA", - "?ND", - "?C=N&O=A", - "?C=N&O=A", - "?M=A", - "?N=D", - "?S=A", - "?D=A", -} - -var fileNameBlackList = [...]string { - "Parent Directory", - " Parent Directory", - "../", -} - diff --git a/scheduler.go b/scheduler.go index 2224c4a..e6a9ca3 100644 --- a/scheduler.go +++ b/scheduler.go @@ -3,6 +3,7 @@ package main import ( "context" "net/url" + "sync/atomic" ) type Job struct { @@ -13,6 +14,8 @@ type Job struct { LastError error } +var activeTasks int32 + func Schedule(c context.Context, remotes <-chan *RemoteDir) { in, out := makeJobBuffer() wCtx := WorkerContext{ @@ -32,6 +35,7 @@ func Schedule(c context.Context, remotes <-chan *RemoteDir) { case remote := <-remotes: // Enqueue initial job + atomic.AddInt32(&activeTasks, 1) wCtx.queueJob(Job{ Remote: remote, Uri: remote.BaseUri, @@ -48,6 +52,7 @@ func Schedule(c context.Context, remotes <-chan *RemoteDir) { func (r *RemoteDir) Watch() { // Wait for all jobs on remote to finish r.Wait.Wait() + atomic.AddInt32(&activeTasks, -1) } func makeJobBuffer() (chan<- Job, <-chan Job) { diff --git a/manager.go b/server.go similarity index 100% rename from manager.go rename to server.go diff --git a/stats.go b/stats.go index f78e557..1b12c27 100644 --- a/stats.go +++ b/stats.go @@ -13,6 +13,14 @@ var totalDone uint64 var totalRetries uint64 var totalAborted uint64 +type File struct { + Name string `json:"name"` + Size int64 `json:"size"` + MTime time.Time `json:"mtime"` + Path string `json:"path"` + IsDir bool `json:"-"` +} + func Stats(c context.Context) { var startedLast uint64 = 0 ticker := time.NewTicker(config.StatsInterval).C