feed_archiver/archive_sql.go

81 lines
1.8 KiB
Go

package main
import (
"fmt"
"github.com/jackc/pgx"
"github.com/sirupsen/logrus"
"github.com/valyala/fastjson"
"strings"
)
func routingKeyToTable(key string, replacer *strings.Replacer) string {
var table string
if idx := strings.LastIndex(key, "."); idx != -1 {
table = key[:idx]
}
table = replacer.Replace(table)
return table
}
func archiveSql(record *Record) error {
table := routingKeyToTable(record.RoutingKey, replacer)
archiverCtx.m.RLock()
_, tableExists := archiverCtx.tables[table]
archiverCtx.m.RUnlock()
if !tableExists {
createTable(table, record.IdType)
}
logrus.WithFields(logrus.Fields{
"table": table,
"id": record.IdValue,
}).Debug("Insert row")
itemString := record.Item.String()
messageSize := len(itemString)
_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), record.IdValue, itemString)
if err != nil {
if err.(pgx.PgError).Code != "23505" {
logrus.WithError(err).Error("Error during insert")
} else if archiverCtx.Monitoring != nil {
archiverCtx.Monitoring.writeMetricUniqueViolation(messageSize, table)
}
} else if archiverCtx.Monitoring != nil {
archiverCtx.Monitoring.writeMetricInsertRow(messageSize, table)
}
return nil
}
func createTable(table string, idType fastjson.Type) {
logrus.WithFields(logrus.Fields{
"table": table,
}).Info("Create table (If not exists)")
var err error
var strType string
if idType == fastjson.TypeNumber {
strType = "bigint"
} else {
strType = "bytea"
}
_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
"id %s PRIMARY KEY,"+
"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
"data JSONB NOT NULL"+
")", table, strType))
if err != nil {
logrus.WithError(err).Error("Error during create table")
}
archiverCtx.m.Lock()
archiverCtx.tables[table] = true
archiverCtx.m.Unlock()
}