First attempt at repartir_taches.py
This commit is contained in:
parent
3121d0ca58
commit
47783bbacc
1 changed files with 287 additions and 0 deletions
287
repartir_taches.py
Normal file
287
repartir_taches.py
Normal file
|
@ -0,0 +1,287 @@
|
|||
import yaml
|
||||
import argparse
|
||||
import csv
|
||||
from collections import defaultdict
|
||||
import typing as t
|
||||
from dataclasses import dataclass
|
||||
import random
|
||||
|
||||
|
||||
@dataclass
|
||||
class Task:
|
||||
name: str
|
||||
qualified_name: str
|
||||
descr: str
|
||||
notes: str
|
||||
time: str
|
||||
nb_groups: int
|
||||
|
||||
assigned: t.Optional[list[int]] = None
|
||||
|
||||
|
||||
class Category(t.NamedTuple):
|
||||
name: str
|
||||
depth: int
|
||||
time: str
|
||||
tasks: list # of Category|Task, but mypy doesn't support recursive types
|
||||
intro: str
|
||||
|
||||
|
||||
def levenshtein_distance(s1, s2):
|
||||
"""Shamelessly stolen from https://stackoverflow.com/a/32558749"""
|
||||
if len(s1) > len(s2):
|
||||
s1, s2 = s2, s1
|
||||
|
||||
distances = range(len(s1) + 1)
|
||||
for i2, c2 in enumerate(s2):
|
||||
distances_ = [i2 + 1]
|
||||
for i1, c1 in enumerate(s1):
|
||||
if c1 == c2:
|
||||
distances_.append(distances[i1])
|
||||
else:
|
||||
distances_.append(
|
||||
1 + min((distances[i1], distances[i1 + 1], distances_[-1]))
|
||||
)
|
||||
distances = distances_
|
||||
return distances[-1]
|
||||
|
||||
|
||||
class UnionFind:
|
||||
parent_of: list[int]
|
||||
_group_size: list[int]
|
||||
|
||||
def __init__(self, elt_count: int):
|
||||
self.parent_of = list(range(elt_count))
|
||||
self._group_size = [1] * elt_count
|
||||
|
||||
def root(self, elt: int) -> int:
|
||||
if self.parent_of[elt] == elt:
|
||||
return elt
|
||||
self.parent_of[elt] = self.root(self.parent_of[elt])
|
||||
return self.parent_of[elt]
|
||||
|
||||
def union(self, elt1: int, elt2: int) -> None:
|
||||
elt1 = self.root(elt1)
|
||||
elt2 = self.root(elt2)
|
||||
if elt1 == elt2:
|
||||
return
|
||||
if self._group_size[elt1] > self._group_size[elt2]:
|
||||
self.union(elt2, elt1)
|
||||
else:
|
||||
self._group_size[elt2] += self._group_size[elt1]
|
||||
self._group_size[elt1] = 0
|
||||
self.parent_of[self.root(elt1)] = self.root(elt2)
|
||||
|
||||
def group_size(self, elt: int) -> int:
|
||||
return self._group_size[self.root(elt)]
|
||||
|
||||
|
||||
class Config:
|
||||
tasks_path: str
|
||||
people_path: str
|
||||
|
||||
choristes: list[str]
|
||||
ca: list[str]
|
||||
taches: Category
|
||||
env: dict[str, str]
|
||||
|
||||
def __init__(self, tasks_path: str, people_path: str):
|
||||
self.tasks_path = tasks_path
|
||||
self.people_path = people_path
|
||||
self.choristes = []
|
||||
self.ca = []
|
||||
self.env = {}
|
||||
self._load_tasks()
|
||||
self._load_people()
|
||||
|
||||
def _load_tasks(self) -> None:
|
||||
with open(self.tasks_path, "r") as h:
|
||||
raw_tasks = yaml.safe_load(h)
|
||||
assert "taches" in raw_tasks
|
||||
self.env = raw_tasks["env"]
|
||||
self.taches = Category(
|
||||
name="",
|
||||
depth=0,
|
||||
time="",
|
||||
intro="",
|
||||
tasks=list(map(self._load_task_cat, raw_tasks["taches"])),
|
||||
)
|
||||
self.ca = raw_tasks["CA"]
|
||||
|
||||
def _load_task_cat(
|
||||
self, cat: dict[str, t.Any], depth: int = 1, qual: str = ""
|
||||
) -> Task | Category:
|
||||
if "cat" not in cat:
|
||||
return self._load_task(cat, qual)
|
||||
assert "taches" in cat
|
||||
nqual = cat["cat"]
|
||||
if qual:
|
||||
nqual = f"{qual} - {nqual}"
|
||||
return Category(
|
||||
name=cat["cat"],
|
||||
depth=depth,
|
||||
time=cat.get("heure", ""),
|
||||
intro=cat.get("intro", ""),
|
||||
tasks=list(
|
||||
map(
|
||||
lambda x: self._load_task_cat(x, depth=depth + 1, qual=nqual),
|
||||
cat["taches"],
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
def _load_task(self, task: dict[str, t.Any], qual: str) -> Task:
|
||||
for label in ("nom", "descr"):
|
||||
assert label in task
|
||||
qual_name = f'{qual}{" - " if qual else ""}{task["nom"]}'
|
||||
return Task(
|
||||
name=task["nom"],
|
||||
qualified_name=qual_name,
|
||||
descr=task["descr"].format(**self.env),
|
||||
notes=task.get("notes", ""),
|
||||
time=task.get("heure", ""),
|
||||
nb_groups=int(task.get("nb_groups", 1)),
|
||||
)
|
||||
|
||||
def _load_people(self) -> None:
|
||||
with open(self.people_path, "r") as h:
|
||||
raw_people: list[dict[str, str]] = list(csv.DictReader(h))
|
||||
for key in "Nom", "Prénom":
|
||||
assert key in raw_people[0]
|
||||
raw_people.sort(key=lambda x: x["Prénom"])
|
||||
|
||||
# Normalize
|
||||
def normalize(x: str) -> str:
|
||||
x = x.strip()
|
||||
if " " in x:
|
||||
return " ".join(map(normalize, x.split()))
|
||||
return x[0].upper() + x[1:].lower()
|
||||
|
||||
for pers in raw_people:
|
||||
pers["Nom"] = normalize(pers["Nom"])
|
||||
pers["Prénom"] = normalize(pers["Prénom"])
|
||||
|
||||
# Group by name proximity
|
||||
name_uf = UnionFind(len(raw_people))
|
||||
for id1, pers1 in enumerate(raw_people):
|
||||
for id2, pers2 in enumerate(raw_people):
|
||||
if (
|
||||
id1 < id2
|
||||
and levenshtein_distance(pers1["Prénom"], pers2["Prénom"]) <= 2
|
||||
):
|
||||
name_uf.union(id1, id2)
|
||||
_name_groups: dict[int, list[dict]] = defaultdict(list)
|
||||
for pers_id, pers in enumerate(raw_people):
|
||||
_name_groups[name_uf.root(pers_id)].append(pers)
|
||||
name_groups: list[list[dict]] = list(_name_groups.values())
|
||||
|
||||
# Disambiguate names
|
||||
def make_short_name(pers: dict, disamb: int = 0) -> str:
|
||||
if disamb:
|
||||
return f"{pers['Prénom']} {pers['Nom'][:disamb]}."
|
||||
else:
|
||||
return pers["Prénom"]
|
||||
|
||||
self.choristes = []
|
||||
for grp in name_groups:
|
||||
if len(grp) == 1:
|
||||
self.choristes.append(make_short_name(grp[0]))
|
||||
else:
|
||||
req_letters = 1
|
||||
while req_letters < 100: # safeguard
|
||||
short_names = list(
|
||||
map(lambda x: make_short_name(x, req_letters), grp)
|
||||
)
|
||||
if len(set(short_names)) == len(short_names):
|
||||
# No clashes
|
||||
self.choristes += short_names
|
||||
break
|
||||
req_letters += 1
|
||||
|
||||
self.choristes.sort()
|
||||
|
||||
|
||||
def constituer_groupes(choristes: list[str]) -> list[list[str]]:
|
||||
"""Répartir aléatoirement les gens en groupes"""
|
||||
TAILLE_GROUPE: int = 4
|
||||
nb_choristes = len(choristes)
|
||||
|
||||
groupes: list[list[str]] = []
|
||||
|
||||
random.shuffle(choristes)
|
||||
pos = 0
|
||||
for _ in range(nb_choristes // TAILLE_GROUPE):
|
||||
groupes.append(choristes[pos : pos + TAILLE_GROUPE])
|
||||
pos += TAILLE_GROUPE
|
||||
|
||||
reste = choristes[pos:]
|
||||
|
||||
if len(reste) == TAILLE_GROUPE - 1:
|
||||
groupes.append(reste)
|
||||
else:
|
||||
for gid, pers in enumerate(reste):
|
||||
groupes[gid].append(pers)
|
||||
|
||||
for groupe in groupes:
|
||||
groupe.sort()
|
||||
random.shuffle(groupes)
|
||||
|
||||
return groupes
|
||||
|
||||
|
||||
def assigner_taches(task: Category | Task, group_count: int, cur_group: int = 0) -> int:
|
||||
"""Assigne les tâches aux groupes (round-robin)"""
|
||||
if isinstance(task, Task):
|
||||
task.assigned = list(
|
||||
map(
|
||||
lambda x: x % group_count,
|
||||
range(cur_group, cur_group + task.nb_groups),
|
||||
)
|
||||
)
|
||||
return (cur_group + task.nb_groups) % group_count
|
||||
for subtask in task.tasks:
|
||||
cur_group = assigner_taches(subtask, group_count, cur_group)
|
||||
return cur_group
|
||||
|
||||
|
||||
def export_short(config: Config, groupes: list[list[str]]) -> str:
|
||||
"""Exporte la liste des tâches au format court (pour vérification)"""
|
||||
|
||||
def export_taskcat(grp: Task | Category) -> str:
|
||||
if isinstance(grp, Task):
|
||||
return f'* {grp.qualified_name}: {", ".join(map(lambda x: str(x+1), grp.assigned))}'
|
||||
out = "\n" + "#" * (2 + grp.depth) + f" {grp.name}"
|
||||
if grp.time:
|
||||
out += f" ({grp.time})"
|
||||
out += "\n\n"
|
||||
if grp.intro:
|
||||
out += grp.intro + "\n\n"
|
||||
out += "\n".join(map(export_taskcat, grp.tasks))
|
||||
return out
|
||||
|
||||
out = "## Groupes\n\n"
|
||||
for g_id, group in enumerate(groupes):
|
||||
out += f"* Groupe {g_id+1} : " + ", ".join(group) + "\n"
|
||||
|
||||
out += "\n## Tâches\n"
|
||||
|
||||
out += "\n".join(map(export_taskcat, config.taches.tasks))
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def export_latex(config: Config, groupes: list[list[str]]) -> str:
|
||||
"""Exporter la liste des tâches en LaTeX (à insérer dans un template)"""
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser("Répartition des tâches")
|
||||
parser.add_argument("taches", help="Fichier yaml contenant les tâches")
|
||||
parser.add_argument("choristes", help="Fichier CSV contenant les choristes")
|
||||
args = parser.parse_args()
|
||||
|
||||
config = Config(args.taches, args.choristes)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in a new issue