Skip to content

Commit

Permalink
Add Graceful Shutdown Support for the Python Client. (#53)
Browse files Browse the repository at this point in the history
Added support for Graceful Shutdown of the KCL.  This requires version 1.7.6 of the Amazon Kinesis Client.
  • Loading branch information
mikramulhaq authored and pfifer committed Jun 23, 2017
1 parent 917e5b6 commit 1e47120
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 3 deletions.
5 changes: 3 additions & 2 deletions amazon_kclpy/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class MalformedAction(Exception):
"processRecords": messages.ProcessRecordsInput,
"shutdown": messages.ShutdownInput,
"checkpoint": messages.CheckpointInput,
"record": messages.Record
"record": messages.Record,
"shutdownRequested": messages.ShutdownRequestedInput
}


Expand Down Expand Up @@ -56,4 +57,4 @@ def message_decode(json_dict):
raise MalformedAction("Received an action which couldn't be understood. Action was '{action}' -- Allowed {keys}"
.format(action=action, keys=_format_serializer_names()))

return serializer(json_dict)
return serializer(json_dict)
11 changes: 11 additions & 0 deletions amazon_kclpy/kcl.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,17 @@ def shutdown(self, checkpointer, reason):
'''
raise NotImplementedError

@abc.abstractmethod
def shutdown_requested(self, checkpointer):
'''
Called by a KCLProcess instance to indicate that this record processor is being shutdown.
And it gives an opportunity for record processor to checkpoint before shutdown.
:type checkpointer: amazon_kclpy.kcl.Checkpointer
:param checkpointer: A checkpointer which accepts a sequence number or no parameters.
'''
pass

version = 1


Expand Down
32 changes: 32 additions & 0 deletions amazon_kclpy/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,38 @@ def dispatch(self, checkpointer, record_processor):
self._checkpointer = checkpointer
record_processor.shutdown(self)

class ShutdownRequestedInput(MessageDispatcher):
"""
Used to tell the record processor it will be shutdown.
"""
def __init__(self, json_dict):
self._checkpointer = None
self._action = json_dict['action']

@property
def checkpointer(self):
"""
The checkpointer that can be used to checkpoint before actual shutdown.
:return: the checkpointer
:rtype: amazon_kclpy.kcl.Checkpointer
"""
return self._checkpointer

@property
def action(self):
"""
The action that spawned this message
:return: the original action value
:rtype: str
"""
return self._action

def dispatch(self, checkpointer, record_processor):
self._checkpointer = checkpointer
record_processor.shutdown_requested(self)


class CheckpointInput(object):
"""
Expand Down
21 changes: 20 additions & 1 deletion amazon_kclpy/v2/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ def shutdown(self, shutdown_input):
"""
raise NotImplementedError

@abc.abstractmethod
def shutdown_requested(self, shutdown_requested_input):
'''
Called by a KCLProcess instance to indicate that this record processor is being shutdown.
And it gives an opportunity for record processor to checkpoint before shutdown.
:param amazon_kclpy.messages.ShutdownRequestedInput shutdown_requested_input: Information related to shutdown requested.
'''
pass

version = 2


Expand Down Expand Up @@ -120,4 +130,13 @@ def shutdown(self, shutdown_input):
:param amazon_kclpy.messages.ShutdownInput shutdown_input: information related to the record processor shutdown
:return: None
"""
self.delegate.shutdown(shutdown_input.checkpointer, shutdown_input.reason)
self.delegate.shutdown(shutdown_input.checkpointer, shutdown_input.reason)

def shutdown_requested(self, shutdown_requested_input):
"""
Sends the shutdown request to the delegate
:param amazon_kclpy.messages.ShutdownInput shutdown_input: information related to the record processor shutdown
:return: None
"""
self.delegate.shutdown_requested(shutdown_requested_input.checkpointer)

0 comments on commit 1e47120

Please sign in to comment.