hello-barcelona-17/misc/voronoi_twal.py

381 lines
15 KiB
Python
Raw Normal View History

2017-10-02 21:15:34 +02:00
import heapq
import sys
from math import sqrt
from functools import reduce
from random import randint
# J'ai utilisé dans cet exo l'algorithme de Fortune pour calculer le diagramme de Voronoï des points d'accès wifi
# J'ai utilisé une skiplist à la place d'un arbre binaire balancé car j'ai un peu la flemme de coder un rb-tree ^^
# Avant d'utiliser une skiplist, j'avais une liste doublement chaînée utilisant la même interface, car c'est plus facile à
# coder et du coup moins de chance d'avoir des bugs. Avec j'avais une complexitée de O(n*(n+h)) où h est le nombre de points
# sur l'enveloppe convexe. Avec une skiplist, j'ai une complexité de O(n*(log(n)+h)), mais j'ai été déçu car la constante
# de temps fait que la skiplist devient avantageuse autour de N=20000, et l'énoncé donne N<=2000...
# On m'a néanmoins conseillé de donner l'algorithme avec la meilleure complexité, donc la voici.
# Une priority queue car elles sont nécessairement synchronisées dans python... grr.
class PriorityQueue:
def __init__(self, elems):
self.queue = [[prio, i, elem] for (i, (elem, prio)) in enumerate(elems)]
self.entryFinder = {}
for entry in self.queue:
self.entryFinder[entry[2]] = entry
heapq.heapify(self.queue)
self.counter = len(self.queue)
def push(self, elem, priority):
entry = [priority, self.counter, elem]
self.entryFinder[elem] = entry
heapq.heappush(self.queue, entry)
self.counter += 1
def top(self):
while self.queue:
candidate = self.queue[0][2]
if candidate != None:
return candidate
heapq.heappop(self.queue)
return None
def pop(self):
while self.queue:
candidate = heapq.heappop(self.queue)[2]
if candidate != None:
del self.entryFinder[candidate]
return candidate
return None
def contains(self, elem):
return elem in self.entryFinder
def remove(self, elem):
entry = self.entryFinder.pop(elem)
entry[2] = None
def empty(self):
return self.top() == None
# Une classe représentant l'enveloppe convexe d'un ensemble de points
class ConvexHull:
def __init__(self, sites):
# On la calcule avec un parcours de graham
sortedSites = sorted(sites, key=lambda s:s.x)
up = reduce(ConvexHull.keepLeft, sortedSites, [])
down = reduce(ConvexHull.keepLeft, reversed(sortedSites), [])
self.hull = up + down[1:-1]
def det(a, b, n):
return (b.x-a.x)*(n.y-a.y) - (b.y-a.y)*(n.x-a.x)
def keepLeft(hull, n):
while len(hull) > 1 and ConvexHull.det(hull[-2], hull[-1], n) <= 0:
hull.pop()
hull.append(n)
return hull
def isLeftTo(a, b, s):
return (s.x-a.x)*(b.y-a.y)-(s.y-a.y)*(b.x-a.x) < 0
def isInsideHull(self, s):
for i in range(len(self.hull)):
if not ConvexHull.isLeftTo(self.hull[i-1], self.hull[i], s):
return False
return True
# Classe représentant un point dans le plan. C'est nommé "Site" car c'est comme ça que sont nommés les points
# dont il faut calculer le diagramme de voronoi dans les papiers de Fortune
class Site:
def __init__(self, x, y):
self.x = x
self.y = y
def dist(self, other):
return sqrt((self.x-other.x)**2 + (self.y-other.y)**2)
# Une parabole de la beachline
class Parabola:
def __init__(self, ctx, site):
self.ctx = ctx
self.site = site
self.ledge = None
self.redge = None
def getEquation(self):
x = self.site.x
y = self.site.y
l = self.ctx.sweepline
div = 2*(y-l)
if div == 0: #FIXME
return None
# Retourne (a, b, c) tels que y = a*x^2 + b*x + c
# Obtenus en résolvant en y sqrt((x-xa)^2 + (y-ya)^2) = y-l avec xa = self.site.x, ya = self.site.y
return (1/div, -2*x/div, (x*x + y*y - l*l)/div)
def intersect(self, other):
if self.getEquation() == None:
return self.site.x
if other.getEquation() == None:
return other.site.x
(a, b, c) = tuple(map(lambda x, y: x-y, self.getEquation(), other.getEquation()))
if a == 0: #FIXME
# b*x + c = 0
return -c/b
else:
# a*x^2 + b*x + c = 0
discr = sqrt(b*b-4*a*c)
x1 = (-b-discr)/(2*a)
x2 = (-b+discr)/(2*a)
if self.site.y < other.site.y:
return min(x1, x2)
else:
return max(x1, x2)
# Donne le point où la parabole va être "écrasée" par ses deux voisines
def circleCenter(self):
return self.ledge.intersect(self.redge)
def hasCircleCenter(self):
return self.ledge != None and self.redge != None and self.circleCenter() != None
# Renvoie la position de la sweepline quand la parabole va disparaitre
def circleEventY(self):
voronoiVertex = self.circleCenter()
return voronoiVertex.y + voronoiVertex.dist(self.site)
# Renvoie une copie, sans ledge et redge car elles sont recalculées après une copie
def copy(self):
return Parabola(self.ctx, self.site)
# Représente une arête du diagramme de voronoï
class Edge:
def __init__(self, leftSite, rightSite):
self.ls = leftSite
self.rs = rightSite
# a*x + b*y = c
# Obtenus en résolvant en x et y sqrt((x-ls.x)^2 + (y-ls.y)^2) = sqrt((x-lr.x)^2 + (y-lr.y)^2)
self.a = 2*(rightSite.x - leftSite.x)
self.b = 2*(rightSite.y - leftSite.y)
self.c = rightSite.x**2 + rightSite.y**2 - (leftSite.x**2 + leftSite.y**2)
def intersect(self, other):
# Résolution via la méthode de Cramer
det = self.a*other.b - self.b*other.a
if det == 0: #FIXME
return None
return Site((self.c*other.b - self.b*other.c)/det, (self.a*other.c - self.c*other.a)/det)
# Représente un noeud d'une skip-list doublement chaînée
class SkipListNode:
def __init__(self, height, parabola):
self.prv = [None]*height
self.nxt = [None]*height
self.parabola = parabola
# Représente une skiplist
class SkipList:
def __init__(self, elems):
self.maxHeight = SkipList.randomHeight()
self.head = SkipListNode(self.maxHeight, elems[0])
updateList = [self.head]*self.maxHeight
for i in range(1, len(elems)):
self.insertAfter(elems[i], updateList)
# Renvoie une hauteur aléatoire
def randomHeight():
height = 1
while randint(1, 2) != 1:
height += 1
return height
# Insertion d'un élément en fournissant le résultat de getElemByX() dans updateList
# Ça met aussi à jour updateList pour insérer à nouveau un élément après celui qu'on vient d'insérer
def insertAfter(self, elem, updateList):
height = SkipList.randomHeight()
self.maxHeight = max(height, self.maxHeight)
newNode = SkipListNode(height, elem)
# On ajuste la hauteur de updateList et self.head.nxt
while len(self.head.nxt) < self.maxHeight:
self.head.nxt.append(None)
while len(updateList) < self.maxHeight:
updateList.append(self.head)
for i in range(height):
# Là c'est comme dans une liste doublement chaînée sauf qu'il y a des [i] partout
newNode.prv[i] = updateList[i]
newNode.nxt[i] = updateList[i].nxt[i]
if updateList[i].nxt[i] != None:
updateList[i].nxt[i].prv[i] = newNode
updateList[i].nxt[i] = newNode
updateList[i] = newNode
return newNode
# Enlève un élément
def remove(self, node):
for i in range(len(node.nxt)-1, -1, -1):
# Là c'est aussi comme pour une liste doublement chaînée mais avec des [i] partout
node.prv[i].nxt[i] = node.nxt[i]
if node.nxt[i] != None:
node.nxt[i].prv[i] = node.prv[i]
if self.head.nxt[i] == None:
self.maxHeight -= 1
# On donne un X, et ça dit quelle est la parabole présente à cet endroit
# Ça renvoie les éléments à modifier si on insère une autre parabole après celle cherchée
def getElemByX(self, x):
updateList = [None]*self.maxHeight
node = self.head
for i in range(self.maxHeight-1, -1, -1):
while node.nxt[i] != None and self.getX(node) < x:
node = node.nxt[i]
# Souvent on "overshoot" la parabole et il faut revenir un peu en arrière
if node.prv[0] != None and self.getX(node.prv[0]) > x:
node = node.prv[i]
updateList[i] = node
return updateList
# Renvoie le x où node.parabola se termine
def getX(self, node):
return node.parabola.intersect(node.nxt[0].parabola)
# Représente un évènement, circle event ou site event
class Event:
def __init__(self, isSiteEvent, elem):
self.isSiteEvent = isSiteEvent
self.elem = elem
# Pour pouvoir enlever des éléments de la hashtable de la priority queue
def __hash__(self):
return hash(self.isSiteEvent) ^ hash(self.elem)
def __eq__(self, other):
return other != None and self.isSiteEvent == other.isSiteEvent and self.elem == other.elem
class Context:
def __init__(self, sites):
self.sweepline = 0
self.sites = [Site(x, y) for (x, y) in sites]
minY = min([site.y for site in self.sites])
# Les sites qui seront dans la beachline
sitesBL = [site for site in self.sites if site.y == minY] #FIXME
# Les sites qui seront dans la priority queue
sitesQ = [site for site in self.sites if site.y != minY] #FIXME
self.queue = PriorityQueue([(Event(True, site), site.y) for site in sitesQ])
self.beachline = SkipList(list(map(lambda site: Parabola(self, site), sorted(sitesBL, key=lambda site: site.x))))
# On calcule les arêtes associées aux paraboles de la beachline
node = self.beachline.head
while node.nxt[0] != None:
self.updateREdge(node)
node = node.nxt[0]
self.sweepline = minY
self.maxDist = 0
self.hull = ConvexHull(self.sites)
# Enlève un circle event de la queue
def removeCircleEvent(self, node):
if self.queue.contains(Event(False, node)):
self.queue.remove(Event(False, node))
# Ajoute un circle event dans la queue après quelques vérifications
def addCircleEvent(self, node):
def isRight(a, b, s):
return (b.x-a.x)*(s.y-a.y)-(b.y-a.y)*(s.x-a.x) >= 0
parabola = node.parabola
if parabola.hasCircleCenter():
pos = parabola.circleEventY()
center = parabola.circleCenter()
# On ajoute l'event si il est derrière la sweepline et si la parabole va en effet disparaitre
# (elle disparait si les points de la parabole de gauche, du milieu, de droite tourne dans le sens horaire, ce que calcule isRight)
if pos > self.sweepline and isRight(node.prv[0].parabola.site, node.parabola.site, node.nxt[0].parabola.site):
self.queue.push(Event(False, node), pos)
# Calcule l'arête à droite d'une parabole
def updateREdge(self, node):
if node.nxt[0] != None:
edge = Edge(node.parabola.site, node.nxt[0].parabola.site)
node.parabola.redge = edge
node.nxt[0].parabola.ledge = edge
# On a trouvé un point du diagramme de voronoi ! C'est là qu'il risque d'y avoir des jaloux
# Si le point du diagramme est en dehors de l'enveloppe convexe de tous les points alors
# Il n'y aura pas de jaloux là, il faut donc ne pas mettre à jour self.maxDist
def handleResult(self, v, p1, p2, p3):
d = v.dist(p1)
if d > self.maxDist and self.hull.isInsideHull(v):
self.maxDist = d
# La partie intéressante de l'algorithme, le traitement d'un event
def processEvent(self):
event = self.queue.pop()
# Si c'est un site event
if event.isSiteEvent:
# On récupère quelques infos
site = event.elem
self.sweepline = site.y
updateList = self.beachline.getElemByX(site.x)
leftNode = updateList[0]
xpos = self.beachline.getX(leftNode) if leftNode.nxt[0] != None else float("nan")
# On ajoute notre parabole
middleNode = self.beachline.insertAfter(Parabola(self, site), updateList)
# On enlève un éventuel circle event de la parabole qu'on "coupe"
self.removeCircleEvent(leftNode)
# Si on coupe une parabole
if site.x != xpos: #FIXME
# On ajoute la parabole qu'on coupe car pour l'instant il n'y a que la partie de gauche
rightNode = self.beachline.insertAfter(leftNode.parabola.copy(), updateList)
self.updateREdge(rightNode)
# Si on s'intercale entre deux paraboles
else:
# On a trouvé un point du diagramme de voronoï, youhou !
self.handleResult(Edge(site, leftNode.parabola.site).intersect(Edge(site, middleNode.nxt[0].parabola.site)), site, leftNode.parabola.site, middleNode.nxt[0].parabola.site)
# On re-calcule les arêtes
self.updateREdge(leftNode)
self.updateREdge(middleNode)
# On ajoute les éventuels circle events
self.addCircleEvent(leftNode)
if site.x != xpos: #FIXME
self.addCircleEvent(rightNode)
# Si c'est un circle event
else:
# On récupère quelques infos
middleNode = event.elem
leftNode = middleNode.prv[0]
rightNode = middleNode.nxt[0]
self.sweepline = middleNode.parabola.circleEventY()
# On a trouvé un point du diagramme de voronoï, youhou !
self.handleResult(middleNode.parabola.circleCenter(), leftNode.parabola.site, middleNode.parabola.site, rightNode.parabola.site)
# On enlève les éventuels circle events
self.removeCircleEvent(leftNode)
self.removeCircleEvent(rightNode)
# On enlève la parabole qui disparait
self.beachline.remove(middleNode)
# On ajoute les nouveaux circle events
self.updateREdge(leftNode)
self.addCircleEvent(leftNode)
self.addCircleEvent(rightNode)
def process(self):
# Tant qu'il y a du boulot, on le traite !
while not self.queue.empty():
self.processEvent()
def wifi(N, coords):
# La fonction "wifi" est maintenant très compliquée à coder
ctx = Context(coords)
ctx.process()
return ctx.maxDist
if __name__ == '__main__':
N = int(input())
coords = [tuple(map(int, input().split())) for _ in range(N)]
f = wifi(N, coords)
if f == 0: #FIXME
print("0")
else:
print("%.3f" % f)