56 changes: 56 additions & 0 deletions apps/worker/tasks/base.py
@@ -1,4 +1,6 @@
import logging
import time
from contextlib import contextmanager
from datetime import datetime

import sentry_sdk
@@ -533,6 +535,60 @@ def get_repo_provider_service(

return None

@contextmanager
def with_logged_lock(self, lock, lock_name: str, **extra_log_context):
"""
Context manager that wraps a Redis lock with standardized logging.

This method provides consistent lock logging across all tasks:
- Logs "Acquiring lock" before attempting to acquire
- Logs "Acquired lock" after successful acquisition
- Logs "Releasing lock" with duration after release

Args:
lock: A Redis lock object (from redis_connection.lock())
lock_name: Name of the lock for logging purposes
**extra_log_context: Additional context to include in all log messages

Example:
with self.with_logged_lock(
redis_connection.lock(lock_name, timeout=300),
lock_name=lock_name,
repoid=repoid,
commitid=commitid,
):
# Your code here
pass
"""
log.info(
"Acquiring lock",
extra={
"lock_name": lock_name,
**extra_log_context,
},
)
with lock:
lock_acquired_time = time.time()
log.info(
"Acquired lock",
extra={
"lock_name": lock_name,
**extra_log_context,
},
)
try:
yield
finally:
lock_duration = time.time() - lock_acquired_time
log.info(
"Releasing lock",
extra={
"lock_name": lock_name,
"lock_duration_seconds": lock_duration,
**extra_log_context,
},
)

def _call_upload_breadcrumb_task(
self,
commit_sha: str,
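To make the runtime behaviour of the new helper easy to see outside the worker, here is a minimal, self-contained sketch of the same acquire/acquired/release logging pattern. `DummyLock` and `demo_logged_lock` are illustrative stand-ins, not part of this diff: the dummy lock replaces `redis_connection.lock()` and a plain `logging` logger replaces the task module's `log`.

```python
import logging
import time
from contextlib import contextmanager

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("lock-demo")


class DummyLock:
    """Illustrative stand-in for redis.lock.Lock; acquires instantly."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Never suppress exceptions, matching real lock behaviour.
        return False


@contextmanager
def demo_logged_lock(lock, lock_name: str, **extra_log_context):
    # Mirrors BaseCodecovTask.with_logged_lock: one log line before acquiring,
    # one after acquisition, and one (with duration) when the lock is released.
    log.info("Acquiring lock", extra={"lock_name": lock_name, **extra_log_context})
    with lock:
        lock_acquired_time = time.time()
        log.info("Acquired lock", extra={"lock_name": lock_name, **extra_log_context})
        try:
            yield
        finally:
            lock_duration = time.time() - lock_acquired_time
            log.info(
                "Releasing lock",
                extra={
                    "lock_name": lock_name,
                    "lock_duration_seconds": lock_duration,
                    **extra_log_context,
                },
            )


if __name__ == "__main__":
    # The "Releasing lock" line is emitted even if the body raises.
    with demo_logged_lock(DummyLock(), lock_name="upload_lock_1_abc", repoid=1):
        time.sleep(0.1)  # simulated work while holding the lock
```

As the new test `test_with_logged_lock_propagates_lock_error` asserts, a `LockError` raised during acquisition propagates out of the helper unchanged, which is why the call sites below keep their surrounding `try:` blocks.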
12 changes: 8 additions & 4 deletions apps/worker/tasks/crontasks.py
@@ -37,10 +37,14 @@ def run_impl(self, db_session, *args, cron_task_generation_time_iso, **kwargs):
redis_connection = get_redis_connection()
generation_time = datetime.fromisoformat(cron_task_generation_time_iso)
try:
with redis_connection.lock(
lock_name,
timeout=max(60 * 5, self.hard_time_limit_task),
blocking_timeout=1,
with self.with_logged_lock(
redis_connection.lock(
lock_name,
timeout=max(60 * 5, self.hard_time_limit_task),
blocking_timeout=1,
),
lock_name=lock_name,
task_name=self.name,
):
min_seconds_interval = (
self.get_min_seconds_interval_between_executions()
13 changes: 9 additions & 4 deletions apps/worker/tasks/manual_trigger.py
@@ -41,10 +41,15 @@ def run_impl(
lock_name = f"manual_trigger_lock_{repoid}_{commitid}"
redis_connection = get_redis_connection()
try:
with redis_connection.lock(
lock_name,
timeout=60 * 5,
blocking_timeout=5,
with self.with_logged_lock(
redis_connection.lock(
lock_name,
timeout=60 * 5,
blocking_timeout=5,
),
lock_name=lock_name,
repoid=repoid,
commitid=commitid,
):
return self.process_impl_within_lock(
db_session=db_session,
13 changes: 9 additions & 4 deletions apps/worker/tasks/preprocess_upload.py
@@ -57,10 +57,15 @@ def run_impl(self, db_session, *, repoid: int, commitid: str, **kwargs):
)
return {"preprocessed_upload": False, "reason": "already_running"}
try:
with redis_connection.lock(
lock_name,
timeout=60 * 5,
blocking_timeout=None,
with self.with_logged_lock(
redis_connection.lock(
lock_name,
timeout=60 * 5,
blocking_timeout=None,
),
lock_name=lock_name,
repoid=repoid,
commitid=commitid,
):
return self.process_impl_within_lock(
db_session=db_session,
13 changes: 9 additions & 4 deletions apps/worker/tasks/sync_pull.py
@@ -77,10 +77,15 @@ def run_impl(
lock_name = f"pullsync_{repoid}_{pullid}"
start_wait = time.monotonic()
try:
with redis_connection.lock(
lock_name,
timeout=60 * 5,
blocking_timeout=0.5,
with self.with_logged_lock(
redis_connection.lock(
lock_name,
timeout=60 * 5,
blocking_timeout=0.5,
),
lock_name=lock_name,
repoid=repoid,
pullid=pullid,
):
return self.run_impl_within_lock(
db_session,
13 changes: 9 additions & 4 deletions apps/worker/tasks/sync_repos.py
@@ -91,10 +91,15 @@ def run_impl(
lock_name = f"syncrepos_lock_{ownerid}_{using_integration}"
redis_connection = get_redis_connection()
try:
with redis_connection.lock(
lock_name,
timeout=max(300, self.hard_time_limit_task),
blocking_timeout=5,
with self.with_logged_lock(
redis_connection.lock(
lock_name,
timeout=max(300, self.hard_time_limit_task),
blocking_timeout=5,
),
lock_name=lock_name,
ownerid=ownerid,
using_integration=using_integration,
):
git = get_owner_provider_service(
owner,
165 changes: 165 additions & 0 deletions apps/worker/tasks/tests/unit/test_base.py
@@ -8,6 +8,8 @@
from celery.contrib.testing.mocks import TaskMessage
from celery.exceptions import MaxRetriesExceededError, Retry, SoftTimeLimitExceeded
from prometheus_client import REGISTRY
from redis.exceptions import LockError
from redis.lock import Lock
from sqlalchemy.exc import (
DBAPIError,
IntegrityError,
@@ -917,3 +919,166 @@ def test_real_example_override_from_upload(
time_limit=450,
user_plan="users-enterprisey",
)


@pytest.mark.django_db(databases={"default", "timeseries"})
class TestBaseCodecovTaskWithLoggedLock:
def test_with_logged_lock_logs_acquiring_and_acquired(self, mocker):
"""Test that with_logged_lock logs 'Acquiring lock' and 'Acquired lock'."""
mock_log = mocker.patch("tasks.base.log")
mock_lock = mocker.MagicMock(spec=Lock)
mock_lock.__enter__ = mocker.MagicMock(return_value=None)
mock_lock.__exit__ = mocker.MagicMock(return_value=None)

task = BaseCodecovTask()
with task.with_logged_lock(mock_lock, lock_name="test_lock", repoid=123):
pass

acquiring_calls = [
call
for call in mock_log.info.call_args_list
if call[0][0] == "Acquiring lock"
]
assert len(acquiring_calls) == 1
assert acquiring_calls[0][1]["extra"]["lock_name"] == "test_lock"
assert acquiring_calls[0][1]["extra"]["repoid"] == 123

acquired_calls = [
call
for call in mock_log.info.call_args_list
if call[0][0] == "Acquired lock"
]
assert len(acquired_calls) == 1
assert acquired_calls[0][1]["extra"]["lock_name"] == "test_lock"
assert acquired_calls[0][1]["extra"]["repoid"] == 123

def test_with_logged_lock_logs_releasing_with_duration(self, mocker):
"""Test that with_logged_lock logs 'Releasing lock' with duration."""
mock_log = mocker.patch("tasks.base.log")
mock_lock = mocker.MagicMock(spec=Lock)
mock_lock.__enter__ = mocker.MagicMock(return_value=None)
mock_lock.__exit__ = mocker.MagicMock(return_value=None)

# Mock time.time to control duration
mock_time = mocker.patch("tasks.base.time.time")
mock_time.side_effect = [1000.0, 1000.5] # 0.5 second duration

task = BaseCodecovTask()
with task.with_logged_lock(mock_lock, lock_name="test_lock", commitid="abc123"):
pass

releasing_calls = [
call
for call in mock_log.info.call_args_list
if call[0][0] == "Releasing lock"
]
assert len(releasing_calls) == 1
assert releasing_calls[0][1]["extra"]["lock_name"] == "test_lock"
assert releasing_calls[0][1]["extra"]["commitid"] == "abc123"
# Use approximate comparison due to floating point precision
assert (
abs(releasing_calls[0][1]["extra"]["lock_duration_seconds"] - 0.5) < 0.001
)

def test_with_logged_lock_includes_extra_context(self, mocker):
"""Test that with_logged_lock includes all extra context in logs."""
mock_log = mocker.patch("tasks.base.log")
mock_lock = mocker.MagicMock(spec=Lock)
mock_lock.__enter__ = mocker.MagicMock(return_value=None)
mock_lock.__exit__ = mocker.MagicMock(return_value=None)

task = BaseCodecovTask()
with task.with_logged_lock(
mock_lock,
lock_name="test_lock",
repoid=123,
commitid="abc123",
report_type="coverage",
custom_field="custom_value",
):
pass

# Check that all extra context is included in all log calls
for log_call in mock_log.info.call_args_list:
extra = log_call[1]["extra"]
assert extra["lock_name"] == "test_lock"
assert extra["repoid"] == 123
assert extra["commitid"] == "abc123"
assert extra["report_type"] == "coverage"
assert extra["custom_field"] == "custom_value"

def test_with_logged_lock_executes_code_within_lock(self, mocker):
"""Test that code within with_logged_lock executes correctly."""
mock_log = mocker.patch("tasks.base.log")
mock_lock = mocker.MagicMock(spec=Lock)
mock_lock.__enter__ = mocker.MagicMock(return_value=None)
mock_lock.__exit__ = mocker.MagicMock(return_value=None)

task = BaseCodecovTask()
result = None
with task.with_logged_lock(mock_lock, lock_name="test_lock"):
result = "executed"

assert result == "executed"
mock_lock.__enter__.assert_called_once()
mock_lock.__exit__.assert_called_once()

def test_with_logged_lock_propagates_lock_error(self, mocker):
"""Test that LockError from lock acquisition is not caught by with_logged_lock."""
mock_log = mocker.patch("tasks.base.log")
mock_lock = mocker.MagicMock(spec=Lock)
mock_lock.__enter__ = mocker.MagicMock(side_effect=LockError("Lock failed"))

task = BaseCodecovTask()
with pytest.raises(LockError, match="Lock failed"):
with task.with_logged_lock(mock_lock, lock_name="test_lock"):
pass

# Should have logged "Acquiring lock" but not "Acquired lock" or "Releasing lock"
acquiring_calls = [
call
for call in mock_log.info.call_args_list
if call[0][0] == "Acquiring lock"
]
assert len(acquiring_calls) == 1

acquired_calls = [
call
for call in mock_log.info.call_args_list
if call[0][0] == "Acquired lock"
]
assert len(acquired_calls) == 0

releasing_calls = [
call
for call in mock_log.info.call_args_list
if call[0][0] == "Releasing lock"
]
assert len(releasing_calls) == 0

def test_with_logged_lock_logs_release_even_on_exception(self, mocker):
"""Test that 'Releasing lock' is logged even if code within raises an exception."""
mock_log = mocker.patch("tasks.base.log")
mock_lock = mocker.MagicMock(spec=Lock)
mock_lock.__enter__ = mocker.MagicMock(return_value=None)
mock_lock.__exit__ = mocker.MagicMock(return_value=None)

# Mock time.time to control duration
mock_time = mocker.patch("tasks.base.time.time")
mock_time.side_effect = [1000.0, 1000.2] # 0.2 second duration

task = BaseCodecovTask()
with pytest.raises(ValueError):
with task.with_logged_lock(mock_lock, lock_name="test_lock"):
raise ValueError("Test exception")

releasing_calls = [
call
for call in mock_log.info.call_args_list
if call[0][0] == "Releasing lock"
]
assert len(releasing_calls) == 1
# Use approximate comparison due to floating point precision
assert (
abs(releasing_calls[0][1]["extra"]["lock_duration_seconds"] - 0.2) < 0.001
)
14 changes: 10 additions & 4 deletions apps/worker/tasks/upload.py
@@ -334,10 +334,16 @@ def run_impl(

lock_name = upload_context.lock_name("upload")
try:
with upload_context.redis_connection.lock(
lock_name,
timeout=max(300, self.hard_time_limit_task),
blocking_timeout=5,
with self.with_logged_lock(
upload_context.redis_connection.lock(
lock_name,
timeout=max(300, self.hard_time_limit_task),
blocking_timeout=5,
),
lock_name=lock_name,
repoid=repoid,
commitid=commitid,
report_type=report_type,
):
# Check whether a different `Upload` task has "stolen" our uploads
if not upload_context.has_pending_jobs():