Compare commits

..

No commits in common. "a7cc28a6ac394a3bccb5986f41e028e1aa58a796" and "b18a57d2562b840b4375cbc865fd3014aa2acb2e" have entirely different histories.

7 changed files with 97 additions and 285 deletions

View File

@ -1,74 +0,0 @@
package main
import (
"context"
"fmt"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/sirupsen/logrus"
"github.com/valyala/fastjson"
"strconv"
"strings"
)
// routingKeyToIndex derives the Elasticsearch index name from a routing
// key of the form "<project>.<rest>", returning "<project>-data".
// A key with no "." is treated as a programming error and panics.
func routingKeyToIndex(key string) string {
	dot := strings.Index(key, ".")
	if dot == -1 {
		panic(fmt.Sprintf("Cannot get ES index name for key %s", key))
	}
	return key[:dot] + "-data"
}
// archiveEs indexes a single record into Elasticsearch. The target index
// is derived from the record's routing key and the document id from the
// record's _id value. Request and ES-level errors are logged and
// swallowed (nil is always returned) so one bad document does not stop
// the feed.
func archiveEs(record *Record) error {
	// Render the _id as a string; ES document ids are strings.
	// NOTE(review): for id types other than number/string this stays ""
	// and ES will auto-generate an id — confirm that is intended.
	var stringId string
	if record.IdType == fastjson.TypeNumber {
		stringId = strconv.FormatInt(record.IdValue.(int64), 10)
	} else if record.IdType == fastjson.TypeString {
		stringId = string(record.IdValue.([]uint8))
	}

	index := routingKeyToIndex(record.RoutingKey)

	// Will cause error in ES if we specify the _id field in the document body
	record.Item.Del("_id")

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: stringId,
		Body:       strings.NewReader(record.Item.String()),
		Refresh:    "false",
	}
	res, err := req.Do(context.Background(), es)
	if err != nil {
		logrus.WithFields(logrus.Fields{
			"id":   record.IdValue,
			"item": record.Item.String(),
		}).WithError(err).Error("Request error during document index")
		return nil
	}
	// Close via defer so every exit path releases the connection for reuse.
	defer res.Body.Close()

	if res.IsError() {
		logrus.WithFields(logrus.Fields{
			"id":     stringId,
			"status": res.String(),
			"item":   record.Item.String(),
		}).Error("ES error during document index")
	}

	logrus.WithFields(logrus.Fields{
		"id":    stringId,
		"index": index,
	}).Debug("Insert document")

	if archiverCtx.Monitoring != nil {
		archiverCtx.Monitoring.writeMetricIndexDoc(index)
	}
	return nil
}

View File

@ -1,80 +0,0 @@
package main
import (
"fmt"
"github.com/jackc/pgx"
"github.com/sirupsen/logrus"
"github.com/valyala/fastjson"
"strings"
)
func routingKeyToTable(key string, replacer *strings.Replacer) string {
var table string
if idx := strings.LastIndex(key, "."); idx != -1 {
table = key[:idx]
}
table = replacer.Replace(table)
return table
}
// archiveSql persists a single record into PostgreSQL. The destination
// table is derived from the record's routing key and created on first
// use. Duplicate-key violations (SQLSTATE 23505) are treated as
// already-archived and only counted, never logged as errors. Always
// returns nil so archiving continues past individual failures.
func archiveSql(record *Record) error {
	table := routingKeyToTable(record.RoutingKey, replacer)

	// Fast path: consult the table cache under a read lock.
	archiverCtx.m.RLock()
	_, tableExists := archiverCtx.tables[table]
	archiverCtx.m.RUnlock()
	if !tableExists {
		createTable(table, record.IdType)
	}

	logrus.WithFields(logrus.Fields{
		"table": table,
		"id":    record.IdValue,
	}).Debug("Insert row")

	itemString := record.Item.String()
	messageSize := len(itemString)

	_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), record.IdValue, itemString)
	if err != nil {
		// Checked type assertion: a bare err.(pgx.PgError) panics when the
		// failure is not a PgError (e.g. a broken connection).
		if pgErr, ok := err.(pgx.PgError); ok && pgErr.Code == "23505" {
			if archiverCtx.Monitoring != nil {
				archiverCtx.Monitoring.writeMetricUniqueViolation(messageSize, table)
			}
		} else {
			logrus.WithError(err).Error("Error during insert")
		}
	} else if archiverCtx.Monitoring != nil {
		archiverCtx.Monitoring.writeMetricInsertRow(messageSize, table)
	}
	return nil
}
// createTable issues CREATE TABLE IF NOT EXISTS for the given table and,
// on success, records it in the shared table cache. The id column type
// mirrors the JSON _id type: numbers become bigint, everything else is
// stored as bytea.
func createTable(table string, idType fastjson.Type) {
	logrus.WithFields(logrus.Fields{
		"table": table,
	}).Info("Create table (If not exists)")

	strType := "bytea"
	if idType == fastjson.TypeNumber {
		strType = "bigint"
	}

	// NOTE(review): the table name is interpolated directly into the DDL;
	// it is derived from routing keys, so that source is implicitly
	// trusted here — confirm keys cannot carry SQL metacharacters.
	_, err := pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
		"id %s PRIMARY KEY,"+
		"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
		"data JSONB NOT NULL"+
		")", table, strType))
	if err != nil {
		logrus.WithError(err).Error("Error during create table")
		// Do not cache the table on failure: leaving it uncached lets the
		// next record retry creation instead of inserting into a table
		// that may not exist.
		return
	}

	archiverCtx.m.Lock()
	archiverCtx.tables[table] = true
	archiverCtx.m.Unlock()
}

1
go.mod
View File

@ -5,7 +5,6 @@ go 1.13
require ( require (
github.com/cockroachdb/apd v1.1.0 // indirect github.com/cockroachdb/apd v1.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/elastic/go-elasticsearch/v7 v7.12.0
github.com/go-redis/redis/v8 v8.0.0-beta.2 github.com/go-redis/redis/v8 v8.0.0-beta.2
github.com/gofrs/uuid v3.2.0+incompatible // indirect github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab

2
go.sum
View File

@ -16,8 +16,6 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s=
github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=

View File

@ -95,16 +95,3 @@ func (m *Monitoring) writeMetricUniqueViolation(size int, table string) {
) )
m.points <- point m.points <- point
} }
// writeMetricIndexDoc queues one "index_doc" data point tagged with the
// destination table/index name onto the monitoring channel.
func (m *Monitoring) writeMetricIndexDoc(table string) {
	point, err := influx.NewPoint(
		"index_doc",
		map[string]string{
			"table": table,
		},
		map[string]interface{}{},
		time.Now(),
	)
	if err != nil {
		// Previously the error was discarded, so a nil point could be
		// queued; drop the sample instead of poisoning the channel.
		return
	}
	m.points <- point
}

198
main.go
View File

@ -2,7 +2,7 @@ package main
import ( import (
"context" "context"
"github.com/elastic/go-elasticsearch/v7" "fmt"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/jackc/pgx" "github.com/jackc/pgx"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -17,20 +17,9 @@ import (
var archiverCtx struct { var archiverCtx struct {
tables map[string]bool tables map[string]bool
m sync.RWMutex m sync.RWMutex
ArchiveFunc func(*Record) error
Monitoring *Monitoring
esIndex string // TODO: based on routing key?
}
type Record struct {
Item *fastjson.Value
IdValue interface{}
IdType fastjson.Type
RoutingKey string
} }
var pool *pgx.ConnPool var pool *pgx.ConnPool
var es *elasticsearch.Client
var replacer = strings.NewReplacer(".", "_") var replacer = strings.NewReplacer(".", "_")
type FeedArchiverArgs struct { type FeedArchiverArgs struct {
@ -38,11 +27,6 @@ type FeedArchiverArgs struct {
DbUser string DbUser string
DbPassword string DbPassword string
DbDatabase string DbDatabase string
EsHost string
EsUser string
EsPassword string
EsIndex string
ArchiveTarget string
RedisAddr string RedisAddr string
RedisPassword string RedisPassword string
Pattern string Pattern string
@ -127,43 +111,6 @@ func main() {
Destination: &args.InfluxDbBuffer, Destination: &args.InfluxDbBuffer,
EnvVars: []string{"FA_INFLUXDB_BUFFER"}, EnvVars: []string{"FA_INFLUXDB_BUFFER"},
}, },
&cli.StringFlag{
Name: "es-host",
Usage: "Elasticsearch host",
Destination: &args.EsHost,
Value: "http://localhost:9200",
EnvVars: []string{"FA_ES_HOST"},
},
&cli.StringFlag{
Name: "es-user",
Usage: "Elasticsearch username",
Destination: &args.EsUser,
Value: "elastic",
EnvVars: []string{"FA_ES_USER"},
},
&cli.StringFlag{
Name: "es-password",
Usage: "Elasticsearch password",
Destination: &args.EsPassword,
Value: "",
EnvVars: []string{"FA_ES_PASSWORD"},
},
// TODO: Based on routing key?
&cli.StringFlag{
Name: "es-index",
Usage: "Elasticsearch index",
Destination: &args.EsIndex,
Value: "feed_archiver",
EnvVars: []string{"FA_ES_INDEX"},
},
&cli.StringFlag{
Name: "target",
Usage: "Either 'es' or 'sql'",
Destination: &args.ArchiveTarget,
Value: "sql",
EnvVars: []string{"FA_TARGET"},
},
} }
app.Action = func(c *cli.Context) error { app.Action = func(c *cli.Context) error {
@ -182,23 +129,16 @@ func main() {
MaxConnections: args.Threads, MaxConnections: args.Threads,
} }
if args.ArchiveTarget == "sql" { var mon *Monitoring = nil
if args.InfluxDb != "" {
mon = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer)
}
var err error var err error
pool, err = pgx.NewConnPool(connPoolConfig) pool, err = pgx.NewConnPool(connPoolConfig)
if err != nil { if err != nil {
panic(err) panic(err)
} }
archiverCtx.ArchiveFunc = archiveSql
} else {
es, _ = elasticsearch.NewDefaultClient()
archiverCtx.ArchiveFunc = archiveEs
archiverCtx.esIndex = args.EsIndex
}
archiverCtx.Monitoring = nil
if args.InfluxDb != "" {
archiverCtx.Monitoring = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer)
}
rdb := redis.NewClient(&redis.Options{ rdb := redis.NewClient(&redis.Options{
Addr: args.RedisAddr, Addr: args.RedisAddr,
@ -213,28 +153,9 @@ func main() {
args.Pattern, args.Pattern,
func(message string, key string) error { func(message string, key string) error {
item, _ := parser.Parse(message) table := routingKeyToTable(key[len(args.Pattern)-1:], replacer)
archive(parser, table, message, mon)
id := item.Get("_id")
if id == nil {
logrus.WithField("json", key).Error("Item with no _id field!")
return nil return nil
}
var idValue interface{}
if id.Type() == fastjson.TypeNumber {
idValue, _ = id.Int64()
} else if id.Type() == fastjson.TypeString {
idValue, _ = id.StringBytes()
}
return archiverCtx.ArchiveFunc(&Record{
Item: item,
IdType: id.Type(),
IdValue: idValue,
RoutingKey: key[len(args.Pattern)-1:],
})
}, },
) )
} }
@ -251,36 +172,97 @@ func main() {
} }
} }
var keyCache []string = nil func routingKeyToTable(key string, replacer *strings.Replacer) string {
var table string
if idx := strings.LastIndex(key, "."); idx != -1 {
table = key[:idx]
}
table = replacer.Replace(table)
return table
}
// BLPOP with too many keys is slow! func archive(parser fastjson.Parser, table string, json string, mon *Monitoring) {
const maxKeys = 30 item, _ := parser.Parse(json)
idValue := item.Get("_id")
if idValue == nil {
logrus.WithField("json", string(json)).Error("Item with no _id field!")
return
}
var id interface{}
if idValue.Type() == fastjson.TypeNumber {
id, _ = idValue.Int64()
} else if idValue.Type() == fastjson.TypeString {
id, _ = idValue.StringBytes()
}
archiverCtx.m.RLock()
_, tableExists := archiverCtx.tables[table]
archiverCtx.m.RUnlock()
if !tableExists {
createTable(table, idValue.Type())
}
logrus.WithFields(logrus.Fields{
"table": table,
"id": idValue,
}).Debug("Insert row")
messageSize := len(json)
_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String())
if err != nil {
if err.(pgx.PgError).Code != "23505" {
logrus.WithError(err).Error("Error during insert")
} else if mon != nil {
mon.writeMetricUniqueViolation(messageSize, table)
}
} else if mon != nil {
mon.writeMetricInsertRow(messageSize, table)
}
}
func createTable(table string, idType fastjson.Type) {
logrus.WithFields(logrus.Fields{
"table": table,
}).Info("Create table (If not exists)")
var err error
var strType string
if idType == fastjson.TypeNumber {
strType = "bigint"
} else {
strType = "bytea"
}
_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
"id %s PRIMARY KEY,"+
"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
"data JSONB NOT NULL"+
")", table, strType))
if err != nil {
logrus.WithError(err).Error("Error during create table")
}
archiverCtx.m.Lock()
archiverCtx.tables[table] = true
archiverCtx.m.Unlock()
}
var keyCache []string = nil
func getKeys(ctx context.Context, rdb *redis.Client, pattern string) []string { func getKeys(ctx context.Context, rdb *redis.Client, pattern string) []string {
if keyCache == nil { if keyCache == nil {
var cur uint64 = 0 keys, err := rdb.Keys(ctx, pattern).Result()
var keyRes []string
var keys []string
var err error
for {
keyRes, cur, err = rdb.Scan(ctx, cur, pattern, 10).Result()
if err != nil { if err != nil {
logrus. logrus.WithField("Pattern", pattern).Error("Could not get keys for Pattern")
WithError(err).
WithField("Pattern", pattern).
Error("Could not get keys for Pattern")
return nil return nil
} }
if cur == 0 || len(keys) >= maxKeys {
break
}
keys = append(keys, keyRes...)
}
keyCache = keys keyCache = keys
} }