1use std::{collections::HashMap, sync::Arc};
93
94use serde::{Deserialize, Serialize};
95use tokio::sync::{Mutex, RwLock};
96use systemstat::{Platform, System};
97
98use crate::{AirError, Configuration::AirConfiguration, Result, Utility, dev_log};
99
100#[derive(Debug)]
102pub struct ApplicationState {
103 pub Configuration:Arc<AirConfiguration>,
105
106 pub ServiceStatus:Arc<RwLock<HashMap<String, ServiceStatus>>>,
108
109 pub ActiveRequest:Arc<Mutex<HashMap<String, RequestStatus>>>,
111
112 pub Metrics:Arc<RwLock<PerformanceMetrics>>,
114
115 pub Resources:Arc<RwLock<ResourceUsage>>,
117
118 pub Connection:Arc<RwLock<HashMap<String, ConnectionInfo>>>,
120
121 pub BackgroundTask:Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
127pub enum ServiceStatus {
128 Starting,
129
130 Running,
131
132 Stopping,
133
134 Stopped,
135
136 Error(String),
137}
138
139#[derive(Debug, Clone)]
141pub struct RequestStatus {
142 pub RequestId:String,
143
144 pub Service:String,
145
146 pub StartedAt:u64,
147
148 pub Status:RequestState,
149
150 pub Progress:Option<f32>,
151}
152
153#[derive(Debug, Clone)]
155pub enum RequestState {
156 Pending,
157
158 InProgress,
159
160 Completed,
161
162 Failed(String),
163
164 Cancelled,
165}
166
167#[derive(Debug, Clone, Serialize, Deserialize)]
169pub struct PerformanceMetrics {
170 pub TotalRequest:u64,
171
172 pub SuccessfulRequest:u64,
173
174 pub FailedRequest:u64,
175
176 pub AverageResponseTime:f64,
177
178 pub UptimeSeconds:u64,
179
180 pub LastUpdated:u64,
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct ResourceUsage {
186 pub MemoryUsageMb:f64,
187
188 pub CPUUsagePercent:f64,
189
190 pub DiskUsageMb:f64,
191
192 pub NetworkUsageMbps:f64,
193
194 pub LastUpdated:u64,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct ConnectionInfo {
200 pub ConnectionId:String,
201
202 pub ClientId:String,
203
204 pub ClientVersion:String,
205
206 pub ProtocolVersion:u32,
207
208 pub LastHeartbeat:u64,
209
210 pub IsActive:bool,
211
212 pub ConnectionType:ConnectionType,
213}
214
215#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
217pub enum ConnectionType {
218 MountainMain,
219
220 MountainWorker,
221
222 Cocoon,
223
224 Wind,
225
226 External,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
231pub struct ConnectionHealthReport {
232 pub TotalConnection:usize,
233
234 pub HealthyConnection:usize,
235
236 pub StaleConnection:usize,
237
238 pub ConnectionByType:HashMap<String, usize>,
239
240 pub LastChecked:u64,
241}
242
243impl ApplicationState {
244 pub async fn New(Configuration:Arc<AirConfiguration>) -> Result<Self> {
246 let State = Self {
247 Configuration,
248
249 ServiceStatus:Arc::new(RwLock::new(HashMap::new())),
250
251 ActiveRequest:Arc::new(Mutex::new(HashMap::new())),
252
253 Metrics:Arc::new(RwLock::new(PerformanceMetrics {
254 TotalRequest:0,
255 SuccessfulRequest:0,
256 FailedRequest:0,
257 AverageResponseTime:0.0,
258 UptimeSeconds:0,
259 LastUpdated:Utility::CurrentTimestamp(),
260 })),
261
262 Resources:Arc::new(RwLock::new(ResourceUsage {
263 MemoryUsageMb:0.0,
264 CPUUsagePercent:0.0,
265 DiskUsageMb:0.0,
266 NetworkUsageMbps:0.0,
267 LastUpdated:Utility::CurrentTimestamp(),
268 })),
269
270 Connection:Arc::new(RwLock::new(HashMap::new())),
271
272 BackgroundTask:Arc::new(Mutex::new(Vec::new())),
273 };
274
275 State.InitializeServiceStatus().await?;
277
278 Ok(State)
279 }
280
281 async fn InitializeServiceStatus(&self) -> Result<()> {
283 let mut Status = self.ServiceStatus.write().await;
284
285 Status.insert("authentication".to_string(), ServiceStatus::Starting);
286
287 Status.insert("updates".to_string(), ServiceStatus::Starting);
288
289 Status.insert("downloader".to_string(), ServiceStatus::Starting);
290
291 Status.insert("indexing".to_string(), ServiceStatus::Starting);
292
293 Status.insert("grpc".to_string(), ServiceStatus::Starting);
294
295 Status.insert("connections".to_string(), ServiceStatus::Starting);
296
297 Ok(())
298 }
299
300 pub async fn RegisterConnection(
303 &self,
304
305 ConnectionId:String,
306
307 ClientId:String,
308
309 ClientVersion:String,
310
311 ProtocolVersion:u32,
312
313 ConnectionType:ConnectionType,
314 ) -> Result<()> {
315 if ConnectionId.is_empty() {
317 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
318 }
319
320 if ClientId.is_empty() {
322 return Err(AirError::Configuration("Client ID cannot be empty".to_string()));
323 }
324
325 if ProtocolVersion == 0 {
327 return Err(AirError::Configuration("Protocol version must be greater than 0".to_string()));
328 }
329
330 let mut Connection = self.Connection.write().await;
331
332 if Connection.contains_key(&ConnectionId) {
334 return Err(AirError::Configuration(format!("Connection {} already exists", ConnectionId)));
335 }
336
337 if matches!(ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker) {
339 let ClientConnCount = Connection
341 .values()
342 .filter(|c| {
343 c.ClientId == ClientId
344 && matches!(c.ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker)
345 })
346 .count();
347
348 const MAX_CONN_PER_CLIENT:usize = 10;
349
350 if ClientConnCount >= MAX_CONN_PER_CLIENT {
351 return Err(AirError::ResourceLimit(format!(
352 "Client {} exceeds maximum connection limit ({})",
353 ClientId, MAX_CONN_PER_CLIENT
354 )));
355 }
356 }
357
358 Connection.insert(
359 ConnectionId.clone(),
360 ConnectionInfo {
361 ConnectionId:ConnectionId.clone(),
362 ClientId:ClientId.clone(),
363 ClientVersion,
364 ProtocolVersion,
365 LastHeartbeat:Utility::CurrentTimestamp(),
366 IsActive:true,
367 ConnectionType:ConnectionType.clone(),
368 },
369 );
370
371 dev_log!(
372 "lifecycle",
373 "Connection registered: {} - {} ({:?})",
374 ConnectionId,
375 ClientId,
376 ConnectionType
377 );
378
379 Ok(())
380 }
381
382 pub async fn UpdateHeartbeat(&self, ConnectionId:&str) -> Result<()> {
385 if ConnectionId.is_empty() {
386 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
387 }
388
389 let mut Connection = self.Connection.write().await;
390
391 if let Some(Connection) = Connection.get_mut(ConnectionId) {
392 let CurrentTime = Utility::CurrentTimestamp();
393
394 const MAX_HEARTBEAT_INTERVAL:u64 = 120000; if CurrentTime - Connection.LastHeartbeat > MAX_HEARTBEAT_INTERVAL {
398 dev_log!(
399 "lifecycle",
400 "warn: Long heartbeat interval for connection {}: {}ms",
401 ConnectionId,
402 CurrentTime - Connection.LastHeartbeat
403 );
404 }
405
406 Connection.LastHeartbeat = CurrentTime;
407
408 Connection.IsActive = true;
409
410 dev_log!(
411 "lifecycle",
412 "Heartbeat updated for connection: {} (client: {})",
413 ConnectionId,
414 Connection.ClientId
415 );
416 } else {
417 return Err(AirError::Internal(format!("Connection {} not found", ConnectionId)));
418 }
419
420 Ok(())
421 }
422
423 pub async fn RemoveConnection(&self, ConnectionId:&str) -> Result<()> {
426 if ConnectionId.is_empty() {
427 return Err(AirError::Configuration("Connection ID cannot be empty".to_string()));
428 }
429
430 let mut Connection = self.Connection.write().await;
431
432 if let Some(Connection) = Connection.remove(ConnectionId) {
433 dev_log!(
434 "lifecycle",
435 "Connection removed: {} (client: {}, type: {:?})",
436 ConnectionId,
437 Connection.ClientId,
438 Connection.ConnectionType
439 );
440
441 drop(Connection); } else {
448 dev_log!(
449 "lifecycle",
450 "warn: Attempted to remove non-existent connection: {}",
451 ConnectionId
452 );
453 }
454
455 Ok(())
456 }
457
458 pub async fn GetActiveConnectionCount(&self) -> usize {
460 let Connection = self.Connection.read().await;
461
462 Connection.values().filter(|c| c.IsActive).count()
463 }
464
465 pub async fn GetConnectionCountByType(&self, ConnectionType:ConnectionType) -> usize {
467 let Connection = self.Connection.read().await;
468
469 Connection
470 .values()
471 .filter(|c| c.ConnectionType == ConnectionType && c.IsActive)
472 .count()
473 }
474
475 pub async fn GetConnectionsByType(&self, ConnectionType:ConnectionType) -> Vec<ConnectionInfo> {
477 let Connection = self.Connection.read().await;
478
479 Connection
480 .values()
481 .filter(|c| c.ConnectionType == ConnectionType)
482 .cloned()
483 .collect()
484 }
485
486 pub async fn GetNextMountainConnection(&self) -> Result<ConnectionInfo> {
489 let Connection = self.Connection.read().await;
490
491 let MountainConnection:Vec<_> = Connection
492 .values()
493 .filter(|c| {
494 matches!(c.ConnectionType, ConnectionType::MountainMain | ConnectionType::MountainWorker) && c.IsActive
495 })
496 .collect();
497
498 if MountainConnection.is_empty() {
499 return Err(AirError::ServiceUnavailable(
500 "No active Mountain connections available".to_string(),
501 ));
502 }
503
504 let Selected = MountainConnection[0].clone();
510
511 Ok(Selected)
512 }
513
514 pub async fn CleanupStaleConnections(&self, TimeoutSeconds:u64) -> Result<usize> {
518 let mut Connection = self.Connection.write().await;
519
520 let CurrentTime = Utility::CurrentTimestamp();
521
522 let TimeoutMs = TimeoutSeconds * 1000;
523
524 let mut RemovedCount = 0;
525
526 let mut RemovedByType:HashMap<String, usize> = HashMap::new();
527
528 Connection.retain(|Id, Connection| {
529 if CurrentTime - Connection.LastHeartbeat > TimeoutMs {
530 dev_log!(
531 "lifecycle",
532 "warn: Removing stale connection: {} - {} ({:?}) - idle: {}ms",
533 Id,
534 Connection.ClientId,
535 Connection.ConnectionType,
536 CurrentTime - Connection.LastHeartbeat
537 );
538
539 *RemovedByType.entry(format!("{:?}", Connection.ConnectionType)).or_insert(0) += 1;
540
541 RemovedCount += 1;
542
543 false
544 } else {
545 true
546 }
547 });
548
549 if RemovedCount > 0 {
550 dev_log!("lifecycle", "Cleaned up {} stale connections", RemovedCount);
551
552 for (ConnType, Count) in RemovedByType {
553 dev_log!("lifecycle", " - {} connections: {}", ConnType, Count);
554 }
555 }
556
557 Ok(RemovedCount)
558 }
559
560 pub async fn RegisterBackgroundTask(&self, TaskItem:tokio::task::JoinHandle<()>) -> Result<()> {
562 let mut BackgroundTask = self.BackgroundTask.lock().await;
563
564 BackgroundTask.push(TaskItem);
565
566 dev_log!("lifecycle", "Background task registered. Total tasks: {}", BackgroundTask.len());
567
568 Ok(())
569 }
570
571 pub async fn StopAllBackgroundTasks(&self) -> Result<()> {
573 let mut BackgroundTask = self.BackgroundTask.lock().await;
574
575 let TaskCount = BackgroundTask.len();
576
577 dev_log!("lifecycle", "Stopping {} background tasks", TaskCount);
578
579 for TaskItem in BackgroundTask.drain(..) {
581 TaskItem.abort();
582 }
583
584 dev_log!("lifecycle", "Stopped all {} background tasks", TaskCount);
585
586 Ok(())
587 }
588
589 pub async fn UpdateServiceStatus(&self, Service:&str, Status:ServiceStatus) -> Result<()> {
591 if Service.is_empty() {
592 return Err(AirError::Configuration("Service name cannot be empty".to_string()));
593 }
594
595 let mut ServiceStatus = self.ServiceStatus.write().await;
596
597 let StatusClone = Status.clone();
598
599 ServiceStatus.insert(Service.to_string(), Status);
600
601 dev_log!("lifecycle", "Service status updated: {} -> {:?}", Service, StatusClone);
602
603 Ok(())
604 }
605
606 pub async fn GetServiceStatus(&self, Service:&str) -> Option<ServiceStatus> {
608 let ServiceStatus = self.ServiceStatus.read().await;
609
610 ServiceStatus.get(Service).cloned()
611 }
612
613 pub async fn GetAllServiceStatuses(&self) -> HashMap<String, ServiceStatus> {
615 let ServiceStatus = self.ServiceStatus.read().await;
616
617 ServiceStatus.clone()
618 }
619
620 pub async fn RegisterRequest(&self, RequestId:String, Service:String) -> Result<()> {
622 if RequestId.is_empty() {
623 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
624 }
625
626 if Service.is_empty() {
627 return Err(AirError::Configuration("Service name cannot be empty".to_string()));
628 }
629
630 let mut Request = self.ActiveRequest.lock().await;
631
632 if Request.contains_key(&RequestId) {
634 return Err(AirError::Configuration(format!("Request {} already exists", RequestId)));
635 }
636
637 Request.insert(
638 RequestId.clone(),
639 RequestStatus {
640 RequestId:RequestId.clone(),
641 Service,
642 StartedAt:Utility::CurrentTimestamp(),
643 Status:RequestState::Pending,
644 Progress:None,
645 },
646 );
647
648 dev_log!("lifecycle", "Request registered: {}", RequestId);
649
650 Ok(())
651 }
652
653 pub async fn UpdateRequestStatus(&self, RequestId:&str, Status:RequestState, Progress:Option<f32>) -> Result<()> {
655 if RequestId.is_empty() {
656 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
657 }
658
659 if let Some(p) = Progress {
661 if !(0.0..=1.0).contains(&p) {
662 return Err(AirError::Configuration("Progress must be between 0.0 and 1.0".to_string()));
663 }
664 }
665
666 let mut Request = self.ActiveRequest.lock().await;
667
668 if let Some(Request) = Request.get_mut(RequestId) {
669 Request.Status = Status;
670
671 Request.Progress = Progress;
672 } else {
673 return Err(AirError::Internal(format!("Request {} not found", RequestId)));
674 }
675
676 Ok(())
677 }
678
679 pub async fn RemoveRequest(&self, RequestId:&str) -> Result<()> {
681 if RequestId.is_empty() {
682 return Err(AirError::Configuration("Request ID cannot be empty".to_string()));
683 }
684
685 let mut request = self.ActiveRequest.lock().await;
686
687 if request.remove(RequestId).is_some() {
688 dev_log!("lifecycle", "Request removed: {}", RequestId);
689 }
690
691 Ok(())
692 }
693
694 pub async fn UpdateMetrics(&self, Success:bool, ResponseTime:u64) -> Result<()> {
696 let mut Metrics = self.Metrics.write().await;
697
698 Metrics.TotalRequest += 1;
699
700 if Success {
701 Metrics.SuccessfulRequest += 1;
702 } else {
703 Metrics.FailedRequest += 1;
704 }
705
706 let Alpha = 0.1; Metrics.AverageResponseTime = Alpha * (ResponseTime as f64) + (1.0 - Alpha) * Metrics.AverageResponseTime;
710
711 Metrics.LastUpdated = Utility::CurrentTimestamp();
712
713 Ok(())
714 }
715
716 pub async fn UpdateResourceUsage(&self) -> Result<()> {
718 let Sys = System::new();
719
720 let MemoryUsage = if let Ok(Memory) = Sys.memory() {
722 (Memory.total.as_u64() - Memory.free.as_u64()) as f64 / 1024.0 / 1024.0
723 } else {
724 dev_log!("lifecycle", "warn: Failed to get memory usage");
725
726 0.0
727 };
728
729 let CPUUsage = if let Ok(CPU) = Sys.cpu_load_aggregate() {
731 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
732
733 if let Ok(CPU) = CPU.done() {
734 (CPU.user + CPU.nice + CPU.system) as f64 * 100.0
735 } else {
736 dev_log!("lifecycle", "warn: Failed to get CPU usage after sampling");
737
738 0.0
739 }
740 } else {
741 dev_log!("lifecycle", "warn: Failed to start CPU load sampling");
742
743 0.0
744 };
745
746 let mut Resources = self.Resources.write().await;
748
749 Resources.MemoryUsageMb = MemoryUsage;
750
751 Resources.CPUUsagePercent = CPUUsage;
752
753 Resources.LastUpdated = Utility::CurrentTimestamp();
754
755 Ok(())
756 }
757
758 pub async fn GetMetrics(&self) -> PerformanceMetrics {
760 let metrics = self.Metrics.read().await;
761
762 metrics.clone()
763 }
764
765 pub async fn GetResourceUsage(&self) -> ResourceUsage {
767 let Resources = self.Resources.read().await;
768
769 Resources.clone()
770 }
771
772 pub async fn GetActiveRequestCount(&self) -> usize {
774 let Request = self.ActiveRequest.lock().await;
775
776 Request.len()
777 }
778
779 pub async fn IsRequestCancelled(&self, RequestId:&str) -> bool {
781 let Request = self.ActiveRequest.lock().await;
782
783 if let Some(Request) = Request.get(RequestId) {
784 matches!(Request.Status, RequestState::Cancelled)
785 } else {
786 false
787 }
788 }
789
790 pub async fn GetConfiguration(&self) -> Arc<AirConfiguration> { self.Configuration.clone() }
792
793 pub async fn UpdateConfiguration(
795 &self,
796
797 Section:String,
798
799 Updates:std::collections::HashMap<String, String>,
800 ) -> Result<()> {
801 dev_log!("lifecycle", "[ApplicationState] Updating configuration section: {}", Section);
802
803 if Section.is_empty() {
805 return Err(AirError::Configuration("Configuration section cannot be empty".to_string()));
806 }
807
808 if Updates.is_empty() {
810 return Err(AirError::Configuration("Configuration updates cannot be empty".to_string()));
811 }
812
813 match Section.as_str() {
821 "grpc" => {
822 dev_log!("lifecycle", "Updating gRPC configuration: {:?}", Updates);
823 },
824
825 "updates" => {
826 dev_log!("lifecycle", "Updating updates configuration: {:?}", Updates);
827 },
828
829 "downloader" => {
830 dev_log!("lifecycle", "Updating downloader configuration: {:?}", Updates);
831 },
832
833 "indexing" => {
834 dev_log!("lifecycle", "Updating indexing configuration: {:?}", Updates);
835 },
836
837 "daemon" => {
838 dev_log!("lifecycle", "Updating daemon configuration: {:?}", Updates);
839 },
840
841 _ => {
842 return Err(AirError::Configuration(format!("Unknown configuration section: {}", Section)));
843 },
844 }
845
846 Ok(())
847 }
848
849 pub async fn SetResourceLimits(
851 &self,
852
853 MemoryLimitMb:Option<u64>,
854
855 CPULimitPercent:Option<f64>,
856
857 DiskLimitMb:Option<u64>,
858 ) -> Result<()> {
859 dev_log!(
860 "lifecycle",
861 "[ApplicationState] Setting resource limits memory={:?}, CPU={:?}, disk={:?}",
862 MemoryLimitMb,
863 CPULimitPercent,
864 DiskLimitMb
865 );
866
867 if let Some(CPU) = CPULimitPercent {
869 if !(0.0..=100.0).contains(&CPU) {
870 return Err(AirError::ResourceLimit("CPU limit must be between 0 and 100".to_string()));
871 }
872 }
873
874 if let Some(Memory) = MemoryLimitMb {
876 if Memory == 0 {
877 return Err(AirError::ResourceLimit("Memory limit must be greater than 0".to_string()));
878 }
879 }
880
881 if let Some(Disk) = DiskLimitMb {
883 if Disk == 0 {
884 return Err(AirError::ResourceLimit("Disk limit must be greater than 0".to_string()));
885 }
886 }
887
888 if MemoryLimitMb.is_some() {
898 dev_log!("lifecycle", "Memory limit set: {} MB", MemoryLimitMb.unwrap());
899 }
900
901 if CPULimitPercent.is_some() {
902 dev_log!("lifecycle", "CPU limit set: {}%", CPULimitPercent.unwrap());
903 }
904
905 if DiskLimitMb.is_some() {
906 dev_log!("lifecycle", "Disk limit set: {} MB", DiskLimitMb.unwrap());
907 }
908
909 Ok(())
910 }
911
912 pub async fn CheckResourceLimits(&self) -> Result<bool> {
914 let _Resources = self.Resources.read().await;
915
916 Ok(false)
920 }
921
922 pub async fn GetConnectionHealthReport(&self) -> ConnectionHealthReport {
924 let Connection = self.Connection.read().await;
925
926 let CurrentTime = Utility::CurrentTimestamp();
927
928 let mut Healthy = 0;
929
930 let mut Stale = 0;
931
932 let mut ByType:HashMap<String, usize> = HashMap::new();
933
934 for ConnectionItem in Connection.values() {
935 let IsStale = CurrentTime - ConnectionItem.LastHeartbeat > 120000; if IsStale {
938 Stale += 1;
939 } else if ConnectionItem.IsActive {
940 Healthy += 1;
941 }
942
943 *ByType.entry(format!("{:?}", ConnectionItem.ConnectionType)).or_insert(0) += 1;
944 }
945
946 ConnectionHealthReport {
947 TotalConnection:Connection.len(),
948
949 HealthyConnection:Healthy,
950
951 StaleConnection:Stale,
952
953 ConnectionByType:ByType,
954
955 LastChecked:CurrentTime,
956 }
957 }
958}