Skip to content

Commit

Permalink
Merge pull request #9 from chainbound/feature/pubsub-v1
Browse files Browse the repository at this point in the history
Implement pubsub v1
  • Loading branch information
mempirate authored Oct 19, 2023
2 parents 10070e6 + cc9f79f commit cd35949
Show file tree
Hide file tree
Showing 31 changed files with 2,477 additions and 88 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
/target
.vscode/
*.svg
178 changes: 173 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
members = ["msg", "msg-socket", "msg-wire", "msg-transport"]

[workspace.package]
version = "0.1.0"
version = "0.1.0-alpha"
edition = "2021"
rust-version = "1.70" # Remember to update .clippy.toml and README.md
rust-version = "1.70" # Remember to update .clippy.toml and README.md
license = "MIT OR Apache-2.0"
description = "A flexible and lightweight messaging library for distributed systems"
authors = ["Jonas Bostoen", "Nicolas Racchi"]
homepage = "https:/chainbound/msg-rs"
repository = "https:/chainbound/msg-rs"
Expand All @@ -20,7 +21,7 @@ async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec"] }
futures = "0.3"
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["sync"] }
parking_lot = "0.12"

bytes = "1"
Expand All @@ -35,6 +36,9 @@ rustc-hash = "1"
opt-level = 1
overflow-checks = false

[profile.bench]
debug = true

[profile.maxperf]
inherits = "release"
debug = false
Expand Down
35 changes: 34 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ It was built because we needed a Rust-native messaging library like those above.

- [ ] Multiple socket types
- [x] Request/Reply
- [ ] Publish/Subscribe
- [x] Publish/Subscribe
- [ ] Channel
- [ ] Push/Pull
- [ ] Survey/Respond
Expand Down Expand Up @@ -71,6 +71,39 @@ async fn main() {
println!("Response: {:?}", res);
}
```

### Publish/Subscribe
```rust
use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{PubSocket, SubSocket, Tcp};

#[tokio::main]
async fn main() {
// Initialize the publisher socket (server side) with a transport
let mut pub_socket = PubSocket::new(Tcp::new());
pub_socket.bind("0.0.0.0:4444").await.unwrap();

// Initialize the subscriber socket (client side) with a transport
let mut sub_socket = SubSocket::new(Tcp::new());
sub_socket.connect("0.0.0.0:4444").await.unwrap();

let topic = "some_interesting_topic".to_string();

// Subscribe to a topic
sub_socket.subscribe(topic.clone()).await.unwrap();

tokio::spawn(async move {
// Values are `bytes::Bytes`
pub_socket.publish(topic, Bytes::from("hello_world")).await.unwrap();
});

let msg = sub_socket.next().await.unwrap();
println!("Received message: {:?}", msg);
}
```

## MSRV
The minimum supported Rust version is 1.70.

Expand Down
14 changes: 14 additions & 0 deletions msg-socket/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
use std::net::SocketAddr;
use tokio::io::{AsyncRead, AsyncWrite};

#[path = "pub/mod.rs"]
mod pubs;
mod rep;
mod req;
mod sub;

use bytes::Bytes;
pub use pubs::{PubError, PubOptions, PubSocket};
pub use rep::*;
pub use req::*;
pub use sub::*;

pub struct RequestId(u32);

Expand All @@ -24,3 +32,9 @@ impl RequestId {
pub trait Authenticator: Send + Sync + Unpin + 'static {
fn authenticate(&self, id: &Bytes) -> bool;
}

pub(crate) struct AuthResult<S: AsyncRead + AsyncWrite> {
id: Bytes,
addr: SocketAddr,
stream: S,
}
Loading

0 comments on commit cd35949

Please sign in to comment.