Source code for jmetal.util.evaluator
import copy
import functools
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.pool import Pool, ThreadPool
from typing import Generic, List, TypeVar
try:
import dask
except ImportError:
pass
try:
from pyspark import SparkConf, SparkContext
except ImportError:
pass
from jmetal.core.problem import Problem
from jmetal.util.archive import Archive
# Type variable for the solution type the evaluators in this module are parameterised on.
S = TypeVar("S")
class Evaluator(Generic[S], ABC):
    """Abstract base for strategies that evaluate a list of solutions against a problem."""

    @abstractmethod
    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate the solutions of ``solution_list`` with ``problem``.

        Concrete subclasses decide how the individual evaluations are scheduled.

        Args:
            solution_list: Solutions to evaluate.
            problem: Problem used to evaluate the solutions.

        Returns:
            The list of evaluated solutions.
        """

    @staticmethod
    def evaluate_solution(solution: S, problem: Problem) -> None:
        """Evaluate a single solution by delegating to ``problem.evaluate``.

        Whatever ``problem.evaluate`` returns is discarded; nothing is returned.

        Args:
            solution: Solution to evaluate.
            problem: Problem whose ``evaluate`` method is called with ``solution``.
        """
        problem.evaluate(solution)
[docs]
class SequentialEvaluator(Evaluator[S]):
    """Evaluator that processes the solutions one after another in a plain loop."""

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate each solution in order and return the input list.

        Args:
            solution_list: Solutions to evaluate.
            problem: Problem used to evaluate every solution.

        Returns:
            The same ``solution_list`` object that was passed in.
        """
        evaluate_one = Evaluator.evaluate_solution
        for candidate in solution_list:
            evaluate_one(candidate, problem)
        return solution_list
class SequentialEvaluatorWithArchive(SequentialEvaluator[S]):
    """
    Sequential evaluator that also feeds every evaluated solution into an archive.

    After the solutions have been evaluated sequentially, a deep copy of each
    one is passed to ``archive.add``. Which of those copies the archive keeps
    is decided by the archive implementation supplied by the caller.

    Typical uses:
        - Keeping a record of the solutions evaluated during a run
        - Collecting the best solutions found during optimization
        - Post-processing analysis of the optimization process

    Args:
        archive: Archive instance that receives the copies of evaluated solutions

    Example:
        >>> from jmetal.util.archive import NonDominatedSolutionsArchive
        >>> from jmetal.util.evaluator import SequentialEvaluatorWithArchive
        >>>
        >>> archive = NonDominatedSolutionsArchive()
        >>> evaluator = SequentialEvaluatorWithArchive(archive)
        >>>
        >>> # Use with optimization algorithm
        >>> algorithm = NSGAII(
        ...     problem=problem,
        ...     population_size=100,
        ...     offspring_population_size=100,
        ...     mutation=mutation,
        ...     crossover=crossover,
        ...     selection=selection,
        ...     evaluator=evaluator
        ... )
        >>>
        >>> # After optimization, access collected solutions
        >>> best_solutions = evaluator.archive.solution_list
    """

    def __init__(self, archive: Archive[S]):
        """
        Store the archive that will receive the evaluated solutions.

        Args:
            archive: Archive instance that receives copies of evaluated solutions
        """
        self.archive = archive

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """
        Evaluate the solutions, then add a deep copy of each one to the archive.

        Args:
            solution_list: List of solutions to evaluate
            problem: Problem instance used for evaluation

        Returns:
            The list returned by the parent evaluator (the input list)
        """
        # The actual evaluation is delegated to SequentialEvaluator.
        evaluated = super().evaluate(solution_list, problem)

        # Archive copies so later changes to the originals do not affect the stored ones.
        for item in evaluated:
            snapshot = copy.deepcopy(item)
            self.archive.add(snapshot)

        return evaluated

    def get_archive(self) -> Archive[S]:
        """
        Return the archive used by this evaluator.

        Returns:
            The archive instance given at construction time
        """
        return self.archive
[docs]
class MapEvaluator(Evaluator[S]):
    """Evaluator that maps the evaluation over a ``multiprocessing.pool.ThreadPool``."""

    def __init__(self, processes: int = None):
        """Create the thread pool used for evaluation.

        Args:
            processes: Value forwarded to ``ThreadPool`` as its worker count;
                ``None`` leaves the choice to ``ThreadPool``.
        """
        self.pool = ThreadPool(processes)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions through the thread pool and return the input list.

        Args:
            solution_list: Solutions to evaluate.
            problem: Problem used to evaluate every solution.

        Returns:
            The same ``solution_list`` object that was passed in; the values
            produced by ``pool.map`` are discarded.
        """

        def evaluate_one(candidate):
            return Evaluator.evaluate_solution(candidate, problem)

        self.pool.map(evaluate_one, solution_list)
        return solution_list
[docs]
class MultiprocessEvaluator(Evaluator[S]):
    """Evaluator that maps the evaluation over a ``multiprocessing.pool.Pool``."""

    def __init__(self, processes: int = None):
        """Create the process pool used for evaluation.

        Args:
            processes: Value forwarded to ``Pool`` as its worker count;
                ``None`` leaves the choice to ``Pool``.
        """
        super().__init__()
        self.pool = Pool(processes)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions through the process pool.

        Args:
            solution_list: Solutions to evaluate.
            problem: Problem used to evaluate every solution.

        Returns:
            The list produced by ``Pool.map``, i.e. the solutions handed back by
            the module-level ``evaluate_solution`` helper (not the input list object).
        """
        # The problem is bound up front so the pool only has to supply the solution.
        worker = functools.partial(evaluate_solution, problem=problem)
        evaluated = self.pool.map(worker, solution_list)
        return evaluated
[docs]
class SparkEvaluator(Evaluator[S]):
    """Evaluator that distributes the evaluation over a local Spark context."""

    def __init__(self, processes: int = 8):
        """Start a local Spark context and reduce its log verbosity.

        Args:
            processes: Number placed in the ``local[N]`` master URL.
        """
        self.spark_conf = SparkConf().setAppName("jmetalpy").setMaster(f"local[{processes}]")
        self.spark_context = SparkContext(conf=self.spark_conf)

        # Raise the log4j level of the "org" logger to WARN through the JVM gateway.
        logger = self.spark_context._jvm.org.apache.log4j
        logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions as a Spark RDD and collect the results.

        Args:
            solution_list: Solutions to evaluate.
            problem: Problem used to evaluate every solution.

        Returns:
            The list of evaluated solutions collected from Spark (not the
            input list object).
        """
        solutions_to_evaluate = self.spark_context.parallelize(solution_list)

        # Go through the module-level helper, which returns the solution itself
        # after evaluating it. Collecting the raw result of ``problem.evaluate``
        # would yield ``None`` entries for problems that only update the
        # solution in place, which is what the other evaluators here assume.
        return solutions_to_evaluate.map(lambda s: evaluate_solution(s, problem)).collect()
def evaluate_solution(solution, problem):
    """Evaluate ``solution`` with ``problem`` and return the solution.

    Unlike ``Evaluator.evaluate_solution``, which returns nothing, this
    module-level wrapper hands the solution back to the caller; the pool- and
    dask-based evaluators in this module use it as their worker function.
    """
    Evaluator.evaluate_solution(solution, problem)
    return solution
[docs]
class DaskEvaluator(Evaluator[S]):
    """Evaluator that runs the evaluation as dask delayed tasks."""

    def __init__(self, scheduler="processes", number_of_cores=4):
        """Store the dask scheduling options.

        Args:
            scheduler: Scheduler name passed to ``dask.config.set``.
            number_of_cores: Size of the ``ThreadPoolExecutor`` handed to dask
                as its ``pool``.
        """
        self.scheduler = scheduler
        self.number_of_cores = number_of_cores

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions with dask and return the computed results.

        A ``ThreadPoolExecutor`` is supplied as dask's ``pool`` whatever the
        configured scheduler name is.

        Args:
            solution_list: Solutions to evaluate.
            problem: Problem used to evaluate every solution.

        Returns:
            A new list with the solutions returned by the module-level
            ``evaluate_solution`` helper, in input order.
        """
        # Own the executor with a context manager so its worker threads are
        # shut down once the computation finishes, instead of creating a new,
        # never-closed pool on every call.
        with ThreadPoolExecutor(self.number_of_cores) as pool:
            with dask.config.set(scheduler=self.scheduler, pool=pool):
                tasks = [
                    dask.delayed(evaluate_solution)(solution=solution, problem=problem)
                    for solution in solution_list
                ]
                return list(dask.compute(*tasks))