Add influxdb monitoring

This commit is contained in:
simon987 2021-02-06 11:11:52 -05:00
parent e8ea4ff1dd
commit 2b25f2afe0
4 changed files with 139 additions and 12 deletions

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/go-redis/redis/v8 v8.0.0-beta.2
github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgx v3.6.2+incompatible
github.com/lib/pq v1.2.0 // indirect

2
go.sum
View File

@ -39,6 +39,8 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig=
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=

97
influxdb.go Normal file
View File

@ -0,0 +1,97 @@
package main
import (
influx "github.com/influxdata/influxdb1-client/v2"
"github.com/sirupsen/logrus"
"time"
)
type Monitoring struct {
influxdb influx.Client
points chan *influx.Point
connString string
bufferSize int
retentionPolicy string
database string
}
func NewMonitoring(connString string, buffer int) *Monitoring {
m := new(Monitoring)
m.bufferSize = buffer
m.points = make(chan *influx.Point, m.bufferSize)
m.retentionPolicy = ""
m.database = "feed_archiver"
m.influxdb, _ = influx.NewHTTPClient(influx.HTTPConfig{
Addr: connString,
})
go m.asyncWriter()
return m
}
func (m *Monitoring) asyncWriter() {
logrus.Info("Started async influxdb writer")
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: m.database,
RetentionPolicy: m.retentionPolicy,
})
for point := range m.points {
bp.AddPoint(point)
if len(bp.Points()) >= m.bufferSize {
m.flushQueue(&bp)
}
}
m.flushQueue(&bp)
}
func (m *Monitoring) flushQueue(bp *influx.BatchPoints) {
err := m.influxdb.Write(*bp)
if err != nil {
logrus.WithError(err).Error("influxdb write")
return
}
logrus.WithFields(logrus.Fields{
"size": len((*bp).Points()),
}).Debug("Wrote points")
*bp, _ = influx.NewBatchPoints(influx.BatchPointsConfig{
Database: m.database,
RetentionPolicy: m.retentionPolicy,
})
}
func (m *Monitoring) writeMetricInsertRow(size int, table string) {
point, _ := influx.NewPoint(
"insert_row",
map[string]string{
"table": table,
},
map[string]interface{}{
"size": size,
},
time.Now(),
)
m.points <- point
}
func (m *Monitoring) writeMetricUniqueViolation(size int, table string) {
point, _ := influx.NewPoint(
"unique_violation",
map[string]string{
"table": table,
},
map[string]interface{}{
"size": size,
},
time.Now(),
)
m.points <- point
}

51
main.go
View File

@ -23,14 +23,16 @@ var pool *pgx.ConnPool
var replacer = strings.NewReplacer(".", "_")
type FeedArchiverArgs struct {
DbHost string
DbUser string
DbPassword string
DbDatabase string
RedisAddr string
RedisPassword string
Pattern string
Threads int
DbHost string
DbUser string
DbPassword string
DbDatabase string
RedisAddr string
RedisPassword string
Pattern string
Threads int
InfluxDb string
InfluxDbBuffer int
}
func main() {
@ -42,7 +44,7 @@ func main() {
Email: "me@simon987.net",
},
}
app.Version = "3.0"
app.Version = "4.0"
args := FeedArchiverArgs{}
@ -95,13 +97,27 @@ func main() {
Destination: &args.Threads,
EnvVars: []string{"FA_THREADS"},
},
&cli.StringFlag{
Name: "influxdb",
Usage: "Influxdb connection string",
Value: "",
Destination: &args.InfluxDb,
EnvVars: []string{"FA_INFLUXDB"},
},
&cli.IntFlag{
Name: "influxdb-buffer",
Usage: "Influxdb buffer size",
Value: 500,
Destination: &args.InfluxDbBuffer,
EnvVars: []string{"FA_INFLUXDB_BUFFER"},
},
}
app.Action = func(c *cli.Context) error {
archiverCtx.tables = map[string]bool{}
logrus.SetLevel(logrus.InfoLevel)
logrus.SetLevel(logrus.DebugLevel)
connPoolConfig := pgx.ConnPoolConfig{
ConnConfig: pgx.ConnConfig{
@ -113,6 +129,11 @@ func main() {
MaxConnections: args.Threads,
}
var mon *Monitoring = nil
if args.InfluxDb != "" {
mon = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer)
}
var err error
pool, err = pgx.NewConnPool(connPoolConfig)
if err != nil {
@ -133,7 +154,7 @@ func main() {
func(message string, key string) error {
table := routingKeyToTable(key[len(args.Pattern)-1:], replacer)
archive(parser, table, message)
archive(parser, table, message, mon)
return nil
},
)
@ -160,7 +181,7 @@ func routingKeyToTable(key string, replacer *strings.Replacer) string {
return table
}
func archive(parser fastjson.Parser, table string, json string) {
func archive(parser fastjson.Parser, table string, json string, mon *Monitoring) {
item, _ := parser.Parse(json)
idValue := item.Get("_id")
@ -188,11 +209,17 @@ func archive(parser fastjson.Parser, table string, json string) {
"id": idValue,
}).Debug("Insert row")
messageSize := len(json)
_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String())
if err != nil {
if err.(pgx.PgError).Code != "23505" {
logrus.WithError(err).Error("Error during insert")
} else if mon != nil {
mon.writeMetricUniqueViolation(messageSize, table)
}
} else if mon != nil {
mon.writeMetricInsertRow(messageSize, table)
}
}