ws_feed_adapter/main.go
2020-03-19 11:00:59 -04:00

206 lines
3.6 KiB
Go

package main
import (
"encoding/json"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/urfave/cli/v2"
"log"
"net/http"
"os"
"sync/atomic"
)
type ConnCtx struct {
Exchange string `json:"exchange"`
Topics []string `json:"topics"`
}
var serverCtx struct {
Connections int32
rabbitmqHost string
}
func main() {
logrus.SetLevel(logrus.TraceLevel)
app := cli.NewApp()
app.Name = "ws_feed_adapter"
app.Usage = "Expose RabbitMQ feed to websocket"
app.Version = "1.0"
app.Authors = []*cli.Author{
{
Name: "simon987",
Email: "me@simon987.net",
},
}
var listenAddr string
app.Flags = []cli.Flag{
&cli.StringFlag{
Name: "rabbitmq_host",
Usage: "RabbitMQ Host",
Destination: &serverCtx.rabbitmqHost,
Value: "amqp://guest:guest@localhost:5672/",
EnvVars: []string{"WSA_MQ_CONNSTR"},
},
&cli.StringFlag{
Name: "listen, l",
Usage: "Listen address",
Destination: &listenAddr,
Value: "0.0.0.0:3090",
EnvVars: []string{"WSA_LISTEN"},
},
}
app.Action = func(c *cli.Context) error {
serverCtx.Connections = 0
logrus.WithField("listen", listenAddr).Info("Listening for incoming connections")
http.HandleFunc("/socket", serveWs)
err := http.ListenAndServe(listenAddr, nil)
return err
}
err := app.Run(os.Args)
if err != nil {
logrus.Fatal(err)
}
}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func serveWs(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logrus.WithError(err).Error("upgrade")
return
}
logrus.WithFields(logrus.Fields{
"remote": c.RemoteAddr().String(),
"num": serverCtx.Connections,
}).Info("New connection")
atomic.AddInt32(&serverCtx.Connections, 1)
defer c.Close()
defer atomic.AddInt32(&serverCtx.Connections, -1)
mt, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", err)
return
}
logrus.WithFields(logrus.Fields{
"mt": mt,
"message": string(message),
"remote": c.RemoteAddr().String(),
}).Debug("Received message")
var ctx ConnCtx
err = json.Unmarshal(message, &ctx)
if err != nil {
logrus.WithError(err).Error("unmarshal")
return
}
err = c.WriteMessage(websocket.TextMessage, []byte("{\"msg\": \"acknowledged, starting write loop.\"}"))
if err != nil {
logrus.WithError(err).Error("write")
return
}
err = consumeRabbitmqMessage(&ctx, func(msg amqp.Delivery) error {
err := c.WriteMessage(websocket.TextMessage, msg.Body)
if err != nil {
logrus.WithError(err).Error("write delivery")
}
return err
})
logrus.WithError(err).Error("consume")
}
func consumeRabbitmqMessage(ctx *ConnCtx, consume func(amqp.Delivery) error) error {
logrus.Info()
conn, err := amqp.Dial(serverCtx.rabbitmqHost)
if err != nil {
return err
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
err = ch.ExchangeDeclare(
ctx.Exchange,
"topic",
false,
false,
false,
false,
nil,
)
if err != nil {
return err
}
q, err := ch.QueueDeclare(
"",
false,
true,
true,
false,
nil,
)
if err != nil {
return err
}
for _, topic := range ctx.Topics {
err = ch.QueueBind(
q.Name,
topic,
ctx.Exchange,
false,
nil)
if err != nil {
return err
}
logrus.WithField("topic", topic).Debug("Bound topic")
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
for d := range msgs {
err := consume(d)
if err != nil {
return err
}
}
return nil
}