Minimize function stuck forever when using CPU parallelization #610

Open
muazhari opened this issue Jun 29, 2024 · 2 comments
muazhari commented Jun 29, 2024

I am getting failing runs when using ElementwiseProblem with dask, ray, starmap multiprocessing, and a concurrent.futures process pool executor in a Jupyter notebook. All of them except starmap show the same outcome: the run is stuck forever and does not utilize all CPU cores (only one core is used). Even starmap configured with more than one core (up to 24 cores) only makes the execution take longer, and still uses one core. The only option left is the default runner, LoopedElementwiseEvaluation. Unexpectedly, the default runner is the fastest one that works, compared to all parallelized runners (it still only utilizes one core). I have already tested the futures executor, dask, and ray separately using a runner implementation similar to pymoo's: ray is too slow and does not utilize all CPU cores, dask utilizes all CPU cores but is slower than the futures executor, and the futures executor is the fastest.

  • Reproducible code
from concurrent import futures

from pymoo.algorithms.moo.nsga2 import RankAndCrowding
from pymoo.core.mixed import MixedVariableGA
from pymoo.core.problem import ElementwiseProblem, LoopedElementwiseEvaluation
from pymoo.core.variable import Binary, Choice, Integer, Real
from pymoo.optimize import minimize

import ray

ray.shutdown()
ray.init(dashboard_host="0.0.0.0")
ray.available_resources()

from distributed import LocalCluster
from dask.distributed import Client

# cluster = LocalCUDACluster()
cluster = LocalCluster(n_workers=24, threads_per_worker=1)
client = Client(cluster)
client

class MultiObjectiveMixedVariableProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        vars = {
            "b": Binary(),
            "x": Choice(options=["nothing", "multiply"]),
            "y": Integer(bounds=(-2 * 10 ** 5, 2 * 10 ** 5)),
            "z": Real(bounds=(-5 * 10 ** 3, 5 * 10 ** 3)),
        }
        super().__init__(vars=vars, n_obj=6, n_ieq_constr=0, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        b, x, z, y = X["b"], X["x"], X["z"], X["y"]
        f1 = z ** 2 + y ** 2
        f2 = (z + 2) ** 2 + (y - 1) ** 2
        f3 = (z ** 2) / 2 + (y + 1)
        f4 = -z ** 2
        f5 = z ** 2
        f6 = z / 2 - y - y / z
        
        if b:
            f2 = 100 * f2
         
        if x == "multiply":
            f2 = 10 * f2

        out["F"] = [f1, f2, f3, f4, f5, f6]

# runner = RayParallelization(
#     job_resources={
#         "num_gpus": 1,
#         "num_cpus": 24,
#     }
# )

# runner = DaskParallelization(
#     client=client
# )

# pool = multiprocessing.Pool(24)
# runner = StarmapParallelization(pool.starmap)

runner = LoopedElementwiseEvaluation()  # default serial runner (overridden by the executor-based runner below)


class ConcurrentParallelization:

    def __init__(self, max_workers) -> None:
        super().__init__()
        self.max_workers = max_workers

    def __call__(self, f, X):
        # Evaluate each solution in its own process and collect the results in order.
        with futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            function_futures = [executor.submit(f, x) for x in X]
            futures.wait(function_futures, return_when=futures.ALL_COMPLETED)
            return [function_future.result() for function_future in function_futures]

    def __getstate__(self):
        # Drop max_workers when the runner itself is pickled.
        state = self.__dict__.copy()
        state.pop("max_workers", None)
        return state


runner = ConcurrentParallelization(
    max_workers=24
)

problem = MultiObjectiveMixedVariableProblem(elementwise_runner=runner)

algorithm = MixedVariableGA(
    survival=RankAndCrowding()
)

res = minimize(
    problem,
    algorithm,
    seed=1
)

blankjul commented Jul 7, 2024

Looking at your code, my assumption is that parallelization with dask and ray introduces a significant amount of overhead (because of serialization). In my opinion, the main advantage is using parallelization with a cloud service (e.g. AWS Lambda functions) at a larger scale. For instance, launching 200 instances in parallel will surely beat running this on 4 cores.

Can you confirm your results with a computationally heavier problem as well? Let us say a (time-discrete) simulation that requires a minute or so per evaluation? Happy to discuss this a little more here.
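A minimal sketch of what such a test could look like, assuming a time.sleep stands in for the simulation; the HeavyProblem class and the one-second delay are illustrative assumptions, not part of the original report:

import time

import numpy as np

from pymoo.core.problem import ElementwiseProblem


class HeavyProblem(ElementwiseProblem):
    # Toy stand-in for a time-discrete simulation: every evaluation burns a
    # fixed amount of wall-clock time before returning two simple objectives.
    def __init__(self, delay=1.0, **kwargs):
        super().__init__(n_var=2, n_obj=2, xl=-5.0, xu=5.0, **kwargs)
        self.delay = delay

    def _evaluate(self, x, out, *args, **kwargs):
        time.sleep(self.delay)  # simulate an expensive evaluation
        out["F"] = [np.sum(x ** 2), np.sum((x - 1) ** 2)]

With a cost of one second per evaluation, a runner that actually uses its workers should finish a 24-member population in roughly one second per generation instead of 24, so the overhead hypothesis is easy to check against LoopedElementwiseEvaluation.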

@blankjul blankjul self-assigned this Jul 7, 2024

muazhari commented Jul 8, 2024

I don't think it was caused by overhead. Surprisingly, when using ray, it works when I do not configure the computing resources (using the default ray.init configuration with 32 logical cores)*. However, it sometimes crashes. The runner I used is below.

class OptimizationProblemRunner:
    def __init__(self):
        pass

    def __call__(self, f, X):
        # Wrap the problem's evaluation method as a Ray remote function,
        # dispatch one task per solution, then block on all results.
        runnable = ray.remote(f.__call__.__func__)
        futures = [runnable.remote(f, x) for x in X]
        return ray.get(futures)

    def __getstate__(self):
        state = self.__dict__.copy()
        return state

*Update: somehow it does not work anymore, even without supplying any resource configuration.
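For reference, a sketch of how per-task resources would be requested explicitly in this style of runner using Ray's .options() call; the one-CPU-per-task figure below is an assumption for illustration, not something measured in this thread:

import ray


class ResourceConfiguredRunner:
    # Same shape as OptimizationProblemRunner above, but every remote task
    # reserves resources explicitly instead of relying on Ray's defaults.
    def __call__(self, f, X):
        runnable = ray.remote(f.__call__.__func__).options(num_cpus=1)  # hypothetical per-task request
        futures = [runnable.remote(f, x) for x in X]
        return ray.get(futures)

    def __getstate__(self):
        return self.__dict__.copy()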
