1use std::collections::HashMap;2use std::time::Duration;34use opentelemetry_otlp::tonic_types::metadata::MetadataMap;5use opentelemetry_otlp::{6 LogExporter, MetricExporter, SpanExporter, WithExportConfig as _, WithHttpConfig as _,7 WithTonicConfig as _,8};910use crate::{11 OtlpBaseSettings, OtlpLogsSettings, OtlpMetricsSettings, OtlpProtocol, ProviderError,12 ProviderResult,13};1415fn parse_headers<'a>(16 headers: &'a str,17) -> std::iter::Map<std::str::Split<'a, char>, impl FnMut(&'a str) -> (&'a str, &'a str)> {18 headers.split(',').map(|header| {19 let mut parts = header.splitn(2, '=');20 let key = parts.next().unwrap();21 let value = parts.next().unwrap_or("");22 (key, value)23 })24}2526fn parse_headers_metadata_map(headers: Option<&str>) -> MetadataMap {27 headers28 .map(|headers| {29 MetadataMap::from_headers(30 parse_headers(headers)31 .map(|(key, value)| (key.parse().unwrap(), value.parse().unwrap()))32 .collect(),33 )34 })35 .unwrap_or_default()36}37fn parse_headers_hashmap(headers: Option<&str>) -> HashMap<String, String> {38 headers39 .map(|headers| {40 parse_headers(headers)41 .map(|(key, value)| (key.into(), value.into()))42 .collect()43 })44 .unwrap_or_default()45}4647fn logger_exporter(base: &OtlpBaseSettings, log: &OtlpLogsSettings) -> ProviderResult<LogExporter> {48 let endpoint = log49 .endpoint50 .clone()51 .or_else(|| Some(format!("{}/v1/logs", base.endpoint.as_ref()?)))52 .ok_or(ProviderError::EndpointUnset)?;53 let headers = log.headers.as_deref().or(base.headers.as_deref());54 let timeout = Duration::from_millis(log.timeout.or(base.timeout).unwrap_or(10000));5556 let protocol = log57 .protocol58 .or(base.protocol)59 .ok_or(ProviderError::UnsetProtocol)?;6061 match protocol {62 OtlpProtocol::Grpc => {63 let mut builder = LogExporter::builder()64 .with_tonic()65 .with_endpoint(endpoint)66 .with_metadata(parse_headers_metadata_map(headers))67 .with_protocol(protocol.into())68 .with_timeout(timeout);69 let compression = log.compression.or(base.compression);70 if let Some(compression) = compression {71 builder = builder.with_compression(compression.into());72 }7374 Ok(builder.build()?)75 }76 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {77 let builder = LogExporter::builder()78 .with_http()79 .with_endpoint(endpoint)80 .with_headers(parse_headers_hashmap(headers))81 .with_protocol(protocol.into())82 .with_timeout(timeout);8384 Ok(builder.build()?)85 }86 }87}88fn metric_exporter(89 base: &OtlpBaseSettings,90 metric: &OtlpMetricsSettings,91) -> ProviderResult<MetricExporter> {92 let endpoint = metric93 .endpoint94 .clone()95 .or_else(|| Some(format!("{}/v1/metrics", base.endpoint.as_ref()?)))96 .ok_or(ProviderError::EndpointUnset)?;97 let headers = metric.headers.as_deref().or(base.headers.as_deref());98 let timeout = Duration::from_millis(metric.timeout.or(base.timeout).unwrap_or(10000));99100 let protocol = metric101 .protocol102 .or(base.protocol)103 .ok_or(ProviderError::UnsetProtocol)?;104105 match protocol {106 OtlpProtocol::Grpc => {107 let mut builder = MetricExporter::builder()108 .with_tonic()109 .with_endpoint(endpoint)110 .with_metadata(parse_headers_metadata_map(headers))111 .with_protocol(protocol.into())112 .with_timeout(timeout);113 let compression = metric.compression.or(base.compression);114 if let Some(compression) = compression {115 builder = builder.with_compression(compression.into());116 }117118 Ok(builder.build()?)119 }120 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {121 let builder = MetricExporter::builder()122 .with_http()123 .with_endpoint(endpoint)124 .with_headers(parse_headers_hashmap(headers))125 .with_protocol(protocol.into())126 .with_timeout(timeout);127128 Ok(builder.build()?)129 }130 }131}132fn span_exporter(133 base: &OtlpBaseSettings,134 trace: &OtlpMetricsSettings,135) -> ProviderResult<SpanExporter> {136 let endpoint = trace137 .endpoint138 .clone()139 .or_else(|| Some(format!("{}/v1/traces", base.endpoint.as_ref()?)))140 .ok_or(ProviderError::EndpointUnset)?;141 let headers = trace.headers.as_deref().or(base.headers.as_deref());142 let timeout = Duration::from_millis(trace.timeout.or(base.timeout).unwrap_or(10000));143144 let protocol = trace145 .protocol146 .or(base.protocol)147 .ok_or(ProviderError::UnsetProtocol)?;148149 match protocol {150 OtlpProtocol::Grpc => {151 let mut builder = SpanExporter::builder()152 .with_tonic()153 .with_endpoint(endpoint)154 .with_metadata(parse_headers_metadata_map(headers))155 .with_protocol(protocol.into())156 .with_timeout(timeout);157 let compression = trace.compression.or(base.compression);158 if let Some(compression) = compression {159 builder = builder.with_compression(compression.into());160 }161162 Ok(builder.build()?)163 }164 OtlpProtocol::HttpProtobuf | OtlpProtocol::HttpJson => {165 let builder = SpanExporter::builder()166 .with_http()167 .with_endpoint(endpoint)168 .with_headers(parse_headers_hashmap(headers))169 .with_protocol(protocol.into())170 .with_timeout(timeout);171172 Ok(builder.build()?)173 }174 }175}