imhashdb/hasher/hasher.go

package hasher

import (
	"crypto/md5"
	"crypto/sha1"
	"crypto/sha256"
	"encoding/json"
	"hash/crc32"
	"strconv"
	"strings"
	"time"

	. "github.com/simon987/imhashdb"
	"go.uber.org/zap"
)

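// Task is a single unit of work popped from a Redis queue: an id and the
// URLs whose images should be fetched and hashed. The JSON field names
// match the producer's payload keys.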
type Task struct {
	Urls []string `json:"_urls"`
	Id   int64    `json:"_id"`
}

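// dispatchFromQueue polls Redis for queue keys matching pattern, pops raw
// tasks from them with BLPOP and forwards each one to the worker queue.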
func dispatchFromQueue(pattern string, queue chan []string) error {
	for {
		keys, err := Rdb.Keys(pattern).Result()
		if err != nil {
			Logger.Error("Could not get keys for pattern", zap.String("pattern", pattern), zap.Error(err))
			continue
		}
		if len(keys) == 0 {
			time.Sleep(time.Second * 1)
			continue
		}
		rawTask, err := Rdb.BLPop(time.Second*30, keys...).Result()
		if err != nil {
			continue
		}
		//TODO: put in WIP list, resume on crash
		queue <- rawTask
	}
	return nil
}

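// worker processes raw tasks from the queue, one at a time.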
func worker(queue chan []string) {
	for rawTask := range queue {
		computeAndStore(rawTask)
	}
}

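// computeAndStore decodes a raw task, fetches every image URL it references
// and stores the result of ComputeHash along with SHA-256, SHA-1, MD5 and
// CRC32 checksums of the image data.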
func computeAndStore(rawTask []string) {
	var task Task
	err := json.Unmarshal([]byte(rawTask[1]), &task)
	if err != nil {
		Logger.Error("Corrupt task body", zap.String("body", rawTask[1]))
		return
	}
	meta := []Meta{{
		RetrievedAt: time.Now().Unix(),
		Id:          rawTask[0][len(RedisPrefix):] + "." + strconv.FormatInt(task.Id, 10),
		Meta:        []byte(rawTask[1]),
	}}
	for _, link := range task.Urls {
		for _, turl := range TransformLink(link, &meta) {
			if !IsImageLink(turl) {
				Logger.Debug("Ignoring non-image URL", zap.String("link", link))
				continue
			}
			data, err := Fetch(turl)
			if err != nil {
				continue
			}
			if len(data) == 0 {
				continue
			}
			h, err := ComputeHash(data)
			if err != nil {
				return
			}
			Store(&Entry{
				H:      h,
				Size:   len(data),
				Sha256: sha256.Sum256(data),
				Sha1:   sha1.Sum(data),
				Md5:    md5.Sum(data),
				Crc32:  crc32.ChecksumIEEE(data),
				Meta:   meta,
				Url:    trimUrl(turl),
			})
		}
	}
}

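// trimUrl strips a leading http:// or https:// scheme from a URL so the
// stored Url is independent of the scheme it was fetched with.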
func trimUrl(link string) string {
	if strings.HasPrefix(link, "https://") {
		return link[len("https://"):]
	} else if strings.HasPrefix(link, "http://") {
		return link[len("http://"):]
	}
	return link
}

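// Main starts Conf.HasherConcurrency workers and blocks, dispatching tasks
// from the "q.reddit.*" Redis queues to them.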
func Main() error {
	queue := make(chan []string)
	for i := 0; i < Conf.HasherConcurrency; i++ {
		go worker(queue)
	}
	return dispatchFromQueue("q.reddit.*", queue)
}