difftreelog
feat otel-exporter-env
in: trunk
2 files changed
crates/opentelemetry-exporter-env/src/lib.rsdiffbeforeafterboth--- a/crates/opentelemetry-exporter-env/src/lib.rs
+++ b/crates/opentelemetry-exporter-env/src/lib.rs
@@ -1,4 +1,3 @@
-use std::collections::HashMap;
use std::convert::Infallible;
use std::env::{self, VarError};
use std::ffi::OsString;
@@ -6,42 +5,41 @@
use std::str::FromStr;
use std::time::Duration;
-use clap::Parser;
-#[cfg(feature = "otlp")]
-use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
#[cfg(feature = "otlp")]
-use opentelemetry_otlp::{
- ExporterBuildError, LogExporter, MetricExporter, SpanExporter, WithExportConfig,
- WithHttpConfig, WithTonicConfig,
-};
-
-#[cfg(feature = "otlp")]
mod otlp;
+#[derive(thiserror::Error, Debug)]
pub enum Error {
+ #[error("environment variable {env} contains invalid UTF-8: {value:?}")]
InvalidUtf8 {
env: &'static str,
value: OsString,
},
- EnvParseError {
+ #[error("environment variable {env}={value:?}: {error}")]
+ EnvParse {
env: &'static str,
value: String,
error: &'static str,
},
- EnvParseIntError {
+ #[error("environment variable {env}={value:?}: {error}")]
+ EnvParseInt {
env: &'static str,
value: String,
error: ParseIntError,
},
+ #[cfg(feature = "otlp")]
+ #[error("failed to build exporter: {0}")]
+ Exporter(#[from] opentelemetry_otlp::ExporterBuildError),
}
+
impl From<(&'static str, &'static str, String)> for Error {
fn from((env, error, value): (&'static str, &'static str, String)) -> Self {
- Self::EnvParseError { env, value, error }
+ Self::EnvParse { env, value, error }
}
}
impl From<(&'static str, ParseIntError, String)> for Error {
fn from((env, error, value): (&'static str, ParseIntError, String)) -> Self {
- Self::EnvParseIntError { env, value, error }
+ Self::EnvParseInt { env, value, error }
}
}
impl From<(&'static str, Infallible, String)> for Error {
@@ -62,36 +60,6 @@
}
}
-macro_rules! impl_settings {
- (
- #[name($env_prefix:literal, $long_prefix:literal)]
- struct $id:ident {
- $(
- $(#[doc = $doc:literal])*
- #[name($env:literal, $long:literal)]
- $(#[arg($($tt:tt)*)])?
- $name:ident: $ty:ty,
- )*
- }) => {
- #[derive(Parser)]
- pub struct $id {
- $(
- $(#[doc = $doc])*
- #[arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env), $($($tt)*)?)]
- pub $name: Option<$ty>,
- )*
- }
- impl $id {
- pub fn from_env() -> Result<Self, Error> {
- Ok(Self {
- $(
- $name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?,
- )*
- })
- }
- }
- }
-}
macro_rules! impl_enum {
(enum $id:ident {
$(
@@ -115,7 +83,7 @@
$(
$value => Self::$var,
)*
- _ => return Err("unsupported value, supported are")
+ _ => return Err("unsupported value")
})
}
}
@@ -123,6 +91,49 @@
}
impl_enum! {
+ enum ExporterKind {
+ #[name = "otlp"]
+ Otlp,
+ #[name = "none"]
+ None,
+ }
+}
+
+#[derive(Default)]
+#[cfg_attr(feature = "clap", derive(clap::Parser))]
+pub struct SignalExporterSettings {
+ /// Traces exporter to be used.
+ #[cfg_attr(feature = "clap", arg(long = "otel-traces-exporter", env = "OTEL_TRACES_EXPORTER", value_enum))]
+ pub traces: Option<ExporterKind>,
+ /// Metrics exporter to be used.
+ #[cfg_attr(feature = "clap", arg(long = "otel-metrics-exporter", env = "OTEL_METRICS_EXPORTER", value_enum))]
+ pub metrics: Option<ExporterKind>,
+ /// Logs exporter to be used.
+ #[cfg_attr(feature = "clap", arg(long = "otel-logs-exporter", env = "OTEL_LOGS_EXPORTER", value_enum))]
+ pub logs: Option<ExporterKind>,
+}
+
+impl SignalExporterSettings {
+ pub fn from_env() -> Result<Self, Error> {
+ Ok(Self {
+ traces: load_env("OTEL_TRACES_EXPORTER")?,
+ metrics: load_env("OTEL_METRICS_EXPORTER")?,
+ logs: load_env("OTEL_LOGS_EXPORTER")?,
+ })
+ }
+
+ pub fn traces_enabled(&self) -> bool {
+ !matches!(self.traces, Some(ExporterKind::None))
+ }
+ pub fn metrics_enabled(&self) -> bool {
+ !matches!(self.metrics, Some(ExporterKind::None))
+ }
+ pub fn logs_enabled(&self) -> bool {
+ !matches!(self.logs, Some(ExporterKind::None))
+ }
+}
+
+impl_enum! {
enum Compression {
#[name = "gzip"]
Gzip,
@@ -161,6 +172,53 @@
}
}
+pub trait OtlpSignalSettings {
+ fn compression(&self) -> Option<Compression>;
+ fn endpoint(&self) -> Option<&str>;
+ fn headers(&self) -> Option<&str>;
+ fn protocol(&self) -> Option<OtlpProtocol>;
+ fn timeout(&self) -> Option<u64>;
+}
+
+macro_rules! impl_settings {
+ (
+ #[name($env_prefix:literal, $long_prefix:literal)]
+ struct $id:ident {
+ $(
+ $(#[doc = $doc:literal])*
+ #[name($env:literal, $long:literal)]
+ $(#[arg($($tt:tt)*)])?
+ $name:ident: $ty:ty,
+ )*
+ }) => {
+ #[derive(Default)]
+ #[cfg_attr(feature = "clap", derive(clap::Parser))]
+ pub struct $id {
+ $(
+ $(#[doc = $doc])*
+ #[cfg_attr(feature = "clap", arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env) $(, $($tt)*)?))]
+ pub $name: Option<$ty>,
+ )*
+ }
+ impl $id {
+ pub fn from_env() -> Result<Self, Error> {
+ Ok(Self {
+ $(
+ $name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?,
+ )*
+ })
+ }
+ }
+ impl OtlpSignalSettings for $id {
+ fn compression(&self) -> Option<Compression> { self.compression }
+ fn endpoint(&self) -> Option<&str> { self.endpoint.as_deref() }
+ fn headers(&self) -> Option<&str> { self.headers.as_deref() }
+ fn protocol(&self) -> Option<OtlpProtocol> { self.protocol }
+ fn timeout(&self) -> Option<u64> { self.timeout }
+ }
+ }
+}
+
impl_settings! {
#[name("", "")]
struct OtlpBaseSettings {
@@ -168,7 +226,7 @@
#[name("COMPRESSION", "compression")]
#[arg(value_enum)]
compression: Compression,
- /// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you’re sending more than one signal to the same endpoint and want one environment variable to control the endpoint.
+ /// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you're sending more than one signal to the same endpoint and want one environment variable to control the endpoint.
#[name("ENDPOINT", "endpoint")]
endpoint: String,
/// A list of headers to apply to all outgoing data (traces, metrics, and logs).
@@ -227,7 +285,6 @@
timeout: u64,
}
}
-
impl_settings! {
#[name("TRACES_", "traces-")]
struct OtlpTracesSettings {
@@ -251,14 +308,78 @@
}
}
-#[derive(thiserror::Error, Debug)]
-enum ProviderError {
- #[error("protocol is not set")]
- UnsetProtocol,
- #[error("endpoint is not set")]
- EndpointUnset,
- #[cfg(feature = "otlp")]
- #[error("failed to build exporter: {0}")]
- Exporter(#[from] ExporterBuildError),
+pub struct ResolvedOtlpSettings {
+ pub compression: Option<Compression>,
+ pub endpoint: String,
+ pub headers: Option<String>,
+ pub protocol: OtlpProtocol,
+ pub timeout: Duration,
}
-type ProviderResult<T, E = ProviderError> = Result<T, E>;
+
+impl ResolvedOtlpSettings {
+ const DEFAULT_TIMEOUT_MS: u64 = 10000;
+ const DEFAULT_GRPC_ENDPOINT: &str = "http://localhost:4317";
+ const DEFAULT_HTTP_ENDPOINT: &str = "http://localhost:4318";
+
+ pub fn traces(
+ base: &impl OtlpSignalSettings,
+ signal: &impl OtlpSignalSettings,
+ ) -> Result<Self, Error> {
+ Self::resolve(base, signal, "/v1/traces")
+ }
+
+ pub fn metrics(
+ base: &impl OtlpSignalSettings,
+ signal: &impl OtlpSignalSettings,
+ ) -> Result<Self, Error> {
+ Self::resolve(base, signal, "/v1/metrics")
+ }
+
+ pub fn logs(
+ base: &impl OtlpSignalSettings,
+ signal: &impl OtlpSignalSettings,
+ ) -> Result<Self, Error> {
+ Self::resolve(base, signal, "/v1/logs")
+ }
+
+ fn resolve(
+ base: &impl OtlpSignalSettings,
+ signal: &impl OtlpSignalSettings,
+ signal_path: &str,
+ ) -> Result<Self, Error> {
+ let protocol = signal
+ .protocol()
+ .or_else(|| base.protocol())
+ .unwrap_or(OtlpProtocol::HttpProtobuf);
+
+ let endpoint = if let Some(ep) = signal.endpoint() {
+ ep.to_owned()
+ } else if let Some(ep) = base.endpoint() {
+ match protocol {
+ OtlpProtocol::Grpc => ep.to_owned(),
+ _ => format!("{ep}{signal_path}"),
+ }
+ } else {
+ match protocol {
+ OtlpProtocol::Grpc => Self::DEFAULT_GRPC_ENDPOINT.to_owned(),
+ _ => format!("{}{signal_path}", Self::DEFAULT_HTTP_ENDPOINT),
+ }
+ };
+
+ Ok(Self {
+ compression: signal.compression().or_else(|| base.compression()),
+ endpoint,
+ headers: signal
+ .headers()
+ .or_else(|| base.headers())
+ .map(str::to_owned),
+ protocol,
+ timeout: Duration::from_millis(
+ signal
+ .timeout()
+ .or_else(|| base.timeout())
+ .unwrap_or(Self::DEFAULT_TIMEOUT_MS),
+ ),
+ })
+ }
+}
crates/opentelemetry-exporter-env/src/otlp.rsdiffbeforeafterboth1use std::collections::HashMap;1use std::collections::HashMap;2use std::time::Duration;324use opentelemetry_otlp::tonic_types::metadata::MetadataMap;3use opentelemetry_otlp::tonic_types::metadata::MetadataMap;5use opentelemetry_otlp::{4use opentelemetry_otlp::{6 LogExporter, MetricExporter, SpanExporter, WithExportConfig as _, WithHttpConfig as _,5 LogExporter, MetricExporter, SpanExporter, WithExportConfig as _, WithHttpConfig as _,7 WithTonicConfig as _,6 WithTonicConfig as _,8};7};9810use crate::{9use crate::{Error, OtlpProtocol, ResolvedOtlpSettings};11 OtlpBaseSettings, OtlpLogsSettings, OtlpMetricsSettings, OtlpProtocol, ProviderError,12 ProviderResult,13};141015fn parse_headers<'a>(11fn parse_headers(headers: &str) -> impl Iterator<Item = (&str, &str)> {16 headers: &'a str,17) -> std::iter::Map<std::str::Split<'a, char>, impl FnMut(&'a str) -> (&'a str, &'a str)> {18 headers.split(',').map(|header| {12 headers.split(',').map(|header| {19 let mut parts = header.splitn(2, '=');13 let mut parts = header.splitn(2, '=');20 let key = parts.next().unwrap();14 let key = parts.next().unwrap();23 })17 })24}18}251926fn parse_headers_metadata_map(headers: Option<&str>) -> MetadataMap {20fn to_metadata_map(headers: Option<&str>) -> MetadataMap {27 headers21 headers28 .map(|headers| {22 .map(|headers| {29 MetadataMap::from_headers(23 MetadataMap::from_headers(35 .unwrap_or_default()29 .unwrap_or_default()36}30}3137fn parse_headers_hashmap(headers: Option<&str>) -> HashMap<String, String> {32fn to_hashmap(headers: Option<&str>) -> HashMap<String, String> {38 headers33 headers39 .map(|headers| {34 .map(|headers| {40 parse_headers(headers)35 parse_headers(headers)44 .unwrap_or_default()39 .unwrap_or_default()45}40}464142macro_rules! build_exporter {47fn logger_exporter(base: &OtlpBaseSettings, log: &OtlpLogsSettings) -> ProviderResult<LogExporter> {43 ($exporter:ty, $settings:expr) => {{48 let endpoint = log44 let s: &ResolvedOtlpSettings = $settings;49 .endpoint50 .clone()51 .or_else(|| Some(format!("{}/v1/logs", base.endpoint.as_ref()?)))52 .ok_or(ProviderError::EndpointUnset)?;53 let headers = log.headers.as_deref().or(base.headers.as_deref());54 let timeout = Duration::from_millis(log.timeout.or(base.timeout).unwrap_or(10000));5556 let protocol = log57 .protocol58 .or(base.protocol)59 .ok_or(ProviderError::UnsetProtocol)?;6061 match protocol {45 match s.protocol {62 OtlpProtocol::Grpc => {46 OtlpProtocol::Grpc => {63 let mut builder = LogExporter::builder()47 let mut builder = <$exporter>::builder()64 .with_tonic()48 .with_tonic()65 .with_endpoint(endpoint)49 .with_endpoint(&s.endpoint)66 .with_metadata(parse_headers_metadata_map(headers))50 .with_metadata(to_metadata_map(s.headers.as_deref()))67 .with_protocol(protocol.into())51 .with_protocol(s.protocol.into())68 .with_timeout(timeout);52 .with_timeout(s.timeout);69 let compression = log.compression.or(base.compression);70 if let Some(compression) = compression {53 if let Some(compression) = s.compression {71 builder = builder.with_compression(compression.into());54 builder = builder.with_compression(compression.into());72 }55 }7374 Ok(builder.build()?)56 builder.build()75 }57 }76 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {58 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {77 let builder = LogExporter::builder()59 <$exporter>::builder()78 .with_http()60 .with_http()79 .with_endpoint(endpoint)61 .with_endpoint(&s.endpoint)80 .with_headers(parse_headers_hashmap(headers))62 .with_headers(to_hashmap(s.headers.as_deref()))81 .with_protocol(protocol.into())63 .with_protocol(s.protocol.into())82 .with_timeout(timeout);64 .with_timeout(s.timeout)8384 Ok(builder.build()?)65 .build()85 }66 }86 }67 }87}68 }};69}7071impl ResolvedOtlpSettings {88fn metric_exporter(72 pub fn span_exporter(&self) -> Result<SpanExporter, Error> {89 base: &OtlpBaseSettings,73 Ok(build_exporter!(SpanExporter, self)?)90 metric: &OtlpMetricsSettings,74 }91) -> ProviderResult<MetricExporter> {7592 let endpoint = metric93 .endpoint94 .clone()95 .or_else(|| Some(format!("{}/v1/metrics", base.endpoint.as_ref()?)))96 .ok_or(ProviderError::EndpointUnset)?;97 let headers = metric.headers.as_deref().or(base.headers.as_deref());98 let timeout = Duration::from_millis(metric.timeout.or(base.timeout).unwrap_or(10000));99100 let protocol = metric101 .protocol102 .or(base.protocol)103 .ok_or(ProviderError::UnsetProtocol)?;104105 match protocol {106 OtlpProtocol::Grpc => {107 let mut builder = MetricExporter::builder()108 .with_tonic()109 .with_endpoint(endpoint)110 .with_metadata(parse_headers_metadata_map(headers))111 .with_protocol(protocol.into())112 .with_timeout(timeout);113 let compression = metric.compression.or(base.compression);76 pub fn log_exporter(&self) -> Result<LogExporter, Error> {114 if let Some(compression) = compression {115 builder = builder.with_compression(compression.into());116 }117118 Ok(builder.build()?)77 Ok(build_exporter!(LogExporter, self)?)119 }78 }120 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {79121 let builder = MetricExporter::builder()80 pub fn metric_exporter(&self) -> Result<MetricExporter, Error> {122 .with_http()123 .with_endpoint(endpoint)124 .with_headers(parse_headers_hashmap(headers))125 .with_protocol(protocol.into())126 .with_timeout(timeout);127128 Ok(builder.build()?)81 Ok(build_exporter!(MetricExporter, self)?)129 }82 }130 }83}131}132fn span_exporter(133 base: &OtlpBaseSettings,134 trace: &OtlpMetricsSettings,135) -> ProviderResult<SpanExporter> {136 let endpoint = trace137 .endpoint138 .clone()139 .or_else(|| Some(format!("{}/v1/traces", base.endpoint.as_ref()?)))140 .ok_or(ProviderError::EndpointUnset)?;141 let headers = trace.headers.as_deref().or(base.headers.as_deref());142 let timeout = Duration::from_millis(trace.timeout.or(base.timeout).unwrap_or(10000));143144 let protocol = trace145 .protocol146 .or(base.protocol)147 .ok_or(ProviderError::UnsetProtocol)?;148149 match protocol {150 OtlpProtocol::Grpc => {151 let mut builder = SpanExporter::builder()152 .with_tonic()153 .with_endpoint(endpoint)154 .with_metadata(parse_headers_metadata_map(headers))155 .with_protocol(protocol.into())156 .with_timeout(timeout);157 let compression = trace.compression.or(base.compression);158 if let Some(compression) = compression {159 builder = builder.with_compression(compression.into());160 }161162 Ok(builder.build()?)163 }164 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {165 let builder = SpanExporter::builder()166 .with_http()167 .with_endpoint(endpoint)168 .with_headers(parse_headers_hashmap(headers))169 .with_protocol(protocol.into())170 .with_timeout(timeout);171172 Ok(builder.build()?)173 }174 }175}17684