Real async crawling

Théophile Bastian 2018-02-26 15:27:57 +01:00
parent 968ff6d24c
commit 98fe69ba62
2 changed files with 94 additions and 57 deletions

@@ -174,7 +174,7 @@ class CrawlingThread(Thread):
     """ A separate thread for the crawling task. This is needed to use asyncio,
     since the thread will need its own event loop. """
-    def __init__(self, url, output_tree):
+    def __init__(self, url):
         engine_list = [engine.url for engine in SearchEngine.objects.all()]
         WebsiteScheduler.search_engines = engine_list
@@ -183,7 +183,7 @@ class CrawlingThread(Thread):
             randint(0, nb_fingerprint - 1)]
         self.headers = fingerprint.serialize_headers()
-        self.output_tree = output_tree
+        self.output_tree = []
         super(CrawlingThread, self).__init__()
         self.url = url
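
Note on the constructor change above: output_tree is now created inside the thread instead of being passed in, so callers read crawler.output_tree after join(), as the history-generation diff further down does. A minimal usage sketch under that assumption (the URL is only an example):

    crawler = CrawlingThread("https://example.com/")
    crawler.start()
    crawler.join()                      # wait until the crawl finishes
    for elem in crawler.output_tree:    # CrawlElem instances (url, parent)
        print(elem.url, elem.parent.url if elem.parent else None)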
@@ -192,12 +192,14 @@ class CrawlingThread(Thread):
         #tasks.append(async_crawler("http://plus.google.com/+Python"))
         #tasks.append(async_crawler('https://python.org/'))
-        tasks.append(async_crawler(self.url, self.output_tree))
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        loop.run_until_complete(asyncio.wait(tasks))
-        loop.close()
+        tasks.append(run_crawl(self.url, self.output_tree, self.headers))
+        try:
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            loop.run_until_complete(asyncio.wait(tasks))
+        finally:
+            loop.close()


 class PageGetter:
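
Note on the run() change above: each CrawlingThread owns a private event loop, and the new try/finally guarantees the loop is closed even if a crawl task raises. A self-contained sketch of the same per-thread loop pattern, with illustrative names (LoopThread, hello) that are not part of the project:

    import asyncio
    import threading

    class LoopThread(threading.Thread):
        ''' Runs one coroutine on an event loop private to this thread. '''
        def __init__(self, coro):
            super().__init__()
            self.coro = coro

        def run(self):
            loop = asyncio.new_event_loop()   # this thread's own loop
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self.coro)
            finally:
                loop.close()                  # always released

    async def hello():
        await asyncio.sleep(0.1)
        print("done")

    t = LoopThread(hello())
    t.start()
    t.join()

Creating the loop before entering the try block also keeps the finally clause from hitting a NameError should new_event_loop() itself fail.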
@@ -242,7 +244,6 @@ async def async_print(url):
     ))
-

 class CrawlElem:
     ''' Describes a crawled element, to be assembled into a tree '''
@@ -250,54 +251,69 @@ class CrawlElem:
         self.url = url
         self.parent = parent

-async def async_crawler(url, output_tree, headers=None):
+async def run_crawl(url, output_tree, headers=None):
+    ''' Starts a crawling session '''
     if headers is None:
         headers = {}
     if 'User-Agent' not in headers:
         headers['User-Agent'] = settings.USER_AGENT
     user_agent = headers['User-Agent']
-    queued = [CrawlElem(url, None)]
     crawled = set()
-    crawl_tree = []
-    while queued and (len(crawled) < HARD_LIMIT):
-        async with aiohttp.ClientSession(headers=headers) as session:
-            try:
-                crawl_elt = queued.pop(0)
-                url = crawl_elt.url
-            except IndexError:
-                print("Error queue is empty")
-                return crawled
-            crawled.add(url)
-            parsed_url = urlparse(url)
-            print("Crawling {}".format(url))
-            html = await PageGetter(session, url, user_agent).get(ssl=False)
-            if html:
-                crawl_tree.append(crawl_elt)
-                new_urls = url_getter(
-                    html,
-                    url,
-                    parsed_url.scheme + "://" + parsed_url.netloc
-                )
-                if new_urls:
-                    sampled = sample(
-                        new_urls,
-                        randrange(min(MAX_PER_PAGE, len(new_urls)))
-                    )
-                    queued += [
-                        CrawlElem(sample_url, crawl_elt)
-                        for sample_url in sampled
-                        if sample_url not in queued
-                        and sample_url not in crawled
-                    ]
-            else:
-                print("No html received")
-    print(crawled)
-    output_tree += crawl_tree
-
-if __name__ == '__main__':
-    crawl_tree = []
-    crawl = CrawlingThread(None, "https://google.com/search?q=fabriquer+masque+manif", crawl_tree)
-    crawl.start()
-    crawl.join()
+
+    async with aiohttp.ClientSession(headers=headers) as session:
+        await async_crawler(
+            url, output_tree, crawled, user_agent, session, None)
+
+
+def simplify_url(url):
+    anchor = url.find('#')
+    if anchor >= 0:
+        url = url[:anchor]
+    prot = url.find('://')
+    if prot >= 0:
+        url = url[prot+3:]
+    if url.startswith('www.'):
+        url = url[4:]
+    return url
+
+
+async def async_crawler(url, out_tree, crawled, user_agent, session, parent):
+    if len(crawled) >= HARD_LIMIT:
+        return
+    crawled.add(simplify_url(url))
+    parsed_url = urlparse(url)
+    print("Crawling {}".format(url))
+    html = await PageGetter(session, url, user_agent).get(ssl=False)
+
+    new_tasks = []
+
+    if html:
+        this_elem = CrawlElem(url, parent)
+        out_tree.append(this_elem)
+        new_urls = url_getter(
+            html,
+            url,
+            parsed_url.scheme + "://" + parsed_url.netloc
+        )
+        if new_urls:
+            sampled = sample(
+                new_urls,
+                randrange(min(MAX_PER_PAGE, len(new_urls)))
+            )
+            for sample_url in sampled:
+                if simplify_url(sample_url) not in crawled:
+                    new_tasks.append(async_crawler(
+                        sample_url, out_tree, crawled, user_agent, session,
+                        this_elem))
+    else:
+        print("No html received")
+
+    if len(crawled) >= HARD_LIMIT:
+        return
+    if new_tasks:
+        await asyncio.wait(new_tasks)
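
Two remarks on the new code above. First, simplify_url is only a deduplication key: it drops the fragment, the scheme and a leading www., so equivalent URLs are not crawled twice. A standalone, hedged check (the function body is copied from the hunk, the sample URLs are invented):

    def simplify_url(url):
        anchor = url.find('#')
        if anchor >= 0:
            url = url[:anchor]
        prot = url.find('://')
        if prot >= 0:
            url = url[prot+3:]
        if url.startswith('www.'):
            url = url[4:]
        return url

    assert simplify_url('https://www.example.com/page#top') == 'example.com/page'
    assert simplify_url('http://example.com/a') == simplify_url('https://example.com/a')

Second, async_crawler now fans out recursively: it records its own CrawlElem, collects one coroutine per sampled link not yet in the shared crawled set, and awaits them together with asyncio.wait, all over a single aiohttp session. Passing bare coroutines to asyncio.wait works on the Python versions current when this was written (3.5/3.6); later interpreters expect them wrapped in explicit tasks.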

@@ -101,9 +101,11 @@ class History(models.Model):
     def __str__(self):
         """ Returns the string representation of a history.
         """
-        history_set = self.historyentry_set.order_by('timestamp')
-        header = "[History]:\n"
-        return header + "\n".join(history_set)
+        entries = self.historyentry_set.order_by('timestamp')
+        output = "[History]:\n"
+        for entry in entries:
+            output += str(entry) + '\n'
+        return output

     def play_histories(self):
         """ Actually plays the history.
@@ -169,6 +171,7 @@ def generate_partial_history(user, t_start):
     basis = generate_first_url(user)
     result.append(PartialHistoryEntry(basis, timestamp))
     t_start += 5 * random.weibullvariate(1, 1.5)
+<<<<<<< HEAD
     output_tree = []
     crawler = crawl.CrawlingThread(basis, output_tree)
     crawler.start()
@@ -177,6 +180,23 @@ def generate_partial_history(user, t_start):
     for url in urls:
         t_start += 5 * random.weibullvariate(1, 1.5)
         result.append(PartialHistoryEntry(url.url, timestamp))
+=======
+    crawler = crawl.CrawlingThread(basis)
+    crawler.start()
+    crawler.join()
+    urls_tree = crawler.output_tree
+
+    open_time = {}
+    for elem in urls_tree:
+        url, parent = elem.url, elem.parent
+        timestamp = 0
+        if parent is None:
+            timestamp = t_start
+        else:
+            timestamp = open_time[parent] + 5 * random.weibullvariate(1, 1.5)
+        open_time[elem] = timestamp
+        result.append(PartialHistoryEntry(url, timestamp))
+>>>>>>> Real async crawling

     return result
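
Note on the timestamp logic in the added block: each crawled element is opened at its parent's open time plus a Weibull-distributed delay, and open_time[elem] memoizes that value, which works because async_crawler appends parents to output_tree before their children. A hedged sketch with a hand-built two-node tree (Elem and the URLs are illustrative):

    import random

    class Elem:
        def __init__(self, url, parent):
            self.url, self.parent = url, parent

    root = Elem('example.com', None)
    child = Elem('example.com/a', root)
    urls_tree = [root, child]          # parents precede their children

    t_start = 1000000.0
    open_time = {}
    for elem in urls_tree:
        if elem.parent is None:
            open_time[elem] = t_start
        else:
            open_time[elem] = open_time[elem.parent] + 5 * random.weibullvariate(1, 1.5)
    print({e.url: open_time[e] for e in urls_tree})

Also worth noting: the committed hunk keeps the <<<<<<< HEAD / ======= / >>>>>>> markers, so both branches of the conflict ship in the file as written.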
@@ -224,11 +244,11 @@ def generate_history(user, start_time):
     history.full_clean()
     history.save()

-    history_line = 0
     current_timestamp = start_time.timestamp()
-    while history_line < length:
+    hist_size = 0
+    while hist_size < length:
         current_timestamp += 5 * random.weibullvariate(1, 2.8)
         history_list = generate_partial_history(user, current_timestamp)
         current_timestamp = \
@@ -237,12 +257,13 @@ def generate_history(user, start_time):
         if len(url) < 200:
             new_line = HistoryEntry(
                 search=url,
-                timestamp=datetime.fromtimestamp(timestamp),
+                timestamp=datetime.fromtimestamp(timestamp),  # FIXME tz
                 history=history
             )
             try:
                 new_line.full_clean()
                 new_line.save()
+                hist_size += 1
             except ValidationError:
                 continue
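
On the new # FIXME tz marker: datetime.fromtimestamp() without a tz argument returns a naive local-time datetime, which Django warns about (and can store incorrectly) when USE_TZ is enabled. One hedged way to resolve the FIXME later, shown with an arbitrary epoch value:

    from datetime import datetime, timezone

    naive = datetime.fromtimestamp(1519655277)                   # what the hunk does now
    aware = datetime.fromtimestamp(1519655277, tz=timezone.utc)  # timezone-aware variant

django.utils.timezone.make_aware() would be the more Django-idiomatic route if the project's default timezone should be used instead of UTC.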