1pub mod Retry;
71
72pub use Retry::{ErrorClass, RetryEvent, RetryManager, RetryPolicy};
73
74pub mod Timeout;
75
76use std::{
77 collections::HashMap,
78 sync::Arc,
79 time::{Duration, Instant},
80};
81
82pub use Timeout::TimeoutManager;
83use tokio::sync::{RwLock, broadcast};
84use serde::{Deserialize, Serialize};
85
86use crate::dev_log;
87
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
92pub enum CircuitState {
93 Closed,
95
96 Open,
98
99 HalfOpen,
101}
102
103#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
105pub struct CircuitBreakerConfig {
106 pub FailureThreshold:u32,
108
109 pub SuccessThreshold:u32,
111
112 pub TimeoutSecs:u64,
114}
115
116impl Default for CircuitBreakerConfig {
117 fn default() -> Self { Self { FailureThreshold:5, SuccessThreshold:2, TimeoutSecs:60 } }
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct CircuitEvent {
123 pub name:String,
124
125 pub FromState:CircuitState,
126
127 pub ToState:CircuitState,
128
129 pub timestamp:u64,
130
131 pub reason:String,
132}
133
134#[derive(Debug)]
137pub struct CircuitBreaker {
138 Name:String,
139
140 State:Arc<RwLock<CircuitState>>,
141
142 Config:CircuitBreakerConfig,
143
144 FailureCount:Arc<RwLock<u32>>,
145
146 SuccessCount:Arc<RwLock<u32>>,
147
148 LastFailureTime:Arc<RwLock<Option<Instant>>>,
149
150 EventTx:Arc<broadcast::Sender<CircuitEvent>>,
151
152 StateTransitionCounter:Arc<RwLock<u32>>,
153}
154
155impl CircuitBreaker {
156 pub fn new(name:String, Config:CircuitBreakerConfig) -> Self {
158 let (EventTx, _) = broadcast::channel(1000);
159
160 Self {
161 Name:name.clone(),
162
163 State:Arc::new(RwLock::new(CircuitState::Closed)),
164
165 Config,
166
167 FailureCount:Arc::new(RwLock::new(0)),
168
169 SuccessCount:Arc::new(RwLock::new(0)),
170
171 LastFailureTime:Arc::new(RwLock::new(None)),
172
173 EventTx:Arc::new(EventTx),
174
175 StateTransitionCounter:Arc::new(RwLock::new(0)),
176 }
177 }
178
179 pub fn GetEventTransmitter(&self) -> broadcast::Sender<CircuitEvent> { (*self.EventTx).clone() }
181
182 pub async fn GetState(&self) -> CircuitState { *self.State.read().await }
184
185 pub async fn ValidateState(&self) -> Result<(), String> {
187 let state = *self.State.read().await;
188
189 let failures = *self.FailureCount.read().await;
190
191 let successes = *self.SuccessCount.read().await;
192
193 match state {
194 CircuitState::Closed => {
195 if successes != 0 {
196 return Err(format!("Inconsistent state: Closed but has {} successes", successes));
197 }
198
199 if failures >= self.Config.FailureThreshold {
200 dev_log!(
201 "resilience",
202 "warn: [CircuitBreaker] State inconsistency: Closed but failure count ({}) >= threshold ({})",
203 failures,
204 self.Config.FailureThreshold
205 );
206 }
207 },
208
209 CircuitState::Open => {
210 if failures < self.Config.FailureThreshold {
211 dev_log!(
212 "resilience",
213 "warn: [CircuitBreaker] State inconsistency: Open but failure count ({}) < threshold ({})",
214 failures,
215 self.Config.FailureThreshold
216 );
217 }
218 },
219
220 CircuitState::HalfOpen => {
221 if successes >= self.Config.SuccessThreshold {
222 return Err(format!(
223 "Inconsistent state: HalfOpen but has {} successes (should be Closed)",
224 successes
225 ));
226 }
227 },
228 }
229
230 Ok(())
231 }
232
233 async fn TransitionState(&self, NewState:CircuitState, reason:&str) -> Result<(), String> {
235 let CurrentState = self.GetState().await;
236
237 if CurrentState == NewState {
238 return Ok(());
240 }
241
242 match (CurrentState, NewState) {
244 (CircuitState::Closed, CircuitState::Open) | (CircuitState::HalfOpen, CircuitState::Open) => {
245
246 },
248
249 (CircuitState::Open, CircuitState::HalfOpen) => {
250
251 },
253
254 (CircuitState::HalfOpen, CircuitState::Closed) => {
255
256 },
258
259 _ => {
260 return Err(format!(
261 "Invalid state transition from {:?} to {:?} for {}",
262 CurrentState, NewState, self.Name
263 ));
264 },
265 }
266
267 let event = CircuitEvent {
269 name:self.Name.clone(),
270
271 FromState:CurrentState,
272
273 ToState:NewState,
274
275 timestamp:crate::Utility::CurrentTimestamp(),
276
277 reason:reason.to_string(),
278 };
279
280 let _ = self.EventTx.send(event);
281
282 *self.State.write().await = NewState;
284
285 *self.StateTransitionCounter.write().await += 1;
287
288 dev_log!(
289 "resilience",
290 "[CircuitBreaker] State transition for {}: {:?} -> {:?} (reason: {})",
291 self.Name,
292 CurrentState,
293 NewState,
294 reason
295 );
296
297 self.ValidateState().await.map_err(|e| {
299 dev_log!(
300 "resilience",
301 "error: [CircuitBreaker] State validation failed after transition: {}",
302 e
303 );
304
305 e
306 })?;
307
308 Ok(())
309 }
310
311 pub async fn RecordSuccess(&self) {
313 let state = self.GetState().await;
314
315 match state {
316 CircuitState::Closed => {
317 *self.FailureCount.write().await = 0;
319 },
320
321 CircuitState::HalfOpen => {
322 let mut SuccessCount = self.SuccessCount.write().await;
324
325 *SuccessCount += 1;
326
327 if *SuccessCount >= self.Config.SuccessThreshold {
328 let _ = self.TransitionState(CircuitState::Closed, "Success threshold reached").await;
330
331 *self.FailureCount.write().await = 0;
332
333 *self.SuccessCount.write().await = 0;
334 }
335 },
336
337 _ => {},
338 }
339 }
340
341 pub async fn RecordFailure(&self) {
343 let State = self.GetState().await;
344
345 *self.LastFailureTime.write().await = Some(Instant::now());
346
347 match State {
348 CircuitState::Closed => {
349 let mut FailureCount = self.FailureCount.write().await;
351
352 *FailureCount += 1;
353
354 if *FailureCount >= self.Config.FailureThreshold {
355 let _ = self.TransitionState(CircuitState::Open, "Failure threshold reached").await;
357
358 *self.SuccessCount.write().await = 0;
359 }
360 },
361
362 CircuitState::HalfOpen => {
363 let _ = self.TransitionState(CircuitState::Open, "Failure in half-open state").await;
365
366 *self.SuccessCount.write().await = 0;
367 },
368
369 _ => {},
370 }
371 }
372
373 pub async fn AttemptRecovery(&self) -> bool {
376 let state = self.GetState().await;
377
378 if state != CircuitState::Open {
379 return state == CircuitState::HalfOpen;
380 }
381
382 if let Some(last_failure) = *self.LastFailureTime.read().await {
383 if last_failure.elapsed() >= Duration::from_secs(self.Config.TimeoutSecs) {
384 let _ = self.TransitionState(CircuitState::HalfOpen, "Recovery timeout elapsed").await;
385
386 *self.SuccessCount.write().await = 0;
387
388 return true;
389 }
390 }
391
392 false
393 }
394
395 pub async fn GetStatistics(&self) -> CircuitStatistics {
397 CircuitStatistics {
398 Name:self.Name.clone(),
399
400 State:self.GetState().await,
401
402 Failures:*self.FailureCount.read().await,
403
404 Successes:*self.SuccessCount.read().await,
405
406 StateTransitions:*self.StateTransitionCounter.read().await,
407
408 LastFailureTime:*self.LastFailureTime.read().await,
409 }
410 }
411
412 pub fn ValidateConfig(&config:&CircuitBreakerConfig) -> Result<(), String> {
414 if config.FailureThreshold == 0 {
415 return Err("FailureThreshold must be greater than 0".to_string());
416 }
417
418 if config.SuccessThreshold == 0 {
419 return Err("SuccessThreshold must be greater than 0".to_string());
420 }
421
422 if config.TimeoutSecs == 0 {
423 return Err("TimeoutSecs must be greater than 0".to_string());
424 }
425
426 Ok(())
427 }
428}
429
430#[derive(Debug, Clone, Serialize)]
432pub struct CircuitStatistics {
433 pub Name:String,
434
435 pub State:CircuitState,
436
437 pub Failures:u32,
438
439 pub Successes:u32,
440
441 pub StateTransitions:u32,
442
443 #[serde(skip_serializing)]
444 pub LastFailureTime:Option<Instant>,
445}
446
447impl<'de> Deserialize<'de> for CircuitStatistics {
448 fn deserialize<D>(Deserializer:D) -> std::result::Result<Self, D::Error>
449 where
450 D: serde::Deserializer<'de>, {
451 use serde::de::{self, Visitor};
452
453 struct CircuitStatisticsVisitor;
454
455 impl<'de> Visitor<'de> for CircuitStatisticsVisitor {
456 type Value = CircuitStatistics;
457
458 fn expecting(&self, formatter:&mut std::fmt::Formatter) -> std::fmt::Result {
459 formatter.write_str("struct CircuitStatistics")
460 }
461
462 fn visit_map<A>(self, mut map:A) -> std::result::Result<CircuitStatistics, A::Error>
463 where
464 A: de::MapAccess<'de>, {
465 let mut Name = None;
466
467 let mut State = None;
468
469 let mut Failures = None;
470
471 let mut Successes = None;
472
473 let mut StateTransitions = None;
474
475 while let Some(key) = map.next_key::<String>()? {
476 match key.as_str() {
477 "name" => Name = Some(map.next_value()?),
478
479 "state" => State = Some(map.next_value()?),
480
481 "failures" => Failures = Some(map.next_value()?),
482
483 "successes" => Successes = Some(map.next_value()?),
484
485 "state_transitions" => StateTransitions = Some(map.next_value()?),
486
487 _ => {
488 map.next_value::<de::IgnoredAny>()?;
489 },
490 }
491 }
492
493 Ok(CircuitStatistics {
494 Name:Name.ok_or_else(|| de::Error::missing_field("name"))?,
495
496 State:State.ok_or_else(|| de::Error::missing_field("state"))?,
497
498 Failures:Failures.ok_or_else(|| de::Error::missing_field("failures"))?,
499
500 Successes:Successes.ok_or_else(|| de::Error::missing_field("successes"))?,
501
502 StateTransitions:StateTransitions.ok_or_else(|| de::Error::missing_field("state_transitions"))?,
503
504 LastFailureTime:None,
505 })
506 }
507 }
508
509 const FIELDS:&[&str] = &["name", "state", "failures", "successes", "state_transitions"];
510
511 Deserializer.deserialize_struct("CircuitStatistics", FIELDS, CircuitStatisticsVisitor)
512 }
513}
514
515impl Clone for CircuitBreaker {
516 fn clone(&self) -> Self {
517 Self {
518 Name:self.Name.clone(),
519
520 State:self.State.clone(),
521
522 Config:self.Config.clone(),
523
524 FailureCount:self.FailureCount.clone(),
525
526 SuccessCount:self.SuccessCount.clone(),
527
528 LastFailureTime:self.LastFailureTime.clone(),
529
530 EventTx:self.EventTx.clone(),
531
532 StateTransitionCounter:self.StateTransitionCounter.clone(),
533 }
534 }
535}
536
537#[derive(Debug, Clone, Serialize, Deserialize)]
539pub struct BulkheadConfig {
540 pub max_concurrent:usize,
542
543 pub max_queue:usize,
545
546 pub timeout_secs:u64,
548}
549
550impl Default for BulkheadConfig {
551 fn default() -> Self { Self { max_concurrent:10, max_queue:100, timeout_secs:30 } }
552}
553
554#[derive(Debug, Clone, Serialize, Deserialize)]
556pub struct BulkheadStatistics {
557 pub name:String,
558
559 pub current_concurrent:u32,
560
561 pub current_queue:u32,
562
563 pub max_concurrent:usize,
564
565 pub max_queue:usize,
566
567 pub total_rejected:u64,
568
569 pub total_completed:u64,
570
571 pub total_timed_out:u64,
572}
573
574#[derive(Debug)]
576pub struct BulkheadExecutor {
577 name:String,
578
579 semaphore:Arc<tokio::sync::Semaphore>,
580
581 config:BulkheadConfig,
582
583 current_requests:Arc<RwLock<u32>>,
584
585 queue_size:Arc<RwLock<u32>>,
586
587 total_rejected:Arc<RwLock<u64>>,
588
589 total_completed:Arc<RwLock<u64>>,
590
591 total_timed_out:Arc<RwLock<u64>>,
592}
593
594impl BulkheadExecutor {
595 pub fn new(name:String, config:BulkheadConfig) -> Self {
597 Self {
598 name:name.clone(),
599
600 semaphore:Arc::new(tokio::sync::Semaphore::new(config.max_concurrent)),
601
602 config,
603
604 current_requests:Arc::new(RwLock::new(0)),
605
606 queue_size:Arc::new(RwLock::new(0)),
607
608 total_rejected:Arc::new(RwLock::new(0)),
609
610 total_completed:Arc::new(RwLock::new(0)),
611
612 total_timed_out:Arc::new(RwLock::new(0)),
613 }
614 }
615
616 pub fn ValidateConfig(config:&BulkheadConfig) -> Result<(), String> {
618 if config.max_concurrent == 0 {
619 return Err("max_concurrent must be greater than 0".to_string());
620 }
621
622 if config.max_queue == 0 {
623 return Err("max_queue must be greater than 0".to_string());
624 }
625
626 if config.timeout_secs == 0 {
627 return Err("timeout_secs must be greater than 0".to_string());
628 }
629
630 Ok(())
631 }
632
633 pub async fn Execute<F, R>(&self, f:F) -> Result<R, String>
635 where
636 F: std::future::Future<Output = Result<R, String>>, {
637 async {
638 if self.config.timeout_secs == 0 {
640 return Err("Bulkhead timeout must be greater than 0".to_string());
641 }
642
643 let queue = *self.queue_size.read().await;
645
646 if queue >= self.config.max_queue as u32 {
647 *self.total_rejected.write().await += 1;
648
649 dev_log!("resilience", "warn: [Bulkhead] Queue full for {}, rejecting request", self.name);
650
651 return Err("Bulkhead queue full".to_string());
652 }
653
654 *self.queue_size.write().await += 1;
656
657 let _Permit =
659 match tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), self.semaphore.acquire())
660 .await
661 {
662 Ok(Ok(_)) => {
663 *self.queue_size.write().await -= 1;
666 },
667
668 Ok(Err(e)) => {
669 *self.queue_size.write().await -= 1;
670
671 return Err(format!("Bulkhead semaphore error: {}", e));
672 },
673
674 Err(_) => {
675 *self.queue_size.write().await -= 1;
676
677 *self.total_timed_out.write().await += 1;
678
679 dev_log!("resilience", "warn: [Bulkhead] Timeout waiting for permit for {}", self.name);
680
681 return Err("Bulkhead timeout waiting for permit".to_string());
682 },
683 };
684
685 *self.queue_size.write().await -= 1;
687
688 *self.current_requests.write().await += 1;
689
690 let execution_result = tokio::time::timeout(Duration::from_secs(self.config.timeout_secs), f).await;
692
693 let execution_result:Result<R, String> = match execution_result {
694 Ok(Ok(value)) => Ok(value),
695
696 Ok(Err(e)) => Err(e),
697
698 Err(_) => {
699 *self.total_timed_out.write().await += 1;
700
701 Err("Bulkhead execution timeout".to_string())
702 },
703 };
704
705 if execution_result.is_ok() {
706 *self.total_completed.write().await += 1;
707 }
708
709 execution_result
710 }
711 .await
712 }
713
714 pub async fn GetLoad(&self) -> (u32, u32) {
716 async {
717 let current = *self.current_requests.read().await;
718
719 let queue = *self.queue_size.read().await;
720
721 (current, queue)
722 }
723 .await
724 }
725
726 pub async fn GetStatistics(&self) -> BulkheadStatistics {
728 async {
729 BulkheadStatistics {
730 name:self.name.clone(),
731
732 current_concurrent:*self.current_requests.read().await,
733
734 current_queue:*self.queue_size.read().await,
735
736 max_concurrent:self.config.max_concurrent,
737
738 max_queue:self.config.max_queue,
739
740 total_rejected:*self.total_rejected.read().await,
741
742 total_completed:*self.total_completed.read().await,
743
744 total_timed_out:*self.total_timed_out.read().await,
745 }
746 }
747 .await
748 }
749
750 pub async fn GetUtilization(&self) -> f64 {
752 let (current, _) = self.GetLoad().await;
753
754 if self.config.max_concurrent == 0 {
755 return 0.0;
756 }
757
758 (current as f64 / self.config.max_concurrent as f64) * 100.0
759 }
760}
761
762impl Clone for BulkheadExecutor {
763 fn clone(&self) -> Self {
764 Self {
765 name:self.name.clone(),
766
767 semaphore:self.semaphore.clone(),
768
769 config:self.config.clone(),
770
771 current_requests:self.current_requests.clone(),
772
773 queue_size:self.queue_size.clone(),
774
775 total_rejected:self.total_rejected.clone(),
776
777 total_completed:self.total_completed.clone(),
778
779 total_timed_out:self.total_timed_out.clone(),
780 }
781 }
782}
783
784#[derive(Debug)]
786pub struct ResilienceOrchestrator {
787 retry_manager:Arc<RetryManager>,
788
789 circuit_breakers:Arc<RwLock<HashMap<String, CircuitBreaker>>>,
790
791 bulkheads:Arc<RwLock<HashMap<String, BulkheadExecutor>>>,
792}
793
794impl ResilienceOrchestrator {
795 pub fn new(retry_policy:RetryPolicy) -> Self {
797 Self {
798 retry_manager:Arc::new(RetryManager::new(retry_policy)),
799
800 circuit_breakers:Arc::new(RwLock::new(HashMap::new())),
801
802 bulkheads:Arc::new(RwLock::new(HashMap::new())),
803 }
804 }
805
806 pub async fn GetCircuitBreaker(&self, service:&str, config:CircuitBreakerConfig) -> Arc<CircuitBreaker> {
808 let mut breakers = self.circuit_breakers.write().await;
809
810 Arc::new(
811 breakers
812 .entry(service.to_string())
813 .or_insert_with(|| CircuitBreaker::new(service.to_string(), config))
814 .clone(),
815 )
816 }
817
818 pub async fn GetBulkhead(&self, service:&str, config:BulkheadConfig) -> Arc<BulkheadExecutor> {
820 let mut bulkheads = self.bulkheads.write().await;
821
822 Arc::new(
823 bulkheads
824 .entry(service.to_string())
825 .or_insert_with(|| BulkheadExecutor::new(service.to_string(), config))
826 .clone(),
827 )
828 }
829
830 pub async fn GetAllCircuitBreakerStatistics(&self) -> Vec<CircuitStatistics> {
832 let breakers = self.circuit_breakers.read().await;
833
834 let mut stats = Vec::new();
835
836 for breaker in breakers.values() {
837 stats.push(breaker.GetStatistics().await);
838 }
839
840 stats
841 }
842
843 pub async fn GetAllBulkheadStatistics(&self) -> Vec<BulkheadStatistics> {
845 let bulkheads = self.bulkheads.read().await;
846
847 let mut stats = Vec::new();
848
849 for bulkhead in bulkheads.values() {
850 stats.push(bulkhead.GetStatistics().await);
851 }
852
853 stats
854 }
855
856 pub async fn ExecuteResilient<F, R>(
858 &self,
859
860 service:&str,
861
862 retry_policy:&RetryPolicy,
863
864 circuit_config:CircuitBreakerConfig,
865
866 bulkhead_config:BulkheadConfig,
867
868 f:F,
869 ) -> Result<R, String>
870 where
871 F: Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<R, String>> + Send>>, {
872 if let Err(e) = CircuitBreaker::ValidateConfig(&circuit_config) {
874 return Err(format!("Invalid circuit breaker config: {}", e));
875 }
876
877 if let Err(e) = BulkheadExecutor::ValidateConfig(&bulkhead_config) {
878 return Err(format!("Invalid bulkhead config: {}", e));
879 }
880
881 let breaker = self.GetCircuitBreaker(service, circuit_config).await;
882
883 let bulkhead = self.GetBulkhead(service, bulkhead_config).await;
884
885 if breaker.GetState().await == CircuitState::Open {
887 if !breaker.AttemptRecovery().await {
888 return Err("Circuit breaker is open".to_string());
889 }
890 }
891
892 let mut Attempt = 0;
894
895 let _LastError = "".to_string();
896
897 loop {
898 let result = bulkhead.Execute(f()).await;
899
900 match result {
901 Ok(Value) => {
902 breaker.RecordSuccess().await;
903
904 let Event = RetryEvent {
906 Service:service.to_string(),
907
908 Attempt,
909
910 ErrorClass:ErrorClass::Unknown,
911
912 DelayMs:0,
913
914 Success:true,
915
916 ErrorMessage:None,
917 };
918
919 self.retry_manager.PublishRetryEvent(Event);
920
921 return Ok(Value);
922 },
923
924 Err(E) => {
925 let ErrorClass = self.retry_manager.ClassifyError(&E);
926
927 breaker.RecordFailure().await;
928
929 let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
931
932 let Event = RetryEvent {
933 Service:service.to_string(),
934
935 Attempt,
936
937 ErrorClass,
938
939 DelayMs:Delay.as_millis() as u64,
940
941 Success:false,
942
943 ErrorMessage:Some(self.redact_sensitive_data(&E)),
944 };
945
946 self.retry_manager.PublishRetryEvent(Event);
947
948 if Attempt < retry_policy.MaxRetries
949 && ErrorClass != ErrorClass::NonRetryable
950 && self.retry_manager.CanRetry(service).await
951 {
952 let Delay = self.retry_manager.CalculateAdaptiveRetryDelay(&E, Attempt);
953
954 dev_log!(
955 "resilience",
956 "[ResilienceOrchestrator] Retrying {} (attempt {}/{}) after {:?}, error: {}",
957 service,
958 Attempt + 1,
959 retry_policy.MaxRetries,
960 Delay,
961 self.redact_sensitive_data(&E)
962 );
963
964 tokio::time::sleep(Delay).await;
965
966 Attempt += 1;
967 } else {
968 return Err(E);
969 }
970 },
971 }
972 }
973 }
974
975 fn redact_sensitive_data(&self, message:&str) -> String {
978 let mut redacted = message.to_string();
979
980 let patterns = vec![
982 (r"(?i)password[=:]\S+", "password=[REDACTED]"),
983 (r"(?i)token[=:]\S+", "token=[REDACTED]"),
984 (r"(?i)(api|private)[_-]?key[=:]\S+", "api_key=[REDACTED]"),
985 (r"(?i)secret[=:]\S+", "secret=[REDACTED]"),
986 (
987 r"(?i)authorization[=[:space:]]+Bearer[[:space:]]+\S+",
988 "Authorization: Bearer [REDACTED]",
989 ),
990 (r"(?i)credit[_-]?card[=:][\d-]+", "credit_card=[REDACTED]"),
991 (r"(?i)ssn[=:][\d-]{9,11}", "ssn=[REDACTED]"),
992 ];
993
994 for (pattern, replacement) in patterns {
995 if let Ok(re) = regex::Regex::new(pattern) {
996 redacted = re.replace_all(&redacted, replacement).to_string();
997 }
998 }
999
1000 redacted
1001 }
1002
1003 pub fn ValidateConfigurations(
1005 &self,
1006
1007 _RetryPolicy:&RetryPolicy,
1008
1009 CircuitConfig:&CircuitBreakerConfig,
1010
1011 BulkheadConfig:&BulkheadConfig,
1012 ) -> Result<(), String> {
1013 self.retry_manager.ValidatePolicy()?;
1014
1015 CircuitBreaker::ValidateConfig(CircuitConfig)?;
1016
1017 BulkheadExecutor::ValidateConfig(BulkheadConfig)?;
1018
1019 TimeoutManager::ValidateTimeout(Duration::from_secs(BulkheadConfig.timeout_secs))?;
1020
1021 Ok(())
1022 }
1023}
1024
1025impl Clone for ResilienceOrchestrator {
1026 fn clone(&self) -> Self {
1027 Self {
1028 retry_manager:self.retry_manager.clone(),
1029
1030 circuit_breakers:self.circuit_breakers.clone(),
1031
1032 bulkheads:self.bulkheads.clone(),
1033 }
1034 }
1035}
1036
#[cfg(test)]
mod tests {

    use super::*;

    /// A later attempt never gets a shorter delay than an earlier one.
    #[test]
    fn test_retry_delay_calculation() {
        let manager = RetryManager::new(RetryPolicy::default());

        let delay_1 = manager.CalculateRetryDelay(1);

        let delay_2 = manager.CalculateRetryDelay(2);

        assert!(delay_2 >= delay_1);
    }

    /// Rate-limit errors wait at least as long as timeouts.
    #[test]
    fn test_adaptive_retry_delay() {
        let manager = RetryManager::new(RetryPolicy::default());

        let rate_limit_delay = manager.CalculateAdaptiveRetryDelay("rate_limit_exceeded", 1);

        let transient_delay = manager.CalculateAdaptiveRetryDelay("timeout", 1);

        assert!(rate_limit_delay >= transient_delay);
    }

    /// Representative messages map to the expected error classes.
    #[test]
    fn test_error_classification() {
        let manager = RetryManager::new(RetryPolicy::default());

        assert_eq!(manager.ClassifyError("connection timeout"), ErrorClass::Transient);

        assert_eq!(manager.ClassifyError("rate limit exceeded"), ErrorClass::RateLimited);

        assert_eq!(manager.ClassifyError("unauthorized"), ErrorClass::NonRetryable);

        assert_eq!(manager.ClassifyError("server error"), ErrorClass::ServerError);
    }

    /// The default policy validates; a policy with zero retries does not.
    #[test]
    fn test_policy_validation() {
        let manager = RetryManager::new(RetryPolicy::default());

        assert!(manager.ValidatePolicy().is_ok());

        let invalid_policy = RetryPolicy { MaxRetries:0, ..Default::default() };

        let invalid_manager = RetryManager::new(invalid_policy);

        assert!(invalid_manager.ValidatePolicy().is_err());
    }

    /// Walks the breaker through Closed -> Open -> HalfOpen -> Closed.
    #[tokio::test]
    async fn test_circuit_breaker_state_transitions() {
        let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

        let breaker = CircuitBreaker::new("test".to_string(), config);

        assert_eq!(breaker.GetState().await, CircuitState::Closed);

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Closed);

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Open);

        // The failure was recorded just now, so the 1 second recovery timeout
        // has not elapsed yet.
        assert!(!breaker.AttemptRecovery().await);

        assert_eq!(breaker.GetState().await, CircuitState::Open);

        // `LastFailureTime` is a `std::time::Instant`, so real time must
        // pass before recovery is allowed.
        tokio::time::sleep(Duration::from_millis(1100)).await;

        assert!(breaker.AttemptRecovery().await);

        assert_eq!(breaker.GetState().await, CircuitState::HalfOpen);

        breaker.RecordSuccess().await;

        assert_eq!(breaker.GetState().await, CircuitState::Closed);
    }

    /// State validation passes for a fresh breaker and for one that tripped
    /// after reaching its failure threshold.
    #[tokio::test]
    async fn test_circuit_breaker_validation() {
        let config = CircuitBreakerConfig { FailureThreshold:2, SuccessThreshold:1, TimeoutSecs:1 };

        let breaker = CircuitBreaker::new("test".to_string(), config);

        assert!(breaker.ValidateState().await.is_ok());

        breaker.RecordFailure().await;

        breaker.RecordFailure().await;

        assert_eq!(breaker.GetState().await, CircuitState::Open);

        // Open with the failure count at the threshold is a consistent state.
        assert!(breaker.ValidateState().await.is_ok());
    }

    /// The default breaker config validates; a zero failure threshold does not.
    #[test]
    fn test_circuit_breaker_config_validation() {
        let valid_config = CircuitBreakerConfig::default();

        assert!(CircuitBreaker::ValidateConfig(&valid_config).is_ok());

        let invalid_config = CircuitBreakerConfig { FailureThreshold:0, ..Default::default() };

        assert!(CircuitBreaker::ValidateConfig(&invalid_config).is_err());
    }

    /// A fresh bulkhead reports no load and echoes its configured limits.
    #[tokio::test]
    async fn test_bulkhead_resource_isolation() {
        let config = BulkheadConfig { max_concurrent:2, max_queue:5, timeout_secs:10 };

        let bulkhead = BulkheadExecutor::new("test".to_string(), config);

        let (current, queue) = bulkhead.GetLoad().await;

        assert_eq!(current, 0);

        assert_eq!(queue, 0);

        let stats = bulkhead.GetStatistics().await;

        assert_eq!(stats.current_concurrent, 0);

        assert_eq!(stats.current_queue, 0);

        assert_eq!(stats.max_concurrent, 2);

        assert_eq!(stats.max_queue, 5);
    }

    /// A fresh bulkhead is at 0% utilization.
    #[tokio::test]
    async fn test_bulkhead_utilization() {
        let config = BulkheadConfig { max_concurrent:10, max_queue:100, timeout_secs:30 };

        let bulkhead = BulkheadExecutor::new("test".to_string(), config);

        let utilization = bulkhead.GetUtilization().await;

        assert_eq!(utilization, 0.0);
    }

    /// The default bulkhead config validates; zero concurrency does not.
    #[test]
    fn test_bulkhead_config_validation() {
        let valid_config = BulkheadConfig::default();

        assert!(BulkheadExecutor::ValidateConfig(&valid_config).is_ok());

        let invalid_config = BulkheadConfig { max_concurrent:0, ..Default::default() };

        assert!(BulkheadExecutor::ValidateConfig(&invalid_config).is_err());
    }

    /// A new timeout manager is not exceeded and rejects a zero timeout.
    #[test]
    fn test_timeout_manager() {
        let manager = TimeoutManager::new(Duration::from_secs(30));

        assert!(!manager.IsExceeded());

        assert_eq!(manager.EffectiveTimeout(), Duration::from_secs(30));

        assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(30)).is_ok());

        assert!(TimeoutManager::ValidateTimeout(Duration::from_secs(0)).is_err());
    }

    /// The remaining time never exceeds the distance to the deadline.
    #[test]
    fn test_timeout_manager_with_deadline() {
        let deadline = Instant::now() + Duration::from_secs(60);

        let manager = TimeoutManager::with_deadline(deadline, Duration::from_secs(30));

        let remaining = manager.Remaining();

        assert!(remaining.is_some());

        assert!(remaining.unwrap() <= Duration::from_secs(60));
    }
}