imhashdb/db.go

313 lines
7.8 KiB
Go

package imhashdb
import (
"context"
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"errors"
"fmt"
"github.com/jackc/pgx"
"github.com/jackc/pgx/pgtype"
"github.com/mailru/easyjson"
"go.uber.org/zap"
)
// Upper bounds applied by FindImagesByHash to caller-supplied query parameters.
const (
	// MaxDistance is the largest hash distance a search may request.
	MaxDistance = 100

	// MaxLimit is the largest page size (row limit) a search may request.
	MaxLimit = 1000
)
// Entry is the unit of work consumed by Store: one image's checksums and
// perceptual hashes, plus the metadata records and URL it was found under.
type Entry struct {
// H holds the perceptual hashes. Store reads the Bytes of every variant
// (DHash/MHash/PHash/WHash at sizes 8, 16 and 32) into the matching hash_* table.
H *Hashes
// Size is written to the image.size column (presumably the file size in
// bytes — TODO confirm against the producer).
Size int
// Sha1 is written to image.sha1 and is also the key Store uses to look up an
// already-stored image when the insert fails.
Sha1 [sha1.Size]byte
// Md5 is written to image.md5.
Md5 [md5.Size]byte
// Sha256 is written to image.sha256.
Sha256 [sha256.Size]byte
// Crc32 is written to image.crc32.
Crc32 uint32
// Meta lists the metadata records to upsert into image_meta and link to the
// image through image_has_meta.
Meta []Meta
// Url is stored in image_has_meta.url for every metadata link of this entry.
Url string
}
// MatchTrigger configures one database trigger created by DbInit. The trigger
// fires after each insert into the hash table of HashType and records nearby
// hashes in the matchlist table.
type MatchTrigger struct {
// HashType selects the hash_<type> table the trigger is attached to.
HashType HashType
// MinDistance is passed as the distance argument of
// hash_is_within_distance<N> inside the trigger; rows for which that SQL
// function returns true are recorded (despite the name it appears to act as
// an upper threshold — confirm against the SQL function's definition).
MinDistance int
// Id is the value written to matchlist.id for rows produced by this trigger.
Id int
}
// MatchTriggers is the set of triggers DbInit installs; each element yields one
// on_<hashtype>_insert function and trigger.
var MatchTriggers = []MatchTrigger{
{
HashType: PHash16,
MinDistance: 25,
Id: 2,
},
}
// Store persists entry through the package-level Pgdb handle.
//
// It first tries to insert a row into image. If that insert fails for any
// reason the image is assumed to exist already and its id is looked up by
// sha1; if the lookup fails too, both errors are logged and nothing else is
// written. For a newly inserted image every perceptual hash of entry.H is
// written to its hash_* table. Finally each element of entry.Meta is inserted
// into image_meta and linked to the image via image_has_meta.
//
// Store has no return value: failures are reported through Logger. Hash
// inserts are best-effort (a failure is logged and the remaining hashes are
// still attempted); the first failing metadata insert aborts the call.
// The statements are not wrapped in a transaction.
func Store(entry *Entry) {
	var id int64
	imageExists := false

	insertErr := Pgdb.QueryRow(
		`INSERT INTO image (size, sha1, md5, sha256, crc32) VALUES ($1, $2, $3, $4, $5) RETURNING id;`,
		entry.Size, entry.Sha1[:], entry.Md5[:], entry.Sha256[:], entry.Crc32,
	).Scan(&id)
	if insertErr != nil {
		// The insert error is not inspected: a unique violation on sha1 and any
		// other failure both end up here, so fall back to a lookup by sha1.
		imageExists = true
		lookupErr := Pgdb.QueryRow(`SELECT id FROM image WHERE sha1=$1`, entry.Sha1[:]).Scan(&id)
		if lookupErr != nil {
			Logger.Error(
				"FIXME: Could not insert image",
				zap.Error(lookupErr),
				zap.NamedError("insertError", insertErr),
			)
			return
		}
	}

	if !imageExists {
		// Hashes are only written for images inserted by this call.
		hashRows := []struct {
			table string
			hash  interface{}
		}{
			{"hash_dhash8", entry.H.DHash8.Bytes},
			{"hash_dhash16", entry.H.DHash16.Bytes},
			{"hash_dhash32", entry.H.DHash32.Bytes},
			{"hash_mhash8", entry.H.MHash8.Bytes},
			{"hash_mhash16", entry.H.MHash16.Bytes},
			{"hash_mhash32", entry.H.MHash32.Bytes},
			{"hash_phash8", entry.H.PHash8.Bytes},
			{"hash_phash16", entry.H.PHash16.Bytes},
			{"hash_phash32", entry.H.PHash32.Bytes},
			{"hash_whash8haar", entry.H.WHash8.Bytes},
			{"hash_whash16haar", entry.H.WHash16.Bytes},
			{"hash_whash32haar", entry.H.WHash32.Bytes},
		}
		for _, hr := range hashRows {
			// Table names come from the fixed list above, never from input, so
			// concatenating them into the statement is safe.
			_, err := Pgdb.Exec(
				"INSERT INTO "+hr.table+" VALUES ($1, $2) ON CONFLICT DO NOTHING",
				id, hr.hash,
			)
			if err != nil {
				// Best-effort: report the failure instead of panicking or
				// dropping it, and keep going with the remaining hash tables.
				Logger.Error("Could not insert hash", zap.String("table", hr.table), zap.Error(err))
			}
		}
	}

	for _, meta := range entry.Meta {
		_, err := Pgdb.Exec(
			"INSERT INTO image_meta (id, retrieved_at, meta) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
			meta.Id, meta.RetrievedAt, meta.Meta,
		)
		if err != nil {
			Logger.Error("Could not insert meta", zap.Error(err))
			return
		}

		_, err = Pgdb.Exec(
			"INSERT INTO image_has_meta VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
			id, entry.Url, meta.Id,
		)
		if err != nil {
			Logger.Error("Could not insert ihm", zap.Error(err))
			return
		}
	}
}
// isHashValid reports whether hash has the byte length required for hashType:
// 8 bytes for the *8 variants, 32 bytes for the *16 variants and 128 bytes for
// the *32 variants. Any other hashType is rejected.
func isHashValid(hash []byte, hashType HashType) bool {
	var wantLen int
	switch hashType {
	case DHash8, MHash8, PHash8, WHash8Haar:
		wantLen = 8
	case DHash16, MHash16, PHash16, WHash16Haar:
		wantLen = 32
	case DHash32, MHash32, PHash32, WHash32Haar:
		wantLen = 128
	default:
		return false
	}
	return len(hash) == wantLen
}
// FindImagesByHash returns, as easyjson-encoded ImageList bytes, the images
// whose hashType hash satisfies hash_is_within_distance<N>(stored, hash,
// distance), ordered by image id and paged with limit/offset. Every returned
// image carries its metadata links (url plus image_meta row).
//
// Parameters:
//   - ctx: used to begin the transaction and to send the metadata batch.
//   - hash, hashType: the query hash; the length must match hashType
//     (see isHashValid).
//   - distance: must not exceed MaxDistance.
//   - limit, offset: paging; limit must not exceed MaxLimit.
//
// An error is returned for invalid arguments, for any database failure
// (including one that surfaces while iterating result rows) and for a JSON
// encoding failure. When nothing matches, the encoded list has an empty, non-nil
// Images slice.
func FindImagesByHash(ctx context.Context, hash []byte, hashType HashType, distance, limit, offset uint) ([]byte, error) {
	if !isHashValid(hash, hashType) {
		return nil, errors.New("invalid hash")
	}
	if distance > MaxDistance {
		return nil, errors.New("Invalid distance")
	}
	if limit > MaxLimit {
		return nil, errors.New("invalid limit")
	}

	tx, err := Pgdb.BeginEx(ctx, &pgx.TxOptions{IsoLevel: pgx.ReadUncommitted})
	if err != nil {
		return nil, err
	}
	// Only SELECTs run in this transaction, so the deferred Commit merely ends it.
	defer tx.Commit()

	// hashType has been validated against the known set by isHashValid, so
	// formatting it into the table and function names cannot inject SQL.
	query := fmt.Sprintf(
		`SELECT image.* FROM image INNER JOIN hash_%s h on image.id = h.image_id
WHERE hash_is_within_distance%d(h.hash, $1, $2)
ORDER BY image.id LIMIT $3 OFFSET $4`,
		hashType, hashType.HashLength())

	rows, err := tx.Query(query, hash, distance, limit, offset)
	if err != nil {
		return nil, err
	}

	var images []*Image
	for rows.Next() {
		var im Image
		if err := rows.Scan(&im.id, &im.Crc32, &im.Size, &im.Sha1, &im.Md5, &im.Sha256); err != nil {
			Logger.Error("Error while fetching db image", zap.String("err", err.Error()))
			return nil, err
		}
		images = append(images, &im)
	}
	// Next() also returns false when the query fails mid-stream; without this
	// check such a failure would be reported as a short or empty result.
	if err := rows.Err(); err != nil {
		Logger.Error("Error while fetching db image", zap.String("err", err.Error()))
		return nil, err
	}

	if len(images) == 0 {
		// Encode an explicit empty slice rather than a nil one.
		return easyjson.Marshal(ImageList{Images: []*Image{}})
	}

	const metaQuery = `SELECT ihm.url, meta.id, meta.retrieved_at, meta.meta FROM image_has_meta ihm
INNER JOIN image_meta meta on ihm.image_meta_id = meta.id
WHERE image_id=$1`

	batch := tx.BeginBatch()
	defer batch.Close()

	// One metadata query per image, all sent to the server in a single batch.
	for _, im := range images {
		batch.Queue(
			metaQuery,
			[]interface{}{im.id},
			// image_id is a bigint column, so declare the parameter as int8;
			// int4 would reject ids above 2^31-1.
			[]pgtype.OID{pgtype.Int8OID},
			nil,
		)
	}

	if err := batch.Send(ctx, nil); err != nil {
		Logger.Error("Error while fetching db meta", zap.String("err", err.Error()))
		return nil, err
	}

	// Results come back in the order the queries were queued, i.e. one result
	// set per element of images.
	for _, im := range images {
		metaRows, err := batch.QueryResults()
		if err != nil {
			Logger.Error("Error while fetching db meta", zap.String("err", err.Error()))
			return nil, err
		}

		for metaRows.Next() {
			var ihm ImageHasMeta
			if err := metaRows.Scan(&ihm.Url, &ihm.Meta.Id, &ihm.Meta.RetrievedAt, &ihm.Meta.Meta); err != nil {
				return nil, err
			}
			im.Meta = append(im.Meta, ihm)
		}
		if err := metaRows.Err(); err != nil {
			Logger.Error("Error while fetching db meta", zap.String("err", err.Error()))
			return nil, err
		}
	}

	return easyjson.Marshal(ImageList{Images: images})
}
// DbInit creates the database schema on pool if it does not exist yet and
// (re)installs the match triggers, all in a single Exec call.
//
// The script consists of:
//   - the fixed tables image, image_meta, image_has_meta and matchlist, plus a
//     unique index on image.sha1;
//   - one hash_<type> table per element of HashTypes;
//   - for each element of MatchTriggers, an on_<type>_insert plpgsql function
//     and an AFTER INSERT trigger on hash_<type> that writes matching pairs
//     into matchlist.
//
// If the script fails, the error is reported through Logger.Fatal.
func DbInit(pool *pgx.ConnPool) {
	const baseSchema = `
CREATE TABLE IF NOT EXISTS image (
id BIGSERIAL PRIMARY KEY NOT NULL,
crc32 bigint NOT NULL,
size INT NOT NULL,
sha1 bytea NOT NULL,
md5 bytea NOT NULL,
sha256 bytea NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_image_sha1 ON image(sha1);
CREATE TABLE IF NOT EXISTS image_meta (
retrieved_at bigint NOT NULL,
id TEXT PRIMARY KEY,
meta JSONB NOT NULL
);
CREATE TABLE IF NOT EXISTS image_has_meta (
image_id bigint REFERENCES image(id) NOT NULL,
url TEXT NOT NULL,
image_meta_id text REFERENCES image_meta(id) NOT NULL,
UNIQUE(image_id, image_meta_id)
);
CREATE TABLE IF NOT EXISTS matchlist (
id smallint,
distance smallint NOT NULL,
im1 bigint NOT NULL,
im2 bigint NOT NULL
);
`

	// Template for one per-hash-type table; the single verb is the hash type.
	const hashTableFmt = `CREATE TABLE IF NOT EXISTS hash_%s (
image_id BIGINT REFERENCES image(id) UNIQUE NOT NULL,
hash bytea NOT NULL);`

	// Template for one match trigger; verbs are filled positionally below.
	const triggerFmt = `
CREATE OR REPLACE FUNCTION on_%s_insert() RETURNS TRIGGER AS $$
BEGIN
INSERT INTO matchlist (id, distance, im1, im2)
SELECT %d, hash_distance%d(hash, NEW.hash), NEW.image_id, image_id FROM hash_%s AS h
WHERE h.image_id != NEW.image_id AND hash_is_within_distance%d(hash, NEW.hash, %d);
RETURN NEW;
END;
$$ LANGUAGE 'plpgsql';
DROP TRIGGER IF EXISTS on_%s_insert ON hash_%s;
CREATE TRIGGER on_%s_insert AFTER INSERT ON hash_%s
FOR EACH ROW EXECUTE PROCEDURE on_%s_insert();`

	ddl := baseSchema

	for _, ht := range HashTypes {
		ddl += fmt.Sprintf(hashTableFmt, ht)
	}

	for _, mt := range MatchTriggers {
		ht := mt.HashType
		ddl += fmt.Sprintf(triggerFmt,
			// function name, matchlist id, hash_distance<N>, source table
			ht, mt.Id, ht.HashLength(), ht,
			// hash_is_within_distance<N>, distance threshold, DROP: trigger name
			ht.HashLength(), mt.MinDistance, ht,
			// DROP: table, CREATE: trigger name, table, procedure name
			ht, ht, ht, ht)
	}

	if _, err := pool.Exec(ddl); err != nil {
		Logger.Fatal("Could not initialize database", zap.String("err", err.Error()))
	}
}
// DbConnect opens a pgx connection pool of at most 10 connections to the
// given PostgreSQL server and returns it.
//
// port must fit in 16 bits (0..65535) because pgx stores it as a uint16; the
// value is passed through unchanged, including 0 (pgx documents a default port
// for that case — see the pgx ConnConfig docs).
//
// DbConnect panics if port is out of range or if the pool cannot be created.
func DbConnect(host string, port int, user, password, database string) *pgx.ConnPool {
	const maxPort = 1<<16 - 1
	// A plain uint16(port) conversion would silently wrap an out-of-range
	// value (e.g. 70000 -> 4464) and connect to the wrong port.
	if port < 0 || port > maxPort {
		panic(fmt.Errorf("invalid database port %d: must be between 0 and %d", port, maxPort))
	}

	pool, err := pgx.NewConnPool(pgx.ConnPoolConfig{
		ConnConfig: pgx.ConnConfig{
			Host:     host,
			User:     user,
			Port:     uint16(port),
			Password: password,
			Database: database,
		},
		MaxConnections: 10,
	})
	if err != nil {
		panic(err)
	}
	return pool
}