Skip to content

Commit

Permalink
Merge pull request #315 from StackStorm/STORM-343/async_live_actions
Browse files Browse the repository at this point in the history
RFR: Fix: Use eventlet pool to dispatch live actions async
  • Loading branch information
lakshmi-kannan committed Aug 1, 2014
2 parents 897a39c + 023d822 commit ad69ae7
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 44 deletions.
3 changes: 3 additions & 0 deletions conf/stanley.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ modules_path = /opt/stackstorm/actions
user = stanley
ssh_key_file = /home/vagrant/.ssh/stanley_rsa
remote_dir = /tmp

[liveactions]
liveactions_base_url = http://localhost:9501/liveactions
9 changes: 8 additions & 1 deletion st2actioncontroller/st2actioncontroller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,16 @@

actions_opts = [
cfg.StrOpt('modules_path', default='/opt/stackstorm/actions',
help='path where action plugins are located')
help='path where action plugins are located')
]
CONF.register_opts(actions_opts, group='actions')

liveactions_opts = [
cfg.StrOpt('liveactions_base_url', default='http://localhost:9501/liveactions',
help='Base URL for live actions.')
]
CONF.register_opts(liveactions_opts, group='liveactions')


def parse_args(args=None):
CONF(args=args)
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
import datetime
import httplib
import json
import Queue

import eventlet
from pecan import abort
from pecan.rest import RestController

# TODO: Encapsulate mongoengine errors in our persistence layer. Exceptions
# that bubble up to this layer should be core Python exceptions or
# StackStorm defined exceptions.

import requests

from oslo.config import cfg
from wsme import types as wstypes
from wsme import Unset
import wsmeext.pecan as wsme_pecan

from st2common import log as logging
from st2common.exceptions.db import StackStormDBObjectNotFoundError
from st2common.util.http import HTTP_SUCCESS
from st2common.persistence.action import ActionExecution
from st2common.models.api.action import (ActionExecutionAPI,
ACTIONEXEC_STATUS_INIT,
ACTIONEXEC_STATUS_SCHEDULED,
ACTIONEXEC_STATUS_ERROR)
from st2common.util.action_db import (get_action_by_dict, get_actionexec_by_id,
update_actionexecution_status)


LOG = logging.getLogger(__name__)


LIVEACTION_ENDPOINT = 'http://localhost:9501/liveactions'
DEFAULT_LIVEACTIONS_ENDPOINT = cfg.CONF.liveactions.liveactions_base_url
MONITOR_THREAD_EMPTY_Q_SLEEP_TIME = 5
MONITOR_THREAD_NO_WORKERS_SLEEP_TIME = 1


class ActionExecutionsController(RestController):
Expand All @@ -37,6 +40,17 @@ class ActionExecutionsController(RestController):
the lifecycle of ActionExecutions in the system.
"""

def __init__(self, live_actions_ep=DEFAULT_LIVEACTIONS_ENDPOINT, live_actions_pool_size=50):
self._live_actions_ep = live_actions_ep
LOG.info('Live actions ep: %s', self._live_actions_ep)
self.live_actions_pool_size = live_actions_pool_size
self._live_actions_pool = eventlet.GreenPool(self.live_actions_pool_size)
self._threads = {}
self._live_actions = Queue.Queue()
self._live_actions_monitor_thread = eventlet.greenthread.spawn(self._drain_live_actions)
self._monitor_thread_empty_q_sleep_time = MONITOR_THREAD_EMPTY_Q_SLEEP_TIME
self._monitor_thread_no_workers_sleep_time = MONITOR_THREAD_NO_WORKERS_SLEEP_TIME

def _issue_liveaction_delete(self, actionexec_id):
"""
Destroy the LiveActions specified by actionexec_id by performing
Expand All @@ -46,7 +60,7 @@ def _issue_liveaction_delete(self, actionexec_id):
request_error = False
result = None
try:
result = requests.delete(LIVEACTION_ENDPOINT +
result = requests.delete(self._live_actions_ep +
'/?actionexecution_id=' + str(actionexec_id))
except requests.exceptions.ConnectionError as e:
LOG.error('Caught encoundered connection error while performing /liveactions/ '
Expand All @@ -56,7 +70,7 @@ def _issue_liveaction_delete(self, actionexec_id):

LOG.debug('/liveactions/ DELETE request result: %s', result)

return(result, request_error)
return (result, request_error)

def _issue_liveaction_post(self, actionexec_id):
"""
Expand All @@ -73,7 +87,7 @@ def _issue_liveaction_post(self, actionexec_id):
request_error = False
result = None
try:
result = requests.post(LIVEACTION_ENDPOINT,
result = requests.post(self._live_actions_ep,
data=json.dumps(payload), headers=custom_headers)
except requests.exceptions.ConnectionError as e:
LOG.error('Caught encoundered connection error while performing /liveactions/ POST.'
Expand All @@ -82,7 +96,7 @@ def _issue_liveaction_post(self, actionexec_id):

LOG.debug('/liveactions/ POST request result: %s', result)

return(result, request_error)
return (result, request_error)

@staticmethod
def _get_action_executions(action_id, action_name, limit=None):
Expand Down Expand Up @@ -217,38 +231,27 @@ def post(self, actionexecution):
'ActionExecution is: %s', actionexec_db)

actionexec_id = actionexec_db.id
(result, request_error) = self._issue_liveaction_post(actionexec_id)

if (not request_error) and (result.status_code in HTTP_SUCCESS):
LOG.info('/liveactions/ POST request reported successful creation of LiveAction')
# TODO: This should be "running status, or "scheduled" status when execution is async
# actionexec_status = ACTIONEXEC_STATUS_COMPLETE

# Update actionexec_db status.
# With side-effect of re-loading ActionExecution from DB.
# LOG.info('/actionexecutions/ POST update ActionExecution in DB after '
# 'LiveAction POST: %s', actionexec_db)
# actionexec_db = update_actionexecution_status(actionexec_status,
# actionexec_id=actionexec_id)

actionexec_db = get_actionexec_by_id(actionexec_db.id)
else:
LOG.info('/liveactions/ POST request reported error: %s', result.status_code)
try:
LOG.debug('Adding action exec id: %s to live actions queue.', )
self._live_actions.put(actionexec_id, block=True, timeout=1)
except Exception as e:
LOG.error('Aborting /actionexecutions/ POST operation for id: %s. Exception: %s',
actionexec_id, e)
actionexec_status = ACTIONEXEC_STATUS_ERROR

# Update actionexec_db status.
LOG.info('/actionexecutions/ POST update ActionExecution in DB after '
'LiveAction POST: %s', actionexec_db)
actionexec_db = update_actionexecution_status(actionexec_status,
actionexec_id=actionexec_id)
LOG.error('Unable to launch LiveAction.')
LOG.info('Aborting /actionexecutions/ POST operation.')
abort(httplib.INTERNAL_SERVER_ERROR)

actionexec_api = ActionExecutionAPI.from_model(actionexec_db)

LOG.debug('POST /actionexecutions/ client_result=%s', actionexec_api)
return actionexec_api
actionexec_api = ActionExecutionAPI.from_model(actionexec_db)
error = 'Failed to kickoff live action for id: %s, exception: %s' % (actionexec_id,
str(e))
abort(httplib.INTERNAL_SERVER_ERROR, error)
else:
actionexec_status = ACTIONEXEC_STATUS_SCHEDULED
actionexec_db = update_actionexecution_status(actionexec_status,
actionexec_id=actionexec_id)
self._kickoff_live_actions()
actionexec_api = ActionExecutionAPI.from_model(actionexec_db)
LOG.debug('POST /actionexecutions/ client_result=%s', actionexec_api)
return actionexec_api

@wsme_pecan.wsexpose(ActionExecutionAPI, body=ActionExecutionAPI,
status_code=httplib.FORBIDDEN)
Expand Down Expand Up @@ -302,3 +305,18 @@ def delete(self, id):

LOG.info('DELETE /actionexecutions/ with id="%s" completed', id)
return None

def _kickoff_live_actions(self):
if self._live_actions_pool.free() <= 0:
return
while not self._live_actions.empty() and self._live_actions_pool.free() > 0:
action_exec_id = self._live_actions.get_nowait()
self._live_actions_pool.spawn(self._issue_liveaction_post, action_exec_id)

def _drain_live_actions(self):
while True:
while self._live_actions.empty():
eventlet.greenthread.sleep(self._monitor_thread_empty_q_sleep_time)
while self._live_actions_pool.free() <= 0:
eventlet.greenthread.sleep(self._monitor_thread_no_workers_sleep_time)
self._kickoff_live_actions()
7 changes: 4 additions & 3 deletions st2common/st2common/models/api/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,14 @@ def __str__(self):


ACTIONEXEC_STATUS_INIT = 'initializing'
ACTIONEXEC_STATUS_SCHEDULED = 'scheduled'
ACTIONEXEC_STATUS_RUNNING = 'running'
ACTIONEXEC_STATUS_COMPLETE = 'complete'
ACTIONEXEC_STATUS_ERROR = 'error'

ACTIONEXEC_STATUSES = [ACTIONEXEC_STATUS_INIT, ACTIONEXEC_STATUS_RUNNING,
ACTIONEXEC_STATUS_COMPLETE, ACTIONEXEC_STATUS_ERROR,
]
ACTIONEXEC_STATUSES = [ACTIONEXEC_STATUS_INIT, ACTIONEXEC_STATUS_SCHEDULED,
ACTIONEXEC_STATUS_RUNNING, ACTIONEXEC_STATUS_COMPLETE,
ACTIONEXEC_STATUS_ERROR]

ACTION_NAME = 'name'
ACTION_ID = 'id'
Expand Down
30 changes: 28 additions & 2 deletions stormbot/scripts/stacktion.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,11 @@ module.exports = (robot) ->
robot.logger.error (CONN_ERRORS[err.code] || CONN_ERRORS['default']) err

# define basic clients
httpclient = robot.http('http://localhost:9101')

httpclients =
actions: robot.http('http://localhost:9101').path('/actions')
actionexecutions: robot.http('http://localhost:9101').path('/actionexecutions')
actions: httpclient.scope('/actions')
actionexecutions: httpclient.scope('/actionexecutions')

# init for actions
actionsPromise = new rsvp.Promise (resolve, reject) ->
Expand Down Expand Up @@ -145,8 +147,32 @@ module.exports = (robot) ->
msg.send "Action has failed to run"
return

if action_execution.status is 'scheduled'
setTimeout () ->
pullResults(action_execution.id)
, 1000
return

unless action_execution.status is 'complete'
msg.send "Action has failed to execute"
return
action = actions[action_execution.action.name]
PUBLISHERS[action.runner_type] action_execution, msg, robot.adapterName

pullResults = (id) ->
httpclient.scope("/actionexecutions/#{id}")
.get(errorHandler) (err, res, body) ->
action_execution = JSON.parse(body)

if action_execution.status is 'scheduled' or action_execution.status is 'running'
setTimeout () ->
pullResults(action_execution.id)
, 1000
return

unless action_execution.status is 'complete'
msg.send "Action has failed to execute"
return

action = actions[action_execution.action.name]
PUBLISHERS[action.runner_type] action_execution, msg, robot.adapterName

0 comments on commit ad69ae7

Please sign in to comment.