AirLibrary/Indexing/Background/
StartWatcher.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
67
68use tokio::{
69 sync::{Mutex, RwLock, Semaphore},
70 task::JoinHandle,
71};
72
73use crate::{
74 AirError,
75 ApplicationState::ApplicationState,
76 Configuration::IndexingConfig,
77 Indexing::State::CreateState::{FileIndex, SymbolLocation},
78 Result,
79};
80
/// Number of permits the `indexing_semaphore` of a freshly built
/// `BackgroundIndexerContext` is created with.
const MAX_WATCH_PROCESSORS:usize = 5;
83
/// Shared state used by the file watcher, the debounce processor and the
/// periodic background indexing task defined in this module.
pub struct BackgroundIndexerContext {
	/// Application state; this module reads `Configuration.Indexing` from it.
	pub app_state:Arc<ApplicationState>,
	/// File index handed to the debounced handler when pending changes are
	/// processed.
	pub file_index:Arc<RwLock<FileIndex>>,
	/// While `true`, watcher events are dropped and the debounce processor and
	/// periodic indexing skip their work.
	pub corruption_detected:Arc<Mutex<bool>>,
	/// The active watcher, or `None` when no watcher has been started (or it
	/// has been stopped).
	pub file_watcher:Arc<Mutex<Option<notify::RecommendedWatcher>>>,
	/// Semaphore created with `MAX_WATCH_PROCESSORS` permits.
	/// NOTE(review): no code in this part of the file acquires it — confirm
	/// where it is used.
	pub indexing_semaphore:Arc<Semaphore>,
	/// Receives watcher events via `AddChange` and is drained by the debounce
	/// processor via `ProcessPendingChanges`.
	pub debounced_handler:Arc<crate::Indexing::Watch::WatchFile::DebouncedEventHandler>,
}
99
100impl BackgroundIndexerContext {
101 pub fn new(app_state:Arc<ApplicationState>, file_index:Arc<RwLock<FileIndex>>) -> Self {
102 Self {
103 app_state,
104 file_index,
105 corruption_detected:Arc::new(Mutex::new(false)),
106 file_watcher:Arc::new(Mutex::new(None)),
107 indexing_semaphore:Arc::new(Semaphore::new(MAX_WATCH_PROCESSORS)),
108 debounced_handler:Arc::new(crate::Indexing::Watch::WatchFile::DebouncedEventHandler::new()),
109 }
110 }
111}
112
113pub async fn StartFileWatcher(context:&BackgroundIndexerContext, paths:Vec<PathBuf>) -> Result<()> {
121 use notify::{RecursiveMode, Watcher};
122
123 let index = context.file_index.clone();
124 let corruption_flag = context.corruption_detected.clone();
125 let config = context.app_state.Configuration.Indexing.clone();
126 let debounced_handler = context.debounced_handler.clone();
127
128 let mut watcher:notify::RecommendedWatcher = Watcher::new(
130 move |res:std::result::Result<notify::Event, notify::Error>| {
131 if let Ok(event) = res {
132 if *corruption_flag.blocking_lock() {
134 log::warn!("[StartWatcher] Skipping file event - index marked as corrupted");
135 return;
136 }
137
138 let index = index.clone();
139 let debounced_handler = debounced_handler.clone();
140 let config_clone = config.clone();
141
142 tokio::spawn(async move {
143 if let Some(change_type) = crate::Indexing::Watch::WatchFile::EventKindToChangeType(event.kind) {
145 for path in &event.paths {
146 if crate::Indexing::Watch::WatchFile::ShouldWatchPath(
147 path,
148 &crate::Indexing::Watch::WatchFile::GetDefaultIgnoredPatterns(),
149 ) {
150 debounced_handler.AddChange(path.clone(), change_type).await;
151 }
152 }
153 }
154 });
155 }
156 },
157 notify::Config::default(),
158 )
159 .map_err(|e| AirError::Internal(format!("Failed to create file watcher: {}", e)))?;
160
161 for path in &paths {
163 if path.exists() {
164 match crate::Indexing::Watch::WatchFile::ValidateWatchPath(path) {
165 Ok(()) => {
166 watcher
167 .watch(path, notify::RecursiveMode::Recursive)
168 .map_err(|e| AirError::FileSystem(format!("Failed to watch path {}: {}", path.display(), e)))?;
169 log::info!("[StartWatcher] Watching path: {}", path.display());
170 },
171 Err(e) => {
172 log::warn!("[StartWatcher] Skipping invalid watch path {}: {}", path.display(), e);
173 },
174 }
175 }
176 }
177
178 *context.file_watcher.lock().await = Some(watcher);
179
180 log::info!("[StartWatcher] File watcher started successfully for {} paths", paths.len());
181
182 Ok(())
183}
184
185pub fn StartDebounceProcessor(context:Arc<BackgroundIndexerContext>) -> JoinHandle<()> {
187 tokio::spawn(async move {
188 log::info!("[StartWatcher] Debounce processor started");
189
190 let interval = Duration::from_millis(100); let debounce_cutoff = Duration::from_millis(500);
193
194 loop {
195 tokio::time::sleep(interval).await;
196 {
197 if *context.corruption_detected.lock().await {
199 log::warn!("[StartWatcher] Index corrupted, pausing debounce processing");
200 tokio::time::sleep(Duration::from_secs(5)).await;
201 continue;
202 }
203
204 let config = context.app_state.Configuration.Indexing.clone();
206
207 match context
208 .debounced_handler
209 .ProcessPendingChanges(debounce_cutoff, &context.file_index, &config)
210 .await
211 {
212 Ok(changes) => {
213 if !changes.is_empty() {
214 log::debug!("[StartWatcher] Processed {} debounced changes", changes.len());
215 }
216 },
217 Err(e) => {
218 log::error!("[StartWatcher] Failed to process pending changes: {}", e);
219 },
220 }
221 }
222 }
223 })
224}
225
226pub async fn StartBackgroundTasks(context:Arc<BackgroundIndexerContext>) -> Result<tokio::task::JoinHandle<()>> {
228 let config = &context.app_state.Configuration.Indexing;
229
230 if !config.Enabled {
231 log::info!("[StartWatcher] Background indexing disabled in configuration");
232 return Err(AirError::Configuration("Background indexing is disabled".to_string()));
233 }
234
235 let handle = tokio::spawn(BackgroundTask(context));
236
237 log::info!("[StartWatcher] Background tasks started");
238
239 Ok(handle)
240}
241
/// Logs that background tasks are being stopped.
///
/// NOTE(review): this function only writes a log line. It does not abort or
/// signal any spawned task, and `context` is unused. Callers presumably have
/// to abort the `JoinHandle`s returned by `StartBackgroundTasks` /
/// `StartDebounceProcessor` themselves — confirm.
pub async fn StopBackgroundTasks(context:&BackgroundIndexerContext) {
	log::info!("[StartWatcher] Stopping background tasks");
}
247
248pub async fn StopFileWatcher(context:&BackgroundIndexerContext) {
250 if let Some(watcher) = context.file_watcher.lock().await.take() {
251 drop(watcher);
252 log::info!("[StartWatcher] File watcher stopped");
253 }
254}
255
256async fn BackgroundTask(context:Arc<BackgroundIndexerContext>) {
258 let config = context.app_state.Configuration.Indexing.clone();
259
260 let interval = Duration::from_secs(config.UpdateIntervalMinutes as u64 * 60);
261 let mut interval = tokio::time::interval(interval);
262
263 log::info!(
264 "[StartWatcher] Background indexing configured for {} minute intervals",
265 config.UpdateIntervalMinutes
266 );
267
268 loop {
269 interval.tick().await;
270 {
271 if *context.corruption_detected.lock().await {
273 log::warn!("[StartWatcher] Index corrupted, skipping background update");
274 continue;
275 }
276
277 log::info!("[StartWatcher] Running periodic background index...");
278
279 let directories = config.IndexDirectory.clone();
281 if let Err(e) = crate::Indexing::Scan::ScanDirectory::ScanDirectory(&directories, vec![], &config, 10).await
282 {
283 log::error!("[StartWatcher] Background indexing failed: {}", e);
284 }
285 }
286 }
287}
288
289pub async fn GetWatcherStatus(context:&BackgroundIndexerContext) -> WatcherStatus {
291 let is_running = context.file_watcher.lock().await.is_some();
292 let pending_count = context.debounced_handler.PendingCount().await;
293 let is_corrupted = *context.corruption_detected.lock().await;
294
295 WatcherStatus { is_running, pending_count, is_corrupted }
296}
297
/// Snapshot of the watcher state as produced by `GetWatcherStatus`.
#[derive(Debug, Clone)]
pub struct WatcherStatus {
	/// `true` when a watcher is currently stored in the context.
	pub is_running:bool,
	/// Value reported by the debounced handler's `PendingCount`.
	pub pending_count:usize,
	/// Value of the context's corruption flag at the time of the snapshot.
	pub is_corrupted:bool,
}
305
306pub async fn StartAll(
308 context:Arc<BackgroundIndexerContext>,
309 watch_paths:Vec<PathBuf>,
310) -> Result<(Option<JoinHandle<()>>, Option<JoinHandle<()>>)> {
311 let watcher_handle = if config_watch_enabled(&context) {
312 match StartFileWatcher(&context, watch_paths).await {
313 Ok(()) => {
314 Some(StartDebounceProcessor(context.clone()))
316 },
317 Err(e) => {
318 log::error!("[StartWatcher] Failed to start file watcher: {}", e);
319 None
320 },
321 }
322 } else {
323 None
324 };
325
326 let background_handle = match StartBackgroundTasks(context.clone()).await {
327 Ok(handle) => Some(handle),
328 Err(e) => {
329 log::warn!("[StartWatcher] Failed to start background tasks: {}", e);
330 None
331 },
332 };
333
334 Ok((watcher_handle, background_handle))
335}
336
/// Calls `StopBackgroundTasks` and then `StopFileWatcher` on `context`.
pub async fn StopAll(context:&BackgroundIndexerContext) {
	StopBackgroundTasks(context).await;
	StopFileWatcher(context).await;
}
342
343fn config_watch_enabled(context:&BackgroundIndexerContext) -> bool { context.app_state.Configuration.Indexing.Enabled }