Worker Management

class scalable.SlurmCluster(job_cls: ~scalable.core.Job = None, loop=None, security=None, shared_temp_directory=None, silence_logs='error', name=None, asynchronous=False, dashboard_address=None, host=None, scheduler_options={}, scheduler_cls=<class 'distributed.scheduler.Scheduler'>, interface=None, protocol=None, config_overwrite=True, comm_port=None, logs_location=None, suppress_logs=False, **job_kwargs)

Bases: JobQueueCluster

Launch Dask on a SLURM cluster. Inherits the JobQueueCluster class.

Parameters:
  • account (str) – Accounting string associated with each worker job.

  • comm_port (int) – The network port on which the cluster can contact the host.

  • config_overwrite (bool) – Remake the model config_dict so that it contains only the available containers and their paths. Defaults to True.

  • logs_location (str) – The location to store worker logs. Defaults to the logs folder in the current directory.

  • suppress_logs (bool) – Whether or not to suppress logs. Defaults to False.

  • name (str) – The name of the cluster, which is also used to name workers. Defaults to the class name.

  • queue (str) – Destination queue for each worker job.

  • run_scripts_path (str) – The path where the run scripts are located. Defaults to ./run_scripts. The run scripts should be in the format <worker tag>_script.sh.

  • security (Security) – A security object containing the TLS configuration for the worker. If True, a temporary security object with a self-signed certificate is created.

  • use_run_scripts (bool) – Whether or not to use the run scripts. Defaults to True.

  • walltime (str) – Walltime for each worker job.

  • *args (tuple) – Positional arguments to pass to JobQueueCluster.

  • **kwargs (dict) – Keyword arguments to pass to JobQueueCluster.
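The job-related parameters above can be gathered in one place before the cluster is constructed. The following is a minimal sketch, not the library's own recipe: the helper names `build_cluster_kwargs` and `make_cluster` are hypothetical, and the queue name, accounting string, and walltime format are placeholder assumptions that depend on the local SLURM setup. The import of `scalable` is deferred so the helper can be inspected on a machine without SLURM.

```python
def build_cluster_kwargs(queue, account, walltime, logs_location=None,
                         suppress_logs=False):
    """Collect documented SlurmCluster keyword arguments into a dict.

    Hypothetical helper: it only uses parameter names listed above.
    """
    kwargs = {
        "queue": queue,
        "account": account,
        "walltime": walltime,
        "suppress_logs": suppress_logs,
    }
    # Omit logs_location so that the documented default applies.
    if logs_location is not None:
        kwargs["logs_location"] = logs_location
    return kwargs


def make_cluster(**kwargs):
    """Construct the cluster; requires scalable and a SLURM system."""
    from scalable import SlurmCluster  # deferred import
    return SlurmCluster(**kwargs)


cluster_kwargs = build_cluster_kwargs(
    queue="short",         # placeholder queue name (assumption)
    account="my_account",  # placeholder accounting string (assumption)
    walltime="02:00:00",   # assumed HH:MM:SS format
)
print(cluster_kwargs)
# On a SLURM login node: cluster = make_cluster(**cluster_kwargs)
```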

scalable.SlurmCluster.add_container(self, tag, dirs, path=None, cpus=1, memory=None, preload_script=None)

Add a container so that workers can be launched from it.

The required dependencies for the workers are assumed to be in the container at the given (or stored) path. The information given about the container will be written to the config_dict.

Parameters:
  • tag (str) – The tag or the container type of the worker to be launched. Examples could include “gcam” for the gcam container and “stitches” for the stitches container.

  • dirs (dict) – A dictionary of path-on-worker:path-on-host pairs, where each path-on-worker is the mount point for the corresponding path-on-host. When the worker accesses path-on-worker, it is in effect accessing path-on-host. Together, the pairs form the list of volume/bind mounts. ‘/tmp’ is mounted to the same path on the host by default.

  • path (str) – The path at which the container is located.

  • cpus (int) – The number of cpus/processor cores to be reserved for this container. This should be 1 if the container is only going to run single-threaded functions or programs. Set it to more than 1 only if the container will run multi-threaded functions. The user must ensure that the function actually uses multiple threads, even if it launches an external program.

  • memory (str) – The amount of memory to be reserved for this container.

  • preload_script (str) – The path to a script that will be run by each worker before it launches.
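Because dirs maps paths inside the worker to paths on the host, it can help to assemble and check the arguments before calling add_container. This is a minimal sketch under stated assumptions: `container_spec` is a hypothetical helper, the type and range checks are additions of this sketch rather than library behavior, and the host path, container path, and memory string are placeholders.

```python
def container_spec(tag, dirs, path=None, cpus=1, memory=None,
                   preload_script=None):
    """Build keyword arguments matching the add_container signature.

    Hypothetical helper; the checks below are not part of the library.
    """
    if not isinstance(dirs, dict):
        raise TypeError("dirs must be a dict of path-on-worker: path-on-host")
    if cpus < 1:
        raise ValueError("cpus must be at least 1")
    return {
        "tag": tag,
        "dirs": dict(dirs),  # copy so later edits do not leak in
        "path": path,
        "cpus": cpus,
        "memory": memory,
        "preload_script": preload_script,
    }


gcam_spec = container_spec(
    tag="gcam",
    # key: path seen by the worker, value: path on the host (placeholder)
    dirs={"/data": "/scratch/project/data"},
    path="/containers/gcam.sif",  # placeholder container location
    cpus=4,                       # only if the workload is multi-threaded
    memory="16G",                 # assumed memory string format
)
print(gcam_spec["dirs"])
# With a real cluster: cluster.add_container(**gcam_spec)
```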

scalable.SlurmCluster.add_workers(self, tag=None, n=0)

Add workers to the cluster.

Parameters:
  • tag (str) – The tag or the container type of the worker to be launched, usually associated with the programs stored in the container. Examples could include “gcam” for the gcam container and “stitches” for the stitches container.

  • n (int) – The number of workers desired to be launched with the given tag.

Examples

>>> cluster.add_workers("gcam", 4)

scalable.SlurmCluster.remove_workers(self, tag=None, n=0)

Remove workers from the cluster.

Parameters:
  • tag (str) – The tag or the container type of the worker to be removed. Examples could include “gcam” for the gcam container and “stitches” for the stitches container.

  • n (int) – The number of workers desired to be removed with the given tag.
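Since add_workers and remove_workers both take a count rather than a target size, moving a tag to a desired number of workers means computing the difference first. The sketch below assumes the caller tracks the current count itself; `rescale` and `RecordingCluster` are hypothetical names, and the stand-in class only records calls so the example runs without SLURM.

```python
class RecordingCluster:
    """Hypothetical stand-in that records calls instead of submitting jobs."""

    def __init__(self):
        self.calls = []

    def add_workers(self, tag=None, n=0):
        self.calls.append(("add_workers", tag, n))

    def remove_workers(self, tag=None, n=0):
        self.calls.append(("remove_workers", tag, n))


def rescale(cluster, tag, current, target):
    """Move the worker count for `tag` from `current` to `target`."""
    if target < 0:
        raise ValueError("target must be non-negative")
    delta = target - current
    if delta > 0:
        cluster.add_workers(tag=tag, n=delta)
    elif delta < 0:
        cluster.remove_workers(tag=tag, n=-delta)
    return delta  # positive: added, negative: removed, zero: no call


demo = RecordingCluster()
rescale(demo, "gcam", current=4, target=1)  # removes 3 workers
rescale(demo, "gcam", current=1, target=3)  # adds 2 workers
print(demo.calls)
```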

Examples

>>> cluster.remove_workers("gcam", 4)

scalable.SlurmCluster.close(self, timeout: float | None = None) → Awaitable[None] | None

Close the cluster.

This closes all running jobs and the scheduler. Pending jobs belonging to the user are also cancelled.
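Because close also cancels pending jobs, it is worth making sure it runs even when the work fails partway. One possible pattern is a try/finally wrapper, sketched below. `run_with_workers` and `RecordingCluster` are hypothetical names; the stand-in class records calls so the example runs without SLURM, and a real SlurmCluster instance would be passed in its place.

```python
class RecordingCluster:
    """Hypothetical stand-in that records calls instead of submitting jobs."""

    def __init__(self):
        self.calls = []

    def add_workers(self, tag=None, n=0):
        self.calls.append(("add_workers", tag, n))

    def close(self, timeout=None):
        self.calls.append(("close", timeout))


def run_with_workers(cluster, plan, work):
    """Add workers for each tag in `plan`, run `work`, then always close."""
    try:
        for tag, n in plan.items():
            cluster.add_workers(tag=tag, n=n)
        return work()
    finally:
        # Runs on success and on error, so no jobs are left behind.
        cluster.close()


demo = RecordingCluster()
result = run_with_workers(demo, {"gcam": 4, "stitches": 2}, lambda: "done")
print(result)
print(demo.calls)
```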

scalable.SlurmCluster.set_default_request_quantity(nodes)

Set the default number of nodes to request when scaling the cluster.

Static function; it does not require an instance of the class.

If set to 1 (the original default), the cluster will request one hardware node at a time when scaling. If set to a higher number, like 5, the cluster will request 5 hardware nodes at a time when scaling. This is helpful when each worker may need almost all the resources of a node and it is more efficient to request multiple nodes at once.

Parameters:
  • nodes (int) – Number of nodes to request when scaling the cluster.
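The effect of the request quantity can be illustrated with simple arithmetic. The sketch below only counts how many batched requests would cover a given number of nodes; it is not the library's scaling logic, and `requests_needed` is a hypothetical helper. It assumes that each request asks for exactly the configured quantity.

```python
import math


def requests_needed(nodes_required, request_quantity=1):
    """Count requests of `request_quantity` nodes covering `nodes_required`."""
    if request_quantity < 1:
        raise ValueError("request_quantity must be at least 1")
    if nodes_required < 0:
        raise ValueError("nodes_required must be non-negative")
    return math.ceil(nodes_required / request_quantity)


# With the default quantity of 1, twelve nodes take twelve requests.
print(requests_needed(12, 1))  # 12
# With a quantity of 5, the same twelve nodes take three requests.
print(requests_needed(12, 5))  # 3
# With the library: SlurmCluster.set_default_request_quantity(5)
```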