commit 404837e0ab4840af840f44f8de6a9a38a7724859 Author: simon987 Date: Mon Aug 26 21:37:41 2019 -0400 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..caa32e6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea/ +*.iml \ No newline at end of file diff --git a/main.go b/main.go new file mode 100644 index 0000000..31d8fd5 --- /dev/null +++ b/main.go @@ -0,0 +1,170 @@ +package main + +import ( + "fmt" + "github.com/jackc/pgx" + "github.com/sirupsen/logrus" + "github.com/streadway/amqp" + "github.com/valyala/fastjson" + "strings" +) + +var archiverCtx struct { + tables map[string]bool +} + +var pool *pgx.ConnPool +var replacer = strings.NewReplacer(".", "_") +var jsonParser fastjson.Parser + + +func main() { + + archiverCtx.tables = map[string]bool{} + + logrus.SetLevel(logrus.InfoLevel) + + var err error + + connPoolConfig := pgx.ConnPoolConfig{ + ConnConfig: pgx.ConnConfig{ + Host: "127.0.0.1", + User: "feed_archiver", + Password: "feed_archiver", + Database: "feed_archiver", + }, + MaxConnections: 5, + } + pool, err = pgx.NewConnPool(connPoolConfig) + if err != nil { + logrus.Fatalf("Unable to create connection pool", "error", err) + } + + err = consumeRabbitmqMessage( + "amqp://guest:guest@localhost:5672/", + []string{"reddit", "chan"}, + func(delivery amqp.Delivery) error { + table := routingKeyToTable(delivery.Exchange, delivery.RoutingKey, replacer) + archive(table, delivery.Body) + return nil + }, + ) + + if err != nil { + logrus.WithError(err).Error("Error during insert") + } +} + +func routingKeyToTable(exchange, routingKey string, replacer *strings.Replacer) string { + var table string + if idx := strings.LastIndex(routingKey, "."); idx != -1 { + table = routingKey[:idx] + } + table = replacer.Replace(table) + return exchange + "_" + table +} + +func archive(table string, json []byte) { + + _, tableExists := archiverCtx.tables[table] + if !tableExists { + createTable(table) + } + + v, _ := jsonParser.ParseBytes(json) + id := v.GetInt64("_id") + + logrus.WithFields(logrus.Fields{ + "table": table, + }).Debug("Insert row") + + _, err := pool.Exec(fmt.Sprintf("INSERT INTO %s (id, data) VALUES ($1, $2)", table), id, json) + if err != nil { + logrus.WithError(err).Error("Error during insert") + } +} + +func createTable(table string) { + + logrus.WithFields(logrus.Fields{ + "table": table, + }).Info("Create table (If not exists)") + + var err error + + _, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+ + "id bigint PRIMARY KEY,"+ + "archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"+ + "data JSONB"+ + ")", table)) + + if err != nil { + logrus.WithError(err).Error("Error during create table") + } + + archiverCtx.tables[table] = true +} + +func consumeRabbitmqMessage(host string, exchanges []string, consume func(amqp.Delivery) error) error { + + logrus.Info() + conn, err := amqp.Dial(host) + if err != nil { + return err + } + defer conn.Close() + + ch, err := conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + q, err := ch.QueueDeclare( + "", + false, + true, + true, + false, + nil, + ) + if err != nil { + return err + } + + for _, exchange := range exchanges { + err = ch.QueueBind( + q.Name, + "#", + exchange, + false, + nil) + if err != nil { + return err + } + logrus.WithFields(logrus.Fields{ + "exchange": exchange, + }).Info("Queue bind") + } + + msgs, err := ch.Consume( + q.Name, + "", + true, + false, + false, + false, + nil, + ) + if err != nil { + return err + } + + for d := range msgs { + err := consume(d) + if err != nil { + return err + } + } + return nil +}