""" Modules that handles tor instaces creations in order to safely run histories """ import shutil import datetime as dt from time import sleep import asyncio import aiohttp from aiosocks.connector import ProxyConnector, ProxyClientRequest import async_timeout import stem.process as tor class TorInstance(): """ A tor instance object, with some useful information. It is designed to be used as a worker in order to replay an history. """ BASE_SOCKS_PORT = 40000 BASE_CONTROL_PORT = 20000 BASE_DATA_DIR = "/tmp/tor{}/" TOR_RUNNER = 0 @classmethod async def create(cls, history, headers): """ Factory creation of tor processes""" socks_port = cls.BASE_SOCKS_PORT + cls.TOR_RUNNER control_port = cls.BASE_CONTROL_PORT + cls.TOR_RUNNER data_dir = cls.BASE_DATA_DIR.format(cls.TOR_RUNNER) TorInstance.TOR_RUNNER += 1 self = TorInstance() self.socks_port = socks_port self.control_port = control_port self.data_dir = data_dir self.history = history self.headers = headers self.proxy = "socks5://127.0.0.1:{}".format(self.socks_port) self.create_session() self.process = tor.launch_tor_with_config( config={ 'ControlPort' : str(control_port), 'SocksPort' : str(socks_port), 'DataDir' : data_dir } ) return self def __init__(self): self.socks_port = 0 self.control_port = 0 self.data_dir = "" self.history = None self.proxy = "" self.headers = {} self.session = None self.process = None async def run(self): """ Runs the Tor Instance on the history. """ while (self.history) and (dt.datetime.combine(self.history[0][1], dt.datetime.min.time()) - dt.datetime.now()).total_seconds() >= 10: print("Sleeping") sleep(10) while self.history: item = self.history.pop(0) async with async_timeout.timeout(30): await(self.query(item[0])) now = dt.datetime.now() print(self.history[0]) if now <= dt.datetime.combine(self.history[0][1], dt.datetime.min.time()): sleep((dt.datetime.combine(self.history[0][1], dt.datetime.min.time()) - now).total_seconds()) def create_session(self): """ Create a aiohttp session. """ conn = ProxyConnector(remote_resolve=True) self.session = aiohttp.ClientSession( connector=conn, headers=self.headers, request_class=ProxyClientRequest ) async def query(self, url): """ Performs a query. """ async with async_timeout.timeout(30): async with self.session.get( url, proxy=self.proxy, proxy_auth=None) as resp: try: return await resp.text() except UnicodeDecodeError: return None def __str__(self): """ Utility function """ return ('[TOR] SOCKSPort: {0.socks_port}, ControlPort: ' '{0.control_port}, DataDir: {0.data_dir}'.format(self)) async def kill(self): """ Kills the process and remove the data dir""" self.process.kill() self.session.close() shutil.rmtree(self.data_dir) async def main(): """ Test function """ for _ in range(3): instance = await TorInstance.create(None, {"user-agent" : "Blah"}) await instance.query("https://python.org/") print("One page received") await instance.kill() if __name__ == "__main__": LOOP = asyncio.get_event_loop() LOOP.run_until_complete(main())