From 40a6aa03400077ab7575cbe93cb0cd50f024552c Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 19 Dec 2019 10:19:02 -0500 Subject: [PATCH] Accept arrays instead of single items --- .gitignore | 3 +- go.mod | 15 +++++ main.go | 173 ++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 148 insertions(+), 43 deletions(-) create mode 100644 go.mod diff --git a/.gitignore b/.gitignore index caa32e6..78ca2d7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea/ -*.iml \ No newline at end of file +*.iml +feed_archiver \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..703bd66 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module github.com/simon987/feed_archiver + +go 1.13 + +require ( + github.com/gofrs/uuid v3.2.0+incompatible // indirect + github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect + github.com/jackc/pgconn v1.1.0 + github.com/jackc/pgx v3.6.0+incompatible + github.com/sirupsen/logrus v1.4.2 + github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 + github.com/urfave/cli v1.22.2 + github.com/urfave/cli/v2 v2.0.0 + github.com/valyala/fastjson v1.4.1 +) diff --git a/main.go b/main.go index 31d8fd5..019763d 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,9 @@ import ( "github.com/jackc/pgx" "github.com/sirupsen/logrus" "github.com/streadway/amqp" + "github.com/urfave/cli/v2" "github.com/valyala/fastjson" + "os" "strings" ) @@ -17,41 +19,103 @@ var pool *pgx.ConnPool var replacer = strings.NewReplacer(".", "_") var jsonParser fastjson.Parser +type FeedArchiverArgs struct { + DbHost string + DbUser string + DbPassword string + DbDatabase string + MqConnstring string + Exchanges []string +} func main() { - - archiverCtx.tables = map[string]bool{} - - logrus.SetLevel(logrus.InfoLevel) - - var err error - - connPoolConfig := pgx.ConnPoolConfig{ - ConnConfig: pgx.ConnConfig{ - Host: "127.0.0.1", - User: "feed_archiver", - Password: "feed_archiver", - Database: "feed_archiver", + app := cli.NewApp() + app.Name = "feed_archiver" + app.Authors = []*cli.Author{ + { + Name: "simon987", + Email: "me@simon987.net", }, - MaxConnections: 5, } - pool, err = pgx.NewConnPool(connPoolConfig) - if err != nil { - logrus.Fatalf("Unable to create connection pool", "error", err) + app.Version = "2.0" + + args := FeedArchiverArgs{} + + app.Flags = []cli.Flag{ + &cli.StringFlag{ + Name: "db-host", + Usage: "Database host", + Destination: &args.DbHost, + Value: "127.0.0.1", + EnvVars: []string{"FA_DB_HOST"}, + }, + &cli.StringFlag{ + Name: "db-user", + Usage: "Database user", + Destination: &args.DbUser, + Value: "feed_archiver", + EnvVars: []string{"FA_DB_USER"}, + }, + &cli.StringFlag{ + Name: "db-password", + Usage: "Database password", + Destination: &args.DbPassword, + Value: "feed_archiver", + EnvVars: []string{"FA_DB_USER"}, + }, + &cli.StringFlag{ + Name: "mq-connstring", + Usage: "RabbitMQ connection string", + Destination: &args.MqConnstring, + Value: "amqp://guest:guest@localhost:5672/", + EnvVars: []string{"FA_MQ_CONNSTRING"}, + }, + &cli.StringSliceFlag{ + Name: "exchanges", + Usage: "RabbitMQ exchanges", + Required: true, + EnvVars: []string{"FA_EXCHANGES"}, + }, } - err = consumeRabbitmqMessage( - "amqp://guest:guest@localhost:5672/", - []string{"reddit", "chan"}, - func(delivery amqp.Delivery) error { - table := routingKeyToTable(delivery.Exchange, delivery.RoutingKey, replacer) - archive(table, delivery.Body) - return nil - }, - ) + app.Action = func(c *cli.Context) error { + args.Exchanges = c.StringSlice("exchanges") + + archiverCtx.tables = map[string]bool{} + + logrus.SetLevel(logrus.InfoLevel) + + connPoolConfig := pgx.ConnPoolConfig{ + ConnConfig: pgx.ConnConfig{ + Host: args.DbHost, + User: args.DbUser, + Password: args.DbPassword, + Database: args.DbDatabase, + }, + MaxConnections: 5, + } + + var err error + pool, err = pgx.NewConnPool(connPoolConfig) + if err != nil { + panic(err) + } + + return consumeRabbitmqMessage( + args.MqConnstring, + args.Exchanges, + func(delivery amqp.Delivery) error { + table := routingKeyToTable(delivery.Exchange, delivery.RoutingKey, replacer) + archive(table, delivery.Body) + return nil + }, + ) + } + + err := app.Run(os.Args) if err != nil { - logrus.WithError(err).Error("Error during insert") + logrus.Fatal(app.OnUsageError) } } @@ -65,38 +129,64 @@ func routingKeyToTable(exchange, routingKey string, replacer *strings.Replacer) } func archive(table string, json []byte) { + v, _ := jsonParser.ParseBytes(json) - _, tableExists := archiverCtx.tables[table] - if !tableExists { - createTable(table) + arr, err := v.Array() + if err != nil { + logrus.WithField("json", string(json)).Error("Message is not an array!") + return } - v, _ := jsonParser.ParseBytes(json) - id := v.GetInt64("_id") + for _, item := range arr { + idValue := item.Get("_id") + if idValue == nil { + logrus.WithField("json", string(json)).Error("Item with no _id field!") + return + } - logrus.WithFields(logrus.Fields{ - "table": table, - }).Debug("Insert row") + var id interface{} + if idValue.Type() == fastjson.TypeNumber { + id, _ = idValue.Int64() + } else if idValue.Type() == fastjson.TypeString { + id, _ = idValue.StringBytes() + } - _, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, json) - if err != nil { - logrus.WithError(err).Error("Error during insert") + _, tableExists := archiverCtx.tables[table] + if !tableExists { + createTable(table, idValue.Type()) + } + + logrus.WithFields(logrus.Fields{ + "table": table, + "id": idValue, + }).Debug("Insert row") + + _, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, item.String()) + if err != nil { + logrus.WithError(err).Error("Error during insert") + } } } -func createTable(table string) { +func createTable(table string, idType fastjson.Type) { logrus.WithFields(logrus.Fields{ "table": table, }).Info("Create table (If not exists)") var err error + var strType string + if idType == fastjson.TypeNumber { + strType = "bigint" + } else { + strType = "bytea" + } _, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+ - "id bigint PRIMARY KEY,"+ + "id %s PRIMARY KEY,"+ "archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"+ "data JSONB"+ - ")", table)) + ")", table, strType)) if err != nil { logrus.WithError(err).Error("Error during create table") @@ -107,7 +197,6 @@ func createTable(table string) { func consumeRabbitmqMessage(host string, exchanges []string, consume func(amqp.Delivery) error) error { - logrus.Info() conn, err := amqp.Dial(host) if err != nil { return err