Source code for streamline.runners.compare_runner

import os
import time
import dask
from pathlib import Path
from joblib import Parallel, delayed
from streamline.modeling.utils import SUPPORTED_MODELS
from streamline.modeling.utils import is_supported_model
from streamline.postanalysis.dataset_compare import CompareJob
from streamline.utils.runners import runner_fn
from streamline.utils.cluster import get_cluster


class CompareRunner:
    """
    Runner Class for collating dataset compare job
    """

    def __init__(self, output_path, experiment_name, experiment_path=None, algorithms=None,
                 exclude=("XCS", "eLCS"), class_label="Class", instance_label=None,
                 sig_cutoff=0.05, show_plots=False, run_cluster=False,
                 queue='defq', reserved_memory=4):
        """
        Args:
            output_path: path to output directory
            experiment_name: name of experiment (no spaces)
            experiment_path: full path to experiment directory (optional)
            algorithms: list of str of ML models to run
            exclude: iterable of model names to drop when algorithms is None
            class_label: outcome label of the datasets
            instance_label: instance label of the datasets, if present
            sig_cutoff: significance cutoff, default=0.05
            show_plots: flag to show plots
            run_cluster: cluster run mode name, or False for local execution
            queue: cluster queue name
            reserved_memory: memory reserved per cluster job, in GB
        """
        self.output_path = output_path
        self.experiment_name = experiment_name
        self.class_label = class_label
        self.instance_label = instance_label
        self.experiment_path = experiment_path

        if algorithms is None:
            # Copy so that removals below do not mutate the shared SUPPORTED_MODELS list
            self.algorithms = list(SUPPORTED_MODELS)
            if exclude is not None:
                for algorithm in exclude:
                    try:
                        self.algorithms.remove(algorithm)
                    except ValueError:
                        raise Exception("Unknown algorithm in exclude: " + str(algorithm))
        else:
            self.algorithms = list()
            for algorithm in algorithms:
                self.algorithms.append(is_supported_model(algorithm))
        self.algorithms = sorted(self.algorithms)

        self.sig_cutoff = sig_cutoff
        self.show_plots = show_plots
        self.run_cluster = run_cluster
        self.queue = queue
        self.reserved_memory = reserved_memory

        # Argument checks
        if not os.path.exists(self.output_path):
            raise Exception("Output path must exist (from phase 1) before phase 6 can begin")
        if not os.path.exists(self.output_path + '/' + self.experiment_name):
            raise Exception("Experiment must exist (from phase 1) before phase 6 can begin")
    def run(self, run_parallel=False):
        # Legacy cluster modes submit a standalone job script and return immediately
        if self.run_cluster in ["SLURMOld", "LSFOld"]:
            if self.run_cluster == "SLURMOld":
                self.submit_slurm_cluster_job()
            if self.run_cluster == "LSFOld":
                self.submit_lsf_cluster_job()
        else:
            job_obj = CompareJob(self.output_path, self.experiment_name, None, self.algorithms,
                                 None, self.class_label, self.instance_label,
                                 self.sig_cutoff, self.show_plots)
            if run_parallel in ["multiprocessing", "True", True]:
                # Dispatch the single compare job through joblib
                Parallel()(delayed(runner_fn)(job_obj) for job_obj in [job_obj, ])
            elif self.run_cluster and "Old" not in self.run_cluster:
                # Modern cluster modes run the job on a dask cluster
                get_cluster(self.run_cluster,
                            self.output_path + '/' + self.experiment_name,
                            self.queue, self.reserved_memory)
                dask.compute([dask.delayed(runner_fn)(job_obj) for job_obj in [job_obj, ]])
            else:
                job_obj.run()
    def get_cluster_params(self):
        # Positional arguments for the legacy CompareJobSubmit.py entry point;
        # the None/False placeholders hold the positions of unused arguments.
        cluster_params = [self.output_path, self.experiment_name, None, False, None,
                          self.class_label, self.instance_label,
                          self.sig_cutoff, self.show_plots]
        cluster_params = [str(i) for i in cluster_params]
        return cluster_params
    def submit_slurm_cluster_job(self):
        job_ref = str(time.time())
        job_name = self.output_path + '/' + self.experiment_name + '/jobs/P7_' + job_ref + '_run.sh'
        with open(job_name, 'w') as sh_file:
            sh_file.write('#!/bin/bash\n')
            sh_file.write('#SBATCH -p ' + self.queue + '\n')
            sh_file.write('#SBATCH --job-name=' + job_ref + '\n')
            sh_file.write('#SBATCH --mem=' + str(self.reserved_memory) + 'G' + '\n')
            sh_file.write(
                '#SBATCH -o ' + self.output_path + '/' + self.experiment_name
                + '/logs/P7_' + job_ref + '.o\n')
            sh_file.write(
                '#SBATCH -e ' + self.output_path + '/' + self.experiment_name
                + '/logs/P7_' + job_ref + '.e\n')
            file_path = str(Path(__file__).parent.parent.parent) + "/streamline/legacy" + '/CompareJobSubmit.py'
            cluster_params = self.get_cluster_params()
            command = ' '.join(['srun', 'python', file_path] + cluster_params)
            sh_file.write(command + '\n')
        os.system('sbatch ' + job_name)
    def submit_lsf_cluster_job(self):
        job_ref = str(time.time())
        job_name = self.output_path + '/' + self.experiment_name + '/jobs/P7_' + job_ref + '_run.sh'
        with open(job_name, 'w') as sh_file:
            sh_file.write('#!/bin/bash\n')
            sh_file.write('#BSUB -q ' + self.queue + '\n')
            sh_file.write('#BSUB -J ' + job_ref + '\n')
            sh_file.write('#BSUB -R "rusage[mem=' + str(self.reserved_memory) + 'G]"' + '\n')
            sh_file.write('#BSUB -M ' + str(self.reserved_memory) + 'GB' + '\n')
            sh_file.write(
                '#BSUB -o ' + self.output_path + '/' + self.experiment_name
                + '/logs/P7_' + job_ref + '.o\n')
            sh_file.write(
                '#BSUB -e ' + self.output_path + '/' + self.experiment_name
                + '/logs/P7_' + job_ref + '.e\n')
            file_path = str(Path(__file__).parent.parent.parent) + "/streamline/legacy" + '/CompareJobSubmit.py'
            cluster_params = self.get_cluster_params()
            command = ' '.join(['python', file_path] + cluster_params)
            sh_file.write(command + '\n')
        os.system('bsub < ' + job_name)
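
A minimal usage sketch (not part of the module source): the output path and
experiment name below are hypothetical placeholders, and the experiment
directory is assumed to have already been created by the earlier STREAMLINE
phases.

from streamline.runners.compare_runner import CompareRunner

# Hypothetical paths/names for illustration only
runner = CompareRunner(output_path="outputs",
                       experiment_name="demo_experiment",
                       class_label="Class",
                       sig_cutoff=0.05,
                       show_plots=False)

# Runs the compare job serially in the current process; pass
# run_parallel="multiprocessing" to dispatch through joblib instead,
# or construct the runner with run_cluster set to use a cluster backend.
runner.run(run_parallel=False)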