Add pgsql wrapper & delitem for persistent state

This commit is contained in:
simon987 2021-02-06 15:41:18 -05:00
parent a2cfab55bc
commit 7d330a0f9f
3 changed files with 101 additions and 2 deletions

View File

@ -1,8 +1,12 @@
import base64
import sqlite3
import traceback
import psycopg2
import redis
import orjson as json
import umsgpack
from psycopg2.errorcodes import UNIQUE_VIOLATION
class PersistentState:
@ -172,6 +176,14 @@ class Table:
args
)
def __delitem__(self, key):
with sqlite3.connect(self._state.dbfile, **self._state.dbargs) as conn:
try:
conn.execute("DELETE FROM %s WHERE id=?" % self._table, (key,))
except sqlite3.OperationalError:
pass
def _sqlite_type(value):
if isinstance(value, int):
@ -213,3 +225,66 @@ def pg_fetch_cursor_all(cur, name, batch_size=1000):
for row in cur:
yield row
break
class PgConn:
"""Wrapper for PostgreSQL connection"""
def __init__(self, logger=None, **kwargs):
self._conn_args = kwargs
self.conn = psycopg2.connect(**kwargs)
self.cur = self.conn.cursor()
self._logger = logger
def __enter__(self):
return self
def exec(self, query_string, args=None):
while True:
try:
if self._logger:
self._logger.debug(query_string)
self._logger.debug("With args " + str(args))
self.cur.execute(query_string, args)
break
except psycopg2.Error as e:
if e.pgcode == UNIQUE_VIOLATION:
break
traceback.print_stack()
self._handle_err(e, query_string, args)
def query(self, query_string, args=None, max_retries=1):
retries = max_retries
while retries > 0:
try:
if self._logger:
self._logger.debug(query_string)
self._logger.debug("With args " + str(args))
self.cur.execute(query_string, args)
res = self.cur.fetchall()
if self._logger:
self._logger.debug("result: " + str(res))
return res
except psycopg2.Error as e:
if e.pgcode == UNIQUE_VIOLATION:
break
self._handle_err(e, query_string, args)
retries -= 1
def _handle_err(self, err, query, args):
if self._logger:
self._logger.warning(
"Error during query '%s' with args %s: %s %s (%s)" % (query, args, type(err), err, err.pgcode))
self.conn = psycopg2.connect(**self._conn_args)
self.cur = self.conn.cursor()
def __exit__(self, type, value, traceback):
try:
self.conn.commit()
self.cur.close()
except:
pass

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name="hexlib",
version="1.26",
version="1.28",
description="Misc utility methods",
author="simon987",
author_email="me@simon987.net",
@ -13,6 +13,6 @@ setup(
]},
install_requires=[
"ImageHash", "influxdb", "siphash", "python-dateutil", "redis", "orjson", "zstandard",
"u-msgpack-python"
"u-msgpack-python", "psycopg2-binary"
]
)

View File

@ -86,3 +86,27 @@ class TestPersistentState(TestCase):
items = list(s["a"].sql("WHERE a=0 ORDER BY id"))
self.assertDictEqual(items[0], s["a"][2])
def test_delitem(self):
s = PersistentState()
s["a"][1] = {"a": True}
del s["a"][1]
self.assertIsNone(s["a"][1])
def test_delitem_nonexistent(self):
s = PersistentState()
s["a"][1] = {"a": True}
del s["a"][456]
self.assertIsNotNone(s["a"][1])
def test_delitem_no_table(self):
s = PersistentState()
try:
del s["a"][456]
except Exception as e:
self.fail(e)