From bcdea5a2adf773bd3bca3e8e8ccc8dba32e29b1d Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 8 Dec 2025 12:33:38 -0500 Subject: [PATCH] Add functions to pipeline membership changes This change adds pipelined versions of ra:add_member/2 and ra:remove_member/2. Pipeline versions can be useful for using the WAL more efficiently when making multiple membership changes to different clusters simultaneously. In RabbitMQ this could be used for "shrinking" operations like forget_cluster_node which remove a member from all Ra clusters. --- src/ra.erl | 65 +++++++++++++++++++++++++++++++++++++++++++++++ src/ra_server.erl | 2 ++ test/ra_SUITE.erl | 26 +++++++++++++++++++ 3 files changed, 93 insertions(+) diff --git a/src/ra.erl b/src/ra.erl index 1a00d4bb..5d6669eb 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -61,8 +61,10 @@ %% membership changes add_member/2, add_member/3, + pipeline_add_member/4, remove_member/2, remove_member/3, + pipeline_remove_member/4, leave_and_terminate/3, leave_and_terminate/4, leave_and_delete_server/3, @@ -613,6 +615,38 @@ add_member(ServerLoc, ServerId, Timeout) -> {'$ra_join', ServerId, after_log_append}, Timeout). +%% @doc Asynchronously add a ra server id to a ra cluster's membership +%% configuration. +%% +%% This is the same operation as {@link add_member/2} but the membership +%% change is pipelined in the same way as {@link pipeline_command/4}. +%% +%% @param ServerRef the ra server or servers to send the command to +%% @param ServerId the ra server id of the server to add +%% @param Correlation a correlation identifier to be included to receive an +%% async notification after the command is applied to the state machine. If the +%% Correlation is set to `no_correlation' then no notifications will be sent. +%% @param Priority command priority. `low' priority commands will be held back +%% and appended to the Raft log in batches. 
NB: A `normal' priority command sent +%% from the same process can overtake a low priority command that was +%% sent before. There is no high priority. +%% Only use priority level of `low' with commands that +%% do not rely on total execution ordering. +%% @see add_member/2 +%% @end +-spec pipeline_add_member(ServerRef :: ra_server_id() | [ra_server_id()], + ServerId :: ra_server_id(), + Correlation :: ra_server:command_correlation() | + no_correlation, + Priority :: ra_server:command_priority()) -> ok. +pipeline_add_member(ServerRef, ServerId, no_correlation, Priority) -> + Cmd = {'$ra_join', ServerId, noreply}, + ra_server_proc:cast_command(ServerRef, Priority, Cmd); +pipeline_add_member(ServerRef, ServerId, Correlation, Priority) -> + Cmd = {'$ra_join', ServerId, {notify, Correlation, self()}}, + ra_server_proc:cast_command(ServerRef, Priority, Cmd). + + %% @doc Removes a server from the cluster's membership configuration. %% This function returns after appending a cluster membership change %% command to the log. @@ -647,6 +681,37 @@ remove_member(ServerRef, ServerId, Timeout) -> {'$ra_leave', ServerId, after_log_append}, Timeout). +%% @doc Asynchronously remove a ra server id from a ra cluster's membership +%% configuration. +%% +%% This is the same operation as {@link remove_member/2} but the membership +%% change is pipelined in the same way as {@link pipeline_command/4}. +%% +%% @param ServerRef the ra server or servers to send the command to +%% @param ServerId the ra server id of the server to remove +%% @param Correlation a correlation identifier to be included to receive an +%% async notification after the command is applied to the state machine. If the +%% Correlation is set to `no_correlation' then no notifications will be sent. +%% @param Priority command priority. `low' priority commands will be held back +%% and appended to the Raft log in batches. 
NB: A `normal' priority command sent +%% from the same process can overtake a low priority command that was +%% sent before. There is no high priority. +%% Only use priority level of `low' with commands that +%% do not rely on total execution ordering. +%% @see remove_member/2 +%% @end +-spec pipeline_remove_member(ServerRef :: ra_server_id() | [ra_server_id()], + ServerId :: ra_server_id(), + Correlation :: ra_server:command_correlation() | + no_correlation, + Priority :: ra_server:command_priority()) -> ok. +pipeline_remove_member(ServerRef, ServerId, no_correlation, Priority) -> + Cmd = {'$ra_leave', ServerId, noreply}, + ra_server_proc:cast_command(ServerRef, Priority, Cmd); +pipeline_remove_member(ServerRef, ServerId, Correlation, Priority) -> + Cmd = {'$ra_leave', ServerId, {notify, Correlation, self()}}, + ra_server_proc:cast_command(ServerRef, Priority, Cmd). + %% @doc Makes the server enter a pre-vote state and attempt to become the leader. %% It is necessary to call this function when starting a new cluster as a %% brand new Ra server (node) will not automatically enter the pre-vote state. diff --git a/src/ra_server.erl b/src/ra_server.erl index 61cc1694..00daf16f 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -3518,6 +3518,8 @@ append_error_reply(Cmd, Reason, Effects0) -> case Cmd of {_, #{from := From}, _, _} -> [{reply, From, {error, Reason}} | Effects0]; + {_, _, _, {notify, Corr, Pid}} -> + [{notify, #{Pid => [{Corr, {error, Reason}}]}} | Effects0]; _ -> Effects0 end. 
diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index d1681235..98f63357 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -60,6 +60,7 @@ all_tests() -> snapshot_installation, snapshot_installation_with_call_crash, add_member, + pipeline_membership_changes, queue_example, ramp_up_and_ramp_down, start_and_join_then_leave_and_terminate, @@ -877,6 +878,31 @@ add_member(Config) -> {ok, 9, Leader} = ra:consistent_query(C, fun(S) -> S end), terminate_cluster([C | Cluster]). +pipeline_membership_changes(Config) -> + Name = ?config(test_name, Config), + [A, B, C] = Cluster0 = start_local_cluster(3, Name, add_machine()), + {ok, _, Leader0} = ra:process_command(A, 9), + Corr1 = make_ref(), + ok = ra:pipeline_remove_member(Leader0, C, Corr1, normal), + [{Corr1, ok}] = gather_applied([], 0), + stop_server(C), + {ok, Members, Leader} = ra:members(A), + ?assertEqual(lists:sort(Members), lists:sort([A, B])), + %% Process a command to ensure that the cluster change command has + %% been committed - this prevents spurious failures of + %% `cluster_change_not_permitted` from the next leave command: + {ok, _, Leader} = ra:process_command(Leader, 4), + Corr2 = make_ref(), + ok = ra:pipeline_remove_member(Leader, C, Corr2, normal), + [{Corr2, {error, not_member}}] = gather_applied([], 0), + ok = ra:start_server(default, Name, C, add_machine(), [A, B]), + Corr3 = make_ref(), + ok = ra:pipeline_add_member(Leader, C, Corr3, normal), + [{Corr3, ok}] = gather_applied([], 0), + {ok, Members1, _Leader} = ra:members(Leader), + ?assertEqual(lists:sort(Members1), lists:sort(Cluster0)), + terminate_cluster(Cluster0). + server_catches_up(Config) -> N1 = nth_server_name(Config, 1), N2 = nth_server_name(Config, 2),