Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client_ext for Client::get and Client::list #1375

Merged
merged 18 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ release = false
[features]
default = ["rustls-tls", "kubederive", "ws", "latest", "socks5", "runtime", "refresh"]
kubederive = ["kube/derive"]
openssl-tls = ["kube/client", "kube/openssl-tls"]
rustls-tls = ["kube/client", "kube/rustls-tls"]
openssl-tls = ["kube/client", "kube/openssl-tls", "kube/unstable-client"]
rustls-tls = ["kube/client", "kube/rustls-tls", "kube/unstable-client"]
runtime = ["kube/runtime", "kube/unstable-runtime"]
socks5 = ["kube/socks5"]
refresh = ["kube/oauth", "kube/oidc"]
Expand Down
9 changes: 4 additions & 5 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
api::{Api, ListParams, ResourceExt},
client::{scope, Client},
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
Expand All @@ -25,13 +24,13 @@ async fn main() -> anyhow::Result<()> {

pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
check_for_node_failures(&events, n).await?;
check_for_node_failures(&client, n).await?;
}
Ok(())
}

// A simple node problem detector
async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result<()> {
async fn check_for_node_failures(client: &Client, o: Node) -> anyhow::Result<()> {
let name = o.name_any();
// Nodes often modify a lot - only print broken nodes
if let Some(true) = o.spec.unwrap().unschedulable {
Expand All @@ -52,7 +51,7 @@ async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result
// Find events related to this node
let opts =
ListParams::default().fields(&format!("involvedObject.kind=Node,involvedObject.name={name}"));
let evlist = events.list(&opts).await?;
let evlist = client.list::<Event>(&opts, &scope::Cluster).await?;
for e in evlist {
warn!("Node event: {:?}", serde_json::to_string_pretty(&e)?);
}
Expand Down
3 changes: 2 additions & 1 deletion kube-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ jsonpatch = ["kube-core/jsonpatch"]
admission = ["kube-core/admission"]
config = ["__non_core", "pem", "home"]
socks5 = ["hyper-socks2"]
unstable-client = []

# private feature sets; do not use
__non_core = ["tracing", "serde_yaml", "base64"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest", "socks5"]
features = ["client", "rustls-tls", "openssl-tls", "ws", "oauth", "oidc", "jsonpatch", "admission", "k8s-openapi/latest", "socks5", "unstable-client"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand Down
259 changes: 259 additions & 0 deletions kube-client/src/client/client_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
use crate::{Client, Error, Result};
use k8s_openapi::api::core::v1::Namespace as k8sNs;
use kube_core::{
object::ObjectList,
params::{GetParams, ListParams},
request::Request,
ClusterResourceScope, DynamicResourceScope, NamespaceResourceScope, Resource,
};
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;

/// A marker trait to indicate cluster-wide operations are available
trait ClusterScope {}
/// A marker trait to indicate namespace-scoped operations are available
trait NamespaceScope {}

// k8s_openapi scopes get implementations for free
impl ClusterScope for ClusterResourceScope {}
impl NamespaceScope for NamespaceResourceScope {}
// our DynamicResourceScope can masquerade as either
impl NamespaceScope for DynamicResourceScope {}
impl ClusterScope for DynamicResourceScope {}

/// How to get the url for a collection
///
/// Pick one of `kube::client::Cluster` or `kube::client::Namespace`.
pub trait CollectionUrl<K> {
fn url_path(&self) -> String;
}

/// How to get the url for an object
///
/// Pick one of `kube::client::Cluster` or `kube::client::Namespace`.
pub trait ObjectUrl<K> {
fn url_path(&self) -> String;
}

/// Marker type for cluster level queries
pub struct Cluster;
/// Namespace newtype for namespace level queries
///
/// You can create this directly, or convert `From` a `String` / `&str`, or `TryFrom` an `k8s_openapi::api::core::v1::Namespace`
pub struct Namespace(String);

/// Scopes for `unstable-client` [`Client#impl-Client`] extension methods
pub mod scope {
pub use super::{Cluster, Namespace};
}

// All objects can be listed cluster-wide
impl<K> CollectionUrl<K> for Cluster
where
K: Resource,
K::DynamicType: Default,
{
fn url_path(&self) -> String {
K::url_path(&K::DynamicType::default(), None)
}
}

// Only cluster-scoped objects can be named globally
impl<K> ObjectUrl<K> for Cluster
where
K: Resource,
K::DynamicType: Default,
K::Scope: ClusterScope,
{
fn url_path(&self) -> String {
K::url_path(&K::DynamicType::default(), None)
}
}

// Only namespaced objects can be accessed via namespace
impl<K> CollectionUrl<K> for Namespace
where
K: Resource,
K::DynamicType: Default,
K::Scope: NamespaceScope,
{
fn url_path(&self) -> String {
K::url_path(&K::DynamicType::default(), Some(&self.0))
}
}

impl<K> ObjectUrl<K> for Namespace
where
K: Resource,
K::DynamicType: Default,
K::Scope: NamespaceScope,
{
fn url_path(&self) -> String {
K::url_path(&K::DynamicType::default(), Some(&self.0))
}
}

// can be created from a complete native object
impl TryFrom<&k8sNs> for Namespace {
type Error = NamespaceError;

fn try_from(ns: &k8sNs) -> Result<Namespace, Self::Error> {
if let Some(n) = &ns.meta().name {
Ok(Namespace(n.to_owned()))
} else {
Err(NamespaceError::MissingName)

Check warning on line 104 in kube-client/src/client/client_ext.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/client_ext.rs#L104

Added line #L104 was not covered by tests
}
}
}
// and from literals + owned strings
impl From<&str> for Namespace {
fn from(ns: &str) -> Namespace {
Namespace(ns.to_owned())
}
}
impl From<String> for Namespace {
fn from(ns: String) -> Namespace {
Namespace(ns)

Check warning on line 116 in kube-client/src/client/client_ext.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/client_ext.rs#L115-L116

Added lines #L115 - L116 were not covered by tests
}
}

#[derive(thiserror::Error, Debug)]
/// Failures to infer a namespace
pub enum NamespaceError {
/// MissingName
#[error("Missing Namespace Name")]
MissingName,
}

/// Generic client extensions for the `unstable-client` feature
///
/// These methods allow users to query across a wide-array of resources without needing
/// to explicitly create an [`Api`](crate::Api) for each one of them.
///
/// ## Usage
/// 1. Create a [`Client`]
/// 2. Specify the [`scope`] you are querying at via [`Cluster`] or [`Namespace`] as args
/// 3. Specify the resource type you are using for serialization (e.g. a top level k8s-openapi type)
///
/// ## Example
///
/// ```no_run
/// # use k8s_openapi::api::core::v1::Pod;
/// # use k8s_openapi::api::core::v1::Service;
/// # use kube::client::scope::{Cluster, Namespace};
/// # use kube::{ResourceExt, api::ListParams};
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let lp = ListParams::default();
/// // List at Cluster level for Pod resource:
/// for pod in client.list::<Pod>(&lp, &Cluster).await? {
/// println!("Found pod {} in {}", pod.name_any(), pod.namespace().unwrap());
/// }
/// // Namespaced Get for Service resource:
/// let svc = client.get::<Service>("kubernetes", &Namespace::from("default")).await?;
/// assert_eq!(svc.name_unchecked(), "kubernetes");
/// # Ok(())
/// # }
/// ```
impl Client {
/// Get a single instance of a `Resource` implementing type `K` at the specified scope.
///
/// ```no_run
/// # use k8s_openapi::api::rbac::v1::ClusterRole;
/// # use k8s_openapi::api::core::v1::Service;
/// # use kube::client::scope::{Cluster, Namespace};
/// # use kube::{ResourceExt, api::GetParams};
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let cr = client.get::<ClusterRole>("cluster-admin", &Cluster).await?;
/// assert_eq!(cr.name_unchecked(), "cluster-admin");
/// let svc = client.get::<Service>("kubernetes", &Namespace::from("default")).await?;
/// assert_eq!(svc.name_unchecked(), "kubernetes");
/// # Ok(())
/// # }
/// ```
pub async fn get<K>(&self, name: &str, scope: &impl ObjectUrl<K>) -> Result<K>
where
K: Resource + Serialize + DeserializeOwned + Clone + Debug,
<K as Resource>::DynamicType: Default,
{
let mut req = Request::new(scope.url_path())
.get(name, &GetParams::default())
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("get");
self.request::<K>(req).await
}

/// List instances of a `Resource` implementing type `K` at the specified scope.
///
/// ```no_run
/// # use k8s_openapi::api::core::v1::Pod;
/// # use k8s_openapi::api::core::v1::Service;
/// # use kube::client::scope::{Cluster, Namespace};
/// # use kube::{ResourceExt, api::ListParams};
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let lp = ListParams::default();
/// for pod in client.list::<Pod>(&lp, &Cluster).await? {
/// println!("Found pod {} in {}", pod.name_any(), pod.namespace().unwrap());
/// }
/// for svc in client.list::<Service>(&lp, &Namespace::from("default")).await? {
/// println!("Found service {}", svc.name_any());
/// }
/// # Ok(())
/// # }
/// ```
pub async fn list<K>(&self, lp: &ListParams, scope: &impl CollectionUrl<K>) -> Result<ObjectList<K>>
where
K: Resource + Serialize + DeserializeOwned + Clone + Debug,
<K as Resource>::DynamicType: Default,
{
let mut req = Request::new(scope.url_path())
.list(lp)

Check warning on line 212 in kube-client/src/client/client_ext.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/client_ext.rs#L212

Added line #L212 was not covered by tests
.map_err(Error::BuildRequest)?;
req.extensions_mut().insert("list");
self.request::<ObjectList<K>>(req).await
}
}

#[cfg(test)]
mod test {
use super::{
scope::{Cluster, Namespace},
Client, ListParams,
};
use kube_core::ResourceExt;

#[tokio::test]
#[ignore = "needs cluster (will list/get namespaces, pods, jobs, svcs, clusterroles)"]
async fn client_ext_list_get_pods_svcs() -> Result<(), Box<dyn std::error::Error>> {
use k8s_openapi::api::{
batch::v1::Job,
core::v1::{Namespace as k8sNs, Pod, Service},
rbac::v1::ClusterRole,
};

let client = Client::try_default().await?;
let lp = ListParams::default();
// cluster-scoped list
for ns in client.list::<k8sNs>(&lp, &Cluster).await? {
// namespaced list
for p in client.list::<Pod>(&lp, &Namespace::try_from(&ns)?).await? {
println!("Found pod {} in {}", p.name_any(), ns.name_any());
}
}
// across-namespace list
for j in client.list::<Job>(&lp, &Cluster).await? {
println!("Found job {} in {}", j.name_any(), j.namespace().unwrap());

Check warning on line 247 in kube-client/src/client/client_ext.rs

View check run for this annotation

Codecov / codecov/patch

kube-client/src/client/client_ext.rs#L247

Added line #L247 was not covered by tests
}
// namespaced get
let default: Namespace = "default".into();
let svc = client.get::<Service>("kubernetes", &default).await?;
assert_eq!(svc.name_unchecked(), "kubernetes");
// global get
let ca = client.get::<ClusterRole>("cluster-admin", &Cluster).await?;
assert_eq!(ca.name_unchecked(), "cluster-admin");

Ok(())
}
}
23 changes: 22 additions & 1 deletion kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ mod body;
mod builder;
// Add `into_stream()` to `http::Body`
use body::BodyStreamExt;
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
#[cfg(feature = "unstable-client")]
mod client_ext;
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
#[cfg(feature = "unstable-client")]
pub use client_ext::scope;
mod config_ext;
pub use auth::Error as AuthError;
pub use config_ext::ConfigExt;
Expand Down Expand Up @@ -69,6 +75,11 @@ pub struct Client {
default_ns: String,
}

/// Constructors and low-level api interfaces.
///
/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
///
/// The many various lower level interfaces here are for more advanced use-cases with specific requirements.
impl Client {
/// Create a [`Client`] using a custom `Service` stack.
///
Expand Down Expand Up @@ -123,6 +134,14 @@ impl Client {
///
/// Will fail if neither configuration could be loaded.
///
/// ```rust
/// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
/// # use kube::Client;
/// let client = Client::try_default().await?;
/// # Ok(())
/// # }
/// ```
///
/// If you already have a [`Config`] then use [`Client::try_from`](Self::try_from)
/// instead.
pub async fn try_default() -> Result<Self> {
Expand Down Expand Up @@ -460,7 +479,9 @@ fn handle_api_errors(text: &str, s: StatusCode) -> Result<()> {
impl TryFrom<Config> for Client {
type Error = Error;

/// Builds a default [`Client`] from a [`Config`], see [`ClientBuilder`] if more customization is required
/// Builds a default [`Client`] from a [`Config`].
///
/// See [`ClientBuilder`] or [`Client::new`] if more customization is required
fn try_from(config: Config) -> Result<Self> {
Ok(ClientBuilder::try_from(config)?.build())
}
Expand Down
2 changes: 1 addition & 1 deletion kube-client/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub enum LoadDataError {
/// Prefer [`Config::infer`] unless you have particular issues, and avoid manually managing
/// the data in this struct unless you have particular needs. It exists to be consumed by the [`Client`][crate::Client].
///
/// If you are looking to parse the kubeconfig found in a user's home directory see [`Kubeconfig`](crate::config::Kubeconfig).
/// If you are looking to parse the kubeconfig found in a user's home directory see [`Kubeconfig`].
#[cfg_attr(docsrs, doc(cfg(feature = "config")))]
#[derive(Debug, Clone)]
pub struct Config {
Expand Down
Loading
Loading