import time
from typing import Generator, List, TypeVar
try:
import dask
from distributed import Client, as_completed
except ImportError:
pass
from jmetal.algorithm.singleobjective.genetic_algorithm import GeneticAlgorithm
from jmetal.config import store
from jmetal.core.algorithm import Algorithm, DynamicAlgorithm
from jmetal.core.operator import Crossover, Mutation, Selection
from jmetal.core.problem import DynamicProblem, Problem
from jmetal.operator.selection import BinaryTournamentSelection
from jmetal.util.comparator import Comparator, DominanceComparator, MultiComparator
from jmetal.util.density_estimator import CrowdingDistance
from jmetal.util.evaluator import Evaluator
from jmetal.util.ranking import FastNonDominatedRanking
from jmetal.util.replacement import (
RankingAndDensityEstimatorReplacement,
RemovalPolicyType,
)
from jmetal.util.termination_criterion import TerminationCriterion
S = TypeVar("S")
R = TypeVar("R")
"""
.. module:: NSGA-II
:platform: Unix, Windows
:synopsis: NSGA-II (Non-dominance Sorting Genetic Algorithm II) implementation.
.. moduleauthor:: Antonio J. Nebro <antonio@lcc.uma.es>, Antonio Benítez-Hidalgo <antonio.b@uma.es>
"""
[docs]
class NSGAII(GeneticAlgorithm[S, R]):
def __init__(
self,
problem: Problem,
population_size: int,
offspring_population_size: int,
mutation: Mutation,
crossover: Crossover,
selection: Selection = BinaryTournamentSelection(
MultiComparator([FastNonDominatedRanking.get_comparator(), CrowdingDistance.get_comparator()])
),
termination_criterion: TerminationCriterion = store.default_termination_criteria,
population_generator: Generator = store.default_generator,
population_evaluator: Evaluator = store.default_evaluator,
dominance_comparator: Comparator = store.default_comparator,
):
"""
NSGA-II implementation as described in
* K. Deb, A. Pratap, S. Agarwal and T. Meyarivan, "A fast and elitist
multiobjective genetic algorithm: NSGA-II," in IEEE Transactions on Evolutionary Computation,
vol. 6, no. 2, pp. 182-197, Apr 2002. doi: 10.1109/4235.996017
NSGA-II is a genetic algorithm (GA), i.e. it belongs to the evolutionary algorithms (EAs)
family. The implementation of NSGA-II provided in jMetalPy follows the evolutionary
algorithm template described in the algorithm module (:py:mod:`jmetal.core.algorithm`).
.. note:: A steady-state version of this algorithm can be run by setting the offspring size to 1.
:param problem: The problem to solve.
:param population_size: Size of the population.
:param mutation: Mutation operator (see :py:mod:`jmetal.operator.mutation`).
:param crossover: Crossover operator (see :py:mod:`jmetal.operator.crossover`).
"""
super(NSGAII, self).__init__(
problem=problem,
population_size=population_size,
offspring_population_size=offspring_population_size,
mutation=mutation,
crossover=crossover,
selection=selection,
termination_criterion=termination_criterion,
population_evaluator=population_evaluator,
population_generator=population_generator,
)
self.dominance_comparator = dominance_comparator
[docs]
def replacement(self, population: List[S], offspring_population: List[S]) -> List[List[S]]:
"""This method joins the current and offspring populations to produce the population of the next generation
by applying the ranking and crowding distance selection.
:param population: Parent population.
:param offspring_population: Offspring population.
:return: New population after ranking and crowding distance selection is applied.
"""
ranking = FastNonDominatedRanking(self.dominance_comparator)
density_estimator = CrowdingDistance()
r = RankingAndDensityEstimatorReplacement(ranking, density_estimator, RemovalPolicyType.ONE_SHOT)
solutions = r.replace(population, offspring_population)
return solutions
[docs]
def result(self) -> R:
return self.solutions
[docs]
def get_name(self) -> str:
return "NSGAII"
[docs]
class DynamicNSGAII(NSGAII[S, R], DynamicAlgorithm):
def __init__(
self,
problem: DynamicProblem[S],
population_size: int,
offspring_population_size: int,
mutation: Mutation,
crossover: Crossover,
selection: Selection = BinaryTournamentSelection(
MultiComparator([FastNonDominatedRanking.get_comparator(), CrowdingDistance.get_comparator()])
),
termination_criterion: TerminationCriterion = store.default_termination_criteria,
population_generator: Generator = store.default_generator,
population_evaluator: Evaluator = store.default_evaluator,
dominance_comparator: DominanceComparator = DominanceComparator(),
):
super(DynamicNSGAII, self).__init__(
problem=problem,
population_size=population_size,
offspring_population_size=offspring_population_size,
mutation=mutation,
crossover=crossover,
selection=selection,
population_evaluator=population_evaluator,
population_generator=population_generator,
termination_criterion=termination_criterion,
dominance_comparator=dominance_comparator,
)
self.completed_iterations = 0
self.start_computing_time = 0
self.total_computing_time = 0
[docs]
def restart(self):
self.solutions = self.evaluate(self.solutions)
[docs]
def update_progress(self):
if self.problem.the_problem_has_changed():
self.restart()
self.problem.clear_changed()
observable_data = self.observable_data()
self.observable.notify_all(**observable_data)
self.evaluations += self.offspring_population_size
[docs]
def stopping_condition_is_met(self):
if self.termination_criterion.is_met:
observable_data = self.observable_data()
observable_data["TERMINATION_CRITERIA_IS_MET"] = True
self.observable.notify_all(**observable_data)
self.restart()
self.init_progress()
self.completed_iterations += 1
[docs]
class DistributedNSGAII(Algorithm[S, R]):
def __init__(
self,
problem: Problem,
population_size: int,
mutation: Mutation,
crossover: Crossover,
number_of_cores: int,
client,
selection: Selection = BinaryTournamentSelection(
MultiComparator([FastNonDominatedRanking.get_comparator(), CrowdingDistance.get_comparator()])
),
termination_criterion: TerminationCriterion = store.default_termination_criteria,
dominance_comparator: DominanceComparator = DominanceComparator(),
):
super(DistributedNSGAII, self).__init__()
self.problem = problem
self.population_size = population_size
self.mutation_operator = mutation
self.crossover_operator = crossover
self.selection_operator = selection
self.dominance_comparator = dominance_comparator
self.termination_criterion = termination_criterion
self.observable.register(termination_criterion)
self.number_of_cores = number_of_cores
self.client = client
[docs]
def create_initial_solutions(self) -> List[S]:
return [self.problem.create_solution() for _ in range(self.number_of_cores)]
[docs]
def evaluate(self, solutions: List[S]) -> List[S]:
return self.client.map(self.problem.evaluate, solutions)
[docs]
def stopping_condition_is_met(self) -> bool:
return self.termination_criterion.is_met
[docs]
def observable_data(self) -> dict:
ctime = time.time() - self.start_computing_time
return {
"PROBLEM": self.problem,
"EVALUATIONS": self.evaluations,
"SOLUTIONS": self.result(),
"COMPUTING_TIME": ctime,
}
[docs]
def init_progress(self) -> None:
self.evaluations = self.number_of_cores
observable_data = self.observable_data()
self.observable.notify_all(**observable_data)
[docs]
def step(self) -> None:
pass
[docs]
def update_progress(self):
observable_data = self.observable_data()
self.observable.notify_all(**observable_data)
[docs]
def run(self):
"""Execute the algorithm."""
self.start_computing_time = time.time()
create_solution = dask.delayed(self.problem.create_solution)
evaluate_solution = dask.delayed(self.problem.evaluate)
task_pool = as_completed([], with_results=True)
for _ in range(self.number_of_cores):
new_solution = create_solution()
new_evaluated_solution = evaluate_solution(new_solution)
future = self.client.compute(new_evaluated_solution)
task_pool.add(future)
batches = task_pool.batches()
auxiliar_population = []
while len(auxiliar_population) < self.population_size:
batch = next(batches)
for _, received_solution in batch:
auxiliar_population.append(received_solution)
if len(auxiliar_population) < self.population_size:
break
# submit as many new tasks as we collected
for _ in batch:
new_solution = create_solution()
new_evaluated_solution = evaluate_solution(new_solution)
future = self.client.compute(new_evaluated_solution)
task_pool.add(future)
self.init_progress()
# perform an algorithm step to create a new solution to be evaluated
while not self.stopping_condition_is_met():
batch = next(batches)
for _, received_solution in batch:
offspring_population = [received_solution]
# replacement
ranking = FastNonDominatedRanking(self.dominance_comparator)
density_estimator = CrowdingDistance()
r = RankingAndDensityEstimatorReplacement(ranking, density_estimator, RemovalPolicyType.ONE_SHOT)
auxiliar_population = r.replace(auxiliar_population, offspring_population)
# selection
mating_population = []
for _ in range(2):
solution = self.selection_operator.execute(auxiliar_population)
mating_population.append(solution)
# Reproduction and evaluation
new_task = self.client.submit(
reproduction, mating_population, self.problem, self.crossover_operator, self.mutation_operator
)
task_pool.add(new_task)
# update progress
self.evaluations += 1
self.solutions = auxiliar_population
self.update_progress()
if self.stopping_condition_is_met():
break
self.total_computing_time = time.time() - self.start_computing_time
# at this point, computation is done
for future, _ in task_pool:
future.cancel()
[docs]
def result(self) -> R:
return self.solutions
[docs]
def get_name(self) -> str:
return "dNSGA-II"
def reproduction(mating_population: List[S], problem, crossover_operator, mutation_operator) -> S:
offspring_pool = []
for parents in zip(*[iter(mating_population)] * 2):
offspring_pool.append(crossover_operator.execute(parents))
offspring_population = []
for pair in offspring_pool:
for solution in pair:
mutated_solution = mutation_operator.execute(solution)
offspring_population.append(mutated_solution)
return problem.evaluate(offspring_population[0])