difftreelog
feat otel-exporter-env
in: trunk
2 files changed
crates/opentelemetry-exporter-env/src/lib.rsdiffbeforeafterboth1use std::convert::Infallible;2use std::env::{self, VarError};3use std::ffi::OsString;4use std::num::ParseIntError;5use std::str::FromStr;6use std::time::Duration;78#[cfg(feature = "otlp")]9mod otlp;1011#[derive(thiserror::Error, Debug)]12pub enum Error {13 #[error("environment variable {env} contains invalid UTF-8: {value:?}")]14 InvalidUtf8 {15 env: &'static str,16 value: OsString,17 },18 #[error("environment variable {env}={value:?}: {error}")]19 EnvParse {20 env: &'static str,21 value: String,22 error: &'static str,23 },24 #[error("environment variable {env}={value:?}: {error}")]25 EnvParseInt {26 env: &'static str,27 value: String,28 error: ParseIntError,29 },30 #[cfg(feature = "otlp")]31 #[error("failed to build exporter: {0}")]32 Exporter(#[from] opentelemetry_otlp::ExporterBuildError),33}3435impl From<(&'static str, &'static str, String)> for Error {36 fn from((env, error, value): (&'static str, &'static str, String)) -> Self {37 Self::EnvParse { env, value, error }38 }39}40impl From<(&'static str, ParseIntError, String)> for Error {41 fn from((env, error, value): (&'static str, ParseIntError, String)) -> Self {42 Self::EnvParseInt { env, value, error }43 }44}45impl From<(&'static str, Infallible, String)> for Error {46 fn from(_v: (&'static str, Infallible, String)) -> Self {47 unreachable!()48 }49}5051fn load_env<T>(env: &'static str) -> Result<Option<T>, Error>52where53 T: FromStr,54 Error: From<(&'static str, <T as FromStr>::Err, String)>,55{56 match env::var(env) {57 Ok(v) => Ok(Some(T::from_str(&v).map_err(|err| (env, err, v))?)),58 Err(VarError::NotPresent) => Ok(None),59 Err(VarError::NotUnicode(value)) => Err(Error::InvalidUtf8 { env, value }),60 }61}6263macro_rules! impl_enum {64 (enum $id:ident {65 $(66 #[name = $value:literal]67 $var:ident,68 )*69 }) => {70 #[derive(Clone, Copy)]71 #[cfg_attr(feature = "clap", derive(clap::ValueEnum))]72 pub enum $id {73 $(74 #[cfg_attr(feature = "clap", value(name = $value))]75 $var,76 )*77 }78 impl FromStr for $id {79 type Err = &'static str;8081 fn from_str(s: &str) -> Result<Self, Self::Err> {82 Ok(match s {83 $(84 $value => Self::$var,85 )*86 _ => return Err("unsupported value")87 })88 }89 }90 };91}9293impl_enum! {94 enum ExporterKind {95 #[name = "otlp"]96 Otlp,97 #[name = "none"]98 None,99 }100}101102#[derive(Default)]103#[cfg_attr(feature = "clap", derive(clap::Parser))]104pub struct SignalExporterSettings {105 /// Traces exporter to be used.106 #[cfg_attr(feature = "clap", arg(long = "otel-traces-exporter", env = "OTEL_TRACES_EXPORTER", value_enum))]107 pub traces: Option<ExporterKind>,108 /// Metrics exporter to be used.109 #[cfg_attr(feature = "clap", arg(long = "otel-metrics-exporter", env = "OTEL_METRICS_EXPORTER", value_enum))]110 pub metrics: Option<ExporterKind>,111 /// Logs exporter to be used.112 #[cfg_attr(feature = "clap", arg(long = "otel-logs-exporter", env = "OTEL_LOGS_EXPORTER", value_enum))]113 pub logs: Option<ExporterKind>,114}115116impl SignalExporterSettings {117 pub fn from_env() -> Result<Self, Error> {118 Ok(Self {119 traces: load_env("OTEL_TRACES_EXPORTER")?,120 metrics: load_env("OTEL_METRICS_EXPORTER")?,121 logs: load_env("OTEL_LOGS_EXPORTER")?,122 })123 }124125 pub fn traces_enabled(&self) -> bool {126 !matches!(self.traces, Some(ExporterKind::None))127 }128 pub fn metrics_enabled(&self) -> bool {129 !matches!(self.metrics, Some(ExporterKind::None))130 }131 pub fn logs_enabled(&self) -> bool {132 !matches!(self.logs, Some(ExporterKind::None))133 }134}135136impl_enum! {137 enum Compression {138 #[name = "gzip"]139 Gzip,140 #[name = "zstd"]141 Zstd,142 }143}144#[cfg(feature = "otlp")]145impl From<Compression> for opentelemetry_otlp::Compression {146 fn from(value: Compression) -> Self {147 match value {148 Compression::Gzip => opentelemetry_otlp::Compression::Gzip,149 Compression::Zstd => opentelemetry_otlp::Compression::Zstd,150 }151 }152}153154impl_enum! {155 enum OtlpProtocol {156 #[name = "grpc"]157 Grpc,158 #[name = "http/protobuf"]159 HttpProtobuf,160 #[name = "http/json"]161 HttpJson,162 }163}164#[cfg(feature = "otlp")]165impl From<OtlpProtocol> for opentelemetry_otlp::Protocol {166 fn from(value: OtlpProtocol) -> Self {167 match value {168 OtlpProtocol::Grpc => opentelemetry_otlp::Protocol::Grpc,169 OtlpProtocol::HttpProtobuf => opentelemetry_otlp::Protocol::HttpBinary,170 OtlpProtocol::HttpJson => opentelemetry_otlp::Protocol::HttpJson,171 }172 }173}174175pub trait OtlpSignalSettings {176 fn compression(&self) -> Option<Compression>;177 fn endpoint(&self) -> Option<&str>;178 fn headers(&self) -> Option<&str>;179 fn protocol(&self) -> Option<OtlpProtocol>;180 fn timeout(&self) -> Option<u64>;181}182183macro_rules! impl_settings {184 (185 #[name($env_prefix:literal, $long_prefix:literal)]186 struct $id:ident {187 $(188 $(#[doc = $doc:literal])*189 #[name($env:literal, $long:literal)]190 $(#[arg($($tt:tt)*)])?191 $name:ident: $ty:ty,192 )*193 }) => {194 #[derive(Default)]195 #[cfg_attr(feature = "clap", derive(clap::Parser))]196 pub struct $id {197 $(198 $(#[doc = $doc])*199 #[cfg_attr(feature = "clap", arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env) $(, $($tt)*)?))]200 pub $name: Option<$ty>,201 )*202 }203 impl $id {204 pub fn from_env() -> Result<Self, Error> {205 Ok(Self {206 $(207 $name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?,208 )*209 })210 }211 }212 impl OtlpSignalSettings for $id {213 fn compression(&self) -> Option<Compression> { self.compression }214 fn endpoint(&self) -> Option<&str> { self.endpoint.as_deref() }215 fn headers(&self) -> Option<&str> { self.headers.as_deref() }216 fn protocol(&self) -> Option<OtlpProtocol> { self.protocol }217 fn timeout(&self) -> Option<u64> { self.timeout }218 }219 }220}221222impl_settings! {223 #[name("", "")]224 struct OtlpBaseSettings {225 /// Specifies the OTLP transport compression to be used for all telemetry data.226 #[name("COMPRESSION", "compression")]227 #[arg(value_enum)]228 compression: Compression,229 /// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you're sending more than one signal to the same endpoint and want one environment variable to control the endpoint.230 #[name("ENDPOINT", "endpoint")]231 endpoint: String,232 /// A list of headers to apply to all outgoing data (traces, metrics, and logs).233 #[name("HEADERS", "headers")]234 headers: String,235 /// Specifies the OTLP transport protocol to be used for all telemetry data.236 #[name("PROTOCOL", "protocol")]237 #[arg(value_enum)]238 protocol: OtlpProtocol,239 /// The timeout value for all outgoing data (traces, metrics, and logs) in milliseconds.240 #[name("TIMEOUT", "timeout")]241 timeout: u64,242 }243}244impl_settings! {245 #[name("LOGS_", "logs-")]246 struct OtlpLogsSettings {247 /// Specifies the OTLP transport compression to be used for log data.248 #[name("COMPRESSION", "compression")]249 #[arg(value_enum)]250 compression: Compression,251 /// Endpoint URL for log data only, with an optionally-specified port number. Typically ends with `v1/logs` when using OTLP/HTTP.252 #[name("ENDPOINT", "endpoint")]253 endpoint: String,254 /// A list of headers to apply to all outgoing logs.255 #[name("HEADERS", "headers")]256 headers: String,257 /// Specifies the OTLP transport protocol to be used for log data.258 #[name("PROTOCOL", "protocol")]259 #[arg(value_enum)]260 protocol: OtlpProtocol,261 /// The timeout value for all outgoing logs in milliseconds.262 #[name("TIMEOUT", "timeout")]263 timeout: u64,264 }265}266impl_settings! {267 #[name("METRICS_", "metrics-")]268 struct OtlpMetricsSettings {269 /// Specifies the OTLP transport compression to be used for metrics data.270 #[name("COMPRESSION", "compression")]271 #[arg(value_enum)]272 compression: Compression,273 /// Endpoint URL for metric data only, with an optionally-specified port number. Typically ends with `v1/metrics` when using OTLP/HTTP.274 #[name("ENDPOINT", "endpoint")]275 endpoint: String,276 /// A list of headers to apply to all outgoing metrics.277 #[name("HEADERS", "headers")]278 headers: String,279 /// Specifies the OTLP transport protocol to be used for metrics data.280 #[name("PROTOCOL", "protocol")]281 #[arg(value_enum)]282 protocol: OtlpProtocol,283 /// The timeout value for all outgoing metrics in milliseconds.284 #[name("TIMEOUT", "timeout")]285 timeout: u64,286 }287}288impl_settings! {289 #[name("TRACES_", "traces-")]290 struct OtlpTracesSettings {291 /// Specifies the OTLP transport compression to be used for trace data.292 #[name("COMPRESSION", "compression")]293 #[arg(value_enum)]294 compression: Compression,295 /// Endpoint URL for trace data only, with an optionally-specified port number. Typically ends with `v1/traces` when using OTLP/HTTP.296 #[name("ENDPOINT", "endpoint")]297 endpoint: String,298 /// A list of headers to apply to all outgoing traces.299 #[name("HEADERS", "headers")]300 headers: String,301 /// Specifies the OTLP transport protocol to be used for trace data.302 #[name("PROTOCOL", "protocol")]303 #[arg(value_enum)]304 protocol: OtlpProtocol,305 /// The timeout value for all outgoing traces in milliseconds.306 #[name("TIMEOUT", "timeout")]307 timeout: u64,308 }309}310311pub struct ResolvedOtlpSettings {312 pub compression: Option<Compression>,313 pub endpoint: String,314 pub headers: Option<String>,315 pub protocol: OtlpProtocol,316 pub timeout: Duration,317}318319impl ResolvedOtlpSettings {320 const DEFAULT_TIMEOUT_MS: u64 = 10000;321 const DEFAULT_GRPC_ENDPOINT: &str = "http://localhost:4317";322 const DEFAULT_HTTP_ENDPOINT: &str = "http://localhost:4318";323324 pub fn traces(325 base: &impl OtlpSignalSettings,326 signal: &impl OtlpSignalSettings,327 ) -> Result<Self, Error> {328 Self::resolve(base, signal, "/v1/traces")329 }330331 pub fn metrics(332 base: &impl OtlpSignalSettings,333 signal: &impl OtlpSignalSettings,334 ) -> Result<Self, Error> {335 Self::resolve(base, signal, "/v1/metrics")336 }337338 pub fn logs(339 base: &impl OtlpSignalSettings,340 signal: &impl OtlpSignalSettings,341 ) -> Result<Self, Error> {342 Self::resolve(base, signal, "/v1/logs")343 }344345 fn resolve(346 base: &impl OtlpSignalSettings,347 signal: &impl OtlpSignalSettings,348 signal_path: &str,349 ) -> Result<Self, Error> {350 let protocol = signal351 .protocol()352 .or_else(|| base.protocol())353 .unwrap_or(OtlpProtocol::HttpProtobuf);354355 let endpoint = if let Some(ep) = signal.endpoint() {356 ep.to_owned()357 } else if let Some(ep) = base.endpoint() {358 match protocol {359 OtlpProtocol::Grpc => ep.to_owned(),360 _ => format!("{ep}{signal_path}"),361 }362 } else {363 match protocol {364 OtlpProtocol::Grpc => Self::DEFAULT_GRPC_ENDPOINT.to_owned(),365 _ => format!("{}{signal_path}", Self::DEFAULT_HTTP_ENDPOINT),366 }367 };368369 Ok(Self {370 compression: signal.compression().or_else(|| base.compression()),371 endpoint,372 headers: signal373 .headers()374 .or_else(|| base.headers())375 .map(str::to_owned),376 protocol,377 timeout: Duration::from_millis(378 signal379 .timeout()380 .or_else(|| base.timeout())381 .unwrap_or(Self::DEFAULT_TIMEOUT_MS),382 ),383 })384 }385}crates/opentelemetry-exporter-env/src/otlp.rsdiffbeforeafterboth--- a/crates/opentelemetry-exporter-env/src/otlp.rs
+++ b/crates/opentelemetry-exporter-env/src/otlp.rs
@@ -1,5 +1,4 @@
use std::collections::HashMap;
-use std::time::Duration;
use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
use opentelemetry_otlp::{
@@ -7,14 +6,9 @@
WithTonicConfig as _,
};
-use crate::{
- OtlpBaseSettings, OtlpLogsSettings, OtlpMetricsSettings, OtlpProtocol, ProviderError,
- ProviderResult,
-};
+use crate::{Error, OtlpProtocol, ResolvedOtlpSettings};
-fn parse_headers<'a>(
- headers: &'a str,
-) -> std::iter::Map<std::str::Split<'a, char>, impl FnMut(&'a str) -> (&'a str, &'a str)> {
+fn parse_headers(headers: &str) -> impl Iterator<Item = (&str, &str)> {
headers.split(',').map(|header| {
let mut parts = header.splitn(2, '=');
let key = parts.next().unwrap();
@@ -23,7 +17,7 @@
})
}
-fn parse_headers_metadata_map(headers: Option<&str>) -> MetadataMap {
+fn to_metadata_map(headers: Option<&str>) -> MetadataMap {
headers
.map(|headers| {
MetadataMap::from_headers(
@@ -34,7 +28,8 @@
})
.unwrap_or_default()
}
-fn parse_headers_hashmap(headers: Option<&str>) -> HashMap<String, String> {
+
+fn to_hashmap(headers: Option<&str>) -> HashMap<String, String> {
headers
.map(|headers| {
parse_headers(headers)
@@ -44,132 +39,45 @@
.unwrap_or_default()
}
-fn logger_exporter(base: &OtlpBaseSettings, log: &OtlpLogsSettings) -> ProviderResult<LogExporter> {
- let endpoint = log
- .endpoint
- .clone()
- .or_else(|| Some(format!("{}/v1/logs", base.endpoint.as_ref()?)))
- .ok_or(ProviderError::EndpointUnset)?;
- let headers = log.headers.as_deref().or(base.headers.as_deref());
- let timeout = Duration::from_millis(log.timeout.or(base.timeout).unwrap_or(10000));
-
- let protocol = log
- .protocol
- .or(base.protocol)
- .ok_or(ProviderError::UnsetProtocol)?;
-
- match protocol {
- OtlpProtocol::Grpc => {
- let mut builder = LogExporter::builder()
- .with_tonic()
- .with_endpoint(endpoint)
- .with_metadata(parse_headers_metadata_map(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
- let compression = log.compression.or(base.compression);
- if let Some(compression) = compression {
- builder = builder.with_compression(compression.into());
+macro_rules! build_exporter {
+ ($exporter:ty, $settings:expr) => {{
+ let s: &ResolvedOtlpSettings = $settings;
+ match s.protocol {
+ OtlpProtocol::Grpc => {
+ let mut builder = <$exporter>::builder()
+ .with_tonic()
+ .with_endpoint(&s.endpoint)
+ .with_metadata(to_metadata_map(s.headers.as_deref()))
+ .with_protocol(s.protocol.into())
+ .with_timeout(s.timeout);
+ if let Some(compression) = s.compression {
+ builder = builder.with_compression(compression.into());
+ }
+ builder.build()
+ }
+ OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
+ <$exporter>::builder()
+ .with_http()
+ .with_endpoint(&s.endpoint)
+ .with_headers(to_hashmap(s.headers.as_deref()))
+ .with_protocol(s.protocol.into())
+ .with_timeout(s.timeout)
+ .build()
}
-
- Ok(builder.build()?)
}
- OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
- let builder = LogExporter::builder()
- .with_http()
- .with_endpoint(endpoint)
- .with_headers(parse_headers_hashmap(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
+ }};
+}
- Ok(builder.build()?)
- }
+impl ResolvedOtlpSettings {
+ pub fn span_exporter(&self) -> Result<SpanExporter, Error> {
+ Ok(build_exporter!(SpanExporter, self)?)
}
-}
-fn metric_exporter(
- base: &OtlpBaseSettings,
- metric: &OtlpMetricsSettings,
-) -> ProviderResult<MetricExporter> {
- let endpoint = metric
- .endpoint
- .clone()
- .or_else(|| Some(format!("{}/v1/metrics", base.endpoint.as_ref()?)))
- .ok_or(ProviderError::EndpointUnset)?;
- let headers = metric.headers.as_deref().or(base.headers.as_deref());
- let timeout = Duration::from_millis(metric.timeout.or(base.timeout).unwrap_or(10000));
- let protocol = metric
- .protocol
- .or(base.protocol)
- .ok_or(ProviderError::UnsetProtocol)?;
-
- match protocol {
- OtlpProtocol::Grpc => {
- let mut builder = MetricExporter::builder()
- .with_tonic()
- .with_endpoint(endpoint)
- .with_metadata(parse_headers_metadata_map(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
- let compression = metric.compression.or(base.compression);
- if let Some(compression) = compression {
- builder = builder.with_compression(compression.into());
- }
-
- Ok(builder.build()?)
- }
- OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
- let builder = MetricExporter::builder()
- .with_http()
- .with_endpoint(endpoint)
- .with_headers(parse_headers_hashmap(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
-
- Ok(builder.build()?)
- }
+ pub fn log_exporter(&self) -> Result<LogExporter, Error> {
+ Ok(build_exporter!(LogExporter, self)?)
}
-}
-fn span_exporter(
- base: &OtlpBaseSettings,
- trace: &OtlpMetricsSettings,
-) -> ProviderResult<SpanExporter> {
- let endpoint = trace
- .endpoint
- .clone()
- .or_else(|| Some(format!("{}/v1/traces", base.endpoint.as_ref()?)))
- .ok_or(ProviderError::EndpointUnset)?;
- let headers = trace.headers.as_deref().or(base.headers.as_deref());
- let timeout = Duration::from_millis(trace.timeout.or(base.timeout).unwrap_or(10000));
- let protocol = trace
- .protocol
- .or(base.protocol)
- .ok_or(ProviderError::UnsetProtocol)?;
-
- match protocol {
- OtlpProtocol::Grpc => {
- let mut builder = SpanExporter::builder()
- .with_tonic()
- .with_endpoint(endpoint)
- .with_metadata(parse_headers_metadata_map(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
- let compression = trace.compression.or(base.compression);
- if let Some(compression) = compression {
- builder = builder.with_compression(compression.into());
- }
-
- Ok(builder.build()?)
- }
- OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
- let builder = SpanExporter::builder()
- .with_http()
- .with_endpoint(endpoint)
- .with_headers(parse_headers_hashmap(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
-
- Ok(builder.build()?)
- }
+ pub fn metric_exporter(&self) -> Result<MetricExporter, Error> {
+ Ok(build_exporter!(MetricExporter, self)?)
}
}