WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 6a67a9c

Browse files
committed
better buffering
1 parent 8245f1f commit 6a67a9c

File tree

11 files changed

+555
-128
lines changed

11 files changed

+555
-128
lines changed

packages/sync-service/lib/electric/shape_cache/crashing_file_storage.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ defmodule Electric.ShapeCache.CrashingFileStorage do
2727
defdelegate hibernate(opts), to: PureFileStorage
2828
defdelegate compact(opts, keep_complete_chunks), to: PureFileStorage
2929
defdelegate append_move_in_snapshot_to_log!(name, writer_state), to: PureFileStorage
30+
defdelegate append_move_in_snapshot_to_log_filtered!(name, writer_state, touch_tracker, snapshot), to: PureFileStorage
3031
defdelegate append_control_message!(control_message, writer_state), to: PureFileStorage
3132
defdelegate write_move_in_snapshot!(stream, name, opts), to: PureFileStorage
3233

packages/sync-service/lib/electric/shape_cache/in_memory_storage.ex

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,49 @@ defmodule Electric.ShapeCache.InMemoryStorage do
362362
{{initial_offset, resulting_offset}, opts}
363363
end
364364

365+
@impl Electric.ShapeCache.Storage
366+
def append_move_in_snapshot_to_log_filtered!(
367+
name,
368+
%MS{log_table: log_table} = opts,
369+
touch_tracker,
370+
snapshot
371+
) do
372+
initial_offset = current_offset(opts)
373+
ref = make_ref()
374+
375+
Stream.unfold(initial_offset, fn offset ->
376+
case :ets.next_lookup(log_table, {:movein, {name, nil}}) do
377+
{{:movein, {^name, _}}, [{_, {key, _} = item}]} ->
378+
# Check if this row should be skipped
379+
if Electric.Shapes.Consumer.MoveIns.should_skip_query_row?(
380+
touch_tracker,
381+
snapshot,
382+
key
383+
) do
384+
# Skip this row - don't increment offset
385+
{[], offset}
386+
else
387+
offset = LogOffset.increment(offset)
388+
{{{:offset, storage_offset(offset)}, item}, offset}
389+
end
390+
391+
_ ->
392+
send(self(), {ref, offset})
393+
nil
394+
end
395+
end)
396+
|> Stream.reject(&(&1 == []))
397+
|> Stream.chunk_every(500)
398+
|> Stream.each(&:ets.insert(log_table, &1))
399+
|> Stream.run()
400+
401+
:ets.match_delete(log_table, {{:movein, {name, :_}}, :_})
402+
403+
resulting_offset = receive(do: ({^ref, offset} -> offset))
404+
405+
{{initial_offset, resulting_offset}, opts}
406+
end
407+
365408
@impl Electric.ShapeCache.Storage
366409
def cleanup!(%MS{} = opts) do
367410
for table <- tables(opts),

packages/sync-service/lib/electric/shape_cache/pure_file_storage.ex

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -909,6 +909,60 @@ defmodule Electric.ShapeCache.PureFileStorage do
909909
{inserted_range, state}
910910
end
911911

912+
def append_move_in_snapshot_to_log_filtered!(
913+
name,
914+
writer_state(opts: opts, writer_acc: acc) = state,
915+
touch_tracker,
916+
snapshot
917+
) do
918+
starting_offset = WriteLoop.last_seen_offset(acc)
919+
920+
writer_state(writer_acc: acc) =
921+
state =
922+
Stream.resource(
923+
fn ->
924+
{File.open!(shape_snapshot_path(opts, name), [:read, :raw, :read_ahead]),
925+
LogOffset.increment(starting_offset)}
926+
end,
927+
fn {file, offset} ->
928+
with {:meta, <<key_size::32, json_size::64, op_type::8>>} <-
929+
{:meta, IO.binread(file, 13)},
930+
<<key::binary-size(key_size)>> <- IO.binread(file, key_size),
931+
<<json::binary-size(json_size)>> <- IO.binread(file, json_size) do
932+
# Check if this row should be skipped
933+
if Electric.Shapes.Consumer.MoveIns.should_skip_query_row?(
934+
touch_tracker,
935+
snapshot,
936+
key
937+
) do
938+
# Skip this row - don't increment offset
939+
{[], {file, offset}}
940+
else
941+
# Include this row
942+
{[{offset, key_size, key, op_type, 0, json_size, json}],
943+
{file, LogOffset.increment(offset)}}
944+
end
945+
else
946+
{:meta, :eof} ->
947+
{:halt, {file, offset}}
948+
949+
_ ->
950+
raise Storage.Error,
951+
message: "Incomplete move-in snapshot file at #{shape_snapshot_path(opts, name)}"
952+
end
953+
end,
954+
fn {file, _} ->
955+
File.close(file)
956+
FileInfo.delete(shape_snapshot_path(opts, name))
957+
end
958+
)
959+
|> append_to_log!(state)
960+
961+
inserted_range = {starting_offset, WriteLoop.last_seen_offset(acc)}
962+
963+
{inserted_range, state}
964+
end
965+
912966
def append_control_message!(control_message, writer_state(writer_acc: acc) = state)
913967
when is_binary(control_message) do
914968
offset = WriteLoop.last_seen_offset(acc)

packages/sync-service/lib/electric/shape_cache/storage.ex

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,22 @@ defmodule Electric.ShapeCache.Storage do
108108
@callback append_move_in_snapshot_to_log!(name :: String.t(), writer_state()) ::
109109
{inserted_range :: {LogOffset.t(), LogOffset.t()}, writer_state()} | no_return()
110110

111+
@doc """
112+
Splice a move in snapshot into the main log with filtering.
113+
114+
Rows are filtered using the touch_tracker: if a row's key has been touched by a transaction
115+
that is NOT visible in the snapshot, skip that row (stream has fresher data).
116+
117+
Returns the inserted range (excluding skipped rows) and updated writer state.
118+
"""
119+
@callback append_move_in_snapshot_to_log_filtered!(
120+
name :: String.t(),
121+
writer_state(),
122+
touch_tracker :: %{String.t() => pos_integer()},
123+
snapshot :: {pos_integer(), pos_integer(), [pos_integer()]}
124+
) ::
125+
{inserted_range :: {LogOffset.t(), LogOffset.t()}, writer_state()} | no_return()
126+
111127
@doc """
112128
Append a control message to the log that doesn't have an offset associated with it.
113129
@@ -290,6 +306,14 @@ defmodule Electric.ShapeCache.Storage do
290306
{inserted_range, {mod, new_writer_state}}
291307
end
292308

309+
@impl __MODULE__
310+
def append_move_in_snapshot_to_log_filtered!(name, {mod, writer_state}, touch_tracker, snapshot) do
311+
{inserted_range, new_writer_state} =
312+
mod.append_move_in_snapshot_to_log_filtered!(name, writer_state, touch_tracker, snapshot)
313+
314+
{inserted_range, {mod, new_writer_state}}
315+
end
316+
293317
@impl __MODULE__
294318
def append_control_message!(control_message, state)
295319
when is_map(control_message) do

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -313,15 +313,31 @@ defmodule Electric.Shapes.Consumer do
313313
end
314314
end
315315

316-
def handle_info({:query_move_in_complete, name, key_set}, state) do
316+
def handle_info({:pg_snapshot_known, name, snapshot}, state) do
317+
Logger.debug(fn -> "Snapshot known for move-in #{name}" end)
318+
319+
# Update the snapshot in waiting_move_ins
320+
move_handling_state = MoveIns.set_snapshot(state.move_handling_state, name, snapshot)
321+
322+
# Garbage collect touches visible in all known snapshots
323+
state = %{state | move_handling_state: move_handling_state}
324+
state = State.gc_touch_tracker(state)
325+
326+
{:noreply, state, state.hibernate_after}
327+
end
328+
329+
def handle_info({:query_move_in_complete, name, key_set, snapshot}, state) do
317330
Logger.debug(fn ->
318331
"Consumer query move in complete for #{name} with #{length(key_set)} keys"
319332
end)
320333

321-
{state, notification} = MoveHandling.query_complete(state, name, key_set)
334+
{state, notification} = MoveHandling.query_complete(state, name, key_set, snapshot)
322335
state = notify_new_changes(state, notification)
323336

324-
{:noreply, state, {:continue, :consume_buffer}}
337+
# Garbage collect touches after query completes (no buffer consumption needed)
338+
state = State.gc_touch_tracker(state)
339+
340+
{:noreply, state, state.hibernate_after}
325341
end
326342

327343
def handle_info({:query_move_in_error, _, error, stacktrace}, state) do
@@ -438,6 +454,7 @@ defmodule Electric.Shapes.Consumer do
438454

439455
defp handle_txns(txns, %State{} = state), do: Enum.reduce_while(txns, state, &handle_txn/2)
440456

457+
# Keep buffering for initial snapshot
441458
defp handle_txn(txn, %State{buffering?: true} = state),
442459
do: {:cont, State.add_to_buffer(state, txn)}
443460

@@ -447,27 +464,12 @@ defmodule Electric.Shapes.Consumer do
447464
{:cont, consider_flushed(%{state | initial_snapshot_state: initial_snapshot_state}, txn)}
448465

449466
{:continue, new_initial_snapshot_state} ->
450-
handle_txn_or_start_buffering(txn, %{
451-
state
452-
| initial_snapshot_state: new_initial_snapshot_state
453-
})
467+
handle_txn_in_span(txn, %{state | initial_snapshot_state: new_initial_snapshot_state})
454468
end
455469
end
456470

457-
defp handle_txn(txn, state), do: handle_txn_or_start_buffering(txn, state)
458-
459-
defp handle_txn_or_start_buffering(
460-
txn,
461-
%State{move_handling_state: move_handling_state} = state
462-
) do
463-
case MoveIns.check_txn(move_handling_state, txn) do
464-
{:start_buffering, new_move_handling_state} ->
465-
handle_txn(txn, %{state | move_handling_state: new_move_handling_state, buffering?: true})
466-
467-
{:continue, new_move_handling_state} ->
468-
handle_txn_in_span(txn, %{state | move_handling_state: new_move_handling_state})
469-
end
470-
end
471+
# Remove the move-in buffering check - just process immediately
472+
defp handle_txn(txn, state), do: handle_txn_in_span(txn, state)
471473

472474
defp handle_txn_in_span(txn, %State{} = state) do
473475
ot_attrs =
@@ -530,6 +532,12 @@ defmodule Electric.Shapes.Consumer do
530532
{changes, num_changes, last_log_offset} ->
531533
timestamp = System.monotonic_time()
532534

535+
# Track touches for all filtered changes
536+
state =
537+
Enum.reduce(changes, state, fn change, acc ->
538+
State.track_change(acc, xid, change)
539+
end)
540+
533541
{lines, total_size} = prepare_log_entries(changes, xid, shape)
534542
writer = ShapeCache.Storage.append_to_log!(lines, writer)
535543

packages/sync-service/lib/electric/shapes/consumer/move_handling.ex

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,36 @@ defmodule Electric.Shapes.Consumer.MoveHandling do
1717

1818
storage = state.storage
1919
name = Electric.Utils.uuid4()
20+
consumer_pid = self()
2021

21-
# We're querying and writing to storage in a separate task, but we're blocking until we know
22-
# the insertion conditions to know when to start buffering the changes.
23-
pg_snapshot =
24-
Electric.ProcessRegistry.name(state.stack_id, Electric.StackTaskSupervisor)
25-
|> PartialModes.query_move_in(
26-
state.shape_handle,
27-
state.shape,
28-
formed_where_clause,
29-
stack_id: state.stack_id,
30-
results_fn: fn stream ->
31-
stream
32-
|> Stream.transform(
33-
fn -> [] end,
34-
fn [key, _] = item, acc -> {[item], [key | acc]} end,
35-
fn acc -> send(self(), {:acc, acc}) end
36-
)
37-
|> Storage.write_move_in_snapshot!(name, storage)
22+
# Start async query - don't block on snapshot
23+
Electric.ProcessRegistry.name(state.stack_id, Electric.StackTaskSupervisor)
24+
|> PartialModes.query_move_in_async(
25+
state.shape_handle,
26+
state.shape,
27+
formed_where_clause,
28+
stack_id: state.stack_id,
29+
consumer_pid: consumer_pid,
30+
results_fn: fn stream, pg_snapshot ->
31+
task_pid = self()
3832

39-
receive(do: ({:acc, acc} -> acc))
40-
end,
41-
move_in_name: name
42-
)
33+
# Process query results
34+
stream
35+
|> Stream.transform(
36+
fn -> [] end,
37+
fn [key, _] = item, acc -> {[item], [key | acc]} end,
38+
fn acc -> send(task_pid, {:acc, acc, pg_snapshot}) end
39+
)
40+
|> Storage.write_move_in_snapshot!(name, storage)
4341

44-
move_handling_state =
45-
MoveIns.add_waiting(state.move_handling_state, name, pg_snapshot)
42+
# Return accumulated keys and snapshot
43+
receive(do: ({:acc, acc, snapshot} -> {acc, snapshot}))
44+
end,
45+
move_in_name: name
46+
)
4647

48+
# Add to waiting WITHOUT blocking (snapshot will be set later via message)
49+
move_handling_state = MoveIns.add_waiting(state.move_handling_state, name, nil)
4750
%{state | move_handling_state: move_handling_state}
4851
end
4952

@@ -67,16 +70,26 @@ defmodule Electric.Shapes.Consumer.MoveHandling do
6770
{%{state | writer: writer}, {[message], upper_bound}}
6871
end
6972

70-
@spec query_complete(State.t(), MoveIns.move_in_name(), list(String.t())) ::
73+
@spec query_complete(
74+
State.t(),
75+
MoveIns.move_in_name(),
76+
list(String.t()),
77+
MoveIns.pg_snapshot()
78+
) ::
7179
{State.t(), notification :: term()}
72-
def query_complete(%State{} = state, name, key_set) do
73-
# 1. Splice the stored data into the main log
80+
def query_complete(%State{} = state, name, key_set, snapshot) do
81+
# 1. Splice stored snapshot into main log with filtering
7482
{{_, upper_bound} = bounds, writer} =
75-
Storage.append_move_in_snapshot_to_log!(name, state.writer)
83+
Storage.append_move_in_snapshot_to_log_filtered!(
84+
name,
85+
state.writer,
86+
state.touch_tracker,
87+
snapshot
88+
)
7689

77-
# 2. Remove this entry from "waiting" and add to "filtering"
90+
# 2. Move from "waiting" to "filtering"
7891
move_handling_state =
79-
MoveIns.change_to_filtering(state.move_handling_state, name, key_set)
92+
MoveIns.change_to_filtering(state.move_handling_state, name, MapSet.new(key_set))
8093

8194
state = %{state | move_handling_state: move_handling_state, writer: writer}
8295

0 commit comments

Comments
 (0)