This commit is contained in:
2020-04-15 09:42:50 -04:00
parent 91f0c9b1f9
commit 5cd367456c
12 changed files with 273 additions and 166 deletions

View File

@@ -1,4 +1,4 @@
package main
package hasher
import (
"crypto/md5"
@@ -19,7 +19,7 @@ type Task struct {
Id int64 `json:"_id"`
}
func dispatchFromQueue(pattern string, queue chan []string) {
func dispatchFromQueue(pattern string, queue chan []string) error {
for {
keys, err := Rdb.Keys(pattern).Result()
@@ -33,8 +33,11 @@ func dispatchFromQueue(pattern string, queue chan []string) {
continue
}
//TODO: put in WIP list, resume on crash
queue <- rawTask
}
return nil
}
func worker(queue chan []string) {
@@ -66,11 +69,6 @@ func computeAndStore(rawTask []string) {
data, err := Fetch(turl)
if err != nil {
if !IsPermanentError(err) {
// Retry later
Logger.Debug("Will retry task later", zap.String("link", link))
Rdb.RPush(rawTask[0], rawTask[1])
}
continue
}
@@ -107,19 +105,12 @@ func trimUrl(link string) string {
return link
}
func main() {
Init()
func Main() error {
queue := make(chan []string, Conf.HasherConcurrency*2)
_, err := Rdb.Ping().Result()
if err != nil {
Logger.Fatal("Could not connect to redis server")
}
queue := make(chan []string, 100)
for i := 0; i < Concurrency; i++ {
for i := 0; i < Conf.HasherConcurrency; i++ {
go worker(queue)
}
dispatchFromQueue("q.reddit.*", queue)
return dispatchFromQueue("q.reddit.*", queue)
}