feed_archiver/influxdb.go

111 lines
2.1 KiB
Go

package main
import (
influx "github.com/influxdata/influxdb1-client/v2"
"github.com/sirupsen/logrus"
"time"
)
type Monitoring struct {
influxdb influx.Client
points chan *influx.Point
connString string
bufferSize int
retentionPolicy string
database string
}
func NewMonitoring(connString string, buffer int) *Monitoring {
m := new(Monitoring)
m.bufferSize = buffer
m.points = make(chan *influx.Point, m.bufferSize)
m.retentionPolicy = ""
m.database = "feed_archiver"
m.influxdb, _ = influx.NewHTTPClient(influx.HTTPConfig{
Addr: connString,
})
go m.asyncWriter()
return m
}
func (m *Monitoring) asyncWriter() {
logrus.Info("Started async influxdb writer")
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: m.database,
RetentionPolicy: m.retentionPolicy,
})
for point := range m.points {
bp.AddPoint(point)
if len(bp.Points()) >= m.bufferSize {
m.flushQueue(&bp)
}
}
m.flushQueue(&bp)
}
func (m *Monitoring) flushQueue(bp *influx.BatchPoints) {
err := m.influxdb.Write(*bp)
if err != nil {
logrus.WithError(err).Error("influxdb write")
return
}
logrus.WithFields(logrus.Fields{
"size": len((*bp).Points()),
}).Info("Wrote points")
*bp, _ = influx.NewBatchPoints(influx.BatchPointsConfig{
Database: m.database,
RetentionPolicy: m.retentionPolicy,
})
}
func (m *Monitoring) writeMetricInsertRow(size int, table string) {
point, _ := influx.NewPoint(
"insert_row",
map[string]string{
"table": table,
},
map[string]interface{}{
"size": size,
},
time.Now(),
)
m.points <- point
}
func (m *Monitoring) writeMetricUniqueViolation(size int, table string) {
point, _ := influx.NewPoint(
"unique_violation",
map[string]string{
"table": table,
},
map[string]interface{}{
"size": size,
},
time.Now(),
)
m.points <- point
}
func (m *Monitoring) writeMetricIndexDoc(table string) {
point, _ := influx.NewPoint(
"index_doc",
map[string]string{
"table": table,
},
map[string]interface{}{},
time.Now(),
)
m.points <- point
}