import time
from typing import TypeVar, List, Generator
try:
import dask
from distributed import as_completed, Client
except ImportError:
pass
from jmetal.algorithm.singleobjective.genetic_algorithm import GeneticAlgorithm
from jmetal.config import store
from jmetal.core.algorithm import DynamicAlgorithm, Algorithm
from jmetal.core.operator import Mutation, Crossover, Selection
from jmetal.core.problem import Problem, DynamicProblem
from jmetal.operator import BinaryTournamentSelection
from jmetal.util.density_estimator import CrowdingDistance
from jmetal.util.evaluator import Evaluator
from jmetal.util.ranking import FastNonDominatedRanking
from jmetal.util.replacement import RankingAndDensityEstimatorReplacement, RemovalPolicyType
from jmetal.util.comparator import DominanceComparator, Comparator, MultiComparator
from jmetal.util.termination_criterion import TerminationCriterion
# Generic placeholder for the solution type; populations in this module are annotated as List[S].
S = TypeVar('S')
# Generic placeholder for the result type returned by the algorithms' get_result().
R = TypeVar('R')
"""
.. module:: NSGA-II
   :platform: Unix, Windows
   :synopsis: NSGA-II (Non-dominated Sorting Genetic Algorithm II) implementation.

.. moduleauthor:: Antonio J. Nebro <antonio@lcc.uma.es>, Antonio Benítez-Hidalgo <antonio.b@uma.es>
"""
class NSGAII(GeneticAlgorithm[S, R]):

    def __init__(self,
                 problem: Problem,
                 population_size: int,
                 offspring_population_size: int,
                 mutation: Mutation,
                 crossover: Crossover,
                 selection: Selection = BinaryTournamentSelection(
                     MultiComparator([FastNonDominatedRanking.get_comparator(),
                                      CrowdingDistance.get_comparator()])),
                 termination_criterion: TerminationCriterion = store.default_termination_criteria,
                 population_generator: Generator = store.default_generator,
                 population_evaluator: Evaluator = store.default_evaluator,
                 dominance_comparator: Comparator = store.default_comparator):
        """
        NSGA-II implementation as described in

        * K. Deb, A. Pratap, S. Agarwal and T. Meyarivan, "A fast and elitist
          multiobjective genetic algorithm: NSGA-II," in IEEE Transactions on Evolutionary Computation,
          vol. 6, no. 2, pp. 182-197, Apr 2002. doi: 10.1109/4235.996017

        NSGA-II is a genetic algorithm (GA), i.e. it belongs to the evolutionary algorithms (EAs)
        family. The implementation of NSGA-II provided in jMetalPy follows the evolutionary
        algorithm template described in the algorithm module (:py:mod:`jmetal.core.algorithm`).

        .. note:: A steady-state version of this algorithm can be run by setting the offspring size to 1.

        :param problem: The problem to solve.
        :param population_size: Size of the population.
        :param offspring_population_size: Size of the offspring population; forwarded to the
            genetic-algorithm base class.
        :param mutation: Mutation operator (see :py:mod:`jmetal.operator.mutation`).
        :param crossover: Crossover operator (see :py:mod:`jmetal.operator.crossover`).
        :param selection: Selection operator (see :py:mod:`jmetal.operator.selection`). Defaults to a
            binary tournament driven by the ranking comparator followed by the crowding-distance comparator.
        :param termination_criterion: Termination criterion forwarded to the base class.
        :param population_generator: Population generator forwarded to the base class.
        :param population_evaluator: Population evaluator forwarded to the base class.
        :param dominance_comparator: Comparator handed to the non-dominated ranking that is built
            in :py:meth:`replacement`.
        """
        super(NSGAII, self).__init__(
            problem=problem,
            population_size=population_size,
            offspring_population_size=offspring_population_size,
            mutation=mutation,
            crossover=crossover,
            selection=selection,
            termination_criterion=termination_criterion,
            population_evaluator=population_evaluator,
            population_generator=population_generator
        )
        # Kept on the instance because replacement() builds a fresh ranking from it on every call.
        self.dominance_comparator = dominance_comparator

    def replacement(self, population: List[S], offspring_population: List[S]) -> List[S]:
        """ This method joins the current and offspring populations to produce the population of the next generation
        by applying the ranking and crowding distance selection.

        :param population: Parent population.
        :param offspring_population: Offspring population.
        :return: New population after ranking and crowding distance selection is applied.
        """
        ranking = FastNonDominatedRanking(self.dominance_comparator)
        density_estimator = CrowdingDistance()

        r = RankingAndDensityEstimatorReplacement(ranking, density_estimator, RemovalPolicyType.ONE_SHOT)
        solutions = r.replace(population, offspring_population)

        return solutions

    def get_result(self) -> R:
        """ Return the current population held in ``self.solutions``. """
        return self.solutions

    def get_name(self) -> str:
        """ Return the display name of the algorithm. """
        return 'NSGAII'
class DynamicNSGAII(NSGAII[S, R], DynamicAlgorithm):
    """ NSGA-II variant that reacts to changes reported by a dynamic problem.

    Whenever the problem reports a change, or the termination criterion is met, the current
    population is re-evaluated and the search carries on.
    """

    def __init__(self,
                 problem: DynamicProblem[S],
                 population_size: int,
                 offspring_population_size: int,
                 mutation: Mutation,
                 crossover: Crossover,
                 selection: Selection = BinaryTournamentSelection(
                     MultiComparator([FastNonDominatedRanking.get_comparator(),
                                      CrowdingDistance.get_comparator()])),
                 termination_criterion: TerminationCriterion = store.default_termination_criteria,
                 population_generator: Generator = store.default_generator,
                 population_evaluator: Evaluator = store.default_evaluator,
                 dominance_comparator: DominanceComparator = DominanceComparator()):
        """ Forward every argument to :py:class:`NSGAII` and reset the bookkeeping counters. """
        super().__init__(
            problem=problem,
            population_size=population_size,
            offspring_population_size=offspring_population_size,
            mutation=mutation,
            crossover=crossover,
            selection=selection,
            termination_criterion=termination_criterion,
            population_generator=population_generator,
            population_evaluator=population_evaluator,
            dominance_comparator=dominance_comparator,
        )

        # Bookkeeping: number of finished runs and timing fields, all starting at zero.
        self.completed_iterations = 0
        self.start_computing_time = 0
        self.total_computing_time = 0

    def restart(self):
        """ Re-evaluate the current population and store the outcome as the new population. """
        reevaluated = self.evaluate(self.solutions)
        self.solutions = reevaluated

    def update_progress(self):
        """ Handle a pending problem change, notify observers, then add the offspring size to the
        evaluation counter.
        """
        problem_changed = self.problem.the_problem_has_changed()
        if problem_changed:
            self.restart()
            self.problem.clear_changed()

        # Observers are notified before the counter is advanced.
        self.observable.notify_all(**self.get_observable_data())

        self.evaluations += self.offspring_population_size

    def stopping_condition_is_met(self):
        """ Start a new run whenever the termination criterion is met.

        No value is ever returned explicitly, so the result is always ``None``.
        """
        if not self.termination_criterion.is_met:
            return

        # Tell observers that this run reached its termination criterion.
        payload = self.get_observable_data()
        payload['TERMINATION_CRITERIA_IS_MET'] = True
        self.observable.notify_all(**payload)

        self.restart()
        self.init_progress()

        self.completed_iterations += 1
class DistributedNSGAII(Algorithm[S, R]):
    """ Asynchronous NSGA-II that creates and evaluates solutions through a dask ``distributed`` client.

    ``dask`` and ``distributed`` are imported optionally at module level (an ``ImportError`` is
    ignored there), so :py:meth:`run` can only work when both packages are installed.
    """

    def __init__(self,
                 problem: Problem,
                 population_size: int,
                 mutation: Mutation,
                 crossover: Crossover,
                 number_of_cores: int,
                 client,
                 selection: Selection = BinaryTournamentSelection(
                     MultiComparator([FastNonDominatedRanking.get_comparator(),
                                      CrowdingDistance.get_comparator()])),
                 termination_criterion: TerminationCriterion = store.default_termination_criteria,
                 dominance_comparator: DominanceComparator = DominanceComparator()):
        """
        :param problem: The problem to solve.
        :param population_size: Number of evaluated solutions collected before the main loop starts.
        :param mutation: Mutation operator applied in :py:func:`reproduction`.
        :param crossover: Crossover operator applied in :py:func:`reproduction`.
        :param number_of_cores: Number of tasks submitted up front in :py:meth:`run`.
        :param client: Object used to submit work (``compute``, ``submit`` and ``map`` are called on it);
            presumably a ``distributed.Client``.
        :param selection: Selection operator used to pick the two parents of every new task.
        :param termination_criterion: Termination criterion; it is registered as an observer of this algorithm.
        :param dominance_comparator: Comparator handed to the non-dominated ranking used for replacement.
        """
        super(DistributedNSGAII, self).__init__()
        self.problem = problem
        self.population_size = population_size
        self.mutation_operator = mutation
        self.crossover_operator = crossover
        self.selection_operator = selection
        self.dominance_comparator = dominance_comparator

        self.termination_criterion = termination_criterion
        # The criterion observes the algorithm so it receives every progress notification.
        self.observable.register(termination_criterion)

        self.number_of_cores = number_of_cores
        self.client = client

    def create_initial_solutions(self) -> List[S]:
        """ Create one new solution per core. """
        return [self.problem.create_solution() for _ in range(self.number_of_cores)]

    def evaluate(self, solutions: List[S]) -> List[S]:
        """ Map ``problem.evaluate`` over the solutions through the client and return what ``client.map`` returns. """
        return self.client.map(self.problem.evaluate, solutions)

    def stopping_condition_is_met(self) -> bool:
        """ Report whether the termination criterion is met. """
        return self.termination_criterion.is_met

    def get_observable_data(self) -> dict:
        """ Build the payload sent to observers: problem, evaluations, solutions and elapsed time. """
        ctime = time.time() - self.start_computing_time

        return {'PROBLEM': self.problem,
                'EVALUATIONS': self.evaluations,
                'SOLUTIONS': self.get_result(),
                'COMPUTING_TIME': ctime}

    def init_progress(self) -> None:
        """ Set the evaluation counter to the number of cores and notify observers. """
        self.evaluations = self.number_of_cores

        observable_data = self.get_observable_data()
        self.observable.notify_all(**observable_data)

    def step(self) -> None:
        """ Do nothing; the whole search loop lives in :py:meth:`run`. """
        pass

    def update_progress(self):
        """ Notify observers with the current observable data. """
        observable_data = self.get_observable_data()
        self.observable.notify_all(**observable_data)

    def run(self):
        """ Execute the algorithm.

        First, randomly created solutions are evaluated remotely until ``population_size`` of them
        have been collected. After that, every evaluated solution that arrives is merged into the
        population by ranking and crowding-distance replacement, and a new reproduction task built
        from two selected parents is submitted in its place, until the termination criterion is met.
        """
        self.start_computing_time = time.time()

        create_solution = dask.delayed(self.problem.create_solution)
        evaluate_solution = dask.delayed(self.problem.evaluate)

        task_pool = as_completed([], with_results=True)

        def submit_random_solution() -> None:
            # Create and evaluate one new solution remotely and track its future.
            new_solution = create_solution()
            new_evaluated_solution = evaluate_solution(new_solution)
            future = self.client.compute(new_evaluated_solution)
            task_pool.add(future)

        for _ in range(self.number_of_cores):
            submit_random_solution()

        batches = task_pool.batches()

        auxiliar_population = []
        while len(auxiliar_population) < self.population_size:
            batch = next(batches)
            for _, received_solution in batch:
                auxiliar_population.append(received_solution)

                # Stop as soon as the population is full. The original check used ``<``, which
                # left the batch after its first result and let the final batch overshoot
                # ``population_size``.
                if len(auxiliar_population) >= self.population_size:
                    break

            # submit as many new tasks as we collected
            for _ in batch:
                submit_random_solution()

        # Publish the initial population before the first notification so observers see it.
        self.solutions = auxiliar_population
        self.init_progress()

        # perform an algorithm step to create a new solution to be evaluated
        while not self.stopping_condition_is_met():
            batch = next(batches)

            for _, received_solution in batch:
                offspring_population = [received_solution]

                # replacement
                ranking = FastNonDominatedRanking(self.dominance_comparator)
                density_estimator = CrowdingDistance()

                r = RankingAndDensityEstimatorReplacement(ranking, density_estimator, RemovalPolicyType.ONE_SHOT)
                auxiliar_population = r.replace(auxiliar_population, offspring_population)

                # selection
                mating_population = []
                for _ in range(2):
                    solution = self.selection_operator.execute(auxiliar_population)
                    mating_population.append(solution)

                # Reproduction and evaluation
                new_task = self.client.submit(reproduction, mating_population, self.problem,
                                              self.crossover_operator, self.mutation_operator)
                task_pool.add(new_task)

                # update progress
                self.evaluations += 1
                self.solutions = auxiliar_population

                self.update_progress()

                if self.stopping_condition_is_met():
                    break

        self.total_computing_time = time.time() - self.start_computing_time

        # at this point, computation is done
        # NOTE(review): iterating the pool presumably yields futures only as they finish - confirm
        # that this cancels outstanding work rather than waiting for it.
        for future, _ in task_pool:
            future.cancel()

    def get_result(self) -> R:
        """ Return the current population held in ``self.solutions``. """
        return self.solutions

    def get_name(self) -> str:
        """ Return the display name of the algorithm. """
        return 'dNSGA-II'
def reproduction(mating_population: List[S], problem, crossover_operator, mutation_operator) -> S:
    """ Breed the mating population and return a single evaluated offspring.

    Parents are consumed in consecutive, non-overlapping pairs (a trailing unpaired parent is
    ignored). Each pair is recombined with ``crossover_operator``; once all crossovers are done,
    every child is passed through ``mutation_operator``. Only the first mutated child is evaluated.

    :param mating_population: Parents to recombine, taken two at a time.
    :param problem: Object whose ``evaluate`` method is applied to the chosen offspring.
    :param crossover_operator: Object whose ``execute`` receives a pair of parents and returns an
        iterable of children.
    :param mutation_operator: Object whose ``execute`` receives one child and returns the mutated child.
    :return: Whatever ``problem.evaluate`` returns for the first mutated child.
    :raises IndexError: If no offspring is produced, e.g. fewer than two parents were supplied.
    """
    # Zipping one iterator with itself yields (p0, p1), (p2, p3), ... without overlap.
    parent_iterator = iter(mating_population)
    offspring_pool = [crossover_operator.execute(parents)
                      for parents in zip(parent_iterator, parent_iterator)]

    # Every child is mutated, not just the returned one, so the operator call sequence stays as before.
    offspring_population = [mutation_operator.execute(solution)
                            for pair in offspring_pool
                            for solution in pair]

    if not offspring_population:
        # Same exception type as the former bare index lookup, but with an actionable message.
        raise IndexError('reproduction produced no offspring; at least two parents are required')

    return problem.evaluate(offspring_population[0])