1 change: 1 addition & 0 deletions Cargo.toml
@@ -67,6 +67,7 @@ rng = { path = "crates/rng" }

# Core dependencies
best-practices = { version = "0.1.0", git = "https://github.com/cryptidtech/best-practices.git" }
blsful = "3.0.0"
blockstore = "0.7.1"
cid = "0.11.0"
criterion = "0.5.1"
114 changes: 56 additions & 58 deletions crates/bs-p2p/src/events/api.rs
@@ -1,11 +1,9 @@
//! The Events API for interacting with the network events.
use crate::events::delay;
//! The Events API for interacting with the network events.
pub use crate::behaviour::req_res::{PeerRequest, PeerResponse};
use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::events::delay;
use crate::events::timeout::with_timeout;
use libp2p::Multiaddr;
use provenance_log::resolver::{Resolver, SuperResolver};
use crate::events::{NetworkError, PublicEvent};
use crate::behaviour::{Behaviour, BehaviourEvent};
use crate::Error;
use blockstore::Blockstore;
use futures::stream::StreamExt;
@@ -23,7 +21,9 @@ use libp2p::kad::{InboundRequest, Record};
pub use libp2p::multiaddr::Protocol;
use libp2p::request_response::{self, OutboundRequestId, ResponseChannel};
use libp2p::swarm::{Swarm, SwarmEvent};
use libp2p::{identify, kad, ping, PeerId, };
use libp2p::Multiaddr;
use libp2p::{identify, kad, ping, PeerId};
use provenance_log::resolver::{Resolver, SuperResolver};
use std::collections::{HashMap, HashSet};
use std::net::Ipv4Addr;
use std::pin::Pin;
@@ -136,7 +136,7 @@ impl Client {
receiver.await.map_err(Error::OneshotCanceled)
}

/// Put a record on the DHT
/// Put a record on the DHT
pub async fn put_record(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), Error> {
self.command_sender
.send(NetworkCommand::PutRecord { key, value })
@@ -218,14 +218,14 @@ impl Resolver for Client {
fn resolve(
&self,
cid: &multicid::Cid,
// ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Self::Error>> + CondSend>> {
// ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, Self::Error>> + CondSend>> {
) -> Pin<Box<dyn SuperResolver<'_, Self> + '_>> {
tracing::debug!("DefaultBsPeer Resolving CID over bitswap: {}", cid);
let cid_bytes: Vec<u8> = cid.clone().into();
let client = self.clone();
Box::pin(async move {
with_timeout(client.get_bits(cid_bytes), Duration::from_secs(10)).await?
})
Box::pin(
async move { with_timeout(client.get_bits(cid_bytes), Duration::from_secs(10)).await? },
)
}
}
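For context on the `resolve` change above: the bitswap fetch is now bounded by the crate's `with_timeout` helper. A minimal sketch of what such a helper can look like, assuming tokio and a caller-supplied error conversion (the crate's actual `with_timeout` in `events::timeout` may differ in signature and error type):

```rust
use std::time::Duration;
use tokio::time::timeout;

// Sketch only: race a fallible future against a deadline and surface the
// elapsed case as an error type the caller already handles.
async fn with_timeout_sketch<T, E>(
    fut: impl std::future::Future<Output = Result<T, E>>,
    dur: Duration,
) -> Result<T, E>
where
    E: From<std::io::Error>,
{
    match timeout(dur, fut).await {
        // The inner future finished (successfully or not) before the deadline.
        Ok(result) => result,
        // The deadline elapsed first; map it into the caller's error type.
        Err(_elapsed) => Err(E::from(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            "operation timed out",
        ))),
    }
}
```

In `resolve` above, the ten-second bound means a CID that no connected peer can provide fails fast instead of hanging the caller.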
/// PeerPiper Network Commands (Libp2p)
@@ -307,7 +306,6 @@ pub enum Libp2pEvent {
PutRecordRequest { source: PeerId },
}


/// The network event loop.
/// Handles all the network logic for us.
pub struct EventLoop<B: Blockstore + 'static> {
@@ -457,25 +456,29 @@ impl<B: Blockstore> EventLoop<B> {

// pass the address back to the other task, for display, etc.
self.event_sender
.try_send(PublicEvent::ListenAddr {
address: p2p_addr,
})
.try_send(PublicEvent::ListenAddr { address: p2p_addr })
};
// Protocol::Ip is the first item in the address vector
match address.iter().next() {
Some(Protocol::Ip6(ip6)) => {
// Only add our globally available IPv6 addresses to the external addresses list.
if !ip6.is_loopback()
&& !ip6.is_unspecified()
&& !ip6.is_unspecified()
&& !ip6.is_multicast()
&& (ip6.segments()[0] & 0xffc0) != 0xfe80 // no fe80::/10 addresses, (!ip6.is_unicast_link_local() requires nightly)
&& (ip6.segments()[0] & 0xfe00) != 0xfc00 // Unique Local Addresses (ULAs, fd00::/8) are private IPv6 addresses and should not be advertised.
&& (ip6.segments()[0] & 0xfe00) != 0xfc00
// Unique Local Addresses (ULAs, fd00::/8) are private IPv6 addresses and should not be advertised.
{
addr_handler()?;
}
}
Some(Protocol::Ip4(ip4)) => {
if !(ip4.is_loopback() || ip4.is_unspecified() || ip4.is_private() || ip4.is_multicast() || ip4 == Ipv4Addr::LOCALHOST || ip4.octets()[0] & 240 == 240 && !ip4.is_broadcast())
if !(ip4.is_loopback()
|| ip4.is_unspecified()
|| ip4.is_private()
|| ip4.is_multicast()
|| ip4 == Ipv4Addr::LOCALHOST
|| ip4.octets()[0] & 240 == 240 && !ip4.is_broadcast())
{
addr_handler()?;
}
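The reformatted conditions above decide which listen addresses are worth advertising. Purely as an illustration (these helper names are not part of the crate), the same predicates pulled out into standalone functions:

```rust
use std::net::{Ipv4Addr, Ipv6Addr};

// Illustrative helpers, not crate API: the checks the event loop applies
// before reporting a listen address as externally reachable.

fn is_advertisable_v6(ip6: Ipv6Addr) -> bool {
    !ip6.is_loopback()
        && !ip6.is_unspecified()
        && !ip6.is_multicast()
        // fe80::/10 link-local (Ipv6Addr::is_unicast_link_local is nightly-only)
        && (ip6.segments()[0] & 0xffc0) != 0xfe80
        // fc00::/7 unique local addresses are private and should not be advertised
        && (ip6.segments()[0] & 0xfe00) != 0xfc00
}

fn is_advertisable_v4(ip4: Ipv4Addr) -> bool {
    !(ip4.is_loopback()
        || ip4.is_unspecified()
        || ip4.is_private()
        || ip4.is_multicast()
        || ip4 == Ipv4Addr::LOCALHOST
        // 240.0.0.0/4 reserved space, excluding the limited broadcast address
        || (ip4.octets()[0] & 0xf0 == 0xf0 && !ip4.is_broadcast()))
}
```

Only addresses that pass these checks reach `addr_handler`, which re-emits them as `PublicEvent::ListenAddr`.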
@@ -506,7 +509,9 @@ impl<B: Blockstore> EventLoop<B> {
.await
{
tracing::error!("Failed to send NewConnection event: {e}");
return Err(Error::SendFailure("Failed to send NewConnection event".to_string()));
return Err(Error::SendFailure(
"Failed to send NewConnection event".to_string(),
));
}
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
@@ -591,7 +596,12 @@ impl<B: Blockstore> EventLoop<B> {

// Send ACK back to the sender
let ack_topic = format!("ack/{}", message.topic.to_string());
if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(libp2p::gossipsub::IdentTopic::new(&ack_topic), message.data) {
if let Err(e) = self
.swarm
.behaviour_mut()
.gossipsub
.publish(libp2p::gossipsub::IdentTopic::new(&ack_topic), message.data)
{
tracing::error!("Failed to publish ACK: {e}");
}
}
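The ACK above follows a simple convention: echo the received payload back on a sibling `ack/<topic>` topic. A hedged sketch of deriving that topic (the helper name is hypothetical; `IdentTopic` is the gossipsub topic type used in the diff):

```rust
use libp2p::gossipsub::IdentTopic;

// Hypothetical helper: given the topic a message arrived on, build the
// corresponding acknowledgement topic that the handler publishes back to.
fn ack_topic_for(topic: &str) -> IdentTopic {
    IdentTopic::new(format!("ack/{topic}"))
}
```

A publisher that wants delivery confirmation can subscribe to `ack/<topic>` for the topics it publishes on.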
@@ -622,27 +632,29 @@ impl<B: Blockstore> EventLoop<B> {
.kad
.store_mut()
.get(&libp2p::kad::RecordKey::new(&key))
.map(|record| record.into_owned()) {
tracing::debug!("Found record for key {:?}: {:?}", key, record);
// Publish the record to the topic
if let Err(e) = self
.swarm
.behaviour_mut()
.gossipsub
.publish(topic, record.value.clone())
{
tracing::error!("Failed to publish record to topic: {e}");
}
}
.map(|record| record.into_owned())
{
tracing::debug!("Found record for key {:?}: {:?}", key, record);
// Publish the record to the topic
if let Err(e) = self
.swarm
.behaviour_mut()
.gossipsub
.publish(topic, record.value.clone())
{
tracing::error!("Failed to publish record to topic: {e}");
}
}
}
SwarmEvent::Behaviour(BehaviourEvent::PeerRequest(
request_response::Event::Message { message, .. },
)) => match message {
request_response::Message::Request {
request, channel: _, ..
request,
channel: _,
..
} => {
tracing::debug!("Received request: {:?}", &request);

}
request_response::Message::Response {
request_id,
@@ -768,7 +780,6 @@ impl<B: Blockstore> EventLoop<B> {
result,
..
})) => {

tracing::debug!("Got Kad QueryProgressed: {:?}", result);
match result {
kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders {
@@ -829,22 +840,14 @@ impl<B: Blockstore> EventLoop<B> {
})) => {
tracing::debug!("Kademlia Inbound Request: {:?}", request);
match request {
InboundRequest::PutRecord {
source,
record,
..
} => {
InboundRequest::PutRecord { source, record, .. } => {
tracing::info!("Received PutRecordRequest from: {:?}", source);

// TODO: Filter Providers based on criteria?
// for now, add the provider to the DHT as is
if let Some(rec) = record {
if let Err(e) = self
.swarm
.behaviour_mut()
.kad
.store_mut()
.put(rec.clone())
if let Err(e) =
self.swarm.behaviour_mut().kad.store_mut().put(rec.clone())
{
tracing::error!("Failed to add provider to DHT: {e}");
}
Expand All @@ -853,9 +856,7 @@ impl<B: Blockstore> EventLoop<B> {
// send evt to external handler plugins to decide whether to include record or not:
if let Err(e) = self
.event_sender
.send(PublicEvent::Swarm(Libp2pEvent::PutRecordRequest {
source,
}))
.send(PublicEvent::Swarm(Libp2pEvent::PutRecordRequest { source }))
.await
{
tracing::error!("Failed to send PutRecordRequest event: {e}");
@@ -928,7 +929,7 @@ impl<B: Blockstore> EventLoop<B> {
}
}
}
},
}
event => {
tracing::debug!("Other type of event: {:?}", event);
}
@@ -1031,12 +1032,9 @@ impl<B: Blockstore> EventLoop<B> {
.with(Protocol::P2p(*self.swarm.local_peer_id()));

// emit as Event
if let Err(e) = self
.event_sender
.try_send(PublicEvent::ListenAddr {
address: p2p_addr.clone(),
})
{
if let Err(e) = self.event_sender.try_send(PublicEvent::ListenAddr {
address: p2p_addr.clone(),
}) {
tracing::error!("Failed to send share address event: {e}");
}
}
@@ -1082,7 +1080,8 @@ impl<B: Blockstore> EventLoop<B> {
.swarm
.behaviour_mut()
.kad
.put_record(record, kad::Quorum::One) {
.put_record(record, kad::Quorum::One)
{
tracing::error!("Failed to put record: {e}");
}
}
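The write path above starts a Kademlia `put_record` query with `Quorum::One`. A standalone sketch of the same call, assuming a key/value pair like the one carried by `NetworkCommand::PutRecord` and a behaviour backed by the in-memory store (the real code reaches the behaviour through the swarm):

```rust
use libp2p::kad::{store::MemoryStore, Behaviour, Quorum, Record, RecordKey};

// Sketch only: store a key/value pair in the DHT, succeeding once a single
// peer (Quorum::One) has acknowledged the record.
fn put(kad: &mut Behaviour<MemoryStore>, key: Vec<u8>, value: Vec<u8>) {
    let record = Record::new(RecordKey::new(&key), value);
    if let Err(e) = kad.put_record(record, Quorum::One) {
        // put_record can fail locally, e.g. if the record store rejects the entry.
        eprintln!("failed to start put_record query: {e}");
    }
}
```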
@@ -1104,4 +1103,3 @@ impl<B: Blockstore> EventLoop<B> {
}
}
}

4 changes: 1 addition & 3 deletions crates/comrade-component/src/bindings.rs
@@ -1213,9 +1213,7 @@ macro_rules! __export_wacc_impl {
#[doc(inline)]
pub(crate) use __export_wacc_impl as export;
#[cfg(target_arch = "wasm32")]
#[unsafe(
link_section = "component-type:wit-bindgen:0.41.0:comrade:api:wacc:encoded world"
)]
#[unsafe(link_section = "component-type:wit-bindgen:0.41.0:comrade:api:wacc:encoded world")]
#[doc(hidden)]
#[allow(clippy::octal_escapes)]
pub static __WIT_BINDGEN_COMPONENT_TYPE: [u8; 632] = *b"\
2 changes: 1 addition & 1 deletion crates/multikey/Cargo.toml
@@ -16,7 +16,7 @@ wasm = ["getrandom/wasm_js"] # needed for CI testing on wasm32-unknown-unknown

[dependencies]
bcrypt-pbkdf = "0.10"
blsful = "2.5"
blsful.workspace = true
chacha20poly1305 = "0.10.1"
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
elliptic-curve.workspace = true
34 changes: 23 additions & 11 deletions crates/multikey/src/mk.rs
@@ -672,14 +672,17 @@ impl Builder {
}
};
let key_share = bls12381::KeyShare::try_from(key_bytes.as_ref())?;
let identifier: Vec<u8> = Varuint(key_share.0).into();
let identifier: Vec<u8> = key_share.0 .0.to_be_bytes().to_vec();
let threshold: Vec<u8> = Varuint::<usize>(key_share.1.into()).into();
let limit: Vec<u8> = Varuint::<usize>(key_share.2.into()).into();
let mut attributes = Attributes::new();
attributes.insert(AttrId::ShareIdentifier, identifier.into());
attributes.insert(AttrId::Threshold, threshold.into());
attributes.insert(AttrId::Limit, limit.into());
attributes.insert(AttrId::KeyData, key_share.3.into());
attributes.insert(
AttrId::KeyData,
key_share.3 .0.to_be_bytes().to_vec().into(),
);
Ok(Builder {
codec: Codec::Bls12381G1PubShare,
comment: Some(sshkey.comment().to_string()),
@@ -717,16 +720,19 @@ impl Builder {
}
};
let key_share = bls12381::KeyShare::try_from(key_bytes.as_ref())?;
let identifier: Vec<u8> = Varuint(key_share.0).into();
let identifier: Vec<u8> = key_share.0 .0.to_be_bytes().to_vec();
let threshold: Vec<u8> = Varuint::<usize>(key_share.1.into()).into();
let limit: Vec<u8> = Varuint::<usize>(key_share.2.into()).into();
let mut attributes = Attributes::new();
attributes.insert(AttrId::ShareIdentifier, identifier.into());
attributes.insert(AttrId::Threshold, threshold.into());
attributes.insert(AttrId::Limit, limit.into());
attributes.insert(AttrId::KeyData, key_share.3.into());
attributes.insert(
AttrId::KeyData,
key_share.3 .0.to_be_bytes().to_vec().into(),
);
Ok(Builder {
codec: Codec::Bls12381G1PubShare,
codec: Codec::Bls12381G2PubShare,
comment: Some(sshkey.comment().to_string()),
attributes: Some(attributes),
..Default::default()
@@ -887,14 +893,17 @@ impl Builder {
}
};
let key_share = bls12381::KeyShare::try_from(key_bytes.as_ref())?;
let identifier: Vec<u8> = Varuint(key_share.0).into();
let identifier: Vec<u8> = key_share.0 .0.to_be_bytes().to_vec();
let threshold: Vec<u8> = Varuint::<usize>(key_share.1.into()).into();
let limit: Vec<u8> = Varuint::<usize>(key_share.2.into()).into();
let mut attributes = Attributes::new();
attributes.insert(AttrId::ShareIdentifier, identifier.into());
attributes.insert(AttrId::Threshold, threshold.into());
attributes.insert(AttrId::Limit, limit.into());
attributes.insert(AttrId::KeyData, key_share.3.into());
attributes.insert(
AttrId::KeyData,
key_share.3 .0.to_be_bytes().to_vec().into(),
);
Ok(Builder {
codec: Codec::Bls12381G1PrivShare,
comment: Some(sshkey.comment().to_string()),
@@ -932,14 +941,17 @@ impl Builder {
}
};
let key_share = bls12381::KeyShare::try_from(key_bytes.as_ref())?;
let identifier: Vec<u8> = Varuint(key_share.0).into();
let identifier: Vec<u8> = key_share.0 .0.to_be_bytes().to_vec();
let threshold: Vec<u8> = Varuint::<usize>(key_share.1.into()).into();
let limit: Vec<u8> = Varuint::<usize>(key_share.2.into()).into();
let mut attributes = Attributes::new();
attributes.insert(AttrId::ShareIdentifier, identifier.into());
attributes.insert(AttrId::Threshold, threshold.into());
attributes.insert(AttrId::Limit, limit.into());
attributes.insert(AttrId::KeyData, key_share.3.into());
attributes.insert(
AttrId::KeyData,
key_share.3 .0.to_be_bytes().to_vec().into(),
);
Ok(Builder {
codec: Codec::Bls12381G2PrivShare,
comment: Some(sshkey.comment().to_string()),
@@ -1052,8 +1064,8 @@ impl Builder {
}

/// add in the share identifier value
pub fn with_identifier(self, identifier: u8) -> Self {
self.with_attribute(AttrId::ShareIdentifier, &Varuint(identifier).into())
pub fn with_identifier(self, identifier: impl AsRef<[u8]>) -> Self {
self.with_attribute(AttrId::ShareIdentifier, &identifier.as_ref().to_vec())
}

/// add in the threshold data
2 changes: 1 addition & 1 deletion crates/multikey/src/views.rs
@@ -70,7 +70,7 @@ pub trait ThresholdAttrView {
/// get the limit value for the multikey
fn limit(&self) -> Result<NonZeroUsize, Error>;
/// get the share identifier for the multikey
fn identifier(&self) -> Result<u8, Error>;
fn identifier(&self) -> Result<&[u8], Error>;
/// get the codec-specific threshold data
fn threshold_data(&self) -> Result<&[u8], Error>;
}
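With `identifier()` now returning `&[u8]` (and `with_identifier` in `mk.rs` taking `impl AsRef<[u8]>`), the share identifier becomes an opaque byte string; the builder changes above store the big-endian encoding of the numeric share index. Purely as an illustration of that encoding, with hypothetical helper names and an assumed integer width (the real width depends on blsful's share type):

```rust
// Illustration only: round-tripping a numeric share index through the
// byte-oriented ShareIdentifier attribute. These helpers are not crate API.
fn encode_share_id(id: u64) -> Vec<u8> {
    id.to_be_bytes().to_vec()
}

fn decode_share_id(bytes: &[u8]) -> Option<u64> {
    // Reject anything that is not exactly eight big-endian bytes.
    let arr: [u8; 8] = bytes.try_into().ok()?;
    Some(u64::from_be_bytes(arr))
}
```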