Compare commits

...

5 Commits

Author SHA1 Message Date
c560cc2010 tweak StatefulStreamWorker interface 2021-09-19 14:19:17 -04:00
f4a5e6cf53 queue_iter fix 2021-09-19 12:44:49 -04:00
71cd00c063 Add StatfulStreamProcessor 2021-09-19 12:39:57 -04:00
7349c9a5f1 Quick optimisation 2021-09-19 10:57:07 -04:00
d19442b00e Update preprocess: now returns generator objects 2021-09-19 09:35:35 -04:00
7 changed files with 158 additions and 81 deletions

27
bench/text.py Normal file
View File

@ -0,0 +1,27 @@
from timeit import timeit
# Byte-level translation table mapping each punctuation byte to a space.
# BUG FIX: bytes.maketrans requires both arguments to have the same length;
# the original second argument b" " (one byte) raised ValueError at import.
_PUNCTUATION = b".,;:\"!?/()|*=>"
t = bytes.maketrans(_PUNCTUATION, b" " * len(_PUNCTUATION))


def translate(x: str):
    """Replace punctuation in *x* with spaces using a bytes-level translate.

    The string is round-tripped through UTF-8 bytes because bytes.translate
    is a single C-level pass. This is safe for non-ASCII input: the mapped
    punctuation bytes are all ASCII, and ASCII byte values never occur
    inside multi-byte UTF-8 sequences.
    """
    arr = x.encode("utf8")
    return arr.translate(t).decode("utf8")
if __name__ == '__main__':
    # Benchmark the plain-str translate against the bytes round-trip version.
    # BUG FIX: str.maketrans with two string arguments also requires equal
    # lengths; the original setup string passed a single-space replacement
    # for 14 punctuation characters and raised ValueError.
    res = timeit(
        setup='t = str.maketrans(".,;:\\"!?/()|*=>", " " * 14)',
        stmt='x = "Hello, world %123 & *".translate(t)'
    )
    print("translate = %fs" % res)

    res = timeit(
        setup='from text import translate',
        stmt='x = translate("Hello, world %123 & *")'
    )
    # (Stale copy-pasted "# 0.865953s" result comments removed: both
    # benchmarks carried the identical figure.)
    print("custom = %fs" % res)

View File

@ -1,9 +1,68 @@
from queue import Queue, Empty
from multiprocessing import Process
from multiprocessing import Queue as MPQueue
from threading import Thread
from hexlib.misc import ichunks
def queue_iter(q: Queue, **get_args):
class StatefulStreamWorker:
    """Base class for a queue-fed worker.

    Subclasses override process() to handle one item and results() to
    report whatever state was accumulated. run() drains chunks from a
    queue until it stays empty past the timeout.
    """

    def __init__(self):
        pass

    def run(self, q: Queue):
        # Drain chunk after chunk; queue_iter gives up after 3 idle seconds.
        for batch in queue_iter(q, joinable=False, timeout=3):
            self._process_chunk(batch)

    def _process_chunk(self, batch):
        # Unpack a chunk into individual items for process().
        for element in batch:
            self.process(element)

    def process(self, item) -> None:
        # Per-item handler; must be supplied by the subclass.
        raise NotImplementedError

    def results(self):
        # Accumulated-state accessor; must be supplied by the subclass.
        raise NotImplementedError
class StatefulStreamProcessor:
    """Dispatches a stream of items to StatefulStreamWorker instances.

    With processes > 1, worker processes are started eagerly in __init__
    and fed chunks through a multiprocessing queue; otherwise a single
    worker is fed item-by-item in the current process.
    """

    def __init__(self, worker_factory, chunk_size=128, processes=1):
        # BUG FIX: chunk_size was previously hard-coded to 128 here,
        # silently ignoring the caller's argument.
        self._chunk_size = chunk_size
        # NOTE(review): maxsize=chunk_size bounds queued *chunks*, not
        # items — presumably intentional back-pressure; confirm.
        self._queue = MPQueue(maxsize=chunk_size)
        self._process_count = processes
        self._processes = []
        self._factory = worker_factory
        self._workers = []

        if processes > 1:
            for _ in range(processes):
                worker = self._factory()
                p = Process(target=worker.run, args=(self._queue,))
                p.start()
                self._processes.append(p)
                self._workers.append(worker)
        else:
            self._workers.append(self._factory())

    def injest(self, iterable):
        """Feed *iterable* to the worker(s), blocking until done.

        (Misspelled name kept for backward compatibility; see the
        correctly-spelled alias defined right below.)
        """
        if self._process_count > 1:
            for chunk in ichunks(iterable, self._chunk_size):
                self._queue.put(chunk)
            for p in self._processes:
                p.join()
        else:
            worker = self._workers[0]
            for item in iterable:
                worker.process(item)

    # Correctly-spelled alias; existing callers of injest() keep working.
    ingest = injest

    def get_results(self):
        """Yield each worker's results() value.

        NOTE(review): with processes > 1 the workers mutate state inside
        the child processes; the parent-side copies yielded here cannot
        see that state — verify results are communicated back somehow.
        """
        for worker in self._workers:
            yield worker.results()
def queue_iter(q: Queue, joinable=True, **get_args):
while True:
try:
task = q.get(**get_args)
@ -12,6 +71,7 @@ def queue_iter(q: Queue, **get_args):
break
yield task
if joinable:
q.task_done()
except Empty:
break

View File

@ -62,6 +62,16 @@ COMPRESSION_GZIP = "gz"
COMPRESSION_ZSTD = "zstd"
class NDJsonLine:
    """Lazy wrapper around one raw NDJSON line; parse on demand via json()."""

    # A tuple is the conventional, extensible form for __slots__; the bare
    # string "text" only worked because there was exactly one slot.
    __slots__ = ("text",)

    def __init__(self, text):
        self.text = text

    def json(self):
        """Parse and return the line as a Python object (parsed each call)."""
        return json.loads(self.text)
def ndjson_iter(*files, compression=""):
for file in files:
cleanup = None
@ -90,7 +100,6 @@ def ndjson_iter(*files, compression=""):
line_iter.close()
for line in line_iter:
yield json.loads(line)
yield NDJsonLine(line)
if cleanup:
cleanup()

View File

@ -1,4 +1,5 @@
import atexit
import itertools
import os
import sys
import time
@ -33,6 +34,15 @@ def chunks(lst: list, chunk_len: int):
yield lst[i:i + chunk_len]
def ichunks(iterable, chunk_len: int):
    """Lazily split *iterable* into tuples of at most *chunk_len* items.

    The final chunk may be shorter; an exhausted (or empty) iterable
    produces nothing.
    """
    stream = iter(iterable)
    chunk = tuple(itertools.islice(stream, chunk_len))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(stream, chunk_len))
def rate_limit(per_second):
min_interval = 1.0 / float(per_second)

View File

@ -1,4 +1,5 @@
from functools import partial
from itertools import chain, repeat
from multiprocessing.pool import Pool
import nltk.corpus
@ -36,9 +37,9 @@ def clean_multicore(texts, processes, chunk_size=10000, **kwargs):
def _transform_bigram(ngram_seq, ngrams):
for ngram in ngram_seq:
if ngram in ngrams:
yield "_".join(ngram)
yield ngram[0] + "_" + ngram[1]
ngram_seq.__next__()
next(ngram_seq)
else:
yield ngram[0]
@ -46,23 +47,30 @@ def _transform_bigram(ngram_seq, ngrams):
def _transform_trigram(ngram_seq, ngrams):
for ngram in ngram_seq:
if ngram in ngrams:
# yield ngram[0] + "_" + ngram[1] + "_" + ngram[2]
yield "_".join(ngram)
ngram_seq.__next__()
ngram_seq.__next__()
next(ngram_seq)
next(ngram_seq)
else:
yield ngram[0]
def preprocess(text, lowercase=False, clean_html=False, strip=False, remove_punctuation=False,
remove_stopwords_en=False, lemmatize=False, fix_single_quotes=False, strip_quotes=False,
remove_urls=False, bigrams: set = None, trigrams: set = None, remove_numbers=False):
SINGLE_QUOTES = ("", "`")
SINGLE_QUOTE_TRANS = str.maketrans("".join(SINGLE_QUOTES), "".join(repeat("'", len(SINGLE_QUOTES))))
PUNCTUATION = ".,;:\"!?/()|*=>"
PUNCTUATION_TRANS = str.maketrans(PUNCTUATION, " " * len(PUNCTUATION))
def preprocess(text, lowercase=False, clean_html=False, remove_punctuation=False, remove_stopwords_en=False,
lemmatize=False, fix_single_quotes=False, strip_quotes=False, remove_urls=False, bigrams: set = None,
trigrams: set = None, remove_numbers=False):
if lowercase:
text = text.lower()
if fix_single_quotes:
text = text.replace("`", "'")
text = text.replace("", "'")
text = text.translate(SINGLE_QUOTE_TRANS)
if remove_urls:
text = LINK_RE.sub(" ", text)
@ -79,41 +87,26 @@ def preprocess(text, lowercase=False, clean_html=False, strip=False, remove_punc
pass
if remove_punctuation:
text = PUNCTUATION_RE.sub(" ", text)
text = text.translate(PUNCTUATION_TRANS)
text = WHITESPACE_RE.sub(" ", text)
words = text.split()
if strip_quotes:
words = text.split(" ")
text = " ".join(w.strip("\"'") for w in words)
words = filter(lambda w: w.strip("\"'"), words)
if bigrams:
words = text.split(" ")
words.append("*")
text = " ".join(_transform_bigram(nltk.bigrams(words), bigrams))
words = _transform_bigram(nltk.bigrams(chain(words, ("*",))), bigrams)
if trigrams:
words = text.split(" ")
words.append("*")
words.append("*")
text = " ".join(_transform_trigram(nltk.trigrams(words), trigrams))
if remove_stopwords_en or lemmatize or remove_numbers:
words = text.split(" ")
words = _transform_trigram(nltk.trigrams(chain(words, ("*", "*"))), trigrams)
if remove_numbers:
words = filter(lambda w: not w.isnumeric(), words)
if not lemmatize and not remove_stopwords_en:
text = " ".join(words)
if lemmatize and remove_stopwords_en:
text = " ".join(lemmatizer.lemmatize(w) for w in words if w not in stop_words_en)
elif not lemmatize and remove_stopwords_en:
text = " ".join(w for w in words if w not in stop_words_en)
elif lemmatize and not remove_stopwords_en:
text = " ".join(lemmatizer.lemmatize(w) for w in words)
if lemmatize:
words = map(lambda w: lemmatizer.lemmatize(w), words)
if strip:
text = text.strip()
if remove_stopwords_en:
words = filter(lambda w: w not in stop_words_en, words)
return text
return filter(lambda w: w != "", words)

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup(
name="hexlib",
version="1.50",
version="1.55",
description="Misc utility methods",
author="simon987",
author_email="me@simon987.net",

View File

@ -13,7 +13,7 @@ class TestText(TestCase):
)
expected = ""
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_1(self):
text = "<div>Hello, <strong>world</strong></div>"
@ -23,7 +23,7 @@ class TestText(TestCase):
)
expected = "Hello, world"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_2(self):
text = "<div>Hello, <strong>world</strong></div>"
@ -34,18 +34,7 @@ class TestText(TestCase):
)
expected = "hello, world"
self.assertEqual(cleaned, expected)
def test_html_3(self):
text = "<div>\n Hello, \t\n<strong> world </strong>\n\t</div>"
cleaned = preprocess(
text,
clean_html=True,
lowercase=True,
)
expected = " hello, world "
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_4(self):
text = "<div>\n Hello, \t\n<strong> world </strong>\n\t</div>"
@ -53,11 +42,10 @@ class TestText(TestCase):
text,
clean_html=True,
lowercase=True,
strip=True
)
expected = "hello, world"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_5(self):
text = "<div>\n Hello, \t\n<strong> world </strong>\n\t</div>"
@ -65,12 +53,11 @@ class TestText(TestCase):
text,
clean_html=True,
lowercase=True,
strip=True,
remove_punctuation=True
)
expected = "hello world"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_6(self):
text = "<div>\n Hello, \t\n<strong>a the world </strong>\n\t</div>"
@ -79,12 +66,11 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
remove_stopwords_en=True
)
expected = "hello world"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_7(self):
text = "<div>\n Hello, \t\n<strong>a the worlds </strong>\n\t</div>"
@ -93,13 +79,12 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
remove_stopwords_en=True,
lemmatize=True
)
expected = "hello world"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_8(self):
text = "<div>\n Hello, \t\n<strong>a the worlds! </strong>\n\t</div>"
@ -108,13 +93,12 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
remove_stopwords_en=True,
lemmatize=True
)
expected = "hello world"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_9(self):
text = "<div>\n Hello, \t\n<strong>world! it's it`s </strong>\n\t</div>"
@ -123,13 +107,12 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
lemmatize=True,
fix_single_quotes=True
)
expected = "hello world it's it's"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_single_quote(self):
text = "it's it`s its"
@ -140,7 +123,7 @@ class TestText(TestCase):
)
expected = "it's it's it's"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_10(self):
text = "<div>\n Hello, \t\n<strong>world! it's it`s https://google.ca/test/abc.pdf </strong>\n\t</div>"
@ -149,14 +132,13 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
lemmatize=True,
fix_single_quotes=True,
remove_urls=True
)
expected = "hello world it's it's"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_11(self):
text = "<div>\n Hello, \t\n<strong>world! it's it`s & | </strong>\n\t</div>"
@ -165,7 +147,6 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
lemmatize=True,
fix_single_quotes=True,
remove_stopwords_en=True,
@ -173,7 +154,7 @@ class TestText(TestCase):
)
expected = "hello world"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_no_root(self):
text = "<a href=\"#p217709510\" class=\"quotelink\">&gt;&gt;217709510</a><br>Is there a<wbr>servant that is against civilization and humanity?<br>Literally instant summon."
@ -183,7 +164,6 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
lemmatize=False,
fix_single_quotes=True,
remove_stopwords_en=False,
@ -191,7 +171,7 @@ class TestText(TestCase):
)
expected = "217709510 is there a servant that is against civilization and humanity literally instant summon"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_entity(self):
text = "doesn&#039;t"
@ -201,7 +181,6 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
lemmatize=False,
fix_single_quotes=True,
remove_stopwords_en=False,
@ -209,7 +188,7 @@ class TestText(TestCase):
)
expected = "doesn't"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_html_invalid_attribute(self):
text = '<root><iframe width="560" height="315" src=" " title="youtube video player" frameborder="0" allowfullscreen></iframe></root>'
@ -219,7 +198,6 @@ class TestText(TestCase):
clean_html=True,
lowercase=True,
remove_punctuation=True,
strip=True,
lemmatize=False,
fix_single_quotes=True,
remove_stopwords_en=False,
@ -228,7 +206,7 @@ class TestText(TestCase):
expected = ""
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_bigrams(self):
text = "x A b c d e f g h"
@ -243,7 +221,7 @@ class TestText(TestCase):
)
expected = "x a_b c_d e f_g h"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_trigrams(self):
text = "x A b c d e f g h"
@ -257,7 +235,7 @@ class TestText(TestCase):
)
expected = "x a_b_c d e_f_g h"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)
def test_remove_numbers(self):
text = "Hello1 test1124test 12 1 1111111 world"
@ -268,4 +246,4 @@ class TestText(TestCase):
)
expected = "hello1 test1124test world"
self.assertEqual(cleaned, expected)
self.assertEqual(" ".join(cleaned), expected)