git.delta.rocks / jrsonnet / refs/commits / cf89cc01f4f6

difftreelog

feat extract otel autoconfig from pusher

wvtzsrnqYaroslav Bolyukin2025-09-18parent: #b6dd2cb.patch.diff
in: trunk

6 files changed

modifiedCargo.lockdiffbeforeafterboth
before · Cargo.lock
412 packageslockfile v4
modifiedcmds/fleet/Cargo.tomldiffbeforeafterboth
--- a/cmds/fleet/Cargo.toml
+++ b/cmds/fleet/Cargo.toml
@@ -46,6 +46,9 @@
 indicatif = { version = "0.18", optional = true }
 nom = "8.0.0"
 tracing-indicatif = { version = "0.3", optional = true }
+tracing-opentelemetry = "0.31.0"
+opentelemetry = "0.30.0"
+opentelemetry_sdk = "0.30.0"
 
 [features]
 default = []
modifiedcmds/fleet/src/main.rsdiffbeforeafterboth
--- a/cmds/fleet/src/main.rs
+++ b/cmds/fleet/src/main.rs
@@ -4,7 +4,7 @@
 // pub(crate) mod command;
 pub(crate) mod extra_args;
 
-use std::{ffi::OsString, process::ExitCode};
+use std::{env, ffi::OsString, process::ExitCode};
 
 use anyhow::{Result, bail};
 use clap::{CommandFactory, Parser};
@@ -27,7 +27,7 @@
 use tracing::{Instrument, error, info, info_span};
 #[cfg(feature = "indicatif")]
 use tracing_indicatif::IndicatifLayer;
-use tracing_subscriber::{fmt::format::Format, prelude::*, EnvFilter};
+use tracing_subscriber::{EnvFilter, fmt::format::Format, prelude::*};
 
 #[derive(Parser)]
 struct Prefetch {}
@@ -170,6 +170,9 @@
 		let sub = sub.with_writer(indicatif_layer.get_stderr_writer());
 		sub.with_filter(filter) // .without,
 	});
+
+	if env::var_os("FLEET_OTEL").is_some() {}
+
 	// #[cfg(feature = "indicatif")]
 	#[cfg(feature = "indicatif")]
 	let reg = reg.with(indicatif_layer);
addedcrates/opentelemetry-exporter-env/Cargo.tomldiffbeforeafterboth
--- /dev/null
+++ b/crates/opentelemetry-exporter-env/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "opentelemetry-exporter-env"
+version.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+clap = { workspace = true, optional = true }
+opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic", "gzip-tonic", "http-json"], optional = true }
+thiserror.workspace = true
+
+[features]
+default = ["clap", "otlp"]
+clap = ["dep:clap"]
+otlp = ["dep:opentelemetry-otlp"]
addedcrates/opentelemetry-exporter-env/src/lib.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/opentelemetry-exporter-env/src/lib.rs
@@ -0,0 +1,264 @@
+use std::collections::HashMap;
+use std::convert::Infallible;
+use std::env::{self, VarError};
+use std::ffi::OsString;
+use std::num::ParseIntError;
+use std::str::FromStr;
+use std::time::Duration;
+
+use clap::Parser;
+#[cfg(feature = "otlp")]
+use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
+#[cfg(feature = "otlp")]
+use opentelemetry_otlp::{
+	ExporterBuildError, LogExporter, MetricExporter, SpanExporter, WithExportConfig,
+	WithHttpConfig, WithTonicConfig,
+};
+
+#[cfg(feature = "otlp")]
+mod otlp;
+
+pub enum Error {
+	InvalidUtf8 {
+		env: &'static str,
+		value: OsString,
+	},
+	EnvParseError {
+		env: &'static str,
+		value: String,
+		error: &'static str,
+	},
+	EnvParseIntError {
+		env: &'static str,
+		value: String,
+		error: ParseIntError,
+	},
+}
+impl From<(&'static str, &'static str, String)> for Error {
+	fn from((env, error, value): (&'static str, &'static str, String)) -> Self {
+		Self::EnvParseError { env, value, error }
+	}
+}
+impl From<(&'static str, ParseIntError, String)> for Error {
+	fn from((env, error, value): (&'static str, ParseIntError, String)) -> Self {
+		Self::EnvParseIntError { env, value, error }
+	}
+}
+impl From<(&'static str, Infallible, String)> for Error {
+	fn from(_v: (&'static str, Infallible, String)) -> Self {
+		unreachable!()
+	}
+}
+
+fn load_env<T>(env: &'static str) -> Result<Option<T>, Error>
+where
+	T: FromStr,
+	Error: From<(&'static str, <T as FromStr>::Err, String)>,
+{
+	match env::var(env) {
+		Ok(v) => Ok(Some(T::from_str(&v).map_err(|err| (env, err, v))?)),
+		Err(VarError::NotPresent) => Ok(None),
+		Err(VarError::NotUnicode(value)) => Err(Error::InvalidUtf8 { env, value }),
+	}
+}
+
+macro_rules! impl_settings {
+	(
+	#[name($env_prefix:literal, $long_prefix:literal)]
+	struct $id:ident {
+		$(
+			$(#[doc = $doc:literal])*
+			#[name($env:literal, $long:literal)]
+			$(#[arg($($tt:tt)*)])?
+			$name:ident: $ty:ty,
+		)*
+	}) => {
+		#[derive(Parser)]
+		pub struct $id {
+			$(
+				$(#[doc = $doc])*
+				#[arg(long = concat!("otel-exporter-otlp-", $long_prefix, $long), env = concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env), $($($tt)*)?)]
+				pub $name: Option<$ty>,
+			)*
+		}
+		impl $id {
+			pub fn from_env() -> Result<Self, Error> {
+				Ok(Self {
+					$(
+						$name: load_env(concat!("OTEL_EXPORTER_OTLP_", $env_prefix, $env))?,
+					)*
+				})
+			}
+		}
+	}
+}
+macro_rules! impl_enum {
+	(enum $id:ident {
+		$(
+			#[name = $value:literal]
+			$var:ident,
+		)*
+	}) => {
+		#[derive(Clone, Copy)]
+		#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
+		pub enum $id {
+			$(
+				#[cfg_attr(feature = "clap", value(name = $value))]
+				$var,
+			)*
+		}
+		impl FromStr for $id {
+			type Err = &'static str;
+
+			fn from_str(s: &str) -> Result<Self, Self::Err> {
+				Ok(match s {
+					$(
+						$value => Self::$var,
+					)*
+					_ => return Err("unsupported value, supported are")
+				})
+			}
+		}
+	};
+}
+
+impl_enum! {
+	enum Compression {
+		#[name = "gzip"]
+		Gzip,
+		#[name = "zstd"]
+		Zstd,
+	}
+}
+#[cfg(feature = "otlp")]
+impl From<Compression> for opentelemetry_otlp::Compression {
+	fn from(value: Compression) -> Self {
+		match value {
+			Compression::Gzip => opentelemetry_otlp::Compression::Gzip,
+			Compression::Zstd => opentelemetry_otlp::Compression::Zstd,
+		}
+	}
+}
+
+impl_enum! {
+	enum OtlpProtocol {
+		#[name = "grpc"]
+		Grpc,
+		#[name = "http/protobuf"]
+		HttpProtobuf,
+		#[name = "http/json"]
+		HttpJson,
+	}
+}
+#[cfg(feature = "otlp")]
+impl From<OtlpProtocol> for opentelemetry_otlp::Protocol {
+	fn from(value: OtlpProtocol) -> Self {
+		match value {
+			OtlpProtocol::Grpc => opentelemetry_otlp::Protocol::Grpc,
+			OtlpProtocol::HttpProtobuf => opentelemetry_otlp::Protocol::HttpBinary,
+			OtlpProtocol::HttpJson => opentelemetry_otlp::Protocol::HttpJson,
+		}
+	}
+}
+
+impl_settings! {
+	#[name("", "")]
+	struct OtlpBaseSettings {
+		/// Specifies the OTLP transport compression to be used for all telemetry data.
+		#[name("COMPRESSION", "compression")]
+		#[arg(value_enum)]
+		compression: Compression,
+		/// A base endpoint URL for any signal type, with an optionally-specified port number. Helpful for when you’re sending more than one signal to the same endpoint and want one environment variable to control the endpoint.
+		#[name("ENDPOINT", "endpoint")]
+		endpoint: String,
+		/// A list of headers to apply to all outgoing data (traces, metrics, and logs).
+		#[name("HEADERS", "headers")]
+		headers: String,
+		/// Specifies the OTLP transport protocol to be used for all telemetry data.
+		#[name("PROTOCOL", "protocol")]
+		#[arg(value_enum)]
+		protocol: OtlpProtocol,
+		/// The timeout value for all outgoing data (traces, metrics, and logs) in milliseconds.
+		#[name("TIMEOUT", "timeout")]
+		timeout: u64,
+	}
+}
+impl_settings! {
+	#[name("LOGS_", "logs-")]
+	struct OtlpLogsSettings {
+		/// Specifies the OTLP transport compression to be used for log data.
+		#[name("COMPRESSION", "compression")]
+		#[arg(value_enum)]
+		compression: Compression,
+		/// Endpoint URL for log data only, with an optionally-specified port number. Typically ends with `v1/logs` when using OTLP/HTTP.
+		#[name("ENDPOINT", "endpoint")]
+		endpoint: String,
+		/// A list of headers to apply to all outgoing logs.
+		#[name("HEADERS", "headers")]
+		headers: String,
+		/// Specifies the OTLP transport protocol to be used for log data.
+		#[name("PROTOCOL", "protocol")]
+		#[arg(value_enum)]
+		protocol: OtlpProtocol,
+		/// The timeout value for all outgoing logs in milliseconds.
+		#[name("TIMEOUT", "timeout")]
+		timeout: u64,
+	}
+}
+impl_settings! {
+	#[name("METRICS_", "metrics-")]
+	struct OtlpMetricsSettings {
+		/// Specifies the OTLP transport compression to be used for metrics data.
+		#[name("COMPRESSION", "compression")]
+		#[arg(value_enum)]
+		compression: Compression,
+		/// Endpoint URL for metric data only, with an optionally-specified port number. Typically ends with `v1/metrics` when using OTLP/HTTP.
+		#[name("ENDPOINT", "endpoint")]
+		endpoint: String,
+		/// A list of headers to apply to all outgoing metrics.
+		#[name("HEADERS", "headers")]
+		headers: String,
+		/// Specifies the OTLP transport protocol to be used for metrics data.
+		#[name("PROTOCOL", "protocol")]
+		#[arg(value_enum)]
+		protocol: OtlpProtocol,
+		/// The timeout value for all outgoing metrics in milliseconds.
+		#[name("TIMEOUT", "timeout")]
+		timeout: u64,
+	}
+}
+
+impl_settings! {
+	#[name("TRACES_", "traces-")]
+	struct OtlpTracesSettings {
+		/// Specifies the OTLP transport compression to be used for trace data.
+		#[name("COMPRESSION", "compression")]
+		#[arg(value_enum)]
+		compression: Compression,
+		/// Endpoint URL for trace data only, with an optionally-specified port number. Typically ends with `v1/traces` when using OTLP/HTTP.
+		#[name("ENDPOINT", "endpoint")]
+		endpoint: String,
+		/// A list of headers to apply to all outgoing traces.
+		#[name("HEADERS", "headers")]
+		headers: String,
+		/// Specifies the OTLP transport protocol to be used for trace data.
+		#[name("PROTOCOL", "protocol")]
+		#[arg(value_enum)]
+		protocol: OtlpProtocol,
+		/// The timeout value for all outgoing traces in milliseconds.
+		#[name("TIMEOUT", "timeout")]
+		timeout: u64,
+	}
+}
+
+#[derive(thiserror::Error, Debug)]
+enum ProviderError {
+	#[error("protocol is not set")]
+	UnsetProtocol,
+	#[error("endpoint is not set")]
+	EndpointUnset,
+	#[cfg(feature = "otlp")]
+	#[error("failed to build exporter: {0}")]
+	Exporter(#[from] ExporterBuildError),
+}
+type ProviderResult<T, E = ProviderError> = Result<T, E>;
addedcrates/opentelemetry-exporter-env/src/otlp.rsdiffbeforeafterboth
--- /dev/null
+++ b/crates/opentelemetry-exporter-env/src/otlp.rs
@@ -0,0 +1,175 @@
+use std::collections::HashMap;
+use std::time::Duration;
+
+use opentelemetry_otlp::tonic_types::metadata::MetadataMap;
+use opentelemetry_otlp::{
+	LogExporter, MetricExporter, SpanExporter, WithExportConfig as _, WithHttpConfig as _,
+	WithTonicConfig as _,
+};
+
+use crate::{
+	OtlpBaseSettings, OtlpLogsSettings, OtlpMetricsSettings, OtlpProtocol, ProviderError,
+	ProviderResult,
+};
+
+fn parse_headers<'a>(
+	headers: &'a str,
+) -> std::iter::Map<std::str::Split<'a, char>, impl FnMut(&'a str) -> (&'a str, &'a str)> {
+	headers.split(',').map(|header| {
+		let mut parts = header.splitn(2, '=');
+		let key = parts.next().unwrap();
+		let value = parts.next().unwrap_or("");
+		(key, value)
+	})
+}
+
+fn parse_headers_metadata_map(headers: Option<&str>) -> MetadataMap {
+	headers
+		.map(|headers| {
+			MetadataMap::from_headers(
+				parse_headers(headers)
+					.map(|(key, value)| (key.parse().unwrap(), value.parse().unwrap()))
+					.collect(),
+			)
+		})
+		.unwrap_or_default()
+}
+fn parse_headers_hashmap(headers: Option<&str>) -> HashMap<String, String> {
+	headers
+		.map(|headers| {
+			parse_headers(headers)
+				.map(|(key, value)| (key.into(), value.into()))
+				.collect()
+		})
+		.unwrap_or_default()
+}
+
+fn logger_exporter(base: &OtlpBaseSettings, log: &OtlpLogsSettings) -> ProviderResult<LogExporter> {
+	let endpoint = log
+		.endpoint
+		.clone()
+		.or_else(|| Some(format!("{}/v1/logs", base.endpoint.as_ref()?)))
+		.ok_or(ProviderError::EndpointUnset)?;
+	let headers = log.headers.as_deref().or(base.headers.as_deref());
+	let timeout = Duration::from_millis(log.timeout.or(base.timeout).unwrap_or(10000));
+
+	let protocol = log
+		.protocol
+		.or(base.protocol)
+		.ok_or(ProviderError::UnsetProtocol)?;
+
+	match protocol {
+		OtlpProtocol::Grpc => {
+			let mut builder = LogExporter::builder()
+				.with_tonic()
+				.with_endpoint(endpoint)
+				.with_metadata(parse_headers_metadata_map(headers))
+				.with_protocol(protocol.into())
+				.with_timeout(timeout);
+			let compression = log.compression.or(base.compression);
+			if let Some(compression) = compression {
+				builder = builder.with_compression(compression.into());
+			}
+
+			Ok(builder.build()?)
+		}
+		OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
+			let builder = LogExporter::builder()
+				.with_http()
+				.with_endpoint(endpoint)
+				.with_headers(parse_headers_hashmap(headers))
+				.with_protocol(protocol.into())
+				.with_timeout(timeout);
+
+			Ok(builder.build()?)
+		}
+	}
+}
+fn metric_exporter(
+	base: &OtlpBaseSettings,
+	metric: &OtlpMetricsSettings,
+) -> ProviderResult<MetricExporter> {
+	let endpoint = metric
+		.endpoint
+		.clone()
+		.or_else(|| Some(format!("{}/v1/metrics", base.endpoint.as_ref()?)))
+		.ok_or(ProviderError::EndpointUnset)?;
+	let headers = metric.headers.as_deref().or(base.headers.as_deref());
+	let timeout = Duration::from_millis(metric.timeout.or(base.timeout).unwrap_or(10000));
+
+	let protocol = metric
+		.protocol
+		.or(base.protocol)
+		.ok_or(ProviderError::UnsetProtocol)?;
+
+	match protocol {
+		OtlpProtocol::Grpc => {
+			let mut builder = MetricExporter::builder()
+				.with_tonic()
+				.with_endpoint(endpoint)
+				.with_metadata(parse_headers_metadata_map(headers))
+				.with_protocol(protocol.into())
+				.with_timeout(timeout);
+			let compression = metric.compression.or(base.compression);
+			if let Some(compression) = compression {
+				builder = builder.with_compression(compression.into());
+			}
+
+			Ok(builder.build()?)
+		}
+		OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
+			let builder = MetricExporter::builder()
+				.with_http()
+				.with_endpoint(endpoint)
+				.with_headers(parse_headers_hashmap(headers))
+				.with_protocol(protocol.into())
+				.with_timeout(timeout);
+
+			Ok(builder.build()?)
+		}
+	}
+}
+fn span_exporter(
+	base: &OtlpBaseSettings,
+	trace: &OtlpMetricsSettings,
+) -> ProviderResult<SpanExporter> {
+	let endpoint = trace
+		.endpoint
+		.clone()
+		.or_else(|| Some(format!("{}/v1/traces", base.endpoint.as_ref()?)))
+		.ok_or(ProviderError::EndpointUnset)?;
+	let headers = trace.headers.as_deref().or(base.headers.as_deref());
+	let timeout = Duration::from_millis(trace.timeout.or(base.timeout).unwrap_or(10000));
+
+	let protocol = trace
+		.protocol
+		.or(base.protocol)
+		.ok_or(ProviderError::UnsetProtocol)?;
+
+	match protocol {
+		OtlpProtocol::Grpc => {
+			let mut builder = SpanExporter::builder()
+				.with_tonic()
+				.with_endpoint(endpoint)
+				.with_metadata(parse_headers_metadata_map(headers))
+				.with_protocol(protocol.into())
+				.with_timeout(timeout);
+			let compression = trace.compression.or(base.compression);
+			if let Some(compression) = compression {
+				builder = builder.with_compression(compression.into());
+			}
+
+			Ok(builder.build()?)
+		}
+		OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {
+			let builder = SpanExporter::builder()
+				.with_http()
+				.with_endpoint(endpoint)
+				.with_headers(parse_headers_hashmap(headers))
+				.with_protocol(protocol.into())
+				.with_timeout(timeout);
+
+			Ok(builder.build()?)
+		}
+	}
+}