1pub mod RateLimit;
104
105pub mod Types;
106
107use std::{
108 collections::{HashMap, VecDeque},
109 path::{Path, PathBuf},
110 sync::Arc,
111 time::{Duration, Instant},
112};
113
114use serde::{Deserialize, Serialize};
115use tokio::sync::{RwLock, Semaphore};
116
117use crate::{
118 AirError,
119 ApplicationState::ApplicationState,
120 Configuration::ConfigurationManager,
121 Result,
122 Utility,
123 dev_log,
124};
125
126pub struct DownloadManager {
128 AppState:Arc<ApplicationState>,
130
131 ActiveDownloads:Arc<RwLock<HashMap<String, DownloadStatus>>>,
133
134 DownloadQueue:Arc<RwLock<VecDeque<QueuedDownload>>>,
136
137 CacheDirectory:PathBuf,
139
140 client:reqwest::Client,
142
143 ChecksumVerifier:Arc<crate::Security::ChecksumVerifier>,
145
146 BandwidthLimiter:Arc<Semaphore>,
148
149 TokenBucket:Arc<RwLock<TokenBucket>>,
151
152 ConcurrentLimiter:Arc<Semaphore>,
154
155 statistics:Arc<RwLock<DownloadStatistics>>,
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
161pub struct DownloadStatus {
162 pub DownloadId:String,
163
164 pub url:String,
165
166 pub destination:PathBuf,
167
168 pub TotalSize:u64,
169
170 pub downloaded:u64,
171
172 pub progress:f32,
173
174 pub status:DownloadState,
175
176 pub error:Option<String>,
177
178 pub StartedAt:Option<chrono::DateTime<chrono::Utc>>,
179
180 pub CompletedAt:Option<chrono::DateTime<chrono::Utc>>,
181
182 pub ChunksCompleted:usize,
183
184 pub TotalChunks:usize,
185
186 pub DownloadRateBytesPerSec:u64,
187
188 pub ExpectedChecksum:Option<String>,
189
190 pub ActualChecksum:Option<String>,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
195pub enum DownloadState {
196 Pending,
197
198 Queued,
199
200 Downloading,
201
202 Verifying,
203
204 Completed,
205
206 Failed,
207
208 Cancelled,
209
210 Paused,
211
212 Resuming,
213}
214
215#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
217pub enum DownloadPriority {
218 High = 3,
219
220 Normal = 2,
221
222 Low = 1,
223
224 Background = 0,
225}
226
227#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct QueuedDownload {
230 DownloadId:String,
231
232 url:String,
233
234 destination:PathBuf,
235
236 checksum:String,
237
238 priority:DownloadPriority,
239
240 AddedAt:chrono::DateTime<chrono::Utc>,
241
242 MaxFileSize:Option<u64>,
243
244 ValidateDiskSpace:bool,
245}
246
247#[derive(Debug, Clone)]
249pub struct DownloadResult {
250 pub path:String,
251
252 pub size:u64,
253
254 pub checksum:String,
255
256 pub duration:Duration,
257
258 pub AverageRate:u64,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
263pub struct DownloadStatistics {
264 pub TotalDownloads:u64,
265
266 pub SuccessfulDownloads:u64,
267
268 pub FailedDownloads:u64,
269
270 pub CancelledDownloads:u64,
271
272 pub TotalBytesDownloaded:u64,
273
274 pub TotalDownloadTimeSecs:f64,
275
276 pub AverageDownloadRate:f64,
277
278 pub PeakDownloadRate:u64,
279
280 pub ActiveDownloads:usize,
281
282 pub QueuedDownloads:usize,
283}
284
285pub type ProgressCallback = Arc<dyn Fn(DownloadStatus) + Send + Sync>;
287
288#[derive(Debug)]
296struct TokenBucket {
297 tokens:f64,
299
300 capacity:f64,
302
303 refill_rate:f64,
305
306 last_refill:Instant,
308}
309
310impl TokenBucket {
311 fn new(bytes_per_sec:u64, capacity_factor:f64) -> Self {
313 let refill_rate = bytes_per_sec as f64;
314
315 let capacity = refill_rate * capacity_factor; Self { tokens:capacity, capacity, refill_rate, last_refill:Instant::now() }
318 }
319
320 fn refill(&mut self) {
322 let elapsed = self.last_refill.elapsed().as_secs_f64();
323
324 if elapsed > 0.0 {
325 let new_tokens = elapsed * self.refill_rate;
326
327 self.tokens = (self.tokens + new_tokens).min(self.capacity);
328
329 self.last_refill = Instant::now();
330 }
331 }
332
333 fn try_consume(&mut self, bytes:u64) -> u64 {
336 self.refill();
337
338 let bytes = bytes as f64;
339
340 if self.tokens >= bytes {
341 self.tokens -= bytes;
342
343 return bytes as u64;
344 }
345
346 let available = self.tokens;
348
349 self.tokens = 0.0;
350
351 available as u64
352 }
353
354 async fn consume(&mut self, bytes:u64) -> Result<()> {
356 let bytes_needed = bytes as f64;
357
358 loop {
359 self.refill();
360
361 if self.tokens >= bytes_needed {
362 self.tokens -= bytes_needed;
363
364 return Ok(());
365 }
366
367 let tokens_needed = bytes_needed - self.tokens;
369
370 let wait_duration = tokens_needed / self.refill_rate;
371
372 let sleep_duration = Duration::from_secs_f64(wait_duration.min(0.1));
374
375 tokio::time::sleep(sleep_duration).await;
376 }
377 }
378
379 fn set_rate(&mut self, bytes_per_sec:u64) {
381 self.refill_rate = bytes_per_sec as f64;
382
383 self.capacity = self.refill_rate * 5.0; }
385}
386
387#[derive(Debug, Clone)]
389pub struct DownloadConfig {
390 pub url:String,
391
392 pub destination:String,
393
394 pub checksum:String,
395
396 pub MaxFileSize:Option<u64>,
397
398 pub ChunkSize:usize,
399
400 pub MaxRetries:u32,
401
402 pub TimeoutSecs:u64,
403
404 pub priority:DownloadPriority,
405
406 pub ValidateDiskSpace:bool,
407}
408
409impl Default for DownloadConfig {
410 fn default() -> Self {
411 Self {
412 url:String::new(),
413
414 destination:String::new(),
415
416 checksum:String::new(),
417
418 MaxFileSize:None,
419
420 ChunkSize:8 * 1024 * 1024, MaxRetries:5,
423
424 TimeoutSecs:300,
425
426 priority:DownloadPriority::Normal,
427
428 ValidateDiskSpace:true,
429 }
430 }
431}
432
433impl DownloadManager {
434 pub async fn new(AppState:Arc<ApplicationState>) -> Result<Self> {
436 let config = &AppState.Configuration.Downloader;
437
438 let CacheDirectory = ConfigurationManager::ExpandPath(&config.CacheDirectory)?;
440
441 let CacheDirectoryClone = CacheDirectory.clone();
443
444 let CacheDirectoryCloneForInit = CacheDirectoryClone.clone();
446
447 tokio::fs::create_dir_all(&CacheDirectory)
449 .await
450 .map_err(|e| AirError::Configuration(format!("Failed to create cache directory: {}", e)))?;
451
452 let dns_port = Mist::dns_port();
454
455 let client = crate::HTTP::Client::secured_client_builder(dns_port)
456 .map_err(|e| AirError::Network(format!("Failed to create HTTP client: {}", e)))?
457 .timeout(Duration::from_secs(config.DownloadTimeoutSecs))
458 .connect_timeout(Duration::from_secs(30))
459 .pool_idle_timeout(Duration::from_secs(90))
460 .pool_max_idle_per_host(10)
461 .tcp_keepalive(Duration::from_secs(60))
462 .user_agent("Land-AirDownloader/0.1.0")
463 .build()
464 .map_err(|e| AirError::Network(format!("Failed to build HTTP client: {}", e)))?;
465
466 let BandwidthLimiter = Arc::new(Semaphore::new(100));
468
469 let TokenBucket = Arc::new(RwLock::new(TokenBucket::new(100 * 1024 * 1024, 5.0)));
471
472 let ConcurrentLimiter = Arc::new(Semaphore::new(5));
474
475 let manager = Self {
476 AppState,
477
478 ActiveDownloads:Arc::new(RwLock::new(HashMap::new())),
479
480 DownloadQueue:Arc::new(RwLock::new(VecDeque::new())),
481
482 CacheDirectory:CacheDirectoryCloneForInit,
483
484 client,
485
486 ChecksumVerifier:Arc::new(crate::Security::ChecksumVerifier::New()),
487
488 BandwidthLimiter,
489
490 TokenBucket,
491
492 ConcurrentLimiter,
493
494 statistics:Arc::new(RwLock::new(DownloadStatistics::default())),
495 };
496
497 manager
499 .AppState
500 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Running)
501 .await
502 .map_err(|e| AirError::Internal(e.to_string()))?;
503
504 dev_log!(
505 "update",
506 "[DownloadManager] Initialized with cache directory: {}",
507 CacheDirectory.display()
508 );
509
510 Ok(manager)
511 }
512
513 pub async fn DownloadFile(&self, url:String, DestinationPath:String, checksum:String) -> Result<DownloadResult> {
515 self.DownloadFileWithConfig(DownloadConfig { url, destination:DestinationPath, checksum, ..Default::default() })
516 .await
517 }
518
519 pub async fn DownloadFileWithConfig(&self, config:DownloadConfig) -> Result<DownloadResult> {
521 let SanitizedUrl = Self::ValidateAndSanitizeUrl(&config.url)?;
523
524 let DownloadId = Utility::GenerateRequestId();
526
527 dev_log!(
528 "update",
529 "[DownloadManager] Starting download [ID: {}] - URL: {}",
530 DownloadId,
531 SanitizedUrl
532 );
533
534 if SanitizedUrl.is_empty() {
536 return Err(AirError::Network("URL cannot be empty".to_string()));
537 }
538
539 let Destination = if config.destination.is_empty() {
541 let Filename = SanitizedUrl
543 .split('/')
544 .last()
545 .and_then(|s| s.split('?').next())
546 .unwrap_or("download.bin");
547
548 self.CacheDirectory.join(Filename)
549 } else {
550 ConfigurationManager::ExpandPath(&config.destination)?
551 };
552
553 Utility::ValidateFilePath(
555 Destination
556 .to_str()
557 .ok_or_else(|| AirError::Configuration("Invalid destination path".to_string()))?,
558 )?;
559
560 let ExpectedChecksum = if config.checksum.is_empty() { None } else { Some(config.checksum.clone()) };
562
563 self.RegisterDownload(&DownloadId, &SanitizedUrl, &Destination, ExpectedChecksum.clone())
565 .await?;
566
567 if config.ValidateDiskSpace {
569 if let Some(MaxSize) = config.MaxFileSize {
570 self.ValidateDiskSpace(&SanitizedUrl, &Destination, MaxSize * 2).await?;
571 } else {
572 self.ValidateDiskSpace(&SanitizedUrl, &Destination, 1024 * 1024 * 1024).await?; }
574 }
575
576 if let Some(Parent) = Destination.parent() {
578 tokio::fs::create_dir_all(Parent)
579 .await
580 .map_err(|e| AirError::FileSystem(format!("Failed to create destination directory: {}", e)))?;
581 }
582
583 let StartTime = Instant::now();
584
585 let Result = self.DownloadWithRetry(&DownloadId, &SanitizedUrl, &Destination, &config).await;
587
588 let Duration = StartTime.elapsed();
589
590 match Result {
591 Ok(mut FileInfo) => {
592 FileInfo.duration = Duration;
593
594 self.UpdateStatistics(true, FileInfo.size, Duration).await;
596
597 self.UpdateDownloadStatus(&DownloadId, DownloadState::Completed, Some(100.0), None)
598 .await?;
599
600 dev_log!(
601 "update",
602 "[DownloadManager] Download completed [ID: {}] - Size: {} bytes in {:.2}s ({:.2} MB/s)",
603 DownloadId,
604 FileInfo.size,
605 Duration.as_secs_f64(),
606 FileInfo.size as f64 / 1_048_576.0 / Duration.as_secs_f64()
607 );
608
609 Ok(FileInfo)
610 },
611
612 Err(E) => {
613 self.UpdateStatistics(false, 0, Duration).await;
615
616 self.UpdateDownloadStatus(&DownloadId, DownloadState::Failed, None, Some(E.to_string()))
617 .await?;
618
619 if Destination.exists() {
621 let _ = tokio::fs::remove_file(&Destination).await;
622
623 dev_log!(
624 "update",
625 "warn: [DownloadManager] Cleaned up failed download: {}",
626 Destination.display()
627 );
628 }
629
630 dev_log!(
631 "update",
632 "error: [DownloadManager] Download failed [ID: {}] - Error: {}",
633 DownloadId,
634 E
635 );
636
637 Err(E)
638 },
639 }
640 }
641
642 fn ValidateAndSanitizeUrl(url:&str) -> Result<String> {
644 let url = url.trim();
645
646 if url.is_empty() {
648 return Err(AirError::Network("URL cannot be empty".to_string()));
649 }
650
651 let parsed = url::Url::parse(url).map_err(|e| AirError::Network(format!("Invalid URL format: {}", e)))?;
653
654 match parsed.scheme() {
656 "http" | "https" => (),
657
658 scheme => {
659 return Err(AirError::Network(format!(
660 "Unsupported URL scheme: '{}'. Only http and https are allowed.",
661 scheme
662 )));
663 },
664 }
665
666 if parsed.host().is_none() {
668 return Err(AirError::Network("URL must have a valid host".to_string()));
669 }
670
671 #[cfg(debug_assertions)]
673 {
674
675 }
677
678 #[cfg(not(debug_assertions))]
679 {
680 if let Some(host) = parsed.host_str() {
681 if host == "localhost" || host == "127.0.0.1" || host == "::1" {
682 return Err(AirError::Network("Localhost addresses are not allowed".to_string()));
683 }
684
685 if host.starts_with("192.168.") || host.starts_with("10.") || host.starts_with("172.16.") {
686 return Err(AirError::Network("Private network addresses are not allowed".to_string()));
687 }
688 }
689 }
690
691 let mut sanitized = parsed.clone();
693
694 if sanitized.password().is_some() {
696 sanitized.set_password(Some("")).ok();
697 }
698
699 Ok(sanitized.to_string())
700 }
701
702 async fn ValidateDiskSpace(&self, url:&str, destination:&Path, RequiredBytes:u64) -> Result<()> {
704 let DestPath = if destination.is_absolute() {
706 destination.to_path_buf()
707 } else {
708 std::env::current_dir()
709 .map_err(|e| AirError::FileSystem(format!("Failed to get current directory: {}", e)))?
710 .join(destination)
711 };
712
713 let MountPoint = self.FindMountPoint(&DestPath)?;
715
716 dev_log!(
718 "update",
719 "[DownloadManager] Validating disk space for URL {} (requires {} bytes) on mount point: {}",
720 url,
721 RequiredBytes,
722 MountPoint.display()
723 );
724
725 #[cfg(unix)]
726 {
727 match self.GetDiskStatvfs(&MountPoint) {
728 Ok((AvailableBytes, TotalBytes)) => {
729 if AvailableBytes < RequiredBytes {
730 dev_log!(
731 "update",
732 "warn: [DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
733 AvailableBytes,
734 RequiredBytes
735 );
736
737 return Err(AirError::FileSystem(format!(
738 "Insufficient disk space: {} bytes available, {} bytes required",
739 AvailableBytes, RequiredBytes
740 )));
741 }
742
743 dev_log!(
744 "update",
745 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required (total: {})",
746 AvailableBytes,
747 RequiredBytes,
748 TotalBytes
749 );
750 },
751
752 Err(e) => {
753 dev_log!(
754 "update",
755 "warn: [DownloadManager] Failed to check disk space: {}, proceeding anyway",
756 e
757 );
758 },
759 }
760 }
761
762 #[cfg(windows)]
763 {
764 match self.GetDiskSpaceWindows(&MountPoint) {
765 Ok(AvailableBytes) => {
766 if AvailableBytes < RequiredBytes {
767 dev_log!(
768 "update",
769 "warn: [DownloadManager] Insufficient disk space: {} bytes available, {} bytes required",
770 AvailableBytes,
771 RequiredBytes
772 );
773
774 return Err(AirError::FileSystem(format!(
775 "Insufficient disk space: {} bytes available, {} bytes required",
776 available_bytes, RequiredBytes
777 )));
778 }
779
780 dev_log!(
781 "update",
782 "[DownloadManager] Sufficient disk space: {} bytes available, {} bytes required",
783 available_bytes,
784 RequiredBytes
785 );
786 },
787
788 Err(e) => {
789 dev_log!(
790 "update",
791 "warn: [DownloadManager] Failed to check disk space: {}, proceeding anyway",
792 e
793 );
794 },
795 }
796 }
797
798 #[cfg(not(any(unix, windows)))]
799 {
800 dev_log!(
801 "update",
802 "warn: [DownloadManager] Disk space validation not available on this platform"
803 );
804 }
805
806 Ok(())
807 }
808
809 #[cfg(unix)]
811 fn GetDiskStatvfs(&self, path:&Path) -> Result<(u64, u64)> {
812 use std::{ffi::CString, os::unix::ffi::OsStrExt};
813
814 dev_log!("update", "[DownloadManager] Checking disk space at: {}", path.display());
815
816 let path_cstr = CString::new(path.as_os_str().as_bytes())
818 .map_err(|e| AirError::FileSystem(format!("Failed to convert path to C string: {}", e)))?;
819
820 let mut stat:libc::statvfs = unsafe { std::mem::zeroed() };
822
823 let result = unsafe { libc::statvfs(path_cstr.as_ptr(), &mut stat) };
824
825 if result != 0 {
826 let err = std::io::Error::last_os_error();
827
828 return Err(AirError::FileSystem(format!("Failed to get disk stats: {}", err)));
829 }
830
831 let fragment_size = stat.f_frsize as u64;
833
834 let available_bytes = fragment_size * stat.f_bavail as u64;
835
836 let total_bytes = fragment_size * stat.f_blocks as u64;
837
838 dev_log!(
839 "update",
840 "[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
841 path.display(),
842 available_bytes,
843 total_bytes
844 );
845
846 Ok((available_bytes, total_bytes))
847 }
848
849 #[cfg(windows)]
851 fn GetDiskSpaceWindows(&self, path:&Path) -> Result<u64> {
852 use std::os::windows::ffi::OsStrExt;
853
854 use windows::Win32::Storage::FileSystem::GetDiskFreeSpaceExW;
855
856 dev_log!("update", "[DownloadManager] Checking disk space at: {}", path.display());
857
858 let path_str:Vec<u16> = path.as_os_str().encode_wide().chain(std::iter::once(0)).collect();
860
861 let mut free_bytes_available:u64 = 0;
862
863 let mut total_bytes:u64 = 0;
864
865 let mut total_free_bytes:u64 = 0;
866
867 let result = unsafe {
868 GetDiskFreeSpaceExW(
869 windows::core::PCWSTR(path_str.as_ptr()),
870 &mut free_bytes_available as *mut _ as _,
871 &mut total_bytes as *mut _ as _,
872 &mut total_free_bytes as *mut _ as _,
873 )
874 };
875
876 if !result.as_bool() {
877 let err = std::io::Error::last_os_error();
878
879 return Err(AirError::FileSystem(format!("Failed to get disk space: {}", err)));
880 }
881
882 dev_log!(
883 "update",
884 "[DownloadManager] Disk space at {}: {} bytes available, {} bytes total",
885 path.display(),
886 free_bytes_available,
887 total_bytes
888 );
889
890 Ok(free_bytes_available)
891 }
892
893 fn FindMountPoint(&self, path:&Path) -> Result<PathBuf> {
895 #[cfg(unix)]
896 {
897 let mut current = path
898 .canonicalize()
899 .map_err(|e| AirError::FileSystem(format!("Failed to canonicalize path: {}", e)))?;
900
901 loop {
902 if current.as_os_str().is_empty() || current == Path::new("/") {
903 return Ok(PathBuf::from("/"));
904 }
905
906 let metadata = std::fs::metadata(¤t)
907 .map_err(|e| AirError::FileSystem(format!("Failed to get metadata: {}", e)))?;
908
909 #[cfg(unix)]
911 let CurrentDevice = {
912 use std::os::unix::fs::MetadataExt;
913
914 metadata.dev()
915 };
916
917 #[cfg(not(unix))]
918 let CurrentDevice = 0u64; let parent = current.parent();
921
922 if let Some(parent_path) = parent {
923 let ParentMetadata = std::fs::metadata(parent_path)
924 .map_err(|e| AirError::FileSystem(format!("Failed to get parent metadata: {}", e)))?;
925
926 #[cfg(unix)]
927 let ParentDevice = {
928 use std::os::unix::fs::MetadataExt;
929
930 ParentMetadata.dev()
931 };
932
933 #[cfg(not(unix))]
934 let ParentDevice = 0u64; if ParentDevice != CurrentDevice {
937 return Ok(current);
938 }
939 } else {
940 return Ok(current);
941 }
942
943 current.pop();
944 }
945 }
946
947 #[cfg(windows)]
948 {
949 let PathStr = path.to_string_lossy();
951
952 if PathStr.len() >= 3 && PathStr.chars().nth(1) == Some(':') {
953 return Ok(PathBuf::from(&PathStr[..3]));
954 }
955
956 Ok(PathBuf::from("C:\\"))
957 }
958
959 #[cfg(not(any(unix, windows)))]
960 {
961 Ok(path.to_path_buf())
962 }
963 }
964
965 async fn DownloadWithRetry(
967 &self,
968
969 DownloadId:&str,
970
971 url:&str,
972
973 destination:&PathBuf,
974
975 config:&DownloadConfig,
976 ) -> Result<DownloadResult> {
977 let RetryPolicy = crate::Resilience::RetryPolicy {
978 MaxRetries:config.MaxRetries,
979
980 InitialIntervalMs:1000,
981
982 MaxIntervalMs:32000,
983
984 BackoffMultiplier:2.0,
985
986 JitterFactor:0.1,
987
988 BudgetPerMinute:100,
989
990 ErrorClassification:std::collections::HashMap::new(),
991 };
992
993 let RetryManager = crate::Resilience::RetryManager::new(RetryPolicy.clone());
994
995 let CircuitBreaker = crate::Resilience::CircuitBreaker::new(
996 "downloader".to_string(),
997 crate::Resilience::CircuitBreakerConfig::default(),
998 );
999
1000 let mut attempt = 0;
1001
1002 loop {
1003 if CircuitBreaker.GetState().await == crate::Resilience::CircuitState::Open {
1005 if !CircuitBreaker.AttemptRecovery().await {
1006 return Err(AirError::Network(
1007 "Circuit breaker is open, too many recent failures".to_string(),
1008 ));
1009 }
1010 }
1011
1012 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1014 if status.status == DownloadState::Cancelled {
1015 return Err(AirError::Network("Download cancelled".to_string()));
1016 }
1017 }
1018
1019 match self.PerformDownload(DownloadId, url, destination, config).await {
1020 Ok(file_info) => {
1021 if let Some(ref ExpectedChecksum) = ExpectedChecksumFromConfig(config) {
1023 self.UpdateDownloadStatus(DownloadId, DownloadState::Verifying, Some(100.0), None)
1024 .await?;
1025
1026 if let Err(e) = self.VerifyChecksum(destination, ExpectedChecksum).await {
1027 dev_log!(
1028 "update",
1029 "warn: [DownloadManager] Checksum verification failed [ID: {}]: {}",
1030 DownloadId,
1031 e
1032 );
1033
1034 CircuitBreaker.RecordFailure().await;
1035
1036 if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
1037 attempt += 1;
1038
1039 let delay = RetryManager.CalculateRetryDelay(attempt);
1040
1041 dev_log!(
1042 "update",
1043 "[DownloadManager] Retrying download [ID: {}] (attempt {}/{}) after {:?}",
1044 DownloadId,
1045 attempt + 1,
1046 config.MaxRetries + 1,
1047 delay
1048 );
1049
1050 tokio::time::sleep(delay).await;
1051
1052 continue;
1053 } else {
1054 return Err(AirError::Network(format!(
1055 "Checksum verification failed after {} retries: {}",
1056 attempt, e
1057 )));
1058 }
1059 }
1060 }
1061
1062 CircuitBreaker.RecordSuccess().await;
1063
1064 return Ok(file_info);
1065 },
1066
1067 Err(e) => {
1068 CircuitBreaker.RecordFailure().await;
1069
1070 if attempt < config.MaxRetries && RetryManager.CanRetry("downloader").await {
1071 attempt += 1;
1072
1073 dev_log!(
1074 "update",
1075 "warn: [DownloadManager] Download failed [ID: {}], retrying (attempt {}/{}): {}",
1076 DownloadId,
1077 attempt + 1,
1078 config.MaxRetries + 1,
1079 e
1080 );
1081
1082 let delay = RetryManager.CalculateRetryDelay(attempt);
1083
1084 tokio::time::sleep(delay).await;
1085 } else {
1086 return Err(e);
1087 }
1088 },
1089 }
1090 }
1091 }
1092
1093 async fn PerformDownload(
1095 &self,
1096
1097 DownloadId:&str,
1098
1099 url:&str,
1100
1101 destination:&PathBuf,
1102
1103 config:&DownloadConfig,
1104 ) -> Result<DownloadResult> {
1105 let _concurrent_permit = self
1107 .ConcurrentLimiter
1108 .acquire()
1109 .await
1110 .map_err(|e| AirError::Internal(format!("Failed to acquire download permit: {}", e)))?;
1111
1112 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(0.0), None)
1113 .await?;
1114
1115 let TempDestination = destination.with_extension("tmp");
1117
1118 let mut ExistingSize:u64 = 0;
1120
1121 if TempDestination.exists() {
1122 if let Ok(metadata) = tokio::fs::metadata(&TempDestination).await {
1123 ExistingSize = metadata.len();
1124
1125 dev_log!("update", "[DownloadManager] Resuming download from {} bytes", ExistingSize);
1126 }
1127 }
1128
1129 let mut req = self.client.get(url).timeout(Duration::from_secs(config.TimeoutSecs));
1131
1132 if ExistingSize > 0 {
1133 let RangeHeader = format!("bytes={}-", ExistingSize);
1134
1135 req = req.header(reqwest::header::RANGE, RangeHeader);
1136
1137 req = req.header(reqwest::header::IF_MATCH, "*"); }
1139
1140 let response = req
1141 .send()
1142 .await
1143 .map_err(|e| AirError::Network(format!("Failed to start download: {}", e)))?;
1144
1145 let FinalUrl = response.url().clone();
1147
1148 let response = if FinalUrl.as_str() != url {
1149 dev_log!("update", "[DownloadManager] Redirected to: {}", FinalUrl);
1150
1151 response
1152 } else {
1153 response
1154 };
1155
1156 let StatusCode = response.status();
1158
1159 if !StatusCode.is_success() && StatusCode != reqwest::StatusCode::PARTIAL_CONTENT {
1160 return Err(AirError::Network(format!("Download failed with status: {}", StatusCode)));
1161 }
1162
1163 let TotalSize = if let Some(cl) = response.content_length() {
1165 if StatusCode == reqwest::StatusCode::PARTIAL_CONTENT {
1166 cl + ExistingSize
1167 } else {
1168 cl
1169 }
1170 } else {
1171 0
1172 };
1173
1174 if let Some(max_size) = config.MaxFileSize {
1176 if TotalSize > 0 && TotalSize > max_size {
1177 return Err(AirError::Network(format!(
1178 "File too large: {} bytes exceeds maximum allowed size: {} bytes",
1179 TotalSize, max_size
1180 )));
1181 }
1182 }
1183
1184 let mut file = tokio::fs::OpenOptions::new()
1186 .create(true)
1187 .append(true)
1188 .open(&TempDestination)
1189 .await
1190 .map_err(|e| AirError::FileSystem(format!("Failed to open destination file: {}", e)))?;
1191
1192 use tokio::io::AsyncWriteExt;
1193 use futures_util::StreamExt;
1194
1195 let mut downloaded = ExistingSize;
1196
1197 let mut LastProgressUpdate = Instant::now();
1198
1199 let BytesStream = response.bytes_stream();
1200
1201 tokio::pin!(BytesStream);
1202
1203 while let Some(result) = BytesStream.next().await {
1204 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1206 match status.status {
1207 DownloadState::Cancelled => {
1208 let _ = tokio::fs::remove_file(&TempDestination).await;
1210
1211 return Err(AirError::Network("Download cancelled".to_string()));
1212 },
1213
1214 DownloadState::Paused => {
1215 loop {
1217 tokio::time::sleep(Duration::from_millis(250)).await;
1218
1219 if let Some(s) = self.GetDownloadStatus(DownloadId).await {
1220 match s.status {
1221 DownloadState::Paused => continue,
1222
1223 DownloadState::Cancelled => {
1224 let _ = tokio::fs::remove_file(&TempDestination).await;
1225
1226 return Err(AirError::Network("Download cancelled".to_string()));
1227 },
1228
1229 _ => {
1230 dev_log!(
1231 "update",
1232 "[DownloadManager] Resuming paused download [ID: {}]",
1233 DownloadId
1234 );
1235
1236 break;
1237 },
1238 }
1239 } else {
1240 break;
1241 }
1242 }
1243 },
1244
1245 _ => {},
1246 }
1247 }
1248
1249 match result {
1250 Ok(chunk) => {
1251 let ChunkSize = chunk.len();
1253
1254 {
1255 let mut bucket = self.TokenBucket.write().await;
1256
1257 if let Err(e) = bucket.consume(ChunkSize as u64).await {
1258 dev_log!(
1259 "update",
1260 "warn: [DownloadManager] Bandwidth throttling error: {}, continuing anyway",
1261 e
1262 );
1263 }
1264 }
1265
1266 file.write_all(&chunk)
1267 .await
1268 .map_err(|e| AirError::FileSystem(format!("Failed to write file: {}", e)))?;
1269
1270 downloaded += ChunkSize as u64;
1271
1272 if LastProgressUpdate.elapsed() > Duration::from_millis(500) {
1274 LastProgressUpdate = Instant::now();
1275
1276 if TotalSize > 0 {
1277 let progress = (downloaded as f32 / TotalSize as f32) * 100.0;
1278
1279 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(progress), None)
1280 .await?;
1281 }
1282
1283 let rate = self.CalculateDownloadRate(DownloadId, downloaded).await;
1285
1286 self.UpdateDownloadRate(DownloadId, rate).await;
1287 }
1288 },
1289
1290 Err(e) => {
1291 if e.is_timeout() || e.is_connect() {
1293 dev_log!("update", "warn: [DownloadManager] Connection/timeout error, may retry: {}", e);
1294
1295 return Err(AirError::Network(format!("Network error: {}", e)));
1296 }
1297
1298 return Err(AirError::Network(format!("Failed to read response: {}", e)));
1299 },
1300 }
1301 }
1302
1303 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, Some(100.0), None)
1305 .await?;
1306
1307 file.flush()
1309 .await
1310 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
1311
1312 tokio::fs::rename(&TempDestination, destination)
1314 .await
1315 .map_err(|e| AirError::FileSystem(format!("Failed to commit download: {}", e)))?;
1316
1317 let checksum = self.CalculateChecksum(destination).await?;
1319
1320 self.UpdateActualChecksum(DownloadId, &checksum).await;
1322
1323 Ok(DownloadResult {
1324 path:destination.to_string_lossy().to_string(),
1325 size:downloaded,
1326 checksum,
1327 duration:Duration::from_secs(0),
1328 AverageRate:0,
1329 })
1330 }
1331
1332 pub async fn VerifyChecksum(&self, FilePath:&PathBuf, ExpectedChecksum:&str) -> Result<()> {
1334 if !FilePath.exists() {
1336 return Err(AirError::FileSystem(format!(
1337 "File not found for checksum verification: {}",
1338 FilePath.display()
1339 )));
1340 }
1341
1342 let ActualChecksum = self.ChecksumVerifier.CalculateSha256(FilePath).await?;
1343
1344 let NormalizedExpected = ExpectedChecksum.trim().to_lowercase().replace("sha256:", "");
1346
1347 let NormalizedActual = ActualChecksum.trim().to_lowercase();
1348
1349 if NormalizedActual != NormalizedExpected {
1350 dev_log!(
1351 "update",
1352 "error: [DownloadManager] Checksum mismatch for {}: expected {}, got {}",
1353 FilePath.display(),
1354 NormalizedExpected,
1355 NormalizedActual
1356 );
1357
1358 return Err(AirError::Network(format!(
1359 "Checksum verification failed: expected {}, got {}",
1360 NormalizedExpected, NormalizedActual
1361 )));
1362 }
1363
1364 dev_log!("update", "[DownloadManager] Checksum verified for file: {}", FilePath.display());
1365
1366 Ok(())
1367 }
1368
1369 pub async fn CalculateChecksum(&self, FilePath:&PathBuf) -> Result<String> {
1371 if !FilePath.exists() {
1373 return Err(AirError::FileSystem(format!(
1374 "File not found for checksum calculation: {}",
1375 FilePath.display()
1376 )));
1377 }
1378
1379 self.ChecksumVerifier.CalculateSha256(FilePath).await
1380 }
1381
1382 async fn RegisterDownload(
1384 &self,
1385
1386 DownloadId:&str,
1387
1388 url:&str,
1389
1390 destination:&PathBuf,
1391
1392 ExpectedChecksum:Option<String>,
1393 ) -> Result<()> {
1394 let mut downloads = self.ActiveDownloads.write().await;
1395
1396 let mut stats = self.statistics.write().await;
1397
1398 stats.ActiveDownloads += 1;
1399
1400 downloads.insert(
1401 DownloadId.to_string(),
1402 DownloadStatus {
1403 DownloadId:DownloadId.to_string(),
1404 url:url.to_string(),
1405 destination:destination.clone(),
1406 TotalSize:0,
1407 downloaded:0,
1408 progress:0.0,
1409 status:DownloadState::Pending,
1410 error:None,
1411 StartedAt:Some(chrono::Utc::now()),
1412 CompletedAt:None,
1413 ChunksCompleted:0,
1414 TotalChunks:1,
1415 DownloadRateBytesPerSec:0,
1416 ExpectedChecksum:ExpectedChecksum.clone(),
1417 ActualChecksum:None,
1418 },
1419 );
1420
1421 Ok(())
1422 }
1423
1424 async fn UpdateDownloadStatus(
1426 &self,
1427
1428 DownloadId:&str,
1429
1430 status:DownloadState,
1431
1432 progress:Option<f32>,
1433
1434 error:Option<String>,
1435 ) -> Result<()> {
1436 let mut downloads = self.ActiveDownloads.write().await;
1437
1438 if let Some(download) = downloads.get_mut(DownloadId) {
1439 if status == DownloadState::Completed || status == DownloadState::Failed {
1440 download.CompletedAt = Some(chrono::Utc::now());
1441 }
1442
1443 download.status = status;
1444
1445 if let Some(progress) = progress {
1446 download.progress = progress;
1447 }
1448
1449 download.error = error;
1450 }
1451
1452 Ok(())
1453 }
1454
1455 async fn UpdateDownloadRate(&self, DownloadId:&str, rate:u64) {
1457 let mut downloads = self.ActiveDownloads.write().await;
1458
1459 if let Some(download) = downloads.get_mut(DownloadId) {
1460 download.DownloadRateBytesPerSec = rate;
1461 }
1462 }
1463
1464 async fn UpdateActualChecksum(&self, DownloadId:&str, checksum:&str) {
1466 let mut downloads = self.ActiveDownloads.write().await;
1467
1468 if let Some(download) = downloads.get_mut(DownloadId) {
1469 download.ActualChecksum = Some(checksum.to_string());
1470 }
1471 }
1472
1473 async fn CalculateDownloadRate(&self, DownloadId:&str, CurrentBytes:u64) -> u64 {
1475 let downloads = self.ActiveDownloads.read().await;
1476
1477 if let Some(download) = downloads.get(DownloadId) {
1478 if let Some(StartedAt) = download.StartedAt {
1479 let elapsed = chrono::Utc::now().signed_duration_since(StartedAt);
1480
1481 let ElapsedSecs = elapsed.num_seconds() as u64;
1482
1483 if ElapsedSecs > 0 {
1484 return CurrentBytes / ElapsedSecs;
1485 }
1486 }
1487 }
1488
1489 0
1490 }
1491
1492 async fn UpdateStatistics(&self, success:bool, bytes:u64, duration:Duration) {
1494 let mut stats = self.statistics.write().await;
1495
1496 if success {
1497 stats.SuccessfulDownloads += 1;
1498
1499 stats.TotalBytesDownloaded += bytes;
1500
1501 stats.TotalDownloadTimeSecs += duration.as_secs_f64();
1502
1503 if stats.TotalDownloadTimeSecs > 0.0 {
1504 stats.AverageDownloadRate = stats.TotalBytesDownloaded as f64 / stats.TotalDownloadTimeSecs
1505 }
1506
1507 let CurrentRate = if duration.as_secs_f64() > 0.0 {
1509 (bytes as f64 / duration.as_secs_f64()) as u64
1510 } else {
1511 0
1512 };
1513
1514 if CurrentRate > stats.PeakDownloadRate {
1515 stats.PeakDownloadRate = CurrentRate;
1516 }
1517 } else {
1518 stats.FailedDownloads += 1;
1519 }
1520
1521 stats.TotalDownloads += 1;
1522
1523 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1524 }
1525
1526 pub async fn GetDownloadStatus(&self, DownloadId:&str) -> Option<DownloadStatus> {
1528 let downloads = self.ActiveDownloads.read().await;
1529
1530 downloads.get(DownloadId).cloned()
1531 }
1532
1533 pub async fn GetAllDownloads(&self) -> Vec<DownloadStatus> {
1535 let downloads = self.ActiveDownloads.read().await;
1536
1537 downloads.values().cloned().collect()
1538 }
1539
1540 pub async fn CancelDownload(&self, DownloadId:&str) -> Result<()> {
1542 dev_log!("update", "[DownloadManager] Cancelling download [ID: {}]", DownloadId);
1543
1544 self.UpdateDownloadStatus(DownloadId, DownloadState::Cancelled, None, None)
1545 .await?;
1546
1547 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1549 let TempPath = status.destination.with_extension("tmp");
1550
1551 if TempPath.exists() {
1552 let _ = tokio::fs::remove_file(&TempPath).await;
1553 }
1554 }
1555
1556 {
1558 let mut stats = self.statistics.write().await;
1559
1560 stats.CancelledDownloads += 1;
1561
1562 stats.ActiveDownloads = stats.ActiveDownloads.saturating_sub(1);
1563 }
1564
1565 Ok(())
1566 }
1567
1568 pub async fn PauseDownload(&self, DownloadId:&str) -> Result<()> {
1570 self.UpdateDownloadStatus(DownloadId, DownloadState::Paused, None, None).await?;
1571
1572 dev_log!("update", "[DownloadManager] Download paused [ID: {}]", DownloadId);
1573
1574 Ok(())
1575 }
1576
1577 pub async fn ResumeDownload(&self, DownloadId:&str) -> Result<()> {
1579 if let Some(status) = self.GetDownloadStatus(DownloadId).await {
1580 if status.status == DownloadState::Paused {
1581 self.UpdateDownloadStatus(DownloadId, DownloadState::Resuming, None, None)
1582 .await?;
1583
1584 self.UpdateDownloadStatus(DownloadId, DownloadState::Downloading, None, None)
1586 .await?;
1587
1588 dev_log!("update", "[DownloadManager] Download resumed [ID: {}]", DownloadId);
1589 } else {
1590 return Err(AirError::Network("Can only resume paused downloads".to_string()));
1591 }
1592 } else {
1593 return Err(AirError::Network("Download not found".to_string()));
1594 }
1595
1596 Ok(())
1597 }
1598
1599 pub async fn GetActiveDownloadCount(&self) -> usize {
1601 let downloads = self.ActiveDownloads.read().await;
1602
1603 downloads
1604 .iter()
1605 .filter(|(_, s)| {
1606 matches!(
1607 s.status,
1608 DownloadState::Downloading | DownloadState::Verifying | DownloadState::Resuming
1609 )
1610 })
1611 .count()
1612 }
1613
1614 pub async fn GetStatistics(&self) -> DownloadStatistics {
1616 let stats = self.statistics.read().await;
1617
1618 stats.clone()
1619 }
1620
1621 pub async fn QueueDownload(
1623 &self,
1624
1625 url:String,
1626
1627 destination:String,
1628
1629 checksum:String,
1630
1631 priority:DownloadPriority,
1632 ) -> Result<String> {
1633 let DownloadId = Utility::GenerateRequestId();
1634
1635 let destination = if destination.is_empty() {
1636 let filename = url.split('/').last().unwrap_or("download.bin");
1637
1638 self.CacheDirectory.join(filename)
1639 } else {
1640 ConfigurationManager::ExpandPath(&destination)?
1641 };
1642
1643 let queued_download = QueuedDownload {
1644 DownloadId:DownloadId.clone(),
1645
1646 url,
1647
1648 destination,
1649
1650 checksum,
1651
1652 priority,
1653
1654 AddedAt:chrono::Utc::now(),
1655
1656 MaxFileSize:None,
1657
1658 ValidateDiskSpace:true,
1659 };
1660
1661 let mut queue = self.DownloadQueue.write().await;
1662
1663 queue.push_back(queued_download);
1664
1665 queue.make_contiguous().sort_by(|a, b| {
1667 match b.priority.cmp(&a.priority) {
1668 std::cmp::Ordering::Equal => {
1669 a.AddedAt.cmp(&b.AddedAt)
1671 },
1672 order => order,
1673 }
1674 });
1675
1676 {
1677 let mut stats = self.statistics.write().await;
1678
1679 stats.QueuedDownloads += 1;
1680 }
1681
1682 dev_log!(
1683 "update",
1684 "[DownloadManager] Download queued [ID: {}] with priority {:?}",
1685 DownloadId,
1686 priority
1687 );
1688
1689 Ok(DownloadId)
1690 }
1691
1692 pub async fn ProcessQueue(&self) -> Result<Option<String>> {
1694 let mut queue = self.DownloadQueue.write().await;
1695
1696 if let Some(queued) = queue.pop_front() {
1697 let download_id = queued.DownloadId.clone();
1698
1699 drop(queue); let config = DownloadConfig {
1702 url:queued.url.clone(),
1703
1704 destination:queued.destination.to_string_lossy().to_string(),
1705
1706 checksum:queued.checksum.clone(),
1707
1708 priority:queued.priority,
1709
1710 MaxFileSize:queued.MaxFileSize,
1711
1712 ValidateDiskSpace:queued.ValidateDiskSpace,
1713 ..Default::default()
1714 };
1715
1716 {
1717 let mut stats = self.statistics.write().await;
1718
1719 stats.QueuedDownloads = stats.QueuedDownloads.saturating_sub(1);
1720 }
1721
1722 let manager = self.clone();
1724
1725 let download_id_clone = download_id.clone();
1726
1727 tokio::spawn(async move {
1728 if let Err(e) = manager.DownloadFileWithConfig(config).await {
1729 dev_log!(
1730 "update",
1731 "error: [DownloadManager] Queued download failed [ID: {}]: {}",
1732 download_id_clone,
1733 e
1734 ); let _ = manager
1737 .UpdateDownloadStatus(&download_id_clone, DownloadState::Failed, None, Some(e.to_string()))
1738 .await;
1739 }
1740 });
1741
1742 Ok(Some(download_id))
1743 } else {
1744 Ok(None)
1745 }
1746 }
1747
1748 pub async fn StartBackgroundTasks(&self) -> Result<tokio::task::JoinHandle<()>> {
1750 let manager = self.clone();
1751
1752 let handle = tokio::spawn(async move {
1753 manager.BackgroundTaskLoop().await;
1754 });
1755
1756 dev_log!("update", "[DownloadManager] Background tasks started");
1757
1758 Ok(handle)
1759 }
1760
1761 async fn BackgroundTaskLoop(&self) {
1763 let mut interval = tokio::time::interval(Duration::from_secs(60));
1764
1765 loop {
1766 interval.tick().await;
1767
1768 if let Err(e) = self.ProcessQueue().await {
1770 dev_log!("update", "error: [DownloadManager] Queue processing error: {}", e);
1771 }
1772
1773 self.CleanupCompletedDownloads().await;
1775
1776 if let Err(e) = self.CleanupCache().await {
1778 dev_log!("update", "error: [DownloadManager] Cache cleanup failed: {}", e);
1779 }
1780 }
1781 }
1782
1783 async fn CleanupCompletedDownloads(&self) {
1785 let mut downloads = self.ActiveDownloads.write().await;
1786
1787 let mut cleaned_count = 0;
1788
1789 downloads.retain(|_, download| {
1790 let is_final = matches!(
1791 download.status,
1792 DownloadState::Completed | DownloadState::Failed | DownloadState::Cancelled
1793 );
1794
1795 if is_final {
1796 cleaned_count += 1;
1797 }
1798
1799 !is_final
1800 });
1801
1802 if cleaned_count > 0 {
1803 dev_log!("update", "[DownloadManager] Cleaned up {} completed downloads", cleaned_count);
1804 }
1805 }
1806
1807 async fn CleanupCache(&self) -> Result<()> {
1809 let max_age_days = 7;
1810
1811 let now = chrono::Utc::now();
1812
1813 let mut entries = tokio::fs::read_dir(&self.CacheDirectory)
1814 .await
1815 .map_err(|e| AirError::FileSystem(format!("Failed to read cache directory: {}", e)))?;
1816
1817 let mut cleaned_count = 0;
1818
1819 while let Some(entry) = entries
1820 .next_entry()
1821 .await
1822 .map_err(|e| AirError::FileSystem(format!("Failed to read cache entry: {}", e)))?
1823 {
1824 let metadata = entry
1825 .metadata()
1826 .await
1827 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
1828
1829 if metadata.is_file() {
1830 let path = entry.path();
1831
1832 let IsActive = {
1834 let downloads = self.ActiveDownloads.read().await;
1835
1836 downloads.values().any(|d| d.destination == path)
1837 };
1838
1839 if IsActive {
1840 continue;
1841 }
1842
1843 let modified = metadata
1844 .modified()
1845 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
1846
1847 let modified_time = chrono::DateTime::<chrono::Utc>::from(modified);
1848
1849 let age = now.signed_duration_since(modified_time);
1850
1851 if age.num_days() > max_age_days {
1852 match tokio::fs::remove_file(&path).await {
1853 Ok(_) => {
1854 dev_log!(
1855 "update",
1856 "[DownloadManager] Removed old cache file: {}",
1857 entry.file_name().to_string_lossy()
1858 );
1859
1860 cleaned_count += 1;
1861 },
1862
1863 Err(e) => {
1864 dev_log!(
1865 "update",
1866 "warn: [DownloadManager] Failed to remove cache file {}: {}",
1867 entry.file_name().to_string_lossy(),
1868 e
1869 );
1870 },
1871 }
1872 }
1873 }
1874 }
1875
1876 if cleaned_count > 0 {
1877 dev_log!("update", "[DownloadManager] Cleaned up {} old cache files", cleaned_count);
1878 }
1879
1880 Ok(())
1881 }
1882
1883 pub async fn StopBackgroundTasks(&self) {
1885 dev_log!("update", "[DownloadManager] Stopping background tasks");
1886
1887 let ids_to_cancel:Vec<String> = {
1889 let downloads = self.ActiveDownloads.read().await;
1890
1891 downloads
1892 .iter()
1893 .filter(|(_, s)| matches!(s.status, DownloadState::Downloading))
1894 .map(|(id, _)| id.clone())
1895 .collect()
1896 };
1897
1898 for id in ids_to_cancel {
1900 let _ = self.CancelDownload(&id).await;
1901 }
1902
1903 let _ = self
1905 .AppState
1906 .UpdateServiceStatus("downloader", crate::ApplicationState::ServiceStatus::Stopped)
1907 .await;
1908 }
1909
1910 pub async fn SetBandwidthLimit(&mut self, mb_per_sec:usize) {
1923 let bytes_per_sec = (mb_per_sec.max(1).min(1000) * 1024 * 1024) as u64;
1924
1925 {
1927 let mut bucket = self.TokenBucket.write().await;
1928
1929 bucket.set_rate(bytes_per_sec);
1930 }
1931
1932 let permits = mb_per_sec.max(1).min(1000);
1934
1935 self.BandwidthLimiter = Arc::new(Semaphore::new(permits));
1936
1937 dev_log!(
1938 "update",
1939 "[DownloadManager] Bandwidth limit set to {} MB/s ({} bytes/s)",
1940 mb_per_sec,
1941 bytes_per_sec
1942 );
1943 }
1944
1945 pub async fn SetMaxConcurrentDownloads(&mut self, max:usize) {
1949 let permits = max.max(1).min(20);
1950
1951 self.ConcurrentLimiter = Arc::new(Semaphore::new(permits));
1952
1953 dev_log!("update", "[DownloadManager] Max concurrent downloads set to {}", max);
1954 }
1955}
1956
1957impl Clone for DownloadManager {
1958 fn clone(&self) -> Self {
1959 Self {
1960 AppState:self.AppState.clone(),
1961
1962 ActiveDownloads:self.ActiveDownloads.clone(),
1963
1964 DownloadQueue:self.DownloadQueue.clone(),
1965
1966 CacheDirectory:self.CacheDirectory.clone(),
1967
1968 client:self.client.clone(),
1969
1970 ChecksumVerifier:self.ChecksumVerifier.clone(),
1971
1972 BandwidthLimiter:self.BandwidthLimiter.clone(),
1973
1974 TokenBucket:self.TokenBucket.clone(),
1975
1976 ConcurrentLimiter:self.ConcurrentLimiter.clone(),
1977
1978 statistics:self.statistics.clone(),
1979 }
1980 }
1981}
1982
1983impl Default for DownloadStatistics {
1984 fn default() -> Self {
1985 Self {
1986 TotalDownloads:0,
1987
1988 SuccessfulDownloads:0,
1989
1990 FailedDownloads:0,
1991
1992 CancelledDownloads:0,
1993
1994 TotalBytesDownloaded:0,
1995
1996 TotalDownloadTimeSecs:0.0,
1997
1998 AverageDownloadRate:0.0,
1999
2000 PeakDownloadRate:0,
2001
2002 ActiveDownloads:0,
2003
2004 QueuedDownloads:0,
2005 }
2006 }
2007}
2008
2009fn ExpectedChecksumFromConfig(config:&DownloadConfig) -> Option<&str> {
2011 if config.checksum.is_empty() { None } else { Some(&config.checksum) }
2012}
2013
2014#[derive(Debug, Clone)]
2016struct ChunkInfo {
2017 start:u64,
2018
2019 end:u64,
2020
2021 downloaded:u64,
2022
2023 temp_path:PathBuf,
2024}
2025
2026#[derive(Debug)]
2028struct ParallelDownloadResult {
2029 chunks:Vec<ChunkInfo>,
2030
2031 total_size:u64,
2032}
2033
2034impl DownloadManager {
2071 pub async fn DownloadFileWithChunks(
2081 &self,
2082
2083 url:String,
2084
2085 destination:String,
2086
2087 checksum:String,
2088
2089 chunk_size_mb:usize,
2090 ) -> Result<DownloadResult> {
2091 dev_log!(
2092 "update",
2093 "[DownloadManager] Starting chunked download - URL: {}, Chunk size: {} MB",
2094 url,
2095 chunk_size_mb
2096 );
2097
2098 let sanitized_url = Self::ValidateAndSanitizeUrl(&url)?;
2100
2101 let total_size = self.GetRemoteFileSize(&sanitized_url).await?;
2103
2104 dev_log!("update", "[DownloadManager] Remote file size: {} bytes", total_size);
2105
2106 let chunk_threshold = 50 * 1024 * 1024; if total_size < chunk_threshold {
2110 dev_log!(
2111 "update",
2112 "[DownloadManager] File too small for chunked download, using normal download"
2113 );
2114
2115 return self.DownloadFile(url, destination, checksum).await;
2116 }
2117
2118 let chunk_size = (chunk_size_mb * 1024 * 1024) as u64;
2120
2121 let num_chunks = ((total_size + chunk_size - 1) / chunk_size) as usize;
2122
2123 let num_concurrent = num_chunks.min(4); dev_log!(
2126 "update",
2127 "[DownloadManager] Downloading in {} chunks ({} concurrent)",
2128 num_chunks,
2129 num_concurrent
2130 );
2131
2132 let DownloadId = Utility::GenerateRequestId();
2133
2134 let DestinationPath = if destination.is_empty() {
2135 let filename = sanitized_url.split('/').last().unwrap_or("download.bin");
2136
2137 self.CacheDirectory.join(filename)
2138 } else {
2139 ConfigurationManager::ExpandPath(&destination)?
2140 };
2141
2142 let temp_dir = DestinationPath.with_extension("chunks");
2144
2145 tokio::fs::create_dir_all(&temp_dir)
2146 .await
2147 .map_err(|e| AirError::FileSystem(format!("Failed to create temp directory: {}", e)))?;
2148
2149 let mut chunks = Vec::with_capacity(num_chunks);
2151
2152 for i in 0..num_chunks {
2153 let start = (i as u64) * chunk_size;
2154
2155 let end = std::cmp::min(start + chunk_size - 1, total_size - 1);
2156
2157 chunks.push(ChunkInfo { start, end, downloaded:0, temp_path:temp_dir.join(format!("chunk_{:04}", i)) });
2158 }
2159
2160 let downloaded_tracker = Arc::new(RwLock::new(0u64));
2162
2163 let completed_tracker = Arc::new(RwLock::new(0usize));
2164
2165 let mut handles = Vec::new();
2167
2168 for (i, chunk) in chunks.iter().enumerate() {
2169 let manager = self.clone();
2170
2171 let url_clone = sanitized_url.clone();
2172
2173 let chunk_clone = chunk.clone();
2174
2175 let downloaded_tracker = downloaded_tracker.clone();
2176
2177 let completed_tracker = completed_tracker.clone();
2178
2179 let _Did = DownloadId.clone();
2180
2181 let handle = tokio::spawn(async move {
2182 manager.DownloadChunk(&url_clone, &chunk_clone, i).await?;
2183
2184 {
2186 let mut downloaded = downloaded_tracker.write().await;
2187
2188 let mut completed = completed_tracker.write().await;
2189
2190 *downloaded += chunk_clone.end - chunk_clone.start + 1;
2191 *completed += 1;
2192
2193 let progress = (*downloaded as f32 / total_size as f32) * 100.0;
2194
2195 dev_log!(
2196 "update",
2197 "Chunk {} completed ({}/{}) - Progress: {:.1}%",
2198 i + 1,
2199 *completed,
2200 num_chunks,
2201 progress
2202 );
2203 }
2204
2205 Ok::<_, AirError>(())
2206 });
2207
2208 if (i + 1) % num_concurrent == 0 {
2210 for handle in handles.drain(..) {
2211 handle.await??;
2212 }
2213 }
2214
2215 handles.push(handle);
2216 }
2217
2218 for handle in handles {
2220 handle.await??;
2221 }
2222
2223 dev_log!("update", "[DownloadManager] Reassembling chunks into final file");
2225
2226 self.ReassembleChunks(&chunks, &DestinationPath).await?;
2227
2228 tokio::fs::remove_dir_all(&temp_dir).await.map_err(|e| {
2230 dev_log!("update", "warn: [DownloadManager] Failed to clean up temp directory: {}", e);
2231
2232 AirError::FileSystem(e.to_string())
2233 })?;
2234
2235 if !checksum.is_empty() {
2237 self.VerifyChecksum(&DestinationPath, &checksum).await?;
2238 }
2239
2240 let actual_checksum = self.CalculateChecksum(&DestinationPath).await?;
2241
2242 dev_log!("update", "[DownloadManager] Chunked download completed successfully");
2243
2244 Ok(DownloadResult {
2245 path:DestinationPath.to_string_lossy().to_string(),
2246 size:total_size,
2247 checksum:actual_checksum,
2248 duration:Duration::from_secs(0),
2249 AverageRate:0,
2250 })
2251 }
2252
2253 async fn GetRemoteFileSize(&self, url:&str) -> Result<u64> {
2255 let response = self
2256 .client
2257 .head(url)
2258 .timeout(Duration::from_secs(30))
2259 .send()
2260 .await
2261 .map_err(|e| AirError::Network(format!("Failed to get file size: {}", e)))?;
2262
2263 if !response.status().is_success() {
2264 return Err(AirError::Network(format!("Failed to get file size: {}", response.status())));
2265 }
2266
2267 response
2268 .content_length()
2269 .ok_or_else(|| AirError::Network("Content-Length header not found".to_string()))
2270 }
2271
2272 async fn DownloadChunk(&self, url:&str, chunk:&ChunkInfo, chunk_index:usize) -> Result<()> {
2274 dev_log!(
2275 "update",
2276 "[DownloadManager] Downloading chunk {} (bytes {}-{})",
2277 chunk_index,
2278 chunk.start,
2279 chunk.end
2280 );
2281
2282 let range_header = format!("bytes={}-{}", chunk.start, chunk.end);
2283
2284 let response = self
2285 .client
2286 .get(url)
2287 .header(reqwest::header::RANGE, range_header)
2288 .timeout(Duration::from_secs(300))
2289 .send()
2290 .await
2291 .map_err(|e| AirError::Network(format!("Failed to start chunk download: {}", e)))?;
2292
2293 if response.status() != reqwest::StatusCode::PARTIAL_CONTENT {
2294 return Err(AirError::Network(format!(
2295 "Chunk download failed with status: {}",
2296 response.status()
2297 )));
2298 }
2299
2300 let bytes = response
2302 .bytes()
2303 .await
2304 .map_err(|e| AirError::Network(format!("Failed to read chunk bytes: {}", e)))?;
2305
2306 tokio::fs::write(&chunk.temp_path, &bytes)
2307 .await
2308 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk: {}", e)))?;
2309
2310 dev_log!(
2311 "update",
2312 "[DownloadManager] Chunk {} downloaded: {} bytes",
2313 chunk_index,
2314 bytes.len()
2315 );
2316
2317 Ok(())
2318 }
2319
2320 async fn ReassembleChunks(&self, chunks:&[ChunkInfo], destination:&Path) -> Result<()> {
2322 use tokio::io::AsyncWriteExt;
2323
2324 let mut file = tokio::fs::File::create(destination)
2325 .await
2326 .map_err(|e| AirError::FileSystem(format!("Failed to create destination file: {}", e)))?;
2327
2328 let mut sorted_chunks:Vec<_> = chunks.iter().collect();
2330
2331 sorted_chunks.sort_by_key(|c| c.start);
2332
2333 for chunk in sorted_chunks {
2334 let contents = tokio::fs::read(&chunk.temp_path)
2335 .await
2336 .map_err(|e| AirError::FileSystem(format!("Failed to read chunk: {}", e)))?;
2337
2338 file.write_all(&contents)
2339 .await
2340 .map_err(|e| AirError::FileSystem(format!("Failed to write chunk to file: {}", e)))?;
2341
2342 dev_log!(
2343 "update",
2344 "[DownloadManager] Reassembled chunk (bytes {}-{})",
2345 chunk.start,
2346 chunk.end
2347 );
2348 }
2349
2350 file.flush()
2351 .await
2352 .map_err(|e| AirError::FileSystem(format!("Failed to flush file: {}", e)))?;
2353
2354 dev_log!("update", "[DownloadManager] All chunks reassembled successfully");
2355
2356 Ok(())
2357 }
2358}