difftreelog
feat otel-exporter-env
in: trunk
2 files changed
crates/opentelemetry-exporter-env/src/lib.rsdiffbeforeafterboth1use std::collections::HashMap;2use std::convert::Infallible;3use std::env::{self, VarError};4use std::ffi::OsString;5use std::num::ParseIntError;6use std::str::FromStr;7use std::time::Duration;89use clap::Parser;10#[cfg(feature = "otlp")]11use opentelemetry_otlp::tonic_types::metadata::MetadataMap;12#[cfg(feature = "otlp")]13use opentelemetry_otlp::{14 ExporterBuildError, LogExporter, MetricExporter, SpanExporter, WithExportConfig,15 WithHttpConfig, WithTonicConfig,16};1718#[cfg(feature = "otlp")]19mod otlp;2021pub enum Error {22 InvalidUtf8 {23 env: &'static str,24 value: OsString,25 },26 EnvParseError {27 env: &'static str,28 value: String,29 error: &'static str,30 },31 EnvParseIntError {32 env: &'static str,33 value: String,34 error: ParseIntError,35 },36}37impl From<(&'static str, &'static str, String)> for Error {38 fn from((env, error, value): (&'static str, &'static str, String)) -> Self {39 Self::EnvParseError { env, value, error }40 }41}42impl From<(&'static str, ParseIntError, String)> for Error {43 fn from((env, error, value): (&'static str, ParseIntError, String)) -> Self {44 Self::EnvParseIntError { env, value, error }45 }46}47impl From<(&'static str, Infallible, String)> for Error {48 fn from(_v: (&'static str, Infallible, String)) -> Self {49 unreachable!()50 }51}5253fn load_env<T>(env: &'static str) -> Result<Option<T>, Error>54where55 T: FromStr,56 Error: From<(&'static str, <T as FromStr>::Err, String)>,57{58 match env::var(env) {59 Ok(v) => Ok(Some(T::from_str(&v).map_err(|err| (env, err, v))?)),60 Err(VarError::NotPresent) => Ok(None),61 Err(VarError::NotUnicode(value)) => Err(Error::InvalidUtf8 { env, value }),62 }63}6465macro_rules! impl_settings {66 (67 #[name($env_prefix:literal, $long_prefix:literal)]68 struct $id:ident {69 $(70 $(#[doc = $doc:literal])*71 #[name($env:literal, $long:literal)]72 $(#[arg($($tt:tt)*)])?73 $name:ident: $ty:ty,74 )*75 }) => {76 #[derive(Parser)]77 pub struct $id {78 $(79 $(#[doc = $doc])*80 #[arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env), $($($tt)*)?)]81 pub $name: Option<$ty>,82 )*83 }84 impl $id {85 pub fn from_env() -> Result<Self, Error> {86 Ok(Self {87 $(88 $name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?,89 )*90 })91 }92 }93 }94}95macro_rules! impl_enum {96 (enum $id:ident {97 $(98 #[name = $value:literal]99 $var:ident,100 )*101 }) => {102 #[derive(Clone, Copy)]103 #[cfg_attr(feature = "clap", derive(clap::ValueEnum))]104 pub enum $id {105 $(106 #[cfg_attr(feature = "clap", value(name = $value))]107 $var,108 )*109 }110 impl FromStr for $id {111 type Err = &'static str;112113 fn from_str(s: &str) -> Result<Self, Self::Err> {114 Ok(match s {115 $(116 $value => Self::$var,117 )*118 _ => return Err("unsupported value, supported are")119 })120 }121 }122 };123}124125impl_enum! {126 enum Compression {127 #[name = "gzip"]128 Gzip,129 #[name = "zstd"]130 Zstd,131 }132}133#[cfg(feature = "otlp")]134impl From<Compression> for opentelemetry_otlp::Compression {135 fn from(value: Compression) -> Self {136 match value {137 Compression::Gzip => opentelemetry_otlp::Compression::Gzip,138 Compression::Zstd => opentelemetry_otlp::Compression::Zstd,139 }140 }141}142143impl_enum! {144 enum OtlpProtocol {145 #[name = "grpc"]146 Grpc,147 #[name = "http/protobuf"]148 HttpProtobuf,149 #[name = "http/json"]150 HttpJson,151 }152}153#[cfg(feature = "otlp")]154impl From<OtlpProtocol> for opentelemetry_otlp::Protocol {155 fn from(value: OtlpProtocol) -> Self {156 match value {157 OtlpProtocol::Grpc => opentelemetry_otlp::Protocol::Grpc,158 OtlpProtocol::HttpProtobuf => opentelemetry_otlp::Protocol::HttpBinary,159 OtlpProtocol::HttpJson => opentelemetry_otlp::Protocol::HttpJson,160 }161 }162}163164impl_settings! {165 #[name("", "")]166 struct OtlpBaseSettings {167 /// Specifies the OTLP transport compression to be used for all telemetry data.168 #[name("COMPRESSION", "compression")]169 #[arg(value_enum)]170 compression: Compression,171 /// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you’re sending more than one signal to the same endpoint and want one environment variable to control the endpoint.172 #[name("ENDPOINT", "endpoint")]173 endpoint: String,174 /// A list of headers to apply to all outgoing data (traces, metrics, and logs).175 #[name("HEADERS", "headers")]176 headers: String,177 /// Specifies the OTLP transport protocol to be used for all telemetry data.178 #[name("PROTOCOL", "protocol")]179 #[arg(value_enum)]180 protocol: OtlpProtocol,181 /// The timeout value for all outgoing data (traces, metrics, and logs) in milliseconds.182 #[name("TIMEOUT", "timeout")]183 timeout: u64,184 }185}186impl_settings! {187 #[name("LOGS_", "logs-")]188 struct OtlpLogsSettings {189 /// Specifies the OTLP transport compression to be used for log data.190 #[name("COMPRESSION", "compression")]191 #[arg(value_enum)]192 compression: Compression,193 /// Endpoint URL for log data only, with an optionally-specified port number. Typically ends with `v1/logs` when using OTLP/HTTP.194 #[name("ENDPOINT", "endpoint")]195 endpoint: String,196 /// A list of headers to apply to all outgoing logs.197 #[name("HEADERS", "headers")]198 headers: String,199 /// Specifies the OTLP transport protocol to be used for log data.200 #[name("PROTOCOL", "protocol")]201 #[arg(value_enum)]202 protocol: OtlpProtocol,203 /// The timeout value for all outgoing logs in milliseconds.204 #[name("TIMEOUT", "timeout")]205 timeout: u64,206 }207}208impl_settings! {209 #[name("METRICS_", "metrics-")]210 struct OtlpMetricsSettings {211 /// Specifies the OTLP transport compression to be used for metrics data.212 #[name("COMPRESSION", "compression")]213 #[arg(value_enum)]214 compression: Compression,215 /// Endpoint URL for metric data only, with an optionally-specified port number. Typically ends with `v1/metrics` when using OTLP/HTTP.216 #[name("ENDPOINT", "endpoint")]217 endpoint: String,218 /// A list of headers to apply to all outgoing metrics.219 #[name("HEADERS", "headers")]220 headers: String,221 /// Specifies the OTLP transport protocol to be used for metrics data.222 #[name("PROTOCOL", "protocol")]223 #[arg(value_enum)]224 protocol: OtlpProtocol,225 /// The timeout value for all outgoing metrics in milliseconds.226 #[name("TIMEOUT", "timeout")]227 timeout: u64,228 }229}230231impl_settings! {232 #[name("TRACES_", "traces-")]233 struct OtlpTracesSettings {234 /// Specifies the OTLP transport compression to be used for trace data.235 #[name("COMPRESSION", "compression")]236 #[arg(value_enum)]237 compression: Compression,238 /// Endpoint URL for trace data only, with an optionally-specified port number. Typically ends with `v1/traces` when using OTLP/HTTP.239 #[name("ENDPOINT", "endpoint")]240 endpoint: String,241 /// A list of headers to apply to all outgoing traces.242 #[name("HEADERS", "headers")]243 headers: String,244 /// Specifies the OTLP transport protocol to be used for trace data.245 #[name("PROTOCOL", "protocol")]246 #[arg(value_enum)]247 protocol: OtlpProtocol,248 /// The timeout value for all outgoing traces in milliseconds.249 #[name("TIMEOUT", "timeout")]250 timeout: u64,251 }252}253254#[derive(thiserror::Error, Debug)]255enum ProviderError {256 #[error("protocol is not set")]257 UnsetProtocol,258 #[error("endpoint is not set")]259 EndpointUnset,260 #[cfg(feature = "otlp")]261 #[error("failed to build exporter: {0}")]262 Exporter(#[from] ExporterBuildError),263}264type ProviderResult<T, E = ProviderError> = Result<T, E>;crates/opentelemetry-exporter-env/src/otlp.rsdiffbeforeafterboth--- a/crates/opentelemetry-exporter-env/src/otlp.rs
+++ b/crates/opentelemetry-exporter-env/src/otlp.rs
@@ -1,5 +1,4 @@
use std::collections::HashMap;
-use std::time::Duration;
use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
use opentelemetry_otlp::{
@@ -7,14 +6,9 @@
WithTonicConfig as _,
};
-use crate::{
- OtlpBaseSettings, OtlpLogsSettings, OtlpMetricsSettings, OtlpProtocol, ProviderError,
- ProviderResult,
-};
+use crate::{Error, OtlpProtocol, ResolvedOtlpSettings};
-fn parse_headers<'a>(
- headers: &'a str,
-) -> std::iter::Map<std::str::Split<'a, char>, impl FnMut(&'a str) -> (&'a str, &'a str)> {
+fn parse_headers(headers: &str) -> impl Iterator<Item = (&str, &str)> {
headers.split(',').map(|header| {
let mut parts = header.splitn(2, '=');
let key = parts.next().unwrap();
@@ -23,7 +17,7 @@
})
}
-fn parse_headers_metadata_map(headers: Option<&str>) -> MetadataMap {
+fn to_metadata_map(headers: Option<&str>) -> MetadataMap {
headers
.map(|headers| {
MetadataMap::from_headers(
@@ -34,7 +28,8 @@
})
.unwrap_or_default()
}
-fn parse_headers_hashmap(headers: Option<&str>) -> HashMap<String, String> {
+
+fn to_hashmap(headers: Option<&str>) -> HashMap<String, String> {
headers
.map(|headers| {
parse_headers(headers)
@@ -44,132 +39,45 @@
.unwrap_or_default()
}
-fn logger_exporter(base: &OtlpBaseSettings, log: &OtlpLogsSettings) -> ProviderResult<LogExporter> {
- let endpoint = log
- .endpoint
- .clone()
- .or_else(|| Some(format!("{}/v1/logs", base.endpoint.as_ref()?)))
- .ok_or(ProviderError::EndpointUnset)?;
- let headers = log.headers.as_deref().or(base.headers.as_deref());
- let timeout = Duration::from_millis(log.timeout.or(base.timeout).unwrap_or(10000));
-
- let protocol = log
- .protocol
- .or(base.protocol)
- .ok_or(ProviderError::UnsetProtocol)?;
-
- match protocol {
- OtlpProtocol::Grpc => {
- let mut builder = LogExporter::builder()
- .with_tonic()
- .with_endpoint(endpoint)
- .with_metadata(parse_headers_metadata_map(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
- let compression = log.compression.or(base.compression);
- if let Some(compression) = compression {
- builder = builder.with_compression(compression.into());
+macro_rules! build_exporter {
+ ($exporter:ty, $settings:expr) => {{
+ let s: &ResolvedOtlpSettings = $settings;
+ match s.protocol {
+ OtlpProtocol::Grpc => {
+ let mut builder = <$exporter>::builder()
+ .with_tonic()
+ .with_endpoint(&s.endpoint)
+ .with_metadata(to_metadata_map(s.headers.as_deref()))
+ .with_protocol(s.protocol.into())
+ .with_timeout(s.timeout);
+ if let Some(compression) = s.compression {
+ builder = builder.with_compression(compression.into());
+ }
+ builder.build()
+ }
+ OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
+ <$exporter>::builder()
+ .with_http()
+ .with_endpoint(&s.endpoint)
+ .with_headers(to_hashmap(s.headers.as_deref()))
+ .with_protocol(s.protocol.into())
+ .with_timeout(s.timeout)
+ .build()
}
-
- Ok(builder.build()?)
}
- OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
- let builder = LogExporter::builder()
- .with_http()
- .with_endpoint(endpoint)
- .with_headers(parse_headers_hashmap(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
+ }};
+}
- Ok(builder.build()?)
- }
+impl ResolvedOtlpSettings {
+ pub fn span_exporter(&self) -> Result<SpanExporter, Error> {
+ Ok(build_exporter!(SpanExporter, self)?)
}
-}
-fn metric_exporter(
- base: &OtlpBaseSettings,
- metric: &OtlpMetricsSettings,
-) -> ProviderResult<MetricExporter> {
- let endpoint = metric
- .endpoint
- .clone()
- .or_else(|| Some(format!("{}/v1/metrics", base.endpoint.as_ref()?)))
- .ok_or(ProviderError::EndpointUnset)?;
- let headers = metric.headers.as_deref().or(base.headers.as_deref());
- let timeout = Duration::from_millis(metric.timeout.or(base.timeout).unwrap_or(10000));
- let protocol = metric
- .protocol
- .or(base.protocol)
- .ok_or(ProviderError::UnsetProtocol)?;
-
- match protocol {
- OtlpProtocol::Grpc => {
- let mut builder = MetricExporter::builder()
- .with_tonic()
- .with_endpoint(endpoint)
- .with_metadata(parse_headers_metadata_map(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
- let compression = metric.compression.or(base.compression);
- if let Some(compression) = compression {
- builder = builder.with_compression(compression.into());
- }
-
- Ok(builder.build()?)
- }
- OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
- let builder = MetricExporter::builder()
- .with_http()
- .with_endpoint(endpoint)
- .with_headers(parse_headers_hashmap(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
-
- Ok(builder.build()?)
- }
+ pub fn log_exporter(&self) -> Result<LogExporter, Error> {
+ Ok(build_exporter!(LogExporter, self)?)
}
-}
-fn span_exporter(
- base: &OtlpBaseSettings,
- trace: &OtlpMetricsSettings,
-) -> ProviderResult<SpanExporter> {
- let endpoint = trace
- .endpoint
- .clone()
- .or_else(|| Some(format!("{}/v1/traces", base.endpoint.as_ref()?)))
- .ok_or(ProviderError::EndpointUnset)?;
- let headers = trace.headers.as_deref().or(base.headers.as_deref());
- let timeout = Duration::from_millis(trace.timeout.or(base.timeout).unwrap_or(10000));
- let protocol = trace
- .protocol
- .or(base.protocol)
- .ok_or(ProviderError::UnsetProtocol)?;
-
- match protocol {
- OtlpProtocol::Grpc => {
- let mut builder = SpanExporter::builder()
- .with_tonic()
- .with_endpoint(endpoint)
- .with_metadata(parse_headers_metadata_map(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
- let compression = trace.compression.or(base.compression);
- if let Some(compression) = compression {
- builder = builder.with_compression(compression.into());
- }
-
- Ok(builder.build()?)
- }
- OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
- let builder = SpanExporter::builder()
- .with_http()
- .with_endpoint(endpoint)
- .with_headers(parse_headers_hashmap(headers))
- .with_protocol(protocol.into())
- .with_timeout(timeout);
-
- Ok(builder.build()?)
- }
+ pub fn metric_exporter(&self) -> Result<MetricExporter, Error> {
+ Ok(build_exporter!(MetricExporter, self)?)
}
}