git.delta.rocks / jrsonnet / refs/commits / b00d46da7979

difftreelog

source

crates/opentelemetry-exporter-env/src/otlp.rs4.9 KiBsourcehistory
1use std::collections::HashMap;2use std::time::Duration;34use opentelemetry_otlp::tonic_types::metadata::MetadataMap;5use opentelemetry_otlp::{6	LogExporter, MetricExporter, SpanExporter, WithExportConfig as _, WithHttpConfig as _,7	WithTonicConfig as _,8};910use crate::{11	OtlpBaseSettings, OtlpLogsSettings, OtlpMetricsSettings, OtlpProtocol, ProviderError,12	ProviderResult,13};1415fn parse_headers<'a>(16	headers: &'a str,17) -> std::iter::Map<std::str::Split<'a, char>, impl FnMut(&'a str) -> (&'a str, &'a str)> {18	headers.split(',').map(|header| {19		let mut parts = header.splitn(2, '=');20		let key = parts.next().unwrap();21		let value = parts.next().unwrap_or("");22		(key, value)23	})24}2526fn parse_headers_metadata_map(headers: Option<&str>) -> MetadataMap {27	headers28		.map(|headers| {29			MetadataMap::from_headers(30				parse_headers(headers)31					.map(|(key, value)| (key.parse().unwrap(), value.parse().unwrap()))32					.collect(),33			)34		})35		.unwrap_or_default()36}37fn parse_headers_hashmap(headers: Option<&str>) -> HashMap<String, String> {38	headers39		.map(|headers| {40			parse_headers(headers)41				.map(|(key, value)| (key.into(), value.into()))42				.collect()43		})44		.unwrap_or_default()45}4647fn logger_exporter(base: &OtlpBaseSettings, log: &OtlpLogsSettings) -> ProviderResult<LogExporter> {48	let endpoint = log49		.endpoint50		.clone()51		.or_else(|| Some(format!("{}/v1/logs", base.endpoint.as_ref()?)))52		.ok_or(ProviderError::EndpointUnset)?;53	let headers = log.headers.as_deref().or(base.headers.as_deref());54	let timeout = Duration::from_millis(log.timeout.or(base.timeout).unwrap_or(10000));5556	let protocol = log57		.protocol58		.or(base.protocol)59		.ok_or(ProviderError::UnsetProtocol)?;6061	match protocol {62		OtlpProtocol::Grpc => {63			let mut builder = LogExporter::builder()64				.with_tonic()65				.with_endpoint(endpoint)66				.with_metadata(parse_headers_metadata_map(headers))67				.with_protocol(protocol.into())68				.with_timeout(timeout);69			let compression = log.compression.or(base.compression);70			if let Some(compression) = compression {71				builder = builder.with_compression(compression.into());72			}7374			Ok(builder.build()?)75		}76		OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {77			let builder = LogExporter::builder()78				.with_http()79				.with_endpoint(endpoint)80				.with_headers(parse_headers_hashmap(headers))81				.with_protocol(protocol.into())82				.with_timeout(timeout);8384			Ok(builder.build()?)85		}86	}87}88fn metric_exporter(89	base: &OtlpBaseSettings,90	metric: &OtlpMetricsSettings,91) -> ProviderResult<MetricExporter> {92	let endpoint = metric93		.endpoint94		.clone()95		.or_else(|| Some(format!("{}/v1/metrics", base.endpoint.as_ref()?)))96		.ok_or(ProviderError::EndpointUnset)?;97	let headers = metric.headers.as_deref().or(base.headers.as_deref());98	let timeout = Duration::from_millis(metric.timeout.or(base.timeout).unwrap_or(10000));99100	let protocol = metric101		.protocol102		.or(base.protocol)103		.ok_or(ProviderError::UnsetProtocol)?;104105	match protocol {106		OtlpProtocol::Grpc => {107			let mut builder = MetricExporter::builder()108				.with_tonic()109				.with_endpoint(endpoint)110				.with_metadata(parse_headers_metadata_map(headers))111				.with_protocol(protocol.into())112				.with_timeout(timeout);113			let compression = metric.compression.or(base.compression);114			if let Some(compression) = compression {115				builder = builder.with_compression(compression.into());116			}117118			Ok(builder.build()?)119		}120		OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {121			let builder = MetricExporter::builder()122				.with_http()123				.with_endpoint(endpoint)124				.with_headers(parse_headers_hashmap(headers))125				.with_protocol(protocol.into())126				.with_timeout(timeout);127128			Ok(builder.build()?)129		}130	}131}132fn span_exporter(133	base: &OtlpBaseSettings,134	trace: &OtlpMetricsSettings,135) -> ProviderResult<SpanExporter> {136	let endpoint = trace137		.endpoint138		.clone()139		.or_else(|| Some(format!("{}/v1/traces", base.endpoint.as_ref()?)))140		.ok_or(ProviderError::EndpointUnset)?;141	let headers = trace.headers.as_deref().or(base.headers.as_deref());142	let timeout = Duration::from_millis(trace.timeout.or(base.timeout).unwrap_or(10000));143144	let protocol = trace145		.protocol146		.or(base.protocol)147		.ok_or(ProviderError::UnsetProtocol)?;148149	match protocol {150		OtlpProtocol::Grpc => {151			let mut builder = SpanExporter::builder()152				.with_tonic()153				.with_endpoint(endpoint)154				.with_metadata(parse_headers_metadata_map(headers))155				.with_protocol(protocol.into())156				.with_timeout(timeout);157			let compression = trace.compression.or(base.compression);158			if let Some(compression) = compression {159				builder = builder.with_compression(compression.into());160			}161162			Ok(builder.build()?)163		}164		OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {165			let builder = SpanExporter::builder()166				.with_http()167				.with_endpoint(endpoint)168				.with_headers(parse_headers_hashmap(headers))169				.with_protocol(protocol.into())170				.with_timeout(timeout);171172			Ok(builder.build()?)173		}174	}175}