Skip to content

Commit

Permalink
Merge pull request #1278 from OCR-D/page-wise-param
Browse files Browse the repository at this point in the history
post_ps_workflow_request: pagewise configurable
  • Loading branch information
kba authored Oct 2, 2024
2 parents d7df200 + 97427e0 commit 7a3be1e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
23 changes: 18 additions & 5 deletions src/ocrd_network/cli/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from json import dumps
from typing import List, Optional, Tuple
from ocrd.decorators.parameter_option import parameter_option, parameter_override_option
from ocrd_network.constants import JobState
from ocrd_utils import DEFAULT_METS_BASENAME
from ocrd_utils.introspect import set_json_key_value_overrides
from ocrd_utils.str import parse_json_string_or_file
Expand Down Expand Up @@ -179,6 +180,7 @@ def check_workflow_job_status(address: Optional[str], workflow_job_id: str):
'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" env variable is used by default')
@click.option('-m', '--path-to-mets', required=True)
@click.option('-w', '--path-to-workflow', required=True)
@click.option('--page-wise/--no-page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs")
@click.option('-b', '--block', default=False, is_flag=True,
help='If set, the client will block till job timeout, fail or success.')
@click.option('-p', '--print-state', default=False, is_flag=True,
Expand All @@ -187,19 +189,30 @@ def send_workflow_job_request(
address: Optional[str],
path_to_mets: str,
path_to_workflow: str,
block: Optional[bool],
print_state: Optional[bool]
page_wise: bool,
block: bool,
print_state: bool
):
"""
Submit a workflow job to the processing server.
"""
client = Client(server_addr_processing=address)
workflow_job_id = client.send_workflow_job_request(path_to_wf=path_to_workflow, path_to_mets=path_to_mets)
workflow_job_id = client.send_workflow_job_request(
path_to_wf=path_to_workflow,
path_to_mets=path_to_mets,
page_wise=page_wise,
)
assert workflow_job_id
print(f"Workflow job id: {workflow_job_id}")
if block:
client.poll_workflow_status(job_id=workflow_job_id, print_state=print_state)

print(f"Polling state of workflow job {workflow_job_id}")
state = client.poll_workflow_status(job_id=workflow_job_id, print_state=print_state)
if state != JobState.success:
print(f"Workflow failed with {state}")
exit(1)
else:
print(f"Workflow succeeded")
exit(0)

@client_cli.group('workspace')
def workspace_cli():
Expand Down
5 changes: 3 additions & 2 deletions src/ocrd_network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def send_processing_job_request(self, processor_name: str, req_params: dict) ->
return post_ps_processing_request(
ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params)

def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str, page_wise: bool):
    """
    Submit a workflow job to the processing server this client points at.

    Thin wrapper that forwards to ``post_ps_workflow_request`` using
    ``self.server_addr_processing`` as the server address.

    Args:
        path_to_wf: path of the workflow file to upload.
        path_to_mets: path of the METS file, passed on to the server.
        page_wise: forwarded unchanged as the ``page_wise`` request flag.

    Returns:
        Whatever ``post_ps_workflow_request`` returns (the workflow job id).
    """
    return post_ps_workflow_request(
        ps_server_host=self.server_addr_processing,
        path_to_wf=path_to_wf,
        path_to_mets=path_to_mets,
        page_wise=page_wise,
    )
28 changes: 17 additions & 11 deletions src/ocrd_network/client_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import json
from requests import get as request_get, post as request_post
from time import sleep
from .constants import JobState, NETWORK_PROTOCOLS


def _poll_endpoint_status(
ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_state: bool = False):
def _poll_endpoint_status(ps_server_host: str, job_id: str, job_type: str, tries: int, wait: int, print_state: bool = False) -> JobState:
if job_type not in ["workflow", "processor"]:
raise ValueError(f"Unknown job type '{job_type}', expected 'workflow' or 'processor'")
job_state = JobState.unset
Expand Down Expand Up @@ -52,22 +52,21 @@ def get_ps_processing_job_log(ps_server_host: str, processing_job_id: str):
return response


def get_ps_processing_job_status(ps_server_host: str, processing_job_id: str) -> JobState:
    """
    Fetch the state of a processing job from the processing server.

    Sends a GET request to ``{ps_server_host}/processor/job/{processing_job_id}``
    and converts the ``"state"`` field of the JSON response into the
    ``JobState`` attribute of the same (lower-cased) name.

    Args:
        ps_server_host: base address of the processing server.
        processing_job_id: id of the processing job to look up.

    Returns:
        The ``JobState`` attribute named after the lower-cased state string.

    Raises:
        AssertionError: if the response status is not 200 or the state is empty.
        AttributeError: if ``JobState`` has no attribute for the reported state.
    """
    request_url = f"{ps_server_host}/processor/job/{processing_job_id}"
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
    job_state = response.json()["state"]
    assert job_state
    # Lower-case the reported state before the lookup on JobState, so callers
    # can compare the result against members such as JobState.success.
    return getattr(JobState, job_state.lower())

def get_ps_workflow_job_status(ps_server_host: str, workflow_job_id: str) -> JobState:
    """
    Fetch the state of a workflow job from the processing server.

    Sends a GET request to ``{ps_server_host}/workflow/job-simple/{workflow_job_id}``
    and converts the ``"state"`` field of the JSON response into the
    ``JobState`` attribute of the same (lower-cased) name.

    Args:
        ps_server_host: base address of the processing server.
        workflow_job_id: id of the workflow job to look up.

    Returns:
        The ``JobState`` attribute named after the lower-cased state string.

    Raises:
        AssertionError: if the response status is not 200 or the state is empty.
        AttributeError: if ``JobState`` has no attribute for the reported state.
    """
    request_url = f"{ps_server_host}/workflow/job-simple/{workflow_job_id}"
    response = request_get(url=request_url, headers={"accept": "application/json; charset=utf-8"})
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
    job_state = response.json()["state"]
    assert job_state
    # Lower-case the reported state before the lookup on JobState, so callers
    # can compare the result against members such as JobState.success.
    return getattr(JobState, job_state.lower())


def post_ps_processing_request(ps_server_host: str, processor: str, job_input: dict) -> str:
Expand All @@ -83,18 +82,25 @@ def post_ps_processing_request(ps_server_host: str, processor: str, job_input: d
return processing_job_id


def post_ps_workflow_request(
    ps_server_host: str,
    path_to_wf: str,
    path_to_mets: str,
    page_wise: bool,
) -> str:
    """
    Upload a workflow file to the processing server and start a workflow job.

    Sends a POST request to ``{ps_server_host}/workflow/run`` with the METS
    path and the page-wise flag as query parameters and the workflow file as
    the multipart field ``workflow``.

    Args:
        ps_server_host: base address of the processing server.
        path_to_wf: path of the local workflow file to upload.
        path_to_mets: METS path, placed verbatim into the ``mets_path`` query
            parameter (no URL escaping is applied here).
        page_wise: sent as ``page_wise=True`` or ``page_wise=False``.

    Returns:
        The ``job_id`` value from the server's JSON response.

    Raises:
        AssertionError: if the response status is not 200 or ``job_id`` is empty.
        OSError: if the workflow file cannot be opened.
    """
    request_url = (
        f"{ps_server_host}/workflow/run"
        f"?mets_path={path_to_mets}&page_wise={'True' if page_wise else 'False'}"
    )
    # Open the workflow file in a context manager so the handle is closed once
    # the upload has finished, instead of being left for garbage collection.
    with open(path_to_wf, "rb") as workflow_file:
        response = request_post(
            url=request_url,
            headers={"accept": "application/json; charset=utf-8"},
            files={"workflow": workflow_file},
        )
    json_resp_raw = response.text
    assert response.status_code == 200, f"Processing server: {request_url}, {response.status_code}"
    wf_job_id = json.loads(json_resp_raw)["job_id"]
    assert wf_job_id
    return wf_job_id

Expand Down

0 comments on commit 7a3be1e

Please sign in to comment.