Skip to content
Permalink
Browse files

Fix #935 broken extreme scale executor, and add a regression test (#936)

* Add a CI test for exex - broken, see issue #916

* install flake8 from test deps, extreme_scale deps; use pip not setup.py

* Make mpi worker pool report number of workers, to fix #916
  • Loading branch information...
benclifford authored and yadudoc committed May 23, 2019
1 parent 3d0bbf2 commit 7e54e7d842b3821312eb01ab49d2158375a9bca0
Showing with 91 additions and 2 deletions.
  1. +1 −2 .travis.yml
  2. +3 −0 parsl/executors/extreme_scale/mpi_worker_pool.py
  3. +87 −0 parsl/tests/sites/test_local_exex.py
@@ -29,8 +29,7 @@ before_install:
# command to install dependencies
install:
- pip install -r requirements.txt
- pip install flake8
- python setup.py install
- pip install .[extreme_scale]

# Os tests
os:
@@ -99,6 +99,9 @@ def create_reg_message(self):
'os': platform.system(),
'hname': platform.node(),
'dir': os.getcwd(),
'prefetch_capacity': 0,
'worker_count': (self.comm.size - 1),
'max_capacity': (self.comm.size - 1) + 0, # (+prefetch)
}
b_msg = json.dumps(msg).encode('utf-8')
return b_msg
@@ -0,0 +1,87 @@
import argparse

import pytest

import parsl
from parsl.dataflow.dflow import DataFlowKernel
from parsl.app.app import App
from parsl.tests.conftest import load_dfk
from parsl.tests.configs.exex_local import config

parsl.clear()
dfk = DataFlowKernel(config=config)
parsl.set_stream_logger()

import logging
logger = logging.getLogger(__name__)


@App("python", dfk, executors=['Extreme_Local'])
def python_app_2():
import os
import threading
import time
time.sleep(1)
return "Hello from PID[{}] TID[{}]".format(os.getpid(), threading.current_thread())


@App("python", dfk, executors=['Extreme_Local'])
def python_app_1():
import os
import threading
import time
time.sleep(1)
return "Hello from PID[{}] TID[{}]".format(os.getpid(), threading.current_thread())


@App("bash", dfk)
def bash_app(stdout=None, stderr=None):
return 'echo "Hello from $(uname -a)" ; sleep 2'


@pytest.mark.local
def test_python(N=2):
"""Testing basic python functionality."""

r1 = {}
r2 = {}
for i in range(0, N):
r1[i] = python_app_1()
r2[i] = python_app_2()
print("Waiting ....")

for x in r1:
print("python_app_1 : ", r1[x].result())
for x in r2:
print("python_app_2 : ", r2[x].result())

return


@pytest.mark.local
def test_bash():
"""Testing basic bash functionality."""

import os
fname = os.path.basename(__file__)

x = bash_app(stdout="{0}.out".format(fname))
print("Waiting ....")
print(x.result())


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--num", default=10,
help="Count of apps to launch")
parser.add_argument("-d", "--debug", action='store_true',
help="Count of apps to launch")
parser.add_argument("-c", "--config", default='local',
help="Path to configuration file to run")
args = parser.parse_args()
load_dfk(args.config)
if args.debug:
parsl.set_stream_logger()

test_python()
test_bash()

0 comments on commit 7e54e7d

Please sign in to comment.
You can’t perform that action at this time.