replaced requests with pycurl

Simon 2018-08-23 11:47:09 -04:00
parent 8f218f3c9d
commit 85c3aa918d
3 changed files with 72 additions and 28 deletions

View File

@@ -1,14 +1,16 @@
+import pycurl
+from io import BytesIO
 from crawl_server import logger
 from urllib.parse import unquote, urljoin
 import os
 from html.parser import HTMLParser
 from itertools import repeat
 from crawl_server.crawler import RemoteDirectory, File
-import requests
-from requests.exceptions import RequestException
 from multiprocessing.pool import ThreadPool
 import config
 from dateutil.parser import parse as parse_date
+from pycurl import Curl
 import hashlib

 import urllib3
@@ -94,10 +96,29 @@ class HttpDirectory(RemoteDirectory):

     def __init__(self, url):
         super().__init__(url)
-        self.session = requests.Session()
-        self.session.headers = HttpDirectory.HEADERS
-        self.session.verify = False
-        self.session.max_redirects = 1
+        self.curl = None
+        self.curl_head = None
+        self.init_curl()
+
+    def init_curl(self):
+        self.curl = Curl()
+        self.curl.setopt(self.curl.SSL_VERIFYPEER, 0)
+        self.curl.setopt(self.curl.SSL_VERIFYHOST, 0)
+        self.curl.setopt(pycurl.TIMEOUT, HttpDirectory.TIMEOUT)
+        self.curl_head = self._curl_handle()
+
+    def _curl_handle(self):
+        curl_head = Curl()
+        curl_head.setopt(self.curl.SSL_VERIFYPEER, 0)
+        curl_head.setopt(self.curl.SSL_VERIFYHOST, 0)
+        curl_head.setopt(pycurl.NOBODY, 1)
+        curl_head.setopt(pycurl.TIMEOUT, HttpDirectory.TIMEOUT)
+        return curl_head

     def list_dir(self, path):
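For orientation, the options set on these handles map directly onto libcurl: SSL_VERIFYPEER and SSL_VERIFYHOST disable certificate and hostname verification (the pycurl counterpart of the old session.verify = False), NOBODY turns a transfer into a header-only HEAD request, and TIMEOUT caps the whole transfer in seconds. A minimal standalone sketch of the same setup, with an illustrative 30-second timeout standing in for HttpDirectory.TIMEOUT:

import pycurl

# Configure one handle the way _curl_handle() does: HEAD-only requests,
# no TLS verification, hard transfer timeout.
handle = pycurl.Curl()
handle.setopt(pycurl.SSL_VERIFYPEER, 0)  # skip certificate verification
handle.setopt(pycurl.SSL_VERIFYHOST, 0)  # skip hostname verification
handle.setopt(pycurl.NOBODY, 1)          # HEAD request: headers only
handle.setopt(pycurl.TIMEOUT, 30)        # illustrative timeout, in seconds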
@@ -139,7 +160,8 @@ class HttpDirectory(RemoteDirectory):
         if len(urls_to_request) > 150:
             # Many urls, use multi-threaded solution
             pool = ThreadPool(processes=10)
-            files = pool.starmap(HttpDirectory._request_file, zip(repeat(self), urls_to_request))
+            handles = [self._curl_handle() for _ in range(len(urls_to_request))]
+            files = pool.starmap(self._request_file, zip(handles, urls_to_request, repeat(self.base_url)))
            pool.close()
             for file in files:
                 if file:
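A libcurl easy handle can only drive one transfer at a time and must not be used from several threads at once, which is presumably why this hunk builds a dedicated handle per URL instead of sharing self.curl_head across the pool.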
@@ -147,31 +169,38 @@ class HttpDirectory(RemoteDirectory):
         else:
             # Too few urls to create thread pool
             for url in urls_to_request:
-                file = self._request_file(url)
+                file = self._request_file(self.curl_head, url, self.base_url)
                 if file:
                     yield file

-    def _request_file(self, url):
+    @staticmethod
+    def _request_file(curl, url, base_url):
         retries = HttpDirectory.MAX_RETRIES
         while retries > 0:
             try:
-                r = self.session.head(url, allow_redirects=False, timeout=HttpDirectory.TIMEOUT)
+                raw_headers = BytesIO()
+                curl.setopt(pycurl.URL, url)
+                curl.setopt(pycurl.HEADERFUNCTION, raw_headers.write)
+                curl.perform()

-                stripped_url = url[len(self.base_url) - 1:]
+                stripped_url = url[len(base_url) - 1:]
+                headers = HttpDirectory._parse_dict_header(raw_headers.getvalue().decode("utf-8", errors="ignore"))
+                raw_headers.close()

                 path, name = os.path.split(stripped_url)
-                date = r.headers.get("Last-Modified", "1970-01-01")
+                date = headers.get("Last-Modified", "1970-01-01")
                 return File(
                     path=unquote(path).strip("/"),
                     name=unquote(name),
-                    size=int(r.headers.get("Content-Length", -1)),
+                    size=int(headers.get("Content-Length", -1)),
                     mtime=int(parse_date(date).timestamp()),
                     is_dir=False
                 )
-            except RequestException:
-                self.session.close()
+            except pycurl.error as e:
+                curl.close()
                 retries -= 1
+                raise e

         logger.debug("TimeoutError - _request_file")
         raise TimeoutError
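The header-capture pattern adopted above can be tried standalone. A minimal sketch, assuming an example URL (not one from the commit): NOBODY suppresses the body transfer, and HEADERFUNCTION is called with the raw header bytes as they arrive.

import pycurl
from io import BytesIO

raw_headers = BytesIO()
curl = pycurl.Curl()
curl.setopt(pycurl.URL, "http://example.com/some/file.bin")  # illustrative URL
curl.setopt(pycurl.NOBODY, 1)                          # HEAD: skip the body
curl.setopt(pycurl.HEADERFUNCTION, raw_headers.write)  # collect raw header bytes
curl.perform()
print(raw_headers.getvalue().decode("utf-8", errors="ignore"))
curl.close()

Note that the added raise e propagates the first pycurl.error, so the retry loop never gets past its first failure and the trailing raise TimeoutError becomes effectively unreachable.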
@@ -180,17 +209,19 @@ class HttpDirectory(RemoteDirectory):
         retries = HttpDirectory.MAX_RETRIES
         while retries > 0:
             try:
-                r = self.session.get(url, timeout=HttpDirectory.TIMEOUT)
-                try:
-                    return r.content.decode(r.encoding if r.encoding else "utf-8", errors="ignore")
-                except LookupError:
-                    # Unsupported encoding
-                    return r.content.decode("utf-8", errors="ignore")
-            except RequestException:
-                self.session.close()
-                retries -= 1
-
-        logger.debug("TimeoutError - _stream_body")
+                content = BytesIO()
+                self.curl.setopt(pycurl.URL, url)
+                self.curl.setopt(pycurl.WRITEDATA, content)
+                self.curl.perform()
+
+                return content.getvalue().decode("utf-8", errors="ignore")
+            except pycurl.error as e:
+                self.curl.close()
+                retries -= 1
+                print(e)
+                raise e
+
+        logger.debug("TimeoutError - _fetch_body")
         raise TimeoutError

     @staticmethod
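The body fetch follows the same shape: WRITEDATA accepts any object with a write() method, so the whole response body lands in a BytesIO buffer. A sketch under the same assumptions (illustrative URL):

import pycurl
from io import BytesIO

content = BytesIO()
curl = pycurl.Curl()
curl.setopt(pycurl.URL, "http://example.com/")  # illustrative URL
curl.setopt(pycurl.WRITEDATA, content)          # body bytes land in the buffer
curl.perform()
body = content.getvalue().decode("utf-8", errors="ignore")
curl.close()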
@@ -222,8 +253,19 @@ class HttpDirectory(RemoteDirectory):
         if "?" in link.href:
             return True

+    @staticmethod
+    def _parse_dict_header(raw):
+        headers = dict()
+        for line in raw.split("\r\n")[1:]:  # Ignore first 'HTTP/1.0 200 OK' line
+            if line:
+                k, v = line.split(":", maxsplit=1)
+                headers[k.strip()] = v.strip()
+
+        return headers
+
     def close(self):
-        self.session.close()
+        self.curl.close()
         logger.debug("Closing HTTPRemoteDirectory for " + self.base_url)
+        self.init_curl()
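A quick usage sketch of the new _parse_dict_header helper, against an invented response (header values are illustrative):

raw = ("HTTP/1.1 200 OK\r\n"
       "Content-Length: 1024\r\n"
       "Last-Modified: Thu, 23 Aug 2018 11:47:09 GMT\r\n"
       "\r\n")
headers = HttpDirectory._parse_dict_header(raw)
# headers == {'Content-Length': '1024',
#             'Last-Modified': 'Thu, 23 Aug 2018 11:47:09 GMT'}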

View File

@@ -17,3 +17,5 @@ ujson
 urllib3
 pyOpenSSL
 pybloom-live
+pycurl
+lxml

View File

@@ -127,7 +127,7 @@ class ElasticSearchEngine(SearchEngine):

     def import_json(self, in_lines, website_id: int):
         import_every = 400
-        cooldown_time = 0.6
+        cooldown_time = 0

         docs = []