Compare commits

...

2 Commits

SHA1 Message Date
d85ad919b3 Add redis MQ, update influxdb monitoring 2021-10-21 20:19:09 -04:00
ed9d148411 Fix tests 2021-10-21 19:52:42 -04:00
10 changed files with 236 additions and 22 deletions

5 .gitignore vendored

@@ -1,4 +1,7 @@
 *.iml
 .idea/
 *.db
-*.png
+*.png
+hexlib.egg-info
+build/
+dist/

hexlib/db.py

@@ -36,6 +36,9 @@ class VolatileState:
     def __getitem__(self, table):
         return RedisTable(self, table, self._sep)
 
+    def __delitem__(self, key):
+        self.rdb.delete(f"{self.prefix}{self._sep}{key}")
+
 
 class VolatileQueue:
     """Quick and dirty volatile queue-like redis wrapper"""
@@ -68,6 +71,9 @@ class VolatileBooleanState:
     def __getitem__(self, table):
         return RedisBooleanTable(self, table, self._sep)
 
+    def __delitem__(self, table):
+        self.rdb.delete(f"{self.prefix}{self._sep}{table}")
+
 
 class RedisTable:
     def __init__(self, state, table, sep=""):
@@ -89,9 +95,9 @@ class RedisTable:
         self._state.rdb.hdel(self._key, str(key))
 
     def __iter__(self):
-        val = self._state.rdb.hgetall(self._key)
-        if val:
-            return ((k, umsgpack.loads(v)) for k, v in val.items())
+        for val in self._state.rdb.hscan(self._key):
+            if val:
+                return ((k, umsgpack.loads(v)) for k, v in val.items())
 
 
 class RedisBooleanTable:
@@ -114,7 +120,7 @@ class RedisBooleanTable:
         self._state.rdb.srem(self._key, str(key))
 
     def __iter__(self):
-        return iter(self._state.rdb.smembers(self._key))
+        yield from self._state.rdb.sscan_iter(self._key)
 
 
 class Table:

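A minimal usage sketch of the iteration change above, assuming a reachable local Redis and that RedisTable supports item assignment (its __setitem__ is not shown in this diff):

from hexlib.db import VolatileState

state = VolatileState(prefix="demo")  # connects with default settings, as in the tests below

table = state["users"]  # __getitem__ returns a RedisTable backed by a Redis hash
table["1"] = {"name": "alice"}  # assumed __setitem__; values are msgpack-encoded

# __iter__ now reads the hash with HSCAN instead of HGETALL
for key, value in table:
    print(key, value)

del state["users"]  # the new __delitem__ deletes the whole table key
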
hexlib/monitoring.py

@@ -1,22 +1,29 @@
 import logging
 import traceback
+from abc import ABC
 
 from influxdb import InfluxDBClient
 
 from hexlib.misc import buffered
 
 
-class Monitoring:
-    def __init__(self, db, host="localhost", logger=logging.getLogger("default"), batch_size=1, flush_on_exit=False):
-        self._db = db
-        self._client = InfluxDBClient(host, 8086, "", "", db)
+class Monitoring(ABC):
+    def log(self, points):
+        raise NotImplementedError()
+
+
+class BufferedInfluxDBMonitoring(Monitoring):
+    def __init__(self, db_name, host="localhost", port=8086, logger=None, batch_size=1, flush_on_exit=False):
+        self._db = db_name
+        self._client = InfluxDBClient(host, port, "", "", db_name)
         self._logger = logger
-        self._init()
+        if not self.db_exists(self._db):
+            self._client.create_database(self._db)
 
         @buffered(batch_size, flush_on_exit)
         def log(points):
             self._log(points)
 
         self.log = log
 
     def db_exists(self, name):
@@ -25,14 +32,16 @@ class Monitoring:
             return True
         return False
 
-    def _init(self):
-        if not self.db_exists(self._db):
-            self._client.create_database(self._db)
+    def log(self, points):
+        # Is overwritten in __init__()
+        pass
 
     def _log(self, points):
         try:
             self._client.write_points(points)
-            self._logger.debug("InfluxDB: Wrote %d points" % len(points))
+            if self._logger:
+                self._logger.debug("InfluxDB: Wrote %d points" % len(points))
         except Exception as e:
-            self._logger.debug(traceback.format_exc())
-            self._logger.error(str(e))
+            if self._logger:
+                self._logger.debug(traceback.format_exc())
+                self._logger.error(str(e))

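A minimal sketch of the refactored monitoring API, assuming the module lives at hexlib.monitoring (the path is not shown in this diff) and that points use the dict format accepted by InfluxDBClient.write_points:

import logging

from hexlib.monitoring import BufferedInfluxDBMonitoring  # module path assumed

# logger is optional now; passing None silently disables debug/error output (see _log above)
monitoring = BufferedInfluxDBMonitoring(db_name="metrics", batch_size=10, flush_on_exit=True,
                                        logger=logging.getLogger("demo"))

# log() is wrapped by @buffered: points are written in batches of batch_size
monitoring.log([{
    "measurement": "crawler",
    "fields": {"items": 42}
}])
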
132 hexlib/mq.py Normal file

@@ -0,0 +1,132 @@
import json
from functools import partial
from itertools import islice
from time import sleep, time

from orjson import orjson
from redis import Redis


class MessageQueue:
    def read_messages(self, topics):
        raise NotImplementedError()

    def publish(self, item, item_project, item_type, item_subproject, item_category):
        raise NotImplementedError()


class RedisMQ(MessageQueue):
    _MAX_KEYS = 30

    def __init__(self, rdb, consumer_name="redis_mq", sep=".", max_pending_time=120, logger=None, publish_channel=None,
                 arc_lists=None, wait=1):
        self._rdb: Redis = rdb
        self._key_cache = None
        self._consumer_id = consumer_name
        self._pending_list = f"pending{sep}{consumer_name}"
        self._max_pending_time = max_pending_time
        self._logger = logger
        self._publish_channel = publish_channel
        self._arc_lists = arc_lists
        self._wait = wait

    def _get_keys(self, pattern):
        if self._key_cache:
            return self._key_cache
        keys = list(islice(
            self._rdb.scan_iter(match=pattern, count=RedisMQ._MAX_KEYS), RedisMQ._MAX_KEYS
        ))
        self._key_cache = keys
        return keys

    def _get_pending_tasks(self):
        for task_id, pending_task in self._rdb.hscan_iter(self._pending_list):
            pending_task_json = orjson.loads(pending_task)
            if time() >= pending_task_json["resubmit_at"]:
                yield pending_task_json["topic"], pending_task_json["task"], partial(self._ack, task_id)

    def _ack(self, task_id):
        self._rdb.hdel(self._pending_list, task_id)

    def read_messages(self, topics):
        """
        Assumes json-encoded tasks with an _id field.
        Tasks are automatically put into a pending list until ack() is called.
        When a task has been in the pending list for at least max_pending_time seconds, it
        gets submitted again.
        """
        assert len(topics) == 1, "RedisMQ only supports 1 topic pattern"
        pattern = topics[0]

        counter = 0

        if self._logger:
            self._logger.info(f"MQ>Listening for new messages in {pattern}")

        while True:
            counter += 1
            if counter % 1000 == 0:
                yield from self._get_pending_tasks()

            keys = self._get_keys(pattern)
            if not keys:
                sleep(self._wait)
                self._key_cache = None
                continue

            result = self._rdb.blpop(keys, timeout=1)
            if not result:
                self._key_cache = None
                continue

            topic, task = result
            task_json = orjson.loads(task)
            topic = topic.decode()

            if "_id" not in task_json:
                raise ValueError(f"Task doesn't have _id field: {task}")

            # Immediately put in pending queue
            self._rdb.hset(
                self._pending_list, task_json["_id"],
                orjson.dumps({
                    "resubmit_at": time() + self._max_pending_time,
                    "topic": topic,
                    "task": task_json
                })
            )

            yield topic, task_json, partial(self._ack, task_json["_id"])

    def publish(self, item, item_project, item_type, item_subproject=None, item_category="x"):
        if "_id" not in item:
            raise ValueError("_id field must be set for item")

        item = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True)

        item_project = item_project.replace(".", "-")
        item_subproject = item_subproject.replace(".", "-") if item_subproject else None
        item_source = item_project if not item_subproject else f"{item_project}.{item_subproject}"

        item_type = item_type.replace(".", "-")
        item_category = item_category.replace(".", "-")

        # If specified, fan-out to pub/sub channel
        if self._publish_channel is not None:
            routing_key = f"{self._publish_channel}.{item_source}.{item_type}.{item_category}"
            self._rdb.publish(routing_key, item)

        # Save to list
        for arc_list in self._arc_lists:
            routing_key = f"{arc_list}.{item_source}.{item_type}.{item_category}"
            self._rdb.lpush(routing_key, item)

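Taken together, a minimal producer/consumer sketch for the new module, under the same assumptions as the tests below (a reachable Redis; items are routed to <arc_list>.<project>.<type>.<category>):

from redis import Redis

from hexlib.mq import RedisMQ

rdb = Redis()
mq = RedisMQ(rdb, consumer_name="demo", arc_lists=["arc"])

# LPUSHes the JSON-encoded item to arc.myproject.msg.x
mq.publish({"_id": 1, "body": "hello"}, item_project="myproject", item_type="msg")

# read_messages() blocks; each message comes with an ack() callback that
# removes it from the pending list so it is not resubmitted
for topic, task, ack in mq.read_messages(topics=["arc.*"]):
    print(topic, task)
    ack()
    break
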
hexlib/web.py

@@ -126,6 +126,7 @@ def download_file(url, destination, session=None, headers=None, overwrite=False,
                 "url": url,
                 "timestamp": datetime.utcnow().replace(microsecond=0).isoformat()
             }))
+            r.close()
             break
         except Exception as e:
             if err_cb:
@@ -142,7 +143,7 @@ class Web:
         self._logger = logger
         self._current_req = None
         if retry_codes is None or not retry_codes:
-            retry_codes = {502, 504, 520, 522, 524, 429}
+            retry_codes = {500, 502, 504, 520, 522, 524, 429}
         self._retry_codes = retry_codes
 
         if session is None:

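For reference, a sketch of a call to the touched helper, based only on the signature in the hunk header above and the save_meta flag exercised by the tests below (URL and paths are placeholders):

from hexlib.web import download_file

# With save_meta=True, a <destination>.meta file records the URL and a UTC timestamp
download_file("https://example.com/file.bin", "/tmp/file.bin", save_meta=True)
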
setup.py

@@ -14,6 +14,7 @@ setup(
     install_requires=[
         "ImageHash", "influxdb", "siphash", "python-dateutil", "redis", "orjson", "zstandard",
         "u-msgpack-python", "psycopg2-binary", "bs4", "lxml", "nltk", "numpy",
-        "matplotlib", "scikit-learn", "fake-useragent @ git+git://github.com/Jordan9675/fake-useragent"
+        "matplotlib", "scikit-learn", "fake-useragent @ git+git://github.com/Jordan9675/fake-useragent",
+        "requests"
     ]
 )
)

0 test/__init__.py Normal file

test/test_db.py

@@ -1,10 +1,15 @@
 from unittest import TestCase
 
 from hexlib.db import VolatileState, VolatileBooleanState, VolatileQueue
+from hexlib.env import get_redis
 
 
 class TestVolatileState(TestCase):
+    def setUp(self) -> None:
+        rdb = get_redis()
+        rdb.delete("test1a", "test1b", "test1c", "test1:a", "test2b")
+
     def test_get_set(self):
         s = VolatileState(prefix="test1")
         val = {
@@ -53,6 +58,10 @@ class TestVolatileState(TestCase):
 
 class TestVolatileBoolState(TestCase):
+    def setUp(self) -> None:
+        rdb = get_redis()
+        rdb.delete("test1a", "test1b", "test1c", "test1:a", "test2b")
+
     def test_get_set(self):
         s = VolatileBooleanState(prefix="test1")

test/test_web.py

@@ -2,12 +2,16 @@ import os
 from unittest import TestCase
 
 from hexlib.web import download_file
+import warnings
 
 
 class TestDownloadFile(TestCase):
+    def setUp(self) -> None:
+        warnings.filterwarnings(action="ignore", category=ResourceWarning)
+
     def test_download_file(self):
-        download_file("http://ovh.net/files/10Mb.dat", "/tmp/10Mb.dat")
+        download_file("https://github.com/simon987/hexlib/raw/master/10MB.bin", "/tmp/10Mb.dat")
 
         self.assertTrue(os.path.exists("/tmp/10Mb.dat"))
         os.remove("/tmp/10Mb.dat")
@@ -22,8 +26,8 @@ class TestDownloadFile(TestCase):
         self.assertEqual(len(exceptions), 3)
 
     def test_download_file_meta(self):
-        download_file("http://ovh.net/files/10Mb.dat", "/tmp/10Mb.dat", save_meta=True)
+        download_file("https://github.com/simon987/hexlib/raw/master/10MB.bin", "/tmp/10Mb.dat", save_meta=True)
         self.assertTrue(os.path.exists("/tmp/10Mb.dat"))
         self.assertTrue(os.path.exists("/tmp/10Mb.dat.meta"))
         os.remove("/tmp/10Mb.dat")
-        # os.remove("/tmp/10Mb.dat.meta")
+        os.remove("/tmp/10Mb.dat.meta")

49 test/test_redis_mq.py Normal file

@@ -0,0 +1,49 @@
from unittest import TestCase

from hexlib.env import get_redis
from hexlib.mq import RedisMQ


class TestRedisMQ(TestCase):
    def setUp(self) -> None:
        self.rdb = get_redis()
        self.rdb.delete("pending.test", "test_mq", "arc.test.msg.x")

    def test_ack(self):
        mq = RedisMQ(self.rdb, consumer_name="test", max_pending_time=2, arc_lists=["arc"])
        mq.publish({"_id": 1}, item_project="test", item_type="msg")

        topic1, msg1, ack1 = next(mq.read_messages(topics=["arc.*"]))
        self.assertEqual(self.rdb.hlen("pending.test"), 1)

        ack1()
        self.assertEqual(self.rdb.hlen("pending.test"), 0)

    def test_pending_timeout(self):
        mq = RedisMQ(self.rdb, consumer_name="test", max_pending_time=0.5, arc_lists=["arc"], wait=0)
        mq.publish({"_id": 1}, item_project="test", item_type="msg")

        topic1, msg1, ack1 = next(mq.read_messages(topics=["arc.test.*"]))
        self.assertEqual(self.rdb.hlen("pending.test"), 1)

        # msg1 will timeout after 0.5s, next iteration takes ceil(0.5)s
        topic1_, msg1_, ack1_ = next(mq.read_messages(topics=["arc.test.*"]))
        self.assertEqual(self.rdb.hlen("pending.test"), 1)

        ack1_()
        self.assertEqual(self.rdb.hlen("pending.test"), 0)
        self.assertEqual(msg1, msg1_)

    def test_no_id_field(self):
        mq = RedisMQ(self.rdb, consumer_name="test", max_pending_time=0.5, arc_lists=["arc"], wait=0)

        with self.assertRaises(ValueError):
            mq.publish({"a": 1}, item_project="test", item_type="msg")
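To make the pending-list mechanics concrete: while a message is unacknowledged it sits in the pending.<consumer> hash, keyed by its _id, with a payload shaped like this (field names taken from read_messages above; the timestamp value is illustrative):

# stored via HSET pending.test 1 '<orjson payload>'
{
    "resubmit_at": 1634862549.5,  # time() + max_pending_time at enqueue (illustrative)
    "topic": "arc.test.msg.x",
    "task": {"_id": 1}
}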