From a09e87668e45059360ccb68a8071a24e69ba6718 Mon Sep 17 00:00:00 2001 From: simon987 Date: Thu, 13 Aug 2020 18:54:33 -0400 Subject: [PATCH] Configurable num of goroutines, bug fixes --- Dockerfile | 1 + main.go | 25 ++++++++++++++++++++----- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index e6105f3..5de39a9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,6 +5,7 @@ WORKDIR /build/ COPY main.go . COPY go.mod . RUN GOOS=linux CGO_ENABLED=0 go build -a -installsuffix cgo -o feed_archiver . +RUN strip feed_archiver FROM scratch diff --git a/main.go b/main.go index 4ae1448..e7deaf4 100644 --- a/main.go +++ b/main.go @@ -10,11 +10,13 @@ import ( "github.com/valyala/fastjson" "os" "strings" + "sync" "time" ) var archiverCtx struct { tables map[string]bool + m sync.RWMutex } var pool *pgx.ConnPool @@ -27,6 +29,7 @@ type FeedArchiverArgs struct { DbDatabase string RedisAddr string Pattern string + Threads int } func main() { @@ -77,6 +80,13 @@ func main() { Destination: &args.Pattern, EnvVars: []string{"FA_PATTERN"}, }, + &cli.IntFlag{ + Name: "threads", + Usage: "number of threads", + Value: 5, + Destination: &args.Threads, + EnvVars: []string{"FA_THREADS"}, + }, } app.Action = func(c *cli.Context) error { @@ -92,7 +102,7 @@ func main() { Password: args.DbPassword, Database: args.DbDatabase, }, - MaxConnections: 5, + MaxConnections: args.Threads, } var err error @@ -107,7 +117,7 @@ func main() { DB: 0, }) - for i := 1; i <= 5; i++ { + for i := 1; i <= args.Threads; i++ { var parser fastjson.Parser go dispatchFromQueue( rdb, @@ -158,7 +168,9 @@ func archive(parser fastjson.Parser, table string, json string) { id, _ = idValue.StringBytes() } + archiverCtx.m.RLock() _, tableExists := archiverCtx.tables[table] + archiverCtx.m.RUnlock() if !tableExists { createTable(table, idValue.Type()) } @@ -192,15 +204,17 @@ func createTable(table string, idType fastjson.Type) { _, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+ "id %s PRIMARY KEY,"+ - "archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"+ - "data JSONB"+ + "archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+ + "data JSONB NOT NULL"+ ")", table, strType)) if err != nil { logrus.WithError(err).Error("Error during create table") } + archiverCtx.m.Lock() archiverCtx.tables[table] = true + archiverCtx.m.Unlock() } var keyCache []string = nil @@ -228,10 +242,11 @@ func dispatchFromQueue(rdb *redis.Client, pattern string, consume func(message s keys := getKeys(ctx, rdb, pattern) if len(keys) == 0 { time.Sleep(time.Second * 1) + keyCache = nil continue } - rawTask, err := rdb.BLPop(ctx, time.Second*30, keys...).Result() + rawTask, err := rdb.BLPop(ctx, time.Second, keys...).Result() if err != nil { keyCache = nil continue