Skip to content

Commit

Permalink
Initial changes for remote kernel support
Browse files Browse the repository at this point in the history
Closes #1
  • Loading branch information
kevin-bates authored and lresende committed May 4, 2017
1 parent ee40f73 commit 9c342c2
Show file tree
Hide file tree
Showing 6 changed files with 393 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ target/

.DS_Store
.ipynb_checkpoints/

# PyCharm
.idea/
16 changes: 12 additions & 4 deletions kernel_gateway/gatewayapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
except ImportError:
from urllib.parse import urlparse

from traitlets import Unicode, Integer, default, observe
from traitlets import Unicode, Integer, default, observe, CaselessStrEnum

from jupyter_core.application import JupyterApp, base_aliases
from jupyter_client.kernelspec import KernelSpecManager
Expand All @@ -27,12 +27,12 @@

from tornado import httpserver
from tornado import web
from tornado.log import enable_pretty_logging
from tornado.log import enable_pretty_logging, LogFormatter

from notebook.notebookapp import random_ports
from ._version import __version__
from .services.sessions.sessionmanager import SessionManager
from .services.kernels.manager import SeedingMappingKernelManager
from .services.kernels.remotemanager import RemoteMappingKernelManager

# Only present for generating help documentation
from .notebook_http import NotebookHTTPPersonality
Expand Down Expand Up @@ -75,6 +75,13 @@ class KernelGatewayApp(JupyterApp):
# Enable some command line shortcuts
aliases = aliases

_log_formatter_cls = LogFormatter

@default('log_format')
def _default_log_format(self):
    """Return the default log format, overridden so each record carries a timestamp."""
    # Bracketed, colourised prefix: level initial, time with milliseconds, logger name.
    prefix = u"%(color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(end_color)s"
    return prefix + u" %(message)s"

# Server IP / PORT binding
port_env = 'KG_PORT'
port_default_value = 8888
Expand Down Expand Up @@ -366,7 +373,8 @@ def init_configurables(self):
kwargs = {}
if self.default_kernel_name:
kwargs['default_kernel_name'] = self.default_kernel_name
self.kernel_manager = SeedingMappingKernelManager(

self.kernel_manager = RemoteMappingKernelManager(
parent=self,
log=self.log,
connection_dir=self.runtime_dir,
Expand Down
2 changes: 2 additions & 0 deletions kernel_gateway/services/kernels/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def start_kernel(self, *args, **kwargs):
client.stop_channels()
raise gen.Return(kernel_id)


class KernelGatewayIOLoopKernelManager(IOLoopKernelManager):
"""Extends the IOLoopKernelManager used by the SeedingMappingKernelManager.
Expand All @@ -119,6 +120,7 @@ class KernelGatewayIOLoopKernelManager(IOLoopKernelManager):
KG_AUTH_TOKEN from the environment variables passed to the kernel when it
starts.
"""

def _launch_kernel(self, kernel_cmd, **kw):
env = kw['env']
env['KERNEL_GATEWAY'] = '1'
Expand Down
253 changes: 253 additions & 0 deletions kernel_gateway/services/kernels/processproxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
"""Kernel managers that operate against a remote process."""

import os
import signal
import abc
import json
import socket
import paramiko

This comment has been minimized.

Copy link
@ckadner

ckadner May 17, 2017

Collaborator

@kevin-bates -- should we add paramiko>=2.1.2 to requirements.txt and setup.py?

Otherwise IntelliJ/PyCharm will not pick up on the dependency, i.e. PyCharm will not suggest installing it into the project's (virtual) environment.

import logging
import time
from ipython_genutils.py3compat import with_metaclass
from socket import *


# Set the level of the 'paramiko' logger: use ELYRA_SSH_LOG_LEVEL when it is set, otherwise WARNING.
logging.getLogger('paramiko').setLevel(os.getenv('ELYRA_SSH_LOG_LEVEL', logging.WARNING))


class BaseProcessProxyABC(with_metaclass(abc.ABCMeta, object)):
    """Abstract base class for process proxies.

    Declares the methods every concrete process proxy must implement:
    ``poll``, ``wait``, ``send_signal`` and ``kill``.
    """

    @abc.abstractmethod
    def poll(self):
        """Abstract: must be overridden by concrete process proxies."""

    @abc.abstractmethod
    def wait(self):
        """Abstract: must be overridden by concrete process proxies."""

    @abc.abstractmethod
    def send_signal(self, signum):
        """Abstract: must be overridden by concrete process proxies."""

    @abc.abstractmethod
    def kill(self):
        """Abstract: must be overridden by concrete process proxies."""


class StandaloneProcessProxy(BaseProcessProxyABC):
    """Process proxy that launches and controls a kernel on a remote host over SSH.

    The kernel is started with ``nohup`` on the host named by ELYRA_REMOTE_HOST.
    It is afterwards polled, signalled and killed by running ``kill`` remotely
    against the pid that the launch command echoed back.
    """

    # FIXME - properly deal with ip, username and password
    ip = os.getenv('ELYRA_REMOTE_HOST', 'localhost')
    username = os.getenv('ELYRA_REMOTE_USER')
    password = os.getenv('ELYRA_REMOTE_PWD')  # this should use password-less ssh
    pid = 0
    kernel_manager = None
    remote_connection_file = None

    def __init__(self, kernel_manager, cmd, **kw):
        """Write the connection file, copy it to the remote host and launch the kernel.

        :param kernel_manager: manager whose ip, connection file and log are used.
        :param cmd: sequence of kernel argv entries to run remotely.
        :raises RuntimeError: if the remote launch does not report an integer pid.
        """
        self.kernel_manager = kernel_manager
        self.kernel_manager.ip = gethostbyname(self.ip)  # convert to ip if host is provided
        # save off connection file name for cleanup later
        self.remote_connection_file = kernel_manager.remote_connection_file
        # write out connection file - which has the remote IP - prior to copy...
        self.kernel_manager.cleanup_connection_file()
        self.kernel_manager.write_connection_file()

        cmd = self.build_startup_command(cmd)
        self.kernel_manager.log.debug('Invoking cmd: {}'.format(cmd))
        result = self.rsh(cmd, self.kernel_manager.connection_file, self.kernel_manager.remote_connection_file)
        # The startup command ends with `echo $!`, so the pid is taken from the
        # last output line; fall back to a value that purposely fails int().
        result_pid = result[-1].strip() if result else 'bad_pid'

        try:
            self.pid = int(result_pid)
        except ValueError:
            raise RuntimeError("Failure occurred starting remote kernel on '{}'.  Returned result: {}"
                               .format(self.ip, result))

        self.kernel_manager.log.info("Remote kernel launched on '{}', pid={}"
                                     .format(self.kernel_manager.ip, self.pid))

    def poll(self):
        """Probe the remote process with signal 0.

        :returns: None if the remote ``kill -0`` succeeded, False otherwise.
        """
        return self.remote_signal(0)

    def wait(self):
        """Block until the remote process is gone, or roughly 5 seconds have elapsed."""
        poll_interval = 0.2
        wait_time = 5.0
        for _ in range(int(wait_time / poll_interval)):
            # poll() yields None while the process still answers signal 0 and
            # False once it does not.  Both values are falsy, so the state has
            # to be tested with an explicit identity check.
            if self.poll() is None:
                time.sleep(poll_interval)
            else:
                break
        else:
            self.kernel_manager.log.warning("Wait timeout of 5 seconds exhausted. Continuing...")

    def send_signal(self, signum):
        """Send ``signum`` to the remote process; returns None on success, False otherwise."""
        result = self.remote_signal(signum)
        self.kernel_manager.log.debug("StandaloneProcessProxy.send_signal({}): {}".format(signum, result))
        return result

    def kill(self):
        """Send SIGKILL to the remote process; returns None on success, False otherwise."""
        result = self.remote_signal(signal.SIGKILL)
        self.kernel_manager.log.debug("StandaloneProcessProxy.kill: {}".format(result))
        return result

    @staticmethod
    def _reported_success(lines):
        """Return True when the last line of a ``...; echo $?`` command's output is '0'."""
        return bool(lines) and lines[-1].strip() == '0'

    def remote_signal(self, signum):
        """Run ``kill -<signum> <pid>`` on the remote host.

        :returns: None if the remote exit status was 0, False otherwise.
        """
        # The signal goes to the single pid recorded at launch (not a process group).
        cmd = 'kill -{} {}; echo $?'.format(signum, self.pid)
        if self._reported_success(self.rsh(cmd)):
            return None
        return False

    def cleanup(self):
        """Remove the remote connection file.

        :returns: None if the remote exit status was 0, False otherwise
            (including when the command produced no output at all).
        """
        cmd = 'rm -f {}; echo $?'.format(self.remote_connection_file)
        if self._reported_success(self.rsh(cmd)):
            return None
        return False

    def build_startup_command(self, argv_cmd):
        """
        Builds the command to invoke by concatenating envs from kernelspec followed by the kernel argvs.
        We also force nohup, redirection to a file and place in background, then follow with an echo
        for the background pid.
        """
        parts = ['export {}={};'.format(key, json.dumps(value))
                 for key, value in self.kernel_manager.kernel_spec.env.items()]

        # Add additional envs not in kernelspec...
        username = os.getenv('KERNEL_USERNAME')
        if username is not None:
            parts.append('export KERNEL_USERNAME="{}";'.format(username))

        parts.append('nohup')
        parts.extend(' {}'.format(arg) for arg in argv_cmd)
        parts.append(' >> /var/log/jnbg/remote_launch.log 2>&1 & echo $!')

        return ''.join(parts)

    def rsh(self, command, srcFile=None, dstFile=None):
        """Run ``command`` on the remote host over SSH and return its output lines.

        When both ``srcFile`` and ``dstFile`` are given, ``srcFile`` is first
        copied to ``dstFile`` on the remote host via SFTP.

        :returns: the lines of stdout, or the lines of stderr if stdout was empty.
        :raises Exception: any failure is logged and re-raised unchanged.
        """
        ssh = None
        try:
            ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            # password is None when ELYRA_REMOTE_PWD is unset, so test truthiness
            # rather than len(); connect without a password in that case.
            if self.password:
                ssh.connect(self.ip, port=22, username=self.username, password=self.password)
            else:
                ssh.connect(self.ip, port=22, username=self.username)

            if srcFile is not None and dstFile is not None:
                try:
                    self.kernel_manager.log.debug("Copying file '{}' to file '{}@{}' ..."
                                                  .format(srcFile, self.ip, dstFile))
                    sftp = ssh.open_sftp()
                    try:
                        sftp.put(srcFile, dstFile)
                    finally:
                        sftp.close()
                except Exception as e:
                    self.kernel_manager.log.error(
                        "Exception '{}' occurred attempting to copy file '{}' "
                        "to '{}' on '{}' with user '{}', message='{}'"
                        .format(type(e).__name__, srcFile, dstFile, self.ip, self.username, e))
                    raise  # bare raise keeps the original traceback

            stdin, stdout, stderr = ssh.exec_command(command, timeout=30)
            lines = stdout.readlines()
            if len(lines) == 0:  # if nothing in stdout, return stderr
                lines = stderr.readlines()

        except Exception as e:
            self.kernel_manager.log.error(
                "Exception '{}' occurred attempting to connect to '{}' with user '{}', message='{}'"
                .format(type(e).__name__, self.ip, self.username, e))
            raise  # bare raise keeps the original traceback

        finally:
            if ssh is not None:
                ssh.close()

        return lines


class YarnProcessProxy(BaseProcessProxyABC):
    """Prototype process proxy for a kernel associated with a Yarn application id.

    The process-control methods only emit debug log entries, and the remote
    connection details are read from ELYRA_* environment variables with
    hard-coded fallbacks.
    """

    application_id = None

    def __init__(self, kernel_manager, kernel_cmd, **kw):
        """Store the kernel manager, then run the pre- and post-launch hooks in order."""
        self.kernel_manager = kernel_manager
        self.pre_launch_kernel(kernel_cmd, **kw)
        self.post_launch_kernel(kernel_cmd, **kw)

    def poll(self):
        """Log the call at debug level; nothing else is done."""
        self.kernel_manager.log.debug("YarnProcessProxy.poll")

    def wait(self):
        """Log the call at debug level; nothing else is done."""
        self.kernel_manager.log.debug("YarnProcessProxy.wait")

    def send_signal(self, signum):
        """Log the requested signal number at debug level; nothing is sent."""
        self.kernel_manager.log.debug("YarnProcessProxy.send_signal {}".format(signum))

    def kill(self):
        """Log the call at debug level; nothing else is done."""
        self.kernel_manager.log.debug("YarnProcessProxy.kill")

    def pre_launch_kernel(self, kernel_cmd, **kw):
        """Obtain the application id and point the kernel manager at the remote kernel.

        The application id is appended to ``kernel_cmd`` as ``--yarnAppId``; the
        host and the five channel ports come from environment variables.
        """
        manager = self.kernel_manager
        manager.log.debug(
            "YarnProcessProxy.pre_launch_kernel.connection_info: {}"
            .format(manager.get_connection_info()))

        self.get_yarn_application_id(kernel_cmd, **kw)

        # PROTOTYPE - HOOK UP ELYRA to REMOTE KERNEL...
        remote_host = os.getenv('ELYRA_REMOTE_HOST', 'fwiw1.fyre.ibm.com')
        manager.ip = gethostbyname(remote_host)

        # (attribute, environment variable, fallback) - assigned in this order.
        port_settings = (
            ('stdin_port', 'ELYRA_TEST_RM_STDIN', '56759'),
            ('iopub_port', 'ELYRA_TEST_RM_IOPUB', '56758'),
            ('shell_port', 'ELYRA_TEST_RM_SHELL', '56757'),
            ('hb_port', 'ELYRA_TEST_RM_HB', '56761'),
            ('control_port', 'ELYRA_TEST_RM_CONTROL', '56760'),
        )
        for attr_name, env_name, fallback in port_settings:
            setattr(manager, attr_name, int(os.getenv(env_name, fallback)))

        manager.session.key = b''  # FIXME

    def post_launch_kernel(self, kernel_cmd, **kw):
        """Log the kernel manager's connection info after launch."""
        connection_info = self.kernel_manager.get_connection_info()
        self.kernel_manager.log.debug(
            "YarnProcessProxy.post_launch_kernel.connection_info: {}"
            .format(connection_info))

    def get_yarn_application_id(self, kernel_cmd, **kw):
        """Return the application id, appending it to ``kernel_cmd`` when one is given.

        The id is taken from ELYRA_TEST_APP_ID (with a hard-coded fallback) the
        first time it is needed and cached on the instance afterwards.
        """
        app_id = self.application_id
        if app_id is None:
            app_id = os.getenv('ELYRA_TEST_APP_ID', 'application_1492445751293_0018')
            self.application_id = app_id

        if kernel_cmd is not None:
            kernel_cmd.extend(['--yarnAppId', app_id])

        return app_id
Loading

0 comments on commit 9c342c2

Please sign in to comment.