Skip to main content

AirLibrary/Downloader/
RateLimit.rs

1//! Token-bucket rate limiter for per-download bandwidth throttling.
2//!
3//! Tokens represent bytes that can be consumed. They refill at `refill_rate`
4//! bytes/second up to `capacity`. Downloads call `consume(bytes).await` which
5//! parks the task until enough tokens are available, keeping the observed
6//! throughput at or below the configured limit while still allowing short
7//! bursts up to `capacity_factor` seconds' worth of data.
8
9use std::time::{Duration, Instant};
10
11use crate::Result;
12
13/// Token-bucket rate limiter. Stores the bucket state; wrap in `Arc<RwLock<_>>`
14/// to share across concurrent download tasks.
15#[derive(Debug)]
16pub struct TokenBucket {
17	/// Available tokens (bytes).
18	tokens:f64,
19
20	/// Burst capacity (bytes).
21	capacity:f64,
22
23	/// Bytes-per-second refill rate.
24	refill_rate:f64,
25
26	/// Monotonic timestamp of the last refill.
27	last_refill:Instant,
28}
29
30impl TokenBucket {
31	/// Create a bucket with `bytes_per_sec` sustained throughput and
32	/// a burst buffer of `capacity_factor` seconds' worth of tokens.
33	pub fn new(bytes_per_sec:u64, capacity_factor:f64) -> Self {
34		let refill_rate = bytes_per_sec as f64;
35
36		let capacity = refill_rate * capacity_factor;
37
38		Self { tokens:capacity, capacity, refill_rate, last_refill:Instant::now() }
39	}
40
41	/// Replenish tokens based on elapsed wall time. Call before every consume.
42	pub fn refill(&mut self) {
43		let elapsed = self.last_refill.elapsed().as_secs_f64();
44
45		if elapsed > 0.0 {
46			self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
47
48			self.last_refill = Instant::now();
49		}
50	}
51
52	/// Consume up to `bytes` tokens immediately. Returns how many were
53	/// consumed. Does not block - the caller decides what to do with remaining
54	/// deficit.
55	pub fn try_consume(&mut self, bytes:u64) -> u64 {
56		self.refill();
57
58		let bytes = bytes as f64;
59
60		if self.tokens >= bytes {
61			self.tokens -= bytes;
62
63			bytes as u64
64		} else {
65			let available = self.tokens;
66
67			self.tokens = 0.0;
68
69			available as u64
70		}
71	}
72
73	/// Async-wait until `bytes` tokens are available, then consume them.
74	/// Polls at most every 100 ms so Tokio's timer wheel stays responsive.
75	pub async fn consume(&mut self, bytes:u64) -> Result<()> {
76		let bytes_needed = bytes as f64;
77
78		loop {
79			self.refill();
80
81			if self.tokens >= bytes_needed {
82				self.tokens -= bytes_needed;
83
84				return Ok(());
85			}
86
87			let tokens_needed = bytes_needed - self.tokens;
88
89			let wait_secs = (tokens_needed / self.refill_rate).min(0.1);
90
91			tokio::time::sleep(Duration::from_secs_f64(wait_secs)).await;
92		}
93	}
94
95	/// Adjust the sustained rate. Burst capacity is reset to 5× the new rate.
96	pub fn set_rate(&mut self, bytes_per_sec:u64) {
97		self.refill_rate = bytes_per_sec as f64;
98
99		self.capacity = self.refill_rate * 5.0;
100	}
101}