Compare commits

..

5 Commits

Author SHA1 Message Date
c560cc2010 tweak StatefulStreamWorker interface 2021-09-19 14:19:17 -04:00
f4a5e6cf53 queue_iter fix 2021-09-19 12:44:49 -04:00
71cd00c063 Add StatfulStreamProcessor 2021-09-19 12:39:57 -04:00
7349c9a5f1 Quick optimisation 2021-09-19 10:57:07 -04:00
d19442b00e Update preprocess: now returns generator objects 2021-09-19 09:35:35 -04:00
7 changed files with 158 additions and 81 deletions

27
bench/text.py Normal file
View File

@ -0,0 +1,27 @@
from timeit import timeit
# Punctuation bytes that the benchmarked translation replaces with spaces.
_PUNCTUATION = b".,;:\"!?/()|*=>"

# bytes.maketrans() requires both arguments to have the same length, so the
# replacement side is built as one space per punctuation byte.
t = bytes.maketrans(_PUNCTUATION, b" " * len(_PUNCTUATION))


def translate(x: str) -> str:
    """Return ``x`` with every punctuation character replaced by a space.

    The text is encoded to UTF-8, passed through the byte translation table
    ``t`` and decoded again. All translated bytes are ASCII, so the bytes of
    multi-byte UTF-8 sequences are left untouched.

    :param x: text to translate
    :return: text of the same length with punctuation replaced by spaces
    """
    arr = x.encode("utf8")
    return arr.translate(t).decode("utf8")
if __name__ == '__main__':
    # Baseline: str.translate() with a table built by str.maketrans().
    # The two-argument form of str.maketrans() needs arguments of equal
    # length, so the replacement is one space per punctuation character.
    res = timeit(
        setup='p = ".,;:\\"!?/()|*=>"; t = str.maketrans(p, " " * len(p))',
        stmt='x = "Hello, world %123 & *".translate(t)'
    )
    # 0.865953s (timing noted in the original benchmark)
    print("translate = %fs" % res)

    # Candidate: the bytes-based translate() defined in this module.
    res = timeit(
        setup='from text import translate',
        stmt='x = translate("Hello, world %123 & *")'
    )
    # 0.865953s (timing noted in the original benchmark)
    print("custom = %fs" % res)

View File

@ -1,9 +1,68 @@
from queue import Queue, Empty from queue import Queue, Empty
from multiprocessing import Process from multiprocessing import Process
from multiprocessing import Queue as MPQueue
from threading import Thread from threading import Thread
from hexlib.misc import ichunks
def queue_iter(q: Queue, **get_args):
class StatefulStreamWorker:
    """Base class for workers that consume chunks of items from a queue.

    Subclasses implement :meth:`process` to handle a single item and
    :meth:`results` to expose whatever they accumulated.
    """

    def __init__(self):
        """Create a worker; the base class keeps no state of its own."""

    def run(self, q: Queue):
        """Consume chunks from ``q`` and process every item they contain.

        Chunks are pulled through ``queue_iter`` with ``joinable=False`` and
        ``timeout=3``; the loop ends when that iterator is exhausted.

        :param q: queue whose entries are iterables of items
        """
        incoming = queue_iter(q, joinable=False, timeout=3)
        for batch in incoming:
            self._process_chunk(batch)

    def _process_chunk(self, chunk):
        """Feed each element of ``chunk`` to :meth:`process`, in order."""
        for element in chunk:
            self.process(element)

    def process(self, item) -> None:
        """Handle a single item; must be overridden by subclasses."""
        raise NotImplementedError()

    def results(self):
        """Return the accumulated results; must be overridden by subclasses."""
        raise NotImplementedError()
class StatefulStreamProcessor:
    """Feed an iterable to one or more stateful workers.

    With ``processes > 1`` one worker per process is created and started
    immediately; items are then sent to them in chunks through a shared
    multiprocessing queue. Otherwise a single worker processes items
    directly in the calling process.
    """

    def __init__(self, worker_factory, chunk_size=128, processes=1):
        """
        :param worker_factory: zero-argument callable returning a worker that
            provides ``run(queue)``, ``process(item)`` and ``results()``
        :param chunk_size: number of items per chunk put on the queue; also
            used as the maximum number of chunks the queue may hold
        :param processes: number of worker processes; values of 1 or less
            select in-process execution
        """
        # Honour the caller's chunk size (it was previously fixed at 128).
        self._chunk_size = chunk_size
        self._queue = MPQueue(maxsize=chunk_size)
        self._process_count = processes
        self._processes = []
        self._factory = worker_factory
        self._workers = []

        if processes > 1:
            for _ in range(processes):
                worker = self._factory()
                p = Process(target=worker.run, args=(self._queue,))
                p.start()
                self._processes.append(p)
                self._workers.append(worker)
        else:
            self._workers.append(self._factory())

    def injest(self, iterable):
        """Send every item of ``iterable`` to the worker(s).

        In multi-process mode the items are queued in chunks of the
        configured size and the call then waits for all worker processes to
        exit. In single-process mode each item is processed immediately.

        :param iterable: items to process
        """
        if self._process_count > 1:
            for chunk in ichunks(iterable, self._chunk_size):
                self._queue.put(chunk)

            for p in self._processes:
                p.join()
        else:
            for item in iterable:
                self._workers[0].process(item)

    def get_results(self):
        """Yield ``results()`` of each worker held by this processor.

        NOTE(review): in multi-process mode each worker's ``run`` executes in
        a child process, so state accumulated there is presumably not
        reflected in the parent-side worker objects read here -- confirm.
        """
        for worker in self._workers:
            yield worker.results()
def queue_iter(q: Queue, joinable=True, **get_args):
while True: while True:
try: try:
task = q.get(**get_args) task = q.get(**get_args)
@ -12,7 +71,8 @@ def queue_iter(q: Queue, **get_args):
break break
yield task yield task
q.task_done() if joinable:
q.task_done()
except Empty: except Empty:
break break
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -62,6 +62,16 @@ COMPRESSION_GZIP = "gz"
COMPRESSION_ZSTD = "zstd" COMPRESSION_ZSTD = "zstd"
class NDJsonLine:
    """A single raw NDJSON line whose parsing is deferred until requested."""

    # A one-element tuple rather than a bare string, so that code iterating
    # over __slots__ sees the attribute name and not its characters.
    __slots__ = ("text",)

    def __init__(self, text):
        """
        :param text: the raw, still-serialized JSON document
        """
        self.text = text

    def json(self):
        """Parse and return the line's JSON content.

        The text is parsed on every call; the result is not cached.

        :raises json.JSONDecodeError: if the text is not valid JSON
        """
        return json.loads(self.text)
def ndjson_iter(*files, compression=""): def ndjson_iter(*files, compression=""):
for file in files: for file in files:
cleanup = None cleanup = None
@ -90,7 +100,6 @@ def ndjson_iter(*files, compression=""):
line_iter.close() line_iter.close()
for line in line_iter: for line in line_iter:
yield json.loads(line) yield NDJsonLine(line)
if cleanup: if cleanup:
cleanup() cleanup()

View File

@ -1,4 +1,5 @@
import atexit import atexit
import itertools
import os import os
import sys import sys
import time import time
@ -33,6 +34,15 @@ def chunks(lst: list, chunk_len: int):
yield lst[i:i + chunk_len] yield lst[i:i + chunk_len]
def ichunks(iterable, chunk_len: int):
    """Lazily split ``iterable`` into tuples of at most ``chunk_len`` items.

    Only the last tuple may be shorter than ``chunk_len``; an exhausted
    input yields nothing.

    :param iterable: any iterable; it is consumed incrementally
    :param chunk_len: maximum number of items per yielded tuple
    """
    source = iter(iterable)

    def take():
        return tuple(itertools.islice(source, chunk_len))

    # Two-argument iter() keeps calling take() until it returns the
    # sentinel, i.e. the empty tuple produced once the source is exhausted.
    yield from iter(take, ())
def rate_limit(per_second): def rate_limit(per_second):
min_interval = 1.0 / float(per_second) min_interval = 1.0 / float(per_second)

View File

@ -1,4 +1,5 @@
from functools import partial from functools import partial
from itertools import chain, repeat
from multiprocessing.pool import Pool from multiprocessing.pool import Pool
import nltk.corpus import nltk.corpus
@ -36,9 +37,9 @@ def clean_multicore(texts, processes, chunk_size=10000, **kwargs):
def _transform_bigram(ngram_seq, ngrams): def _transform_bigram(ngram_seq, ngrams):
for ngram in ngram_seq: for ngram in ngram_seq:
if ngram in ngrams: if ngram in ngrams:
yield "_".join(ngram) yield ngram[0] + "_" + ngram[1]
ngram_seq.__next__() next(ngram_seq)
else: else:
yield ngram[0] yield ngram[0]
@ -46,23 +47,30 @@ def _transform_bigram(ngram_seq, ngrams):
def _transform_trigram(ngram_seq, ngrams): def _transform_trigram(ngram_seq, ngrams):
for ngram in ngram_seq: for ngram in ngram_seq:
if ngram in ngrams: if ngram in ngrams:
# yield ngram[0] + "_" + ngram[1] + "_" + ngram[2]
yield "_".join(ngram) yield "_".join(ngram)
ngram_seq.__next__() next(ngram_seq)
ngram_seq.__next__() next(ngram_seq)
else: else:
yield ngram[0] yield ngram[0]
def preprocess(text, lowercase=False, clean_html=False, strip=False, remove_punctuation=False, SINGLE_QUOTES = ("", "`")
remove_stopwords_en=False, lemmatize=False, fix_single_quotes=False, strip_quotes=False, SINGLE_QUOTE_TRANS = str.maketrans("".join(SINGLE_QUOTES), "".join(repeat("'", len(SINGLE_QUOTES))))
remove_urls=False, bigrams: set = None, trigrams: set = None, remove_numbers=False):
PUNCTUATION = ".,;:\"!?/()|*=>"
PUNCTUATION_TRANS = str.maketrans(PUNCTUATION, " " * len(PUNCTUATION))
def preprocess(text, lowercase=False, clean_html=False, remove_punctuation=False, remove_stopwords_en=False,
lemmatize=False, fix_single_quotes=False, strip_quotes=False, remove_urls=False, bigrams: set = None,
trigrams: set = None, remove_numbers=False):
if lowercase: if lowercase:
text = text.lower() text = text.lower()
if fix_single_quotes: if fix_single_quotes:
text = text.replace("`", "'") text = text.translate(SINGLE_QUOTE_TRANS)
text = text.replace("", "'")
if remove_urls: if remove_urls:
text = LINK_RE.sub(" ", text) text = LINK_RE.sub(" ", text)
@ -79,41 +87,26 @@ def preprocess(text, lowercase=False, clean_html=False, strip=False, remove_punc
pass pass
if remove_punctuation: if remove_punctuation:
text = PUNCTUATION_RE.sub(" ", text) text = text.translate(PUNCTUATION_TRANS)
text = WHITESPACE_RE.sub(" ", text) words = text.split()
if strip_quotes: if strip_quotes:
words = text.split(" ") words = filter(lambda w: w.strip("\"'"), words)
text = " ".join(w.strip("\"'") for w in words)
if bigrams: if bigrams:
words = text.split(" ") words = _transform_bigram(nltk.bigrams(chain(words, ("*",))), bigrams)
words.append("*")
text = " ".join(_transform_bigram(nltk.bigrams(words), bigrams))
if trigrams: if trigrams:
words = text.split(" ") words = _transform_trigram(nltk.trigrams(chain(words, ("*", "*"))), trigrams)
words.append("*")
words.append("*")
text = " ".join(_transform_trigram(nltk.trigrams(words), trigrams))
if remove_stopwords_en or lemmatize or remove_numbers: if remove_numbers:
words = text.split(" ") words = filter(lambda w: not w.isnumeric(), words)
if remove_numbers: if lemmatize:
words = filter(lambda w: not w.isnumeric(), words) words = map(lambda w: lemmatizer.lemmatize(w), words)
if not lemmatize and not remove_stopwords_en: if remove_stopwords_en:
text = " ".join(words) words = filter(lambda w: w not in stop_words_en, words)
if lemmatize and remove_stopwords_en:
text = " ".join(lemmatizer.lemmatize(w) for w in words if w not in stop_words_en)
elif not lemmatize and remove_stopwords_en:
text = " ".join(w for w in words if w not in stop_words_en)
elif lemmatize and not remove_stopwords_en:
text = " ".join(lemmatizer.lemmatize(w) for w in words)
if strip: return filter(lambda w: w != "", words)
text = text.strip()
return text

View File

@ -2,7 +2,7 @@ from setuptools import setup
setup( setup(
name="hexlib", name="hexlib",
version="1.50", version="1.55",
description="Misc utility methods", description="Misc utility methods",
author="simon987", author="simon987",
author_email="me@simon987.net", author_email="me@simon987.net",

View File

@ -13,7 +13,7 @@ class TestText(TestCase):
) )
expected = "" expected = ""
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_1(self): def test_html_1(self):
text = "<div>Hello, <strong>world</strong></div>" text = "<div>Hello, <strong>world</strong></div>"
@ -23,7 +23,7 @@ class TestText(TestCase):
) )
expected = "Hello, world" expected = "Hello, world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_2(self): def test_html_2(self):
text = "<div>Hello, <strong>world</strong></div>" text = "<div>Hello, <strong>world</strong></div>"
@ -34,18 +34,7 @@ class TestText(TestCase):
) )
expected = "hello, world" expected = "hello, world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_3(self):
text = "<div>\n Hello, \t\n<strong> world </strong>\n\t</div>"
cleaned = preprocess(
text,
clean_html=True,
lowercase=True,
)
expected = " hello, world "
self.assertEqual(cleaned, expected)
def test_html_4(self): def test_html_4(self):
text = "<div>\n Hello, \t\n<strong> world </strong>\n\t</div>" text = "<div>\n Hello, \t\n<strong> world </strong>\n\t</div>"
@ -53,11 +42,10 @@ class TestText(TestCase):
text, text,
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
strip=True
) )
expected = "hello, world" expected = "hello, world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_5(self): def test_html_5(self):
text = "<div>\n Hello, \t\n<strong> world </strong>\n\t</div>" text = "<div>\n Hello, \t\n<strong> world </strong>\n\t</div>"
@ -65,12 +53,11 @@ class TestText(TestCase):
text, text,
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
strip=True,
remove_punctuation=True remove_punctuation=True
) )
expected = "hello world" expected = "hello world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_6(self): def test_html_6(self):
text = "<div>\n Hello, \t\n<strong>a the world </strong>\n\t</div>" text = "<div>\n Hello, \t\n<strong>a the world </strong>\n\t</div>"
@ -79,12 +66,11 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
remove_stopwords_en=True remove_stopwords_en=True
) )
expected = "hello world" expected = "hello world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_7(self): def test_html_7(self):
text = "<div>\n Hello, \t\n<strong>a the worlds </strong>\n\t</div>" text = "<div>\n Hello, \t\n<strong>a the worlds </strong>\n\t</div>"
@ -93,13 +79,12 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
remove_stopwords_en=True, remove_stopwords_en=True,
lemmatize=True lemmatize=True
) )
expected = "hello world" expected = "hello world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_8(self): def test_html_8(self):
text = "<div>\n Hello, \t\n<strong>a the worlds! </strong>\n\t</div>" text = "<div>\n Hello, \t\n<strong>a the worlds! </strong>\n\t</div>"
@ -108,13 +93,12 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
remove_stopwords_en=True, remove_stopwords_en=True,
lemmatize=True lemmatize=True
) )
expected = "hello world" expected = "hello world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_9(self): def test_html_9(self):
text = "<div>\n Hello, \t\n<strong>world! it's it`s </strong>\n\t</div>" text = "<div>\n Hello, \t\n<strong>world! it's it`s </strong>\n\t</div>"
@ -123,13 +107,12 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
lemmatize=True, lemmatize=True,
fix_single_quotes=True fix_single_quotes=True
) )
expected = "hello world it's it's" expected = "hello world it's it's"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_single_quote(self): def test_single_quote(self):
text = "it's it`s its" text = "it's it`s its"
@ -140,7 +123,7 @@ class TestText(TestCase):
) )
expected = "it's it's it's" expected = "it's it's it's"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_10(self): def test_html_10(self):
text = "<div>\n Hello, \t\n<strong>world! it's it`s https://google.ca/test/abc.pdf </strong>\n\t</div>" text = "<div>\n Hello, \t\n<strong>world! it's it`s https://google.ca/test/abc.pdf </strong>\n\t</div>"
@ -149,14 +132,13 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
lemmatize=True, lemmatize=True,
fix_single_quotes=True, fix_single_quotes=True,
remove_urls=True remove_urls=True
) )
expected = "hello world it's it's" expected = "hello world it's it's"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_11(self): def test_html_11(self):
text = "<div>\n Hello, \t\n<strong>world! it's it`s & | </strong>\n\t</div>" text = "<div>\n Hello, \t\n<strong>world! it's it`s & | </strong>\n\t</div>"
@ -165,7 +147,6 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
lemmatize=True, lemmatize=True,
fix_single_quotes=True, fix_single_quotes=True,
remove_stopwords_en=True, remove_stopwords_en=True,
@ -173,7 +154,7 @@ class TestText(TestCase):
) )
expected = "hello world" expected = "hello world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_no_root(self): def test_html_no_root(self):
text = "<a href=\"#p217709510\" class=\"quotelink\">&gt;&gt;217709510</a><br>Is there a<wbr>servant that is against civilization and humanity?<br>Literally instant summon." text = "<a href=\"#p217709510\" class=\"quotelink\">&gt;&gt;217709510</a><br>Is there a<wbr>servant that is against civilization and humanity?<br>Literally instant summon."
@ -183,7 +164,6 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
lemmatize=False, lemmatize=False,
fix_single_quotes=True, fix_single_quotes=True,
remove_stopwords_en=False, remove_stopwords_en=False,
@ -191,7 +171,7 @@ class TestText(TestCase):
) )
expected = "217709510 is there a servant that is against civilization and humanity literally instant summon" expected = "217709510 is there a servant that is against civilization and humanity literally instant summon"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_entity(self): def test_html_entity(self):
text = "doesn&#039;t" text = "doesn&#039;t"
@ -201,7 +181,6 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
lemmatize=False, lemmatize=False,
fix_single_quotes=True, fix_single_quotes=True,
remove_stopwords_en=False, remove_stopwords_en=False,
@ -209,7 +188,7 @@ class TestText(TestCase):
) )
expected = "doesn't" expected = "doesn't"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_html_invalid_attribute(self): def test_html_invalid_attribute(self):
text = '<root><iframe width="560" height="315" src=" " title="youtube video player" frameborder="0" allowfullscreen></iframe></root>' text = '<root><iframe width="560" height="315" src=" " title="youtube video player" frameborder="0" allowfullscreen></iframe></root>'
@ -219,7 +198,6 @@ class TestText(TestCase):
clean_html=True, clean_html=True,
lowercase=True, lowercase=True,
remove_punctuation=True, remove_punctuation=True,
strip=True,
lemmatize=False, lemmatize=False,
fix_single_quotes=True, fix_single_quotes=True,
remove_stopwords_en=False, remove_stopwords_en=False,
@ -228,7 +206,7 @@ class TestText(TestCase):
expected = "" expected = ""
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_bigrams(self): def test_bigrams(self):
text = "x A b c d e f g h" text = "x A b c d e f g h"
@ -243,7 +221,7 @@ class TestText(TestCase):
) )
expected = "x a_b c_d e f_g h" expected = "x a_b c_d e f_g h"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_trigrams(self): def test_trigrams(self):
text = "x A b c d e f g h" text = "x A b c d e f g h"
@ -257,7 +235,7 @@ class TestText(TestCase):
) )
expected = "x a_b_c d e_f_g h" expected = "x a_b_c d e_f_g h"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)
def test_remove_numbers(self): def test_remove_numbers(self):
text = "Hello1 test1124test 12 1 1111111 world" text = "Hello1 test1124test 12 1 1111111 world"
@ -268,4 +246,4 @@ class TestText(TestCase):
) )
expected = "hello1 test1124test world" expected = "hello1 test1124test world"
self.assertEqual(cleaned, expected) self.assertEqual(" ".join(cleaned), expected)