From 984388acc7b91edc5610adf1f3d97f843bfbff47 Mon Sep 17 00:00:00 2001 From: simon987 Date: Sat, 30 May 2020 14:51:09 -0400 Subject: [PATCH] replace rabbitMQ with redis --- README.md | 2 +- chan/chan.py | 101 ++++++++++++-------- docker-compose.yml | 227 ++++++++++++++------------------------------- get_8kun_boards.py | 42 +++++++++ post_process.py | 61 +----------- requirements.txt | 6 +- run.py | 174 ++++++++++------------------------ start.sh | 2 + 8 files changed, 231 insertions(+), 384 deletions(-) create mode 100644 get_8kun_boards.py create mode 100755 start.sh diff --git a/README.md b/README.md index 784982e..ec724e1 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ ### chan_feed Daemon that fetches posts from compatible *chan -image boards and publishes serialised JSON to RabbitMQ +image boards and publishes serialised JSON to redis for real-time ingest. Compatible image boards: 4chan, lainchan, uboachan, diff --git a/chan/chan.py b/chan/chan.py index 0363a92..d2c9fb1 100644 --- a/chan/chan.py +++ b/chan/chan.py @@ -391,46 +391,67 @@ CHANS = { "https://media.8kun.top/", "/res/", "file_store/", - ( - "_1", "55chan", "64chen", "8bantb", "8tube", "a", "abdl2", "agdg", "amv", "aneki", "animu", "animus", - "ara", "arda", "arms", "asatru", "asmr", "aus", "ausneets", "_b", "_baka", "baneposting", "_baseballbat", - "bcards", "bleached", "blog", "_bonehurtingjuice", "bq", "_brit", "bubblegum", "builders", "bunkers", - "butt", - "cafechan", "caffe", "canada", "cath", "chori", "choroy", "christian", "christianity", "christianmeme", - "cicachan", "civicrs", "ck", "cloveros", "co", "cow", "_cuckquean", "cute", "cyber", "cyoa", "_czech", - "dadtalk", "danpu", "dao101", "degen", "delete", "dempart", "desu", "diaperfags", "diaperfetish", "dir", - "_dolphin", "dpfag", "dpr", "druid", "e9y", "eatme", "ebola", "eerie", "egy", "egypt", "etika", "eu", - "euskotxa", "_exit", "f1", "fa", "fairy", "fallen", "fast", "faygo", "feet", "femaledomination", "feri", - "_fightcomms", "film", "flemish", "floss", "fortnite", "freedomzine", "fukemo", "fumo", "fur", "furry", "g", - "gamergatehq", "genesis", "gesu", "ggis", "girltalk", "greenbreeze", "gts", "haxxor", "hentai", - "hentaiclub", "_herm", "hermetics", "hgb", "hgg", "_hindu", "hisparefugio", "hissss", "hnt", "hover", - "hybrids", "hydrus", "hypno", "hypnochan", "icup", "imperium", "in", "ipfs", "ircsecrets", "islam", "ita", - "jaooo", "jewess", "jmaatv", "joker", "jp", "k", "kekforceusa", "kemono", "kocsog", "kohlchan", "_komica", - "komika", "kpop", "lain", "lego", "leo", "lewd", "lit", "lol", "loomis", "loroy", "luddite", "magick", - "maka", "mde", "merrychristmas", "miku", "milf", "mom", "monster", "msb", "mtb", "mtt", "mu", "n0thingness", - "nanachi", "natiofr", "nep", "newbrit", "newsplus", "nobody", "nofap", "nofur", "nogatco", "nothingness", - "ntr", "nuke8", "oanda", "_ocb", "_ocult", "omorashi", "opmk", "os", "otter", "p", "panconleche", "pdfs", - "_peaceofmind", "pen", "philosophy", "pkmns", "pnd", "pokeporn", "polymath", "pone", "projectdcomms", - "_pyatibrat", "qm", "qpatriotresearch", "_qresearch", "qrnews", "_rand21", "rec", "rmart", "rusrandom", - "rzabczan", "s", "s8s", "sag", "sapphic", "shousa", "sikhi", "sip", "sl", "snowboarding", "socpl", "strek", - "subs", "_sve", "t", "tan", "tdt", "tech9", "techan", "techbunker", "tek", "templeos", "tenda", "teraha", - "_texit", "tf2", "_tg", "thb", "thedickshow", "throat", "tibby", "tikilounge", "tkr", "tr55", - "_trashcollector", "truthlegion", "tulpamancers", "turul", "tutturu", "tv", "u", "uaco", "ucla", - "underground", "_usersunion", "v", "vichan", "vietkong", "vietnam", "vore", "vr", "warposting", "wdsc", - "webm", "wg", "_wga", "wikieat", "wis", "wmafsex", "workrelated", "wqt", "wx", "x", "_xivl", "_xtian", - "zoomerright", "zundel", - - "0", "55sync", "abdl", "alleycat", "arisu", "arisubunker", "arp", "bane", "bimbohypnosis", "bluemoon", - "bmn", "brains", "cats", "chance", "clang", "comfy", "critters", "cursed", "cvine", "cze", "d", "dcaco", - "demonp", "dnmd", "doomer", "doot", "elitabla", "empanada", "erp", "falseflags", "fashionplus", "fata", - "femdom", "fit", "flg", "fr8chan", "futyitorna", "garrett", "giantesshentai", "hentaiporn", "hmfr", - "hooliedayz", "hsp", "hujszon", "iep", "just", "k46", "kind", "kiwc", "kukichan", "lacajita", "legos", - "lgd", "liveanarchy", "luciddreaming", "m", "mapp", "mental", "mets", "milhis", "monarchy", "myon", - "newhomosuck", "newsci", "nine", "oes", "onepiece", "other369", "otomad", "penguware", "psyid", - "qresearch2gen", "rule34", "satorare", "sonyeon", "split", "sunflower", "tae", "test", "tft", "tftg", "toy", - "trap", "vein", "virtualreality", "vivian", "voros", "wbr", "weird", "wooo", "yuuka", - ), - rps=1 + ("1", "55chan", "_64chen", "8bantb", "8tube", "a", "_abdl2", "agdg", "amv", "aneki", "animu", "animus", "ara", + "arda", "arms", "asatru", "asmr", "aus", "ausneets", "__b", "__baka", "_baneposting", "__baseballbat", + "bcards", "bleached", "blog", "__bonehurtingjuice", "bq", "__brit", "bubblegum", "builders", "bunkers", "butt", + "cafechan", "caffe", "canada", "cath", "chori", "choroy", "christian", "christianity", "christianmeme", + "cicachan", "civicrs", "ck", "cloveros", "co", "cow", "__cuckquean", "cute", "cyber", "cyoa", "__czech", + "dadtalk", "danpu", "dao101", "degen", "delete", "dempart", "desu", "diaperfags", "diaperfetish", "dir", + "__dolphin", "dpfag", "_dpr", "druid", "_e9y", "eatme", "ebola", "eerie", "egy", "egypt", "etika", "eu", + "euskotxa", "__exit", "f1", "fa", "fairy", "fallen", "fast", "faygo", "feet", "femaledomination", "feri", + "__fightcomms", "film", "flemish", "floss", "fortnite", "freedomzine", "fukemo", "fumo", "fur", "furry", "g", + "gamergatehq", "genesis", "_gesu", "ggis", "girltalk", "greenbreeze", "gts", "haxxor", "hentai", "hentaiclub", + "__herm", "hermetics", "hgb", "hgg", "__hindu", "hisparefugio", "hissss", "hnt", "hover", "hybrids", "hydrus", + "hypno", "_hypnochan", "icup", "imperium", "in", "ipfs", "ircsecrets", "islam", "ita", "jaooo", "jewess", + "jmaatv", "joker", "jp", "k", "_kekforceusa", "kemono", "kocsog", "kohlchan", "__(komica)", "_komika", "kpop", + "lain", "_lego", "leo", "lewd", "lit", "lol", "loomis", "loroy", "luddite", "magick", "maka", "mde", + "merrychristmas", "miku", "milf", "mom", "monster", "msb", "mtb", "mtt", "mu", "n0thingness", "nanachi", + "natiofr", "nep", "newbrit", "newsplus", "nobody", "nofap", "nofur", "nogatco", "nothingness", "ntr", "_nuke8", + "oanda", "__ocb", "__ocult", "_omorashi", "opmk", "os", "otter", "p", "panconleche", "pdfs", "__peaceofmind", + "pen", "philosophy", "_pkmns", "pnd", "pokeporn", "polymath", "pone", "projectdcomms", "__pyatibrat", "_qm", + "qpatriotresearch", "__qresearch", "qrnews", "__rand21", "rec", "rmart", "rusrandom", "rzabczan", "s", "s8s", + "sag", "sapphic", "shousa", "sikhi", "sip", "sl", "_snowboarding", "socpl", "strek", "subs", "__sve", "t", + "tan", "tdt", "tech9", "techan", "techbunker", "tek", "templeos", "tenda", "teraha", "__texit", "tf2", "__tg", + "_thb", "thedickshow", "throat", "_tibby", "tikilounge", "tkr", "tr55", "__trashcollector", "truthlegion", + "tulpamancers", "turul", "tutturu", "tv", "u", "uaco", "_ucla", "underground", "__usersunion", "v", "vichan", + "vietkong", "vietnam", "vore", "vr", "_warposting", "wdsc", "webm", "wg", "__wga", "wikieat", "wis", "wmafsex", + "workrelated", "wqt", "wx", "x", "__xivl", "__xtian", "zoomerright", "zundel", "0", "55sync", "abdl", + "alleycat", "_arisu", "arisubunker", "_arp", "bane", "_bimbohypnosis", "_bluemoon", "bmn", "brains", "cats", + "_chance", "clang", "comfy", "critters", "_cursed", "_cvine", "cze", "d", "dcaco", "demonp", "_dnmd", "doomer", + "doot", "elitabla", "_empanada", "erp", "_falseflags", "fashionplus", "fata", "femdom", "fit", "_flg", + "_fr8chan", "futyitorna", "garrett", "_giantesshentai", "hentaiporn", "hmfr", "hooliedayz", "hsp", "hujszon", + "iep", "just", "k46", "kind", "_kiwc", "kukichan", "_lacajita", "_legos", "lgd", "liveanarchy", + "luciddreaming", "m", "_mapp", "mental", "_mets", "_milhis", "monarchy", "_myon", "newhomosuck", "newsci", + "_nine", "oes", "onepiece", "_other369", "otomad", "_penguware", "psyid", "qresearch2gen", "rule34", + "_satorare", "sonyeon", "split", "sunflower", "_tae", "test", "_tft", "tftg", "toy", "trap", "_vein", + "_virtualreality", "vivian", "voros", "wbr", "_weird", "wooo", "yuuka", "fringe", "random", "cuteboys", "tech", + "internatiomall", "interracial", "liberty", "htg", "mai", "komica", "cutebois", "argentina", "r", "tf", + "draftnote", "abcu", "k117", "britfeel", "liberty", "htg", "mai", "komica", "cutebois", "argentina", "r", "tf", + "draftnote", "abcu", "k117", "britfeel", "y", "an", "francofil", "portal", "royalhawk", "vdm", "bullmask", + "imouto", "tripfriend", "arepa", "rwby", "sw", "y", "an", "francofil", "portal", "royalhawk", "vdm", + "bullmask", "imouto", "tripfriend", "arepa", "rwby", "sw", "magali", "hikki", "biz", "eris", "india", "mg", + "magali", "hikki", "biz", "eris", "india", "mg", "out", "infinity", "tifa", "muslim", "out", "infinity", + "tifa", "muslim", "slackware", "archivo", "flatearth", "yaoi", "boombox", "wdp", "thedonald", + "libertedexpression", "khyber", "jsr", "slackware", "archivo", "flatearth", "yaoi", "boombox", "wdp", + "thedonald", "libertedexpression", "khyber", "jsr", "fso", "wumpawhip", "buddhismhotline", "indochinaexpats", + "ett", "redbar", "skyline350gt", "asc", "bazafx", "bestkorea", "covid19", "sokra", "bowsu", "qpatriotsunited", + "verzet", "wlctint", "cultstate", "melody", "vedic", "yhvh", "1cok", "astropolis", "fso", "wumpawhip", + "buddhismhotline", "indochinaexpats", "ett", "redbar", "skyline350gt", "asc", "bazafx", "bestkorea", "covid19", + "sokra", "bowsu", "qpatriotsunited", "verzet", "wlctint", "cultstate", "melody", "vedic", "yhvh", "1cok", + "astropolis", "earthlibfront", "pardochan", "stanislawowski", "thetrump", "yukkuri", "1825kun", "cryptobtc", + "isol", "knights", "language", "rr34", "sperg", "awaken", "belgium", "blizzard", "brain", "buddha", "dbs", + "deestevensvoice4you", "f4net", "fuckuchina", "gbtv", "hairygirls", "hallaca", "homeowner", "indo", "jersey", + "jigglypuff", "lbt", "madh4ckrs", "medcorp", "miamichan", "mrsfrisby", "mulatto", "mupro", "nhoodlink", + "p5porn", "patriotrevolution", "peko", "projectobject", "prop", "pups", "qanonspain", "qcastellano", + "earthlibfront", "pardochan", "stanislawowski", "thetrump", "yukkuri", "1825kun", "cryptobtc", "isol", + "knights", "language", "rr34", "sperg", "awaken", "belgium", "blizzard", "brain", "buddha", "dbs", + "deestevensvoice4you", "f4net", "fuckuchina", "gbtv", "hairygirls", "hallaca", "homeowner", "indo", "jersey", + "jigglypuff", "lbt", "madh4ckrs", "medcorp", "miamichan", "mrsfrisby", "mulatto", "mupro", "nhoodlink", + "p5porn", "patriotrevolution", "peko", "projectobject", "prop", "pups", "qanonspain", "qcastellano", "qsocial", + "resist", "revolu", "skemt", "sketheory", "spaceforce", "surro", "thehand", "transit", "vitaecryptocurrency", + "qsocial", "resist", "revolu", "skemt", "sketheory", "spaceforce", "surro", "thehand", "transit", + "vitaecryptocurrency"), + rps=2 ), "hispachan": HispachanHtmlHelper( 30, diff --git a/docker-compose.yml b/docker-compose.yml index ee24e2b..aaf2998 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,6 +2,7 @@ version: "2.1" volumes: influxdb_data: pg_data: + pg_data_imhash: services: influxdb: @@ -26,8 +27,20 @@ services: interval: 5s timeout: 5s retries: 5 - rabbitmq: - image: rabbitmq:alpine + db_imhashdb: + image: simon987/pg_hamming + volumes: + - pg_data_imhash:/var/lib/postgresql/data + environment: + - "POSTGRES_USER=imhashdb" + - "POSTGRES_PASSWORD=changeme" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U imhashdb"] + interval: 5s + timeout: 5s + retries: 5 + redis: + image: redis archiver: image: simon987/feed_archiver restart: always @@ -38,422 +51,326 @@ services: - "FA_DB_HOST=db" - "FA_DB_USER=feed_archiver" - "FA_DB_PASSWORD=changeme" - - "FA_MQ_CONNSTR=amqp://guest:guest@rabbitmq:5672/" - - "FA_EXCHANGES=chan" - ws_adapter: - image: simon987/ws_feed_adapter + - "FA_REDIS_ADDR=redis:6379" + - "FA_PATTERN=arc.*" + imhashdb: + image: simon987/imhashdb + restart: always + entrypoint: "/build/imhashdb/cli/cli hasher" + volumes: + - ${SAVE_FOLDER}:/data/ environment: - - "WSA_MQ_CONNSTR=amqp://guest:guest@rabbitmq:5672/" - feed_viz_frontend: - build: ./docker_viz/ - ports: - - 127.0.0.1:3005:80 + - "IMHASHDB_STORE=/data" + - "IMHASHDB_REDIS_ADDR=redis:6379" + - "IMHASHDB_PG_USER=imhashdb" + - "IMHASHDB_PG_PASSWORD=changeme" + - "IMHASHDB_PG_DATABASE=imhashdb" + - "IMHASHDB_PG_HOST=db_imhashdb" + - "IMHASHDB_HASH_CONCURRENCY=16" # Image boards + 4chan: + image: simon987/chan_feed + restart: always + user: ${CURRENT_UID} + environment: + - "CF_CHAN=4chan" + - "CF_REDIS_HOST=redis" + - "CF_INFLUXDB=influxdb" + 0chan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=0chan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" 22chan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=22chan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" 2chan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=2chan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" 2chhk: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=2chhk" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" 38chan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=38chan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" 410chan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=410chan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" - - 4chan: - image: simon987/chan_feed - restart: always - user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ - environment: - - "CF_CHAN=4chan" - - "CF_MQ_HOST=rabbitmq" - - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" 4kev: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=4kev" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" 7chan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=7chan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" 8kun: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=8kun" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" alokal: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=alokal" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" aurorachan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=aurorachan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" awsumchan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=awsumchan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" chanon: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=chanon" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" chanorg: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=chanorg" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" desuchan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=desuchan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" doushio: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=doushio" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" endchan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=endchan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" fchan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=fchan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" gnfos: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=gnfos" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" hispachan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=hispachan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" horochan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=horochan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" iichan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=iichan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" lainchan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=lainchan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" lolnada: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=lolnada" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" nowere: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=nowere" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" plus4chan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=plus4chan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" sushigirl: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=sushigirl" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" synch: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=synch" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" tahta: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=tahta" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" tgchan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=tgchan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" uboachan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=uboachan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" waifuist: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=waifuist" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" wizchan: image: simon987/chan_feed restart: always user: ${CURRENT_UID} - volumes: - - ${SAVE_FOLDER}:/data/ environment: - "CF_CHAN=wizchan" - - "CF_MQ_HOST=rabbitmq" + - "CF_REDIS_HOST=redis" - "CF_INFLUXDB=influxdb" - - "CF_SAVE_FOLDER=/data/" diff --git a/get_8kun_boards.py b/get_8kun_boards.py new file mode 100644 index 0000000..80778e3 --- /dev/null +++ b/get_8kun_boards.py @@ -0,0 +1,42 @@ +import json +import requests +from chan.chan import CHANS + +existing = CHANS["8kun2"]._boards +updated = list(existing) +added = set() + + +def mask(board): + for i, b in enumerate(updated): + if b == board: + updated[i] = "_" + board + print("[-] " + board) + + +def unmask(board): + for i, b in enumerate(updated): + if b == ("_" + board): + updated[i] = board + print("[*] " + board) + + +for i in range(0, 500, 50): + r = requests.get("https://sys.8kun.top/board-search.php?page=" + str(i)) + + j = json.loads(r.text) + + for board in j["boards"]: + added.add(board) + + if ("_" + board) in updated: + unmask(board) + elif board not in existing: + updated.append(board) + print("[+] " + board) + +for board in existing: + if board not in added: + mask(board) + +print("(" + ",".join('"' + u + '"' for u in updated) + ")") diff --git a/post_process.py b/post_process.py index b2a7f6c..f9c9672 100644 --- a/post_process.py +++ b/post_process.py @@ -1,14 +1,4 @@ -import hashlib -import os -import zlib -from io import BytesIO -from urllib.parse import urljoin, urlparse - -import imagehash -from PIL import Image -from hexlib.imhash import b64hash - -from util import logger +from urllib.parse import urljoin from hexlib.regex import HTML_HREF_RE, LINK_RE @@ -32,51 +22,8 @@ def _is_image(url): return url.lower().endswith(IMAGE_FILETYPES) -def image_meta(url, url_idx, web, helper, board): - r = web.get(url) - if not r: - logger.warning("Could not download image") - return None - buf = r.content - - sha1 = hashlib.sha1(buf).hexdigest() - - if helper.save_folder: - path = os.path.join(helper.save_folder, str(helper.db_id), board) - path += "/" + sha1[0] - path += "/" + sha1[1:3] - os.makedirs(path, exist_ok=True) - with open(os.path.join(path, sha1 + os.path.splitext(url)[1]), "wb") as out: - out.write(buf) - - try: - f = BytesIO(buf) - im = Image.open(f) - - meta = { - "url": url_idx, - "size": len(buf), - "width": im.width, - "height": im.height, - "sha1": sha1, - "md5": hashlib.md5(buf).hexdigest(), - "crc32": format(zlib.crc32(buf), "x"), - "dhash": b64hash(imagehash.dhash(im, hash_size=12), 18), - "phash": b64hash(imagehash.phash(im, hash_size=12), 18), - "ahash": b64hash(imagehash.average_hash(im, hash_size=12), 18), - "whash": b64hash(imagehash.whash(im, hash_size=8), 8), - } - except Exception as e: - logger.warning("exception during image post processing: " + str(e)) - return None - - del im, r, buf - - return meta - - -def post_process(item, board, helper, web): - item["_v"] = 1.6 +def post_process(item, board, helper): + item["_v"] = 1.7 item["_id"] = helper.item_unique_id(item, board) item["_board"] = board @@ -84,8 +31,6 @@ def post_process(item, board, helper, web): item["_urls"] = helper.item_urls(item, board) - item["_img"] = [image_meta(url, i, web, helper, board) for i, url in enumerate(item["_urls"]) if _is_image(url)] - return item diff --git a/requirements.txt b/requirements.txt index 993b28f..56040dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,10 @@ -imagehash -Pillow requests requests[socks] stem influxdb -pika bs4 urllib3 git+git://github.com/simon987/hexlib.git git+git://github.com/simon987/vanwanet_scrape.git -cloudscraper \ No newline at end of file +cloudscraper +redis \ No newline at end of file diff --git a/run.py b/run.py index 1563757..edf7f85 100644 --- a/run.py +++ b/run.py @@ -1,16 +1,14 @@ import datetime import json import os -import sqlite3 import time import traceback -from collections import defaultdict from datetime import datetime from queue import Queue from threading import Thread +import redis -import pika -from hexlib.misc import buffered +from hexlib.db import VolatileState from hexlib.monitoring import Monitoring from chan.chan import CHANS @@ -23,13 +21,23 @@ DBNAME = "chan_feed" if os.environ.get("CF_INFLUXDB"): influxdb = Monitoring(DBNAME, host=os.environ.get("CF_INFLUXDB"), logger=logger, batch_size=100, flush_on_exit=True) MONITORING = True +else: + MONITORING = False + +REDIS_HOST = os.environ.get("CF_REDIS_HOST", "localhost") +REDIS_PORT = os.environ.get("CF_REDIS_PORT", 6379) +CHAN = os.environ.get("CF_CHAN", None) + +ARC_LISTS = os.environ.get("CF_ARC_LISTS", "arc,imhash").split(",") + +PUB_CHANNEL = os.environ.get("CF_PUB_CHANNEL", "chan_feed") class ChanScanner: def __init__(self, helper, proxy): self.web = Web(influxdb if MONITORING else None, rps=helper.rps, get_method=helper.get_method, proxy=proxy) self.helper = helper - self.state = ChanState() + self.state = state def _threads(self, board): r = self.web.get(self.helper.threads_url(board)) @@ -66,96 +74,50 @@ class ChanScanner: def once(func): - def wrapper(item, board, helper, channel, web): - if not state.has_visited(helper.item_unique_id(item, board), helper): - func(item, board, helper, channel, web) - state.mark_visited(helper.item_unique_id(item, board), helper) + def wrapper(item, board, helper): + if not state.has_visited(helper.item_unique_id(item, board)): + func(item, board, helper) + state.mark_visited(helper.item_unique_id(item, board)) return wrapper class ChanState: - def __init__(self): - self._db = "state.db" + def __init__(self, prefix): + self._state = VolatileState(prefix, 86400 * 7, host=REDIS_HOST, port=REDIS_PORT) + print("redis host=" + REDIS_HOST) - with sqlite3.connect(self._db) as conn: - conn.execute( - "CREATE TABLE IF NOT EXISTS posts " - "(" - " post INT," - " ts INT DEFAULT (strftime('%s','now'))," - " chan INT," - " PRIMARY KEY(post, chan)" - ")" - ) - conn.execute( - "CREATE TABLE IF NOT EXISTS threads " - "(" - " thread INT," - " last_modified INT," - " ts INT DEFAULT (strftime('%s','now'))," - " chan INT," - " PRIMARY KEY(thread, chan)" - ")" - ) - conn.execute("PRAGMA journal_mode=wal") - conn.commit() + def mark_visited(self, item: int): + self._state["posts"][item] = 1 - def mark_visited(self, item: int, helper): - with sqlite3.connect(self._db, timeout=10000) as conn: - conn.execute( - "INSERT INTO posts (post, chan) VALUES (?,?)", - (item, helper.db_id) - ) - - def has_visited(self, item: int, helper): - with sqlite3.connect(self._db, timeout=10000) as conn: - cur = conn.cursor() - cur.execute( - "SELECT post FROM posts WHERE post=? AND chan=?", - (item, helper.db_id) - ) - return cur.fetchone() is not None + def has_visited(self, item: int): + return self._state["posts"][item] is not None def has_new_posts(self, thread, helper, board): mtime = helper.thread_mtime(thread) if mtime == -1: return True - with sqlite3.connect(self._db, timeout=10000) as conn: - cur = conn.cursor() - cur.execute( - "SELECT last_modified, ts FROM threads WHERE thread=? AND chan=?", - (helper.item_unique_id(thread, board), helper.db_id) - ) - row = cur.fetchone() - if not row or helper.thread_mtime(thread) != row[0] or row[1] + 86400 < int(time.time()): - return True - return False + t = self._state["threads"][helper.item_unique_id(thread, board)] + + if not t or helper.thread_mtime(thread) != t["last_modified"] or t["ts"] + 86400 < int(time.time()): + return True + return False def mark_thread_as_visited(self, thread, helper, board): - with sqlite3.connect(self._db, timeout=10000) as conn: - conn.execute( - "INSERT INTO threads (thread, last_modified, chan) " - "VALUES (?,?,?) " - "ON CONFLICT (thread, chan) " - "DO UPDATE SET last_modified=?, ts=(strftime('%s','now'))", - (helper.item_unique_id(thread, board), helper.thread_mtime(thread), helper.db_id, - helper.thread_mtime(thread)) - ) - conn.commit() + self._state["threads"][helper.item_unique_id(thread, board)] = { + "ts": time.time(), + "last_modified": helper.thread_mtime(thread) + } def publish_worker(queue: Queue, helper, p): - channel = connect() - web = Web(influxdb if MONITORING else None, rps=helper.rps, get_method=helper.get_method, proxy=p) - while True: try: item, board = queue.get() if item is None: break - publish(item, board, helper, channel, web) + publish(item, board, helper,) except Exception as e: logger.error(str(e) + ": " + traceback.format_exc()) @@ -163,48 +125,22 @@ def publish_worker(queue: Queue, helper, p): queue.task_done() -@buffered(batch_size=150, flush_on_exit=True) -def _publish_buffered(items): - if not items: - return - - buckets = defaultdict(list) - for item in items: - buckets[item[1]].append(item) - - for bucket in buckets.values(): - channel, routing_key, _ = bucket[0] - body = [item[2] for item in bucket] - - while True: - try: - channel.basic_publish( - exchange='chan', - routing_key=routing_key, - body=json.dumps(body, separators=(',', ':'), ensure_ascii=False, sort_keys=True) - ) - logger.debug("RabbitMQ: published %d items" % len(body)) - break - except Exception as e: - # logger.debug(traceback.format_exc()) - logger.error(str(e)) - time.sleep(0.5) - channel = connect() - - @once -def publish(item, board, helper, channel, web): - post_process(item, board, helper, web) +def publish(item, board, helper): + post_process(item, board, helper) item_type = helper.item_type(item) - routing_key = "%s.%s.%s" % (chan, item_type, board) + routing_key = "%s.%s.%s" % (CHAN, item_type, board) - _publish_buffered([(channel, routing_key, item)]) + message = json.dumps(item, separators=(',', ':'), ensure_ascii=False, sort_keys=True) + rdb.publish("chan." + routing_key, message) + for arc in ARC_LISTS: + rdb.lpush(arc + ".chan." + routing_key, message) if MONITORING: distance = datetime.utcnow() - datetime.utcfromtimestamp(helper.item_mtime(item)) influxdb.log([{ - "measurement": chan, + "measurement": CHAN, "time": str(datetime.utcnow()), "tags": { "board": board @@ -215,24 +151,8 @@ def publish(item, board, helper, channel, web): }]) -def connect(): - while True: - try: - rabbit = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_host)) - channel = rabbit.channel() - channel.exchange_declare(exchange="chan", exchange_type="topic") - return channel - except Exception as e: - logger.error(str(e)) - time.sleep(0.5) - pass - - if __name__ == "__main__": - - rabbitmq_host = os.environ.get("CF_MQ_HOST", "localhost") - chan = os.environ.get("CF_CHAN", None) - chan_helper = CHANS[chan] + chan_helper = CHANS[CHAN] save_folder = os.environ.get("CF_SAVE_FOLDER", "") if save_folder: @@ -246,10 +166,11 @@ if __name__ == "__main__": if BYPASS_RPS: chan_helper.rps = 10 - state = ChanState() + state = ChanState(CHAN) + rdb = redis.Redis(host=REDIS_HOST, port=REDIS_PORT) publish_q = Queue() - for _ in range(10): + for _ in range(3): publish_thread = Thread(target=publish_worker, args=(publish_q, chan_helper, proxy)) publish_thread.setDaemon(True) publish_thread.start() @@ -260,6 +181,7 @@ if __name__ == "__main__": for p, b in s.all_posts(): publish_q.put((p, b)) except KeyboardInterrupt as e: - for _ in range(5): + print("cleanup..") + for _ in range(3): publish_q.put((None, None)) break diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..6a163cd --- /dev/null +++ b/start.sh @@ -0,0 +1,2 @@ +#!/bin/bash +CURRENT_UID=$(id -u):$(id -g) SAVE_FOLDER=$(pwd)/data docker-compose up --force-recreate \ No newline at end of file