Skip to main content

AirLibrary/Resilience/
mod.rs

1//! # Resilience Patterns Module
2//!
3//! Provides robust resilience patterns for external service calls:
4//! - Exponential backoff retry logic with jitter
5//! - Circuit breaker pattern for fault isolation
6//! - Bulkhead pattern for resource isolation
7//! - Timeout management with cascading deadlines
8//!
9//! ## Responsibilities
10//!
11//! ### Retry Patterns
12//! - Exponential backoff with jitter for distributed systems
13//! - Adaptive retry policies based on error classification
14//! - Retry budget management for service rate limiting
15//! - Panic recovery for background retry tasks
16//!
17//! ### Circuit Breaker
18//! - Automatic fault detection and isolation
19//! - State consistency validation across transitions
20//! - Event publishing for telemetry integration
21//! - Half-open state monitoring for recovery testing
22//!
23//! ### Bulkhead Pattern
24//! - Concurrent request limiting for resource protection
25//! - Queue management with overflow protection
26//! - Load monitoring and metrics collection
27//! - Timeout validation for all operations
28//!
29//! ### Timeout Management
30//! - Cascading deadline propagation
31//! - Global deadline coordination
32//! - Operation timeout enforcement
33//! - Panic-safe timeout cancellation
34//!
35//! ## Integration with Mountain
36//!
37//! Resilience patterns directly support Mountain's stability by:
38//! - preventing cascading failures through circuit breaker isolation
39//! - managing load through bulkhead resource limits
40//! - providing event publishing for Mountain's telemetry dashboard
41//! - enabling adaptive retry behavior for improved service availability
42//!
43//! ## VSCode Stability References
44//!
45//! Similar patterns used in VSCode for:
46//! - External service resilience (telemetry, updates, extensions)
47//! - Editor process isolation and recovery
48//! - Background task fault tolerance
49//!
50//! Reference:
51//! vs/base/common/errors
52//!
//! ## Future Enhancements
//!
//! - [DISTRIBUTED TRACING] Integrate with Tracing module for retry/circuit span
//!   correlation
//! - [CUSTOM METRICS] Add detailed bulkhead load metrics to Metrics module
//! - [EVENT PUBLISHING] Extend circuit breaker events with OpenTelemetry
//!   support
//! - [ADAPTIVE POLICIES] Enhance retry policies with machine learning-based
//!   error prediction
//! - [METRICS INTEGRATION] Export resilience metrics to Mountain's telemetry UI
//!
//! ## Sensitive Data Handling
64//!
65//! This module does not process sensitive data directly but should:
66//! - Redact error messages before logging/event publishing
67//! - Avoid including request payloads in resilience events
68//! - Sanitize service names before publishing to telemetry
69
70pub mod Retry;
71
72pub use Retry::{ErrorClass, RetryEvent, RetryManager, RetryPolicy};
73
74pub mod Timeout;
75
76use std::{
77	collections::HashMap,
78	sync::Arc,
79	time::{Duration, Instant},
80};
81
82pub use Timeout::TimeoutManager;
83use tokio::sync::{RwLock, broadcast};
84use serde::{Deserialize, Serialize};
85
86use crate::dev_log;
87
88// Retry types (ErrorClass, RetryPolicy, RetryManager, RetryEvent) → Retry.rs
89
90/// Circuit breaker states
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
92pub enum CircuitState {
93	/// Circuit is closed (normal operation)
94	Closed,
95
96	/// Circuit is open (failing fast)
97	Open,
98
99	/// Circuit is half-open (testing recovery)
100	HalfOpen,
101}
102
103/// Circuit breaker configuration
104#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
105pub struct CircuitBreakerConfig {
106	/// Failure threshold before tripping
107	pub FailureThreshold:u32,
108
109	/// Success threshold before closing
110	pub SuccessThreshold:u32,
111
112	/// Timeout before attempting recovery (in seconds)
113	pub TimeoutSecs:u64,
114}
115
116impl Default for CircuitBreakerConfig {
117	fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
118}
119
120/// Circuit breaker events for metrics and telemetry integration
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct CircuitEvent {
123	pub name:String,
124
125	pub FromState:CircuitState,
126
127	pub ToState:CircuitState,
128
129	pub timestamp:u64,
130
131	pub reason:String,
132}
133
134/// Circuit breaker for fault isolation with state consistency validation and
135/// event publishing
136#[derive(Debug)]
137pub struct CircuitBreaker {
138	Name:String,
139
140	State:Arc<RwLock<CircuitState>>,
141
142	Config:CircuitBreakerConfig,
143
144	FailureCount:Arc<RwLock<u32>>,
145
146	SuccessCount:Arc<RwLock<u32>>,
147
148	LastFailureTime:Arc<RwLock<Option<Instant>>>,
149
150	EventTx:Arc<broadcast::Sender<CircuitEvent>>,
151
152	StateTransitionCounter:Arc<RwLock<u32>>,
153}
154
155impl CircuitBreaker {
156	/// Create a new circuit breaker with event publishing
157	pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
158		let (EventTx, _) = broadcast::channel(1000);
159
160		Self {
161			Name:name.clone(),
162
163			State:Arc::new(RwLock::new(CircuitState::Closed)),
164
165			Config,
166
167			FailureCount:Arc::new(RwLock::new(0)),
168
169			SuccessCount:Arc::new(RwLock::new(0)),
170
171			LastFailureTime:Arc::new(RwLock::new(None)),
172
173			EventTx:Arc::new(EventTx),
174
175			StateTransitionCounter:Arc::new(RwLock::new(0)),
176		}
177	}
178
179	/// Get the circuit breaker event transmitter for subscription
180	pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }
181
182	/// Get current state with panic recovery
183	pub async fn GetState(&self) -> CircuitState { *self.State.read().await }
184
185	/// Validate state consistency across all counters
186	pub async fn ValidateState(&self) -> Result<(), String> {
187		let state = *self.State.read().await;
188
189		let failures = *self.FailureCount.read().await;
190
191		let successes = *self.SuccessCount.read().await;
192
193		match state {
194			CircuitState::Closed => {
195				if successes != 0 {
196					return Err(format!("Inconsistent state: Closed but has {} successes", successes));
197				}
198
199				if failures >= self.Config.FailureThreshold {
200					dev_log!(
201						"resilience",
202						"warn: [CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
203						failures,
204						self.Config.FailureThreshold
205					);
206				}
207			},
208
209			CircuitState::Open => {
210				if failures < self.Config.FailureThreshold {
211					dev_log!(
212						"resilience",
213						"warn: [CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
214						failures,
215						self.Config.FailureThreshold
216					);
217				}
218			},
219
220			CircuitState::HalfOpen => {
221				if successes >= self.Config.SuccessThreshold {
222					return Err(format!(
223						"Inconsistent state: HalfOpen but has {} successes (should be Closed)",
224						successes
225					));
226				}
227			},
228		}
229
230		Ok(())
231	}
232
233	/// Transition state with validation and event publishing
234	async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
235		let CurrentState = self.GetState().await;
236
237		if CurrentState == NewState {
238			// No transition needed
239			return Ok(());
240		}
241
242		// Validate the proposed transition
243		match (CurrentState, NewState) {
244			(CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {
245
246				// Valid transitions
247			},
248
249			(CircuitState::Open, CircuitState::HalfOpen) => {
250
251				// Valid transition through recovery
252			},
253
254			(CircuitState::HalfOpen, CircuitState::Closed) => {
255
256				// Valid recovery transition
257			},
258
259			_ => {
260				return Err(format!(
261					"Invalid state transition from {:?} to {:?} for {}",
262					CurrentState, NewState, self.Name
263				));
264			},
265		}
266
267		// Publish state transition event
268		let event = CircuitEvent {
269			name:self.Name.clone(),
270
271			FromState:CurrentState,
272
273			ToState:NewState,
274
275			timestamp:crate::Utility::CurrentTimestamp(),
276
277			reason:reason.to_string(),
278		};
279
280		let _ = self.EventTx.send(event);
281
282		// Transition state
283		*self.State.write().await = NewState;
284
285		// Increment transition counter
286		*self.StateTransitionCounter.write().await += 1;
287
288		dev_log!(
289			"resilience",
290			"[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
291			self.Name,
292			CurrentState,
293			NewState,
294			reason
295		);
296
297		// Validate new state consistency
298		self.ValidateState().await.map_err(|e| {
299			dev_log!(
300				"resilience",
301				"error: [CircuitBreaker] State validation failed after transition: {}",
302				e
303			);
304
305			e
306		})?;
307
308		Ok(())
309	}
310
311	/// Record a successful call with panic recovery
312	pub async fn RecordSuccess(&self) {
313		let state = self.GetState().await;
314
315		match state {
316			CircuitState::Closed => {
317				// Reset counters
318				*self.FailureCount.write().await = 0;
319			},
320
321			CircuitState::HalfOpen => {
322				// Increment success count
323				let mut SuccessCount = self.SuccessCount.write().await;
324
325				*SuccessCount += 1;
326
327				if *SuccessCount >= self.Config.SuccessThreshold {
328					// Close the circuit
329					let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
330
331					*self.FailureCount.write().await = 0;
332
333					*self.SuccessCount.write().await = 0;
334				}
335			},
336
337			_ => {},
338		}
339	}
340
341	/// Record a failed call with panic recovery
342	pub async fn RecordFailure(&self) {
343		let State = self.GetState().await;
344
345		*self.LastFailureTime.write().await = Some(Instant::now());
346
347		match State {
348			CircuitState::Closed => {
349				// Increment failure count
350				let mut FailureCount = self.FailureCount.write().await;
351
352				*FailureCount += 1;
353
354				if *FailureCount >= self.Config.FailureThreshold {
355					// Open the circuit
356					let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;
357
358					*self.SuccessCount.write().await = 0;
359				}
360			},
361
362			CircuitState::HalfOpen => {
363				// Return to open state
364				let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;
365
366				*self.SuccessCount.write().await = 0;
367			},
368
369			_ => {},
370		}
371	}
372
373	/// Attempt to transition to half-open if timeout has elapsed with panic
374	/// recovery
375	pub async fn AttemptRecovery(&self) -> bool {
376		let state = self.GetState().await;
377
378		if state != CircuitState::Open {
379			return state == CircuitState::HalfOpen;
380		}
381
382		if let Some(last_failure) = *self.LastFailureTime.read().await {
383			if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
384				let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;
385
386				*self.SuccessCount.write().await = 0;
387
388				return true;
389			}
390		}
391
392		false
393	}
394
395	/// Get circuit breaker statistics for metrics
396	pub async fn GetStatistics(&self) -> CircuitStatistics {
397		CircuitStatistics {
398			Name:self.Name.clone(),
399
400			State:self.GetState().await,
401
402			Failures:*self.FailureCount.read().await,
403
404			Successes:*self.SuccessCount.read().await,
405
406			StateTransitions:*self.StateTransitionCounter.read().await,
407
408			LastFailureTime:*self.LastFailureTime.read().await,
409		}
410	}
411
412	/// Validate circuit breaker configuration
413	pub fn ValidateConfig(&config:&CircuitBreakerConfig) -> Result<(), String> {
414		if config.FailureThreshold == 0 {
415			return Err("FailureThreshold must be greater than 0".to_string());
416		}
417
418		if config.SuccessThreshold == 0 {
419			return Err("SuccessThreshold must be greater than 0".to_string());
420		}
421
422		if config.TimeoutSecs == 0 {
423			return Err("TimeoutSecs must be greater than 0".to_string());
424		}
425
426		Ok(())
427	}
428}
429
430/// Circuit breaker statistics for metrics export
431#[derive(Debug, Clone, Serialize)]
432pub struct CircuitStatistics {
433	pub Name:String,
434
435	pub State:CircuitState,
436
437	pub Failures:u32,
438
439	pub Successes:u32,
440
441	pub StateTransitions:u32,
442
443	#[serde(skip_serializing)]
444	pub LastFailureTime:Option<Instant>,
445}
446
447impl<'de> Deserialize<'de> for CircuitStatistics {
448	fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
449	where
450		D: serde::Deserializer<'de>, {
451		use serde::de::{self, Visitor};
452
453		struct CircuitStatisticsVisitor;
454
455		impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
456			type Value = CircuitStatistics;
457
458			fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
459				formatter.write_str("struct CircuitStatistics")
460			}
461
462			fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
463			where
464				A: de::MapAccess<'de>, {
465				let mut Name = None;
466
467				let mut State = None;
468
469				let mut Failures = None;
470
471				let mut Successes = None;
472
473				let mut StateTransitions = None;
474
475				while let Some(key) = map.next_key::<String>()? {
476					match key.as_str() {
477						"name" => Name = Some(map.next_value()?),
478
479						"state" => State = Some(map.next_value()?),
480
481						"failures" => Failures = Some(map.next_value()?),
482
483						"successes" => Successes = Some(map.next_value()?),
484
485						"state_transitions" => StateTransitions = Some(map.next_value()?),
486
487						_ => {
488							map.next_value::<de::IgnoredAny>()?;
489						},
490					}
491				}
492
493				Ok(CircuitStatistics {
494					Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,
495
496					State:State.ok_or_else(|| de::Error::missing_field("state"))?,
497
498					Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,
499
500					Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,
501
502					StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,
503
504					LastFailureTime:None,
505				})
506			}
507		}
508
509		const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];
510
511		Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
512	}
513}
514
515impl Clone for CircuitBreaker {
516	fn clone(&self) -> Self {
517		Self {
518			Name:self.Name.clone(),
519
520			State:self.State.clone(),
521
522			Config:self.Config.clone(),
523
524			FailureCount:self.FailureCount.clone(),
525
526			SuccessCount:self.SuccessCount.clone(),
527
528			LastFailureTime:self.LastFailureTime.clone(),
529
530			EventTx:self.EventTx.clone(),
531
532			StateTransitionCounter:self.StateTransitionCounter.clone(),
533		}
534	}
535}
536
537/// Bulkhead configuration
538#[derive(Debug, Clone, Serialize, Deserialize)]
539pub struct BulkheadConfig {
540	/// Maximum concurrent requests
541	pub max_concurrent:usize,
542
543	/// Maximum queue size
544	pub max_queue:usize,
545
546	/// Request timeout (in seconds)
547	pub timeout_secs:u64,
548}
549
550impl Default for BulkheadConfig {
551	fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
552}
553
554/// Bulkhead statistics for metrics export
555#[derive(Debug, Clone, Serialize, Deserialize)]
556pub struct BulkheadStatistics {
557	pub name:String,
558
559	pub current_concurrent:u32,
560
561	pub current_queue:u32,
562
563	pub max_concurrent:usize,
564
565	pub max_queue:usize,
566
567	pub total_rejected:u64,
568
569	pub total_completed:u64,
570
571	pub total_timed_out:u64,
572}
573
574/// Bulkhead semaphore for resource isolation with metrics and panic recovery
575#[derive(Debug)]
576pub struct BulkheadExecutor {
577	name:String,
578
579	semaphore:Arc<tokio::sync::Semaphore>,
580
581	config:BulkheadConfig,
582
583	current_requests:Arc<RwLock<u32>>,
584
585	queue_size:Arc<RwLock<u32>>,
586
587	total_rejected:Arc<RwLock<u64>>,
588
589	total_completed:Arc<RwLock<u64>>,
590
591	total_timed_out:Arc<RwLock<u64>>,
592}
593
594impl BulkheadExecutor {
595	/// Create a new bulkhead executor with metrics tracking
596	pub fn new(name:String, config:BulkheadConfig) -> Self {
597		Self {
598			name:name.clone(),
599
600			semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),
601
602			config,
603
604			current_requests:Arc::new(RwLock::new(0)),
605
606			queue_size:Arc::new(RwLock::new(0)),
607
608			total_rejected:Arc::new(RwLock::new(0)),
609
610			total_completed:Arc::new(RwLock::new(0)),
611
612			total_timed_out:Arc::new(RwLock::new(0)),
613		}
614	}
615
616	/// Validate bulkhead configuration
617	pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
618		if config.max_concurrent == 0 {
619			return Err("max_concurrent must be greater than 0".to_string());
620		}
621
622		if config.max_queue == 0 {
623			return Err("max_queue must be greater than 0".to_string());
624		}
625
626		if config.timeout_secs == 0 {
627			return Err("timeout_secs must be greater than 0".to_string());
628		}
629
630		Ok(())
631	}
632
633	/// Execute with bulkhead protection and panic recovery
634	pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
635	where
636		F: std::future::Future<Output = Result<R, String>>, {
637		async {
638			// Validate timeout
639			if self.config.timeout_secs == 0 {
640				return Err("Bulkhead timeout must be greater than 0".to_string());
641			}
642
643			// Check queue size
644			let queue = *self.queue_size.read().await;
645
646			if queue >= self.config.max_queue as u32 {
647				*self.total_rejected.write().await += 1;
648
649				dev_log!("resilience", "warn: [Bulkhead] Queue full for {}, rejecting request", self.name);
650
651				return Err("Bulkhead queue full".to_string());
652			}
653
654			// Increment queue size
655			*self.queue_size.write().await += 1;
656
657			// Acquire permit with timeout
658			let _Permit =
659				match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
660					.await
661				{
662					Ok(Ok(_)) => {
663						// Permit acquired, proceed with execution
664						// Decrement queue size
665						*self.queue_size.write().await -= 1;
666					},
667
668					Ok(Err(e)) => {
669						*self.queue_size.write().await -= 1;
670
671						return Err(format!("Bulkhead semaphore error: {}", e));
672					},
673
674					Err(_) => {
675						*self.queue_size.write().await -= 1;
676
677						*self.total_timed_out.write().await += 1;
678
679						dev_log!("resilience", "warn: [Bulkhead] Timeout waiting for permit for {}", self.name);
680
681						return Err("Bulkhead timeout waiting for permit".to_string());
682					},
683				};
684
685			// Decrement queue size, increment current requests
686			*self.queue_size.write().await -= 1;
687
688			*self.current_requests.write().await += 1;
689
690			// Execute with timeout (no catch_unwind to avoid interior mutability issues)
691			let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;
692
693			let execution_result:Result<R, String> = match execution_result {
694				Ok(Ok(value)) => Ok(value),
695
696				Ok(Err(e)) => Err(e),
697
698				Err(_) => {
699					*self.total_timed_out.write().await += 1;
700
701					Err("Bulkhead execution timeout".to_string())
702				},
703			};
704
705			if execution_result.is_ok() {
706				*self.total_completed.write().await += 1;
707			}
708
709			execution_result
710		}
711		.await
712	}
713
714	/// Get current load with panic recovery
715	pub async fn GetLoad(&self) -> (u32, u32) {
716		async {
717			let current = *self.current_requests.read().await;
718
719			let queue = *self.queue_size.read().await;
720
721			(current, queue)
722		}
723		.await
724	}
725
726	/// Get bulkhead statistics for metrics
727	pub async fn GetStatistics(&self) -> BulkheadStatistics {
728		async {
729			BulkheadStatistics {
730				name:self.name.clone(),
731
732				current_concurrent:*self.current_requests.read().await,
733
734				current_queue:*self.queue_size.read().await,
735
736				max_concurrent:self.config.max_concurrent,
737
738				max_queue:self.config.max_queue,
739
740				total_rejected:*self.total_rejected.read().await,
741
742				total_completed:*self.total_completed.read().await,
743
744				total_timed_out:*self.total_timed_out.read().await,
745			}
746		}
747		.await
748	}
749
750	/// Calculate utilization percentage
751	pub async fn GetUtilization(&self) -> f64 {
752		let (current, _) = self.GetLoad().await;
753
754		if self.config.max_concurrent == 0 {
755			return 0.0;
756		}
757
758		(current as f64 / self.config.max_concurrent as f64) * 100.0
759	}
760}
761
762impl Clone for BulkheadExecutor {
763	fn clone(&self) -> Self {
764		Self {
765			name:self.name.clone(),
766
767			semaphore:self.semaphore.clone(),
768
769			config:self.config.clone(),
770
771			current_requests:self.current_requests.clone(),
772
773			queue_size:self.queue_size.clone(),
774
775			total_rejected:self.total_rejected.clone(),
776
777			total_completed:self.total_completed.clone(),
778
779			total_timed_out:self.total_timed_out.clone(),
780		}
781	}
782}
783
784/// Resilience orchestrator combining all patterns
785#[derive(Debug)]
786pub struct ResilienceOrchestrator {
787	retry_manager:Arc<RetryManager>,
788
789	circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,
790
791	bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
792}
793
794impl ResilienceOrchestrator {
795	/// Create a new resilience orchestrator
796	pub fn new(retry_policy:RetryPolicy) -> Self {
797		Self {
798			retry_manager:Arc::new(RetryManager::new(retry_policy)),
799
800			circuit_breakers:Arc::new(RwLock::new(HashMap::new())),
801
802			bulkheads:Arc::new(RwLock::new(HashMap::new())),
803		}
804	}
805
806	/// Get or create circuit breaker with configuration validation
807	pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
808		let mut breakers = self.circuit_breakers.write().await;
809
810		Arc::new(
811			breakers
812				.entry(service.to_string())
813				.or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
814				.clone(),
815		)
816	}
817
818	/// Get or create bulkhead with configuration validation
819	pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
820		let mut bulkheads = self.bulkheads.write().await;
821
822		Arc::new(
823			bulkheads
824				.entry(service.to_string())
825				.or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
826				.clone(),
827		)
828	}
829
830	/// Get all circuit breaker statistics
831	pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
832		let breakers = self.circuit_breakers.read().await;
833
834		let mut stats = Vec::new();
835
836		for breaker in breakers.values() {
837			stats.push(breaker.GetStatistics().await);
838		}
839
840		stats
841	}
842
843	/// Get all bulkhead statistics
844	pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
845		let bulkheads = self.bulkheads.read().await;
846
847		let mut stats = Vec::new();
848
849		for bulkhead in bulkheads.values() {
850			stats.push(bulkhead.GetStatistics().await);
851		}
852
853		stats
854	}
855
856	/// Execute with full resilience and event publishing
857	pub async fn ExecuteResilient<F, R>(
858		&self,
859
860		service:&str,
861
862		retry_policy:&RetryPolicy,
863
864		circuit_config:CircuitBreakerConfig,
865
866		bulkhead_config:BulkheadConfig,
867
868		f:F,
869	) -> Result<R, String>
870	where
871		F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
872		// Validate configurations
873		if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
874			return Err(format!("Invalid circuit breaker config: {}", e));
875		}
876
877		if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
878			return Err(format!("Invalid bulkhead config: {}", e));
879		}
880
881		let breaker = self.GetCircuitBreaker(service, circuit_config).await;
882
883		let bulkhead = self.GetBulkhead(service, bulkhead_config).await;
884
885		// Check circuit state
886		if breaker.GetState().await == CircuitState::Open {
887			if !breaker.AttemptRecovery().await {
888				return Err("Circuit breaker is open".to_string());
889			}
890		}
891
892		// Execute with bulkhead protection and retry logic
893		let mut Attempt = 0;
894
895		let _LastError = "".to_string();
896
897		loop {
898			let result = bulkhead.Execute(f()).await;
899
900			match result {
901				Ok(Value) => {
902					breaker.RecordSuccess().await;
903
904					// Publish retry success event
905					let Event = RetryEvent {
906						Service:service.to_string(),
907
908						Attempt,
909
910						ErrorClass:ErrorClass::Unknown,
911
912						DelayMs:0,
913
914						Success:true,
915
916						ErrorMessage:None,
917					};
918
919					self.retry_manager.PublishRetryEvent(Event);
920
921					return Ok(Value);
922				},
923
924				Err(E) => {
925					let ErrorClass = self.retry_manager.ClassifyError(&E);
926
927					breaker.RecordFailure().await;
928
929					// Publish retry failure event
930					let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
931
932					let Event = RetryEvent {
933						Service:service.to_string(),
934
935						Attempt,
936
937						ErrorClass,
938
939						DelayMs:Delay.as_millis() as u64,
940
941						Success:false,
942
943						ErrorMessage:Some(self.redact_sensitive_data(&E)),
944					};
945
946					self.retry_manager.PublishRetryEvent(Event);
947
948					if Attempt < retry_policy.MaxRetries
949						&& ErrorClass != ErrorClass::NonRetryable
950						&& self.retry_manager.CanRetry(service).await
951					{
952						let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
953
954						dev_log!(
955							"resilience",
956							"[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
957							service,
958							Attempt + 1,
959							retry_policy.MaxRetries,
960							Delay,
961							self.redact_sensitive_data(&E)
962						);
963
964						tokio::time::sleep(Delay).await;
965
966						Attempt += 1;
967					} else {
968						return Err(E);
969					}
970				},
971			}
972		}
973	}
974
975	/// Redact sensitive data from error messages before logging/event
976	/// publishing
977	fn redact_sensitive_data(&self, message:&str) -> String {
978		let mut redacted = message.to_string();
979
980		// Redact common patterns - simplified to avoid escaping issues
981		let patterns = vec![
982			(r"(?i)password[=:]\S+", "password=[REDACTED]"),
983			(r"(?i)token[=:]\S+", "token=[REDACTED]"),
984			(r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
985			(r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
986			(
987				r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
988				"Authorization: Bearer [REDACTED]",
989			),
990			(r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
991			(r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
992		];
993
994		for (pattern, replacement) in patterns {
995			if let Ok(re) = regex::Regex::new(pattern) {
996				redacted = re.replace_all(&redacted, replacement).to_string();
997			}
998		}
999
1000		redacted
1001	}
1002
1003	/// Validate all configurations
1004	pub fn ValidateConfigurations(
1005		&self,
1006
1007		_RetryPolicy:&RetryPolicy,
1008
1009		CircuitConfig:&CircuitBreakerConfig,
1010
1011		BulkheadConfig:&BulkheadConfig,
1012	) -> Result<(), String> {
1013		self.retry_manager.ValidatePolicy()?;
1014
1015		CircuitBreaker::ValidateConfig(CircuitConfig)?;
1016
1017		BulkheadExecutor::ValidateConfig(BulkheadConfig)?;
1018
1019		TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;
1020
1021		Ok(())
1022	}
1023}
1024
1025impl Clone for ResilienceOrchestrator {
1026	fn clone(&self) -> Self {
1027		Self {
1028			retry_manager:self.retry_manager.clone(),
1029
1030			circuit_breakers:self.circuit_breakers.clone(),
1031
1032			bulkheads:self.bulkheads.clone(),
1033		}
1034	}
1035}
1036
#[cfg(test)]
mod tests {

	use super::*;

	/// A later retry attempt must not get a shorter delay than an earlier one.
	#[test]
	fn test_retry_delay_calculation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		let delay_1 = manager.CalculateRetryDelay(1);

		let delay_2 = manager.CalculateRetryDelay(2);

		// delay_2 should be roughly double delay_1 (with some jitter)
		assert!(delay_2 >= delay_1);
	}

	/// Rate-limit errors must back off at least as long as transient ones.
	#[test]
	fn test_adaptive_retry_delay() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		// Rate limited errors should have longer delays
		let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);

		let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

		assert!(rate_limit_delay >= transient_delay);
	}

	/// Representative error messages map to the expected error classes.
	#[test]
	fn test_error_classification() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);

		assert_eq!(manager.ClassifyError("rate limit exceeded"), ErrorClass::RateLimited);

		assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);

		assert_eq!(manager.ClassifyError("server error"), ErrorClass::ServerError);
	}

	/// The default policy validates; a policy with zero retries does not.
	#[test]
	fn test_policy_validation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert!(manager.ValidatePolicy().is_ok());

		let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };

		let invalid_manager = RetryManager::new(invalid_policy);

		assert!(invalid_manager.ValidatePolicy().is_err());
	}

	/// Walks the breaker through Closed -> Open -> HalfOpen -> Closed.
	#[tokio::test]
	async fn test_circuit_breaker_state_transitions() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// Recovery is refused until the configured timeout has elapsed since
		// the last failure.
		assert!(!breaker.AttemptRecovery().await);

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// Wait just past the 1 second recovery timeout.
		tokio::time::sleep(Duration::from_millis(1100)).await;

		assert!(breaker.AttemptRecovery().await);

		assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

		breaker.RecordSuccess().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);
	}

	/// State validation passes both initially and after the circuit opens.
	#[tokio::test]
	async fn test_circuit_breaker_validation() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		// Validate initial state
		assert!(breaker.ValidateState().await.is_ok());

		// Trigger state transition to open
		breaker.RecordFailure().await;

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// Open with the failure count at the threshold is a consistent state.
		assert!(breaker.ValidateState().await.is_ok());
	}

	/// The default config validates; a zero failure threshold does not.
	#[test]
	fn test_circuit_breaker_config_validation() {
		let valid_config = CircuitBreakerConfig::default();

		assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

		let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };

		assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
	}

	/// A fresh bulkhead reports no load and echoes its configured limits.
	#[tokio::test]
	async fn test_bulkhead_resource_isolation() {
		let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		let (current, queue) = bulkhead.GetLoad().await;

		assert_eq!(current, 0);

		assert_eq!(queue, 0);

		let stats = bulkhead.GetStatistics().await;

		assert_eq!(stats.current_concurrent, 0);

		assert_eq!(stats.current_queue, 0);

		assert_eq!(stats.max_concurrent, 2);

		assert_eq!(stats.max_queue, 5);
	}

	/// A fresh bulkhead reports zero utilization.
	#[tokio::test]
	async fn test_bulkhead_utilization() {
		let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		let utilization = bulkhead.GetUtilization().await;

		assert_eq!(utilization, 0.0);
	}

	/// The default config validates; zero concurrency does not.
	#[test]
	fn test_bulkhead_config_validation() {
		let valid_config = BulkheadConfig::default();

		assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

		let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };

		assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
	}

	/// A new timeout manager is not exceeded and rejects a zero timeout.
	#[test]
	fn test_timeout_manager() {
		let manager = TimeoutManager::new(Duration::from_secs(30));

		assert!(!manager.IsExceeded());

		assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
	}

	/// With a deadline, the remaining time is known and within the deadline.
	#[test]
	fn test_timeout_manager_with_deadline() {
		let deadline = Instant::now() + Duration::from_secs(60);

		let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

		let remaining = manager.Remaining();

		assert!(remaining.is_some());

		assert!(remaining.unwrap() <= Duration::from_secs(60));
	}
}