
We should search and find the necessary jars from python env rather than upload them again (#1460)

* fix path

* jars
zhichao-li committed Jun 17, 2019
1 parent 31e7aed commit 66bbec8eafc2a61ce838a1a7e0e7dc501e6c245e
Showing with 47 additions and 24 deletions.
  1. +3 −1 pyzoo/zoo/common/nncontext.py
  2. +44 −23 pyzoo/zoo/ray/util/spark.py
@@ -53,6 +53,7 @@ def init_spark_on_yarn(hadoop_conf,
                        spark_yarn_archive=None,
                        spark_log_level="WARN",
                        redirect_spark_log=True,
+                       jars=None,
                        spark_conf=None):
     """
     Create a SparkContext with Zoo configuration on Yarn cluster on "Yarn-client" mode.
@@ -74,6 +75,7 @@ def init_spark_on_yarn(hadoop_conf,
     :param hadoop_user_name: User name for running in yarn cluster. Default value is: root
     :param spark_log_level: Log level of Spark
     :param redirect_spark_log: Direct the Spark log to local file or not.
+    :param jars: Comma-separated list of jars to include on the driver and executor classpaths.
     :param spark_conf: You can append extra spark conf here in key value format.
                        i.e spark_conf={"spark.executor.extraJavaOptions": "-XX:+PrintGCDetails"}
     :return: SparkContext
@@ -94,7 +96,7 @@ def init_spark_on_yarn(hadoop_conf,
         penv_archive=penv_archive,
         hadoop_user_name=hadoop_user_name,
         spark_yarn_archive=spark_yarn_archive,
-        jars=None,
+        jars=jars,
         spark_conf=spark_conf)
     return sc
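The call-site fix above is the heart of the nncontext.py change: the old code hard-coded jars=None, so a caller's jars argument was silently dropped before it reached the runner. A minimal sketch of the new call pattern; every value below is hypothetical, and the parameter names other than jars are taken from elsewhere in this diff rather than from the full signature:

    from zoo.common.nncontext import init_spark_on_yarn

    sc = init_spark_on_yarn(hadoop_conf="/etc/hadoop/conf",      # hypothetical path
                            conda_name="my_env",                 # hypothetical conda env name
                            num_executor=2,
                            executor_cores=4,
                            executor_memory="8g",
                            jars="/libs/a.jar,/libs/b.jar")      # comma-separated, per the new docstring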

@@ -15,10 +15,11 @@
 #
 
 import os
+import glob
 
 from pyspark import SparkContext
 
-from zoo.common.nncontext import get_analytics_zoo_conf, init_spark_conf
+from zoo.common.nncontext import init_spark_conf
 
 from zoo import init_nncontext
 
@@ -27,10 +28,10 @@ class SparkRunner():
     def __init__(self, spark_log_level="WARN", redirect_spark_log=True):
         self.spark_log_level = spark_log_level
         self.redirect_spark_log = redirect_spark_log
+        self.PYTHON_ENV = "python_env"
         with SparkContext._lock:
             if SparkContext._active_spark_context:
                 raise Exception("There's existing SparkContext. Please close it first.")
 
         import pyspark
         print("Current pyspark location is : {}".format(pyspark.__file__))
@@ -72,7 +73,7 @@ def _pack_conda_main(self, args):
     def pack_penv(self, conda_name):
         import tempfile
         tmp_dir = tempfile.mkdtemp()
-        tmp_path = "{}/python_env.tar.gz".format(tmp_dir)
+        tmp_path = "{}/{}.tar.gz".format(tmp_dir, self.PYTHON_ENV)
         print("Start to pack current python env")
         self._pack_conda_main(["--output", tmp_path, "--n-threads", "8", "--name", conda_name])
         print("Packing has been completed: {}".format(tmp_path))
@@ -98,21 +99,34 @@ def _detect_python_location(self):
                 "Cannot detect current python location. Please set it manually by python_location")
         return process_info.out
 
-    def _gather_essential_jars(self):
+    def _get_bigdl_jar_name_on_driver(self):
         from bigdl.util.engine import get_bigdl_classpath
-        from zoo.util.engine import get_analytics_zoo_classpath
         bigdl_classpath = get_bigdl_classpath()
-        zoo_classpath = get_analytics_zoo_classpath()
         assert bigdl_classpath, "Cannot find bigdl classpath"
+        return bigdl_classpath.split("/")[-1]
+
+    def _get_zoo_jar_name_on_driver(self):
+        from zoo.util.engine import get_analytics_zoo_classpath
+        zoo_classpath = get_analytics_zoo_classpath()
         assert zoo_classpath, "Cannot find Analytics-Zoo classpath"
-        if bigdl_classpath == zoo_classpath:
-            return [zoo_classpath]
-        else:
-            return [zoo_classpath, bigdl_classpath]
+        return zoo_classpath.split("/")[-1]
+
+    def _assemble_zoo_classpath_for_executor(self):
+        conda_env_path = "/".join(self._detect_python_location().split("/")[:-2])
+        python_interpreters = glob.glob("{}/lib/python*".format(conda_env_path))
+        assert len(python_interpreters) == 1, \
+            "Conda env should contain a single python, but got: {}:".format(python_interpreters)
+        python_interpreter_name = python_interpreters[0].split("/")[-1]
+        prefix = "{}/lib/{}/site-packages/".format(self.PYTHON_ENV, python_interpreter_name)
+        return ["{}/zoo/share/lib/{}".format(prefix,
+                                             self._get_zoo_jar_name_on_driver()),
+                "{}/bigdl/share/lib/{}".format(prefix,
+                                               self._get_bigdl_jar_name_on_driver())
+                ]
 
     def init_spark_on_local(self, cores, conf=None, python_location=None):
         print("Start to getOrCreate SparkContext")
-        os.environ['PYSPARK_PYTHON'] =\
+        os.environ['PYSPARK_PYTHON'] = \
            python_location if python_location else self._detect_python_location()
         master = "local[{}]".format(cores)
         zoo_conf = init_spark_conf().setMaster(master)
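Note that the paths returned by _assemble_zoo_classpath_for_executor are relative: they resolve under the python_env directory that Yarn creates when it unpacks the --archives upload on each executor. With a hypothetical python3.6 conda env, the return value would look roughly like this (jar names abbreviated):

    # Hypothetical return value of _assemble_zoo_classpath_for_executor():
    ["python_env/lib/python3.6/site-packages/zoo/share/lib/analytics-zoo-<version>.jar",
     "python_env/lib/python3.6/site-packages/bigdl/share/lib/bigdl-<version>.jar"]

The jar file names come from the driver's own installation via the two _get_*_jar_name_on_driver helpers, which is exactly how this commit finds the jars inside the shipped python env instead of uploading them again.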
@@ -140,20 +154,17 @@ def init_spark_on_yarn(self,
                            jars=None):
         os.environ["HADOOP_CONF_DIR"] = hadoop_conf
         os.environ['HADOOP_USER_NAME'] = hadoop_user_name
-        os.environ['PYSPARK_PYTHON'] = "python_env/bin/python"
+        os.environ['PYSPARK_PYTHON'] = "{}/bin/python".format(self.PYTHON_ENV)
 
         def _yarn_opt(jars):
-            command = " --archives {}#python_env --num-executors {} " \
-                      " --executor-cores {} --executor-memory {}".\
-                format(penv_archive, num_executor, executor_cores, executor_memory)
-            jars_list = self._gather_essential_jars()
-            if jars:
-                jars_list.append(jars)
+            command = " --archives {}#{} --num-executors {} " \
+                      " --executor-cores {} --executor-memory {}". \
+                format(penv_archive, self.PYTHON_ENV, num_executor, executor_cores, executor_memory)
 
             if extra_python_lib:
                 command = command + " --py-files {} ".format(extra_python_lib)
 
-            command = command + " --jars {}".format(",".join(jars_list))
+            if jars:
+                command = command + " --jars {}".format(jars)
             return command
 
         def _submit_opt():
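For reference, evaluating the same format string with hypothetical values shows what _yarn_opt now assembles; a --jars option is appended only when the caller actually passed jars:

    penv_archive = "/tmp/abc/python_env.tar.gz"   # hypothetical archive path
    command = " --archives {}#{} --num-executors {} " \
              " --executor-cores {} --executor-memory {}". \
        format(penv_archive, "python_env", 2, 4, "8g")
    # command == " --archives /tmp/abc/python_env.tar.gz#python_env --num-executors 2 " \
    #            " --executor-cores 4 --executor-memory 8g"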
@@ -176,9 +187,19 @@ def _submit_opt():
                 pack_env = True
 
             submit_args, conf = _submit_opt()
-            if spark_conf:
-                for item in spark_conf.items():
-                    conf[str(item[0])] = str(item[1])
+
+            if not spark_conf:
+                spark_conf = {}
+            zoo_bigdl_path_on_executor = ":".join(self._assemble_zoo_classpath_for_executor())
+
+            if "spark.executor.extraClassPath" in spark_conf:
+                spark_conf["spark.executor.extraClassPath"] = "{}:{}".format(
+                    zoo_bigdl_path_on_executor, spark_conf["spark.executor.extraClassPath"])
+            else:
+                spark_conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor
+
+            for item in spark_conf.items():
+                conf[str(item[0])] = str(item[1])
             sc = self._create_sc(submit_args, conf)
         finally:
             if conda_name and penv_archive and pack_env:
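The conf merge keeps whatever spark.executor.extraClassPath the user supplied and prepends the zoo/bigdl jar paths, so those jars come first in classpath resolution. A small sketch of the behavior with abbreviated, hypothetical paths:

    spark_conf = {"spark.executor.extraClassPath": "/opt/libs/my.jar"}   # hypothetical user conf
    zoo_bigdl_path_on_executor = "python_env/.../zoo.jar:python_env/.../bigdl.jar"   # abbreviated
    if "spark.executor.extraClassPath" in spark_conf:
        spark_conf["spark.executor.extraClassPath"] = "{}:{}".format(
            zoo_bigdl_path_on_executor, spark_conf["spark.executor.extraClassPath"])
    else:
        spark_conf["spark.executor.extraClassPath"] = zoo_bigdl_path_on_executor
    # -> "python_env/.../zoo.jar:python_env/.../bigdl.jar:/opt/libs/my.jar"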
