import yaml import argparse import csv from collections import defaultdict import typing as t from dataclasses import dataclass import random @dataclass class Task: name: str qualified_name: str descr: str notes: str time: str nb_groups: int assigned: t.Optional[list[int]] = None class Category(t.NamedTuple): name: str depth: int time: str tasks: list # of Category|Task, but mypy doesn't support recursive types intro: str def levenshtein_distance(s1, s2): """Shamelessly stolen from https://stackoverflow.com/a/32558749""" if len(s1) > len(s2): s1, s2 = s2, s1 distances = range(len(s1) + 1) for i2, c2 in enumerate(s2): distances_ = [i2 + 1] for i1, c1 in enumerate(s1): if c1 == c2: distances_.append(distances[i1]) else: distances_.append( 1 + min((distances[i1], distances[i1 + 1], distances_[-1])) ) distances = distances_ return distances[-1] class UnionFind: parent_of: list[int] _group_size: list[int] def __init__(self, elt_count: int): self.parent_of = list(range(elt_count)) self._group_size = [1] * elt_count def root(self, elt: int) -> int: if self.parent_of[elt] == elt: return elt self.parent_of[elt] = self.root(self.parent_of[elt]) return self.parent_of[elt] def union(self, elt1: int, elt2: int) -> None: elt1 = self.root(elt1) elt2 = self.root(elt2) if elt1 == elt2: return if self._group_size[elt1] > self._group_size[elt2]: self.union(elt2, elt1) else: self._group_size[elt2] += self._group_size[elt1] self._group_size[elt1] = 0 self.parent_of[self.root(elt1)] = self.root(elt2) def group_size(self, elt: int) -> int: return self._group_size[self.root(elt)] class Config: tasks_path: str people_path: str choristes: list[str] ca: list[str] taches: Category env: dict[str, str] def __init__(self, tasks_path: str, people_path: str): self.tasks_path = tasks_path self.people_path = people_path self.choristes = [] self.ca = [] self.env = {} self._load_tasks() self._load_people() def _load_tasks(self) -> None: with open(self.tasks_path, "r") as h: raw_tasks = yaml.safe_load(h) assert "taches" in raw_tasks self.env = raw_tasks["env"] self.taches = Category( name="", depth=0, time="", intro="", tasks=list(map(self._load_task_cat, raw_tasks["taches"])), ) self.ca = raw_tasks["CA"] def _load_task_cat( self, cat: dict[str, t.Any], depth: int = 1, qual: str = "" ) -> Task | Category: if "cat" not in cat: return self._load_task(cat, qual) assert "taches" in cat nqual = cat["cat"] if qual: nqual = f"{qual} - {nqual}" return Category( name=cat["cat"], depth=depth, time=cat.get("heure", ""), intro=cat.get("intro", ""), tasks=list( map( lambda x: self._load_task_cat(x, depth=depth + 1, qual=nqual), cat["taches"], ) ), ) def _load_task(self, task: dict[str, t.Any], qual: str) -> Task: for label in ("nom", "descr"): assert label in task qual_name = f'{qual}{" - " if qual else ""}{task["nom"]}' return Task( name=task["nom"], qualified_name=qual_name, descr=task["descr"].format(**self.env), notes=task.get("notes", ""), time=task.get("heure", ""), nb_groups=int(task.get("nb_groups", 1)), ) def _load_people(self) -> None: with open(self.people_path, "r") as h: raw_people: list[dict[str, str]] = list(csv.DictReader(h)) for key in "Nom", "Prénom": assert key in raw_people[0] raw_people.sort(key=lambda x: x["Prénom"]) # Normalize def normalize(x: str) -> str: x = x.strip() if " " in x: return " ".join(map(normalize, x.split())) return x[0].upper() + x[1:].lower() for pers in raw_people: pers["Nom"] = normalize(pers["Nom"]) pers["Prénom"] = normalize(pers["Prénom"]) # Group by name proximity name_uf = UnionFind(len(raw_people)) for id1, pers1 in enumerate(raw_people): for id2, pers2 in enumerate(raw_people): if ( id1 < id2 and levenshtein_distance(pers1["Prénom"], pers2["Prénom"]) <= 2 ): name_uf.union(id1, id2) _name_groups: dict[int, list[dict]] = defaultdict(list) for pers_id, pers in enumerate(raw_people): _name_groups[name_uf.root(pers_id)].append(pers) name_groups: list[list[dict]] = list(_name_groups.values()) # Disambiguate names def make_short_name(pers: dict, disamb: int = 0) -> str: if disamb: return f"{pers['Prénom']} {pers['Nom'][:disamb]}." else: return pers["Prénom"] self.choristes = [] for grp in name_groups: if len(grp) == 1: self.choristes.append(make_short_name(grp[0])) else: req_letters = 1 while req_letters < 100: # safeguard short_names = list( map(lambda x: make_short_name(x, req_letters), grp) ) if len(set(short_names)) == len(short_names): # No clashes self.choristes += short_names break req_letters += 1 self.choristes.sort() def constituer_groupes(choristes: list[str]) -> list[list[str]]: """Répartir aléatoirement les gens en groupes""" TAILLE_GROUPE: int = 4 nb_choristes = len(choristes) groupes: list[list[str]] = [] random.shuffle(choristes) pos = 0 for _ in range(nb_choristes // TAILLE_GROUPE): groupes.append(choristes[pos : pos + TAILLE_GROUPE]) pos += TAILLE_GROUPE reste = choristes[pos:] if len(reste) == TAILLE_GROUPE - 1: groupes.append(reste) else: for gid, pers in enumerate(reste): groupes[gid].append(pers) for groupe in groupes: groupe.sort() random.shuffle(groupes) return groupes def assigner_taches(task: Category | Task, group_count: int, cur_group: int = 0) -> int: """Assigne les tâches aux groupes (round-robin)""" if isinstance(task, Task): task.assigned = list( map( lambda x: x % group_count, range(cur_group, cur_group + task.nb_groups), ) ) return (cur_group + task.nb_groups) % group_count for subtask in task.tasks: cur_group = assigner_taches(subtask, group_count, cur_group) return cur_group def export_short(config: Config, groupes: list[list[str]]) -> str: """Exporte la liste des tâches au format court (pour vérification)""" def export_taskcat(grp: Task | Category) -> str: if isinstance(grp, Task): return f'* {grp.qualified_name}: {", ".join(map(lambda x: str(x+1), grp.assigned))}' out = "\n" + "#" * (2 + grp.depth) + f" {grp.name}" if grp.time: out += f" ({grp.time})" out += "\n\n" if grp.intro: out += grp.intro + "\n\n" out += "\n".join(map(export_taskcat, grp.tasks)) return out out = "## Groupes\n\n" for g_id, group in enumerate(groupes): out += f"* Groupe {g_id+1} : " + ", ".join(group) + "\n" out += "\n## Tâches\n" out += "\n".join(map(export_taskcat, config.taches.tasks)) return out def export_latex(config: Config, groupes: list[list[str]]) -> str: """Exporter la liste des tâches en LaTeX (à insérer dans un template)""" def main(): parser = argparse.ArgumentParser("Répartition des tâches") parser.add_argument("taches", help="Fichier yaml contenant les tâches") parser.add_argument("choristes", help="Fichier CSV contenant les choristes") args = parser.parse_args() config = Config(args.taches, args.choristes) if __name__ == "__main__": main()