mpri-webdam/histories/tor_runner.py

124 lines
3.8 KiB
Python

"""
Module that handles the creation of Tor instances in order to safely run histories
"""
import shutil
import datetime as dt
from time import sleep
import asyncio
import aiohttp
from aiosocks.connector import ProxyConnector, ProxyClientRequest
import async_timeout
import stem.process as tor
class TorInstance:
    """
    A Tor instance object, with some useful information.

    It is designed to be used as a worker in order to replay a history: each
    instance owns one Tor process (with its own SOCKS port, control port and
    data directory) and one aiohttp session whose requests are sent through
    that process' SOCKS proxy.

    Build instances with the :meth:`create` factory; the bare constructor only
    sets placeholder attributes.
    """

    BASE_SOCKS_PORT = 40000
    BASE_CONTROL_PORT = 20000
    BASE_DATA_DIR = "/tmp/tor{}/"
    # Number of instances created so far. It is used as the offset added to
    # the base ports / data directory so that instances do not collide.
    TOR_RUNNER = 0
    # Maximum duration, in seconds, of a single page fetch.
    QUERY_TIMEOUT = 30
    # Polling step, in seconds, while waiting for the first history entry.
    START_POLL_INTERVAL = 10

    @classmethod
    async def create(cls, history, headers):
        """ Factory creation of tor processes.

        Allocates the next free SOCKS port, control port and data directory,
        opens the HTTP session and launches the Tor process.

        :param history: list of ``(url, date)`` entries to replay with
            :meth:`run` (may be ``None`` when only :meth:`query` is used).
            The list is consumed in place by :meth:`run`.
        :param headers: default HTTP headers given to the aiohttp session.
        :return: the ready-to-use instance.
        :raises: whatever ``launch_tor_with_config`` raises when Tor cannot be
            started; the HTTP session is closed before re-raising.
        """
        socks_port = cls.BASE_SOCKS_PORT + cls.TOR_RUNNER
        control_port = cls.BASE_CONTROL_PORT + cls.TOR_RUNNER
        data_dir = cls.BASE_DATA_DIR.format(cls.TOR_RUNNER)
        # The counter lives on TorInstance itself so that it stays shared by
        # every (sub)class and two instances never get the same ports.
        TorInstance.TOR_RUNNER += 1

        instance = cls()
        instance.socks_port = socks_port
        instance.control_port = control_port
        instance.data_dir = data_dir
        instance.history = history
        instance.headers = headers
        instance.proxy = "socks5://127.0.0.1:{}".format(socks_port)
        instance.create_session()
        try:
            # NOTE(review): this call blocks until Tor has started.
            # NOTE(review): Tor documents this option as 'DataDirectory';
            # 'DataDir' presumably relies on Tor accepting option-name
            # prefixes -- confirm before renaming.
            instance.process = tor.launch_tor_with_config(
                config={
                    'ControlPort': str(control_port),
                    'SocksPort': str(socks_port),
                    'DataDir': data_dir,
                }
            )
        except Exception:
            # Do not leak the freshly opened session when Tor fails to start.
            await instance._close_session()
            raise
        return instance

    def __init__(self):
        """ Sets every attribute to an empty placeholder (see :meth:`create`).
        """
        self.socks_port = 0
        self.control_port = 0
        self.data_dir = ""
        self.history = None
        self.proxy = ""
        self.headers = {}
        self.session = None
        self.process = None

    @staticmethod
    def _seconds_until(day):
        """ Seconds from now until midnight (start) of ``day``; negative if past.
        """
        target = dt.datetime.combine(day, dt.datetime.min.time())
        return (target - dt.datetime.now()).total_seconds()

    async def run(self):
        """ Runs the Tor Instance on the history.

        Waits until the first entry is due, then fetches every entry in order,
        sleeping between two fetches until the next entry is due. Entries are
        removed from ``self.history`` as they are processed. Does nothing when
        the history is empty or ``None``.

        :raises asyncio.TimeoutError: when a fetch exceeds ``QUERY_TIMEOUT``.
        """
        # Non-blocking sleeps: a blocking time.sleep() here would freeze the
        # whole event loop, hence every other worker sharing it.
        while self.history and (
                self._seconds_until(self.history[0][1])
                >= self.START_POLL_INTERVAL):
            print("Sleeping")
            await asyncio.sleep(self.START_POLL_INTERVAL)

        while self.history:
            item = self.history.pop(0)
            await asyncio.wait_for(
                self.query(item[0]), timeout=self.QUERY_TIMEOUT)
            if not self.history:
                # That was the last entry: there is no next date to wait for.
                break
            print(self.history[0])
            delay = self._seconds_until(self.history[0][1])
            if delay > 0:
                await asyncio.sleep(delay)

    def create_session(self):
        """ Create a aiohttp session.

        The session uses the instance headers and a SOCKS-aware connector with
        remote name resolution enabled (``remote_resolve=True``).
        """
        conn = ProxyConnector(remote_resolve=True)
        self.session = aiohttp.ClientSession(
            connector=conn,
            headers=self.headers,
            request_class=ProxyClientRequest
        )

    async def _close_session(self):
        """ Closes the aiohttp session, if there is one. """
        if self.session is None:
            return
        closing = self.session.close()
        # Depending on the aiohttp version, close() returns an awaitable that
        # must be awaited for the session to be actually closed.
        if hasattr(closing, "__await__"):
            await closing

    async def query(self, url):
        """ Performs a query.

        :param url: address of the page to fetch through the Tor proxy.
        :return: the body of the response as text, or ``None`` when the body
            cannot be decoded.
        :raises asyncio.TimeoutError: when the request exceeds
            ``QUERY_TIMEOUT`` seconds.
        """
        async with async_timeout.timeout(self.QUERY_TIMEOUT):
            async with self.session.get(
                    url,
                    proxy=self.proxy,
                    proxy_auth=None) as resp:
                try:
                    return await resp.text()
                except UnicodeDecodeError:
                    return None

    def __str__(self):
        """ Utility function """
        return ('[TOR] SOCKSPort: {0.socks_port}, ControlPort: '
                '{0.control_port}, DataDir: {0.data_dir}'.format(self))

    async def kill(self):
        """ Kills the process and remove the data dir"""
        self.process.kill()
        # Reap the process so that it is really gone (no zombie, no more
        # writes) before its data directory is deleted.
        self.process.wait()
        await self._close_session()
        shutil.rmtree(self.data_dir)
async def main():
    """ Test function.

    Three times in a row: starts a Tor instance, fetches one page through it
    and tears the instance down.
    """
    for _ in range(3):
        instance = await TorInstance.create(None, {"user-agent" : "Blah"})
        try:
            await instance.query("https://python.org/")
            print("One page received")
        finally:
            # Always stop Tor and remove its data directory, even when the
            # query fails, so that no process is left behind.
            await instance.kill()
if __name__ == "__main__":
    # Script entry point: run the test function to completion, then release
    # the event loop whatever the outcome.
    LOOP = asyncio.get_event_loop()
    try:
        LOOP.run_until_complete(main())
    finally:
        LOOP.close()