WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit ef3ea77

Browse files
committed
Bookie
1 parent 10922e8 commit ef3ea77

File tree

4 files changed

+111
-70
lines changed

4 files changed

+111
-70
lines changed

crates/corro-agent/src/agent/util.rs

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,7 +1165,7 @@ pub fn process_incomplete_version<T: Deref<Target = rusqlite::Connection> + Comm
11651165
":seq_arr": unnest_param(changes.iter().map(|change| change.seq)),
11661166
":ts_arr": unnest_param(changes.iter().map(|_| ts)),
11671167
},
1168-
|row| Ok(row.get::<_, String>(0)?),
1168+
|row| row.get::<_, String>(0),
11691169
)?
11701170
.collect::<rusqlite::Result<Vec<_>>>()?;
11711171

@@ -1289,17 +1289,17 @@ pub fn process_complete_version<T: Deref<Target = rusqlite::Connection> + Commit
12891289
let mut stmt = sp.prepare_cached(INSERT_CRSQL_CHANGES_QUERY)?;
12901290
trace!("inserting {:?} changes into crsql_changes", changes);
12911291
let params = params![
1292-
unnest_param(changes.iter().map(|c| c.table.as_str())),
1293-
unnest_param(changes.iter().map(|c| &c.pk)),
1294-
unnest_param(changes.iter().map(|c| c.cid.as_str())),
1295-
unnest_param(changes.iter().map(|c| &c.val)),
1296-
unnest_param(changes.iter().map(|c| &c.col_version)),
1297-
unnest_param(changes.iter().map(|c| &c.db_version)),
1298-
unnest_param(changes.iter().map(|c| &c.site_id)),
1299-
unnest_param(changes.iter().map(|c| &c.cl)),
1300-
unnest_param(changes.iter().map(|c| &c.seq)),
1301-
unnest_param(changes.iter().map(|_| &ts)),
1302-
];
1292+
unnest_param(changes.iter().map(|c| c.table.as_str())),
1293+
unnest_param(changes.iter().map(|c| &c.pk)),
1294+
unnest_param(changes.iter().map(|c| c.cid.as_str())),
1295+
unnest_param(changes.iter().map(|c| &c.val)),
1296+
unnest_param(changes.iter().map(|c| &c.col_version)),
1297+
unnest_param(changes.iter().map(|c| &c.db_version)),
1298+
unnest_param(changes.iter().map(|c| &c.site_id)),
1299+
unnest_param(changes.iter().map(|c| &c.cl)),
1300+
unnest_param(changes.iter().map(|c| &c.seq)),
1301+
unnest_param(changes.iter().map(|_| &ts)),
1302+
];
13031303
let mut last_rowids = stmt
13041304
.query_map(params, |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
13051305
.collect::<rusqlite::Result<Vec<(CrsqlDbVersion, CrsqlSeq, i64)>>>()?;
@@ -1309,17 +1309,14 @@ pub fn process_complete_version<T: Deref<Target = rusqlite::Connection> + Commit
13091309
if last_rowids_len != len {
13101310
// This should never happen, but if it does, i need data for debugging
13111311
let query_plan = sp
1312-
.prepare(&format!(
1313-
"EXPLAIN QUERY PLAN {}",
1314-
INSERT_CRSQL_CHANGES_QUERY
1315-
))?
1312+
.prepare(&format!("EXPLAIN QUERY PLAN {INSERT_CRSQL_CHANGES_QUERY}",))?
13161313
.query_map(params, |row| {
13171314
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
13181315
})?
13191316
// id, parent, notused, detail
13201317
.collect::<rusqlite::Result<Vec<(i32, i32, i32, String)>>>()?;
13211318
let vdbe_program = sp
1322-
.prepare(&format!("EXPLAIN {}", INSERT_CRSQL_CHANGES_QUERY))?
1319+
.prepare(&format!("EXPLAIN {INSERT_CRSQL_CHANGES_QUERY}"))?
13231320
.query_map(params, |row| {
13241321
Ok((
13251322
row.get(0)?,
@@ -1334,13 +1331,13 @@ pub fn process_complete_version<T: Deref<Target = rusqlite::Connection> + Commit
13341331
})?
13351332
.collect::<rusqlite::Result<
13361333
Vec<(
1337-
i32, // addr
1338-
String, // opcode
1339-
i32, // p1
1340-
i32, // p2
1341-
i32, // p3
1334+
i32, // addr
1335+
String, // opcode
1336+
i32, // p1
1337+
i32, // p2
1338+
i32, // p3
13421339
Option<String>, // p4
1343-
Option<i32>, // p5
1340+
Option<i32>, // p5
13441341
Option<String>, // comment
13451342
)>,
13461343
>>()?;
@@ -1361,9 +1358,11 @@ pub fn process_complete_version<T: Deref<Target = rusqlite::Connection> + Commit
13611358
// so the rowids we get will be shifted by one
13621359
// we need to shift them back
13631360
for i in 0..(last_rowids_len - 1) {
1364-
last_rowids[i].2 = last_rowids[i+1].2;
1361+
last_rowids[i].2 = last_rowids[i + 1].2;
13651362
}
1366-
last_rowids[last_rowids_len - 1].2 = sp.prepare_cached("SELECT last_insert_rowid()")?.query_row(params![], |row| row.get(0))?;
1363+
last_rowids[last_rowids_len - 1].2 = sp
1364+
.prepare_cached("SELECT last_insert_rowid()")?
1365+
.query_row(params![], |row| row.get(0))?;
13671366

13681367
debug!("last_rowids after shift: {last_rowids:?}");
13691368

crates/corro-agent/src/broadcast/mod.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -938,23 +938,28 @@ fn diff_member_states(
938938
)?
939939
.execute(params![
940940
unnest_param(to_update.iter().map(|(member, _)| member.id().id())),
941-
unnest_param(to_update.iter().map(|(member, _)| member.id().addr().to_string())),
942-
unnest_param(to_update.iter().map(|(member, _)| serde_json::to_string(&member).unwrap())),
941+
unnest_param(
942+
to_update
943+
.iter()
944+
.map(|(member, _)| member.id().addr().to_string())
945+
),
946+
unnest_param(
947+
to_update
948+
.iter()
949+
.map(|(member, _)| serde_json::to_string(&member).unwrap())
950+
),
943951
unnest_param(to_update.iter().map(|(_, rtt_min)| rtt_min)),
944952
unnest_param(to_update.iter().map(|_| updated_at)),
945953
])?;
946-
954+
947955
deleted += tx
948956
.prepare_cached(
949957
r#"DELETE FROM __corro_members
950958
WHERE actor_id IN (SELECT value0 FROM UNNEST(?))
951959
AND updated_at < ?
952960
"#,
953961
)?
954-
.execute(params![
955-
unnest_param(to_delete.iter()),
956-
updated_at
957-
])?;
962+
.execute(params![unnest_param(to_delete.iter()), updated_at])?;
958963

959964
tx.commit()?;
960965

crates/corro-types/src/agent.rs

Lines changed: 73 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
time::{Duration, Instant},
1515
};
1616

17-
use antithesis_sdk::{assert_always, assert_unreachable};
17+
use antithesis_sdk::assert_unreachable;
1818
use arc_swap::ArcSwap;
1919
use camino::Utf8PathBuf;
2020
use compact_str::{CompactString, ToCompactString};
@@ -50,8 +50,8 @@ use crate::{
5050
pubsub::SubsManager,
5151
schema::Schema,
5252
sqlite::{
53-
rusqlite_to_crsqlite, rusqlite_to_crsqlite_write, setup_conn, CrConn, Migration,
54-
SqlitePool, SqlitePoolError,
53+
rusqlite_to_crsqlite, rusqlite_to_crsqlite_write, setup_conn, unnest_param, CrConn,
54+
Migration, SqlitePool, SqlitePoolError,
5555
},
5656
updates::UpdatesManager,
5757
};
@@ -1135,51 +1135,88 @@ impl VersionsSnapshot {
11351135
trace!(actor_id = %self.actor_id, "new: {:?}", changes.insert_set);
11361136

11371137
// those are actual ranges we had stored and will change, remove them from the DB
1138-
for range in std::mem::take(&mut changes.remove_ranges) {
1139-
debug!(actor_id = %self.actor_id, "deleting {range:?}");
1138+
{
1139+
let remove_ranges = std::mem::take(&mut changes.remove_ranges);
1140+
let actors = unnest_param(remove_ranges.iter().map(|_| self.actor_id));
1141+
let starts = unnest_param(remove_ranges.iter().map(|r| r.start()));
1142+
let ends = unnest_param(remove_ranges.iter().map(|r| r.end()));
1143+
// TODO: use returning to discover which ranges were actually deleted
11401144
let count = conn
1141-
.prepare_cached("DELETE FROM __corro_bookkeeping_gaps WHERE actor_id = :actor_id AND start = :start AND end = :end")?
1145+
.prepare_cached(
1146+
"
1147+
DELETE FROM __corro_bookkeeping_gaps WHERE (actor_id, start, end)
1148+
IN (SELECT value0, value1, value2 FROM unnest(:actors, :starts, :ends))
1149+
",
1150+
)?
11421151
.execute(named_params! {
1143-
":actor_id": self.actor_id,
1144-
":start": range.start(),
1145-
":end": range.end()
1152+
":actors": actors,
1153+
":starts": starts,
1154+
":ends": ends,
11461155
})?;
1147-
if count != 1 {
1148-
warn!(actor_id = %self.actor_id, "did not delete gap from db: {range:?}");
1156+
if count != remove_ranges.len() {
1157+
warn!(actor_id = %self.actor_id, "did not delete some gaps from db: {remove_ranges:?}");
1158+
let details: serde_json::Value = json!({"count": count, "ranges": remove_ranges});
1159+
assert_unreachable!("ineffective deletion of gaps in-db", &details);
11491160
}
1150-
let details = json!({"count": count, "range": range});
1151-
assert_always!(count == 1, "ineffective deletion of gaps in-db", &details);
1152-
for version in CrsqlDbVersionRange::from(&range) {
1153-
self.partials.remove(&version);
1161+
1162+
for range in remove_ranges {
1163+
for version in CrsqlDbVersionRange::from(&range) {
1164+
self.partials.remove(&version);
1165+
}
1166+
self.needed.remove(range);
11541167
}
1155-
self.needed.remove(range);
11561168
}
11571169

1158-
for range in std::mem::take(&mut changes.insert_set) {
1159-
debug!(actor_id = %self.actor_id, "inserting {range:?}");
1160-
let res = conn
1170+
{
1171+
let insert_set = std::mem::take(&mut changes.insert_set);
1172+
let actors = unnest_param(insert_set.iter().map(|_| self.actor_id));
1173+
let starts = unnest_param(insert_set.iter().map(|r| r.start()));
1174+
let ends = unnest_param(insert_set.iter().map(|r| r.end()));
1175+
debug!(actor_id = %self.actor_id, "inserting {insert_set:?}");
1176+
// TODO: use returning to discover which ranges were actually inserted
1177+
let count = conn
11611178
.prepare_cached(
1162-
"INSERT INTO __corro_bookkeeping_gaps VALUES (:actor_id, :start, :end)",
1179+
"
1180+
INSERT OR IGNORE INTO __corro_bookkeeping_gaps (actor_id, start, end)
1181+
SELECT value0, value1, value2 FROM unnest(:actors, :starts, :ends)
1182+
",
11631183
)?
11641184
.execute(named_params! {
1165-
":actor_id": self.actor_id,
1166-
":start": range.start(),
1167-
":end": range.end()
1168-
});
1169-
1170-
if let Err(e) = res {
1171-
let (actor_id, start, end) : (ActorId, CrsqlDbVersion, CrsqlDbVersion) = conn.query_row("SELECT actor_id, start, end FROM __corro_bookkeeping_gaps WHERE actor_id = :actor_id AND start = :start", named_params! {
1172-
":actor_id": self.actor_id,
1173-
":start": range.start(),
1174-
}, |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
1175-
1176-
warn!("already had gaps entry! actor_id: {actor_id}, start: {start}, end: {end}");
1177-
let details = json!({"actor_id": actor_id, "start": start, "end": end});
1178-
assert_unreachable!("gaps entry present", &details);
1185+
":actors": actors,
1186+
":starts": starts,
1187+
":ends": ends,
1188+
})?;
1189+
if count != insert_set.len() {
1190+
warn!(actor_id = %self.actor_id, "did not insert some gaps into db: {insert_set:?}");
1191+
1192+
let existing: Vec<(ActorId, CrsqlDbVersion, CrsqlDbVersion)> = conn
1193+
.prepare_cached(
1194+
"
1195+
SELECT actor_id, start, end FROM __corro_bookkeeping_gaps
1196+
WHERE (actor_id, start)
1197+
IN (SELECT value0, value1 FROM unnest(:actors, :starts))",
1198+
)?
1199+
.query_map(
1200+
named_params! {
1201+
":actors": actors,
1202+
":starts": starts,
1203+
},
1204+
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
1205+
)?
1206+
.collect::<rusqlite::Result<Vec<_>>>()?;
1207+
1208+
warn!("already had gaps entries! existing: {existing:?}");
1209+
let details: serde_json::Value =
1210+
json!({"count": count, "insert_set": insert_set, "existing": existing});
1211+
assert_unreachable!("ineffective insertion of gaps in-db", &details);
1212+
return Err(rusqlite::Error::ModuleError(
1213+
"Gaps entries already present in DB".to_string(),
1214+
));
1215+
}
11791216

1180-
return Err(e);
1217+
for range in insert_set {
1218+
self.needed.insert(range);
11811219
}
1182-
self.needed.insert(range);
11831220
}
11841221

11851222
self.max = changes.max.take();

crates/corro-types/src/sqlite.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
};
55

66
use once_cell::sync::Lazy;
7-
use rusqlite::types::{ToSqlOutput, Value, ToSql};
7+
use rusqlite::types::{ToSql, ToSqlOutput, Value};
88
use rusqlite::{
99
params, trace::TraceEventCodes, vtab::eponymous_only_module, Connection, Transaction,
1010
};
@@ -247,7 +247,7 @@ pub fn migrate(conn: &mut Connection, migrations: Vec<Box<dyn Migration>>) -> ru
247247
// into a vector of SQL values, which can be used as a parameter
248248
// to the `unnest` function in SQL.
249249
// Due to limitations in rusqlite we need to use owned values
250-
pub fn unnest_param<'a, T, K>(iter: T) -> Rc<Vec<Value>>
250+
pub fn unnest_param<T, K>(iter: T) -> Rc<Vec<Value>>
251251
where
252252
T: IntoIterator<Item = K>,
253253
K: ToSql,

0 commit comments

Comments
 (0)