import logging
import os
import glob
import pickle
import time
import dask
from pathlib import Path
from joblib import Parallel, delayed
from streamline.modeling.utils import SUPPORTED_MODELS, is_supported_model
from streamline.postanalysis.model_replicate import ReplicateJob
from streamline.utils.runners import num_cores, runner_fn
from streamline.utils.cluster import get_cluster
class ReplicationRunner:
"""
Phase 9 of STREAMLINE (Optional)- This 'Main' script manages Phase 9 run parameters,
and submits job to run locally (to run serially) or on
cluster (parallelized).
"""
def __init__(self, rep_data_path, dataset_for_rep, output_path, experiment_name,
class_label=None, instance_label=None, match_label=None, algorithms=None, load_algo=True,
exclude=("XCS", "eLCS"), exclude_plots=None,
run_cluster=False, queue='defq', reserved_memory=4, show_plots=False):
"""
Args:
rep_data_path: path to directory containing replication or \
hold-out testing datasets (must have at least all \
features with same labels as in original training dataset)
dataset_for_rep: path to target original training dataset
output_path: path to output directory
experiment_name: name of experiment (no spaces)
match_label: applies if original training data included column with matched instance ids, default=None
exclude_plots: analysis to exclude from outputs, possible options given below. \
export_feature_correlations, run and export feature correlation analysis (yields correlation heatmap), \
default=True
plot_roc, Plot ROC curves individually for each algorithm including all CV results and averages, \
default=True
plot_prc, Plot PRC curves individually for each algorithm including all CV results and averages, \
default=True
plot_metric_boxplots, Plot box plot summaries comparing algorithms for each metric, default=True
"""
self.rep_data_path = rep_data_path
self.dataset_for_rep = dataset_for_rep
self.output_path = output_path
self.experiment_name = experiment_name
# Param for future expansion
self.plot_lists = None
self.match_label = match_label
        known_exclude_options = ['plot_ROC', 'plot_PRC', 'plot_metric_boxplots', 'plot_FI_box', 'feature_correlations']
if exclude_plots is not None:
for x in exclude_plots:
if x not in known_exclude_options:
logging.warning("Unknown exclusion option " + str(x))
else:
exclude_plots = list()
self.exclude_plots = exclude_plots
self.plot_roc = 'plot_ROC' not in exclude_plots
self.plot_prc = 'plot_PRC' not in exclude_plots
self.plot_metric_boxplots = 'plot_metric_boxplots' not in exclude_plots
self.plot_fi_box = 'plot_FI_box' not in exclude_plots
self.export_feature_correlations = 'feature_correlations' not in exclude_plots
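        # Example (illustrative): passing exclude_plots=['plot_ROC', 'feature_correlations']
        # sets self.plot_roc = False and self.export_feature_correlations = False, while
        # PRC curves, metric box plots, and FI box plots remain enabled.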
self.experiment_path = self.output_path + '/' + self.experiment_name
        # Extract the base name (file name without extension) of the original training dataset
        self.data_name = self.dataset_for_rep.split('/')[-1].split('.')[0]
        # Unpickle metadata saved by the earlier pipeline phases
        with open(self.output_path + '/' + self.experiment_name + '/' + "metadata.pickle", 'rb') as file:
            metadata = pickle.load(file)
# Load variables specified earlier in the pipeline from metadata
self.class_label = class_label
if not class_label:
self.class_label = metadata['Class Label']
self.instance_label = instance_label
if not instance_label:
self.instance_label = metadata['Instance Label']
self.ignore_features = metadata['Ignored Features']
self.categorical_cutoff = metadata['Categorical Cutoff']
self.sig_cutoff = metadata['Statistical Significance Cutoff']
self.featureeng_missingness = metadata['Engineering Missingness Cutoff']
self.cleaning_missingness = metadata['Cleaning Missingness Cutoff']
self.cv_partitions = metadata['CV Partitions']
self.scale_data = metadata['Use Data Scaling']
self.impute_data = metadata['Use Data Imputation']
self.multi_impute = metadata['Use Multivariate Imputation']
self.show_plots = show_plots
self.scoring_metric = metadata['Primary Metric']
self.random_state = metadata['Random Seed']
self.run_cluster = run_cluster
self.queue = queue
self.reserved_memory = reserved_memory
# Argument checks
        if not os.path.exists(self.output_path):
            raise Exception("Output path must exist (from phases 1-8) before model application can begin")
        if not os.path.exists(self.output_path + '/' + self.experiment_name):
            raise Exception("Experiment must exist (from phases 1-8) before model application can begin")
        # Location of the folder for the respective training dataset (contains the trained models)
self.full_path = self.output_path + "/" + self.experiment_name + "/" + self.data_name
if not os.path.exists(self.full_path + "/replication"):
os.makedirs(self.full_path + "/replication")
if not self.show_plots:
if not os.path.exists(self.output_path + "/" + self.experiment_name + '/jobs'):
os.mkdir(self.output_path + "/" + self.experiment_name + '/jobs')
if not os.path.exists(self.output_path + "/" + self.experiment_name + '/logs'):
os.mkdir(self.output_path + "/" + self.experiment_name + '/logs')
        if not load_algo:
            if algorithms is None:
                # Copy so that excluding algorithms does not mutate the shared SUPPORTED_MODELS list
                self.algorithms = list(SUPPORTED_MODELS)
                if exclude is not None:
                    for algorithm in exclude:
                        try:
                            self.algorithms.remove(algorithm)
                        except Exception:
                            raise Exception("Unknown algorithm in exclude: " + str(algorithm))
            else:
                self.algorithms = list()
                for algorithm in algorithms:
                    self.algorithms.append(is_supported_model(algorithm))
        else:
            self.get_algorithms()
self.save_metadata()
    def run(self, run_parallel=False):
        """
        Run the replication analysis on every dataset found in rep_data_path: serially,
        in parallel locally (joblib), or by submitting one cluster job per dataset.

        Args:
            run_parallel: if truthy (and not the string "False"), run local jobs in parallel, default=False
        """
# Determine file extension of datasets in target folder:
file_count = 0
unique_datanames = list()
job_list = list()
for dataset_filename in glob.glob(self.rep_data_path + '/*'):
dataset_filename = str(Path(dataset_filename).as_posix())
# Save unique dataset names so that analysis is run only once if
# there is both a .txt and .csv version of dataset with same name.
file_extension = dataset_filename.split('/')[-1].split('.')[-1]
apply_name = dataset_filename.split('/')[-1].split('.')[0]
if not os.path.exists(self.full_path + "/replication/" + apply_name):
os.mkdir(self.full_path + "/replication/" + apply_name)
            if file_extension in ('txt', 'csv', 'tsv'):
if apply_name not in unique_datanames:
file_count += 1
unique_datanames.append(apply_name)
if self.run_cluster == "SLURMOld":
self.submit_slurm_cluster_job(dataset_filename)
continue
if self.run_cluster == "LSFOld":
self.submit_lsf_cluster_job(dataset_filename)
continue
                    job_obj = ReplicateJob(dataset_filename,
                                           self.dataset_for_rep, self.full_path, self.class_label,
                                           self.instance_label, self.match_label,
                                           ignore_features=self.ignore_features,
                                           algorithms=self.algorithms, exclude=None,
                                           cv_partitions=self.cv_partitions,
                                           exclude_plots=self.exclude_plots,
                                           categorical_cutoff=self.categorical_cutoff,
                                           sig_cutoff=self.sig_cutoff, scale_data=self.scale_data,
                                           impute_data=self.impute_data,
                                           multi_impute=self.multi_impute, show_plots=self.show_plots,
                                           scoring_metric=self.scoring_metric,
                                           random_state=self.random_state)
if run_parallel and run_parallel != "False":
# p = multiprocessing.Process(target=runner_fn, args=(job_obj,))
job_list.append(job_obj)
else:
job_obj.run()
if run_parallel and run_parallel != "False" and not self.run_cluster:
Parallel(n_jobs=num_cores)(delayed(runner_fn)(job_obj) for job_obj in job_list)
if self.run_cluster and "Old" not in self.run_cluster:
get_cluster(self.run_cluster, self.output_path + '/' + self.experiment_name,
self.queue, self.reserved_memory)
dask.compute([dask.delayed(runner_fn)(job_obj) for job_obj in job_list])
if file_count == 0:
# Check that there was at least 1 dataset
raise Exception("There must be at least one .txt, .csv, or .tsv dataset in rep_data_path directory")
    def get_algorithms(self):
        """Load the list of algorithms run in the modeling phase from algInfo.pickle into self.algorithms."""
        with open(self.output_path + '/' + self.experiment_name + '/' + "algInfo.pickle", 'rb') as pickle_in:
            alg_info = pickle.load(pickle_in)
        algorithms = list()
        for algorithm in alg_info.keys():
            if alg_info[algorithm][0]:
                algorithms.append(algorithm)
        self.algorithms = algorithms
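    # Note on the assumed algInfo.pickle layout (an assumption based on how it is read above,
    # not a documented contract): a dict keyed by algorithm name whose first element records
    # whether that algorithm was run in the modeling phase, e.g.
    #
    #   alg_info = {"Logistic Regression": [True, ...], "XCS": [False, ...]}
    #
    # so only algorithms with a truthy first element are carried into replication.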
    def get_cluster_params(self, dataset_filename):
        """Assemble the stringified positional parameters passed to the legacy cluster job script."""
        exclude_param = ','.join(self.exclude_plots) if self.exclude_plots else None
        cluster_params = [dataset_filename, self.dataset_for_rep, self.full_path,
                          self.class_label, self.instance_label, self.match_label,
                          None, None, self.cv_partitions, exclude_param,
                          self.categorical_cutoff, self.sig_cutoff,
                          self.scale_data, self.impute_data,
                          self.multi_impute, self.show_plots, self.scoring_metric, self.random_state]
        cluster_params = [str(i) for i in cluster_params]
        return cluster_params
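    # The legacy RepJobSubmit.py script is expected to consume these values positionally
    # (everything is stringified, so None/True/False arrive as the strings 'None'/'True'/'False',
    # including the two fixed None placeholders). Illustrative command built by the submit_*
    # methods below, with hypothetical paths and values:
    #
    #   python .../streamline/legacy/RepJobSubmit.py /reps/rep1.csv /data/train.csv \
    #       /out/exp1/train Class None None None None 10 None 10 0.05 True True True False balanced_accuracy 42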
    def submit_slurm_cluster_job(self, dataset_filename):
        """Write and submit a SLURM (sbatch) job script for one replication dataset (legacy cluster mode)."""
        job_ref = str(time.time())
        job_name = self.output_path + '/' + self.experiment_name + '/jobs/P9_' + job_ref + '_run.sh'
        with open(job_name, 'w') as sh_file:
            sh_file.write('#!/bin/bash\n')
            sh_file.write('#SBATCH -p ' + self.queue + '\n')
            sh_file.write('#SBATCH --job-name=' + job_ref + '\n')
            sh_file.write('#SBATCH --mem=' + str(self.reserved_memory) + 'G' + '\n')
            sh_file.write(
                '#SBATCH -o ' + self.output_path + '/' + self.experiment_name +
                '/logs/P9_' + job_ref + '.o\n')
            sh_file.write(
                '#SBATCH -e ' + self.output_path + '/' + self.experiment_name +
                '/logs/P9_' + job_ref + '.e\n')
            file_path = str(Path(__file__).parent.parent.parent) + "/streamline/legacy" + '/RepJobSubmit.py'
            cluster_params = self.get_cluster_params(dataset_filename)
            command = ' '.join(['srun', 'python', file_path] + cluster_params)
            sh_file.write(command + '\n')
        os.system('sbatch ' + job_name)
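    # Sketch of the script this method writes (illustrative values; <experiment> and <params>
    # stand in for the real experiment path and the get_cluster_params() output):
    #
    #   #!/bin/bash
    #   #SBATCH -p defq
    #   #SBATCH --job-name=1700000000.0
    #   #SBATCH --mem=4G
    #   #SBATCH -o <experiment>/logs/P9_1700000000.0.o
    #   #SBATCH -e <experiment>/logs/P9_1700000000.0.e
    #   srun python .../streamline/legacy/RepJobSubmit.py <params>
    #
    # The script is then submitted with 'sbatch <script>'.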
    def submit_lsf_cluster_job(self, dataset_filename):
        """Write and submit an LSF (bsub) job script for one replication dataset (legacy cluster mode)."""
        job_ref = str(time.time())
        job_name = self.output_path + '/' + self.experiment_name + '/jobs/P9_' + job_ref + '_run.sh'
        with open(job_name, 'w') as sh_file:
            sh_file.write('#!/bin/bash\n')
            sh_file.write('#BSUB -q ' + self.queue + '\n')
            sh_file.write('#BSUB -J ' + job_ref + '\n')
            sh_file.write('#BSUB -R "rusage[mem=' + str(self.reserved_memory) + 'G]"' + '\n')
            sh_file.write('#BSUB -M ' + str(self.reserved_memory) + 'GB' + '\n')
            sh_file.write(
                '#BSUB -o ' + self.output_path + '/' + self.experiment_name +
                '/logs/P9_' + job_ref + '.o\n')
            sh_file.write(
                '#BSUB -e ' + self.output_path + '/' + self.experiment_name +
                '/logs/P9_' + job_ref + '.e\n')
            file_path = str(Path(__file__).parent.parent.parent) + "/streamline/legacy" + '/RepJobSubmit.py'
            cluster_params = self.get_cluster_params(dataset_filename)
            command = ' '.join(['python', file_path] + cluster_params)
            sh_file.write(command + '\n')
        os.system('bsub < ' + job_name)
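    # LSF counterpart of the sketch above: the generated script carries #BSUB -q/-J/-R/-M
    # directives plus the same -o/-e log paths, runs 'python .../RepJobSubmit.py <params>'
    # (without srun), and is submitted with 'bsub < <script>'.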