WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
3998895
refactor(webhook): event filtering to support separate query paths fo…
VenuMadhav2541 Nov 23, 2025
078db2d
chore: run formatter
hyperswitch-bot[bot] Nov 23, 2025
220c75b
fix(webhooks): replace unreachable!() with explicit error for clippy …
VenuMadhav2541 Nov 23, 2025
23b39d2
fix(webhooks): replace unreachable!() with explicit error for clippy …
VenuMadhav2541 Nov 23, 2025
0bb4043
refactor: Enhance webhook_events core logic for targeted event searches
VenuMadhav2541 Nov 25, 2025
d883c5e
chore: run formatter
hyperswitch-bot[bot] Nov 25, 2025
16910a0
fix(webhooks): improve error message clarity for event list constraints
VenuMadhav2541 Nov 25, 2025
4a52e7b
Merge branch 'webhooks_selective_search' of https://github.com/juspay…
VenuMadhav2541 Nov 25, 2025
6b9bfbe
refactor(webhooks): enhance event query tests and add clippy allowance
VenuMadhav2541 Nov 25, 2025
a34f2fe
refactor(webhooks): change event lookup from list to single find oper…
VenuMadhav2541 Nov 28, 2025
b461072
refactor(webhooks): add event_id = initial_attempt_id constraint to f…
VenuMadhav2541 Dec 1, 2025
36e4ad3
fix(webhooks): ensure event queries return only initial event, not re…
VenuMadhav2541 Dec 1, 2025
2b446fd
Merge branch 'main' of https://github.com/juspay/hyperswitch into web…
VenuMadhav2541 Dec 2, 2025
e109fe7
refactor(webhooks): improve event processing with concurrent executio…
VenuMadhav2541 Dec 5, 2025
8624c5d
chore: run formatter
hyperswitch-bot[bot] Dec 5, 2025
b2b1f29
refactor(diesel_models): consolidate duplicate imports in events module
VenuMadhav2541 Dec 5, 2025
a7aa0ef
chore: run formatter
hyperswitch-bot[bot] Dec 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/api_models/src/webhook_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub enum EventListConstraintsInternal {
},
ObjectIdFilter {
object_id: String,
},
EventIdFilter {
event_id: String,
},
}
Expand Down
134 changes: 104 additions & 30 deletions crates/diesel_models/src/query/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,62 @@ impl Event {
pub async fn list_initial_attempts_by_merchant_id_primary_object_id_or_initial_attempt_id(
conn: &PgPooledConn,
merchant_id: &common_utils::id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<<Self as HasTable>::Table, _, _, _>(
conn,
dsl::event_id
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::{
debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods,
NullableExpressionMethods, QueryDsl,
};
use error_stack::ResultExt;
use router_env::logger;

use super::generics::db_metrics::{track_database_call, DatabaseOperation};
use crate::errors::DatabaseError;

if let Some(event_id) = initial_attempt_id {
let predicate = dsl::event_id
.nullable()
.eq(dsl::initial_attempt_id) // Filter initial attempts only
.and(dsl::merchant_id.eq(merchant_id.to_owned()))
.and(
dsl::primary_object_id
.eq(primary_object_id.to_owned())
.or(dsl::initial_attempt_id.eq(initial_attempt_id.to_owned())),
),
None,
None,
Some(dsl::created_at.desc()),
)
.await
.and(dsl::initial_attempt_id.eq(event_id.to_owned()));

let result =
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(conn, predicate)
.await;

match result {
Ok(event) => Ok(vec![event]),
Err(err) => match err.current_context() {
DatabaseError::NotFound => Ok(vec![]),
_ => Err(err).attach_printable("Error finding event by event_id"),
},
}
} else if let Some(obj_id) = primary_object_id {
let query = Self::table()
.filter(
dsl::event_id
.nullable()
.eq(dsl::initial_attempt_id) // Filter initial attempts only
.and(dsl::merchant_id.eq(merchant_id.to_owned()))
.and(dsl::primary_object_id.eq(obj_id.to_owned())),
)
.order(dsl::created_at.desc())
.into_boxed();

logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());

track_database_call::<Self, _, _>(
query.get_results_async(conn),
DatabaseOperation::Filter,
)
.await
.change_context(DatabaseError::Others)
.attach_printable("Error filtering events by object_id")
} else {
Ok(vec![])
}
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -137,25 +174,62 @@ impl Event {
pub async fn list_initial_attempts_by_profile_id_primary_object_id_or_initial_attempt_id(
conn: &PgPooledConn,
profile_id: &common_utils::id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
) -> StorageResult<Vec<Self>> {
generics::generic_filter::<<Self as HasTable>::Table, _, _, _>(
conn,
dsl::event_id
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::{
debug_query, pg::Pg, BoolExpressionMethods, ExpressionMethods,
NullableExpressionMethods, QueryDsl,
};
use error_stack::ResultExt;
use router_env::logger;

use super::generics::db_metrics::{track_database_call, DatabaseOperation};
use crate::errors::DatabaseError;

if let Some(event_id) = initial_attempt_id {
let predicate = dsl::event_id
.nullable()
.eq(dsl::initial_attempt_id) // Filter initial attempts only
.and(dsl::business_profile_id.eq(profile_id.to_owned()))
.and(
dsl::primary_object_id
.eq(primary_object_id.to_owned())
.or(dsl::initial_attempt_id.eq(initial_attempt_id.to_owned())),
),
None,
None,
Some(dsl::created_at.desc()),
)
.await
.and(dsl::initial_attempt_id.eq(event_id.to_owned()));

let result =
generics::generic_find_one::<<Self as HasTable>::Table, _, _>(conn, predicate)
.await;

match result {
Ok(event) => Ok(vec![event]),
Err(err) => match err.current_context() {
DatabaseError::NotFound => Ok(vec![]),
_ => Err(err).attach_printable("Error finding event by event_id"),
},
}
} else if let Some(obj_id) = primary_object_id {
let query = Self::table()
.filter(
dsl::event_id
.nullable()
.eq(dsl::initial_attempt_id) // Filter initial attempts only
.and(dsl::business_profile_id.eq(profile_id.to_owned()))
.and(dsl::primary_object_id.eq(obj_id.to_owned())),
)
.order(dsl::created_at.desc())
.into_boxed();

logger::debug!(query = %debug_query::<Pg, _>(&query).to_string());

track_database_call::<Self, _, _>(
query.get_results_async(conn),
DatabaseOperation::Filter,
)
.await
.change_context(DatabaseError::Others)
.attach_printable("Error filtering events by object_id")
} else {
Ok(vec![])
}
}

#[allow(clippy::too_many_arguments)]
Expand Down
43 changes: 35 additions & 8 deletions crates/router/src/core/webhooks/webhook_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,53 @@ pub async fn list_initial_delivery_attempts(
(now.date() - time::Duration::days(INITIAL_DELIVERY_ATTEMPTS_LIST_MAX_DAYS)).midnight();

let (events, total_count) = match constraints {
api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter {
object_id,
event_id,
} => {
api_models::webhook_events::EventListConstraintsInternal::ObjectIdFilter { object_id } => {
let events =
match account {
MerchantAccountOrProfile::MerchantAccount(merchant_account) => store
.list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
Copy link
Contributor

@Aishwariyaa-Anand Aishwariyaa-Anand Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one of ObjectID or EventID will be ever sent, there would be no scenario where both are provided.It would be better to split this into two separate functions list_initial_events_by_merchant_id_primary_object_id and list_initial_events_by_merchant_id_initial_attempt_id

merchant_account.get_id(),
Some(object_id.as_str()),
None,
&key_store,
)
.await,
MerchantAccountOrProfile::Profile(business_profile) => {
store
.list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
business_profile.get_id(),
Some(object_id.as_str()),
None,
&key_store,
)
.await
}
}
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to list events with specified constraints")?;

let total_count = i64::try_from(events.len())
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Error while converting from usize to i64")?;
(events, total_count)
}
api_models::webhook_events::EventListConstraintsInternal::EventIdFilter { event_id } => {
let events =
match account {
MerchantAccountOrProfile::MerchantAccount(merchant_account) => store
.list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
merchant_account.get_id(),
&object_id,
&event_id,
None,
Some(event_id.as_str()),
&key_store,
)
.await,
MerchantAccountOrProfile::Profile(business_profile) => {
store
.list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
business_profile.get_id(),
&object_id,
&event_id,
None,
Some(event_id.as_str()),
&key_store,
)
.await
Expand Down
92 changes: 60 additions & 32 deletions crates/router/src/db/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ where
async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;

Expand All @@ -74,8 +74,8 @@ where
async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
&self,
profile_id: &common_utils::id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError>;

Expand Down Expand Up @@ -187,8 +187,8 @@ impl EventInterface for Store {
async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
Expand Down Expand Up @@ -302,8 +302,8 @@ impl EventInterface for Store {
async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
&self,
profile_id: &common_utils::id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
Expand Down Expand Up @@ -521,21 +521,35 @@ impl EventInterface for MockDb {
async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&self,
merchant_id: &common_utils::id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let locked_events = self.events.lock().await;
let events = locked_events
.iter()
.filter(|event| {

let events = if let Some(event_id) = initial_attempt_id {
if let Some(event) = locked_events.iter().find(|event| {
event.merchant_id == Some(merchant_id.to_owned())
&& event.initial_attempt_id.as_deref() == Some(&event.event_id)
&& (event.primary_object_id == primary_object_id
|| event.initial_attempt_id.as_deref() == Some(initial_attempt_id))
})
.cloned()
.collect::<Vec<_>>();
&& event.initial_attempt_id.as_deref() == Some(event_id)
}) {
vec![event.clone()]
} else {
vec![]
}
} else if let Some(obj_id) = primary_object_id {
locked_events
.iter()
.filter(|event| {
event.merchant_id == Some(merchant_id.to_owned())
&& event.initial_attempt_id.as_deref() == Some(&event.event_id)
&& event.primary_object_id.as_str() == obj_id
})
.cloned()
.collect::<Vec<_>>()
} else {
vec![]
};

let mut domain_events = Vec::with_capacity(events.len());

Expand Down Expand Up @@ -659,21 +673,35 @@ impl EventInterface for MockDb {
async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
&self,
profile_id: &common_utils::id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
let locked_events = self.events.lock().await;
let events = locked_events
.iter()
.filter(|event| {

let events = if let Some(event_id) = initial_attempt_id {
if let Some(event) = locked_events.iter().find(|event| {
event.business_profile_id == Some(profile_id.to_owned())
&& event.initial_attempt_id.as_ref() == Some(&event.event_id)
&& (event.primary_object_id == primary_object_id
|| event.initial_attempt_id.as_deref() == Some(initial_attempt_id))
})
.cloned()
.collect::<Vec<_>>();
&& event.initial_attempt_id.as_deref() == Some(&event.event_id)
&& event.event_id == event_id
}) {
vec![event.clone()]
} else {
vec![]
}
} else if let Some(obj_id) = primary_object_id {
locked_events
.iter()
.filter(|event| {
event.business_profile_id == Some(profile_id.to_owned())
&& event.initial_attempt_id.as_deref() == Some(&event.event_id)
&& event.primary_object_id.as_str() == obj_id
})
.cloned()
.collect::<Vec<_>>()
} else {
vec![]
};

let mut domain_events = Vec::with_capacity(events.len());

Expand Down Expand Up @@ -1453,9 +1481,9 @@ mod tests {
.store
.list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&business_profile.merchant_id,
&primary_object_id.clone(),
&initial_attempt_id.clone(),
platform.get_processor().get_key_store(),
Some(primary_object_id.as_str()),
Some(initial_attempt_id.as_str()),
&merchant_key_store,
)
.await?;

Expand Down
8 changes: 4 additions & 4 deletions crates/router/src/db/kafka_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,8 +737,8 @@ impl EventInterface for KafkaStore {
async fn list_initial_events_by_merchant_id_primary_object_or_initial_attempt_id(
&self,
merchant_id: &id_type::MerchantId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
self.diesel_store
Expand Down Expand Up @@ -794,8 +794,8 @@ impl EventInterface for KafkaStore {
async fn list_initial_events_by_profile_id_primary_object_or_initial_attempt_id(
&self,
profile_id: &id_type::ProfileId,
primary_object_id: &str,
initial_attempt_id: &str,
primary_object_id: Option<&str>,
initial_attempt_id: Option<&str>,
merchant_key_store: &domain::MerchantKeyStore,
) -> CustomResult<Vec<domain::Event>, errors::StorageError> {
self.diesel_store
Expand Down
Loading
Loading