
Support init_spark_on_yarn and RayContext (#1344)

* rayrunner

* add a jvm killer

* disable killer from spark job and rely on jvm killer only

* add env and verify the cv2 installation

* enhance

* minor

* style

* comments

* local and enhancement

* better local strategy

* doc and style

* doc

* style

* revert

* doc

* disable

* comments

* fix test
zhichao-li committed Jun 10, 2019
1 parent 614ee46 commit fe6b03f08d8a4a51a6603360dcf10a00150f70ea
@@ -6,6 +6,7 @@
target/
/dist/
pyzoo/.pytest_cache/
pyzoo/ray_poc.zip

# External Python packages
python_packages/
@@ -0,0 +1,93 @@
## Run Ray on Spark

Analytics Zoo provides a mechanism to deploy Python dependencies and Ray services automatically
across a YARN cluster, meaning Python users can run `Analytics-Zoo` or `Ray`
in a pythonic way on YARN without `spark-submit` and without installing Analytics-Zoo or Ray on every cluster node.

## Steps to run RayOnSpark

1) Install Conda and create a conda environment named "ray36".

2) Install the essential dependencies in the conda environment:

```bash
pip install analytics-zoo
pip install pyspark==2.4.0 # 2.4.3 is OK as well.
pip install ray
pip install conda-pack
pip install psutil
pip install aiohttp
pip install setproctitle
```

3) Download JDK8 and set the environment variable: JAVA_HOME (recommended).
- You can also install JDK via conda without setting the JAVA_HOME manually:
`conda install -c anaconda openjdk=8.0.152`
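To check which Java installation will actually be picked up (via `JAVA_HOME`, or a conda-installed openjdk on the `PATH`), a small helper like the following can be used; `find_java` is a hypothetical convenience, not part of Analytics-Zoo:

```python
import os
import shutil


def find_java():
    """Return the path of the java executable that will be used, if any."""
    java_home = os.environ.get("JAVA_HOME")
    if java_home:
        return os.path.join(java_home, "bin", "java")
    # Fall back to whatever `java` is first on PATH
    # (e.g. a conda-installed openjdk).
    return shutil.which("java")

print(find_java())
```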

4) Start Python and execute the following example.

- Create a SparkContext on Yarn

```python
import ray
from zoo import init_spark_on_yarn
from zoo.ray.util.raycontext import RayContext
slave_num = 2
sc = init_spark_on_yarn(
    hadoop_conf="/opt/work/almaren-yarn-config/",
    conda_name="ray36",
    num_executor=slave_num,
    executor_cores=4,
    executor_memory="8g",
    driver_memory="2g",
    driver_cores=4,
    extra_executor_memory_for_ray="10g")
```

- [Optional] If you don't have a YARN cluster, you can also test locally by creating a `SparkContext`
with `init_spark_on_local`:

```python
from zoo import init_spark_on_local
sc = init_spark_on_local(cores=4)
```


- Once the SparkContext is created, you can add further logic here, such as training an Analytics-Zoo model
or launching Ray on Spark.

- The following code launches a Ray cluster on top of the SparkContext configuration and verifies it with a simple Ray example.

```python
ray_ctx = RayContext(sc=sc,
                     object_store_memory="5g")
ray_ctx.init()


@ray.remote
class TestRay():
    def hostname(self):
        import socket
        return socket.gethostname()

    def check_cv2(self):
        # conda install -c conda-forge opencv==3.4.2
        import cv2
        return cv2.__version__

    def ip(self):
        import ray.services as rservices
        return rservices.get_node_ip_address()


actors = [TestRay.remote() for i in range(0, slave_num)]
print([ray.get(actor.hostname.remote()) for actor in actors])
print([ray.get(actor.ip.remote()) for actor in actors])
ray_ctx.stop()
```
@@ -28,7 +28,9 @@ py_version="$(python -V 2>&1)"
python -m pytest -v --doctest-modules ../zoo \
--ignore=../zoo/pipeline/api/keras2 \
--ignore=../zoo/tfpark/text \
-    --ignore=../zoo/examples
+    --ignore=../zoo/examples \
+    --ignore=../zoo/ray/

exit_status_1=$?
if [ $exit_status_1 -ne 0 ];
then
@@ -43,7 +45,8 @@ if [[ $py_version == *"Python 2.7"* ]]; then
--ignore=../test/zoo/pipeline/api/keras2/ \
--ignore=../test/zoo/pipeline/autograd/ \
--ignore=../test/zoo/pipeline/inference/ \
-    --ignore=../test/zoo/tfpark/test_text_models.py
+    --ignore=../test/zoo/tfpark/test_text_models.py \
+    --ignore=../test/zoo/ray/
exit_status_2=$?
if [ $exit_status_2 -ne 0 ];
then
@@ -53,7 +56,9 @@ else
echo "Running Python 3"
python -m pytest -v ../test --ignore=../test/zoo/pipeline/utils/test_utils.py \
--ignore=../test/zoo/pipeline/api/keras2/ \
-    --ignore=../test/zoo/pipeline/autograd/
+    --ignore=../test/zoo/pipeline/autograd/ \
+    --ignore=../test/zoo/ray/integration/ \
+    --ignore=../test/zoo/ray/
exit_status_3=$?
if [ $exit_status_3 -ne 0 ];
then
@@ -0,0 +1,15 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
@@ -0,0 +1,15 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
@@ -0,0 +1,71 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import ray

from zoo import init_spark_on_yarn
from zoo.ray.util.raycontext import RayContext

slave_num = 2

sc = init_spark_on_yarn(
    hadoop_conf="/opt/work/almaren-yarn-config/",
    conda_name="ray36-dev",
    num_executor=slave_num,
    executor_cores=28,
    executor_memory="10g",
    driver_memory="2g",
    driver_cores=4,
    extra_executor_memory_for_ray="30g")

ray_ctx = RayContext(sc=sc,
                     object_store_memory="25g",
                     env={"http_proxy": "http://child-prc.intel.com:913",
                          "https_proxy": "http://child-prc.intel.com:913"})
ray_ctx.init()


@ray.remote
class TestRay():
    def hostname(self):
        import socket
        return socket.gethostname()

    def check_cv2(self):
        # conda install -c conda-forge opencv==3.4.2
        import cv2
        return cv2.__version__

    def ip(self):
        import ray.services as rservices
        return rservices.get_node_ip_address()

    def network(self):
        from urllib.request import urlopen
        try:
            urlopen('http://www.baidu.com', timeout=3)
            return True
        except Exception:
            return False


actors = [TestRay.remote() for i in range(0, slave_num)]
print([ray.get(actor.hostname.remote()) for actor in actors])
print([ray.get(actor.ip.remote()) for actor in actors])
# print([ray.get(actor.network.remote()) for actor in actors])

ray_ctx.stop()
@@ -0,0 +1,54 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from unittest import TestCase

import numpy as np
import psutil
import pytest
import ray

from zoo import init_spark_on_local
from zoo.ray.util.raycontext import RayContext

np.random.seed(1337) # for reproducibility


@ray.remote
class TestRay():
    def hostname(self):
        import socket
        return socket.gethostname()


class TestUtil(TestCase):

    def test_local(self):
        node_num = 4
        sc = init_spark_on_local(cores=node_num)
        ray_ctx = RayContext(sc=sc)
        ray_ctx.init()
        actors = [TestRay.remote() for i in range(0, node_num)]
        print([ray.get(actor.hostname.remote()) for actor in actors])
        ray_ctx.stop()
        sc.stop()
        for process_info in ray_ctx.ray_processesMonitor.process_infos:
            for pid in process_info.pids:
                assert not psutil.pid_exists(pid)


if __name__ == "__main__":
pytest.main([__file__])
@@ -0,0 +1,45 @@
#
# Copyright 2018 Analytics Zoo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from unittest import TestCase

import numpy as np
import pytest

import zoo.ray.util.utils as rutils

np.random.seed(1337) # for reproducibility


class TestUtil(TestCase):

    def test_split(self):
        vector = np.ones([10])
        result = rutils.split(vector, 4)
        assert len(result) == 4
        assert len(result[0]) == 3
        assert len(result[1]) == 3
        assert len(result[2]) == 2
        assert len(result[3]) == 2

    def test_resource_to_bytes(self):
        assert 10 == rutils.resourceToBytes("10b")
        assert 10000 == rutils.resourceToBytes("10k")
        assert 10000000 == rutils.resourceToBytes("10m")
        assert 10000000000 == rutils.resourceToBytes("10g")


if __name__ == "__main__":
pytest.main([__file__])
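The helpers these tests exercise can be sketched as follows. These are reconstructions inferred from the assertions above, not the actual `zoo.ray.util.utils` implementations: the expected chunk sizes match NumPy's `array_split`, and `10k == 10000` implies decimal (power-of-1000) multipliers rather than powers of 1024.

```python
import numpy as np


def split(vector, n):
    # np.array_split distributes the remainder over the first chunks,
    # giving the 3/3/2/2 chunking the test expects for length 10, n = 4.
    return np.array_split(vector, n)


_MULTIPLIERS = {"b": 1, "k": 10 ** 3, "m": 10 ** 6, "g": 10 ** 9}


def resource_to_bytes(spec):
    # e.g. "10g" -> 10 * 10**9; the unit suffix is case-insensitive.
    spec = spec.strip().lower()
    number, unit = spec[:-1], spec[-1]
    if unit not in _MULTIPLIERS:
        raise ValueError("unknown unit in resource string: " + spec)
    return int(number) * _MULTIPLIERS[unit]
```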
