Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added NATS - a new implementor for the messaging capability! #354

Merged
merged 15 commits into from
Mar 13, 2023
Merged
292 changes: 285 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ slight-core = { workspace = true }
slight-runtime = { workspace = true }
slight-keyvalue = { workspace = true, features = ["filesystem", "awsdynamodb", "redis", "azblob"], optional = true}
slight-distributed-locking = { workspace = true, features = ["etcd"], optional = true}
slight-messaging = { workspace = true, features = ["filesystem", "mosquitto", "azsbus"], optional = true}
slight-messaging = { workspace = true, features = ["filesystem", "mosquitto", "azsbus", "natsio"], optional = true}
slight-runtime-configs = { workspace = true, optional = true }
slight-common = { workspace = true }
slight-sql = { workspace = true, features = ["postgres"], optional = true }
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ run-rust:
RUST_LOG=$(LOG_LEVEL) $(SLIGHT) -c './examples/messaging-producer-demo/caf_slightfile.toml' run ./examples/messaging-producer-demo/target/wasm32-wasi/release/messaging-producer-demo.wasm
# sql.postgres
RUST_LOG=$(LOG_LEVEL) $(SLIGHT) -c './examples/sql-demo/postgres_slightfile.toml' run ./examples/sql-demo/target/wasm32-wasi/release/sql-demo.wasm
# messaging.nats
RUST_LOG=$(LOG_LEVEL) $(SLIGHT) -c './examples/messaging-consumer-demo/nats_slightfile.toml' run ./examples/messaging-consumer-demo/target/wasm32-wasi/release/messaging-consumer-demo.wasm &
RUST_LOG=$(LOG_LEVEL) $(SLIGHT) -c './examples/messaging-producer-demo/nats_slightfile.toml' run ./examples/messaging-producer-demo/target/wasm32-wasi/release/messaging-producer-demo.wasm

.PHONY: clean-rust
clean-rust:
Expand Down
2 changes: 1 addition & 1 deletion build/azure-pipeline/workflow-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ stages:

#
# Install build tools
#
#
- script: |
choco install strawberryperl
make install-deps-win
Expand Down
5 changes: 4 additions & 1 deletion crates/messaging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ async-channel = { version = "1.5", optional = true }
azure_core = { version = "0.10.0", optional = true }
azure_messaging_servicebus = { version = "0.10.0", optional = true }
http = { version = "0.2", optional = true }
# messaging.nats deps
nats = { version = "0.24.0", optional = true }

[features]
default = ["filesystem"]
apache_kafka = ["rdkafka", "openssl"]
filesystem = ["filesystem-pubsub"]
mosquitto = ["mosquitto-rs", "async-channel"]
azsbus = ["azure_core", "azure_messaging_servicebus", "http"]
azsbus = ["azure_core", "azure_messaging_servicebus", "http"]
natsio = ["nats"]
2 changes: 2 additions & 0 deletions crates/messaging/src/implementors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub mod azsbus;
pub mod filesystem;
#[cfg(feature = "mosquitto")]
pub mod mosquitto;
#[cfg(feature = "natsio")]
pub mod natsio;

#[async_trait]
pub trait PubImplementor {
Expand Down
74 changes: 74 additions & 0 deletions crates/messaging/src/implementors/natsio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use async_trait::async_trait;
use nats::{Connection, Subscription};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::runtime::Handle;
use tokio::task::block_in_place;

use anyhow::{Context, Result};
use slight_common::BasicState;
use slight_runtime_configs::get_from_state;

use super::{PubImplementor, SubImplementor};

#[derive(Clone, Debug)]
pub struct NatsIoImplementor {
connection: Connection,
subscription_tokens: Arc<Mutex<HashMap<String, Subscription>>>,
}

impl NatsIoImplementor {
pub async fn new(slight_state: &BasicState) -> Self {
let nats_creds = get_from_state("NATS_CREDS", slight_state).await.unwrap();

let connection = nats::Options::with_static_credentials(&nats_creds)
.unwrap()
.connect("connect.ngs.global")
danbugs marked this conversation as resolved.
Show resolved Hide resolved
.unwrap();
let subscription_tokens = Arc::new(Mutex::new(HashMap::new()));

Self {
connection,
subscription_tokens,
}
}
}

#[async_trait]
impl PubImplementor for NatsIoImplementor {
async fn publish(&self, msg: &[u8], topic: &str) -> Result<()> {
self.connection.publish(topic, msg).unwrap();
Ok(())
}
}

#[async_trait]
impl SubImplementor for NatsIoImplementor {
async fn subscribe(&self, topic: &str) -> Result<String> {
let sub = self.connection.subscribe(topic).unwrap();

let sub_tok = uuid::Uuid::new_v4().to_string();
self.subscription_tokens
.lock()
.unwrap()
.insert(sub_tok.clone(), sub);

Ok(sub_tok)
}

async fn receive(&self, sub_tok: &str) -> Result<Vec<u8>> {
block_in_place(|| {
Handle::current().block_on(async move {
let sub_toks = self.subscription_tokens.lock().unwrap();

let accessed_consumer = sub_toks
.get(sub_tok)
.with_context(|| "failed to get consumer from subscription token")?;

let msg = accessed_consumer.next().unwrap();

Ok(msg.data)
})
})
}
}
12 changes: 12 additions & 0 deletions crates/messaging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ impl PubInner {
MessagingImplementors::AzSbus => {
Arc::new(azsbus::AzSbusImplementor::new(slight_state).await)
}
#[cfg(feature = "natsio")]
MessagingImplementors::Nats => {
Arc::new(natsio::NatsIoImplementor::new(slight_state).await)
}
},
})
}
Expand Down Expand Up @@ -98,6 +102,10 @@ impl SubInner {
MessagingImplementors::AzSbus => {
Arc::new(azsbus::AzSbusImplementor::new(slight_state).await)
}
#[cfg(feature = "natsio")]
MessagingImplementors::Nats => {
Arc::new(natsio::NatsIoImplementor::new(slight_state).await)
}
},
};

Expand Down Expand Up @@ -199,6 +207,8 @@ pub enum MessagingImplementors {
Filesystem,
#[cfg(feature = "azsbus")]
AzSbus,
#[cfg(feature = "natsio")]
Nats,
}

impl From<&str> for MessagingImplementors {
Expand All @@ -212,6 +222,8 @@ impl From<&str> for MessagingImplementors {
"messaging.filesystem" | "mq.filesystem" => Self::Filesystem,
#[cfg(feature = "azsbus")]
"messaging.azsbus" | "mq.azsbus" => Self::AzSbus,
#[cfg(feature = "natsio")]
"messaging.nats" => Self::Nats,
p => panic!(
"failed to match provided name (i.e., '{p}') to any known host implementations"
),
Expand Down
9 changes: 2 additions & 7 deletions crates/runtime-configs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ impl ConfigsInner {
/// This defines the available implementor implementations for the `Configs` interface.
///
/// As per its' usage in `ConfigsInner`, it must `derive` `Debug`, and `Clone`.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub enum ConfigsImplementor {
Local,
#[default]
danbugs marked this conversation as resolved.
Show resolved Hide resolved
EnvVars,
UserSecrets, // user creates configs at compile time that are encrypted and stored in their slightfile
AzApp,
Expand Down Expand Up @@ -164,12 +165,6 @@ impl From<&str> for ConfigsImplementor {
}
}

impl Default for ConfigsImplementor {
fn default() -> Self {
ConfigsImplementor::EnvVars
}
}

/// SDK-ish bit
pub async fn get(
config_type: &str,
Expand Down
12 changes: 6 additions & 6 deletions examples/configs-demo/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions examples/distributed-locking-demo/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions examples/http-client-demo/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading