WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bin/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async fn meta_example(info_client: &InfoClient) {
async fn meta_and_asset_contexts_example(info_client: &InfoClient) {
info!(
"Meta and asset contexts: {:?}",
info_client.meta_and_asset_contexts().await.unwrap()
info_client.meta_and_asset_contexts(None).await.unwrap()
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/bin/ws_all_mids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() {

let (sender, mut receiver) = unbounded_channel();
let subscription_id = info_client
.subscribe(Subscription::AllMids, sender)
.subscribe(Subscription::AllMids { dex: Some("xyz".to_string()) }, sender)
.await
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions src/bin/ws_candles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async fn main() {
Subscription::Candle {
coin: "ETH".to_string(),
interval: "1m".to_string(),
dex: Some("xyz".to_string()),
},
sender,
)
Expand Down
26 changes: 23 additions & 3 deletions src/info/info_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct CandleSnapshotRequest {
interval: String,
start_time: u64,
end_time: u64,
#[serde(skip_serializing_if = "Option::is_none")]
dex: Option<String>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
Expand Down Expand Up @@ -55,7 +57,10 @@ pub enum InfoRequest {
oid: u64,
},
Meta,
MetaAndAssetCtxs,
MetaAndAssetCtxs {
#[serde(skip_serializing_if = "Option::is_none")]
dex: Option<String>,
},
SpotMeta,
SpotMetaAndAssetCtxs,
AllMids,
Expand Down Expand Up @@ -212,8 +217,11 @@ impl InfoClient {
self.send_info_request(input).await
}

pub async fn meta_and_asset_contexts(&self) -> Result<(Meta, Vec<AssetContext>)> {
let input = InfoRequest::MetaAndAssetCtxs;
pub async fn meta_and_asset_contexts(
&self,
dex: impl Into<Option<String>>,
) -> Result<(Meta, Vec<AssetContext>)> {
let input = InfoRequest::MetaAndAssetCtxs { dex: dex.into() };
self.send_info_request(input).await
}

Expand Down Expand Up @@ -281,13 +289,25 @@ impl InfoClient {
interval: String,
start_time: u64,
end_time: u64,
) -> Result<Vec<CandlesSnapshotResponse>> {
self.candles_snapshot_with_dex(coin, interval, start_time, end_time, None).await
}

pub async fn candles_snapshot_with_dex(
&self,
coin: String,
interval: String,
start_time: u64,
end_time: u64,
dex: Option<String>,
) -> Result<Vec<CandlesSnapshotResponse>> {
let input = InfoRequest::CandleSnapshot {
req: CandleSnapshotRequest {
coin,
interval,
start_time,
end_time,
dex,
},
};
self.send_info_request(input).await
Expand Down
2 changes: 1 addition & 1 deletion src/market_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl MarketMaker {

// Subscribe to AllMids so we can market make around the mid price
self.info_client
.subscribe(Subscription::AllMids, sender)
.subscribe(Subscription::AllMids { dex: Some("xyz".to_string()) }, sender)
.await
.unwrap();

Expand Down
123 changes: 98 additions & 25 deletions src/ws/ws_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,51 @@ pub(crate) struct WsManager {
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
pub enum Subscription {
AllMids,
Notification { user: Address },
WebData2 { user: Address },
Candle { coin: String, interval: String },
L2Book { coin: String },
Trades { coin: String },
OrderUpdates { user: Address },
UserEvents { user: Address },
UserFills { user: Address },
UserFundings { user: Address },
UserNonFundingLedgerUpdates { user: Address },
ActiveAssetCtx { coin: String },
ActiveAssetData { user: Address, coin: String },
Bbo { coin: String },
AllMids {
dex: Option<String>,
},
Notification {
user: Address,
},
WebData2 {
user: Address,
},
Candle {
coin: String,
interval: String,
dex: Option<String>,
},
L2Book {
coin: String,
},
Trades {
coin: String,
},
OrderUpdates {
user: Address,
},
UserEvents {
user: Address,
},
UserFills {
user: Address,
},
UserFundings {
user: Address,
},
UserNonFundingLedgerUpdates {
user: Address,
},
ActiveAssetCtx {
coin: String,
},
ActiveAssetData {
user: Address,
coin: String,
},
Bbo {
coin: String,
},
}

#[derive(Deserialize, Clone, Debug)]
Expand Down Expand Up @@ -226,9 +257,44 @@ impl WsManager {
.0)
}

fn get_possible_identifiers(message: &Message) -> Result<Vec<String>> {
let mut identifiers = Vec::new();

match message {
Message::AllMids(_) => {
// Try both None and common dex values
for dex in [None, Some("xyz".to_string()), Some("hyna".to_string())] {
if let Ok(id) = serde_json::to_string(&Subscription::AllMids { dex }) {
identifiers.push(id);
}
}
}
Message::Candle(candle) => {
// Try both None and common dex values for candles
for dex in [None, Some("xyz".to_string()), Some("hyna".to_string())] {
if let Ok(id) = serde_json::to_string(&Subscription::Candle {
coin: candle.data.coin.clone(),
interval: candle.data.interval.clone(),
dex,
}) {
identifiers.push(id);
}
}
}
_ => {
// For other message types, use the old single identifier logic
if let Ok(id) = WsManager::get_identifier(message) {
identifiers.push(id);
}
}
}

Ok(identifiers)
}

fn get_identifier(message: &Message) -> Result<String> {
match message {
Message::AllMids(_) => serde_json::to_string(&Subscription::AllMids)
Message::AllMids(_) => serde_json::to_string(&Subscription::AllMids { dex: None })
.map_err(|e| Error::JsonParse(e.to_string())),
Message::User(_) => Ok("userEvents".to_string()),
Message::UserFills(fills) => serde_json::to_string(&Subscription::UserFills {
Expand All @@ -252,6 +318,7 @@ impl WsManager {
Message::Candle(candle) => serde_json::to_string(&Subscription::Candle {
coin: candle.data.coin.clone(),
interval: candle.data.interval.clone(),
dex: None,
})
.map_err(|e| Error::JsonParse(e.to_string())),
Message::OrderUpdates(_) => Ok("orderUpdates".to_string()),
Expand Down Expand Up @@ -311,21 +378,27 @@ impl WsManager {
}
let message = serde_json::from_str::<Message>(&data)
.map_err(|e| Error::JsonParse(e.to_string()))?;
let identifier = WsManager::get_identifier(&message)?;
if identifier.is_empty() {
// Try to find matching subscriptions for this message
// We need to try multiple dex values since the message doesn't contain dex info
let possible_identifiers = WsManager::get_possible_identifiers(&message)?;
if possible_identifiers.is_empty() {
return Ok(());
}

let mut subscriptions = subscriptions.lock().await;
let mut res = Ok(());
if let Some(subscription_datas) = subscriptions.get_mut(&identifier) {
for subscription_data in subscription_datas {
if let Err(e) = subscription_data
.sending_channel
.send(message.clone())
.map_err(|e| Error::WsSend(e.to_string()))
{
res = Err(e);

// Try each possible identifier until we find matching subscriptions
for identifier in possible_identifiers {
if let Some(subscription_datas) = subscriptions.get_mut(&identifier) {
for subscription_data in subscription_datas {
if let Err(e) = subscription_data
.sending_channel
.send(message.clone())
.map_err(|e| Error::WsSend(e.to_string()))
{
res = Err(e);
}
}
}
}
Expand Down