WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 2e71c52

Browse files
committed
Allow teams to use global executors by default
If someone has configured a team, but has not added any executors to it, allow the team to then use the default global executor (which we enforce must exist). This was always possible but previously had to be configured directly in the tasks/dags, now it will be used by default.
1 parent d51e572 commit 2e71c52

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2935,6 +2935,10 @@ def _try_to_load_executor(self, ti: TaskInstance, session, team_name=NOTSET) ->
29352935
# First executor that resolves should be the default for that team
29362936
if _executor.team_name == team_name:
29372937
executor = _executor
2938+
break
2939+
else:
2940+
# No executor found for that team, fall back to global default
2941+
executor = self.job.executor
29382942
else:
29392943
# An executor is specified on the TaskInstance (as a str), so we need to find it in the list of executors
29402944
for _executor in self.job.executors:

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7570,6 +7570,38 @@ def test_multi_team_try_to_load_executor_no_explicit_executor_with_team(
75707570
# Should return the team-specific default executor set above
75717571
assert result == mock_executors[1]
75727572

7573+
@conf_vars({("core", "multi_team"): "true"})
7574+
def test_multi_team_try_to_load_executor_no_explicit_executor_with_team_no_team_default(
7575+
self, dag_maker, mock_executors, session
7576+
):
7577+
"""Test executor selection when no explicit executor but team exists and team has no executors (should
7578+
fallback to the global executor)."""
7579+
clear_db_teams()
7580+
clear_db_dag_bundles()
7581+
7582+
team = Team(name="team_a")
7583+
session.add(team)
7584+
session.flush()
7585+
7586+
bundle = DagBundleModel(name="bundle_a")
7587+
bundle.teams.append(team)
7588+
session.add(bundle)
7589+
session.flush()
7590+
7591+
with dag_maker(dag_id="dag_a", bundle_name="bundle_a", session=session):
7592+
task = EmptyOperator(task_id="test_task") # No explicit executor
7593+
7594+
dr = dag_maker.create_dagrun()
7595+
ti = dr.get_task_instance(task.task_id, session)
7596+
7597+
scheduler_job = Job()
7598+
self.job_runner = SchedulerJobRunner(job=scheduler_job)
7599+
7600+
result = self.job_runner._try_to_load_executor(ti, session)
7601+
7602+
# Should return the team-specific default executor set above
7603+
assert result == mock_executors[0]
7604+
75737605
@conf_vars({("core", "multi_team"): "true"})
75747606
def test_multi_team_try_to_load_executor_explicit_executor_matches_team(
75757607
self, dag_maker, mock_executors, session

0 commit comments

Comments
 (0)