WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit ed304ff

Browse files
committed
addressing feedback
1 parent c12a408 commit ed304ff

File tree

17 files changed

+371
-200
lines changed

17 files changed

+371
-200
lines changed

packages/sync-service/lib/electric/postgres/snapshot_query.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
defmodule Electric.Postgres.SnapshotQuery do
2+
alias Electric.SnapshotError
23
alias Electric.Shapes.Shape
34
alias Electric.Telemetry.OpenTelemetry
45

@@ -72,6 +73,9 @@ defmodule Electric.Postgres.SnapshotQuery do
7273
end,
7374
timeout: :infinity
7475
)
76+
catch
77+
:exit, {_, {DBConnection.Holder, :checkout, _}} ->
78+
raise SnapshotError.connection_not_available()
7579
end
7680

7781
defp shape_attrs(shape_handle, shape) do

packages/sync-service/lib/electric/shape_cache/pure_file_storage/file_info.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,8 @@ defmodule Electric.ShapeCache.PureFileStorage.FileInfo do
8484
:file.truncate(file)
8585
end)
8686
end
87+
88+
def delete(path) do
89+
:prim_file.delete(path)
90+
end
8791
end

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
defmodule Electric.Shapes.Consumer do
22
use GenServer, restart: :temporary
33

4-
alias Electric.Shapes.Consumer.MoveHandlingState
5-
alias Electric.Shapes.Consumer.InitialSnapshotState
4+
alias Electric.Shapes.Consumer.MoveIns
5+
alias Electric.Shapes.Consumer.InitialSnapshot
66
alias Electric.Shapes.Consumer.MoveHandling
77
alias Electric.Shapes.Consumer.State
88

@@ -437,7 +437,7 @@ defmodule Electric.Shapes.Consumer do
437437
do: {:cont, State.add_to_buffer(state, txn)}
438438

439439
defp handle_txn(txn, state) when needs_initial_filtering(state) do
440-
case InitialSnapshotState.filter(state.initial_snapshot_state, state.storage, txn) do
440+
case InitialSnapshot.filter(state.initial_snapshot_state, state.storage, txn) do
441441
{:consider_flushed, initial_snapshot_state} ->
442442
{:cont, consider_flushed(%{state | initial_snapshot_state: initial_snapshot_state}, txn)}
443443

@@ -455,7 +455,7 @@ defmodule Electric.Shapes.Consumer do
455455
txn,
456456
%State{move_handling_state: move_handling_state} = state
457457
) do
458-
case MoveHandlingState.check_txn(move_handling_state, txn) do
458+
case MoveIns.check_txn(move_handling_state, txn) do
459459
{:start_buffering, new_move_handling_state} ->
460460
handle_txn(txn, %{state | move_handling_state: new_move_handling_state, buffering?: true})
461461

@@ -666,7 +666,7 @@ defmodule Electric.Shapes.Consumer do
666666
change_acc,
667667
total_ops
668668
) do
669-
if not MoveHandlingState.change_already_visible?(filter_state, xid, change) do
669+
if not MoveIns.change_already_visible?(filter_state, xid, change) do
670670
case Shape.convert_change(shape, change,
671671
extra_refs: extra_refs,
672672
stack_id: stack_id,

packages/sync-service/lib/electric/shapes/consumer/initial_snapshot_state.ex renamed to packages/sync-service/lib/electric/shapes/consumer/initial_snapshot.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule Electric.Shapes.Consumer.InitialSnapshotState do
1+
defmodule Electric.Shapes.Consumer.InitialSnapshot do
22
@moduledoc false
33
# Internal module, used as a part of the consumer state, dealing
44
# with the initial snapshot state and the waiting for the snapshot to start.

packages/sync-service/lib/electric/shapes/consumer/materializer.ex

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ defmodule Electric.Shapes.Consumer.Materializer do
149149
end
150150

151151
%{"headers" => %{"event" => "move-out", "patterns" => patterns}} ->
152+
patterns =
153+
Enum.map(patterns, fn %{"pos" => pos, "value" => value} ->
154+
%{pos: pos, value: value}
155+
end)
156+
152157
%{headers: %{event: "move-out", patterns: patterns}}
153158
end)
154159
|> apply_changes(state)
@@ -375,21 +380,21 @@ defmodule Electric.Shapes.Consumer.Materializer do
375380

376381
defp add_row_to_tag_indices(tag_indices, key, move_tags) do
377382
# For now we only support one move tag per row (i.e. no `OR`s in the where clause if there's a subquery)
378-
Enum.reduce(move_tags, tag_indices, fn [val1], acc ->
379-
Map.update(acc, val1, MapSet.new([key]), &MapSet.put(&1, key))
383+
Enum.reduce(move_tags, tag_indices, fn tag, acc when is_binary(tag) ->
384+
Map.update(acc, tag, MapSet.new([key]), &MapSet.put(&1, key))
380385
end)
381386
end
382387

383388
defp remove_row_from_tag_indices(tag_indices, key, move_tags) do
384-
Enum.reduce(move_tags, tag_indices, fn [val1], acc ->
385-
case Map.fetch(acc, val1) do
389+
Enum.reduce(move_tags, tag_indices, fn tag, acc when is_binary(tag) ->
390+
case Map.fetch(acc, tag) do
386391
{:ok, v} ->
387392
new_mapset = MapSet.delete(v, key)
388393

389394
if MapSet.size(new_mapset) == 0 do
390-
Map.delete(acc, val1)
395+
Map.delete(acc, tag)
391396
else
392-
Map.put(acc, val1, new_mapset)
397+
Map.put(acc, tag, new_mapset)
393398
end
394399

395400
:error ->
@@ -400,8 +405,9 @@ defmodule Electric.Shapes.Consumer.Materializer do
400405

401406
defp pop_keys_from_tag_indices(tag_indices, patterns) do
402407
# This implementation is naive while we support only one tag per row and no composite tags.
403-
Enum.reduce(patterns, {MapSet.new(), tag_indices}, fn [val1], {keys, acc} ->
404-
case Map.pop(acc, val1) do
408+
Enum.reduce(patterns, {MapSet.new(), tag_indices}, fn %{pos: _pos, value: value},
409+
{keys, acc} ->
410+
case Map.pop(acc, value) do
405411
{nil, acc} -> {keys, acc}
406412
{v, acc} -> {MapSet.union(keys, v), acc}
407413
end

packages/sync-service/lib/electric/shapes/consumer/move_handling.ex

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ defmodule Electric.Shapes.Consumer.MoveHandling do
55
alias Electric.Shapes.PartialModes
66
alias Electric.Shapes.Shape
77
alias Electric.Shapes.Shape.SubqueryMoves
8-
alias Electric.Shapes.Consumer.MoveHandlingState
8+
alias Electric.Shapes.Consumer.MoveIns
99

1010
@spec process_move_ins(State.t(), Shape.handle(), list(term())) :: State.t()
1111
def process_move_ins(state, _, []), do: state
@@ -42,7 +42,7 @@ defmodule Electric.Shapes.Consumer.MoveHandling do
4242
)
4343

4444
move_handling_state =
45-
MoveHandlingState.add_waiting_move_in(state.move_handling_state, name, pg_snapshot)
45+
MoveIns.add_waiting(state.move_handling_state, name, pg_snapshot)
4646

4747
%{state | move_handling_state: move_handling_state}
4848
end
@@ -53,14 +53,21 @@ defmodule Electric.Shapes.Consumer.MoveHandling do
5353

5454
def process_move_outs(state, dep_handle, removed_values) do
5555
message =
56-
SubqueryMoves.make_move_out_control_message(state.shape, [{dep_handle, removed_values}])
56+
SubqueryMoves.make_move_out_control_message(
57+
state.shape,
58+
state.stack_id,
59+
state.shape_handle,
60+
[
61+
{dep_handle, removed_values}
62+
]
63+
)
5764

5865
{{_, upper_bound}, writer} = Storage.append_control_message!(message, state.writer)
5966

6067
{%{state | writer: writer}, {[message], upper_bound}}
6168
end
6269

63-
@spec query_complete(State.t(), MoveHandlingState.move_in_name(), list(String.t())) ::
70+
@spec query_complete(State.t(), MoveIns.move_in_name(), list(String.t())) ::
6471
{State.t(), notification :: term()}
6572
def query_complete(%State{} = state, name, key_set) do
6673
# 1. Splice the stored data into the main log
@@ -69,7 +76,7 @@ defmodule Electric.Shapes.Consumer.MoveHandling do
6976

7077
# 2. Remove this entry from "waiting" and add to "filtering"
7178
move_handling_state =
72-
MoveHandlingState.change_move_in_to_filtering(state.move_handling_state, name, key_set)
79+
MoveIns.change_to_filtering(state.move_handling_state, name, key_set)
7380

7481
state = %{state | move_handling_state: move_handling_state, writer: writer}
7582

packages/sync-service/lib/electric/shapes/consumer/move_handling_state.ex renamed to packages/sync-service/lib/electric/shapes/consumer/move_ins.ex

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule Electric.Shapes.Consumer.MoveHandlingState do
1+
defmodule Electric.Shapes.Consumer.MoveIns do
22
alias Electric.Replication.Changes
33
alias Electric.Replication.Changes.Transaction
44
alias Electric.Postgres.Xid
@@ -25,8 +25,8 @@ defmodule Electric.Shapes.Consumer.MoveHandlingState do
2525
Add information about a new move-in to the state for which we're waiting
2626
and update the buffering boundary.
2727
"""
28-
@spec add_waiting_move_in(t(), move_in_name(), pg_snapshot()) :: t()
29-
def add_waiting_move_in(%__MODULE__{waiting_move_ins: waiting_move_ins} = state, name, snapshot) do
28+
@spec add_waiting(t(), move_in_name(), pg_snapshot()) :: t()
29+
def add_waiting(%__MODULE__{waiting_move_ins: waiting_move_ins} = state, name, snapshot) do
3030
new_waiting_move_ins = Map.put(waiting_move_ins, name, snapshot)
3131
new_buffering_snapshot = make_move_in_buffering_snapshot(new_waiting_move_ins)
3232

@@ -54,8 +54,8 @@ defmodule Electric.Shapes.Consumer.MoveHandlingState do
5454
@doc """
5555
Change a move-in from "waiting" to "filtering" and update the buffering boundary.
5656
"""
57-
@spec change_move_in_to_filtering(t(), move_in_name(), list(String.t())) :: t()
58-
def change_move_in_to_filtering(%__MODULE__{} = state, name, key_set) do
57+
@spec change_to_filtering(t(), move_in_name(), list(String.t())) :: t()
58+
def change_to_filtering(%__MODULE__{} = state, name, key_set) do
5959
{snapshot, waiting_move_ins} = Map.pop!(state.waiting_move_ins, name)
6060
filtering_move_ins = [{snapshot, key_set} | state.filtering_move_ins]
6161
buffering_snapshot = make_move_in_buffering_snapshot(waiting_move_ins)
@@ -77,8 +77,8 @@ defmodule Electric.Shapes.Consumer.MoveHandlingState do
7777
Filtering generally is applied only to transactions that are already visible
7878
in the snapshot, and those can only be with `xid < xmax`.
7979
"""
80-
@spec remove_completed_move_ins(t(), Transaction.t()) :: t()
81-
def remove_completed_move_ins(%__MODULE__{} = state, %Transaction{xid: xid}) do
80+
@spec remove_completed(t(), Transaction.t()) :: t()
81+
def remove_completed(%__MODULE__{} = state, %Transaction{xid: xid}) do
8282
state.filtering_move_ins
8383
|> Enum.reject(fn {snapshot, _} -> Xid.after_snapshot?(xid, snapshot) end)
8484
|> then(&%{state | filtering_move_ins: &1})
@@ -92,7 +92,7 @@ defmodule Electric.Shapes.Consumer.MoveHandlingState do
9292
@spec check_txn(t(), Transaction.t()) :: {:continue, t()} | {:start_buffering, t()}
9393
def check_txn(%__MODULE__{move_in_buffering_snapshot: snapshot} = state, %Transaction{} = txn) do
9494
if is_nil(snapshot) or Transaction.visible_in_snapshot?(txn, snapshot) do
95-
state = remove_completed_move_ins(state, txn)
95+
state = remove_completed(state, txn)
9696
{:continue, state}
9797
else
9898
{:start_buffering, state}

packages/sync-service/lib/electric/shapes/consumer/state.ex

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Electric.Shapes.Consumer.State do
22
@moduledoc false
3-
alias Electric.Shapes.Consumer.MoveHandlingState
4-
alias Electric.Shapes.Consumer.InitialSnapshotState
3+
alias Electric.Shapes.Consumer.MoveIns
4+
alias Electric.Shapes.Consumer.InitialSnapshot
55
alias Electric.Shapes.Shape
66
alias Electric.Replication.Changes.Transaction
77
alias Electric.Postgres.SnapshotQuery
@@ -18,8 +18,8 @@ defmodule Electric.Shapes.Consumer.State do
1818
:latest_offset,
1919
:storage,
2020
:writer,
21-
initial_snapshot_state: InitialSnapshotState.new(nil),
22-
move_handling_state: MoveHandlingState.new(),
21+
initial_snapshot_state: InitialSnapshot.new(nil),
22+
move_handling_state: MoveIns.new(),
2323
buffer: [],
2424
txn_offset_mapping: [],
2525
materializer_subscribed?: false,
@@ -116,11 +116,11 @@ defmodule Electric.Shapes.Consumer.State do
116116
# }
117117

118118
defguard is_snapshot_started(state)
119-
when is_struct(state.initial_snapshot_state, InitialSnapshotState) and
119+
when is_struct(state.initial_snapshot_state, InitialSnapshot) and
120120
state.initial_snapshot_state.snapshot_started?
121121

122122
defguard needs_initial_filtering(state)
123-
when is_struct(state.initial_snapshot_state, InitialSnapshotState) and
123+
when is_struct(state.initial_snapshot_state, InitialSnapshot) and
124124
state.initial_snapshot_state.filtering?
125125

126126
@spec new(Electric.stack_id(), Shape.handle(), Shape.t()) :: uninitialized_t()
@@ -156,15 +156,15 @@ defmodule Electric.Shapes.Consumer.State do
156156
do: LogOffset.last_before_real_offsets(),
157157
else: latest_offset
158158

159-
initial_snapshot_state = InitialSnapshotState.new(pg_snapshot)
159+
initial_snapshot_state = InitialSnapshot.new(pg_snapshot)
160160

161161
%__MODULE__{
162162
state
163163
| latest_offset: normalized_latest_offset,
164164
storage: storage,
165165
writer: writer,
166166
initial_snapshot_state: initial_snapshot_state,
167-
buffering?: InitialSnapshotState.needs_buffering?(initial_snapshot_state)
167+
buffering?: InitialSnapshot.needs_buffering?(initial_snapshot_state)
168168
}
169169
end
170170

@@ -195,7 +195,7 @@ defmodule Electric.Shapes.Consumer.State do
195195
def add_waiter(%__MODULE__{initial_snapshot_state: initial_snapshot_state} = state, from) do
196196
%{
197197
state
198-
| initial_snapshot_state: InitialSnapshotState.add_waiter(initial_snapshot_state, from)
198+
| initial_snapshot_state: InitialSnapshot.add_waiter(initial_snapshot_state, from)
199199
}
200200
end
201201

@@ -204,25 +204,25 @@ defmodule Electric.Shapes.Consumer.State do
204204
snapshot
205205
) do
206206
initial_snapshot_state =
207-
InitialSnapshotState.set_initial_snapshot(initial_snapshot_state, state.storage, snapshot)
207+
InitialSnapshot.set_initial_snapshot(initial_snapshot_state, state.storage, snapshot)
208208

209209
%{
210210
state
211211
| initial_snapshot_state: initial_snapshot_state,
212-
buffering?: InitialSnapshotState.needs_buffering?(initial_snapshot_state)
212+
buffering?: InitialSnapshot.needs_buffering?(initial_snapshot_state)
213213
}
214214
end
215215

216216
def mark_snapshot_started(%__MODULE__{initial_snapshot_state: initial_snapshot_state} = state) do
217217
initial_snapshot_state =
218-
InitialSnapshotState.mark_snapshot_started(initial_snapshot_state, state.storage)
218+
InitialSnapshot.mark_snapshot_started(initial_snapshot_state, state.storage)
219219

220220
%{state | initial_snapshot_state: initial_snapshot_state}
221221
end
222222

223223
def reply_to_snapshot_waiters(state, reason) do
224224
initial_snapshot_state =
225-
InitialSnapshotState.reply_to_waiters(state.initial_snapshot_state, reason)
225+
InitialSnapshot.reply_to_waiters(state.initial_snapshot_state, reason)
226226

227227
%{state | initial_snapshot_state: initial_snapshot_state}
228228
end

packages/sync-service/lib/electric/shapes/querying.ex

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,17 @@ defmodule Electric.Shapes.Querying do
137137
shape_handle
138138
) do
139139
additional_headers =
140-
Shape.SubqueryMoves.move_in_tag_structure(shape)
141-
|> Electric.Utils.deep_map(fn column_name ->
142-
"$${md5('#{stack_id}#{shape_handle}' || #{column_name}::text)}"
140+
shape.tag_structure
141+
|> Enum.map(fn pattern ->
142+
Enum.map(pattern, fn
143+
column_name when is_binary(column_name) ->
144+
"$${md5('#{stack_id}#{shape_handle}' || #{column_name}::text)}"
145+
146+
{:hash_together, columns} ->
147+
column_parts = Enum.map(columns, &~s['#{&1}:' || #{&1}::text])
148+
"$${md5('#{stack_id}#{shape_handle}' || #{Enum.join(column_parts, " || ")})}"
149+
end)
150+
|> Enum.join("/")
143151
end)
144152
|> case do
145153
[] -> additional_headers

packages/sync-service/lib/electric/shapes/shape.ex

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -633,17 +633,20 @@ defmodule Electric.Shapes.Shape do
633633
%{change | move_tags: make_tags_from_pattern(tag_structure, record, stack_id, shape_handle)}
634634
end
635635

636-
defp make_tags_from_pattern(pattern, record, stack_id, shape_handle) do
637-
Utils.deep_map(pattern, fn column_name ->
638-
make_value_hash(stack_id, shape_handle, Map.get(record, column_name))
636+
defp make_tags_from_pattern(patterns, record, stack_id, shape_handle) do
637+
Enum.map(patterns, fn pattern ->
638+
Enum.map(pattern, fn
639+
column_name when is_binary(column_name) ->
640+
SubqueryMoves.make_value_hash(stack_id, shape_handle, Map.get(record, column_name))
641+
642+
{:hash_together, columns} ->
643+
column_parts = Enum.map(columns, &(&1 <> ":" <> Map.get(record, &1)))
644+
SubqueryMoves.make_value_hash(stack_id, shape_handle, Enum.join(column_parts, ":"))
645+
end)
646+
|> Enum.join("/")
639647
end)
640648
end
641649

642-
defp make_value_hash(stack_id, shape_handle, value) do
643-
:crypto.hash(:md5, "#{stack_id}#{shape_handle}#{value}")
644-
|> Base.encode16(case: :lower)
645-
end
646-
647650
defp filtered_columns_changed?(%Changes.UpdatedRecord{old_record: record, record: record}),
648651
do: false
649652

0 commit comments

Comments
 (0)