AirLibrary/Indexing/Store/
UpdateIndex.rs1use std::{path::PathBuf, sync::Arc, time::Duration};
71
72use tokio::{
73 sync::{RwLock, Semaphore},
74 time::Instant,
75};
76
77use crate::{
78 AirError,
79 Configuration::IndexingConfig,
80 Indexing::State::CreateState::{FileIndex, FileMetadata, SymbolInfo, SymbolLocation},
81 Result,
82};
83
84pub async fn UpdateSingleFile(
86 index:&mut FileIndex,
87 file_path:&PathBuf,
88 config:&IndexingConfig,
89) -> Result<Option<FileMetadata>> {
90 let start_time = Instant::now();
91
92 if !file_path.exists() {
94 crate::Indexing::State::UpdateState::RemoveFileFromIndex(index, file_path)?;
96 log::debug!("[UpdateIndex] Removed deleted file: {}", file_path.display());
97 return Ok(None);
98 }
99
100 let current_metadata = std::fs::metadata(file_path)
102 .map_err(|e| AirError::FileSystem(format!("Failed to get file metadata: {}", e)))?;
103
104 let current_modified = current_metadata
105 .modified()
106 .map_err(|e| AirError::FileSystem(format!("Failed to get modification time: {}", e)))?;
107
108 let current_modified_time = chrono::DateTime::<chrono::Utc>::from(current_modified);
109
110 let needs_update = match index.files.get(file_path) {
112 Some(old_metadata) => {
113 let checksum = crate::Indexing::Scan::ScanFile::CalculateChecksum(
115 &tokio::fs::read(file_path).await.unwrap_or_default(),
116 );
117 old_metadata.checksum != checksum
118 },
119 None => {
120 true
122 },
123 };
124
125 if !needs_update {
126 log::trace!("[UpdateIndex] File unchanged: {}", file_path.display());
127 return Ok(index.files.get(file_path).cloned());
128 }
129
130 use crate::Indexing::{
132 Process::ProcessContent::{DetectEncoding, DetectLanguage, DetectMimeType},
133 Scan::ScanFile::IndexFileInternal,
134 State::UpdateState::UpdateIndexMetadata,
135 };
136
137 let (metadata, symbols) = IndexFileInternal(file_path, config, &RwLock::new(index.clone()), &[]).await?;
138
139 crate::Indexing::State::UpdateState::RemoveFileFromIndex(index, file_path)?;
141 crate::Indexing::State::UpdateState::AddFileToIndex(index, file_path.clone(), metadata.clone(), symbols)?;
142
143 UpdateIndexMetadata(index)?;
145
146 let elapsed = start_time.elapsed();
147 log::trace!(
148 "[UpdateIndex] Updated {} in {}ms ({} symbols)",
149 file_path.display(),
150 elapsed.as_millis(),
151 metadata.symbol_count
152 );
153
154 Ok(Some(metadata))
155}
156
157pub async fn UpdateFileContent(index:&mut FileIndex, file_path:&PathBuf, metadata:&FileMetadata) -> Result<()> {
159 if !metadata.mime_type.starts_with("text/") && !metadata.mime_type.contains("json") {
161 return Ok(());
162 }
163
164 let content = tokio::fs::read_to_string(file_path)
165 .await
166 .map_err(|e| AirError::FileSystem(format!("Failed to read file content: {}", e)))?;
167
168 for (_, files) in index.content_index.iter_mut() {
170 files.retain(|p| p != file_path);
171 }
172
173 let tokens = crate::Indexing::Process::ProcessContent::TokenizeContent(&content);
175
176 for token in tokens {
177 if token.len() > 2 {
178 index
180 .content_index
181 .entry(token)
182 .or_insert_with(Vec::new)
183 .push(file_path.clone());
184 }
185 }
186
187 Ok(())
188}
189
190pub async fn UpdateFilesBatch(
192 index:&mut FileIndex,
193 file_paths:Vec<PathBuf>,
194 config:&IndexingConfig,
195) -> Result<UpdateBatchResult> {
196 let start_time = Instant::now();
197 let mut updated_count = 0u32;
198 let mut removed_count = 0u32;
199 let mut error_count = 0u32;
200 let mut total_size = 0u64;
201
202 for file_path in file_paths {
203 match UpdateSingleFile(index, &file_path, config).await {
204 Ok(Some(metadata)) => {
205 updated_count += 1;
206 total_size += metadata.size;
207 },
208 Ok(None) => {
209 removed_count += 1;
210 },
211 Err(e) => {
212 log::warn!("[UpdateIndex] Failed to update file {}: {}", file_path.display(), e);
213 error_count += 1;
214 },
215 }
216 }
217
218 crate::Indexing::State::UpdateState::UpdateIndexMetadata(index)?;
220
221 Ok(UpdateBatchResult {
222 updated_count,
223 removed_count,
224 error_count,
225 total_size,
226 duration_seconds:start_time.elapsed().as_secs_f64(),
227 })
228}
229
/// Summary of a batch or rebuild operation over the index.
///
/// `PartialEq` and `Default` are derived so results can be compared in tests
/// and an all-zero summary can be built without spelling out every field.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct UpdateBatchResult {
	/// Number of files for which metadata was returned.
	pub updated_count:u32,
	/// Number of files dropped from the index.
	pub removed_count:u32,
	/// Number of files that could not be processed.
	pub error_count:u32,
	/// Sum of the `size` of every file counted in `updated_count`.
	pub total_size:u64,
	/// Wall-clock duration of the operation, in seconds.
	pub duration_seconds:f64,
}
239
/// Debounce state for a single file: remembers when a change was last
/// reported and whether an update is still waiting to be applied.
///
/// NOTE(review): `index` is a raw pointer that `new` creates from a plain
/// reference, so nothing ties its validity to the lock it points at. Callers
/// must keep that `RwLock<FileIndex>` alive, at the same address, for as long
/// as `process_if_ready` may be called on this value.
pub struct DebouncedUpdate {
	/// File this debouncer is responsible for.
	file_path:PathBuf,
	/// Time of construction or of the most recent `trigger` call.
	last_seen:Instant,
	/// Lifetime-erased pointer to the shared index lock; dereferenced in
	/// `process_if_ready`.
	index:*const RwLock<FileIndex>,
	/// Configuration snapshot cloned at construction time.
	config:IndexingConfig,
	/// Quiet period that must elapse after `last_seen` before processing.
	duration:Duration,
	/// `true` while a triggered update has not been processed or cleared.
	pending:bool,
}
249
// SAFETY (review): the raw `index` pointer is what blocks the automatic `Send`
// impl (assuming `IndexingConfig` is itself `Send` - TODO confirm). Sending
// this value to another thread is only sound if `RwLock<FileIndex>` is `Sync`
// and the pointee outlives the value; neither is enforced by the compiler
// here, so both must be guaranteed by the code that constructs it.
unsafe impl Send for DebouncedUpdate {}
251
252impl DebouncedUpdate {
253 pub fn new(file_path:PathBuf, index:&RwLock<FileIndex>, config:&IndexingConfig, duration:Duration) -> Self {
254 Self {
255 file_path,
256 last_seen:Instant::now(),
257 index:index as *const RwLock<FileIndex>,
258 config:config.clone(),
259 duration,
260 pending:false,
261 }
262 }
263
264 pub async fn trigger(&mut self) {
265 self.last_seen = Instant::now();
266 self.pending = true;
267 }
268
269 pub async fn process_if_ready(&mut self) -> Result<bool> {
270 if !self.pending {
271 return Ok(false);
272 }
273
274 if self.last_seen.elapsed() >= self.duration {
275 self.pending = false;
276
277 let index_ref = unsafe { &*self.index };
279 let mut index = index_ref.write().await;
280
281 match UpdateSingleFile(&mut index, &self.file_path, &self.config).await {
282 Ok(_) => {
283 log::debug!("[UpdateIndex] Debounced update completed: {}", self.file_path.display());
284 return Ok(true);
285 },
286 Err(e) => {
287 log::warn!("[UpdateIndex] Debounced update failed: {}", e);
288 return Err(e);
289 },
290 }
291 }
292
293 Ok(false)
294 }
295
296 pub fn clear_pending(&mut self) { self.pending = false; }
297}
298
299pub async fn ProcessWatcherEvent(
301 index:&mut FileIndex,
302 event:notify::Event,
303 config:&IndexingConfig,
304) -> Result<WatcherEventResult> {
305 let mut updated = 0u32;
306 let mut removed = 0u32;
307
308 for file_path in event.paths {
309 match event.kind {
310 notify::EventKind::Create(notify::event::CreateKind::File) => {
311 log::debug!("[UpdateIndex] File created: {}", file_path.display());
312 if UpdateSingleFile(index, &file_path, config).await.is_ok() {
313 updated += 1;
314 }
315 },
316 notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
317 | notify::EventKind::Modify(notify::event::ModifyKind::Name(notify::event::RenameMode::Both)) => {
318 log::debug!("[UpdateIndex] File modified: {}", file_path.display());
319 if UpdateSingleFile(index, &file_path, config).await.is_ok() {
320 updated += 1;
321 }
322 },
323 notify::EventKind::Remove(notify::event::RemoveKind::File) => {
324 log::debug!("[UpdateIndex] File removed: {}", file_path.display());
325 if super::super::State::UpdateState::RemoveFileFromIndex(index, &file_path).is_ok() {
326 removed += 1;
327 }
328 },
329 _ => {},
330 }
331 }
332
333 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
335
336 Ok(WatcherEventResult { updated, removed })
337}
338
/// Outcome of applying a single watcher event to the index.
///
/// `PartialEq`, `Eq` and `Default` are derived so results can be compared in
/// tests and an all-zero result can be built directly.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct WatcherEventResult {
	/// Number of paths counted as updated.
	pub updated:u32,
	/// Number of paths counted as removed.
	pub removed:u32,
}
345
346pub async fn CleanupRemovedFiles(index:&mut FileIndex) -> Result<u32> {
348 let mut paths_to_remove = Vec::new();
349 let all_paths:Vec<_> = index.files.keys().cloned().collect();
350
351 for path in all_paths {
352 if !path.exists() {
353 paths_to_remove.push(path);
354 }
355 }
356
357 for path in &paths_to_remove {
358 super::super::State::UpdateState::RemoveFileFromIndex(index, path)?;
359 }
360
361 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
363
364 log::debug!("[UpdateIndex] Cleaned up {} removed files", paths_to_remove.len());
365
366 Ok(paths_to_remove.len() as u32)
367}
368
369pub async fn RebuildIndex(
371 index:&mut FileIndex,
372 directories:Vec<String>,
373 patterns:Vec<String>,
374 config:&IndexingConfig,
375) -> Result<UpdateBatchResult> {
376 let start_time = Instant::now();
377
378 index.files.clear();
380 index.content_index.clear();
381 index.symbol_index.clear();
382 index.file_symbols.clear();
383
384 let (files_to_index, scan_result) =
386 crate::Indexing::Scan::ScanDirectory::ScanDirectoriesParallel(directories, patterns, config, 10).await?;
387
388 let semaphore = Arc::new(Semaphore::new(config.MaxParallelIndexing as usize));
390 let index_arc = Arc::new(RwLock::new(index.clone()));
391 let mut tasks = Vec::new();
392
393 for file_path in files_to_index {
394 let permit = semaphore.clone().acquire_owned().await.unwrap();
395 let index_ref = index_arc.clone();
396 let config_clone = config.clone();
397
398 let task = tokio::spawn(async move {
399 let _permit = permit;
400
401 crate::Indexing::Scan::ScanFile::IndexFileInternal(&file_path, &config_clone, &index_ref, &[]).await
402 });
403
404 tasks.push(task);
405 }
406
407 let mut updated_count = 0u32;
408 let mut total_size = 0u64;
409
410 for task in tasks {
411 match task.await {
412 Ok(Ok((metadata, symbols))) => {
413 let file_size = metadata.size;
414 super::super::State::UpdateState::AddFileToIndex(index, metadata.path.clone(), metadata, symbols)?;
415 updated_count += 1;
416 total_size += file_size;
417 },
418 Ok(Err(e)) => {
419 log::warn!("[UpdateIndex] Rebuild task failed: {}", e);
420 },
421 Err(e) => {
422 log::warn!("[UpdateIndex] Rebuild task join failed: {}", e);
423 },
424 }
425 }
426
427 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
429
430 Ok(UpdateBatchResult {
431 updated_count,
432 removed_count:0,
433 error_count:scan_result.errors,
434 total_size,
435 duration_seconds:start_time.elapsed().as_secs_f64(),
436 })
437}
438
439pub async fn ValidateAndRepairIndex(index:&mut FileIndex) -> Result<RepairResult> {
441 let start_time = Instant::now();
442 let mut repaired_files = 0u32;
443 let mut removed_orphans = 0u32;
444
445 match super::super::State::UpdateState::ValidateIndexConsistency(index) {
447 Ok(()) => {},
448 Err(e) => {
449 log::warn!("[UpdateIndex] Index validation failed: {}", e);
450 repaired_files += 1;
451 },
452 }
453
454 removed_orphans = super::super::State::UpdateState::CleanupOrphanedEntries(index)?;
456
457 super::super::State::UpdateState::UpdateIndexMetadata(index)?;
459
460 Ok(RepairResult {
461 repaired_files,
462 removed_orphans,
463 duration_seconds:start_time.elapsed().as_secs_f64(),
464 })
465}
466
/// Outcome of a validate-and-repair pass over the index.
///
/// `PartialEq` and `Default` are derived so results can be compared in tests
/// and an all-zero result can be built directly.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct RepairResult {
	/// Number of validation failures that were recorded.
	pub repaired_files:u32,
	/// Number of orphaned entries removed by the cleanup step.
	pub removed_orphans:u32,
	/// Wall-clock duration of the pass, in seconds.
	pub duration_seconds:f64,
}