1use std::{collections::HashMap, sync::Arc};
9
10use tonic::{Request, Response, Status};
11use tokio_stream::StreamExt as TokioStreamExt;
12use async_trait::async_trait;
13
14use crate::dev_log;
15use crate::{
17 AirError,
18 ApplicationState::ApplicationState,
19 Authentication::AuthenticationService,
20 Downloader::DownloadManager,
21 Indexing::{
22 FileIndexer,
23 Store::QueryIndex::{SearchMode, SearchQuery},
24 },
25 Result,
26 Updates::UpdateManager,
27 Utility::CurrentTimestamp,
28 Vine::Generated::{
29 air as air_generated,
30 air::{
31 ApplyUpdateRequest,
32 ApplyUpdateResponse,
33 AuthenticationRequest,
34 AuthenticationResponse,
35 ConfigurationRequest,
36 ConfigurationResponse,
37 DownloadRequest,
38 DownloadResponse,
39 DownloadStreamRequest,
40 DownloadStreamResponse,
41 FileInfoRequest,
42 FileInfoResponse,
43 FileResult,
44 HealthCheckRequest,
45 HealthCheckResponse,
46 IndexRequest,
47 IndexResponse,
48 MetricsRequest,
49 MetricsResponse,
50 ResourceLimitsRequest,
51 ResourceLimitsResponse,
52 ResourceUsageRequest,
53 ResourceUsageResponse,
54 SearchRequest,
55 SearchResponse,
56 StatusRequest,
57 StatusResponse,
58 UpdateCheckRequest,
59 UpdateCheckResponse,
60 UpdateConfigurationRequest,
61 UpdateConfigurationResponse,
62 air_service_server::AirService,
63 },
64 },
65};
66
67#[derive(Clone)]
69pub struct AirVinegRPCService {
70 AppState:Arc<ApplicationState>,
72
73 AuthService:Arc<AuthenticationService>,
75
76 UpdateManager:Arc<UpdateManager>,
78
79 DownloadManager:Arc<DownloadManager>,
81
82 FileIndexer:Arc<FileIndexer>,
84
85 ActiveConnections:Arc<tokio::sync::RwLock<HashMap<String, ConnectionMetadata>>>,
87}
88
89#[derive(Debug, Clone)]
91struct ConnectionMetadata {
92 pub ClientId:String,
93
94 pub ClientVersion:String,
95
96 pub ProtocolVersion:u32,
97
98 pub LastRequestTime:u64,
99
100 pub RequestCount:u64,
101
102 pub ConnectionType:crate::ApplicationState::ConnectionType,
103}
104
105impl AirVinegRPCService {
106 pub fn new(
108 AppState:Arc<ApplicationState>,
109
110 AuthService:Arc<AuthenticationService>,
111
112 UpdateManager:Arc<UpdateManager>,
113
114 DownloadManager:Arc<DownloadManager>,
115
116 FileIndexer:Arc<FileIndexer>,
117 ) -> Self {
118 dev_log!("grpc", "[AirVinegRPCService] New instance created");
119
120 Self {
121 AppState,
122
123 AuthService,
124
125 UpdateManager,
126
127 DownloadManager,
128
129 FileIndexer,
130
131 ActiveConnections:Arc::new(tokio::sync::RwLock::new(HashMap::new())),
132 }
133 }
134
135 async fn TrackConnection<RequestType>(
137 &self,
138
139 Request:&tonic::Request<RequestType>,
140
141 _ServiceName:&str,
142 ) -> std::result::Result<String, Status> {
143 let Metadata = Request.metadata();
144
145 let ConnectionId = Metadata
146 .get("connection-id")
147 .map(|v| v.to_str().unwrap_or_default().to_string())
148 .unwrap_or_else(|| crate::Utility::GenerateRequestId());
149
150 let ClientId = Metadata
151 .get("client-id")
152 .map(|v| v.to_str().unwrap_or_default().to_string())
153 .unwrap_or_else(|| "unknown".to_string());
154
155 let ClientVersion = Metadata
156 .get("client-version")
157 .map(|v| v.to_str().unwrap_or_default().to_string())
158 .unwrap_or_else(|| "unknown".to_string());
159
160 let ProtocolVersion = Metadata
161 .get("protocol-version")
162 .map(|v| v.to_str().unwrap_or_default().parse().unwrap_or(1))
163 .unwrap_or(1);
164
165 let mut Connections = self.ActiveConnections.write().await;
167
168 let ConnectionMetadata = Connections.entry(ConnectionId.clone()).or_insert_with(|| {
169 ConnectionMetadata {
170 ClientId:ClientId.clone(),
171 ClientVersion:ClientVersion.clone(),
172 ProtocolVersion,
173 LastRequestTime:crate::Utility::CurrentTimestamp(),
174 RequestCount:0,
175 ConnectionType:crate::ApplicationState::ConnectionType::MountainMain,
176 }
177 });
178
179 ConnectionMetadata.LastRequestTime = crate::Utility::CurrentTimestamp();
180
181 ConnectionMetadata.RequestCount += 1;
182
183 self.AppState
185 .RegisterConnection(
186 ConnectionId.clone(),
187 ClientId,
188 ClientVersion,
189 ProtocolVersion,
190 crate::ApplicationState::ConnectionType::MountainMain,
191 )
192 .await
193 .map_err(|e| Status::internal(e.to_string()))?;
194
195 Ok(ConnectionId)
196 }
197
198 fn validate_protocol_version(&self, ClientVersion:u32) -> std::result::Result<(), Status> {
200 if ClientVersion > crate::ProtocolVersion {
201 return Err(Status::failed_precondition(format!(
202 "Client protocol version {} is newer than server version {}",
203 ClientVersion,
204 crate::ProtocolVersion
205 )));
206 }
207
208 if ClientVersion < crate::ProtocolVersion {
209 dev_log!(
210 "grpc",
211 "warn: Client using older protocol version {} (server: {})",
212 ClientVersion,
213 crate::ProtocolVersion
214 );
215 }
216
217 Ok(())
218 }
219}
220
221#[async_trait]
222impl AirService for AirVinegRPCService {
223 async fn authenticate(
225 &self,
226
227 Request:Request<AuthenticationRequest>,
228 ) -> std::result::Result<Response<AuthenticationResponse>, Status> {
229 let ConnectionId = self.TrackConnection(&Request, "authentication").await?;
231
232 let RequestData = Request.into_inner();
233
234 let request_id = RequestData.request_id.clone();
235
236 dev_log!(
237 "grpc",
238 "[AirVinegRPCService] Authentication request received [ID: {}] [Connection: {}]",
239 request_id,
240 ConnectionId
241 );
242
243 self.AppState
244 .RegisterRequest(request_id.clone(), "authentication".to_string())
245 .await
246 .map_err(|e| Status::internal(e.to_string()))?;
247
248 if RequestData.username.is_empty() || RequestData.password.is_empty() || RequestData.provider.is_empty() {
250 let ErrorMessage = "Invalid authentication parameters".to_string();
251
252 self.AppState
253 .UpdateRequestStatus(
254 &request_id,
255 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
256 None,
257 )
258 .await
259 .ok();
260
261 return Ok(Response::new(air_generated::AuthenticationResponse {
262 request_id,
263 success:false,
264 token:String::new(),
265 error:ErrorMessage,
266 }));
267 }
268
269 let username_for_log = RequestData.username.clone();
271
272 let password = RequestData.password;
273
274 let provider = RequestData.provider;
275
276 let result = self
277 .AuthService
278 .AuthenticateUser(RequestData.username, password, provider)
279 .await;
280
281 match result {
282 Ok(token) => {
283 self.AppState
284 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
285 .await
286 .ok();
287
288 dev_log!(
290 "grpc",
291 "[AirVinegRPCService] Authentication successful for user: {} [Connection: {}]",
292 username_for_log,
293 ConnectionId
294 );
295
296 Ok(Response::new(air_generated::AuthenticationResponse {
297 request_id,
298 success:true,
299 token,
300 error:String::new(),
301 }))
302 },
303
304 Err(e) => {
305 self.AppState
306 .UpdateRequestStatus(
307 &request_id,
308 crate::ApplicationState::RequestState::Failed(e.to_string()),
309 None,
310 )
311 .await
312 .ok();
313
314 dev_log!(
316 "grpc",
317 "warn: [AirVinegRPCService] Authentication failed for user: {} [Connection: {}] - {}",
318 username_for_log,
319 ConnectionId,
320 e
321 );
322
323 Ok(Response::new(air_generated::AuthenticationResponse {
324 request_id,
325 success:false,
326 token:String::new(),
327 error:e.to_string(),
328 }))
329 },
330 }
331 }
332
333 async fn check_for_updates(
335 &self,
336
337 request:Request<UpdateCheckRequest>,
338 ) -> std::result::Result<Response<UpdateCheckResponse>, Status> {
339 let RequestData = request.into_inner();
340
341 let request_id = RequestData.request_id.clone();
342
343 dev_log!(
344 "grpc",
345 "[AirVinegRPCService] Update check request received [ID: {}] - Version: {}, Channel: {}",
346 request_id,
347 RequestData.current_version,
348 RequestData.channel
349 );
350
351 self.AppState
352 .RegisterRequest(request_id.clone(), "updates".to_string())
353 .await
354 .map_err(|e| Status::internal(e.to_string()))?;
355
356 if RequestData.current_version.is_empty() {
358 let ErrorMessage = crate::AirError::Validation("CurrentVersion cannot be empty".to_string());
359
360 self.AppState
361 .UpdateRequestStatus(
362 &request_id,
363 crate::ApplicationState::RequestState::Failed(ErrorMessage.to_string()),
364 None,
365 )
366 .await
367 .ok();
368
369 return Err(Status::invalid_argument(ErrorMessage.to_string()));
370 }
371
372 let ValidChannels = ["stable", "beta", "nightly"];
374
375 let Channel = if RequestData.channel.is_empty() {
376 "stable".to_string()
377 } else {
378 RequestData.channel.clone()
379 };
380
381 if !ValidChannels.contains(&Channel.as_str()) {
382 let ErrorMessage = format!("Invalid channel: {}. Valid values are: {}", Channel, ValidChannels.join(", "));
383
384 self.AppState
385 .UpdateRequestStatus(
386 &request_id,
387 crate::ApplicationState::RequestState::Failed(ErrorMessage.clone()),
388 None,
389 )
390 .await
391 .ok();
392
393 return Err(Status::invalid_argument(ErrorMessage));
394 }
395
396 let result = self.UpdateManager.CheckForUpdates().await;
398
399 match result {
400 Ok(UpdateInfo) => {
401 self.AppState
402 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
403 .await
404 .ok();
405
406 dev_log!(
407 "grpc",
408 "[AirVinegRPCService] Update check successful - Available: {}",
409 UpdateInfo.is_some()
410 );
411
412 Ok(Response::new(air_generated::UpdateCheckResponse {
413 request_id,
414 update_available:UpdateInfo.is_some(),
415 version:UpdateInfo.as_ref().map(|info| info.version.clone()).unwrap_or_default(),
416 download_url:UpdateInfo.as_ref().map(|info| info.download_url.clone()).unwrap_or_default(),
417 release_notes:UpdateInfo.as_ref().map(|info| info.release_notes.clone()).unwrap_or_default(),
418 error:String::new(),
419 }))
420 },
421
422 Err(crate::AirError::Network(e)) => {
423 self.AppState
424 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
425 .await
426 .ok();
427
428 dev_log!("grpc", "error: [AirVinegRPCService] Network error during update check: {}", e);
429
430 Err(Status::unavailable(e))
431 },
432
433 Err(e) => {
434 self.AppState
435 .UpdateRequestStatus(
436 &request_id,
437 crate::ApplicationState::RequestState::Failed(e.to_string()),
438 None,
439 )
440 .await
441 .ok();
442
443 dev_log!("grpc", "error: [AirVinegRPCService] Update check failed: {}", e);
444
445 Ok(Response::new(air_generated::UpdateCheckResponse {
446 request_id,
447 update_available:false,
448 version:String::new(),
449 download_url:String::new(),
450 release_notes:String::new(),
451 error:e.to_string(),
452 }))
453 },
454 }
455 }
456
457 async fn download_file(
459 &self,
460
461 request:Request<DownloadRequest>,
462 ) -> std::result::Result<Response<DownloadResponse>, Status> {
463 let RequestData = request.into_inner();
464
465 let request_id = RequestData.request_id.clone();
466
467 dev_log!(
468 "grpc",
469 "[AirVinegRPCService] Download request received [ID: {}] - URL: {}",
470 request_id,
471 RequestData.url
472 );
473
474 let download_request_id = if request_id.is_empty() {
476 crate::Utility::GenerateRequestId()
477 } else {
478 request_id.clone()
479 };
480
481 self.AppState
482 .RegisterRequest(download_request_id.clone(), "downloader".to_string())
483 .await
484 .map_err(|e| Status::internal(e.to_string()))?;
485
486 if RequestData.url.is_empty() {
488 let error_msg = "URL cannot be empty".to_string();
489
490 self.AppState
491 .UpdateRequestStatus(
492 &download_request_id,
493 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
494 None,
495 )
496 .await
497 .ok();
498
499 return Ok(Response::new(DownloadResponse {
500 request_id:download_request_id,
501 success:false,
502 file_path:String::new(),
503 file_size:0,
504 checksum:String::new(),
505 error:error_msg,
506 }));
507 }
508
509 if !match_url_scheme(&RequestData.url) {
511 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
512
513 self.AppState
514 .UpdateRequestStatus(
515 &download_request_id,
516 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
517 None,
518 )
519 .await
520 .ok();
521
522 return Ok(Response::new(DownloadResponse {
523 request_id:download_request_id,
524 success:false,
525 file_path:String::new(),
526 file_size:0,
527 checksum:String::new(),
528 error:error_msg,
529 }));
530 }
531
532 let DestinationPath = if RequestData.destination_path.is_empty() {
534 let config = &self.AppState.Configuration.Downloader;
536
537 config.CacheDirectory.clone()
538 } else {
539 RequestData.destination_path.clone()
540 };
541
542 let dest_path = std::path::Path::new(&DestinationPath);
544
545 if let Some(parent) = dest_path.parent() {
546 if !parent.exists() {
547 match tokio::fs::create_dir_all(parent).await {
548 Ok(_) => {
549 dev_log!(
550 "grpc",
551 "[AirVinegRPCService] Created destination directory: {}",
552 parent.display()
553 );
554 },
555
556 Err(e) => {
557 let error_msg = format!("Failed to create destination directory: {}", e);
558
559 self.AppState
560 .UpdateRequestStatus(
561 &download_request_id,
562 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
563 None,
564 )
565 .await
566 .ok();
567
568 return Ok(Response::new(DownloadResponse {
569 request_id:download_request_id,
570 success:false,
571 file_path:String::new(),
572 file_size:0,
573 checksum:String::new(),
574 error:error_msg,
575 }));
576 },
577 }
578 }
579 }
580
581 let _download_manager = self.DownloadManager.clone();
583
584 let AppState = self.AppState.clone();
585
586 let callback_request_id = download_request_id.clone();
587
588 let progress_callback = move |progress:f32| {
589 let state = AppState.clone();
590
591 let id = callback_request_id.clone();
592
593 tokio::spawn(async move {
594 let _ = state
595 .UpdateRequestStatus(&id, crate::ApplicationState::RequestState::InProgress, Some(progress))
596 .await;
597 });
598 };
599
600 let result = self
602 .download_file_with_retry(
603 &download_request_id,
604 RequestData.url.clone(),
605 DestinationPath,
606 RequestData.checksum,
607 Some(Box::new(progress_callback)),
608 )
609 .await;
610
611 match result {
612 Ok(file_info) => {
613 self.AppState
614 .UpdateRequestStatus(
615 &download_request_id,
616 crate::ApplicationState::RequestState::Completed,
617 Some(100.0),
618 )
619 .await
620 .ok();
621
622 dev_log!(
623 "grpc",
624 "[AirVinegRPCService] Download completed [ID: {}] - Size: {} bytes",
625 download_request_id,
626 file_info.size
627 );
628
629 Ok(Response::new(DownloadResponse {
630 request_id:download_request_id,
631 success:true,
632 file_path:file_info.path,
633 file_size:file_info.size,
634 checksum:file_info.checksum,
635 error:String::new(),
636 }))
637 },
638
639 Err(e) => {
640 self.AppState
641 .UpdateRequestStatus(
642 &download_request_id,
643 crate::ApplicationState::RequestState::Failed(e.to_string()),
644 None,
645 )
646 .await
647 .ok();
648
649 dev_log!(
650 "grpc",
651 "error: [AirVinegRPCService] Download failed [ID: {}] - Error: {}",
652 download_request_id,
653 e
654 );
655
656 Ok(Response::new(DownloadResponse {
657 request_id:download_request_id,
658 success:false,
659 file_path:String::new(),
660 file_size:0,
661 checksum:String::new(),
662 error:e.to_string(),
663 }))
664 },
665 }
666 }
667
668 async fn index_files(&self, request:Request<IndexRequest>) -> std::result::Result<Response<IndexResponse>, Status> {
670 let RequestData = request.into_inner();
671
672 let request_id = RequestData.request_id;
673
674 dev_log!(
675 "grpc",
676 "[AirVinegRPCService] Index request received [ID: {}] - Path: {}",
677 request_id,
678 RequestData.path
679 );
680
681 self.AppState
682 .RegisterRequest(request_id.clone(), "indexing".to_string())
683 .await
684 .map_err(|e| Status::internal(e.to_string()))?;
685
686 let result = self.FileIndexer.IndexDirectory(RequestData.path, RequestData.patterns).await;
687
688 match result {
689 Ok(index_info) => {
690 self.AppState
691 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
692 .await
693 .ok();
694
695 Ok(Response::new(air_generated::IndexResponse {
696 request_id,
697 success:true,
698 files_indexed:index_info.files_indexed,
699 total_size:index_info.total_size,
700 error:String::new(),
701 }))
702 },
703
704 Err(e) => {
705 self.AppState
706 .UpdateRequestStatus(
707 &request_id,
708 crate::ApplicationState::RequestState::Failed(e.to_string()),
709 None,
710 )
711 .await
712 .ok();
713
714 Ok(Response::new(air_generated::IndexResponse {
715 request_id,
716 success:false,
717 files_indexed:0,
718 total_size:0,
719 error:e.to_string(),
720 }))
721 },
722 }
723 }
724
725 async fn get_status(
727 &self,
728
729 request:Request<StatusRequest>,
730 ) -> std::result::Result<Response<StatusResponse>, Status> {
731 let _RequestData = request.into_inner();
732
733 dev_log!("grpc", "[AirVinegRPCService] Status request received");
734
735 let metrics = self.AppState.GetMetrics().await;
736
737 let resources = self.AppState.GetResourceUsage().await;
738
739 Ok(Response::new(air_generated::StatusResponse {
740 version:crate::VERSION.to_string(),
741 uptime_seconds:metrics.UptimeSeconds,
742 total_requests:metrics.TotalRequest,
743 successful_requests:metrics.SuccessfulRequest,
744 failed_requests:metrics.FailedRequest,
745 average_response_time:metrics.AverageResponseTime,
746 memory_usage_mb:resources.MemoryUsageMb,
747 cpu_usage_percent:resources.CPUUsagePercent,
748 active_requests:self.AppState.GetActiveRequestCount().await as u32,
749 }))
750 }
751
752 async fn health_check(
754 &self,
755
756 _request:Request<HealthCheckRequest>,
757 ) -> std::result::Result<Response<HealthCheckResponse>, Status> {
758 dev_log!("grpc", "[AirVinegRPCService] Health check request received");
759
760 Ok(Response::new(air_generated::HealthCheckResponse {
761 healthy:true,
762 timestamp:CurrentTimestamp(),
763 }))
764 }
765
766 async fn download_update(
770 &self,
771
772 request:Request<DownloadRequest>,
773 ) -> std::result::Result<Response<DownloadResponse>, Status> {
774 let RequestData = request.into_inner();
775
776 let request_id = RequestData.request_id.clone();
777
778 dev_log!(
779 "grpc",
780 "[AirVinegRPCService] Download update request received [ID: {}] - URL: {}, Destination: {}",
781 request_id,
782 RequestData.url,
783 RequestData.destination_path
784 );
785
786 self.AppState
787 .RegisterRequest(request_id.clone(), "download_update".to_string())
788 .await
789 .map_err(|e| Status::internal(e.to_string()))?;
790
791 if RequestData.url.is_empty() {
793 let error_msg = crate::AirError::Validation("URL cannot be empty".to_string());
794
795 self.AppState
796 .UpdateRequestStatus(
797 &request_id,
798 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
799 None,
800 )
801 .await
802 .ok();
803
804 return Err(Status::invalid_argument(error_msg.to_string()));
805 }
806
807 if !RequestData.url.starts_with("http://") && !RequestData.url.starts_with("https://") {
809 let error_msg = crate::AirError::Validation("URL must start with http:// or https://".to_string());
810
811 self.AppState
812 .UpdateRequestStatus(
813 &request_id,
814 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
815 None,
816 )
817 .await
818 .ok();
819
820 return Err(Status::invalid_argument(error_msg.to_string()));
821 }
822
823 let destination = if RequestData.destination_path.is_empty() {
825 self.UpdateManager
827 .GetCacheDirectory()
828 .join("update-latest.bin")
829 .to_string_lossy()
830 .to_string()
831 } else {
832 let dest_path = std::path::Path::new(&RequestData.destination_path);
834
835 if let Some(parent) = dest_path.parent() {
836 if parent.as_os_str().is_empty() {
837 self.UpdateManager
839 .GetCacheDirectory()
840 .join(&RequestData.destination_path)
841 .to_string_lossy()
842 .to_string()
843 } else {
844 RequestData.destination_path.clone()
846 }
847 } else {
848 RequestData.destination_path.clone()
849 }
850 };
851
852 let dest_path = std::path::Path::new(&destination);
854
855 if let Some(parent) = dest_path.parent() {
856 if !parent.exists() {
857 return Err(Status::failed_precondition(format!(
858 "Destination directory does not exist: {}",
859 parent.display()
860 )));
861 }
862
863 if let Err(e) = std::fs::write(parent.join(".write_test"), "") {
865 let error_msg = crate::AirError::FileSystem(format!("Destination directory not writeable: {}", e));
866
867 self.AppState
868 .UpdateRequestStatus(
869 &request_id,
870 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
871 None,
872 )
873 .await
874 .ok();
875
876 return Err(Status::permission_denied(error_msg.to_string()));
877 }
878
879 let _ = std::fs::remove_file(parent.join(".write_test"));
881 }
882
883 let download_result = self
886 .DownloadManager
887 .DownloadFile(RequestData.url, destination.clone(), RequestData.checksum)
888 .await;
889
890 match download_result {
891 Ok(result) => {
892 self.AppState
893 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
894 .await
895 .ok();
896
897 dev_log!(
898 "grpc",
899 "[AirVinegRPCService] Update downloaded successfully - Path: {}, Size: {}, Checksum: {}",
900 result.path,
901 result.size,
902 result.checksum
903 );
904
905 Ok(Response::new(DownloadResponse {
906 request_id,
907 success:true,
908 file_path:result.path,
909 file_size:result.size,
910 checksum:result.checksum,
911 error:String::new(),
912 }))
913 },
914
915 Err(crate::AirError::Network(e)) => {
916 self.AppState
917 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
918 .await
919 .ok();
920
921 dev_log!("grpc", "error: [AirVinegRPCService] Download update network error: {}", e);
922
923 Err(Status::unavailable(e))
924 },
925
926 Err(crate::AirError::FileSystem(e)) => {
927 self.AppState
928 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
929 .await
930 .ok();
931
932 dev_log!("grpc", "error: [AirVinegRPCService] Download update filesystem error: {}", e);
933
934 Err(Status::internal(e))
935 },
936
937 Err(e) => {
938 self.AppState
939 .UpdateRequestStatus(
940 &request_id,
941 crate::ApplicationState::RequestState::Failed(e.to_string()),
942 None,
943 )
944 .await
945 .ok();
946
947 dev_log!("grpc", "error: [AirVinegRPCService] Download update failed: {}", e);
948
949 Ok(Response::new(DownloadResponse {
950 request_id,
951 success:false,
952 file_path:String::new(),
953 file_size:0,
954 checksum:String::new(),
955 error:e.to_string(),
956 }))
957 },
958 }
959 }
960
961 async fn apply_update(
963 &self,
964
965 request:Request<ApplyUpdateRequest>,
966 ) -> std::result::Result<Response<ApplyUpdateResponse>, Status> {
967 let RequestData = request.into_inner();
968
969 let request_id = RequestData.request_id.clone();
970
971 dev_log!(
972 "grpc",
973 "[AirVinegRPCService] Apply update request received [ID: {}] - Version: {}, Path: {}",
974 request_id,
975 RequestData.version,
976 RequestData.update_path
977 );
978
979 self.AppState
980 .RegisterRequest(request_id.clone(), "apply_update".to_string())
981 .await
982 .map_err(|e| Status::internal(e.to_string()))?;
983
984 if RequestData.version.is_empty() {
986 let error_msg = crate::AirError::Validation("version cannot be empty".to_string());
987
988 self.AppState
989 .UpdateRequestStatus(
990 &request_id,
991 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
992 None,
993 )
994 .await
995 .ok();
996
997 return Err(Status::invalid_argument(error_msg.to_string()));
998 }
999
1000 if RequestData.update_path.is_empty() {
1002 let error_msg = crate::AirError::Validation("update_path cannot be empty".to_string());
1003
1004 self.AppState
1005 .UpdateRequestStatus(
1006 &request_id,
1007 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
1008 None,
1009 )
1010 .await
1011 .ok();
1012
1013 return Err(Status::invalid_argument(error_msg.to_string()));
1014 }
1015
1016 let update_path = std::path::Path::new(&RequestData.update_path);
1017
1018 if !update_path.exists() {
1020 let error_msg = crate::AirError::FileSystem(format!("Update file not found: {}", RequestData.update_path));
1021
1022 self.AppState
1023 .UpdateRequestStatus(
1024 &request_id,
1025 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
1026 None,
1027 )
1028 .await
1029 .ok();
1030
1031 return Err(Status::not_found(error_msg.to_string()));
1032 }
1033
1034 let metadata = match std::fs::metadata(update_path) {
1036 Ok(m) => m,
1037
1038 Err(e) => {
1039 let error_msg = crate::AirError::FileSystem(format!("Failed to read update file metadata: {}", e));
1040
1041 self.AppState
1042 .UpdateRequestStatus(
1043 &request_id,
1044 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
1045 None,
1046 )
1047 .await
1048 .ok();
1049
1050 return Err(Status::internal(error_msg.to_string()));
1051 },
1052 };
1053
1054 if metadata.len() == 0 {
1055 let error_msg = crate::AirError::Validation("Update file is empty".to_string());
1056
1057 self.AppState
1058 .UpdateRequestStatus(
1059 &request_id,
1060 crate::ApplicationState::RequestState::Failed(error_msg.to_string()),
1061 None,
1062 )
1063 .await
1064 .ok();
1065
1066 return Err(Status::failed_precondition(error_msg.to_string()));
1067 }
1068
1069 let rollback_backup_path = self.prepare_rollback_backup(&RequestData.version).await;
1071
1072 if let Err(ref e) = rollback_backup_path {
1073 dev_log!(
1074 "grpc",
1075 "warn: [AirVinegRPCService] Failed to prepare rollback backup: {}. Proceeding without rollback \
1076 capability.",
1077 e
1078 );
1079 }
1080
1081 match self.UpdateManager.verify_update(&RequestData.update_path, None).await {
1083 Ok(true) => {
1084 dev_log!(
1085 "grpc",
1086 "[AirVinegRPCService] Update verification successful, preparing for installation"
1087 );
1088
1089 self.AppState
1090 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Completed, Some(100.0))
1091 .await
1092 .ok();
1093
1094 let AppState = self.AppState.clone();
1096
1097 let version = RequestData.version.clone();
1098
1099 let self_clone = self.clone();
1100
1101 tokio::spawn(async move {
1102 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
1103
1104 dev_log!(
1105 "grpc",
1106 "[AirVinegRPCService] Initiating graceful shutdown for update version {}",
1107 version
1108 );
1109
1110 if let Err(e) = AppState.StopAllBackgroundTasks().await {
1112 dev_log!(
1113 "grpc",
1114 "error: [AirVinegRPCService] Failed to initiate graceful shutdown: {}",
1115 e
1116 );
1117
1118 dev_log!(
1120 "grpc",
1121 "warn: [AirVinegRPCService] Rollback initiated due to graceful shutdown failure"
1122 );
1123
1124 if let Err(rollback_error) = self_clone.perform_rollback(&version).await {
1125 dev_log!("grpc", "error: [AirVinegRPCService] Rollback failed: {}", rollback_error);
1126 } else {
1127 dev_log!("grpc", "[AirVinegRPCService] Rollback completed successfully");
1128 }
1129 }
1130 });
1131
1132 Ok(Response::new(ApplyUpdateResponse {
1133 request_id,
1134 success:true,
1135 error:String::new(),
1136 }))
1137 },
1138
1139 Ok(false) => {
1140 let error_msg = "Update verification failed: checksum mismatch".to_string();
1141
1142 self.AppState
1143 .UpdateRequestStatus(
1144 &request_id,
1145 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1146 None,
1147 )
1148 .await
1149 .ok();
1150
1151 dev_log!("grpc", "error: [AirVinegRPCService] {}", error_msg);
1152
1153 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1155
1156 Err(Status::failed_precondition(error_msg))
1157 },
1158
1159 Err(crate::AirError::FileSystem(e)) => {
1160 self.AppState
1161 .UpdateRequestStatus(&request_id, crate::ApplicationState::RequestState::Failed(e.clone()), None)
1162 .await
1163 .ok();
1164
1165 dev_log!(
1166 "grpc",
1167 "error: [AirVinegRPCService] Update verification filesystem error: {}",
1168 e
1169 );
1170
1171 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1173
1174 Err(Status::internal(e))
1175 },
1176
1177 Err(e) => {
1178 self.AppState
1179 .UpdateRequestStatus(
1180 &request_id,
1181 crate::ApplicationState::RequestState::Failed(e.to_string()),
1182 None,
1183 )
1184 .await
1185 .ok();
1186
1187 dev_log!("grpc", "error: [AirVinegRPCService] Update verification error: {}", e);
1188
1189 let _ = self.cleanup_rollback_backup(&RequestData.version).await;
1191
1192 Ok(Response::new(ApplyUpdateResponse {
1193 request_id,
1194 success:false,
1195 error:e.to_string(),
1196 }))
1197 },
1198 }
1199 }
1200
1201 type DownloadStreamStream =
1206 tokio_stream::wrappers::ReceiverStream<std::result::Result<air_generated::DownloadStreamResponse, Status>>;
1207
1208 async fn download_stream(
1209 &self,
1210
1211 request:Request<DownloadStreamRequest>,
1212 ) -> std::result::Result<Response<Self::DownloadStreamStream>, Status> {
1213 let RequestData = request.into_inner();
1214
1215 let request_id = RequestData.request_id.clone();
1216
1217 dev_log!(
1218 "grpc",
1219 "[AirVinegRPCService] Download stream request received [ID: {}] - URL: {}",
1220 request_id,
1221 RequestData.url
1222 );
1223
1224 self.AppState
1225 .RegisterRequest(request_id.clone(), "downloader_stream".to_string())
1226 .await
1227 .map_err(|e| Status::internal(e.to_string()))?;
1228
1229 if RequestData.url.is_empty() {
1231 let error_msg = "URL cannot be empty".to_string();
1232
1233 self.AppState
1234 .UpdateRequestStatus(
1235 &request_id,
1236 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1237 None,
1238 )
1239 .await
1240 .ok();
1241
1242 return Err(Status::invalid_argument(error_msg));
1243 }
1244
1245 if !match_url_scheme(&RequestData.url) {
1247 let error_msg = format!("Invalid URL scheme: {}", RequestData.url);
1248
1249 self.AppState
1250 .UpdateRequestStatus(
1251 &request_id,
1252 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1253 None,
1254 )
1255 .await
1256 .ok();
1257
1258 return Err(Status::invalid_argument(error_msg));
1259 }
1260
1261 match self.validate_range_support(&RequestData.url).await {
1263 Ok(true) => {
1264 dev_log!("grpc", "[AirVinegRPCService] URL supports range headers");
1265 },
1266
1267 Ok(false) => {
1268 dev_log!(
1269 "grpc",
1270 "warn: [AirVinegRPCService] URL does not support range headers, streaming may be inefficient"
1271 );
1272 },
1273
1274 Err(e) => {
1275 let error_msg = format!("Failed to validate range support: {}", e);
1276
1277 self.AppState
1278 .UpdateRequestStatus(
1279 &request_id,
1280 crate::ApplicationState::RequestState::Failed(error_msg.clone()),
1281 None,
1282 )
1283 .await
1284 .ok();
1285
1286 return Err(Status::internal(error_msg));
1287 },
1288 }
1289
1290 let (tx, rx) = tokio::sync::mpsc::channel(100);
1292
1293 let chunk_size = 8 * 1024 * 1024; let url = RequestData.url.clone();
1298
1299 let headers = RequestData.headers;
1300
1301 let download_request_id = request_id.clone();
1302
1303 let _download_manager = self.DownloadManager.clone();
1304
1305 let AppState = self.AppState.clone();
1306
1307 tokio::spawn(async move {
1309 if tx
1311 .send(Ok(DownloadStreamResponse {
1312 request_id:download_request_id.clone(),
1313 chunk:vec![].into(),
1314 total_size:0,
1315 downloaded:0,
1316 completed:false,
1317 error:String::new(),
1318 }))
1319 .await
1320 .is_err()
1321 {
1322 dev_log!(
1323 "grpc",
1324 "warn: [AirVinegRPCService] Client disconnected before streaming started [ID: {}]",
1325 download_request_id
1326 );
1327
1328 return;
1329 }
1330
1331 let dns_port = Mist::dns_port();
1333
1334 let client_builder_result = crate::HTTP::Client::secured_client_builder(dns_port);
1335
1336 let client_builder = match client_builder_result {
1337 Ok(builder) => builder,
1338 Err(e) => {
1339 let error = format!("Failed to create HTTP client builder: {}", e);
1340
1341 let _ = tx
1342 .send(Ok(DownloadStreamResponse {
1343 request_id:download_request_id.clone(),
1344 chunk:vec![].into(),
1345 total_size:0,
1346 downloaded:0,
1347 completed:false,
1348 error:error.clone(),
1349 }))
1350 .await;
1351
1352 AppState
1353 .UpdateRequestStatus(
1354 &download_request_id,
1355 crate::ApplicationState::RequestState::Failed(error),
1356 None,
1357 )
1358 .await
1359 .ok();
1360
1361 return;
1362 },
1363 };
1364
1365 let client_result = client_builder
1366 .pool_idle_timeout(std::time::Duration::from_secs(60))
1367 .pool_max_idle_per_host(5)
1368 .timeout(std::time::Duration::from_secs(300))
1369 .build();
1370
1371 if client_result.is_err() {
1372 let error = client_result.unwrap_err().to_string();
1373
1374 let _ = tx
1375 .send(Ok(DownloadStreamResponse {
1376 request_id:download_request_id.clone(),
1377 chunk:vec![].into(),
1378 total_size:0,
1379 downloaded:0,
1380 completed:false,
1381 error:error.clone(),
1382 }))
1383 .await;
1384
1385 AppState
1386 .UpdateRequestStatus(
1387 &download_request_id,
1388 crate::ApplicationState::RequestState::Failed(error),
1389 None,
1390 )
1391 .await
1392 .ok();
1393
1394 return;
1395 }
1396
1397 let client:reqwest::Client = match client_result {
1398 Ok(client) => client,
1399 Err(e) => {
1400 let error = format!("Failed to create HTTP client: {}", e);
1401
1402 let _ = tx.send(Err(Status::internal(error.clone())));
1403
1404 AppState
1405 .UpdateRequestStatus(
1406 &download_request_id,
1407 crate::ApplicationState::RequestState::Failed(error),
1408 None,
1409 )
1410 .await
1411 .ok();
1412
1413 return;
1414 },
1415 };
1416
1417 let mut total_size:Option<u64> = None;
1419
1420 let mut total_downloaded:u64 = 0;
1421
1422 match client
1423 .get(&url)
1424 .headers({
1425 let mut map = reqwest::header::HeaderMap::new();
1426
1427 for (key, value) in headers {
1428 if let (Ok(header_name), Ok(header_value)) = (
1429 reqwest::header::HeaderName::from_bytes(key.as_bytes()),
1430 reqwest::header::HeaderValue::from_str(&value),
1431 ) {
1432 map.insert(header_name, header_value);
1433 }
1434 }
1435
1436 map
1437 })
1438 .send()
1439 .await
1440 {
1441 Ok(response) => {
1442 if !response.status().is_success() {
1443 let error = format!("Download failed with status: {}", response.status());
1444
1445 let _ = tx
1446 .send(Ok(DownloadStreamResponse {
1447 request_id:download_request_id.clone(),
1448 chunk:vec![].into(),
1449 total_size:0,
1450 downloaded:0,
1451 completed:false,
1452 error:error.clone(),
1453 }))
1454 .await;
1455
1456 AppState
1457 .UpdateRequestStatus(
1458 &download_request_id,
1459 crate::ApplicationState::RequestState::Failed(error),
1460 None,
1461 )
1462 .await
1463 .ok();
1464
1465 return;
1466 }
1467
1468 total_size = Some(response.content_length().unwrap_or(0));
1469
1470 let response_tx = tx.clone();
1471
1472 let response_id = download_request_id.clone();
1473
1474 let mut stream = response.bytes_stream();
1476
1477 let mut buffer = Vec::with_capacity(chunk_size);
1478
1479 let mut last_progress:f32 = 0.0;
1480
1481 while let Some(chunk_result) = TokioStreamExt::next(&mut stream).await {
1482 if AppState.IsRequestCancelled(&download_request_id).await {
1483 dev_log!(
1484 "grpc",
1485 "[AirVinegRPCService] Download cancelled by client [ID: {}]",
1486 download_request_id
1487 );
1488
1489 AppState
1490 .UpdateRequestStatus(
1491 &download_request_id,
1492 crate::ApplicationState::RequestState::Cancelled,
1493 None,
1494 )
1495 .await
1496 .ok();
1497
1498 return;
1499 }
1500
1501 match chunk_result {
1502 Ok(chunk) => {
1503 buffer.extend_from_slice(&chunk);
1504
1505 total_downloaded += chunk.len() as u64;
1506
1507 if buffer.len() >= chunk_size {
1509 let _chunk_checksum = calculate_chunk_checksum(&buffer);
1511
1512 let progress = if let Some(ts) = total_size {
1514 if ts > 0 { (total_downloaded as f32 / ts as f32) * 100.0 } else { 0.0 }
1515 } else {
1516 0.0
1517 };
1518
1519 if progress - last_progress >= 5.0 {
1521 AppState
1522 .UpdateRequestStatus(
1523 &download_request_id,
1524 crate::ApplicationState::RequestState::InProgress,
1525 Some(progress),
1526 )
1527 .await
1528 .ok();
1529
1530 last_progress = progress;
1531 }
1532
1533 if response_tx
1534 .send(Ok(DownloadStreamResponse {
1535 request_id:response_id.clone(),
1536 chunk:buffer.clone().into(),
1537 total_size:total_size.unwrap_or(0),
1538 downloaded:total_downloaded,
1539 completed:false,
1540 error:String::new(),
1541 }))
1542 .await
1543 .is_err()
1544 {
1545 dev_log!(
1546 "grpc",
1547 "warn: [AirVinegRPCService] Client disconnected during streaming [ID: {}]",
1548 download_request_id
1549 );
1550
1551 AppState
1552 .UpdateRequestStatus(
1553 &download_request_id,
1554 crate::ApplicationState::RequestState::Failed(
1555 "Client disconnected".to_string(),
1556 ),
1557 None,
1558 )
1559 .await
1560 .ok();
1561
1562 return;
1563 }
1564
1565 dev_log!(
1566 "grpc",
1567 "[AirVinegRPCService] Sent chunk of {} bytes [ID: {}] - Progress: {:.1}%",
1568 buffer.len(),
1569 download_request_id,
1570 progress
1571 );
1572
1573 buffer.clear();
1574 }
1575 },
1576 Err(e) => {
1577 let error = format!("Download error: {}", e);
1578
1579 dev_log!(
1580 "grpc",
1581 "error: [AirVinegRPCService] Stream download failed [ID: {}]: {}",
1582 download_request_id,
1583 error
1584 );
1585
1586 let _ = response_tx
1587 .send(Ok(DownloadStreamResponse {
1588 request_id:response_id.clone(),
1589 chunk:vec![].into(),
1590 total_size:total_size.unwrap_or(0),
1591 downloaded:total_downloaded,
1592 completed:false,
1593 error:error.clone(),
1594 }))
1595 .await;
1596
1597 AppState
1598 .UpdateRequestStatus(
1599 &download_request_id,
1600 crate::ApplicationState::RequestState::Failed(error),
1601 None,
1602 )
1603 .await
1604 .ok();
1605
1606 return;
1607 },
1608 }
1609 }
1610
1611 if !buffer.is_empty() {
1613 let _chunk_checksum = calculate_chunk_checksum(&buffer);
1614
1615 if tx
1616 .send(Ok(DownloadStreamResponse {
1617 request_id:download_request_id.clone(),
1618 chunk:buffer.into(),
1619 total_size:total_size.unwrap_or(0),
1620 downloaded:total_downloaded,
1621 completed:false,
1622 error:String::new(),
1623 }))
1624 .await
1625 .is_err()
1626 {
1627 dev_log!(
1628 "grpc",
1629 "warn: [AirVinegRPCService] Client disconnected while sending final chunk [ID: {}]",
1630 download_request_id
1631 );
1632
1633 return;
1634 }
1635 }
1636
1637 AppState
1639 .UpdateRequestStatus(
1640 &download_request_id,
1641 crate::ApplicationState::RequestState::Completed,
1642 Some(100.0),
1643 )
1644 .await
1645 .ok();
1646
1647 let _ = tx
1648 .send(Ok(DownloadStreamResponse {
1649 request_id,
1650 chunk:vec![].into(),
1651 total_size:total_size.unwrap_or(0),
1652 downloaded:total_downloaded,
1653 completed:true,
1654 error:String::new(),
1655 }))
1656 .await;
1657
1658 dev_log!(
1659 "grpc",
1660 "[AirVinegRPCService] Stream download completed [ID: {}] - Total: {} bytes",
1661 download_request_id,
1662 total_downloaded
1663 );
1664 },
1665 Err(e) => {
1666 let error = format!("Failed to start streaming download: {}", e);
1667
1668 dev_log!(
1669 "grpc",
1670 "error: [AirVinegRPCService] Stream download error [ID: {}]: {}",
1671 download_request_id,
1672 error
1673 );
1674
1675 let _ = tx
1676 .send(Ok(DownloadStreamResponse {
1677 request_id:download_request_id.clone(),
1678 chunk:vec![].into(),
1679 total_size:0,
1680 downloaded:0,
1681 completed:false,
1682 error:error.clone(),
1683 }))
1684 .await;
1685
1686 AppState
1687 .UpdateRequestStatus(
1688 &download_request_id,
1689 crate::ApplicationState::RequestState::Failed(error),
1690 None,
1691 )
1692 .await
1693 .ok();
1694 },
1695 }
1696 });
1697
1698 Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx)))
1699 }
1700
1701 async fn search_files(
1705 &self,
1706
1707 request:Request<SearchRequest>,
1708 ) -> std::result::Result<Response<SearchResponse>, Status> {
1709 let RequestData = request.into_inner();
1710
1711 let request_id = RequestData.request_id.clone();
1712
1713 dev_log!(
1714 "grpc",
1715 "[AirVinegRPCService] Search files request: query='{}' in path='{}'",
1716 RequestData.query,
1717 RequestData.path
1718 );
1719
1720 if RequestData.query.is_empty() {
1722 return Ok(Response::new(SearchResponse {
1723 request_id,
1724 results:vec![],
1725 total_results:0,
1726 error:"Search query cannot be empty".to_string(),
1727 }));
1728 }
1729
1730 let path = if RequestData.path.is_empty() { None } else { Some(RequestData.path.clone()) };
1732
1733 let _search_path = path.as_deref();
1734
1735 match self
1736 .FileIndexer
1737 .SearchFiles(
1738 SearchQuery {
1739 query:RequestData.query.clone(),
1740 mode:SearchMode::Literal,
1741 case_sensitive:false,
1742 whole_word:false,
1743 regex:None,
1744 max_results:RequestData.max_results,
1745 page:1,
1746 },
1747 path,
1748 None,
1749 )
1750 .await
1751 {
1752 Ok(search_results) => {
1753 let mut file_results = Vec::new();
1755
1756 for r in search_results {
1757 let (match_preview, line_number) = if let Some(first_match) = r.matches.first() {
1759 (first_match.line_content.clone(), first_match.line_number)
1760 } else {
1761 (String::new(), 0)
1762 };
1763
1764 let size = if let Ok(Some(metadata)) = self.FileIndexer.GetFileInfo(r.path.clone()).await {
1766 metadata.size
1767 } else if let Ok(file_metadata) = std::fs::metadata(&r.path) {
1768 file_metadata.len()
1769 } else {
1770 0
1771 };
1772
1773 file_results.push(FileResult { path:r.path, size, match_preview, line_number });
1774 }
1775
1776 dev_log!(
1777 "grpc",
1778 "[AirVinegRPCService] Search completed: {} results found",
1779 file_results.len()
1780 );
1781
1782 let result_count = file_results.len();
1783
1784 Ok(Response::new(SearchResponse {
1785 request_id,
1786 results:file_results,
1787 total_results:result_count as u32,
1788 error:String::new(),
1789 }))
1790 },
1791
1792 Err(e) => {
1793 dev_log!("grpc", "error: [AirVinegRPCService] Search failed: {}", e);
1794
1795 Ok(Response::new(SearchResponse {
1796 request_id,
1797 results:vec![],
1798 total_results:0,
1799 error:e.to_string(),
1800 }))
1801 },
1802 }
1803 }
1804
1805 async fn get_file_info(
1807 &self,
1808
1809 request:Request<FileInfoRequest>,
1810 ) -> std::result::Result<Response<FileInfoResponse>, Status> {
1811 let RequestData = request.into_inner();
1812
1813 let request_id = RequestData.request_id.clone();
1814
1815 dev_log!("grpc", "[AirVinegRPCService] Get file info request: {}", RequestData.path);
1816
1817 if RequestData.path.is_empty() {
1819 return Ok(Response::new(FileInfoResponse {
1820 request_id,
1821 exists:false,
1822 size:0,
1823 mime_type:String::new(),
1824 checksum:String::new(),
1825 modified_time:0,
1826 error:"Path cannot be empty".to_string(),
1827 }));
1828 }
1829
1830 use std::path::Path;
1832
1833 let path = Path::new(&RequestData.path);
1834
1835 if !path.exists() {
1836 return Ok(Response::new(FileInfoResponse {
1837 request_id,
1838 exists:false,
1839 size:0,
1840 mime_type:String::new(),
1841 checksum:String::new(),
1842 modified_time:0,
1843 error:String::new(), }));
1845 }
1846
1847 match std::fs::metadata(path) {
1849 Ok(metadata) => {
1850 let modified_time = metadata
1851 .modified()
1852 .ok()
1853 .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
1854 .map(|d| d.as_secs())
1855 .unwrap_or(0);
1856
1857 let mime_type = self.detect_mime_type(path);
1859
1860 let checksum = calculate_file_checksum(path).await.unwrap_or_else(|e| {
1862 dev_log!("grpc", "warn: [AirVinegRPCService] Failed to calculate checksum: {}", e);
1863
1864 String::new()
1865 });
1866
1867 Ok(Response::new(FileInfoResponse {
1868 request_id,
1869 exists:true,
1870 size:metadata.len(),
1871 mime_type,
1872 checksum,
1873 modified_time,
1874 error:String::new(),
1875 }))
1876 },
1877
1878 Err(e) => {
1879 dev_log!("grpc", "error: [AirVinegRPCService] Failed to get file metadata: {}", e);
1880
1881 Ok(Response::new(FileInfoResponse {
1882 request_id,
1883 exists:false,
1884 size:0,
1885 mime_type:String::new(),
1886 checksum:String::new(),
1887 modified_time:0,
1888 error:e.to_string(),
1889 }))
1890 },
1891 }
1892 }
1893
1894 async fn get_metrics(
1898 &self,
1899
1900 request:Request<MetricsRequest>,
1901 ) -> std::result::Result<Response<MetricsResponse>, Status> {
1902 let RequestData = request.into_inner();
1903
1904 let request_id = RequestData.request_id.clone();
1905
1906 dev_log!(
1907 "grpc",
1908 "[AirVinegRPCService] Get metrics request: type='{}'",
1909 RequestData.metric_type
1910 );
1911
1912 let metrics = self.AppState.GetMetrics().await;
1913
1914 let mut metrics_map = std::collections::HashMap::new();
1915
1916 if RequestData.metric_type.is_empty() || RequestData.metric_type == "performance" {
1918 metrics_map.insert("uptime_seconds".to_string(), metrics.UptimeSeconds.to_string());
1919
1920 metrics_map.insert("total_requests".to_string(), metrics.TotalRequest.to_string());
1921
1922 metrics_map.insert("successful_requests".to_string(), metrics.SuccessfulRequest.to_string());
1923
1924 metrics_map.insert("failed_requests".to_string(), metrics.FailedRequest.to_string());
1925
1926 metrics_map.insert("average_response_time_ms".to_string(), metrics.AverageResponseTime.to_string());
1927 }
1928
1929 if RequestData.metric_type.is_empty() || RequestData.metric_type == "requests" {
1931 metrics_map.insert(
1932 "ActiveRequests".to_string(),
1933 self.AppState.GetActiveRequestCount().await.to_string(),
1934 );
1935 }
1936
1937 Ok(Response::new(MetricsResponse {
1938 request_id,
1939 metrics:metrics_map,
1940 error:String::new(),
1941 }))
1942 }
1943
1944 async fn get_resource_usage(
1946 &self,
1947
1948 request:Request<ResourceUsageRequest>,
1949 ) -> std::result::Result<Response<ResourceUsageResponse>, Status> {
1950 let RequestData = request.into_inner();
1951
1952 let request_id = RequestData.request_id.clone();
1953
1954 dev_log!("grpc", "[AirVinegRPCService] Get resource usage request");
1955
1956 let resources = self.AppState.GetResourceUsage().await;
1957
1958 Ok(Response::new(ResourceUsageResponse {
1959 request_id,
1960 memory_usage_mb:resources.MemoryUsageMb,
1961 cpu_usage_percent:resources.CPUUsagePercent,
1962 disk_usage_mb:resources.DiskUsageMb,
1963 network_usage_mbps:resources.NetworkUsageMbps,
1964 error:String::new(),
1965 }))
1966 }
1967
1968 async fn set_resource_limits(
1970 &self,
1971
1972 request:Request<ResourceLimitsRequest>,
1973 ) -> std::result::Result<Response<ResourceLimitsResponse>, Status> {
1974 let RequestData = request.into_inner();
1975
1976 let request_id = RequestData.request_id.clone();
1977
1978 dev_log!(
1979 "grpc",
1980 "[AirVinegRPCService] Set resource limits: memory={}MB, cpu={}%, disk={}MB",
1981 RequestData.memory_limit_mb,
1982 RequestData.cpu_limit_percent,
1983 RequestData.disk_limit_mb
1984 );
1985
1986 if RequestData.memory_limit_mb == 0 {
1988 return Ok(Response::new(ResourceLimitsResponse {
1989 request_id,
1990 success:false,
1991 error:"Memory limit must be greater than 0".to_string(),
1992 }));
1993 }
1994
1995 if RequestData.cpu_limit_percent > 100 {
1996 return Ok(Response::new(ResourceLimitsResponse {
1997 request_id,
1998 success:false,
1999 error:"CPU limit cannot exceed 100%".to_string(),
2000 }));
2001 }
2002
2003 let result = self
2005 .AppState
2006 .SetResourceLimits(
2007 Some(RequestData.memory_limit_mb as u64),
2008 Some(RequestData.cpu_limit_percent as f64),
2009 Some(RequestData.disk_limit_mb as u64),
2010 )
2011 .await;
2012
2013 match result {
2014 Ok(_) => {
2015 Ok(Response::new(ResourceLimitsResponse {
2016 request_id,
2017 success:true,
2018 error:String::new(),
2019 }))
2020 },
2021
2022 Err(e) => {
2023 Ok(Response::new(ResourceLimitsResponse {
2024 request_id,
2025 success:false,
2026 error:e.to_string(),
2027 }))
2028 },
2029 }
2030 }
2031
2032 async fn get_configuration(
2036 &self,
2037
2038 request:Request<ConfigurationRequest>,
2039 ) -> std::result::Result<Response<ConfigurationResponse>, Status> {
2040 let RequestData = request.into_inner();
2041
2042 let request_id = RequestData.request_id.clone();
2043
2044 dev_log!(
2045 "grpc",
2046 "[AirVinegRPCService] Get configuration request: section='{}'",
2047 RequestData.section
2048 );
2049
2050 let config = self.AppState.GetConfiguration().await;
2052
2053 let mut config_map = std::collections::HashMap::new();
2054
2055 match RequestData.section.as_str() {
2057 "grpc" => {
2058 config_map.insert("bind_address".to_string(), config.gRPC.BindAddress.clone());
2059
2060 config_map.insert("max_connections".to_string(), config.gRPC.MaxConnections.to_string());
2061
2062 config_map.insert("request_timeout_secs".to_string(), config.gRPC.RequestTimeoutSecs.to_string());
2063 },
2064
2065 "authentication" => {
2066 config_map.insert("enabled".to_string(), config.Authentication.Enabled.to_string());
2067
2068 config_map.insert("credentials_path".to_string(), "***REDACTED***".to_string());
2069
2070 config_map.insert(
2071 "token_expiration_hours".to_string(),
2072 config.Authentication.TokenExpirationHours.to_string(),
2073 );
2074 },
2075
2076 "updates" => {
2077 config_map.insert("enabled".to_string(), config.Updates.Enabled.to_string());
2078
2079 config_map.insert(
2080 "check_interval_hours".to_string(),
2081 config.Updates.CheckIntervalHours.to_string(),
2082 );
2083
2084 config_map.insert("update_server_url".to_string(), config.Updates.UpdateServerUrl.clone());
2085
2086 config_map.insert("auto_download".to_string(), config.Updates.AutoDownload.to_string());
2087
2088 config_map.insert("auto_install".to_string(), config.Updates.AutoInstall.to_string());
2089 },
2090
2091 "downloader" => {
2092 config_map.insert("enabled".to_string(), config.Downloader.Enabled.to_string());
2093
2094 config_map.insert(
2095 "max_concurrent_downloads".to_string(),
2096 config.Downloader.MaxConcurrentDownloads.to_string(),
2097 );
2098
2099 config_map.insert(
2100 "download_timeout_secs".to_string(),
2101 config.Downloader.DownloadTimeoutSecs.to_string(),
2102 );
2103
2104 config_map.insert("max_retries".to_string(), config.Downloader.MaxRetries.to_string());
2105
2106 config_map.insert("cache_directory".to_string(), config.Downloader.CacheDirectory.clone());
2107 },
2108
2109 "indexing" => {
2110 config_map.insert("enabled".to_string(), config.Indexing.Enabled.to_string());
2111
2112 config_map.insert("max_file_size_mb".to_string(), config.Indexing.MaxFileSizeMb.to_string());
2113
2114 config_map.insert("file_types".to_string(), config.Indexing.FileTypes.join(","));
2115
2116 config_map.insert(
2117 "update_interval_minutes".to_string(),
2118 config.Indexing.UpdateIntervalMinutes.to_string(),
2119 );
2120
2121 config_map.insert("index_directory".to_string(), config.Indexing.IndexDirectory.clone());
2122 },
2123
2124 _ => {
2125 config_map.insert("_grpc_enabled".to_string(), "true".to_string());
2127 },
2128 }
2129
2130 Ok(Response::new(ConfigurationResponse {
2131 request_id,
2132 configuration:config_map,
2133 error:String::new(),
2134 }))
2135 }
2136
2137 async fn update_configuration(
2139 &self,
2140
2141 request:Request<UpdateConfigurationRequest>,
2142 ) -> std::result::Result<Response<UpdateConfigurationResponse>, Status> {
2143 let RequestData = request.into_inner();
2144
2145 let request_id = RequestData.request_id.clone();
2146
2147 dev_log!(
2148 "grpc",
2149 "[AirVinegRPCService] Update configuration request: section='{}'",
2150 RequestData.section
2151 );
2152
2153 if !["grpc", "authentication", "updates", "downloader", "indexing", ""].contains(&RequestData.section.as_str())
2155 {
2156 return Ok(Response::new(UpdateConfigurationResponse {
2157 request_id,
2158 success:false,
2159 error:"Invalid configuration section".to_string(),
2160 }));
2161 }
2162
2163 let result = self
2165 .AppState
2166 .UpdateConfiguration(RequestData.section, RequestData.updates)
2167 .await;
2168
2169 match result {
2170 Ok(_) => {
2171 Ok(Response::new(UpdateConfigurationResponse {
2172 request_id,
2173 success:true,
2174 error:String::new(),
2175 }))
2176 },
2177
2178 Err(e) => {
2179 Ok(Response::new(UpdateConfigurationResponse {
2180 request_id,
2181 success:false,
2182 error:e.to_string(),
2183 }))
2184 },
2185 }
2186 }
2187}
2188
2189impl AirVinegRPCService {
2192 fn detect_mime_type(&self, path:&std::path::Path) -> String {
2194 match path.extension().and_then(|e| e.to_str()) {
2195 Some("rs") => "text/x-rust".to_string(),
2196
2197 Some("ts") => "application/typescript".to_string(),
2198
2199 Some("js") => "application/javascript".to_string(),
2200
2201 Some("json") => "application/json".to_string(),
2202
2203 Some("toml") => "application/toml".to_string(),
2204
2205 Some("md") => "text/markdown".to_string(),
2206
2207 Some("txt") => "text/plain".to_string(),
2208
2209 Some("yaml") | Some("yml") => "application/x-yaml".to_string(),
2210
2211 Some("html") => "text/html".to_string(),
2212
2213 Some("css") => "text/css".to_string(),
2214
2215 Some("xml") => "application/xml".to_string(),
2216
2217 Some("png") => "image/png".to_string(),
2218
2219 Some("jpg") | Some("jpeg") => "image/jpeg".to_string(),
2220
2221 Some("gif") => "image/gif".to_string(),
2222
2223 Some("svg") => "image/svg+xml".to_string(),
2224
2225 Some("pdf") => "application/pdf".to_string(),
2226
2227 Some("zip") => "application/zip".to_string(),
2228
2229 Some("tar") | Some("gz") => "application/x-tar".to_string(),
2230
2231 Some("proto") => "application/x-protobuf".to_string(),
2232
2233 _ => "application/octet-stream".to_string(),
2234 }
2235 }
2236
2237 async fn download_file_with_retry(
2240 &self,
2241
2242 request_id:&str,
2243
2244 url:String,
2245
2246 DestinationPath:String,
2247
2248 checksum:String,
2249
2250 progress_callback:Option<Box<dyn Fn(f32) + Send>>,
2251 ) -> Result<crate::Downloader::DownloadResult> {
2252 let config = &self.AppState.Configuration.Downloader;
2253
2254 let mut retries = 0;
2255
2256 loop {
2257 match self
2258 .DownloadManager
2259 .DownloadFile(url.clone(), DestinationPath.clone(), checksum.clone())
2260 .await
2261 {
2262 Ok(file_info) => {
2263 if let Some(ref callback) = progress_callback {
2264 callback(100.0);
2265 }
2266
2267 return Ok(file_info);
2268 },
2269
2270 Err(e) => {
2271 if retries < config.MaxRetries as usize {
2272 retries += 1;
2273
2274 let backoff_secs = 2u64.pow(retries as u32);
2275
2276 dev_log!(
2277 "grpc",
2278 "warn: [AirVinegRPCService] Download failed [ID: {}], retrying (attempt {}/{}): {} - \
2279 Backing off {} seconds",
2280 request_id,
2281 retries,
2282 config.MaxRetries,
2283 e,
2284 backoff_secs
2285 );
2286
2287 if let Some(ref callback) = progress_callback {
2288 let progress = (retries as f32 / config.MaxRetries as f32) * 10.0;
2290
2291 callback(progress);
2292 }
2293
2294 tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
2295 } else {
2296 dev_log!(
2297 "grpc",
2298 "error: [AirVinegRPCService] Download failed after {} retries [ID: {}]: {}",
2299 config.MaxRetries,
2300 request_id,
2301 e
2302 );
2303
2304 return Err(e);
2305 }
2306 },
2307 }
2308 }
2309 }
2310
2311 async fn validate_range_support(&self, url:&str) -> Result<bool> {
2313 let dns_port = Mist::dns_port();
2314
2315 let client = crate::HTTP::Client::secured_client_builder(dns_port)
2316 .map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client builder: {}", e)))?
2317 .timeout(std::time::Duration::from_secs(10))
2318 .build()
2319 .map_err(|e| crate::AirError::Network(format!("Failed to create HTTP client for validation: {}", e)))?;
2320
2321 let response:reqwest::Response = client
2322 .head(url)
2323 .send()
2324 .await
2325 .map_err(|e| crate::AirError::Network(format!("Failed to send HEAD request: {}", e)))?;
2326
2327 let accepts_ranges = response
2329 .headers()
2330 .get("accept-ranges")
2331 .map(|v:&reqwest::header::HeaderValue| v.to_str().unwrap_or("none"))
2332 .unwrap_or("none");
2333
2334 Ok(accepts_ranges == "bytes")
2335 }
2336
2337 async fn prepare_rollback_backup(&self, version:&str) -> Result<()> {
2339 let cache_dir = self.UpdateManager.GetCacheDirectory();
2340
2341 let rollback_dir = cache_dir.join("rollback");
2342
2343 if let Err(e) = tokio::fs::create_dir_all(&rollback_dir).await {
2345 return Err(AirError::FileSystem(format!("Failed to create rollback directory: {}", e)));
2346 }
2347
2348 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2350
2351 let marker_content = format!(
2352 "version={}\ntimestamp={}\nrollback_available=true",
2353 version,
2354 chrono::Utc::now().to_rfc3339()
2355 );
2356
2357 if let Err(e) = tokio::fs::write(&backup_file, marker_content).await {
2358 return Err(AirError::FileSystem(format!("Failed to create backup marker: {}", e)));
2359 }
2360
2361 dev_log!(
2362 "grpc",
2363 "[AirVinegRPCService] Rollback backup prepared for version {} at {:?}",
2364 version,
2365 backup_file
2366 );
2367
2368 Ok(())
2369 }
2370
2371 async fn cleanup_rollback_backup(&self, version:&str) -> Result<()> {
2373 let cache_dir = self.UpdateManager.GetCacheDirectory();
2374
2375 let rollback_dir = cache_dir.join("rollback");
2376
2377 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2378
2379 if backup_file.exists() {
2380 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2381 return Err(AirError::FileSystem(format!("Failed to cleanup rollback backup: {}", e)));
2382 }
2383
2384 dev_log!(
2385 "grpc",
2386 "[AirVinegRPCService] Rollback backup cleaned up for version {}",
2387 version
2388 );
2389 }
2390
2391 Ok(())
2392 }
2393
2394 async fn perform_rollback(&self, version:&str) -> Result<()> {
2396 let cache_dir = self.UpdateManager.GetCacheDirectory();
2397
2398 let rollback_dir = cache_dir.join("rollback");
2399
2400 let backup_file = rollback_dir.join(format!("backup-{}.marker", version));
2401
2402 if !backup_file.exists() {
2403 return Err(AirError::FileSystem(format!(
2404 "Rollback backup not found for version {}",
2405 version
2406 )));
2407 }
2408
2409 dev_log!("grpc", "[AirVinegRPCService] Starting rollback for version {}", version);
2410
2411 let marker_content = tokio::fs::read_to_string(&backup_file)
2413 .await
2414 .map_err(|e| format!("Failed to read backup marker: {}", e))?;
2415
2416 let mut timestamp = None;
2418
2419 let mut rollback_available = false;
2420
2421 for line in marker_content.lines() {
2422 if let Some(value) = line.strip_prefix("timestamp=") {
2423 timestamp = Some(value.to_string());
2424 } else if line == "rollback_available=true" {
2425 rollback_available = true;
2426 }
2427 }
2428
2429 if !rollback_available {
2430 return Err(AirError::Validation("Rollback not available for this version".to_string()));
2431 }
2432
2433 dev_log!(
2440 "grpc",
2441 "[AirVinegRPCService] Rollback completed for version {} (backup timestamp: {:?})",
2442 version,
2443 timestamp
2444 );
2445
2446 if let Err(e) = tokio::fs::remove_file(&backup_file).await {
2448 dev_log!(
2449 "grpc",
2450 "warn: [AirVinegRPCService] Failed to cleanup backup marker after rollback: {}",
2451 e
2452 );
2453 }
2454
2455 Ok(())
2456 }
2457}
2458
2459fn match_url_scheme(url:&str) -> bool {
2461 url.to_lowercase().starts_with("http://") || url.to_lowercase().starts_with("https://")
2462}
2463
2464fn calculate_chunk_checksum(chunk:&[u8]) -> String {
2466 use sha2::{Digest, Sha256};
2469
2470 let mut hasher = Sha256::new();
2471
2472 hasher.update(chunk);
2473
2474 hex::encode(hasher.finalize())
2475}
2476
2477async fn calculate_file_checksum(path:&std::path::Path) -> Result<String> {
2479 use sha2::{Digest, Sha256};
2480 use tokio::io::AsyncReadExt;
2481
2482 let mut file = tokio::fs::File::open(path)
2483 .await
2484 .map_err(|e| AirError::FileSystem(format!("Failed to open file for checksum: {}", e)))?;
2485
2486 let mut hasher = Sha256::new();
2487
2488 let mut buffer = vec![0u8; 8192];
2489
2490 loop {
2491 let bytes_read = file
2492 .read(&mut buffer)
2493 .await
2494 .map_err(|e| AirError::FileSystem(format!("Failed to read file for checksum: {}", e)))?;
2495
2496 if bytes_read == 0 {
2497 break;
2498 }
2499
2500 hasher.update(&buffer[..bytes_read]);
2501 }
2502
2503 let result = hasher.finalize();
2504
2505 Ok(hex::encode(result))
2506}