From 47783bbacc59d9dfff12c21190a386a11ebef11e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Bastian?= Date: Sat, 29 Oct 2022 18:43:22 +0200 Subject: [PATCH] First attempt at repartir_taches.py --- repartir_taches.py | 287 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 287 insertions(+) create mode 100644 repartir_taches.py diff --git a/repartir_taches.py b/repartir_taches.py new file mode 100644 index 0000000..ca0f348 --- /dev/null +++ b/repartir_taches.py @@ -0,0 +1,287 @@ +import yaml +import argparse +import csv +from collections import defaultdict +import typing as t +from dataclasses import dataclass +import random + + +@dataclass +class Task: + name: str + qualified_name: str + descr: str + notes: str + time: str + nb_groups: int + + assigned: t.Optional[list[int]] = None + + +class Category(t.NamedTuple): + name: str + depth: int + time: str + tasks: list # of Category|Task, but mypy doesn't support recursive types + intro: str + + +def levenshtein_distance(s1, s2): + """Shamelessly stolen from https://stackoverflow.com/a/32558749""" + if len(s1) > len(s2): + s1, s2 = s2, s1 + + distances = range(len(s1) + 1) + for i2, c2 in enumerate(s2): + distances_ = [i2 + 1] + for i1, c1 in enumerate(s1): + if c1 == c2: + distances_.append(distances[i1]) + else: + distances_.append( + 1 + min((distances[i1], distances[i1 + 1], distances_[-1])) + ) + distances = distances_ + return distances[-1] + + +class UnionFind: + parent_of: list[int] + _group_size: list[int] + + def __init__(self, elt_count: int): + self.parent_of = list(range(elt_count)) + self._group_size = [1] * elt_count + + def root(self, elt: int) -> int: + if self.parent_of[elt] == elt: + return elt + self.parent_of[elt] = self.root(self.parent_of[elt]) + return self.parent_of[elt] + + def union(self, elt1: int, elt2: int) -> None: + elt1 = self.root(elt1) + elt2 = self.root(elt2) + if elt1 == elt2: + return + if self._group_size[elt1] > self._group_size[elt2]: + self.union(elt2, elt1) + else: + self._group_size[elt2] += self._group_size[elt1] + self._group_size[elt1] = 0 + self.parent_of[self.root(elt1)] = self.root(elt2) + + def group_size(self, elt: int) -> int: + return self._group_size[self.root(elt)] + + +class Config: + tasks_path: str + people_path: str + + choristes: list[str] + ca: list[str] + taches: Category + env: dict[str, str] + + def __init__(self, tasks_path: str, people_path: str): + self.tasks_path = tasks_path + self.people_path = people_path + self.choristes = [] + self.ca = [] + self.env = {} + self._load_tasks() + self._load_people() + + def _load_tasks(self) -> None: + with open(self.tasks_path, "r") as h: + raw_tasks = yaml.safe_load(h) + assert "taches" in raw_tasks + self.env = raw_tasks["env"] + self.taches = Category( + name="", + depth=0, + time="", + intro="", + tasks=list(map(self._load_task_cat, raw_tasks["taches"])), + ) + self.ca = raw_tasks["CA"] + + def _load_task_cat( + self, cat: dict[str, t.Any], depth: int = 1, qual: str = "" + ) -> Task | Category: + if "cat" not in cat: + return self._load_task(cat, qual) + assert "taches" in cat + nqual = cat["cat"] + if qual: + nqual = f"{qual} - {nqual}" + return Category( + name=cat["cat"], + depth=depth, + time=cat.get("heure", ""), + intro=cat.get("intro", ""), + tasks=list( + map( + lambda x: self._load_task_cat(x, depth=depth + 1, qual=nqual), + cat["taches"], + ) + ), + ) + + def _load_task(self, task: dict[str, t.Any], qual: str) -> Task: + for label in ("nom", "descr"): + assert label in task + qual_name = f'{qual}{" - " if qual else ""}{task["nom"]}' + return Task( + name=task["nom"], + qualified_name=qual_name, + descr=task["descr"].format(**self.env), + notes=task.get("notes", ""), + time=task.get("heure", ""), + nb_groups=int(task.get("nb_groups", 1)), + ) + + def _load_people(self) -> None: + with open(self.people_path, "r") as h: + raw_people: list[dict[str, str]] = list(csv.DictReader(h)) + for key in "Nom", "Prénom": + assert key in raw_people[0] + raw_people.sort(key=lambda x: x["Prénom"]) + + # Normalize + def normalize(x: str) -> str: + x = x.strip() + if " " in x: + return " ".join(map(normalize, x.split())) + return x[0].upper() + x[1:].lower() + + for pers in raw_people: + pers["Nom"] = normalize(pers["Nom"]) + pers["Prénom"] = normalize(pers["Prénom"]) + + # Group by name proximity + name_uf = UnionFind(len(raw_people)) + for id1, pers1 in enumerate(raw_people): + for id2, pers2 in enumerate(raw_people): + if ( + id1 < id2 + and levenshtein_distance(pers1["Prénom"], pers2["Prénom"]) <= 2 + ): + name_uf.union(id1, id2) + _name_groups: dict[int, list[dict]] = defaultdict(list) + for pers_id, pers in enumerate(raw_people): + _name_groups[name_uf.root(pers_id)].append(pers) + name_groups: list[list[dict]] = list(_name_groups.values()) + + # Disambiguate names + def make_short_name(pers: dict, disamb: int = 0) -> str: + if disamb: + return f"{pers['Prénom']} {pers['Nom'][:disamb]}." + else: + return pers["Prénom"] + + self.choristes = [] + for grp in name_groups: + if len(grp) == 1: + self.choristes.append(make_short_name(grp[0])) + else: + req_letters = 1 + while req_letters < 100: # safeguard + short_names = list( + map(lambda x: make_short_name(x, req_letters), grp) + ) + if len(set(short_names)) == len(short_names): + # No clashes + self.choristes += short_names + break + req_letters += 1 + + self.choristes.sort() + + +def constituer_groupes(choristes: list[str]) -> list[list[str]]: + """Répartir aléatoirement les gens en groupes""" + TAILLE_GROUPE: int = 4 + nb_choristes = len(choristes) + + groupes: list[list[str]] = [] + + random.shuffle(choristes) + pos = 0 + for _ in range(nb_choristes // TAILLE_GROUPE): + groupes.append(choristes[pos : pos + TAILLE_GROUPE]) + pos += TAILLE_GROUPE + + reste = choristes[pos:] + + if len(reste) == TAILLE_GROUPE - 1: + groupes.append(reste) + else: + for gid, pers in enumerate(reste): + groupes[gid].append(pers) + + for groupe in groupes: + groupe.sort() + random.shuffle(groupes) + + return groupes + + +def assigner_taches(task: Category | Task, group_count: int, cur_group: int = 0) -> int: + """Assigne les tâches aux groupes (round-robin)""" + if isinstance(task, Task): + task.assigned = list( + map( + lambda x: x % group_count, + range(cur_group, cur_group + task.nb_groups), + ) + ) + return (cur_group + task.nb_groups) % group_count + for subtask in task.tasks: + cur_group = assigner_taches(subtask, group_count, cur_group) + return cur_group + + +def export_short(config: Config, groupes: list[list[str]]) -> str: + """Exporte la liste des tâches au format court (pour vérification)""" + + def export_taskcat(grp: Task | Category) -> str: + if isinstance(grp, Task): + return f'* {grp.qualified_name}: {", ".join(map(lambda x: str(x+1), grp.assigned))}' + out = "\n" + "#" * (2 + grp.depth) + f" {grp.name}" + if grp.time: + out += f" ({grp.time})" + out += "\n\n" + if grp.intro: + out += grp.intro + "\n\n" + out += "\n".join(map(export_taskcat, grp.tasks)) + return out + + out = "## Groupes\n\n" + for g_id, group in enumerate(groupes): + out += f"* Groupe {g_id+1} : " + ", ".join(group) + "\n" + + out += "\n## Tâches\n" + + out += "\n".join(map(export_taskcat, config.taches.tasks)) + + return out + + +def export_latex(config: Config, groupes: list[list[str]]) -> str: + """Exporter la liste des tâches en LaTeX (à insérer dans un template)""" + + +def main(): + parser = argparse.ArgumentParser("Répartition des tâches") + parser.add_argument("taches", help="Fichier yaml contenant les tâches") + parser.add_argument("choristes", help="Fichier CSV contenant les choristes") + args = parser.parse_args() + + config = Config(args.taches, args.choristes) + + +if __name__ == "__main__": + main()