1use std::{collections::HashMap, sync::Arc};
75
76use serde::{Deserialize, Serialize};
77use tokio::sync::RwLock;
78
79use crate::{AirError, Result, dev_log};
80
81#[derive(Debug, Clone)]
83pub struct TraceGenerator {
84 trace_spans:Arc<RwLock<HashMap<String, TraceSpan>>>,
85
86 sampling_config:Arc<RwLock<SamplingConfig>>,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct SamplingConfig {
92 pub sample_rate:f64,
94
95 pub critical_sample_rate:f64,
97
98 pub max_spans_per_trace:usize,
100
101 pub trace_ttl_ms:u64,
103}
104
105impl Default for SamplingConfig {
106 fn default() -> Self {
107 Self {
108 sample_rate:0.1, critical_sample_rate:1.0, max_spans_per_trace:1000,
113
114 trace_ttl_ms:3600000, }
116 }
117}
118
119impl SamplingConfig {
120 pub fn validate(&self) -> Result<()> {
122 if self.sample_rate < 0.0 || self.sample_rate > 1.0 {
123 return Err(crate::AirError::Internal("sample_rate must be between 0.0 and 1.0".to_string()));
124 }
125
126 if self.critical_sample_rate < 0.0 || self.critical_sample_rate > 1.0 {
127 return Err(crate::AirError::Internal(
128 "critical_sample_rate must be between 0.0 and 1.0".to_string(),
129 ));
130 }
131
132 if self.max_spans_per_trace == 0 {
133 return Err(crate::AirError::Internal(
134 "max_spans_per_trace must be greater than 0".to_string(),
135 ));
136 }
137
138 if self.trace_ttl_ms == 0 {
139 return Err(crate::AirError::Internal("trace_ttl_ms must be greater than 0".to_string()));
140 }
141
142 Ok(())
143 }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct TraceSpan {
149 pub span_id:String,
150
151 pub trace_id:String,
152
153 pub parent_span_id:Option<String>,
154
155 pub operation_name:String,
156
157 pub start_time:u64,
158
159 pub end_time:Option<u64>,
160
161 pub status:SpanStatus,
162
163 pub attributes:HashMap<String, String>,
164
165 pub events:Vec<SpanEvent>,
166
167 pub error:Option<String>,
168
169 pub duration_ms:Option<u64>,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
174pub enum SpanStatus {
175 Started,
176
177 Active,
178
179 Completed,
180
181 Failed,
182
183 Cancelled,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct SpanEvent {
189 pub timestamp:u64,
190
191 pub name:String,
192
193 pub attributes:HashMap<String, String>,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct TraceMetadata {
199 pub trace_id:String,
200
201 pub root_span_id:String,
202
203 pub total_spans:usize,
204
205 pub root_operation:String,
206
207 pub start_time:u64,
208
209 pub end_time:Option<u64>,
210
211 pub total_duration_ms:Option<u64>,
212
213 pub status:TraceStatus,
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
218pub enum TraceStatus {
219 InProgress,
220
221 Completed,
222
223 Failed,
224
225 Cancelled,
226}
227
228#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct PropagationContext {
231 pub TraceId:String,
232
233 pub SpanId:String,
234
235 pub CorrelationId:String,
236
237 pub ParentSpanId:Option<String>,
238}
239
240impl TraceGenerator {
241 pub fn new() -> Self {
243 Self {
244 trace_spans:Arc::new(RwLock::new(HashMap::new())),
245
246 sampling_config:Arc::new(RwLock::new(SamplingConfig::default())),
247 }
248 }
249
250 pub fn with_sampling(sampling_config:SamplingConfig) -> Result<Self> {
252 sampling_config
253 .validate()
254 .map_err(|e| AirError::Internal(format!("Invalid sampling config: {}", e)))?;
255
256 Ok(Self {
257 trace_spans:Arc::new(RwLock::new(HashMap::new())),
258 sampling_config:Arc::new(RwLock::new(sampling_config)),
259 })
260 }
261
262 pub fn generate_trace_id() -> String {
264 std::panic::catch_unwind(|| uuid::Uuid::new_v4().to_string()).unwrap_or_else(|e| {
265 dev_log!("air", "error: [Tracing] Panic in generate_trace_id, using fallback: {:?}", e);
266
267 format!("{:x}", rand::random::<u64>())
268 })
269 }
270
271 pub fn generate_span_id() -> String {
273 std::panic::catch_unwind(|| uuid::Uuid::new_v4().to_string()).unwrap_or_else(|e| {
274 dev_log!("air", "error: [Tracing] Panic in generate_span_id, using fallback: {:?}", e);
275
276 format!("{:x}", rand::random::<u64>())
277 })
278 }
279
280 pub async fn should_sample(&self, is_critical:bool) -> bool {
282 let config = self.sampling_config.read().await;
283
284 let rate = if is_critical { config.critical_sample_rate } else { config.sample_rate };
285
286 rand::random::<f64>() < rate
287 }
288
289 pub fn parse_trace_context(header:&str) -> Result<PropagationContext> {
291 let parts:Vec<&str> = header.split(';').collect();
292
293 let mut trace_id = String::new();
294
295 let mut parent_span_id = None;
296
297 for part in parts {
298 let kv:Vec<&str> = part.split('=').collect();
299
300 if kv.len() != 2 {
301 continue;
302 }
303
304 match kv[0].trim() {
305 "traceparent" => {
306 let trace_parent:Vec<&str> = kv[1].trim().split('-').collect();
307
308 if trace_parent.len() >= 2 {
309 trace_id = trace_parent[1].to_string();
310
311 if trace_parent.len() >= 3 {
312 parent_span_id = Some(trace_parent[2].to_string());
313 }
314 }
315 },
316
317 _ => {},
318 }
319 }
320
321 if trace_id.is_empty() {
322 return Err(AirError::Internal("Invalid trace context header".to_string()));
323 }
324
325 Ok(PropagationContext {
326 TraceId:trace_id,
327 SpanId:Self::generate_span_id(),
328 CorrelationId:crate::Utility::GenerateRequestId(),
329 ParentSpanId:parent_span_id,
330 })
331 }
332
333 pub async fn create_span(
335 &self,
336
337 trace_id:String,
338
339 operation_name:impl Into<String>,
340
341 parent_span_id:Option<String>,
342
343 attributes:Option<HashMap<String, String>>,
344 ) -> Result<TraceSpan> {
345 let span_id = Self::generate_span_id();
346
347 let operation_name = operation_name.into();
348
349 let span = TraceSpan {
350 span_id:span_id.clone(),
351
352 trace_id:trace_id.clone(),
353
354 parent_span_id:parent_span_id.clone(),
355
356 operation_name:operation_name.clone(),
357
358 start_time:crate::Utility::CurrentTimestamp(),
359
360 end_time:None,
361
362 status:SpanStatus::Started,
363
364 attributes:attributes.unwrap_or_default(),
365
366 events:Vec::new(),
367
368 error:None,
369
370 duration_ms:None,
371 };
372
373 let mut spans = self.trace_spans.write().await;
374
375 let trace_span_count = spans.values().filter(|s| s.trace_id == trace_id).count();
377
378 let config = self.sampling_config.read().await;
379
380 if trace_span_count >= config.max_spans_per_trace {
381 dev_log!(
382 "air",
383 "warn: [Tracing] Trace {} exceeds max spans ({}), dropping span {}",
384 trace_id,
385 config.max_spans_per_trace,
386 span_id
387 );
388
389 return Err(AirError::Internal("Max spans per trace exceeded".to_string()));
390 }
391
392 spans.insert(span_id.clone(), span.clone());
393
394 Ok(span)
395 }
396
397 pub async fn add_span_event(
399 &self,
400
401 span_id:&str,
402
403 event_name:impl Into<String>,
404
405 attributes:HashMap<String, String>,
406 ) -> Result<()> {
407 let event = SpanEvent {
408 timestamp:crate::Utility::CurrentTimestamp(),
409
410 name:event_name.into(),
411
412 attributes:self.sanitize_attributes(attributes),
413 };
414
415 let mut spans = self.trace_spans.write().await;
416
417 if let Some(span) = spans.get_mut(span_id) {
418 span.events.push(event);
419
420 Ok(())
421 } else {
422 Err(AirError::Internal(format!("Span not found: {}", span_id)))
423 }
424 }
425
426 pub async fn mark_span_active(&self, span_id:&str) -> Result<()> {
428 let mut spans = self.trace_spans.write().await;
429
430 if let Some(span) = spans.get_mut(span_id) {
431 span.status = SpanStatus::Active;
432
433 Ok(())
434 } else {
435 Err(AirError::Internal(format!("Span not found: {}", span_id)))
436 }
437 }
438
439 pub async fn complete_span(&self, span_id:&str, error:Option<String>) -> Result<u64> {
441 let Now = crate::Utility::CurrentTimestamp();
442
443 let mut spans = self.trace_spans.write().await;
444
445 if let Some(span) = spans.get_mut(span_id) {
446 span.end_time = Some(Now);
447
448 span.duration_ms = Some(Now.saturating_sub(span.start_time));
449
450 span.status = if error.is_some() { SpanStatus::Failed } else { SpanStatus::Completed };
451
452 span.error = error.map(|e| self.sanitize_error_message(&e));
453
454 Ok(span.duration_ms.unwrap_or(0))
455 } else {
456 Err(AirError::Internal(format!("Span not found: {}", span_id)))
457 }
458 }
459
460 pub async fn add_span_attribute(&self, span_id:&str, key:String, value:String) -> Result<()> {
462 self.add_span_attributes(span_id, HashMap::from([(key, value)])).await
463 }
464
465 pub async fn add_span_attributes(&self, span_id:&str, attributes:HashMap<String, String>) -> Result<()> {
467 let sanitized = self.sanitize_attributes(attributes);
468
469 let mut spans = self.trace_spans.write().await;
470
471 if let Some(span) = spans.get_mut(span_id) {
472 for (key, value) in sanitized {
473 span.attributes.insert(key, value);
474 }
475
476 Ok(())
477 } else {
478 Err(AirError::Internal(format!("Span not found: {}", span_id)))
479 }
480 }
481
482 pub async fn get_span(&self, span_id:&str) -> Result<TraceSpan> {
484 let spans = self.trace_spans.read().await;
485
486 spans
487 .get(span_id)
488 .cloned()
489 .ok_or_else(|| AirError::Internal(format!("Span not found: {}", span_id)))
490 }
491
492 pub async fn get_trace_spans(&self, trace_id:&str) -> Result<Vec<TraceSpan>> {
494 let spans = self.trace_spans.read().await;
495
496 Ok(spans.values().filter(|span| span.trace_id == trace_id).cloned().collect())
497 }
498
499 pub async fn get_trace_metadata(&self, trace_id:&str) -> Result<TraceMetadata> {
501 let trace_spans = self.get_trace_spans(trace_id).await?;
502
503 if trace_spans.is_empty() {
504 return Err(AirError::Internal(format!("Trace not found: {}", trace_id)));
505 }
506
507 let root_span = trace_spans
508 .iter()
509 .find(|s| s.parent_span_id.is_none())
510 .ok_or_else(|| AirError::Internal("No root span found".to_string()))?;
511
512 let total_duration_ms = trace_spans.iter().filter_map(|s| s.duration_ms).max();
513
514 let status = if trace_spans.iter().any(|s| s.status == SpanStatus::Failed) {
515 TraceStatus::Failed
516 } else if trace_spans
517 .iter()
518 .all(|s| s.status == SpanStatus::Completed || s.status == SpanStatus::Failed)
519 {
520 TraceStatus::Completed
521 } else {
522 TraceStatus::InProgress
523 };
524
525 let end_time = trace_spans.iter().filter_map(|s| s.end_time).max();
526
527 Ok(TraceMetadata {
528 trace_id:trace_id.to_string(),
529 root_span_id:root_span.span_id.clone(),
530 total_spans:trace_spans.len(),
531 root_operation:root_span.operation_name.clone(),
532 start_time:root_span.start_time,
533 end_time,
534 total_duration_ms,
535 status,
536 })
537 }
538
539 pub async fn export_trace(&self, trace_id:&str) -> Result<String> {
541 let spans = self.get_trace_spans(trace_id).await?;
542
543 let metadata = self.get_trace_metadata(trace_id).await?;
544
545 let export = serde_json::json!({
546 "metadata": metadata,
547 "spans": spans,
548 });
549
550 serde_json::to_string_pretty(&export)
551 .map_err(|e| AirError::Serialization(format!("Failed to export trace: {}", e)))
552 }
553
554 pub async fn cleanup_old_spans(&self, older_than_ms:Option<u64>) -> Result<usize> {
556 let Now = crate::Utility::CurrentTimestamp();
557
558 let ttl = older_than_ms.unwrap_or_else(|| {
559 tokio::task::block_in_place(|| {
560 tokio::runtime::Handle::current().block_on(async { self.sampling_config.read().await.trace_ttl_ms })
561 })
562 });
563
564 let mut spans = self.trace_spans.write().await;
565
566 let original_len = spans.len();
567
568 spans.retain(|_, span| span.end_time.map_or(true, |end| Now.saturating_sub(end) < ttl));
569
570 Ok(original_len.saturating_sub(spans.len()))
571 }
572
573 pub async fn get_statistics(&self) -> TraceStatistics {
575 let spans = self.trace_spans.read().await;
576
577 let total_traces = spans
578 .values()
579 .map(|s| s.trace_id.clone())
580 .collect::<std::collections::HashSet<_>>()
581 .len();
582
583 let completed_spans = spans.values().filter(|s| s.status == SpanStatus::Completed).count();
584
585 let failed_spans = spans.values().filter(|s| s.status == SpanStatus::Failed).count();
586
587 let in_progress_spans = spans
588 .values()
589 .filter(|s| s.status == SpanStatus::Started || s.status == SpanStatus::Active)
590 .count();
591
592 TraceStatistics {
593 total_traces:total_traces as u64,
594
595 total_spans:spans.len() as u64,
596
597 completed_spans:completed_spans as u64,
598
599 failed_spans:failed_spans as u64,
600
601 in_progress_spans:in_progress_spans as u64,
602 }
603 }
604
605 fn sanitize_attributes(&self, mut attributes:HashMap<String, String>) -> HashMap<String, String> {
607 let sensitive_keys = vec![
608 "password",
609 "token",
610 "secret",
611 "api_key",
612 "authorization",
613 "credential",
614 "auth",
615 "private_key",
616 "session_token",
617 ];
618
619 let attr_keys:Vec<String> = attributes.keys().cloned().collect();
621
622 for key in sensitive_keys {
623 let key_lower = key.to_lowercase();
624
625 for attr_key in &attr_keys {
626 if attr_key.to_lowercase().contains(&key_lower) {
627 attributes.insert(attr_key.clone(), "[REDACTED]".to_string());
628 }
629 }
630 }
631
632 attributes
633 }
634
635 fn sanitize_error_message(&self, message:&str) -> String {
637 let mut sanitized = message.to_string();
638
639 let patterns = vec![
640 (r"(?i)password[=:]\S+", "password=[REDACTED]"),
641 (r"(?i)token[=:]\S+", "token=[REDACTED]"),
642 (r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
643 (r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
644 (
645 r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
646 "Authorization: Bearer [REDACTED]",
647 ),
648 ];
649
650 for (pattern, replacement) in patterns {
651 if let Ok(re) = regex::Regex::new(pattern) {
652 sanitized = re.replace_all(&sanitized, replacement).to_string();
653 }
654 }
655
656 sanitized
657 }
658}
659
660#[derive(Debug, Clone, Serialize, Deserialize)]
662pub struct TraceStatistics {
663 pub total_traces:u64,
664
665 pub total_spans:u64,
666
667 pub completed_spans:u64,
668
669 pub failed_spans:u64,
670
671 pub in_progress_spans:u64,
672}
673
674impl Default for TraceGenerator {
675 fn default() -> Self { Self::new() }
676}
677
678static TRACE_GENERATOR:std::sync::OnceLock<TraceGenerator> = std::sync::OnceLock::new();
680
681pub fn get_trace_generator() -> &'static TraceGenerator { TRACE_GENERATOR.get_or_init(TraceGenerator::new) }
683
684pub fn initialize_tracing(sampling_config:Option<SamplingConfig>) -> Result<()> {
686 let generator = if let Some(config) = sampling_config {
687 TraceGenerator::with_sampling(config)?
688 } else {
689 TraceGenerator::new()
690 };
691
692 let _old = TRACE_GENERATOR.set(generator);
693
694 dev_log!("air", "[Tracing] Trace generator initialized with tracing");
695
696 Ok(())
697}
698
699thread_local! {
700
701 static PROPAGATION_CONTEXT: std::cell::RefCell<Option<PropagationContext>> = std::cell::RefCell::new(None);
702}
703
704pub fn set_propagation_context(context:PropagationContext) {
706 PROPAGATION_CONTEXT.with(|ctx| {
707 *ctx.borrow_mut() = Some(context);
708 });
709}
710
711pub fn get_propagation_context() -> Option<PropagationContext> { PROPAGATION_CONTEXT.with(|ctx| ctx.borrow().clone()) }
713
714pub async fn create_propagation_context(TraceId:String, ParentSpanId:Option<String>) -> PropagationContext {
716 let SpanId = TraceGenerator::generate_span_id();
717
718 let CorrelationId = crate::Utility::GenerateRequestId();
719
720 PropagationContext { TraceId, SpanId, CorrelationId, ParentSpanId }
721}
722
723pub fn create_trace_context_header(context:&PropagationContext) -> String {
725 format!("traceparent=00-{}-{}-01", context.TraceId, context.SpanId)
726}