Source code for streamline.utils.cluster

from dask.distributed import Client
from dask_jobqueue import SLURMCluster, LSFCluster, SGECluster
from dask_jobqueue import HTCondorCluster, MoabCluster, OARCluster, PBSCluster

cluster_dict = {
    'HTCondor': HTCondorCluster,
    'LSF': LSFCluster,
    'Moab': MoabCluster,
    'OAR': OARCluster,
    'PBS': PBSCluster,
    'SGE': SGECluster,
    'SLURM': SLURMCluster,
}


def get_cluster(cluster_type='SLURM', output_path=".", queue='defq', memory=4):
    """
    Build a dask.distributed Client backed by the requested job scheduler.

    :param cluster_type: one of 'SLURM', 'LSF', 'UGE', 'HTCondor', 'Local',
        or any key in cluster_dict
    :param output_path: directory under which dask worker logs are written
    :param queue: scheduler queue/partition name
    :param memory: per-worker memory request in GB
    :return: a connected dask.distributed Client
    """
    client = None
    try:
        if cluster_type == 'SLURM':
            cluster = SLURMCluster(queue=queue,
                                   cores=1,
                                   memory=str(memory) + "G",
                                   walltime="24:00:00",
                                   log_directory=output_path + "/dask_logs/")
            cluster.adapt(maximum_jobs=400)
        elif cluster_type == "LSF":
            # LSF takes the raw byte count via `mem` in addition to the
            # human-readable `memory` string used by Dask itself.
            cluster = LSFCluster(queue=queue,
                                 cores=1,
                                 mem=memory * 1000000000,
                                 memory=str(memory) + "G",
                                 walltime="24:00",
                                 log_directory=output_path + "/dask_logs/")
            cluster.adapt(maximum_jobs=400)
        elif cluster_type == 'UGE':
            # UGE (Univa Grid Engine) is handled by dask-jobqueue's SGECluster.
            cluster = SGECluster(queue=queue,
                                 cores=1,
                                 memory=str(memory) + "G",
                                 resource_spec="mem_free=" + str(memory) + "G",
                                 walltime="24:00:00",
                                 log_directory=output_path + "/dask_logs/")
            cluster.adapt(maximum_jobs=400)
        elif cluster_type == 'HTCondor':
            cluster = HTCondorCluster(cores=1,
                                      disk=str(memory) + "G",
                                      memory=str(memory) + "G",
                                      log_directory=output_path + "/dask_logs/")
            cluster.adapt(maximum_jobs=400)
        elif cluster_type == 'Local':
            # Run everything on the local machine; no job scheduler needed.
            c = Client()
            cluster = c.cluster
        else:
            # Fall back to the generic mapping for any other supported scheduler.
            try:
                cluster = cluster_dict[cluster_type](queue=queue,
                                                     cores=1,
                                                     memory=str(memory) + "G",
                                                     walltime="24:00:00",
                                                     log_directory=output_path + "/dask_logs/")
                cluster.adapt(maximum_jobs=400)
            except KeyError:
                raise Exception("Unknown or Unsupported Cluster Type")
        client = Client(cluster)
    except Exception as e:
        print(e)
        raise Exception("Exception: Unknown Exception")
    # print("Running dask-cluster")
    # print(client.scheduler_info())
    return client
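
A minimal usage sketch, not part of the module itself: it assumes a SLURM scheduler with a partition named 'defq' and an output directory "./run_output", both of which are placeholders you would replace with values valid on your system. On a machine without a job scheduler, passing cluster_type='Local' returns the same Client interface. Client.submit and Client.gather are standard dask.distributed calls.

# Illustrative example; queue name, memory, and output path are placeholders.
from streamline.utils.cluster import get_cluster


def square(x):
    return x * x


# Request an adaptive SLURM-backed client (use cluster_type='Local' to test
# on a single machine without a scheduler).
client = get_cluster(cluster_type='SLURM', output_path="./run_output",
                     queue='defq', memory=8)

# Submit work through the standard dask.distributed Client API.
futures = [client.submit(square, i) for i in range(10)]
print(client.gather(futures))

client.close()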