import io
import logging
import os
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from statistics import median
from typing import List
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.stats import mannwhitneyu
from jmetal.core.algorithm import Algorithm
from jmetal.core.quality_indicator import QualityIndicator
from jmetal.util.solution import print_function_values_to_file, print_variables_to_file, read_solutions
LOGGER = logging.getLogger('jmetal')
"""
.. module:: laboratory
:platform: Unix, Windows
:synopsis: Run experiments. WIP!
.. moduleauthor:: Antonio Benítez-Hidalgo <antonio.b@uma.es>
"""
class Job:
    """ One run of an algorithm on a problem, identified by the tags used to name its output.

    :param algorithm: Algorithm to run. It must provide ``run()``, ``get_result()`` and ``total_computing_time``.
    :param algorithm_tag: Name of the algorithm.
    :param problem_tag: Name of the problem.
    :param run: Run identifier, used as suffix of the generated file names.
    """

    def __init__(self, algorithm: Algorithm, algorithm_tag: str, problem_tag: str, run: int):
        self.algorithm = algorithm
        self.algorithm_tag = algorithm_tag
        self.problem_tag = problem_tag
        self.run_tag = run

    def execute(self, output_path: str = ''):
        """ Run the algorithm and, if an output path is given, save its results.

        Three files are written into ``output_path``: ``FUN.<run>.tsv`` (function values), ``VAR.<run>.tsv``
        (variables) and ``TIME.<run>`` (total computing time).

        :param output_path: Directory where the results are stored. If empty, nothing is written.
        """
        self.algorithm.run()

        if not output_path:
            return

        # Make sure the target directory exists before any file is written into it
        os.makedirs(output_path, exist_ok=True)

        # Fetch the result once and reuse it for both output files
        result = self.algorithm.get_result()

        file_name = os.path.join(output_path, 'FUN.{}.tsv'.format(self.run_tag))
        print_function_values_to_file(result, filename=file_name)

        file_name = os.path.join(output_path, 'VAR.{}.tsv'.format(self.run_tag))
        print_variables_to_file(result, filename=file_name)

        file_name = os.path.join(output_path, 'TIME.{}'.format(self.run_tag))
        with open(file_name, 'w') as of:
            of.write(str(self.algorithm.total_computing_time))
class Experiment:

    def __init__(self, output_dir: str, jobs: List[Job], m_workers: int = 6):
        """ Run an experiment to execute a list of jobs.

        :param output_dir: Base directory where each job will save its results.
        :param jobs: List of Jobs (from :py:mod:`jmetal.util.laboratory`) to be executed.
        :param m_workers: Maximum number of workers to execute the Jobs in parallel.
        """
        self.jobs = jobs
        self.m_workers = m_workers
        self.output_dir = output_dir

    def run(self) -> None:
        """ Execute every job in a pool of worker processes and wait for all of them.

        Each job stores its results under ``<output_dir>/<algorithm_tag>/<problem_tag>``. Jobs are sent to
        worker processes, so they have to be picklable, and the job objects held by this experiment are not
        updated with the state of the run. An exception raised by a job is re-raised here.
        """
        with ProcessPoolExecutor(max_workers=self.m_workers) as executor:
            # Hand the callable and its argument to the pool; calling job.execute(...) here would run the
            # job in this process and submit its return value (None) instead
            futures = [
                executor.submit(job.execute, os.path.join(self.output_dir, job.algorithm_tag, job.problem_tag))
                for job in self.jobs
            ]

            # Asking for each result makes failures in the workers visible instead of silently dropping them
            for future in futures:
                future.result()
def generate_summary_from_experiment(input_dir: str, quality_indicators: List[QualityIndicator],
                                     reference_fronts: str = ''):
    """ Compute a list of quality indicators. The input data directory *must* meet the following structure (this is
    generated automatically by the Experiment class):

    * <base_dir>

      * algorithm_a

        * problem_a

          * FUN.0.tsv
          * FUN.1.tsv
          * VAR.0.tsv
          * VAR.1.tsv
          * ...

    The summary is written to ``QualityIndicatorSummary.csv`` in the current working directory, with one line per
    computing time (``TIME`` files) and one line per run and quality indicator (``FUN`` files).

    :param input_dir: Directory where all the input data is found (function values and variables).
    :param reference_fronts: Directory where reference fronts are found.
    :param quality_indicators: List of quality indicators to compute.
    :return: None.
    """
    if not quality_indicators:
        quality_indicators = []

    # The summary is opened once and kept open while the input directory is traversed
    with open('QualityIndicatorSummary.csv', 'w') as of:
        of.write('Algorithm,Problem,ExecutionId,IndicatorName,IndicatorValue\n')

        for dirname, _, filenames in os.walk(input_dir):
            # The two innermost directory names are the algorithm and the problem; Path splits the name with
            # the separator of the running platform
            path_parts = Path(dirname).parts
            if len(path_parts) < 2:
                # Too shallow to hold results of an algorithm/problem pair
                continue
            algorithm, problem = path_parts[-2:]
            reference_front_file = os.path.join(reference_fronts, problem + '.pf')

            for filename in filenames:
                if 'TIME' in filename:
                    # The run identifier is the last purely numeric token of the file name
                    run_tag = [s for s in filename.split('.') if s.isdigit()].pop()

                    with open(os.path.join(dirname, filename), 'r') as content_file:
                        content = content_file.read()

                    of.write(','.join([algorithm, problem, run_tag, 'Time', str(content)]))
                    of.write('\n')

                if 'FUN' in filename:
                    solutions = read_solutions(os.path.join(dirname, filename))
                    run_tag = [s for s in filename.split('.') if s.isdigit()].pop()

                    for indicator in quality_indicators:
                        # Add reference front if any
                        if hasattr(indicator, 'reference_front'):
                            if Path(reference_front_file).is_file():
                                indicator.reference_front = read_solutions(reference_front_file)
                            else:
                                LOGGER.warning('Reference front not found at %s', reference_front_file)

                        result = indicator.compute(solutions)

                        # Save quality indicator value to file
                        of.write(','.join([algorithm, problem, run_tag, indicator.get_name(), str(result)]))
                        of.write('\n')
def generate_boxplot(filename: str, output_dir: str = 'boxplot'):
    """ Generate boxplot diagrams.

    For every quality indicator and problem found in the summary, one diagram with a box per algorithm is
    saved as ``boxplot-<problem>-<indicator>.png`` and ``boxplot-<problem>-<indicator>.eps``.

    :param filename: Input filename (summary).
    :param output_dir: Output path. Files already found in it are removed.
    :raises Exception: If the summary does not have exactly five different columns.
    """
    df = pd.read_csv(filename, skipinitialspace=True)

    if len(set(df.columns.tolist())) != 5:
        raise Exception('Wrong number of columns')

    if Path(output_dir).is_dir():
        LOGGER.warning('Directory {} exists. Removing contents.'.format(output_dir))
        for entry in os.listdir(output_dir):
            entry_path = os.path.join(output_dir, entry)
            # os.remove cannot delete directories: nested directories are left in place instead of aborting
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                continue
            os.remove(entry_path)
    else:
        LOGGER.warning('Directory {} does not exist. Creating it.'.format(output_dir))
        Path(output_dir).mkdir(parents=True)

    algorithms = pd.unique(df['Algorithm'])
    problems = pd.unique(df['Problem'])
    indicators = pd.unique(df['IndicatorName'])

    # We consider the quality indicator indicator_name
    for indicator_name in indicators:
        data = df[df['IndicatorName'] == indicator_name]

        for pr in problems:
            # One series of indicator values per algorithm, in the order used for the tick labels
            data_to_plot = [
                data.loc[np.logical_and(data['Algorithm'] == alg, data['Problem'] == pr), 'IndicatorValue']
                for alg in algorithms
            ]

            # Create a figure instance
            fig = plt.figure(1, figsize=(9, 6))
            try:
                plt.suptitle(pr, y=0.95, fontsize=18)

                ax = fig.add_subplot(111)
                ax.boxplot(data_to_plot)
                ax.set_xticklabels(algorithms)
                ax.tick_params(labelsize=20)

                base_name = os.path.join(output_dir, 'boxplot-{}-{}'.format(pr, indicator_name))
                plt.savefig(base_name + '.png', bbox_inches='tight')
                plt.savefig(base_name + '.eps', bbox_inches='tight')
            finally:
                # Always release the figure so a failure cannot leak its axes into the next diagram
                plt.close(fig)
def generate_latex_tables(filename: str, output_dir: str = 'latex/statistical'):
    """ Computes a number of statistical values (mean, median, standard deviation, interquartile range).

    For every quality indicator found in the summary, the following files are written into ``output_dir``:
    ``Median-<indicator>.csv``, ``IQR-<indicator>.csv``, ``Mean-<indicator>.csv`` and ``Std-<indicator>.csv``
    (tab separated, one row per problem and one column per algorithm), plus the LaTeX documents
    ``MedianIQR-<indicator>.tex`` and ``MeanStd-<indicator>.tex``.

    :param filename: Input filename (summary).
    :param output_dir: Output path. Files already found in it are removed.
    :raises Exception: If the summary does not have exactly five different columns.
    """
    df = pd.read_csv(filename, skipinitialspace=True)

    if len(set(df.columns.tolist())) != 5:
        raise Exception('Wrong number of columns')

    if Path(output_dir).is_dir():
        LOGGER.warning('Directory {} exists. Removing contents.'.format(output_dir))
        for entry in os.listdir(output_dir):
            entry_path = os.path.join(output_dir, entry)
            # os.remove cannot delete directories: nested directories are left in place instead of aborting
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                continue
            os.remove(entry_path)
    else:
        LOGGER.warning('Directory {} does not exist. Creating it.'.format(output_dir))
        Path(output_dir).mkdir(parents=True)

    # Each list collects one single-column frame per algorithm, indexed by (Problem, IndicatorName)
    median_columns, iqr_columns, mean_columns, std_columns = [], [], [], []

    for algorithm_name, subset in df.groupby('Algorithm', sort=False):
        subset = subset.drop('Algorithm', axis=1)
        subset = subset.rename(columns={'IndicatorValue': algorithm_name})
        subset = subset.set_index(['Problem', 'IndicatorName', 'ExecutionId'])

        # Aggregate over the runs (ExecutionId) of every problem/indicator pair
        per_pair = subset.groupby(level=[0, 1])

        # Median and interquartile range
        median_columns.append(per_pair.median())
        iqr_columns.append(per_pair.quantile(0.75) - per_pair.quantile(0.25))

        # Mean and standard deviation
        mean_columns.append(per_pair.mean())
        std_columns.append(per_pair.std())

    if not median_columns:
        LOGGER.warning('No data found in {}. Nothing to generate.'.format(filename))
        return

    # Statistic name (also the prefix of the .csv files) -> table with one column per algorithm
    tables = {
        'Median': pd.concat(median_columns, axis=1),
        'IQR': pd.concat(iqr_columns, axis=1),
        'Mean': pd.concat(mean_columns, axis=1),
        'Std': pd.concat(std_columns, axis=1),
    }

    for indicator_name in pd.unique(df['IndicatorName']):
        per_indicator = {}

        # Generate the .csv tables
        for statistic, table in tables.items():
            selected = table.groupby('IndicatorName', sort=False).get_group(indicator_name).copy()
            # Only the problem name is kept as row label
            selected.index = selected.index.droplevel(1)
            selected.to_csv(
                os.path.join(output_dir, '{}-{}.csv'.format(statistic, indicator_name)), sep='\t', encoding='utf-8'
            )
            per_indicator[statistic] = selected

        # Generate LaTeX tables
        minimization = check_minimization(indicator_name)
        table_label = 'table:{}'.format(indicator_name)

        # Median & IQR
        with open(os.path.join(output_dir, 'MedianIQR-{}.tex'.format(indicator_name)), 'w') as latex:
            latex.write(
                __averages_to_latex(
                    per_indicator['Median'],
                    per_indicator['IQR'],
                    caption='Median and Interquartile Range of the {} quality indicator.'.format(indicator_name),
                    minimization=minimization,
                    label=table_label
                )
            )

        # Mean & Std
        with open(os.path.join(output_dir, 'MeanStd-{}.tex'.format(indicator_name)), 'w') as latex:
            latex.write(
                __averages_to_latex(
                    per_indicator['Mean'],
                    per_indicator['Std'],
                    caption='Mean and Standard Deviation of the {} quality indicator.'.format(indicator_name),
                    minimization=minimization,
                    label=table_label
                )
            )
def compute_wilcoxon(filename: str, output_dir: str = 'latex/wilcoxon'):
    """ Compare every pair of algorithms with the Mann-Whitney rank sum test.

    For every quality indicator, a table with one cell per pair of algorithms is written into ``output_dir`` as
    ``Wilcoxon-<indicator>.csv`` (tab separated) and ``Wilcoxon-<indicator>.tex``. Every cell has one symbol per
    problem: ``-`` if the p-value is above 0.05; otherwise ``+`` if the median of the row algorithm is at least
    as good as the median of the column algorithm, and ``o`` if it is worse.

    :param filename: Input filename (summary).
    :param output_dir: Output path. Files already found in it are removed.
    :raises Exception: If the summary does not have exactly five different columns.
    """
    df = pd.read_csv(filename, skipinitialspace=True)

    if len(set(df.columns.tolist())) != 5:
        raise Exception('Wrong number of columns')

    if Path(output_dir).is_dir():
        LOGGER.warning('Directory {} exists. Removing contents.'.format(output_dir))
        for entry in os.listdir(output_dir):
            entry_path = os.path.join(output_dir, entry)
            # os.remove cannot delete directories: nested directories are left in place instead of aborting
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                continue
            os.remove(entry_path)
    else:
        LOGGER.warning('Directory {} does not exist. Creating it.'.format(output_dir))
        Path(output_dir).mkdir(parents=True)

    algorithms = pd.unique(df['Algorithm'])
    problems = pd.unique(df['Problem'])
    indicators = pd.unique(df['IndicatorName'])

    for indicator_name in indicators:
        minimization = check_minimization(indicator_name)
        indicator_data = df[df['IndicatorName'] == indicator_name]

        # Rows are all the algorithms but the last one; columns are all the algorithms but the first one
        table = pd.DataFrame(index=algorithms[0:-1], columns=algorithms[1:])

        for i, row_algorithm in enumerate(algorithms[0:-1]):
            wilcoxon = []

            for j, col_algorithm in enumerate(algorithms[1:]):
                if i > j:
                    # Cells below the diagonal repeat a comparison already made and stay empty
                    continue

                line = []
                for problem in problems:
                    problem_data = indicator_data[indicator_data['Problem'] == problem]
                    data1 = problem_data.loc[problem_data['Algorithm'] == row_algorithm, 'IndicatorValue']
                    data2 = problem_data.loc[problem_data['Algorithm'] == col_algorithm, 'IndicatorValue']

                    median1 = median(data1)
                    median2 = median(data2)

                    _, p = mannwhitneyu(data1, data2)

                    if p <= 0.05:
                        if minimization:
                            row_is_better = median1 <= median2
                        else:
                            row_is_better = median1 >= median2
                        line.append('+' if row_is_better else 'o')
                    else:
                        line.append('-')

                wilcoxon.append(''.join(line))

            # Row i skipped its first i cells: fill them with empty strings to match the number of columns
            table.loc[row_algorithm] = [''] * i + wilcoxon

        table.to_csv(os.path.join(output_dir, 'Wilcoxon-{}.csv'.format(indicator_name)), sep='\t', encoding='utf-8')

        with open(os.path.join(output_dir, 'Wilcoxon-{}.tex'.format(indicator_name)), 'w') as latex:
            latex.write(
                __wilcoxon_to_latex(
                    table,
                    caption='Wilcoxon values of the {} quality indicator ({}).'.format(indicator_name,
                                                                                       ', '.join(problems)),
                    label='table:{}'.format(indicator_name)
                )
            )
def compute_mean_indicator(filename: str, indicator_name: str):
    """ Compute the mean values of an indicator.

    :param filename: Input filename (summary).
    :param indicator_name: Quality indicator name.
    :return: DataFrame with one row per problem and one column per algorithm, both in order of appearance in the
        summary. A cell is NaN if the summary has no value for that algorithm/problem pair.
    :raises Exception: If the summary does not have exactly five different columns.
    """
    df = pd.read_csv(filename, skipinitialspace=True)

    if len(set(df.columns.tolist())) != 5:
        raise Exception('Wrong number of columns')

    algorithms = pd.unique(df['Algorithm'])
    problems = pd.unique(df['Problem'])

    # We consider the quality indicator indicator_name
    data = df[df['IndicatorName'] == indicator_name]

    # Compute for each pair algorithm/problem the average of IndicatorValue
    average_values = np.zeros((problems.size, algorithms.size))
    for j, alg in enumerate(algorithms):
        for i, pr in enumerate(problems):
            selected = np.logical_and(data['Algorithm'] == alg, data['Problem'] == pr)
            average_values[i, j] = data.loc[selected, 'IndicatorValue'].mean()

    # Generate dataFrame from average values
    return pd.DataFrame(data=average_values, index=problems, columns=algorithms)
def __averages_to_latex(central_tendency: pd.DataFrame, dispersion: pd.DataFrame,
                        caption: str, label: str, minimization=True, alignment: str = 'c'):
    """ Convert a pandas DataFrame to a LaTeX tabular. Prints labels in bold and does use math mode.

    Every cell shows the central tendency value with the dispersion value as subscript. In each row, the best
    cell is shaded with the color ``gray95`` and the second best one with ``gray25``.

    :param central_tendency: Mean/median values, with one row per problem and one column per algorithm.
    :param dispersion: Std/IQR values, with the same layout as ``central_tendency``.
    :param caption: LaTeX table caption.
    :param label: LaTeX table label.
    :param minimization: If True, the lowest mean/median values are the best ones; else, the highest.
    :param alignment: Column alignment used for every column of the tabular.
    :return: LaTeX document as a string.
    """
    num_columns, num_rows = central_tendency.shape[1], central_tendency.shape[0]

    output = io.StringIO()

    col_format = '{}|{}'.format(alignment, alignment * num_columns)
    column_labels = ['\\textbf{{{0}}}'.format(str(name).replace('_', '\\_')) for name in central_tendency.columns]

    # Write header
    output.write('\\documentclass{article}\n')

    output.write('\\usepackage[utf8]{inputenc}\n')
    output.write('\\usepackage{tabularx}\n')
    output.write('\\usepackage{colortbl}\n')
    output.write('\\usepackage[table*]{xcolor}\n')

    output.write('\\xdefinecolor{gray95}{gray}{0.65}\n')
    output.write('\\xdefinecolor{gray25}{gray}{0.8}\n')

    output.write('\\title{Median and IQR}\n')
    output.write('\\author{}\n')

    output.write('\\begin{document}\n')
    output.write('\\maketitle\n')

    output.write('\\section{Table}\n')

    output.write('\\begin{table}[!htp]\n')
    output.write(' \\caption{{{}}}\n'.format(caption))
    output.write(' \\label{{{}}}\n'.format(label))
    output.write(' \\centering\n')
    output.write(' \\begin{scriptsize}\n')
    output.write(' \\begin{tabular}{%s}\n' % col_format)
    output.write(' & {} \\\\\\hline\n'.format(' & '.join(column_labels)))

    # Write data lines
    for row in range(num_rows):
        # Rows are read by position, whatever the type of the index labels is
        central_values = list(central_tendency.iloc[row])
        dispersion_values = list(dispersion.iloc[row])

        # Rank the columns from best to worst mean/median value (the lower the better if minimization)
        # Note that mean/median values could be the same: in that case, sort by Std/IQR (the lower the better)
        ranking = sorted(
            range(len(central_values)),
            key=lambda col: (central_values[col] if minimization else -central_values[col], dispersion_values[col])
        )

        # Compose cell
        values = ['{:.2e}_{{{:.2e}}}'.format(central, spread)
                  for central, spread in zip(central_values, dispersion_values)]

        # Highlight values (a table with a single algorithm has no second best value)
        if len(ranking) > 1:
            values[ranking[1]] = '\\cellcolor{gray25} ' + values[ranking[1]]
        if ranking:
            values[ranking[0]] = '\\cellcolor{gray95} ' + values[ranking[0]]

        # Underscores are escaped in row labels as they are in column labels
        row_label = str(central_tendency.index[row]).replace('_', '\\_')
        output.write(' \\textbf{{{0}}} & ${1}$ \\\\\n'.format(row_label, ' $ & $ '.join(values)))

    # Write footer
    output.write(' \\end{tabular}\n')
    output.write(' \\end{scriptsize}\n')
    output.write('\\end{table}\n')

    output.write('\\end{document}')

    return output.getvalue()
def __wilcoxon_to_latex(df: pd.DataFrame, caption: str, label: str, minimization=True, alignment: str = 'c'):
    """ Convert a pandas DataFrame to a LaTeX tabular. Prints labels in bold and does use math mode.

    Every cell of the dataframe is a string where ``-``, ``o`` and ``+`` are replaced by the LaTeX symbols
    ``\\text{--}``, ``\\triangledown`` and ``\\blacktriangle``, respectively.

    :param df: Pandas dataframe.
    :param caption: LaTeX table caption.
    :param label: LaTeX table label.
    :param minimization: If False, the symbols used for ``o`` and ``+`` are swapped.
    :param alignment: Column alignment used for every column of the tabular.
    :return: LaTeX document as a string.
    """
    num_columns, num_rows = df.shape[1], df.shape[0]
    output = io.StringIO()

    col_format = '{}|{}'.format(alignment, alignment * num_columns)
    column_labels = ['\\textbf{{{0}}}'.format(str(name).replace('_', '\\_')) for name in df.columns]

    # Write header
    output.write('\\documentclass{article}\n')

    output.write('\\usepackage[utf8]{inputenc}\n')
    output.write('\\usepackage{tabularx}\n')
    output.write('\\usepackage{amssymb}\n')
    output.write('\\usepackage{amsmath}\n')

    output.write('\\title{Wilcoxon - Mann-Whitney rank sum test}\n')
    output.write('\\author{}\n')

    output.write('\\begin{document}\n')
    output.write('\\maketitle\n')

    output.write('\\section{Table}\n')

    output.write('\\begin{table}[!htp]\n')
    output.write(' \\caption{{{}}}\n'.format(caption))
    output.write(' \\label{{{}}}\n'.format(label))
    output.write(' \\centering\n')
    output.write(' \\begin{scriptsize}\n')
    output.write(' \\begin{tabular}{%s}\n' % col_format)
    output.write(' & {} \\\\\\hline\n'.format(' & '.join(column_labels)))

    symbolo = '\\triangledown\\ '
    symbolplus = '\\blacktriangle\\ '

    if not minimization:
        symbolo, symbolplus = symbolplus, symbolo

    # Characters are translated in a single pass, so the text of one symbol can never be replaced again
    symbols = str.maketrans({'-': '\\text{--}\\ ', 'o': symbolo, '+': symbolplus})

    # Write data lines
    for row in range(num_rows):
        # Rows are read by position, whatever the type of the index labels is
        values = [val.translate(symbols) for val in df.iloc[row]]

        # Underscores are escaped in row labels as they are in column labels
        row_label = str(df.index[row]).replace('_', '\\_')
        output.write(' \\textbf{{{0}}} & ${1}$ \\\\\n'.format(row_label, ' $ & $ '.join(values)))

    # Write footer
    output.write(' \\end{tabular}\n')
    output.write(' \\end{scriptsize}\n')
    output.write('\\end{table}\n')

    output.write('\\end{document}')

    return output.getvalue()
def check_minimization(indicator) -> bool:
    """ Tell whether lower values of a quality indicator are the better ones.

    :param indicator: Quality indicator name.
    :return: False for ``'HV'``; True for any other value.
    """
    return indicator != 'HV'