Configurable num of goroutines, bug fixes

This commit is contained in:
simon987 2020-08-13 18:54:33 -04:00
parent df2ce33234
commit a09e87668e
2 changed files with 21 additions and 5 deletions

View File

@ -5,6 +5,7 @@ WORKDIR /build/
COPY main.go . COPY main.go .
COPY go.mod . COPY go.mod .
RUN GOOS=linux CGO_ENABLED=0 go build -a -installsuffix cgo -o feed_archiver . RUN GOOS=linux CGO_ENABLED=0 go build -a -installsuffix cgo -o feed_archiver .
RUN strip feed_archiver
FROM scratch FROM scratch

25
main.go
View File

@ -10,11 +10,13 @@ import (
"github.com/valyala/fastjson" "github.com/valyala/fastjson"
"os" "os"
"strings" "strings"
"sync"
"time" "time"
) )
var archiverCtx struct { var archiverCtx struct {
tables map[string]bool tables map[string]bool
m sync.RWMutex
} }
var pool *pgx.ConnPool var pool *pgx.ConnPool
@ -27,6 +29,7 @@ type FeedArchiverArgs struct {
DbDatabase string DbDatabase string
RedisAddr string RedisAddr string
Pattern string Pattern string
Threads int
} }
func main() { func main() {
@ -77,6 +80,13 @@ func main() {
Destination: &args.Pattern, Destination: &args.Pattern,
EnvVars: []string{"FA_PATTERN"}, EnvVars: []string{"FA_PATTERN"},
}, },
&cli.IntFlag{
Name: "threads",
Usage: "number of threads",
Value: 5,
Destination: &args.Threads,
EnvVars: []string{"FA_THREADS"},
},
} }
app.Action = func(c *cli.Context) error { app.Action = func(c *cli.Context) error {
@ -92,7 +102,7 @@ func main() {
Password: args.DbPassword, Password: args.DbPassword,
Database: args.DbDatabase, Database: args.DbDatabase,
}, },
MaxConnections: 5, MaxConnections: args.Threads,
} }
var err error var err error
@ -107,7 +117,7 @@ func main() {
DB: 0, DB: 0,
}) })
for i := 1; i <= 5; i++ { for i := 1; i <= args.Threads; i++ {
var parser fastjson.Parser var parser fastjson.Parser
go dispatchFromQueue( go dispatchFromQueue(
rdb, rdb,
@ -158,7 +168,9 @@ func archive(parser fastjson.Parser, table string, json string) {
id, _ = idValue.StringBytes() id, _ = idValue.StringBytes()
} }
archiverCtx.m.RLock()
_, tableExists := archiverCtx.tables[table] _, tableExists := archiverCtx.tables[table]
archiverCtx.m.RUnlock()
if !tableExists { if !tableExists {
createTable(table, idValue.Type()) createTable(table, idValue.Type())
} }
@ -192,15 +204,17 @@ func createTable(table string, idType fastjson.Type) {
_, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+ _, err = pool.Exec(fmt.Sprintf("CREATE table IF NOT EXISTS %s ("+
"id %s PRIMARY KEY,"+ "id %s PRIMARY KEY,"+
"archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"+ "archived_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"+
"data JSONB"+ "data JSONB NOT NULL"+
")", table, strType)) ")", table, strType))
if err != nil { if err != nil {
logrus.WithError(err).Error("Error during create table") logrus.WithError(err).Error("Error during create table")
} }
archiverCtx.m.Lock()
archiverCtx.tables[table] = true archiverCtx.tables[table] = true
archiverCtx.m.Unlock()
} }
var keyCache []string = nil var keyCache []string = nil
@ -228,10 +242,11 @@ func dispatchFromQueue(rdb *redis.Client, pattern string, consume func(message s
keys := getKeys(ctx, rdb, pattern) keys := getKeys(ctx, rdb, pattern)
if len(keys) == 0 { if len(keys) == 0 {
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
keyCache = nil
continue continue
} }
rawTask, err := rdb.BLPop(ctx, time.Second*30, keys...).Result() rawTask, err := rdb.BLPop(ctx, time.Second, keys...).Result()
if err != nil { if err != nil {
keyCache = nil keyCache = nil
continue continue