diff --git a/Cargo.toml b/Cargo.toml index d29e294d..f1669f79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,3 +30,4 @@ thiserror = "2.0" tokio = { version = "1.0", features = ["full"] } tokio-tungstenite = { version = "0.20.0", features = ["native-tls"] } uuid = { version = "1.0", features = ["v4"] } +futures-channel = "0.3.31" diff --git a/src/exchange/exchange_client.rs b/src/exchange/exchange_client.rs index 70b686b8..2aaa2858 100644 --- a/src/exchange/exchange_client.rs +++ b/src/exchange/exchange_client.rs @@ -63,7 +63,7 @@ struct ExchangePayload { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(tag = "type")] #[serde(rename_all = "camelCase")] -pub enum Actions { +pub(crate) enum Actions { UsdSend(UsdSend), UpdateLeverage(UpdateLeverage), UpdateIsolatedMargin(UpdateIsolatedMargin), diff --git a/src/exchange/mod.rs b/src/exchange/mod.rs index 7b9e9042..12b452f7 100644 --- a/src/exchange/mod.rs +++ b/src/exchange/mod.rs @@ -1,10 +1,10 @@ -mod actions; +pub mod actions; mod builder; mod cancel; mod exchange_client; mod exchange_responses; mod modify; -mod order; +pub mod order; pub use actions::*; pub use builder::*; @@ -14,5 +14,5 @@ pub use exchange_responses::*; pub use modify::{ClientModifyRequest, ModifyRequest}; pub use order::{ ClientLimit, ClientOrder, ClientOrderRequest, ClientTrigger, MarketCloseParams, - MarketOrderParams, Order, + MarketOrderParams, Order, OrderRequest, }; diff --git a/src/lib.rs b/src/lib.rs index 86f20e2a..c1de6862 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ mod meta; mod prelude; mod req; mod signature; -mod ws; +pub mod ws; pub use consts::{EPSILON, LOCAL_API_URL, MAINNET_API_URL, TESTNET_API_URL}; pub use eip712::Eip712; pub use errors::Error; diff --git a/src/ws/exchange_helper.rs b/src/ws/exchange_helper.rs new file mode 100644 index 00000000..d723d41f --- /dev/null +++ b/src/ws/exchange_helper.rs @@ -0,0 +1,294 @@ +use crate::WsManager; +use crate::{ + exchange::{order::OrderRequest, BuilderInfo}, + helpers::next_nonce, + prelude::*, + signature::{sign_l1_action,sign_typed_data}, + BulkOrder,SpotSend, Error, +}; +use alloy::primitives::{keccak256, Address, Signature, B256, U256}; +use alloy::signers::local::PrivateKeySigner; +use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; + +#[derive(Debug, Deserialize)] +#[serde(untagged)] +pub(crate) enum OrderStatus { + Filled { filled: OrderFillDetails }, + Resting { resting: OrderRestingDetails }, + Error { error: String }, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct OrderFillDetails { + pub oid: u64, + pub total_sz: Option, + pub avg_px: Option, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct OrderRestingDetails { + pub oid: u64, +} + +// Use #[serde(untagged)] to remove the enum wrapper +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +pub(crate) enum Actions { + Order(BulkOrder), + SpotSend(SpotSend), +} + +#[derive(Debug, Clone, Deserialize)] +struct SignatureData { + r: U256, + s: U256, + v: u8, +} + +impl Serialize for SignatureData { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("SignatureData", 3)?; + state.serialize_field("r", &self.r)?; + state.serialize_field("s", &self.s)?; + state.serialize_field("v", &self.v)?; + state.end() + } +} + +impl From for SignatureData { + fn from(sig: Signature) -> Self { + SignatureData { + r: sig.r().into(), + s: sig.s().into(), + v: if sig.v() { 28 } else { 27 } as u8, + } + } +} + +#[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct ExchangePayload { + action: serde_json::Value, + signature: SignatureData, + nonce: u64, + vault_address: Option
, +} + +impl Actions { + fn hash(&self, timestamp: u64, vault_address: Option
) -> Result { + let mut bytes = + rmp_serde::to_vec_named(self).map_err(|e| Error::RmpParse(e.to_string()))?; + bytes.extend(timestamp.to_be_bytes()); + if let Some(vault_address) = vault_address { + bytes.push(1); + bytes.extend(vault_address); + } else { + bytes.push(0); + } + Ok(keccak256(bytes)) + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct SignedAction { + action: Actions, + nonce: u64, + signature: SignatureData, + #[serde(skip_serializing_if = "Option::is_none")] + vault_address: Option, +} + +pub async fn bulk_order_with_builder( + orders: Vec, + wallet: Option<&PrivateKeySigner>, + mut builder: Option, + vault_address: Option
, + nonce: u64, +) -> Result { + let wallet = wallet + .as_ref() + .ok_or(Error::JsonParse("Wallet not provided".to_string()))?; + + if let Some(builder) = &mut builder { + builder.builder = builder.builder.to_lowercase(); + } else { + builder = None; + } + + let mut transformed_orders = Vec::new(); + + for order in orders { + transformed_orders.push(order); + } + + // Create the action with proper type field + let action = Actions::Order(BulkOrder { + orders: transformed_orders, + grouping: "na".to_string(), + builder: builder, + }); + let action_value = + serde_json::to_value(&action).map_err(|e| Error::JsonParse(e.to_string()))?; + println!("Action: {:#?}", action_value); + // Hash the Actions (this serializes to MessagePack) + let connection_id = action.hash(nonce, vault_address)?; + println!("Connection ID: {:#?}", connection_id); + + let signature = sign_l1_action(wallet, connection_id, true).unwrap(); + let exchange_payload = ExchangePayload { + action: action_value, + signature: signature.into(), + nonce: nonce, + vault_address: vault_address, + }; + + let payload = + serde_json::to_value(&exchange_payload).map_err(|e| Error::JsonParse(e.to_string()))?; + return Ok(payload); +} + + pub async fn spot_transfer( + amount: &str, + destination: &str, + token: &str, + wallet: PrivateKeySigner, + nonce: u64, + ) -> Result { + + let spot_send = SpotSend { + signature_chain_id: 421614, + hyperliquid_chain: "Mainnet".to_string(), + destination: destination.to_string(), + amount: amount.to_string(), + time: nonce, + token: token.to_string(), + }; + let signature = sign_typed_data(&spot_send, &wallet)?; + let action = serde_json::to_value(Actions::SpotSend(spot_send)) + .map_err(|e| Error::JsonParse(e.to_string()))?; + + let exchange_payload = ExchangePayload { + action: action, + signature: signature.into(), + nonce: nonce, + vault_address: None, + }; + let payload = serde_json::to_value(&exchange_payload).map_err(|e| Error::JsonParse(e.to_string()))?; + return Ok(payload); + } + + + +#[cfg(test)] +mod tests { + + use super::*; + use crate::helpers::next_nonce; + use crate::{exchange::order::Limit, Order}; + use alloy::signers::local::PrivateKeySigner; + use std::{str::FromStr, time::Duration}; + + #[tokio::test] + async fn test_send_order() { + let nonce = next_nonce(); + let _ = env_logger::builder() + .is_test(true) + .filter_level(log::LevelFilter::Info) + .try_init(); + + let ws_url = "wss://api.hyperliquid.xyz/ws"; + + let private_key = ""; + let wallet = PrivateKeySigner::from_str(private_key).expect("Invalid private key"); + + println!("Creating WsManager..."); + let mut ws_manager = WsManager::new(ws_url.to_string(), true) + .await + .expect("Failed to create WsManager"); + + println!("Waiting for WebSocket connection to stabilize..."); + tokio::time::sleep(Duration::from_secs(2)).await; + + let order = OrderRequest { + asset: 10151, + is_buy: false, + limit_px: "3900".to_string(), + sz: "0.004".to_string(), + reduce_only: false, + order_type: Order::Limit(Limit { + tif: "Gtc".to_string(), + }), + cloid: None, + }; + + let builder = None; + + println!("Sending order..."); + let payload = bulk_order_with_builder(vec![order], Some(&wallet), builder, None, nonce) + .await + .unwrap(); + + let result = ws_manager.post(payload, nonce).await; + match result { + Ok(response) => { + println!( + "Full Response: {}", + serde_json::to_string_pretty(&response).unwrap() + ); + } + Err(e) => { + eprintln!("Error sending order: {:?}", e); + } + } + } + + async fn test_spot_transfer() { + let nonce = next_nonce(); + let _ = env_logger::builder() + .is_test(true) + .filter_level(log::LevelFilter::Info) + .try_init(); + + let ws_url = "wss://api.hyperliquid.xyz/ws"; + + let private_key = ""; + let wallet = PrivateKeySigner::from_str(private_key).expect("Invalid private key"); + + println!("Creating WsManager..."); + let mut ws_manager = WsManager::new(ws_url.to_string(), true) + .await + .expect("Failed to create WsManager"); + + println!("Waiting for WebSocket connection to stabilize..."); + tokio::time::sleep(Duration::from_secs(2)).await; + + let amount = "0.004215951"; + let destination = "0x20000000000000000000000000000000000000dd"; + let token = "UETH:0xe1edd30daaf5caac3fe63569e24748da"; + + println!("Sending spot transfer..."); + let payload = spot_transfer(amount, destination, token, wallet, nonce) + .await + .unwrap(); + + let result = ws_manager.post(payload, nonce).await; + println!("Result: {:#?}", result); + match result { + Ok(response) => { + println!( + "Full Response: {}", + serde_json::to_string_pretty(&response).unwrap() + ); + } + Err(e) => { + eprintln!("Error sending order: {:?}", e); + } + } + } + +} diff --git a/src/ws/mod.rs b/src/ws/mod.rs index a0304095..6fe81ec9 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -1,7 +1,11 @@ +pub mod exchange_helper; mod message_types; +mod post_structs; mod sub_structs; mod ws_manager; pub use message_types::*; +pub(crate) use post_structs::*; pub use sub_structs::*; -pub(crate) use ws_manager::WsManager; +pub use ws_manager::WsManager; pub use ws_manager::{Message, Subscription}; +pub use exchange_helper::*; diff --git a/src/ws/post_structs.rs b/src/ws/post_structs.rs new file mode 100644 index 00000000..7bc4b7eb --- /dev/null +++ b/src/ws/post_structs.rs @@ -0,0 +1,63 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::convert::TryFrom; + +/// Generic WebSocket response that can be either a success or error response +#[derive(Debug, Clone)] +pub(crate) enum WsResponse { + Post(WsPostResponse), + Error(WsErrorResponse), + Other(serde_json::Value), +} + +impl TryFrom for WsResponse { + type Error = serde_json::Error; + + fn try_from(value: Value) -> Result>::Error> { + if let Ok(post_response) = serde_json::from_value::(value.clone()) { + return Ok(WsResponse::Post(post_response)); + } else if let Ok(error_response) = serde_json::from_value::(value.clone()) + { + return Ok(WsResponse::Error(error_response)); + } else { + return Ok(WsResponse::Other(value)); + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct WsErrorResponse { + pub channel: String, + pub data: WsErrorResponseData, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct WsErrorResponseData { + pub id: u64, + pub error: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct WsRequest { + #[serde(rename = "type")] + pub type_: String, + pub payload: serde_json::Value, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct WsPostRequest { + pub method: String, + pub id: u64, + pub request: WsRequest, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct WsPostResponse { + pub channel: String, + pub data: WsPostResponseData, +} +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct WsPostResponseData { + pub id: u64, + pub response: serde_json::Value, +} diff --git a/src/ws/ws_manager.rs b/src/ws/ws_manager.rs index 4035baf3..164a9887 100755 --- a/src/ws/ws_manager.rs +++ b/src/ws/ws_manager.rs @@ -31,6 +31,7 @@ use crate::{ ActiveAssetData, ActiveSpotAssetCtx, AllMids, Bbo, Candle, L2Book, OrderUpdates, Trades, User, }, + ws::post_structs::{WsPostRequest, WsPostResponse, WsRequest, WsResponse}, ActiveAssetCtx, Error, Notification, UserFills, UserFundings, UserNonFundingLedgerUpdates, WebData2, }; @@ -42,10 +43,11 @@ struct SubscriptionData { id: String, } #[derive(Debug)] -pub(crate) struct WsManager { +pub struct WsManager { stop_flag: Arc, writer: Arc>, protocol::Message>>>, subscriptions: Arc>>>, + pending_responses: Arc>>>, subscription_id: u32, subscription_identifiers: HashMap, } @@ -93,6 +95,10 @@ pub enum Message { ActiveSpotAssetCtx(ActiveSpotAssetCtx), Bbo(Bbo), Pong, + #[serde(rename = "error")] + Error(serde_json::Value), + #[serde(rename = "post")] + Post(serde_json::Value), } #[derive(Serialize)] @@ -109,7 +115,7 @@ pub(crate) struct Ping { impl WsManager { const SEND_PING_INTERVAL: u64 = 50; - pub(crate) async fn new(url: String, reconnect: bool) -> Result { + pub async fn new(url: String, reconnect: bool) -> Result { let stop_flag = Arc::new(AtomicBool::new(false)); let (writer, mut reader) = Self::connect(&url).await?.split(); @@ -118,6 +124,10 @@ impl WsManager { let subscriptions_map: HashMap> = HashMap::new(); let subscriptions = Arc::new(Mutex::new(subscriptions_map)); let subscriptions_copy = Arc::clone(&subscriptions); + let pending_responses: Arc< + Mutex>>, + > = Arc::new(Mutex::new(HashMap::new())); + let pending_responses_copy = Arc::clone(&pending_responses); { let writer = writer.clone(); @@ -125,8 +135,12 @@ impl WsManager { let reader_fut = async move { while !stop_flag.load(Ordering::Relaxed) { if let Some(data) = reader.next().await { - if let Err(err) = - WsManager::parse_and_send_data(data, &subscriptions_copy).await + if let Err(err) = WsManager::parse_and_send_data( + data, + &subscriptions_copy, + &pending_responses_copy, + ) + .await { error!("Error processing data received by WsManager reader: {err}"); } @@ -214,6 +228,7 @@ impl WsManager { stop_flag, writer, subscriptions, + pending_responses, subscription_id: 0, subscription_identifiers: HashMap::new(), }) @@ -293,7 +308,10 @@ impl WsManager { coin: bbo.data.coin.clone(), }) .map_err(|e| Error::JsonParse(e.to_string())), - Message::SubscriptionResponse | Message::Pong => Ok(String::default()), + Message::SubscriptionResponse + | Message::Pong + | Message::Error(_) + | Message::Post(_) => Ok(String::default()), Message::NoData => Ok("".to_string()), Message::HyperliquidError(err) => Ok(format!("hyperliquid error: {err:?}")), } @@ -302,44 +320,93 @@ impl WsManager { async fn parse_and_send_data( data: std::result::Result, subscriptions: &Arc>>>, + pending_responses: &Arc< + Mutex>>, + >, ) -> Result<()> { match data { - Ok(data) => match data.into_text() { - Ok(data) => { - if !data.starts_with('{') { - return Ok(()); - } - let message = serde_json::from_str::(&data) - .map_err(|e| Error::JsonParse(e.to_string()))?; - let identifier = WsManager::get_identifier(&message)?; - if identifier.is_empty() { - return Ok(()); - } + Ok(data) => { + match data.into_text() { + Ok(data) => { + if !data.starts_with('{') { + return Ok(()); + } + match serde_json::from_str::(&data) { + Ok(json_value) => match WsResponse::try_from(json_value.clone()) { + Ok(response) => match response { + WsResponse::Post(post_response) => { + let id = post_response.data.id; + let mut pending = pending_responses.lock().await; + if let Some(sender) = pending.remove(&id) { + if sender.send(json_value).is_err() { + warn!("Failed to send POST response - receiver dropped"); + } + return Ok(()); + } + } + WsResponse::Error(error_response) => { + let id = error_response.data.id; + let mut pending = pending_responses.lock().await; + if let Some(sender) = pending.remove(&id) { + if sender.send(json_value).is_err() { + warn!("Failed to send error response - receiver dropped"); + } + return Ok(()); + } + } + WsResponse::Other(value) => { + if let Some(id) = value.get("id").and_then(|v| v.as_u64()) { + let mut pending = pending_responses.lock().await; + if let Some(sender) = pending.remove(&id) { + if sender.send(value).is_err() { + warn!("Failed to send response - receiver dropped"); + } + return Ok(()); + } + } + } + }, + Err(e) => { + warn!("Failed to parse response: {}", e); + } + }, + Err(e) => { + warn!("Failed to parse JSON: {}", e); + } + } + + let message = serde_json::from_str::(&data) + .map_err(|e| Error::JsonParse(e.to_string()))?; + let identifier = WsManager::get_identifier(&message)?; + if identifier.is_empty() { + return Ok(()); + } - let mut subscriptions = subscriptions.lock().await; - let mut res = Ok(()); - if let Some(subscription_datas) = subscriptions.get_mut(&identifier) { - for subscription_data in subscription_datas { - if let Err(e) = subscription_data - .sending_channel - .send(message.clone()) - .map_err(|e| Error::WsSend(e.to_string())) - { - res = Err(e); + let mut subscriptions = subscriptions.lock().await; + let mut res = Ok(()); + if let Some(subscription_datas) = subscriptions.get_mut(&identifier) { + for subscription_data in subscription_datas { + if let Err(e) = subscription_data + .sending_channel + .send(message.clone()) + .map_err(|e| Error::WsSend(e.to_string())) + { + res = Err(e); + } } } + res + } + Err(err) => { + let error = Error::ReaderTextConversion(err.to_string()); + Ok(WsManager::send_to_all_subscriptions( + subscriptions, + Message::HyperliquidError(error.to_string()), + ) + .await?) } - res - } - Err(err) => { - let error = Error::ReaderTextConversion(err.to_string()); - Ok(WsManager::send_to_all_subscriptions( - subscriptions, - Message::HyperliquidError(error.to_string()), - ) - .await?) } - }, + } Err(err) => { let error = Error::GenericReader(err.to_string()); Ok(WsManager::send_to_all_subscriptions( @@ -488,6 +555,59 @@ impl WsManager { } Ok(()) } + pub async fn post( + &mut self, + payload: serde_json::Value, + nonce: u64, + ) -> Result { + // Changed return type + let request_id = nonce; + + let ws_request = WsPostRequest { + method: "post".to_string(), + id: request_id, + request: WsRequest { + type_: "action".to_string(), + payload: payload, + }, + }; + + let request_json = + serde_json::to_string(&ws_request).map_err(|e| Error::JsonParse(e.to_string()))?; + info!("Sending POST request: {}", request_json); + let (tx, rx) = tokio::sync::oneshot::channel(); + { + let mut pending = self.pending_responses.lock().await; + pending.insert(request_id, tx); + } + { + let mut writer = self.writer.lock().await; + writer + .send(protocol::Message::Text(request_json)) + .await + .map_err(|e| Error::Websocket(e.to_string()))?; + } + + match tokio::time::timeout(Duration::from_secs(10), rx).await { + Ok(Ok(response)) => { + info!("Received POST response: {:?}", response); + let ws_response: WsPostResponse = + serde_json::from_value(response).map_err(|e| { + Error::JsonParse(format!("Failed to parse WsPostResponse: {}", e)) + })?; + Ok(ws_response) + } + Ok(Err(_)) => { + Err(Error::JsonParse("Response channel closed".to_string())) + } + Err(_) => { + self.pending_responses.lock().await.remove(&request_id); + Err(Error::JsonParse( + "Request timed out after 10 seconds".to_string(), + )) + } + } + } } impl Drop for WsManager {