AirLibrary/Resilience/
mod.rs

//! # Resilience Patterns Module
//!
//! Provides robust resilience patterns for external service calls:
//! - Exponential backoff retry logic with jitter
//! - Circuit breaker pattern for fault isolation
//! - Bulkhead pattern for resource isolation
//! - Timeout management with cascading deadlines
//!
//! ## Responsibilities
//!
//! ### Retry Patterns
//! - Exponential backoff with jitter for distributed systems
//! - Adaptive retry policies based on error classification
//! - Retry budget management for service rate limiting
//! - Panic recovery for background retry tasks
//!
//! ### Circuit Breaker
//! - Automatic fault detection and isolation
//! - State consistency validation across transitions
//! - Event publishing for telemetry integration
//! - Half-open state monitoring for recovery testing
//!
//! ### Bulkhead Pattern
//! - Concurrent request limiting for resource protection
//! - Queue management with overflow protection
//! - Load monitoring and metrics collection
//! - Timeout validation for all operations
//!
//! ### Timeout Management
//! - Cascading deadline propagation
//! - Global deadline coordination
//! - Operation timeout enforcement
//! - Panic-safe timeout cancellation
//!
//! ## Integration with Mountain
//!
//! Resilience patterns directly support Mountain's stability by:
//! - preventing cascading failures through circuit breaker isolation
//! - managing load through bulkhead resource limits
//! - providing event publishing for Mountain's telemetry dashboard
//! - enabling adaptive retry behavior for improved service availability
//!
//! ## VSCode Stability References
//!
//! Similar patterns used in VSCode for:
//! - External service resilience (telemetry, updates, extensions)
//! - Editor process isolation and recovery
//! - Background task fault tolerance
//!
//! Reference:
//! vs/base/common/errors
//!
//! # TODOs
//!
//! - [DISTRIBUTED TRACING] Integrate with Tracing module for retry/circuit span
//!   correlation
//! - [CUSTOM METRICS] Add detailed bulkhead load metrics to Metrics module
//! - [EVENT PUBLISHING] Extend circuit breaker events with OpenTelemetry
//!   support
//! - [ADAPTIVE POLICIES] Enhance retry policies with machine learning-based
//!   error prediction
//! - [METRICS INTEGRATION] Export resilience metrics to Mountain's telemetry UI
//!
//! ## Sensitive Data Handling
//!
//! This module does not process sensitive data directly but should:
//! - Redact error messages before logging/event publishing
//! - Avoid including request payloads in resilience events
//! - Sanitize service names before publishing to telemetry
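//!
//! ## Example
//!
//! A minimal usage sketch (marked `ignore`, so it is not compiled as a doc test);
//! the service name and the closure below are illustrative assumptions, not part
//! of Mountain's API:
//!
//! ```ignore
//! let orchestrator = ResilienceOrchestrator::new(RetryPolicy::default());
//!
//! let response = orchestrator
//! 	.ExecuteResilient(
//! 		"catalog-service",
//! 		&RetryPolicy::default(),
//! 		CircuitBreakerConfig::default(),
//! 		BulkheadConfig::default(),
//! 		|| Box::pin(async { Ok::<_, String>("payload".to_string()) }),
//! 	)
//! 	.await?;
//! ```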

use std::{
	collections::HashMap,
	sync::Arc,
	time::{Duration, Instant},
};

use tokio::sync::{Mutex, RwLock, broadcast};
use serde::{Deserialize, Serialize};

/// Error classification for adaptive retry policies
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ErrorClass {
	/// Transient errors (network timeouts, temporary failures)
	Transient,

	/// Non-retryable errors (authentication, invalid requests)
	NonRetryable,

	/// Rate limit errors (429 Too Many Requests)
	RateLimited,

	/// Server errors (500-599)
	ServerError,

	/// Unknown error classification
	Unknown,
}

/// Retry policy configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
	/// Maximum number of retry attempts
	pub MaxRetries:u32,

	/// Initial retry interval (in milliseconds)
	pub InitialIntervalMs:u64,

	/// Maximum retry interval (in milliseconds)
	pub MaxIntervalMs:u64,

	/// Exponential backoff multiplier
	pub BackoffMultiplier:f64,

	/// Jitter percentage (0-1)
	pub JitterFactor:f64,

	/// Retry budget per service (max retries per minute)
	pub BudgetPerMinute:u32,

	/// Adaptive error classification for intelligent retry behavior
	pub ErrorClassification:HashMap<String, ErrorClass>,
}

impl Default for RetryPolicy {
	fn default() -> Self {
		let mut ErrorClassification = HashMap::new();

		// Default error classifications
		ErrorClassification.insert("timeout".to_string(), ErrorClass::Transient);

		ErrorClassification.insert("connection_refused".to_string(), ErrorClass::Transient);

		ErrorClassification.insert("connection_reset".to_string(), ErrorClass::Transient);

		ErrorClassification.insert("rate_limit_exceeded".to_string(), ErrorClass::RateLimited);

		ErrorClassification.insert("authentication_failed".to_string(), ErrorClass::NonRetryable);

		ErrorClassification.insert("unauthorized".to_string(), ErrorClass::NonRetryable);

		ErrorClassification.insert("not_found".to_string(), ErrorClass::NonRetryable);

		ErrorClassification.insert("server_error".to_string(), ErrorClass::ServerError);

		ErrorClassification.insert("internal_server_error".to_string(), ErrorClass::ServerError);

		ErrorClassification.insert("service_unavailable".to_string(), ErrorClass::ServerError);

		ErrorClassification.insert("gateway_timeout".to_string(), ErrorClass::Transient);

		Self {
			MaxRetries:3,

			InitialIntervalMs:1000,

			MaxIntervalMs:32000,

			BackoffMultiplier:2.0,

			JitterFactor:0.1,

			BudgetPerMinute:100,

			ErrorClassification,
		}
	}
}

/// Retry budget tracker
#[derive(Debug, Clone)]
struct RetryBudget {
	Attempts:Vec<Instant>,

	MaxPerMinute:u32,
}

impl RetryBudget {
	fn new(MaxPerMinute:u32) -> Self { Self { Attempts:Vec::new(), MaxPerMinute } }

	fn can_retry(&mut self) -> bool {
		let Now = Instant::now();

		let OneMinuteAgo = Now - Duration::from_secs(60);

		// Remove attempts older than 1 minute
		self.Attempts.retain(|&attempt| attempt > OneMinuteAgo);

		if self.Attempts.len() < self.MaxPerMinute as usize {
			self.Attempts.push(Now);

			true
		} else {
			false
		}
	}
}

/// Retry manager with budget tracking and adaptive policies
pub struct RetryManager {
	Policy:RetryPolicy,

	Budgets:Arc<Mutex<HashMap<String, RetryBudget>>>,

	EventTx:Arc<broadcast::Sender<RetryEvent>>,
}

/// Events published by retry operations for metrics and telemetry integration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryEvent {
	pub Service:String,

	pub Attempt:u32,

	pub ErrorClass:ErrorClass,

	pub DelayMs:u64,

	pub Success:bool,

	pub ErrorMessage:Option<String>,
}

impl RetryManager {
	/// Create a new retry manager
	pub fn new(policy:RetryPolicy) -> Self {
		let (EventTx, _) = broadcast::channel(1000);

		Self {
			Policy:policy,

			Budgets:Arc::new(Mutex::new(HashMap::new())),

			EventTx:Arc::new(EventTx),
		}
	}

	/// Get the retry event transmitter for subscription
	pub fn GetEventTransmitter(&self) -> broadcast::Sender<RetryEvent> { (*self.EventTx).clone() }

	/// Calculate next retry delay with exponential backoff and jitter
	pub fn CalculateRetryDelay(&self, Attempt:u32) -> Duration {
		if Attempt == 0 {
			return Duration::from_millis(0);
		}

		let BaseDelay = (self.Policy.InitialIntervalMs as f64 * self.Policy.BackoffMultiplier.powi(Attempt as i32 - 1))
			.min(self.Policy.MaxIntervalMs as f64) as u64;

		// Add jitter
		let Jitter = (BaseDelay as f64 * self.Policy.JitterFactor) as u64;

		let RandomJitter = (rand::random::<f64>() * Jitter as f64) as u64;

		let FinalDelay = BaseDelay + RandomJitter;

		Duration::from_millis(FinalDelay)
	}

	/// Calculate adaptive retry delay based on error classification
	pub fn CalculateAdaptiveRetryDelay(&self, ErrorType:&str, attempt:u32) -> Duration {
		let ErrorClass = self
			.Policy
			.ErrorClassification
			.get(ErrorType)
			.copied()
			.unwrap_or(ErrorClass::Unknown);

		match ErrorClass {
			ErrorClass::RateLimited => {
				// Longer delays with linear backoff for rate limits
				// 5s, 10s, 15s...
				let delay = (attempt + 1) * 5000;

				Duration::from_millis(delay as u64)
			},

			ErrorClass::ServerError => {
				// Aggressive backoff for server errors
				let BaseDelay = self.Policy.InitialIntervalMs * 2_u64.pow(attempt);

				Duration::from_millis(BaseDelay.min(self.Policy.MaxIntervalMs))
			},

			ErrorClass::Transient => {
				// Standard exponential backoff
				self.CalculateRetryDelay(attempt)
			},

			ErrorClass::NonRetryable | ErrorClass::Unknown => {
				// Minimal delay for non-retryable errors (should fail quickly)
				Duration::from_millis(100)
			},
		}
	}

	/// Classify an error for adaptive retry behavior
	pub fn ClassifyError(&self, ErrorMessage:&str) -> ErrorClass {
		let ErrorLower = ErrorMessage.to_lowercase();

		for (pattern, class) in &self.Policy.ErrorClassification {
			if ErrorLower.contains(pattern) {
				return *class;
			}
		}

		ErrorClass::Unknown
	}

	/// Check if retry is possible within budget
	/// Validates budget state before allowing retry
	pub async fn CanRetry(&self, service:&str) -> bool {
		let mut budgets = self.Budgets.lock().await;

		let budget = budgets
			.entry(service.to_string())
			.or_insert_with(|| RetryBudget::new(self.Policy.BudgetPerMinute));

		budget.can_retry()
	}

	/// Publish a retry event for telemetry integration
	pub fn PublishRetryEvent(&self, event:RetryEvent) { let _ = self.EventTx.send(event); }

	/// Validate retry policy configuration
	pub fn ValidatePolicy(&self) -> Result<(), String> {
		if self.Policy.MaxRetries == 0 {
			return Err("MaxRetries must be greater than 0".to_string());
		}

		if self.Policy.InitialIntervalMs == 0 {
			return Err("InitialIntervalMs must be greater than 0".to_string());
		}

		if self.Policy.InitialIntervalMs > self.Policy.MaxIntervalMs {
			return Err("InitialIntervalMs cannot be greater than MaxIntervalMs".to_string());
		}

		if self.Policy.BackoffMultiplier <= 1.0 {
			return Err("BackoffMultiplier must be greater than 1.0".to_string());
		}

		if self.Policy.JitterFactor < 0.0 || self.Policy.JitterFactor > 1.0 {
			return Err("JitterFactor must be between 0 and 1".to_string());
		}

		if self.Policy.BudgetPerMinute == 0 {
			return Err("BudgetPerMinute must be greater than 0".to_string());
		}

		Ok(())
	}
}

/// Circuit breaker states
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CircuitState {
	/// Circuit is closed (normal operation)
	Closed,

	/// Circuit is open (failing fast)
	Open,

	/// Circuit is half-open (testing recovery)
	HalfOpen,
}

/// Circuit breaker configuration
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct CircuitBreakerConfig {
	/// Failure threshold before tripping
	pub FailureThreshold:u32,

	/// Success threshold before closing
	pub SuccessThreshold:u32,

	/// Timeout before attempting recovery (in seconds)
	pub TimeoutSecs:u64,
}

impl Default for CircuitBreakerConfig {
	fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
}

/// Circuit breaker events for metrics and telemetry integration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircuitEvent {
	pub name:String,

	pub FromState:CircuitState,

	pub ToState:CircuitState,

	pub timestamp:u64,

	pub reason:String,
}

/// Circuit breaker for fault isolation with state consistency validation and
/// event publishing
pub struct CircuitBreaker {
	Name:String,

	State:Arc<RwLock<CircuitState>>,

	Config:CircuitBreakerConfig,

	FailureCount:Arc<RwLock<u32>>,

	SuccessCount:Arc<RwLock<u32>>,

	LastFailureTime:Arc<RwLock<Option<Instant>>>,

	EventTx:Arc<broadcast::Sender<CircuitEvent>>,

	StateTransitionCounter:Arc<RwLock<u32>>,
}

impl CircuitBreaker {
	/// Create a new circuit breaker with event publishing
	pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
		let (EventTx, _) = broadcast::channel(1000);

		Self {
			Name:name.clone(),

			State:Arc::new(RwLock::new(CircuitState::Closed)),

			Config,

			FailureCount:Arc::new(RwLock::new(0)),

			SuccessCount:Arc::new(RwLock::new(0)),

			LastFailureTime:Arc::new(RwLock::new(None)),

			EventTx:Arc::new(EventTx),

			StateTransitionCounter:Arc::new(RwLock::new(0)),
		}
	}

	/// Get the circuit breaker event transmitter for subscription
	pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }

	/// Get the current circuit state
	pub async fn GetState(&self) -> CircuitState { *self.State.read().await }

	/// Validate state consistency across all counters
	pub async fn ValidateState(&self) -> Result<(), String> {
		let state = *self.State.read().await;

		let failures = *self.FailureCount.read().await;

		let successes = *self.SuccessCount.read().await;

		match state {
			CircuitState::Closed => {
				if successes != 0 {
					return Err(format!("Inconsistent state: Closed but has {} successes", successes));
				}

				if failures >= self.Config.FailureThreshold {
					log::warn!(
						"[CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
						failures,
						self.Config.FailureThreshold
					);
				}
			},

			CircuitState::Open => {
				if failures < self.Config.FailureThreshold {
					log::warn!(
						"[CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
						failures,
						self.Config.FailureThreshold
					);
				}
			},

			CircuitState::HalfOpen => {
				if successes >= self.Config.SuccessThreshold {
					return Err(format!(
						"Inconsistent state: HalfOpen but has {} successes (should be Closed)",
						successes
					));
				}
			},
		}

		Ok(())
	}

	/// Transition state with validation and event publishing
	async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
		let CurrentState = self.GetState().await;

		if CurrentState == NewState {
			// No transition needed
			return Ok(());
		}

		// Validate the proposed transition
		match (CurrentState, NewState) {
			(CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {
				// Valid transitions
			},

			(CircuitState::Open, CircuitState::HalfOpen) => {
				// Valid transition through recovery
			},

			(CircuitState::HalfOpen, CircuitState::Closed) => {
				// Valid recovery transition
			},

			_ => {
				return Err(format!(
					"Invalid state transition from {:?} to {:?} for {}",
					CurrentState, NewState, self.Name
				));
			},
		}

		// Publish state transition event
		let event = CircuitEvent {
			name:self.Name.clone(),

			FromState:CurrentState,

			ToState:NewState,

			timestamp:crate::Utility::CurrentTimestamp(),

			reason:reason.to_string(),
		};

		let _ = self.EventTx.send(event);

		// Transition state
		*self.State.write().await = NewState;

		// Increment transition counter
		*self.StateTransitionCounter.write().await += 1;

		log::info!(
			"[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
			self.Name,
			CurrentState,
			NewState,
			reason
		);

		// Validate new state consistency
		self.ValidateState().await.map_err(|e| {
			log::error!("[CircuitBreaker] State validation failed after transition: {}", e);

			e
		})?;

		Ok(())
	}

	/// Record a successful call
	pub async fn RecordSuccess(&self) {
		let state = self.GetState().await;

		match state {
			CircuitState::Closed => {
				// Reset counters
				*self.FailureCount.write().await = 0;
			},

			CircuitState::HalfOpen => {
				// Increment the success count, releasing the write guard before any
				// state transition so the post-transition validation can read it
				let NewSuccessCount = {
					let mut SuccessCount = self.SuccessCount.write().await;

					*SuccessCount += 1;

					*SuccessCount
				};

				if NewSuccessCount >= self.Config.SuccessThreshold {
					// Reset counters first so the Closed state validates as consistent
					*self.SuccessCount.write().await = 0;

					*self.FailureCount.write().await = 0;

					// Close the circuit
					let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
				}
			},

			_ => {},
		}
	}

	/// Record a failed call
	pub async fn RecordFailure(&self) {
		let State = self.GetState().await;

		*self.LastFailureTime.write().await = Some(Instant::now());

		match State {
			CircuitState::Closed => {
				// Increment the failure count, releasing the write guard before any
				// state transition so the post-transition validation can read it
				let NewFailureCount = {
					let mut FailureCount = self.FailureCount.write().await;

					*FailureCount += 1;

					*FailureCount
				};

				if NewFailureCount >= self.Config.FailureThreshold {
					*self.SuccessCount.write().await = 0;

					// Open the circuit
					let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;
				}
			},

			CircuitState::HalfOpen => {
				// Return to open state
				let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;

				*self.SuccessCount.write().await = 0;
			},

			_ => {},
		}
	}

	/// Attempt to transition to half-open if the recovery timeout has elapsed
	pub async fn AttemptRecovery(&self) -> bool {
		let state = self.GetState().await;

		if state != CircuitState::Open {
			return state == CircuitState::HalfOpen;
		}

		// Copy the timestamp out so no read guard is held across the transition
		let LastFailure = *self.LastFailureTime.read().await;

		if let Some(last_failure) = LastFailure {
			if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
				*self.SuccessCount.write().await = 0;

				let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;

				return true;
			}
		}

		false
	}

	/// Get circuit breaker statistics for metrics
	pub async fn GetStatistics(&self) -> CircuitStatistics {
		CircuitStatistics {
			Name:self.Name.clone(),

			State:self.GetState().await,

			Failures:*self.FailureCount.read().await,

			Successes:*self.SuccessCount.read().await,

			StateTransitions:*self.StateTransitionCounter.read().await,

			LastFailureTime:*self.LastFailureTime.read().await,
		}
	}

	/// Validate circuit breaker configuration
	pub fn ValidateConfig(config:&CircuitBreakerConfig) -> Result<(), String> {
		if config.FailureThreshold == 0 {
			return Err("FailureThreshold must be greater than 0".to_string());
		}

		if config.SuccessThreshold == 0 {
			return Err("SuccessThreshold must be greater than 0".to_string());
		}

		if config.TimeoutSecs == 0 {
			return Err("TimeoutSecs must be greater than 0".to_string());
		}

		Ok(())
	}
}

/// Circuit breaker statistics for metrics export
#[derive(Debug, Clone, Serialize)]
pub struct CircuitStatistics {
	pub Name:String,

	pub State:CircuitState,

	pub Failures:u32,

	pub Successes:u32,

	pub StateTransitions:u32,

	#[serde(skip_serializing)]
	pub LastFailureTime:Option<Instant>,
}

impl<'de> Deserialize<'de> for CircuitStatistics {
	fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
	where
		D: serde::Deserializer<'de>, {
		use serde::de::{self, Visitor};

		struct CircuitStatisticsVisitor;

		impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
			type Value = CircuitStatistics;

			fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
				formatter.write_str("struct CircuitStatistics")
			}

			fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
			where
				A: de::MapAccess<'de>, {
				let mut Name = None;

				let mut State = None;

				let mut Failures = None;

				let mut Successes = None;

				let mut StateTransitions = None;

				// Accept both the derived Serialize field names and snake_case keys so
				// serialized statistics round-trip correctly
				while let Some(key) = map.next_key::<String>()? {
					match key.as_str() {
						"Name" | "name" => Name = Some(map.next_value()?),

						"State" | "state" => State = Some(map.next_value()?),

						"Failures" | "failures" => Failures = Some(map.next_value()?),

						"Successes" | "successes" => Successes = Some(map.next_value()?),

						"StateTransitions" | "state_transitions" => StateTransitions = Some(map.next_value()?),

						_ => {
							map.next_value::<de::IgnoredAny>()?;
						},
					}
				}

				Ok(CircuitStatistics {
					Name:Name.ok_or_else(|| de::Error::missing_field("Name"))?,

					State:State.ok_or_else(|| de::Error::missing_field("State"))?,

					Failures:Failures.ok_or_else(|| de::Error::missing_field("Failures"))?,

					Successes:Successes.ok_or_else(|| de::Error::missing_field("Successes"))?,

					StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("StateTransitions"))?,

					LastFailureTime:None,
				})
			}
		}

		const FIELDS:&[&str] = &["Name", "State", "Failures", "Successes", "StateTransitions"];

		Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
	}
}

impl Clone for CircuitBreaker {
	fn clone(&self) -> Self {
		Self {
			Name:self.Name.clone(),

			State:self.State.clone(),

			Config:self.Config.clone(),

			FailureCount:self.FailureCount.clone(),

			SuccessCount:self.SuccessCount.clone(),

			LastFailureTime:self.LastFailureTime.clone(),

			EventTx:self.EventTx.clone(),

			StateTransitionCounter:self.StateTransitionCounter.clone(),
		}
	}
}

/// Bulkhead configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadConfig {
	/// Maximum concurrent requests
	pub max_concurrent:usize,

	/// Maximum queue size
	pub max_queue:usize,

	/// Request timeout (in seconds)
	pub timeout_secs:u64,
}

impl Default for BulkheadConfig {
	fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
}

/// Bulkhead statistics for metrics export
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BulkheadStatistics {
	pub name:String,

	pub current_concurrent:u32,

	pub current_queue:u32,

	pub max_concurrent:usize,

	pub max_queue:usize,

	pub total_rejected:u64,

	pub total_completed:u64,

	pub total_timed_out:u64,
}

/// Bulkhead executor for resource isolation with metrics tracking
pub struct BulkheadExecutor {
	name:String,

	semaphore:Arc<tokio::sync::Semaphore>,

	config:BulkheadConfig,

	current_requests:Arc<RwLock<u32>>,

	queue_size:Arc<RwLock<u32>>,

	total_rejected:Arc<RwLock<u64>>,

	total_completed:Arc<RwLock<u64>>,

	total_timed_out:Arc<RwLock<u64>>,
}

impl BulkheadExecutor {
	/// Create a new bulkhead executor with metrics tracking
	pub fn new(name:String, config:BulkheadConfig) -> Self {
		Self {
			name:name.clone(),

			semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),

			config,

			current_requests:Arc::new(RwLock::new(0)),

			queue_size:Arc::new(RwLock::new(0)),

			total_rejected:Arc::new(RwLock::new(0)),

			total_completed:Arc::new(RwLock::new(0)),

			total_timed_out:Arc::new(RwLock::new(0)),
		}
	}

	/// Validate bulkhead configuration
	pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
		if config.max_concurrent == 0 {
			return Err("max_concurrent must be greater than 0".to_string());
		}

		if config.max_queue == 0 {
			return Err("max_queue must be greater than 0".to_string());
		}

		if config.timeout_secs == 0 {
			return Err("timeout_secs must be greater than 0".to_string());
		}

		Ok(())
	}
	/// Execute with bulkhead protection
	pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
	where
		F: std::future::Future<Output = Result<R, String>>, {
		// Validate timeout
		if self.config.timeout_secs == 0 {
			return Err("Bulkhead timeout must be greater than 0".to_string());
		}

		// Check queue size
		let queue = *self.queue_size.read().await;

		if queue >= self.config.max_queue as u32 {
			*self.total_rejected.write().await += 1;

			log::warn!("[Bulkhead] Queue full for {}, rejecting request", self.name);

			return Err("Bulkhead queue full".to_string());
		}

		// Enter the queue while waiting for a permit
		*self.queue_size.write().await += 1;

		// Acquire a permit with timeout; keep it alive for the whole execution so
		// the concurrency limit is actually enforced
		let _Permit =
			match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
				.await
			{
				Ok(Ok(permit)) => {
					// Permit acquired; leave the queue
					*self.queue_size.write().await -= 1;

					permit
				},

				Ok(Err(e)) => {
					*self.queue_size.write().await -= 1;

					return Err(format!("Bulkhead semaphore error: {}", e));
				},

				Err(_) => {
					*self.queue_size.write().await -= 1;

					*self.total_timed_out.write().await += 1;

					log::warn!("[Bulkhead] Timeout waiting for permit for {}", self.name);

					return Err("Bulkhead timeout waiting for permit".to_string());
				},
			};

		// Track the in-flight request
		*self.current_requests.write().await += 1;

		// Execute with timeout (no catch_unwind to avoid interior mutability issues)
		let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;

		let execution_result:Result<R, String> = match execution_result {
			Ok(Ok(value)) => Ok(value),

			Ok(Err(e)) => Err(e),

			Err(_) => {
				*self.total_timed_out.write().await += 1;

				Err("Bulkhead execution timeout".to_string())
			},
		};

		// The request is no longer in flight
		*self.current_requests.write().await -= 1;

		if execution_result.is_ok() {
			*self.total_completed.write().await += 1;
		}

		execution_result
	}

	/// Get current load (in-flight requests, queued requests)
	pub async fn GetLoad(&self) -> (u32, u32) {
		let current = *self.current_requests.read().await;

		let queue = *self.queue_size.read().await;

		(current, queue)
	}

	/// Get bulkhead statistics for metrics
	pub async fn GetStatistics(&self) -> BulkheadStatistics {
		BulkheadStatistics {
			name:self.name.clone(),

			current_concurrent:*self.current_requests.read().await,

			current_queue:*self.queue_size.read().await,

			max_concurrent:self.config.max_concurrent,

			max_queue:self.config.max_queue,

			total_rejected:*self.total_rejected.read().await,

			total_completed:*self.total_completed.read().await,

			total_timed_out:*self.total_timed_out.read().await,
		}
	}

	/// Calculate utilization percentage
	pub async fn GetUtilization(&self) -> f64 {
		let (current, _) = self.GetLoad().await;

		if self.config.max_concurrent == 0 {
			return 0.0;
		}

		(current as f64 / self.config.max_concurrent as f64) * 100.0
	}
}

impl Clone for BulkheadExecutor {
	fn clone(&self) -> Self {
		Self {
			name:self.name.clone(),

			semaphore:self.semaphore.clone(),

			config:self.config.clone(),

			current_requests:self.current_requests.clone(),

			queue_size:self.queue_size.clone(),

			total_rejected:self.total_rejected.clone(),

			total_completed:self.total_completed.clone(),

			total_timed_out:self.total_timed_out.clone(),
		}
	}
}

/// Timeout manager for cascading deadlines with validation
#[derive(Debug, Clone)]
pub struct TimeoutManager {
	global_deadline:Option<Instant>,

	operation_timeout:Duration,
}

impl TimeoutManager {
	/// Create a new timeout manager
	pub fn new(operation_timeout:Duration) -> Self { Self { global_deadline:None, operation_timeout } }

	/// Create with global deadline
	pub fn with_deadline(global_deadline:Instant, operation_timeout:Duration) -> Self {
		Self { global_deadline:Some(global_deadline), operation_timeout }
	}

	/// Validate timeout configuration
	pub fn ValidateTimeout(timeout:Duration) -> Result<(), String> {
		if timeout.is_zero() {
			return Err("Timeout must be greater than 0".to_string());
		}

		if timeout.as_secs() > 3600 {
			return Err("Timeout cannot exceed 1 hour".to_string());
		}

		Ok(())
	}

	/// Validate timeout as Result for error handling
	pub fn ValidateTimeoutResult(timeout:Duration) -> Result<Duration, String> {
		if timeout.is_zero() {
			return Err("Timeout must be greater than 0".to_string());
		}

		if timeout.as_secs() > 3600 {
			return Err("Timeout cannot exceed 1 hour".to_string());
		}

		Ok(timeout)
	}

	/// Get remaining time until deadline
	pub fn remaining(&self) -> Option<Duration> {
		self.global_deadline.map(|deadline| {
			deadline
				.checked_duration_since(Instant::now())
				.unwrap_or(Duration::from_secs(0))
		})
	}

	/// Get remaining time with panic recovery
	pub fn Remaining(&self) -> Option<Duration> {
		std::panic::catch_unwind(|| self.remaining()).unwrap_or_else(|e| {
			log::error!("[TimeoutManager] Panic in Remaining: {:?}", e);

			None
		})
	}

	/// Get effective timeout (minimum of operation timeout and remaining time)
	pub fn effective_timeout(&self) -> Duration {
		match self.remaining() {
			Some(remaining) => self.operation_timeout.min(remaining),

			None => self.operation_timeout,
		}
	}

	/// Get effective timeout with validation
	pub fn EffectiveTimeout(&self) -> Duration {
		std::panic::catch_unwind(|| {
			let timeout = self.effective_timeout();

			match Self::ValidateTimeoutResult(timeout) {
				Ok(valid_timeout) => valid_timeout,

				Err(_) => Duration::from_secs(30),
			}
		})
		.unwrap_or_else(|e| {
			log::error!("[TimeoutManager] Panic in EffectiveTimeout: {:?}", e);

			Duration::from_secs(30)
		})
	}

	/// Check if deadline has been exceeded
	pub fn is_exceeded(&self) -> bool { self.global_deadline.map_or(false, |deadline| Instant::now() >= deadline) }

	/// Check if deadline has been exceeded with panic recovery
	pub fn IsExceeded(&self) -> bool {
		std::panic::catch_unwind(|| self.is_exceeded()).unwrap_or_else(|e| {
			log::error!("[TimeoutManager] Panic in IsExceeded: {:?}", e);

			true // Fail safe: assume exceeded
		})
	}

	/// Get the global deadline
	pub fn GetGlobalDeadline(&self) -> Option<Instant> { self.global_deadline }

	/// Get the operation timeout
	pub fn GetOperationTimeout(&self) -> Duration { self.operation_timeout }
}

/// Resilience orchestrator combining all patterns
pub struct ResilienceOrchestrator {
	retry_manager:Arc<RetryManager>,

	circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,

	bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
}

impl ResilienceOrchestrator {
	/// Create a new resilience orchestrator
	pub fn new(retry_policy:RetryPolicy) -> Self {
		Self {
			retry_manager:Arc::new(RetryManager::new(retry_policy)),

			circuit_breakers:Arc::new(RwLock::new(HashMap::new())),

			bulkheads:Arc::new(RwLock::new(HashMap::new())),
		}
	}

	/// Get or create the circuit breaker for a service (the config is applied on first creation)
	pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
		let mut breakers = self.circuit_breakers.write().await;

		Arc::new(
			breakers
				.entry(service.to_string())
				.or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
				.clone(),
		)
	}

	/// Get or create the bulkhead for a service (the config is applied on first creation)
	pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
		let mut bulkheads = self.bulkheads.write().await;

		Arc::new(
			bulkheads
				.entry(service.to_string())
				.or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
				.clone(),
		)
	}

	/// Get all circuit breaker statistics
	pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
		let breakers = self.circuit_breakers.read().await;

		let mut stats = Vec::new();

		for breaker in breakers.values() {
			stats.push(breaker.GetStatistics().await);
		}

		stats
	}

	/// Get all bulkhead statistics
	pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
		let bulkheads = self.bulkheads.read().await;

		let mut stats = Vec::new();

		for bulkhead in bulkheads.values() {
			stats.push(bulkhead.GetStatistics().await);
		}

		stats
	}

	/// Execute with full resilience and event publishing
	pub async fn ExecuteResilient<F, R>(
		&self,

		service:&str,

		retry_policy:&RetryPolicy,

		circuit_config:CircuitBreakerConfig,

		bulkhead_config:BulkheadConfig,

		f:F,
	) -> Result<R, String>
	where
		F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
		// Validate configurations
		if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
			return Err(format!("Invalid circuit breaker config: {}", e));
		}

		if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
			return Err(format!("Invalid bulkhead config: {}", e));
		}

		let breaker = self.GetCircuitBreaker(service, circuit_config).await;

		let bulkhead = self.GetBulkhead(service, bulkhead_config).await;

		// Check circuit state
		if breaker.GetState().await == CircuitState::Open {
			if !breaker.AttemptRecovery().await {
				return Err("Circuit breaker is open".to_string());
			}
		}

		// Execute with bulkhead protection and retry logic
		let mut Attempt = 0;

		loop {
			let result = bulkhead.Execute(f()).await;

			match result {
				Ok(Value) => {
					breaker.RecordSuccess().await;

					// Publish retry success event
					let Event = RetryEvent {
						Service:service.to_string(),

						Attempt,

						ErrorClass:ErrorClass::Unknown,

						DelayMs:0,

						Success:true,

						ErrorMessage:None,
					};

					self.retry_manager.PublishRetryEvent(Event);

					return Ok(Value);
				},

				Err(E) => {
					let ErrorClass = self.retry_manager.ClassifyError(&E);

					breaker.RecordFailure().await;

					// Calculate the adaptive delay once; it is reported in the event and used for the sleep
					let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);

					// Publish retry failure event
					let Event = RetryEvent {
						Service:service.to_string(),

						Attempt,

						ErrorClass,

						DelayMs:Delay.as_millis() as u64,

						Success:false,

						ErrorMessage:Some(self.redact_sensitive_data(&E)),
					};

					self.retry_manager.PublishRetryEvent(Event);

					if Attempt < retry_policy.MaxRetries
						&& ErrorClass != ErrorClass::NonRetryable
						&& self.retry_manager.CanRetry(service).await
					{
						log::debug!(
							"[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
							service,
							Attempt + 1,
							retry_policy.MaxRetries,
							Delay,
							self.redact_sensitive_data(&E)
						);

						tokio::time::sleep(Delay).await;

						Attempt += 1;
					} else {
						return Err(E);
					}
				},
			}
		}
	}

	/// Redact sensitive data from error messages before logging/event
	/// publishing
	fn redact_sensitive_data(&self, message:&str) -> String {
		let mut redacted = message.to_string();

		// Redact common patterns - simplified to avoid escaping issues
		let patterns = vec![
			(r"(?i)password[=:]\S+", "password=[REDACTED]"),
			(r"(?i)token[=:]\S+", "token=[REDACTED]"),
			(r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
			(r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
			(
				r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
				"Authorization: Bearer [REDACTED]",
			),
			(r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
			(r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
		];

		for (pattern, replacement) in patterns {
			if let Ok(re) = regex::Regex::new(pattern) {
				redacted = re.replace_all(&redacted, replacement).to_string();
			}
		}

		redacted
	}

	/// Validate all configurations
	pub fn ValidateConfigurations(
		&self,

		_RetryPolicy:&RetryPolicy,

		CircuitConfig:&CircuitBreakerConfig,

		BulkheadConfig:&BulkheadConfig,
	) -> Result<(), String> {
		self.retry_manager.ValidatePolicy()?;

		CircuitBreaker::ValidateConfig(CircuitConfig)?;

		BulkheadExecutor::ValidateConfig(BulkheadConfig)?;

		TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;

		Ok(())
	}
}

impl Clone for ResilienceOrchestrator {
	fn clone(&self) -> Self {
		Self {
			retry_manager:self.retry_manager.clone(),

			circuit_breakers:self.circuit_breakers.clone(),

			bulkheads:self.bulkheads.clone(),
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_retry_delay_calculation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		let delay_1 = manager.CalculateRetryDelay(1);

		let delay_2 = manager.CalculateRetryDelay(2);

		// delay_2 should be roughly double delay_1 (with some jitter)
		assert!(delay_2 >= delay_1);
	}
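
	// A small additional sketch: the ServerError branch of CalculateAdaptiveRetryDelay
	// doubles the initial interval per attempt but is clamped to MaxIntervalMs; this
	// checks the clamp with the default policy values (1000 ms initial, 32000 ms cap).
	#[test]
	fn test_server_error_backoff_is_capped() {
		let manager = RetryManager::new(RetryPolicy::default());

		// 1000 ms * 2^10 would far exceed the 32000 ms cap, so the cap must win
		let delay = manager.CalculateAdaptiveRetryDelay("server_error", 10);

		assert_eq!(delay, Duration::from_millis(32000));
	}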

	#[test]
	fn test_adaptive_retry_delay() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		// Rate limited errors should have longer delays
		let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);

		let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

		assert!(rate_limit_delay >= transient_delay);
	}

	#[test]
	fn test_error_classification() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		// Classification is substring-based against the configured patterns,
		// so the messages must contain the pattern tokens themselves
		assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);

		assert_eq!(manager.ClassifyError("rate_limit_exceeded: slow down"), ErrorClass::RateLimited);

		assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);

		assert_eq!(manager.ClassifyError("internal_server_error"), ErrorClass::ServerError);
	}

	#[test]
	fn test_policy_validation() {
		let policy = RetryPolicy::default();

		let manager = RetryManager::new(policy);

		assert!(manager.ValidatePolicy().is_ok());

		let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };

		let invalid_manager = RetryManager::new(invalid_policy);

		assert!(invalid_manager.ValidatePolicy().is_err());
	}
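
	// A small sketch of the per-service retry budget; the service names
	// ("svc", "other") are arbitrary test labels, not real services.
	#[tokio::test]
	async fn test_retry_budget_enforcement() {
		let policy = RetryPolicy { BudgetPerMinute:2, ..Default::default() };

		let manager = RetryManager::new(policy);

		// The first two attempts fit in the budget, the third is rejected
		assert!(manager.CanRetry("svc").await);

		assert!(manager.CanRetry("svc").await);

		assert!(!manager.CanRetry("svc").await);

		// Budgets are tracked per service, so another service is unaffected
		assert!(manager.CanRetry("other").await);
	}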

	#[tokio::test]
	async fn test_circuit_breaker_state_transitions() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);

		breaker.RecordFailure().await;

		assert_eq!(breaker.GetState().await, CircuitState::Open);

		// Wait out the recovery timeout before attempting recovery
		tokio::time::sleep(Duration::from_millis(1100)).await;

		assert!(breaker.AttemptRecovery().await);

		assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

		breaker.RecordSuccess().await;

		assert_eq!(breaker.GetState().await, CircuitState::Closed);
	}

	#[tokio::test]
	async fn test_circuit_breaker_validation() {
		let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

		let breaker = CircuitBreaker::new("test".to_string(), config);

		// Validate initial state
		assert!(breaker.ValidateState().await.is_ok());

		// Trigger state transition to open
		breaker.RecordFailure().await;

		breaker.RecordFailure().await;

		// The open state carries a failure count at or above the threshold,
		// so it must still validate as consistent
		assert_eq!(breaker.GetState().await, CircuitState::Open);

		assert!(breaker.ValidateState().await.is_ok());
	}
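
	// A small sketch of the statistics snapshot; the breaker name "stats" is an
	// arbitrary test label.
	#[tokio::test]
	async fn test_circuit_breaker_statistics() {
		let breaker = CircuitBreaker::new("stats".to_string(), CircuitBreakerConfig::default());

		// One failure is below the default threshold of 5, so the circuit stays closed
		breaker.RecordFailure().await;

		let stats = breaker.GetStatistics().await;

		assert_eq!(stats.Name, "stats");

		assert_eq!(stats.State, CircuitState::Closed);

		assert_eq!(stats.Failures, 1);

		assert_eq!(stats.StateTransitions, 0);
	}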

	#[test]
	fn test_circuit_breaker_config_validation() {
		let valid_config = CircuitBreakerConfig::default();

		assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

		let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };

		assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
	}

	#[tokio::test]
	async fn test_bulkhead_resource_isolation() {
		let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		let (_current, _queue) = bulkhead.GetLoad().await;

		assert_eq!(_current, 0);

		assert_eq!(_queue, 0);

		let stats = bulkhead.GetStatistics().await;

		assert_eq!(stats.current_concurrent, 0);

		assert_eq!(stats.current_queue, 0);

		assert_eq!(stats.max_concurrent, 2);

		assert_eq!(stats.max_queue, 5);
	}
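
	// A small happy-path sketch of Execute; the value 7 is an arbitrary placeholder
	// and only the public counters are inspected.
	#[tokio::test]
	async fn test_bulkhead_execute_success() {
		let bulkhead = BulkheadExecutor::new("test".to_string(), BulkheadConfig::default());

		let result = bulkhead.Execute(async { Ok::<_, String>(7_u32) }).await;

		assert_eq!(result, Ok(7));

		let stats = bulkhead.GetStatistics().await;

		// The request completed and released both the permit and the counters
		assert_eq!(stats.total_completed, 1);

		assert_eq!(stats.current_concurrent, 0);

		assert_eq!(stats.current_queue, 0);
	}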

	#[tokio::test]
	async fn test_bulkhead_utilization() {
		let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };

		let bulkhead = BulkheadExecutor::new("test".to_string(), config);

		let utilization = bulkhead.GetUtilization().await;

		assert_eq!(utilization, 0.0);
	}

	#[test]
	fn test_bulkhead_config_validation() {
		let valid_config = BulkheadConfig::default();

		assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

		let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };

		assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
	}
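
	// A small end-to-end sketch of ExecuteResilient on its happy path; the service
	// name "example_service" and the returned value are arbitrary placeholders.
	#[tokio::test]
	async fn test_execute_resilient_success() {
		let orchestrator = ResilienceOrchestrator::new(RetryPolicy::default());

		// The closure builds a fresh boxed future per attempt, as the signature requires
		let operation = || -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<u32, String>> + Send>> {
			Box::pin(async { Ok::<u32, String>(42) })
		};

		let result = orchestrator
			.ExecuteResilient(
				"example_service",
				&RetryPolicy::default(),
				CircuitBreakerConfig::default(),
				BulkheadConfig::default(),
				operation,
			)
			.await;

		assert_eq!(result, Ok(42));
	}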

	#[test]
	fn test_timeout_manager() {
		let manager = TimeoutManager::new(Duration::from_secs(30));

		assert!(!manager.IsExceeded());

		assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());

		assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
	}
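
	// A small sketch of cascading deadlines: when the global deadline is nearer than
	// the per-operation timeout, the effective timeout is clamped to what remains.
	#[test]
	fn test_effective_timeout_respects_deadline() {
		let deadline = Instant::now() + Duration::from_secs(5);

		let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

		assert!(manager.effective_timeout() <= Duration::from_secs(5));
	}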

	#[test]
	fn test_timeout_manager_with_deadline() {
		let deadline = Instant::now() + Duration::from_secs(60);

		let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

		let remaining = manager.Remaining();

		assert!(remaining.is_some());

		assert!(remaining.unwrap() <= Duration::from_secs(60));
	}
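
	// A small sketch of the sensitive-data redaction applied to retry events; the
	// token value "abc123" is a made-up placeholder.
	#[test]
	fn test_sensitive_data_redaction() {
		let orchestrator = ResilienceOrchestrator::new(RetryPolicy::default());

		let redacted = orchestrator.redact_sensitive_data("request failed: token=abc123 for user 42");

		assert!(!redacted.contains("abc123"));

		assert!(redacted.contains("[REDACTED]"));
	}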
}