1use std::{
79 sync::{
80 Arc,
81 atomic::{AtomicI64, AtomicU64, Ordering},
82 },
83 time::{Duration, Instant},
84};
85
86use serde::{Deserialize, Serialize};
87
88use crate::{AirError, Result, dev_log};
89
90struct MetricGuard {
92 current:u64,
93
94 max:u64,
95}
96
97impl MetricGuard {
98 fn new(current:u64, max:u64) -> Self { Self { current, max } }
99
100 fn increment(&mut self) -> bool {
102 if self.current < self.max.saturating_sub(1) {
103 self.current += 1;
104
105 true
106 } else {
107 dev_log!("metrics", "warn: [Metrics] Metric overflow detected, wrapping around");
108
109 self.current = 0;
110
111 true
112 }
113 }
114}
115
116#[derive(Debug)]
118struct AggregationValidator {
119 last_timestamp:Instant,
120
121 validation_window:Duration,
122}
123
124impl AggregationValidator {
125 fn new(validation_window_secs:u64) -> Self {
126 Self {
127 last_timestamp:Instant::now(),
128
129 validation_window:Duration::from_secs(validation_window_secs),
130 }
131 }
132
133 fn validate(&mut self) -> std::result::Result<(), String> {
135 let now = Instant::now();
136
137 if now.duration_since(self.last_timestamp) > self.validation_window {
138 dev_log!("metrics", "warn: [Metrics] Aggregation outside validation window, resetting");
139
140 self.last_timestamp = now;
141
142 Ok(())
143 } else {
144 Ok(())
145 }
146 }
147}
148
/// Daemon-wide store of request, resource, auth, download, indexing and update metrics.
///
/// Almost every field is an `Arc`-wrapped atomic, so recording is lock-free; only the
/// per-type error map and the aggregation validator sit behind a `Mutex`. Because every
/// field is an `Arc`, a derived `Clone` shares the same underlying counters rather than
/// copying them.
#[derive(Debug, Clone)]
pub struct MetricsCollector {
	/// Every recorded request, successful or failed.
	requests_total:Arc<AtomicU64>,

	/// Requests recorded via `RecordRequestSuccess`.
	requests_successful:Arc<AtomicU64>,

	/// Requests recorded via `RecordRequestFailure`.
	requests_failed:Arc<AtomicU64>,

	/// Sum of all recorded request latencies, in whole milliseconds.
	request_latency_sum_ms:Arc<AtomicU64>,

	/// Number of latency samples folded into `request_latency_sum_ms`.
	request_latency_count:Arc<AtomicU64>,

	/// Smallest latency seen, in ms; seeded with `u64::MAX` in `new` so the first sample lowers it.
	request_latency_min_ms:Arc<AtomicU64>,

	/// Largest latency seen, in ms; seeded with 0.
	request_latency_max_ms:Arc<AtomicU64>,

	/// Errors counted by `RecordRequestFailure`.
	errors_total:Arc<AtomicU64>,

	/// Failure counts keyed by the lowercased, redacted error type.
	errors_by_type:Arc<std::sync::Mutex<std::collections::HashMap<String, u64>>>,

	/// Latest memory-usage gauge in bytes (a `u64` input stored via `as i64`; readers clamp negatives to 0).
	memory_usage_bytes:Arc<AtomicI64>,

	/// Latest CPU gauge stored in hundredths of a percent (`CPUPercent * 100`, truncated).
	cpu_usage_percent:Arc<AtomicU64>,

	/// Latest active-connection gauge.
	active_connections:Arc<AtomicU64>,

	/// Latest active-thread gauge.
	threads_active:Arc<AtomicU64>,

	/// Every authentication attempt.
	authentication_operations:Arc<AtomicU64>,

	/// Authentication attempts reported as unsuccessful.
	authentication_failures:Arc<AtomicU64>,

	/// Every download recorded, regardless of outcome.
	downloads_total:Arc<AtomicU64>,

	/// Downloads reported as successful.
	downloads_completed:Arc<AtomicU64>,

	/// Downloads reported as failed.
	downloads_failed:Arc<AtomicU64>,

	/// Bytes reported across all downloads (successful or not).
	downloads_bytes_total:Arc<AtomicU64>,

	/// Number of indexing operations recorded.
	indexing_operations:Arc<AtomicU64>,

	/// Entry count from the most recent indexing operation (overwritten, not accumulated).
	indexing_entries:Arc<AtomicI64>,

	/// Number of update checks recorded.
	updates_checked:Arc<AtomicU64>,

	/// Incremented when an update check reports updates available.
	/// NOTE(review): counts "available", not "applied" — confirm the naming is intended.
	updates_applied:Arc<AtomicU64>,

	/// Window validator consulted on every request record (3600 s window as built by `new`).
	aggregator:Arc<std::sync::Mutex<AggregationValidator>>,
}
206
207impl MetricsCollector {
208 pub fn new() -> Result<Self> {
210 dev_log!("metrics", "[Metrics] MetricsCollector initialized successfully");
211
212 Ok(Self {
213 requests_total:Arc::new(AtomicU64::new(0)),
214 requests_successful:Arc::new(AtomicU64::new(0)),
215 requests_failed:Arc::new(AtomicU64::new(0)),
216 request_latency_sum_ms:Arc::new(AtomicU64::new(0)),
217 request_latency_count:Arc::new(AtomicU64::new(0)),
218 request_latency_min_ms:Arc::new(AtomicU64::new(u64::MAX)),
219 request_latency_max_ms:Arc::new(AtomicU64::new(0)),
220 errors_total:Arc::new(AtomicU64::new(0)),
221 errors_by_type:Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
222 memory_usage_bytes:Arc::new(AtomicI64::new(0)),
223 cpu_usage_percent:Arc::new(AtomicU64::new(0)),
224 active_connections:Arc::new(AtomicU64::new(0)),
225 threads_active:Arc::new(AtomicU64::new(0)),
226 authentication_operations:Arc::new(AtomicU64::new(0)),
227 authentication_failures:Arc::new(AtomicU64::new(0)),
228 downloads_total:Arc::new(AtomicU64::new(0)),
229 downloads_completed:Arc::new(AtomicU64::new(0)),
230 downloads_failed:Arc::new(AtomicU64::new(0)),
231 downloads_bytes_total:Arc::new(AtomicU64::new(0)),
232 indexing_operations:Arc::new(AtomicU64::new(0)),
233 indexing_entries:Arc::new(AtomicI64::new(0)),
234 updates_checked:Arc::new(AtomicU64::new(0)),
235 updates_applied:Arc::new(AtomicU64::new(0)),
236 aggregator:Arc::new(std::sync::Mutex::new(AggregationValidator::new(3600))),
237 })
238 }
239
240 pub fn ValidateAggregation(&self) -> Result<()> {
242 match self.aggregator.lock() {
243 Ok(mut validator) => validator.validate().map_err(|e| AirError::Internal(e)),
244
245 Err(_) => {
246 dev_log!("metrics", "warn: [Metrics] Failed to acquire aggregation validator lock");
247
248 Ok(())
249 },
250 }
251 }
252
253 pub fn RecordRequestSuccess(&self, LatencySeconds:f64) {
255 let _ = self.ValidateAggregation();
256
257 let LatencyMs = (LatencySeconds * 1000.0) as u64;
258
259 let _ = self.requests_total.fetch_add(1, Ordering::Relaxed);
261
262 let _ = self.requests_successful.fetch_add(1, Ordering::Relaxed);
263
264 let _ = self.request_latency_sum_ms.fetch_add(LatencyMs, Ordering::Relaxed);
266
267 let _ = self.request_latency_count.fetch_add(1, Ordering::Relaxed);
268
269 MinMaxUpdate(&self.request_latency_min_ms, &self.request_latency_max_ms, LatencyMs);
271
272 dev_log!(
273 "metrics",
274 "[Metrics] Recorded successful request with latency: {:.3}s",
275 LatencySeconds
276 );
277 }
278
279 pub fn RecordRequestFailure(&self, ErrorType:&str, LatencySeconds:f64) {
281 let _ = self.ValidateAggregation();
282
283 let LatencyMs = (LatencySeconds * 1000.0) as u64;
284
285 let _ = self.requests_total.fetch_add(1, Ordering::Relaxed);
287
288 let _ = self.requests_failed.fetch_add(1, Ordering::Relaxed);
289
290 let _ = self.errors_total.fetch_add(1, Ordering::Relaxed);
291
292 let _ = self.request_latency_sum_ms.fetch_add(LatencyMs, Ordering::Relaxed);
294
295 let _ = self.request_latency_count.fetch_add(1, Ordering::Relaxed);
296
297 MinMaxUpdate(&self.request_latency_min_ms, &self.request_latency_max_ms, LatencyMs);
299
300 let RedactedError = self.RedactErrorType(ErrorType);
302
303 let RedactedErrorClone = RedactedError.clone();
304
305 if let Ok(mut error_map) = self.errors_by_type.lock() {
306 *error_map.entry(RedactedError).or_insert(0) += 1;
307 }
308
309 dev_log!(
310 "metrics",
311 "[Metrics] Recorded failed request: {}, latency: {:.3}s",
312 RedactedErrorClone,
313 LatencySeconds
314 );
315 }
316
317 pub fn UpdateResourceMetrics(&self, MemoryBytes:u64, CPUPercent:f64, ActiveConns:u64, ActiveThreads:u64) {
319 self.memory_usage_bytes.store(MemoryBytes as i64, Ordering::Relaxed);
320
321 self.cpu_usage_percent.store((CPUPercent * 100.0) as u64, Ordering::Relaxed);
322
323 self.active_connections.store(ActiveConns, Ordering::Relaxed);
324
325 self.threads_active.store(ActiveThreads, Ordering::Relaxed);
326
327 dev_log!(
328 "metrics",
329 "[Metrics] Updated resource metrics - Memory: {}B, CPU: {:.1}%, Connections: {}, Threads: {}",
330 MemoryBytes,
331 CPUPercent,
332 ActiveConns,
333 ActiveThreads
334 );
335 }
336
337 pub fn RecordAuthenticationOperation(&self, Success:bool) {
339 let _ = self.authentication_operations.fetch_add(1, Ordering::Relaxed);
340
341 if !Success {
342 let _ = self.authentication_failures.fetch_add(1, Ordering::Relaxed);
343 }
344 }
345
346 pub fn RecordDownload(&self, Success:bool, Bytes:u64) {
348 let _ = self.downloads_total.fetch_add(1, Ordering::Relaxed);
349
350 let _ = self.downloads_bytes_total.fetch_add(Bytes, Ordering::Relaxed);
351
352 if Success {
353 let _ = self.downloads_completed.fetch_add(1, Ordering::Relaxed);
354 } else {
355 let _ = self.downloads_failed.fetch_add(1, Ordering::Relaxed);
356 }
357 }
358
359 pub fn RecordIndexingOperation(&self, EntriesIndexed:u64) {
361 let _ = self.indexing_operations.fetch_add(1, Ordering::Relaxed);
362
363 self.indexing_entries.store(EntriesIndexed as i64, Ordering::Relaxed);
364 }
365
366 pub fn RecordUpdateCheck(&self, UpdatesAvailable:bool) {
368 let _ = self.updates_checked.fetch_add(1, Ordering::Relaxed);
369
370 if UpdatesAvailable {
371 let _ = self.updates_applied.fetch_add(1, Ordering::Relaxed);
372 }
373 }
374
375 fn RedactErrorType(&self, ErrorType:&str) -> String {
377 let Redacted = ErrorType.to_lowercase();
378
379 if Redacted.contains("password") || Redacted.contains("token") || Redacted.contains("secret") {
381 return "sensitive_error".to_string();
382 }
383
384 Redacted
385 }
386
387 pub fn ExportMetrics(&self) -> Result<String> {
389 let metrics_data = self.GetMetricsData();
390
391 let mut output = String::new();
392
393 output.push_str("# HELP air_requests_total Total number of requests processed by Air daemon\n");
394
395 output.push_str("# TYPE air_requests_total counter\n");
396
397 output.push_str(&format!("air_requests_total {}\n", metrics_data.requests_total));
398
399 output.push_str("# HELP air_requests_successful Total number of successful requests\n");
400
401 output.push_str("# TYPE air_requests_successful counter\n");
402
403 output.push_str(&format!("air_requests_successful {}\n", metrics_data.requests_successful));
404
405 output.push_str("# HELP air_requests_failed Total number of failed requests\n");
406
407 output.push_str("# TYPE air_requests_failed counter\n");
408
409 output.push_str(&format!("air_requests_failed {}\n", metrics_data.requests_failed));
410
411 output.push_str("# HELP air_errors_total Total number of errors encountered\n");
412
413 output.push_str("# TYPE air_errors_total counter\n");
414
415 output.push_str(&format!("air_errors_total {}\n", metrics_data.errors_total));
416
417 output.push_str("# HELP air_memory_usage_bytes Memory usage in bytes\n");
418
419 output.push_str("# TYPE air_memory_usage_bytes gauge\n");
420
421 output.push_str(&format!("air_memory_usage_bytes {}\n", metrics_data.memory_bytes));
422
423 output.push_str("# HELP air_cpu_usage_percent CPU usage in hundredths of a percent\n");
424
425 output.push_str("# TYPE air_cpu_usage_percent gauge\n");
426
427 output.push_str(&format!("air_cpu_usage_percent {}\n", metrics_data.cpu_percent));
428
429 output.push_str("# HELP air_active_connections Number of active connections\n");
430
431 output.push_str("# TYPE air_active_connections gauge\n");
432
433 output.push_str(&format!("air_active_connections {}\n", metrics_data.active_connections));
434
435 output.push_str("# HELP air_threads_active Number of active threads\n");
436
437 output.push_str("# TYPE air_threads_active gauge\n");
438
439 output.push_str(&format!("air_threads_active {}\n", metrics_data.active_threads));
440
441 output.push_str("# HELP air_authentication_operations_total Total authentication operations\n");
442
443 output.push_str("# TYPE air_authentication_operations_total counter\n");
444
445 output.push_str(&format!(
446 "air_authentication_operations_total {}\n",
447 metrics_data.authentication_operations
448 ));
449
450 output.push_str("# HELP air_authentication_failures_total Total authentication failures\n");
451
452 output.push_str("# TYPE air_authentication_failures_total counter\n");
453
454 output.push_str(&format!(
455 "air_authentication_failures_total {}\n",
456 metrics_data.authentication_failures
457 ));
458
459 output.push_str("# HELP air_downloads_total Total downloads initiated\n");
460
461 output.push_str("# TYPE air_downloads_total counter\n");
462
463 output.push_str(&format!("air_downloads_total {}\n", metrics_data.downloads_total));
464
465 output.push_str("# HELP air_downloads_completed_total Total downloads completed successfully\n");
466
467 output.push_str("# TYPE air_downloads_completed_total counter\n");
468
469 output.push_str(&format!("air_downloads_completed_total {}\n", metrics_data.downloads_completed));
470
471 output.push_str("# HELP air_downloads_failed_total Total downloads failed\n");
472
473 output.push_str("# TYPE air_downloads_failed_total counter\n");
474
475 output.push_str(&format!("air_downloads_failed_total {}\n", metrics_data.downloads_failed));
476
477 output.push_str("# HELP air_downloads_bytes_total Total bytes downloaded\n");
478
479 output.push_str("# TYPE air_downloads_bytes_total counter\n");
480
481 output.push_str(&format!("air_downloads_bytes_total {}\n", metrics_data.downloads_bytes));
482
483 output.push_str("# HELP air_indexing_operations_total Total indexing operations\n");
484
485 output.push_str("# TYPE air_indexing_operations_total counter\n");
486
487 output.push_str(&format!("air_indexing_operations_total {}\n", metrics_data.indexing_operations));
488
489 output.push_str("# HELP air_indexing_entries Number of indexed entries\n");
490
491 output.push_str("# TYPE air_indexing_entries gauge\n");
492
493 output.push_str(&format!("air_indexing_entries {}\n", metrics_data.indexing_entries));
494
495 output.push_str("# HELP air_updates_checked_total Total update checks performed\n");
496
497 output.push_str("# TYPE air_updates_checked_total counter\n");
498
499 output.push_str(&format!("air_updates_checked_total {}\n", metrics_data.updates_checked));
500
501 output.push_str("# HELP air_updates_applied_total Total updates applied\n");
502
503 output.push_str("# TYPE air_updates_applied_total counter\n");
504
505 output.push_str(&format!("air_updates_applied_total {}\n", metrics_data.updates_applied));
506
507 Ok(output)
508 }
509
510 pub fn GetMetricsData(&self) -> MetricsData {
512 let latency_avg = if self.request_latency_count.load(Ordering::Relaxed) > 0 {
513 self.request_latency_sum_ms.load(Ordering::Relaxed) as f64
514 / self.request_latency_count.load(Ordering::Relaxed) as f64
515 } else {
516 0.0
517 };
518
519 MetricsData {
520 timestamp:crate::Utility::CurrentTimestamp(),
521
522 requests_total:self.requests_total.load(Ordering::Relaxed),
523
524 requests_successful:self.requests_successful.load(Ordering::Relaxed),
525
526 requests_failed:self.requests_failed.load(Ordering::Relaxed),
527
528 errors_total:self.errors_total.load(Ordering::Relaxed),
529
530 memory_bytes:self.memory_usage_bytes.load(Ordering::Relaxed).max(0) as u64,
531
532 cpu_percent:self.cpu_usage_percent.load(Ordering::Relaxed) as f64 / 100.0,
533
534 active_connections:self.active_connections.load(Ordering::Relaxed),
535
536 active_threads:self.threads_active.load(Ordering::Relaxed),
537
538 authentication_operations:self.authentication_operations.load(Ordering::Relaxed),
539
540 authentication_failures:self.authentication_failures.load(Ordering::Relaxed),
541
542 downloads_total:self.downloads_total.load(Ordering::Relaxed),
543
544 downloads_completed:self.downloads_completed.load(Ordering::Relaxed),
545
546 downloads_failed:self.downloads_failed.load(Ordering::Relaxed),
547
548 downloads_bytes:self.downloads_bytes_total.load(Ordering::Relaxed),
549
550 indexing_operations:self.indexing_operations.load(Ordering::Relaxed),
551
552 indexing_entries:self.indexing_entries.load(Ordering::Relaxed).max(0) as u64,
553
554 updates_checked:self.updates_checked.load(Ordering::Relaxed),
555
556 updates_applied:self.updates_applied.load(Ordering::Relaxed),
557
558 latency_avg_ms:latency_avg,
559
560 latency_min_ms:self.request_latency_min_ms.load(Ordering::Relaxed),
561
562 latency_max_ms:self.request_latency_max_ms.load(Ordering::Relaxed),
563 }
564 }
565
566 #[cfg(test)]
568 pub fn Reset(&self) {
569 self.requests_total.store(0, Ordering::Relaxed);
570
571 self.requests_successful.store(0, Ordering::Relaxed);
572
573 self.requests_failed.store(0, Ordering::Relaxed);
574
575 self.request_latency_sum_ms.store(0, Ordering::Relaxed);
576
577 self.request_latency_count.store(0, Ordering::Relaxed);
578
579 self.request_latency_min_ms.store(u64::MAX, Ordering::Relaxed);
580
581 self.request_latency_max_ms.store(0, Ordering::Relaxed);
582
583 self.errors_total.store(0, Ordering::Relaxed);
584
585 self.memory_usage_bytes.store(0, Ordering::Relaxed);
586
587 self.cpu_usage_percent.store(0, Ordering::Relaxed);
588
589 self.active_connections.store(0, Ordering::Relaxed);
590
591 self.threads_active.store(0, Ordering::Relaxed);
592
593 self.authentication_operations.store(0, Ordering::Relaxed);
594
595 self.authentication_failures.store(0, Ordering::Relaxed);
596
597 self.downloads_total.store(0, Ordering::Relaxed);
598
599 self.downloads_completed.store(0, Ordering::Relaxed);
600
601 self.downloads_failed.store(0, Ordering::Relaxed);
602
603 self.downloads_bytes_total.store(0, Ordering::Relaxed);
604
605 self.indexing_operations.store(0, Ordering::Relaxed);
606
607 self.indexing_entries.store(0, Ordering::Relaxed);
608
609 self.updates_checked.store(0, Ordering::Relaxed);
610
611 self.updates_applied.store(0, Ordering::Relaxed);
612 }
613}
614
/// Folds `Value` into a running minimum and maximum.
///
/// The two bounds are independent: a single sample can be both a new minimum and a new
/// maximum (always true for the very first sample against a `u64::MAX` / `0` seed). The
/// previous CAS loop `break`ed as soon as the minimum was updated, so such samples never
/// raised the maximum. `fetch_min` / `fetch_max` update each bound atomically on its own.
fn MinMaxUpdate(MinMetric:&AtomicU64, MaxMetric:&AtomicU64, Value:u64) {
	let _ = MinMetric.fetch_min(Value, Ordering::Relaxed);

	let _ = MaxMetric.fetch_max(Value, Ordering::Relaxed);
}
639
640impl Default for MetricsCollector {
641 fn default() -> Self { Self::new().expect("Failed to create MetricsCollector") }
642}
643
/// Serializable point-in-time snapshot of the collector's metrics.
///
/// Field order is significant for the derived `Debug` and serde output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsData {
	/// Value of `crate::Utility::CurrentTimestamp()` when the snapshot was taken.
	/// NOTE(review): the unit (seconds vs. milliseconds) is not visible here — confirm.
	pub timestamp:u64,

	/// All recorded requests.
	pub requests_total:u64,

	/// Requests recorded as successful.
	pub requests_successful:u64,

	/// Requests recorded as failed.
	pub requests_failed:u64,

	/// Errors recorded.
	pub errors_total:u64,

	/// Latest memory gauge in bytes (negative stored values are clamped to 0).
	pub memory_bytes:u64,

	/// Latest CPU gauge in percent (stored internally in hundredths, divided by 100 here).
	pub cpu_percent:f64,

	/// Latest active-connection gauge.
	pub active_connections:u64,

	/// Latest active-thread gauge.
	pub active_threads:u64,

	/// All authentication attempts.
	pub authentication_operations:u64,

	/// Failed authentication attempts.
	pub authentication_failures:u64,

	/// All downloads recorded.
	pub downloads_total:u64,

	/// Successful downloads.
	pub downloads_completed:u64,

	/// Failed downloads.
	pub downloads_failed:u64,

	/// Bytes reported across all downloads.
	pub downloads_bytes:u64,

	/// Indexing operations recorded.
	pub indexing_operations:u64,

	/// Entry count from the most recent indexing operation (negative values clamped to 0).
	pub indexing_entries:u64,

	/// Update checks recorded.
	pub updates_checked:u64,

	/// Update checks that reported updates available.
	pub updates_applied:u64,

	/// Mean request latency in milliseconds; 0.0 when no latency has been recorded.
	pub latency_avg_ms:f64,

	/// Smallest recorded request latency in milliseconds.
	pub latency_min_ms:u64,

	/// Largest recorded request latency in milliseconds.
	pub latency_max_ms:u64,
}
691
692impl MetricsData {
693 pub fn SuccessRate(&self) -> f64 {
695 if self.requests_total == 0 {
696 return 100.0;
697 }
698
699 (self.requests_successful as f64 / self.requests_total as f64) * 100.0
700 }
701
702 pub fn DownloadSuccessRate(&self) -> f64 {
704 if self.downloads_total == 0 {
705 return 100.0;
706 }
707
708 (self.downloads_completed as f64 / self.downloads_total as f64) * 100.0
709 }
710
711 pub fn ErrorRate(&self) -> f64 {
713 if self.requests_total == 0 {
714 return 0.0;
715 }
716
717 (self.errors_total as f64 / self.requests_total as f64) * 100.0
718 }
719}
720
721static METRICS_INSTANCE:std::sync::OnceLock<MetricsCollector> = std::sync::OnceLock::new();
723
724pub fn GetMetrics() -> &'static MetricsCollector { METRICS_INSTANCE.get_or_init(|| MetricsCollector::default()) }
726
727pub fn InitializeMetrics() -> Result<()> {
729 let _collector = GetMetrics();
730
731 dev_log!("metrics", "[Metrics] Global metrics collector initialized");
732
733 Ok(())
734}