diff --git a/.github/workflows/publish-pypi.yml b/.github/workflows/publish-pypi.yml new file mode 100644 index 0000000000..e811c958ab --- /dev/null +++ b/.github/workflows/publish-pypi.yml @@ -0,0 +1,31 @@ +# This workflow will upload a Python Package using Twine when a release is created +# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries + +name: Upload Python Package + +on: + release: + types: [published] + workflow_dispatch: + +jobs: + deploy: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: '3.8' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install setuptools wheel build twine + pip install -r requirements.txt + - name: Build and publish + env: + TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} + TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }} + run: make pypi diff --git a/CHANGELOG.md b/CHANGELOG.md index abbfd5a4d8..ff09794f82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -128,6 +128,58 @@ Added: - `Processor.verify`: handle fileGrp cardinality verification, with default implementation - `Processor.setup`: to set up processor before processing, optional +## [2.70.0] - 2024-10-10 + +Added: + + - `ocrd network client workflow run`: Add `--print-state` flag to periodically print the job state, #1277 + - Processing Server: `DELETE /kill_mets_server_zombies` to kill any renegade METS servers, #1277 + - No more zombie METS Servers by properly shutting them down, #1284 + - `OCRD_NETWORK_RABBITMQ_HEARTBEAT` to allow overriding the [heartbeat](https://pika.readthedocs.io/en/stable/examples/heartbeat_and_blocked_timeouts.html) behavior of RabbitMQ, #1285 + +Changed: + + - significantly more detailed logging for the METS Server and Processing Server, #1284 + - Only import `ocrd_network` in src/ocrd/decorators/__init__.py once needed, #1289 + - Automate release via GitHub Actions, #1290 + +Fixed: + + - `ocrd/core-cuda-torch`: Install torchvision as well, #1286 + - Processing Server: remove shut down METS servers from deployer's cache, #1287 + - typos, #1274 + +## [2.69.0] - 2024-09-30 + +Fixed: + - tests: ensure `ocrd_utils.config` gets reset whenever changing it globally + - `ocrd.cli.workspace`: consistently pass on `--mets-server-url` and `--backup` + - `ocrd.cli.workspace`: make `list-page` work w/ METS Server + - `ocrd.cli.validate "tasks"`: pass on `--mets-server-url` + - `lib.bash`: fix `errexit` handling + - actually apply CLI `--log-filename`, and show in `--help` + - adapt to Pillow changes + - `ocrd workspace clone`: do pass on `--file-grp` (for download filtering) + - `OcrdMetsServer.add_file`: pass on `force` kwarg + - `Workspace.reload_mets`: handle ClientSideOcrdMets as well + - `OcrdMets.get_physical_pages`: cover `return_divs` w/o `for_fileIds` and `for_pageIds` + - `disableLogging`: also re-instate root logger to Python defaults + - `OcrdExif`: handle multi-frame TIFFs gracefully in `identify` callout, #1276 + +Changed: + - `run_processor`: be robust if `ocrd_tool` is missing `steps` + - `PcGtsType.PageType.id` via `make_xml_id`: replace `/` with `_` + - `ClientSideOcrdMets`: use same logger name prefix as METS Server + - `Processor.zip_input_files`: when `--page-id` yields empty list, just log instead of raise + +Added: + - `OcrdPage`: new `PageType.get_ReadingOrderGroups()` to retrieve recursive RO as dict + - METS Server: export and
delegate `physical_pages` + - ocrd.cli.workspace `server`: add subcommands `reload` and `save` + - processor CLI: delegate `--resolve-resource`, too + - `OcrdConfig.reset_defaults` to reset config variables to their defaults + - `ocrd_utils.scale_coordinates` for resizing images + ## [2.68.0] - 2024-08-23 Changed: @@ -2294,6 +2346,8 @@ Initial Release [3.0.0b1]: ../../compare/v3.0.0b1..v3.0.0a2 [3.0.0a2]: ../../compare/v3.0.0a2..v3.0.0a1 [3.0.0a1]: ../../compare/v3.0.0a1..v2.67.2 +[2.70.0]: ../../compare/v2.70.0..v2.69.0 +[2.69.0]: ../../compare/v2.69.0..v2.68.0 [2.68.0]: ../../compare/v2.68.0..v2.67.2 [2.67.2]: ../../compare/v2.67.2..v2.67.1 [2.67.1]: ../../compare/v2.67.1..v2.67.0 diff --git a/Dockerfile.cuda-torch b/Dockerfile.cuda-torch index 8d6c3aa624..59ce1144be 100644 --- a/Dockerfile.cuda-torch +++ b/Dockerfile.cuda-torch @@ -9,7 +9,5 @@ RUN make deps-torch WORKDIR /data -RUN rm -fr /build - CMD ["/usr/local/bin/ocrd", "--help"] diff --git a/Makefile b/Makefile index 1a4a6bbdb8..95fef5fc2d 100644 --- a/Makefile +++ b/Makefile @@ -63,7 +63,7 @@ deps-cuda: CONDA_EXE ?= /usr/local/bin/conda deps-cuda: export CONDA_PREFIX ?= /conda deps-cuda: PYTHON_PREFIX != $(PYTHON) -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])' deps-cuda: - curl -Ls https://micro.mamba.pm/api/micromamba/linux-64/latest | tar -xvj bin/micromamba + curl --retry 6 -Ls https://micro.mamba.pm/api/micromamba/linux-64/latest | tar -xvj bin/micromamba mv bin/micromamba $(CONDA_EXE) # Install Conda system-wide (for interactive / login shells) echo 'export MAMBA_EXE=$(CONDA_EXE) MAMBA_ROOT_PREFIX=$(CONDA_PREFIX) CONDA_PREFIX=$(CONDA_PREFIX) PATH=$(CONDA_PREFIX)/bin:$$PATH' >> /etc/profile.d/98-conda.sh @@ -158,7 +158,7 @@ deps-tf2: fi deps-torch: - $(PIP) install -i https://download.pytorch.org/whl/cu118 torch + $(PIP) install -i https://download.pytorch.org/whl/cu118 torch torchvision # Dependencies for deployment in an ubuntu/debian linux deps-ubuntu: @@ -178,7 +178,7 @@ build: # (Re)install the tool install: #build - # not stricttly necessary but a precaution against outdated python build tools, https://github.com/OCR-D/core/pull/1166 + # not strictly necessary but a precaution against outdated python build tools, https://github.com/OCR-D/core/pull/1166 $(PIP) install -U pip wheel $(PIP_INSTALL) . 
$(PIP_INSTALL_CONFIG_OPTION) @# workaround for shapely#1598 diff --git a/repo/assets b/repo/assets index 05568aaa2d..ca108faf0e 160000 --- a/repo/assets +++ b/repo/assets @@ -1 +1 @@ -Subproject commit 05568aaa2dc20678bf87ffec77f3baf2924d7c24 +Subproject commit ca108faf0e95cc823a9e84cd0a1602282ae006b1 diff --git a/src/ocrd/cli/__init__.py b/src/ocrd/cli/__init__.py index 9e8a37b8bf..667bddc7c5 100644 --- a/src/ocrd/cli/__init__.py +++ b/src/ocrd/cli/__init__.py @@ -83,6 +83,8 @@ def get_help(self, ctx): \b {config.describe('OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS')} \b +{config.describe('OCRD_NETWORK_RABBITMQ_HEARTBEAT')} +\b {config.describe('OCRD_PROFILE_FILE')} \b {config.describe('OCRD_PROFILE', wrap_text=False)} diff --git a/src/ocrd/decorators/__init__.py b/src/ocrd/decorators/__init__.py index f52a13575b..f659bf58a0 100644 --- a/src/ocrd/decorators/__init__.py +++ b/src/ocrd/decorators/__init__.py @@ -13,7 +13,6 @@ redirect_stderr_and_stdout_to_file, ) from ocrd_validators import WorkspaceValidator -from ocrd_network import ProcessingWorker, ProcessorServer, AgentType from ..resolver import Resolver from ..processor.base import ResourceNotFoundError, run_processor @@ -23,8 +22,6 @@ from .ocrd_cli_options import ocrd_cli_options from .mets_find_options import mets_find_options -SUBCOMMANDS = [AgentType.PROCESSING_WORKER, AgentType.PROCESSOR_SERVER] - def ocrd_cli_wrap_processor( processorClass, @@ -88,11 +85,9 @@ def ocrd_cli_wrap_processor( if list_resources: processor.list_resources() sys.exit() - if subcommand: + if subcommand or address or queue or database: # Used for checking/starting network agents for the WebAPI architecture check_and_run_network_agent(processorClass, subcommand, address, database, queue) - elif address or queue or database: - raise ValueError(f"Subcommand options --address --queue and --database are only valid for subcommands: {SUBCOMMANDS}") # from here: single-run processing context initLogging() @@ -162,6 +157,11 @@ def goexit(): def check_and_run_network_agent(ProcessorClass, subcommand: str, address: str, database: str, queue: str): """ """ + from ocrd_network import ProcessingWorker, ProcessorServer, AgentType + SUBCOMMANDS = [AgentType.PROCESSING_WORKER, AgentType.PROCESSOR_SERVER] + + if not subcommand: + raise ValueError(f"Subcommand options --address --queue and --database are only valid for subcommands: {SUBCOMMANDS}") if subcommand not in SUBCOMMANDS: raise ValueError(f"SUBCOMMAND can only be one of {SUBCOMMANDS}") diff --git a/src/ocrd/mets_server.py b/src/ocrd/mets_server.py index 101727e064..e0f0029570 100644 --- a/src/ocrd/mets_server.py +++ b/src/ocrd/mets_server.py @@ -1,8 +1,10 @@ """ # METS server functionality """ +import os import re from os import _exit, chmod +import signal from typing import Dict, Optional, Union, List, Tuple from time import sleep from pathlib import Path @@ -155,13 +157,13 @@ def save(self): Request writing the changes to the file system """ if not self.multiplexing_mode: - self.session.request("PUT", url=self.url) + return self.session.request("PUT", url=self.url).text else: - self.session.request( + return self.session.request( "POST", self.url, json=MpxReq.save(self.ws_dir_path) - ) + ).json()["text"] def stop(self): """ @@ -169,14 +171,13 @@ def stop(self): """ try: if not self.multiplexing_mode: - self.session.request("DELETE", self.url) - return + return self.session.request("DELETE", self.url).text else: - self.session.request( + return self.session.request( "POST", self.url, 
json=MpxReq.stop(self.ws_dir_path) - ) + ).json()["text"] except ConnectionError: # Expected because we exit the process without returning pass @@ -323,7 +324,7 @@ def add_file( class MpxReq: - """This class wrapps the request bodies needed for the tcp forwarding + """This class wraps the request bodies needed for the tcp forwarding For every mets-server-call like find_files or workspace_path a special request_body is needed to call `MetsServerProxy.forward_tcp_request`. These are created by this functions. @@ -346,12 +347,12 @@ def __args_wrapper( @staticmethod def save(ws_dir_path: str) -> Dict: return MpxReq.__args_wrapper( - ws_dir_path, method_type="PUT", response_type="empty", request_url="", request_data={}) + ws_dir_path, method_type="PUT", response_type="text", request_url="", request_data={}) @staticmethod def stop(ws_dir_path: str) -> Dict: return MpxReq.__args_wrapper( - ws_dir_path, method_type="DELETE", response_type="empty", request_url="", request_data={}) + ws_dir_path, method_type="DELETE", response_type="text", request_url="", request_data={}) @staticmethod def reload(ws_dir_path: str) -> Dict: @@ -428,18 +429,24 @@ def create_process(mets_server_url: str, ws_dir_path: str, log_file: str) -> int @staticmethod def kill_process(mets_server_pid: int): - subprocess_run(args=["kill", "-s", "SIGINT", f"{mets_server_pid}"], shell=False, universal_newlines=True) + os.kill(mets_server_pid, signal.SIGINT) + sleep(3) + try: + os.kill(mets_server_pid, signal.SIGKILL) + except ProcessLookupError as e: + pass def shutdown(self): + pid = os.getpid() + self.log.info(f"Shutdown method of mets server[{pid}] invoked, sending SIGTERM signal.") + os.kill(pid, signal.SIGTERM) if self.is_uds: if Path(self.url).exists(): - self.log.debug(f'UDS socket {self.url} still exists, removing it') + self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}") Path(self.url).unlink() - # os._exit because uvicorn catches SystemExit raised by sys.exit - _exit(0) def startup(self): - self.log.info("Starting up METS server") + self.log.info(f"Configuring the Mets Server") workspace = self.workspace @@ -465,32 +472,49 @@ def save(): """ Write current changes to the file system """ - return workspace.save_mets() + workspace.save_mets() + response = Response(content="The Mets Server is writing changes to disk.", media_type='text/plain') + self.log.info(f"PUT / -> {response.__dict__}") + return response @app.delete(path='/') - async def stop(): + def stop(): """ Stop the mets server """ - getLogger('ocrd.models.ocrd_mets').info(f'Shutting down METS Server {self.url}') workspace.save_mets() + response = Response(content="The Mets Server will shut down soon...", media_type='text/plain') self.shutdown() + self.log.info(f"DELETE / -> {response.__dict__}") + return response @app.post(path='/reload') - async def workspace_reload_mets(): + def workspace_reload_mets(): """ Reload mets file from the file system """ workspace.reload_mets() - return Response(content=f'Reloaded from {workspace.directory}', media_type="text/plain") + response = Response(content=f"Reloaded from {workspace.directory}", media_type='text/plain') + self.log.info(f"POST /reload -> {response.__dict__}") + return response @app.get(path='/unique_identifier', response_model=str) async def unique_identifier(): - return Response(content=workspace.mets.unique_identifier, media_type='text/plain') + response = Response(content=workspace.mets.unique_identifier, media_type='text/plain') + self.log.info(f"GET 
/unique_identifier -> {response.__dict__}") + return response @app.get(path='/workspace_path', response_model=str) async def workspace_path(): - return Response(content=workspace.directory, media_type="text/plain") + response = Response(content=workspace.directory, media_type="text/plain") + self.log.info(f"GET /workspace_path -> {response.__dict__}") + return response + + @app.get(path='/physical_pages', response_model=OcrdPageListModel) + async def physical_pages(): + response = {'physical_pages': workspace.mets.physical_pages} + self.log.info(f"GET /physical_pages -> {response}") + return response @app.get(path='/physical_pages', response_model=OcrdPageListModel) async def physical_pages(): @@ -498,18 +522,24 @@ async def physical_pages(): @app.get(path='/file_groups', response_model=OcrdFileGroupListModel) async def file_groups(): - return {'file_groups': workspace.mets.file_groups} + response = {'file_groups': workspace.mets.file_groups} + self.log.info(f"GET /file_groups -> {response}") + return response @app.get(path='/agent', response_model=OcrdAgentListModel) async def agents(): - return OcrdAgentListModel.create(workspace.mets.agents) + response = OcrdAgentListModel.create(workspace.mets.agents) + self.log.info(f"GET /agent -> {response.__dict__}") + return response @app.post(path='/agent', response_model=OcrdAgentModel) async def add_agent(agent: OcrdAgentModel): kwargs = agent.dict() kwargs['_type'] = kwargs.pop('type') workspace.mets.add_agent(**kwargs) - return agent + response = agent + self.log.info(f"POST /agent -> {response.__dict__}") + return response @app.get(path="/file", response_model=OcrdFileListModel) async def find_files( @@ -526,7 +556,9 @@ async def find_files( found = workspace.mets.find_all_files( fileGrp=file_grp, ID=file_id, pageId=page_id, mimetype=mimetype, local_filename=local_filename, url=url ) - return OcrdFileListModel.create(found) + response = OcrdFileListModel.create(found) + self.log.info(f"GET /file -> {response.__dict__}") + return response @app.post(path='/file', response_model=OcrdFileModel) async def add_file( @@ -549,7 +581,9 @@ async def add_file( # Add to workspace kwargs = file_resource.dict() workspace.add_file(**kwargs, force=force) - return file_resource + response = file_resource + self.log.info(f"POST /file -> {response.__dict__}") + return response # ------------- # @@ -557,9 +591,6 @@ async def add_file( # Create socket and change to world-readable and -writable to avoid permission errors self.log.debug(f"chmod 0o677 {self.url}") server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - if Path(self.url).exists() and not is_socket_in_use(self.url): - # remove leftover unused socket which blocks startup - Path(self.url).unlink() server.bind(self.url) # creates the socket file atexit.register(self.shutdown) server.close() @@ -571,16 +602,5 @@ async def add_file( uvicorn_kwargs['log_config'] = None uvicorn_kwargs['access_log'] = False - self.log.debug("Starting uvicorn") + self.log.info("Starting the uvicorn Mets Server") uvicorn.run(app, **uvicorn_kwargs) - - -def is_socket_in_use(socket_path): - if Path(socket_path).exists(): - client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - try: - client.connect(socket_path) - except OSError: - return False - client.close() - return True diff --git a/src/ocrd/processor/helpers.py b/src/ocrd/processor/helpers.py index 2cbbbd97e1..ba89de0036 100644 --- a/src/ocrd/processor/helpers.py +++ b/src/ocrd/processor/helpers.py @@ -4,7 +4,6 @@ from time import perf_counter, process_time from 
functools import lru_cache import json -import inspect from subprocess import run from typing import List, Optional @@ -98,7 +97,7 @@ def run_processor( backend = 'psutil_pss' if 'PSS' in config.OCRD_PROFILE else 'psutil' from memory_profiler import memory_usage # pylint: disable=import-outside-toplevel try: - mem_usage = memory_usage(proc=(processor.process_workspace, [workspace], {}), + mem_usage = memory_usage(proc=(processor.process_workspace, [workspace], {}), # type: ignore # only run process once max_iterations=1, interval=.1, timeout=None, timestamps=True, diff --git a/src/ocrd/resource_manager.py b/src/ocrd/resource_manager.py index 3c4c603060..95d0fec4e1 100644 --- a/src/ocrd/resource_manager.py +++ b/src/ocrd/resource_manager.py @@ -23,6 +23,8 @@ # pylint: enable=wrong-import-position +# pylint: enable=wrong-import-position + from ocrd_validators import OcrdResourceListValidator from ocrd_utils import getLogger, directory_size, get_moduledir, guess_media_type, config from ocrd_utils.os import get_processor_resource_types, list_all_resources, pushd_popd, get_ocrd_tool_json diff --git a/src/ocrd_models/ocrd_exif.py b/src/ocrd_models/ocrd_exif.py index ab050bae59..937416f5ef 100644 --- a/src/ocrd_models/ocrd_exif.py +++ b/src/ocrd_models/ocrd_exif.py @@ -49,11 +49,11 @@ def run_identify(self, img): for prop in ['compression', 'photometric_interpretation']: setattr(self, prop, img.info[prop] if prop in img.info else None) if img.filename: - ret = run(['identify', '-format', r'%[resolution.x] %[resolution.y] %U', img.filename], check=False, stderr=PIPE, stdout=PIPE) + ret = run(['identify', '-format', r'%[resolution.x] %[resolution.y] %U ', img.filename], check=False, stderr=PIPE, stdout=PIPE) else: with BytesIO() as bio: img.save(bio, format=img.format) - ret = run(['identify', '-format', r'%[resolution.x] %[resolution.y] %U', '/dev/stdin'], check=False, stderr=PIPE, stdout=PIPE, input=bio.getvalue()) + ret = run(['identify', '-format', r'%[resolution.x] %[resolution.y] %U ', '/dev/stdin'], check=False, stderr=PIPE, stdout=PIPE, input=bio.getvalue()) if ret.returncode: stderr = ret.stderr.decode('utf-8') if 'no decode delegate for this image format' in stderr: diff --git a/src/ocrd_network/cli/client.py b/src/ocrd_network/cli/client.py index 9c7f15c88f..350cf64b90 100644 --- a/src/ocrd_network/cli/client.py +++ b/src/ocrd_network/cli/client.py @@ -2,6 +2,7 @@ from json import dumps from typing import List, Optional, Tuple from ocrd.decorators.parameter_option import parameter_option, parameter_override_option +from ocrd_network.constants import JobState from ocrd_utils import DEFAULT_METS_BASENAME from ocrd_utils.introspect import set_json_key_value_overrides from ocrd_utils.str import parse_json_string_or_file @@ -104,8 +105,10 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str): @click.option('--result-queue-name') @click.option('--callback-url') @click.option('--agent-type', default='worker') -@click.option('-b', '--block', default=False, +@click.option('-b', '--block', default=False, is_flag=True, help='If set, the client will block till job timeout, fail or success.') +@click.option('-p', '--print-state', default=False, is_flag=True, + help='If set, the client will print job states by each iteration.') def send_processing_job_request( address: Optional[str], processor_name: str, @@ -120,7 +123,8 @@ def send_processing_job_request( # TODO: This is temporally available to toggle # between the ProcessingWorker/ProcessorServer agent_type: 
Optional[str], - block: Optional[bool] + block: Optional[bool], + print_state: Optional[bool] ): """ Submit a processing job to the processing server. @@ -146,7 +150,7 @@ def send_processing_job_request( assert processing_job_id print(f"Processing job id: {processing_job_id}") if block: - client.poll_job_status(job_id=processing_job_id) + client.poll_job_status(job_id=processing_job_id, print_state=print_state) @client_cli.group('workflow') @@ -176,24 +180,39 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str): 'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default') @click.option('-m', '--path-to-mets', required=True) @click.option('-w', '--path-to-workflow', required=True) -@click.option('-b', '--block', default=False, +@click.option('--page-wise/--no-page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs") +@click.option('-b', '--block', default=False, is_flag=True, help='If set, the client will block till job timeout, fail or success.') +@click.option('-p', '--print-state', default=False, is_flag=True, + help='If set, the client will print job states by each iteration.') def send_workflow_job_request( address: Optional[str], path_to_mets: str, path_to_workflow: str, - block: Optional[bool] + page_wise: bool, + block: bool, + print_state: bool ): """ Submit a workflow job to the processing server. """ client = Client(server_addr_processing=address) - workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets) + workflow_job_id = client.send_workflow_job_request( + path_to_wf=path_to_workflow, + path_to_mets=path_to_mets, + page_wise=page_wise, + ) assert workflow_job_id print(f"Workflow job id: {workflow_job_id}") if block: - client.poll_workflow_status(job_id=workflow_job_id) - + print(f"Polling state of workflow job {workflow_job_id}") + state = client.poll_workflow_status(job_id=workflow_job_id, print_state=print_state) + if state != JobState.success: + print(f"Workflow failed with {state}") + exit(1) + else: + print(f"Workflow succeeded") + exit(0) @client_cli.group('workspace') def workspace_cli(): diff --git a/src/ocrd_network/client.py b/src/ocrd_network/client.py index 8ec8e541ea..bb7cf4dbf2 100644 --- a/src/ocrd_network/client.py +++ b/src/ocrd_network/client.py @@ -46,18 +46,21 @@ def check_job_status(self, job_id: str): def check_workflow_status(self, workflow_job_id: str): return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id) - def poll_job_status(self, job_id: str) -> str: + def poll_job_status(self, job_id: str, print_state: bool = False) -> str: return poll_job_status_till_timeout_fail_or_success( - ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) + ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait, + print_state=print_state) - def poll_workflow_status(self, job_id: str) -> str: + def poll_workflow_status(self, job_id: str, print_state: bool = False) -> str: return poll_wf_status_till_timeout_fail_or_success( - ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait) + ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait, + print_state=print_state) def send_processing_job_request(self, processor_name: str, req_params: dict) -> str: return post_ps_processing_request( 
ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params) - def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str): + def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str, page_wise: bool = False): return post_ps_workflow_request( - ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets) + ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets, + page_wise=page_wise) diff --git a/src/ocrd_network/client_utils.py b/src/ocrd_network/client_utils.py index 9b924c16a4..4eaf4ea95b 100644 --- a/src/ocrd_network/client_utils.py +++ b/src/ocrd_network/client_utils.py @@ -1,9 +1,10 @@ +import json from requests import get as request_get, post as request_post from time import sleep from .constants import JobState, NETWORK_PROTOCOLS -def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int): +def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_state: bool = False) -> JobState: if job_type not in ["workflow", "processor"]: raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'") job_state = JobState.unset @@ -13,18 +14,22 @@ def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries job_state = get_ps_processing_job_status(ps_server_host, job_id) if job_type == "workflow": job_state = get_ps_workflow_job_status(ps_server_host, job_id) + if print_state: + print(f"State of the {job_type} job {job_id}: {job_state}") if job_state == JobState.success or job_state == JobState.failed: break tries -= 1 return job_state -def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState: - return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait) +def poll_job_status_till_timeout_fail_or_success( + ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False) -> JobState: + return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait, print_state) -def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState: - return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait) +def poll_wf_status_till_timeout_fail_or_success( + ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False) -> JobState: + return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait, print_state) def get_ps_deployed_processors(ps_server_host: str): @@ -47,22 +52,21 @@ def get_ps_processing_job_log(ps_server_host: str, processing_job_id: str): return response -def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str: +def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> JobState: request_url = f"{ps_server_host}/processor/job/{processing_job_id}" response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" job_state = response.json()["state"] assert job_state - return job_state - + return getattr(JobState, job_state.lower()) -def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str: +def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> JobState: request_url = 
f"{ps_server_host}/workflow/job-simple/{workflow_job_id}" response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"}) assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" job_state = response.json()["state"] assert job_state - return job_state + return getattr(JobState, job_state.lower()) def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str: @@ -78,9 +82,13 @@ def post_ps_processing_request(ps_server_host: str, processor: str, job_input: d return processing_job_id -# TODO: Can be extended to include other parameters such as page_wise -def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str: - request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True" +def post_ps_workflow_request( + ps_server_host: str, + path_to_wf: str, + path_to_mets: str, + page_wise: bool = False, +) -> str: + request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise={'True' if page_wise else 'False'}" response = request_post( url=request_url, headers={"accept": "application/json; charset=utf-8"}, @@ -88,8 +96,11 @@ def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: ) # print(response.json()) # print(response.__dict__) + json_resp_raw = response.text + # print(f'post_ps_workflow_request >> {response.status_code}') + # print(f'post_ps_workflow_request >> {json_resp_raw}') assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}" - wf_job_id = response.json()["job_id"] + wf_job_id = json.loads(json_resp_raw)["job_id"] assert wf_job_id return wf_job_id diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index 34c22e5cf6..83cb1d75f1 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -1,7 +1,7 @@ from datetime import datetime from os import getpid from pathlib import Path -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union from uvicorn import run as uvicorn_run from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile @@ -48,6 +48,7 @@ get_workflow_content, get_from_database_workspace, get_from_database_workflow_job, + kill_mets_server_zombies, parse_workflow_tasks, raise_http_exception, request_processor_server_tool_json, @@ -200,6 +201,14 @@ def add_api_routes_others(self): tags=[ServerApiTags.WORKSPACE], summary="Forward a TCP request to UDS mets server" ) + others_router.add_api_route( + path="/kill_mets_server_zombies", + endpoint=self.kill_mets_server_zombies, + methods=["DELETE"], + tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING], + status_code=status.HTTP_200_OK, + summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago." + ) self.include_router(others_router) def add_api_routes_processing(self): @@ -320,7 +329,7 @@ async def forward_tcp_request_to_uds_mets_server(self, request: Request) -> Dict """Forward mets-server-request A processor calls a mets related method like add_file with ClientSideOcrdMets. This sends - a request to this endpoint. This request contains all infomation neccessary to make a call + a request to this endpoint. This request contains all information necessary to make a call to the uds-mets-server. 
This information is used by `MetsServerProxy` to make a the call to the local (local for the processing-server) reachable the uds-mets-server. """ @@ -574,26 +583,20 @@ async def _cancel_cached_dependent_jobs(self, workspace_key: str, job_id: str) - ) async def _consume_cached_jobs_of_workspace( - self, workspace_key: str, mets_server_url: str + self, workspace_key: str, mets_server_url: str, path_to_mets: str ) -> List[PYJobInput]: - - # Check whether the internal queue for the workspace key still exists - if workspace_key not in self.cache_processing_requests.processing_requests: - self.log.debug(f"No internal queue available for workspace with key: {workspace_key}") - return [] - # decrease the internal cache counter by 1 request_counter = self.cache_processing_requests.update_request_counter( workspace_key=workspace_key, by_value=-1 ) self.log.debug(f"Internal processing job cache counter value: {request_counter}") - if not len(self.cache_processing_requests.processing_requests[workspace_key]): + if (workspace_key not in self.cache_processing_requests.processing_requests or + not len(self.cache_processing_requests.processing_requests[workspace_key])): if request_counter <= 0: # Shut down the Mets Server for the workspace_key since no # more internal callbacks are expected for that workspace self.log.debug(f"Stopping the mets server: {mets_server_url}") - - self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url) + self.deployer.stop_uds_mets_server(mets_server_url=mets_server_url, path_to_mets=path_to_mets) try: # The queue is empty - delete it @@ -609,6 +612,10 @@ async def _consume_cached_jobs_of_workspace( else: self.log.debug(f"Internal request cache is empty but waiting for {request_counter} result callbacks.") return [] + # Check whether the internal queue for the workspace key still exists + if workspace_key not in self.cache_processing_requests.processing_requests: + self.log.debug(f"No internal queue available for workspace with key: {workspace_key}") + return [] consumed_requests = await self.cache_processing_requests.consume_cached_requests(workspace_key=workspace_key) return consumed_requests @@ -643,7 +650,7 @@ async def remove_job_from_request_cache(self, result_message: PYResultMessage): raise_http_exception(self.log, status.HTTP_404_NOT_FOUND, message, error) consumed_cached_jobs = await self._consume_cached_jobs_of_workspace( - workspace_key=workspace_key, mets_server_url=mets_server_url + workspace_key=workspace_key, mets_server_url=mets_server_url, path_to_mets=path_to_mets ) await self.push_cached_jobs_to_agents(processing_jobs=consumed_cached_jobs) @@ -817,6 +824,10 @@ async def get_workflow_info(self, workflow_job_id) -> Dict: response = self._produce_workflow_status_response(processing_jobs=jobs) return response + async def kill_mets_server_zombies(self, minutes_ago : Optional[int] = None, dry_run : Optional[bool] = None) -> List[int]: + pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run) + return pids_killed + async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]: """ Simplified version of the `get_workflow_info` that returns a single state for the entire workflow. 
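The `/kill_mets_server_zombies` route added above takes optional `minutes_ago` and `dry_run` query parameters and returns the list of PIDs it acted on. Below is a minimal sketch of calling it with plain `requests`; the host/port and parameter values are assumptions for illustration, not part of this change.

```python
# Minimal sketch (assumed host/port): invoke the new DELETE /kill_mets_server_zombies
# route of the Processing Server; dry_run=True only reports which PIDs would be killed.
from requests import delete

PS_HOST = "http://localhost:8080"  # assumption: address of the Processing Server

response = delete(
    f"{PS_HOST}/kill_mets_server_zombies",
    params={"minutes_ago": 120, "dry_run": True},  # endpoint defaults: 90 minutes, no dry run
)
response.raise_for_status()
print("zombie METS server PIDs:", response.json())  # endpoint returns a JSON list of ints
```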
diff --git a/src/ocrd_network/rabbitmq_utils/connector.py b/src/ocrd_network/rabbitmq_utils/connector.py index 893d55a219..8fbbc84ab9 100644 --- a/src/ocrd_network/rabbitmq_utils/connector.py +++ b/src/ocrd_network/rabbitmq_utils/connector.py @@ -6,6 +6,7 @@ from typing import Any, Optional, Union from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials from pika.adapters.blocking_connection import BlockingChannel +from ocrd_utils import config from .constants import ( DEFAULT_EXCHANGER_NAME, DEFAULT_EXCHANGER_TYPE, @@ -69,8 +70,7 @@ def open_blocking_connection( port=port, virtual_host=vhost, credentials=credentials, - # TODO: The heartbeat should not be disabled (0)! - heartbeat=0 + heartbeat=config.OCRD_NETWORK_RABBITMQ_HEARTBEAT ), ) return blocking_connection diff --git a/src/ocrd_network/runtime_data/deployer.py b/src/ocrd_network/runtime_data/deployer.py index b956904d07..919d5b97ce 100644 --- a/src/ocrd_network/runtime_data/deployer.py +++ b/src/ocrd_network/runtime_data/deployer.py @@ -8,7 +8,7 @@ """ from __future__ import annotations from pathlib import Path -from subprocess import Popen, run as subprocess_run +import psutil from time import sleep from typing import Dict, List, Union @@ -30,6 +30,8 @@ def __init__(self, config_path: str) -> None: self.data_hosts: List[DataHost] = parse_hosts_data(ps_config["hosts"]) self.internal_callback_url = ps_config.get("internal_callback_url", None) self.mets_servers: Dict = {} # {"mets_server_url": "mets_server_pid"} + # This is required to store UDS urls that are multiplexed through the TCP proxy and are not preserved anywhere + self.mets_servers_paths: Dict = {} # {"ws_dir_path": "mets_server_url"} self.use_tcp_mets = ps_config.get("use_tcp_mets", False) # TODO: Reconsider this. @@ -146,25 +148,33 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path: if is_mets_server_running(mets_server_url=str(mets_server_url)): self.log.debug(f"The UDS mets server for {ws_dir_path} is already started: {mets_server_url}") return mets_server_url + elif Path(mets_server_url).is_socket(): + self.log.warning( + f"The UDS mets server for {ws_dir_path} is not running but the socket file exists: {mets_server_url}." + "Removing to avoid any weird behavior before starting the server.") + Path(mets_server_url).unlink() self.log.info(f"Starting UDS mets server: {mets_server_url}") - pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file) - self.mets_servers[mets_server_url] = pid + pid = OcrdMetsServer.create_process(mets_server_url=str(mets_server_url), ws_dir_path=str(ws_dir_path), log_file=str(log_file)) + self.mets_servers[str(mets_server_url)] = pid + self.mets_servers_paths[str(ws_dir_path)] = str(mets_server_url) return mets_server_url - def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False) -> None: + def stop_uds_mets_server(self, mets_server_url: str, path_to_mets: str) -> None: self.log.info(f"Stopping UDS mets server: {mets_server_url}") - if stop_with_pid: - if Path(mets_server_url) not in self.mets_servers: - message = f"UDS Mets server not found at URL: {mets_server_url}" - self.log.exception(message) - raise Exception(message) - mets_server_pid = self.mets_servers[Path(mets_server_url)] - OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid) - return - # TODO: Reconsider this again - # Not having this sleep here causes connection errors - # on the last request processed by the processing worker. 
- # Sometimes 3 seconds is enough, sometimes not. - sleep(5) - stop_mets_server(mets_server_url=mets_server_url) + self.log.info(f"Path to the mets file: {path_to_mets}") + self.log.debug(f"mets_server: {self.mets_servers}") + self.log.debug(f"mets_server_paths: {self.mets_servers_paths}") + workspace_path = str(Path(path_to_mets).parent) + mets_server_url_uds = self.mets_servers_paths[workspace_path] + mets_server_pid = self.mets_servers[mets_server_url_uds] + self.log.info(f"Terminating mets server with pid: {mets_server_pid}") + p = psutil.Process(mets_server_pid) + stop_mets_server(self.log, mets_server_url=mets_server_url, ws_dir_path=workspace_path) + if p.is_running(): + p.wait() + self.log.info(f"Terminated mets server with pid: {mets_server_pid}") + else: + self.log.info(f"Mets server with pid: {mets_server_pid} has already terminated.") + del self.mets_servers_paths[workspace_path] + del self.mets_servers[mets_server_url_uds] return diff --git a/src/ocrd_network/server_cache.py b/src/ocrd_network/server_cache.py index b57f3fd235..179a76139d 100644 --- a/src/ocrd_network/server_cache.py +++ b/src/ocrd_network/server_cache.py @@ -31,7 +31,7 @@ def check_if_locked_pages_for_output_file_grps( self, workspace_key: str, output_file_grps: List[str], page_ids: List[str] ) -> bool: if not self.locked_pages.get(workspace_key, None): - self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}") return False debug_message = f"Caching the received request due to locked output file grp pages." for file_group in output_file_grps: @@ -46,46 +46,45 @@ def check_if_locked_pages_for_output_file_grps( def get_locked_pages(self, workspace_key: str) -> Dict[str, List[str]]: if not self.locked_pages.get(workspace_key, None): - self.log.debug(f"No locked pages available for workspace key: {workspace_key}") + self.log.info(f"No locked pages available for workspace key: {workspace_key}") return {} return self.locked_pages[workspace_key] def lock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None: if not self.locked_pages.get(workspace_key, None): - self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") - self.log.debug(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}") + self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + self.log.info(f"Creating an entry in the locked pages cache for workspace key: {workspace_key}") self.locked_pages[workspace_key] = {} for file_group in output_file_grps: if file_group not in self.locked_pages[workspace_key]: - self.log.debug(f"Creating an empty list for output file grp: {file_group}") + self.log.info(f"Creating an empty list for output file grp: {file_group}") self.locked_pages[workspace_key][file_group] = [] # The page id list is not empty - only some pages are in the request if page_ids: - self.log.debug(f"Locking pages for '{file_group}': {page_ids}") + self.log.info(f"Locking pages for '{file_group}': {page_ids}") self.locked_pages[workspace_key][file_group].extend(page_ids) - self.log.debug(f"Locked pages of '{file_group}': " - f"{self.locked_pages[workspace_key][file_group]}") + self.log.info(f"Locked pages of '{file_group}': {self.locked_pages[workspace_key][file_group]}") else: # Lock all pages with a single value - self.log.debug(f"Locking pages for '{file_group}': 
{self.placeholder_all_pages}") + self.log.info(f"Locking pages for '{file_group}': {self.placeholder_all_pages}") self.locked_pages[workspace_key][file_group].append(self.placeholder_all_pages) def unlock_pages(self, workspace_key: str, output_file_grps: List[str], page_ids: List[str]) -> None: if not self.locked_pages.get(workspace_key, None): - self.log.debug(f"No entry found in the locked pages cache for workspace key: {workspace_key}") + self.log.info(f"No entry found in the locked pages cache for workspace key: {workspace_key}") return for file_group in output_file_grps: if file_group in self.locked_pages[workspace_key]: if page_ids: # Unlock the previously locked pages - self.log.debug(f"Unlocking pages of '{file_group}': {page_ids}") + self.log.info(f"Unlocking pages of '{file_group}': {page_ids}") self.locked_pages[workspace_key][file_group] = \ [x for x in self.locked_pages[workspace_key][file_group] if x not in page_ids] - self.log.debug(f"Remaining locked pages of '{file_group}': " - f"{self.locked_pages[workspace_key][file_group]}") + self.log.info(f"Remaining locked pages of '{file_group}': " + f"{self.locked_pages[workspace_key][file_group]}") else: # Remove the single variable used to indicate all pages are locked - self.log.debug(f"Unlocking all pages for: {file_group}") + self.log.info(f"Unlocking all pages for: {file_group}") self.locked_pages[workspace_key][file_group].remove(self.placeholder_all_pages) @@ -127,11 +126,11 @@ def __print_job_input_debug_message(self, job_input: PYJobInput): debug_message += f", page ids: {job_input.page_id}" debug_message += f", job id: {job_input.job_id}" debug_message += f", job depends on: {job_input.depends_on}" - self.log.debug(debug_message) + self.log.info(debug_message) async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]: if not self.has_workspace_cached_requests(workspace_key=workspace_key): - self.log.debug(f"No jobs to be consumed for workspace key: {workspace_key}") + self.log.info(f"No jobs to be consumed for workspace key: {workspace_key}") return [] found_consume_requests = [] for current_element in self.processing_requests[workspace_key]: @@ -165,25 +164,27 @@ def update_request_counter(self, workspace_key: str, by_value: int) -> int: # If a record counter of this workspace key does not exist # in the requests counter cache yet, create one and assign 0 if not self.processing_counter.get(workspace_key, None): - self.log.debug(f"Creating an internal request counter for workspace key: {workspace_key}") + self.log.info(f"Creating an internal request counter for workspace key: {workspace_key}") self.processing_counter[workspace_key] = 0 self.processing_counter[workspace_key] = self.processing_counter[workspace_key] + by_value + self.log.info(f"The new request counter of {workspace_key}: {self.processing_counter[workspace_key]}") return self.processing_counter[workspace_key] def cache_request(self, workspace_key: str, data: PYJobInput): # If a record queue of this workspace key does not exist in the requests cache if not self.processing_requests.get(workspace_key, None): - self.log.debug(f"Creating an internal request queue for workspace_key: {workspace_key}") + self.log.info(f"Creating an internal request queue for workspace_key: {workspace_key}") self.processing_requests[workspace_key] = [] self.__print_job_input_debug_message(job_input=data) # Add the processing request to the end of the internal queue + self.log.info(f"Caching a processing request of {workspace_key}: {data.job_id}") 
self.processing_requests[workspace_key].append(data) async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str) -> List[PYJobInput]: if not self.has_workspace_cached_requests(workspace_key=workspace_key): - self.log.debug(f"No jobs to be cancelled for workspace key: {workspace_key}") + self.log.info(f"No jobs to be cancelled for workspace key: {workspace_key}") return [] - self.log.debug(f"Cancelling jobs dependent on job id: {processing_job_id}") + self.log.info(f"Cancelling jobs dependent on job id: {processing_job_id}") found_cancel_requests = [] for i, current_element in enumerate(self.processing_requests[workspace_key]): if processing_job_id in current_element.depends_on: @@ -192,7 +193,7 @@ async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str for cancel_element in found_cancel_requests: try: self.processing_requests[workspace_key].remove(cancel_element) - self.log.debug(f"For job id: '{processing_job_id}', cancelling job id: '{cancel_element.job_id}'") + self.log.info(f"For job id: '{processing_job_id}', cancelling job id: '{cancel_element.job_id}'") cancelled_jobs.append(cancel_element) await db_update_processing_job(job_id=cancel_element.job_id, state=JobState.cancelled) # Recursively cancel dependent jobs for the cancelled job @@ -225,9 +226,11 @@ async def sync_is_caching_required(self, job_dependencies: List[str]) -> bool: def has_workspace_cached_requests(self, workspace_key: str) -> bool: if not self.processing_requests.get(workspace_key, None): - self.log.debug(f"In processing requests cache, no workspace key found: {workspace_key}") + self.log.info(f"In processing requests cache, no workspace key found: {workspace_key}") return False if not len(self.processing_requests[workspace_key]): - self.log.debug(f"The processing requests cache is empty for workspace key: {workspace_key}") + self.log.info(f"The processing requests cache is empty for workspace key: {workspace_key}") return False + self.log.info(f"The processing requests cache has {len(self.processing_requests[workspace_key])} " + f"entries for workspace key: {workspace_key} ") return True diff --git a/src/ocrd_network/server_utils.py b/src/ocrd_network/server_utils.py index 9d8628170c..6e485f261f 100644 --- a/src/ocrd_network/server_utils.py +++ b/src/ocrd_network/server_utils.py @@ -1,12 +1,18 @@ +import os +import re +import signal +from pathlib import Path +from json import dumps, loads +from urllib.parse import urljoin +from typing import Dict, List, Optional, Union +from time import time + from fastapi import HTTPException, status, UploadFile from fastapi.responses import FileResponse from httpx import AsyncClient, Timeout -from json import dumps, loads from logging import Logger -from pathlib import Path from requests import get as requests_get -from typing import Dict, List, Union -from urllib.parse import urljoin +from requests_unixsocket import sys from ocrd.resolver import Resolver from ocrd.task_sequence import ProcessorTask @@ -241,3 +247,33 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s if group not in available_groups: message = f"Input file group '{group}' of the first processor not found: {input_file_grps}" raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message) + + +def kill_mets_server_zombies(minutes_ago : Optional[int], dry_run : Optional[bool]) -> List[int]: + if minutes_ago == None: + minutes_ago = 90 + if dry_run == None: + dry_run = False + + now = time() + cmdline_pat = r'.*ocrd workspace 
-U.*server start $' + ret = [] + for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime): + if not procdir.is_dir(): + continue + cmdline_file = procdir.joinpath('cmdline') + if not cmdline_file.is_file(): + continue + ctime_ago = int((now - procdir.stat().st_ctime) / 60) + if ctime_ago < minutes_ago: + continue + cmdline = cmdline_file.read_text().replace('\x00', ' ') + if re.match(cmdline_pat, cmdline): + pid = int(procdir.name) + ret.append(pid) + print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline})', file=sys.stderr) + if dry_run: + print(f'[dry_run is active] kill {pid}') + else: + os.kill(pid, signal.SIGTERM) + return ret diff --git a/src/ocrd_network/tcp_to_uds_mets_proxy.py b/src/ocrd_network/tcp_to_uds_mets_proxy.py index 176f4f1442..3f335435ab 100644 --- a/src/ocrd_network/tcp_to_uds_mets_proxy.py +++ b/src/ocrd_network/tcp_to_uds_mets_proxy.py @@ -1,5 +1,5 @@ from requests_unixsocket import Session as requests_unixsocket_session -from .utils import get_uds_path +from .utils import get_uds_path, convert_url_to_uds_format from typing import Dict from ocrd_utils import getLogger @@ -31,9 +31,13 @@ def forward_tcp_request(self, request_body) -> Dict: if method_type not in SUPPORTED_METHOD_TYPES: raise NotImplementedError(f"Method type: {method_type} not recognized") ws_socket_file = str(get_uds_path(ws_dir_path=ws_dir_path)) - ws_unix_socket_url = f'http+unix://{ws_socket_file.replace("/", "%2F")}' + ws_unix_socket_url = convert_url_to_uds_format(ws_socket_file) uds_request_url = f"{ws_unix_socket_url}/{request_url}" + self.log.info(f"Forwarding TCP mets server request to UDS url: {uds_request_url}") + self.log.info(f"Forwarding method type {method_type}, request data: {request_data}, " + f"expected response type: {response_type}") + if not request_data: response = self.session.request(method_type, uds_request_url) elif "params" in request_data: @@ -45,12 +49,11 @@ def forward_tcp_request(self, request_body) -> Dict: else: raise ValueError("Expecting request_data to be empty or containing single key: params," f"form, or class but not {request_data.keys}") - + if response_type == "empty": + return {} if not response: self.log.error(f"Uds-Mets-Server gives unexpected error. 
Response: {response.__dict__}") return {"error": response.text} - elif response_type == "empty": - return {} elif response_type == "text": return {"text": response.text} elif response_type == "class" or response_type == "dict": diff --git a/src/ocrd_network/utils.py b/src/ocrd_network/utils.py index a2f563de43..5abe2104fd 100644 --- a/src/ocrd_network/utils.py +++ b/src/ocrd_network/utils.py @@ -4,6 +4,7 @@ from functools import wraps from hashlib import md5 from json import loads +from logging import Logger from pathlib import Path from re import compile as re_compile, split as re_split from requests import get as requests_get, Session as Session_TCP @@ -151,22 +152,25 @@ def is_mets_server_running(mets_server_url: str, ws_dir_path: str = None) -> boo return False -def stop_mets_server(mets_server_url: str, ws_dir_path: str = None) -> bool: +def stop_mets_server(logger: Logger, mets_server_url: str, ws_dir_path: str) -> bool: protocol = "tcp" if (mets_server_url.startswith("http://") or mets_server_url.startswith("https://")) else "uds" - session = Session_TCP() if protocol == "tcp" else Session_UDS() - if protocol == "uds": - mets_server_url = convert_url_to_uds_format(mets_server_url) - try: - if 'tcp_mets' in mets_server_url: - if not ws_dir_path: - return False - response = session.post(url=f"{mets_server_url}", json=MpxReq.stop(ws_dir_path)) - else: - response = session.delete(url=f"{mets_server_url}/") - except Exception: - return False - return response.status_code == 200 - + # If the mets server URL is the proxy endpoint + if protocol == "tcp" and "tcp_mets" in mets_server_url: + # Convert the mets server url to UDS format + ws_socket_file = str(get_uds_path(ws_dir_path)) + mets_server_url = convert_url_to_uds_format(ws_socket_file) + protocol = "uds" + if protocol == "tcp": + request_json = MpxReq.stop(ws_dir_path) + logger.info(f"Sending POST request to: {mets_server_url}, request_json: {request_json}") + response = Session_TCP().post(url=f"{mets_server_url}", json=request_json) + return response.status_code == 200 + elif protocol == "uds": + logger.info(f"Sending DELETE request to: {mets_server_url}/") + response = Session_UDS().delete(url=f"{mets_server_url}/") + return response.status_code == 200 + else: + ValueError(f"Unexpected protocol type: {protocol}") def get_uds_path(ws_dir_path: str) -> Path: return Path(config.OCRD_NETWORK_SOCKETS_ROOT_DIR, f"{safe_filename(ws_dir_path)}.sock") diff --git a/src/ocrd_utils/config.py b/src/ocrd_utils/config.py index 36399870e2..c5f1e16679 100644 --- a/src/ocrd_utils/config.py +++ b/src/ocrd_utils/config.py @@ -231,7 +231,7 @@ def _ocrd_download_timeout_parser(val): config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP", description="How many seconds to sleep before trying again.", parser=int, - default=(True, 30)) + default=(True, 10)) config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT", description="Timeout for a blocking ocrd network client (in seconds).", @@ -247,9 +247,19 @@ def _ocrd_download_timeout_parser(val): default=(True, '')) config.add("OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS", - description="Number of attempts for a RabbitMQ client to connect before failing.", + description="Number of attempts for a RabbitMQ client to connect before failing.", + parser=int, + default=(True, 3)) + +config.add( + name="OCRD_NETWORK_RABBITMQ_HEARTBEAT", + description=""" + Controls AMQP heartbeat timeout (in seconds) negotiation during connection tuning. An integer value always overrides the value + proposed by broker. 
Use 0 to deactivate heartbeat. + """, parser=int, - default=(True, 3)) + default=(True, 0) +) config.add(name="OCRD_NETWORK_SOCKETS_ROOT_DIR", description="The root directory where all mets server related socket files are created", diff --git a/src/ocrd_utils/logging.py b/src/ocrd_utils/logging.py index 181805118d..a576cc68bc 100644 --- a/src/ocrd_utils/logging.py +++ b/src/ocrd_utils/logging.py @@ -209,7 +209,7 @@ def disableLogging(silent=not config.OCRD_LOGGING_DEBUG): _initialized_flag = False # logging.basicConfig(level=logging.CRITICAL) # logging.disable(logging.ERROR) - # remove all handlers for the ocrd logger + # remove all handlers for the 'ocrd.' and root logger for logger_name in ROOT_OCRD_LOGGERS + ['']: for handler in logging.getLogger(logger_name).handlers[:]: logging.getLogger(logger_name).removeHandler(handler) diff --git a/tests/model/test_exif.py b/tests/model/test_exif.py index f6771fb8ee..18c5e4c467 100644 --- a/tests/model/test_exif.py +++ b/tests/model/test_exif.py @@ -24,7 +24,13 @@ ('leptonica_samples/data/OCR-D-IMG/OCR-D-IMG_1555_007.jpg', 944, 1472, 1, 1, 1, 'inches', 'RGB', None), ('kant_aufklaerung_1784-jp2/data/OCR-D-IMG/INPUT_0020.jp2', - 1457, 2084, 1, 1, 1, 'inches', 'RGB', None) + 1457, 2084, 1, 1, 1, 'inches', 'RGB', None), + # tolerate multi-frame TIFF: + ('gutachten/data/IMG/IMG_1.tif', + 2088, 2634, 300, 300, 300, 'inches', 'RGB', 'raw'), + # multi-frame TIFF with metric pixel density (is actually YCBCR not RGB but Pillow thinks otherwise...) + ('indian-ferns/data/OCR-D-IMG/0004.tif', + 2626, 3620, 28, 28, 28, 'cm', 'RGB', 'jpeg'), ]) def test_ocrd_exif(path, width, height, xResolution, yResolution, resolution, resolutionUnit, photometricInterpretation, compression): """Check EXIF attributes for different input formats diff --git a/tests/network/config.py b/tests/network/config.py index e22cc6ce9d..611ad63821 100644 --- a/tests/network/config.py +++ b/tests/network/config.py @@ -89,11 +89,19 @@ test_config.add( name="OCRD_NETWORK_RABBITMQ_CLIENT_CONNECT_ATTEMPTS", + description="Number of attempts for a RabbitMQ client to connect before failing", + parser=int, + default=(True, 3) +) + +test_config.add( + name="OCRD_NETWORK_RABBITMQ_HEARTBEAT", description=""" - Number of attempts for a RabbitMQ client to connect before failing + Controls AMQP heartbeat timeout (in seconds) negotiation during connection tuning. An integer value always overrides the value + proposed by broker. Use 0 to deactivate heartbeat. 
""", parser=int, - default=(True, 3) + default=(True, 0) ) test_config.add( diff --git a/tests/network/test_modules_mets_server_proxy.py b/tests/network/test_modules_mets_server_proxy.py index 8b8c0d35f7..f19d7e415e 100644 --- a/tests/network/test_modules_mets_server_proxy.py +++ b/tests/network/test_modules_mets_server_proxy.py @@ -119,7 +119,7 @@ def test_find_files(start_uds_mets_server): {"file_grp": test_file_group} ) response_dict = MetsServerProxy().forward_tcp_request(request_body=request_body) - assert len(response_dict["files"]) == 3, "Expected to find exatly 3 matching files" + assert len(response_dict["files"]) == 3, "Expected to find exactly 3 matching files" request_body = MpxReq.find_files( TEST_WORKSPACE_DIR, {"file_grp": test_non_existing_file_group} diff --git a/tests/test_resolver.py b/tests/test_resolver.py index c2575b6086..97d2ee6658 100644 --- a/tests/test_resolver.py +++ b/tests/test_resolver.py @@ -118,7 +118,7 @@ def test_workspace_from_url_kant_with_resources(mock_request, tmp_path): @patch.object(Session, "get") def test_workspace_from_url_kant_with_resources_existing_local(mock_request, tmp_path): """ - Fail with clobber_mets=False, succeeed with clobber_mets=True + Fail with clobber_mets=False, succeed with clobber_mets=True """ # arrange diff --git a/tests/test_resource_manager.py b/tests/test_resource_manager.py index 653167e10a..286f6ea6b0 100644 --- a/tests/test_resource_manager.py +++ b/tests/test_resource_manager.py @@ -80,7 +80,7 @@ def test_resources_manager_from_environment(tmp_path, monkeypatch): assert mgr.userdir == tmp_path -def test_resources_manager_config_explicite(tmp_path): +def test_resources_manager_config_explicit(tmp_path): # act from ocrd.resource_manager import OcrdResourceManager