WE-repartir-taches/repartir_taches/entrypoint.py

243 lines
7.4 KiB
Python
Raw Normal View History

import argparse
import typing as t
import random
2022-10-30 16:59:13 +01:00
from pathlib import Path
2023-03-04 11:45:48 +01:00
import logging
2022-10-30 16:59:13 +01:00
import jinja2 as j2
from .config import Task, Category, Config
from .partition import TaskId, partition
2022-10-30 16:59:13 +01:00
from . import util
2023-03-04 11:45:48 +01:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
2023-03-04 11:45:48 +01:00
# A group is a list of choristers, each one being a string taken from the
# chorister list.
Group: t.TypeAlias = list[str]
def constituer_groupes(choristes: list[str]) -> "list[Group]":
    """Randomly split the choristers into groups of about four people.

    The choristers are shuffled and cut into full groups of ``TAILLE_GROUPE``
    members.  The people left over are then handled as follows:

    * exactly ``TAILLE_GROUPE - 1`` leftovers form a group of their own;
    * if there are more leftovers than full groups able to host them (this
      happens with 1, 2 or 6 choristers), they also form a group of their own
      instead of failing with an ``IndexError``;
    * otherwise each leftover joins a distinct full group.

    Args:
        choristes: the people to dispatch.  The list is not modified.

    Returns:
        The groups, in random order; the members of each group are sorted.
        An empty list is returned when ``choristes`` is empty.
    """
    TAILLE_GROUPE: int = 4

    # Shuffle a copy: the caller's list (the configuration) must stay intact.
    melange = list(choristes)
    random.shuffle(melange)

    nb_complets = len(melange) // TAILLE_GROUPE
    groupes: list[list[str]] = [
        melange[debut : debut + TAILLE_GROUPE]
        for debut in range(0, nb_complets * TAILLE_GROUPE, TAILLE_GROUPE)
    ]
    reste = melange[nb_complets * TAILLE_GROUPE :]

    if len(reste) == TAILLE_GROUPE - 1 or len(reste) > len(groupes):
        # Either the leftovers are numerous enough to be a group, or there
        # are not enough full groups to give each leftover a distinct one.
        groupes.append(reste)
    else:
        # At most one extra person per group.
        for groupe, pers in zip(groupes, reste):
            groupe.append(pers)

    for groupe in groupes:
        groupe.sort()
    # Shuffle again so that the enlarged/smaller groups are not always at
    # the same positions.
    random.shuffle(groupes)
    return groupes
2023-03-04 11:45:48 +01:00
class AssignError(Exception):
    """Raised when the tasks have been partitioned incorrectly"""
def assigner_taches(root_task: Category | Task, group_count: int):
    """Assign the tasks to the groups (multiway number partitioning).

    The task tree is flattened; each leaf task is identified by its index in
    the flat list.  ``partition`` is then called with ``group_count`` bins,
    the toughness of each task as its cost and its ``nb_groups`` as its
    multiplicity.  The result is sanity-checked and finally recorded in the
    ``assigned`` attribute of every task as a list of 0-based group indices.

    Args:
        root_task: root of the task tree (a single task or a category).
        group_count: number of groups the tasks are spread over.

    Raises:
        AssignError: if the number of assignments returned by ``partition``
            differs from the total multiplicity, or if a group received the
            same task twice.
    """

    def flatten(task: Category | Task) -> list[Task]:
        """Return the leaf tasks found under ``task``, depth-first."""
        if isinstance(task, Task):
            return [task]
        leaves: list[Task] = []
        for subtask in task.tasks:
            leaves.extend(flatten(subtask))
        return leaves

    all_tasks = flatten(root_task)

    def pp_assigned_toughness(repart: list[list[TaskId]]) -> str:
        """Pretty-print the assigned toughness for each group"""
        return "\n".join(
            f"{grp_id+1:2d}: {sum(all_tasks[t_id].tough for t_id in grp):>3d}"
            for grp_id, grp in enumerate(repart)
        )

    costs: dict[TaskId, int] = {
        TaskId(idx): task.tough for idx, task in enumerate(all_tasks)
    }
    multiplicity: dict[TaskId, int] = {
        TaskId(idx): task.nb_groups for idx, task in enumerate(all_tasks)
    }

    repart: list[list[TaskId]] = partition(
        bin_count=group_count,
        tasks=all_tasks,
        costs=costs,
        multiplicity=multiplicity,
    )

    # Sanity check 1: every requested (task, group) slot has been handed out.
    assigned_count = sum(map(len, repart))
    task_count = sum(multiplicity.values())
    if task_count != assigned_count:
        raise AssignError(
            f"{assigned_count} tâches ont été attribuées, mais il y en a {task_count} !"
        )

    # Sanity check 2: no group received the same task twice.
    for g_id, grp in enumerate(repart):
        seen: set[TaskId] = set()
        for task_id in grp:
            if task_id in seen:
                # Report the task that is actually duplicated, looked up
                # from its identifier.
                duplicated = all_tasks[task_id]
                raise AssignError(
                    f"Le groupe {g_id + 1} a deux fois la tâche "
                    f"{duplicated.qualified_name}"
                )
            seen.add(task_id)

    # Actually assign
    for g_id, grp in enumerate(repart):
        for task_id in grp:
            task = all_tasks[task_id]
            if task.assigned is None:
                task.assigned = [g_id]
            else:
                task.assigned.append(g_id)

    logger.debug("Assigned toughness mapping:\n%s", pp_assigned_toughness(repart))

    # Check that all tasks are assigned (``None`` and ``[]`` are both falsy).
    assert all(task.assigned for task in all_tasks)
2022-10-30 16:59:13 +01:00
def export_short_md(config: Config, groupes: list[list[str]]) -> str:
    """Render the groups and the task assignment as short Markdown.

    The output is meant for proofreading only.  It contains a ``Groupes``
    section listing the members of each group (numbered from 1), followed by
    a ``Tâches`` section in which every task is listed with the numbers of
    the groups it is assigned to.
    """

    def render(node: Task | Category) -> str:
        """Render one task (bullet) or one category (heading + children)."""
        if isinstance(node, Task):
            assert node.assigned is not None
            numbers = ", ".join(str(g_id + 1) for g_id in node.assigned)
            return f"* {node.qualified_name}: " + numbers

        # Heading level grows with the depth of the category in the tree.
        pieces = ["\n" + "#" * (2 + node.depth) + f" {node.name}"]
        if node.time:
            pieces.append(f" ({node.time})")
        pieces.append("\n\n")
        if node.intro:
            pieces.append(node.intro + "\n\n")
        pieces.append("\n".join(render(child) for child in node.tasks))
        return "".join(pieces)

    group_lines = [
        f"* Groupe {g_id+1} : " + ", ".join(members) + "\n"
        for g_id, members in enumerate(groupes)
    ]
    task_section = "\n".join(render(child) for child in config.taches.tasks)
    return "## Groupes\n\n" + "".join(group_lines) + "\n## Tâches\n" + task_section
2023-02-26 16:29:49 +01:00
def export_bare_tasks_md(config: Config) -> str:
    """Render the task list, without any assignment, as Markdown.

    Intended for reviewing the list itself: for each task, the number of
    groups it requires and its toughness coefficient are shown, plus its
    referent when one is set.
    """

    def render(node: Task | Category) -> str:
        """Render one task (bullet) or one category (heading + children)."""
        if isinstance(node, Task):
            bullet = (
                f"* **{node.name}** : "
                f"{node.nb_groups} groupe{'s' if node.nb_groups > 1 else ''}, "
                f"pénible x{node.tough}"
            )
            if node.referent is None:
                return bullet
            return bullet + f" (référent {node.referent})"

        # Heading level grows with the depth of the category in the tree.
        pieces = ["\n" + "#" * (2 + node.depth) + f" {node.name}"]
        if node.time:
            pieces.append(f" ({node.time})")
        pieces.append("\n\n")
        if node.intro:
            pieces.append(node.intro + "\n\n")
        pieces.append("\n".join(render(child) for child in node.tasks))
        return "".join(pieces)

    return "\n".join(render(child) for child in config.taches.tasks)
def export_latex(config: Config, groupes: list[list[str]]) -> str:
    """Export the task list as LaTeX (to be inserted into a template).

    Renders the ``repartition.tex.j2`` template with the groups keyed by
    their 0-based index, the top-level tasks of the configuration and the
    group colours provided by ``util``.
    """
    template = util.j2_environment().get_template("repartition.tex.j2")
    return template.render(
        groupes=dict(enumerate(groupes)),
        taches=config.taches.tasks,
        couleur=util.group_colors,
    )
2023-03-04 11:45:48 +01:00
def repartition(config: Config) -> list[Group]:
    """Build the groups, then assign the configured tasks to them.

    Returns the groups; the assignment itself is stored on the tasks of
    ``config.taches`` by ``assigner_taches``.
    """
    nouveaux_groupes = constituer_groupes(config.choristes)
    assigner_taches(config.taches, group_count=len(nouveaux_groupes))
    return nouveaux_groupes
def main() -> None:
    """Command-line entry point.

    Parses the arguments, builds the ``Config`` from the task file and the
    chorister file, optionally writes the bare task list, then tries up to
    ``MAX_RETRY`` times to build groups and assign the tasks.  On success the
    requested exports (LaTeX and/or short Markdown) are written.

    Raises:
        AssignError: if the assignment failed ``MAX_RETRY`` times in a row.
    """
    # The text is a description of the tool, not the program name: it must
    # not be passed as the first positional argument (``prog``).
    parser = argparse.ArgumentParser(description="Répartition des tâches")
    parser.add_argument("taches", help="Fichier yaml contenant les tâches")
    parser.add_argument("choristes", help="Fichier CSV contenant les choristes")
    parser.add_argument("--to-tex", help="Exporter vers un fichier LaTeX")
    parser.add_argument(
        "--bare-tasks",
        help=(
            "Exporter seulement les tâches sans assignation pour revue vers ce fichier"
        ),
    )
    parser.add_argument(
        "--to-short-md",
        help="Exporter vers un fichier Markdown (pour vérification uniquement)",
    )
    parser.add_argument(
        "-g",
        "--debug",
        dest="loglevel",
        action="store_const",
        const=logging.DEBUG,
        default=logging.INFO,
    )
    args = parser.parse_args()

    logging.basicConfig(level=args.loglevel)
    config = Config(args.taches, args.choristes)

    if args.bare_tasks:
        util.write_to_file(args.bare_tasks, export_bare_tasks_md(config))

    MAX_RETRY: int = 4
    for attempt in range(1, MAX_RETRY + 1):
        try:
            groupes = repartition(config)
        except AssignError as exn:
            logger.warning(
                "[essai %d/%d] Échec de répartition des tâches : %s",
                attempt,
                MAX_RETRY,
                exn,
            )
            if attempt == MAX_RETRY:
                logger.critical("Échec de répartition des tâches.")
                # Bare ``raise`` keeps the original traceback and avoids
                # making the exception its own cause.
                raise
        else:
            break

    if args.to_tex:
        util.write_to_file(args.to_tex, export_latex(config, groupes))
    if args.to_short_md:
        util.write_to_file(args.to_short_md, export_short_md(config, groupes))