Source code for jmetal.util.evaluator
import functools
from abc import ABC, abstractmethod
from multiprocessing.pool import Pool, ThreadPool
from typing import Generic, List, TypeVar
try:
import dask
except ImportError:
pass
try:
from pyspark import SparkConf, SparkContext
except ImportError:
pass
from jmetal.core.problem import Problem
# Type variable standing for the solution type handled by the evaluators below.
S = TypeVar("S")
class Evaluator(Generic[S], ABC):
    """Abstract base for strategies that evaluate a list of solutions on a problem."""

    @staticmethod
    def evaluate_solution(solution: S, problem: Problem) -> None:
        """Evaluate one solution by delegating to ``problem.evaluate``.

        Whatever ``problem.evaluate`` returns is discarded; nothing is returned.

        :param solution: Solution to evaluate.
        :param problem: Problem whose ``evaluate`` method is invoked.
        """
        problem.evaluate(solution)

    @abstractmethod
    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate every solution of ``solution_list`` on ``problem``.

        Concrete subclasses must provide the implementation.

        :param solution_list: Solutions to evaluate.
        :param problem: Problem used for the evaluation.
        :return: List of solutions after evaluation.
        """
[docs]
class SequentialEvaluator(Evaluator[S]):
    """Evaluator that handles the solutions one after another in a plain loop."""

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate each solution in order and hand back the same list.

        :param solution_list: Solutions to evaluate.
        :param problem: Problem used for the evaluation.
        :return: The ``solution_list`` object that was passed in.
        """
        evaluate_one = Evaluator.evaluate_solution
        for candidate in solution_list:
            evaluate_one(candidate, problem)

        return solution_list
[docs]
class MapEvaluator(Evaluator[S]):
    """Evaluator that maps the evaluation over a ``ThreadPool``."""

    def __init__(self, processes: int = None):
        """Create the thread pool used by :meth:`evaluate`.

        :param processes: Value passed straight to ``ThreadPool``.
        """
        self.pool = ThreadPool(processes)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions through the pool and hand back the same list.

        The results of the pool's ``map`` call are ignored.

        :param solution_list: Solutions to evaluate.
        :param problem: Problem used for the evaluation.
        :return: The ``solution_list`` object that was passed in.
        """
        # Bind the problem once so the pool only has to supply each solution.
        evaluate_one = functools.partial(Evaluator.evaluate_solution, problem=problem)
        self.pool.map(evaluate_one, solution_list)

        return solution_list
[docs]
class MultiprocessEvaluator(Evaluator[S]):
    """Evaluator that maps the evaluation over a ``multiprocessing`` ``Pool``."""

    def __init__(self, processes: int = None):
        """Create the process pool used by :meth:`evaluate`.

        :param processes: Value passed straight to ``Pool``.
        """
        super().__init__()
        self.pool = Pool(processes)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions through the pool.

        :param solution_list: Solutions to evaluate.
        :param problem: Problem used for the evaluation.
        :return: The list produced by the pool's ``map`` call (not the input
            list object itself).
        """
        # The module-level ``evaluate_solution`` returns the solution it was given,
        # so the mapped result is a list of solutions.
        worker = functools.partial(evaluate_solution, problem=problem)
        evaluated = self.pool.map(worker, solution_list)
        return evaluated
[docs]
class SparkEvaluator(Evaluator[S]):
    """Evaluator that distributes the evaluation through a Spark context."""

    def __init__(self, processes: int = 8):
        """Configure and start the Spark context.

        :param processes: Number placed inside the ``local[...]`` master string.
        """
        master = f"local[{processes}]"
        conf = SparkConf()
        conf = conf.setAppName("jmetalpy")
        conf = conf.setMaster(master)
        self.spark_conf = conf
        self.spark_context = SparkContext(conf=self.spark_conf)

        # Raise the level of the "org" logger to WARN via the JVM's log4j.
        log4j = self.spark_context._jvm.org.apache.log4j
        log4j.LogManager.getLogger("org").setLevel(log4j.Level.WARN)

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions via ``parallelize`` / ``map`` / ``collect``.

        :param solution_list: Solutions to evaluate.
        :param problem: Problem used for the evaluation.
        :return: Collected list holding, for each solution, the value returned by
            ``problem.evaluate``.
        """

        def _evaluate(candidate):
            return problem.evaluate(candidate)

        distributed = self.spark_context.parallelize(solution_list)
        return distributed.map(_evaluate).collect()
def evaluate_solution(solution, problem):
    """Evaluate ``solution`` on ``problem`` and return that same solution object.

    Wraps ``Evaluator.evaluate_solution`` (which returns nothing) so the
    evaluated solution can be collected by callers that map this function over
    a list of solutions.

    :param solution: Solution to evaluate.
    :param problem: Problem used for the evaluation.
    :return: The ``solution`` argument.
    """
    Evaluator.evaluate_solution(solution, problem)
    return solution
[docs]
class DaskEvaluator(Evaluator[S]):
    """Evaluator that runs the evaluation as ``dask.delayed`` tasks."""

    def __init__(self, scheduler="processes"):
        """Remember which dask scheduler to use.

        :param scheduler: Value later given to ``dask.config.set(scheduler=...)``.
        """
        self.scheduler = scheduler

    def evaluate(self, solution_list: List[S], problem: Problem) -> List[S]:
        """Evaluate all solutions with dask under the configured scheduler.

        :param solution_list: Solutions to evaluate.
        :param problem: Problem used for the evaluation.
        :return: New list built from the results of ``dask.compute``.
        """
        with dask.config.set(scheduler=self.scheduler):
            # One delayed call of the module-level ``evaluate_solution`` per solution.
            tasks = [
                dask.delayed(evaluate_solution)(solution=candidate, problem=problem)
                for candidate in solution_list
            ]
            computed = dask.compute(*tasks)
            return list(computed)