Mirror of https://github.com/simon987/feed_archiver.git (synced 2025-12-20 09:05:57 +00:00)

Compare commits: a09e87668e...master (7 commits)

| SHA1 |
|---|
| a7cc28a6ac |
| d670b04d79 |
| b18a57d256 |
| 632f05c9ea |
| 2b25f2afe0 |
| e8ea4ff1dd |
| 9ac16ae71e |
Dockerfile

```diff
@@ -1,8 +1,8 @@
 # Build
-FROM golang:1.13 as go_build
+FROM golang:1.15 as go_build
 WORKDIR /build/

-COPY main.go .
+COPY *.go ./
 COPY go.mod .
 RUN GOOS=linux CGO_ENABLED=0 go build -a -installsuffix cgo -o feed_archiver .
 RUN strip feed_archiver
```
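The Go 1.15 bump and the wildcard `COPY *.go ./` track the rest of this changeset: the build stage now needs archive_es.go, archive_sql.go, and influxdb.go in addition to main.go.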
archive_es.go (new file, 74 lines)

```go
package main

import (
	"context"
	"fmt"
	"github.com/elastic/go-elasticsearch/v7/esapi"
	"github.com/sirupsen/logrus"
	"github.com/valyala/fastjson"
	"strconv"
	"strings"
)

func routingKeyToIndex(key string) string {

	if idx := strings.Index(key, "."); idx != -1 {
		project := key[:idx]
		return fmt.Sprintf("%s-data", project)
	}

	panic(fmt.Sprintf("Cannot get ES index name for key %s", key))
}

func archiveEs(record *Record) error {

	var stringId string

	if record.IdType == fastjson.TypeNumber {
		stringId = strconv.FormatInt(record.IdValue.(int64), 10)
	} else if record.IdType == fastjson.TypeString {
		stringId = string(record.IdValue.([]uint8))
	}

	index := routingKeyToIndex(record.RoutingKey)

	// Will cause error in ES if we specify the _id field in the document body
	record.Item.Del("_id")

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: stringId,
		Body:       strings.NewReader(record.Item.String()),
		Refresh:    "false",
	}

	res, err := req.Do(context.Background(), es)
	if err != nil {
		logrus.WithFields(logrus.Fields{
			"id":   record.IdValue,
			"item": record.Item.String(),
		}).WithError(err).Error("Request error during document index")
		return nil
	}

	if res.IsError() {
		logrus.WithFields(logrus.Fields{
			"id":     stringId,
			"status": res.String(),
			"item":   record.Item.String(),
		}).Error("ES error during document index")
	}

	logrus.WithFields(logrus.Fields{
		"id":    stringId,
		"index": index,
	}).Debug("Insert document")

	if archiverCtx.Monitoring != nil {
		archiverCtx.Monitoring.writeMetricIndexDoc(index)
	}

	_ = res.Body.Close()

	return nil
}
```
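The index name is derived from the first segment of the routing key, so every feed in a project lands in one `<project>-data` index. A minimal sketch of that convention (the routing key value is a hypothetical example):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical routing key of the form "<project>.<source>.<type>"
	key := "chan.8kun2.post"

	// routingKeyToIndex keeps everything before the first '.' and appends "-data"
	project := key[:strings.Index(key, ".")]
	fmt.Printf("%s-data\n", project) // prints "chan-data"
}
```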
archive_sql.go (new file, 80 lines)

```go
package main

import (
	"fmt"
	"github.com/jackc/pgx"
	"github.com/sirupsen/logrus"
	"github.com/valyala/fastjson"
	"strings"
)

func routingKeyToTable(key string, replacer *strings.Replacer) string {
	var table string
	if idx := strings.LastIndex(key, "."); idx != -1 {
		table = key[:idx]
	}
	table = replacer.Replace(table)
	return table
}

func archiveSql(record *Record) error {

	table := routingKeyToTable(record.RoutingKey, replacer)
	archiverCtx.m.RLock()
	_, tableExists := archiverCtx.tables[table]
	archiverCtx.m.RUnlock()

	if !tableExists {
		createTable(table, record.IdType)
	}

	logrus.WithFields(logrus.Fields{
		"table": table,
		"id":    record.IdValue,
	}).Debug("Insert row")

	itemString := record.Item.String()
	messageSize := len(itemString)

	_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), record.IdValue, itemString)
	if err != nil {
		if err.(pgx.PgError).Code != "23505" {
			logrus.WithError(err).Error("Error during insert")
		} else if archiverCtx.Monitoring != nil {
			archiverCtx.Monitoring.writeMetricUniqueViolation(messageSize, table)
		}
	} else if archiverCtx.Monitoring != nil {
		archiverCtx.Monitoring.writeMetricInsertRow(messageSize, table)
	}

	return nil
}

func createTable(table string, idType fastjson.Type) {

	logrus.WithFields(logrus.Fields{
		"table": table,
	}).Info("Create table (If not exists)")

	var err error
	var strType string
	if idType == fastjson.TypeNumber {
		strType = "bigint"
	} else {
		strType = "bytea"
	}

	_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
		"id %s PRIMARY KEY,"+
		"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
		"data JSONB NOT NULL"+
		")", table, strType))

	if err != nil {
		logrus.WithError(err).Error("Error during create table")
	}

	archiverCtx.m.Lock()
	archiverCtx.tables[table] = true
	archiverCtx.m.Unlock()
}
```
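For the SQL target, the table name is the routing key minus its last segment, with dots replaced by underscores (the `replacer` defined in main.go). A small sketch, using a hypothetical key that would produce the `chan_8kun2_post` table referenced in export_to_ndjson.py:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	replacer := strings.NewReplacer(".", "_")
	key := "chan.8kun2.post.1234" // hypothetical routing key; the last segment is dropped

	table := replacer.Replace(key[:strings.LastIndex(key, ".")])
	fmt.Println(table) // prints "chan_8kun2_post"

	// createTable would then issue roughly:
	//   CREATE TABLE IF NOT EXISTS chan_8kun2_post (
	//     id bigint PRIMARY KEY,            -- bytea when the _id is a string
	//     archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
	//     data JSONB NOT NULL
	//   )
}
```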
export_to_ndjson.py (new file, 38 lines)

```python
from hexlib.db import pg_fetch_cursor_all
import psycopg2
from tqdm import tqdm
import orjson
import zstandard as zstd

TABLE = "chan_8kun2_post"
THREADS = 12

if __name__ == '__main__':

    conn = psycopg2.connect(
        host="",
        port="",
        user="",
        password="",
        dbname="feed_archiver"
    )

    cur = conn.cursor()

    cur.execute("SELECT COUNT(*) FROM %s" % TABLE)
    row_count = cur.fetchone()[0]

    cur.execute("DECLARE cur1 CURSOR FOR SELECT * FROM %s" % TABLE)

    rows = pg_fetch_cursor_all(cur, name="cur1", batch_size=5000)

    with open("out_mp.ndjson.zst", "wb") as f:
        cctx = zstd.ZstdCompressor(level=19, threads=THREADS)
        with cctx.stream_writer(f) as compressor:
            for row in tqdm(rows, total=row_count, unit="row"):
                _id, archived_on, data = row
                data["_archived_on"] = int(archived_on.timestamp())
                compressor.write(orjson.dumps(data))
                compressor.write(b"\n")

    conn.close()
```
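The export is a single zstd stream of newline-delimited JSON documents. Reading it back from Go could look like the following sketch (github.com/klauspost/compress/zstd is an assumed library choice, not a dependency of this repo):

```go
package main

import (
	"bufio"
	"fmt"
	"os"

	"github.com/klauspost/compress/zstd" // assumed dependency for this sketch
)

func main() {
	f, err := os.Open("out_mp.ndjson.zst")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Decompress the stream and walk it line by line; each line is one document.
	dec, err := zstd.NewReader(f)
	if err != nil {
		panic(err)
	}
	defer dec.Close()

	scanner := bufio.NewScanner(dec)
	scanner.Buffer(make([]byte, 0, 1024*1024), 16*1024*1024) // allow large documents
	n := 0
	for scanner.Scan() {
		n++ // scanner.Bytes() holds one JSON document here
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
	fmt.Println(n, "documents")
}
```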
go.mod (2 lines added)

```diff
@@ -5,8 +5,10 @@ go 1.13
 require (
 	github.com/cockroachdb/apd v1.1.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
+	github.com/elastic/go-elasticsearch/v7 v7.12.0
 	github.com/go-redis/redis/v8 v8.0.0-beta.2
 	github.com/gofrs/uuid v3.2.0+incompatible // indirect
+	github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
 	github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
 	github.com/jackc/pgx v3.6.2+incompatible
 	github.com/lib/pq v1.2.0 // indirect
```
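The two new direct dependencies correspond to the two new features in this range: go-elasticsearch/v7 backs the `es` archive target, and influxdb1-client backs the optional monitoring in influxdb.go.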
go.sum (4 lines added)

```diff
@@ -16,6 +16,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/elastic/go-elasticsearch/v7 v7.12.0 h1:j4tvcMrZJLp39L2NYvBb7f+lHKPqPHSL3nvB8+/DV+s=
+github.com/elastic/go-elasticsearch/v7 v7.12.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -39,6 +41,8 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig=
+github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
 github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc=
 github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
 github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=
```
influxdb.go (new file, 110 lines)

```go
package main

import (
	influx "github.com/influxdata/influxdb1-client/v2"
	"github.com/sirupsen/logrus"
	"time"
)

type Monitoring struct {
	influxdb        influx.Client
	points          chan *influx.Point
	connString      string
	bufferSize      int
	retentionPolicy string
	database        string
}

func NewMonitoring(connString string, buffer int) *Monitoring {
	m := new(Monitoring)
	m.bufferSize = buffer
	m.points = make(chan *influx.Point, m.bufferSize)
	m.retentionPolicy = ""
	m.database = "feed_archiver"
	m.influxdb, _ = influx.NewHTTPClient(influx.HTTPConfig{
		Addr: connString,
	})
	go m.asyncWriter()
	return m
}

func (m *Monitoring) asyncWriter() {

	logrus.Info("Started async influxdb writer")

	bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
		Database:        m.database,
		RetentionPolicy: m.retentionPolicy,
	})

	for point := range m.points {
		bp.AddPoint(point)

		if len(bp.Points()) >= m.bufferSize {
			m.flushQueue(&bp)
		}
	}
	m.flushQueue(&bp)
}

func (m *Monitoring) flushQueue(bp *influx.BatchPoints) {

	err := m.influxdb.Write(*bp)

	if err != nil {
		logrus.WithError(err).Error("influxdb write")
		return
	}

	logrus.WithFields(logrus.Fields{
		"size": len((*bp).Points()),
	}).Info("Wrote points")

	*bp, _ = influx.NewBatchPoints(influx.BatchPointsConfig{
		Database:        m.database,
		RetentionPolicy: m.retentionPolicy,
	})
}

func (m *Monitoring) writeMetricInsertRow(size int, table string) {

	point, _ := influx.NewPoint(
		"insert_row",
		map[string]string{
			"table": table,
		},
		map[string]interface{}{
			"size": size,
		},
		time.Now(),
	)
	m.points <- point
}

func (m *Monitoring) writeMetricUniqueViolation(size int, table string) {

	point, _ := influx.NewPoint(
		"unique_violation",
		map[string]string{
			"table": table,
		},
		map[string]interface{}{
			"size": size,
		},
		time.Now(),
	)
	m.points <- point
}

func (m *Monitoring) writeMetricIndexDoc(table string) {

	point, _ := influx.NewPoint(
		"index_doc",
		map[string]string{
			"table": table,
		},
		map[string]interface{}{},
		time.Now(),
	)
	m.points <- point
}
```
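Metric writes go through a buffered channel; a single goroutine drains it into a batch and flushes once `bufferSize` points have accumulated, plus once more when the channel closes. A standalone sketch of that producer/batcher shape, with plain ints standing in for influx.Point:

```go
package main

import "fmt"

func main() {
	const bufferSize = 3 // stands in for the --influxdb-buffer flag (default 500)
	points := make(chan int, bufferSize)
	done := make(chan struct{})

	go func() {
		var batch []int
		for p := range points {
			batch = append(batch, p)
			if len(batch) >= bufferSize {
				fmt.Println("flush", batch) // Monitoring calls influxdb.Write here
				batch = batch[:0]
			}
		}
		fmt.Println("final flush", batch) // leftover points on shutdown
		close(done)
	}()

	for i := 1; i <= 7; i++ {
		points <- i
	}
	close(points)
	<-done
}
```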
main.go (215 lines changed)

```diff
@@ -2,7 +2,7 @@ package main
 import (
 	"context"
-	"fmt"
+	"github.com/elastic/go-elasticsearch/v7"
 	"github.com/go-redis/redis/v8"
 	"github.com/jackc/pgx"
 	"github.com/sirupsen/logrus"
@@ -17,9 +17,20 @@ import (
 var archiverCtx struct {
 	tables map[string]bool
 	m      sync.RWMutex
+	ArchiveFunc func(*Record) error
+	Monitoring  *Monitoring
+	esIndex     string // TODO: based on routing key?
+}
+
+type Record struct {
+	Item       *fastjson.Value
+	IdValue    interface{}
+	IdType     fastjson.Type
+	RoutingKey string
 }

 var pool *pgx.ConnPool
+var es *elasticsearch.Client
 var replacer = strings.NewReplacer(".", "_")
@@ -27,9 +38,17 @@ type FeedArchiverArgs struct {
 	DbUser     string
 	DbPassword string
 	DbDatabase string
+	EsHost        string
+	EsUser        string
+	EsPassword    string
+	EsIndex       string
+	ArchiveTarget string
 	RedisAddr  string
+	RedisPassword string
 	Pattern    string
 	Threads    int
+	InfluxDb       string
+	InfluxDbBuffer int
 }
@@ -41,7 +60,7 @@ func main() {
 			Email: "me@simon987.net",
 		},
 	}
-	app.Version = "3.0"
+	app.Version = "4.0"

 	args := FeedArchiverArgs{}
@@ -74,6 +93,13 @@ func main() {
 			Value:       "localhost:6379",
 			EnvVars:     []string{"FA_REDIS_ADDR"},
 		},
+		&cli.StringFlag{
+			Name:        "redis-password",
+			Usage:       "Redis password",
+			Destination: &args.RedisPassword,
+			Value:       "",
+			EnvVars:     []string{"FA_REDIS_PASSWORD"},
+		},
 		&cli.StringFlag{
 			Name:        "pattern",
 			Usage:       "redis arc pattern",
@@ -87,6 +113,57 @@ func main() {
 			Destination: &args.Threads,
 			EnvVars:     []string{"FA_THREADS"},
 		},
+		&cli.StringFlag{
+			Name:        "influxdb",
+			Usage:       "Influxdb connection string",
+			Value:       "",
+			Destination: &args.InfluxDb,
+			EnvVars:     []string{"FA_INFLUXDB"},
+		},
+		&cli.IntFlag{
+			Name:        "influxdb-buffer",
+			Usage:       "Influxdb buffer size",
+			Value:       500,
+			Destination: &args.InfluxDbBuffer,
+			EnvVars:     []string{"FA_INFLUXDB_BUFFER"},
+		},
+		&cli.StringFlag{
+			Name:        "es-host",
+			Usage:       "Elasticsearch host",
+			Destination: &args.EsHost,
+			Value:       "http://localhost:9200",
+			EnvVars:     []string{"FA_ES_HOST"},
+		},
+		&cli.StringFlag{
+			Name:        "es-user",
+			Usage:       "Elasticsearch username",
+			Destination: &args.EsUser,
+			Value:       "elastic",
+			EnvVars:     []string{"FA_ES_USER"},
+		},
+		&cli.StringFlag{
+			Name:        "es-password",
+			Usage:       "Elasticsearch password",
+			Destination: &args.EsPassword,
+			Value:       "",
+			EnvVars:     []string{"FA_ES_PASSWORD"},
+		},
+
+		// TODO: Based on routing key?
+		&cli.StringFlag{
+			Name:        "es-index",
+			Usage:       "Elasticsearch index",
+			Destination: &args.EsIndex,
+			Value:       "feed_archiver",
+			EnvVars:     []string{"FA_ES_INDEX"},
+		},
+		&cli.StringFlag{
+			Name:        "target",
+			Usage:       "Either 'es' or 'sql'",
+			Destination: &args.ArchiveTarget,
+			Value:       "sql",
+			EnvVars:     []string{"FA_TARGET"},
+		},
 	}

 	app.Action = func(c *cli.Context) error {
@@ -105,15 +182,27 @@ func main() {
 			MaxConnections: args.Threads,
 		}

+		if args.ArchiveTarget == "sql" {
 			var err error
 			pool, err = pgx.NewConnPool(connPoolConfig)
 			if err != nil {
 				panic(err)
 			}
+			archiverCtx.ArchiveFunc = archiveSql
+		} else {
+			es, _ = elasticsearch.NewDefaultClient()
+			archiverCtx.ArchiveFunc = archiveEs
+			archiverCtx.esIndex = args.EsIndex
+		}
+
+		archiverCtx.Monitoring = nil
+		if args.InfluxDb != "" {
+			archiverCtx.Monitoring = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer)
+		}

 		rdb := redis.NewClient(&redis.Options{
 			Addr:     args.RedisAddr,
-			Password: "",
+			Password: args.RedisPassword,
 			DB:       0,
 		})
@@ -124,9 +213,28 @@ func main() {
 			args.Pattern,
 			func(message string, key string) error {
-				table := routingKeyToTable(key[len(args.Pattern)-1:], replacer)
-				archive(parser, table, message)
-				return nil
+
+				item, _ := parser.Parse(message)
+
+				id := item.Get("_id")
+				if id == nil {
+					logrus.WithField("json", key).Error("Item with no _id field!")
+					return nil
+				}
+
+				var idValue interface{}
+
+				if id.Type() == fastjson.TypeNumber {
+					idValue, _ = id.Int64()
+				} else if id.Type() == fastjson.TypeString {
+					idValue, _ = id.StringBytes()
+				}
+
+				return archiverCtx.ArchiveFunc(&Record{
+					Item:       item,
+					IdType:     id.Type(),
+					IdValue:    idValue,
+					RoutingKey: key[len(args.Pattern)-1:],
+				})
 			},
 		)
 	}
```
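The rewritten callback parses the message once, extracts `_id`, and hands a Record to whichever ArchiveFunc was selected at startup. The routing key is recovered by slicing off the subscription pattern's prefix, minus the trailing `*`. A quick sketch with a hypothetical pattern value:

```go
package main

import "fmt"

func main() {
	pattern := "arc.*"           // hypothetical FA_PATTERN value
	key := "arc.chan.8kun2.post" // a Redis list name matched by that pattern

	// key[len(pattern)-1:] strips the prefix up to (but not including) the '*'
	routingKey := key[len(pattern)-1:]
	fmt.Println(routingKey) // prints "chan.8kun2.post"
}
```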
```diff
@@ -143,91 +251,36 @@ func main() {
 		}
 	}
 }

-func routingKeyToTable(key string, replacer *strings.Replacer) string {
-	var table string
-	if idx := strings.LastIndex(key, "."); idx != -1 {
-		table = key[:idx]
-	}
-	table = replacer.Replace(table)
-	return table
-}
-
-func archive(parser fastjson.Parser, table string, json string) {
-	item, _ := parser.Parse(json)
-
-	idValue := item.Get("_id")
-	if idValue == nil {
-		logrus.WithField("json", string(json)).Error("Item with no _id field!")
-		return
-	}
-
-	var id interface{}
-	if idValue.Type() == fastjson.TypeNumber {
-		id, _ = idValue.Int64()
-	} else if idValue.Type() == fastjson.TypeString {
-		id, _ = idValue.StringBytes()
-	}
-
-	archiverCtx.m.RLock()
-	_, tableExists := archiverCtx.tables[table]
-	archiverCtx.m.RUnlock()
-	if !tableExists {
-		createTable(table, idValue.Type())
-	}
-
-	logrus.WithFields(logrus.Fields{
-		"table": table,
-		"id":    idValue,
-	}).Debug("Insert row")
-
-	_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String())
-	if err != nil {
-		if err.(pgx.PgError).Code != "23505" {
-			logrus.WithError(err).Error("Error during insert")
-		}
-	}
-}
-
-func createTable(table string, idType fastjson.Type) {
-	logrus.WithFields(logrus.Fields{
-		"table": table,
-	}).Info("Create table (If not exists)")
-
-	var err error
-	var strType string
-	if idType == fastjson.TypeNumber {
-		strType = "bigint"
-	} else {
-		strType = "bytea"
-	}
-
-	_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
-		"id %s PRIMARY KEY,"+
-		"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
-		"data JSONB NOT NULL"+
-		")", table, strType))
-
-	if err != nil {
-		logrus.WithError(err).Error("Error during create table")
-	}
-
-	archiverCtx.m.Lock()
-	archiverCtx.tables[table] = true
-	archiverCtx.m.Unlock()
-}
-
 var keyCache []string = nil

+// BLPOP with too many keys is slow!
+const maxKeys = 30
+
 func getKeys(ctx context.Context, rdb *redis.Client, pattern string) []string {

 	if keyCache == nil {
-		keys, err := rdb.Keys(ctx, pattern).Result()
-		if err != nil {
-			logrus.WithField("Pattern", pattern).Error("Could not get keys for Pattern")
-			return nil
-		}
+		var cur uint64 = 0
+		var keyRes []string
+		var keys []string
+		var err error
+
+		for {
+			keyRes, cur, err = rdb.Scan(ctx, cur, pattern, 10).Result()
+			if err != nil {
+				logrus.
+					WithError(err).
+					WithField("Pattern", pattern).
+					Error("Could not get keys for Pattern")
+				return nil
+			}
+
+			if cur == 0 || len(keys) >= maxKeys {
+				break
+			}
+
+			keys = append(keys, keyRes...)
+		}
+
 		keyCache = keys
 	}
```
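The getKeys rewrite swaps KEYS for SCAN and caps the cache at maxKeys. Unlike KEYS, SCAN is cursor-based: each call returns one page of matching keys plus the next cursor, and a returned cursor of 0 means the scan has completed a full pass. A standalone sketch of that loop shape (scanPage is a hypothetical stand-in for rdb.Scan; this sketch appends each page before checking the cursor):

```go
package main

import "fmt"

// scanPage stands in for rdb.Scan(ctx, cur, pattern, 10): it returns one
// page of matching keys and the next cursor; cursor 0 ends the iteration.
func scanPage(cur uint64) ([]string, uint64) {
	pages := [][]string{{"arc.a.b"}, {"arc.c.d"}, {"arc.e.f"}}
	next := (cur + 1) % uint64(len(pages)) // wraps back to 0 after the last page
	return pages[cur], next
}

func main() {
	const maxKeys = 30 // cap the cache: BLPOP with too many keys is slow

	var keys []string
	for cur := uint64(0); ; {
		page, next := scanPage(cur)
		keys = append(keys, page...)
		cur = next
		if cur == 0 || len(keys) >= maxKeys {
			break
		}
	}
	fmt.Println(keys) // the key cache later handed to BLPOP
}
```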
requirements.txt (1 line added)

```diff
@@ -1,3 +1,4 @@
 elasticsearch
 psycopg2
 ujson
+git+git://github.com/simon987/hexlib.git
```