diff --git a/hexlib/db.py b/hexlib/db.py index 9da6359..0fce394 100644 --- a/hexlib/db.py +++ b/hexlib/db.py @@ -1,8 +1,12 @@ import base64 import sqlite3 +import traceback + +import psycopg2 import redis import orjson as json import umsgpack +from psycopg2.errorcodes import UNIQUE_VIOLATION class PersistentState: @@ -172,6 +176,14 @@ class Table: args ) + def __delitem__(self, key): + with sqlite3.connect(self._state.dbfile, **self._state.dbargs) as conn: + try: + conn.execute("DELETE FROM %s WHERE id=?" % self._table, (key,)) + except sqlite3.OperationalError: + pass + + def _sqlite_type(value): if isinstance(value, int): @@ -213,3 +225,66 @@ def pg_fetch_cursor_all(cur, name, batch_size=1000): for row in cur: yield row break + + +class PgConn: + """Wrapper for PostgreSQL connection""" + + def __init__(self, logger=None, **kwargs): + self._conn_args = kwargs + self.conn = psycopg2.connect(**kwargs) + self.cur = self.conn.cursor() + self._logger = logger + + def __enter__(self): + return self + + def exec(self, query_string, args=None): + while True: + try: + if self._logger: + self._logger.debug(query_string) + self._logger.debug("With args " + str(args)) + + self.cur.execute(query_string, args) + break + except psycopg2.Error as e: + if e.pgcode == UNIQUE_VIOLATION: + break + traceback.print_stack() + self._handle_err(e, query_string, args) + + def query(self, query_string, args=None, max_retries=1): + retries = max_retries + while retries > 0: + try: + if self._logger: + self._logger.debug(query_string) + self._logger.debug("With args " + str(args)) + + self.cur.execute(query_string, args) + res = self.cur.fetchall() + + if self._logger: + self._logger.debug("result: " + str(res)) + + return res + except psycopg2.Error as e: + if e.pgcode == UNIQUE_VIOLATION: + break + self._handle_err(e, query_string, args) + retries -= 1 + + def _handle_err(self, err, query, args): + if self._logger: + self._logger.warning( + "Error during query '%s' with args %s: %s %s (%s)" % (query, args, type(err), err, err.pgcode)) + self.conn = psycopg2.connect(**self._conn_args) + self.cur = self.conn.cursor() + + def __exit__(self, type, value, traceback): + try: + self.conn.commit() + self.cur.close() + except: + pass diff --git a/setup.py b/setup.py index cbabcf9..d81c970 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name="hexlib", - version="1.26", + version="1.28", description="Misc utility methods", author="simon987", author_email="me@simon987.net", @@ -13,6 +13,6 @@ setup( ]}, install_requires=[ "ImageHash", "influxdb", "siphash", "python-dateutil", "redis", "orjson", "zstandard", - "u-msgpack-python" + "u-msgpack-python", "psycopg2-binary" ] ) diff --git a/test/test_PersistantState.py b/test/test_PersistantState.py index 9dc8801..84e7f72 100644 --- a/test/test_PersistantState.py +++ b/test/test_PersistantState.py @@ -86,3 +86,27 @@ class TestPersistentState(TestCase): items = list(s["a"].sql("WHERE a=0 ORDER BY id")) self.assertDictEqual(items[0], s["a"][2]) + + def test_delitem(self): + s = PersistentState() + + s["a"][1] = {"a": True} + del s["a"][1] + + self.assertIsNone(s["a"][1]) + + def test_delitem_nonexistent(self): + s = PersistentState() + + s["a"][1] = {"a": True} + del s["a"][456] + + self.assertIsNotNone(s["a"][1]) + + def test_delitem_no_table(self): + s = PersistentState() + + try: + del s["a"][456] + except Exception as e: + self.fail(e)