WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions apps/worker/tasks/manual_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from database.models import Commit, Pull
from database.models.reports import CommitReport, Upload
from services.comparison import get_or_create_comparison
from services.processing.state import ProcessingState
from shared.celery_config import (
compute_comparison_task_name,
manual_upload_completion_trigger_task_name,
Expand Down Expand Up @@ -96,6 +97,26 @@ def process_impl_within_lock(
for upload in uploads:
if not upload.state or upload.state_id == UploadState.UPLOADED.db_id:
still_processing += 1

# Also check Redis processing state to prevent race conditions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I buy this, I think it makes more sense to use either the DB or Redis as the source of truth even with a race condition.

# where uploads have been marked complete in DB but are still
# being processed/merged by the finisher task
state = ProcessingState(repoid, commitid)
upload_numbers = state.get_upload_numbers()

if upload_numbers.processing > 0 or upload_numbers.processed > 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processed to me implies that it's "done," e.g. a processing upload turns to a processed one. Is that not the case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this is true, but there's another state merged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To add to this, I think we should have a helper function in state.py that does this logic. Perhaps has_unmerged_uploads

log.info(
"Retrying ManualTriggerTask. Redis shows uploads still being processed.",
extra={
"repoid": repoid,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: alphabetize

"commitid": commitid,
"redis_processing": upload_numbers.processing,
"redis_processed": upload_numbers.processed,
"db_still_processing": still_processing,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure the value this will have if it doesn't equal redis_processing + redis_processed

},
)
still_processing += upload_numbers.processing + upload_numbers.processed

if still_processing == 0:
self.trigger_notifications(repoid, commitid, commit_yaml)
if commit.pullid:
Expand Down
61 changes: 61 additions & 0 deletions apps/worker/tasks/tests/unit/test_manual_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from database.tests.factories import CommitFactory, PullFactory
from database.tests.factories.core import UploadFactory
from services.processing.state import UploadNumbers
from shared.reports.enums import UploadState
from tasks.manual_trigger import ManualTriggerTask

Expand All @@ -17,6 +18,14 @@ def test_manual_upload_completion_trigger(
mock_redis,
celery_app,
):
# Mock ProcessingState to return no pending uploads in Redis
mock_processing_state = mocker.patch("tasks.manual_trigger.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

mocked_app = mocker.patch.object(
ManualTriggerTask,
"app",
Expand Down Expand Up @@ -78,6 +87,14 @@ def test_manual_upload_completion_trigger_uploads_still_processing(
mock_redis,
celery_app,
):
# Mock ProcessingState to return no pending uploads in Redis
mock_processing_state = mocker.patch("tasks.manual_trigger.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

mocker.patch.object(
ManualTriggerTask,
"app",
Expand Down Expand Up @@ -105,3 +122,47 @@ def test_manual_upload_completion_trigger_uploads_still_processing(
"notifications_called": False,
"message": "Uploads are still in process and the task got retired so many times. Not triggering notifications.",
} == result

def test_manual_upload_completion_trigger_redis_pending(
self,
mocker,
mock_configuration,
dbsession,
mock_storage,
mock_redis,
celery_app,
):
"""Test that task retries when Redis shows pending uploads even if DB shows complete"""
# Mock ProcessingState to return pending uploads in Redis
mock_processing_state = mocker.patch("tasks.manual_trigger.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=1,
processed=2, # Redis shows 3 uploads still being processed/merged
)
mock_processing_state.return_value = mock_state_instance

mocker.patch.object(
ManualTriggerTask,
"app",
celery_app,
)
commit = CommitFactory.create()
# Upload is complete in DB
upload = UploadFactory.create(
report__commit=commit,
state="complete",
state_id=UploadState.PROCESSED.db_id,
)
dbsession.add(commit)
dbsession.add(upload)
dbsession.flush()

# Should retry because Redis shows pending uploads
with pytest.raises(Retry):
ManualTriggerTask().run_impl(
dbsession,
repoid=commit.repoid,
commitid=commit.commitid,
current_yaml={},
)
151 changes: 148 additions & 3 deletions apps/worker/tasks/tests/unit/test_upload_finisher_task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import ANY, call

Expand All @@ -16,6 +17,7 @@
from helpers.log_context import LogContext, set_log_context
from services.processing.intermediate import intermediate_report_key
from services.processing.merging import get_joined_flag, update_uploads
from services.processing.state import UploadNumbers
from services.processing.types import MergeResult, ProcessingResult
from services.timeseries import MeasurementName
from shared.celery_config import (
Expand Down Expand Up @@ -394,7 +396,15 @@ def test_upload_finisher_task_call_different_branch(
]
)

def test_should_call_notifications(self, dbsession):
def test_should_call_notifications(self, dbsession, mocker):
# Mock ProcessingState to return no pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {"codecov": {"max_report_age": "1y ago"}}
commit = CommitFactory.create(
message="dsidsahdsahdsa",
Expand Down Expand Up @@ -432,7 +442,15 @@ def test_should_call_notifications_manual_trigger(self, dbsession):
== ShouldCallNotifyResult.DO_NOT_NOTIFY
)

def test_should_call_notifications_manual_trigger_off(self, dbsession):
def test_should_call_notifications_manual_trigger_off(self, dbsession, mocker):
# Mock ProcessingState to return no pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {
"codecov": {"max_report_age": "1y ago", "notify": {"manual_trigger": False}}
}
Expand Down Expand Up @@ -463,8 +481,16 @@ def test_should_call_notifications_manual_trigger_off(self, dbsession):
],
)
def test_should_call_notifications_no_successful_reports(
self, dbsession, notify_error, result
self, dbsession, mocker, notify_error, result
):
# Mock ProcessingState to return no pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {
"codecov": {
"max_report_age": "1y ago",
Expand All @@ -491,6 +517,14 @@ def test_should_call_notifications_no_successful_reports(
)

def test_should_call_notifications_not_enough_builds(self, dbsession, mocker):
# Mock ProcessingState to return no pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {"codecov": {"notify": {"after_n_builds": 9}}}
commit = CommitFactory.create(
message="dsidsahdsahdsa",
Expand Down Expand Up @@ -518,6 +552,14 @@ def test_should_call_notifications_not_enough_builds(self, dbsession, mocker):
)

def test_should_call_notifications_more_than_enough_builds(self, dbsession, mocker):
# Mock ProcessingState to return no pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {"codecov": {"notify": {"after_n_builds": 9}}}
commit = CommitFactory.create(
message="dsidsahdsahdsa",
Expand All @@ -544,7 +586,48 @@ def test_should_call_notifications_more_than_enough_builds(self, dbsession, mock
== ShouldCallNotifyResult.NOTIFY
)

def test_should_call_notifications_with_pending_uploads_in_redis(
self, dbsession, mocker
):
"""Test that notifications are not called when Redis shows pending uploads"""
# Mock ProcessingState to return pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=2,
processed=1, # Redis shows 3 uploads still being processed
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {"codecov": {"max_report_age": "1y ago"}}
commit = CommitFactory.create(
message="dsidsahdsahdsa",
commitid="abf6d4df662c47e32460020ab14abf9303581429",
repository__author__unencrypted_oauth_token="testulk3d54rlhxkjyzomq2wh8b7np47xabcrkx8",
repository__author__username="ThiagoCodecov",
repository__yaml=commit_yaml,
)
dbsession.add(commit)
dbsession.flush()

assert (
UploadFinisherTask().should_call_notifications(
commit,
commit_yaml,
[{"arguments": {"url": "url"}, "successful": True}],
)
== ShouldCallNotifyResult.DO_NOT_NOTIFY
)

def test_finish_reports_processing(self, dbsession, mocker, mock_self_app):
# Mock ProcessingState to return no pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {}
commit = CommitFactory.create(
message="dsidsahdsahdsa",
Expand Down Expand Up @@ -590,6 +673,14 @@ def test_finish_reports_processing(self, dbsession, mocker, mock_self_app):
def test_finish_reports_processing_with_pull(
self, dbsession, mocker, mock_self_app
):
# Mock ProcessingState to return no pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {}
repository = RepositoryFactory.create(
author__unencrypted_oauth_token="testulk3d54rlhxkjyzomq2wh8b7np47xabcrkx8",
Expand Down Expand Up @@ -663,6 +754,14 @@ def test_finish_reports_processing_with_pull(
def test_finish_reports_processing_no_notification(
self, dbsession, mocker, notify_error, mock_self_app
):
# Mock ProcessingState to return no pending uploads
mock_processing_state = mocker.patch("tasks.upload_finisher.ProcessingState")
mock_state_instance = mocker.MagicMock()
mock_state_instance.get_upload_numbers.return_value = UploadNumbers(
processing=0, processed=0
)
mock_processing_state.return_value = mock_state_instance

commit_yaml = {"codecov": {"notify": {"notify_error": notify_error}}}
commit = CommitFactory.create(
message="dsidsahdsahdsa",
Expand Down Expand Up @@ -1156,3 +1255,49 @@ def test_reconstruct_processing_results_returns_empty_when_no_uploads_found(

# Verify empty list returned when no uploads found
assert result == []

@pytest.mark.django_db
def test_upload_finisher_updates_repository_timestamp(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this test need to exist as a standalone?

self,
mocker,
mock_configuration,
dbsession,
mock_storage,
mock_repo_provider,
mock_redis,
mock_self_app,
):
"""Test that repository.updatestamp is updated when None or old"""

mock_redis.scard.return_value = 0
mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[])
mocker.patch("tasks.upload_finisher.update_uploads")

# Create commit with repository that has no updatestamp
commit = CommitFactory.create(
message="test",
branch="main",
repository__branch="main",
)
# Ensure updatestamp is None
commit.repository.updatestamp = None
dbsession.add(commit)
dbsession.flush()

previous_results = [
{"upload_id": 0, "arguments": {"url": "test_url"}, "successful": True}
]

result = UploadFinisherTask().run_impl(
dbsession,
previous_results,
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml={},
)

assert result == {"notifications_called": True}
dbsession.refresh(commit.repository)
# Repository timestamp should now be set
assert commit.repository.updatestamp is not None
assert (datetime.now(tz=UTC) - commit.repository.updatestamp).seconds < 60
26 changes: 26 additions & 0 deletions apps/worker/tasks/upload_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,12 @@ def run_impl(

except SoftTimeLimitExceeded:
log.warning("run_impl: soft time limit exceeded")
# Clean up orphaned state so future finishers can proceed
state.mark_uploads_as_merged(upload_ids)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is wrong, this will mark all uploads as merged correct? But if we hit a SoftTimeLimitExceeded, I'm guessing this is not true

log.info(
"Cleaned up processing state after timeout",
extra={"upload_ids": upload_ids},
)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
Expand All @@ -361,6 +367,12 @@ def run_impl(

except Exception as e:
log.exception("run_impl: unexpected error in upload finisher")
# Clean up orphaned state so future finishers can proceed
state.mark_uploads_as_merged(upload_ids)
log.info(
"Cleaned up processing state after exception",
extra={"upload_ids": upload_ids, "error": str(e)},
)
sentry_sdk.capture_exception(e)
log.exception(
"Unexpected error in upload finisher",
Expand Down Expand Up @@ -676,6 +688,20 @@ def should_call_notifications(
"parent_task": self.request.parent_id,
}

# Check if there are still pending uploads
state = ProcessingState(commit.repoid, commit.commitid)
upload_numbers = state.get_upload_numbers()
if upload_numbers.processing > 0 or upload_numbers.processed > 0:
log.info(
"Not scheduling notify because there are still pending uploads",
extra={
**extra_dict,
"processing": upload_numbers.processing,
"processed": upload_numbers.processed,
},
)
return ShouldCallNotifyResult.DO_NOT_NOTIFY

manual_trigger = read_yaml_field(
commit_yaml, ("codecov", "notify", "manual_trigger")
)
Expand Down
Loading