import csv
import glob
import os
import pickle
import time
import logging
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rc
from statistics import mean, median, stdev
from scipy.stats import kruskal, wilcoxon, mannwhitneyu
from streamline.utils.job import Job
from streamline.modeling.utils import ABBREVIATION, COLORS
import seaborn as sns
sns.set_theme()
class StatsJob(Job):
"""
    This 'Job' script summarizes ML classification evaluation statistics
    (means, medians, and standard deviations), generates ROC and PRC plots
    (comparing CV performance within each ML algorithm and comparing average
    performance between ML algorithms), averages model feature importance over
    CV runs, creates boxplots comparing ML algorithms for each metric, runs
    Kruskal-Wallis, Mann-Whitney, and Wilcoxon statistical comparisons between
    ML algorithms, generates a model feature importance boxplot for each
    algorithm, and builds composite feature importance plots summarizing model
    feature importance across all ML algorithms. It is run for a single dataset
    from the original target dataset folder (data_path) given in Phase 1
    (i.e. the stats summary covers all CV datasets for that dataset).
"""
def __init__(self, full_path, algorithms, class_label, instance_label, scoring_metric='balanced_accuracy',
cv_partitions=5, top_features=40, sig_cutoff=0.05, metric_weight='balanced_accuracy', scale_data=True,
exclude_plots=None, show_plots=False):
"""
Args:
            full_path: path to this dataset's experiment output folder
            algorithms: list of ML algorithm names that were run
            class_label: name of the outcome/class column
            instance_label: name of the instance ID column (or None)
            scoring_metric: scikit-learn metric name used during model evaluation
            cv_partitions: number of CV partitions
            top_features: number of top features to display in feature importance plots
            sig_cutoff: significance cutoff used for statistical comparisons
            metric_weight: metric used to weight composite feature importance plots
            scale_data: whether the data were scaled during preprocessing
            exclude_plots: list of plot types to skip ('plot_ROC', 'plot_PRC',
                'plot_FI_box', 'plot_metric_boxplots'), or None
            show_plots: whether to display plots interactively
"""
super().__init__()
self.full_path = full_path
self.algorithms = sorted(algorithms)
self.class_label = class_label
self.instance_label = instance_label
self.data_name = self.full_path.split('/')[-1]
self.experiment_path = '/'.join(self.full_path.split('/')[:-1])
known_exclude_options = ['plot_ROC', 'plot_PRC', 'plot_FI_box', 'plot_metric_boxplots']
if exclude_plots is not None:
for x in exclude_plots:
if x not in known_exclude_options:
logging.warning("Unknown exclusion option " + str(x))
else:
exclude_plots = list()
self.plot_roc = 'plot_ROC' not in exclude_plots
self.plot_prc = 'plot_PRC' not in exclude_plots
self.plot_metric_boxplots = 'plot_metric_boxplots' not in exclude_plots
self.plot_fi_box = 'plot_FI_box' not in exclude_plots
self.cv_partitions = cv_partitions
self.scale_data = scale_data
self.scoring_metric = scoring_metric
self.top_features = top_features
self.sig_cutoff = sig_cutoff
self.metric_weight = metric_weight
self.show_plots = show_plots
        # Get processed feature names in their original dataset order
        if self.plot_fi_box:
            self.feature_headers = pd.read_csv(self.full_path + "/exploratory/ProcessedFeatureNames.csv",
                                               sep=',').columns.values.tolist()
        else:
            try:
                self.feature_headers = pd.read_csv(self.full_path
                                                   + "/exploratory/ProcessedFeatureNames.csv",
                                                   sep=',').columns.values.tolist()
            except Exception:
                self.feature_headers = None
self.abbrev = dict((k, ABBREVIATION[k]) for k in self.algorithms if k in ABBREVIATION)
self.colors = dict((k, COLORS[k]) for k in self.algorithms if k in COLORS)
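    # Usage sketch (hypothetical paths and algorithm names, not part of the pipeline itself):
    #   job = StatsJob('/outputs/experiment1/demodata', ['Decision Tree', 'Random Forest'],
    #                  'Class', 'InstanceID', scoring_metric='balanced_accuracy', cv_partitions=5)
    #   job.run()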
def run(self):
self.job_start_time = time.time() # for tracking phase runtime
logging.info('Running Statistics Summary for ' + str(self.data_name))
# Translate metric name from scikit-learn standard
# (currently balanced accuracy is hardcoded for use in generating FI plots due to no-skill normalization)
metric_term_dict = {'balanced_accuracy': 'Balanced Accuracy', 'accuracy': 'Accuracy', 'f1': 'F1_Score',
'recall': 'Sensitivity (Recall)', 'precision': 'Precision (PPV)', 'roc_auc': 'ROC AUC'}
self.metric_weight = metric_term_dict[self.metric_weight]
# Get algorithms run, specify algorithm abbreviations, colors to use for
# algorithms in plots, and original ordered feature name list
self.preparation()
# Gather and summarize all evaluation metrics for each algorithm across all CVs.
# Returns result_table used to plot average ROC and PRC plots and metric_dict
# organizing all metrics over all algorithms and CVs.
result_table, metric_dict = self.primary_stats()
# Plot ROC and PRC curves comparing average ML algorithm performance (averaged over all CVs)
logging.info('Generating ROC and PRC plots...')
self.do_plot_roc(result_table)
self.do_plot_prc(result_table)
# Make list of metric names
logging.info('Saving Metric Summaries...')
metrics = list(metric_dict[self.algorithms[0]].keys())
# Save metric means, median and standard deviations
self.save_metric_stats(metrics, metric_dict)
# Generate boxplot comparing algorithm performance for each standard metric, if specified by user
if self.plot_metric_boxplots:
logging.info('Generating Metric Boxplots...')
self.metric_boxplots(metrics, metric_dict)
        # Calculate and export Kruskal-Wallis, Mann-Whitney U, and Wilcoxon signed-rank stats
# if more than one ML algorithm has been run (for the comparison) - note stats are based on
# comparing the multiple CV models for each algorithm.
if len(self.algorithms) > 1:
logging.info('Running Non-Parametric Statistical Significance Analysis...')
kruskal_summary = self.kruskal_wallis(metrics, metric_dict)
self.wilcoxon_rank(metrics, metric_dict, kruskal_summary)
self.mann_whitney_u(metrics, metric_dict, kruskal_summary)
# Run FI Related stats and plots
self.fi_stats(metric_dict)
# Export phase runtime
self.save_runtime()
# Parse all pipeline runtime files into a single runtime report
self.parse_runtime()
# Print phase completion
logging.info(self.data_name + " phase 5 complete")
job_file = open(self.experiment_path + '/jobsCompleted/job_stats_' + self.data_name + '.txt', 'w')
job_file.write('complete')
job_file.close()
def fi_stats(self, metric_dict):
        metric_ranking = 'mean'
        metric_weighting = 'mean'
        # TODO: expose a run parameter so the user can choose 'mean' or 'median'
        # for FI plot ranking and weighting (mean is used by default)
        # Prepare for feature importance visualizations
        logging.info('Preparing for Model Feature Importance Plotting...')
fi_df_list, fi_med_list, fi_med_norm_list, med_metric_list, all_feature_list, \
non_zero_union_features, \
non_zero_union_indexes = self.prep_fi(metric_dict, metric_ranking, metric_weighting)
# Select 'top' features for composite visualisation
features_to_viz = self.select_for_composite_viz(non_zero_union_features, non_zero_union_indexes,
med_metric_list, fi_med_norm_list)
# Generate FI boxplots for each modeling algorithm if specified by user
if self.plot_fi_box:
logging.info('Generating Feature Importance Boxplot and Histograms...')
self.do_fi_boxplots(fi_df_list, fi_med_list, metric_ranking)
self.do_fi_histogram(fi_med_list, metric_ranking)
# Visualize composite FI - Currently set up to only use Balanced Accuracy for composite FI plot visualization
logging.info('Generating Composite Feature Importance Plots...')
# Take top feature names to visualize and get associated feature importance values for each algorithm,
# and original data ordered feature names list
# If we want composite FI plots to be displayed in descending total bar height order.
top_fi_med_norm_list, all_feature_list_to_viz = self.get_fi_to_viz_sorted(features_to_viz, all_feature_list,
fi_med_norm_list)
# Generate Normalized composite FI plot
if metric_ranking == 'mean':
self.composite_fi_plot(top_fi_med_norm_list, all_feature_list_to_viz, 'Norm',
'Normalized Mean Feature Importance', metric_ranking, metric_weighting)
elif metric_ranking == 'median':
self.composite_fi_plot(top_fi_med_norm_list, all_feature_list_to_viz, 'Norm',
'Normalized Median Feature Importance', metric_ranking, metric_weighting)
else:
print("Error: metric_ranking selection not found (must be mean or median)")
# # Fractionate FI scores for normalized and fractionated composite FI plot
# frac_lists = self.frac_fi(top_fi_med_norm_list)
# # Generate Normalized and Fractionated composite FI plot
# composite_fi_plot(frac_lists, algorithms, list(colors.values()),
# all_feature_list_to_viz, 'Norm_Frac',
# 'Normalized and Fractionated Feature Importance')
# Weight FI scores for normalized and (model performance) weighted composite FI plot
weighted_lists, weights = self.weight_fi(med_metric_list, top_fi_med_norm_list)
# Generate Normalized and Weighted Compound FI plot
if metric_ranking == 'mean':
self.composite_fi_plot(weighted_lists, all_feature_list_to_viz,
'Norm_Weight', 'Normalized and Weighted Mean Feature Importance', metric_ranking,
metric_weighting)
elif metric_ranking == 'median':
self.composite_fi_plot(weighted_lists, all_feature_list_to_viz,
'Norm_Weight', 'Normalized and Weighted Median Feature Importance', metric_ranking,
metric_weighting)
else:
print("Error: metric_ranking selection not found (must be mean or median)")
# Weight the Fractionated FI scores for normalized,fractionated, and weighted compound FI plot
# weighted_frac_lists = self.weight_frac_fi(frac_lists,weights)
# Generate Normalized, Fractionated, and Weighted Compound FI plot
# self.composite_fi_plot(weighted_frac_lists, algorithms, list(colors.values()),
# all_feature_list_to_viz, 'Norm_Frac_Weight',
# 'Normalized, Fractionated, and Weighted Feature Importance')
def preparation(self):
"""
        Creates the output directories for model evaluation results and
        feature importance files for this dataset.
"""
# Create Directory
if not os.path.exists(self.full_path + '/model_evaluation'):
os.mkdir(self.full_path + '/model_evaluation')
if not os.path.exists(self.full_path + '/model_evaluation/feature_importance/'):
os.mkdir(self.full_path + '/model_evaluation/feature_importance/')
def primary_stats(self, master_list=None, rep_data=None):
"""
Combine classification metrics and model feature importance scores
as well as ROC and PRC plot data across all CV datasets.
Generate ROC and PRC plots comparing separate CV models for each individual modeling algorithm.
"""
result_table = []
metric_dict = {}
# completed for each individual ML modeling algorithm
for algorithm in self.algorithms:
# stores values used in ROC and PRC plots
alg_result_table = []
# Define evaluation stats variable lists
s_bac, s_ac, s_f1, s_re, s_sp, s_pr, s_tp, s_tn, s_fp, s_fn, s_npv, s_lrp, s_lrm = [[] for _ in range(13)]
# Define feature importance lists
# used to save model feature importance individually for
# each cv within single summary file (all original features
# in dataset prior to feature selection included)
fi_all = []
# Define ROC plot variable lists
tprs = [] # stores interpolated true positive rates for average CV line in ROC
aucs = [] # stores individual CV areas under ROC curve to calculate average
mean_fpr = np.linspace(0, 1, 100) # used to plot average of CV line in ROC plot
mean_recall = np.linspace(0, 1, 100) # used to plot average of CV line in PRC plot
# Define PRC plot variable lists
precs = [] # stores interpolated precision values for average CV line in PRC
praucs = [] # stores individual CV areas under PRC curve to calculate average
aveprecs = [] # stores individual CV average precisions for PRC to calculate CV average
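            # Illustrative sketch (hypothetical values, not pipeline data): each fold's ROC curve is
            # interpolated onto the shared mean_fpr grid so folds can be averaged point-wise, e.g.
            #   np.interp(np.linspace(0, 1, 5), [0.0, 0.5, 1.0], [0.0, 0.8, 1.0])
            #   -> [0.0, 0.4, 0.8, 0.9, 1.0]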
# Gather statistics over all CV partitions
for cv_count in range(0, self.cv_partitions):
if master_list is None:
# Unpickle saved metrics from previous phase
result_file = self.full_path + '/model_evaluation/pickled_metrics/' \
+ self.abbrev[algorithm] + "_CV_" + str(cv_count) + "_metrics.pickle"
file = open(result_file, 'rb')
results = pickle.load(file)
# [metricList, fpr, tpr, roc_auc, prec, recall, prec_rec_auc, ave_prec, fi, probas_]
file.close()
# Separate pickled results
metric_list, fpr, tpr, roc_auc, prec, recall, prec_rec_auc, ave_prec, fi, probas_ = results
else:
results = master_list[cv_count][algorithm]
# grabs evalDict for a specific algorithm entry (with data values)
metric_list = results[0]
fpr = results[1]
tpr = results[2]
roc_auc = results[3]
prec = results[4]
recall = results[5]
prec_rec_auc = results[6]
ave_prec = results[7]
fi = results[8]
probas_ = results[9]
# Separate metrics from metricList
s_bac.append(metric_list[0])
s_ac.append(metric_list[1])
s_f1.append(metric_list[2])
s_re.append(metric_list[3])
s_sp.append(metric_list[4])
s_pr.append(metric_list[5])
s_tp.append(metric_list[6])
s_tn.append(metric_list[7])
s_fp.append(metric_list[8])
s_fn.append(metric_list[9])
s_npv.append(metric_list[10])
s_lrp.append(metric_list[11])
s_lrm.append(metric_list[12])
# update list that stores values used in ROC and PRC plots
alg_result_table.append([fpr, tpr, roc_auc, prec, recall, prec_rec_auc,
ave_prec])
# Update ROC plot variable lists needed to plot all CVs in one ROC plot
tprs.append(np.interp(mean_fpr, fpr, tpr))
tprs[-1][0] = 0.0
aucs.append(roc_auc)
# Update PRC plot variable lists needed to plot all CVs in one PRC plot
precs.append(np.interp(mean_recall, recall, prec))
praucs.append(prec_rec_auc)
aveprecs.append(ave_prec)
if master_list is None:
# Format feature importance scores as list
# (takes into account that all features are not in each CV partition)
temp_list = []
j = 0
headers = pd.read_csv(
self.full_path + '/CVDatasets/' + self.data_name
+ '_CV_' + str(cv_count) + '_Test.csv').columns.values.tolist()
if self.instance_label is not None: # Match label will never be in CV datasets
if self.instance_label in headers:
headers.remove(self.instance_label)
headers.remove(self.class_label)
                    for each in self.feature_headers:
                        # Check whether the current feature from the original dataset was used in this partition
                        if each in headers:
                            # Features may not be in the original order, so look up the current feature's index
f_index = headers.index(each)
temp_list.append(fi[f_index])
else:
temp_list.append(0)
j += 1
fi_all.append(temp_list)
logging.info("Running stats on " + algorithm)
# Define values for the mean ROC line (mean of individual CVs)
mean_tpr = np.mean(tprs, axis=0)
mean_tpr[-1] = 1.0
mean_auc = np.mean(aucs)
if self.plot_roc:
self.do_model_roc(algorithm, tprs, aucs, mean_fpr, alg_result_table)
# Define values for the mean PRC line (mean of individual CVs)
mean_prec = np.mean(precs, axis=0)
mean_pr_auc = np.mean(praucs)
if self.plot_prc:
if master_list is None:
self.do_model_prc(algorithm, precs, praucs, mean_recall, alg_result_table)
else:
self.do_model_prc(algorithm, precs, praucs, mean_recall, alg_result_table, rep_data, True)
# Export and save all CV metric stats for each individual algorithm
results = {'Balanced Accuracy': s_bac, 'Accuracy': s_ac, 'F1 Score': s_f1, 'Sensitivity (Recall)': s_re,
'Specificity': s_sp, 'Precision (PPV)': s_pr, 'TP': s_tp, 'TN': s_tn, 'FP': s_fp, 'FN': s_fn,
'NPV': s_npv, 'LR+': s_lrp, 'LR-': s_lrm, 'ROC AUC': aucs, 'PRC AUC': praucs,
'PRC APS': aveprecs}
dr = pd.DataFrame(results)
filepath = self.full_path + '/model_evaluation/' + self.abbrev[algorithm] + "_performance.csv"
dr.to_csv(filepath, header=True, index=False)
metric_dict[algorithm] = results
# Save FI scores for all CV models
if master_list is None:
self.save_fi(fi_all, self.abbrev[algorithm], self.feature_headers)
# Store ave metrics for creating global ROC and PRC plots later
mean_ave_prec = np.mean(aveprecs)
result_dict = {'algorithm': algorithm, 'fpr': mean_fpr, 'tpr': mean_tpr,
'auc': mean_auc, 'prec': mean_prec, 'recall': mean_recall,
'pr_auc': mean_pr_auc, 'ave_prec': mean_ave_prec}
result_table.append(result_dict)
        # Result table later used to create global ROC and PRC plots comparing average ML algorithm performance.
result_table = pd.DataFrame.from_dict(result_table)
result_table.set_index('algorithm', inplace=True)
return result_table, metric_dict
def save_fi(self, fi_all, algorithm, global_feature_list):
"""
Creates directory to store model feature importance results and,
for each algorithm, exports a file of feature importance scores from each CV.
"""
dr = pd.DataFrame(fi_all)
if not os.path.exists(self.full_path + '/model_evaluation/feature_importance/'):
os.mkdir(self.full_path + '/model_evaluation/feature_importance/')
filepath = self.full_path + '/model_evaluation/feature_importance/' + algorithm + "_FI.csv"
dr.to_csv(filepath, header=global_feature_list, index=False)
def do_model_roc(self, algorithm, tprs, aucs, mean_fpr, alg_result_table):
# Define values for the mean ROC line (mean of individual CVs)
mean_tpr = np.mean(tprs, axis=0)
mean_tpr[-1] = 1.0
mean_auc = np.mean(aucs)
# Generate ROC Plot (including individual CV's lines, average line, and no skill line)
# based on https://scikit-learn.org/stable/auto_examples/model_selection/plot_roc_crossval.html
if self.plot_roc:
# Set figure dimensions
plt.rcParams["figure.figsize"] = (6, 6)
# Plot individual CV ROC lines
for i in range(self.cv_partitions):
plt.plot(alg_result_table[i][0], alg_result_table[i][1], lw=1, alpha=0.3,
label='ROC fold %d (AUC = %0.3f)' % (i, alg_result_table[i][2]))
# Plot no-skill line
plt.plot([0, 1], [0, 1],
linestyle='--', lw=2, color='black', label='No-Skill', alpha=.8)
# Plot average line for all CVs
std_auc = np.std(aucs) # AUC standard deviations across CVs
plt.plot(mean_fpr, mean_tpr, color=self.colors[algorithm],
label=r'Mean ROC (AUC = %0.3f $\pm$ %0.3f)' % (float(mean_auc), float(std_auc)),
lw=2, alpha=.8)
# Plot standard deviation grey zone of curves
std_tpr = np.std(tprs, axis=0)
tprs_upper = np.minimum(mean_tpr + std_tpr, 1)
tprs_lower = np.maximum(mean_tpr - std_tpr, 0)
plt.fill_between(mean_fpr, tprs_lower, tprs_upper, color='grey', alpha=.2, label=r'$\pm$ 1 std. dev.')
# Specify plot axes,labels, and legend
plt.xlim([-0.05, 1.05])
plt.ylim([-0.05, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title(algorithm)
plt.legend(loc="upper left", bbox_to_anchor=(1.01, 1))
# Export and/or show plot
plt.savefig(self.full_path + '/model_evaluation/' +
self.abbrev[algorithm] + "_ROC.png", bbox_inches="tight")
if self.show_plots:
plt.show()
else:
plt.close('all')
# plt.cla() # not required
def do_model_prc(self, algorithm, precs, praucs, mean_recall, alg_result_table, rep_data=None, replicate=False):
# Define values for the mean PRC line (mean of individual CVs)
mean_prec = np.mean(precs, axis=0)
mean_pr_auc = np.mean(praucs)
# Generate PRC Plot (including individual CV's lines, average line, and no skill line)
if self.plot_prc:
# Set figure dimensions
plt.rcParams["figure.figsize"] = (6, 6)
# Plot individual CV PRC lines
for i in range(self.cv_partitions):
plt.plot(alg_result_table[i][4], alg_result_table[i][3], lw=1, alpha=0.3,
label='PRC fold %d (AUC = %0.3f)' % (i, alg_result_table[i][5]))
# Estimate no skill line based on the fraction of cases found in the first test dataset
# Technically there could be a unique no-skill line for each CV dataset based
# on final class balance (however only one is needed, and stratified CV attempts
# to keep partitions with similar/same class balance)
if not replicate:
test = pd.read_csv(
self.full_path + '/CVDatasets/' + self.data_name + '_CV_0_Test.csv')
test_y = test[self.class_label].values
else:
test_y = rep_data[self.class_label].values
no_skill = len(test_y[test_y == 1]) / len(test_y) # Fraction of cases
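            # Worked example (hypothetical counts): with 30 cases among 100 test instances, the
            # no-skill precision baseline is 30 / 100 = 0.3.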
# Plot no-skill line
plt.plot([0, 1], [no_skill, no_skill], color='black', linestyle='--', label='No-Skill', alpha=.8)
# Plot average line for all CVs
std_pr_auc = np.std(praucs)
plt.plot(mean_recall, mean_prec, color=self.colors[algorithm],
label=r'Mean PRC (AUC = %0.3f $\pm$ %0.3f)' % (float(mean_pr_auc), float(std_pr_auc)),
lw=2, alpha=.8)
# Plot standard deviation grey zone of curves
std_prec = np.std(precs, axis=0)
precs_upper = np.minimum(mean_prec + std_prec, 1)
precs_lower = np.maximum(mean_prec - std_prec, 0)
plt.fill_between(mean_recall, precs_lower, precs_upper, color='grey',
alpha=.2, label=r'$\pm$ 1 std. dev.')
# Specify plot axes,labels, and legend
plt.xlim([-0.05, 1.05])
plt.ylim([-0.05, 1.05])
plt.xlabel('Recall (Sensitivity)')
plt.ylabel('Precision (PPV)')
plt.title(algorithm)
plt.legend(loc="upper left", bbox_to_anchor=(1.01, 1))
# Export and/or show plot
plt.savefig(self.full_path + '/model_evaluation/' +
self.abbrev[algorithm] + "_PRC.png", bbox_inches="tight")
if self.show_plots:
plt.show()
else:
plt.close('all')
# plt.cla() # not required
def do_plot_roc(self, result_table):
"""
Generate ROC plot comparing average ML algorithm performance
(over all CV training/testing sets)
"""
count = 0
# Plot curves for each individual ML algorithm
for i in result_table.index:
# plt.plot(result_table.loc[i]['fpr'],result_table.loc[i]['tpr'],
# color=colors[i],label="{}, AUC={:.3f}".format(i, result_table.loc[i]['auc']))
plt.plot(result_table.loc[i]['fpr'], result_table.loc[i]['tpr'], color=self.colors[i],
label="{}, AUC={:.3f}".format(i, result_table.loc[i]['auc']))
count += 1
# Set figure dimensions
plt.rcParams["figure.figsize"] = (6, 6)
# Plot no-skill line
plt.plot([0, 1], [0, 1], color='black', linestyle='--', label='No-Skill', alpha=.8)
# Specify plot axes,labels, and legend
plt.xticks(np.arange(0.0, 1.1, step=0.1))
plt.xlabel("False Positive Rate", fontsize=15)
plt.yticks(np.arange(0.0, 1.1, step=0.1))
plt.ylabel("True Positive Rate", fontsize=15)
plt.legend(loc="upper left", bbox_to_anchor=(1.01, 1))
# Export and/or show plot
plt.savefig(self.full_path + '/model_evaluation/Summary_ROC.png', bbox_inches="tight")
if self.show_plots:
plt.show()
else:
plt.close('all')
# plt.cla() # not required
def do_plot_prc(self, result_table, rep_data=None, replicate=False):
"""
Generate PRC plot comparing average ML algorithm performance
(over all CV training/testing sets)
"""
count = 0
# Plot curves for each individual ML algorithm
for i in result_table.index:
plt.plot(result_table.loc[i]['recall'], result_table.loc[i]['prec'], color=self.colors[i],
label="{}, AUC={:.3f}, APS={:.3f}".format(i, result_table.loc[i]['pr_auc'],
result_table.loc[i]['ave_prec']))
count += 1
if not replicate:
# Estimate no skill line based on the fraction of cases found in the first test dataset
test = pd.read_csv(self.full_path + '/CVDatasets/' + self.data_name + '_CV_0_Test.csv')
if self.instance_label is not None:
test = test.drop(self.instance_label, axis=1)
test_y = test[self.class_label].values
else:
test_y = rep_data[self.class_label].values
no_skill = len(test_y[test_y == 1]) / len(test_y) # Fraction of cases
# Plot no-skill line
plt.plot([0, 1], [no_skill, no_skill], color='black', linestyle='--', label='No-Skill', alpha=.8)
# Specify plot axes,labels, and legend
plt.xticks(np.arange(0.0, 1.1, step=0.1))
plt.xlabel("Recall (Sensitivity)", fontsize=15)
plt.yticks(np.arange(0.0, 1.1, step=0.1))
plt.ylabel("Precision (PPV)", fontsize=15)
plt.legend(loc="upper left", bbox_to_anchor=(1.01, 1))
# Export and/or show plot
plt.savefig(self.full_path + '/model_evaluation/Summary_PRC.png', bbox_inches="tight")
if self.show_plots:
plt.show()
else:
plt.close('all')
# plt.cla() # not required
def save_metric_stats(self, metrics, metric_dict):
"""
Exports csv file with mean, median and std dev metric values
(over all CVs) for each ML modeling algorithm
"""
# TODO: Clean this function up, save everything together
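        # Output sketch (hypothetical values): each summary CSV has one column per metric and one
        # row per algorithm, e.g.
        #   ,Balanced Accuracy,Accuracy,...
        #   Decision Tree,0.71,0.74,...
        #   Random Forest,0.78,0.80,...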
with open(self.full_path + '/model_evaluation/Summary_performance_median.csv', mode='w', newline="") as file:
writer = csv.writer(file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
e = ['']
e.extend(metrics)
writer.writerow(e) # Write headers (balanced accuracy, etc.)
for algorithm in metric_dict:
astats = []
for li in list(metric_dict[algorithm].values()):
li = [float(i) for i in li]
mediani = median(li)
astats.append(str(mediani))
to_add = [algorithm]
to_add.extend(astats)
writer.writerow(to_add)
file.close()
with open(self.full_path + '/model_evaluation/Summary_performance_mean.csv', mode='w', newline="") as file:
writer = csv.writer(file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
e = ['']
e.extend(metrics)
writer.writerow(e) # Write headers (balanced accuracy, etc.)
for algorithm in metric_dict:
astats = []
for li in list(metric_dict[algorithm].values()):
li = [float(i) for i in li]
meani = mean(li)
astats.append(str(meani))
to_add = [algorithm]
to_add.extend(astats)
writer.writerow(to_add)
file.close()
with open(self.full_path + '/model_evaluation/Summary_performance_std.csv', mode='w', newline="") as file:
writer = csv.writer(file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
e = ['']
e.extend(metrics)
writer.writerow(e) # Write headers (balanced accuracy, etc.)
for algorithm in metric_dict:
astats = []
for li in list(metric_dict[algorithm].values()):
li = [float(i) for i in li]
std = stdev(li)
astats.append(str(std))
to_add = [algorithm]
to_add.extend(astats)
writer.writerow(to_add)
file.close()
def metric_boxplots(self, metrics, metric_dict):
"""
Export boxplots comparing algorithm performance for each standard metric
"""
if not os.path.exists(self.full_path + '/model_evaluation/metricBoxplots'):
os.mkdir(self.full_path + '/model_evaluation/metricBoxplots')
for metric in metrics:
temp_list = []
for algorithm in self.algorithms:
temp_list.append(metric_dict[algorithm][metric])
td = pd.DataFrame(temp_list)
td = td.transpose().astype('float')
td.columns = self.algorithms
# Generate boxplot
td.plot(kind='box', rot=90)
# Specify plot labels
plt.ylabel(str(metric))
plt.xlabel('ML Algorithm')
# Export and/or show plot
plt.savefig(self.full_path +
'/model_evaluation/metricBoxplots/Compare_' + metric + '.png', bbox_inches="tight")
if self.show_plots:
plt.show()
else:
plt.close('all')
# plt.cla() # not required
def kruskal_wallis(self, metrics, metric_dict):
"""
Apply non-parametric Kruskal Wallis one-way ANOVA on ranks.
Determines if there is a statistically significant difference in algorithm performance across CV runs.
Completed for each standard metric separately.
"""
# Create directory to store significance testing results (used for both Kruskal Wallis and MannWhitney U-test)
if not os.path.exists(self.full_path + '/model_evaluation/statistical_comparisons'):
os.mkdir(self.full_path + '/model_evaluation/statistical_comparisons')
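        # Illustrative sketch (hypothetical CV scores): the call kruskal(*temp_array) below unpacks one
        # list of CV scores per algorithm, e.g. kruskal([0.70, 0.72, 0.71], [0.80, 0.82, 0.81]) returns
        # a (statistic, p-value) pair whose p-value is compared against self.sig_cutoff.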
# Create dataframe to store analysis results for each metric
label = ['Statistic', 'P-Value', 'Sig(*)']
kruskal_summary = pd.DataFrame(index=metrics, columns=label)
# Apply Kruskal Wallis test for each metric
for metric in metrics:
temp_array = []
for algorithm in self.algorithms:
temp_array.append(metric_dict[algorithm][metric])
try:
result = kruskal(*temp_array)
except Exception:
result = [temp_array[0], 1]
kruskal_summary.at[metric, 'Statistic'] = str(round(result[0], 6))
kruskal_summary.at[metric, 'P-Value'] = str(round(result[1], 6))
if result[1] < self.sig_cutoff:
kruskal_summary.at[metric, 'Sig(*)'] = str('*')
else:
kruskal_summary.at[metric, 'Sig(*)'] = str('')
# Export analysis summary to .csv file
kruskal_summary.to_csv(self.full_path + '/model_evaluation/statistical_comparisons/KruskalWallis.csv')
return kruskal_summary
def wilcoxon_rank(self, metrics, metric_dict, kruskal_summary):
"""
Apply non-parametric Wilcoxon signed-rank test (pairwise comparisons).
If a significant Kruskal Wallis algorithm difference was found for a
given metric, Wilcoxon tests individual algorithm pairs
to determine if there is a statistically significant difference in
algorithm performance across CV runs. Test statistic will be zero if
all scores from one set are
larger than the other.
"""
for metric in metrics:
if kruskal_summary['Sig(*)'][metric] == '*':
wilcoxon_stats = []
done = []
for algorithm1 in self.algorithms:
for algorithm2 in self.algorithms:
if (not [algorithm1, algorithm2] in done) and \
(not [algorithm2, algorithm1] in done) and (algorithm1 != algorithm2):
set1 = metric_dict[algorithm1][metric]
set2 = metric_dict[algorithm2][metric]
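                            # Illustrative sketch (hypothetical CV scores): wilcoxon([0.70, 0.72, 0.71],
                            # [0.80, 0.82, 0.81]) yields a statistic of 0.0, since every paired difference
                            # favors the second algorithm; the p-value is compared to self.sig_cutoff below.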
# handle error when metric values are equal for both algorithms
if set1 == set2: # Check if all nums are equal in sets
report = ['NA', 1]
else: # Apply Wilcoxon Rank Sum test
try:
report = wilcoxon(set1, set2)
except Exception:
report = ['NA_error', 1]
# Summarize test information in list
tempstats = [algorithm1, algorithm2, report[0], report[1], '']
if report[1] < self.sig_cutoff:
tempstats[4] = '*'
wilcoxon_stats.append(tempstats)
done.append([algorithm1, algorithm2])
# Export test results
wilcoxon_stats_df = pd.DataFrame(wilcoxon_stats)
wilcoxon_stats_df.columns = ['Algorithm 1', 'Algorithm 2', 'Statistic', 'P-Value', 'Sig(*)']
wilcoxon_stats_df.to_csv(self.full_path
+ '/model_evaluation/statistical_comparisons/'
'WilcoxonRank_' + metric + '.csv', index=False)
def mann_whitney_u(self, metrics, metric_dict, kruskal_summary):
"""
Apply non-parametric Mann Whitney U-test (pairwise comparisons).
If a significant Kruskal Wallis algorithm difference was found for
a given metric, Mann Whitney tests individual algorithm pairs
to determine if there is a statistically significant difference
in algorithm performance across CV runs. Test statistic will be
zero if all scores from one set are larger than the other.
"""
for metric in metrics:
if kruskal_summary['Sig(*)'][metric] == '*':
mann_stats = []
done = []
for algorithm1 in self.algorithms:
for algorithm2 in self.algorithms:
if (not [algorithm1, algorithm2] in done) and \
(not [algorithm2, algorithm1] in done) and (algorithm1 != algorithm2):
set1 = metric_dict[algorithm1][metric]
set2 = metric_dict[algorithm2][metric]
if set1 == set2: # Check if all nums are equal in sets
report = ['NA', 1]
else: # Apply Mann Whitney U test
try:
report = mannwhitneyu(set1, set2)
except Exception:
report = ['NA_error', 1]
# Summarize test information in list
tempstats = [algorithm1, algorithm2, report[0], report[1], '']
if report[1] < self.sig_cutoff:
tempstats[4] = '*'
mann_stats.append(tempstats)
done.append([algorithm1, algorithm2])
# Export test results
mann_stats_df = pd.DataFrame(mann_stats)
mann_stats_df.columns = ['Algorithm 1', 'Algorithm 2', 'Statistic', 'P-Value', 'Sig(*)']
mann_stats_df.to_csv(self.full_path +
'/model_evaluation/'
'statistical_comparisons/MannWhitneyU_' + metric + '.csv', index=False)
def prep_fi(self, metric_dict, metric_ranking, metric_weighting):
"""
Organizes and prepares model feature importance
data for boxplot and composite feature importance figure generation.
"""
# Initialize required lists
# algorithm feature importance dataframe list (used to generate FI boxplot for each algorithm)
fi_df_list = []
# algorithm feature importance medians list (used to generate composite FI barplots)
fi_med_list = []
# algorithm focus metric medians list (used in weighted FI viz)
med_metric_list = []
# list of pre-feature selection feature names as they appear in FI reports for each algorithm
all_feature_list = []
# Get necessary feature importance data and primary metric data
# (currently only 'balanced accuracy' can be used for this)
for algorithm in self.algorithms:
# Get relevant feature importance info
temp_df = pd.read_csv(self.full_path + '/model_evaluation/feature_importance/' + self.abbrev[
algorithm] + "_FI.csv") # CV FI scores for all original features in dataset.
# Should be same for all algorithm files (i.e. all original features in standard CV dataset order)
if algorithm == self.algorithms[0]:
all_feature_list = temp_df.columns.tolist()
fi_df_list.append(temp_df)
if metric_ranking == 'mean':
fi_med_list.append(temp_df.mean().tolist()) # Saves mean FI scores over CV runs
elif metric_ranking == 'median':
fi_med_list.append(temp_df.median().tolist()) # Saves median FI scores over CV runs
else:
raise Exception("Error: metric_ranking selection not found (must be mean or median)")
# Get relevant metric info
if metric_weighting == 'mean':
med_ba = mean(metric_dict[algorithm][self.metric_weight])
elif metric_weighting == 'median':
med_ba = median(metric_dict[algorithm][self.metric_weight])
else: # use mean as backup
raise Exception("Error: metric_weighting selection not found (must be mean or median)")
med_metric_list.append(med_ba)
        # Normalize the mean/median feature importance scores so they fall between 0 and 1
fi_med_norm_list = []
for each in fi_med_list: # each algorithm
norm_list = []
for i in range(len(each)): # each feature (score) in original data order
if each[i] <= 0: # Feature importance scores assumed to be uninformative if at or below 0
norm_list.append(0)
else:
norm_list.append((each[i]) / (max(each)))
fi_med_norm_list.append(norm_list)
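        # Worked example (hypothetical scores): [0.2, -0.1, 0.4] normalizes to [0.5, 0, 1.0]
        # (non-positive scores are treated as uninformative; the rest are divided by the max, 0.4).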
# Identify features with non-zero medians
# (step towards excluding features that had zero feature importance for all algorithms)
alg_non_zero_fi_list = [] # stores list of feature name lists that are non-zero for each algorithm
for each in fi_med_list: # each algorithm
temp_non_zero_list = []
for i in range(len(each)): # each feature
if each[i] > 0.0:
# add feature names with positive values (doesn't need to be normalized for this)
temp_non_zero_list.append(all_feature_list[i])
alg_non_zero_fi_list.append(temp_non_zero_list)
non_zero_union_features = alg_non_zero_fi_list[0] # grab first algorithm's list
# Identify union of features with non-zero averages over all algorithms
# (i.e. if any algorithm found a non-zero score it will be considered
# for inclusion in top feature visualizations)
for j in range(1, len(self.algorithms)):
non_zero_union_features = list(set(non_zero_union_features) | set(alg_non_zero_fi_list[j]))
non_zero_union_indexes = []
for i in non_zero_union_features:
non_zero_union_indexes.append(all_feature_list.index(i))
# return fi_df_list, fi_ave_list, fi_ave_norm_list, ave_metric_list,\
# all_feature_list, non_zero_union_features, non_zero_union_indexes
return fi_df_list, fi_med_list, fi_med_norm_list, med_metric_list, \
all_feature_list, non_zero_union_features, non_zero_union_indexes
def select_for_composite_viz(self, non_zero_union_features,
non_zero_union_indexes,
ave_metric_list, fi_ave_norm_list):
"""
        Identify the list of top features over all algorithms to visualize.
        The best features to visualize are chosen using algorithm performance
        weighting and normalization (fractionation plays no useful role here;
        it is only used for visualization). All features are included if there
        are fewer than the specified number of top features. Top features are
        determined by the sum of performance (i.e. balanced accuracy) weighted
        feature importance over all algorithms.
"""
# Create performance weighted score sum dictionary for all features
score_sum_dict = {}
i = 0
for each in non_zero_union_features: # for each non-zero feature
for j in range(len(self.algorithms)): # for each algorithm
# grab target score from each algorithm
score = fi_ave_norm_list[j][non_zero_union_indexes[i]]
# multiply score by algorithm performance weight
weight = ave_metric_list[j]
if weight <= .5: # This is why this method is limited to balanced_accuracy and roc_auc
weight = 0
if not weight == 0:
weight = (weight - 0.5) / 0.5
score = score * weight
if not (each in score_sum_dict):
score_sum_dict[each] = score
else:
score_sum_dict[each] += score
i += 1
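        # Worked example (hypothetical values): a feature with normalized FI 0.75 under an algorithm
        # whose mean balanced accuracy is 0.8 contributes 0.75 * ((0.8 - 0.5) / 0.5) = 0.45 to its
        # score sum above; algorithms at or below 0.5 balanced accuracy contribute 0 for every feature.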
# Sort features by decreasing score
score_sum_dict_features = sorted(score_sum_dict, key=lambda x: score_sum_dict[x], reverse=True)
# Keep all features if there are fewer than specified top results
if len(non_zero_union_features) > self.top_features:
features_to_viz = score_sum_dict_features[0:self.top_features]
else:
features_to_viz = score_sum_dict_features
return features_to_viz # list of feature names to visualize in composite FI plots.
def do_fi_boxplots(self, fi_df_list, fi_med_list, metric_ranking):
"""
Generate individual feature importance boxplot for each algorithm
"""
algorithm_counter = 0
for algorithm in self.algorithms: # each algorithms
# Make median feature importance score dictionary
score_dict = {}
counter = 0
for med_score in fi_med_list[algorithm_counter]: # each feature
score_dict[self.feature_headers[counter]] = med_score
counter += 1
# Sort features by decreasing score
score_dict_features = sorted(score_dict, key=lambda x: score_dict[x], reverse=True)
# Make list of feature names to visualize
if len(self.feature_headers) > self.top_features:
features_to_viz = score_dict_features[0:self.top_features]
else:
features_to_viz = score_dict_features
# FI score dataframe for current algorithm
df = fi_df_list[algorithm_counter]
# Subset of dataframe (in ranked order) to visualize
viz_df = df[features_to_viz]
# Generate Boxplot
plt.figure(figsize=(15, 4))
viz_df.boxplot(rot=90)
plt.title(algorithm)
plt.ylabel('Feature Importance')
if metric_ranking == 'mean':
plt.xlabel('Features (Mean Ranking)')
elif metric_ranking == 'median':
plt.xlabel('Features (Median Ranking)')
else:
print("Error: metric_ranking selection not found (must be mean or median)")
plt.xticks(np.arange(1, len(features_to_viz) + 1), features_to_viz, rotation='vertical')
plt.savefig(self.full_path + '/model_evaluation/feature_importance/' + algorithm + '_boxplot',
bbox_inches="tight")
if self.show_plots:
plt.show()
else:
plt.close('all')
# plt.cla() # not required
# Identify and sort (decreasing) features with top median FI
algorithm_counter += 1
def do_fi_histogram(self, fi_med_list, metric_ranking):
"""
Generate histogram showing distribution of median feature importance scores for each algorithm.
"""
algorithm_counter = 0
for algorithm in self.algorithms: # each algorithms
med_scores = fi_med_list[algorithm_counter]
# Plot a histogram of average feature importance
plt.hist(med_scores, bins=100)
if metric_ranking == 'mean':
plt.xlabel("Mean Feature Importance")
elif metric_ranking == 'median':
plt.xlabel("Median Feature Importance")
else:
print("Error: metric_ranking selection not found (must be mean or median)")
plt.ylabel("Frequency")
plt.title(str(algorithm))
plt.xticks(rotation='vertical')
plt.savefig(self.full_path
+ '/model_evaluation/'
'feature_importance/' + algorithm + '_histogram', bbox_inches="tight")
if self.show_plots:
plt.show()
else:
plt.close('all')
# plt.cla() # not required
def composite_fi_plot(self, fi_list, all_feature_list_to_viz, fig_name,
y_label_text, metric_ranking, metric_weighting):
"""
Generate composite feature importance plot given list of feature names
and associated feature importance scores for each algorithm.
This is run for different transformations of the normalized feature importance scores.
"""
alg_colors = [COLORS[k] for k in self.algorithms]
algorithms, alg_colors, fi_list = (list(t) for t in zip(*sorted(zip(self.algorithms, alg_colors, fi_list), reverse=True)))
# Set basic plot properties
rc('font', weight='bold', size=16)
# The position of the bars on the x-axis
r = all_feature_list_to_viz # feature names
# Set width of bars
bar_width = 0.75
# Set figure dimensions
plt.figure(figsize=(24, 12))
# Plot first algorithm FI scores (lowest) bar
p1 = plt.bar(r, fi_list[0], color=alg_colors[0], edgecolor='white', width=bar_width)
# Automatically calculate space needed to plot next bar on top of the one before it
bottoms = [] # list of space used by previous
# algorithms for each feature (so next bar can be placed directly above it)
bottom = None
for i in range(len(algorithms) - 1):
for j in range(i + 1):
if j == 0:
bottom = np.array(fi_list[0]).astype('float64')
else:
bottom += np.array(fi_list[j]).astype('float64')
bottoms.append(bottom)
if not isinstance(bottoms, list):
bottoms = bottoms.tolist()
if len(self.algorithms) > 1:
# Plot subsequent feature bars for each subsequent algorithm
ps = [p1[0]]
for i in range(len(algorithms) - 1):
p = plt.bar(r, fi_list[i + 1], bottom=bottoms[i], color=alg_colors[i + 1], edgecolor='white',
width=bar_width)
ps.append(p[0])
lines = tuple(ps)
else:
ps = [p1[0]]
lines = tuple(ps)
# Specify axes info and legend
plt.xticks(np.arange(len(all_feature_list_to_viz)), all_feature_list_to_viz, rotation='vertical')
plt.xlabel("Features (ranked by sum of " + metric_ranking + " feature importance: weighted by " +
metric_weighting + " model " + self.metric_weight.lower() + ")", fontsize=20)
plt.ylabel(y_label_text, fontsize=20)
        plt.legend(lines[::-1], algorithms[::-1], loc="upper left", bbox_to_anchor=(1.01, 1))  # legend outside plot
# algorithms_list, lines_list = (list(t) for t in zip(*sorted(zip(algorithms, lines))))
# plt.legend(lines_list, algorithms_list, loc="upper right")
# Export and/or show plot
plt.savefig(self.full_path + '/model_evaluation/feature_importance/Compare_FI_' + fig_name + '.png',
bbox_inches='tight')
if self.show_plots:
plt.show()
else:
plt.close('all')
# plt.cla() # not required
[docs]
def get_fi_to_viz_sorted(self, features_to_viz, all_feature_list, fi_med_norm_list):
"""
        Takes a list of top feature names for visualization and gets their
        indexes. In every composite FI plot, features are ordered the same way
        they are selected for visualization (i.e. normalized and performance
        weighted). Because of this, feature bars are only perfectly ordered in
        descending order for the normalized + performance-weighted composite plot.
"""
        # Get original feature indexes for the selected feature names
feature_index_to_viz = [] # indexes of top features
for i in features_to_viz:
feature_index_to_viz.append(all_feature_list.index(i))
# Create list of top feature importance values in original dataset feature order
top_fi_med_norm_list = [] # feature importance values of top features for each algorithm (list of lists)
for i in range(len(self.algorithms)):
temp_list = []
for j in feature_index_to_viz: # each top feature index
temp_list.append(fi_med_norm_list[i][j]) # add corresponding FI value
top_fi_med_norm_list.append(temp_list)
all_feature_list_to_viz = features_to_viz
return top_fi_med_norm_list, all_feature_list_to_viz
@staticmethod
def frac_fi(top_fi_med_norm_list):
"""
        Transforms feature importance scores so that they sum to 1 over all features
        for a given algorithm. This way the normalized and fractionated composite bar
        plot gives every algorithm the same total bar area. The intuition is that if an
        algorithm gives the same FI score to all top features, it won't be
        over-represented in the resulting plot (i.e. all features could otherwise share
        the same maximum feature importance, which might give the impression that an
        algorithm is working better than it is). Instead, that maximum
        'bar real estate' is divided across the total number of features. Notably, this
        transformation can alter the total algorithm FI bar-height ranking of features.
"""
frac_lists = []
for each in top_fi_med_norm_list: # each algorithm
frac_list = []
for i in range(len(each)): # each feature
if sum(each) == 0: # check that all feature scores are not zero to avoid zero division error
frac_list.append(0)
else:
frac_list.append((each[i] / (sum(each))))
frac_lists.append(frac_list)
return frac_lists
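    # Worked example (hypothetical normalized scores): frac_fi([[1.0, 0.5, 0.5], [0.0, 0.0, 0.0]])
    # returns [[0.5, 0.25, 0.25], [0, 0, 0]] - each algorithm's scores now sum to 1 (or stay 0 if all-zero).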
@staticmethod
def weight_fi(med_metric_list, top_fi_med_norm_list):
"""
Weights the feature importance scores by algorithm performance
(intuitive because when interpreting feature importances we want
to place more weight on better performing algorithms)
"""
# Prepare weights
weights = []
        # replace all balanced accuracies <= .5 with 0 (i.e. these are no better than random chance)
for i in range(len(med_metric_list)):
if med_metric_list[i] <= .5:
med_metric_list[i] = 0
# normalize balanced accuracies
for i in range(len(med_metric_list)):
if med_metric_list[i] == 0:
weights.append(0)
else:
weights.append((med_metric_list[i] - 0.5) / 0.5)
# Weight normalized feature importances
weighted_lists = []
for i in range(len(top_fi_med_norm_list)): # each algorithm
weight_list = np.multiply(weights[i], top_fi_med_norm_list[i]).tolist()
weighted_lists.append(weight_list)
return weighted_lists, weights
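    # Worked example (hypothetical values): med_metric_list = [0.8, 0.45] produces weights [0.6, 0]
    # (0.45 <= 0.5 is floored to 0), so the second algorithm's normalized FI scores are zeroed out
    # while the first algorithm's are scaled by 0.6.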
@staticmethod
def weight_frac_fi(frac_lists, weights):
""" Weight normalized and fractionated feature importances. """
weighted_frac_lists = []
for i in range(len(frac_lists)):
weight_list = np.multiply(weights[i], frac_lists[i]).tolist()
weighted_frac_lists.append(weight_list)
return weighted_frac_lists
def parse_runtime(self):
"""
Loads runtime summaries from entire pipeline and parses them into a single summary file.
"""
dict_obj = dict()
dict_obj['preprocessing'] = 0
for file_path in glob.glob(self.full_path + '/runtime/*.txt'):
file_path = str(Path(file_path).as_posix())
f = open(file_path, 'r')
val = float(f.readline())
ref = file_path.split('/')[-1].split('_')[1].split('.')[0]
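            # Parsing sketch (hypothetical file name): 'runtime_exploratory.txt' -> ref = 'exploratory';
            # algorithm runtime files are mapped to their abbreviations below via self.abbrev.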
if ref in self.abbrev:
ref = self.abbrev[ref]
if not (ref in dict_obj):
if 'preprocessing' in ref:
dict_obj['preprocessing'] += val
dict_obj[ref] = val
else:
dict_obj[ref] += val
with open(self.full_path + '/runtimes.csv', mode='w', newline="") as file:
writer = csv.writer(file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerow(["Pipeline Component", "Phase", "Time (sec)"])
writer.writerow(["Exploratory Analysis", 1, dict_obj['exploratory']])
writer.writerow(["Scale and Impute", 2, dict_obj['preprocessing']])
try:
writer.writerow(["Mutual Information (Feature Importance)", 3, dict_obj['mutual']])
except KeyError:
pass
try:
writer.writerow(["MultiSURF (Feature Importance)", 3, dict_obj['multisurf']])
except KeyError:
pass
writer.writerow(["Feature Selection", 4, dict_obj['featureselection']])
for algorithm in self.algorithms: # Report runtimes for each algorithm
writer.writerow(([algorithm + "(Modeling)", 5, dict_obj[self.abbrev[algorithm]]]))
writer.writerow(["Stats Summary", 6, dict_obj['Stats']])
def save_runtime(self):
"""
Save phase runtime
"""
runtime_file = open(self.full_path + '/runtime/runtime_Stats.txt', 'w')
runtime_file.write(str(time.time() - self.job_start_time))
runtime_file.close()