From 7a360de2ab4e55ba2f5080fff657f4feb9caf4c7 Mon Sep 17 00:00:00 2001 From: simon Date: Thu, 19 Dec 2019 09:19:51 -0500 Subject: [PATCH] buffer monitoring --- monitoring.py | 26 -------------------------- run.py | 17 ++++++++++------- 2 files changed, 10 insertions(+), 33 deletions(-) delete mode 100644 monitoring.py diff --git a/monitoring.py b/monitoring.py deleted file mode 100644 index 8259e5c..0000000 --- a/monitoring.py +++ /dev/null @@ -1,26 +0,0 @@ -import traceback - -from influxdb import InfluxDBClient - -from util import logger - -client = InfluxDBClient("localhost", 8086, "root", "root", "chan_feed") - - -def init(): - db_exists = False - for db in client.get_list_database(): - if db["name"] == "chan_feed": - db_exists = True - break - - if not db_exists: - client.create_database("chan_feed") - - -def log(event): - try: - client.write_points(event) - except Exception as e: - logger.debug(traceback.format_exc()) - logger.error(str(e)) diff --git a/run.py b/run.py index 4226f0a..cd8dcf9 100644 --- a/run.py +++ b/run.py @@ -9,8 +9,8 @@ from queue import Queue from threading import Thread import pika +from hexlib.monitoring import Monitoring -import monitoring from chan.chan import CHANS from post_process import post_process from util import logger, Web @@ -19,9 +19,14 @@ MONITORING = True BYPASS_RPS = False +DBNAME = "chan_feed" +if MONITORING: + influxdb = Monitoring(DBNAME, logger=logger, batch_size=100, flush_on_exit=True) + + class ChanScanner: def __init__(self, helper, proxy): - self.web = Web(monitoring if MONITORING else None, rps=helper.rps, get_method=helper.get_method, proxy=proxy) + self.web = Web(influxdb if MONITORING else None, rps=helper.rps, get_method=helper.get_method, proxy=proxy) self.helper = helper self.state = ChanState() @@ -142,7 +147,7 @@ class ChanState: def publish_worker(queue: Queue, helper, p): channel = connect() - web = Web(monitoring if MONITORING else None, rps=helper.rps, get_method=helper.get_method, proxy=p) + web = Web(influxdb if MONITORING else None, rps=helper.rps, get_method=helper.get_method, proxy=p) while True: try: @@ -170,7 +175,7 @@ def publish(item, board, helper, channel, web): if MONITORING: distance = datetime.utcnow() - datetime.utcfromtimestamp(helper.item_mtime(item)) - monitoring.log([{ + influxdb.log([{ "measurement": chan, "time": str(datetime.utcnow()), "tags": { @@ -182,7 +187,7 @@ def publish(item, board, helper, channel, web): }]) break except Exception as e: - logger.debug(traceback.format_exc()) + # logger.debug(traceback.format_exc()) logger.error(str(e)) time.sleep(0.5) channel = connect() @@ -213,8 +218,6 @@ if __name__ == "__main__": if BYPASS_RPS: chan_helper.rps = 10 - if MONITORING: - monitoring.init() state = ChanState() publish_q = Queue()