Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Ray Backend #1049

Merged
merged 12 commits into from
Aug 17, 2023
5 changes: 5 additions & 0 deletions docs/src/code/executor/ray.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Ray Executor
=============

.. automodule:: orion.executor.ray_backend
:members:
7 changes: 7 additions & 0 deletions docs/src/user/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,10 @@ For more control over Dask, you should prefer using Dask executor backend direct
The executor configuration is used to create the Dask Client. See Dask's documentation
`here <https://distributed.dask.org/en/latest/api.html#distributed.Client>`__ for
more information on possible arguments.

Ray
----

We can also use the ray executor backend. For more control with ray, you can see ray's
documentation `here <https://docs.ray.io/en/latest/ray-core/package-ref.html#python-api>`__ for
more information on ray and it's python api.
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"sphinx_gallery",
],
"dask": ["dask[complete]"],
"ray": ["ray"],
"track": ["track @ git+https:/Delaunay/track@master#egg=track"],
"profet": ["emukit", "GPy", "torch", "pybnn"],
"configspace": ["ConfigSpace"],
Expand Down Expand Up @@ -145,6 +146,7 @@
"joblib = orion.executor.joblib_backend:Joblib",
"poolexecutor = orion.executor.multiprocess_backend:PoolExecutor",
"dask = orion.executor.dask_backend:Dask",
"ray = orion.executor.ray_backend:Ray",
],
},
install_requires=[
Expand Down
113 changes: 113 additions & 0 deletions src/orion/executor/ray_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import logging
import traceback

from orion.core.utils.module_import import ImportOptional
from orion.executor.base import (
AsyncException,
AsyncResult,
BaseExecutor,
ExecutorClosed,
Future,
)

with ImportOptional("ray") as import_optional:
import ray

HAS_RAY = not import_optional.failed

logger = logging.getLogger(__name__)


class _Future(Future):
def __init__(self, future):
self.future = future
self.exception = None

def get(self, timeout=None):
if self.exception:
raise self.exception

Check warning on line 28 in src/orion/executor/ray_backend.py

View check run for this annotation

Codecov / codecov/patch

src/orion/executor/ray_backend.py#L28

Added line #L28 was not covered by tests
try:
return ray.get(self.future, timeout=timeout)
except ray.exceptions.GetTimeoutError as e:
raise TimeoutError() from e

Check warning on line 32 in src/orion/executor/ray_backend.py

View check run for this annotation

Codecov / codecov/patch

src/orion/executor/ray_backend.py#L32

Added line #L32 was not covered by tests

def wait(self, timeout=None):
try:
ray.get(self.future, timeout=timeout)
except ray.exceptions.GetTimeoutError:
pass
except Exception as e:
self.exception = e

Check warning on line 40 in src/orion/executor/ray_backend.py

View check run for this annotation

Codecov / codecov/patch

src/orion/executor/ray_backend.py#L39-L40

Added lines #L39 - L40 were not covered by tests

def ready(self):
obj_ready = ray.wait([self.future])
bouthilx marked this conversation as resolved.
Show resolved Hide resolved
return len(obj_ready[0]) == 1

def successful(self):
# Python 3.6 raise assertion error
if not self.ready():
raise ValueError()

Check warning on line 49 in src/orion/executor/ray_backend.py

View check run for this annotation

Codecov / codecov/patch

src/orion/executor/ray_backend.py#L49

Added line #L49 was not covered by tests

return self.future.successful()

Check warning on line 51 in src/orion/executor/ray_backend.py

View check run for this annotation

Codecov / codecov/patch

src/orion/executor/ray_backend.py#L51

Added line #L51 was not covered by tests


class Ray(BaseExecutor):
def __init__(
self,
n_workers=-1,
**config,
):
super().__init__(n_workers=n_workers)
self.initialized = False
if not HAS_RAY:
raise ImportError("Ray must be installed to use Ray executor.")

Check warning on line 63 in src/orion/executor/ray_backend.py

View check run for this annotation

Codecov / codecov/patch

src/orion/executor/ray_backend.py#L63

Added line #L63 was not covered by tests
self.config = config

if not ray.is_initialized():
ray.init(**self.config)
self.initialized = True
logger.debug("Ray was initiated with runtime_env : %s", **config)

def close(self):
if self.initialized:
self.initialized = False
ray.shutdown()

def __del__(self):
self.close()

def __enter__(self):
return self

def submit(self, function, *args, **kwargs):
if not ray.is_initialized():
raise ExecutorClosed()

remote_g = ray.remote(function)
return _Future(remote_g.remote(*args, **kwargs))

def wait(self, futures):
return [future.get() for future in futures]

def async_get(self, futures, timeout=None):
results = []
tobe_deleted = []
for i, future in enumerate(futures):
if timeout and i == 0:
future.wait(timeout)

if future.ready():
try:
results.append(AsyncResult(future, future.get()))
except Exception as err:
results.append(AsyncException(future, err, traceback.format_exc()))

tobe_deleted.append(future)
for future in tobe_deleted:
futures.remove(future)

return results

def __exit__(self, exc_type, exc_value, traceback):
self.close()
super().__exit__(exc_type, exc_value, traceback)
34 changes: 32 additions & 2 deletions tests/unittests/executor/test_executor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import time

import pytest

from orion.executor.base import AsyncException, ExecutorClosed, executor_factory
from orion.executor.dask_backend import HAS_DASK, Dask
from orion.executor.multiprocess_backend import PoolExecutor
from orion.executor.ray_backend import HAS_RAY, Ray
from orion.executor.single_backend import SingleExecutor


Expand All @@ -16,6 +18,11 @@ def thread(n):
return PoolExecutor(n, "threading")


def ray(n):
test_working_dir = os.path.dirname(os.path.abspath(__file__))
return Ray(n, runtime_env={"working_dir": test_working_dir})


def skip_dask_if_not_installed(
value, reason="Dask dependency is required for these tests."
):
Expand All @@ -39,18 +46,43 @@ def xfail_dask_if_not_installed(
)


def skip_ray_if_not_installed(
value, reason="Ray dependency is required for these tests."
):
return pytest.param(
value,
marks=pytest.mark.skipif(
not HAS_RAY,
reason=reason,
),
)


def xfail_ray_if_not_installed(
value, reason="Ray dependency is required for these tests."
):
return pytest.param(
value,
marks=pytest.mark.xfail(
condition=not HAS_RAY, reason=reason, raises=ImportError
),
)


executors = [
"joblib",
"poolexecutor",
"singleexecutor",
skip_dask_if_not_installed("dask"),
skip_ray_if_not_installed("ray"),
]

backends = [
thread,
multiprocess,
SingleExecutor,
skip_dask_if_not_installed(Dask),
skip_ray_if_not_installed(ray),
]


Expand Down Expand Up @@ -191,9 +223,7 @@ def test_execute_async_bad(backend):

def nested_jobs(executor):
with executor:
print("nested_jobs sub")
futures = [executor.submit(function, 1, 2, i) for i in range(10)]
print("nested_jobs wait")
all_results = executor.wait(futures)
return sum(all_results)

Expand Down
Loading