imhashdb/hasher/hasher.go
2020-04-11 15:17:57 -04:00

130 lines
2.4 KiB
Go

package main
import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"encoding/json"
"hash/crc32"
"strconv"
"strings"
"time"
. "github.com/simon987/imhashdb"
"go.uber.org/zap"
)
// Task is the decoded JSON body of one queued job.
type Task struct {
	// Urls lists the links to process; decoded from the "_urls" key.
	Urls []string `json:"_urls"`
	// Id is decoded from the "_id" key and is appended to the meta id
	// built in computeAndStore.
	Id int64 `json:"_id"`
}
// dispatchFromQueue loops forever, moving raw tasks from Redis onto queue.
//
// On each pass it lists the keys matching pattern and performs a blocking
// left pop across all of them with a 30 second timeout. A successful reply
// is forwarded to queue unchanged.
//
// When the key lookup fails, or no key currently matches pattern, the loop
// waits briefly before trying again. BLPOP requires at least one key, so
// calling it with none would fail straight away and turn the loop into a
// tight stream of KEYS commands.
func dispatchFromQueue(pattern string, queue chan []string) {
	// Pause between attempts when there is nothing to pop from.
	const retryDelay = time.Second

	for {
		keys, err := Rdb.Keys(pattern).Result()
		if err != nil {
			Logger.Error("Could not get keys for pattern", zap.String("pattern", pattern), zap.Error(err))
			time.Sleep(retryDelay)
			continue
		}
		if len(keys) == 0 {
			// No list exists yet for this pattern; nothing to block on.
			time.Sleep(retryDelay)
			continue
		}

		rawTask, err := Rdb.BLPop(time.Second*30, keys...).Result()
		if err != nil {
			// NOTE(review): presumably this is mostly the pop timing out with
			// every list empty, in which case the call already blocked for
			// the full timeout — confirm against the Redis client in use.
			continue
		}
		queue <- rawTask
	}
}
// worker receives raw tasks from queue and passes each one to
// computeAndStore. It returns once queue has been closed and drained.
func worker(queue chan []string) {
	for {
		rawTask, open := <-queue
		if !open {
			return
		}
		computeAndStore(rawTask)
	}
}
func computeAndStore(rawTask []string) {
var task Task
err := json.Unmarshal([]byte(rawTask[1]), &task)
if err != nil {
Logger.Error("Corrupt task body", zap.String("body", rawTask[1]))
return
}
meta := []Meta{{
RetrievedAt: time.Now().Unix(),
Id: rawTask[0][len(RedisPrefix):] + "." + strconv.FormatInt(task.Id, 10),
Meta: []byte(rawTask[1]),
}}
for _, link := range task.Urls {
for _, turl := range TransformLink(link, &meta) {
if !IsImageLink(turl) {
Logger.Debug("Ignoring non-image URL", zap.String("link", link))
continue
}
data, err := Fetch(turl)
if err != nil {
if !IsPermanentError(err) {
// Retry later
Logger.Debug("Will retry task later", zap.String("link", link))
Rdb.RPush(rawTask[0], rawTask[1])
}
continue
}
if len(data) == 0 {
continue
}
h, err := ComputeHash(data)
if err != nil {
return
}
Store(&Entry{
AHash: &h.AHash,
DHash: &h.DHash,
MHash: &h.MHash,
PHash: &h.PHash,
WHash: &h.WHash,
Size: len(data),
Sha256: sha256.Sum256(data),
Sha1: sha1.Sum(data),
Md5: md5.Sum(data),
Crc32: crc32.ChecksumIEEE(data),
Meta: meta,
Url: trimUrl(turl),
})
}
}
}
// trimUrl removes a leading "https://" or "http://" from link and returns
// the remainder. A link starting with neither is returned unchanged. Only
// one scheme prefix is removed, and the match is case-sensitive.
func trimUrl(link string) string {
	for _, scheme := range [...]string{"https://", "http://"} {
		if strings.HasPrefix(link, scheme) {
			return link[len(scheme):]
		}
	}
	return link
}
// main calls Init, checks that Redis answers a ping, starts Concurrency
// worker goroutines sharing one buffered channel, and then runs
// dispatchFromQueue on the "q.reddit.*" key pattern, which does not return.
func main() {
	Init()

	if _, pingErr := Rdb.Ping().Result(); pingErr != nil {
		Logger.Fatal("Could not connect to redis server")
	}

	// Buffer of 100 raw tasks between the dispatcher and the workers.
	tasks := make(chan []string, 100)
	for started := 0; started < Concurrency; started++ {
		go worker(tasks)
	}

	dispatchFromQueue("q.reddit.*", tasks)
}