Source code for jmetal.util.evaluator

import functools
from abc import ABC, abstractmethod
from multiprocessing.pool import Pool, ThreadPool
from typing import Generic, List, TypeVar

try:
    import dask
except ImportError:
    pass

try:
    from pyspark import SparkConf, SparkContext
except ImportError:
    pass

from jmetal.core.problem import Problem

# Generic placeholder for the solution type that evaluators receive and return.
S = TypeVar("S")


class Evaluator(Generic[S], ABC):
    """Abstract base class for strategies that evaluate a list of solutions."""

    @abstractmethod
    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate every solution in ``solution_list`` against ``problem``.

        :param solution_list: Solutions to be evaluated.
        :param problem: Problem whose ``evaluate`` method is applied to each solution.
        :return: List of evaluated solutions.
        """

    @staticmethod
    def evaluate_solution(solution: S, problem: Problem) -> None:
        """Evaluate one solution by delegating to ``problem.evaluate``.

        Whatever ``problem.evaluate`` returns is discarded.

        :param solution: Solution to be evaluated.
        :param problem: Problem used to evaluate the solution.
        """
        problem.evaluate(solution)


class SequentialEvaluator(Evaluator[S]):
    """Evaluator that processes the solutions one after another in the calling thread."""

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate each solution in order and hand back the same list.

        :param solution_list: Solutions to be evaluated.
        :param problem: Problem used to evaluate each solution.
        :return: The ``solution_list`` object that was passed in.
        """
        for candidate in solution_list:
            Evaluator.evaluate_solution(candidate, problem)

        return solution_list
class MapEvaluator(Evaluator[S]):
    """Evaluator that maps the evaluation over a ``ThreadPool``."""

    def __init__(self, processes: int = None):
        """Create the thread pool used by :meth:`evaluate`.

        :param processes: Pool size forwarded to ``ThreadPool``; ``None`` leaves
            the choice to ``ThreadPool``.
        """
        self.pool = ThreadPool(processes)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions through the thread pool.

        :param solution_list: Solutions to be evaluated.
        :param problem: Problem used to evaluate each solution.
        :return: The ``solution_list`` object that was passed in.
        """

        def _evaluate_one(candidate):
            # Like the single-solution helper it wraps, this yields None;
            # the results of the map call are not used.
            return Evaluator.evaluate_solution(candidate, problem)

        self.pool.map(_evaluate_one, solution_list)

        return solution_list
class MultiprocessEvaluator(Evaluator[S]):
    """Evaluator that maps the evaluation over a ``multiprocessing`` ``Pool``."""

    def __init__(self, processes: int = None):
        """Create the process pool used by :meth:`evaluate`.

        :param processes: Pool size forwarded to ``Pool``; ``None`` leaves the
            choice to ``Pool``.
        """
        super().__init__()
        self.pool = Pool(processes)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions through the process pool.

        :param solution_list: Solutions to be evaluated.
        :param problem: Problem used to evaluate each solution.
        :return: The list produced by ``Pool.map``, i.e. the values returned by
            the module-level ``evaluate_solution`` helper for each input (a new
            list, not ``solution_list`` itself).
        """
        # A partial over a module-level function is used (rather than a lambda)
        # so the callable can be sent to the worker processes.
        evaluate_with_problem = functools.partial(evaluate_solution, problem=problem)
        return self.pool.map(evaluate_with_problem, solution_list)
class SparkEvaluator(Evaluator[S]):
    """Evaluator that distributes the evaluation through a local Spark context."""

    def __init__(self, processes: int = 8):
        """Create the Spark configuration and context used by :meth:`evaluate`.

        :param processes: Value interpolated into the ``local[N]`` Spark master URL.
        """
        self.spark_conf = SparkConf().setAppName("jmetalpy").setMaster(f"local[{processes}]")
        self.spark_context = SparkContext(conf=self.spark_conf)

        # Set the JVM-side log4j level of the "org" logger to WARN.
        logger = self.spark_context._jvm.org.apache.log4j
        logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions as a Spark RDD and collect the results.

        :param solution_list: Solutions to be evaluated.
        :param problem: Problem used to evaluate each solution.
        :return: The evaluated solutions collected from Spark (a new list, not
            ``solution_list`` itself).
        """
        solutions_to_evaluate = self.spark_context.parallelize(solution_list)

        # Map through the module-level helper, which always returns the solution
        # it was given. Mapping ``problem.evaluate`` directly would collect that
        # method's return values instead, which ``Evaluator.evaluate_solution``
        # treats as not meaningful (it discards them).
        return solutions_to_evaluate.map(lambda s: evaluate_solution(s, problem)).collect()
def evaluate_solution(solution, problem):
    """Evaluate ``solution`` with ``problem`` and return the solution.

    Module-level wrapper around ``Evaluator.evaluate_solution`` that hands the
    solution back to the caller, so it can be used as a mapping function whose
    results are collected.

    :param solution: Solution to be evaluated.
    :param problem: Problem used to evaluate the solution.
    :return: The ``solution`` object that was passed in.
    """
    Evaluator.evaluate_solution(solution, problem)
    return solution
class DaskEvaluator(Evaluator[S]):
    """Evaluator that runs the evaluation as Dask delayed tasks."""

    def __init__(self, scheduler="processes"):
        """Store the scheduler setting applied during :meth:`evaluate`.

        :param scheduler: Value passed as ``scheduler`` to ``dask.config.set``.
        """
        self.scheduler = scheduler

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions with Dask under the configured scheduler.

        :param solution_list: Solutions to be evaluated.
        :param problem: Problem used to evaluate each solution.
        :return: List of the values returned by the module-level
            ``evaluate_solution`` helper, one per input solution.
        """
        with dask.config.set(scheduler=self.scheduler):
            # One delayed task per solution; all of them are computed together.
            tasks = [
                dask.delayed(evaluate_solution)(solution=candidate, problem=problem)
                for candidate in solution_list
            ]
            computed = dask.compute(*tasks)

        # dask.compute yields a tuple; callers expect a list.
        return list(computed)