import logging
import os
import glob
import time
import dask
import pickle
from pathlib import Path
from joblib import Parallel, delayed
from streamline.modeling.utils import SUPPORTED_MODELS
from streamline.modeling.utils import is_supported_model
from streamline.postanalysis.statistics import StatsJob
from streamline.utils.runners import runner_fn, num_cores
from streamline.utils.cluster import get_cluster
class StatsRunner:
"""
Runner Class for collating statistics of all the models
"""
def __init__(self, output_path, experiment_name, algorithms=None, exclude=("XCS", "eLCS"),
class_label="Class", instance_label=None, scoring_metric='balanced_accuracy',
top_features=40, sig_cutoff=0.05, metric_weight='balanced_accuracy', scale_data=True,
exclude_plots=None, show_plots=False,
run_cluster=False, queue='defq', reserved_memory=4):
"""
Args:
output_path: path to output directory
experiment_name: name of experiment (no spaces)
algorithms: list of str of ML models to run
scoring_metric='balanced_accuracy'
sig_cutoff: significance cutoff, default=0.05
metric_weight='balanced_accuracy'
scale_data=True
exclude_plots:
metric_weight: ML model metric used as weight in composite FI plots \
(only supports balanced_accuracy or roc_auc as options). \
Recommend setting the same as primary_metric if possible, \
default='balanced_accuracy'
top_features: number of top features to illustrate in figures, default=40
show_plots: flag to show plots
"""
self.dataset = None
self.output_path = output_path
self.experiment_name = experiment_name
self.class_label = class_label
self.instance_label = instance_label
        if algorithms is None:
            # Copy so that removals below do not mutate the shared SUPPORTED_MODELS list
            self.algorithms = list(SUPPORTED_MODELS)
            if exclude is not None:
                for algorithm in exclude:
                    try:
                        self.algorithms.remove(algorithm)
                    except ValueError:
                        logging.warning("Unknown algorithm in exclude: " + str(algorithm))
        else:
            self.algorithms = list()
            for algorithm in algorithms:
                self.algorithms.append(is_supported_model(algorithm))
            self.algorithms = sorted(self.algorithms)
self.scale_data = scale_data
self.sig_cutoff = sig_cutoff
self.show_plots = show_plots
self.scoring_metric = scoring_metric
self.exclude_plots = exclude_plots
known_exclude_options = ['plot_ROC', 'plot_PRC', 'plot_FI_box', 'plot_metric_boxplots']
if exclude_plots is not None:
for x in exclude_plots:
if x not in known_exclude_options:
logging.warning("Unknown exclusion option " + str(x))
else:
exclude_plots = list()
self.plot_roc = 'plot_ROC' not in exclude_plots
self.plot_prc = 'plot_PRC' not in exclude_plots
self.plot_metric_boxplots = 'plot_metric_boxplots' not in exclude_plots
self.plot_fi_box = 'plot_FI_box' not in exclude_plots
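        # For example, exclude_plots=['plot_ROC', 'plot_PRC'] leaves plot_roc and
        # plot_prc False above, suppressing those figures in the StatsJob output.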
self.metric_weight = metric_weight
self.top_features = top_features
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
# Argument checks
if not os.path.exists(self.output_path):
raise Exception("Output path must exist (from phase 1) before phase 6 can begin")
if not os.path.exists(self.output_path + '/' + self.experiment_name):
raise Exception("Experiment must exist (from phase 1) before phase 6 can begin")
self.save_metadata()
    def run(self, run_parallel=False):
        """
        Runs a StatsJob on each dataset: serially by default, in parallel via joblib
        when run_parallel is set, or by submitting cluster jobs when run_cluster is set.
        """
# Iterate through datasets, ignoring common folders
dataset_paths = os.listdir(self.output_path + "/" + self.experiment_name)
remove_list = ['.DS_Store', 'metadata.pickle', 'metadata.csv', 'algInfo.pickle', 'jobsCompleted', 'dask_logs',
'logs', 'jobs', 'DatasetComparisons',
self.experiment_name + '_STREAMLINE_Report.pdf']
for text in remove_list:
if text in dataset_paths:
dataset_paths.remove(text)
job_list = list()
for dataset_directory_path in dataset_paths:
full_path = self.output_path + "/" + self.experiment_name + "/" + dataset_directory_path
# Create folders for DT and GP visualizations
if "DT" in self.algorithms and not os.path.exists(full_path + '/model_evaluation/DT_Viz'):
os.mkdir(full_path + '/model_evaluation/DT_Viz')
if "GP" in self.algorithms and not os.path.exists(full_path + '/model_evaluation/GP_Viz'):
os.mkdir(full_path + '/model_evaluation/GP_Viz')
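            # Infer the number of CV partitions from the training files written during CV partitioning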
cv_dataset_paths = list(glob.glob(full_path + "/CVDatasets/*_CV_*Train.csv"))
cv_dataset_paths = [str(Path(cv_dataset_path)) for cv_dataset_path in cv_dataset_paths]
cv_partitions = len(cv_dataset_paths)
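            # Legacy cluster modes submit a standalone shell-script job per dataset and skip local execution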
if self.run_cluster == "SLURMOld":
self.submit_slurm_cluster_job(full_path, cv_partitions)
continue
if self.run_cluster == "LSFOld":
self.submit_lsf_cluster_job(full_path, cv_partitions)
continue
job_obj = StatsJob(full_path, self.algorithms, self.class_label, self.instance_label, self.scoring_metric,
cv_partitions, self.top_features, self.sig_cutoff, self.metric_weight, self.scale_data,
self.exclude_plots,
self.show_plots)
            if run_parallel and run_parallel != "False":
                job_list.append(job_obj)
            else:
                job_obj.run()
if run_parallel and run_parallel != "False" and not self.run_cluster:
Parallel(n_jobs=num_cores)(delayed(runner_fn)(job_obj) for job_obj in job_list)
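        # Dask-based cluster modes: build the cluster/client, then run all collected jobs as delayed tasks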
if self.run_cluster and "Old" not in self.run_cluster:
get_cluster(self.run_cluster,
self.output_path + '/' + self.experiment_name, self.queue, self.reserved_memory)
dask.compute([dask.delayed(runner_fn)(job_obj) for job_obj in job_list])
def get_cluster_params(self, full_path, len_cv):
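        # Build the positional-argument list for the legacy StatsJobSubmit.py script.
        # Every value is stringified because the arguments are passed on a shell command
        # line; the algorithms slot is sent as None (presumably re-derived by the submit script).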
exclude_param = ','.join(self.exclude_plots) if self.exclude_plots else None
cluster_params = [full_path, None, self.class_label, self.instance_label, self.scoring_metric,
len_cv, self.top_features, self.sig_cutoff, self.metric_weight, self.scale_data,
exclude_param,
self.show_plots]
cluster_params = [str(i) for i in cluster_params]
return cluster_params
    def submit_slurm_cluster_job(self, dataset_path, len_cv):
        """Write and submit a SLURM batch script running StatsJobSubmit.py for one dataset."""
        job_ref = str(time.time())
        job_name = self.output_path + '/' + self.experiment_name + '/jobs/P6_' + job_ref + '_run.sh'
        with open(job_name, 'w') as sh_file:
            sh_file.write('#!/bin/bash\n')
            sh_file.write('#SBATCH -p ' + self.queue + '\n')
            sh_file.write('#SBATCH --job-name=' + job_ref + '\n')
            sh_file.write('#SBATCH --mem=' + str(self.reserved_memory) + 'G' + '\n')
            sh_file.write(
                '#SBATCH -o ' + self.output_path + '/' + self.experiment_name +
                '/logs/P6_' + job_ref + '.o\n')
            sh_file.write(
                '#SBATCH -e ' + self.output_path + '/' + self.experiment_name +
                '/logs/P6_' + job_ref + '.e\n')
            file_path = str(Path(__file__).parent.parent.parent) + "/streamline/legacy" + '/StatsJobSubmit.py'
            cluster_params = self.get_cluster_params(dataset_path, len_cv)
            command = ' '.join(['srun', 'python', file_path] + cluster_params)
            sh_file.write(command + '\n')
        os.system('sbatch ' + job_name)
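    # The generated SLURM script resembles (values illustrative):
    #   #!/bin/bash
    #   #SBATCH -p defq
    #   #SBATCH --job-name=<job_ref>
    #   #SBATCH --mem=4G
    #   #SBATCH -o <output_path>/<experiment_name>/logs/P6_<job_ref>.o
    #   #SBATCH -e <output_path>/<experiment_name>/logs/P6_<job_ref>.e
    #   srun python <repo_root>/streamline/legacy/StatsJobSubmit.py <cluster_params...>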
    def submit_lsf_cluster_job(self, dataset_path, len_cv):
        """Write and submit an LSF batch script running StatsJobSubmit.py for one dataset."""
        job_ref = str(time.time())
        job_name = self.output_path + '/' + self.experiment_name + '/jobs/P6_' + job_ref + '_run.sh'
        with open(job_name, 'w') as sh_file:
            sh_file.write('#!/bin/bash\n')
            sh_file.write('#BSUB -q ' + self.queue + '\n')
            sh_file.write('#BSUB -J ' + job_ref + '\n')
            sh_file.write('#BSUB -R "rusage[mem=' + str(self.reserved_memory) + 'G]"' + '\n')
            sh_file.write('#BSUB -M ' + str(self.reserved_memory) + 'GB' + '\n')
            sh_file.write(
                '#BSUB -o ' + self.output_path + '/' + self.experiment_name +
                '/logs/P6_' + job_ref + '.o\n')
            sh_file.write(
                '#BSUB -e ' + self.output_path + '/' + self.experiment_name +
                '/logs/P6_' + job_ref + '.e\n')
            file_path = str(Path(__file__).parent.parent.parent) + "/streamline/legacy" + '/StatsJobSubmit.py'
            cluster_params = self.get_cluster_params(dataset_path, len_cv)
            command = ' '.join(['python', file_path] + cluster_params)
            sh_file.write(command + '\n')
        os.system('bsub < ' + job_name)
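        # Note: LSF's bsub reads the job script from stdin ('bsub < script'),
        # unlike sbatch, which takes the script path as an argument.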