Join GitHub today
GitHub is home to over 36 million developers working together to host and review code, manage projects, and build software together.
Sign up
Support init_spark_on_yarn and RayContext #1344
Conversation
zhichao-li
force-pushed the
zhichao-li:nohome
branch
from
56a44a9
to
80c43fb
May 21, 2019
zhichao-li
added some commits
May 22, 2019
zhichao-li
reviewed
May 23, 2019
@@ -137,4 +139,39 @@ class PythonZooNet[T: ClassTag](implicit ev: TensorNumeric[T]) extends PythonZoo | |||
toJSample(x).asInstanceOf[RDD[JSample[Float]]], batchSize) | |||
} | |||
|
|||
val processToBeKill = new ArrayList[String]() |
This comment has been minimized.
This comment has been minimized.
zhichao-li
May 23, 2019
Author
Contributor
switch this to a copy on write array and avoid using locking here.
zhichao-li
added some commits
May 28, 2019
zhichao-li
changed the title
[WIP] RayRunner
RayRunner
May 30, 2019
This comment has been minimized.
This comment has been minimized.
|
jason-dai
reviewed
May 30, 2019
override def run(): Unit = { | ||
// Give it a chance to be gracefully killed | ||
killPids(processToBeKill, "kill ") | ||
Thread.sleep(2000) |
This comment has been minimized.
This comment has been minimized.
jason-dai
May 30, 2019
Contributor
if (!processToBeKill.isEmpty()){
Thread.sleep(2000)
killPids(processToBeKill, "kill -9")
}
jason-dai
reviewed
May 30, 2019
driver_memory="1g", | ||
driver_cores=10, | ||
extra_executor_memory_for_ray=None, | ||
extra_pmodule_zip=None, |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
May 30, 2019
penv_archive=None, | ||
master="yarn", | ||
hadoop_user_name="root", | ||
spark_yarn_jars=None, |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
May 30, 2019
sc = self._init_yarn(hadoop_conf=hadoop_conf, | ||
spark_yarn_jars=spark_yarn_jars, | ||
penv_archive=penv_archive, | ||
python_zip_file=extra_pmodule_zip, |
This comment has been minimized.
This comment has been minimized.
jason-dai
May 30, 2019
Contributor
Why different names? Make sure the parameter list of `_init_yarn` matches that of `init_spark_on_yarn`.
jason-dai
reviewed
May 30, 2019
command = " --archives {}#python_env --num-executors {} " \ | ||
" --executor-cores {} --executor-memory {}".\ | ||
format(penv_archive, num_executor, executor_cores, executor_memory) | ||
path_to_zoo_jar = get_analytics_zoo_classpath() |
This comment has been minimized.
This comment has been minimized.
jason-dai
May 30, 2019
Contributor
this can possibly return BigDL class path, or return nothing at all?
This comment has been minimized.
This comment has been minimized.
zhichao-li
May 31, 2019
Author
Contributor
yes, it would either return a zoo.jar or an empty string.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
zhichao-li
Jun 4, 2019
Author
Contributor
No, since it searches from the zoo distributed folder. Actually, I'm thinking we should throw an Exception here when the jar cannot be found, so that the user is aware that there is a problem and can fix it either by specifying the proper environment or by reinstalling Analytics-Zoo.
This comment has been minimized.
This comment has been minimized.
jason-dai
Jun 7, 2019
Contributor
In `get_analytics_zoo_classpath`:
if os.getenv("BIGDL_CLASSPATH"):
return os.environ["BIGDL_CLASSPATH"]
jason-dai
reviewed
May 30, 2019
return value | ||
except Exception: | ||
raise Exception("Size must be specified as bytes(b)," | ||
"kibibytes(k), mebibytes(m), gibibytes(g). " |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
May 30, 2019
self.verbose = verbose | ||
self.labels = """--resources='{"trainer": %s, "ps": %s }' """ % (1, 1) | ||
|
||
def gen_stop(self): |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
May 30, 2019
time.sleep(self.WAITING_TIME_SEC) | ||
return process_info | ||
|
||
def _start_raylet(self, redis_address): |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
May 30, 2019
process_info = session_execute(command=command, env=modified_env, tag="ray_master") | ||
JVMGuard.registerPids(process_info.pids) | ||
process_info.node_ip = rservices.get_node_ip_address() | ||
time.sleep(self.WAITING_TIME_SEC) |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
May 30, 2019
python_bin_dir = "/".join(self.python_loc.split("/")[:-1]) | ||
return "{}/python {}/ray".format(python_bin_dir, python_bin_dir) | ||
|
||
def gen_ray_booter(self): |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
May 30, 2019
object_store_memory=resourceToBytes( | ||
str(object_store_memory)) if object_store_memory else None, | ||
verbose=verbose, env=env) | ||
self._gather_cluster_ips() |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
May 30, 2019
yield task_addrs | ||
tc.barrier() | ||
|
||
ips = self.sc.range(0, total_cores, |
This comment has been minimized.
This comment has been minimized.
jason-dai
May 30, 2019
Contributor
potentially two executors can run on the same node in YARN, and they will have the same ip
jason-dai
reviewed
May 30, 2019
|
||
def _get_num_ray_nodes(self): | ||
if "local" in self.sc.master: | ||
return int(re.match(r"local\[(.*)\]", self.sc.master).group(1)) |
This comment has been minimized.
This comment has been minimized.
zhichao-li
added some commits
May 31, 2019
zhichao-li
changed the title
RayRunner
Support init_on_yarn and RayContext
Jun 5, 2019
zhichao-li
added some commits
Jun 5, 2019
zhichao-li
reviewed
Jun 5, 2019
@@ -43,7 +43,8 @@ if [[ $py_version == *"Python 2.7"* ]]; then | |||
--ignore=../test/zoo/pipeline/api/keras2/ \ | |||
--ignore=../test/zoo/pipeline/autograd/ \ | |||
--ignore=../test/zoo/pipeline/inference/ \ | |||
--ignore=../test/zoo/tfpark/test_text_models.py | |||
--ignore=../test/zoo/tfpark/test_text_models.py \ | |||
--ignore=../test/zoo/ray/ |
This comment has been minimized.
This comment has been minimized.
zhichao-li
Jun 5, 2019
Author
Contributor
Disable this and manually test it for now. Would add it back once the environment on Jenkins is ready.
zhichao-li
added some commits
Jun 5, 2019
jason-dai
reviewed
Jun 7, 2019
@@ -111,56 +149,28 @@ def _yarn_opt(jars): | |||
format(get_analytics_zoo_classpath()) | |||
return command | |||
|
|||
def _submit_opt(master): | |||
def _submit_opt(): | |||
conf = { | |||
"spark.driver.memory": driver_memory, | |||
"spark.driver.cores": driver_cores, | |||
"spark.scheduler.minRegisterreResourcesRatio": "1.0"} | |||
# "spark.task.cpus": executor_cores} |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
Jun 7, 2019
python_location=python_location) | ||
|
||
|
||
def init_on_yarn(hadoop_conf, |
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
zhichao-li
Jun 10, 2019
Author
Contributor
Only yarn-client mode is supported for now. I will add documentation for that.
jason-dai
reviewed
Jun 7, 2019
@@ -20,6 +20,81 @@ | |||
import os | |||
|
|||
|
|||
def init_on_local(cores=2, conf=None, python_location=None, spark_log_level="WARN", |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
Jun 7, 2019
self.ray_processesMonitor = None | ||
self.verbose = verbose | ||
self.redis_port = self._new_port() if not redis_port else redis_port | ||
self.ray_context = RayServiceFuncGenerator( |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
Jun 7, 2019
return value | ||
except Exception: | ||
raise Exception("Size must be specified as bytes(b)," | ||
"kilobytes(k), megabytes(m), gibabytes(g). " |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
Jun 7, 2019
override def run(): Unit = { | ||
// Give it a chance to be gracefully killed | ||
killPids(processToBeKill, "kill ") | ||
if (processToBeKill.isEmpty) { |
This comment has been minimized.
This comment has been minimized.
jason-dai
reviewed
Jun 7, 2019
self.ray_processesMonitor.clean_fn() | ||
self.stopped = True | ||
|
||
def raw_stop(self): |
This comment has been minimized.
This comment has been minimized.
zhichao-li
changed the title
Support init_on_yarn and RayContext
Support init_spark_on_yarn and RayContext
Jun 10, 2019
This comment has been minimized.
This comment has been minimized.
Build # 2356 |
zhichao-li commented May 21, 2019
•
edited
This patch provides a mechanism to deploy Python dependencies and Ray services automatically across a YARN cluster.
Based on `init_spark_on_yarn` and Conda, Python users would be able to run Analytics-Zoo or Ray in a more pythonic way on YARN with only `pip install analytics-zoo`, without spark-submit and without installing Analytics-Zoo or Ray across all cluster nodes.
Example