From c2fe657e042b332354a38d5c10600617d1c9c527 Mon Sep 17 00:00:00 2001 From: danda Date: Mon, 27 May 2024 17:24:01 -0700 Subject: [PATCH] perf: replace RwLock with ArcSwap ArcSwap is lock-free and optimized for our use-case of lots of concurrent reads and infrequent updates. So it is pretty close to a shared-nothing architecture. Some small changes were necessary, but its mostly a drop-in replacement for RwLock. --- Cargo.lock | 7 +++++++ Cargo.toml | 1 + src/alert_email.rs | 2 +- src/html/page/block.rs | 2 +- src/html/page/redirect_qs_to_path.rs | 2 +- src/html/page/root.rs | 2 +- src/html/page/utxo.rs | 2 +- src/main.rs | 2 +- src/model/app_state.rs | 31 ++++++++++++++++++++++++---- src/neptune_rpc.rs | 8 +++---- src/rpc/block_digest.rs | 3 +-- src/rpc/block_info.rs | 3 +-- src/rpc/utxo_digest.rs | 3 +-- 13 files changed, 47 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7aa94cc..e8bb56a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayref" version = "0.3.7" @@ -1934,6 +1940,7 @@ name = "neptune-explorer" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "axum 0.7.5", "boilerplate", "chrono", diff --git a/Cargo.toml b/Cargo.toml index db63ea9..b114543 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ chrono = "0.4.34" # only should be used inside main.rs, for the binary. anyhow = "1.0.86" +arc-swap = "1.7.1" [patch.crates-io] # 694f27daf78aade0ed0dc07e3babaab036cd5572 is tip of branch: master as of 2024-04-30 diff --git a/src/alert_email.rs b/src/alert_email.rs index ceddd55..566fe5b 100644 --- a/src/alert_email.rs +++ b/src/alert_email.rs @@ -15,7 +15,7 @@ pub async fn send( subject: &str, body: String, ) -> std::result::Result { - match state.read().await.config.alert_config() { + match state.load().config.alert_config() { None => { warn!("Alert emails disabled. alert not sent. consider confiuring smtp parameters. subject: {subject}"); Ok(false) diff --git a/src/html/page/block.rs b/src/html/page/block.rs index 6c4b6b3..1298a94 100644 --- a/src/html/page/block.rs +++ b/src/html/page/block.rs @@ -24,7 +24,7 @@ pub async fn block_page( header: HeaderHtml<'a>, block_info: BlockInfo, } - let state = &*state_rw.read().await; + let state = &state_rw.load(); let Path(block_selector) = user_input_maybe.map_err(|e| not_found_html_response(state, Some(e.to_string())))?; diff --git a/src/html/page/redirect_qs_to_path.rs b/src/html/page/redirect_qs_to_path.rs index 0d1cc8d..3924055 100644 --- a/src/html/page/redirect_qs_to_path.rs +++ b/src/html/page/redirect_qs_to_path.rs @@ -76,7 +76,7 @@ pub async fn redirect_query_string_to_path( RawQuery(raw_query_option): RawQuery, State(state_rw): State>, ) -> Result { - let state = &*state_rw.read().await; + let state = &state_rw.load(); let not_found = || not_found_html_response(state, None); diff --git a/src/html/page/root.rs b/src/html/page/root.rs index f762df6..2244ed7 100644 --- a/src/html/page/root.rs +++ b/src/html/page/root.rs @@ -18,7 +18,7 @@ pub async fn root(State(state_rw): State>) -> Result, state: &'a AppStateInner, } - let state = &*state_rw.read().await; + let state = &state_rw.load(); let tip_height = state .rpc_client diff --git a/src/html/page/utxo.rs b/src/html/page/utxo.rs index bcdac60..6085164 100644 --- a/src/html/page/utxo.rs +++ b/src/html/page/utxo.rs @@ -25,7 +25,7 @@ pub async fn utxo_page( digest: Digest, } - let state = &*state_rw.read().await; + let state = &state_rw.load(); let Path(index) = index_maybe.map_err(|e| not_found_html_response(state, Some(e.to_string())))?; diff --git a/src/main.rs b/src/main.rs index 8f72544..a0e7449 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,7 +28,7 @@ async fn main() -> Result<(), anyhow::Error> { let routes = setup_routes(app_state.clone()); - let port = app_state.read().await.config.listen_port; + let port = app_state.load().config.listen_port; let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{port}")) .await .with_context(|| format!("Failed to bind to port {port}"))?; diff --git a/src/model/app_state.rs b/src/model/app_state.rs index 576234c..deae913 100644 --- a/src/model/app_state.rs +++ b/src/model/app_state.rs @@ -1,13 +1,13 @@ use crate::model::config::Config; use crate::neptune_rpc; use anyhow::Context; +use arc_swap::ArcSwap; use clap::Parser; use neptune_core::config_models::network::Network; use neptune_core::models::blockchain::block::block_selector::BlockSelector; use neptune_core::prelude::twenty_first::math::digest::Digest; use neptune_core::rpc_server::RPCClient; use std::sync::Arc; -use tokio::sync::RwLock; pub struct AppStateInner { pub network: Network, @@ -17,10 +17,10 @@ pub struct AppStateInner { } #[derive(Clone)] -pub struct AppState(Arc>); +pub struct AppState(Arc>); impl std::ops::Deref for AppState { - type Target = Arc>; + type Target = Arc>; fn deref(&self) -> &Self::Target { &self.0 @@ -31,7 +31,7 @@ impl From<(Network, Config, RPCClient, Digest)> for AppState { fn from( (network, config, rpc_client, genesis_digest): (Network, Config, RPCClient, Digest), ) -> Self { - Self(Arc::new(RwLock::new(AppStateInner { + Self(Arc::new(ArcSwap::from_pointee(AppStateInner { network, config, rpc_client, @@ -62,4 +62,27 @@ impl AppState { genesis_digest, ))) } + + /// Sets the rpc_client + /// + /// This method exists because it is sometimes necessary + /// to re-establish connection to the neptune RPC server. + /// + /// This is achieved via ArcSwap which is faster than + /// RwLock for our use-case that is heavy reads and few + /// if any mutations. ArcSwap is effectively lock-free. + /// + /// Note that this method takes &self, so interior + /// mutability occurs. + pub fn set_rpc_client(&self, rpc_client: RPCClient) { + let inner = self.0.load(); + + let new_inner = AppStateInner { + rpc_client, + network: inner.network, + config: inner.config.clone(), + genesis_digest: inner.genesis_digest, + }; + self.0.store(Arc::new(new_inner)); + } } diff --git a/src/neptune_rpc.rs b/src/neptune_rpc.rs index f74fc26..f91cec7 100644 --- a/src/neptune_rpc.rs +++ b/src/neptune_rpc.rs @@ -42,7 +42,7 @@ pub async fn watchdog(app_state: AppState) { let app_started = chrono::offset::Utc::now(); let mut was_connected = true; let mut since = chrono::offset::Utc::now(); - let watchdog_secs = app_state.read().await.config.neptune_rpc_watchdog_secs; + let watchdog_secs = app_state.load().config.neptune_rpc_watchdog_secs; debug!("neptune-core rpc watchdog started"); @@ -50,8 +50,7 @@ pub async fn watchdog(app_state: AppState) { tokio::time::sleep(tokio::time::Duration::from_secs(watchdog_secs)).await; let result = app_state - .read() - .await + .load() .rpc_client .network(context::current()) .await; @@ -94,8 +93,7 @@ pub async fn watchdog(app_state: AppState) { if !now_connected { if let Ok(c) = gen_rpc_client().await { - let mut state = app_state.write().await; - state.rpc_client = c; + app_state.set_rpc_client(c); } } } diff --git a/src/rpc/block_digest.rs b/src/rpc/block_digest.rs index 878ff95..c114055 100644 --- a/src/rpc/block_digest.rs +++ b/src/rpc/block_digest.rs @@ -16,8 +16,7 @@ pub async fn block_digest( State(state): State>, ) -> Result, impl IntoResponse> { match state - .read() - .await + .load() .rpc_client .block_digest(context::current(), selector.into()) .await diff --git a/src/rpc/block_info.rs b/src/rpc/block_info.rs index a278fb3..4070e41 100644 --- a/src/rpc/block_info.rs +++ b/src/rpc/block_info.rs @@ -16,8 +16,7 @@ pub async fn block_info( State(state): State>, ) -> Result, Response> { let block_info = state - .read() - .await + .load() .rpc_client .block_info(context::current(), selector.into()) .await diff --git a/src/rpc/utxo_digest.rs b/src/rpc/utxo_digest.rs index 1bc579d..5437093 100644 --- a/src/rpc/utxo_digest.rs +++ b/src/rpc/utxo_digest.rs @@ -17,8 +17,7 @@ pub async fn utxo_digest( State(state): State>, ) -> Result, impl IntoResponse> { match state - .read() - .await + .load() .rpc_client .utxo_digest(context::current(), index) .await