""" Implements Multiway number partitioning greedy algorithm """ import typing as t import logging from sortedcontainers import SortedList from .config import Task __all__ = ["TaskId", "partition"] TaskId = t.NewType("TaskId", int) logger = logging.getLogger(__name__) class PartitionException(Exception): """An exception occurring during partitioning""" class UnsolvableConflict(PartitionException): """Cannot partition set due to unsolvable conflicts""" class Bin: """A bin containing assigned tasks""" elts: list[TaskId] cost: int def __init__(self): self.elts = [] self.cost = 0 def add(self, task: TaskId, cost: int): assert task not in self.elts self.elts.append(task) self.cost += cost def __contains__(self, task: TaskId) -> bool: return task in self.elts def partition( bin_count: int, tasks: list[Task], costs: dict[TaskId, int], multiplicity: dict[TaskId, int], ) -> list[list[TaskId]]: """Partitions the tasks, each with cost `costs[i]`, into `bin_count` bins. Each task has multiplicity `multiplicity[i]`, copies of the same task being mutually exclusive (ie. cannot be in the same bin)""" bins = SortedList([Bin() for _ in range(bin_count)], key=lambda x: x.cost) ordered_tasks: list[TaskId] = [] for t_id, reps in multiplicity.items(): for _ in range(reps): ordered_tasks.append(t_id) ordered_tasks.sort(key=lambda x: costs[x], reverse=True) for task in ordered_tasks: possible_bins: list[int] = [] min_possible: t.Optional[int] = None for pos, cur_bin in enumerate(bins): if min_possible is not None and cur_bin.cost > min_possible: break if task not in cur_bin: if min_possible is None: min_possible = cur_bin.cost possible_bins.append(pos) if not possible_bins: raise UnsolvableConflict( "Pas assez de groupes pour affecter la tâche " + f"{tasks[task].qualified_name} {multiplicity[task]} fois." ) # Pick one of the groups -- maximize distance to other tasks closest_assigned = [] for cur_bin_id in possible_bins: cur_bin = bins[cur_bin_id] if cur_bin.elts: closest_assigned.append(min((abs(elt - task) for elt in cur_bin.elts))) else: closest_assigned.append(len(costs) + 1) assign_to_bin_id, _ = max(enumerate(closest_assigned), key=lambda x: x[1]) assign_to_bin = bins[possible_bins[assign_to_bin_id]] del bins[assign_to_bin_id] assign_to_bin.add(task, costs[task]) bins.add(assign_to_bin) out: list[list[TaskId]] = [] for cur_bin in bins: out.append(cur_bin.elts) return out