Source code for streamline.runners.report_runner

import os
import time
import dask
from pathlib import Path
from joblib import Parallel, delayed
from streamline.modeling.utils import SUPPORTED_MODELS
from streamline.modeling.utils import is_supported_model
from streamline.postanalysis.gererate_report import ReportJob
from streamline.utils.runners import runner_fn
from streamline.utils.cluster import get_cluster


class ReportRunner:
    """
    Runner class for the summary report (PDF) generation job of the pipeline (phase 6),
    collating results across datasets and algorithms.
    """

    def __init__(self, output_path=None, experiment_name=None, experiment_path=None, algorithms=None,
                 exclude=("XCS", "eLCS"), training=True,
                 rep_data_path=None, dataset_for_rep=None,
                 run_cluster=False, queue='defq', reserved_memory=4):
        """
        Args:
            output_path: path to output directory
            experiment_name: name of experiment (no spaces)
            experiment_path: full path to the experiment directory, as an alternative to
                supplying output_path and experiment_name separately
            algorithms: list of str of ML models to run
            exclude: list of str of ML models to exclude when algorithms is None,
                default=("XCS", "eLCS")
            training: Indicate True or False for whether to generate the pdf summary for pipeline
                training or for follow-up application analysis on a new dataset, default=True
            rep_data_path: path to directory containing replication or hold-out testing datasets
                (must have at least all features with same labels as in original training dataset),
                default=None
            dataset_for_rep: path to target original training dataset
            run_cluster: cluster backend name passed to get_cluster, 'SLURMOld'/'LSFOld' for
                legacy batch-script submission, or False to run locally, default=False
            queue: name of the cluster queue/partition to submit jobs to, default='defq'
            reserved_memory: memory (in GB) reserved for cluster jobs, default=4
        """
        assert (output_path is not None and experiment_name is not None) or (experiment_path is not None)
        if output_path is not None and experiment_name is not None:
            self.output_path = output_path
            self.experiment_name = experiment_name
            self.experiment_path = self.output_path + '/' + self.experiment_name
        else:
            self.experiment_path = experiment_path
            self.experiment_name = self.experiment_path.split('/')[-1]
            self.output_path = self.experiment_path.split('/')[-2]

        self.training = training
        self.rep_data_path = rep_data_path
        self.train_data_path = dataset_for_rep
        self.run_cluster = run_cluster
        self.queue = queue
        self.reserved_memory = reserved_memory

        if algorithms is None:
            # Copy to avoid mutating the shared SUPPORTED_MODELS list in place.
            self.algorithms = list(SUPPORTED_MODELS)
            if exclude is not None:
                for algorithm in exclude:
                    try:
                        self.algorithms.remove(algorithm)
                    except ValueError:
                        raise Exception("Unknown algorithm in exclude: " + str(algorithm))
            self.exclude = None
        else:
            self.algorithms = list()
            for algorithm in algorithms:
                self.algorithms.append(is_supported_model(algorithm))
            self.exclude = exclude

        # Argument checks
        if not os.path.exists(self.output_path):
            raise Exception("Output path must exist (from phase 1) before phase 6 can begin")
        if not os.path.exists(self.output_path + '/' + self.experiment_name):
            raise Exception("Experiment must exist (from phase 1) before phase 6 can begin")
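    # --- Usage sketch (illustrative, not part of the module) ---
    # A minimal example of constructing the runner; the paths and experiment name below
    # are hypothetical placeholders for an experiment produced by the earlier pipeline phases.
    #
    #     runner = ReportRunner(output_path='./results', experiment_name='demo_experiment')
    #
    # For a follow-up report on replication/hold-out data rather than pipeline training:
    #
    #     rep_runner = ReportRunner(output_path='./results', experiment_name='demo_experiment',
    #                               training=False, rep_data_path='./rep_data',
    #                               dataset_for_rep='./data/demo_train.csv')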
    def run(self, run_parallel=False):
        """Build the ReportJob and execute it locally, via joblib, on a dask cluster,
        or submit it as a legacy cluster batch job, depending on configuration."""
        if self.run_cluster in ["SLURMOld", "LSFOld"]:
            if self.run_cluster == "SLURMOld":
                self.submit_slurm_cluster_job()
            if self.run_cluster == "LSFOld":
                self.submit_lsf_cluster_job()
        else:
            job_obj = ReportJob(self.output_path, self.experiment_name, None, self.algorithms, None,
                                self.training, self.train_data_path, self.rep_data_path)
            if run_parallel and run_parallel != "False" and not self.run_cluster:
                # p = multiprocessing.Process(target=runner_fn, args=(job_obj, ))
                # p.start()
                # p.join()
                Parallel()(delayed(runner_fn)(job) for job in [job_obj])
            elif run_parallel and self.run_cluster and "Old" not in self.run_cluster:
                get_cluster(self.run_cluster,
                            self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory)
                dask.compute([dask.delayed(runner_fn)(job) for job in [job_obj]])
            else:
                # When run_parallel is falsy (or no parallel backend applies), the single
                # report job is run directly in this process because that is faster than
                # dispatching one job to a parallel backend.
                job_obj.run()
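    # --- Dispatch overview (illustrative sketch) ---
    # Depending on run_parallel and run_cluster, the single ReportJob above is executed in
    # one of four ways. The calls below assume a `runner` built as in the example after
    # __init__; 'SLURM' is an assumed example of a backend name accepted by
    # streamline.utils.cluster.get_cluster.
    #
    #     runner.run(run_parallel=False)   # run the ReportJob directly in this process
    #     runner.run(run_parallel=True)    # run through joblib Parallel
    #     ReportRunner(output_path='./results', experiment_name='demo_experiment',
    #                  run_cluster='SLURM').run(run_parallel=True)    # dask cluster via get_cluster()
    #     ReportRunner(output_path='./results', experiment_name='demo_experiment',
    #                  run_cluster='SLURMOld').run()                  # legacy sbatch script submission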
    def get_cluster_params(self):
        """Return the stringified argument list passed to the legacy ReportJobSubmit.py script."""
        cluster_params = [self.output_path, self.experiment_name, None, None, None,
                          self.training, self.train_data_path, self.rep_data_path]
        cluster_params = [str(i) for i in cluster_params]
        return cluster_params
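    # Example (illustrative): for a runner built with output_path='./results',
    # experiment_name='demo_experiment', and otherwise default arguments
    # (training report, no replication data), get_cluster_params() returns:
    #
    #     ['./results', 'demo_experiment', 'None', 'None', 'None', 'True', 'None', 'None']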
    def submit_slurm_cluster_job(self):
        """Write a SLURM batch script for the report job and submit it with sbatch (legacy cluster mode)."""
        job_ref = str(time.time())
        job_name = self.output_path + '/' + self.experiment_name + '/jobs/PDF_' + job_ref + '_run.sh'
        sh_file = open(job_name, 'w')
        sh_file.write('#!/bin/bash\n')
        sh_file.write('#SBATCH -p ' + self.queue + '\n')
        sh_file.write('#SBATCH --job-name=' + job_ref + '\n')
        sh_file.write('#SBATCH --mem=' + str(self.reserved_memory) + 'G' + '\n')
        # sh_file.write('#BSUB -M '+str(maximum_memory)+'GB'+'\n')
        sh_file.write(
            '#SBATCH -o ' + self.output_path + '/' + self.experiment_name + '/logs/PDF_' + job_ref + '.o\n')
        sh_file.write(
            '#SBATCH -e ' + self.output_path + '/' + self.experiment_name + '/logs/PDF_' + job_ref + '.e\n')

        file_path = str(Path(__file__).parent.parent.parent) + "/streamline/legacy" + '/ReportJobSubmit.py'
        cluster_params = self.get_cluster_params()
        command = ' '.join(['srun', 'python', file_path] + cluster_params)
        sh_file.write(command + '\n')
        sh_file.close()
        os.system('sbatch ' + job_name)
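    # Example (illustrative): with the default queue='defq' and reserved_memory=4, and the
    # hypothetical './results/demo_experiment' experiment from the earlier examples, the
    # generated jobs/PDF_<job_ref>_run.sh script looks roughly like:
    #
    #     #!/bin/bash
    #     #SBATCH -p defq
    #     #SBATCH --job-name=<job_ref>
    #     #SBATCH --mem=4G
    #     #SBATCH -o ./results/demo_experiment/logs/PDF_<job_ref>.o
    #     #SBATCH -e ./results/demo_experiment/logs/PDF_<job_ref>.e
    #     srun python <streamline_root>/streamline/legacy/ReportJobSubmit.py ./results demo_experiment None None None True None None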
    def submit_lsf_cluster_job(self):
        """Write an LSF batch script for the report job and submit it with bsub (legacy cluster mode)."""
        job_ref = str(time.time())
        job_name = self.output_path + '/' + self.experiment_name + '/jobs/PDF_' + job_ref + '_run.sh'
        sh_file = open(job_name, 'w')
        sh_file.write('#!/bin/bash\n')
        sh_file.write('#BSUB -q ' + self.queue + '\n')
        sh_file.write('#BSUB -J ' + job_ref + '\n')
        sh_file.write('#BSUB -R "rusage[mem=' + str(self.reserved_memory) + 'G]"' + '\n')
        sh_file.write('#BSUB -M ' + str(self.reserved_memory) + 'GB' + '\n')
        sh_file.write(
            '#BSUB -o ' + self.output_path + '/' + self.experiment_name + '/logs/PDF_' + job_ref + '.o\n')
        sh_file.write(
            '#BSUB -e ' + self.output_path + '/' + self.experiment_name + '/logs/PDF_' + job_ref + '.e\n')

        file_path = str(Path(__file__).parent.parent.parent) + "/streamline/legacy" + '/ReportJobSubmit.py'
        cluster_params = self.get_cluster_params()
        command = ' '.join(['python', file_path] + cluster_params)
        sh_file.write(command + '\n')
        sh_file.close()
        os.system('bsub < ' + job_name)
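# Example (illustrative): the LSF variant mirrors the SLURM script above but uses #BSUB
# directives and is submitted with 'bsub < <script>' rather than 'sbatch <script>':
#
#     #!/bin/bash
#     #BSUB -q defq
#     #BSUB -J <job_ref>
#     #BSUB -R "rusage[mem=4G]"
#     #BSUB -M 4GB
#     #BSUB -o ./results/demo_experiment/logs/PDF_<job_ref>.o
#     #BSUB -e ./results/demo_experiment/logs/PDF_<job_ref>.e
#     python <streamline_root>/streamline/legacy/ReportJobSubmit.py ./results demo_experiment None None None True None None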