mirror of
https://github.com/simon987/imhashdb.git
synced 2025-04-04 02:22:58 +00:00
313 lines
7.8 KiB
Go
313 lines
7.8 KiB
Go
package imhashdb
|
|
|
|
import (
|
|
"context"
|
|
"crypto/md5"
|
|
"crypto/sha1"
|
|
"crypto/sha256"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/jackc/pgx"
|
|
"github.com/jackc/pgx/pgtype"
|
|
"github.com/mailru/easyjson"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const MaxDistance = 100
|
|
const MaxLimit = 1000
|
|
|
|
type Entry struct {
|
|
H *Hashes
|
|
Size int
|
|
Sha1 [sha1.Size]byte
|
|
Md5 [md5.Size]byte
|
|
Sha256 [sha256.Size]byte
|
|
Crc32 uint32
|
|
Meta []Meta
|
|
Url string
|
|
}
|
|
|
|
type MatchTrigger struct {
|
|
HashType HashType
|
|
MinDistance int
|
|
Id int
|
|
}
|
|
|
|
var MatchTriggers = []MatchTrigger{
|
|
{
|
|
HashType: PHash16,
|
|
MinDistance: 25,
|
|
Id: 2,
|
|
},
|
|
}
|
|
|
|
func Store(entry *Entry) {
|
|
row := Pgdb.QueryRow(
|
|
`INSERT INTO image (size, sha1, md5, sha256, crc32) VALUES ($1, $2, $3, $4, $5) RETURNING id;`,
|
|
entry.Size, entry.Sha1[:], entry.Md5[:], entry.Sha256[:], entry.Crc32,
|
|
)
|
|
|
|
var id int64
|
|
imageExists := false
|
|
err := row.Scan(&id)
|
|
if err != nil {
|
|
imageExists = true
|
|
row = Pgdb.QueryRow(`SELECT id FROM image WHERE sha1=$1`, entry.Sha1[:])
|
|
err := row.Scan(&id)
|
|
|
|
if err != nil {
|
|
Logger.Error("FIXME: Could not insert image", zap.Error(err))
|
|
return
|
|
}
|
|
}
|
|
|
|
if !imageExists {
|
|
_, err = Pgdb.Exec("INSERT INTO hash_dhash8 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.DHash8.Bytes)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_dhash16 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.DHash16.Bytes)
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_dhash32 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.DHash32.Bytes)
|
|
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_mhash8 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.MHash8.Bytes)
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_mhash16 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.MHash16.Bytes)
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_mhash32 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.MHash32.Bytes)
|
|
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_phash8 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.PHash8.Bytes)
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_phash16 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.PHash16.Bytes)
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_phash32 VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.PHash32.Bytes)
|
|
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_whash8haar VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.WHash8.Bytes)
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_whash16haar VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.WHash16.Bytes)
|
|
_, _ = Pgdb.Exec("INSERT INTO hash_whash32haar VALUES ($1, $2) ON CONFLICT DO NOTHING", id, entry.H.WHash32.Bytes)
|
|
}
|
|
|
|
for _, meta := range entry.Meta {
|
|
|
|
_, err = Pgdb.Exec(
|
|
"INSERT INTO image_meta (id, retrieved_at, meta) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
|
|
meta.Id, meta.RetrievedAt, meta.Meta,
|
|
)
|
|
if err != nil {
|
|
Logger.Error("Could not insert meta", zap.Error(err))
|
|
return
|
|
}
|
|
_, err = Pgdb.Exec(
|
|
"INSERT INTO image_has_meta VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
|
|
id, entry.Url, meta.Id,
|
|
)
|
|
if err != nil {
|
|
Logger.Error("Could not insert ihm", zap.Error(err))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func isHashValid(hash []byte, hashType HashType) bool {
|
|
switch hashType {
|
|
case DHash8:
|
|
fallthrough
|
|
case MHash8:
|
|
fallthrough
|
|
case PHash8:
|
|
fallthrough
|
|
case WHash8Haar:
|
|
return len(hash) == 8
|
|
|
|
case DHash16:
|
|
fallthrough
|
|
case MHash16:
|
|
fallthrough
|
|
case PHash16:
|
|
fallthrough
|
|
case WHash16Haar:
|
|
return len(hash) == 32
|
|
|
|
case DHash32:
|
|
fallthrough
|
|
case MHash32:
|
|
fallthrough
|
|
case PHash32:
|
|
fallthrough
|
|
case WHash32Haar:
|
|
return len(hash) == 128
|
|
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func FindImagesByHash(ctx context.Context, hash []byte, hashType HashType, distance, limit, offset uint) ([]byte, error) {
|
|
|
|
if !isHashValid(hash, hashType) {
|
|
return nil, errors.New("invalid hash")
|
|
}
|
|
|
|
if distance > MaxDistance {
|
|
return nil, errors.New("Invalid distance")
|
|
}
|
|
|
|
if limit > MaxLimit {
|
|
return nil, errors.New("Invalid distance")
|
|
}
|
|
|
|
tx, err := Pgdb.BeginEx(ctx, &pgx.TxOptions{IsoLevel: pgx.ReadUncommitted})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Commit()
|
|
|
|
var sql string
|
|
sql = fmt.Sprintf(
|
|
`SELECT image.* FROM image INNER JOIN hash_%s h on image.id = h.image_id
|
|
WHERE hash_is_within_distance%d(h.hash, $1, $2)
|
|
ORDER BY image.id LIMIT $3 OFFSET $4`,
|
|
hashType, hashType.HashLength())
|
|
|
|
rows, err := tx.Query(sql, hash, distance, limit, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var images []*Image
|
|
for rows.Next() {
|
|
var im Image
|
|
err := rows.Scan(&im.id, &im.Crc32, &im.Size, &im.Sha1, &im.Md5, &im.Sha256)
|
|
if err != nil {
|
|
Logger.Error("Error while fetching db image", zap.String("err", err.Error()))
|
|
return nil, err
|
|
}
|
|
|
|
images = append(images, &im)
|
|
}
|
|
|
|
if images == nil {
|
|
b, _ := easyjson.Marshal(ImageList{Images: []*Image{}})
|
|
return b, nil
|
|
}
|
|
|
|
batch := tx.BeginBatch()
|
|
defer batch.Close()
|
|
for _, im := range images {
|
|
batch.Queue(
|
|
`SELECT ihm.url, meta.id, meta.retrieved_at, meta.meta FROM image_has_meta ihm
|
|
INNER JOIN image_meta meta on ihm.image_meta_id = meta.id
|
|
WHERE image_id=$1`,
|
|
[]interface{}{im.id},
|
|
[]pgtype.OID{pgtype.Int4OID},
|
|
nil,
|
|
)
|
|
}
|
|
|
|
err = batch.Send(ctx, nil)
|
|
if err != nil {
|
|
Logger.Error("Error while fetching db meta", zap.String("err", err.Error()))
|
|
return nil, err
|
|
}
|
|
|
|
for _, im := range images {
|
|
rows, err := batch.QueryResults()
|
|
if err != nil {
|
|
Logger.Error("Error while fetching db meta", zap.String("err", err.Error()))
|
|
return nil, err
|
|
}
|
|
|
|
for rows.Next() {
|
|
var ihm ImageHasMeta
|
|
|
|
err := rows.Scan(&ihm.Url, &ihm.Meta.Id, &ihm.Meta.RetrievedAt, &ihm.Meta.Meta)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
im.Meta = append(im.Meta, ihm)
|
|
}
|
|
}
|
|
|
|
b, _ := easyjson.Marshal(ImageList{Images: images})
|
|
return b, nil
|
|
}
|
|
|
|
func DbInit(pool *pgx.ConnPool) {
|
|
|
|
sql := `
|
|
CREATE TABLE IF NOT EXISTS image (
|
|
id BIGSERIAL PRIMARY KEY NOT NULL,
|
|
crc32 bigint NOT NULL,
|
|
size INT NOT NULL,
|
|
sha1 bytea NOT NULL,
|
|
md5 bytea NOT NULL,
|
|
sha256 bytea NOT NULL
|
|
);
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_image_sha1 ON image(sha1);
|
|
|
|
CREATE TABLE IF NOT EXISTS image_meta (
|
|
retrieved_at bigint NOT NULL,
|
|
id TEXT PRIMARY KEY,
|
|
meta JSONB NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS image_has_meta (
|
|
image_id bigint REFERENCES image(id) NOT NULL,
|
|
url TEXT NOT NULL,
|
|
image_meta_id text REFERENCES image_meta(id) NOT NULL,
|
|
UNIQUE(image_id, image_meta_id)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS matchlist (
|
|
id smallint,
|
|
distance smallint NOT NULL,
|
|
im1 bigint NOT NULL,
|
|
im2 bigint NOT NULL
|
|
);
|
|
`
|
|
for _, hashType := range HashTypes {
|
|
sql += fmt.Sprintf(`CREATE TABLE IF NOT EXISTS hash_%s (
|
|
image_id BIGINT REFERENCES image(id) UNIQUE NOT NULL,
|
|
hash bytea NOT NULL);`, hashType)
|
|
}
|
|
|
|
for _, trigger := range MatchTriggers {
|
|
sql += fmt.Sprintf(`
|
|
CREATE OR REPLACE FUNCTION on_%s_insert() RETURNS TRIGGER AS $$
|
|
BEGIN
|
|
INSERT INTO matchlist (id, distance, im1, im2)
|
|
SELECT %d, hash_distance%d(hash, NEW.hash), NEW.image_id, image_id FROM hash_%s AS h
|
|
WHERE h.image_id != NEW.image_id AND hash_is_within_distance%d(hash, NEW.hash, %d);
|
|
RETURN NEW;
|
|
END;
|
|
$$ LANGUAGE 'plpgsql';
|
|
DROP TRIGGER IF EXISTS on_%s_insert ON hash_%s;
|
|
CREATE TRIGGER on_%s_insert AFTER INSERT ON hash_%s
|
|
FOR EACH ROW EXECUTE PROCEDURE on_%s_insert();`,
|
|
trigger.HashType, trigger.Id, trigger.HashType.HashLength(), trigger.HashType,
|
|
trigger.HashType.HashLength(), trigger.MinDistance, trigger.HashType,
|
|
trigger.HashType, trigger.HashType, trigger.HashType, trigger.HashType)
|
|
}
|
|
|
|
_, err := pool.Exec(sql)
|
|
if err != nil {
|
|
Logger.Fatal("Could not initialize database", zap.String("err", err.Error()))
|
|
}
|
|
}
|
|
|
|
func DbConnect(host string, port int, user, password, database string) *pgx.ConnPool {
|
|
connPoolConfig := pgx.ConnPoolConfig{
|
|
ConnConfig: pgx.ConnConfig{
|
|
Host: host,
|
|
User: user,
|
|
Port: uint16(port),
|
|
Password: password,
|
|
Database: database,
|
|
},
|
|
MaxConnections: 10,
|
|
}
|
|
|
|
var err error
|
|
pool, err := pgx.NewConnPool(connPoolConfig)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return pool
|
|
}
|