Source code for ablator.mp.utils
import copy
import os
from dataclasses import dataclass, field
import numpy as np
import ray
import torch
[docs]@dataclass
class Resource:
gpu_free_mem: dict[str, int]
mem: int
cpu_usage: float
cpu_count: int
running_tasks: list[str] = field(default_factory=lambda: [])
@property
def gpu_free_mem_arr(self) -> np.ndarray:
return np.array(list(self.gpu_free_mem.values()))
@property
def cpu_mean_util(self) -> float:
return np.array(self.cpu_usage).mean()
@property
def least_used_gpu(self):
return min(self.gpu_free_mem, key=self.gpu_free_mem.get)
[docs]def ray_init(**kwargs):
if (
"CUDA_VISIBLE_DEVICES" not in os.environ
or os.environ["CUDA_VISIBLE_DEVICES"] != ""
) and torch.cuda.is_available():
kwargs["num_gpus"] = 1
return ray.init(**kwargs)
def _sorted_gpu_util(resources: list[Resource]):
return np.argsort(
[
_resources.gpu_free_mem_arr.max() if len(_resources.gpu_free_mem_arr) else 0
for _resources in resources
]
)[::-1]
def _sorted_cpu_util(resources: list[Resource]):
return np.argsort([_resources.cpu_mean_util for _resources in resources])
def _sorted_mem_util(resources: list[Resource]):
return np.argsort([_resources.mem for _resources in resources])
def _sorted_task_util(resources: list[Resource]):
return np.argsort([len(_resources.running_tasks) for _resources in resources])
def _sort_node_ips_by_util(resources: dict[str, Resource], eval_gpu: bool):
node_ips = list(resources.keys())
resources_list = list(resources.values())
_node_ips = []
for _ in range(len(node_ips)):
usage_lists = []
if eval_gpu:
usage_lists.append(_sorted_gpu_util(resources_list))
usage_lists.append(_sorted_cpu_util(resources_list))
usage_lists.append(_sorted_mem_util(resources_list))
usage_lists.append(_sorted_task_util(resources_list))
np_usage_lists = np.array(usage_lists)
idxs, values = np.unique(np_usage_lists[:, 0], return_counts=True)
least_util_idx = idxs[np.argmax(values)]
_node_ips.append(node_ips[least_util_idx])
del resources_list[least_util_idx]
del node_ips[least_util_idx]
return _node_ips
def _sorted_nodes_by_util(
resources: dict[str, Resource],
gpu_util_requirement: int | None = None,
memory_perc_limit: int = 80,
cpu_util_perc_limit: int = 80,
) -> list[str]:
"""
_sorted_nodes_by_util Sorts the nodes based on their available resources from
the least used to the most used node. If a node does not meet the `gpu_util_requirement` or
`memory_perc_limit` and `cpu_util_perc_limit` it is excluded from the list.
Parameters
----------
resources : dict[str, Resource]
a dictionary of the nodes with their available resources
gpu_util_requirement : int | None
the gpu requirement for the task, by default ``None``.
memory_perc_limit : int
the percentage upper limit to memory utilization, by default ``80``.
cpu_util_perc_limit : int
the percentage upper limit to cpu utilization, by default ``80``.
Returns
-------
list[str]
the sorted list of node ips sorted from the least to most used.
"""
node_ips = _sort_node_ips_by_util(resources, gpu_util_requirement is not None)
def _should_sample(node_ip):
ray_cluster_gpu_limit = gpu_util_requirement is None or any(
resources[node_ip].gpu_free_mem_arr > gpu_util_requirement
)
ray_cluster_cpu_limit = resources[node_ip].cpu_mean_util < cpu_util_perc_limit
ray_cluster_mem_limit = resources[node_ip].mem < memory_perc_limit
return ray_cluster_mem_limit and ray_cluster_cpu_limit and ray_cluster_gpu_limit
_free_nodes = []
for node_ip in copy.deepcopy(node_ips):
if _should_sample(node_ip):
_free_nodes.append(node_ip)
return _free_nodes