git.delta.rocks / jrsonnet / refs/commits / dffcb6119923

difftreelog

feat otel-exporter-env

uktsurkpYaroslav Bolyukin2026-03-12parent: #d173b8d.patch.diff
in: trunk

2 files changed

modifiedcrates/opentelemetry-exporter-env/src/lib.rsdiffbeforeafterboth
--- a/crates/opentelemetry-exporter-env/src/lib.rs
+++ b/crates/opentelemetry-exporter-env/src/lib.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
 use std::convert::Infallible;
 use std::env::{self, VarError};
 use std::ffi::OsString;
@@ -6,42 +5,41 @@
 use std::str::FromStr;
 use std::time::Duration;
 
-use clap::Parser;
-#[cfg(feature = "otlp")]
-use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
 #[cfg(feature = "otlp")]
-use opentelemetry_otlp::{
-	ExporterBuildError, LogExporter, MetricExporter, SpanExporter, WithExportConfig,
-	WithHttpConfig, WithTonicConfig,
-};
-
-#[cfg(feature = "otlp")]
 mod otlp;
 
+#[derive(thiserror::Error, Debug)]
 pub enum Error {
+	#[error("environment variable {env} contains invalid UTF-8: {value:?}")]
 	InvalidUtf8 {
 		env: &'static str,
 		value: OsString,
 	},
-	EnvParseError {
+	#[error("environment variable {env}={value:?}: {error}")]
+	EnvParse {
 		env: &'static str,
 		value: String,
 		error: &'static str,
 	},
-	EnvParseIntError {
+	#[error("environment variable {env}={value:?}: {error}")]
+	EnvParseInt {
 		env: &'static str,
 		value: String,
 		error: ParseIntError,
 	},
+	#[cfg(feature = "otlp")]
+	#[error("failed to build exporter: {0}")]
+	Exporter(#[from] opentelemetry_otlp::ExporterBuildError),
 }
+
 impl From<(&'static str, &'static str, String)> for Error {
 	fn from((env, error, value): (&'static str, &'static str, String)) -> Self {
-		Self::EnvParseError { env, value, error }
+		Self::EnvParse { env, value, error }
 	}
 }
 impl From<(&'static str, ParseIntError, String)> for Error {
 	fn from((env, error, value): (&'static str, ParseIntError, String)) -> Self {
-		Self::EnvParseIntError { env, value, error }
+		Self::EnvParseInt { env, value, error }
 	}
 }
 impl From<(&'static str, Infallible, String)> for Error {
@@ -62,36 +60,6 @@
 	}
 }
 
-macro_rules! impl_settings {
-	(
-	#[name($env_prefix:literal, $long_prefix:literal)]
-	struct $id:ident {
-		$(
-			$(#[doc = $doc:literal])*
-			#[name($env:literal, $long:literal)]
-			$(#[arg($($tt:tt)*)])?
-			$name:ident: $ty:ty,
-		)*
-	}) => {
-		#[derive(Parser)]
-		pub struct $id {
-			$(
-				$(#[doc = $doc])*
-				#[arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env), $($($tt)*)?)]
-				pub $name: Option<$ty>,
-			)*
-		}
-		impl $id {
-			pub fn from_env() -> Result<Self, Error> {
-				Ok(Self {
-					$(
-						$name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?,
-					)*
-				})
-			}
-		}
-	}
-}
 macro_rules! impl_enum {
 	(enum $id:ident {
 		$(
@@ -115,7 +83,7 @@
 					$(
 						$value => Self::$var,
 					)*
-					_ => return Err("unsupported value, supported are")
+					_ => return Err("unsupported value")
 				})
 			}
 		}
@@ -123,6 +91,49 @@
 }
 
 impl_enum! {
+	enum ExporterKind {
+		#[name = "otlp"]
+		Otlp,
+		#[name = "none"]
+		None,
+	}
+}
+
+#[derive(Default)]
+#[cfg_attr(feature = "clap", derive(clap::Parser))]
+pub struct SignalExporterSettings {
+	/// Traces exporter to be used.
+	#[cfg_attr(feature = "clap", arg(long = "otel-traces-exporter", env = "OTEL_TRACES_EXPORTER", value_enum))]
+	pub traces: Option<ExporterKind>,
+	/// Metrics exporter to be used.
+	#[cfg_attr(feature = "clap", arg(long = "otel-metrics-exporter", env = "OTEL_METRICS_EXPORTER", value_enum))]
+	pub metrics: Option<ExporterKind>,
+	/// Logs exporter to be used.
+	#[cfg_attr(feature = "clap", arg(long = "otel-logs-exporter", env = "OTEL_LOGS_EXPORTER", value_enum))]
+	pub logs: Option<ExporterKind>,
+}
+
+impl SignalExporterSettings {
+	pub fn from_env() -> Result<Self, Error> {
+		Ok(Self {
+			traces: load_env("OTEL_TRACES_EXPORTER")?,
+			metrics: load_env("OTEL_METRICS_EXPORTER")?,
+			logs: load_env("OTEL_LOGS_EXPORTER")?,
+		})
+	}
+
+	pub fn traces_enabled(&self) -> bool {
+		!matches!(self.traces, Some(ExporterKind::None))
+	}
+	pub fn metrics_enabled(&self) -> bool {
+		!matches!(self.metrics, Some(ExporterKind::None))
+	}
+	pub fn logs_enabled(&self) -> bool {
+		!matches!(self.logs, Some(ExporterKind::None))
+	}
+}
+
+impl_enum! {
 	enum Compression {
 		#[name = "gzip"]
 		Gzip,
@@ -161,6 +172,53 @@
 	}
 }
 
+pub trait OtlpSignalSettings {
+	fn compression(&self) -> Option<Compression>;
+	fn endpoint(&self) -> Option<&str>;
+	fn headers(&self) -> Option<&str>;
+	fn protocol(&self) -> Option<OtlpProtocol>;
+	fn timeout(&self) -> Option<u64>;
+}
+
+macro_rules! impl_settings {
+	(
+	#[name($env_prefix:literal, $long_prefix:literal)]
+	struct $id:ident {
+		$(
+			$(#[doc = $doc:literal])*
+			#[name($env:literal, $long:literal)]
+			$(#[arg($($tt:tt)*)])?
+			$name:ident: $ty:ty,
+		)*
+	}) => {
+		#[derive(Default)]
+		#[cfg_attr(feature = "clap", derive(clap::Parser))]
+		pub struct $id {
+			$(
+				$(#[doc = $doc])*
+				#[cfg_attr(feature = "clap", arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env) $(, $($tt)*)?))]
+				pub $name: Option<$ty>,
+			)*
+		}
+		impl $id {
+			pub fn from_env() -> Result<Self, Error> {
+				Ok(Self {
+					$(
+						$name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?,
+					)*
+				})
+			}
+		}
+		impl OtlpSignalSettings for $id {
+			fn compression(&self) -> Option<Compression> { self.compression }
+			fn endpoint(&self) -> Option<&str> { self.endpoint.as_deref() }
+			fn headers(&self) -> Option<&str> { self.headers.as_deref() }
+			fn protocol(&self) -> Option<OtlpProtocol> { self.protocol }
+			fn timeout(&self) -> Option<u64> { self.timeout }
+		}
+	}
+}
+
 impl_settings! {
 	#[name("", "")]
 	struct OtlpBaseSettings {
@@ -168,7 +226,7 @@
 		#[name("COMPRESSION", "compression")]
 		#[arg(value_enum)]
 		compression: Compression,
-		/// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you’re sending more than one signal to the same endpoint and want one environment variable to control the endpoint.
+		/// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you're sending more than one signal to the same endpoint and want one environment variable to control the endpoint.
 		#[name("ENDPOINT", "endpoint")]
 		endpoint: String,
 		/// A list of headers to apply to all outgoing data (traces, metrics, and logs).
@@ -227,7 +285,6 @@
 		timeout: u64,
 	}
 }
-
 impl_settings! {
 	#[name("TRACES_", "traces-")]
 	struct OtlpTracesSettings {
@@ -251,14 +308,78 @@
 	}
 }
 
-#[derive(thiserror::Error, Debug)]
-enum ProviderError {
-	#[error("protocol is not set")]
-	UnsetProtocol,
-	#[error("endpoint is not set")]
-	EndpointUnset,
-	#[cfg(feature = "otlp")]
-	#[error("failed to build exporter: {0}")]
-	Exporter(#[from] ExporterBuildError),
+pub struct ResolvedOtlpSettings {
+	pub compression: Option<Compression>,
+	pub endpoint: String,
+	pub headers: Option<String>,
+	pub protocol: OtlpProtocol,
+	pub timeout: Duration,
 }
-type ProviderResult<T, E = ProviderError> = Result<T, E>;
+
+impl ResolvedOtlpSettings {
+	const DEFAULT_TIMEOUT_MS: u64 = 10000;
+	const DEFAULT_GRPC_ENDPOINT: &str = "http://localhost:4317";
+	const DEFAULT_HTTP_ENDPOINT: &str = "http://localhost:4318";
+
+	pub fn traces(
+		base: &impl OtlpSignalSettings,
+		signal: &impl OtlpSignalSettings,
+	) -> Result<Self, Error> {
+		Self::resolve(base, signal, "/v1/traces")
+	}
+
+	pub fn metrics(
+		base: &impl OtlpSignalSettings,
+		signal: &impl OtlpSignalSettings,
+	) -> Result<Self, Error> {
+		Self::resolve(base, signal, "/v1/metrics")
+	}
+
+	pub fn logs(
+		base: &impl OtlpSignalSettings,
+		signal: &impl OtlpSignalSettings,
+	) -> Result<Self, Error> {
+		Self::resolve(base, signal, "/v1/logs")
+	}
+
+	fn resolve(
+		base: &impl OtlpSignalSettings,
+		signal: &impl OtlpSignalSettings,
+		signal_path: &str,
+	) -> Result<Self, Error> {
+		let protocol = signal
+			.protocol()
+			.or_else(|| base.protocol())
+			.unwrap_or(OtlpProtocol::HttpProtobuf);
+
+		let endpoint = if let Some(ep) = signal.endpoint() {
+			ep.to_owned()
+		} else if let Some(ep) = base.endpoint() {
+			match protocol {
+				OtlpProtocol::Grpc => ep.to_owned(),
+				_ => format!("{ep}{signal_path}"),
+			}
+		} else {
+			match protocol {
+				OtlpProtocol::Grpc => Self::DEFAULT_GRPC_ENDPOINT.to_owned(),
+				_ => format!("{}{signal_path}", Self::DEFAULT_HTTP_ENDPOINT),
+			}
+		};
+
+		Ok(Self {
+			compression: signal.compression().or_else(|| base.compression()),
+			endpoint,
+			headers: signal
+				.headers()
+				.or_else(|| base.headers())
+				.map(str::to_owned),
+			protocol,
+			timeout: Duration::from_millis(
+				signal
+					.timeout()
+					.or_else(|| base.timeout())
+					.unwrap_or(Self::DEFAULT_TIMEOUT_MS),
+			),
+		})
+	}
+}
modifiedcrates/opentelemetry-exporter-env/src/otlp.rsdiffbeforeafterboth
1use std::collections::HashMap;1use std::collections::HashMap;
2use std::time::Duration;
32
4use opentelemetry_otlp::tonic_types::metadata::MetadataMap;3use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
5use opentelemetry_otlp::{4use opentelemetry_otlp::{
6 LogExporter, MetricExporter, SpanExporter, WithExportConfig as _, WithHttpConfig as _,5 LogExporter, MetricExporter, SpanExporter, WithExportConfig as _, WithHttpConfig as _,
7 WithTonicConfig as _,6 WithTonicConfig as _,
8};7};
98
10use crate::{9use crate::{Error, OtlpProtocol, ResolvedOtlpSettings};
11 OtlpBaseSettings, OtlpLogsSettings, OtlpMetricsSettings, OtlpProtocol, ProviderError,
12 ProviderResult,
13};
1410
15fn parse_headers<'a>(11fn parse_headers(headers: &str) -> impl Iterator<Item = (&str, &str)> {
16 headers: &'a str,
17) -> std::iter::Map<std::str::Split<'a, char>, impl FnMut(&'a str) -> (&'a str, &'a str)> {
18 headers.split(',').map(|header| {12 headers.split(',').map(|header| {
19 let mut parts = header.splitn(2, '=');13 let mut parts = header.splitn(2, '=');
20 let key = parts.next().unwrap();14 let key = parts.next().unwrap();
23 })17 })
24}18}
2519
26fn parse_headers_metadata_map(headers: Option<&str>) -> MetadataMap {20fn to_metadata_map(headers: Option<&str>) -> MetadataMap {
27 headers21 headers
28 .map(|headers| {22 .map(|headers| {
29 MetadataMap::from_headers(23 MetadataMap::from_headers(
35 .unwrap_or_default()29 .unwrap_or_default()
36}30}
31
37fn parse_headers_hashmap(headers: Option<&str>) -> HashMap<String, String> {32fn to_hashmap(headers: Option<&str>) -> HashMap<String, String> {
38 headers33 headers
39 .map(|headers| {34 .map(|headers| {
40 parse_headers(headers)35 parse_headers(headers)
44 .unwrap_or_default()39 .unwrap_or_default()
45}40}
4641
42macro_rules! build_exporter {
47fn logger_exporter(base: &OtlpBaseSettings, log: &OtlpLogsSettings) -> ProviderResult<LogExporter> {43 ($exporter:ty, $settings:expr) => {{
48 let endpoint = log44 let s: &ResolvedOtlpSettings = $settings;
49 .endpoint
50 .clone()
51 .or_else(|| Some(format!("{}/v1/logs", base.endpoint.as_ref()?)))
52 .ok_or(ProviderError::EndpointUnset)?;
53 let headers = log.headers.as_deref().or(base.headers.as_deref());
54 let timeout = Duration::from_millis(log.timeout.or(base.timeout).unwrap_or(10000));
55
56 let protocol = log
57 .protocol
58 .or(base.protocol)
59 .ok_or(ProviderError::UnsetProtocol)?;
60
61 match protocol {45 match s.protocol {
62 OtlpProtocol::Grpc => {46 OtlpProtocol::Grpc => {
63 let mut builder = LogExporter::builder()47 let mut builder = <$exporter>::builder()
64 .with_tonic()48 .with_tonic()
65 .with_endpoint(endpoint)49 .with_endpoint(&s.endpoint)
66 .with_metadata(parse_headers_metadata_map(headers))50 .with_metadata(to_metadata_map(s.headers.as_deref()))
67 .with_protocol(protocol.into())51 .with_protocol(s.protocol.into())
68 .with_timeout(timeout);52 .with_timeout(s.timeout);
69 let compression = log.compression.or(base.compression);
70 if let Some(compression) = compression {53 if let Some(compression) = s.compression {
71 builder = builder.with_compression(compression.into());54 builder = builder.with_compression(compression.into());
72 }55 }
73
74 Ok(builder.build()?)56 builder.build()
75 }57 }
76 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {58 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
77 let builder = LogExporter::builder()59 <$exporter>::builder()
78 .with_http()60 .with_http()
79 .with_endpoint(endpoint)61 .with_endpoint(&s.endpoint)
80 .with_headers(parse_headers_hashmap(headers))62 .with_headers(to_hashmap(s.headers.as_deref()))
81 .with_protocol(protocol.into())63 .with_protocol(s.protocol.into())
82 .with_timeout(timeout);64 .with_timeout(s.timeout)
83
84 Ok(builder.build()?)65 .build()
85 }66 }
86 }67 }
87}68 }};
69}
70
71impl ResolvedOtlpSettings {
88fn metric_exporter(72 pub fn span_exporter(&self) -> Result<SpanExporter, Error> {
89 base: &OtlpBaseSettings,73 Ok(build_exporter!(SpanExporter, self)?)
90 metric: &OtlpMetricsSettings,74 }
91) -> ProviderResult<MetricExporter> {75
92 let endpoint = metric
93 .endpoint
94 .clone()
95 .or_else(|| Some(format!("{}/v1/metrics", base.endpoint.as_ref()?)))
96 .ok_or(ProviderError::EndpointUnset)?;
97 let headers = metric.headers.as_deref().or(base.headers.as_deref());
98 let timeout = Duration::from_millis(metric.timeout.or(base.timeout).unwrap_or(10000));
99
100 let protocol = metric
101 .protocol
102 .or(base.protocol)
103 .ok_or(ProviderError::UnsetProtocol)?;
104
105 match protocol {
106 OtlpProtocol::Grpc => {
107 let mut builder = MetricExporter::builder()
108 .with_tonic()
109 .with_endpoint(endpoint)
110 .with_metadata(parse_headers_metadata_map(headers))
111 .with_protocol(protocol.into())
112 .with_timeout(timeout);
113 let compression = metric.compression.or(base.compression);76 pub fn log_exporter(&self) -> Result<LogExporter, Error> {
114 if let Some(compression) = compression {
115 builder = builder.with_compression(compression.into());
116 }
117
118 Ok(builder.build()?)77 Ok(build_exporter!(LogExporter, self)?)
119 }78 }
120 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {79
121 let builder = MetricExporter::builder()80 pub fn metric_exporter(&self) -> Result<MetricExporter, Error> {
122 .with_http()
123 .with_endpoint(endpoint)
124 .with_headers(parse_headers_hashmap(headers))
125 .with_protocol(protocol.into())
126 .with_timeout(timeout);
127
128 Ok(builder.build()?)81 Ok(build_exporter!(MetricExporter, self)?)
129 }82 }
130 }83}
131}
132fn span_exporter(
133 base: &OtlpBaseSettings,
134 trace: &OtlpMetricsSettings,
135) -> ProviderResult<SpanExporter> {
136 let endpoint = trace
137 .endpoint
138 .clone()
139 .or_else(|| Some(format!("{}/v1/traces", base.endpoint.as_ref()?)))
140 .ok_or(ProviderError::EndpointUnset)?;
141 let headers = trace.headers.as_deref().or(base.headers.as_deref());
142 let timeout = Duration::from_millis(trace.timeout.or(base.timeout).unwrap_or(10000));
143
144 let protocol = trace
145 .protocol
146 .or(base.protocol)
147 .ok_or(ProviderError::UnsetProtocol)?;
148
149 match protocol {
150 OtlpProtocol::Grpc => {
151 let mut builder = SpanExporter::builder()
152 .with_tonic()
153 .with_endpoint(endpoint)
154 .with_metadata(parse_headers_metadata_map(headers))
155 .with_protocol(protocol.into())
156 .with_timeout(timeout);
157 let compression = trace.compression.or(base.compression);
158 if let Some(compression) = compression {
159 builder = builder.with_compression(compression.into());
160 }
161
162 Ok(builder.build()?)
163 }
164 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
165 let builder = SpanExporter::builder()
166 .with_http()
167 .with_endpoint(endpoint)
168 .with_headers(parse_headers_hashmap(headers))
169 .with_protocol(protocol.into())
170 .with_timeout(timeout);
171
172 Ok(builder.build()?)
173 }
174 }
175}
17684