Mirror of https://github.com/simon987/feed_archiver.git
Synced 2025-04-04 01:52:58 +00:00
111 lines, 2.1 KiB, Go
package main
|
|
|
|
import (
|
|
influx "github.com/influxdata/influxdb1-client/v2"
|
|
"github.com/sirupsen/logrus"
|
|
"time"
|
|
)
|
|
|
|
type Monitoring struct {
|
|
influxdb influx.Client
|
|
points chan *influx.Point
|
|
connString string
|
|
bufferSize int
|
|
retentionPolicy string
|
|
database string
|
|
}
|
|
|
|
func NewMonitoring(connString string, buffer int) *Monitoring {
|
|
m := new(Monitoring)
|
|
m.bufferSize = buffer
|
|
m.points = make(chan *influx.Point, m.bufferSize)
|
|
m.retentionPolicy = ""
|
|
m.database = "feed_archiver"
|
|
m.influxdb, _ = influx.NewHTTPClient(influx.HTTPConfig{
|
|
Addr: connString,
|
|
})
|
|
go m.asyncWriter()
|
|
return m
|
|
}
|
|
|
|
func (m *Monitoring) asyncWriter() {
|
|
|
|
logrus.Info("Started async influxdb writer")
|
|
|
|
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
|
|
Database: m.database,
|
|
RetentionPolicy: m.retentionPolicy,
|
|
})
|
|
|
|
for point := range m.points {
|
|
bp.AddPoint(point)
|
|
|
|
if len(bp.Points()) >= m.bufferSize {
|
|
m.flushQueue(&bp)
|
|
}
|
|
}
|
|
m.flushQueue(&bp)
|
|
}
|
|
|
|
func (m *Monitoring) flushQueue(bp *influx.BatchPoints) {
|
|
|
|
err := m.influxdb.Write(*bp)
|
|
|
|
if err != nil {
|
|
logrus.WithError(err).Error("influxdb write")
|
|
return
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{
|
|
"size": len((*bp).Points()),
|
|
}).Info("Wrote points")
|
|
|
|
*bp, _ = influx.NewBatchPoints(influx.BatchPointsConfig{
|
|
Database: m.database,
|
|
RetentionPolicy: m.retentionPolicy,
|
|
})
|
|
}
|
|
|
|
func (m *Monitoring) writeMetricInsertRow(size int, table string) {
|
|
|
|
point, _ := influx.NewPoint(
|
|
"insert_row",
|
|
map[string]string{
|
|
"table": table,
|
|
},
|
|
map[string]interface{}{
|
|
"size": size,
|
|
},
|
|
time.Now(),
|
|
)
|
|
m.points <- point
|
|
}
|
|
|
|
func (m *Monitoring) writeMetricUniqueViolation(size int, table string) {
|
|
|
|
point, _ := influx.NewPoint(
|
|
"unique_violation",
|
|
map[string]string{
|
|
"table": table,
|
|
},
|
|
map[string]interface{}{
|
|
"size": size,
|
|
},
|
|
time.Now(),
|
|
)
|
|
m.points <- point
|
|
}
|
|
|
|
func (m *Monitoring) writeMetricIndexDoc(table string) {
|
|
|
|
point, _ := influx.NewPoint(
|
|
"index_doc",
|
|
map[string]string{
|
|
"table": table,
|
|
},
|
|
map[string]interface{}{},
|
|
time.Now(),
|
|
)
|
|
m.points <- point
|
|
}
|