Synchronizes QUEUED and RUNNING Runs with their related RQ Jobs #130 (#357)

* Flag stale runs on app ready in SYNC mode #130

Signed-off-by: Thomas Druez <[email protected]>

* Enable redis data persistence using AOF #130

with default policy of fsync every second

Signed-off-by: Thomas Druez <[email protected]>

* Make sure the job is found before calling delete in Run.delete_task #130

Signed-off-by: Thomas Druez <[email protected]>

* Add a sync_with_job method on the run model #130

Synchronize the `self` Run instance with its related RQ Job

Signed-off-by: Thomas Druez <[email protected]>

* Synchronizes QUEUED and RUNNING Runs with their related Jobs on app ready #130

Signed-off-by: Thomas Druez <[email protected]>

* Add unit test for sync_with_job method #130

Signed-off-by: Thomas Druez <[email protected]>

* Move the synchronization process in a custom Worker class #130

Signed-off-by: Thomas Druez <[email protected]>

* Simplify the synchronization logic #130

Signed-off-by: Thomas Druez <[email protected]>

* Reduce the "cleaning lock" ttl from 899 seconds to 60 seconds in ASYNC queue #130

Signed-off-by: Thomas Druez <[email protected]>

* Add unit tests for better coverage #130

Signed-off-by: Thomas Druez <[email protected]>

* Add CHANGELOG entry #130

Signed-off-by: Thomas Druez <[email protected]>
tdruez authored Nov 22, 2021
1 parent 4356921 commit 5042da0
Showing 10 changed files with 288 additions and 6 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -7,6 +7,7 @@
**/.aws
**/.ssh
**/.DS_Store
**/.aof
**/venv
**/env
**/bin
1 change: 1 addition & 0 deletions .gitignore
@@ -39,6 +39,7 @@ local
/scancodeio.egg-info/
policies.yml
*.rdb
*.aof

# This is only created when packaging for external redistribution
/thirdparty/
19 changes: 19 additions & 0 deletions CHANGELOG.rst
@@ -4,6 +4,25 @@ Changelog
Unreleased
----------

- Synchronize QUEUED and RUNNING pipeline runs with their related worker jobs during
  the worker maintenance tasks scheduled every 10 minutes.
  If a container was taken down while a pipeline was running, or if a pipeline
  process was killed unexpectedly, that pipeline run status will be updated to a
  FAILED state during the next maintenance tasks.
  QUEUED pipelines will be restored in the queue, as the worker Redis cache backend
  data is now persistent and reloaded on starting the image.
  Note that internally, a running job emits a "heartbeat" every 60 seconds to let
  all the workers know that it is properly running.
  After 90 seconds without any heartbeat, a worker will determine that the job is
  no longer active, and that job will be moved to the failed registry during the
  worker maintenance tasks. The pipeline run will be updated as well to reflect
  this failure in the Web UI, the REST API, and the command line interface.
  https://github.com/nexB/scancode.io/issues/130

- Enable Redis data persistence using the "Append Only File" with the default
  policy of fsync every second in the docker-compose configuration.
  https://github.com/nexB/scancode.io/issues/130

- Add a new tutorial chapter about license policies and compliance alerts.
  https://github.com/nexB/scancode.io/issues/337

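For illustration, the abandoned-job detection described in the first entry above maps onto RQ's started-job registry cleanup. A minimal sketch using the public rq and redis-py APIs (the "default" queue name and a local Redis are assumptions):

    from redis import Redis
    from rq import Queue
    from rq.registry import FailedJobRegistry, StartedJobRegistry

    connection = Redis()
    queue = Queue("default", connection=connection)

    # Jobs whose heartbeat key has expired are considered abandoned and are
    # moved from the StartedJobRegistry to the FailedJobRegistry.
    StartedJobRegistry(queue=queue).cleanup()
    print(FailedJobRegistry(queue=queue).get_job_ids())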
2 changes: 1 addition & 1 deletion Makefile
@@ -115,7 +115,7 @@ test:
${MANAGE} test --noinput

worker:
${MANAGE} rqworker
${MANAGE} rqworker --worker-class scancodeio.worker.ScanCodeIOWorker --queue-class scancodeio.worker.ScanCodeIOQueue --verbosity 2

bump:
@echo "-> Bump the version"
10 changes: 8 additions & 2 deletions docker-compose.yml
@@ -10,6 +10,11 @@ services:

redis:
image: redis
# Enable redis data persistence using the "Append Only File" with the
# default policy of fsync every second. See https://redis.io/topics/persistence
command: redis-server --appendonly yes
volumes:
- redis_data:/data

web:
build: .
@@ -30,7 +35,7 @@

worker:
build: .
command: ./manage.py rqworker
command: ./manage.py rqworker --worker-class scancodeio.worker.ScanCodeIOWorker --queue-class scancodeio.worker.ScanCodeIOQueue --verbosity 2
env_file:
- docker.env
volumes:
@@ -39,7 +44,7 @@
depends_on:
- redis
- db
- web # ensure that potential db migrations run first
- web # Ensure that potential db migrations run first

nginx:
image: nginx
@@ -54,5 +59,6 @@

volumes:
db_data:
redis_data:
static:
workspace:
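A note on the redis service configuration above: "everysec" is Redis' default appendfsync policy, so "--appendonly yes" alone already gives fsync-every-second behavior. A sketch of the equivalent, fully explicit form of the command:

    command: redis-server --appendonly yes --appendfsync everysec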
74 changes: 74 additions & 0 deletions scancodeio/worker.py
@@ -0,0 +1,74 @@
# SPDX-License-Identifier: Apache-2.0
#
# http://nexb.com and https://github.com/nexB/scancode.io
# The ScanCode.io software is licensed under the Apache License version 2.0.
# Data generated with ScanCode.io is provided as-is without warranties.
# ScanCode is a trademark of nexB Inc.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# ScanCode.io should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
#
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/nexB/scancode.io for support and download.

from django.apps import apps
from django.conf import settings

from rq.queue import Queue
from rq.worker import Worker

scanpipe_app = apps.get_app_config("scanpipe")


class ScanCodeIOWorker(Worker):
"""
Modified version of RQ Worker including ScanCode.io customizations.
"""

    def run_maintenance_tasks(self):
        """
        Adds Runs and Jobs synchronization to the periodic maintenance tasks.
        Maintenance tasks run on the first worker startup and then every 10 minutes.
        During maintenance, one of the workers acquires a "cleaning lock" and runs
        the registries cleanup.
        During that cleanup, started Jobs that haven't sent a heartbeat in the past
        90 seconds (job_monitoring_interval + 60) are considered failed and are
        moved to the FailedJobRegistry.
        This happens when the Job process is killed (voluntarily or not); the
        heartbeat is RQ's approach to determining whether a job is still active.
        The `sync_runs_and_jobs` method will see such a Job as failed and update
        its related Run accordingly.
        """
super().run_maintenance_tasks()

        # The Runs and Jobs synchronization needs to be executed after the
        # `self.clean_registries()` call that takes place in the parent
        # `super().run_maintenance_tasks()`.
scanpipe_app.sync_runs_and_jobs()


class ScanCodeIOQueue(Queue):
"""
Modified version of RQ Queue including ScanCode.io customizations.
"""

    # Reduce the "cleaning lock" TTL from the hardcoded default of 899 seconds
    # to 60 seconds.
cleaning_lock_ttl = 60

def acquire_cleaning_lock(self):
"""
Returns a boolean indicating whether a lock to clean this queue is acquired.
"""
return self.connection.set(
self.registry_cleaning_key, 1, nx=1, ex=self.cleaning_lock_ttl
)
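The cleaning lock above relies on the atomicity of the Redis SET command with the NX and EX options: the key is created only if it does not already exist, and it expires automatically. A minimal sketch with redis-py, using RQ's "rq:clean_registries:<queue name>" key convention for the default queue:

    from redis import Redis

    connection = Redis()

    # SET key 1 NX EX 60: returns True only for the first caller; the key
    # then expires after 60 seconds, releasing the lock automatically.
    acquired = connection.set("rq:clean_registries:default", 1, nx=True, ex=60)
    if acquired:
        print("This worker runs the registries cleanup.")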
26 changes: 26 additions & 0 deletions scanpipe/apps.py
@@ -22,6 +22,7 @@

import inspect
import logging
import sys
from importlib.machinery import SourceFileLoader
from pathlib import Path

@@ -64,6 +65,15 @@ def ready(self):
self.load_pipelines()
self.set_policies()

        # In SYNC mode, the Run instances cleanup is triggered on app.ready()
        # only when the app is started through "runserver".
        # This cleanup is required if a running pipeline process gets killed,
        # since a KeyboardInterrupt cannot be captured to properly update the Run
        # instance before the process dies.
        # In ASYNC mode, the cleanup is handled by the "ScanCodeIOWorker" worker.
if not settings.SCANCODEIO_ASYNC and "runserver" in sys.argv:
self.sync_runs_and_jobs()

def load_pipelines(self):
"""
Loads pipelines from the "scancodeio_pipelines" entry point group and from the
@@ -188,3 +198,19 @@ def policies_enabled(self):
Returns True if the policies were provided and loaded properly.
"""
return bool(self.license_policies_index)

def sync_runs_and_jobs(self):
"""
Synchronizes QUEUED and RUNNING Runs with their related Jobs.
"""
logger.info("Synchronizing QUEUED and RUNNING Runs with their related Jobs...")

run_model = self.get_model("Run")
queued_or_running = run_model.objects.queued_or_running()

if queued_or_running:
logger.info(f"{len(queued_or_running)} Runs to synchronize:")
for run in queued_or_running:
run.sync_with_job()
else:
logger.info("No Runs to synchronize.")
74 changes: 72 additions & 2 deletions scanpipe/models.py
@@ -21,6 +21,7 @@
# Visit https://github.com/nexB/scancode.io for support and download.

import inspect
import logging
import re
import shutil
import uuid
@@ -61,6 +62,7 @@
from scanpipe.packagedb_models import AbstractPackage
from scanpipe.packagedb_models import AbstractResource

logger = logging.getLogger(__name__)
scanpipe_app = apps.get_app_config("scanpipe")


@@ -328,8 +330,9 @@ def delete_task(self, delete_self=True):
Deletes a "not started" or "queued" task.
"""
if settings.SCANCODEIO_ASYNC and self.task_id:
# Cancels the job and deletes the job hash from Redis.
self.job.delete()
            job = self.job
            if job:
                job.delete()

if delete_self:
self.delete()
@@ -1047,6 +1050,73 @@ def execute_task_async(self):

return job

    def sync_with_job(self):
        """
        Synchronizes this Run instance with its related RQ Job.
        This is required when a Run gets out of sync with its Job: if the worker
        or one of its processes is killed, the Run status is not properly updated
        and may stay in a QUEUED or RUNNING state forever.
        In that case, the Run status is updated accordingly. If the Run was in
        the queue, it is enqueued again.
        """
RunStatus = self.Status

if settings.SCANCODEIO_ASYNC:
job_status = self.job_status
else:
job_status = None

if not job_status:
if self.status == RunStatus.QUEUED:
logger.info(
f"No Job found for QUEUED Run={self.task_id}. "
f"Enqueueing a new Job in the worker registery."
)
self.execute_task_async()

elif self.status == RunStatus.RUNNING:
logger.info(
f"No Job found for RUNNING Run={self.task_id}. "
f"Flagging this Run as STALE."
)
self.set_task_staled()

return

job_is_out_of_sync = any(
[
self.status == RunStatus.RUNNING and job_status != JobStatus.STARTED,
self.status == RunStatus.QUEUED and job_status != JobStatus.QUEUED,
]
)

if job_is_out_of_sync:
if job_status == JobStatus.STOPPED:
logger.info(
f"Job found as {job_status} for RUNNING Run={self.task_id}. "
f"Flagging this Run as STOPPED."
)
self.set_task_stopped()

elif job_status == JobStatus.FAILED:
logger.info(
f"Job found as {job_status} for RUNNING Run={self.task_id}. "
f"Flagging this Run as FAILED."
)
self.set_task_ended(
exitcode=1,
output=f"Job was moved to the FailedJobRegistry during cleanup",
)

else:
logger.info(
f"Job found as {job_status} for RUNNING Run={self.task_id}. "
f"Flagging this Run as STALE."
)
self.set_task_staled()

def set_scancodeio_version(self):
"""
Sets the current ScanCode.io version on the `Run.scancodeio_version` field.
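A hypothetical usage sketch for `sync_with_job`, for example from a Django shell in ASYNC mode with the Redis backend reachable:

    from scanpipe.models import Run

    # Re-enqueues a QUEUED Run whose Job vanished, or flags an out-of-sync
    # Run as STALE, STOPPED, or FAILED depending on its Job status.
    for run in Run.objects.queued_or_running():
        run.sync_with_job()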
25 changes: 25 additions & 0 deletions scanpipe/tests/test_apps.py
@@ -20,15 +20,19 @@
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/nexB/scancode.io for support and download.

import uuid
from pathlib import Path
from unittest import mock

from django.apps import apps
from django.core.exceptions import ImproperlyConfigured
from django.test import TestCase
from django.test import override_settings
from django.utils import timezone

from scanpipe.apps import ScanPipeConfig
from scanpipe.models import Project
from scanpipe.models import Run
from scanpipe.tests import license_policies
from scanpipe.tests import license_policies_index
from scanpipe.tests.pipelines.register_from_file import RegisterFromFile
@@ -94,3 +98,24 @@ def test_scanpipe_apps_register_pipeline_from_file(self):

exitcode, output = pipeline_instance.execute()
self.assertEqual(0, exitcode)

@mock.patch("scanpipe.models.Run.sync_with_job")
def test_scanpipe_apps_sync_runs_and_jobs(self, mock_sync_with_job):
project1 = Project.objects.create(name="Analysis")
not_started = Run.objects.create(project=project1, pipeline_name="pipeline")
queued = Run.objects.create(
project=project1, pipeline_name="pipeline", task_id=uuid.uuid4()
)
running = Run.objects.create(
project=project1,
pipeline_name="pipeline",
task_id=uuid.uuid4(),
task_start_date=timezone.now(),
)

self.assertEqual(Run.Status.NOT_STARTED, not_started.status)
self.assertEqual(Run.Status.QUEUED, queued.status)
self.assertEqual(Run.Status.RUNNING, running.status)

scanpipe_app.sync_runs_and_jobs()
self.assertEqual(2, mock_sync_with_job.call_count)
