Skip to content

Commit

Permalink
stackdriver: avoid calling log from inside exporter (#709)
Browse files Browse the repository at this point in the history
  • Loading branch information
djc authored Jan 20, 2022
1 parent 6d07ff0 commit a81c6c6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-stackdriver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ documentation = "https://docs.rs/opentelemetry-stackdriver/"
repository = "https:/open-telemetry/opentelemetry-rust"
license = "Apache-2.0"
edition = "2018"
exclude = ["/proto"]

[dependencies]
async-trait = "0.1.48"
Expand All @@ -15,7 +16,6 @@ hex = "0.4"
http = "0.2"
hyper = "0.14.2"
hyper-rustls = { version = "0.22.1", optional = true }
log = "0.4"
opentelemetry = { version = "0.16", path = "../opentelemetry" }
prost = "0.9"
prost-types = "0.9"
Expand Down
76 changes: 46 additions & 30 deletions opentelemetry-stackdriver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ use std::{
use async_trait::async_trait;
use futures::stream::StreamExt;
use opentelemetry::{
sdk::export::trace::{ExportResult, SpanData, SpanExporter},
global::handle_error,
sdk::export::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::TraceError,
Value,
};
use thiserror::Error;
Expand Down Expand Up @@ -61,22 +66,6 @@ pub struct StackDriverExporter {
maximum_shutdown_duration: Duration,
}

impl fmt::Debug for StackDriverExporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[allow(clippy::unneeded_field_pattern)]
let Self {
maximum_shutdown_duration,
pending_count,
tx: _,
} = self;
f.debug_struct("StackDriverExporter")
.field("tx", &"(elided)")
.field("pending_count", pending_count)
.field("maximum_shutdown_duration", maximum_shutdown_duration)
.finish()
}
}

impl StackDriverExporter {
pub fn builder() -> Builder {
Builder::default()
Expand All @@ -91,10 +80,7 @@ impl StackDriverExporter {
impl SpanExporter for StackDriverExporter {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
match self.tx.try_send(batch) {
Err(e) => {
log::error!("Unable to send to export_inner {:?}", e);
Err(e.into())
}
Err(e) => Err(e.into()),
Ok(()) => {
self.pending_count.fetch_add(1, Ordering::Relaxed);
Ok(())
Expand All @@ -112,6 +98,22 @@ impl SpanExporter for StackDriverExporter {
}
}

impl fmt::Debug for StackDriverExporter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[allow(clippy::unneeded_field_pattern)]
let Self {
tx: _,
pending_count,
maximum_shutdown_duration,
} = self;
f.debug_struct("StackDriverExporter")
.field("tx", &"(elided)")
.field("pending_count", pending_count)
.field("maximum_shutdown_duration", maximum_shutdown_duration)
.finish()
}
}

/// Helper type to build a `StackDriverExporter`.
#[derive(Clone, Default)]
pub struct Builder {
Expand Down Expand Up @@ -141,10 +143,13 @@ impl Builder {
self
}

pub async fn build(
pub async fn build<A: Authorizer>(
self,
authenticator: impl Authorizer,
) -> Result<(StackDriverExporter, impl Future<Output = ()>), Error> {
authenticator: A,
) -> Result<(StackDriverExporter, impl Future<Output = ()>), Error>
where
Error: From<A::Error>,
{
let Self {
maximum_shutdown_duration,
num_concurrent_requests,
Expand Down Expand Up @@ -217,7 +222,10 @@ struct ExporterContext<'a, A> {
scopes: Arc<Vec<&'static str>>,
}

impl<A: Authorizer> ExporterContext<'_, A> {
impl<A: Authorizer> ExporterContext<'_, A>
where
Error: From<A::Error>,
{
async fn export(mut self, batch: Vec<SpanData>) {
use proto::devtools::cloudtrace::v2::span::time_event::Value;

Expand Down Expand Up @@ -327,9 +335,9 @@ impl<A: Authorizer> ExporterContext<'_, A> {

self.pending_count.fetch_sub(1, Ordering::Relaxed);
if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
log::error!("StackDriver authentication failed {}", e);
handle_error(TraceError::from(Error::from(e)));
} else if let Err(e) = self.trace_client.batch_write_spans(req).await {
log::error!("StackDriver push failed {}", e);
handle_error(TraceError::from(Error::TonicRpc(e)));
}

let client = match &mut self.log_client {
Expand All @@ -351,9 +359,9 @@ impl<A: Authorizer> ExporterContext<'_, A> {
});

if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
log::error!("StackDriver authentication failed {}", e);
handle_error(TraceError::from(Error::from(e)));
} else if let Err(e) = client.client.write_log_entries(req).await {
log::error!("StackDriver push failed {}", e);
handle_error(TraceError::from(Error::TonicRpc(e)));
}
}
}
Expand Down Expand Up @@ -498,12 +506,20 @@ pub enum Error {
#[error("{0}")]
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error("tonic error: {0}")]
Tonic(#[from] tonic::transport::Error),
TonicRpc(#[from] tonic::Status),
#[error("tonic error: {0}")]
TonicTransport(#[from] tonic::transport::Error),
#[cfg(feature = "yup-oauth2")]
#[error("authorizer error: {0}")]
Yup(#[from] yup_oauth2::Error),
}

impl ExportError for Error {
fn exporter_name(&self) -> &'static str {
"stackdriver"
}
}

/// As defined in https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.type#google.logging.type.LogSeverity.
enum LogSeverity {
Default = 0,
Expand Down

0 comments on commit a81c6c6

Please sign in to comment.