Skip to content

Commit

Permalink
Hooks for R bindings (#263)
Browse files Browse the repository at this point in the history
Hooks for R support for Metaflow

Co-authored-by: Savin <[email protected]>
  • Loading branch information
jasonge27 and savingoyal authored Aug 4, 2020
1 parent 4d5954e commit 6e63acf
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 14 deletions.
114 changes: 114 additions & 0 deletions metaflow/R.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import os
import imp
from tempfile import NamedTemporaryFile

from .util import to_bytes

R_FUNCTIONS = {}
R_PACKAGE_PATHS = None
RDS_FILE_PATH = None
R_CONTAINER_IMAGE = None
METAFLOW_R_VERSION = None
R_VERSION = None
R_VERSION_CODE = None

def call_r(func_name, args):
R_FUNCTIONS[func_name](*args)

def get_r_func(func_name):
return R_FUNCTIONS[func_name]

def package_paths():
if R_PACKAGE_PATHS is not None:
root = R_PACKAGE_PATHS['package']
prefixlen = len('%s/' % root.rstrip('/'))
for path, dirs, files in os.walk(R_PACKAGE_PATHS['package']):
if '/.' in path:
continue
for fname in files:
if fname[0] == '.':
continue
p = os.path.join(path, fname)
yield p, os.path.join('metaflow-r', p[prefixlen:])
flow = R_PACKAGE_PATHS['flow']
yield flow, os.path.basename(flow)

def entrypoint():
return 'PYTHONPATH=/root/metaflow R_LIBS_SITE=`Rscript -e \'cat(paste(.libPaths(), collapse=\\":\\"))\'`:metaflow/ Rscript metaflow-r/run_batch.R --flowRDS=%s' % RDS_FILE_PATH

def use_r():
return R_PACKAGE_PATHS is not None

def container_image():
return R_CONTAINER_IMAGE

def metaflow_r_version():
return METAFLOW_R_VERSION

def r_version():
return R_VERSION

def r_version_code():
return R_VERSION_CODE

def working_dir():
if use_r():
return R_PACKAGE_PATHS['wd']
return None

def run(flow_script,
r_functions,
rds_file,
metaflow_args,
full_cmdline,
r_paths,
r_container_image,
metaflow_r_version,
r_version,
r_version_code):
global R_FUNCTIONS, \
R_PACKAGE_PATHS, \
RDS_FILE_PATH, \
R_CONTAINER_IMAGE, \
METAFLOW_R_VERSION, \
R_VERSION, \
R_VERSION_CODE

R_FUNCTIONS = r_functions
R_PACKAGE_PATHS = r_paths
RDS_FILE_PATH = rds_file
R_CONTAINER_IMAGE = r_container_image
METAFLOW_R_VERSION = metaflow_r_version
R_VERSION = r_version
R_VERSION_CODE = r_version_code

# there's some reticulate(?) sillyness which causes metaflow_args
# not to be a list if it has only one item. Here's a workaround
if not isinstance(metaflow_args, list):
metaflow_args = [metaflow_args]
# remove any reference to local path structure from R
full_cmdline[0] = os.path.basename(full_cmdline[0])
with NamedTemporaryFile(prefix="metaflowR.", delete=False) as tmp:
tmp.write(to_bytes(flow_script))
module = imp.load_source('metaflowR', tmp.name)
flow = module.FLOW(use_cli=False)

from . import exception
from . import cli
try:
cli.main(flow,
args=metaflow_args,
handle_exceptions=False,
entrypoint=full_cmdline[:-len(metaflow_args)])
except exception.MetaflowException as e:
cli.print_metaflow_exception(e)
os.remove(tmp.name)
os._exit(1)
except Exception as e:
import sys
print(e)
sys.stdout.flush()
os.remove(tmp.name)
os._exit(1)
finally:
os.remove(tmp.name)
6 changes: 5 additions & 1 deletion metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from .pylint_wrapper import PyLint
from .event_logger import EventLogger
from .monitor import Monitor
from .R import use_r, metaflow_r_version

ERASE_TO_EOL = '\033[K'
HIGHLIGHT = 'red'
Expand Down Expand Up @@ -739,8 +740,11 @@ def start(ctx,
echo = echo_always

ctx.obj.version = metaflow_version.get_version()
version = ctx.obj.version
if use_r():
version = metaflow_r_version()

echo('Metaflow %s' % ctx.obj.version, fg='magenta', bold=True, nl=False)
echo('Metaflow %s' % version, fg='magenta', bold=True, nl=False)
echo(" executing *%s*" % ctx.obj.flow.name, fg='magenta', nl=False)
echo(" for *%s*" % resolve_identity(), fg='magenta')

Expand Down
13 changes: 11 additions & 2 deletions metaflow/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .util import get_username, to_unicode
from . import metaflow_version
from metaflow.exception import MetaflowException
from . import R

version_cache = None

Expand Down Expand Up @@ -79,7 +80,7 @@ def get_client_info(cls, flow_name, metadata):
def get_package_commands(self, code_package_url):
cmds = ["set -e",
"echo \'Setting up task environment.\'",
"%s -m pip install awscli click requests boto3 --user -qqq"
"%s -m pip install awscli click requests boto3 -qqq"
% self._python(),
"mkdir metaflow",
"cd metaflow",
Expand Down Expand Up @@ -112,14 +113,22 @@ def get_environment_info(self):
'runtime': os.environ.get('METAFLOW_RUNTIME_NAME', 'dev'),
'app': os.environ.get('APP'),
'environment_type': self.TYPE,
'use_r': R.use_r(),
'python_version': sys.version,
'python_version_code': '%d.%d.%d' % sys.version_info[:3],
'metaflow_version': version_cache,
'script': os.path.basename(os.path.abspath(sys.argv[0]))}
if R.use_r():
env['metaflow_r_version'] = R.metaflow_r_version()
env['r_version'] = R.r_version()
env['r_version_code'] = R.r_version_code()
return env

def executable(self, step_name):
return self._python()

def _python(self):
return "python"
if R.use_r():
return "python3"
else:
return "python"
4 changes: 4 additions & 0 deletions metaflow/metadata/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,10 @@ def _tags(self):
'date:' + datetime.utcnow().strftime('%Y-%m-%d')]
if env['metaflow_version']:
tags.append('metaflow_version:' + env['metaflow_version'])
if 'metaflow_r_version' in env:
tags.append('metaflow_r_version:' + env['metaflow_r_version'])
if 'r_version_code' in env:
tags.append('r_version:' + env['r_version_code'])
return tags

def _register_code_package_metadata(self, run_id, step_name, task_id):
Expand Down
19 changes: 14 additions & 5 deletions metaflow/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from itertools import chain

from .util import to_unicode
from . import R

try:
# python2
Expand All @@ -16,7 +17,7 @@
import io
BytesIO = io.BytesIO

DEFAULT_SUFFIXES = ['.py']
DEFAULT_SUFFIXES = ['.py', '.R']


class MetaflowPackage(object):
Expand Down Expand Up @@ -61,10 +62,18 @@ def path_tuples(self):
# the package folders for environment
for path_tuple in self.environment.add_to_package():
yield path_tuple
# the user's working directory
flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + '/'
for path_tuple in self._walk(flowdir):
yield path_tuple
if R.use_r():
# the R working directory
for path_tuple in self._walk('%s/' % R.working_dir()):
yield path_tuple
# the R package
for path_tuple in R.package_paths():
yield path_tuple
else:
# the user's working directory
flowdir = os.path.dirname(os.path.abspath(sys.argv[0])) + '/'
for path_tuple in self._walk(flowdir):
yield path_tuple

def _add_info(self, tar):
info = tarfile.TarInfo('INFO')
Expand Down
11 changes: 8 additions & 3 deletions metaflow/plugins/aws/batch/batch_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from metaflow.datastore.util.s3util import get_s3_client
from metaflow.metaflow_config import DATASTORE_LOCAL_DIR
from metaflow import util
from metaflow import R
from metaflow.exception import (
CommandException,
METAFLOW_EXIT_DISALLOW_RETRY,
Expand Down Expand Up @@ -180,9 +181,13 @@ def echo(batch_id, msg, stream=sys.stdout):
if ctx.obj.datastore.datastore_root is None:
ctx.obj.datastore.datastore_root = ctx.obj.datastore.get_datastore_root_from_config(echo)

if executable is None:
executable = ctx.obj.environment.executable(step_name)
entrypoint = "%s -u %s" % (executable, os.path.basename(sys.argv[0]))
if R.use_r():
entrypoint = R.entrypoint()
else:
if executable is None:
executable = ctx.obj.environment.executable(step_name)
entrypoint = '%s -u %s' % (executable,
os.path.basename(sys.argv[0]))

top_args = " ".join(util.dict_to_cli_options(ctx.parent.parent.params))

Expand Down
11 changes: 8 additions & 3 deletions metaflow/plugins/aws/batch/batch_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from metaflow.metadata import MetaDatum

from metaflow import util
from metaflow import R

from .batch import Batch, BatchException
from metaflow.metaflow_config import ECS_S3_ACCESS_IAM_ROLE, BATCH_JOB_QUEUE, \
Expand Down Expand Up @@ -110,8 +111,11 @@ def __init__(self, attributes=None, statically_defined=False):
if BATCH_CONTAINER_IMAGE:
self.attributes['image'] = BATCH_CONTAINER_IMAGE
else:
self.attributes['image'] = 'python:%s.%s' % (platform.python_version_tuple()[0],
platform.python_version_tuple()[1])
if R.use_r():
self.attributes['image'] = R.container_image()
else:
self.attributes['image'] = 'python:%s.%s' % (platform.python_version_tuple()[0],
platform.python_version_tuple()[1])
if not BatchDecorator._get_registry(self.attributes['image']):
if BATCH_CONTAINER_REGISTRY:
self.attributes['image'] = '%s/%s' % (BATCH_CONTAINER_REGISTRY.rstrip('/'),
Expand Down Expand Up @@ -156,7 +160,8 @@ def runtime_step_cli(self, cli_args, retry_count, max_user_code_retries):
cli_args.command_args.append(self.package_url)
cli_args.command_options.update(self.attributes)
cli_args.command_options['run-time-limit'] = self.run_time_limit
cli_args.entrypoint[0] = sys.executable
if not R.use_r():
cli_args.entrypoint[0] = sys.executable

def task_pre_step(self,
step_name,
Expand Down

0 comments on commit 6e63acf

Please sign in to comment.