
Support init_spark_on_yarn and RayContext #1344

Merged
Merged 18 commits from zhichao-li:nohome into intel-analytics:master on Jun 10, 2019

Conversation

@zhichao-li (Contributor) commented May 21, 2019

This patch provides a mechanism to deploy Python dependencies and Ray services automatically across a YARN cluster.
Based on init_spark_on_yarn and Conda, Python users can run Analytics-Zoo or Ray on YARN in a more Pythonic way: a plain pip install analytics-zoo is enough, with no spark-submit and no need to install Analytics-Zoo or Ray on every cluster node.

Example

import ray

from zoo import init_spark_on_yarn
from zoo.ray.util.raycontext import RayContext

slave_num = 2

sc = init_spark_on_yarn(
    hadoop_conf="/opt/work/almaren-yarn-config/",
    conda_name="ray36-dev",
    num_executor=slave_num,
    executor_cores=28,
    executor_memory="10g",
    driver_memory="2g",
    driver_cores=4,
    extra_executor_memory_for_ray="30g")

ray_ctx = RayContext(sc=sc,
                     object_store_memory="25g")
ray_ctx.init()


@ray.remote
class TestRay():
    def hostname(self):
        import socket
        return socket.gethostname()

    def check_cv2(self):
        # conda install -c conda-forge opencv==3.4.2
        import cv2
        return cv2.__version__

    def ip(self):
        import ray.services as rservices
        return rservices.get_node_ip_address()


actors = [TestRay.remote() for i in range(0, slave_num)]
print([ray.get(actor.hostname.remote()) for actor in actors])
print([ray.get(actor.ip.remote()) for actor in actors])

ray_ctx.stop()

@zhichao-li zhichao-li force-pushed the zhichao-li:nohome branch from 56a44a9 to 80c43fb May 21, 2019

@@ -137,4 +139,39 @@ class PythonZooNet[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZoo
toJSample(x).asInstanceOf[RDD[JSample[Float]]], batchSize)
}

val processToBeKill = new ArrayList[String]()

@zhichao-li (Author, Contributor), May 23, 2019:

Switch this to a copy-on-write array to avoid locking here.

@zhichao-li zhichao-li changed the title [WIP] RayRunner RayRunner May 30, 2019

@jason-dai (Contributor) commented May 30, 2019:

  1. Can we have something like:

     sc = init_nncontext_on_yarn()  # or init_zoo_on_yarn()
     ray_ctx = RayContext(sc)
     ray_ctx.init()
     @ray.remote
     class TestRay():
     ray_ctx.stop()

  2. When running on YARN, two executors can possibly run on the same node with the same ip; does it work?

  3. Do we support Spark local? Need an example for that. Do we still start a Ray cluster in this case?
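
Regarding the Spark local question, a minimal sketch of what such an example might look like (hypothetical: it assumes the init_on_local entry point added later in this PR, shown here under the reviewer's suggested init_spark_on_local name):

    import ray
    from zoo import init_spark_on_local          # name per the review suggestion
    from zoo.ray.util.raycontext import RayContext

    sc = init_spark_on_local(cores=4)            # local[4] Spark master
    ray_ctx = RayContext(sc=sc, object_store_memory="2g")
    ray_ctx.init()                               # would start a local Ray cluster

    @ray.remote
    class Echo:
        def ping(self):
            return "pong"

    print(ray.get(Echo.remote().ping.remote()))
    ray_ctx.stop()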

override def run(): Unit = {
  // Give it a chance to be gracefully killed
  killPids(processToBeKill, "kill ")
  Thread.sleep(2000)

@jason-dai (Contributor), May 30, 2019:
if (!processToBeKill.isEmpty()) {
  Thread.sleep(2000)
  killPids(processToBeKill, "kill -9")
}
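
For reference, the same graceful-then-forceful shutdown pattern sketched in Python (the PR implements it in Scala inside JVMGuard; the helper below is illustrative only):

    import os
    import signal
    import time

    def kill_pids(pids):
        """Ask processes to exit, then force-kill any survivors."""
        for pid in pids:
            os.kill(pid, signal.SIGTERM)       # give each a chance to exit gracefully
        if pids:
            time.sleep(2)
            for pid in pids:
                try:
                    os.kill(pid, signal.SIGKILL)   # "kill -9" the stragglers
                except ProcessLookupError:
                    pass                           # already gone
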
driver_memory="1g",
driver_cores=10,
extra_executor_memory_for_ray=None,
extra_pmodule_zip=None,

@jason-dai (Contributor), May 30, 2019:

should give it a clearer name

penv_archive=None,
master="yarn",
hadoop_user_name="root",
spark_yarn_jars=None,

@jason-dai (Contributor), May 30, 2019:

is it spark.yarn.jars or spark.yarn.archive?

sc = self._init_yarn(hadoop_conf=hadoop_conf,
                     spark_yarn_jars=spark_yarn_jars,
                     penv_archive=penv_archive,
                     python_zip_file=extra_pmodule_zip,

@jason-dai (Contributor), May 30, 2019:

why different names? make sure _init_yarn matches init_spark_on_yarn for their parameter lists

command = " --archives {}#python_env --num-executors {} " \
          " --executor-cores {} --executor-memory {}".\
    format(penv_archive, num_executor, executor_cores, executor_memory)
path_to_zoo_jar = get_analytics_zoo_classpath()

@jason-dai (Contributor), May 30, 2019:

this can possibly return BigDL class path, or return nothing at all?

@zhichao-li (Author, Contributor), May 31, 2019:

yes, it would either return a zoo.jar or an empty string.

@jason-dai (Contributor), May 31, 2019:

it can also possibly return bigdl.jar?

@zhichao-li (Author, Contributor), Jun 4, 2019:

No, since it searches the zoo distribution folder. Actually, I'm thinking we should throw an exception here when the jar cannot be found, so the user is aware there's a problem and can fix it either by specifying the proper environment or by reinstalling Analytics-Zoo.

@jason-dai (Contributor), Jun 7, 2019:

in get_analytics_zoo_classpath:

    if os.getenv("BIGDL_CLASSPATH"):
        return os.environ["BIGDL_CLASSPATH"]
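
Combining this suggestion with the fail-fast idea above, a hypothetical version of the helper might read as follows (the share/lib package layout and glob pattern are assumptions, not the PR's actual code):

    import glob
    import os

    def get_analytics_zoo_classpath():
        # honor an explicit override first, as suggested above
        if os.getenv("BIGDL_CLASSPATH"):
            return os.environ["BIGDL_CLASSPATH"]
        jars = glob.glob(os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                      "share", "lib", "*.jar"))
        if not jars:
            # fail fast so the user can fix the environment
            # or reinstall Analytics-Zoo
            raise Exception("Cannot find the zoo jar; set BIGDL_CLASSPATH "
                            "or reinstall Analytics-Zoo")
        return jars[0]
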
        return value
    except Exception:
        raise Exception("Size must be specified as bytes(b),"
                        "kibibytes(k), mebibytes(m), gibibytes(g). "

@jason-dai (Contributor), May 30, 2019:

kilobytes, megabytes, gigabytes
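
For context, a minimal sketch of what such a size parser might look like with the suggested naming (hypothetical; the PR's resourceToBytes implementation may differ):

    def resource_to_bytes(size):
        """Parse sizes like "25g", "512m", or "1024" (plain bytes)."""
        units = {"b": 1, "k": 1 << 10, "m": 1 << 20, "g": 1 << 30}
        value = str(size).lower().strip()
        try:
            if value[-1] in units:
                return int(value[:-1]) * units[value[-1]]
            return int(value)
        except (ValueError, IndexError):
            raise Exception("Size must be specified as bytes(b), kilobytes(k), "
                            "megabytes(m), gigabytes(g), e.g. 25g or 512m")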

self.verbose = verbose
self.labels = """--resources='{"trainer": %s, "ps": %s }' """ % (1, 1)

def gen_stop(self):

@jason-dai (Contributor), May 30, 2019:

is it still needed?

time.sleep(self.WAITING_TIME_SEC)
return process_info

def _start_raylet(self, redis_address):

@jason-dai (Contributor), May 30, 2019:

shall we combine the above two methods?

process_info = session_execute(command=command, env=modified_env, tag="ray_master")
JVMGuard.registerPids(process_info.pids)
process_info.node_ip = rservices.get_node_ip_address()
time.sleep(self.WAITING_TIME_SEC)

@jason-dai (Contributor), May 30, 2019:

why sleep here?

python_bin_dir = "/".join(self.python_loc.split("/")[:-1])
return "{}/python {}/ray".format(python_bin_dir, python_bin_dir)

def gen_ray_booter(self):

@jason-dai (Contributor), May 30, 2019:

gen_ray_start

object_store_memory=resourceToBytes(
    str(object_store_memory)) if object_store_memory else None,
verbose=verbose, env=env)
self._gather_cluster_ips()

@jason-dai (Contributor), May 30, 2019:

is this still needed?

yield task_addrs
tc.barrier()

ips = self.sc.range(0, total_cores,

@jason-dai (Contributor), May 30, 2019:

potentially two executors can run on the same node in YARN, and they will have the same ip
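
To illustrate the concern, a hypothetical way to gather one address per physical node rather than per Spark task (illustrative only; sc is the SparkContext):

    def gather_unique_ips(sc, total_cores):
        def task_ip(_):
            import ray.services as rservices
            return rservices.get_node_ip_address()
        ips = sc.range(0, total_cores, numSlices=total_cores).map(task_ip).collect()
        return sorted(set(ips))  # collapse executors that share a host/ip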


def _get_num_ray_nodes(self):
    if "local" in self.sc.master:
        return int(re.match(r"local\[(.*)\]", self.sc.master).group(1))

@jason-dai (Contributor), May 30, 2019:

what if local[*]?
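
A hedged sketch of handling local[*] as well (an assumption about the eventual fix, not the PR's actual code):

    import multiprocessing
    import re

    def get_num_ray_nodes(master):
        match = re.match(r"local\[(.*)\]", master)
        if match:
            value = match.group(1)
            if value == "*":
                return multiprocessing.cpu_count()  # all available cores
            return int(value)
        return 1  # non-local masters are handled elsewhere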

@zhichao-li zhichao-li changed the title RayRunner Support init_on_yarn and RayContext Jun 5, 2019

@@ -43,7 +43,8 @@ if [[ $py_version == *"Python 2.7"* ]]; then
--ignore=../test/zoo/pipeline/api/keras2/ \
--ignore=../test/zoo/pipeline/autograd/ \
--ignore=../test/zoo/pipeline/inference/ \
--ignore=../test/zoo/tfpark/test_text_models.py
--ignore=../test/zoo/tfpark/test_text_models.py \
--ignore=../test/zoo/ray/

@zhichao-li (Author, Contributor), Jun 5, 2019:

Disabling this and testing it manually for now. Will add it back once the Jenkins environment is ready.

@@ -111,56 +149,28 @@ def _yarn_opt(jars):
format(get_analytics_zoo_classpath())
return command

def _submit_opt(master):
def _submit_opt():
    conf = {
        "spark.driver.memory": driver_memory,
        "spark.driver.cores": driver_cores,
        "spark.scheduler.minRegisteredResourcesRatio": "1.0"}
    # "spark.task.cpus": executor_cores}

@jason-dai (Contributor), Jun 7, 2019:

remove this line

python_location=python_location)


def init_on_yarn(hadoop_conf,

@jason-dai (Contributor), Jun 7, 2019:

init_spark_on_yarn? Does it support yarn-client or yarn-cluster?

@zhichao-li (Author, Contributor), Jun 10, 2019:

Only yarn-client is supported for now; will add docs for that.

@@ -20,6 +20,81 @@
import os


def init_on_local(cores=2, conf=None, python_location=None, spark_log_level="WARN",

@jason-dai (Contributor), Jun 7, 2019:

init_spark_on_local?

self.ray_processesMonitor = None
self.verbose = verbose
self.redis_port = self._new_port() if not redis_port else redis_port
self.ray_context = RayServiceFuncGenerator(

@jason-dai (Contributor), Jun 7, 2019:

ray_service instead of ray_context

        return value
    except Exception:
        raise Exception("Size must be specified as bytes(b),"
                        "kilobytes(k), megabytes(m), gibabytes(g). "

@jason-dai (Contributor), Jun 7, 2019:

gigabytes

override def run(): Unit = {
  // Give it a chance to be gracefully killed
  killPids(processToBeKill, "kill ")
  if (processToBeKill.isEmpty) {

@jason-dai (Contributor), Jun 7, 2019:

if (!processToBeKill.isEmpty)

self.ray_processesMonitor.clean_fn()
self.stopped = True

def raw_stop(self):

@jason-dai (Contributor), Jun 7, 2019:

purge?

@zhichao-li zhichao-li changed the title Support init_on_yarn and RayContext Support init_spark_on_yarn and RayContext Jun 10, 2019

@zhichao-li (Contributor, Author) commented Jun 10, 2019:

Build # 2356

@zhichao-li zhichao-li merged commit fe6b03f into intel-analytics:master Jun 10, 2019
