1use std::{
93 path::{Path, PathBuf},
94 sync::Arc,
95 time::{Duration, Instant},
96};
97
98use serde::{Deserialize, Serialize};
99use tokio::{
100 fs,
101 sync::{RwLock, broadcast, mpsc},
102 time::sleep,
103};
104use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Result as NotifyResult, Watcher};
105use chrono::{DateTime, Utc};
106
107use crate::{AirError, Configuration::AirConfiguration, Result, dev_log};
108
109pub struct ConfigHotReload {
115 active_config:Arc<RwLock<AirConfiguration>>,
117
118 previous_config:Arc<RwLock<Option<AirConfiguration>>>,
120
121 last_config_hash:Arc<RwLock<Option<String>>>,
123
124 config_path:PathBuf,
126
127 watcher:Option<Arc<RwLock<notify::RecommendedWatcher>>>,
129
130 change_sender:broadcast::Sender<ConfigChangeEvent>,
132
133 reload_tx:mpsc::Sender<ReloadRequest>,
135
136 change_history:Arc<RwLock<Vec<ConfigChangeRecord>>>,
138
139 last_reload:Arc<RwLock<Option<DateTime<Utc>>>>,
141
142 last_reload_duration:Arc<RwLock<Option<Duration>>>,
144
145 enabled:Arc<RwLock<bool>>,
147
148 debounce_delay:Duration,
150
151 last_change_time:Arc<RwLock<Option<Instant>>>,
153
154 stats:Arc<RwLock<ReloadStats>>,
156
157 validators:Arc<RwLock<Vec<Box<dyn ConfigValidator>>>>,
159
160 max_retries:u32,
162
163 retry_delay:Duration,
165
166 auto_rollback_enabled:Arc<RwLock<bool>>,
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
172pub struct ConfigChangeEvent {
173 pub timestamp:DateTime<Utc>,
174
175 pub old_config_hash:Option<String>,
176
177 pub new_config_hash:String,
178
179 pub changes:Vec<ConfigChange>,
180
181 pub success:bool,
182}
183
184pub enum ReloadRequest {
186 Manual,
188
189 Signal,
191
192 FileChange,
194
195 Periodic,
197}
198
199#[derive(Debug, Clone, Default)]
201pub struct ReloadStats {
202 total_attempts:u64,
203
204 successful_reloads:u64,
205
206 failed_reloads:u64,
207
208 validation_errors:u64,
209
210 parse_errors:u64,
211
212 rollback_attempts:u64,
213
214 last_error:Option<String>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct ConfigChangeRecord {
220 pub timestamp:DateTime<Utc>,
221
222 pub changes:Vec<ConfigChange>,
223
224 pub validated:bool,
225
226 pub reason:String,
227
228 pub rollback_performed:bool,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct ConfigChange {
234 pub path:String,
235
236 pub old_value:serde_json::Value,
237
238 pub new_value:serde_json::Value,
239}
240
241pub trait ConfigValidator: Send + Sync {
243 fn validate(&self, config:&AirConfiguration) -> Result<()>;
245
246 fn name(&self) -> &str;
248
249 fn priority(&self) -> u32 { 0 }
251}
252
253pub struct gRPCConfigValidator;
259
260impl ConfigValidator for gRPCConfigValidator {
261 fn validate(&self, config:&AirConfiguration) -> Result<()> {
262 if config.gRPC.BindAddress.is_empty() {
263 return Err(AirError::Configuration("gRPC bind address cannot be empty".to_string()));
264 }
265
266 if !crate::Configuration::ConfigurationManager::IsValidAddress(&config.gRPC.BindAddress) {
268 return Err(AirError::Configuration(format!(
269 "Invalid gRPC bind address '{}': must be host:port or [IPv6]:port",
270 config.gRPC.BindAddress
271 )));
272 }
273
274 if config.gRPC.MaxConnections < 10 || config.gRPC.MaxConnections > 10000 {
276 return Err(AirError::Configuration(format!(
277 "gRPC MaxConnections {} is out of range [10, 10000]",
278 config.gRPC.MaxConnections
279 )));
280 }
281
282 if config.gRPC.RequestTimeoutSecs < 1 || config.gRPC.RequestTimeoutSecs > 3600 {
284 return Err(AirError::Configuration(format!(
285 "gRPC RequestTimeoutSecs {} is out of range [1, 3600]",
286 config.gRPC.RequestTimeoutSecs
287 )));
288 }
289
290 Ok(())
291 }
292
293 fn name(&self) -> &str { "gRPCConfigValidator" }
294
295 fn priority(&self) -> u32 {
296 100 }
298}
299
300pub struct AuthConfigValidator;
302
303impl ConfigValidator for AuthConfigValidator {
304 fn validate(&self, config:&AirConfiguration) -> Result<()> {
305 if config.Authentication.Enabled {
306 if config.Authentication.CredentialsPath.is_empty() {
307 return Err(AirError::Configuration(
308 "Authentication credentials path cannot be empty when enabled".to_string(),
309 ));
310 }
311
312 if config.Authentication.CredentialsPath.contains("..") {
314 return Err(AirError::Configuration(
315 "Authentication credentials path contains '..' which is not allowed".to_string(),
316 ));
317 }
318 }
319
320 if config.Authentication.TokenExpirationHours < 1 || config.Authentication.TokenExpirationHours > 8760 {
322 return Err(AirError::Configuration(format!(
323 "Token expiration {} hours is out of range [1, 8760]",
324 config.Authentication.TokenExpirationHours
325 )));
326 }
327
328 if config.Authentication.MaxSessions < 1 || config.Authentication.MaxSessions > 1000 {
330 return Err(AirError::Configuration(format!(
331 "Max sessions {} is out of range [1, 1000]",
332 config.Authentication.MaxSessions
333 )));
334 }
335
336 Ok(())
337 }
338
339 fn name(&self) -> &str { "AuthConfigValidator" }
340
341 fn priority(&self) -> u32 {
342 90 }
344}
345
346pub struct UpdateConfigValidator;
348
349impl ConfigValidator for UpdateConfigValidator {
350 fn validate(&self, config:&AirConfiguration) -> Result<()> {
351 if config.Updates.Enabled {
352 if config.Updates.UpdateServerUrl.is_empty() {
353 return Err(AirError::Configuration(
354 "Update server URL cannot be empty when updates are enabled".to_string(),
355 ));
356 }
357
358 if !config.Updates.UpdateServerUrl.starts_with("https://") {
360 return Err(AirError::Configuration(format!(
361 "Update server URL must use HTTPS: {}",
362 config.Updates.UpdateServerUrl
363 )));
364 }
365
366 if !crate::Configuration::ConfigurationManager::IsValidUrl(&config.Updates.UpdateServerUrl) {
368 return Err(AirError::Configuration(format!(
369 "Invalid update server URL: {}",
370 config.Updates.UpdateServerUrl
371 )));
372 }
373 }
374
375 if config.Updates.CheckIntervalHours < 1 || config.Updates.CheckIntervalHours > 168 {
377 return Err(AirError::Configuration(format!(
378 "Update check interval {} hours is out of range [1, 168]",
379 config.Updates.CheckIntervalHours
380 )));
381 }
382
383 Ok(())
384 }
385
386 fn name(&self) -> &str { "UpdateConfigValidator" }
387
388 fn priority(&self) -> u32 {
389 50 }
391}
392
393pub struct DownloadConfigValidator;
395
396impl ConfigValidator for DownloadConfigValidator {
397 fn validate(&self, config:&AirConfiguration) -> Result<()> {
398 if config.Downloader.Enabled {
399 if config.Downloader.CacheDirectory.is_empty() {
400 return Err(AirError::Configuration(
401 "Download cache directory cannot be empty when enabled".to_string(),
402 ));
403 }
404
405 if config.Downloader.CacheDirectory.contains("..") {
407 return Err(AirError::Configuration(
408 "Download cache directory contains '..' which is not allowed".to_string(),
409 ));
410 }
411
412 if config.Downloader.MaxConcurrentDownloads < 1 || config.Downloader.MaxConcurrentDownloads > 50 {
414 return Err(AirError::Configuration(format!(
415 "Max concurrent downloads {} is out of range [1, 50]",
416 config.Downloader.MaxConcurrentDownloads
417 )));
418 }
419
420 if config.Downloader.DownloadTimeoutSecs < 10 || config.Downloader.DownloadTimeoutSecs > 3600 {
422 return Err(AirError::Configuration(format!(
423 "Download timeout {} seconds is out of range [10, 3600]",
424 config.Downloader.DownloadTimeoutSecs
425 )));
426 }
427
428 if config.Downloader.MaxRetries > 10 {
430 return Err(AirError::Configuration(format!(
431 "Max retries {} exceeds maximum (10)",
432 config.Downloader.MaxRetries
433 )));
434 }
435 }
436
437 Ok(())
438 }
439
440 fn name(&self) -> &str { "DownloadConfigValidator" }
441
442 fn priority(&self) -> u32 {
443 50 }
445}
446
447pub struct IndexingConfigValidator;
449
450impl ConfigValidator for IndexingConfigValidator {
451 fn validate(&self, config:&AirConfiguration) -> Result<()> {
452 if config.Indexing.Enabled {
453 if config.Indexing.IndexDirectory.is_empty() {
454 return Err(AirError::Configuration(
455 "Index directory cannot be empty when indexing is enabled".to_string(),
456 ));
457 }
458
459 if config.Indexing.IndexDirectory.contains("..") {
461 return Err(AirError::Configuration(
462 "Index directory contains '..' which is not allowed".to_string(),
463 ));
464 }
465
466 if config.Indexing.FileTypes.is_empty() {
468 return Err(AirError::Configuration(
469 "File types to index cannot be empty when indexing is enabled".to_string(),
470 ));
471 }
472
473 if config.Indexing.MaxFileSizeMb < 1 || config.Indexing.MaxFileSizeMb > 1024 {
475 return Err(AirError::Configuration(format!(
476 "Max file size {} MB is out of range [1, 1024]",
477 config.Indexing.MaxFileSizeMb
478 )));
479 }
480
481 if config.Indexing.UpdateIntervalMinutes < 1 || config.Indexing.UpdateIntervalMinutes > 1440 {
483 return Err(AirError::Configuration(format!(
484 "Index update interval {} minutes is out of range [1, 1440]",
485 config.Indexing.UpdateIntervalMinutes
486 )));
487 }
488 }
489
490 Ok(())
491 }
492
493 fn name(&self) -> &str { "IndexingConfigValidator" }
494
495 fn priority(&self) -> u32 {
496 40 }
498}
499
500pub struct LoggingConfigValidator;
502
503impl ConfigValidator for LoggingConfigValidator {
504 fn validate(&self, config:&AirConfiguration) -> Result<()> {
505 let valid_levels = ["trace", "debug", "info", "warn", "error"];
506
507 if !valid_levels.contains(&config.Logging.Level.as_str()) {
508 return Err(AirError::Configuration(format!(
509 "Invalid log level '{}': must be one of: {}",
510 config.Logging.Level,
511 valid_levels.join(", ")
512 )));
513 }
514
515 if config.Logging.MaxFileSizeMb < 1 || config.Logging.MaxFileSizeMb > 1000 {
517 return Err(AirError::Configuration(format!(
518 "Max log file size {} MB is out of range [1, 1000]",
519 config.Logging.MaxFileSizeMb
520 )));
521 }
522
523 if config.Logging.MaxFiles < 1 || config.Logging.MaxFiles > 50 {
525 return Err(AirError::Configuration(format!(
526 "Max log files {} is out of range [1, 50]",
527 config.Logging.MaxFiles
528 )));
529 }
530
531 Ok(())
532 }
533
534 fn name(&self) -> &str { "LoggingConfigValidator" }
535
536 fn priority(&self) -> u32 {
537 30 }
539}
540
541pub struct PerformanceConfigValidator;
543
544impl ConfigValidator for PerformanceConfigValidator {
545 fn validate(&self, config:&AirConfiguration) -> Result<()> {
546 if config.Performance.MemoryLimitMb < 64 || config.Performance.MemoryLimitMb > 16384 {
548 return Err(AirError::Configuration(format!(
549 "Memory limit {} MB is out of range [64, 16384]",
550 config.Performance.MemoryLimitMb
551 )));
552 }
553
554 if config.Performance.CPULimitPercent < 10 || config.Performance.CPULimitPercent > 100 {
556 return Err(AirError::Configuration(format!(
557 "CPU limit {}% is out of range [10, 100]",
558 config.Performance.CPULimitPercent
559 )));
560 }
561
562 if config.Performance.DiskLimitMb < 100 || config.Performance.DiskLimitMb > 102400 {
564 return Err(AirError::Configuration(format!(
565 "Disk limit {} MB is out of range [100, 102400]",
566 config.Performance.DiskLimitMb
567 )));
568 }
569
570 if config.Performance.BackgroundTaskIntervalSecs < 1 || config.Performance.BackgroundTaskIntervalSecs > 3600 {
572 return Err(AirError::Configuration(format!(
573 "Background task interval {} seconds is out of range [1, 3600]",
574 config.Performance.BackgroundTaskIntervalSecs
575 )));
576 }
577
578 Ok(())
579 }
580
581 fn name(&self) -> &str { "PerformanceConfigValidator" }
582
583 fn priority(&self) -> u32 {
584 20 }
586}
587
588impl ConfigHotReload {
593 pub async fn New(config_path:PathBuf, initial_config:AirConfiguration) -> Result<Self> {
604 let (change_sender, _) = broadcast::channel(100);
605
606 let (reload_tx, reload_rx) = mpsc::channel(100);
607
608 let manager = Self {
609 active_config:Arc::new(RwLock::new(initial_config.clone())),
610
611 previous_config:Arc::new(RwLock::new(None)),
612
613 last_config_hash:Arc::new(RwLock::new(None)),
614
615 config_path,
616
617 watcher:None,
618
619 change_sender,
620
621 reload_tx,
622
623 change_history:Arc::new(RwLock::new(Vec::new())),
624
625 last_reload:Arc::new(RwLock::new(None)),
626
627 last_reload_duration:Arc::new(RwLock::new(None)),
628
629 enabled:Arc::new(RwLock::new(true)),
630
631 debounce_delay:Duration::from_millis(500),
632
633 last_change_time:Arc::new(RwLock::new(None)),
634
635 stats:Arc::new(RwLock::new(ReloadStats::default())),
636
637 validators:Arc::new(RwLock::new(Self::DefaultValidators())),
638
639 max_retries:3,
640
641 retry_delay:Duration::from_secs(1),
642
643 auto_rollback_enabled:Arc::new(RwLock::new(true)),
644 };
645
646 let hash = crate::Configuration::ConfigurationManager::ComputeHash(&initial_config)?;
648
649 *manager.last_config_hash.write().await = Some(hash);
650
651 manager.StartReloadProcessor(reload_rx);
653
654 Ok(manager)
655 }
656
657 fn DefaultValidators() -> Vec<Box<dyn ConfigValidator>> {
659 vec![
660 Box::new(gRPCConfigValidator),
661 Box::new(AuthConfigValidator),
662 Box::new(UpdateConfigValidator),
663 Box::new(DownloadConfigValidator),
664 Box::new(IndexingConfigValidator),
665 Box::new(LoggingConfigValidator),
666 Box::new(PerformanceConfigValidator),
667 ]
668 }
669
670 pub async fn EnableFileWatching(&mut self) -> Result<()> {
672 dev_log!("config", "[HotReload] Enabling file watching for configuration changes");
673
674 let config_path = self.config_path.clone();
675
676 let (tx, mut rx) = tokio::sync::mpsc::channel(100);
678
679 let mut watcher = RecommendedWatcher::new(
680 move |res:NotifyResult<Event>| {
681 if let Ok(event) = res {
682 let _ = tx.blocking_send(event);
683 }
684 },
685 notify::Config::default(),
686 )
687 .map_err(|e| AirError::Configuration(format!("Failed to create file watcher: {}", e)))?;
688
689 let watch_path = if config_path.is_file() {
691 config_path.parent().unwrap_or(&config_path).to_path_buf()
692 } else {
693 config_path.clone()
694 };
695
696 watcher
697 .watch(&watch_path, RecursiveMode::NonRecursive)
698 .map_err(|e| AirError::Configuration(format!("Failed to watch path '{}': {}", watch_path.display(), e)))?;
699
700 let reload_tx = self.reload_tx.clone();
702
703 let config_path_clone = config_path.clone();
704
705 tokio::spawn(async move {
706 while let Some(event) = rx.recv().await {
707 dev_log!("config", "file event detected: {:?}", event.kind);
708
709 let should_reload = event
711 .paths
712 .iter()
713 .any(|p| p == &config_path_clone || p == config_path_clone.as_path())
714 && event.kind != EventKind::Access(notify::event::AccessKind::Any);
715
716 if should_reload {
717 let _ = reload_tx.send(ReloadRequest::FileChange).await;
718 }
719 }
720 });
721
722 self.watcher = Some(Arc::new(RwLock::new(watcher)));
723
724 *self.enabled.write().await = true;
725
726 dev_log!("config", "[HotReload] File watching enabled for: {}", config_path.display());
727
728 Ok(())
729 }
730
731 pub async fn DisableFileWatching(&mut self) -> Result<()> {
733 *self.enabled.write().await = false;
734
735 if let Some(watcher) = self.watcher.take() {
736 drop(watcher);
737 }
738
739 dev_log!("config", "[HotReload] File watching disabled");
740
741 Ok(())
742 }
743
744 fn StartReloadProcessor(&self, mut reload_rx:mpsc::Receiver<ReloadRequest>) {
746 let enabled = self.enabled.clone();
747
748 let debounce_delay = self.debounce_delay;
749
750 let last_change_time = self.last_change_time.clone();
751
752 tokio::spawn(async move {
753 while let Some(request) = reload_rx.recv().await {
754 if !*enabled.read().await {
755 continue;
756 }
757
758 let now = Instant::now();
760
761 {
762 let mut last_change = last_change_time.write().await;
763
764 if let Some(last) = *last_change {
765 if now.duration_since(last) < debounce_delay {
766 continue; }
768 }
769
770 *last_change = Some(now);
771 }
772
773 sleep(debounce_delay).await;
774
775 match request {
777 ReloadRequest::Manual => {
778 dev_log!("config", "[HotReload] Processing manual reload request");
779 },
780 ReloadRequest::Signal => {
781 dev_log!("config", "[HotReload] Processing signal-based reload request");
782 },
783 ReloadRequest::FileChange => {
784 dev_log!("config", "[HotReload] Processing file change reload request");
785 },
786 ReloadRequest::Periodic => {
787 dev_log!("config", "processing periodic reload check");
788 },
789 }
790 }
791 });
792 }
793
794 pub async fn Reload(&self) -> Result<()> {
796 dev_log!(
797 "config",
798 "[HotReload] Reloading configuration from: {}",
799 self.config_path.display()
800 );
801
802 if !*self.enabled.read().await {
804 return Err(AirError::Configuration("Hot-reload is disabled".to_string()));
805 }
806
807 let start_time = Instant::now();
808
809 {
811 let mut stats = self.stats.write().await;
812
813 stats.total_attempts += 1;
814 }
815
816 let mut last_error = None;
818
819 for attempt in 0..=self.max_retries {
820 match self.AttemptReload().await {
821 Ok(()) => {
822 let duration = start_time.elapsed();
823
824 *self.last_reload_duration.write().await = Some(duration);
825
826 {
828 let mut stats = self.stats.write().await;
829
830 stats.successful_reloads += 1;
831
832 stats.last_error = None;
833 }
834
835 dev_log!("config", "[HotReload] Configuration reloaded successfully in {:?}", duration);
836
837 return Ok(());
838 },
839
840 Err(e) => {
841 last_error = Some(e.clone());
842
843 if attempt < self.max_retries {
844 let delay = self.retry_delay * 2_u32.pow(attempt);
845
846 dev_log!(
847 "config",
848 "warn: [HotReload] Reload attempt {} failed, retrying in {:?}: {}",
849 attempt + 1,
850 delay,
851 e
852 );
853
854 sleep(delay).await;
855 }
856 },
857 }
858 }
859
860 {
862 let mut stats = self.stats.write().await;
863
864 stats.failed_reloads += 1;
865
866 stats.last_error = last_error.as_ref().map(|e| e.to_string());
867 }
868
869 let error = last_error.unwrap_or_else(|| AirError::Configuration("Unknown error".to_string()));
870
871 if *self.auto_rollback_enabled.read().await {
873 dev_log!("config", "[HotReload] Attempting rollback due to reload failure");
874
875 if let Err(rollback_err) = self.Rollback().await {
876 dev_log!("config", "error: [HotReload] Rollback also failed: {}", rollback_err);
877 }
878 }
879
880 Err(error)
881 }
882
883 async fn AttemptReload(&self) -> Result<()> {
885 let content = fs::read_to_string(&self.config_path).await;
887
888 if let Err(e) = content {
889 let mut stats = self.stats.write().await;
890
891 stats.parse_errors += 1;
892
893 return Err(AirError::Configuration(format!("Failed to read config file: {}", e)));
894 }
895
896 let content = content.unwrap();
897
898 let new_config:std::result::Result<AirConfiguration, toml::de::Error> = toml::from_str(&content);
899
900 if let Err(e) = new_config {
901 let mut stats = self.stats.write().await;
902
903 stats.parse_errors += 1;
904
905 return Err(AirError::Configuration(format!("Failed to parse config file: {}", e)));
906 }
907
908 let new_config = new_config.unwrap();
909
910 self.ValidateConfig(&new_config).await?;
912
913 let new_hash = crate::Configuration::ConfigurationManager::ComputeHash(&new_config)?;
915
916 let current_hash = self.last_config_hash.read().await.clone();
917
918 if let Some(ref hash) = current_hash {
919 if hash == &new_hash {
920 dev_log!("config", "[HotReload] Configuration unchanged, skipping reload");
921
922 return Ok(());
923 }
924 }
925
926 let old_config = self.active_config.read().await.clone();
928
929 let old_hash = current_hash;
930
931 *self.active_config.write().await = new_config.clone();
932 *self.previous_config.write().await = Some(old_config.clone());
933 *self.last_config_hash.write().await = Some(new_hash.clone());
934 *self.last_reload.write().await = Some(Utc::now());
935
936 let changes = self.ComputeChanges(&old_config, &new_config);
938
939 let record = ConfigChangeRecord {
940 timestamp:Utc::now(),
941
942 changes:changes.clone(),
943
944 validated:true,
945
946 reason:"Reload".to_string(),
947
948 rollback_performed:false,
949 };
950
951 let mut history = self.change_history.write().await;
952
953 history.push(record);
954
955 let history_len = history.len();
957
958 if history_len > 1000 {
959 history.drain(0..history_len - 1000);
960 }
961
962 drop(history);
963
964 let event = ConfigChangeEvent {
966 timestamp:Utc::now(),
967
968 old_config_hash:old_hash,
969
970 new_config_hash:new_hash,
971
972 changes,
973
974 success:true,
975 };
976
977 let _ = self.change_sender.send(event);
978
979 Ok(())
980 }
981
982 pub async fn ReloadAndValidate(&self) -> Result<()> { self.Reload().await }
984
985 pub async fn TriggerReload(&self) -> Result<()> {
987 self.reload_tx
988 .send(ReloadRequest::Manual)
989 .await
990 .map_err(|e| AirError::Configuration(format!("Failed to trigger reload: {}", e)))?;
991
992 Ok(())
993 }
994
995 async fn ValidateConfig(&self, config:&AirConfiguration) -> Result<()> {
997 let validators = self.validators.read().await;
998
999 let mut sorted_validators:Vec<_> = validators.iter().collect();
1001
1002 sorted_validators.sort_by(|a, b| b.priority().cmp(&a.priority()));
1003
1004 for validator in sorted_validators {
1005 let result = validator.validate(config);
1006
1007 if let Err(e) = result {
1008 let mut stats = self.stats.write().await;
1009
1010 stats.validation_errors += 1;
1011
1012 stats.last_error = Some(format!("{}: {}", validator.name(), e));
1013
1014 dev_log!("config", "error: [HotReload] Validation failed ({}): {}", validator.name(), e);
1015
1016 return Err(AirError::Configuration(format!("{}: {}", validator.name(), e)));
1017 }
1018
1019 dev_log!("config", "validator '{}' passed", validator.name());
1020 }
1021
1022 dev_log!(
1023 "config",
1024 "[HotReload] Configuration validation passed ({} validators)",
1025 validators.len()
1026 );
1027
1028 Ok(())
1029 }
1030
1031 pub async fn RegisterValidator(&self, validator:Box<dyn ConfigValidator>) {
1033 let mut validators = self.validators.write().await;
1034
1035 validators.push(validator);
1036
1037 dev_log!("config", "[HotReload] Registered validator (total: {})", validators.len());
1038 }
1039
1040 pub async fn Rollback(&self) -> Result<()> {
1042 let previous = {
1043 let prev = self.previous_config.read().await.clone();
1044
1045 prev.ok_or_else(|| AirError::Configuration("No previous configuration to rollback to".to_string()))?
1046 };
1047
1048 self.ValidateConfig(&previous).await?;
1050
1051 let _old_config = self.active_config.read().await.clone();
1053
1054 let old_hash = self.last_config_hash.read().await.clone();
1055
1056 *self.active_config.write().await = previous.clone();
1057 let new_hash = crate::Configuration::ConfigurationManager::ComputeHash(&previous)?;
1058
1059 *self.last_config_hash.write().await = Some(new_hash.clone());
1060
1061 let record = ConfigChangeRecord {
1063 timestamp:Utc::now(),
1064
1065 changes:vec![],
1066
1067 validated:true,
1068
1069 reason:"Rollback".to_string(),
1070
1071 rollback_performed:true,
1072 };
1073
1074 {
1075 let mut stats = self.stats.write().await;
1076
1077 stats.rollback_attempts += 1;
1078 }
1079
1080 self.change_history.write().await.push(record);
1081
1082 let event = ConfigChangeEvent {
1084 timestamp:Utc::now(),
1085
1086 old_config_hash:old_hash,
1087
1088 new_config_hash:new_hash,
1089
1090 changes:vec![],
1091
1092 success:true,
1093 };
1094
1095 let _ = self.change_sender.send(event);
1096
1097 dev_log!("config", "[HotReload] Configuration rolled back successfully");
1098
1099 Ok(())
1100 }
1101
1102 pub async fn GetConfig(&self) -> AirConfiguration { self.active_config.read().await.clone() }
1104
1105 pub async fn GetConfigRef(&self) -> tokio::sync::RwLockReadGuard<'_, AirConfiguration> {
1107 self.active_config.read().await
1108 }
1109
1110 pub async fn SetValue(&self, path:&str, value:&str) -> Result<()> {
1112 let mut config = self.GetConfig().await;
1113
1114 Self::SetConfigValue(&mut config, path, value)?;
1116
1117 self.ValidateConfig(&config).await?;
1119
1120 let content = toml::to_string_pretty(&config)
1122 .map_err(|e| AirError::Configuration(format!("Serialization failed: {}", e)))?;
1123
1124 fs::write(&self.config_path, content)
1125 .await
1126 .map_err(|e| AirError::Configuration(format!("Failed to write config: {}", e)))?;
1127
1128 self.Reload().await?;
1130
1131 dev_log!("config", "[HotReload] Configuration value updated: {} = {}", path, value);
1132
1133 Ok(())
1134 }
1135
1136 pub async fn GetValue(&self, path:&str) -> Result<serde_json::Value> {
1138 let config = self.active_config.read().await;
1139
1140 let config_json = serde_json::to_value(&*config)
1141 .map_err(|e| AirError::Configuration(format!("Serialization failed: {}", e)))?;
1142
1143 let mut current = config_json;
1144
1145 for key in path.split('.') {
1146 current = current
1147 .get(key)
1148 .ok_or_else(|| AirError::Configuration(format!("Key not found: {}", path)))?
1149 .clone();
1150 }
1151
1152 Ok(current)
1153 }
1154
1155 fn SetConfigValue(config:&mut AirConfiguration, path:&str, value:&str) -> Result<()> {
1157 let parts:Vec<&str> = path.split('.').collect();
1158
1159 match parts.as_slice() {
1160 ["grpc", "bind_address"] => config.gRPC.BindAddress = value.to_string(),
1161
1162 ["grpc", "max_connections"] => {
1163 config.gRPC.MaxConnections = value
1164 .parse()
1165 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1166 },
1167
1168 ["grpc", "request_timeout_secs"] => {
1169 config.gRPC.RequestTimeoutSecs = value
1170 .parse()
1171 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1172 },
1173
1174 ["authentication", "enabled"] => {
1175 config.Authentication.Enabled = value
1176 .parse()
1177 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1178 },
1179
1180 ["authentication", "credentials_path"] => {
1181 config.Authentication.CredentialsPath = value.to_string();
1182 },
1183
1184 ["updates", "enabled"] => {
1185 config.Updates.Enabled = value
1186 .parse()
1187 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1188 },
1189
1190 ["updates", "auto_download"] => {
1191 config.Updates.AutoDownload = value
1192 .parse()
1193 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1194 },
1195
1196 ["updates", "auto_install"] => {
1197 config.Updates.AutoInstall = value
1198 .parse()
1199 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1200 },
1201
1202 ["downloader", "enabled"] => {
1203 config.Downloader.Enabled = value
1204 .parse()
1205 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1206 },
1207
1208 ["indexing", "enabled"] => {
1209 config.Indexing.Enabled = value
1210 .parse()
1211 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1212 },
1213
1214 ["logging", "level"] => {
1215 config.Logging.Level = value.to_lowercase();
1216 },
1217
1218 ["logging", "console_enabled"] => {
1219 config.Logging.ConsoleEnabled = value
1220 .parse()
1221 .map_err(|_| AirError::Configuration(format!("Invalid value: {}", value)))?;
1222 },
1223
1224 _ => {
1225 return Err(AirError::Configuration(format!("Unknown configuration path: {}", path)));
1226 },
1227 }
1228
1229 Ok(())
1230 }
1231
1232 fn ComputeChanges(&self, old:&AirConfiguration, new:&AirConfiguration) -> Vec<ConfigChange> {
1234 let mut changes = Vec::new();
1235
1236 let old_json = serde_json::to_value(old).unwrap_or_default();
1237
1238 let new_json = serde_json::to_value(new).unwrap_or_default();
1239
1240 Self::DiffJson("", &old_json, &new_json, &mut changes);
1241
1242 changes
1243 }
1244
1245 fn DiffJson(prefix:&str, old:&serde_json::Value, new:&serde_json::Value, changes:&mut Vec<ConfigChange>) {
1247 match (old, new) {
1248 (serde_json::Value::Object(old_map), serde_json::Value::Object(new_map)) => {
1249 for (key, new_val) in new_map {
1250 let new_prefix = if prefix.is_empty() { key.clone() } else { format!("{}.{}", prefix, key) };
1251
1252 if let Some(old_val) = old_map.get(key) {
1253 Self::DiffJson(&new_prefix, old_val, new_val, changes);
1254 } else {
1255 changes.push(ConfigChange {
1256 path:new_prefix,
1257 old_value:serde_json::Value::Null,
1258 new_value:new_val.clone(),
1259 });
1260 }
1261 }
1262 },
1263
1264 (old_val, new_val) if old_val != new_val => {
1265 changes.push(ConfigChange {
1266 path:prefix.to_string(),
1267 old_value:old_val.clone(),
1268 new_value:new_val.clone(),
1269 });
1270 },
1271
1272 _ => {},
1273 }
1274 }
1275
1276 pub async fn GetChangeHistory(&self, limit:Option<usize>) -> Vec<ConfigChangeRecord> {
1278 let history = self.change_history.read().await;
1279
1280 if let Some(limit) = limit {
1281 history.iter().rev().take(limit).cloned().collect()
1282 } else {
1283 history.iter().rev().cloned().collect()
1284 }
1285 }
1286
1287 pub async fn GetLastReload(&self) -> Option<DateTime<Utc>> { *self.last_reload.read().await }
1289
1290 pub async fn GetLastReloadDuration(&self) -> Option<Duration> { *self.last_reload_duration.read().await }
1292
1293 pub async fn GetStats(&self) -> ReloadStats { self.stats.read().await.clone() }
1295
1296 pub async fn IsEnabled(&self) -> bool { *self.enabled.read().await }
1298
1299 pub async fn SetAutoRollback(&self, enabled:bool) {
1301 *self.auto_rollback_enabled.write().await = enabled;
1302 dev_log!(
1303 "config",
1304 "[HotReload] Auto-rollback {}",
1305 if enabled { "enabled" } else { "disabled" }
1306 );
1307 }
1308
1309 pub fn SubscribeChanges(&self) -> broadcast::Receiver<ConfigChangeEvent> { self.change_sender.subscribe() }
1313
1314 pub fn GetConfigPath(&self) -> &Path { &self.config_path }
1316
1317 pub async fn SetDebounceDelay(&self, delay:Duration) {
1319 dev_log!("config", "[HotReload] Debounce delay set to {:?}", delay);
1323 }
1324}
1325
1326#[cfg(test)]
1327mod tests {
1328
1329 use tempfile::NamedTempFile;
1330
1331 use super::*;
1332
1333 #[tokio::test]
1334 async fn test_config_hot_reload_creation() {
1335 let config = AirConfiguration::default();
1336
1337 let temp_file = NamedTempFile::new().unwrap();
1338
1339 let path = temp_file.path().to_path_buf();
1340
1341 let manager = ConfigHotReload::New(path, config).await.expect("Failed to create manager");
1342
1343 assert_eq!(manager.GetLastReload().await, None);
1344
1345 assert!(
1346 !manager.GetChangeHistory(Some(10)).await.is_empty() || manager.GetChangeHistory(Some(10)).await.is_empty()
1347 );
1348 }
1349
1350 #[tokio::test]
1351 async fn test_get_set_value() {
1352 let config = AirConfiguration::default();
1353
1354 let temp_file = NamedTempFile::new().unwrap();
1355
1356 let path = temp_file.path().to_path_buf();
1357
1358 let content = toml::to_string_pretty(&config).unwrap();
1360
1361 fs::write(&path, content).await.unwrap();
1362
1363 let manager = ConfigHotReload::New(path, config).await.expect("Failed to create manager");
1364
1365 let value = manager.GetValue("grpc.bind_address").await.unwrap();
1367
1368 assert_eq!(value, "[::1]:50053");
1369 }
1370
1371 #[tokio::test]
1372 async fn test_validator_priority() {
1373 let grpc = gRPCConfigValidator;
1374
1375 let auth = AuthConfigValidator;
1376
1377 let perf = PerformanceConfigValidator;
1378
1379 assert!(grpc.priority() > auth.priority());
1380
1381 assert!(auth.priority() > perf.priority());
1382 }
1383
1384 #[tokio::test]
1385 async fn test_compute_changes() {
1386 let config = AirConfiguration::default();
1387
1388 let manager = ConfigHotReload::New(PathBuf::from("/tmp/test.toml"), config)
1389 .await
1390 .expect("Failed to create manager");
1391
1392 let mut new_config = AirConfiguration::default();
1393
1394 new_config.gRPC.BindAddress = "[::1]:50054".to_string();
1395
1396 let changes = manager.ComputeChanges(&AirConfiguration::default(), &new_config);
1397
1398 assert!(!changes.is_empty());
1399
1400 assert!(changes.iter().any(|c| c.path == "grpc.bind_address"));
1401 }
1402}