Skip to content

Commit

Permalink
Merge pull request #1069 from OCR-D/processing_server_ext_1046
Browse files Browse the repository at this point in the history
Processing server extension (#1046)
  • Loading branch information
kba authored Sep 4, 2023
2 parents 3d831b4 + 79016bf commit e352a48
Show file tree
Hide file tree
Showing 15 changed files with 546 additions and 93 deletions.
79 changes: 68 additions & 11 deletions ocrd_network/ocrd_network/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
""" The database is used to store information regarding jobs and workspaces.
Jobs: for every process-request a job is inserted into the database with a uuid, status and
Jobs: for every process-request a job is inserted into the database with a uuid, status and
information about the process like parameters and file groups. It is mainly used to track the status
(`ocrd_network.models.job.StateEnum`) of a job so that the state of a job can be queried. Finished
jobs are not deleted from the database.
Expand Down Expand Up @@ -35,18 +35,77 @@ async def sync_initiate_database(db_url: str):
await initiate_database(db_url)


async def db_get_workspace(workspace_id: str) -> DBWorkspace:
workspace = await DBWorkspace.find_one(
DBWorkspace.workspace_id == workspace_id
)
if not workspace:
raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Retrieve a workspace from the database.

    Args:
        workspace_id: id of the workspace to search for (optional)
        workspace_mets_path: mets path of the workspace to search for (optional)
    Returns:
        The matching DBWorkspace document. If both search keys are provided,
        both must match an entry and the match by `workspace_mets_path` is returned.
    Raises:
        ValueError: if neither search key is provided, or no matching workspace exists.
    """
    workspace = None
    if not workspace_id and not workspace_mets_path:
        # fix: no placeholders here, so the f-string prefix was unnecessary
        raise ValueError('Either `workspace_id` or `workspace_mets_path` field must be used as a search key')
    if workspace_id:
        workspace = await DBWorkspace.find_one(
            DBWorkspace.workspace_id == workspace_id
        )
        if not workspace:
            raise ValueError(f'Workspace with id "{workspace_id}" not in the DB.')
    if workspace_mets_path:
        workspace = await DBWorkspace.find_one(
            DBWorkspace.workspace_mets_path == workspace_mets_path
        )
        if not workspace:
            raise ValueError(f'Workspace with path "{workspace_mets_path}" not in the DB.')
    return workspace


@call_sync
async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
    """ Synchronous (blocking) wrapper around db_get_workspace.
    """
    result = await db_get_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path
    )
    return result


async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs):
    """ Update fields of a workspace entry in the database.

    Args:
        workspace_id: id of the workspace to update (optional)
        workspace_mets_path: mets path of the workspace to update (optional)
        **kwargs: field/value pairs to set on the matched workspace
    Raises:
        ValueError: if neither search key is provided, if no matching workspace
            exists, or if a field is unknown or not updatable.
    """
    # Reuse the shared lookup (and its error messages) instead of
    # duplicating the find_one/raise logic here.
    workspace = await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)

    # Whitelist of fields that may be modified through this helper;
    # replaces the long if/elif assignment chain with a single setattr.
    updatable_fields = {
        'workspace_id',
        'workspace_mets_path',
        'ocrd_identifier',
        'bagit_profile_identifier',
        'ocrd_base_version_checksum',
        'ocrd_mets',
        'bag_info_adds',
        'deleted',
        'pages_locked',
    }
    available_fields = set(workspace.__dict__.keys())
    for field, value in kwargs.items():
        if field not in available_fields:
            raise ValueError(f'Field "{field}" is not available.')
        if field not in updatable_fields:
            raise ValueError(f'Field "{field}" is not updatable.')
        setattr(workspace, field, value)
    await workspace.save()


@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs):
    """ Synchronous (blocking) wrapper around db_update_workspace.
    """
    await db_update_workspace(
        workspace_id=workspace_id,
        workspace_mets_path=workspace_mets_path,
        **kwargs
    )


async def db_get_processing_job(job_id: str) -> DBProcessorJob:
Expand All @@ -68,8 +127,6 @@ async def db_update_processing_job(job_id: str, **kwargs):
if not job:
raise ValueError(f'Processing job with id "{job_id}" not in the DB.')

# TODO: This may not be the best Pythonic way to do it. However, it works!
# There must be a shorter way with Pydantic. Suggest an improvement.
job_keys = list(job.__dict__.keys())
for key, value in kwargs.items():
if key not in job_keys:
Expand Down
3 changes: 2 additions & 1 deletion ocrd_network/ocrd_network/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self, config_path: str) -> None:
self.data_mongo: DataMongoDB = DataMongoDB(config['database'])
self.data_queue: DataRabbitMQ = DataRabbitMQ(config['process_queue'])
self.data_hosts: List[DataHost] = []
self.internal_callback_url = config.get('internal_callback_url', None)
for config_host in config['hosts']:
self.data_hosts.append(DataHost(config_host))

Expand Down Expand Up @@ -302,7 +303,7 @@ def deploy_mongodb(
) -> str:
if self.data_mongo.skip_deployment:
self.log.debug('MongoDB is externaly managed. Skipping deployment')
verify_mongodb_available(self.data_mongo.url);
verify_mongodb_available(self.data_mongo.url)
return self.data_mongo.url

self.log.debug(f"Trying to deploy '{image}', with modes: "
Expand Down
2 changes: 2 additions & 0 deletions ocrd_network/ocrd_network/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
'PYJobInput',
'PYJobOutput',
'PYOcrdTool',
'PYResultMessage',
'StateEnum',
]

Expand All @@ -18,5 +19,6 @@
PYJobOutput,
StateEnum
)
from .messages import PYResultMessage
from .ocrd_tool import PYOcrdTool
from .workspace import DBWorkspace
13 changes: 13 additions & 0 deletions ocrd_network/ocrd_network/models/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@


class StateEnum(str, Enum):
    """Lifecycle states of a processing job."""
    # Job request is held in the Processing Server's internal request cache
    cached = 'CACHED'
    # Job has been pushed to the RabbitMQ queue
    queued = 'QUEUED'
    # Job is being executed by a Processing Worker or Processor Server
    running = 'RUNNING'
    # Job completed without errors
    success = 'SUCCESS'
    # Job terminated with an error
    failed = 'FAILED'


class PYJobInput(BaseModel):
""" Wraps the parameters required to make a run-processor-request
"""
processor_name: Optional[str] = None
path_to_mets: Optional[str] = None
workspace_id: Optional[str] = None
description: Optional[str] = None
Expand All @@ -28,6 +35,10 @@ class PYJobInput(BaseModel):
# Used to toggle between sending requests to 'worker and 'server',
# i.e., Processing Worker and Processor Server, respectively
agent_type: Optional[str] = 'worker'
# Auto generated by the Processing Server when forwarding to the Processor Server
job_id: Optional[str] = None
# If set, specifies a list of job ids this job depends on
depends_on: Optional[List[str]] = None

class Config:
schema_extra = {
Expand Down Expand Up @@ -65,8 +76,10 @@ class DBProcessorJob(Document):
output_file_grps: Optional[List[str]]
page_id: Optional[str]
parameters: Optional[dict]
depends_on: Optional[List[str]]
result_queue_name: Optional[str]
callback_url: Optional[str]
internal_callback_url: Optional[str]
start_time: Optional[datetime]
end_time: Optional[datetime]
exec_time: Optional[str]
Expand Down
22 changes: 22 additions & 0 deletions ocrd_network/ocrd_network/models/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pydantic import BaseModel
from typing import Optional
from .job import StateEnum


class PYResultMessage(BaseModel):
    """ Wraps the parameters required to make a result message request

    Reports the final state of a processing job back to a callback
    receiver. NOTE(review): presumably one of `path_to_mets` or
    `workspace_id` identifies the workspace the result refers to —
    confirm against the callers.
    """
    # id of the processing job this result belongs to
    job_id: str
    # final state of the job, see StateEnum (e.g. SUCCESS or FAILED)
    state: StateEnum
    # local filesystem path to the workspace mets file (optional)
    path_to_mets: Optional[str] = None
    # id of the workspace in the database (optional)
    workspace_id: Optional[str] = None

    class Config:
        schema_extra = {
            'example': {
                'job_id': '123123123',
                'state': 'SUCCESS',
                'path_to_mets': '/path/to/mets.xml',
                'workspace_id': 'c7f25615-fc17-4365-a74d-ad20e1ddbd0e'
            }
        }
10 changes: 9 additions & 1 deletion ocrd_network/ocrd_network/models/workspace.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from beanie import Document
from typing import Optional
from typing import Dict, Optional


class DBWorkspace(Document):
Expand All @@ -15,6 +15,10 @@ class DBWorkspace(Document):
ocrd_mets Ocrd-Mets (optional)
bag_info_adds bag-info.txt can also (optionally) contain additional
key-value-pairs which are saved here
deleted the document is deleted if set, however, the record is still preserved
pages_locked a data structure that holds output `fileGrp`s and their respective locked `page_id`
that are currently being processed by an OCR-D processor (server or worker).
If no `page_id` field is set, an identifier "all_pages" will be used.
"""
workspace_id: str
workspace_mets_path: str
Expand All @@ -24,6 +28,10 @@ class DBWorkspace(Document):
ocrd_mets: Optional[str]
bag_info_adds: Optional[dict]
deleted: bool = False
# Dictionary structure:
# Key: fileGrp
# Value: Set of `page_id`s
pages_locked: Optional[Dict] = {}

class Settings:
name = "workspace"
Loading

0 comments on commit e352a48

Please sign in to comment.