Skip to main content

AirLibrary/Indexing/Background/
StartWatcher.rs

1//! # StartWatcher
2//!
3//! ## File: Indexing/Background/StartWatcher.rs
4//!
5//! ## Role in Air Architecture
6//!
7//! Provides background task management for the File Indexer service,
8//! handling file watching startup and periodic indexing tasks.
9//!
10//! ## Primary Responsibility
11//!
12//! Start and manage background file watcher and periodic indexing tasks
13//! for the indexing service.
14//!
15//! ## Secondary Responsibilities
16//!
17//! - File watcher initialization and lifecycle management
18//! - Periodic background re-indexing
19//! - Watcher event debouncing
20//! - Background task cleanup
21//!
22//! ## Dependencies
23//!
24//! **External Crates:**
25//! - `notify` - File system watching
26//! - `tokio` - Async runtime for background tasks
27//!
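//! **Internal Modules:**
//! - `crate::Result` - Error handling type
//! - `crate::AirError` - Error types
//! - `crate::ApplicationState::ApplicationState` - Application state
//! - `crate::Indexing::State::CreateState::FileIndex` - Shared file index type
//! - `crate::Indexing::Watch::WatchFile` - Watch-path validation, event classification and debouncing
//! - `crate::Indexing::Scan::ScanDirectory` - Directory scanning used by periodic re-indexing
//! - `crate::dev_log` - Diagnostic logging macro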
34//!
35//! ## Dependents
36//!
37//! - `Indexing::mod::FileIndexer` - Main file indexer implementation
38//!
39//! ## VSCode Pattern Reference
40//!
41//! Inspired by VSCode's background services in
42//! `src/vs/workbench/services/search/common/`
43//!
44//! ## Security Considerations
45//!
46//! - Path validation before watching
47//! - Watch path limits enforcement
48//! - Permission checking on watch paths
49//!
50//! ## Performance Considerations
51//!
52//! - Event debouncing prevents excessive re-indexing
53//! - Parallel processing of file changes
54//! - Efficient background task scheduling
55//!
56//! ## Error Handling Strategy
57//!
58//! Background tasks log errors and continue running, ensuring
59//! temporary failures don't stop the indexing service.
60//!
61//! ## Thread Safety
62//!
63//! Background tasks use Arc for shared state and async/await
64//! for safe concurrent operations.
65
66use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69	sync::{Mutex, RwLock, Semaphore},
70	task::JoinHandle,
71};
72
73use crate::{AirError, ApplicationState::ApplicationState, Indexing::State::CreateState::FileIndex, Result, dev_log};
74
75/// Maximum number of parallel watch event processors
76const MAX_WATCH_PROCESSORS:usize = 5;
77
78/// Background indexer context containing shared state
79pub struct BackgroundIndexerContext {
80	/// Application state reference
81	pub app_state:Arc<ApplicationState>,
82
83	/// File index
84	pub file_index:Arc<RwLock<FileIndex>>,
85
86	/// Corruption detected flag
87	pub corruption_detected:Arc<Mutex<bool>>,
88
89	/// File watcher (optional)
90	pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,
91
92	/// Semaphore for limiting parallel operations
93	pub indexing_semaphore:Arc<Semaphore>,
94
95	/// Debounced event handler
96	pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
97}
98
99impl BackgroundIndexerContext {
100	pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
101		Self {
102			app_state,
103
104			file_index,
105
106			corruption_detected:Arc::new(Mutex::new(false)),
107
108			file_watcher:Arc::new(Mutex::new(None)),
109
110			indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
111
112			debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
113		}
114	}
115}
116
117/// Start file watcher for incremental indexing
118///
119/// Monitors file system changes and updates index in real-time.
120/// This enables:
121/// - Real-time search updates
122/// - Automatic reindexing of changed files
123/// - Removal of deleted files from index
124pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
125	use notify::Watcher;
126
127	let index = context.file_index.clone();
128
129	let corruption_flag = context.corruption_detected.clone();
130
131	let config = context.app_state.Configuration.Indexing.clone();
132
133	let debounced_handler = context.debounced_handler.clone();
134
135	// Create and start the watcher
136	let mut watcher:notify::RecommendedWatcher = Watcher::new(
137		move |res:std::result::Result<notify::Event, notify::Error>| {
138			if let Ok(event) = res {
139				// Check corruption flag before processing events
140				if *corruption_flag.blocking_lock() {
141					dev_log!(
142						"indexing",
143						"warn: [StartWatcher] Skipping file event - index marked as corrupted"
144					);
145
146					return;
147				}
148
149				let index = index.clone();
150
151				// Variables cloned for use in async task
152				let _index = index.clone();
153
154				let debounced_handler = debounced_handler.clone();
155
156				let _config_clone = config.clone();
157
158				tokio::spawn(async move {
159					// Convert event to change type and add to debounced handler
160					if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
161						for path in &event.paths {
162							if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
163								path,
164								&crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
165							) {
166								debounced_handler.AddChange(path.clone(), change_type).await;
167							}
168						}
169					}
170				});
171			}
172		},
173		notify::Config::default(),
174	)
175	.map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
176
177	// Watch all specified paths
178	for path in &paths {
179		if path.exists() {
180			match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
181				Ok(()) => {
182					watcher
183						.watch(path, notify::RecursiveMode::Recursive)
184						.map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
185
186					dev_log!("indexing", "[StartWatcher] Watching path: {}", path.display());
187				},
188
189				Err(e) => {
190					dev_log!(
191						"indexing",
192						"warn: [StartWatcher] Skipping invalid watch path {}: {}",
193						path.display(),
194						e
195					);
196				},
197			}
198		}
199	}
200
201	*context.file_watcher.lock().await = Some(watcher);
202
203	dev_log!(
204		"indexing",
205		"[StartWatcher] File watcher started successfully for {} paths",
206		paths.len()
207	);
208
209	Ok(())
210}
211
212/// Start the debounce processor task
213pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
214	tokio::spawn(async move {
215		dev_log!("indexing", "[StartWatcher] Debounce processor started");
216
217		let interval = Duration::from_millis(100); // Process every 100ms
218
219		// Debounce age cutoff
220		let debounce_cutoff = Duration::from_millis(500);
221
222		loop {
223			tokio::time::sleep(interval).await;
224
225			{
226				// Check corruption flag
227				if *context.corruption_detected.lock().await {
228					dev_log!("indexing", "warn: [StartWatcher] Index corrupted, pausing debounce processing");
229
230					tokio::time::sleep(Duration::from_secs(5)).await;
231
232					continue;
233				}
234
235				// Process pending changes
236				let config = context.app_state.Configuration.Indexing.clone();
237
238				match context
239					.debounced_handler
240					.ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
241					.await
242				{
243					Ok(changes) => {
244						if !changes.is_empty() {
245							dev_log!("indexing", "[StartWatcher] Processed {} debounced changes", changes.len());
246						}
247					},
248					Err(e) => {
249						dev_log!("indexing", "error: [StartWatcher] Failed to process pending changes: {}", e);
250					},
251				}
252			}
253		}
254	})
255}
256
257/// Start background tasks for periodic indexing
258pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
259	let config = &context.app_state.Configuration.Indexing;
260
261	if !config.Enabled {
262		dev_log!("indexing", "[StartWatcher] Background indexing disabled in configuration");
263
264		return Err(AirError::Configuration("Background indexing is disabled".to_string()));
265	}
266
267	let handle = tokio::spawn(BackgroundTask(context));
268
269	dev_log!("indexing", "[StartWatcher] Background tasks started");
270
271	Ok(handle)
272}
273
274/// Stop background tasks
275pub async fn StopBackgroundTasks(_context:&BackgroundIndexerContext) {
276	dev_log!("indexing", "[StartWatcher] Stopping background tasks"); // Tasks are cancelled when the task handle is dropped
277}
278
279/// Stop file watcher
280pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
281	if let Some(watcher) = context.file_watcher.lock().await.take() {
282		drop(watcher);
283
284		dev_log!("indexing", "[StartWatcher] File watcher stopped");
285	}
286}
287
288/// Background task for periodic indexing
289async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
290	let config = context.app_state.Configuration.Indexing.clone();
291
292	let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
293
294	let mut interval = tokio::time::interval(interval);
295
296	dev_log!(
297		"indexing",
298		"[StartWatcher] Background indexing configured for {} minute intervals",
299		config.UpdateIntervalMinutes
300	);
301
302	loop {
303		interval.tick().await;
304
305		{
306			// Check corruption flag
307			if *context.corruption_detected.lock().await {
308				dev_log!("indexing", "warn: [StartWatcher] Index corrupted, skipping background update");
309
310				continue;
311			}
312
313			dev_log!("indexing", "[StartWatcher] Running periodic background index...");
314
315			// Re-index configured directories
316			let directories = config.IndexDirectory.clone();
317
318			if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
319			{
320				dev_log!("indexing", "error: [StartWatcher] Background indexing failed: {}", e);
321			}
322		}
323	}
324}
325
326/// Get watcher status
327pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
328	let is_running = context.file_watcher.lock().await.is_some();
329
330	let pending_count = context.debounced_handler.PendingCount().await;
331
332	let is_corrupted = *context.corruption_detected.lock().await;
333
334	WatcherStatus { is_running, pending_count, is_corrupted }
335}
336
/// Point-in-time status of the file watcher subsystem.
///
/// `Default` describes a stopped watcher with nothing pending and no
/// corruption flagged.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WatcherStatus {
	/// `true` when a file watcher is currently installed.
	pub is_running:bool,

	/// Number of changes waiting in the debounced event handler.
	pub pending_count:usize,

	/// `true` when the index has been flagged as corrupted.
	pub is_corrupted:bool,
}
346
347/// Start all background components (watcher and tasks)
348pub async fn StartAll(
349	context:Arc<BackgroundIndexerContext>,
350
351	watch_paths:Vec<PathBuf>,
352) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
353	let watcher_handle = if config_watch_enabled(&context) {
354		match StartFileWatcher(&context, watch_paths).await {
355			Ok(()) => {
356				// Start debounce processor
357				Some(StartDebounceProcessor(context.clone()))
358			},
359
360			Err(e) => {
361				dev_log!("indexing", "error: [StartWatcher] Failed to start file watcher: {}", e);
362
363				None
364			},
365		}
366	} else {
367		None
368	};
369
370	let background_handle = match StartBackgroundTasks(context.clone()).await {
371		Ok(handle) => Some(handle),
372
373		Err(e) => {
374			dev_log!("indexing", "warn: [StartWatcher] Failed to start background tasks: {}", e);
375
376			None
377		},
378	};
379
380	Ok((watcher_handle, background_handle))
381}
382
383/// Stop all background components
384pub async fn StopAll(context:&BackgroundIndexerContext) {
385	StopBackgroundTasks(context).await;
386
387	StopFileWatcher(context).await;
388}
389
390/// Check if watching is enabled in configuration
391fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }