from abc import ABC, abstractmethod
from typing import Generic, List, TypeVar

S = TypeVar('S')


class Archive(Generic[S], ABC):
    """Base class for solution archives: an ordered collection of solutions of type S."""

    def __init__(self):
        # Backing store for the archived solutions.
        self.solution_list: List[S] = []

    @abstractmethod
    def add(self, solution: S) -> bool:
        """Try to add a solution to the archive; return True if it was accepted."""
        pass

    def get(self, index: int) -> S:
        """Return the solution stored at the given index."""
        return self.solution_list[index]

    def size(self) -> int:
        """Return the number of solutions currently stored."""
        return len(self.solution_list)

    def get_name(self) -> str:
        # Default name is simply the concrete class name.
        return self.__class__.__name__
import copy
import random
import threading
from abc import ABC, abstractmethod
from threading import Lock
from typing import Generic, List, Optional, Tuple, TypeVar

import numpy as np

from jmetal.util.comparator import Comparator, DominanceComparator, SolutionAttributeComparator
from jmetal.util.density_estimator import DensityEstimator, CrowdingDistanceDensityEstimator
from jmetal.util.distance import DistanceMetric, DistanceCalculator
S = TypeVar('S')
"""
.. module:: archive
:platform: Unix, Windows
:synopsis: Archive implementation.
.. moduleauthor:: Antonio J. Nebro <antonio@lcc.uma.es>
"""
class Archive(Generic[S], ABC):
    """Abstract base class for archives storing solutions of type S."""

    def __init__(self):
        # Backing store for the archived solutions.
        self.solution_list: List[S] = []

    @abstractmethod
    def add(self, solution: S) -> bool:
        """Attempt to insert a solution into the archive; return True on success."""
        pass

    def get(self, index: int) -> S:
        """Return the archived solution at position index."""
        return self.solution_list[index]

    def size(self) -> int:
        """Return the number of solutions currently stored."""
        return len(self.solution_list)

    def get_name(self) -> str:
        # Default name is the concrete class name.
        return self.__class__.__name__
class BoundedArchive(Archive[S]):
    """Archive with a maximum capacity.

    Solutions are kept in an internal :class:`NonDominatedSolutionsArchive`; when
    the capacity is exceeded, the solution ranked worst by ``self.comparator``
    (after refreshing the density estimator) is evicted.
    """

    def __init__(self, maximum_size: int, comparator: Comparator[S] = None, density_estimator: DensityEstimator = None,
                 dominance_comparator: Comparator[S] = DominanceComparator()):
        super(BoundedArchive, self).__init__()
        self.maximum_size = maximum_size
        self.comparator = comparator
        self.density_estimator = density_estimator
        self.non_dominated_solution_archive = NonDominatedSolutionsArchive(dominance_comparator=dominance_comparator)
        # Alias the inner archive's list so both objects share the same storage.
        self.solution_list = self.non_dominated_solution_archive.solution_list

    def compute_density_estimator(self):
        """Recompute density-estimator values for every archived solution."""
        self.density_estimator.compute_density_estimator(self.solution_list)

    def add(self, solution: S) -> bool:
        """Add a solution, evicting the worst one if capacity is exceeded.

        Returns True if the solution entered the non-dominated archive.
        """
        success = self.non_dominated_solution_archive.add(solution)
        if success:
            if self.size() > self.maximum_size:
                self.compute_density_estimator()
                worst_solution, index_to_remove = self.__find_worst_solution(self.solution_list)
                self.solution_list.pop(index_to_remove)
        return success

    def __find_worst_solution(self, solution_list: List[S]) -> Tuple[S, int]:
        """Return the worst solution according to ``self.comparator`` and its index.

        Raises:
            Exception: if the list is None or empty.
        """
        if solution_list is None:
            raise Exception("The solution list is None")
        elif len(solution_list) == 0:
            raise Exception("The solution list is empty")
        worst_solution = solution_list[0]
        index_to_remove = 0
        for solution_index, solution in enumerate(solution_list[1:]):
            if self.comparator.compare(worst_solution, solution) < 0:
                worst_solution = solution
                index_to_remove = solution_index + 1
        return worst_solution, index_to_remove
class NonDominatedSolutionsArchive(Archive[S]):
    """
    Archive that maintains only non-dominated solutions using Pareto dominance.

    This implementation efficiently manages a collection of solutions by:
    - Adding new non-dominated solutions
    - Removing solutions dominated by new ones
    - Preventing duplicate solutions based on objectives

    Time Complexity: O(n) per insertion, where n is archive size
    Space Complexity: O(n) for storing solutions
    """

    def __init__(self, dominance_comparator: Comparator = DominanceComparator(),
                 objective_tolerance: float = 1e-10):
        """
        Initialize the non-dominated solutions archive.

        Args:
            dominance_comparator: Comparator to determine dominance relationships
            objective_tolerance: Tolerance for comparing floating-point objectives
        """
        super(NonDominatedSolutionsArchive, self).__init__()
        self.comparator = dominance_comparator
        self.objective_tolerance = objective_tolerance

    def _objectives_equal(self, solution1: S, solution2: S) -> bool:
        """
        Check if two solutions have equal objectives within tolerance.

        Args:
            solution1: First solution to compare
            solution2: Second solution to compare

        Returns:
            True if objectives are equal within tolerance, False otherwise
        """
        if len(solution1.objectives) != len(solution2.objectives):
            return False
        for obj1, obj2 in zip(solution1.objectives, solution2.objectives):
            if abs(obj1 - obj2) > self.objective_tolerance:
                return False
        return True

    def add(self, solution: S) -> bool:
        """
        Add a solution to the archive if it's non-dominated and not duplicate.

        Args:
            solution: Solution to add to the archive

        Returns:
            True if solution was added, False if rejected (dominated or duplicate)

        Time Complexity: O(n) where n is the number of solutions in archive
        """
        # Handle empty archive case
        if not self.solution_list:
            self.solution_list.append(solution)
            return True
        # Check dominance against all existing solutions
        remaining_solutions = []
        for current_solution in self.solution_list:
            dominance_flag = self.comparator.compare(solution, current_solution)
            if dominance_flag == 1:
                # New solution is dominated by current -> reject immediately
                return False
            elif dominance_flag == 0:
                # No dominance relationship -> check for duplicates
                if self._objectives_equal(solution, current_solution):
                    # Duplicate found -> reject
                    return False
                # Keep the current solution as it's not dominated
                remaining_solutions.append(current_solution)
            # dominance_flag == -1: current solution is dominated -> don't keep it
        # IMPORTANT: Modify list in-place to maintain references from BoundedArchive
        self.solution_list.clear()
        self.solution_list.extend(remaining_solutions)
        self.solution_list.append(solution)
        return True
class CrowdingDistanceArchive(BoundedArchive[S]):
    """Bounded archive that evicts the solution with the lowest crowding distance."""

    def __init__(self, maximum_size: int, dominance_comparator=DominanceComparator()):
        super(CrowdingDistanceArchive, self).__init__(
            maximum_size=maximum_size,
            comparator=SolutionAttributeComparator("crowding_distance", lowest_is_best=False),
            dominance_comparator=dominance_comparator,
            density_estimator=CrowdingDistanceDensityEstimator(),
        )
class ArchiveWithReferencePoint(BoundedArchive[S]):
    """Bounded archive biased by a (mutable) reference point.

    Solutions non-dominated with respect to the reference point are only
    accepted with a small probability; access is guarded by a lock so the
    archive can be shared between threads.
    """

    def __init__(
        self,
        maximum_size: int,
        reference_point: List[float],
        comparator: Comparator[S],
        density_estimator: DensityEstimator,
    ):
        super(ArchiveWithReferencePoint, self).__init__(maximum_size, comparator, density_estimator)
        self.__reference_point = reference_point
        self.__comparator = comparator
        self.__density_estimator = density_estimator
        self.lock = Lock()

    def add(self, solution: S) -> bool:
        """Thread-safe add honoring the reference-point acceptance rule."""
        with self.lock:
            dominated_solution = None
            if self.__dominance_test(solution.objectives, self.__reference_point) == 0:
                if len(self.solution_list) == 0:
                    result = True
                else:
                    # Non-dominated w.r.t. the reference point: accept rarely,
                    # and remember it so it can be pruned again below.
                    if random.uniform(0.0, 1.0) < 0.05:
                        result = True
                        dominated_solution = solution
                    else:
                        result = False
            else:
                result = True
            if result:
                result = super(ArchiveWithReferencePoint, self).add(solution)
            if result and dominated_solution is not None and len(self.solution_list) > 1:
                if dominated_solution in self.solution_list:
                    self.solution_list.remove(dominated_solution)
            if result and len(self.solution_list) > self.maximum_size:
                self.compute_density_estimator()
        return result

    def filter(self):
        """Drop solutions that are non-dominated with respect to the reference point."""
        # In case of having at least a solution which is non-dominated with the reference point, filter it
        if len(self.solution_list) > 1:
            self.solution_list[:] = [
                sol for sol in self.solution_list if self.__dominance_test(sol.objectives, self.__reference_point) != 0
            ]

    def update_reference_point(self, new_reference_point) -> None:
        """Replace the reference point and re-filter the archive.

        Keeps at least one solution: if filtering empties the archive, the
        previously first solution is restored.
        """
        with self.lock:
            self.__reference_point = new_reference_point
            # Guard: nothing to filter (original code raised IndexError here).
            if not self.solution_list:
                return
            first_solution = copy.deepcopy(self.solution_list[0])
            self.filter()
            if len(self.solution_list) == 0:
                self.solution_list.append(first_solution)

    def get_reference_point(self) -> List[float]:
        """Thread-safe accessor for the current reference point."""
        with self.lock:
            return self.__reference_point

    def __dominance_test(self, vector1: List[float], vector2: List[float]) -> int:
        """Pareto dominance test (minimization): -1 if vector1 dominates,
        1 if vector2 dominates, 0 if neither dominates."""
        best_is_one = 0
        best_is_two = 0
        for value1, value2 in zip(vector1, vector2):
            if value1 != value2:
                if value1 < value2:
                    best_is_one = 1
                if value2 < value1:
                    best_is_two = 1
        if best_is_one > best_is_two:
            result = -1
        elif best_is_two > best_is_one:
            result = 1
        else:
            result = 0
        return result
class CrowdingDistanceArchiveWithReferencePoint(ArchiveWithReferencePoint[S]):
    """Reference-point archive whose density criterion is the crowding distance."""

    def __init__(self, maximum_size: int, reference_point: List[float]):
        # Higher crowding distance is better, so lowest_is_best is False.
        crowding_comparator = SolutionAttributeComparator("crowding_distance", lowest_is_best=False)
        super(CrowdingDistanceArchiveWithReferencePoint, self).__init__(
            maximum_size=maximum_size,
            reference_point=reference_point,
            comparator=crowding_comparator,
            density_estimator=CrowdingDistanceDensityEstimator(),
        )
def distance_based_subset_selection_robust(solution_list: List[S], subset_size: int,
                                           metric: DistanceMetric = DistanceMetric.L2_SQUARED,
                                           weights: Optional[np.ndarray] = None,
                                           random_seed: Optional[int] = None,
                                           use_vectorized: bool = True) -> List[S]:
    """
    Robust distance-based subset selection with multiple metrics and improved normalization.

    Dispatch rule (mirrors SafeBestSolutionsArchive):
    - 2 objectives: crowding-distance selection (fast, standard in jMetal)
    - >2 objectives: normalized, greedy distance-based selection

    Args:
        solution_list: List of solutions to select from
        subset_size: Number of solutions to select
        metric: Distance metric to use (default: L2_SQUARED)
        weights: Optional weights for TCHEBY_WEIGHTED metric
        random_seed: Optional seed for reproducible results
        use_vectorized: Whether to use vectorized implementation (default: True)

    Returns:
        List of selected solutions

    Raises:
        ValueError: If subset_size is not positive
    """
    # Trivial cases: nothing to select from, bad size, or nothing to discard.
    if not solution_list:
        return []
    if subset_size <= 0:
        raise ValueError("Subset size must be positive")
    if subset_size >= len(solution_list):
        return solution_list[:]

    # Seed both RNGs so the greedy seed choice is reproducible.
    if random_seed is not None:
        np.random.seed(random_seed)
        random.seed(random_seed)

    if len(solution_list[0].objectives) == 2:
        return _crowding_distance_selection(solution_list, subset_size)
    return _robust_distance_based_selection(solution_list, subset_size, metric, weights, use_vectorized)
def _identify_valid_dimensions(objectives_matrix: np.ndarray) -> np.ndarray:
"""
Identify dimensions with non-zero range to avoid normalization issues.
Args:
objectives_matrix: Matrix of objectives (n_solutions x n_objectives)
Returns:
Array of valid dimension indices (those with max > min)
"""
min_vals = np.min(objectives_matrix, axis=0)
max_vals = np.max(objectives_matrix, axis=0)
# Find dimensions where max > min (non-constant objectives)
valid_dims = np.where(max_vals > min_vals)[0]
return valid_dims
def _robust_distance_based_selection(solution_list: List[S], subset_size: int,
                                     metric: DistanceMetric, weights: Optional[np.ndarray] = None,
                                     use_vectorized: bool = True) -> List[S]:
    """
    Robust distance-based selection for >2 objective problems.

    Steps:
    1. Identify valid dimensions (non-zero range) for normalization
    2. Fallback to crowding distance if all dimensions are constant
    3. Normalize only the valid dimensions to [0, 1]
    4. Seed with the best solution in a randomly chosen valid objective
    5. Greedily grow the subset using the chosen distance metric

    Args:
        solution_list: List of solutions to select from
        subset_size: Number of solutions to select
        metric: Distance metric to use
        weights: Optional weights for TCHEBY_WEIGHTED metric
        use_vectorized: Whether to use vectorized implementation

    Returns:
        List of selected solutions

    Raises:
        ValueError: If weights length does not match the number of objectives
    """
    objectives_matrix = np.array([sol.objectives for sol in solution_list])
    n_solutions, n_objectives = objectives_matrix.shape

    # Identify valid dimensions (non-zero range)
    valid_dims = _identify_valid_dimensions(objectives_matrix)
    if len(valid_dims) == 0:
        # All objectives are constant -> fallback to crowding distance
        return _crowding_distance_selection(solution_list, subset_size)

    # Normalize only valid dimensions (vectorized; was a per-row Python loop).
    reduced = objectives_matrix[:, valid_dims]
    min_vals = reduced.min(axis=0)
    ranges = reduced.max(axis=0) - min_vals
    normalized_matrix = (reduced - min_vals) / ranges

    # Project weights to valid dimensions if using weighted metric
    projected_weights = None
    if metric == DistanceMetric.TCHEBY_WEIGHTED:
        if weights is None:
            projected_weights = np.ones(len(valid_dims))
        else:
            if len(weights) != n_objectives:
                raise ValueError(f"Weights length ({len(weights)}) must match number of objectives ({n_objectives})")
            projected_weights = weights[valid_dims]

    # Seed selection: best (minimum) solution in a random valid objective.
    random_objective_idx = random.randint(0, len(valid_dims) - 1)
    random_objective = valid_dims[random_objective_idx]
    seed_idx = np.argmin(objectives_matrix[:, random_objective])

    if use_vectorized:
        return _vectorized_subset_selection(solution_list, normalized_matrix, subset_size,
                                            seed_idx, metric, projected_weights)
    return _original_subset_selection(solution_list, normalized_matrix, subset_size,
                                      seed_idx, metric, projected_weights)
def _original_subset_selection(solution_list: List[S], normalized_matrix: np.ndarray,
                               subset_size: int, seed_idx: int, metric: DistanceMetric,
                               weights: Optional[np.ndarray] = None) -> List[S]:
    """
    Original (non-vectorized) greedy max-min subset selection.

    Iteratively picks the unselected solution whose minimum distance to the
    already-selected set is largest, starting from the seed solution.

    Args:
        solution_list: List of solutions to select from
        normalized_matrix: Normalized objective matrix
        subset_size: Number of solutions to select
        seed_idx: Index of seed solution
        metric: Distance metric to use
        weights: Optional weights for weighted metrics

    Returns:
        List[S]: Selected solutions
    """
    total = len(solution_list)

    # Start from the seed and track which rows are already in the subset.
    chosen = [seed_idx]
    is_selected = np.zeros(total, dtype=bool)
    is_selected[seed_idx] = True

    # min_distances[i] = distance from row i to its nearest selected row.
    min_distances = np.full(total, np.inf)
    _update_min_distances_legacy(normalized_matrix, min_distances, is_selected, seed_idx, metric, weights)

    while len(chosen) < subset_size:
        remaining = np.where(~is_selected)[0]
        if remaining.size == 0:
            break
        # Greedy step: take the farthest-from-the-subset candidate.
        next_idx = remaining[np.argmax(min_distances[remaining])]
        chosen.append(next_idx)
        is_selected[next_idx] = True
        _update_min_distances_legacy(normalized_matrix, min_distances, is_selected, next_idx, metric, weights)

    return [solution_list[i] for i in chosen]
def _vectorized_subset_selection(solution_list: List[S], normalized_matrix: np.ndarray,
                                 subset_size: int, seed_idx: int, metric: DistanceMetric,
                                 weights: Optional[np.ndarray] = None) -> List[S]:
    """
    Vectorized implementation of greedy max-min subset selection.

    Uses DistanceCalculator's vectorized distance routines to significantly
    improve performance when selecting subsets from large solution sets.

    Args:
        solution_list: List of solutions to select from
        normalized_matrix: Normalized objective matrix
        subset_size: Number of solutions to select
        seed_idx: Index of seed solution
        metric: Distance metric to use
        weights: Optional weights for weighted metrics

    Returns:
        List[S]: Selected solutions
    """
    selected_indices = [seed_idx]

    # Iteratively select solutions with maximum minimum distance
    while len(selected_indices) < subset_size:
        # Minimum distance from each row to the selected set; selected rows
        # are reported as infinite by the calculator.
        min_distances = DistanceCalculator.calculate_min_distances_vectorized(
            normalized_matrix, selected_indices, metric, weights
        )
        # Restrict to unselected (finite) candidates BEFORE taking argmax:
        # the original computed argmax first, which was dead code and raised
        # ValueError on an empty finite subset.
        finite_indices = np.where(np.isfinite(min_distances))[0]
        if len(finite_indices) == 0:
            # All remaining solutions are already selected
            break
        best_candidate_idx = finite_indices[np.argmax(min_distances[finite_indices])]
        selected_indices.append(best_candidate_idx)

    return [solution_list[i] for i in selected_indices]
def _update_min_distances_legacy(normalized_matrix: np.ndarray, min_distances: np.ndarray,
                                 selected_mask: np.ndarray, new_selected_idx: int,
                                 metric: DistanceMetric, weights: Optional[np.ndarray] = None):
    """
    Update min_distances in place against a newly selected solution.

    Deprecated: kept for compatibility; prefer
    DistanceCalculator.calculate_min_distances_vectorized().

    Args:
        normalized_matrix: Normalized objective matrix
        min_distances: Array of minimum distances to selected solutions (mutated)
        selected_mask: Boolean mask of selected solutions
        new_selected_idx: Index of newly selected solution
        metric: Distance metric to use
        weights: Optional weights for weighted metrics
    """
    reference = normalized_matrix[new_selected_idx]
    # Only unselected rows need their distance refreshed.
    for idx in np.flatnonzero(~selected_mask):
        candidate_distance = DistanceCalculator.calculate_distance(
            normalized_matrix[idx], reference, metric, weights
        )
        if candidate_distance < min_distances[idx]:
            min_distances[idx] = candidate_distance


# Maintain backward compatibility
_update_min_distances = _update_min_distances_legacy
def _crowding_distance_selection(solution_list: List[S], subset_size: int) -> List[S]:
    """
    Select the subset_size solutions with the highest crowding distance
    (used for 2-objective problems).
    """
    # Throwaway archive: adding every solution makes it compute crowding distances.
    scratch_archive = CrowdingDistanceArchive(len(solution_list))
    for candidate in solution_list:
        scratch_archive.add(copy.deepcopy(candidate))

    ranked = sorted(
        scratch_archive.solution_list,
        key=lambda s: s.attributes.get("crowding_distance", 0.0),
        reverse=True,
    )
    return ranked[:subset_size]
# Backward compatibility alias
def distance_based_subset_selection(solution_list: List[S], subset_size: int, distance_measure=None,
                                    metric: DistanceMetric = DistanceMetric.L2_SQUARED,
                                    weights: Optional[np.ndarray] = None,
                                    random_seed: Optional[int] = None) -> List[S]:
    """
    Backward compatibility wrapper for distance_based_subset_selection_robust.

    Args:
        solution_list: List of solutions to select from
        subset_size: Number of solutions to select
        distance_measure: Deprecated parameter (ignored)
        metric: Distance metric to use
        weights: Optional weights for TCHEBY_WEIGHTED metric
        random_seed: Optional seed for reproducible results

    Returns:
        List of selected solutions
    """
    # Keyword arguments make the pass-through robust to future signature changes.
    return distance_based_subset_selection_robust(solution_list, subset_size, metric=metric,
                                                  weights=weights, random_seed=random_seed)
class DistanceBasedArchive(BoundedArchive[S]):
    """
    Archive that maintains solutions using adaptive distance-based subset selection.

    This archive extends BoundedArchive to use a sophisticated selection mechanism:
    - For 2 objectives: Uses crowding distance selection
    - For >2 objectives: Uses robust distance-based subset selection with normalization

    The implementation follows the Java jMetal SafeBestSolutionsArchive algorithm.
    """

    def __init__(self, maximum_size: int,
                 metric: DistanceMetric = DistanceMetric.L2_SQUARED,
                 weights: Optional[np.ndarray] = None,
                 random_seed: Optional[int] = None,
                 dominance_comparator=None,
                 use_vectorized: bool = True):
        """
        Initialize the distance-based archive.

        Args:
            maximum_size: Maximum number of solutions to maintain
            metric: Distance metric to use (default: L2_SQUARED)
            weights: Optional weights for TCHEBY_WEIGHTED metric
            random_seed: Optional seed for reproducible results
            dominance_comparator: Comparator for dominance (default: DominanceComparator)
            use_vectorized: Whether to use vectorized implementation (default: True)
        """
        if dominance_comparator is None:
            dominance_comparator = DominanceComparator()
        # Initialize parent with dummy comparator and density estimator;
        # the selection mechanism is overridden in add() below.
        super(DistanceBasedArchive, self).__init__(
            maximum_size=maximum_size,
            comparator=SolutionAttributeComparator("dummy", lowest_is_best=True),
            density_estimator=CrowdingDistanceDensityEstimator(),
            dominance_comparator=dominance_comparator
        )
        self.metric = metric
        self.weights = weights
        self.random_seed = random_seed
        self.use_vectorized = use_vectorized
        # Thread safety for concurrent access
        self._lock = threading.Lock()

    def add(self, solution: S) -> bool:
        """
        Add a solution using non-dominated sorting and distance-based selection.

        Thread-safe implementation for concurrent use.

        Args:
            solution: Solution to add

        Returns:
            True if solution was added, False otherwise
        """
        with self._lock:
            # Dominance handling is delegated to the inner non-dominated archive.
            success = self.non_dominated_solution_archive.add(solution)
            if success and self.size() > self.maximum_size:
                # Over capacity: keep a diverse subset of maximum_size solutions.
                selected_solutions = distance_based_subset_selection_robust(
                    self.solution_list,
                    self.maximum_size,
                    self.metric,
                    self.weights,
                    self.random_seed,
                    self.use_vectorized
                )
                # IMPORTANT: clear and extend in place to preserve the list
                # reference shared with the parent class.
                self.solution_list.clear()
                self.solution_list.extend(selected_solutions)
            return success

    def compute_density_estimator(self):
        """
        Override parent method since distance-based selection is used instead.

        Called by the parent class but intentionally a no-op here.
        """
        pass