feed_archiver/archive_es.go

75 lines
1.6 KiB
Go

package main
import (
"context"
"fmt"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/sirupsen/logrus"
"github.com/valyala/fastjson"
"strconv"
"strings"
)
func routingKeyToIndex(key string) string {
if idx := strings.Index(key, "."); idx != -1 {
project := key[:idx]
return fmt.Sprintf("%s-data", project)
}
panic(fmt.Sprintf("Cannot get ES index name for key %s", key))
}
func archiveEs(record *Record) error {
var stringId string
if record.IdType == fastjson.TypeNumber {
stringId = strconv.FormatInt(record.IdValue.(int64), 10)
} else if record.IdType == fastjson.TypeString {
stringId = string(record.IdValue.([]uint8))
}
index := routingKeyToIndex(record.RoutingKey)
// Will cause error in ES if we specify the _id field in the document body
record.Item.Del("_id")
req := esapi.IndexRequest{
Index: index,
DocumentID: stringId,
Body: strings.NewReader(record.Item.String()),
Refresh: "false",
}
res, err := req.Do(context.Background(), es)
if err != nil {
logrus.WithFields(logrus.Fields{
"id": record.IdValue,
"item": record.Item.String(),
}).WithError(err).Error("Request error during document index")
return nil
}
if res.IsError() {
logrus.WithFields(logrus.Fields{
"id": stringId,
"status": res.String(),
"item": record.Item.String(),
}).Error("ES error during document index")
}
logrus.WithFields(logrus.Fields{
"id": stringId,
"index": index,
}).Debug("Insert document")
if archiverCtx.Monitoring != nil {
archiverCtx.Monitoring.writeMetricIndexDoc(index)
}
_ = res.Body.Close()
return nil
}