
introduce -p/--print-status to ocrd network client blocking commands #1277

Merged Oct 10, 2024 (28 commits)

Commits
f44e28b
introduce: OCRD_NETWORK_CLIENT_POLLING_PRINT
MehmedGIT Oct 1, 2024
7177eb1
fix: config value description
MehmedGIT Oct 1, 2024
df8e8ee
add default value param to preserve backwards compatibility
MehmedGIT Oct 1, 2024
b183cfc
make -b/--block as flags
MehmedGIT Oct 1, 2024
342ef3a
implement feedback
MehmedGIT Oct 1, 2024
0e80a7c
fix: missed params
MehmedGIT Oct 1, 2024
d7df200
fix: integration client tests
MehmedGIT Oct 1, 2024
0bfef64
post_ps_workflow_request: pagewise configurable
kba Oct 1, 2024
19aad83
Merge remote-tracking branch 'github/network_client_block_prints' int…
kba Oct 1, 2024
611b6b5
deployer: Remove any pre-existing socket file before starting the ser…
kba Oct 1, 2024
9a71d04
remove UDS socket files
MehmedGIT Oct 2, 2024
854403d
remove shortcuts for page-wise
MehmedGIT Oct 2, 2024
4d01e66
fix: pass page-wise argument to relevant methods
MehmedGIT Oct 2, 2024
97427e0
Update src/ocrd_network/client_utils.py
MehmedGIT Oct 2, 2024
7454845
add endpoint DELETE /workflow/kill-mets-server-zombies to kill -SIGTE…
kba Oct 2, 2024
0506e9d
move mets-zombie killer to / and return list of killed PIDs
kba Oct 2, 2024
ad81356
/kill_mets_server_zombies use underscores not slashes
kba Oct 2, 2024
7a3be1e
Merge pull request #1278 from OCR-D/page-wise-param
kba Oct 2, 2024
4862d72
use 3.8 compatible typing
kba Oct 2, 2024
2cb3e2a
Merge branch 'network_client_block_prints' into mets-server-kill-zombies
kba Oct 2, 2024
8b6a49c
Merge pull request #1282 from OCR-D/mets-server-rm-socket
kba Oct 2, 2024
0d297e7
Merge branch 'network_client_block_prints' into mets-server-kill-zombies
kba Oct 2, 2024
4f6775f
OcrdMetsServer.kill_process: try the easy way (SIGINT) then the hard …
kba Oct 2, 2024
3882e7a
fix: add default to page_wise param
MehmedGIT Oct 2, 2024
a8bfbe4
Merge branch 'network_client_block_prints' into mets-server-kill-zombies
kba Oct 2, 2024
c5fd843
Merge pull request #1283 from OCR-D/mets-server-kill-zombies
kba Oct 2, 2024
d39c3d7
kill_mets_server_zombies: actually return List[int]
kba Oct 10, 2024
7512bd6
kill_mets_server_zombies: allow dry_run to test
kba Oct 10, 2024
12 changes: 9 additions & 3 deletions src/ocrd/mets_server.py
@@ -1,8 +1,10 @@
"""
# METS server functionality
"""
import os
import re
from os import _exit, chmod
import signal
from typing import Dict, Optional, Union, List, Tuple
from time import sleep
from pathlib import Path
@@ -313,7 +315,7 @@
else:
r = self.session.request("POST", self.url, json=MpxReq.add_file(self.ws_dir_path, kwargs))
if not r.ok:
raise RuntimeError(f"Failed to add file ({str(data)}): {r.json()[errors]}")

Check warning on line 318 in src/ocrd/mets_server.py (GitHub Actions / build, repeated across all matrix jobs, Python 3.8–3.12 on ubuntu-20.04/22.04 and macos-latest): F821 undefined name 'errors'

return ClientSideOcrdFile(
None, fileGrp=file_grp,
Expand Down Expand Up @@ -428,13 +430,17 @@

@staticmethod
def kill_process(mets_server_pid: int):
subprocess_run(args=["kill", "-s", "SIGINT", f"{mets_server_pid}"], shell=False, universal_newlines=True)
return
os.kill(mets_server_pid, signal.SIGINT)
sleep(3)
try:
os.kill(mets_server_pid, signal.SIGKILL)
except ProcessLookupError as e:
pass

def shutdown(self):
if self.is_uds:
if Path(self.url).exists():
self.log.debug(f'UDS socket {self.url} still exists, removing it')
self.log.warning(f"Due to a server shutdown, removing the existing UDS socket file: {self.url}")
Path(self.url).unlink()
# os._exit because uvicorn catches SystemExit raised by sys.exit
_exit(0)
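The reworked `kill_process` above replaces a shelled-out `kill` with a graceful-then-forceful pattern: SIGINT first, then SIGKILL, swallowing `ProcessLookupError` if the process already exited. A standalone sketch (the helper name is hypothetical; `sleep` stands in for a METS server process):

```python
import os
import signal
import subprocess
import time

def kill_gracefully(pid: int, grace: float = 3.0) -> None:
    # Ask politely with SIGINT first, as OcrdMetsServer.kill_process now does ...
    os.kill(pid, signal.SIGINT)
    time.sleep(grace)
    try:
        # ... then force-kill; ProcessLookupError means it already exited.
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        pass

proc = subprocess.Popen(["sleep", "60"])
kill_gracefully(proc.pid, grace=0.5)
proc.wait(timeout=5)
assert proc.returncode is not None
```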
35 changes: 27 additions & 8 deletions src/ocrd_network/cli/client.py
@@ -2,6 +2,7 @@
from json import dumps
from typing import List, Optional, Tuple
from ocrd.decorators.parameter_option import parameter_option, parameter_override_option
from ocrd_network.constants import JobState
from ocrd_utils import DEFAULT_METS_BASENAME
from ocrd_utils.introspect import set_json_key_value_overrides
from ocrd_utils.str import parse_json_string_or_file
@@ -104,8 +105,10 @@ def check_processing_job_status(address: Optional[str], processing_job_id: str):
@click.option('--result-queue-name')
@click.option('--callback-url')
@click.option('--agent-type', default='worker')
@click.option('-b', '--block', default=False,
@click.option('-b', '--block', default=False, is_flag=True,
help='If set, the client will block till job timeout, fail or success.')
@click.option('-p', '--print-state', default=False, is_flag=True,
help='If set, the client will print job states by each iteration.')
def send_processing_job_request(
address: Optional[str],
processor_name: str,
@@ -120,7 +123,8 @@ def send_processing_job_request(
# TODO: This is temporally available to toggle
# between the ProcessingWorker/ProcessorServer
agent_type: Optional[str],
block: Optional[bool]
block: Optional[bool],
print_state: Optional[bool]
):
"""
Submit a processing job to the processing server.
@@ -146,7 +150,7 @@
assert processing_job_id
print(f"Processing job id: {processing_job_id}")
if block:
client.poll_job_status(job_id=processing_job_id)
client.poll_job_status(job_id=processing_job_id, print_state=print_state)


@client_cli.group('workflow')
@@ -176,24 +180,39 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str):
'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
@click.option('-m', '--path-to-mets', required=True)
@click.option('-w', '--path-to-workflow', required=True)
@click.option('-b', '--block', default=False,
@click.option('--page-wise/--no-page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs")
@click.option('-b', '--block', default=False, is_flag=True,
help='If set, the client will block till job timeout, fail or success.')
@click.option('-p', '--print-state', default=False, is_flag=True,
help='If set, the client will print job states by each iteration.')
def send_workflow_job_request(
address: Optional[str],
path_to_mets: str,
path_to_workflow: str,
block: Optional[bool]
page_wise: bool,
block: bool,
print_state: bool
):
"""
Submit a workflow job to the processing server.
"""
client = Client(server_addr_processing=address)
workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets)
workflow_job_id = client.send_workflow_job_request(
path_to_wf=path_to_workflow,
path_to_mets=path_to_mets,
page_wise=page_wise,
)
assert workflow_job_id
print(f"Workflow job id: {workflow_job_id}")
if block:
client.poll_workflow_status(job_id=workflow_job_id)

print(f"Polling state of workflow job {workflow_job_id}")
state = client.poll_workflow_status(job_id=workflow_job_id, print_state=print_state)
if state != JobState.success:
print(f"Workflow failed with {state}")
exit(1)
else:
print(f"Workflow succeeded")
exit(0)
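The new blocking tail of `send_workflow_job_request` (poll until terminal state, exit non-zero on failure) can be exercised without a running Processing Server via a stub; `StubClient` and its hard-coded state are assumptions, only the control flow mirrors the diff above:

```python
from enum import Enum

class JobState(str, Enum):
    # Assumed members mirroring ocrd_network.constants.JobState
    success = "SUCCESS"
    failed = "FAILED"

class StubClient:
    """Offline stand-in for ocrd_network.client.Client (hypothetical)."""
    def __init__(self, final_state: JobState):
        self.final_state = final_state
    def poll_workflow_status(self, job_id: str, print_state: bool = False) -> JobState:
        if print_state:
            print(f"State of the workflow job {job_id}: {self.final_state.value}")
        return self.final_state

def blocking_exit_code(client, job_id: str, print_state: bool = False) -> int:
    # Mirrors the CLI's new tail: 0 on success, 1 on any other terminal state.
    state = client.poll_workflow_status(job_id=job_id, print_state=print_state)
    return 0 if state == JobState.success else 1

assert blocking_exit_code(StubClient(JobState.success), "wf-1") == 0
assert blocking_exit_code(StubClient(JobState.failed), "wf-2", print_state=True) == 1
```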

@client_cli.group('workspace')
def workspace_cli():
15 changes: 9 additions & 6 deletions src/ocrd_network/client.py
@@ -46,18 +46,21 @@ def check_job_status(self, job_id: str):
def check_workflow_status(self, workflow_job_id: str):
return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id)

def poll_job_status(self, job_id: str) -> str:
def poll_job_status(self, job_id: str, print_state: bool = False) -> str:
return poll_job_status_till_timeout_fail_or_success(
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait,
print_state=print_state)

def poll_workflow_status(self, job_id: str) -> str:
def poll_workflow_status(self, job_id: str, print_state: bool = False) -> str:
return poll_wf_status_till_timeout_fail_or_success(
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait,
print_state=print_state)

def send_processing_job_request(self, processor_name: str, req_params: dict) -> str:
return post_ps_processing_request(
ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params)

def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str):
def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str, page_wise: bool = False):
return post_ps_workflow_request(
ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets)
ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets,
page_wise=page_wise)
39 changes: 25 additions & 14 deletions src/ocrd_network/client_utils.py
@@ -1,9 +1,10 @@
import json
from requests import get as request_get, post as request_post
from time import sleep
from .constants import JobState, NETWORK_PROTOCOLS


def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int):
def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_state: bool = False) -> JobState:
if job_type not in ["workflow", "processor"]:
raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'")
job_state = JobState.unset
@@ -13,18 +14,22 @@ def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries
job_state = get_ps_processing_job_status(ps_server_host, job_id)
if job_type == "workflow":
job_state = get_ps_workflow_job_status(ps_server_host, job_id)
if print_state:
print(f"State of the {job_type} job {job_id}: {job_state}")
if job_state == JobState.success or job_state == JobState.failed:
break
tries -= 1
return job_state


def poll_job_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait)
def poll_job_status_till_timeout_fail_or_success(
ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False) -> JobState:
return _poll_endpoint_status(ps_server_host, job_id, "processor", tries, wait, print_state)


def poll_wf_status_till_timeout_fail_or_success(ps_server_host: str, job_id: str, tries: int, wait: int) -> JobState:
return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait)
def poll_wf_status_till_timeout_fail_or_success(
ps_server_host: str, job_id: str, tries: int, wait: int, print_state: bool = False) -> JobState:
return _poll_endpoint_status(ps_server_host, job_id, "workflow", tries, wait, print_state)
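The retry loop both wrappers delegate to can be sketched with the status lookup stubbed out; the state strings here are assumptions standing in for `JobState` members:

```python
from time import sleep

def poll(get_state, tries: int, wait: float, print_state: bool = False) -> str:
    # Condensed _poll_endpoint_status: sleep, query, optionally print,
    # stop early on a terminal state, otherwise spend another try.
    job_state = "UNSET"
    while tries > 0:
        sleep(wait)
        job_state = get_state()
        if print_state:
            print(f"State of the job: {job_state}")
        if job_state in ("SUCCESS", "FAILED"):
            break
        tries -= 1
    return job_state

states = iter(["QUEUED", "RUNNING", "SUCCESS"])
assert poll(lambda: next(states), tries=10, wait=0, print_state=True) == "SUCCESS"
```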


def get_ps_deployed_processors(ps_server_host: str):
@@ -47,22 +52,21 @@ def get_ps_processing_job_log(ps_server_host: str, processing_job_id: str):
return response


def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> str:
def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> JobState:
request_url = f"{ps_server_host}/processor/job/{processing_job_id}"
response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
job_state = response.json()["state"]
assert job_state
return job_state

return getattr(JobState, job_state.lower())

def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> str:
def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> JobState:
request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}"
response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
job_state = response.json()["state"]
assert job_state
return job_state
return getattr(JobState, job_state.lower())
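Both status getters now return a real `JobState` member instead of a raw string. The `getattr` conversion can be illustrated with a local enum (member names are assumptions mirroring `ocrd_network.constants.JobState`):

```python
from enum import Enum

class JobState(str, Enum):
    unset = "UNSET"
    running = "RUNNING"
    success = "SUCCESS"
    failed = "FAILED"

def to_state(raw: str) -> JobState:
    # The server reports upper-case names; lower-casing hits the member name.
    return getattr(JobState, raw.lower())

assert to_state("SUCCESS") is JobState.success
assert to_state("FAILED") is JobState.failed
```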


def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str:
@@ -78,18 +82,25 @@ def post_ps_processing_request(ps_server_host: str, processor: str, job_input: d
return processing_job_id


# TODO: Can be extended to include other parameters such as page_wise
def post_ps_workflow_request(ps_server_host: str, path_to_wf: str, path_to_mets: str) -> str:
request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise=True"
def post_ps_workflow_request(
ps_server_host: str,
path_to_wf: str,
path_to_mets: str,
page_wise: bool = False,
) -> str:
request_url = f"{ps_server_host}/workflow/run?mets_path={path_to_mets}&page_wise={'True' if page_wise else 'False'}"
response = request_post(
url=request_url,
headers={"accept": "application/json; charset=utf-8"},
files={"workflow": open(path_to_wf, "rb")}
)
# print(response.json())
# print(response.__dict__)
json_resp_raw = response.text
# print(f'post_ps_workflow_request >> {response.status_code}')
# print(f'post_ps_workflow_request >> {json_resp_raw}')
assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
wf_job_id = response.json()["job_id"]
wf_job_id = json.loads(json_resp_raw)["job_id"]
assert wf_job_id
return wf_job_id

15 changes: 14 additions & 1 deletion src/ocrd_network/processing_server.py
@@ -1,7 +1,7 @@
from datetime import datetime
from os import getpid
from pathlib import Path
from typing import Dict, List, Union
from typing import Dict, List, Optional, Union
from uvicorn import run as uvicorn_run

from fastapi import APIRouter, FastAPI, File, HTTPException, Request, status, UploadFile
@@ -48,6 +48,7 @@
get_workflow_content,
get_from_database_workspace,
get_from_database_workflow_job,
kill_mets_server_zombies,
parse_workflow_tasks,
raise_http_exception,
request_processor_server_tool_json,
@@ -200,6 +201,14 @@ def add_api_routes_others(self):
tags=[ServerApiTags.WORKSPACE],
summary="Forward a TCP request to UDS mets server"
)
others_router.add_api_route(
path="/kill_mets_server_zombies",
endpoint=self.kill_mets_server_zombies,
methods=["DELETE"],
tags=[ServerApiTags.WORKFLOW, ServerApiTags.PROCESSING],
status_code=status.HTTP_200_OK,
summary="!! Workaround Do Not Use Unless You Have A Reason !! Kill all METS servers on this machine that have been created more than 60 minutes ago."
)
self.include_router(others_router)

def add_api_routes_processing(self):
@@ -817,6 +826,10 @@ async def get_workflow_info(self, workflow_job_id) -> Dict:
response = self._produce_workflow_status_response(processing_jobs=jobs)
return response

async def kill_mets_server_zombies(self, minutes_ago : Optional[int] = None, dry_run : Optional[bool] = None) -> List[int]:
pids_killed = kill_mets_server_zombies(minutes_ago=minutes_ago, dry_run=dry_run)
return pids_killed

async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, JobState]:
"""
Simplified version of the `get_workflow_info` that returns a single state for the entire workflow.
8 changes: 8 additions & 0 deletions src/ocrd_network/runtime_data/deployer.py
@@ -146,6 +146,11 @@ def start_uds_mets_server(self, ws_dir_path: str) -> Path:
if is_mets_server_running(mets_server_url=str(mets_server_url)):
self.log.debug(f"The UDS mets server for {ws_dir_path} is already started: {mets_server_url}")
return mets_server_url
elif Path(mets_server_url).is_socket():
self.log.warning(
f"The UDS mets server for {ws_dir_path} is not running but the socket file exists: {mets_server_url}."
"Removing to avoid any weird behavior before starting the server.")
Path(mets_server_url).unlink()
self.log.info(f"Starting UDS mets server: {mets_server_url}")
pid = OcrdMetsServer.create_process(mets_server_url=mets_server_url, ws_dir_path=ws_dir_path, log_file=log_file)
self.mets_servers[mets_server_url] = pid
@@ -160,6 +165,9 @@ def stop_uds_mets_server(self, mets_server_url: str, stop_with_pid: bool = False
raise Exception(message)
mets_server_pid = self.mets_servers[Path(mets_server_url)]
OcrdMetsServer.kill_process(mets_server_pid=mets_server_pid)
if Path(mets_server_url).exists():
self.log.warning(f"Deployer is removing the existing UDS socket file: {mets_server_url}")
Path(mets_server_url).unlink()
return
# TODO: Reconsider this again
# Not having this sleep here causes connection errors
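The stale-socket guard added to the deployer (unlink a leftover UDS file before restarting the server) can be exercised offline; the paths below are temporary files, not real METS server sockets, and `AF_UNIX` makes this POSIX-only:

```python
import socket
import tempfile
from pathlib import Path

def remove_stale_socket(sock_path: Path) -> bool:
    # Mirrors the deployer guard: unlink only if a socket file was left behind.
    if sock_path.is_socket():
        sock_path.unlink()
        return True
    return False

tmp = Path(tempfile.mkdtemp()) / "mets.sock"
s = socket.socket(socket.AF_UNIX)  # POSIX-only, like UDS METS servers
s.bind(str(tmp))                   # creates the socket file on disk
s.close()
assert remove_stale_socket(tmp) is True   # stale file removed
assert remove_stale_socket(tmp) is False  # nothing left to remove
```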
44 changes: 40 additions & 4 deletions src/ocrd_network/server_utils.py
@@ -1,12 +1,18 @@
import os
import re
import signal
from pathlib import Path
from json import dumps, loads
from urllib.parse import urljoin
from typing import Dict, List, Optional, Union
from time import time

from fastapi import HTTPException, status, UploadFile
from fastapi.responses import FileResponse
from httpx import AsyncClient, Timeout
from json import dumps, loads
from logging import Logger
from pathlib import Path
from requests import get as requests_get
from typing import Dict, List, Union
from urllib.parse import urljoin
from requests_unixsocket import sys

from ocrd.resolver import Resolver
from ocrd.task_sequence import ProcessorTask
@@ -241,3 +247,33 @@ def validate_first_task_input_file_groups_existence(logger: Logger, mets_path: s
if group not in available_groups:
message = f"Input file group '{group}' of the first processor not found: {input_file_grps}"
raise_http_exception(logger, status.HTTP_422_UNPROCESSABLE_ENTITY, message)


def kill_mets_server_zombies(minutes_ago : Optional[int], dry_run : Optional[bool]) -> List[int]:
if minutes_ago == None:
minutes_ago = 90
if dry_run == None:
dry_run = False

now = time()
cmdline_pat = r'.*ocrd workspace -U.*server start $'
ret = []
for procdir in sorted(Path('/proc').glob('*'), key=os.path.getctime):
if not procdir.is_dir():
continue
cmdline_file = procdir.joinpath('cmdline')
if not cmdline_file.is_file():
continue
ctime_ago = int((now - procdir.stat().st_ctime) / 60)
if ctime_ago < minutes_ago:
continue
cmdline = cmdline_file.read_text().replace('\x00', ' ')
if re.match(cmdline_pat, cmdline):
pid = int(procdir.name)
ret.append(pid)
print(f'METS Server with PID {pid} was created {ctime_ago} minutes ago, more than {minutes_ago}, so killing (cmdline="{cmdline})', file=sys.stderr)
if dry_run:
print(f'[dry_run is active] kill {pid}')
else:
os.kill(pid, signal.SIGTERM)
return ret
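The `cmdline_pat` used by the zombie scan depends on `/proc/<pid>/cmdline` NUL separators being replaced by spaces, which leaves a trailing space that the pattern's `start $` relies on. That matching behaviour can be checked in isolation (the sample command lines are made up):

```python
import re

cmdline_pat = r'.*ocrd workspace -U.*server start $'

# /proc/<pid>/cmdline is NUL-separated; replacing NULs with spaces
# leaves a trailing space after the final argument.
raw = 'ocrd\x00workspace\x00-U\x00/tmp/ws.sock\x00server\x00start\x00'
cmdline = raw.replace('\x00', ' ')

assert re.match(cmdline_pat, cmdline)
assert not re.match(cmdline_pat, 'ocrd workspace server stop ')
assert not re.match(cmdline_pat, cmdline.rstrip())  # no trailing space, no match
```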
2 changes: 1 addition & 1 deletion src/ocrd_utils/config.py
@@ -160,7 +160,7 @@ def _ocrd_download_timeout_parser(val):
config.add("OCRD_NETWORK_CLIENT_POLLING_SLEEP",
description="How many seconds to sleep before trying again.",
parser=int,
default=(True, 30))
default=(True, 10))

config.add("OCRD_NETWORK_CLIENT_POLLING_TIMEOUT",
description="Timeout for a blocking ocrd network client (in seconds).",
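The default change above (30 s down to 10 s) only applies when the environment variable is unset; the resolution order can be sketched as follows (the helper name is hypothetical, `int` parsing taken from `config.add`):

```python
def polling_sleep(env: dict) -> int:
    # The env var wins, parsed as int; otherwise the new default of 10 seconds.
    return int(env.get("OCRD_NETWORK_CLIENT_POLLING_SLEEP", 10))

assert polling_sleep({}) == 10
assert polling_sleep({"OCRD_NETWORK_CLIENT_POLLING_SLEEP": "30"}) == 30
```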