mirror of
https://github.com/simon987/imhashdb.git
synced 2025-12-15 23:49:01 +00:00
Initial commit
This commit is contained in:
125
hasher/hasher.go
Normal file
125
hasher/hasher.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/md5"
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"hash/crc32"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
. "github.com/simon987/imhashdb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Task struct {
|
||||
Urls []string `json:"_urls"`
|
||||
Id int64 `json:"_id"`
|
||||
}
|
||||
|
||||
func dispatchFromQueue(pattern string, queue chan []string) {
|
||||
|
||||
for {
|
||||
keys, err := Rdb.Keys(pattern).Result()
|
||||
if err != nil {
|
||||
Logger.Error("Could not get keys for pattern", zap.String("pattern", pattern))
|
||||
continue
|
||||
}
|
||||
|
||||
rawTask, err := Rdb.BLPop(time.Second*30, keys...).Result()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
queue <- rawTask
|
||||
}
|
||||
}
|
||||
|
||||
func worker(queue chan []string) {
|
||||
for rawTask := range queue {
|
||||
computeAndStore(rawTask)
|
||||
}
|
||||
}
|
||||
|
||||
func computeAndStore(rawTask []string) {
|
||||
var task Task
|
||||
err := json.Unmarshal([]byte(rawTask[1]), &task)
|
||||
if err != nil {
|
||||
Logger.Error("Corrupt task body", zap.String("body", rawTask[1]))
|
||||
return
|
||||
}
|
||||
|
||||
meta := []Meta{{
|
||||
RetrievedAt: time.Now().Unix(),
|
||||
Id: rawTask[0][len(RedisPrefix):] + "." + strconv.FormatInt(task.Id, 10),
|
||||
Meta: []byte(rawTask[1]),
|
||||
}}
|
||||
|
||||
for _, link := range task.Urls {
|
||||
for _, turl := range TransformLink(link, &meta) {
|
||||
if !IsImageLink(turl) {
|
||||
Logger.Debug("Ignoring non-image URL", zap.String("link", link))
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := Fetch(turl)
|
||||
if err != nil {
|
||||
if !IsPermanentError(err) {
|
||||
// Retry later
|
||||
Logger.Debug("Will retry task later", zap.String("link", link))
|
||||
Rdb.RPush(rawTask[0], rawTask[1])
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
h, err := ComputeHash(data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
Store(&Entry{
|
||||
AHash: &h.AHash,
|
||||
DHash: &h.DHash,
|
||||
MHash: &h.MHash,
|
||||
PHash: &h.PHash,
|
||||
WHash: &h.WHash,
|
||||
Size: len(data),
|
||||
Sha256: sha256.Sum256(data),
|
||||
Sha1: sha1.Sum(data),
|
||||
Md5: md5.Sum(data),
|
||||
Crc32: crc32.ChecksumIEEE(data),
|
||||
Meta: meta,
|
||||
Url: trimUrl(turl),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func trimUrl(link string) string {
|
||||
if strings.HasPrefix(link, "https://") {
|
||||
return link[len("https://"):]
|
||||
} else if strings.HasPrefix(link, "http://") {
|
||||
return link[len("http://"):]
|
||||
}
|
||||
|
||||
return link
|
||||
}
|
||||
|
||||
func main() {
|
||||
Init()
|
||||
|
||||
_, err := Rdb.Ping().Result()
|
||||
if err != nil {
|
||||
Logger.Fatal("Could not connect to redis server")
|
||||
}
|
||||
|
||||
queue := make(chan []string, 100)
|
||||
|
||||
for i := 0; i < Concurrency; i++ {
|
||||
go worker(queue)
|
||||
}
|
||||
|
||||
dispatchFromQueue("q.reddit.*", queue)
|
||||
}
|
||||
Reference in New Issue
Block a user