From 2b25f2afe0ddaf6d5b88665277b6c58aa75ac588 Mon Sep 17 00:00:00 2001 From: simon987 Date: Sat, 6 Feb 2021 11:11:52 -0500 Subject: [PATCH] Add influxdb monitoring --- go.mod | 1 + go.sum | 2 ++ influxdb.go | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 51 +++++++++++++++++++++------- 4 files changed, 139 insertions(+), 12 deletions(-) create mode 100644 influxdb.go diff --git a/go.mod b/go.mod index 9e851e1..a9242b0 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/go-redis/redis/v8 v8.0.0-beta.2 github.com/gofrs/uuid v3.2.0+incompatible // indirect + github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect github.com/jackc/pgx v3.6.2+incompatible github.com/lib/pq v1.2.0 // indirect diff --git a/go.sum b/go.sum index 361e65c..e49038f 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig= +github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc= github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= diff --git a/influxdb.go b/influxdb.go new file mode 100644 index 0000000..ba29416 --- /dev/null +++ b/influxdb.go @@ -0,0 +1,97 @@ +package main + +import ( + influx "github.com/influxdata/influxdb1-client/v2" + "github.com/sirupsen/logrus" + "time" +) + +type Monitoring struct { + influxdb influx.Client + points chan *influx.Point + connString string + bufferSize int + retentionPolicy string + database string +} + +func NewMonitoring(connString string, buffer int) *Monitoring { + m := new(Monitoring) + m.bufferSize = buffer + m.points = make(chan *influx.Point, m.bufferSize) + m.retentionPolicy = "" + m.database = "feed_archiver" + m.influxdb, _ = influx.NewHTTPClient(influx.HTTPConfig{ + Addr: connString, + }) + go m.asyncWriter() + return m +} + +func (m *Monitoring) asyncWriter() { + + logrus.Info("Started async influxdb writer") + + bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{ + Database: m.database, + RetentionPolicy: m.retentionPolicy, + }) + + for point := range m.points { + bp.AddPoint(point) + + if len(bp.Points()) >= m.bufferSize { + m.flushQueue(&bp) + } + } + m.flushQueue(&bp) +} + +func (m *Monitoring) flushQueue(bp *influx.BatchPoints) { + + err := m.influxdb.Write(*bp) + + if err != nil { + logrus.WithError(err).Error("influxdb write") + return + } + + logrus.WithFields(logrus.Fields{ + "size": len((*bp).Points()), + }).Debug("Wrote points") + + *bp, _ = influx.NewBatchPoints(influx.BatchPointsConfig{ + Database: m.database, + RetentionPolicy: m.retentionPolicy, + }) +} + +func (m *Monitoring) writeMetricInsertRow(size int, table string) { + + point, _ := influx.NewPoint( + "insert_row", + map[string]string{ + "table": table, + }, + map[string]interface{}{ + "size": size, + }, + time.Now(), + ) + m.points <- point +} + +func (m *Monitoring) writeMetricUniqueViolation(size int, table string) { + + point, _ := influx.NewPoint( + "unique_violation", + map[string]string{ + "table": table, + }, + map[string]interface{}{ + "size": size, + }, + time.Now(), + ) + m.points <- point +} diff --git a/main.go b/main.go index 2252250..b6d0aea 100644 --- a/main.go +++ b/main.go @@ -23,14 +23,16 @@ var pool *pgx.ConnPool var replacer = strings.NewReplacer(".", "_") type FeedArchiverArgs struct { - DbHost string - DbUser string - DbPassword string - DbDatabase string - RedisAddr string - RedisPassword string - Pattern string - Threads int + DbHost string + DbUser string + DbPassword string + DbDatabase string + RedisAddr string + RedisPassword string + Pattern string + Threads int + InfluxDb string + InfluxDbBuffer int } func main() { @@ -42,7 +44,7 @@ func main() { Email: "me@simon987.net", }, } - app.Version = "3.0" + app.Version = "4.0" args := FeedArchiverArgs{} @@ -95,13 +97,27 @@ func main() { Destination: &args.Threads, EnvVars: []string{"FA_THREADS"}, }, + &cli.StringFlag{ + Name: "influxdb", + Usage: "Influxdb connection string", + Value: "", + Destination: &args.InfluxDb, + EnvVars: []string{"FA_INFLUXDB"}, + }, + &cli.IntFlag{ + Name: "influxdb-buffer", + Usage: "Influxdb buffer size", + Value: 500, + Destination: &args.InfluxDbBuffer, + EnvVars: []string{"FA_INFLUXDB_BUFFER"}, + }, } app.Action = func(c *cli.Context) error { archiverCtx.tables = map[string]bool{} - logrus.SetLevel(logrus.InfoLevel) + logrus.SetLevel(logrus.DebugLevel) connPoolConfig := pgx.ConnPoolConfig{ ConnConfig: pgx.ConnConfig{ @@ -113,6 +129,11 @@ func main() { MaxConnections: args.Threads, } + var mon *Monitoring = nil + if args.InfluxDb != "" { + mon = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer) + } + var err error pool, err = pgx.NewConnPool(connPoolConfig) if err != nil { @@ -133,7 +154,7 @@ func main() { func(message string, key string) error { table := routingKeyToTable(key[len(args.Pattern)-1:], replacer) - archive(parser, table, message) + archive(parser, table, message, mon) return nil }, ) @@ -160,7 +181,7 @@ func routingKeyToTable(key string, replacer *strings.Replacer) string { return table } -func archive(parser fastjson.Parser, table string, json string) { +func archive(parser fastjson.Parser, table string, json string, mon *Monitoring) { item, _ := parser.Parse(json) idValue := item.Get("_id") @@ -188,11 +209,17 @@ func archive(parser fastjson.Parser, table string, json string) { "id": idValue, }).Debug("Insert row") + messageSize := len(json) + _, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String()) if err != nil { if err.(pgx.PgError).Code != "23505" { logrus.WithError(err).Error("Error during insert") + } else if mon != nil { + mon.writeMetricUniqueViolation(messageSize, table) } + } else if mon != nil { + mon.writeMetricInsertRow(messageSize, table) } }