"""Load the task tree and the list of people from their configuration files."""

from dataclasses import dataclass
from collections import Counter, defaultdict
import csv
import itertools
import typing as t

from ruamel import yaml

from .util import levenshtein_distance, UnionFind


@dataclass
class Task:
    """A single task (a leaf of the task tree), built by ``Config._load_task``."""

    name: str  # YAML key "nom"
    qualified_name: str  # enclosing category names and the task name, " - "-joined
    descr: str  # YAML key "descr", formatted with the ``env`` mapping
    tough: int  # YAML key "penible", converted to int
    notes: str  # YAML key "notes" ("" when absent)
    time: str  # YAML key "heure" ("" when absent)
    nb_groups: int  # YAML key "nb_groupes" (1 when absent)
    referent: t.Optional[str] = None  # YAML key "ref"
    assigned: t.Optional[list[int]] = None  # never set by the loader in this file


class Category(t.NamedTuple):
    """A node of the task tree holding sub-categories and/or tasks."""

    name: str
    depth: int  # 0 for the synthetic root, 1 for top-level categories, ...
    time: str
    tasks: list  # of Category|Task, but mypy doesn't support recursive types
    intro: str


def _normalize_name(raw: str) -> str:
    """Capitalize every space- or hyphen-separated part of a name.

    Surrounding whitespace is stripped and empty parts stay empty, so
    ``"  jean-PIERRE "`` becomes ``"Jean-Pierre"``.
    """
    raw = raw.strip()
    if not raw:
        return ""
    for sep in (" ", "-"):
        if sep in raw:
            # Recurse so that parts of a space-separated name may themselves
            # contain hyphens.
            return sep.join(_normalize_name(part) for part in raw.split(sep))
    return raw[0].upper() + raw[1:].lower()


def _short_name(pers: dict, nb_letters: int = 0) -> str:
    """Return the first name, followed by `nb_letters` surname letters if non-zero."""
    if nb_letters:
        return f"{pers['Prénom']} {pers['Nom'][:nb_letters]}."
    return pers["Prénom"]


def _unique_short_names(grp: list[dict]) -> list[str]:
    """Return one display name per person of a group of similar first names.

    A person alone in their group keeps the bare first name. Otherwise the
    smallest number of surname letters that makes every label distinct is
    used. When even the full surnames clash (same first name and surname),
    the clashing labels are numbered instead of being dropped.
    """
    if len(grp) == 1:
        return [_short_name(grp[0])]

    # Past the longest surname the labels no longer change, so there is no
    # point in trying more letters than that.
    max_letters = max(1, max(len(pers["Nom"]) for pers in grp))
    names: list[str] = []
    for nb_letters in range(1, max_letters + 1):
        names = [_short_name(pers, nb_letters) for pers in grp]
        if len(set(names)) == len(names):  # No clashes
            return names

    # Unresolvable clash: keep everybody, numbering the identical labels.
    totals = Counter(names)
    seen: Counter = Counter()
    numbered: list[str] = []
    for name in names:
        if totals[name] > 1:
            seen[name] += 1
            numbered.append(f"{name} ({seen[name]})")
        else:
            numbered.append(name)
    return numbered


class Config:
    """Tasks and people loaded from a YAML task file and a CSV people file.

    Attributes are filled in by the constructor:
    ``taches`` is the root ``Category`` of the task tree, ``env`` the mapping
    used to format task descriptions, ``ca`` the content of the YAML "CA"
    key, and ``choristes`` the sorted, disambiguated short names read from
    the CSV file.
    """

    tasks_path: str
    people_path: str
    choristes: list[str]
    ca: list[str]
    taches: Category
    env: dict[str, str]

    def __init__(self, tasks_path: str, people_path: str):
        """Load both files immediately.

        Args:
            tasks_path: path of the YAML file describing the tasks.
            people_path: path of the CSV file listing the people.
        """
        self.tasks_path = tasks_path
        self.people_path = people_path
        self.choristes = []
        self.ca = []
        self.env = {}
        self._load_tasks()
        self._load_people()

    def _load_tasks(self) -> None:
        """Read the YAML file and fill in ``env``, ``taches`` and ``ca``.

        Raises:
            AssertionError: the top-level "taches" key is missing.
            KeyError: the top-level "env" or "CA" key is missing.
        """
        with open(self.tasks_path, "r", encoding="utf-8") as h:
            # The module-level ``yaml.safe_load`` is gone from recent
            # ruamel.yaml releases; the YAML(typ="safe") object is the
            # supported way to get the same safe loading.
            raw_tasks = yaml.YAML(typ="safe").load(h)

        assert (
            "taches" in raw_tasks
        ), f"{self.tasks_path}: missing top-level 'taches' key"

        # ``env`` must be set first: task descriptions are formatted with it.
        self.env = raw_tasks["env"]
        self.taches = Category(
            name="",
            depth=0,
            time="",
            intro="",
            tasks=[self._load_task_cat(entry) for entry in raw_tasks["taches"]],
        )
        self.ca = raw_tasks["CA"]

    def _load_task_cat(
        self, cat: dict[str, t.Any], depth: int = 1, qual: str = ""
    ) -> Task | Category:
        """Recursively convert a YAML entry into a ``Category`` or a ``Task``.

        Args:
            cat: the YAML mapping; it is a category when it has a "cat" key,
                a task otherwise.
            depth: nesting depth given to the category being built.
            qual: " - "-joined names of the enclosing categories.

        Raises:
            AssertionError: a category entry has no "taches" key.
        """
        if "cat" not in cat:
            return self._load_task(cat, qual)

        assert "taches" in cat, f"category {cat['cat']!r}: missing 'taches' key"

        nqual = cat["cat"]
        if qual:
            nqual = f"{qual} - {nqual}"

        return Category(
            name=cat["cat"],
            depth=depth,
            time=cat.get("heure", ""),
            intro=cat.get("intro", ""),
            tasks=[
                self._load_task_cat(sub, depth=depth + 1, qual=nqual)
                for sub in cat["taches"]
            ],
        )

    def _load_task(self, task: dict[str, t.Any], qual: str) -> Task:
        """Build a ``Task`` from its YAML mapping.

        Args:
            task: the YAML mapping of the task.
            qual: " - "-joined names of the enclosing categories.

        Raises:
            AssertionError: the "nom" or "descr" key is missing.
            KeyError: the "penible" key is missing, or the description uses a
                placeholder that is not defined in ``env``.
        """
        for label in ("nom", "descr"):
            assert label in task, f"task {task!r}: missing {label!r} key"

        prefix = f"{qual} - " if qual else ""
        return Task(
            name=task["nom"],
            qualified_name=f"{prefix}{task['nom']}",
            descr=task["descr"].format(**self.env),
            tough=int(task["penible"]),
            notes=task.get("notes", ""),
            time=task.get("heure", ""),
            nb_groups=int(task.get("nb_groupes", 1)),
            referent=task.get("ref", None),
        )

    def _load_people(self) -> None:
        """Read the CSV file and fill in ``choristes`` with unique short names.

        People whose normalized first names are within an edit distance of 2
        are grouped together (through the union-find, so the grouping is
        expected to be transitive); inside such a group, first names are
        suffixed with as many surname letters as needed to tell them apart.

        Raises:
            AssertionError: the "Nom" or "Prénom" column is missing.
        """
        # newline="" is what the csv module expects from the file object.
        with open(self.people_path, "r", encoding="utf-8", newline="") as h:
            reader = csv.DictReader(h)
            # Check the header rather than the first row, so that a file
            # without any data row is accepted.
            fieldnames = reader.fieldnames or []
            raw_people: list[dict[str, str]] = list(reader)

        for key in ("Nom", "Prénom"):
            assert key in fieldnames, f"{self.people_path}: missing {key!r} column"

        raw_people.sort(key=lambda pers: pers["Prénom"])

        # Normalize
        for pers in raw_people:
            pers["Nom"] = _normalize_name(pers["Nom"])
            pers["Prénom"] = _normalize_name(pers["Prénom"])

        # Group by name proximity
        name_uf = UnionFind(len(raw_people))
        indexed_pairs = itertools.combinations(enumerate(raw_people), 2)
        for (id1, pers1), (id2, pers2) in indexed_pairs:
            if levenshtein_distance(pers1["Prénom"], pers2["Prénom"]) <= 2:
                name_uf.union(id1, id2)

        name_groups: dict[int, list[dict]] = defaultdict(list)
        for pers_id, pers in enumerate(raw_people):
            name_groups[name_uf.root(pers_id)].append(pers)

        # Disambiguate names
        choristes: list[str] = []
        for grp in name_groups.values():
            choristes.extend(_unique_short_names(grp))
        choristes.sort()
        self.choristes = choristes