"""Export a feed_archiver table to a zstd-compressed newline-delimited JSON file.

Streams rows from PostgreSQL via a server-side cursor (through hexlib's
``pg_fetch_cursor_all``) and writes each row's JSON payload, tagged with its
archival timestamp, as one line of ``out_mp.ndjson.zst``.
"""
from hexlib.db import pg_fetch_cursor_all
import psycopg2
from psycopg2 import sql
from tqdm import tqdm
import orjson
import zstandard as zstd

TABLE = "chan_8kun2_post"
THREADS = 12  # zstd worker threads


def main():
    """Run the export: count rows, stream them, and write the .ndjson.zst file."""
    conn = psycopg2.connect(
        host="",
        port="",
        user="",
        password="",
        dbname="feed_archiver",
    )
    try:
        cur = conn.cursor()

        # Compose the table identifier safely instead of %-interpolating it
        # into the SQL string (avoids quoting/injection pitfalls).
        cur.execute(
            sql.SQL("SELECT COUNT(*) FROM {}").format(sql.Identifier(TABLE))
        )
        row_count = cur.fetchone()[0]

        # Server-side cursor so the whole table is never held in memory;
        # hexlib fetches it in batches.
        cur.execute(
            sql.SQL("DECLARE cur1 CURSOR FOR SELECT * FROM {}").format(
                sql.Identifier(TABLE)
            )
        )
        rows = pg_fetch_cursor_all(cur, name="cur1", batch_size=5000)

        with open("out_mp.ndjson.zst", "wb") as f:
            cctx = zstd.ZstdCompressor(level=19, threads=THREADS)
            with cctx.stream_writer(f) as compressor:
                for row in tqdm(rows, total=row_count, unit="row"):
                    _id, archived_on, data = row
                    # Embed the archival time in each document so it survives
                    # outside the database.
                    data["_archived_on"] = int(archived_on.timestamp())
                    compressor.write(orjson.dumps(data))
                    compressor.write(b"\n")

        cur.close()
    finally:
        # Always release the connection, even if the export fails mid-stream.
        conn.close()


if __name__ == '__main__':
    main()