288 lines
8.5 KiB
Python
288 lines
8.5 KiB
Python
|
import yaml
|
|||
|
import argparse
|
|||
|
import csv
|
|||
|
from collections import defaultdict
|
|||
|
import typing as t
|
|||
|
from dataclasses import dataclass
|
|||
|
import random
|
|||
|
|
|||
|
|
|||
|
@dataclass
|
|||
|
class Task:
|
|||
|
name: str
|
|||
|
qualified_name: str
|
|||
|
descr: str
|
|||
|
notes: str
|
|||
|
time: str
|
|||
|
nb_groups: int
|
|||
|
|
|||
|
assigned: t.Optional[list[int]] = None
|
|||
|
|
|||
|
|
|||
|
class Category(t.NamedTuple):
    """A node of the task tree: a titled section holding tasks or sub-categories."""

    # Title of the category.
    name: str
    # Nesting level of the category in the tree.
    depth: int
    # Time attached to the category (may be an empty string).
    time: str
    # Children, each a Category or a Task. Left as a bare list because mypy
    # does not support recursive types.
    tasks: list
    # Introductory text for the category (may be an empty string).
    intro: str
|
|||
|
|
|||
|
|
|||
|
def levenshtein_distance(s1, s2):
    """Return the edit distance between two sequences.

    Insertions, deletions and substitutions each cost 1.
    Adapted from https://stackoverflow.com/a/32558749
    """
    # Keep the shorter sequence as the row so each row stays small.
    if len(s1) > len(s2):
        shorter, longer = s2, s1
    else:
        shorter, longer = s1, s2

    # Distances from the empty prefix of `longer` to every prefix of `shorter`.
    previous = list(range(len(shorter) + 1))
    for row, longer_item in enumerate(longer, start=1):
        current = [row]
        for col, shorter_item in enumerate(shorter):
            if shorter_item == longer_item:
                # Matching items: carry over the diagonal value unchanged.
                cost = previous[col]
            else:
                # Cheapest of substitution, deletion and insertion, plus one.
                cost = 1 + min(previous[col], previous[col + 1], current[-1])
            current.append(cost)
        previous = current
    return previous[-1]
|
|||
|
|
|||
|
|
|||
|
class UnionFind:
    """Disjoint sets over the integers ``0 .. elt_count - 1``."""

    # parent_of[i] is the parent of i; a root is its own parent.
    parent_of: list[int]
    # Size of the set, stored at its root (0 for elements that are not roots
    # any more after a merge).
    _group_size: list[int]

    def __init__(self, elt_count: int):
        """Create ``elt_count`` singleton sets."""
        self.parent_of = [idx for idx in range(elt_count)]
        self._group_size = [1 for _ in range(elt_count)]

    def root(self, elt: int) -> int:
        """Return the representative of ``elt``'s set, compressing the path."""
        # First pass: walk up to the root.
        top = elt
        while self.parent_of[top] != top:
            top = self.parent_of[top]
        # Second pass: point every element on the path directly at the root.
        while self.parent_of[elt] != top:
            following = self.parent_of[elt]
            self.parent_of[elt] = top
            elt = following
        return top

    def union(self, elt1: int, elt2: int) -> None:
        """Merge the sets containing ``elt1`` and ``elt2``."""
        absorbed = self.root(elt1)
        kept = self.root(elt2)
        if absorbed == kept:
            return
        # The strictly larger set keeps its root; on a tie elt2's root wins.
        if self._group_size[absorbed] > self._group_size[kept]:
            absorbed, kept = kept, absorbed
        self._group_size[kept] += self._group_size[absorbed]
        self._group_size[absorbed] = 0
        self.parent_of[absorbed] = kept

    def group_size(self, elt: int) -> int:
        """Return the number of elements in ``elt``'s set."""
        return self._group_size[self.root(elt)]
|
|||
|
|
|||
|
|
|||
|
class Config:
    """Input data of the roster: the task tree and the list of singers.

    Both files are read by the constructor.

    Attributes:
        tasks_path: path of the YAML task file.
        people_path: path of the CSV people file.
        choristes: sorted short display names built from the CSV rows.
        ca: value of the ``CA`` key of the YAML file (empty list if absent).
        taches: root category wrapping the entries of the ``taches`` key.
        env: value of the ``env`` key (empty dict if absent), used to fill
            ``{placeholders}`` in task descriptions.
    """

    # First names whose edit distance is at most this value are considered
    # confusable and get disambiguated together.
    NAME_DISTANCE_THRESHOLD = 2

    tasks_path: str
    people_path: str

    choristes: list[str]
    ca: list[str]
    taches: "Category"
    env: dict[str, str]

    def __init__(self, tasks_path: str, people_path: str):
        """Load the task file, then the people file.

        Raises:
            ValueError: if a required key or column is missing.
            OSError: if one of the files cannot be opened.
        """
        self.tasks_path = tasks_path
        self.people_path = people_path
        self.choristes = []
        self.ca = []
        self.env = {}
        self._load_tasks()
        self._load_people()

    def _load_tasks(self) -> None:
        """Read the YAML file and fill ``env``, ``taches`` and ``ca``."""
        # Explicit encoding: the default depends on the platform locale.
        with open(self.tasks_path, "r", encoding="utf-8") as h:
            raw_tasks = yaml.safe_load(h)
        # Explicit raise instead of assert, which is stripped under -O.
        if not isinstance(raw_tasks, dict) or "taches" not in raw_tasks:
            raise ValueError(f"{self.tasks_path}: missing top-level 'taches' key")
        # env must be set first: task descriptions are formatted with it.
        self.env = raw_tasks.get("env") or {}
        self.taches = Category(
            name="",
            depth=0,
            time="",
            intro="",
            tasks=[self._load_task_cat(entry) for entry in raw_tasks["taches"]],
        )
        self.ca = raw_tasks.get("CA") or []

    def _load_task_cat(
        self, cat: dict[str, t.Any], depth: int = 1, qual: str = ""
    ) -> "Task | Category":
        """Convert one YAML entry into a Task or, recursively, a Category.

        An entry with a ``cat`` key is a category and must have a ``taches``
        list; any other entry is a task. ``qual`` is the " - "-joined chain of
        enclosing category names.
        """
        if "cat" not in cat:
            return self._load_task(cat, qual)
        if "taches" not in cat:
            raise ValueError(f"category {cat['cat']!r} has no 'taches' list")
        nqual = cat["cat"]
        if qual:
            nqual = f"{qual} - {nqual}"
        return Category(
            name=cat["cat"],
            depth=depth,
            time=cat.get("heure", ""),
            intro=cat.get("intro", ""),
            tasks=[
                self._load_task_cat(sub, depth=depth + 1, qual=nqual)
                for sub in cat["taches"]
            ],
        )

    def _load_task(self, task: dict[str, t.Any], qual: str) -> "Task":
        """Build a Task from a YAML entry; ``nom`` and ``descr`` are required."""
        missing = [label for label in ("nom", "descr") if label not in task]
        if missing:
            raise ValueError(
                f"task entry {task!r} lacks required key(s): {', '.join(missing)}"
            )
        prefix = f"{qual} - " if qual else ""
        return Task(
            name=task["nom"],
            qualified_name=f"{prefix}{task['nom']}",
            # Placeholders in the description are filled from the env mapping.
            descr=task["descr"].format(**self.env),
            notes=task.get("notes", ""),
            time=task.get("heure", ""),
            nb_groups=int(task.get("nb_groups", 1)),
        )

    @staticmethod
    def _normalize_name(raw: str) -> str:
        """Capitalize each whitespace-separated word; empty input gives ""."""
        return " ".join(
            word[0].upper() + word[1:].lower() for word in raw.split()
        )

    @staticmethod
    def _short_names(group: list[dict[str, str]]) -> list[str]:
        """Return one display name per person of a confusable-name group.

        A lone person is shown by first name. Otherwise the shortest surname
        prefix that makes every name distinct is appended. If no prefix works
        (exact homonyms), full names are used so that nobody is dropped.
        """
        if len(group) == 1:
            return [group[0]["Prénom"]]
        longest = max(len(pers["Nom"]) for pers in group)
        # At least one attempt, so distinct first names with empty surnames
        # still resolve on the first pass.
        for nb_letters in range(1, max(longest, 1) + 1):
            names = [
                f"{pers['Prénom']} {pers['Nom'][:nb_letters]}." for pers in group
            ]
            if len(set(names)) == len(names):
                return names
        return [f"{pers['Prénom']} {pers['Nom']}".strip() for pers in group]

    def _load_people(self) -> None:
        """Read the CSV file and fill ``choristes`` with sorted short names.

        The CSV must have ``Nom`` and ``Prénom`` columns. A file with a header
        but no rows yields an empty list.
        """
        # utf-8-sig also accepts a leading BOM; newline="" is what the csv
        # module asks for when it is handed a file object.
        with open(self.people_path, "r", encoding="utf-8-sig", newline="") as h:
            reader = csv.DictReader(h)
            fieldnames = reader.fieldnames or []
            missing = [key for key in ("Nom", "Prénom") if key not in fieldnames]
            if missing:
                raise ValueError(
                    f"{self.people_path}: missing column(s): {', '.join(missing)}"
                )
            raw_people: list[dict[str, str]] = list(reader)

        # Normalize; short rows leave None in the missing fields.
        for pers in raw_people:
            pers["Nom"] = self._normalize_name(pers["Nom"] or "")
            pers["Prénom"] = self._normalize_name(pers["Prénom"] or "")

        # Group people whose first names are close to each other. No pre-sort
        # is needed: the final list is sorted below.
        name_uf = UnionFind(len(raw_people))
        for id1, pers1 in enumerate(raw_people):
            for id2 in range(id1 + 1, len(raw_people)):
                distance = levenshtein_distance(
                    pers1["Prénom"], raw_people[id2]["Prénom"]
                )
                if distance <= self.NAME_DISTANCE_THRESHOLD:
                    name_uf.union(id1, id2)
        name_groups: dict[int, list[dict[str, str]]] = defaultdict(list)
        for pers_id, pers in enumerate(raw_people):
            name_groups[name_uf.root(pers_id)].append(pers)

        # Disambiguate names inside each group.
        self.choristes = []
        for members in name_groups.values():
            self.choristes.extend(self._short_names(members))
        self.choristes.sort()
|
|||
|
|
|||
|
|
|||
|
def constituer_groupes(
    choristes: list[str], taille_groupe: int = 4
) -> list[list[str]]:
    """Randomly split people into groups of about ``taille_groupe`` members.

    As many full groups as possible are formed. The leftover people form
    their own group when they are one short of a full group, or when there
    are more of them than full groups; otherwise each one joins a different
    full group. The input list is left untouched.

    Args:
        choristes: names to distribute.
        taille_groupe: target group size (at least 1).

    Returns:
        The groups in random order, each group sorted alphabetically.

    Raises:
        ValueError: if ``taille_groupe`` is smaller than 1.
    """
    if taille_groupe < 1:
        raise ValueError("taille_groupe must be at least 1")

    # Shuffle a copy so the caller's list keeps its order.
    melange = list(choristes)
    random.shuffle(melange)

    nb_complets = len(melange) // taille_groupe
    groupes: list[list[str]] = [
        melange[debut : debut + taille_groupe]
        for debut in range(0, nb_complets * taille_groupe, taille_groupe)
    ]
    reste = melange[nb_complets * taille_groupe :]

    if reste:
        if len(reste) == taille_groupe - 1 or len(reste) > len(groupes):
            # Nearly a full group, or too few groups to absorb one extra each.
            groupes.append(reste)
        else:
            for groupe, pers in zip(groupes, reste):
                groupe.append(pers)

    for groupe in groupes:
        groupe.sort()
    random.shuffle(groupes)

    return groupes
|
|||
|
|
|||
|
|
|||
|
def assigner_taches(task: Category | Task, group_count: int, cur_group: int = 0) -> int:
    """Assign groups to tasks in round-robin order.

    A Task receives ``nb_groups`` consecutive group indices starting at
    ``cur_group``, wrapping around modulo ``group_count`` (which must be
    non-zero). A Category is walked in order, threading the position through
    its children.

    Returns:
        The index of the next group to use.
    """
    if not isinstance(task, Task):
        for child in task.tasks:
            cur_group = assigner_taches(child, group_count, cur_group)
        return cur_group

    next_group = cur_group + task.nb_groups
    task.assigned = [idx % group_count for idx in range(cur_group, next_group)]
    return next_group % group_count
|
|||
|
|
|||
|
|
|||
|
def export_short(config: Config, groupes: list[list[str]]) -> str:
    """Export the groups and the task list in short form (for checking).

    Args:
        config: loaded configuration; its task tree is walked.
        groupes: members of each group, in group order.

    Returns:
        Markdown-like text: a numbered group list, then one heading per
        category and one bullet per task with its 1-based group numbers.
        A task with no assignment yet is listed with no group numbers.
    """

    def export_taskcat(grp: Task | Category) -> str:
        if isinstance(grp, Task):
            # `assigned` is None until groups are assigned; treat it as empty
            # instead of failing on the iteration.
            numeros = ", ".join(str(idx + 1) for idx in (grp.assigned or []))
            return f"* {grp.qualified_name}: {numeros}"
        titre = "\n" + "#" * (2 + grp.depth) + f" {grp.name}"
        if grp.time:
            titre += f" ({grp.time})"
        morceaux = [titre, "\n\n"]
        if grp.intro:
            morceaux.append(grp.intro + "\n\n")
        morceaux.append("\n".join(map(export_taskcat, grp.tasks)))
        return "".join(morceaux)

    lignes_groupes = "".join(
        f"* Groupe {g_id + 1} : " + ", ".join(group) + "\n"
        for g_id, group in enumerate(groupes)
    )
    lignes_taches = "\n".join(map(export_taskcat, config.taches.tasks))

    return "## Groupes\n\n" + lignes_groupes + "\n## Tâches\n" + lignes_taches
|
|||
|
|
|||
|
|
|||
|
def export_latex(config: Config, groupes: list[list[str]]) -> str:
    """Export the task list as LaTeX (to be inserted into a template).

    NOTE(review): as visible here the function has no body besides this
    docstring, so a call returns ``None`` rather than the annotated ``str``.
    """
|
|||
|
|
|||
|
|
|||
|
def main():
    """Command-line entry point: parse both input paths and load them."""
    # The title is passed as `description`: the first positional parameter of
    # ArgumentParser is `prog`, which would put the sentence in the usage line.
    parser = argparse.ArgumentParser(description="Répartition des tâches")
    parser.add_argument("taches", help="Fichier yaml contenant les tâches")
    parser.add_argument("choristes", help="Fichier CSV contenant les choristes")
    args = parser.parse_args()

    # Building the configuration reads both files.
    # NOTE(review): nothing is generated from it yet.
    Config(args.taches, args.choristes)
|
|||
|
|
|||
|
|
|||
|
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
|