WE-repartir-taches/repartir_taches/config.py

from dataclasses import dataclass
from collections import defaultdict
import csv
import typing as t
from ruamel import yaml
from .util import levenshtein_distance, UnionFind


@dataclass
class Task:
    name: str
    qualified_name: str
    descr: str
    tough: int
    notes: str
    time: str
    nb_groups: int
    referent: t.Optional[str] = None
    assigned: t.Optional[list[int]] = None


class Category(t.NamedTuple):
    name: str
    depth: int
    time: str
    tasks: list  # of Category|Task, but mypy doesn't support recursive types
    intro: str


class Config:
    tasks_path: str
    people_path: str
    choristes: list[str]
    ca: list[str]
    taches: Category
    env: dict[str, str]

    def __init__(self, tasks_path: str, people_path: str):
        self.tasks_path = tasks_path
        self.people_path = people_path
        self.choristes = []
        self.ca = []
        self.env = {}
        self._load_tasks()
        self._load_people()

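    # Illustrative sketch of the YAML layout this loader expects (values are
    # invented; only the key names come from the accessors below):
    #
    #   env: {lieu: "Salle des fêtes"}
    #   CA: [Alice, Bob]
    #   taches:
    #     - cat: Samedi
    #       heure: "14h"
    #       taches:
    #         - nom: Accueil
    #           descr: "Accueillir le public à {lieu}"
    #           penible: 1
    #           nb_groupes: 2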
    def _load_tasks(self) -> None:
        with open(self.tasks_path, "r") as h:
            raw_tasks = yaml.safe_load(h)
        assert "taches" in raw_tasks
        self.env = raw_tasks["env"]
        self.taches = Category(
            name="",
            depth=0,
            time="",
            intro="",
            tasks=list(map(self._load_task_cat, raw_tasks["taches"])),
        )
        self.ca = raw_tasks["CA"]

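    # Nested entries with a "cat" key become Category nodes (depth starts at 1;
    # the synthetic root built above has depth 0); their qualified names chain
    # the parent category names with " - ".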
    def _load_task_cat(
        self, cat: dict[str, t.Any], depth: int = 1, qual: str = ""
    ) -> Task | Category:
        if "cat" not in cat:
            return self._load_task(cat, qual)
        assert "taches" in cat
        nqual = cat["cat"]
        if qual:
            nqual = f"{qual} - {nqual}"
        return Category(
            name=cat["cat"],
            depth=depth,
            time=cat.get("heure", ""),
            intro=cat.get("intro", ""),
            tasks=list(
                map(
                    lambda x: self._load_task_cat(x, depth=depth + 1, qual=nqual),
                    cat["taches"],
                )
            ),
        )

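    # Leaf entries become Task objects; "descr" is formatted against self.env,
    # so a description containing "{lieu}" (hypothetical key) is filled in with
    # env["lieu"] from the YAML file.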
    def _load_task(self, task: dict[str, t.Any], qual: str) -> Task:
        for label in ("nom", "descr"):
            assert label in task
        qual_name = f'{qual}{" - " if qual else ""}{task["nom"]}'
        return Task(
            name=task["nom"],
            qualified_name=qual_name,
            descr=task["descr"].format(**self.env),
            tough=int(task["penible"]),
            notes=task.get("notes", ""),
            time=task.get("heure", ""),
            nb_groups=int(task.get("nb_groupes", 1)),
            referent=task.get("ref", None),
        )

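    # The people CSV must provide at least "Nom" and "Prénom" columns, e.g.
    # (illustrative rows):
    #
    #   Nom,Prénom
    #   Durand,Marie
    #   Dupont,Jean-Pierre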
    def _load_people(self) -> None:
        with open(self.people_path, "r") as h:
            raw_people: list[dict[str, str]] = list(csv.DictReader(h))
        for key in "Nom", "Prénom":
            assert key in raw_people[0]
        raw_people.sort(key=lambda x: x["Prénom"])

        # Normalize
        def normalize(x: str) -> str:
            x = x.strip()
            if not x:
                return ""
            for sep in (" ", "-"):
                if sep in x:
                    return sep.join(map(normalize, x.split(sep)))
            return x[0].upper() + x[1:].lower()

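        # e.g. normalize(" jean-pierre ") -> "Jean-Pierre",
        #      normalize("DURAND") -> "Durand"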
        for pers in raw_people:
            pers["Nom"] = normalize(pers["Nom"])
            pers["Prénom"] = normalize(pers["Prénom"])

        # Group by name proximity
        name_uf = UnionFind(len(raw_people))
        for id1, pers1 in enumerate(raw_people):
            for id2, pers2 in enumerate(raw_people):
                if (
                    id1 < id2
                    and levenshtein_distance(pers1["Prénom"], pers2["Prénom"]) <= 2
                ):
                    name_uf.union(id1, id2)
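        # First names within Levenshtein distance 2 of one another
        # (e.g. "Chloe" / "Chloé") now share a union-find root.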
        _name_groups: dict[int, list[dict]] = defaultdict(list)
        for pers_id, pers in enumerate(raw_people):
            _name_groups[name_uf.root(pers_id)].append(pers)
        name_groups: list[list[dict]] = list(_name_groups.values())

        # Disambiguate names
        def make_short_name(pers: dict, disamb: int = 0) -> str:
            if disamb:
                return f"{pers['Prénom']} {pers['Nom'][:disamb]}."
            else:
                return pers["Prénom"]

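        # e.g. make_short_name({"Prénom": "Marie", "Nom": "Durand"}) -> "Marie"
        #      make_short_name({"Prénom": "Marie", "Nom": "Durand"}, 2) -> "Marie Du."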
        self.choristes = []
        for grp in name_groups:
            if len(grp) == 1:
                self.choristes.append(make_short_name(grp[0]))
            else:
                req_letters = 1
                while req_letters < 100:  # safeguard
                    short_names = list(
                        map(lambda x: make_short_name(x, req_letters), grp)
                    )
                    if len(set(short_names)) == len(short_names):
                        # No clashes
                        self.choristes += short_names
                        break
                    req_letters += 1
        self.choristes.sort()
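

# Illustrative usage (file names are hypothetical):
#
#     cfg = Config("taches.yaml", "choristes.csv")
#     cfg.choristes   # sorted, disambiguated first names from the CSV
#     cfg.taches      # root Category wrapping the task tree from the YAML file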