# mpri-webdam/histories/models.py

# 245 lines
# 7.5 KiB
# Python

""" Models for the history. This history should be able to generate history
entries that look like human browsing, according to a given user's
interests, keywords...
"""
from collections import namedtuple
import random
from math import floor
from queue import Queue
from xml.etree import ElementTree as ET
from datetime import datetime
from django.db import models
import profiles.models as profiles
from crawl import crawl
from pinocchio.settings import HISTORY_MIN
from .tor_runner import TorInstance
class InvalidXml(Exception):
    """ Raised when an XML tree cannot be loaded as a history or an entry.

    The human-readable reason is kept in the `what` attribute.
    """

    def __init__(self, what='unexpected XML data.'):
        # Hand the message to Exception as well, so that `args` and `repr()`
        # carry the reason instead of being empty.
        super().__init__(what)
        self.what = what

    def __str__(self):
        """ Returns the reason, prefixed by "Invalid XML: ".
        """
        return "Invalid XML: {}".format(self.what)
class HistoryEntry(models.Model):
    """ A history entry, aka a url, and a timestamp.
    """
    search = models.URLField(help_text="The url to be searched")
    timestamp = models.DateTimeField()
    history = models.ForeignKey(
        'History',
        on_delete=models.CASCADE
    )

    def __str__(self):
        """ Returns the string representation of a history entry.
        """
        return "{} : {}".format(self.timestamp, self.search)

    def to_xml(self, xml_root):
        """ Appends this entry to `xml_root` as a <history> node holding a
        <url> child and a <timestamp> child (the POSIX timestamp of the
        entry).
        """
        # The entry node uses the 'history' tag: this is what `from_xml`
        # expects to read back.
        entry = ET.Element('history')
        entry_url = ET.Element('url')
        entry_url.text = self.search
        entry_ts = ET.Element('timestamp')
        # ElementTree can only serialize text that is a string.
        entry_ts.text = str(self.timestamp.timestamp())
        entry.append(entry_url)
        entry.append(entry_ts)
        xml_root.append(entry)

    @staticmethod
    def from_xml(xml_root, in_history):
        """ Builds an entry attached to `in_history` from a <history> node
        holding a <url> child and a <timestamp> child. The entry is returned
        without being saved.

        Raises InvalidXml if the node has the wrong tag, holds an unknown
        child, carries an unreadable timestamp, or lacks a url or a timestamp.
        """
        if xml_root.tag != 'history':
            raise InvalidXml("expected <history> tag here.")

        url, timestamp = None, None
        for child in xml_root:
            if child.tag == 'url':
                url = child.text
            elif child.tag == 'timestamp':
                try:
                    # The node text is a string (or None when empty): it has
                    # to be converted to a number first.
                    timestamp = datetime.fromtimestamp(float(child.text))
                except (TypeError, ValueError, OverflowError, OSError) as err:
                    raise InvalidXml(
                        "invalid timestamp {}".format(child.text)) from err
            else:
                raise InvalidXml("unknown tag {} as child of <history>".format(
                    child.tag))

        if url is None or timestamp is None:
            raise InvalidXml("missing <url> or <timestamp> in <history>")

        output = HistoryEntry()
        output.search = url
        output.timestamp = timestamp
        output.history = in_history
        return output
class History(models.Model):
    """ A history for a user, containing some web connections (http, https).
    Each history is timed, in a human-behaviour manner. """

    start_ts = models.DateTimeField(
        help_text=('The starting timestamp of the history. Useful for '
                   'cron-like structure.')
    )
    played = models.BooleanField(default=False)
    user = models.ForeignKey(
        profiles.Profile,
        on_delete=models.CASCADE
    )

    def return_history(self):
        """ Returns the history, sorted by increasing timestamps, as a list of
        (url, date) pairs.
        """
        entries = self.historyentry_set.order_by('timestamp')
        return [(item.search, item.timestamp.date()) for item in entries]

    def __str__(self):
        """ Returns the string representation of a history.
        """
        history_set = self.historyentry_set.order_by('timestamp')
        header = "[History]:\n"
        # str.join only accepts strings, so each entry is rendered first.
        return header + "\n".join(str(entry) for entry in history_set)

    def play_histories(self):
        """ Actually plays the history.

        Marks the history as played, hands it to a TorInstance, then saves.
        """
        self.played = True
        # NOTE(review): the previous code passed `self.history`, which is not
        # an attribute of History. This assumes TorInstance expects the
        # ordered list built by `return_history` -- confirm against
        # tor_runner.
        TorInstance(self.return_history())
        self.save()

    def to_xml(self, xml_root):
        ''' Exports the current history to xml

        Appends a <history> node to `xml_root`, carrying the `start-ts`,
        `played` and `user` attributes, with one child node per entry.
        '''
        # Attribute values must be strings for ElementTree to serialize them.
        hist_node = ET.Element("history", attrib={
            'start-ts': self.start_ts.isoformat(),
            'played': '1' if self.played else '0',
            'user': str(self.user.pk),
        })
        xml_root.append(hist_node)
        # A related manager is not iterable by itself: go through a queryset.
        for entry in self.historyentry_set.order_by('timestamp'):
            entry.to_xml(hist_node)

    @staticmethod
    def from_xml(xml_root):
        ''' Loads an history from an XML file

        `xml_root` is a <history> node carrying the `start-ts`, `played` and
        `user` attributes. The returned History is not saved.

        Raises InvalidXml on an unexpected root tag, a missing attribute, a
        non-integer `played` value, an unknown user, or an invalid entry.
        '''
        REQUIRED_ATTR = ('start-ts', 'played', 'user')

        if xml_root.tag != 'history':
            raise InvalidXml('unexpected node {} as root of an history'.format(
                xml_root.tag))
        for attr in REQUIRED_ATTR:
            if attr not in xml_root.attrib:
                raise InvalidXml(('missing attribute "{}" for tag of type '
                                  'history').format(attr))

        # Kept as the raw attribute string.
        # NOTE(review): presumably coerced by the DateTimeField on
        # validation/save -- confirm.
        start_ts = xml_root.attrib['start-ts']
        user_pk = xml_root.attrib['user']

        # XML attributes are strings: convert before comparing to a number.
        try:
            played = int(xml_root.attrib['played']) > 0
        except ValueError as err:
            raise InvalidXml('invalid value "{}" for attribute "played"'.format(
                xml_root.attrib['played'])) from err

        # The `user` attribute is the primary key of a Profile.
        try:
            user = profiles.Profile.objects.filter(pk=user_pk).first()
        except (TypeError, ValueError) as err:
            # Report a key the ORM refuses to look up as invalid XML as well.
            raise InvalidXml('primary key for Profile {} is invalid'.format(
                user_pk)) from err
        if user is None:
            raise InvalidXml('primary key for Profile {} is invalid'.format(
                user_pk))

        output = History()
        output.start_ts = start_ts
        output.played = played
        output.user = user

        # NOTE(review): the entries are parsed (hence validated) but the
        # resulting objects are dropped: nothing is saved here.
        for child in xml_root:
            HistoryEntry.from_xml(child, output)

        return output
# Lightweight (url, timestamp) pair describing an entry that is not stored yet.
PartialHistoryEntry = namedtuple('PartialHistoryEntry', 'url timestamp')
def generate_partial_history(user, t_start):
    """ Generate the part of the history resulting from one crawl: a first
    url generated for `user`, followed by the urls read from the crawler's
    queue.

    `t_start` is the timestamp given to the first entry; every following
    entry is stamped later than the previous one by a random delay.
    Returns a list of PartialHistoryEntry.
    """
    timestamp = t_start
    result = []
    basis = generate_first_url(user)
    result.append(PartialHistoryEntry(basis, timestamp))
    # The delays have to be accumulated on `timestamp`, the value actually
    # stored in the entries; otherwise every entry shares `t_start`.
    timestamp += 5 * random.weibullvariate(1, 1.5)

    queue = Queue()
    crawler = crawl.CrawlingThread(basis, queue)
    crawler.start()
    crawler.join()
    urls = queue.get()
    for url in urls:
        timestamp += 5 * random.weibullvariate(1, 1.5)
        result.append(PartialHistoryEntry(url, timestamp))
    return result
def generate_first_url(user):
    """ Build the url opening a partial history: pick at random one of the
    non-empty kinds of interest of `user` (keywords, websites, places,
    events), then one item of that kind, and let it generate the url. """
    owners = [user]
    candidates = [
        profiles.Keyword.objects.filter(interest__profile__in=owners),
        profiles.Website.objects.filter(interest__profile__in=owners),
        profiles.Place.objects.filter(interest__profile__in=owners),
        profiles.Event.objects.filter(interest__profile__in=owners),
    ]
    # Only kinds with at least one item may be drawn.
    available = [matches for matches in candidates if matches]
    chosen_kind = random.choice(available)
    return random.choice(chosen_kind).generate_url(user)
def generate_history(user, start_time):
    """ Generate a new history for the user `user`, starting from the datetime
    `start_time`.

    A few heuristics are used in order to give the impression that the history
    is actually played by a user: the history is built from successive partial
    histories, separated by random delays, until it holds at least a randomly
    drawn number of entries (no less than HISTORY_MIN).

    The history and its entries are validated and saved; the history is
    returned.
    """
    # let's define a new history object.
    history = History(start_ts=start_time, user=user)
    length = HISTORY_MIN + floor(10 * random.weibullvariate(1, 1.5))
    history.full_clean()
    history.save()

    history_line = 0
    current_timestamp = start_time.timestamp()
    while history_line < length:
        current_timestamp += 5 * random.weibullvariate(1, 2.8)
        history_list = generate_partial_history(user, current_timestamp)
        # Resume after the last entry of this batch, plus a pause.
        current_timestamp = \
            history_list[-1].timestamp + 5 * random.weibullvariate(1, 5)
        for (url, timestamp) in history_list:
            new_line = HistoryEntry(
                search=url,
                timestamp=datetime.fromtimestamp(timestamp),
                history=history
            )
            new_line.full_clean()
            new_line.save()
            # Count the stored entries: without this the loop never ends.
            history_line += 1

    return history