From 98fe69ba62985ca5858641026ef95f02e4f87c4f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Th=C3=A9ophile=20Bastian?=
Date: Mon, 26 Feb 2018 15:27:57 +0100
Subject: [PATCH] Real async crawling

---
 crawl/crawl.py      | 116 +++++++++++++++++++++++++-------------------
 histories/models.py |  35 ++++++++++---
 2 files changed, 94 insertions(+), 57 deletions(-)

diff --git a/crawl/crawl.py b/crawl/crawl.py
index 2a21693..28564f8 100644
--- a/crawl/crawl.py
+++ b/crawl/crawl.py
@@ -174,7 +174,7 @@ class CrawlingThread(Thread):
     """ A separate thread for the crawling task. This is needed to use asyncio,
     since the thread will need its own event loop. """
 
-    def __init__(self, url, output_tree):
+    def __init__(self, url):
         engine_list = [engine.url for engine in SearchEngine.objects.all()]
         WebsiteScheduler.search_engines = engine_list
 
@@ -183,7 +183,7 @@
             randint(0, nb_fingerprint - 1)]
         self.headers = fingerprint.serialize_headers()
 
-        self.output_tree = output_tree
+        self.output_tree = []
 
         super(CrawlingThread, self).__init__()
         self.url = url
@@ -192,12 +192,14 @@
 
         #tasks.append(async_crawler("http://plus.google.com/+Python"))
         #tasks.append(async_crawler('https://python.org/'))
-        tasks.append(async_crawler(self.url, self.output_tree))
+        tasks.append(run_crawl(self.url, self.output_tree, self.headers))
 
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        loop.run_until_complete(asyncio.wait(tasks))
-        loop.close()
+        try:
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            loop.run_until_complete(asyncio.wait(tasks))
+        finally:
+            loop.close()
 
 
 class PageGetter:
@@ -242,7 +244,6 @@ async def async_print(url):
         ))
 
 
-
 class CrawlElem:
     ''' Describes a crawled element, to be assembled into a tree '''
 
@@ -250,54 +251,69 @@ class CrawlElem:
         self.url = url
         self.parent = parent
 
-async def async_crawler(url, output_tree, headers=None):
+
+async def run_crawl(url, output_tree, headers=None):
+    ''' Starts a crawling session '''
+
     if headers is None:
         headers = {}
     if 'User-Agent' not in headers:
         headers['User-Agent'] = settings.USER_AGENT
     user_agent = headers['User-Agent']
-    queued = [CrawlElem(url, None)]
     crawled = set()
-    crawl_tree = []
-    while queued and (len(crawled) < HARD_LIMIT):
-        async with aiohttp.ClientSession(headers=headers) as session:
-            try:
-                crawl_elt = queued.pop(0)
-                url = crawl_elt.url
-            except IndexError:
-                print("Error queue is empty")
-                return crawled
-            crawled.add(url)
-            parsed_url = urlparse(url)
-            print("Crawling {}".format(url))
-            html = await PageGetter(session, url, user_agent).get(ssl=False)
-            if html:
-                crawl_tree.append(crawl_elt)
-                new_urls = url_getter(
-                    html,
-                    url,
-                    parsed_url.scheme + "://" + parsed_url.netloc
-                )
-                if new_urls:
-                    sampled = sample(
-                        new_urls,
-                        randrange(min(MAX_PER_PAGE, len(new_urls)))
-                    )
-                    queued += [
-                        CrawlElem(sample_url, crawl_elt)
-                        for sample_url in sampled
-                        if sample_url not in queued
-                        and sample_url not in crawled
-                    ]
-            else:
-                print("No html received")
-    print(crawled)
-    output_tree += crawl_tree
+    async with aiohttp.ClientSession(headers=headers) as session:
+        await async_crawler(
+            url, output_tree, crawled, user_agent, session, None)
 
-if __name__ == '__main__':
-    crawl_tree = []
-    crawl = CrawlingThread(None, "https://google.com/search?q=fabriquer+masque+manif", crawl_tree)
-    crawl.start()
-    crawl.join()
+
+def simplify_url(url):
+    anchor = url.find('#')
+    if anchor >= 0:
+        url = url[:anchor]
+
+    prot = url.find('://')
+    if prot >= 0:
+        url = url[prot+3:]
+
+    if url.startswith('www.'):
+        url = url[4:]
+
+    return url
+
+
+async def async_crawler(url, out_tree, crawled, user_agent, session, parent):
+    if len(crawled) >= HARD_LIMIT:
+        return
+    crawled.add(simplify_url(url))
+    parsed_url = urlparse(url)
+    print("Crawling {}".format(url))
+    html = await PageGetter(session, url, user_agent).get(ssl=False)
+
+    new_tasks = []
+
+    if html:
+        this_elem = CrawlElem(url, parent)
+        out_tree.append(this_elem)
+        new_urls = url_getter(
+            html,
+            url,
+            parsed_url.scheme + "://" + parsed_url.netloc
+        )
+        if new_urls:
+            sampled = sample(
+                new_urls,
+                randrange(min(MAX_PER_PAGE, len(new_urls)))
+            )
+            for sample_url in sampled:
+                if simplify_url(sample_url) not in crawled:
+                    new_tasks.append(async_crawler(
+                        sample_url, out_tree, crawled, user_agent, session,
+                        this_elem))
+    else:
+        print("No html received")
+    if len(crawled) >= HARD_LIMIT:
+        return
+    if new_tasks:
+        await asyncio.wait(new_tasks)
diff --git a/histories/models.py b/histories/models.py
index e638281..5103194 100644
--- a/histories/models.py
+++ b/histories/models.py
@@ -101,9 +101,11 @@ class History(models.Model):
 
     def __str__(self):
         """ Returns the string representation of a history. """
-        history_set = self.historyentry_set.order_by('timestamp')
-        header = "[History]:\n"
-        return header + "\n".join(history_set)
+        entries = self.historyentry_set.order_by('timestamp')
+        output = "[History]:\n"
+        for entry in entries:
+            output += str(entry) + '\n'
+        return output
 
     def play_histories(self):
         """ Actually plays the history.
@@ -169,6 +171,7 @@ def generate_partial_history(user, t_start):
     basis = generate_first_url(user)
     result.append(PartialHistoryEntry(basis, timestamp))
     t_start += 5 * random.weibullvariate(1, 1.5)
+<<<<<<< HEAD
     output_tree = []
     crawler = crawl.CrawlingThread(basis, output_tree)
     crawler.start()
@@ -177,6 +180,23 @@
     for url in urls:
         t_start += 5 * random.weibullvariate(1, 1.5)
         result.append(PartialHistoryEntry(url.url, timestamp))
+=======
+    crawler = crawl.CrawlingThread(basis)
+    crawler.start()
+    crawler.join()
+    urls_tree = crawler.output_tree
+
+    open_time = {}
+    for elem in urls_tree:
+        url, parent = elem.url, elem.parent
+        timestamp = 0
+        if parent is None:
+            timestamp = t_start
+        else:
+            timestamp = open_time[parent] + 5 * random.weibullvariate(1, 1.5)
+        open_time[elem] = timestamp
+        result.append(PartialHistoryEntry(url, timestamp))
+>>>>>>> Real async crawling
     return result
 
 
@@ -224,11 +244,11 @@
     history.full_clean()
    history.save()
 
-    history_line = 0
-
     current_timestamp = start_time.timestamp()
-    while history_line < length:
+    hist_size = 0
+
+    while hist_size < length:
        current_timestamp += 5 * random.weibullvariate(1, 2.8)
        history_list = generate_partial_history(user, current_timestamp)
        current_timestamp = \
@@ -237,12 +257,13 @@
            if len(url) < 200:
                new_line = HistoryEntry(
                    search=url,
-                    timestamp=datetime.fromtimestamp(timestamp),
+                    timestamp=datetime.fromtimestamp(timestamp),  # FIXME tz
                    history=history
                )
                try:
                    new_line.full_clean()
                    new_line.save()
+                    hist_size += 1
                except ValidationError:
                    continue
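
Illustration of the new crawling scheme: instead of popping URLs one by one from a sequential queue, each crawled page now spawns one coroutine per sampled child link, all sharing a single `crawled` set, and the pending coroutines are joined with asyncio.wait(). The sketch below is a minimal, self-contained rendition of that fan-out pattern; LINKS and fetch_links() are made-up stand-ins for aiohttp/PageGetter/url_getter, and HARD_LIMIT is reused in name only, set artificially low here.

# Standalone sketch of the fan-out pattern used by the new async_crawler().
import asyncio

HARD_LIMIT = 10                      # stop once this many pages were visited

LINKS = {                            # fake "web": page -> outgoing links
    'a': ['b', 'c'],
    'b': ['c', 'd'],
    'c': ['d'],
    'd': [],
}


async def fetch_links(page):
    """ Stand-in for PageGetter().get() followed by url_getter(). """
    await asyncio.sleep(0)           # yield so sibling crawl tasks can run
    return LINKS.get(page, [])


async def crawl(page, crawled, out_tree, parent=None):
    if len(crawled) >= HARD_LIMIT or page in crawled:
        return
    crawled.add(page)                # shared set deduplicates across branches
    out_tree.append((page, parent))  # record the crawl tree, like CrawlElem
    children = [
        crawl(child, crawled, out_tree, parent=page)
        for child in await fetch_links(page)
        if child not in crawled
    ]
    if children:
        # fan out: the children are crawled concurrently instead of being
        # appended to a queue and processed one at a time
        await asyncio.wait([asyncio.ensure_future(c) for c in children])


def run(start_page):
    crawled, out_tree = set(), []
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(crawl(start_page, crawled, out_tree))
    finally:
        loop.close()
    return out_tree


if __name__ == '__main__':
    print(run('a'))                  # e.g. [('a', None), ('b', 'a'), ...]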
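
Deduplication keys on the normalised form produced by the new simplify_url() helper, which drops the fragment, the scheme and a leading "www.". A quick interactive check, assuming the module is importable as crawl.crawl (the example URLs are made up):

>>> from crawl.crawl import simplify_url
>>> simplify_url('https://www.example.org/page#section')
'example.org/page'
>>> simplify_url('http://example.org/page')
'example.org/page'
>>> simplify_url('example.org/page')
'example.org/page'

All three variants collapse to the same key, so the shared `crawled` set treats them as a single page.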