Accept arrays instead of single items

This commit is contained in:
simon 2019-12-19 10:19:02 -05:00
parent 404837e0ab
commit 40a6aa0340
3 changed files with 148 additions and 43 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
.idea/ .idea/
*.iml *.iml
feed_archiver

15
go.mod Normal file
View File

@ -0,0 +1,15 @@
module github.com/simon987/feed_archiver
go 1.13
require (
github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgconn v1.1.0
github.com/jackc/pgx v3.6.0+incompatible
github.com/sirupsen/logrus v1.4.2
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/urfave/cli v1.22.2
github.com/urfave/cli/v2 v2.0.0
github.com/valyala/fastjson v1.4.1
)

173
main.go
View File

@ -5,7 +5,9 @@ import (
"github.com/jackc/pgx" "github.com/jackc/pgx"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/streadway/amqp" "github.com/streadway/amqp"
"github.com/urfave/cli/v2"
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
"os"
"strings" "strings"
) )
@ -17,41 +19,103 @@ var pool *pgx.ConnPool
var replacer = strings.NewReplacer(".", "_") var replacer = strings.NewReplacer(".", "_")
var jsonParser fastjson.Parser var jsonParser fastjson.Parser
type FeedArchiverArgs struct {
DbHost string
DbUser string
DbPassword string
DbDatabase string
MqConnstring string
Exchanges []string
}
func main() { func main() {
app := cli.NewApp()
archiverCtx.tables = map[string]bool{} app.Name = "feed_archiver"
app.Authors = []*cli.Author{
logrus.SetLevel(logrus.InfoLevel) {
Name: "simon987",
var err error Email: "me@simon987.net",
connPoolConfig := pgx.ConnPoolConfig{
ConnConfig: pgx.ConnConfig{
Host: "127.0.0.1",
User: "feed_archiver",
Password: "feed_archiver",
Database: "feed_archiver",
}, },
MaxConnections: 5,
} }
pool, err = pgx.NewConnPool(connPoolConfig) app.Version = "2.0"
if err != nil {
logrus.Fatalf("Unable to create connection pool", "error", err) args := FeedArchiverArgs{}
app.Flags = []cli.Flag{
&cli.StringFlag{
Name: "db-host",
Usage: "Database host",
Destination: &args.DbHost,
Value: "127.0.0.1",
EnvVars: []string{"FA_DB_HOST"},
},
&cli.StringFlag{
Name: "db-user",
Usage: "Database user",
Destination: &args.DbUser,
Value: "feed_archiver",
EnvVars: []string{"FA_DB_USER"},
},
&cli.StringFlag{
Name: "db-password",
Usage: "Database password",
Destination: &args.DbPassword,
Value: "feed_archiver",
EnvVars: []string{"FA_DB_USER"},
},
&cli.StringFlag{
Name: "mq-connstring",
Usage: "RabbitMQ connection string",
Destination: &args.MqConnstring,
Value: "amqp://guest:guest@localhost:5672/",
EnvVars: []string{"FA_MQ_CONNSTRING"},
},
&cli.StringSliceFlag{
Name: "exchanges",
Usage: "RabbitMQ exchanges",
Required: true,
EnvVars: []string{"FA_EXCHANGES"},
},
} }
err = consumeRabbitmqMessage( app.Action = func(c *cli.Context) error {
"amqp://guest:guest@localhost:5672/",
[]string{"reddit", "chan"},
func(delivery amqp.Delivery) error {
table := routingKeyToTable(delivery.Exchange, delivery.RoutingKey, replacer)
archive(table, delivery.Body)
return nil
},
)
args.Exchanges = c.StringSlice("exchanges")
archiverCtx.tables = map[string]bool{}
logrus.SetLevel(logrus.InfoLevel)
connPoolConfig := pgx.ConnPoolConfig{
ConnConfig: pgx.ConnConfig{
Host: args.DbHost,
User: args.DbUser,
Password: args.DbPassword,
Database: args.DbDatabase,
},
MaxConnections: 5,
}
var err error
pool, err = pgx.NewConnPool(connPoolConfig)
if err != nil {
panic(err)
}
return consumeRabbitmqMessage(
args.MqConnstring,
args.Exchanges,
func(delivery amqp.Delivery) error {
table := routingKeyToTable(delivery.Exchange, delivery.RoutingKey, replacer)
archive(table, delivery.Body)
return nil
},
)
}
err := app.Run(os.Args)
if err != nil { if err != nil {
logrus.WithError(err).Error("Error during insert") logrus.Fatal(app.OnUsageError)
} }
} }
@ -65,38 +129,64 @@ func routingKeyToTable(exchange, routingKey string, replacer *strings.Replacer)
} }
func archive(table string, json []byte) { func archive(table string, json []byte) {
v, _ := jsonParser.ParseBytes(json)
_, tableExists := archiverCtx.tables[table] arr, err := v.Array()
if !tableExists { if err != nil {
createTable(table) logrus.WithField("json", string(json)).Error("Message is not an array!")
return
} }
v, _ := jsonParser.ParseBytes(json) for _, item := range arr {
id := v.GetInt64("_id") idValue := item.Get("_id")
if idValue == nil {
logrus.WithField("json", string(json)).Error("Item with no _id field!")
return
}
logrus.WithFields(logrus.Fields{ var id interface{}
"table": table, if idValue.Type() == fastjson.TypeNumber {
}).Debug("Insert row") id, _ = idValue.Int64()
} else if idValue.Type() == fastjson.TypeString {
id, _ = idValue.StringBytes()
}
_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, json) _, tableExists := archiverCtx.tables[table]
if err != nil { if !tableExists {
logrus.WithError(err).Error("Error during insert") createTable(table, idValue.Type())
}
logrus.WithFields(logrus.Fields{
"table": table,
"id": idValue,
}).Debug("Insert row")
_, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String())
if err != nil {
logrus.WithError(err).Error("Error during insert")
}
} }
} }
func createTable(table string) { func createTable(table string, idType fastjson.Type) {
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"table": table, "table": table,
}).Info("Create table (If not exists)") }).Info("Create table (If not exists)")
var err error var err error
var strType string
if idType == fastjson.TypeNumber {
strType = "bigint"
} else {
strType = "bytea"
}
_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+ _, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
"id bigint PRIMARY KEY,"+ "id %s PRIMARY KEY,"+
"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"+ "archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"+
"data JSONB"+ "data JSONB"+
")", table)) ")", table, strType))
if err != nil { if err != nil {
logrus.WithError(err).Error("Error during create table") logrus.WithError(err).Error("Error during create table")
@ -107,7 +197,6 @@ func createTable(table string) {
func consumeRabbitmqMessage(host string, exchanges []string, consume func(amqp.Delivery) error) error { func consumeRabbitmqMessage(host string, exchanges []string, consume func(amqp.Delivery) error) error {
logrus.Info()
conn, err := amqp.Dial(host) conn, err := amqp.Dial(host)
if err != nil { if err != nil {
return err return err