Compare commits

...

7 Commits

SHA1 Message Date
a7cc28a6ac Cleanup 2021-06-03 16:47:41 -04:00
d670b04d79 Add ES support (beta) and performance improvement 2021-06-03 16:41:21 -04:00
b18a57d256 add script to export to .ndjson 2021-02-07 11:58:36 -05:00
632f05c9ea Fix docker build, tweaks logger 2021-02-06 15:14:03 -05:00
2b25f2afe0 Add influxdb monitoring 2021-02-06 11:11:52 -05:00
e8ea4ff1dd update go version 2021-01-30 20:39:36 -05:00
9ac16ae71e Add optional redis password 2021-01-30 20:34:36 -05:00
10 changed files with 462 additions and 100 deletions

Dockerfile

@@ -1,8 +1,8 @@
 # Build
-FROM golang:1.13 as go_build
+FROM golang:1.15 as go_build
 WORKDIR /build/
-COPY main.go .
+COPY *.go ./
 COPY go.mod .
 RUN GOOS=linux CGO_ENABLED=0 go build -a -installsuffix cgo -o feed_archiver .
 RUN strip feed_archiver
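
Note on the COPY change: the build stage must now copy every .go file, since archive_es.go, archive_sql.go and influxdb.go (added in this comparison) are compiled into the same main package, and the old COPY main.go . would miss them.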

archive_es.go (new file, +74)

@@ -0,0 +1,74 @@
package main

import (
	"context"
	"fmt"
	"github.com/elastic/go-elasticsearch/v7/esapi"
	"github.com/sirupsen/logrus"
	"github.com/valyala/fastjson"
	"strconv"
	"strings"
)

func routingKeyToIndex(key string) string {
	if idx := strings.Index(key, "."); idx != -1 {
		project := key[:idx]
		return fmt.Sprintf("%s-data", project)
	}
	panic(fmt.Sprintf("Cannot get ES index name for key %s", key))
}

func archiveEs(record *Record) error {
	var stringId string
	if record.IdType == fastjson.TypeNumber {
		stringId = strconv.FormatInt(record.IdValue.(int64), 10)
	} else if record.IdType == fastjson.TypeString {
		stringId = string(record.IdValue.([]uint8))
	}

	index := routingKeyToIndex(record.RoutingKey)

	// Will cause error in ES if we specify the _id field in the document body
	record.Item.Del("_id")

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: stringId,
		Body:       strings.NewReader(record.Item.String()),
		Refresh:    "false",
	}

	res, err := req.Do(context.Background(), es)
	if err != nil {
		logrus.WithFields(logrus.Fields{
			"id":   record.IdValue,
			"item": record.Item.String(),
		}).WithError(err).Error("Request error during document index")
		return nil
	}
	if res.IsError() {
		logrus.WithFields(logrus.Fields{
			"id":     stringId,
			"status": res.String(),
			"item":   record.Item.String(),
		}).Error("ES error during document index")
	}

	logrus.WithFields(logrus.Fields{
		"id":    stringId,
		"index": index,
	}).Debug("Insert document")

	if archiverCtx.Monitoring != nil {
		archiverCtx.Monitoring.writeMetricIndexDoc(index)
	}

	_ = res.Body.Close()
	return nil
}
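
How the index name is derived, as a minimal standalone sketch (not part of the diff; routingKeyToIndex is copied from the file above, and "chan.8kun2.post" is a hypothetical routing key chosen to match the table name used in export_to_ndjson.py):

package main

import (
	"fmt"
	"strings"
)

// Copied from archive_es.go: only the first dot-separated segment of the
// routing key (the "project") determines the index name.
func routingKeyToIndex(key string) string {
	if idx := strings.Index(key, "."); idx != -1 {
		return fmt.Sprintf("%s-data", key[:idx])
	}
	panic(fmt.Sprintf("Cannot get ES index name for key %s", key))
}

func main() {
	// Every feed under the "chan" project shares a single index.
	fmt.Println(routingKeyToIndex("chan.8kun2.post")) // chan-data
}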

archive_sql.go (new file, +80)

@@ -0,0 +1,80 @@
package main

import (
	"fmt"
	"github.com/jackc/pgx"
	"github.com/sirupsen/logrus"
	"github.com/valyala/fastjson"
	"strings"
)

func routingKeyToTable(key string, replacer *strings.Replacer) string {
	var table string
	if idx := strings.LastIndex(key, "."); idx != -1 {
		table = key[:idx]
	}
	table = replacer.Replace(table)
	return table
}

func archiveSql(record *Record) error {
	table := routingKeyToTable(record.RoutingKey, replacer)

	archiverCtx.m.RLock()
	_, tableExists := archiverCtx.tables[table]
	archiverCtx.m.RUnlock()
	if !tableExists {
		createTable(table, record.IdType)
	}

	logrus.WithFields(logrus.Fields{
		"table": table,
		"id":    record.IdValue,
	}).Debug("Insert row")

	itemString := record.Item.String()
	messageSize := len(itemString)

	_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), record.IdValue, itemString)
	if err != nil {
		if err.(pgx.PgError).Code != "23505" {
			logrus.WithError(err).Error("Error during insert")
		} else if archiverCtx.Monitoring != nil {
			archiverCtx.Monitoring.writeMetricUniqueViolation(messageSize, table)
		}
	} else if archiverCtx.Monitoring != nil {
		archiverCtx.Monitoring.writeMetricInsertRow(messageSize, table)
	}

	return nil
}

func createTable(table string, idType fastjson.Type) {
	logrus.WithFields(logrus.Fields{
		"table": table,
	}).Info("Create table (If not exists)")

	var err error
	var strType string
	if idType == fastjson.TypeNumber {
		strType = "bigint"
	} else {
		strType = "bytea"
	}

	_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
		"id %s PRIMARY KEY,"+
		"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
		"data JSONB NOT NULL"+
		")", table, strType))
	if err != nil {
		logrus.WithError(err).Error("Error during create table")
	}

	archiverCtx.m.Lock()
	archiverCtx.tables[table] = true
	archiverCtx.m.Unlock()
}
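
For reference, "23505" is PostgreSQL's unique_violation SQLSTATE, so a duplicate _id is treated as an expected condition: it is counted as a metric rather than logged as an error. A sketch of the DDL createTable emits for a numeric _id; the table name "chan_8kun2_post" is an assumption for illustration (routing key "chan.8kun2.post" with dots replaced by underscores):

package main

import "fmt"

func main() {
	// Renders the same statement archive_sql.go builds; the table name
	// and id type here are hypothetical.
	table, strType := "chan_8kun2_post", "bigint"
	fmt.Printf("CREATE table IF NOT EXISTS %s ("+
		"id %s PRIMARY KEY,"+
		"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
		"data JSONB NOT NULL"+
		")\n", table, strType)
}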

export_to_ndjson.py (new file, +38)

@@ -0,0 +1,38 @@
from hexlib.db import pg_fetch_cursor_all
import psycopg2
from tqdm import tqdm
import orjson
import zstandard as zstd

TABLE = "chan_8kun2_post"
THREADS = 12

if __name__ == '__main__':
    conn = psycopg2.connect(
        host="",
        port="",
        user="",
        password="",
        dbname="feed_archiver"
    )
    cur = conn.cursor()

    cur.execute("SELECT COUNT(*) FROM %s" % TABLE)
    row_count = cur.fetchone()[0]

    cur.execute("DECLARE cur1 CURSOR FOR SELECT * FROM %s" % TABLE)
    rows = pg_fetch_cursor_all(cur, name="cur1", batch_size=5000)

    with open("out_mp.ndjson.zst", "wb") as f:
        cctx = zstd.ZstdCompressor(level=19, threads=THREADS)
        with cctx.stream_writer(f) as compressor:
            for row in tqdm(rows, total=row_count, unit="row"):
                _id, archived_on, data = row
                data["_archived_on"] = int(archived_on.timestamp())
                compressor.write(orjson.dumps(data))
                compressor.write(b"\n")

    conn.close()
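
The export format is one JSON document per line, zstd-compressed. A sketch of consuming it from Go, assuming the third-party github.com/klauspost/compress/zstd package (not a dependency of this repo):

package main

import (
	"bufio"
	"fmt"
	"os"

	"github.com/klauspost/compress/zstd"
)

func main() {
	f, err := os.Open("out_mp.ndjson.zst")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	dec, err := zstd.NewReader(f)
	if err != nil {
		panic(err)
	}
	defer dec.Close()

	// Each line is one JSON document carrying the extra "_archived_on" field.
	scanner := bufio.NewScanner(dec)
	scanner.Buffer(make([]byte, 0, 64*1024), 10*1024*1024) // allow large documents
	n := 0
	for scanner.Scan() {
		n++
	}
	fmt.Println("documents:", n)
}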

go.mod (2 changed lines)

@@ -5,8 +5,10 @@ go 1.13
 require (
 	github.com/cockroachdb/apd v1.1.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
+	github.com/elastic/go-elasticsearch/v7 v7.12.0
 	github.com/go-redis/redis/v8 v8.0.0-beta.2
 	github.com/gofrs/uuid v3.2.0+incompatible // indirect
+	github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
 	github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
 	github.com/jackc/pgx v3.6.2+incompatible
 	github.com/lib/pq v1.2.0 // indirect

go.sum (4 changed lines)

@@ -16,6 +16,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s=
+github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -39,6 +41,8 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig=
+github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
 github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
 github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
 github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=

influxdb.go (new file, +110)

@@ -0,0 +1,110 @@
package main

import (
	influx "github.com/influxdata/influxdb1-client/v2"
	"github.com/sirupsen/logrus"
	"time"
)

type Monitoring struct {
	influxdb        influx.Client
	points          chan *influx.Point
	connString      string
	bufferSize      int
	retentionPolicy string
	database        string
}

func NewMonitoring(connString string, buffer int) *Monitoring {
	m := new(Monitoring)
	m.bufferSize = buffer
	m.points = make(chan *influx.Point, m.bufferSize)
	m.retentionPolicy = ""
	m.database = "feed_archiver"

	m.influxdb, _ = influx.NewHTTPClient(influx.HTTPConfig{
		Addr: connString,
	})

	go m.asyncWriter()
	return m
}

func (m *Monitoring) asyncWriter() {
	logrus.Info("Started async influxdb writer")

	bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
		Database:        m.database,
		RetentionPolicy: m.retentionPolicy,
	})

	for point := range m.points {
		bp.AddPoint(point)
		if len(bp.Points()) >= m.bufferSize {
			m.flushQueue(&bp)
		}
	}
	m.flushQueue(&bp)
}

func (m *Monitoring) flushQueue(bp *influx.BatchPoints) {
	err := m.influxdb.Write(*bp)
	if err != nil {
		logrus.WithError(err).Error("influxdb write")
		return
	}

	logrus.WithFields(logrus.Fields{
		"size": len((*bp).Points()),
	}).Info("Wrote points")

	*bp, _ = influx.NewBatchPoints(influx.BatchPointsConfig{
		Database:        m.database,
		RetentionPolicy: m.retentionPolicy,
	})
}

func (m *Monitoring) writeMetricInsertRow(size int, table string) {
	point, _ := influx.NewPoint(
		"insert_row",
		map[string]string{
			"table": table,
		},
		map[string]interface{}{
			"size": size,
		},
		time.Now(),
	)
	m.points <- point
}

func (m *Monitoring) writeMetricUniqueViolation(size int, table string) {
	point, _ := influx.NewPoint(
		"unique_violation",
		map[string]string{
			"table": table,
		},
		map[string]interface{}{
			"size": size,
		},
		time.Now(),
	)
	m.points <- point
}

func (m *Monitoring) writeMetricIndexDoc(table string) {
	point, _ := influx.NewPoint(
		"index_doc",
		map[string]string{
			"table": table,
		},
		map[string]interface{}{},
		time.Now(),
	)
	m.points <- point
}
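
Usage sketch, assuming the types above are in scope; "http://localhost:8086" is just InfluxDB 1.x's conventional address and 500 mirrors the influxdb-buffer default added to main.go:

func exampleMonitoring() {
	m := NewMonitoring("http://localhost:8086", 500)
	m.writeMetricInsertRow(1024, "chan_8kun2_post") // hypothetical table name
	m.writeMetricIndexDoc("chan-data")              // hypothetical index name
}

Points are queued on the channel and flushed by the asyncWriter goroutine once bufferSize points accumulate, so metric writes never block the archive path.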

main.go (247 changed lines)

@@ -2,7 +2,7 @@ package main
 import (
 	"context"
-	"fmt"
+	"github.com/elastic/go-elasticsearch/v7"
 	"github.com/go-redis/redis/v8"
 	"github.com/jackc/pgx"
 	"github.com/sirupsen/logrus"
@@ -15,21 +15,40 @@ import (
 )

 var archiverCtx struct {
 	tables map[string]bool
 	m      sync.RWMutex
+	ArchiveFunc func(*Record) error
+	Monitoring  *Monitoring
+	esIndex     string // TODO: based on routing key?
+}
+
+type Record struct {
+	Item       *fastjson.Value
+	IdValue    interface{}
+	IdType     fastjson.Type
+	RoutingKey string
 }

 var pool *pgx.ConnPool
+var es *elasticsearch.Client

 var replacer = strings.NewReplacer(".", "_")

 type FeedArchiverArgs struct {
 	DbHost     string
 	DbUser     string
 	DbPassword string
 	DbDatabase string
-	RedisAddr  string
-	Pattern    string
-	Threads    int
+	EsHost         string
+	EsUser         string
+	EsPassword     string
+	EsIndex        string
+	ArchiveTarget  string
+	RedisAddr      string
+	RedisPassword  string
+	Pattern        string
+	Threads        int
+	InfluxDb       string
+	InfluxDbBuffer int
 }
@@ -41,7 +60,7 @@ func main() {
 			Email: "me@simon987.net",
 		},
 	}
-	app.Version = "3.0"
+	app.Version = "4.0"

 	args := FeedArchiverArgs{}
@@ -74,6 +93,13 @@ func main() {
 			Value:   "localhost:6379",
 			EnvVars: []string{"FA_REDIS_ADDR"},
 		},
+		&cli.StringFlag{
+			Name:        "redis-password",
+			Usage:       "Redis password",
+			Destination: &args.RedisPassword,
+			Value:       "",
+			EnvVars:     []string{"FA_REDIS_PASSWORD"},
+		},
 		&cli.StringFlag{
 			Name:  "pattern",
 			Usage: "redis arc pattern",
@@ -87,6 +113,57 @@ func main() {
 			Destination: &args.Threads,
 			EnvVars:     []string{"FA_THREADS"},
 		},
+		&cli.StringFlag{
+			Name:        "influxdb",
+			Usage:       "Influxdb connection string",
+			Value:       "",
+			Destination: &args.InfluxDb,
+			EnvVars:     []string{"FA_INFLUXDB"},
+		},
+		&cli.IntFlag{
+			Name:        "influxdb-buffer",
+			Usage:       "Influxdb buffer size",
+			Value:       500,
+			Destination: &args.InfluxDbBuffer,
+			EnvVars:     []string{"FA_INFLUXDB_BUFFER"},
+		},
+		&cli.StringFlag{
+			Name:        "es-host",
+			Usage:       "Elasticsearch host",
+			Destination: &args.EsHost,
+			Value:       "http://localhost:9200",
+			EnvVars:     []string{"FA_ES_HOST"},
+		},
+		&cli.StringFlag{
+			Name:        "es-user",
+			Usage:       "Elasticsearch username",
+			Destination: &args.EsUser,
+			Value:       "elastic",
+			EnvVars:     []string{"FA_ES_USER"},
+		},
+		&cli.StringFlag{
+			Name:        "es-password",
+			Usage:       "Elasticsearch password",
+			Destination: &args.EsPassword,
+			Value:       "",
+			EnvVars:     []string{"FA_ES_PASSWORD"},
+		},
+		// TODO: Based on routing key?
+		&cli.StringFlag{
+			Name:        "es-index",
+			Usage:       "Elasticsearch index",
+			Destination: &args.EsIndex,
+			Value:       "feed_archiver",
+			EnvVars:     []string{"FA_ES_INDEX"},
+		},
+		&cli.StringFlag{
+			Name:        "target",
+			Usage:       "Either 'es' or 'sql'",
+			Destination: &args.ArchiveTarget,
+			Value:       "sql",
+			EnvVars:     []string{"FA_TARGET"},
+		},
 	}

 	app.Action = func(c *cli.Context) error {
@@ -105,15 +182,27 @@ func main() {
 			MaxConnections: args.Threads,
 		}

-		var err error
-		pool, err = pgx.NewConnPool(connPoolConfig)
-		if err != nil {
-			panic(err)
+		if args.ArchiveTarget == "sql" {
+			var err error
+			pool, err = pgx.NewConnPool(connPoolConfig)
+			if err != nil {
+				panic(err)
+			}
+			archiverCtx.ArchiveFunc = archiveSql
+		} else {
+			es, _ = elasticsearch.NewDefaultClient()
+			archiverCtx.ArchiveFunc = archiveEs
+			archiverCtx.esIndex = args.EsIndex
+		}
+
+		archiverCtx.Monitoring = nil
+		if args.InfluxDb != "" {
+			archiverCtx.Monitoring = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer)
 		}

 		rdb := redis.NewClient(&redis.Options{
 			Addr:     args.RedisAddr,
-			Password: "",
+			Password: args.RedisPassword,
 			DB:       0,
 		})
@@ -124,9 +213,28 @@ func main() {
 			args.Pattern,
 			func(message string, key string) error {
-				table := routingKeyToTable(key[len(args.Pattern)-1:], replacer)
-				archive(parser, table, message)
-				return nil
+				item, _ := parser.Parse(message)
+
+				id := item.Get("_id")
+				if id == nil {
+					logrus.WithField("json", key).Error("Item with no _id field!")
+					return nil
+				}
+
+				var idValue interface{}
+				if id.Type() == fastjson.TypeNumber {
+					idValue, _ = id.Int64()
+				} else if id.Type() == fastjson.TypeString {
+					idValue, _ = id.StringBytes()
+				}
+
+				return archiverCtx.ArchiveFunc(&Record{
+					Item:       item,
+					IdType:     id.Type(),
+					IdValue:    idValue,
+					RoutingKey: key[len(args.Pattern)-1:],
+				})
 			},
 		)
 	}
@@ -143,89 +251,34 @@
 	}
 }

-func routingKeyToTable(key string, replacer *strings.Replacer) string {
-	var table string
-	if idx := strings.LastIndex(key, "."); idx != -1 {
-		table = key[:idx]
-	}
-	table = replacer.Replace(table)
-	return table
-}
-
-func archive(parser fastjson.Parser, table string, json string) {
-	item, _ := parser.Parse(json)
-	idValue := item.Get("_id")
-	if idValue == nil {
-		logrus.WithField("json", string(json)).Error("Item with no _id field!")
-		return
-	}
-
-	var id interface{}
-	if idValue.Type() == fastjson.TypeNumber {
-		id, _ = idValue.Int64()
-	} else if idValue.Type() == fastjson.TypeString {
-		id, _ = idValue.StringBytes()
-	}
-
-	archiverCtx.m.RLock()
-	_, tableExists := archiverCtx.tables[table]
-	archiverCtx.m.RUnlock()
-	if !tableExists {
-		createTable(table, idValue.Type())
-	}
-
-	logrus.WithFields(logrus.Fields{
-		"table": table,
-		"id":    idValue,
-	}).Debug("Insert row")
-
-	_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String())
-	if err != nil {
-		if err.(pgx.PgError).Code != "23505" {
-			logrus.WithError(err).Error("Error during insert")
-		}
-	}
-}
-
-func createTable(table string, idType fastjson.Type) {
-	logrus.WithFields(logrus.Fields{
-		"table": table,
-	}).Info("Create table (If not exists)")
-
-	var err error
-	var strType string
-	if idType == fastjson.TypeNumber {
-		strType = "bigint"
-	} else {
-		strType = "bytea"
-	}
-
-	_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
-		"id %s PRIMARY KEY,"+
-		"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
-		"data JSONB NOT NULL"+
-		")", table, strType))
-	if err != nil {
-		logrus.WithError(err).Error("Error during create table")
-	}
-
-	archiverCtx.m.Lock()
-	archiverCtx.tables[table] = true
-	archiverCtx.m.Unlock()
-}
-
 var keyCache []string = nil

+// BLPOP with too many keys is slow!
+const maxKeys = 30
+
 func getKeys(ctx context.Context, rdb *redis.Client, pattern string) []string {
 	if keyCache == nil {
-		keys, err := rdb.Keys(ctx, pattern).Result()
-		if err != nil {
-			logrus.WithField("Pattern", pattern).Error("Could not get keys for Pattern")
-			return nil
+		var cur uint64 = 0
+		var keyRes []string
+		var keys []string
+		var err error
+
+		for {
+			keyRes, cur, err = rdb.Scan(ctx, cur, pattern, 10).Result()
+			if err != nil {
+				logrus.
+					WithError(err).
+					WithField("Pattern", pattern).
+					Error("Could not get keys for Pattern")
+				return nil
+			}
+			if cur == 0 || len(keys) >= maxKeys {
+				break
+			}
+			keys = append(keys, keyRes...)
 		}
 		keyCache = keys
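
Note on the getKeys change: KEYS blocks the Redis server while it walks the entire keyspace, whereas SCAN returns a cursor plus a small batch (COUNT 10 here) per call, so the server stays responsive. The new maxKeys constant also caps the list at roughly 30 keys, since, as the added comment says, BLPOP with too many keys is slow.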

requirements.txt

@@ -1,3 +1,4 @@
 elasticsearch
 psycopg2
 ujson
+git+git://github.com/simon987/hexlib.git