Compare commits

...

5 commits

5 changed files with 193 additions and 18 deletions

6
mypy.ini Normal file
View file

@ -0,0 +1,6 @@
[mypy]
check_untyped_defs = True
[mypy-sortedcontainers.*]
follow_imports = skip
ignore_missing_imports = True

View file

@ -2,13 +2,19 @@ import argparse
import typing as t import typing as t
import random import random
from pathlib import Path from pathlib import Path
import logging
import jinja2 as j2 import jinja2 as j2
from .config import Task, Category, Config from .config import Task, Category, Config
from .partition import TaskId, partition
from . import util from . import util
logger = logging.getLogger(__name__)
def constituer_groupes(choristes: list[str]) -> list[list[str]]: Group: t.TypeAlias = list[str]
def constituer_groupes(choristes: list[str]) -> list[Group]:
"""Répartir aléatoirement les gens en groupes""" """Répartir aléatoirement les gens en groupes"""
TAILLE_GROUPE: int = 4 TAILLE_GROUPE: int = 4
nb_choristes = len(choristes) nb_choristes = len(choristes)
@ -36,19 +42,72 @@ def constituer_groupes(choristes: list[str]) -> list[list[str]]:
return groupes return groupes
def assigner_taches(task: Category | Task, group_count: int, cur_group: int = 0) -> int: class AssignError(Exception):
"""Assigne les tâches aux groupes (round-robin)""" """Levé lorsque les tâches ont été mal partitionnées"""
if isinstance(task, Task):
task.assigned = list(
map( def assigner_taches(root_task: Category | Task, group_count: int):
lambda x: x % group_count, """Assigne les tâches aux groupes (multiway number partitioning)"""
range(cur_group, cur_group + task.nb_groups),
) def flatten(task: Category | Task) -> list[Task]:
if isinstance(task, Task):
return [task]
out = []
for subtask in task.tasks:
out += flatten(subtask)
return out
all_tasks = flatten(root_task)
def pp_assigned_toughness(repart: list[list[TaskId]]) -> str:
"""Pretty-print the assigned toughness for each group"""
out = []
for grp_id, grp in enumerate(repart):
toughness: int = sum(map(lambda x: all_tasks[x].tough, grp))
out.append(f"{grp_id+1:2d}: {toughness:>3d}")
return "\n".join(out)
costs: dict[TaskId, int] = {}
multiplicity: dict[TaskId, int] = {}
for task_id, task in enumerate(all_tasks):
t_id: TaskId = TaskId(task_id)
costs[t_id] = task.tough
multiplicity[t_id] = task.nb_groups
repart: list[list[TaskId]] = partition(
bin_count=group_count,
costs=costs,
multiplicity=multiplicity,
)
# Sanity-check
assigned_count = sum(map(len, repart))
task_count = sum(multiplicity.values())
if task_count != assigned_count:
raise AssignError(
f"{assigned_count} tâches ont été attribuées, mais il y en a {task_count} !"
) )
return (cur_group + task.nb_groups) % group_count for g_id, grp in enumerate(repart):
for subtask in task.tasks: taskset: set[TaskId] = set()
cur_group = assigner_taches(subtask, group_count, cur_group) for task_id in grp:
return cur_group if task_id in taskset:
raise AssignError(
f"Le groupe {g_id + 1} a deux fois la tâche {task.qualified_name}"
)
taskset.add(task_id)
# Actually assign
for g_id, grp in enumerate(repart):
for task_id in grp:
task = all_tasks[task_id]
if task.assigned is None:
task.assigned = [g_id]
else:
task.assigned.append(g_id)
logger.debug("Assigned toughness mapping:\n%s", pp_assigned_toughness(repart))
# Check that all tasks are assigned
assert all(map(lambda x: x.assigned is not None and x.assigned, all_tasks))
def export_short_md(config: Config, groupes: list[list[str]]) -> str: def export_short_md(config: Config, groupes: list[list[str]]) -> str:
@ -57,7 +116,10 @@ def export_short_md(config: Config, groupes: list[list[str]]) -> str:
def export_taskcat(grp: Task | Category) -> str: def export_taskcat(grp: Task | Category) -> str:
if isinstance(grp, Task): if isinstance(grp, Task):
assert grp.assigned is not None assert grp.assigned is not None
return f'* {grp.qualified_name}: {", ".join(map(lambda x: str(x+1), grp.assigned))}' return (
f"* {grp.qualified_name}: "
+ f'{", ".join(map(lambda x: str(x+1), grp.assigned))}'
)
out = "\n" + "#" * (2 + grp.depth) + f" {grp.name}" out = "\n" + "#" * (2 + grp.depth) + f" {grp.name}"
if grp.time: if grp.time:
out += f" ({grp.time})" out += f" ({grp.time})"
@ -115,7 +177,15 @@ def export_latex(config: Config, groupes: list[list[str]]) -> str:
return template.render(**env) return template.render(**env)
def main(): def repartition(config: Config) -> list[Group]:
"""Crée des groupes et assigne des tâches"""
groupes: list[Group] = constituer_groupes(config.choristes)
assigner_taches(config.taches, len(groupes))
return groupes
def main() -> None:
parser = argparse.ArgumentParser("Répartition des tâches") parser = argparse.ArgumentParser("Répartition des tâches")
parser.add_argument("taches", help="Fichier yaml contenant les tâches") parser.add_argument("taches", help="Fichier yaml contenant les tâches")
parser.add_argument("choristes", help="Fichier CSV contenant les choristes") parser.add_argument("choristes", help="Fichier CSV contenant les choristes")
@ -130,15 +200,40 @@ def main():
"--to-short-md", "--to-short-md",
help="Exporter vers un fichier Markdown (pour vérification uniquement)", help="Exporter vers un fichier Markdown (pour vérification uniquement)",
) )
parser.add_argument(
"-g",
"--debug",
dest="loglevel",
action="store_const",
const=logging.DEBUG,
default=logging.INFO,
)
args = parser.parse_args() args = parser.parse_args()
logging.basicConfig(level=args.loglevel)
config = Config(args.taches, args.choristes) config = Config(args.taches, args.choristes)
if args.bare_tasks: if args.bare_tasks:
util.write_to_file(args.bare_tasks, export_bare_tasks_md(config)) util.write_to_file(args.bare_tasks, export_bare_tasks_md(config))
groupes = constituer_groupes(config.choristes) retry: int = 0
assigner_taches(config.taches, len(groupes)) MAX_RETRY: int = 4
while retry < MAX_RETRY:
try:
groupes = repartition(config)
break
except AssignError as exn:
retry += 1
logger.warning(
"[essai %d/%d] Échec de répartition des tâches : %s",
retry,
MAX_RETRY,
exn,
)
if retry == MAX_RETRY:
logger.critical("Échec de répartition des tâches.")
raise exn from exn
if args.to_tex: if args.to_tex:
util.write_to_file(args.to_tex, export_latex(config, groupes)) util.write_to_file(args.to_tex, export_latex(config, groupes))

View file

@ -0,0 +1,73 @@
""" Implements Multiway number partitioning greedy algorithm """
import typing as t
from sortedcontainers import SortedList
__all__ = ["TaskId", "partition"]
TaskId = t.NewType("TaskId", int)
class PartitionException(Exception):
"""An exception occurring during partitioning"""
class UnsolvableConflict(PartitionException):
"""Cannot partition set due to unsolvable conflicts"""
class Bin:
"""A bin containing assigned tasks"""
elts: list[TaskId]
cost: int
def __init__(self):
self.elts = []
self.cost = 0
def add(self, task: TaskId, cost: int):
assert task not in self.elts
self.elts.append(task)
self.cost += cost
def __contains__(self, task: TaskId) -> bool:
return task in self.elts
def partition(
bin_count: int, costs: dict[TaskId, int], multiplicity: dict[TaskId, int]
) -> list[list[TaskId]]:
"""Partitions the tasks, each with cost `costs[i]`, into `bin_count` bins. Each
task has multiplicity `multiplicity[i]`, copies of the same task being mutually
exclusive (ie. cannot be in the same bin)"""
bins = SortedList([Bin() for _ in range(bin_count)], key=lambda x: x.cost)
ordered_tasks: list[TaskId] = []
for t_id, reps in multiplicity.items():
for _ in range(reps):
ordered_tasks.append(t_id)
ordered_tasks.sort(key=lambda x: costs[x], reverse=True)
for task in ordered_tasks:
least_full: Bin
least_full_pos: int
for pos, cur_bin in enumerate(bins):
if task not in cur_bin:
least_full = cur_bin
least_full_pos = pos
break
else:
raise UnsolvableConflict(
"Pas assez de groupes pour affecter la tâche "
+ f"{task} {multiplicity[task]} fois."
)
del bins[least_full_pos]
least_full.add(task, costs[task])
bins.add(least_full)
out: list[list[TaskId]] = []
for cur_bin in bins:
out.append(cur_bin.elts)
return out

View file

@ -29,7 +29,7 @@ def levenshtein_distance(s1, s2):
if len(s1) > len(s2): if len(s1) > len(s2):
s1, s2 = s2, s1 s1, s2 = s2, s1
distances = range(len(s1) + 1) distances: list[int] = list(range(len(s1) + 1))
for i2, c2 in enumerate(s2): for i2, c2 in enumerate(s2):
distances_ = [i2 + 1] distances_ = [i2 + 1]
for i1, c1 in enumerate(s1): for i1, c1 in enumerate(s1):

View file

@ -1,2 +1,3 @@
ruamel.yaml ruamel.yaml
Jinja2 Jinja2
sortedcontainers