diff --git a/Cargo.lock b/Cargo.lock index feaad2a..5e036c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -920,8 +920,10 @@ dependencies = [ "async-trait", "axum", "axum-test", + "babel-api", "clap", "eyre", + "jsonrpsee", "metrics", "metrics-derive", "metrics-exporter-prometheus", @@ -936,6 +938,18 @@ dependencies = [ "url", ] +[[package]] +name = "babel-api" +version = "0.1.0" +dependencies = [ + "eyre", + "jsonrpsee", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "base16ct" version = "0.2.0" @@ -2255,6 +2269,7 @@ dependencies = [ "http", "hyper", "hyper-util", + "log", "rustls", "rustls-native-certs", "rustls-pki-types", @@ -2619,6 +2634,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e281ae70cc3b98dac15fced3366a880949e65fc66e345ce857a5682d152f3e62" dependencies = [ "jsonrpsee-core", + "jsonrpsee-http-client", "jsonrpsee-proc-macros", "jsonrpsee-server", "jsonrpsee-types", @@ -2676,6 +2692,31 @@ dependencies = [ "tracing", ] +[[package]] +name = "jsonrpsee-http-client" +version = "0.24.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f50c389d6e6a52eb7c3548a6600c90cf74d9b71cb5912209833f00a5479e9a01" +dependencies = [ + "async-trait", + "base64", + "http-body", + "hyper", + "hyper-rustls", + "hyper-util", + "jsonrpsee-core", + "jsonrpsee-types", + "rustls", + "rustls-platform-verifier", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tower 0.4.13", + "tracing", + "url", +] + [[package]] name = "jsonrpsee-proc-macros" version = "0.24.10" diff --git a/Cargo.toml b/Cargo.toml index 39c02e4..1259393 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/catalog", "crates/template", "crates/babel", + "crates/babel-api", "crates/fetcher", "crates/fetcher-api", "crates/cosmos-keys" @@ -18,6 +19,7 @@ runtime-docker-compose = { path = "crates/runtime-docker-compose" } runtime-trait = { path = "crates/runtime-trait" } catalog = { path = "crates/catalog" } template = { path = "crates/template" } +babel-api = { path = "crates/babel-api" } fetcher-api = { path = "crates/fetcher-api" } cosmos-keys = { path = "crates/cosmos-keys" } @@ -34,6 +36,6 @@ tinytemplate = "1.2" askama = "0.14.0" clap = { version = "4.5", features = ["derive"] } reqwest = { version = "0.12", default-features = false } -jsonrpsee = { version = "0.24", features = ["server", "ws-client", "macros"] } +jsonrpsee = { version = "0.24", features = ["server", "ws-client", "http-client", "macros"] } tracing = "0.1" tracing-subscriber = "0.3" diff --git a/crates/babel-api/Cargo.toml b/crates/babel-api/Cargo.toml new file mode 100644 index 0000000..0847457 --- /dev/null +++ b/crates/babel-api/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "babel-api" +version = "0.1.0" +edition = "2021" + +[dependencies] +jsonrpsee = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +eyre = { workspace = true } +tracing = "0.1" diff --git a/crates/babel-api/src/lib.rs b/crates/babel-api/src/lib.rs new file mode 100644 index 0000000..cb21b14 --- /dev/null +++ b/crates/babel-api/src/lib.rs @@ -0,0 +1,21 @@ +use jsonrpsee::proc_macros::rpc; +use serde::{Deserialize, Serialize}; + +/// Status response +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct Status { + pub peers: u64, + pub current_block_number: u64, + pub is_syncing: bool, + pub latest_block_number: Option, + pub is_ready: bool, + pub is_healthy: bool, +} + +/// JSON-RPC API for Babel status +#[rpc(server, client)] +pub trait BabelApi { + /// Get comprehensive status + #[method(name = "status")] + async fn status(&self) -> Result; +} diff --git a/crates/babel/Cargo.toml b/crates/babel/Cargo.toml index 92c3bbb..b6d1ab0 100644 --- a/crates/babel/Cargo.toml +++ b/crates/babel/Cargo.toml @@ -13,8 +13,10 @@ serde = { workspace = true } serde_json = { workspace = true } eyre = { workspace = true } async-trait = { workspace = true } +babel-api = { workspace = true } url = "2.5.7" +jsonrpsee = { workspace = true } # HTTP server axum = "0.8" diff --git a/crates/babel/bin/main.rs b/crates/babel/bin/main.rs index 10be5a7..462008c 100644 --- a/crates/babel/bin/main.rs +++ b/crates/babel/bin/main.rs @@ -13,10 +13,14 @@ struct Cli { #[arg(long)] rpc_url: String, - /// Server bind address + /// REST server bind address #[arg(long, default_value = "127.0.0.1:3000")] addr: String, + /// RPC server bind address + #[arg(long, default_value = "127.0.0.1:3001")] + rpc_addr: String, + /// Node name for metrics labels (defaults to hostname/IP from rpc_url) #[arg(long)] nodename: Option, @@ -56,21 +60,18 @@ async fn main() -> eyre::Result<()> { nodename, ); - match cli.node_type.as_str() { + let server = match cli.node_type.as_str() { "ethereum" => { let babel = EthereumBabel::new(cli.rpc_url).await?; - let server = BabelServer::new(babel, nodename); - server.serve(&cli.addr).await?; + BabelServer::new(babel, nodename) } "ethereum_beacon" => { let babel = EthereumBeaconBabel::new(cli.rpc_url); - let server = BabelServer::new(babel, nodename); - server.serve(&cli.addr).await?; + BabelServer::new(babel, nodename) } "cosmos" => { let babel = CosmosBabel::new(cli.rpc_url); - let server = BabelServer::new(babel, nodename); - server.serve(&cli.addr).await?; + BabelServer::new(babel, nodename) } _ => { return Err(eyre::eyre!( @@ -78,7 +79,14 @@ async fn main() -> eyre::Result<()> { cli.node_type )); } - } + }; + + // Start RPC server in background + let rpc_addr = cli.rpc_addr.parse()?; + let (_rpc_handle, _rpc_local_addr) = babel::rpc::start_rpc_server(rpc_addr, server.cached_status()).await?; + + // Start REST server + server.serve(&cli.addr).await?; Ok(()) } diff --git a/crates/babel/src/lib.rs b/crates/babel/src/lib.rs index 7ddf3cf..cd2e8de 100644 --- a/crates/babel/src/lib.rs +++ b/crates/babel/src/lib.rs @@ -1,5 +1,7 @@ use async_trait::async_trait; -use serde::{Deserialize, Serialize}; + +// Re-export Status from babel-api +pub use babel_api::Status; /// Core trait for blockchain node health checks #[async_trait] @@ -8,21 +10,11 @@ pub trait Babel: Send + Sync { async fn status(&self) -> eyre::Result; } -/// Status response -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct Status { - pub peers: u64, - pub current_block_number: u64, - pub is_syncing: bool, - pub latest_block_number: Option, - pub is_ready: bool, - pub is_healthy: bool, -} - pub mod cosmos; pub mod ethereum; pub mod ethereum_beacon; pub mod metrics; +pub mod rpc; pub mod server; mod utils; diff --git a/crates/babel/src/rpc.rs b/crates/babel/src/rpc.rs new file mode 100644 index 0000000..d6e81a4 --- /dev/null +++ b/crates/babel/src/rpc.rs @@ -0,0 +1,49 @@ +use babel_api::{BabelApiServer, Status}; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Implementation of the Babel RPC server +pub struct BabelRpcServer { + cached_status: Arc>>, +} + +impl BabelRpcServer { + pub fn new(cached_status: Arc>>) -> Self { + Self { cached_status } + } +} + +#[jsonrpsee::core::async_trait] +impl BabelApiServer for BabelRpcServer { + async fn status(&self) -> Result { + self.cached_status + .read() + .await + .clone() + .ok_or_else(|| { + jsonrpsee::types::ErrorObjectOwned::owned( + 1, + "Status not yet available", + None::<()>, + ) + }) + } +} + +/// Start the RPC server +pub async fn start_rpc_server( + addr: std::net::SocketAddr, + cached_status: Arc>>, +) -> eyre::Result<(jsonrpsee::server::ServerHandle, std::net::SocketAddr)> { + use jsonrpsee::server::Server; + + let server = Server::builder().build(addr).await?; + let local_addr = server.local_addr()?; + + let rpc_server = BabelRpcServer::new(cached_status); + let handle = server.start(rpc_server.into_rpc()); + + tracing::info!("Babel RPC server listening on {}", local_addr); + + Ok((handle, local_addr)) +} diff --git a/crates/babel/src/server.rs b/crates/babel/src/server.rs index 8cf83d8..f784537 100644 --- a/crates/babel/src/server.rs +++ b/crates/babel/src/server.rs @@ -1,5 +1,5 @@ use crate::{metrics::BabelMetrics, Babel, Status}; -use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Json, Router}; +use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Router}; use metrics_exporter_prometheus::PrometheusHandle; use std::sync::Arc; use tokio::sync::RwLock; @@ -20,19 +20,17 @@ impl BabelServer { pub fn new(babel: impl Babel + 'static, nodename: Option) -> Self { // Setup Prometheus exporter let builder = metrics_exporter_prometheus::PrometheusBuilder::new(); - let prometheus_handle = builder - .install_recorder() - .unwrap_or_else(|_| { - // In tests, recorder might already be installed - #[cfg(test)] - { - metrics_exporter_prometheus::PrometheusBuilder::new() - .build_recorder() - .handle() - } - #[cfg(not(test))] - panic!("failed to install Prometheus recorder") - }); + let prometheus_handle = builder.install_recorder().unwrap_or_else(|_| { + // In tests, recorder might already be installed + #[cfg(test)] + { + metrics_exporter_prometheus::PrometheusBuilder::new() + .build_recorder() + .handle() + } + #[cfg(not(test))] + panic!("failed to install Prometheus recorder") + }); let metrics = if let Some(nodename) = nodename { BabelMetrics::new_with_labels(&[("nodename", nodename)]) @@ -56,13 +54,16 @@ impl BabelServer { let prometheus_handle = self.prometheus_handle.clone(); Router::new() - .route("/status", get(status_handler)) .route("/ready", get(ready_handler)) .route("/healthy", get(healthy_handler)) .route("/metrics", get(move || metrics_handler(prometheus_handle))) .with_state(self.state) } + pub fn cached_status(&self) -> Arc>> { + self.state.cached_status.clone() + } + pub async fn serve(self, addr: &str) -> eyre::Result<()> { let listener = tokio::net::TcpListener::bind(addr).await?; tracing::info!("Babel server listening on {}", addr); @@ -103,18 +104,6 @@ impl BabelServer { } } -async fn status_handler(State(state): State) -> Result, AppError> { - // Return cached status - let status = state - .cached_status - .read() - .await - .clone() - .ok_or_else(|| eyre::eyre!("Status not yet available"))?; - - Ok(Json(status)) -} - async fn ready_handler(State(state): State) -> Result { let status = state .cached_status @@ -228,41 +217,57 @@ mod tests { } } - async fn setup_server_with_status(status: Status) -> axum_test::TestServer { + struct TestContext { + rest_client: axum_test::TestServer, + rpc_url: String, + _rpc_handle: jsonrpsee::server::ServerHandle, + } + + async fn setup_server_with_status(status: Status) -> TestContext { let mock_babel = MockBabel::new(status); let server = BabelServer::new(mock_babel, None); - // Start the status polling task let state = server.state.clone(); tokio::spawn(async move { BabelServer::status_polling_loop(state).await; }); + let cached_status = server.cached_status(); + let rpc_addr = "127.0.0.1:0".parse().unwrap(); + let (rpc_handle, rpc_local_addr) = crate::rpc::start_rpc_server(rpc_addr, cached_status) + .await + .unwrap(); + let rpc_url = format!("http://{}", rpc_local_addr); + let router = server.router(); - let client = axum_test::TestServer::new(router).unwrap(); + let rest_client = axum_test::TestServer::new(router).unwrap(); // Wait for status polling to run at least once tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await; - client + TestContext { + rest_client, + rpc_url, + _rpc_handle: rpc_handle, + } } #[tokio::test] async fn test_healthy_endpoint() { // Test healthy status - let client = setup_server_with_status(Status { + let ctx = setup_server_with_status(Status { is_healthy: true, ..Default::default() }) .await; - let response = client.get("/healthy").await; + let response = ctx.rest_client.get("/healthy").await; assert_eq!(response.status_code(), axum::http::StatusCode::OK); // Test unhealthy status - let client = setup_server_with_status(Status::default()).await; + let ctx = setup_server_with_status(Status::default()).await; - let response = client.get("/healthy").await; + let response = ctx.rest_client.get("/healthy").await; assert_eq!( response.status_code(), axum::http::StatusCode::SERVICE_UNAVAILABLE @@ -272,19 +277,19 @@ mod tests { #[tokio::test] async fn test_ready_endpoint() { // Test ready status - let client = setup_server_with_status(Status { + let ctx = setup_server_with_status(Status { is_ready: true, ..Default::default() }) .await; - let response = client.get("/ready").await; + let response = ctx.rest_client.get("/ready").await; assert_eq!(response.status_code(), axum::http::StatusCode::OK); // Test not ready status - let client = setup_server_with_status(Status::default()).await; + let ctx = setup_server_with_status(Status::default()).await; - let response = client.get("/ready").await; + let response = ctx.rest_client.get("/ready").await; assert_eq!( response.status_code(), axum::http::StatusCode::SERVICE_UNAVAILABLE @@ -292,8 +297,11 @@ mod tests { } #[tokio::test] - async fn test_status_endpoint() { - let client = setup_server_with_status(Status { + async fn test_rpc_status() { + use babel_api::BabelApiClient; + use jsonrpsee::http_client::HttpClientBuilder; + + let ctx = setup_server_with_status(Status { peers: 10, current_block_number: 500, is_syncing: true, @@ -303,10 +311,9 @@ mod tests { }) .await; - let response = client.get("/status").await; - assert_eq!(response.status_code(), axum::http::StatusCode::OK); + let client = HttpClientBuilder::default().build(&ctx.rpc_url).unwrap(); - let status: Status = response.json(); + let status = client.status().await.unwrap(); assert_eq!(status.peers, 10); assert_eq!(status.current_block_number, 500); assert_eq!(status.is_syncing, true);