diff --git a/ocrd/ocrd/cli/ocrd_tool.py b/ocrd/ocrd/cli/ocrd_tool.py index 03a8be6995..4d0983751c 100644 --- a/ocrd/ocrd/cli/ocrd_tool.py +++ b/ocrd/ocrd/cli/ocrd_tool.py @@ -6,6 +6,7 @@ :nested: full """ +from inspect import getmodule from json import dumps import codecs import sys @@ -115,16 +116,18 @@ def moduledir(self): show_resource=res_name) @ocrd_tool_tool.command('help', help="Generate help for processors") +@click.argument('subcommand', required=False) @pass_ocrd_tool -def ocrd_tool_tool_params_help(ctx): +def ocrd_tool_tool_params_help(ctx, subcommand): class BashProcessor(Processor): # set docstrings to empty - # fixme: override the module-level docstring, too __doc__ = None + # HACK: override the module-level docstring, too + getmodule(OcrdToolCtx).__doc__ = None def process(self): return super() BashProcessor(None, ocrd_tool=ctx.json['tools'][ctx.tool_name], - show_help=True) + show_help=True, subcommand=subcommand) # ---------------------------------------------------------------------- # ocrd ocrd-tool tool categories diff --git a/ocrd/ocrd/decorators/__init__.py b/ocrd/ocrd/decorators/__init__.py index 3e7b5cae06..773469b53e 100644 --- a/ocrd/ocrd/decorators/__init__.py +++ b/ocrd/ocrd/decorators/__init__.py @@ -20,6 +20,8 @@ from .ocrd_cli_options import ocrd_cli_options from .mets_find_options import mets_find_options +SUBCOMMANDS = ['worker', 'server'] + def ocrd_cli_wrap_processor( processorClass, mets=None, @@ -35,8 +37,8 @@ def ocrd_cli_wrap_processor( show_resource=None, list_resources=False, # ocrd_network params start # - agent_type=None, - agent_address=None, + subcommand=None, + address=None, queue=None, database=None, # ocrd_network params end # @@ -51,17 +53,20 @@ def ocrd_cli_wrap_processor( dump_json=dump_json, dump_module_dir=dump_module_dir, show_help=help, + subcommand=subcommand, show_version=version, show_resource=show_resource, list_resources=list_resources ) sys.exit() + if subcommand: + # Used for checking/starting network 
agents for the WebAPI architecture + check_and_run_network_agent(processorClass, subcommand, address, database, queue) + elif address or queue or database: + raise ValueError(f"Subcommand options --address --queue and --database are only valid for subcommands 'worker' or 'server'") - initLogging() - # Used for checking/starting network agents for the WebAPI architecture - # Has no side effects if neither of the 4 ocrd_network parameters are passed - check_and_run_network_agent(processorClass, agent_type, agent_address, database, queue) + initLogging() LOG = getLogger('ocrd_cli_wrap_processor') # LOG.info('kwargs=%s' % kwargs) @@ -128,59 +133,50 @@ def exit(): run_processor(processorClass, mets_url=mets, workspace=workspace, **kwargs) -def check_and_run_network_agent(ProcessorClass, agent_type: str, agent_address: str, database: str, queue: str): - if not agent_type and (agent_address or database or queue): - raise ValueError("Options '--database', '--queue', and '--address' are valid only with '--type'") - if not agent_type: - return +def check_and_run_network_agent(ProcessorClass, subcommand: str, address: str, database: str, queue: str): + """ + """ + if subcommand not in SUBCOMMANDS: + raise ValueError(f"SUBCOMMAND can only be one of {SUBCOMMANDS}") if not database: - raise ValueError("Options '--type' and '--database' are mutually inclusive") - allowed_agent_types = ['server', 'worker'] - if agent_type not in allowed_agent_types: - agents_str = ', '.join(allowed_agent_types) - raise ValueError(f"Wrong type parameter.
Allowed types: {agents_str}") - if agent_type == 'server': - if not agent_address: - raise ValueError("Options '--type=server' and '--address' are mutually inclusive") + raise ValueError(f"Option '--database' is required for subcommand {subcommand}") + + if subcommand == 'server': + if not address: + raise ValueError(f"Option '--address' required for subcommand {subcommand}") if queue: - raise ValueError("Options '--type=server' and '--queue' are mutually exclusive") - if agent_type == 'worker': + raise ValueError(f"Option '--queue' invalid for subcommand {subcommand}") + if subcommand == 'worker': + if address: + raise ValueError(f"Option '--address' invalid for subcommand {subcommand}") if not queue: - raise ValueError("Options '--type=worker' and '--queue' are mutually inclusive") - if agent_address: - raise ValueError("Options '--type=worker' and '--address' are mutually exclusive") + raise ValueError(f"Option '--queue' required for subcommand {subcommand}") import logging logging.getLogger('ocrd.network').setLevel(logging.DEBUG) processor = ProcessorClass(workspace=None, dump_json=True) - if agent_type == 'worker': - try: - # TODO: Passing processor_name and ocrd_tool is reduntant - processing_worker = ProcessingWorker( - rabbitmq_addr=queue, - mongodb_addr=database, - processor_name=processor.ocrd_tool['executable'], - ocrd_tool=processor.ocrd_tool, - processor_class=ProcessorClass, - ) - # The RMQConsumer is initialized and a connection to the RabbitMQ is performed - processing_worker.connect_consumer() - # Start consuming from the queue with name `processor_name` - processing_worker.start_consuming() - except Exception as e: - sys.exit(f"Processing worker has failed with error: {e}") - if agent_type == 'server': - try: - # TODO: Better validate that inside the ProcessorServer itself - host, port = agent_address.split(':') - processor_server = ProcessorServer( - mongodb_addr=database, - processor_name=processor.ocrd_tool['executable'],
processor_class=ProcessorClass, - ) - processor_server.run_server(host=host, port=int(port)) - except Exception as e: - sys.exit(f"Processor server has failed with error: {e}") + if subcommand == 'worker': + # TODO: Passing processor_name and ocrd_tool is redundant + processing_worker = ProcessingWorker( + rabbitmq_addr=queue, + mongodb_addr=database, + processor_name=processor.ocrd_tool['executable'], + ocrd_tool=processor.ocrd_tool, + processor_class=ProcessorClass, + ) + # The RMQConsumer is initialized and a connection to the RabbitMQ is performed + processing_worker.connect_consumer() + # Start consuming from the queue with name `processor_name` + processing_worker.start_consuming() + elif subcommand == 'server': + # TODO: Better validate that inside the ProcessorServer itself + host, port = address.split(':') + processor_server = ProcessorServer( + mongodb_addr=database, + processor_name=processor.ocrd_tool['executable'], + processor_class=ProcessorClass, + ) + processor_server.run_server(host=host, port=int(port)) sys.exit(0) diff --git a/ocrd/ocrd/decorators/ocrd_cli_options.py b/ocrd/ocrd/decorators/ocrd_cli_options.py index cd6664c235..74b9e5bc60 100644 --- a/ocrd/ocrd/decorators/ocrd_cli_options.py +++ b/ocrd/ocrd/decorators/ocrd_cli_options.py @@ -1,5 +1,5 @@ import click -from click import option, Path +from click import option, Path, group, command, argument from .parameter_option import parameter_option, parameter_override_option from .loglevel_option import loglevel_option from ocrd_network import ( @@ -39,8 +39,7 @@ def cli(mets_url): parameter_option, parameter_override_option, loglevel_option, - option('--type', 'agent_type', type=click.Choice(['worker', 'server'])), - option('--address', 'agent_address', type=ServerAddressParamType()), + option('--address', type=ServerAddressParamType()), option('--queue', type=QueueServerParamType()), option('--database', type=DatabaseParamType()), option('-C', '--show-resource'), @@ -49,6 +48,12 @@ def
cli(mets_url): option('-D', '--dump-module-dir', is_flag=True, default=False), option('-h', '--help', is_flag=True, default=False), option('-V', '--version', is_flag=True, default=False), + # Subcommand, only used for 'worker'/'server'. Cannot be handled in + # click because processors use the @command decorator and even if they + # were using `group`, you cannot combine a command with + # subcommands. So we have to work around that by creating a + # pseudo-subcommand handled in ocrd_cli_wrap_processor + argument('subcommand', nargs=1, required=False, type=click.Choice(['worker', 'server'])), ] for param in params: param(f) diff --git a/ocrd/ocrd/lib.bash b/ocrd/ocrd/lib.bash index fffdd40f9c..06a0119381 100644 --- a/ocrd/ocrd/lib.bash +++ b/ocrd/ocrd/lib.bash @@ -87,7 +87,7 @@ ocrd__list_resources () { ## ocrd__usage () { - ocrd ocrd-tool "$OCRD_TOOL_JSON" tool "$OCRD_TOOL_NAME" help + ocrd ocrd-tool "$OCRD_TOOL_JSON" tool "$OCRD_TOOL_NAME" help $ocrd__subcommand } @@ -124,6 +124,10 @@ ocrd__parse_argv () { local __parameters=() local __parameter_overrides=() + if [[ $1 == 'worker' || $1 == 'server' ]];then + ocrd__subcommand="$1" ; shift ; + fi + while [[ "${1:-}" = -* ]];do case "$1" in -l|--log-level) ocrd__argv[log_level]=$2 ; shift ;; @@ -146,31 +150,30 @@ ocrd__parse_argv () { -V|--version) ocrd ocrd-tool "$OCRD_TOOL_JSON" version; exit ;; --queue) ocrd__worker_queue="$2" ; shift ;; --database) ocrd__worker_database="$2" ; shift ;; - --type) ocrd__worker_type="$2" ; shift ;; --address) ocrd__worker_address="$2" ; shift ;; *) ocrd__raise "Unknown option '$1'" ;; esac shift done - if [ -v ocrd__worker_queue -o -v ocrd__worker_database -o -v ocrd__worker_type -o -v ocrd__worker_address ]; then - if ! [ -v ocrd__worker_type ] ; then - ocrd__raise "For Processing Worker / Processor Server --type is required" + if [ -v ocrd__worker_queue -o -v ocrd__worker_database -o -v ocrd__subcommand -o -v ocrd__worker_address ]; then + if !
[ -v ocrd__subcommand ] ; then + ocrd__raise "Provide subcommand 'worker' or 'server' for Processing Worker / Processor Server" elif ! [ -v ocrd__worker_database ]; then ocrd__raise "For the Processing Worker / Processor Server --database is required" fi - if [ ${ocrd__worker_type} = "worker" ]; then + if [ ${ocrd__subcommand} = "worker" ]; then if ! [ -v ocrd__worker_queue ]; then ocrd__raise "For the Processing Worker --queue is required" fi ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}" - elif [ ${ocrd__worker_type} = "server" ]; then + elif [ ${ocrd__subcommand} = "server" ]; then if ! [ -v ocrd__worker_address ]; then ocrd__raise "For the Processor Server --address is required" fi ocrd network processor-server $OCRD_TOOL_NAME --database "${ocrd__worker_database}" --address "${ocrd__worker_address}" else - ocrd__raise "--type must be either 'worker' or 'server' not '${ocrd__worker_type}'" + ocrd__raise "subcommand must be either 'worker' or 'server' not '${ocrd__subcommand}'" fi exit fi diff --git a/ocrd/ocrd/processor/base.py b/ocrd/ocrd/processor/base.py index 7f6f5c271b..2b91997266 100644 --- a/ocrd/ocrd/processor/base.py +++ b/ocrd/ocrd/processor/base.py @@ -61,6 +61,7 @@ def __init__( show_resource=None, list_resources=False, show_help=False, + subcommand=None, show_version=False, dump_json=False, dump_module_dir=False, @@ -92,6 +93,7 @@ def __init__( show_help (boolean): If true, then instead of processing, print a usage description \ including the standard CLI and all of this processor's ocrd-tool parameters and \ docstrings. + subcommand (string): 'worker' or 'server', only used here for the right --help output show_version (boolean): If true, then instead of processing, print information on \ this processor's version and OCR-D version. Exit afterwards. 
dump_json (boolean): If true, then instead of processing, print :py:attr:`ocrd_tool` \ @@ -127,7 +129,7 @@ def __init__( sys.stdout.buffer.write(fpath.read_bytes()) return if show_help: - self.show_help() + self.show_help(subcommand=subcommand) return self.version = version if show_version: @@ -149,8 +151,8 @@ def __init__( raise Exception("Invalid parameters %s" % report.errors) self.parameter = parameter - def show_help(self): - print(generate_processor_help(self.ocrd_tool, processor_instance=self)) + def show_help(self, subcommand=None): + print(generate_processor_help(self.ocrd_tool, processor_instance=self, subcommand=subcommand)) def show_version(self): print("Version %s, ocrd/core %s" % (self.version, OCRD_VERSION)) diff --git a/ocrd/ocrd/processor/helpers.py b/ocrd/ocrd/processor/helpers.py index 679491e6ec..a6b2c73c11 100644 --- a/ocrd/ocrd/processor/helpers.py +++ b/ocrd/ocrd/processor/helpers.py @@ -9,9 +9,6 @@ from subprocess import run, PIPE from typing import List, Type -from memory_profiler import memory_usage -from sparklines import sparklines - from click import wrap_text from ocrd.workspace import Workspace from ocrd_utils import freeze_args, getLogger, pushd_popd @@ -106,6 +103,8 @@ def run_processor( t0_cpu = process_time() if any(x in environ.get('OCRD_PROFILE', '') for x in ['RSS', 'PSS']): backend = 'psutil_pss' if 'PSS' in environ['OCRD_PROFILE'] else 'psutil' + from memory_profiler import memory_usage + from sparklines import sparklines try: mem_usage = memory_usage(proc=processor.process, # only run process once @@ -212,32 +211,15 @@ def run_cli( result = run(args, check=False) return result.returncode -def generate_processor_help(ocrd_tool, processor_instance=None): +def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None): """Generate a string describing the full CLI of this processor including params. 
Args: ocrd_tool (dict): this processor's ``tools`` section of the module's ``ocrd-tool.json`` processor_instance (object, optional): the processor implementation (for adding any module/class/function docstrings) + subcommand (string): 'worker' or 'server' """ - parameter_help = '' - if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']: - parameter_help = ' NONE\n' - else: - def wrap(s): - return wrap_text(s, initial_indent=' '*3, - subsequent_indent=' '*4, - width=72, preserve_paragraphs=True) - for param_name, param in ocrd_tool['parameters'].items(): - parameter_help += wrap('"%s" [%s%s]' % ( - param_name, - param['type'], - ' - REQUIRED' if 'required' in param and param['required'] else - ' - %s' % json.dumps(param['default']) if 'default' in param else '')) - parameter_help += '\n ' + wrap(param['description']) - if 'enum' in param: - parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum'])) - parameter_help += "\n" doc_help = '' if processor_instance: module = inspect.getmodule(processor_instance) @@ -252,12 +234,29 @@ def wrap(s): initial_indent=' > ', subsequent_indent=' > ', preserve_paragraphs=True) - return ''' -Usage: %s [OPTIONS] + subcommands = '''\ + worker Start a processing worker rather than do local processing + server Start a processor server rather than do local processing +''' - %s%s + processing_worker_options = '''\ + --queue The RabbitMQ server address in format + "amqp://{user}:{pass}@{host}:{port}/{vhost}" + [amqp://admin:admin@localhost:5672] + --database The MongoDB server address in format + "mongodb://{host}:{port}" + [mongodb://localhost:27018] +''' -Options for processing: + processing_server_options = '''\ + --address The Processor server address in format + "{host}:{port}" + --database The MongoDB server address in format + "mongodb://{host}:{port}" + [mongodb://localhost:27018] +''' + + processing_options = '''\ -m, --mets URL-PATH URL or file path of METS to process [./mets.xml] -w, 
--working-dir PATH Working directory of local workspace [dirname(URL-PATH)] -I, --input-file-grp USE File group(s) used as input @@ -278,33 +277,75 @@ def wrap(s): -w, --working-dir PATH Working directory of local workspace -l, --log-level [OFF|ERROR|WARN|INFO|DEBUG|TRACE] Override log level globally [INFO] +''' -Options for Processing Worker server: - --queue The RabbitMQ server address in format - "amqp://{user}:{pass}@{host}:{port}/{vhost}" - [amqp://admin:admin@localhost:5672] - --database The MongoDB server address in format - "mongodb://{host}:{port}" - [mongodb://localhost:27018] - --type type of processing: either "worker" or "server" - -Options for information: + information_options = '''\ -C, --show-resource RESNAME Dump the content of processor resource RESNAME -L, --list-resources List names of processor resources -J, --dump-json Dump tool description as JSON -D, --dump-module-dir Show the 'module' resource location path for this processor -h, --help Show this message -V, --version Show version +''' + + parameter_help = '' + if 'parameters' not in ocrd_tool or not ocrd_tool['parameters']: + parameter_help = ' NONE\n' + else: + def wrap(s): + return wrap_text(s, initial_indent=' '*3, + subsequent_indent=' '*4, + width=72, preserve_paragraphs=True) + for param_name, param in ocrd_tool['parameters'].items(): + parameter_help += wrap('"%s" [%s%s]' % ( + param_name, + param['type'], + ' - REQUIRED' if 'required' in param and param['required'] else + ' - %s' % json.dumps(param['default']) if 'default' in param else '')) + parameter_help += '\n ' + wrap(param['description']) + if 'enum' in param: + parameter_help += '\n ' + wrap('Possible values: %s' % json.dumps(param['enum'])) + parameter_help += "\n" + + if not subcommand: + return f'''\ +Usage: {ocrd_tool['executable']} [worker|server] [OPTIONS] + {ocrd_tool['description']}{doc_help} + +Subcommands: +{subcommands} +Options for processing: +{processing_options} +Options for information: 
+{information_options} Parameters: -%s - -''' % ( - ocrd_tool['executable'], - ocrd_tool['description'], - doc_help, - parameter_help, -) +{parameter_help} +''' + elif subcommand == 'worker': + return f'''\ +Usage: {ocrd_tool['executable']} worker [OPTIONS] + + Run {ocrd_tool['executable']} as a processing worker. + + {ocrd_tool['description']}{doc_help} + +Options: +{processing_worker_options} +''' + elif subcommand == 'server': + return f'''\ +Usage: {ocrd_tool['executable']} server [OPTIONS] + + Run {ocrd_tool['executable']} as a processor server. + + {ocrd_tool['description']}{doc_help} + +Options: +{processing_server_options} +''' + else: + pass # Taken from https://github.com/OCR-D/core/pull/884 diff --git a/ocrd_network/ocrd_network/deployer.py b/ocrd_network/ocrd_network/deployer.py index 058fc6e293..f419ed7971 100644 --- a/ocrd_network/ocrd_network/deployer.py +++ b/ocrd_network/ocrd_network/deployer.py @@ -461,7 +461,7 @@ def start_native_processor( self.log.info(f'Starting native processing worker: {processor_name}') channel = ssh_client.invoke_shell() stdin, stdout = channel.makefile('wb'), channel.makefile('rb') - cmd = f'{processor_name} --type worker --database {database_url} --queue {queue_url}' + cmd = f'{processor_name} worker --database {database_url} --queue {queue_url}' # the only way (I could find) to make it work to start a process in the background and # return early is this construction. The pid of the last started background process is # printed with `echo $!` but it is printed inbetween other output.
Because of that I added @@ -504,7 +504,7 @@ def start_native_processor_server( self.log.info(f"Starting native processor server: {processor_name} on {agent_address}") channel = ssh_client.invoke_shell() stdin, stdout = channel.makefile('wb'), channel.makefile('rb') - cmd = f'{processor_name} --type server --address {agent_address} --database {database_url}' + cmd = f'{processor_name} server --address {agent_address} --database {database_url}' port = agent_address.split(':')[1] log_path = f'/tmp/server_{processor_name}_{port}_{getpid()}.log' # TODO: This entire stdin/stdout thing is broken with servers!