// Mirror of https://github.com/simon987/feed_archiver.git
// (synced 2025-04-04 01:52:58 +00:00; 314 lines, 6.7 KiB, Go).

package main
import (
	"context"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/elastic/go-elasticsearch/v7"
	"github.com/go-redis/redis/v8"
	"github.com/jackc/pgx"
	"github.com/sirupsen/logrus"
	"github.com/urfave/cli/v2"
	"github.com/valyala/fastjson"
)
var archiverCtx struct {
|
|
tables map[string]bool
|
|
m sync.RWMutex
|
|
ArchiveFunc func(*Record) error
|
|
Monitoring *Monitoring
|
|
esIndex string // TODO: based on routing key?
|
|
}
type Record struct {
|
|
Item *fastjson.Value
|
|
IdValue interface{}
|
|
IdType fastjson.Type
|
|
RoutingKey string
|
|
}
var pool *pgx.ConnPool
|
|
var es *elasticsearch.Client
|
|
var replacer = strings.NewReplacer(".", "_")
// FeedArchiverArgs holds the command-line / environment configuration of the
// archiver. Each field is the destination of one CLI flag declared in main.
type FeedArchiverArgs struct {
	// PostgreSQL connection settings.
	DbHost     string
	DbUser     string
	DbPassword string
	DbDatabase string

	// Elasticsearch connection settings and target index.
	EsHost     string
	EsUser     string
	EsPassword string
	EsIndex    string

	// ArchiveTarget selects the backend: "sql" or "es".
	ArchiveTarget string

	// Redis connection settings.
	RedisAddr     string
	RedisPassword string

	// Pattern is the Redis key pattern used to discover the queues to consume.
	Pattern string

	// Threads is the number of consumer goroutines.
	Threads int

	// InfluxDb is the InfluxDB connection string (monitoring is disabled when
	// empty); InfluxDbBuffer is its buffer size.
	InfluxDb       string
	InfluxDbBuffer int
}
func main() {
|
|
app := cli.NewApp()
|
|
app.Name = "feed_archiver"
|
|
app.Authors = []*cli.Author{
|
|
{
|
|
Name: "simon987",
|
|
Email: "me@simon987.net",
|
|
},
|
|
}
|
|
app.Version = "4.0"
|
|
|
|
args := FeedArchiverArgs{}
|
|
|
|
app.Flags = []cli.Flag{
|
|
&cli.StringFlag{
|
|
Name: "db-host",
|
|
Usage: "Database host",
|
|
Destination: &args.DbHost,
|
|
Value: "127.0.0.1",
|
|
EnvVars: []string{"FA_DB_HOST"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "db-user",
|
|
Usage: "Database user",
|
|
Destination: &args.DbUser,
|
|
Value: "feed_archiver",
|
|
EnvVars: []string{"FA_DB_USER"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "db-password",
|
|
Usage: "Database password",
|
|
Destination: &args.DbPassword,
|
|
Value: "feed_archiver",
|
|
EnvVars: []string{"FA_DB_PASSWORD"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "redis-addr",
|
|
Usage: "Redis addr",
|
|
Destination: &args.RedisAddr,
|
|
Value: "localhost:6379",
|
|
EnvVars: []string{"FA_REDIS_ADDR"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "redis-password",
|
|
Usage: "Redis password",
|
|
Destination: &args.RedisPassword,
|
|
Value: "",
|
|
EnvVars: []string{"FA_REDIS_PASSWORD"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "pattern",
|
|
Usage: "redis arc pattern",
|
|
Destination: &args.Pattern,
|
|
EnvVars: []string{"FA_PATTERN"},
|
|
},
|
|
&cli.IntFlag{
|
|
Name: "threads",
|
|
Usage: "number of threads",
|
|
Value: 5,
|
|
Destination: &args.Threads,
|
|
EnvVars: []string{"FA_THREADS"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "influxdb",
|
|
Usage: "Influxdb connection string",
|
|
Value: "",
|
|
Destination: &args.InfluxDb,
|
|
EnvVars: []string{"FA_INFLUXDB"},
|
|
},
|
|
&cli.IntFlag{
|
|
Name: "influxdb-buffer",
|
|
Usage: "Influxdb buffer size",
|
|
Value: 500,
|
|
Destination: &args.InfluxDbBuffer,
|
|
EnvVars: []string{"FA_INFLUXDB_BUFFER"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "es-host",
|
|
Usage: "Elasticsearch host",
|
|
Destination: &args.EsHost,
|
|
Value: "http://localhost:9200",
|
|
EnvVars: []string{"FA_ES_HOST"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "es-user",
|
|
Usage: "Elasticsearch username",
|
|
Destination: &args.EsUser,
|
|
Value: "elastic",
|
|
EnvVars: []string{"FA_ES_USER"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "es-password",
|
|
Usage: "Elasticsearch password",
|
|
Destination: &args.EsPassword,
|
|
Value: "",
|
|
EnvVars: []string{"FA_ES_PASSWORD"},
|
|
},
|
|
|
|
// TODO: Based on routing key?
|
|
&cli.StringFlag{
|
|
Name: "es-index",
|
|
Usage: "Elasticsearch index",
|
|
Destination: &args.EsIndex,
|
|
Value: "feed_archiver",
|
|
EnvVars: []string{"FA_ES_INDEX"},
|
|
},
|
|
&cli.StringFlag{
|
|
Name: "target",
|
|
Usage: "Either 'es' or 'sql'",
|
|
Destination: &args.ArchiveTarget,
|
|
Value: "sql",
|
|
EnvVars: []string{"FA_TARGET"},
|
|
},
|
|
}
|
|
|
|
app.Action = func(c *cli.Context) error {
|
|
|
|
archiverCtx.tables = map[string]bool{}
|
|
|
|
logrus.SetLevel(logrus.InfoLevel)
|
|
|
|
connPoolConfig := pgx.ConnPoolConfig{
|
|
ConnConfig: pgx.ConnConfig{
|
|
Host: args.DbHost,
|
|
User: args.DbUser,
|
|
Password: args.DbPassword,
|
|
Database: args.DbDatabase,
|
|
},
|
|
MaxConnections: args.Threads,
|
|
}
|
|
|
|
if args.ArchiveTarget == "sql" {
|
|
var err error
|
|
pool, err = pgx.NewConnPool(connPoolConfig)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
archiverCtx.ArchiveFunc = archiveSql
|
|
} else {
|
|
es, _ = elasticsearch.NewDefaultClient()
|
|
archiverCtx.ArchiveFunc = archiveEs
|
|
archiverCtx.esIndex = args.EsIndex
|
|
}
|
|
|
|
archiverCtx.Monitoring = nil
|
|
if args.InfluxDb != "" {
|
|
archiverCtx.Monitoring = NewMonitoring(args.InfluxDb, args.InfluxDbBuffer)
|
|
}
|
|
|
|
rdb := redis.NewClient(&redis.Options{
|
|
Addr: args.RedisAddr,
|
|
Password: args.RedisPassword,
|
|
DB: 0,
|
|
})
|
|
|
|
for i := 1; i <= args.Threads; i++ {
|
|
var parser fastjson.Parser
|
|
go dispatchFromQueue(
|
|
rdb,
|
|
args.Pattern,
|
|
func(message string, key string) error {
|
|
|
|
item, _ := parser.Parse(message)
|
|
|
|
id := item.Get("_id")
|
|
if id == nil {
|
|
logrus.WithField("json", key).Error("Item with no _id field!")
|
|
return nil
|
|
}
|
|
|
|
var idValue interface{}
|
|
|
|
if id.Type() == fastjson.TypeNumber {
|
|
idValue, _ = id.Int64()
|
|
} else if id.Type() == fastjson.TypeString {
|
|
idValue, _ = id.StringBytes()
|
|
}
|
|
|
|
return archiverCtx.ArchiveFunc(&Record{
|
|
Item: item,
|
|
IdType: id.Type(),
|
|
IdValue: idValue,
|
|
RoutingKey: key[len(args.Pattern)-1:],
|
|
})
|
|
},
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
err := app.Run(os.Args)
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
logrus.Fatal(app.OnUsageError)
|
|
}
|
|
for {
|
|
time.Sleep(time.Second * 30)
|
|
}
|
|
}
var keyCache []string = nil
|
|
|
|
// BLPOP with too many keys is slow!
|
|
const maxKeys = 30
|
|
|
|
func getKeys(ctx context.Context, rdb *redis.Client, pattern string) []string {
|
|
|
|
if keyCache == nil {
|
|
var cur uint64 = 0
|
|
var keyRes []string
|
|
var keys []string
|
|
var err error
|
|
|
|
for {
|
|
keyRes, cur, err = rdb.Scan(ctx, cur, pattern, 10).Result()
|
|
if err != nil {
|
|
logrus.
|
|
WithError(err).
|
|
WithField("Pattern", pattern).
|
|
Error("Could not get keys for Pattern")
|
|
return nil
|
|
}
|
|
|
|
if cur == 0 || len(keys) >= maxKeys {
|
|
break
|
|
}
|
|
|
|
keys = append(keys, keyRes...)
|
|
}
|
|
|
|
keyCache = keys
|
|
}
|
|
|
|
return keyCache
|
|
}
|
|
|
|
func dispatchFromQueue(rdb *redis.Client, pattern string, consume func(message string, key string) error) error {
|
|
|
|
ctx := context.Background()
|
|
|
|
for {
|
|
keys := getKeys(ctx, rdb, pattern)
|
|
if len(keys) == 0 {
|
|
time.Sleep(time.Second * 1)
|
|
keyCache = nil
|
|
continue
|
|
}
|
|
|
|
rawTask, err := rdb.BLPop(ctx, time.Second, keys...).Result()
|
|
if err != nil {
|
|
keyCache = nil
|
|
continue
|
|
}
|
|
|
|
err = consume(rawTask[1], rawTask[0])
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
}