Add ES support (beta) and performance improvement

This commit is contained in:
simon987 2021-06-03 16:41:21 -04:00
parent b18a57d256
commit d670b04d79
6 changed files with 285 additions and 97 deletions

74
archive_es.go Normal file
View File

@ -0,0 +1,74 @@
package main
import (
"context"
"fmt"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/sirupsen/logrus"
"github.com/valyala/fastjson"
"strconv"
"strings"
)
// routingKeyToIndex maps a message routing key ("<project>.<...>") to the
// Elasticsearch index name "<project>-data". It panics when the key contains
// no dot, since no index name can be derived from it.
func routingKeyToIndex(key string) string {
	idx := strings.Index(key, ".")
	if idx == -1 {
		panic(fmt.Sprintf("Cannot get ES index name for key %s", key))
	}
	return key[:idx] + "-data"
}
// archiveEs indexes a single record into Elasticsearch. The target index is
// derived from the record's routing key ("<project>.<...>" -> "<project>-data").
// Failures are logged and swallowed (always returns nil) so the consumer loop
// keeps draining messages.
func archiveEs(record *Record) error {
	// Render the document id: numeric ids in base 10, string ids as-is.
	// Other id types leave stringId empty — ES would then auto-generate an id.
	var stringId string
	switch record.IdType {
	case fastjson.TypeNumber:
		stringId = strconv.FormatInt(record.IdValue.(int64), 10)
	case fastjson.TypeString:
		stringId = string(record.IdValue.([]uint8))
	}

	index := routingKeyToIndex(record.RoutingKey)

	// ES rejects documents that carry the reserved _id field in the body;
	// the id is passed via DocumentID instead.
	record.Item.Del("_id")

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: stringId,
		Body:       strings.NewReader(record.Item.String()),
		Refresh:    "false",
	}

	res, err := req.Do(context.Background(), es)
	if err != nil {
		logrus.WithFields(logrus.Fields{
			"id":   record.IdValue,
			"item": record.Item.String(),
		}).WithError(err).Error("Request error during document index")
		return nil
	}
	// Defer the close so the body is released on every path below.
	defer res.Body.Close()

	if res.IsError() {
		logrus.WithFields(logrus.Fields{
			"id":     stringId,
			"status": res.String(),
			"item":   record.Item.String(),
		}).Error("ES error during document index")
		// A failed index request must not be logged or counted as a success.
		return nil
	}

	logrus.WithFields(logrus.Fields{
		"id":    stringId,
		"index": index,
	}).Debug("Insert document")

	if archiverCtx.Monitoring != nil {
		archiverCtx.Monitoring.writeMetricIndexDoc(index)
	}
	return nil
}

80
archive_sql.go Normal file
View File

@ -0,0 +1,80 @@
package main
import (
"fmt"
"github.com/jackc/pgx"
"github.com/sirupsen/logrus"
"github.com/valyala/fastjson"
"strings"
)
func routingKeyToTable(key string, replacer *strings.Replacer) string {
var table string
if idx := strings.LastIndex(key, "."); idx != -1 {
table = key[:idx]
}
table = replacer.Replace(table)
return table
}
// archiveSql inserts a single record into its PostgreSQL table, creating the
// table on first use. Duplicate-key violations (23505) are counted as a
// metric, not treated as errors; other insert failures are logged and
// swallowed (always returns nil) so the consumer loop keeps running.
func archiveSql(record *Record) error {
	table := routingKeyToTable(record.RoutingKey, replacer)

	// Fast path: check the table cache under a read lock only.
	archiverCtx.m.RLock()
	_, tableExists := archiverCtx.tables[table]
	archiverCtx.m.RUnlock()
	if !tableExists {
		createTable(table, record.IdType)
	}

	logrus.WithFields(logrus.Fields{
		"table": table,
		"id":    record.IdValue,
	}).Debug("Insert row")

	itemString := record.Item.String()
	messageSize := len(itemString)

	_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), record.IdValue, itemString)
	if err != nil {
		// Use a comma-ok assertion: err is not always a pgx.PgError
		// (e.g. connection errors), and a bare assertion would panic.
		if pgErr, ok := err.(pgx.PgError); ok && pgErr.Code == "23505" {
			// unique_violation: the row was already archived.
			if archiverCtx.Monitoring != nil {
				archiverCtx.Monitoring.writeMetricUniqueViolation(messageSize, table)
			}
		} else {
			logrus.WithError(err).Error("Error during insert")
		}
	} else if archiverCtx.Monitoring != nil {
		archiverCtx.Monitoring.writeMetricInsertRow(messageSize, table)
	}
	return nil
}
// createTable issues CREATE TABLE IF NOT EXISTS for the given table, choosing
// the primary-key column type from the JSON type of the record id, and caches
// the table name so later records skip the DDL round-trip.
//
// The table name is derived from the message routing key (dots replaced), not
// from user input; it is interpolated because DDL cannot be parameterized.
func createTable(table string, idType fastjson.Type) {
	logrus.WithFields(logrus.Fields{
		"table": table,
	}).Info("Create table (If not exists)")

	// Numeric ids map to bigint; everything else is stored as raw bytes.
	var strType string
	if idType == fastjson.TypeNumber {
		strType = "bigint"
	} else {
		strType = "bytea"
	}

	_, err := pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
		"id %s PRIMARY KEY,"+
		"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
		"data JSONB NOT NULL"+
		")", table, strType))
	if err != nil {
		logrus.WithError(err).Error("Error during create table")
		// Do NOT cache the table on failure: otherwise every later insert
		// would assume the table exists and fail without retrying the DDL.
		return
	}

	archiverCtx.m.Lock()
	archiverCtx.tables[table] = true
	archiverCtx.m.Unlock()
}

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.13
require ( require (
github.com/cockroachdb/apd v1.1.0 // indirect github.com/cockroachdb/apd v1.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/elastic/go-elasticsearch/v7 v7.12.0
github.com/go-redis/redis/v8 v8.0.0-beta.2 github.com/go-redis/redis/v8 v8.0.0-beta.2
github.com/gofrs/uuid v3.2.0+incompatible // indirect github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab

2
go.sum
View File

@ -16,6 +16,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s=
github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=

View File

@ -95,3 +95,16 @@ func (m *Monitoring) writeMetricUniqueViolation(size int, table string) {
) )
m.points <- point m.points <- point
} }
// writeMetricIndexDoc queues an InfluxDB point counting one successfully
// indexed Elasticsearch document for the given table/index.
func (m *Monitoring) writeMetricIndexDoc(table string) {
	// InfluxDB line protocol requires at least one field per point; a point
	// with an empty field set is rejected by the server, so record an
	// explicit counter field instead of an empty map.
	point, _ := influx.NewPoint(
		"index_doc",
		map[string]string{
			"table": table,
		},
		map[string]interface{}{
			"count": 1,
		},
		time.Now(),
	)
	m.points <- point
}

212
main.go
View File

@ -2,7 +2,7 @@ package main
import ( import (
"context" "context"
"fmt" "github.com/elastic/go-elasticsearch/v7"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/jackc/pgx" "github.com/jackc/pgx"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -15,11 +15,22 @@ import (
) )
var archiverCtx struct { var archiverCtx struct {
tables map[string]bool tables map[string]bool
m sync.RWMutex m sync.RWMutex
ArchiveFunc func(*Record) error
Monitoring *Monitoring
esIndex string // TODO: based on routing key?
}
type Record struct {
Item *fastjson.Value
IdValue interface{}
IdType fastjson.Type
RoutingKey string
} }
var pool *pgx.ConnPool var pool *pgx.ConnPool
var es *elasticsearch.Client
var replacer = strings.NewReplacer(".", "_") var replacer = strings.NewReplacer(".", "_")
type FeedArchiverArgs struct { type FeedArchiverArgs struct {
@ -27,6 +38,11 @@ type FeedArchiverArgs struct {
DbUser string DbUser string
DbPassword string DbPassword string
DbDatabase string DbDatabase string
EsHost string
EsUser string
EsPassword string
EsIndex string
ArchiveTarget string
RedisAddr string RedisAddr string
RedisPassword string RedisPassword string
Pattern string Pattern string
@ -111,6 +127,43 @@ func main() {
Destination: &args.InfluxDbBuffer, Destination: &args.InfluxDbBuffer,
EnvVars: []string{"FA_INFLUXDB_BUFFER"}, EnvVars: []string{"FA_INFLUXDB_BUFFER"},
}, },
&cli.StringFlag{
Name: "es-host",
Usage: "Elasticsearch host",
Destination: &args.EsHost,
Value: "http://localhost:9200",
EnvVars: []string{"FA_ES_HOST"},
},
&cli.StringFlag{
Name: "es-user",
Usage: "Elasticsearch username",
Destination: &args.EsUser,
Value: "elastic",
EnvVars: []string{"FA_ES_USER"},
},
&cli.StringFlag{
Name: "es-password",
Usage: "Elasticsearch password",
Destination: &args.EsPassword,
Value: "",
EnvVars: []string{"FA_ES_PASSWORD"},
},
// TODO: Based on routing key?
&cli.StringFlag{
Name: "es-index",
Usage: "Elasticsearch index",
Destination: &args.EsIndex,
Value: "feed_archiver",
EnvVars: []string{"FA_ES_INDEX"},
},
&cli.StringFlag{
Name: "target",
Usage: "Either 'es' or 'sql'",
Destination: &args.ArchiveTarget,
Value: "sql",
EnvVars: []string{"FA_TARGET"},
},
} }
app.Action = func(c *cli.Context) error { app.Action = func(c *cli.Context) error {
@ -129,15 +182,22 @@ func main() {
MaxConnections: args.Threads, MaxConnections: args.Threads,
} }
var mon *Monitoring = nil if args.ArchiveTarget == "sql" {
if args.InfluxDb != "" { var err error
mon = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer) pool, err = pgx.NewConnPool(connPoolConfig)
if err != nil {
panic(err)
}
archiverCtx.ArchiveFunc = archiveSql
} else {
es, _ = elasticsearch.NewDefaultClient()
archiverCtx.ArchiveFunc = archiveEs
archiverCtx.esIndex = args.EsIndex
} }
var err error archiverCtx.Monitoring = nil
pool, err = pgx.NewConnPool(connPoolConfig) if args.InfluxDb != "" {
if err != nil { archiverCtx.Monitoring = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer)
panic(err)
} }
rdb := redis.NewClient(&redis.Options{ rdb := redis.NewClient(&redis.Options{
@ -153,9 +213,28 @@ func main() {
args.Pattern, args.Pattern,
func(message string, key string) error { func(message string, key string) error {
table := routingKeyToTable(key[len(args.Pattern)-1:], replacer) item, _ := parser.Parse(message)
archive(parser, table, message, mon)
return nil id := item.Get("_id")
if id == nil {
logrus.WithField("json", key).Error("Item with no _id field!")
return nil
}
var idValue interface{}
if id.Type() == fastjson.TypeNumber {
idValue, _ = id.Int64()
} else if id.Type() == fastjson.TypeString {
idValue, _ = id.StringBytes()
}
return archiverCtx.ArchiveFunc(&Record{
Item: item,
IdType: id.Type(),
IdValue: idValue,
RoutingKey: key[len(args.Pattern)-1:],
})
}, },
) )
} }
@ -172,95 +251,34 @@ func main() {
} }
} }
func routingKeyToTable(key string, replacer *strings.Replacer) string {
var table string
if idx := strings.LastIndex(key, "."); idx != -1 {
table = key[:idx]
}
table = replacer.Replace(table)
return table
}
func archive(parser fastjson.Parser, table string, json string, mon *Monitoring) {
item, _ := parser.Parse(json)
idValue := item.Get("_id")
if idValue == nil {
logrus.WithField("json", string(json)).Error("Item with no _id field!")
return
}
var id interface{}
if idValue.Type() == fastjson.TypeNumber {
id, _ = idValue.Int64()
} else if idValue.Type() == fastjson.TypeString {
id, _ = idValue.StringBytes()
}
archiverCtx.m.RLock()
_, tableExists := archiverCtx.tables[table]
archiverCtx.m.RUnlock()
if !tableExists {
createTable(table, idValue.Type())
}
logrus.WithFields(logrus.Fields{
"table": table,
"id": idValue,
}).Debug("Insert row")
messageSize := len(json)
_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String())
if err != nil {
if err.(pgx.PgError).Code != "23505" {
logrus.WithError(err).Error("Error during insert")
} else if mon != nil {
mon.writeMetricUniqueViolation(messageSize, table)
}
} else if mon != nil {
mon.writeMetricInsertRow(messageSize, table)
}
}
func createTable(table string, idType fastjson.Type) {
logrus.WithFields(logrus.Fields{
"table": table,
}).Info("Create table (If not exists)")
var err error
var strType string
if idType == fastjson.TypeNumber {
strType = "bigint"
} else {
strType = "bytea"
}
_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
"id %s PRIMARY KEY,"+
"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
"data JSONB NOT NULL"+
")", table, strType))
if err != nil {
logrus.WithError(err).Error("Error during create table")
}
archiverCtx.m.Lock()
archiverCtx.tables[table] = true
archiverCtx.m.Unlock()
}
var keyCache []string = nil var keyCache []string = nil
// BLPOP with too many keys is slow!
const maxKeys = 30
func getKeys(ctx context.Context, rdb *redis.Client, pattern string) []string { func getKeys(ctx context.Context, rdb *redis.Client, pattern string) []string {
if keyCache == nil { if keyCache == nil {
keys, err := rdb.Keys(ctx, pattern).Result() var cur uint64 = 0
if err != nil { var keyRes []string
logrus.WithField("Pattern", pattern).Error("Could not get keys for Pattern") var keys []string
return nil var err error
for {
keyRes, cur, err = rdb.Scan(ctx, cur, pattern, 10).Result()
if err != nil {
logrus.
WithError(err).
WithField("Pattern", pattern).
Error("Could not get keys for Pattern")
return nil
}
if cur == 0 || len(keys) >= maxKeys {
break
}
keys = append(keys, keyRes...)
} }
keyCache = keys keyCache = keys