Skip to main content

AirLibrary/Tracing/
mod.rs

1//! # Distributed Tracing Module
2//!
3//! Provides distributed tracing support with trace ID generation, span
4//! collection, correlation ID propagation, trace export capabilities, and
5//! sampled tracing for performance.
6//!
7//! ## Responsibilities
8//!
9//! ### Trace Generation
10//! - Unique trace ID generation using UUID v4
11//! - Span ID generation for hierarchical tracing
12//! - Distributed trace parent-child relationships
13//! - Trace context propagation across service boundaries
14//!
15//! ### Span Management
16//! - Span lifecycle management (started, active, completed, failed)
17//! - Span attribute and event tracking
18//! - Duration measurement with microsecond precision
19//! - Automatic span cleanup with TTL
20//!
21//! ### Distributed Tracing Integration
22//! - W3C Trace Context format compatibility
23//! - Correlation ID propagation for request tracking
24//! - OpenTelemetry integration support
25//! - B3 header format support for Zipkin/Jaeger
26//!
27//! ### Sampled Tracing
28//! - Trace sampling to avoid performance degradation
29//! - Configurable sampling rates by endpoint
30//! - Head-based sampling for high-volume requests
31//! - Tail-based sampling candidate tracking
32//!
33//! ## Integration with Mountain
34//!
35//! Tracing coordinates with Wind services:
36//! - Distributed traces across all Element daemons
37//! - Wind services consume trace data for analysis
//! - Real-time trace visualization in Mountain UI
39//! - Cross-service latency and dependency mapping
40//!
41//! ## VSCode Debugging References
42//!
43//! Similar tracing patterns used in VSCode for:
44//! - Cross-process communication tracing
45//! - Extension host performance profiling
46//! - Language server protocol debugging
47//! - IPC message flow tracking
48//!
49//! Reference:
50//! vs/base/common/errors
51//!
52//! ## Performance Considerations
53//!
54//! - Trace sampling based on request volume and importance
55//! - Async span storage to avoid blocking service paths
56//! - Bounded span cache with automatic cleanup
57//! - Lock-free where possible for high-frequency operations
58//!
//! ## Future Enhancements
60//!
61//! - `OPENTELEMETRY`: Full OpenTelemetry SDK integration
62//! - `SAMPLING`: Implement dynamic/tail-based sampling
63//! - `EXPORT`: OpenTelemetry Protocol (OTLP) export to Jaeger/Zipkin
64//! - `ANALYSIS`: Automatic anomaly detection in traces
65//! - `METRICS`: Trace-derived custom metrics
//!
//! ## Sensitive Data Handling
67//!
68//! Tracing automatically excludes sensitive data:
69//! - No request payloads in span attributes
70//! - No authentication tokens in trace headers
71//! - No user-identifiable information in spans
72//! - Error messages are sanitized before export
73
74use std::{collections::HashMap, sync::Arc};
75
76use serde::{Deserialize, Serialize};
77use tokio::sync::RwLock;
78
79use crate::{AirError, Result, dev_log};
80
/// Trace ID generator and manager with sampling support.
///
/// Both fields sit behind `Arc`, so values produced by the derived `Clone`
/// share the same span map and the same sampling configuration.
#[derive(Debug, Clone)]
pub struct TraceGenerator {
	/// Every span currently held in memory, keyed by span ID.
	trace_spans:Arc<RwLock<HashMap<String, TraceSpan>>>,

	/// Sampling rates and retention limits consulted when sampling, creating
	/// and cleaning up spans.
	sampling_config:Arc<RwLock<SamplingConfig>>,
}
88
/// Sampling configuration for trace generation.
///
/// Use [`SamplingConfig::validate`] to check the ranges of the values below
/// before handing a configuration to a generator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SamplingConfig {
	/// Sample rate (0.0 to 1.0) - fraction of non-critical traces to collect
	pub sample_rate:f64,

	/// Sample rate (0.0 to 1.0) applied to operations flagged as critical
	pub critical_sample_rate:f64,

	/// Max spans per trace to prevent memory bloat; must be greater than 0
	pub max_spans_per_trace:usize,

	/// Trace TTL in milliseconds before cleanup; must be greater than 0.
	/// Used as the default age threshold when cleanup is called without one.
	pub trace_ttl_ms:u64,
}
104
105impl Default for SamplingConfig {
106	fn default() -> Self {
107		Self {
108			sample_rate:0.1, // 10% sampling
109
110			critical_sample_rate:1.0, // 100% for critical
111
112			max_spans_per_trace:1000,
113
114			trace_ttl_ms:3600000, // 1 hour
115		}
116	}
117}
118
119impl SamplingConfig {
120	/// Validate sampling configuration
121	pub fn validate(&self) -> Result<()> {
122		if self.sample_rate < 0.0 || self.sample_rate > 1.0 {
123			return Err(crate::AirError::Internal("sample_rate must be between 0.0 and 1.0".to_string()));
124		}
125
126		if self.critical_sample_rate < 0.0 || self.critical_sample_rate > 1.0 {
127			return Err(crate::AirError::Internal(
128				"critical_sample_rate must be between 0.0 and 1.0".to_string(),
129			));
130		}
131
132		if self.max_spans_per_trace == 0 {
133			return Err(crate::AirError::Internal(
134				"max_spans_per_trace must be greater than 0".to_string(),
135			));
136		}
137
138		if self.trace_ttl_ms == 0 {
139			return Err(crate::AirError::Internal("trace_ttl_ms must be greater than 0".to_string()));
140		}
141
142		Ok(())
143	}
144}
145
/// A single span in a trace
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceSpan {
	/// Identifier of this span; also the key under which it is stored.
	pub span_id:String,

	/// Identifier of the trace this span belongs to.
	pub trace_id:String,

	/// Identifier of the parent span, or `None` when the span has no parent.
	pub parent_span_id:Option<String>,

	/// Name of the operation the span measures.
	pub operation_name:String,

	/// Timestamp taken when the span was created.
	/// NOTE(review): unit is whatever `Utility::CurrentTimestamp` returns -
	/// presumably milliseconds, confirm.
	pub start_time:u64,

	/// Timestamp taken when the span was completed; `None` until then.
	pub end_time:Option<u64>,

	/// Current lifecycle state of the span.
	pub status:SpanStatus,

	/// Free-form key/value attributes attached to the span.
	pub attributes:HashMap<String, String>,

	/// Events recorded against the span, in insertion order.
	pub events:Vec<SpanEvent>,

	/// Sanitized error message when the span was completed with an error.
	pub error:Option<String>,

	/// `end_time - start_time` (saturating), set when the span is completed.
	pub duration_ms:Option<u64>,
}
171
/// Span status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SpanStatus {
	/// Initial state assigned when the span is created.
	Started,

	/// Set explicitly through `mark_span_active`.
	Active,

	/// Set when the span is completed without an error.
	Completed,

	/// Set when the span is completed with an error.
	Failed,

	/// NOTE(review): nothing in this module assigns this variant; presumably
	/// reserved for callers that mutate spans directly - confirm.
	Cancelled,
}
185
/// Event within a span
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanEvent {
	/// Timestamp taken when the event was recorded.
	pub timestamp:u64,

	/// Caller-supplied event name.
	pub name:String,

	/// Key/value attributes attached to the event (sanitized when the event
	/// is added through `add_span_event`).
	pub attributes:HashMap<String, String>,
}
195
/// Distributed trace metadata: a summary computed from the spans of one trace
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceMetadata {
	/// Identifier of the summarized trace.
	pub trace_id:String,

	/// Identifier of the span selected as the root of the trace.
	pub root_span_id:String,

	/// Number of spans currently stored for the trace.
	pub total_spans:usize,

	/// Operation name of the root span.
	pub root_operation:String,

	/// Start timestamp of the root span.
	pub start_time:u64,

	/// Latest end timestamp among the trace's spans; `None` when no span has
	/// ended yet.
	pub end_time:Option<u64>,

	/// Largest single-span duration in the trace; `None` when no span has a
	/// recorded duration.
	pub total_duration_ms:Option<u64>,

	/// Aggregate status derived from the statuses of the trace's spans.
	pub status:TraceStatus,
}
215
/// Trace status: the aggregate state of a whole trace
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum TraceStatus {
	/// The trace has not finished yet.
	InProgress,

	/// Every span of the trace completed successfully.
	Completed,

	/// At least one span of the trace failed.
	Failed,

	/// The trace was cancelled.
	Cancelled,
}
227
/// Context propagation information carried across service boundaries
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropagationContext {
	/// Identifier of the trace being propagated.
	pub TraceId:String,

	/// Span identifier generated for this context.
	pub SpanId:String,

	/// Request correlation identifier generated for this context.
	pub CorrelationId:String,

	/// Identifier of the span that caused this one, when known.
	pub ParentSpanId:Option<String>,
}
239
240impl TraceGenerator {
241	/// Create a new trace generator with default sampling
242	pub fn new() -> Self {
243		Self {
244			trace_spans:Arc::new(RwLock::new(HashMap::new())),
245
246			sampling_config:Arc::new(RwLock::new(SamplingConfig::default())),
247		}
248	}
249
250	/// Create a new trace generator with custom sampling
251	pub fn with_sampling(sampling_config:SamplingConfig) -> Result<Self> {
252		sampling_config
253			.validate()
254			.map_err(|e| AirError::Internal(format!("Invalid sampling config: {}", e)))?;
255
256		Ok(Self {
257			trace_spans:Arc::new(RwLock::new(HashMap::new())),
258			sampling_config:Arc::new(RwLock::new(sampling_config)),
259		})
260	}
261
262	/// Generate a new trace ID with panic recovery
263	pub fn generate_trace_id() -> String {
264		std::panic::catch_unwind(|| uuid::Uuid::new_v4().to_string()).unwrap_or_else(|e| {
265			dev_log!("air", "error: [Tracing] Panic in generate_trace_id, using fallback: {:?}", e);
266
267			format!("{:x}", rand::random::<u64>())
268		})
269	}
270
271	/// Generate a new span ID
272	pub fn generate_span_id() -> String {
273		std::panic::catch_unwind(|| uuid::Uuid::new_v4().to_string()).unwrap_or_else(|e| {
274			dev_log!("air", "error: [Tracing] Panic in generate_span_id, using fallback: {:?}", e);
275
276			format!("{:x}", rand::random::<u64>())
277		})
278	}
279
280	/// Determine if a trace should be sampled based on configuration
281	pub async fn should_sample(&self, is_critical:bool) -> bool {
282		let config = self.sampling_config.read().await;
283
284		let rate = if is_critical { config.critical_sample_rate } else { config.sample_rate };
285
286		rand::random::<f64>() < rate
287	}
288
289	/// Parse W3C Trace Context header
290	pub fn parse_trace_context(header:&str) -> Result<PropagationContext> {
291		let parts:Vec<&str> = header.split(';').collect();
292
293		let mut trace_id = String::new();
294
295		let mut parent_span_id = None;
296
297		for part in parts {
298			let kv:Vec<&str> = part.split('=').collect();
299
300			if kv.len() != 2 {
301				continue;
302			}
303
304			match kv[0].trim() {
305				"traceparent" => {
306					let trace_parent:Vec<&str> = kv[1].trim().split('-').collect();
307
308					if trace_parent.len() >= 2 {
309						trace_id = trace_parent[1].to_string();
310
311						if trace_parent.len() >= 3 {
312							parent_span_id = Some(trace_parent[2].to_string());
313						}
314					}
315				},
316
317				_ => {},
318			}
319		}
320
321		if trace_id.is_empty() {
322			return Err(AirError::Internal("Invalid trace context header".to_string()));
323		}
324
325		Ok(PropagationContext {
326			TraceId:trace_id,
327			SpanId:Self::generate_span_id(),
328			CorrelationId:crate::Utility::GenerateRequestId(),
329			ParentSpanId:parent_span_id,
330		})
331	}
332
333	/// Create a new trace span with optional sampling check
334	pub async fn create_span(
335		&self,
336
337		trace_id:String,
338
339		operation_name:impl Into<String>,
340
341		parent_span_id:Option<String>,
342
343		attributes:Option<HashMap<String, String>>,
344	) -> Result<TraceSpan> {
345		let span_id = Self::generate_span_id();
346
347		let operation_name = operation_name.into();
348
349		let span = TraceSpan {
350			span_id:span_id.clone(),
351
352			trace_id:trace_id.clone(),
353
354			parent_span_id:parent_span_id.clone(),
355
356			operation_name:operation_name.clone(),
357
358			start_time:crate::Utility::CurrentTimestamp(),
359
360			end_time:None,
361
362			status:SpanStatus::Started,
363
364			attributes:attributes.unwrap_or_default(),
365
366			events:Vec::new(),
367
368			error:None,
369
370			duration_ms:None,
371		};
372
373		let mut spans = self.trace_spans.write().await;
374
375		// Check trace span limit
376		let trace_span_count = spans.values().filter(|s| s.trace_id == trace_id).count();
377
378		let config = self.sampling_config.read().await;
379
380		if trace_span_count >= config.max_spans_per_trace {
381			dev_log!(
382				"air",
383				"warn: [Tracing] Trace {} exceeds max spans ({}), dropping span {}",
384				trace_id,
385				config.max_spans_per_trace,
386				span_id
387			);
388
389			return Err(AirError::Internal("Max spans per trace exceeded".to_string()));
390		}
391
392		spans.insert(span_id.clone(), span.clone());
393
394		Ok(span)
395	}
396
397	/// Add an event to a span
398	pub async fn add_span_event(
399		&self,
400
401		span_id:&str,
402
403		event_name:impl Into<String>,
404
405		attributes:HashMap<String, String>,
406	) -> Result<()> {
407		let event = SpanEvent {
408			timestamp:crate::Utility::CurrentTimestamp(),
409
410			name:event_name.into(),
411
412			attributes:self.sanitize_attributes(attributes),
413		};
414
415		let mut spans = self.trace_spans.write().await;
416
417		if let Some(span) = spans.get_mut(span_id) {
418			span.events.push(event);
419
420			Ok(())
421		} else {
422			Err(AirError::Internal(format!("Span not found: {}", span_id)))
423		}
424	}
425
426	/// Mark a span as active
427	pub async fn mark_span_active(&self, span_id:&str) -> Result<()> {
428		let mut spans = self.trace_spans.write().await;
429
430		if let Some(span) = spans.get_mut(span_id) {
431			span.status = SpanStatus::Active;
432
433			Ok(())
434		} else {
435			Err(AirError::Internal(format!("Span not found: {}", span_id)))
436		}
437	}
438
439	/// Complete a span with optional error
440	pub async fn complete_span(&self, span_id:&str, error:Option<String>) -> Result<u64> {
441		let Now = crate::Utility::CurrentTimestamp();
442
443		let mut spans = self.trace_spans.write().await;
444
445		if let Some(span) = spans.get_mut(span_id) {
446			span.end_time = Some(Now);
447
448			span.duration_ms = Some(Now.saturating_sub(span.start_time));
449
450			span.status = if error.is_some() { SpanStatus::Failed } else { SpanStatus::Completed };
451
452			span.error = error.map(|e| self.sanitize_error_message(&e));
453
454			Ok(span.duration_ms.unwrap_or(0))
455		} else {
456			Err(AirError::Internal(format!("Span not found: {}", span_id)))
457		}
458	}
459
460	/// Add attribute to a span
461	pub async fn add_span_attribute(&self, span_id:&str, key:String, value:String) -> Result<()> {
462		self.add_span_attributes(span_id, HashMap::from([(key, value)])).await
463	}
464
465	/// Add multiple attributes to a span
466	pub async fn add_span_attributes(&self, span_id:&str, attributes:HashMap<String, String>) -> Result<()> {
467		let sanitized = self.sanitize_attributes(attributes);
468
469		let mut spans = self.trace_spans.write().await;
470
471		if let Some(span) = spans.get_mut(span_id) {
472			for (key, value) in sanitized {
473				span.attributes.insert(key, value);
474			}
475
476			Ok(())
477		} else {
478			Err(AirError::Internal(format!("Span not found: {}", span_id)))
479		}
480	}
481
482	/// Get a span by ID
483	pub async fn get_span(&self, span_id:&str) -> Result<TraceSpan> {
484		let spans = self.trace_spans.read().await;
485
486		spans
487			.get(span_id)
488			.cloned()
489			.ok_or_else(|| AirError::Internal(format!("Span not found: {}", span_id)))
490	}
491
492	/// Get all spans for a trace
493	pub async fn get_trace_spans(&self, trace_id:&str) -> Result<Vec<TraceSpan>> {
494		let spans = self.trace_spans.read().await;
495
496		Ok(spans.values().filter(|span| span.trace_id == trace_id).cloned().collect())
497	}
498
499	/// Get trace metadata
500	pub async fn get_trace_metadata(&self, trace_id:&str) -> Result<TraceMetadata> {
501		let trace_spans = self.get_trace_spans(trace_id).await?;
502
503		if trace_spans.is_empty() {
504			return Err(AirError::Internal(format!("Trace not found: {}", trace_id)));
505		}
506
507		let root_span = trace_spans
508			.iter()
509			.find(|s| s.parent_span_id.is_none())
510			.ok_or_else(|| AirError::Internal("No root span found".to_string()))?;
511
512		let total_duration_ms = trace_spans.iter().filter_map(|s| s.duration_ms).max();
513
514		let status = if trace_spans.iter().any(|s| s.status == SpanStatus::Failed) {
515			TraceStatus::Failed
516		} else if trace_spans
517			.iter()
518			.all(|s| s.status == SpanStatus::Completed || s.status == SpanStatus::Failed)
519		{
520			TraceStatus::Completed
521		} else {
522			TraceStatus::InProgress
523		};
524
525		let end_time = trace_spans.iter().filter_map(|s| s.end_time).max();
526
527		Ok(TraceMetadata {
528			trace_id:trace_id.to_string(),
529			root_span_id:root_span.span_id.clone(),
530			total_spans:trace_spans.len(),
531			root_operation:root_span.operation_name.clone(),
532			start_time:root_span.start_time,
533			end_time,
534			total_duration_ms,
535			status,
536		})
537	}
538
539	/// Export trace in JSON format
540	pub async fn export_trace(&self, trace_id:&str) -> Result<String> {
541		let spans = self.get_trace_spans(trace_id).await?;
542
543		let metadata = self.get_trace_metadata(trace_id).await?;
544
545		let export = serde_json::json!({
546			"metadata": metadata,
547			"spans": spans,
548		});
549
550		serde_json::to_string_pretty(&export)
551			.map_err(|e| AirError::Serialization(format!("Failed to export trace: {}", e)))
552	}
553
554	/// Clean up old spans (older than specified milliseconds)
555	pub async fn cleanup_old_spans(&self, older_than_ms:Option<u64>) -> Result<usize> {
556		let Now = crate::Utility::CurrentTimestamp();
557
558		let ttl = older_than_ms.unwrap_or_else(|| {
559			tokio::task::block_in_place(|| {
560				tokio::runtime::Handle::current().block_on(async { self.sampling_config.read().await.trace_ttl_ms })
561			})
562		});
563
564		let mut spans = self.trace_spans.write().await;
565
566		let original_len = spans.len();
567
568		spans.retain(|_, span| span.end_time.map_or(true, |end| Now.saturating_sub(end) < ttl));
569
570		Ok(original_len.saturating_sub(spans.len()))
571	}
572
573	/// Get trace statistics
574	pub async fn get_statistics(&self) -> TraceStatistics {
575		let spans = self.trace_spans.read().await;
576
577		let total_traces = spans
578			.values()
579			.map(|s| s.trace_id.clone())
580			.collect::<std::collections::HashSet<_>>()
581			.len();
582
583		let completed_spans = spans.values().filter(|s| s.status == SpanStatus::Completed).count();
584
585		let failed_spans = spans.values().filter(|s| s.status == SpanStatus::Failed).count();
586
587		let in_progress_spans = spans
588			.values()
589			.filter(|s| s.status == SpanStatus::Started || s.status == SpanStatus::Active)
590			.count();
591
592		TraceStatistics {
593			total_traces:total_traces as u64,
594
595			total_spans:spans.len() as u64,
596
597			completed_spans:completed_spans as u64,
598
599			failed_spans:failed_spans as u64,
600
601			in_progress_spans:in_progress_spans as u64,
602		}
603	}
604
605	/// Sanitize attributes to remove sensitive data
606	fn sanitize_attributes(&self, mut attributes:HashMap<String, String>) -> HashMap<String, String> {
607		let sensitive_keys = vec![
608			"password",
609			"token",
610			"secret",
611			"api_key",
612			"authorization",
613			"credential",
614			"auth",
615			"private_key",
616			"session_token",
617		];
618
619		// Collect keys first to avoid borrowing issues
620		let attr_keys:Vec<String> = attributes.keys().cloned().collect();
621
622		for key in sensitive_keys {
623			let key_lower = key.to_lowercase();
624
625			for attr_key in &attr_keys {
626				if attr_key.to_lowercase().contains(&key_lower) {
627					attributes.insert(attr_key.clone(), "[REDACTED]".to_string());
628				}
629			}
630		}
631
632		attributes
633	}
634
635	/// Sanitize error messages to remove sensitive data
636	fn sanitize_error_message(&self, message:&str) -> String {
637		let mut sanitized = message.to_string();
638
639		let patterns = vec![
640			(r"(?i)password[=:]\S+", "password=[REDACTED]"),
641			(r"(?i)token[=:]\S+", "token=[REDACTED]"),
642			(r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
643			(r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
644			(
645				r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
646				"Authorization: Bearer [REDACTED]",
647			),
648		];
649
650		for (pattern, replacement) in patterns {
651			if let Ok(re) = regex::Regex::new(pattern) {
652				sanitized = re.replace_all(&sanitized, replacement).to_string();
653			}
654		}
655
656		sanitized
657	}
658}
659
/// Trace statistics for monitoring: a snapshot of the spans held in memory
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceStatistics {
	/// Number of distinct trace IDs among the stored spans.
	pub total_traces:u64,

	/// Number of stored spans, regardless of status.
	pub total_spans:u64,

	/// Spans whose status is `Completed`.
	pub completed_spans:u64,

	/// Spans whose status is `Failed`.
	pub failed_spans:u64,

	/// Spans whose status is `Started` or `Active`.
	pub in_progress_spans:u64,
}
673
674impl Default for TraceGenerator {
675	fn default() -> Self { Self::new() }
676}
677
/// Global trace generator instance.
///
/// Written at most once: either explicitly by `initialize_tracing` or lazily
/// by the first call to `get_trace_generator`, whichever happens first.
static TRACE_GENERATOR:std::sync::OnceLock<TraceGenerator> = std::sync::OnceLock::new();
680
681/// Get or initialize the global trace generator
682pub fn get_trace_generator() -> &'static TraceGenerator { TRACE_GENERATOR.get_or_init(TraceGenerator::new) }
683
684/// Initialize the global trace generator with custom sampling
685pub fn initialize_tracing(sampling_config:Option<SamplingConfig>) -> Result<()> {
686	let generator = if let Some(config) = sampling_config {
687		TraceGenerator::with_sampling(config)?
688	} else {
689		TraceGenerator::new()
690	};
691
692	let _old = TRACE_GENERATOR.set(generator);
693
694	dev_log!("air", "[Tracing] Trace generator initialized with tracing");
695
696	Ok(())
697}
698
thread_local! {

	// Per-thread slot holding the propagation context last stored on that
	// thread; starts out empty (`None`) on every thread.
	static PROPAGATION_CONTEXT: std::cell::RefCell<Option<PropagationContext>> = std::cell::RefCell::new(None);
}
703
704/// Set the propagation context for the current thread
705pub fn set_propagation_context(context:PropagationContext) {
706	PROPAGATION_CONTEXT.with(|ctx| {
707		*ctx.borrow_mut() = Some(context);
708	});
709}
710
711/// Get the current propagation context
712pub fn get_propagation_context() -> Option<PropagationContext> { PROPAGATION_CONTEXT.with(|ctx| ctx.borrow().clone()) }
713
714/// Create a propagation context from a trace span
715pub async fn create_propagation_context(TraceId:String, ParentSpanId:Option<String>) -> PropagationContext {
716	let SpanId = TraceGenerator::generate_span_id();
717
718	let CorrelationId = crate::Utility::GenerateRequestId();
719
720	PropagationContext { TraceId, SpanId, CorrelationId, ParentSpanId }
721}
722
723/// Create a W3C trace context header from propagation context
724pub fn create_trace_context_header(context:&PropagationContext) -> String {
725	format!("traceparent=00-{}-{}-01", context.TraceId, context.SpanId)
726}