Source code for jmetal.util.evaluator

import functools
from abc import ABC, abstractmethod
from multiprocessing.pool import ThreadPool, Pool
from typing import TypeVar, List, Generic

try:
    import dask
except ImportError:
    pass

try:
    from pyspark import SparkConf, SparkContext
except ImportError:
    pass

from jmetal.core.problem import Problem

S = TypeVar('S')


class Evaluator(Generic[S], ABC):

    @abstractmethod
    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        pass

    @staticmethod
    def evaluate_solution(solution: S, problem: Problem) -> None:
        problem.evaluate(solution)


class SequentialEvaluator(Evaluator[S]):

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        for solution in solution_list:
            Evaluator.evaluate_solution(solution, problem)

        return solution_list
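
# A minimal usage sketch for SequentialEvaluator (assumption: ZDT1 is only an
# illustrative problem; any jmetal.core.problem.Problem works the same way):
#
#     from jmetal.problem import ZDT1
#
#     problem = ZDT1()
#     solutions = [problem.create_solution() for _ in range(100)]
#     evaluated = SequentialEvaluator().evaluate(solutions, problem)
#     # problem.evaluate() has now filled in each solution's objectives
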
class MapEvaluator(Evaluator[S]):

    def __init__(self, processes: int = None):
        self.pool = ThreadPool(processes)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        self.pool.map(lambda solution: Evaluator.evaluate_solution(solution, problem), solution_list)

        return solution_list
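
# Note: MapEvaluator uses a ThreadPool, so concurrent evaluation only pays off
# when problem.evaluate() releases the GIL (e.g. native or NumPy-heavy code)
# or is I/O-bound; pure-Python objective functions will run effectively
# serially. A minimal sketch, assuming the same ZDT1 setup as above:
#
#     evaluator = MapEvaluator(processes=4)
#     evaluated = evaluator.evaluate(solutions, problem)
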
class MultiprocessEvaluator(Evaluator[S]):

    def __init__(self, processes: int = None):
        super().__init__()
        self.pool = Pool(processes)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        return self.pool.map(functools.partial(evaluate_solution, problem=problem), solution_list)
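
# Note: MultiprocessEvaluator uses a process Pool, so both the problem and the
# solutions must be picklable, and the evaluated solutions come back as new
# objects returned from the workers rather than being mutated in place. That is
# also why it maps the module-level evaluate_solution() helper (defined below)
# instead of a lambda, which cannot be pickled. A minimal sketch, assuming the
# same ZDT1 setup as above:
#
#     evaluator = MultiprocessEvaluator(processes=4)
#     evaluated = evaluator.evaluate(solutions, problem)
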
class SparkEvaluator(Evaluator[S]):

    def __init__(self, processes: int = 8):
        self.spark_conf = SparkConf().setAppName("jmetalpy").setMaster(f"local[{processes}]")
        self.spark_context = SparkContext(conf=self.spark_conf)

        logger = self.spark_context._jvm.org.apache.log4j
        logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        solutions_to_evaluate = self.spark_context.parallelize(solution_list)

        return solutions_to_evaluate \
            .map(lambda s: problem.evaluate(s)) \
            .collect()
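
# Note: SparkEvaluator distributes the solution list as an RDD over a local
# Spark master with the given number of worker threads; as with the process
# pool, collect() returns new (deserialized) solution objects rather than
# mutating the input list. A minimal sketch, assuming pyspark is installed:
#
#     evaluator = SparkEvaluator(processes=8)
#     evaluated = evaluator.evaluate(solutions, problem)
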
def evaluate_solution(solution: S, problem: Problem) -> S:
    # Module-level helper so that it can be pickled and shipped to worker
    # processes (used by MultiprocessEvaluator and DaskEvaluator).
    Evaluator.evaluate_solution(solution, problem)
    return solution


class DaskEvaluator(Evaluator[S]):

    def __init__(self, scheduler='processes'):
        self.scheduler = scheduler

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        with dask.config.set(scheduler=self.scheduler):
            return list(dask.compute(*[
                dask.delayed(evaluate_solution)(solution=solution, problem=problem)
                for solution in solution_list
            ]))
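
# Note: DaskEvaluator accepts any Dask scheduler name ('processes', 'threads'
# or 'synchronous'); with the default 'processes' scheduler the same pickling
# caveats as MultiprocessEvaluator apply.

# Runnable demo (a minimal sketch; ZDT1 is only an illustrative problem and
# the population size is arbitrary):
if __name__ == '__main__':
    from jmetal.problem import ZDT1

    demo_problem = ZDT1()
    demo_solutions = [demo_problem.create_solution() for _ in range(10)]

    evaluated = SequentialEvaluator().evaluate(demo_solutions, demo_problem)
    print([s.objectives for s in evaluated])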