class StatelessStreamWorker:
    """Base class for workers that transform items independently of each other.

    Subclasses override :meth:`process`. A worker is driven either by
    :meth:`run` (queue-fed, typically in a child process) or by calling
    :meth:`process` directly on individual items.
    """

    def __init__(self):
        # Output queue; assigned by run() before any chunk is processed.
        self._q_out = None

    def run(self, q: Queue, q_out: Queue):
        """Consume chunks from *q* and publish their results on *q_out*.

        :param q: queue of input chunks (each chunk is an iterable of items).
        :param q_out: queue receiving one list of results per non-empty chunk.
        """
        self._q_out: Queue = q_out

        # NOTE(review): queue_iter is defined elsewhere in this module;
        # presumably it stops iterating once the queue has been idle for
        # `timeout` seconds -- confirm against its implementation.
        for chunk in queue_iter(q, joinable=False, timeout=10):
            self._process_chunk(chunk)

    def _process_chunk(self, chunk):
        """Run :meth:`process` on every item of *chunk* and emit the results.

        ``None`` results are dropped; nothing is put on the output queue when
        the whole chunk produced no result.
        """
        results = [
            result
            for result in map(self.process, chunk)
            if result is not None
        ]

        if results:
            self._q_out.put(results)

    def process(self, item):
        """Transform a single *item*; return ``None`` to emit nothing.

        :raises NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError


class StatelessStreamProcessor:
    """Fan an iterable out to stateless workers and stream back the results.

    :param worker_factory: zero-argument callable returning a worker that
        exposes ``process(item)`` and ``run(q, q_out)``.
    :param chunk_size: number of items grouped into each chunk sent to the
        worker processes; also used as the capacity of the input queue.
    :param processes: number of worker processes. With a value of 1 or less
        no process is started and items are handled in the calling process.
    """

    def __init__(self, worker_factory, chunk_size=128, processes=1):
        # Honour the caller's chunk size (this used to be hard-coded to 128).
        self._chunk_size = chunk_size
        self._queue = MPQueue(maxsize=chunk_size)
        self._queue_out = MPQueue(maxsize=processes * 2)
        self._process_count = processes
        self._processes = []
        self._factory = worker_factory
        self._workers = []

        if processes > 1:
            for _ in range(processes):
                worker = self._factory()
                p = Process(target=worker.run, args=(self._queue, self._queue_out))
                p.start()

                self._processes.append(p)
                self._workers.append(worker)
        else:
            self._workers.append(self._factory())

    def _ingest(self, iterable):
        """Feed *iterable* to the workers (runs on the ingest thread).

        In multi-process mode the items are chunked onto the input queue.
        The single-process branch is kept for direct callers; it processes
        the items and discards the results.
        """
        if self._process_count > 1:
            for chunk in ichunks(iterable, self._chunk_size):
                self._queue.put(chunk)
        else:
            for item in iterable:
                self._workers[0].process(item)

    def ingest(self, iterable):
        """Process *iterable* and lazily yield every non-``None`` result.

        :param iterable: the items to process.
        :return: a generator of results. Their order follows the input in
            single-process mode; in multi-process mode it depends on worker
            scheduling.
        """
        if self._process_count <= 1:
            # Single-process mode: results never pass through the output
            # queue, so yield them directly instead of waiting on an empty
            # queue and returning nothing.
            worker = self._workers[0]
            for item in iterable:
                result = worker.process(item)
                if result is not None:
                    yield result
            return

        # Feed the input queue from a helper thread so this generator can
        # drain the output queue at the same time.
        ingest_thread = Thread(target=self._ingest, args=(iterable,))
        ingest_thread.start()

        for results in queue_iter(self._queue_out, joinable=False, timeout=10):
            yield from results

        ingest_thread.join()
1: for chunk in ichunks(iterable, self._chunk_size): diff --git a/setup.py b/setup.py index 1528ca7..fc520ce 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup( name="hexlib", - version="1.65", + version="1.66", description="Misc utility methods", author="simon987", author_email="me@simon987.net",