--- a/crates/opentelemetry-exporter-env/src/lib.rs +++ b/crates/opentelemetry-exporter-env/src/lib.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::convert::Infallible; use std::env::{self, VarError}; use std::ffi::OsString; @@ -6,42 +5,41 @@ use std::str::FromStr; use std::time::Duration; -use clap::Parser; -#[cfg(feature = "otlp")] -use opentelemetry_otlp::tonic_types::metadata::MetadataMap; #[cfg(feature = "otlp")] -use opentelemetry_otlp::{ - ExporterBuildError, LogExporter, MetricExporter, SpanExporter, WithExportConfig, - WithHttpConfig, WithTonicConfig, -}; - -#[cfg(feature = "otlp")] mod otlp; +#[derive(thiserror::Error, Debug)] pub enum Error { + #[error("environment variable {env} contains invalid UTF-8: {value:?}")] InvalidUtf8 { env: &'static str, value: OsString, }, - EnvParseError { + #[error("environment variable {env}={value:?}: {error}")] + EnvParse { env: &'static str, value: String, error: &'static str, }, - EnvParseIntError { + #[error("environment variable {env}={value:?}: {error}")] + EnvParseInt { env: &'static str, value: String, error: ParseIntError, }, + #[cfg(feature = "otlp")] + #[error("failed to build exporter: {0}")] + Exporter(#[from] opentelemetry_otlp::ExporterBuildError), } + impl From<(&'static str, &'static str, String)> for Error { fn from((env, error, value): (&'static str, &'static str, String)) -> Self { - Self::EnvParseError { env, value, error } + Self::EnvParse { env, value, error } } } impl From<(&'static str, ParseIntError, String)> for Error { fn from((env, error, value): (&'static str, ParseIntError, String)) -> Self { - Self::EnvParseIntError { env, value, error } + Self::EnvParseInt { env, value, error } } } impl From<(&'static str, Infallible, String)> for Error { @@ -62,36 +60,6 @@ } } -macro_rules! impl_settings { - ( - #[name($env_prefix:literal, $long_prefix:literal)] - struct $id:ident { - $( - $(#[doc = $doc:literal])* - #[name($env:literal, $long:literal)] - $(#[arg($($tt:tt)*)])? - $name:ident: $ty:ty, - )* - }) => { - #[derive(Parser)] - pub struct $id { - $( - $(#[doc = $doc])* - #[arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env), $($($tt)*)?)] - pub $name: Option<$ty>, - )* - } - impl $id { - pub fn from_env() -> Result { - Ok(Self { - $( - $name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?, - )* - }) - } - } - } -} macro_rules! impl_enum { (enum $id:ident { $( @@ -115,7 +83,7 @@ $( $value => Self::$var, )* - _ => return Err("unsupported value, supported are") + _ => return Err("unsupported value") }) } } @@ -123,6 +91,49 @@ } impl_enum! { + enum ExporterKind { + #[name = "otlp"] + Otlp, + #[name = "none"] + None, + } +} + +#[derive(Default)] +#[cfg_attr(feature = "clap", derive(clap::Parser))] +pub struct SignalExporterSettings { + /// Traces exporter to be used. + #[cfg_attr(feature = "clap", arg(long = "otel-traces-exporter", env = "OTEL_TRACES_EXPORTER", value_enum))] + pub traces: Option, + /// Metrics exporter to be used. + #[cfg_attr(feature = "clap", arg(long = "otel-metrics-exporter", env = "OTEL_METRICS_EXPORTER", value_enum))] + pub metrics: Option, + /// Logs exporter to be used. + #[cfg_attr(feature = "clap", arg(long = "otel-logs-exporter", env = "OTEL_LOGS_EXPORTER", value_enum))] + pub logs: Option, +} + +impl SignalExporterSettings { + pub fn from_env() -> Result { + Ok(Self { + traces: load_env("OTEL_TRACES_EXPORTER")?, + metrics: load_env("OTEL_METRICS_EXPORTER")?, + logs: load_env("OTEL_LOGS_EXPORTER")?, + }) + } + + pub fn traces_enabled(&self) -> bool { + !matches!(self.traces, Some(ExporterKind::None)) + } + pub fn metrics_enabled(&self) -> bool { + !matches!(self.metrics, Some(ExporterKind::None)) + } + pub fn logs_enabled(&self) -> bool { + !matches!(self.logs, Some(ExporterKind::None)) + } +} + +impl_enum! { enum Compression { #[name = "gzip"] Gzip, @@ -161,6 +172,53 @@ } } +pub trait OtlpSignalSettings { + fn compression(&self) -> Option; + fn endpoint(&self) -> Option<&str>; + fn headers(&self) -> Option<&str>; + fn protocol(&self) -> Option; + fn timeout(&self) -> Option; +} + +macro_rules! impl_settings { + ( + #[name($env_prefix:literal, $long_prefix:literal)] + struct $id:ident { + $( + $(#[doc = $doc:literal])* + #[name($env:literal, $long:literal)] + $(#[arg($($tt:tt)*)])? + $name:ident: $ty:ty, + )* + }) => { + #[derive(Default)] + #[cfg_attr(feature = "clap", derive(clap::Parser))] + pub struct $id { + $( + $(#[doc = $doc])* + #[cfg_attr(feature = "clap", arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env) $(, $($tt)*)?))] + pub $name: Option<$ty>, + )* + } + impl $id { + pub fn from_env() -> Result { + Ok(Self { + $( + $name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?, + )* + }) + } + } + impl OtlpSignalSettings for $id { + fn compression(&self) -> Option { self.compression } + fn endpoint(&self) -> Option<&str> { self.endpoint.as_deref() } + fn headers(&self) -> Option<&str> { self.headers.as_deref() } + fn protocol(&self) -> Option { self.protocol } + fn timeout(&self) -> Option { self.timeout } + } + } +} + impl_settings! { #[name("", "")] struct OtlpBaseSettings { @@ -168,7 +226,7 @@ #[name("COMPRESSION", "compression")] #[arg(value_enum)] compression: Compression, - /// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you’re sending more than one signal to the same endpoint and want one environment variable to control the endpoint. + /// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you're sending more than one signal to the same endpoint and want one environment variable to control the endpoint. #[name("ENDPOINT", "endpoint")] endpoint: String, /// A list of headers to apply to all outgoing data (traces, metrics, and logs). @@ -227,7 +285,6 @@ timeout: u64, } } - impl_settings! { #[name("TRACES_", "traces-")] struct OtlpTracesSettings { @@ -251,14 +308,78 @@ } } -#[derive(thiserror::Error, Debug)] -enum ProviderError { - #[error("protocol is not set")] - UnsetProtocol, - #[error("endpoint is not set")] - EndpointUnset, - #[cfg(feature = "otlp")] - #[error("failed to build exporter: {0}")] - Exporter(#[from] ExporterBuildError), +pub struct ResolvedOtlpSettings { + pub compression: Option, + pub endpoint: String, + pub headers: Option, + pub protocol: OtlpProtocol, + pub timeout: Duration, } -type ProviderResult = Result; + +impl ResolvedOtlpSettings { + const DEFAULT_TIMEOUT_MS: u64 = 10000; + const DEFAULT_GRPC_ENDPOINT: &str = "http://localhost:4317"; + const DEFAULT_HTTP_ENDPOINT: &str = "http://localhost:4318"; + + pub fn traces( + base: &impl OtlpSignalSettings, + signal: &impl OtlpSignalSettings, + ) -> Result { + Self::resolve(base, signal, "/v1/traces") + } + + pub fn metrics( + base: &impl OtlpSignalSettings, + signal: &impl OtlpSignalSettings, + ) -> Result { + Self::resolve(base, signal, "/v1/metrics") + } + + pub fn logs( + base: &impl OtlpSignalSettings, + signal: &impl OtlpSignalSettings, + ) -> Result { + Self::resolve(base, signal, "/v1/logs") + } + + fn resolve( + base: &impl OtlpSignalSettings, + signal: &impl OtlpSignalSettings, + signal_path: &str, + ) -> Result { + let protocol = signal + .protocol() + .or_else(|| base.protocol()) + .unwrap_or(OtlpProtocol::HttpProtobuf); + + let endpoint = if let Some(ep) = signal.endpoint() { + ep.to_owned() + } else if let Some(ep) = base.endpoint() { + match protocol { + OtlpProtocol::Grpc => ep.to_owned(), + _ => format!("{ep}{signal_path}"), + } + } else { + match protocol { + OtlpProtocol::Grpc => Self::DEFAULT_GRPC_ENDPOINT.to_owned(), + _ => format!("{}{signal_path}", Self::DEFAULT_HTTP_ENDPOINT), + } + }; + + Ok(Self { + compression: signal.compression().or_else(|| base.compression()), + endpoint, + headers: signal + .headers() + .or_else(|| base.headers()) + .map(str::to_owned), + protocol, + timeout: Duration::from_millis( + signal + .timeout() + .or_else(|| base.timeout()) + .unwrap_or(Self::DEFAULT_TIMEOUT_MS), + ), + }) + } +} --- a/crates/opentelemetry-exporter-env/src/otlp.rs +++ b/crates/opentelemetry-exporter-env/src/otlp.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::time::Duration; use opentelemetry_otlp::tonic_types::metadata::MetadataMap; use opentelemetry_otlp::{ @@ -7,14 +6,9 @@ WithTonicConfig as _, }; -use crate::{ - OtlpBaseSettings, OtlpLogsSettings, OtlpMetricsSettings, OtlpProtocol, ProviderError, - ProviderResult, -}; +use crate::{Error, OtlpProtocol, ResolvedOtlpSettings}; -fn parse_headers<'a>( - headers: &'a str, -) -> std::iter::Map, impl FnMut(&'a str) -> (&'a str, &'a str)> { +fn parse_headers(headers: &str) -> impl Iterator { headers.split(',').map(|header| { let mut parts = header.splitn(2, '='); let key = parts.next().unwrap(); @@ -23,7 +17,7 @@ }) } -fn parse_headers_metadata_map(headers: Option<&str>) -> MetadataMap { +fn to_metadata_map(headers: Option<&str>) -> MetadataMap { headers .map(|headers| { MetadataMap::from_headers( @@ -34,7 +28,8 @@ }) .unwrap_or_default() } -fn parse_headers_hashmap(headers: Option<&str>) -> HashMap { + +fn to_hashmap(headers: Option<&str>) -> HashMap { headers .map(|headers| { parse_headers(headers) @@ -44,132 +39,45 @@ .unwrap_or_default() } -fn logger_exporter(base: &OtlpBaseSettings, log: &OtlpLogsSettings) -> ProviderResult { - let endpoint = log - .endpoint - .clone() - .or_else(|| Some(format!("{}/v1/logs", base.endpoint.as_ref()?))) - .ok_or(ProviderError::EndpointUnset)?; - let headers = log.headers.as_deref().or(base.headers.as_deref()); - let timeout = Duration::from_millis(log.timeout.or(base.timeout).unwrap_or(10000)); - - let protocol = log - .protocol - .or(base.protocol) - .ok_or(ProviderError::UnsetProtocol)?; - - match protocol { - OtlpProtocol::Grpc => { - let mut builder = LogExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .with_metadata(parse_headers_metadata_map(headers)) - .with_protocol(protocol.into()) - .with_timeout(timeout); - let compression = log.compression.or(base.compression); - if let Some(compression) = compression { - builder = builder.with_compression(compression.into()); +macro_rules! build_exporter { + ($exporter:ty, $settings:expr) => {{ + let s: &ResolvedOtlpSettings = $settings; + match s.protocol { + OtlpProtocol::Grpc => { + let mut builder = <$exporter>::builder() + .with_tonic() + .with_endpoint(&s.endpoint) + .with_metadata(to_metadata_map(s.headers.as_deref())) + .with_protocol(s.protocol.into()) + .with_timeout(s.timeout); + if let Some(compression) = s.compression { + builder = builder.with_compression(compression.into()); + } + builder.build() + } + OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => { + <$exporter>::builder() + .with_http() + .with_endpoint(&s.endpoint) + .with_headers(to_hashmap(s.headers.as_deref())) + .with_protocol(s.protocol.into()) + .with_timeout(s.timeout) + .build() } - - Ok(builder.build()?) } - OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => { - let builder = LogExporter::builder() - .with_http() - .with_endpoint(endpoint) - .with_headers(parse_headers_hashmap(headers)) - .with_protocol(protocol.into()) - .with_timeout(timeout); + }}; +} - Ok(builder.build()?) - } +impl ResolvedOtlpSettings { + pub fn span_exporter(&self) -> Result { + Ok(build_exporter!(SpanExporter, self)?) } -} -fn metric_exporter( - base: &OtlpBaseSettings, - metric: &OtlpMetricsSettings, -) -> ProviderResult { - let endpoint = metric - .endpoint - .clone() - .or_else(|| Some(format!("{}/v1/metrics", base.endpoint.as_ref()?))) - .ok_or(ProviderError::EndpointUnset)?; - let headers = metric.headers.as_deref().or(base.headers.as_deref()); - let timeout = Duration::from_millis(metric.timeout.or(base.timeout).unwrap_or(10000)); - let protocol = metric - .protocol - .or(base.protocol) - .ok_or(ProviderError::UnsetProtocol)?; - - match protocol { - OtlpProtocol::Grpc => { - let mut builder = MetricExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .with_metadata(parse_headers_metadata_map(headers)) - .with_protocol(protocol.into()) - .with_timeout(timeout); - let compression = metric.compression.or(base.compression); - if let Some(compression) = compression { - builder = builder.with_compression(compression.into()); - } - - Ok(builder.build()?) - } - OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => { - let builder = MetricExporter::builder() - .with_http() - .with_endpoint(endpoint) - .with_headers(parse_headers_hashmap(headers)) - .with_protocol(protocol.into()) - .with_timeout(timeout); - - Ok(builder.build()?) - } + pub fn log_exporter(&self) -> Result { + Ok(build_exporter!(LogExporter, self)?) } -} -fn span_exporter( - base: &OtlpBaseSettings, - trace: &OtlpMetricsSettings, -) -> ProviderResult { - let endpoint = trace - .endpoint - .clone() - .or_else(|| Some(format!("{}/v1/traces", base.endpoint.as_ref()?))) - .ok_or(ProviderError::EndpointUnset)?; - let headers = trace.headers.as_deref().or(base.headers.as_deref()); - let timeout = Duration::from_millis(trace.timeout.or(base.timeout).unwrap_or(10000)); - let protocol = trace - .protocol - .or(base.protocol) - .ok_or(ProviderError::UnsetProtocol)?; - - match protocol { - OtlpProtocol::Grpc => { - let mut builder = SpanExporter::builder() - .with_tonic() - .with_endpoint(endpoint) - .with_metadata(parse_headers_metadata_map(headers)) - .with_protocol(protocol.into()) - .with_timeout(timeout); - let compression = trace.compression.or(base.compression); - if let Some(compression) = compression { - builder = builder.with_compression(compression.into()); - } - - Ok(builder.build()?) - } - OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => { - let builder = SpanExporter::builder() - .with_http() - .with_endpoint(endpoint) - .with_headers(parse_headers_hashmap(headers)) - .with_protocol(protocol.into()) - .with_timeout(timeout); - - Ok(builder.build()?) - } + pub fn metric_exporter(&self) -> Result { + Ok(build_exporter!(MetricExporter, self)?) } }