
add eval for debugging concurrency lease renewal failures #97

@zzstoatzz

scenario

based on real user issues:

users' flow runs crash mid-execution with "concurrency lease renewal failed" errors. this is a real production pain point that's hard to diagnose because it requires understanding prefect's internal lease renewal mechanism.

what's happening

when a flow run holds a concurrency slot, the slot is reserved through a lease that the run must renew periodically. if renewal fails (network issues, API problems, timeout), prefect crashes the run to prevent over-allocation. the error looks like:

Concurrency lease renewal failed - slots are no longer reserved. Terminating execution to prevent over-allocation.
Finished in state Crashed('Execution was cancelled by the runtime environment.')
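the renewal mechanism described above can be modeled as a toy asyncio sketch. the work runs alongside a loop that renews the lease every interval, and the first renewal error cancels the work, so the run ends up Crashed rather than Failed. run_with_lease, LeaseRenewalError and the interval are hypothetical names. this is not prefect's implementation, only the control flow that the error message implies.

```python
import asyncio
from collections.abc import Awaitable, Callable


class LeaseRenewalError(Exception):
    """hypothetical error raised when a simulated renewal call fails."""


async def run_with_lease(
    work: Callable[[], Awaitable[object]],
    renew: Callable[[], Awaitable[None]],
    interval: float,
) -> str:
    """run `work` while calling `renew` every `interval` seconds.

    returns a state name: "Completed" or "Failed" if the work finished first,
    "Crashed" if a renewal raised before the work finished.
    """
    work_task = asyncio.create_task(work())

    async def renew_loop() -> None:
        # loops forever; it only ends when renew() raises
        while True:
            await asyncio.sleep(interval)
            await renew()

    renew_task = asyncio.create_task(renew_loop())
    done, _ = await asyncio.wait(
        {work_task, renew_task}, return_when=asyncio.FIRST_COMPLETED
    )

    if work_task in done:
        # work finished while the lease was still held: stop renewing
        renew_task.cancel()
        await asyncio.gather(renew_task, return_exceptions=True)
        return "Failed" if work_task.exception() else "Completed"

    # renewal failed first: the slot is no longer reserved, so stop the work
    # instead of letting it run without a slot (prevents over-allocation)
    renew_task.exception()  # retrieve the error so asyncio doesn't warn
    work_task.cancel()
    await asyncio.gather(work_task, return_exceptions=True)
    return "Crashed"


if __name__ == "__main__":
    async def demo() -> None:
        async def work() -> str:
            await asyncio.sleep(0.2)
            return "done"

        async def healthy_renew() -> None:
            pass

        attempts = 0

        async def flaky_renew() -> None:
            nonlocal attempts
            attempts += 1
            if attempts >= 2:  # second renewal fails, e.g. API unreachable
                raise LeaseRenewalError("lease renewal failed")

        print(await run_with_lease(work, healthy_renew, interval=0.05))  # Completed
        print(await run_with_lease(work, flaky_renew, interval=0.05))  # Crashed

    asyncio.run(demo())
```

the distinction between Failed (the work itself raised) and Crashed (the runtime stopped the work) is the same one this eval relies on to separate lease renewal crashes from ordinary failures.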

eval requirements

the agent should be able to:

  1. identify that a flow run crashed due to lease renewal failure
  2. extract the specific error from the crashed state message
  3. explain what concurrency lease renewal is and why it matters
  4. suggest potential root causes (network issues, timeouts, lease duration settings)
  5. recommend remediation steps
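requirements 1 and 2 boil down to a check on the state's type and message. below is a deterministic sketch of that check, useful as a reference when reading agent transcripts. StateSnapshot and diagnose are hypothetical names, and the type/name strings assume prefect's usual state types (a Late run is a SCHEDULED state named "Late"). note that, per the log excerpt above, a real crash may carry only the generic "Execution was cancelled by the runtime environment." message, with the lease error in the logs; like the fixture, this sketch only looks at the state message.

```python
import re
from dataclasses import dataclass
from typing import Optional

# matches the lease renewal error text quoted in this issue
LEASE_RENEWAL_RE = re.compile(r"concurrency lease renewal failed", re.IGNORECASE)


@dataclass(frozen=True)
class StateSnapshot:
    """hypothetical stand-in for the state fields an agent can read."""

    type: str  # prefect state type, e.g. "CRASHED", "FAILED", "SCHEDULED"
    name: str  # state name, e.g. "Crashed", "Failed", "Late"
    message: Optional[str] = None


def diagnose(state: StateSnapshot) -> str:
    """map a state to the category this eval cares about."""
    if state.type == "CRASHED":
        if state.message and LEASE_RENEWAL_RE.search(state.message):
            return "lease_renewal_crash"  # this eval
        return "other_crash"
    if state.type == "FAILED":
        return "failed"  # covered by the failure-reason eval
    if state.type == "SCHEDULED" and state.name == "Late":
        return "late"  # covered by the late-run / concurrency evals
    return "other"
```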

implementation plan

fixture: crashed_lease_renewal_flow_run

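from uuid import uuid4

import pytest
from prefect import flow
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.objects import FlowRun
from prefect.states import Crashed


@pytest.fixture
async def crashed_lease_renewal_flow_run(prefect_client: PrefectClient) -> FlowRun:
    """Create a flow run that crashed due to lease renewal failure."""
    @flow(name=f"data-pipeline-{uuid4().hex[:8]}")
    def data_pipeline():
        return "completed"
    
    # create_flow_run takes the flow object and registers the flow itself,
    # so a separate create_flow call (and a flow_id argument) isn't needed
    flow_run = await prefect_client.create_flow_run(
        flow=data_pipeline, name=f"pipeline-run-{uuid4().hex[:8]}"
    )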
    
    # Force to Crashed state with lease renewal error
    crashed_state = Crashed(
        message="Concurrency lease renewal failed - slots are no longer reserved. Terminating execution to prevent over-allocation.",
    )
    await prefect_client.set_flow_run_state(
        flow_run_id=flow_run.id,
        state=crashed_state,
        force=True,
    )
    
    return await prefect_client.read_flow_run(flow_run.id)

test structure

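from collections.abc import Awaitable, Callable

from prefect.client.schemas.objects import FlowRun
from pydantic_ai import Agent  # assumes the suite's agents are pydantic-ai Agents

# ToolCallSpy is the eval suite's own helper; import it from wherever the
# suite defines it (its module path isn't given in this issue)


async def test_diagnoses_lease_renewal_failure(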
    simple_agent: Agent,  # or reasoning_agent if more complex
    crashed_lease_renewal_flow_run: FlowRun,
    evaluate_response: Callable[[str, str], Awaitable[None]],
    tool_call_spy: ToolCallSpy,
) -> None:
    """Test agent identifies concurrency lease renewal failure as crash cause."""
    
    async with simple_agent:
        result = await simple_agent.run(
            f"""Why did my flow run '{crashed_lease_renewal_flow_run.name}' crash 
            unexpectedly during execution? It was running fine and then suddenly crashed."""
        )
    
    await evaluate_response(
        """Does the agent correctly identify that the flow run crashed due to 
        concurrency lease renewal failure? The response should mention 'lease renewal' 
        or 'concurrency slot' and explain what this means.""",
        result.output,
    )
    
    # Verify agent used appropriate tools
    tool_call_spy.assert_tool_was_called("get_flow_runs")

why this is valuable

  • real user pain point: multiple github issues show production impact
  • non-obvious diagnosis: requires understanding of internal concurrency mechanism
  • tests state analysis: agent must parse crashed state messages
  • tests conceptual knowledge: needs to understand lease renewal system
  • distinct from existing evals:
    • not about late runs (existing: work pool/queue/deployment/tag concurrency)
    • not about failure reasons (existing: stack traces from exceptions)
    • this is about mid-execution crashes from infrastructure issues

agent type

suggest using simple_agent since this is straightforward pattern matching on the state message, not complex reasoning about relationships between systems.

expected tools used

  • get_flow_runs - to retrieve the crashed flow run and inspect state
  • potentially get_dashboard - to check concurrency limit configuration
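the expected tools split into one required call (get_flow_runs) and one optional call (get_dashboard), so only the required one should be asserted. a minimal sketch of a spy with that shape, using hypothetical names; the suite's actual ToolCallSpy may record calls differently, and only its assert_tool_was_called method appears in this issue.

```python
class RecordingToolSpy:
    """hypothetical minimal spy: records tool names in call order."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def record(self, tool_name: str) -> None:
        self.calls.append(tool_name)

    def was_called(self, tool_name: str) -> bool:
        return tool_name in self.calls

    def assert_tool_was_called(self, tool_name: str) -> None:
        # a missing required tool fails the eval with the calls actually seen
        assert self.was_called(tool_name), (
            f"expected {tool_name!r} to be called; saw {self.calls}"
        )


if __name__ == "__main__":
    spy = RecordingToolSpy()
    spy.record("get_flow_runs")
    spy.assert_tool_was_called("get_flow_runs")  # required tool: passes
    print(spy.was_called("get_dashboard"))  # optional tool, not asserted: False
```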

success criteria

agent response should:

  1. identify "lease renewal failed" as the cause
  2. explain that this relates to concurrency slot management
  3. suggest checking network connectivity, API availability, or lease duration settings
  4. differentiate this from other types of crashes
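criteria 1 to 3 are largely about vocabulary, so a cheap deterministic pre-check could sit next to the LLM judge (evaluate_response) and flag obviously off-target answers. a sketch with hypothetical keyword lists; keyword matching is crude (it cannot tell whether the explanation is correct), so it would complement the judge rather than replace it, and criterion 4 is left to the judge entirely.

```python
import re

# hypothetical keyword lists for success criteria 1-3
RUBRIC = {
    "identifies_lease_renewal": ("lease renewal",),
    "links_to_concurrency_slots": ("concurrency slot", "concurrency limit", "slot"),
    "suggests_checks": ("network", "api", "lease duration", "timeout"),
}


def keyword_rubric(response: str) -> dict[str, bool]:
    """report which criteria have at least one keyword as a whole word/phrase."""
    text = response.lower()
    return {
        criterion: any(
            # word boundaries avoid matches like "api" inside "rapid";
            # the optional "s" allows a simple plural ("slots", "timeouts")
            re.search(rf"\b{re.escape(keyword)}s?\b", text)
            for keyword in keywords
        )
        for criterion, keywords in RUBRIC.items()
    }


if __name__ == "__main__":
    print(keyword_rubric(
        "The run crashed because its concurrency lease renewal failed, so the "
        "slots were released. Check network connectivity and API availability."
    ))  # all three criteria -> True
```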

file location

evals/test_lease_renewal_crash.py

follows the pattern of other top-level diagnostic evals like test_flow_run_failure_reason.py
