AirLibrary/Indexing/Background/
StartWatcher.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69 sync::{Mutex, RwLock, Semaphore},
70 task::JoinHandle,
71};
72
73use crate::{AirError, ApplicationState::ApplicationState, Indexing::State::CreateState::FileIndex, Result, dev_log};
74
/// Number of permits `BackgroundIndexerContext::new` gives to the indexing semaphore.
const MAX_WATCH_PROCESSORS:usize = 5;
77
/// Shared state used by the file watcher, the debounce processor and the
/// periodic background indexing task.
///
/// Every field is reference-counted so the context can be handed to spawned
/// tasks and to the watcher callback.
pub struct BackgroundIndexerContext {
    /// Application-wide state; the functions in this file read
    /// `Configuration.Indexing` from it.
    pub app_state:Arc<ApplicationState>,

    /// The file index, shared behind an async read/write lock.
    pub file_index:Arc<RwLock<FileIndex>>,

    /// Corruption flag. While it is `true` the watcher callback drops events
    /// and the periodic loops skip their work.
    pub corruption_detected:Arc<Mutex<bool>>,

    /// The currently installed filesystem watcher, or `None` when no watcher
    /// is running.
    pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,

    /// Semaphore that `new` creates with `MAX_WATCH_PROCESSORS` permits.
    /// NOTE(review): it is not acquired anywhere in the visible part of this
    /// file; confirm where it is used.
    pub indexing_semaphore:Arc<Semaphore>,

    /// Collects filesystem changes reported by the watcher until the debounce
    /// processor handles them.
    pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
}
98
99impl BackgroundIndexerContext {
100 pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
101 Self {
102 app_state,
103
104 file_index,
105
106 corruption_detected:Arc::new(Mutex::new(false)),
107
108 file_watcher:Arc::new(Mutex::new(None)),
109
110 indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
111
112 debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
113 }
114 }
115}
116
117pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
125 use notify::Watcher;
126
127 let index = context.file_index.clone();
128
129 let corruption_flag = context.corruption_detected.clone();
130
131 let config = context.app_state.Configuration.Indexing.clone();
132
133 let debounced_handler = context.debounced_handler.clone();
134
135 let mut watcher:notify::RecommendedWatcher = Watcher::new(
137 move |res:std::result::Result<notify::Event, notify::Error>| {
138 if let Ok(event) = res {
139 if *corruption_flag.blocking_lock() {
141 dev_log!(
142 "indexing",
143 "warn: [StartWatcher] Skipping file event - index marked as corrupted"
144 );
145
146 return;
147 }
148
149 let index = index.clone();
150
151 let _index = index.clone();
153
154 let debounced_handler = debounced_handler.clone();
155
156 let _config_clone = config.clone();
157
158 tokio::spawn(async move {
159 if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
161 for path in &event.paths {
162 if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
163 path,
164 &crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
165 ) {
166 debounced_handler.AddChange(path.clone(), change_type).await;
167 }
168 }
169 }
170 });
171 }
172 },
173 notify::Config::default(),
174 )
175 .map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
176
177 for path in &paths {
179 if path.exists() {
180 match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
181 Ok(()) => {
182 watcher
183 .watch(path, notify::RecursiveMode::Recursive)
184 .map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
185
186 dev_log!("indexing", "[StartWatcher] Watching path: {}", path.display());
187 },
188
189 Err(e) => {
190 dev_log!(
191 "indexing",
192 "warn: [StartWatcher] Skipping invalid watch path {}: {}",
193 path.display(),
194 e
195 );
196 },
197 }
198 }
199 }
200
201 *context.file_watcher.lock().await = Some(watcher);
202
203 dev_log!(
204 "indexing",
205 "[StartWatcher] File watcher started successfully for {} paths",
206 paths.len()
207 );
208
209 Ok(())
210}
211
212pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
214 tokio::spawn(async move {
215 dev_log!("indexing", "[StartWatcher] Debounce processor started");
216
217 let interval = Duration::from_millis(100); let debounce_cutoff = Duration::from_millis(500);
221
222 loop {
223 tokio::time::sleep(interval).await;
224
225 {
226 if *context.corruption_detected.lock().await {
228 dev_log!("indexing", "warn: [StartWatcher] Index corrupted, pausing debounce processing");
229
230 tokio::time::sleep(Duration::from_secs(5)).await;
231
232 continue;
233 }
234
235 let config = context.app_state.Configuration.Indexing.clone();
237
238 match context
239 .debounced_handler
240 .ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
241 .await
242 {
243 Ok(changes) => {
244 if !changes.is_empty() {
245 dev_log!("indexing", "[StartWatcher] Processed {} debounced changes", changes.len());
246 }
247 },
248 Err(e) => {
249 dev_log!("indexing", "error: [StartWatcher] Failed to process pending changes: {}", e);
250 },
251 }
252 }
253 }
254 })
255}
256
257pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
259 let config = &context.app_state.Configuration.Indexing;
260
261 if !config.Enabled {
262 dev_log!("indexing", "[StartWatcher] Background indexing disabled in configuration");
263
264 return Err(AirError::Configuration("Background indexing is disabled".to_string()));
265 }
266
267 let handle = tokio::spawn(BackgroundTask(context));
268
269 dev_log!("indexing", "[StartWatcher] Background tasks started");
270
271 Ok(handle)
272}
273
/// Logs that background tasks are being stopped.
///
/// NOTE(review): this function only emits a log line. `_context` is unused and
/// no task is cancelled here; presumably callers abort the `JoinHandle`s they
/// received when starting the tasks. Confirm against the callers.
pub async fn StopBackgroundTasks(_context:&BackgroundIndexerContext) {
    dev_log!("indexing", "[StartWatcher] Stopping background tasks");
}
278
279pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
281 if let Some(watcher) = context.file_watcher.lock().await.take() {
282 drop(watcher);
283
284 dev_log!("indexing", "[StartWatcher] File watcher stopped");
285 }
286}
287
288async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
290 let config = context.app_state.Configuration.Indexing.clone();
291
292 let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
293
294 let mut interval = tokio::time::interval(interval);
295
296 dev_log!(
297 "indexing",
298 "[StartWatcher] Background indexing configured for {} minute intervals",
299 config.UpdateIntervalMinutes
300 );
301
302 loop {
303 interval.tick().await;
304
305 {
306 if *context.corruption_detected.lock().await {
308 dev_log!("indexing", "warn: [StartWatcher] Index corrupted, skipping background update");
309
310 continue;
311 }
312
313 dev_log!("indexing", "[StartWatcher] Running periodic background index...");
314
315 let directories = config.IndexDirectory.clone();
317
318 if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
319 {
320 dev_log!("indexing", "error: [StartWatcher] Background indexing failed: {}", e);
321 }
322 }
323 }
324}
325
326pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
328 let is_running = context.file_watcher.lock().await.is_some();
329
330 let pending_count = context.debounced_handler.PendingCount().await;
331
332 let is_corrupted = *context.corruption_detected.lock().await;
333
334 WatcherStatus { is_running, pending_count, is_corrupted }
335}
336
/// Snapshot of the watcher state as reported by `GetWatcherStatus`.
#[derive(Debug, Clone)]
pub struct WatcherStatus {
    /// `true` when a watcher is currently stored in the context.
    pub is_running:bool,

    /// Number of pending changes reported by the debounce handler.
    pub pending_count:usize,

    /// Value of the context's corruption flag at the time of the snapshot.
    pub is_corrupted:bool,
}
346
347pub async fn StartAll(
349 context:Arc<BackgroundIndexerContext>,
350
351 watch_paths:Vec<PathBuf>,
352) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
353 let watcher_handle = if config_watch_enabled(&context) {
354 match StartFileWatcher(&context, watch_paths).await {
355 Ok(()) => {
356 Some(StartDebounceProcessor(context.clone()))
358 },
359
360 Err(e) => {
361 dev_log!("indexing", "error: [StartWatcher] Failed to start file watcher: {}", e);
362
363 None
364 },
365 }
366 } else {
367 None
368 };
369
370 let background_handle = match StartBackgroundTasks(context.clone()).await {
371 Ok(handle) => Some(handle),
372
373 Err(e) => {
374 dev_log!("indexing", "warn: [StartWatcher] Failed to start background tasks: {}", e);
375
376 None
377 },
378 };
379
380 Ok((watcher_handle, background_handle))
381}
382
/// Stops everything started by this module: first calls
/// `StopBackgroundTasks`, then `StopFileWatcher`.
pub async fn StopAll(context:&BackgroundIndexerContext) {
    StopBackgroundTasks(context).await;

    // Dropping the stored watcher ends delivery of new filesystem events.
    StopFileWatcher(context).await;
}
389
390fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }