Skip to main content

AirLibrary/Indexing/Watch/
WatchFile.rs

1//! # WatchFile
2//!
3//! ## File: Indexing/Watch/WatchFile.rs
4//!
5//! ## Role in Air Architecture
6//!
7//! Provides file watching functionality for the File Indexer service,
8//! handling file system events for incremental index updates.
9//!
10//! ## Primary Responsibility
11//!
12//! Handle file system change events and trigger index updates for
13//! created, modified, and deleted files.
14//!
15//! ## Secondary Responsibilities
16//!
17//! - File creation event handling
18//! - File modification event handling
19//! - File deletion event handling
20//! - Directory change event handling
21//! - Event debouncing for rapid changes
22//!
23//! ## Dependencies
24//!
25//! **External Crates:**
26//! - `notify` - File system watching
27//! - `tokio` - Async runtime for event handling
28//!
29//! **Internal Modules:**
30//! - `crate::Result` - Error handling type
31//! - `crate::AirError` - Error types
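//! - `crate::Indexing::State::CreateState::FileIndex` - Index structure definitions
//! - `crate::Indexing::Store::UpdateIndex` - Index update operations
//! - `crate::Indexing::State::UpdateState` - Removal of entries from the index
//! - `crate::Configuration::IndexingConfig` - Indexing configuration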
34//!
35//! ## Dependents
36//!
37//! - `Indexing::Background::StartWatcher` - Watcher setup and management
38//! - `Indexing::mod::FileIndexer` - Main file indexer implementation
39//!
40//! ## VSCode Pattern Reference
41//!
42//! Inspired by VSCode's file watching in
43//! `src/vs/base/node/watcher/`
44//!
45//! ## Security Considerations
46//!
47//! - Path validation before watching
48//! - Symbolic link following disabled
49//! - Permission checking on watch paths
50//!
51//! ## Performance Considerations
52//!
53//! - Event debouncing prevents excessive updates
54//! - Batch processing of multiple events
55//! - Efficient event filtering
56//!
57//! ## Error Handling Strategy
58//!
59//! Event operations log warnings for individual errors and continue,
60//! ensuring a single event failure doesn't stop the watcher.
61//!
62//! ## Thread Safety
63//!
64//! Event handlers acquire write locks on shared state and process
65//! events asynchronously to avoid blocking the watcher loop.
66
67use std::path::PathBuf;
68
69use tokio::sync::{Mutex, RwLock};
70
71use crate::{AirError, Configuration::IndexingConfig, Indexing::State::CreateState::FileIndex, Result, dev_log};
72
73/// Handle file watcher event for incremental indexing
74///
75/// This function processes file system events and updates the index
76/// accordingly:
77/// - File Created: Index the new file
78/// - File Modified: Re-index the modified file
79/// - File Removed: Remove from index
80pub async fn HandleFileEvent(event:notify::Event, index_arc:&RwLock<FileIndex>, config:&IndexingConfig) -> Result<()> {
81	match event.kind {
82		notify::EventKind::Create(notify::event::CreateKind::File) => {
83			for path in event.paths {
84				dev_log!("indexing", "[WatchFile] File created: {}", path.display());
85
86				let mut index = index_arc.write().await;
87
88				if let Err(e) = crate::Indexing::Store::UpdateIndex::UpdateSingleFile(&mut index, &path, config).await {
89					dev_log!(
90						"indexing",
91						"warn: [WatchFile] Failed to index new file {}: {}",
92						path.display(),
93						e
94					);
95				}
96			}
97		},
98
99		notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
100		| notify::EventKind::Modify(notify::event::ModifyKind::Name(notify::event::RenameMode::Both)) => {
101			for path in event.paths {
102				dev_log!("indexing", "[WatchFile] File modified: {}", path.display());
103
104				let mut index = index_arc.write().await;
105
106				if let Err(e) = crate::Indexing::Store::UpdateIndex::UpdateSingleFile(&mut index, &path, config).await {
107					dev_log!(
108						"indexing",
109						"warn: [WatchFile] Failed to re-index modified file {}: {}",
110						path.display(),
111						e
112					);
113				}
114			}
115		},
116
117		notify::EventKind::Remove(notify::event::RemoveKind::File) => {
118			for path in event.paths {
119				dev_log!("indexing", "[WatchFile] File removed: {}", path.display());
120
121				let mut index = index_arc.write().await;
122
123				if let Err(e) = crate::Indexing::State::UpdateState::RemoveFileFromIndex(&mut index, &path) {
124					dev_log!(
125						"indexing",
126						"warn: [WatchFile] Failed to remove file from index {}: {}",
127						path.display(),
128						e
129					);
130				}
131			}
132		},
133
134		notify::EventKind::Create(notify::event::CreateKind::Folder) => {
135			for path in event.paths {
136				dev_log!("indexing", "[WatchFile] Directory created: {}", path.display()); // Directories themselves don't need indexing, just their
137
138				// contents
139			}
140		},
141
142		notify::EventKind::Remove(notify::event::RemoveKind::Folder) => {
143			for path in event.paths {
144				dev_log!("indexing", "[WatchFile] Directory removed: {}", path.display()); // Remove all files from this directory
145
146				let mut index = index_arc.write().await;
147
148				let mut paths_to_remove = Vec::new();
149
150				for indexed_path in index.files.keys() {
151					if indexed_path.starts_with(&path) {
152						paths_to_remove.push(indexed_path.clone());
153					}
154				}
155
156				for indexed_path in paths_to_remove {
157					if let Err(e) = crate::Indexing::State::UpdateState::RemoveFileFromIndex(&mut index, &indexed_path)
158					{
159						dev_log!(
160							"indexing",
161							"warn: [WatchFile] Failed to remove file {}: {}",
162							indexed_path.display(),
163							e
164						);
165					}
166				}
167			}
168		},
169
170		_ => {
171			// Ignore other event types
172			dev_log!("indexing", "ignored event kind: {:?}", event.kind);
173		},
174	}
175
176	Ok(())
177}
178
179/// Debounced file change handler
180///
181/// Prevents rapid successive changes from causing excessive re-indexing
182pub struct DebouncedEventHandler {
183	pending_changes:Mutex<std::collections::HashMap<PathBuf, FileChangeInfo>>,
184}
185
186impl DebouncedEventHandler {
187	pub fn new() -> Self { Self { pending_changes:Mutex::new(std::collections::HashMap::new()) } }
188
189	/// Add a file change event
190	pub async fn AddChange(&self, path:PathBuf, change_type:FileChangeType) {
191		let mut pending = self.pending_changes.lock().await;
192
193		let now = std::time::Instant::now();
194
195		match pending.get_mut(&path) {
196			Some(change_info) => {
197				change_info.last_seen = now;
198
199				change_info.change_type = change_type.max(change_info.change_type);
200
201				change_info.suppressed_count += 1;
202			},
203
204			None => {
205				pending.insert(
206					path.clone(),
207					FileChangeInfo { path:path.clone(), change_type, last_seen:now, suppressed_count:0 },
208				);
209			},
210		}
211	}
212
213	/// Process pending changes older than the specified duration
214	pub async fn ProcessPendingChanges(
215		&self,
216
217		age_cutoff:std::time::Duration,
218
219		index_arc:&RwLock<FileIndex>,
220
221		config:&IndexingConfig,
222	) -> Result<Vec<ProcessedChange>> {
223		let mut processed = Vec::new();
224
225		let expired_paths = {
226			let mut pending = self.pending_changes.lock().await;
227
228			let mut expired = Vec::new();
229
230			for (path, change_info) in pending.iter() {
231				if change_info.last_seen.elapsed() >= age_cutoff {
232					expired.push((path.clone(), change_info.clone()));
233				}
234			}
235
236			// Remove expired entries
237			for (path, _) in &expired {
238				pending.remove(path);
239			}
240
241			expired
242		};
243
244		for (path, change_info) in expired_paths {
245			dev_log!(
246				"indexing",
247				"[WatchFile] Processing debounced change for {} (suppressed: {})",
248				path.display(),
249				change_info.suppressed_count
250			);
251
252			let result = match change_info.change_type {
253				FileChangeType::Created => {
254					let mut index = index_arc.write().await;
255
256					crate::Indexing::Store::UpdateIndex::UpdateSingleFile(&mut index, &path, config)
257						.await
258						.map(|_| ProcessedChangeResult::Success)
259						.unwrap_or(ProcessedChangeResult::Failed)
260				},
261
262				FileChangeType::Modified => {
263					let mut index = index_arc.write().await;
264
265					super::super::Store::UpdateIndex::UpdateSingleFile(&mut index, &path, config)
266						.await
267						.map(|_| ProcessedChangeResult::Success)
268						.unwrap_or(ProcessedChangeResult::Failed)
269				},
270
271				FileChangeType::Removed => {
272					let mut index = index_arc.write().await;
273
274					crate::Indexing::State::UpdateState::RemoveFileFromIndex(&mut index, &path)
275						.map(|_| ProcessedChangeResult::Success)
276						.unwrap_or(ProcessedChangeResult::Failed)
277				},
278			};
279
280			processed.push(ProcessedChange {
281				path,
282				change_type:change_info.change_type,
283				suppressed_count:change_info.suppressed_count,
284				result,
285			});
286		}
287
288		Ok(processed)
289	}
290
291	/// Clear all pending changes
292	pub async fn ClearPending(&self) -> usize {
293		let mut pending = self.pending_changes.lock().await;
294
295		let count = pending.len();
296
297		pending.clear();
298
299		count
300	}
301
302	/// Get the number of pending changes
303	pub async fn PendingCount(&self) -> usize {
304		let pending = self.pending_changes.lock().await;
305
306		pending.len()
307	}
308}
309
310impl Default for DebouncedEventHandler {
311	fn default() -> Self { Self::new() }
312}
313
/// Kind of change recorded for a path while debouncing.
///
/// Variants are declared in increasing precedence, and the derived `Ord`
/// follows declaration order: `Created < Modified < Removed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum FileChangeType {
	Created,

	Modified,

	Removed,
}

impl FileChangeType {
	/// Return whichever of `self` and `other` has higher precedence
	/// (`Removed` > `Modified` > `Created`).
	///
	/// The precedence matches the declaration order above, so this simply
	/// defers to the derived `Ord` via `std::cmp::max`.
	pub fn max(self, other:Self) -> Self { std::cmp::max(self, other) }
}
336
/// Bookkeeping for one path while its changes are being debounced.
#[derive(Debug, Clone)]
struct FileChangeInfo {
	/// Path the change refers to (the same path it is keyed under in the
	/// pending map).
	path:PathBuf,

	/// Change type merged from every event seen for this path so far.
	change_type:FileChangeType,

	/// When the most recent event for this path was recorded; the entry is
	/// processed once this is at least as old as the caller's cutoff.
	last_seen:std::time::Instant,

	/// Number of additional events folded into this entry after the first.
	suppressed_count:usize,
}
348
/// Outcome of applying one debounced change to the index.
///
/// Only success or failure is recorded; the underlying error is not stored.
/// The type is `Copy` and comparable so callers can test it directly, e.g.
/// `change.result == ProcessedChangeResult::Failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessedChangeResult {
	Success,

	Failed,
}
356
/// Record of one debounced change that was applied to the index.
#[derive(Debug, Clone)]
pub struct ProcessedChange {
	/// Path whose change was applied.
	pub path:PathBuf,

	/// Merged change type that selected the index operation.
	pub change_type:FileChangeType,

	/// How many further events for this path were coalesced into this change.
	pub suppressed_count:usize,

	/// Whether the index operation succeeded.
	pub result:ProcessedChangeResult,
}
368
369/// Convert notify event kind to FileChangeType
370pub fn EventKindToChangeType(kind:notify::EventKind) -> Option<FileChangeType> {
371	match kind {
372		notify::EventKind::Create(_) => Some(FileChangeType::Created),
373
374		notify::EventKind::Modify(_) => Some(FileChangeType::Modified),
375
376		notify::EventKind::Remove(_) => Some(FileChangeType::Removed),
377
378		_ => None,
379	}
380}
381
/// Check whether a path should be watched, i.e. it matches none of
/// `ignored_patterns`.
///
/// Pattern forms:
/// - `*suffix` (e.g. `*.pyc`): matches when the final path component ends
///   with `suffix`.
/// - A pattern containing a path separator (e.g. `src/generated`): matches
///   when it occurs anywhere in the (lossily converted) path string.
/// - Anything else (e.g. `node_modules`, `.git`): matches when some path
///   component is exactly equal to the pattern.
///
/// Empty patterns match nothing.
///
/// Matching whole components instead of raw substrings keeps patterns like
/// `env`, `build` or `.git` from also excluding `environment.rs`,
/// `rebuild.rs` or `.github`. It also lets glob-style `*.ext` entries, which
/// never occur literally in a path, actually take effect.
pub fn ShouldWatchPath(path:&PathBuf, ignored_patterns:&[String]) -> bool {
	!ignored_patterns.iter().any(|pattern| MatchesIgnorePattern(path, pattern))
}

/// Return `true` if `path` is excluded by the single ignore `pattern`.
fn MatchesIgnorePattern(path:&std::path::Path, pattern:&str) -> bool {
	if pattern.is_empty() {
		return false;
	}

	if let Some(suffix) = pattern.strip_prefix('*') {
		return path
			.file_name()
			.map_or(false, |name| name.to_string_lossy().ends_with(suffix));
	}

	if pattern.contains('/') || pattern.contains('\\') {
		// Multi-component pattern: keep plain substring semantics.
		return path.to_string_lossy().contains(pattern);
	}

	path.components().any(|component| component.as_os_str() == pattern)
}
395
/// Default ignore patterns for file watching.
///
/// Covers version-control metadata, dependency and build output directories,
/// Python caches and virtual environments, editor/IDE folders, OS metadata
/// files and common swap/temporary file extensions. Order is preserved.
pub fn GetDefaultIgnoredPatterns() -> Vec<String> {
	const DEFAULT_PATTERNS:[&str; 22] = [
		"node_modules",
		"target",
		".git",
		".svn",
		".hg",
		".bzr",
		"dist",
		"build",
		".next",
		".nuxt",
		"__pycache__",
		"*.pyc",
		".venv",
		"venv",
		"env",
		".env",
		".idea",
		".vscode",
		".DS_Store",
		"Thumbs.db",
		"*.swp",
		"*.tmp",
	];

	DEFAULT_PATTERNS.iter().map(|pattern| (*pattern).to_owned()).collect()
}
423
424/// Validate that a watch path exists and is accessible
425pub fn ValidateWatchPath(path:&PathBuf) -> Result<()> {
426	if !path.exists() {
427		return Err(AirError::FileSystem(format!("Watch path does not exist: {}", path.display())));
428	}
429
430	if !path.is_dir() {
431		return Err(AirError::FileSystem(format!(
432			"Watch path is not a directory: {}",
433			path.display()
434		)));
435	}
436
437	// Check read access
438	std::fs::read_dir(path)
439		.map_err(|e| AirError::FileSystem(format!("Cannot access watch path {}: {}", path.display(), e)))?;
440
441	Ok(())
442}