Add functions to handle routing keys

This commit is contained in:
simon987 2021-10-22 10:36:05 -04:00
parent d85ad919b3
commit 2f6c2822b6
3 changed files with 44 additions and 2 deletions

View File

@ -1,4 +1,5 @@
import json import json
from collections import namedtuple
from functools import partial from functools import partial
from itertools import islice from itertools import islice
from time import sleep, time from time import sleep, time
@ -6,6 +7,34 @@ from time import sleep, time
from orjson import orjson from orjson import orjson
from redis import Redis from redis import Redis
RoutingKeyParts = namedtuple(
"RoutingKeyParts",
["arc_list", "project", "subproject", "type", "category"]
)
def parse_routing_key(key):
tokens = key.split(".")
if len(tokens) == 4:
arc_list, project, type_, category = tokens
return RoutingKeyParts(
arc_list=arc_list,
project=project,
subproject=None,
type=type_,
category=category
)
else:
arc_list, project, subproject, type_, category = tokens
return RoutingKeyParts(
arc_list=arc_list,
project=project,
subproject=subproject,
type=type_,
category=category
)
class MessageQueue: class MessageQueue:
def read_messages(self, topics): def read_messages(self, topics):

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup( setup(
name="hexlib", name="hexlib",
version="1.62", version="1.63",
description="Misc utility methods", description="Misc utility methods",
author="simon987", author="simon987",
author_email="me@simon987.net", author_email="me@simon987.net",

View File

@ -1,7 +1,7 @@
from unittest import TestCase from unittest import TestCase
from hexlib.env import get_redis from hexlib.env import get_redis
from hexlib.mq import RedisMQ from hexlib.mq import RedisMQ, parse_routing_key, RoutingKeyParts
class TestRedisMQ(TestCase): class TestRedisMQ(TestCase):
@ -47,3 +47,16 @@ class TestRedisMQ(TestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
mq.publish({"a": 1}, item_project="test", item_type="msg") mq.publish({"a": 1}, item_project="test", item_type="msg")
class TestRoutingKey(TestCase):
def test1(self):
key = "arc.chan.4chan.post.b"
parts = parse_routing_key(key)
self.assertEqual(parts, RoutingKeyParts("arc", "chan", "4chan", "post", "b"))
def test2(self):
key = "arc.reddit.submission.birdpics"
parts = parse_routing_key(key)
self.assertEqual(parts, RoutingKeyParts("arc", "reddit", None, "submission", "birdpics"))