diff --git a/archive_es.go b/archive_es.go new file mode 100644 index 0000000..5bb87c0 --- /dev/null +++ b/archive_es.go @@ -0,0 +1,74 @@ +package main + +import ( + "context" + "fmt" + "github.com/elastic/go-elasticsearch/v7/esapi" + "github.com/sirupsen/logrus" + "github.com/valyala/fastjson" + "strconv" + "strings" +) + +func routingKeyToIndex(key string) string { + + if idx := strings.Index(key, "."); idx != -1 { + project := key[:idx] + return fmt.Sprintf("%s-data", project) + } + + panic(fmt.Sprintf("Cannot get ES index name for key %s", key)) +} + +func archiveEs(record *Record) error { + + var stringId string + + if record.IdType == fastjson.TypeNumber { + stringId = strconv.FormatInt(record.IdValue.(int64), 10) + } else if record.IdType == fastjson.TypeString { + stringId = string(record.IdValue.([]uint8)) + } + + index := routingKeyToIndex(record.RoutingKey) + + // Will cause error in ES if we specify the _id field in the document body + record.Item.Del("_id") + + req := esapi.IndexRequest{ + Index: index, + DocumentID: stringId, + Body: strings.NewReader(record.Item.String()), + Refresh: "false", + } + + res, err := req.Do(context.Background(), es) + if err != nil { + logrus.WithFields(logrus.Fields{ + "id": record.IdValue, + "item": record.Item.String(), + }).WithError(err).Error("Request error during document index") + return nil + } + + if res.IsError() { + logrus.WithFields(logrus.Fields{ + "id": stringId, + "status": res.String(), + "item": record.Item.String(), + }).Error("ES error during document index") + } + + logrus.WithFields(logrus.Fields{ + "id": stringId, + "index": index, + }).Debug("Insert document") + + if archiverCtx.Monitoring != nil { + archiverCtx.Monitoring.writeMetricIndexDoc(index) + } + + _ = res.Body.Close() + + return nil +} diff --git a/archive_sql.go b/archive_sql.go new file mode 100644 index 0000000..d4c4de7 --- /dev/null +++ b/archive_sql.go @@ -0,0 +1,80 @@ +package main + +import ( + "fmt" + "github.com/jackc/pgx" + 
"github.com/sirupsen/logrus" + "github.com/valyala/fastjson" + "strings" +) + +func routingKeyToTable(key string, replacer *strings.Replacer) string { + var table string + if idx := strings.LastIndex(key, "."); idx != -1 { + table = key[:idx] + } + table = replacer.Replace(table) + return table +} + +func archiveSql(record *Record) error { + + table := routingKeyToTable(record.RoutingKey, replacer) + archiverCtx.m.RLock() + _, tableExists := archiverCtx.tables[table] + archiverCtx.m.RUnlock() + + if !tableExists { + createTable(table, record.IdType) + } + + logrus.WithFields(logrus.Fields{ + "table": table, + "id": record.IdValue, + }).Debug("Insert row") + + itemString := record.Item.String() + messageSize := len(itemString) + + _, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), record.IdValue, itemString) + if err != nil { + if err.(pgx.PgError).Code != "23505" { + logrus.WithError(err).Error("Error during insert") + } else if archiverCtx.Monitoring != nil { + archiverCtx.Monitoring.writeMetricUniqueViolation(messageSize, table) + } + } else if archiverCtx.Monitoring != nil { + archiverCtx.Monitoring.writeMetricInsertRow(messageSize, table) + } + + return nil +} + +func createTable(table string, idType fastjson.Type) { + + logrus.WithFields(logrus.Fields{ + "table": table, + }).Info("Create table (If not exists)") + + var err error + var strType string + if idType == fastjson.TypeNumber { + strType = "bigint" + } else { + strType = "bytea" + } + + _, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+ + "id %s PRIMARY KEY,"+ + "archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+ + "data JSONB NOT NULL"+ + ")", table, strType)) + + if err != nil { + logrus.WithError(err).Error("Error during create table") + } + + archiverCtx.m.Lock() + archiverCtx.tables[table] = true + archiverCtx.m.Unlock() +} diff --git a/go.mod b/go.mod index a9242b0..afbcb7f 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.13 require ( 
github.com/cockroachdb/apd v1.1.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect + github.com/elastic/go-elasticsearch/v7 v7.12.0 github.com/go-redis/redis/v8 v8.0.0-beta.2 github.com/gofrs/uuid v3.2.0+incompatible // indirect github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab diff --git a/go.sum b/go.sum index e49038f..761373d 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s= +github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/influxdb.go b/influxdb.go index 3dd16a2..75cd19d 100644 --- a/influxdb.go +++ b/influxdb.go @@ -95,3 +95,16 @@ func (m *Monitoring) writeMetricUniqueViolation(size int, table string) { ) m.points <- point } + +func (m *Monitoring) writeMetricIndexDoc(table string) { + + point, _ := influx.NewPoint( + "index_doc", + map[string]string{ + "table": table, + }, + map[string]interface{}{}, + time.Now(), + ) + m.points <- point +} diff --git a/main.go b/main.go index 8379931..711ca5d 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,7 @@ package main import ( "context" - "fmt" + "github.com/elastic/go-elasticsearch/v7" "github.com/go-redis/redis/v8" "github.com/jackc/pgx" 
"github.com/sirupsen/logrus" @@ -15,11 +15,22 @@ import ( ) var archiverCtx struct { - tables map[string]bool - m sync.RWMutex + tables map[string]bool + m sync.RWMutex + ArchiveFunc func(*Record) error + Monitoring *Monitoring + esIndex string // TODO: based on routing key? +} + +type Record struct { + Item *fastjson.Value + IdValue interface{} + IdType fastjson.Type + RoutingKey string } var pool *pgx.ConnPool +var es *elasticsearch.Client var replacer = strings.NewReplacer(".", "_") type FeedArchiverArgs struct { @@ -27,6 +38,11 @@ type FeedArchiverArgs struct { DbUser string DbPassword string DbDatabase string + EsHost string + EsUser string + EsPassword string + EsIndex string + ArchiveTarget string RedisAddr string RedisPassword string Pattern string @@ -111,6 +127,43 @@ func main() { Destination: &args.InfluxDbBuffer, EnvVars: []string{"FA_INFLUXDB_BUFFER"}, }, + &cli.StringFlag{ + Name: "es-host", + Usage: "Elasticsearch host", + Destination: &args.EsHost, + Value: "http://localhost:9200", + EnvVars: []string{"FA_ES_HOST"}, + }, + &cli.StringFlag{ + Name: "es-user", + Usage: "Elasticsearch username", + Destination: &args.EsUser, + Value: "elastic", + EnvVars: []string{"FA_ES_USER"}, + }, + &cli.StringFlag{ + Name: "es-password", + Usage: "Elasticsearch password", + Destination: &args.EsPassword, + Value: "", + EnvVars: []string{"FA_ES_PASSWORD"}, + }, + + // TODO: Based on routing key? 
+ &cli.StringFlag{ + Name: "es-index", + Usage: "Elasticsearch index", + Destination: &args.EsIndex, + Value: "feed_archiver", + EnvVars: []string{"FA_ES_INDEX"}, + }, + &cli.StringFlag{ + Name: "target", + Usage: "Either 'es' or 'sql'", + Destination: &args.ArchiveTarget, + Value: "sql", + EnvVars: []string{"FA_TARGET"}, + }, } app.Action = func(c *cli.Context) error { @@ -129,15 +182,22 @@ func main() { MaxConnections: args.Threads, } - var mon *Monitoring = nil - if args.InfluxDb != "" { - mon = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer) + if args.ArchiveTarget == "sql" { + var err error + pool, err = pgx.NewConnPool(connPoolConfig) + if err != nil { + panic(err) + } + archiverCtx.ArchiveFunc = archiveSql + } else { + es, _ = elasticsearch.NewDefaultClient() + archiverCtx.ArchiveFunc = archiveEs + archiverCtx.esIndex = args.EsIndex } - var err error - pool, err = pgx.NewConnPool(connPoolConfig) - if err != nil { - panic(err) + archiverCtx.Monitoring = nil + if args.InfluxDb != "" { + archiverCtx.Monitoring = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer) } rdb := redis.NewClient(&redis.Options{ @@ -153,9 +213,28 @@ func main() { args.Pattern, func(message string, key string) error { - table := routingKeyToTable(key[len(args.Pattern)-1:], replacer) - archive(parser, table, message, mon) - return nil + item, _ := parser.Parse(message) + + id := item.Get("_id") + if id == nil { + logrus.WithField("json", key).Error("Item with no _id field!") + return nil + } + + var idValue interface{} + + if id.Type() == fastjson.TypeNumber { + idValue, _ = id.Int64() + } else if id.Type() == fastjson.TypeString { + idValue, _ = id.StringBytes() + } + + return archiverCtx.ArchiveFunc(&Record{ + Item: item, + IdType: id.Type(), + IdValue: idValue, + RoutingKey: key[len(args.Pattern)-1:], + }) }, ) } @@ -172,95 +251,34 @@ func main() { } } -func routingKeyToTable(key string, replacer *strings.Replacer) string { - var table string - if idx := strings.LastIndex(key, "."); 
idx != -1 { - table = key[:idx] - } - table = replacer.Replace(table) - return table -} - -func archive(parser fastjson.Parser, table string, json string, mon *Monitoring) { - item, _ := parser.Parse(json) - - idValue := item.Get("_id") - if idValue == nil { - logrus.WithField("json", string(json)).Error("Item with no _id field!") - return - } - - var id interface{} - if idValue.Type() == fastjson.TypeNumber { - id, _ = idValue.Int64() - } else if idValue.Type() == fastjson.TypeString { - id, _ = idValue.StringBytes() - } - - archiverCtx.m.RLock() - _, tableExists := archiverCtx.tables[table] - archiverCtx.m.RUnlock() - if !tableExists { - createTable(table, idValue.Type()) - } - - logrus.WithFields(logrus.Fields{ - "table": table, - "id": idValue, - }).Debug("Insert row") - - messageSize := len(json) - - _, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String()) - if err != nil { - if err.(pgx.PgError).Code != "23505" { - logrus.WithError(err).Error("Error during insert") - } else if mon != nil { - mon.writeMetricUniqueViolation(messageSize, table) - } - } else if mon != nil { - mon.writeMetricInsertRow(messageSize, table) - } -} - -func createTable(table string, idType fastjson.Type) { - - logrus.WithFields(logrus.Fields{ - "table": table, - }).Info("Create table (If not exists)") - - var err error - var strType string - if idType == fastjson.TypeNumber { - strType = "bigint" - } else { - strType = "bytea" - } - - _, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+ - "id %s PRIMARY KEY,"+ - "archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+ - "data JSONB NOT NULL"+ - ")", table, strType)) - - if err != nil { - logrus.WithError(err).Error("Error during create table") - } - - archiverCtx.m.Lock() - archiverCtx.tables[table] = true - archiverCtx.m.Unlock() -} - var keyCache []string = nil +// BLPOP with too many keys is slow! 
+const maxKeys = 30
+
 func getKeys(ctx context.Context, rdb *redis.Client, pattern string) []string {
 
 	if keyCache == nil {
-		keys, err := rdb.Keys(ctx, pattern).Result()
-		if err != nil {
-			logrus.WithField("Pattern", pattern).Error("Could not get keys for Pattern")
-			return nil
+		var cur uint64 = 0
+		var keyRes []string
+		var keys []string
+		var err error
+
+		for {
+			keyRes, cur, err = rdb.Scan(ctx, cur, pattern, 10).Result()
+			if err != nil {
+				logrus.
+					WithError(err).
+					WithField("Pattern", pattern).
+					Error("Could not get keys for Pattern")
+				return nil
+			}
+
+			keys = append(keys, keyRes...)
+
+			if cur == 0 || len(keys) >= maxKeys {
+				break
+			}
 		}
 		keyCache = keys