WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit c12a408

Browse files
committed
hashing of tags
1 parent 1137b27 commit c12a408

File tree

10 files changed

+247
-74
lines changed

10 files changed

+247
-74
lines changed

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,12 @@ defmodule Electric.Shapes.Consumer do
497497

498498
extra_refs = Materializer.get_all_as_refs(shape, state.stack_id)
499499

500-
case filter_changes(changes, shape, {xid, state.move_handling_state}, extra_refs) do
500+
case filter_changes(
501+
changes,
502+
shape,
503+
{xid, state.move_handling_state, state.stack_id, state.shape_handle},
504+
extra_refs
505+
) do
501506
:includes_truncate ->
502507
# TODO: This is a very naive way to handle truncations: if ANY relevant truncates are
503508
# present in the transaction, we're considering the whole transaction empty, and
@@ -656,13 +661,17 @@ defmodule Electric.Shapes.Consumer do
656661
defp filter_changes(
657662
[change | rest],
658663
shape,
659-
{xid, filter_state} = filtering,
664+
{xid, filter_state, stack_id, shape_handle} = filtering,
660665
extra_refs,
661666
change_acc,
662667
total_ops
663668
) do
664669
if not MoveHandlingState.change_already_visible?(filter_state, xid, change) do
665-
case Shape.convert_change(shape, change, extra_refs) do
670+
case Shape.convert_change(shape, change,
671+
extra_refs: extra_refs,
672+
stack_id: stack_id,
673+
shape_handle: shape_handle
674+
) do
666675
[] ->
667676
filter_changes(rest, shape, filtering, extra_refs, change_acc, total_ops)
668677

packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
234234
}
235235
})
236236

237-
Querying.stream_initial_data(conn, stack_id, shape, chunk_bytes_threshold)
237+
Querying.stream_initial_data(conn, stack_id, shape_handle, shape, chunk_bytes_threshold)
238238
|> Stream.transform(
239239
fn -> false end,
240240
fn item, acc ->

packages/sync-service/lib/electric/shapes/partial_modes.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ defmodule Electric.Shapes.PartialModes do
1515
send(self(), {:pg_snapshot_info, pg_snapshot, lsn})
1616
end,
1717
query_fn: fn conn, _, _ ->
18-
Querying.query_subset(conn, shape, subset, headers)
18+
Querying.query_subset(conn, opts[:stack_id], shape_handle, shape, subset, headers)
1919
|> Enum.to_list()
2020
end,
2121
stack_id: opts[:stack_id],
@@ -63,7 +63,7 @@ defmodule Electric.Shapes.PartialModes do
6363
end,
6464
query_fn: fn conn, _, _ ->
6565
result =
66-
Querying.query_move_in(conn, shape, where)
66+
Querying.query_move_in(conn, opts[:stack_id], shape_handle, shape, where)
6767
|> results_fn.()
6868

6969
send(parent, {:query_move_in_complete, opts[:move_in_name], result})

packages/sync-service/lib/electric/shapes/querying.ex

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ defmodule Electric.Shapes.Querying do
44
alias Electric.Shapes.Shape
55
alias Electric.Telemetry.OpenTelemetry
66

7-
def query_move_in(conn, shape, {where, params}) do
7+
def query_move_in(conn, stack_id, shape_handle, shape, {where, params}) do
88
table = Utils.relation_to_sql(shape.root_table)
99

10-
{json_like_select, _} = json_like_select(shape, %{"is_move_in" => true})
10+
{json_like_select, _} =
11+
json_like_select(shape, %{"is_move_in" => true}, stack_id, shape_handle)
12+
1113
key_select = key_select(shape)
1214

1315
query =
@@ -21,7 +23,7 @@ defmodule Electric.Shapes.Querying do
2123
|> Stream.flat_map(& &1.rows)
2224
end
2325

24-
def query_subset(conn, shape, subset, headers \\ []) do
26+
def query_subset(conn, stack_id, shape_handle, shape, subset, headers \\ []) do
2527
# When querying a subset, we select same columns as the base shape
2628
table = Utils.relation_to_sql(shape.root_table)
2729

@@ -44,7 +46,7 @@ defmodule Electric.Shapes.Querying do
4446
limit = if limit = subset.limit, do: " LIMIT #{limit}", else: ""
4547
offset = if offset = subset.offset, do: " OFFSET #{offset}", else: ""
4648

47-
{json_like_select, params} = json_like_select(shape, headers)
49+
{json_like_select, params} = json_like_select(shape, headers, stack_id, shape_handle)
4850

4951
query =
5052
Postgrex.prepare!(
@@ -64,22 +66,30 @@ defmodule Electric.Shapes.Querying do
6466

6567
@type json_result_stream :: Enumerable.t(json_iodata())
6668

67-
@spec stream_initial_data(DBConnection.t(), String.t(), Shape.t(), non_neg_integer()) ::
69+
@spec stream_initial_data(
70+
DBConnection.t(),
71+
String.t(),
72+
String.t(),
73+
Shape.t(),
74+
non_neg_integer()
75+
) ::
6876
json_result_stream()
6977
def stream_initial_data(
7078
conn,
7179
stack_id,
80+
shape_handle,
7281
shape,
7382
chunk_bytes_threshold \\ LogChunker.default_chunk_size_threshold()
7483
)
7584

76-
def stream_initial_data(_, _, %Shape{log_mode: :changes_only}, _chunk_bytes_threshold) do
85+
def stream_initial_data(_, _, _, %Shape{log_mode: :changes_only}, _chunk_bytes_threshold) do
7786
[]
7887
end
7988

8089
def stream_initial_data(
8190
conn,
8291
stack_id,
92+
shape_handle,
8393
%Shape{root_table: root_table} = shape,
8494
chunk_bytes_threshold
8595
) do
@@ -89,7 +99,7 @@ defmodule Electric.Shapes.Querying do
8999
where =
90100
if not is_nil(shape.where), do: " WHERE " <> shape.where.query, else: ""
91101

92-
{json_like_select, params} = json_like_select(shape)
102+
{json_like_select, params} = json_like_select(shape, [], stack_id, shape_handle)
93103

94104
query =
95105
Postgrex.prepare!(conn, table, ~s|SELECT #{json_like_select} FROM #{table} #{where}|)
@@ -122,11 +132,15 @@ defmodule Electric.Shapes.Querying do
122132
root_table: root_table,
123133
selected_columns: columns
124134
} = shape,
125-
additional_headers \\ []
135+
additional_headers,
136+
stack_id,
137+
shape_handle
126138
) do
127139
additional_headers =
128140
Shape.SubqueryMoves.move_in_tag_structure(shape)
129-
|> Electric.Utils.deep_map(fn column_name -> "$${#{column_name}}" end)
141+
|> Electric.Utils.deep_map(fn column_name ->
142+
"$${md5('#{stack_id}#{shape_handle}' || #{column_name}::text)}"
143+
end)
130144
|> case do
131145
[] -> additional_headers
132146
tag_structure -> additional_headers |> Map.new() |> Map.put(:tags, tag_structure)
@@ -163,9 +177,20 @@ defmodule Electric.Shapes.Querying do
163177
headers =
164178
headers
165179
|> Map.merge(additional_headers)
166-
|> Jason.encode!()
167-
|> Utils.escape_quotes(?')
168-
|> String.replace(~r/\$\$\{(\w+)\}/, ~s[' || \\1::text || '])
180+
|> Map.pop(:tags)
181+
|> case do
182+
{nil, headers} ->
183+
headers |> Jason.encode!() |> Utils.escape_quotes(?')
184+
185+
{tags, headers} ->
186+
"{" <> json = headers |> Jason.encode!() |> Utils.escape_quotes(?')
187+
188+
tags =
189+
Jason.encode!(tags)
190+
|> String.replace(~r/\$\$\{([^\}]+)\}/, ~s[' || \\1::text || '])
191+
192+
~s|{"tags":#{tags},| <> json
193+
end
169194

170195
~s['"headers":#{headers}']
171196
end

packages/sync-service/lib/electric/shapes/shape.ex

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ defmodule Electric.Shapes.Shape do
524524
Updates, on the other hand, may be converted to an "new record" or a "deleted record"
525525
if the previous/new version of the updated row isn't in the shape.
526526
"""
527-
def convert_change(shape, change, extra_refs \\ %{})
527+
def convert_change(shape, change, opts \\ [])
528528

529529
def convert_change(%__MODULE__{root_table: table}, %{relation: relation}, _)
530530
when table != relation,
@@ -533,13 +533,13 @@ defmodule Electric.Shapes.Shape do
533533
def convert_change(
534534
%__MODULE__{where: nil, flags: %{selects_all_columns: true}} = shape,
535535
change,
536-
_
536+
opts
537537
) do
538538
# If the change actually doesn't change any columns, we can skip it - this is possible on Postgres but we don't care for those.
539539
if is_struct(change, Changes.UpdatedRecord) and change.changed_columns == MapSet.new() do
540540
[]
541541
else
542-
[fill_move_tags(change, shape)]
542+
[fill_move_tags(change, shape, opts[:stack_id], opts[:shape_handle])]
543543
end
544544
end
545545

@@ -548,14 +548,17 @@ defmodule Electric.Shapes.Shape do
548548
def convert_change(
549549
%__MODULE__{where: where, selected_columns: selected_columns} = shape,
550550
change,
551-
extra_refs
551+
opts
552552
)
553553
when is_struct(change, Changes.NewRecord)
554554
when is_struct(change, Changes.DeletedRecord) do
555555
record = if is_struct(change, Changes.NewRecord), do: change.record, else: change.old_record
556556

557-
if WhereClause.includes_record?(where, record, extra_refs) do
558-
change |> fill_move_tags(shape) |> filter_change_columns(selected_columns) |> List.wrap()
557+
if WhereClause.includes_record?(where, record, opts[:extra_refs] || %{}) do
558+
change
559+
|> fill_move_tags(shape, opts[:stack_id], opts[:shape_handle])
560+
|> filter_change_columns(selected_columns)
561+
|> List.wrap()
559562
else
560563
[]
561564
end
@@ -564,8 +567,9 @@ defmodule Electric.Shapes.Shape do
564567
def convert_change(
565568
%__MODULE__{where: where, selected_columns: selected_columns} = shape,
566569
%Changes.UpdatedRecord{old_record: old_record, record: record} = change,
567-
extra_refs
570+
opts
568571
) do
572+
extra_refs = opts[:extra_refs] || %{}
569573
old_record_in_shape = WhereClause.includes_record?(where, old_record, extra_refs)
570574
new_record_in_shape = WhereClause.includes_record?(where, record, extra_refs)
571575

@@ -578,7 +582,7 @@ defmodule Electric.Shapes.Shape do
578582
end
579583

580584
converted_changes
581-
|> Enum.map(&fill_move_tags(&1, shape))
585+
|> Enum.map(&fill_move_tags(&1, shape, opts[:stack_id], opts[:shape_handle]))
582586
|> Enum.map(&filter_change_columns(&1, selected_columns))
583587
|> Enum.filter(&filtered_columns_changed?/1)
584588
end
@@ -589,33 +593,55 @@ defmodule Electric.Shapes.Shape do
589593
Changes.filter_columns(change, selected_columns)
590594
end
591595

592-
defp fill_move_tags(change, %__MODULE__{tag_structure: []}), do: change
596+
defp fill_move_tags(change, %__MODULE__{tag_structure: []}, _, _), do: change
593597

594-
defp fill_move_tags(%Changes.NewRecord{record: record} = change, %__MODULE__{
595-
tag_structure: tag_structure
596-
}) do
597-
move_tags = Utils.deep_map(tag_structure, fn column_name -> Map.get(record, column_name) end)
598+
defp fill_move_tags(
599+
%Changes.NewRecord{record: record} = change,
600+
%__MODULE__{
601+
tag_structure: tag_structure
602+
},
603+
stack_id,
604+
shape_handle
605+
) do
606+
move_tags = make_tags_from_pattern(tag_structure, record, stack_id, shape_handle)
598607
%{change | move_tags: move_tags}
599608
end
600609

601610
defp fill_move_tags(
602611
%Changes.UpdatedRecord{record: record, old_record: old_record} = change,
603-
%__MODULE__{tag_structure: tag_structure}
612+
%__MODULE__{tag_structure: tag_structure},
613+
stack_id,
614+
shape_handle
604615
) do
605-
move_tags = Utils.deep_map(tag_structure, fn column_name -> Map.get(record, column_name) end)
616+
move_tags = make_tags_from_pattern(tag_structure, record, stack_id, shape_handle)
606617

607618
old_move_tags =
608-
Utils.deep_map(tag_structure, fn column_name -> Map.get(old_record, column_name) end) --
619+
make_tags_from_pattern(tag_structure, old_record, stack_id, shape_handle) --
609620
move_tags
610621

611622
%{change | move_tags: move_tags, removed_move_tags: old_move_tags}
612623
end
613624

614-
defp fill_move_tags(%Changes.DeletedRecord{old_record: record} = change, %__MODULE__{
615-
tag_structure: tag_structure
616-
}) do
617-
move_tags = Utils.deep_map(tag_structure, fn column_name -> Map.get(record, column_name) end)
618-
%{change | move_tags: move_tags}
625+
defp fill_move_tags(
626+
%Changes.DeletedRecord{old_record: record} = change,
627+
%__MODULE__{
628+
tag_structure: tag_structure
629+
},
630+
stack_id,
631+
shape_handle
632+
) do
633+
%{change | move_tags: make_tags_from_pattern(tag_structure, record, stack_id, shape_handle)}
634+
end
635+
636+
defp make_tags_from_pattern(pattern, record, stack_id, shape_handle) do
637+
Utils.deep_map(pattern, fn column_name ->
638+
make_value_hash(stack_id, shape_handle, Map.get(record, column_name))
639+
end)
640+
end
641+
642+
defp make_value_hash(stack_id, shape_handle, value) do
643+
:crypto.hash(:md5, "#{stack_id}#{shape_handle}#{value}")
644+
|> Base.encode16(case: :lower)
619645
end
620646

621647
defp filtered_columns_changed?(%Changes.UpdatedRecord{old_record: record, record: record}),

packages/sync-service/test/electric/plug/router_test.exs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2276,23 +2276,34 @@ defmodule Electric.Plug.RouterTest do
22762276
]
22772277
test "a move-in from the inner shape causes a query and new entries in the outer shape", %{
22782278
opts: opts,
2279-
db_conn: db_conn
2279+
db_conn: db_conn,
2280+
stack_id: stack_id
22802281
} do
22812282
req = make_shape_req("child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)")
2282-
22832283
assert {req, 200, [data, snapshot_end]} = shape_req(req, opts)
2284+
2285+
tag =
2286+
:crypto.hash(:md5, stack_id <> req.handle <> "1")
2287+
|> Base.encode16(case: :lower)
2288+
22842289
assert %{"id" => "1", "parent_id" => "1", "value" => "10"} = data["value"]
2285-
assert %{"operation" => "insert", "tags" => [["1"]]} = data["headers"]
2290+
assert %{"operation" => "insert", "tags" => [[^tag]]} = data["headers"]
22862291
assert %{"headers" => %{"control" => "snapshot-end"}} = snapshot_end
22872292

22882293
task = live_shape_req(req, opts)
22892294

22902295
# Move in reflects in the new shape without invalidating it
22912296
Postgrex.query!(db_conn, "UPDATE parent SET value = 1 WHERE id = 2", [])
22922297

2298+
tag2 =
2299+
:crypto.hash(:md5, stack_id <> req.handle <> "2")
2300+
|> Base.encode16(case: :lower)
2301+
22932302
assert {_, 200, [data, %{"headers" => %{"control" => "up-to-date"}}]} = Task.await(task)
22942303
assert %{"id" => "2", "parent_id" => "2", "value" => "20"} = data["value"]
2295-
assert %{"operation" => "insert", "is_move_in" => true, "tags" => [["2"]]} = data["headers"]
2304+
2305+
assert %{"operation" => "insert", "is_move_in" => true, "tags" => [[^tag2]]} =
2306+
data["headers"]
22962307
end
22972308

22982309
@tag with_sql: [
@@ -2439,11 +2450,15 @@ defmodule Electric.Plug.RouterTest do
24392450
task = live_shape_req(req, ctx.opts)
24402451
Postgrex.query!(ctx.db_conn, "INSERT INTO members (user_id, team_id) VALUES (1, 2)")
24412452

2453+
tag =
2454+
:crypto.hash(:md5, ctx.stack_id <> req.handle <> "2")
2455+
|> Base.encode16(case: :lower)
2456+
24422457
assert {_, 200,
24432458
[
24442459
%{
24452460
"value" => %{"id" => "2", "name" => "Team B"},
2446-
"headers" => %{"tags" => [["2"]]}
2461+
"headers" => %{"tags" => [[^tag]]}
24472462
},
24482463
_
24492464
]} =
@@ -2495,10 +2510,14 @@ defmodule Electric.Plug.RouterTest do
24952510
"UPDATE members SET flag = TRUE WHERE (user_id, team_id) = (2, 2)"
24962511
)
24972512

2513+
tag =
2514+
:crypto.hash(:md5, ctx.stack_id <> req.handle <> "2")
2515+
|> Base.encode16(case: :lower)
2516+
24982517
assert {_, 200,
24992518
[
25002519
%{
2501-
"headers" => %{"tags" => [[["2", "2"]]]},
2520+
"headers" => %{"tags" => [[[^tag, ^tag]]]},
25022521
"value" => %{"id" => "2", "role" => "Member"}
25032522
},
25042523
_
@@ -2541,7 +2560,11 @@ defmodule Electric.Plug.RouterTest do
25412560
task = live_shape_req(req, ctx.opts)
25422561
Postgrex.query!(ctx.db_conn, "UPDATE parent SET other_value = 10 WHERE id = 2")
25432562

2544-
assert {_, 200, [%{"headers" => %{"tags" => [["20"]]}, "value" => %{"id" => "3"}}, _]} =
2563+
tag =
2564+
:crypto.hash(:md5, ctx.stack_id <> req.handle <> "20")
2565+
|> Base.encode16(case: :lower)
2566+
2567+
assert {_, 200, [%{"headers" => %{"tags" => [[^tag]]}, "value" => %{"id" => "3"}}, _]} =
25452568
Task.await(task)
25462569
end
25472570
end

0 commit comments

Comments
 (0)