"""Regression tests for directory renames and moves handled by ``cista.watching``.

Each test builds a directory tree in a temporary root, snapshots it with
``watching.walk``, performs renames on disk and then replays the updates the
watcher would perform (``watching.update_path`` / ``watching.format_update``),
asserting that the calls return promptly and that the tree state is consistent.
"""

import asyncio
import contextlib
import queue
import shutil
import signal
import tempfile
import threading
import time
from pathlib import Path, PurePosixPath
from unittest.mock import MagicMock, patch

import msgspec
import pytest

from cista import config, watching
from cista.protocol import UpdateMessage


@pytest.fixture
def temp_dir():
    """Create a temporary directory for testing."""
    with tempfile.TemporaryDirectory() as tmpdirname:
        yield Path(tmpdirname)


@pytest.fixture
def setup_watcher(temp_dir):
    """Point the watcher's module-level globals at a temporary directory.

    Yields the temporary root. On teardown the quit event is set and every
    global that was replaced (including ``config.config``) is restored, so
    state does not leak into other tests.
    """
    # Store original values
    original_config = getattr(config, "config", None)
    original_rootpath = watching.rootpath
    original_state = watching.state
    original_quit = watching.quit

    # Setup test environment
    config.config = config.Config(path=temp_dir, listen=":0")
    watching.rootpath = temp_dir
    watching.state = watching.State()
    watching.quit = threading.Event()

    yield temp_dir

    # Cleanup
    watching.quit.set()
    watching.rootpath = original_rootpath
    watching.state = original_state
    watching.quit = original_quit
    config.config = original_config


@pytest.fixture
def loop():
    """Provide a fresh, never-started event loop and close it after the test."""
    event_loop = asyncio.new_event_loop()
    yield event_loop
    event_loop.close()


@contextlib.contextmanager
def _alarm_timeout(seconds: int, message: str):
    """Raise ``TimeoutError(message)`` if the body runs longer than *seconds*.

    Implemented with ``SIGALRM``, so it only works in the main thread. The
    alarm is always cancelled and the previous handler reinstalled on exit.
    On platforms without ``SIGALRM`` the body runs without a watchdog.
    """
    if not hasattr(signal, "SIGALRM"):
        yield
        return

    def _on_alarm(signum, frame):
        raise TimeoutError(message)

    previous = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        # signal.signal() returns None when the old handler was not installed
        # from Python; None cannot be passed back to signal.signal().
        if previous is not None:
            signal.signal(signal.SIGALRM, previous)


def create_test_structure(base_path: Path):
    """Create a test directory structure with subdirectories and files.

    Returns the tuple ``(subdir, nested, other_dir)`` of created directories.
    """
    # Main subdirectory with files
    subdir = base_path / "test_subdir"
    subdir.mkdir()
    (subdir / "file1.txt").write_text("content1")
    (subdir / "file2.txt").write_text("content2")

    # Nested subdirectory
    nested = subdir / "nested"
    nested.mkdir()
    (nested / "nested_file.txt").write_text("nested content")

    # Another top-level directory for reference
    other_dir = base_path / "other_dir"
    other_dir.mkdir()
    (other_dir / "other_file.txt").write_text("other content")

    return subdir, nested, other_dir


def test_nested_directory_rename_causes_hang(setup_watcher, loop):
    """Test renaming deeply nested directories - this is where the hang typically occurs.

    The bug manifests when renaming directories that are nested within other
    directories, not just top-level directories.
    """
    temp_dir = setup_watcher

    # parent/child/grandchild/target_dir/files...
    parent = temp_dir / "parent_folder"
    parent.mkdir()
    child = parent / "child_folder"
    child.mkdir()
    grandchild = child / "grandchild_folder"
    grandchild.mkdir()

    # This is the directory we'll rename - it's deeply nested
    target_dir = grandchild / "target_to_rename"
    target_dir.mkdir()

    # Add files to make the directory scan more complex
    for i in range(20):
        (target_dir / f"file_{i:03d}.txt").write_text(f"content_{i}")

    # Add another nested level inside target
    deep_nested = target_dir / "even_deeper"
    deep_nested.mkdir()
    for i in range(10):
        (deep_nested / f"deep_file_{i}.txt").write_text(f"deep_content_{i}")

    # Initialize watcher state
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Verify the nested structure exists
    target_path = PurePosixPath(
        "parent_folder/child_folder/grandchild_folder/target_to_rename"
    )
    initial_begin, initial_entries = watching.treeget(initial_root, target_path)
    assert initial_begin is not None, "Target directory should be found in initial state"
    assert len(initial_entries) > 1, "Target directory should contain files"

    # Now rename the deeply nested directory
    new_target = grandchild / "renamed_target"
    target_dir.rename(new_target)

    working_state = watching.state.root[:]

    old_nested_path = PurePosixPath(
        "parent_folder/child_folder/grandchild_folder/target_to_rename"
    )
    new_nested_path = PurePosixPath(
        "parent_folder/child_folder/grandchild_folder/renamed_target"
    )

    start_time = time.time()
    # Update the old path (should remove it), then the new path (should add it)
    watching.update_path(working_state, old_nested_path, loop)
    watching.update_path(working_state, new_nested_path, loop)
    duration = time.time() - start_time

    # Check for hang - nested operations should still be fast
    assert duration < 3.0, f"Nested directory rename took too long: {duration}s - possible hang"

    # Verify the old nested path is gone
    old_begin, _ = watching.treeget(working_state, old_nested_path)
    assert old_begin is None, "Old nested directory should be removed from tree"

    # Verify the new nested path exists
    new_begin, new_entries = watching.treeget(working_state, new_nested_path)
    assert new_begin is not None, "New nested directory should exist in tree"
    assert len(new_entries) > 1, "New nested directory should contain all the files"


def test_move_directory_across_nested_parents(setup_watcher, loop):
    """Test moving a directory from one nested location to another - high hang risk scenario."""
    temp_dir = setup_watcher

    # Create source nested structure
    source_parent = temp_dir / "source_area"
    source_parent.mkdir()
    source_child = source_parent / "source_child"
    source_child.mkdir()

    # Create the directory to move
    movable_dir = source_child / "movable_directory"
    movable_dir.mkdir()
    for i in range(15):
        (movable_dir / f"file_{i}.txt").write_text(f"movable_content_{i}")

    # Create a subdirectory within the movable directory
    sub_movable = movable_dir / "sub_directory"
    sub_movable.mkdir()
    for i in range(5):
        (sub_movable / f"sub_file_{i}.txt").write_text(f"sub_content_{i}")

    # Create destination nested structure
    dest_parent = temp_dir / "destination_area"
    dest_parent.mkdir()
    dest_child = dest_parent / "dest_child"
    dest_child.mkdir()
    dest_grandchild = dest_child / "dest_grandchild"
    dest_grandchild.mkdir()

    # Initialize state
    watching.state.root = watching.walk(PurePosixPath())
    working_state = watching.state.root[:]

    # Move the directory to the deeply nested destination
    dest_movable = dest_grandchild / "moved_directory"
    movable_dir.rename(dest_movable)

    old_path = PurePosixPath("source_area/source_child/movable_directory")
    new_path = PurePosixPath(
        "destination_area/dest_child/dest_grandchild/moved_directory"
    )

    start_time = time.time()
    try:
        # Remove from old location, then add to new location
        watching.update_path(working_state, old_path, loop)
        watching.update_path(working_state, new_path, loop)
    except Exception as e:
        pytest.fail(f"Nested directory move failed: {e}")
    duration = time.time() - start_time

    # Should complete without hanging
    assert duration < 5.0, f"Cross-nested move took too long: {duration}s"

    # Verify old location is empty
    old_begin, _ = watching.treeget(working_state, old_path)
    assert old_begin is None, "Directory should be removed from old nested location"

    # Verify new location has the directory
    new_begin, new_entries = watching.treeget(working_state, new_path)
    assert new_begin is not None, "Directory should exist in new nested location"
    assert len(new_entries) > 1, "Moved directory should retain all its contents"


def test_rapid_nested_directory_operations_cause_corruption(setup_watcher, loop):
    """Test rapid operations on nested directories that can cause state corruption."""
    temp_dir = setup_watcher

    # Create multiple nested structures; remember (parent, target) of each
    structures = []
    for i in range(3):
        level1 = temp_dir / f"level1_{i}"
        level1.mkdir()
        level2 = level1 / f"level2_{i}"
        level2.mkdir()
        level3 = level2 / f"level3_{i}"
        level3.mkdir()
        target = level3 / f"target_{i}"
        target.mkdir()
        for j in range(10):
            (target / f"file_{j}.txt").write_text(f"content_{i}_{j}")
        structures.append((level3, target))

    # Initialize state
    watching.state.root = watching.walk(PurePosixPath())
    working_state = watching.state.root[:]

    # Rename every deeply nested target on disk first
    operations = []
    for i, (level3, target) in enumerate(structures):
        target.rename(level3 / f"renamed_target_{i}")
        old_path = PurePosixPath(f"level1_{i}/level2_{i}/level3_{i}/target_{i}")
        new_path = PurePosixPath(f"level1_{i}/level2_{i}/level3_{i}/renamed_target_{i}")
        operations.append((old_path, new_path))

    start_time = time.time()
    # Process all operations back to back
    for old_path, new_path in operations:
        try:
            watching.update_path(working_state, old_path, loop)
            watching.update_path(working_state, new_path, loop)
        except Exception as e:
            pytest.fail(f"Rapid nested operations failed for {old_path} -> {new_path}: {e}")
    duration = time.time() - start_time

    # Should complete without hanging even with rapid operations
    assert duration < 10.0, f"Rapid nested operations took too long: {duration}s"

    # Verify final state consistency
    for old_path, new_path in operations:
        old_begin, _ = watching.treeget(working_state, old_path)
        assert old_begin is None, f"Old path {old_path} should be removed"
        new_begin, _ = watching.treeget(working_state, new_path)
        assert new_begin is not None, f"New path {new_path} should exist"


def test_nested_directory_treeget_corruption(setup_watcher, loop):
    """Test that treeget function handles nested path operations correctly without corruption."""
    temp_dir = setup_watcher

    # Create a 3 x 2 x 2 tree with five files in each leaf directory
    for i in range(3):
        root_dir = temp_dir / f"root_{i}"
        root_dir.mkdir()
        for j in range(2):
            mid_dir = root_dir / f"mid_{j}"
            mid_dir.mkdir()
            for k in range(2):
                leaf_dir = mid_dir / f"leaf_{k}"
                leaf_dir.mkdir()
                for n in range(5):
                    (leaf_dir / f"file_{n}.txt").write_text(f"content_{i}_{j}_{k}_{n}")

    # Initialize state
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Test treeget with various nested paths
    test_paths = [
        PurePosixPath("root_0"),
        PurePosixPath("root_0/mid_0"),
        PurePosixPath("root_0/mid_0/leaf_0"),
        PurePosixPath("root_1/mid_1/leaf_1"),
        PurePosixPath("root_2/mid_0/leaf_1"),
    ]
    for path in test_paths:
        begin, entries = watching.treeget(initial_root, path)
        assert begin is not None, f"treeget should find existing path: {path}"
        assert len(entries) >= 1, f"treeget should return entries for: {path}"

    # Now rename a nested directory and test treeget consistency
    old_leaf = temp_dir / "root_0" / "mid_0" / "leaf_0"
    new_leaf = temp_dir / "root_0" / "mid_0" / "renamed_leaf"
    old_leaf.rename(new_leaf)

    # Update the state
    working_state = initial_root[:]
    old_nested_path = PurePosixPath("root_0/mid_0/leaf_0")
    new_nested_path = PurePosixPath("root_0/mid_0/renamed_leaf")
    watching.update_path(working_state, old_nested_path, loop)
    watching.update_path(working_state, new_nested_path, loop)

    # Verify treeget consistency after the update
    old_begin, _ = watching.treeget(working_state, old_nested_path)
    assert old_begin is None, "Old nested path should not be found after rename"

    new_begin, new_entries = watching.treeget(working_state, new_nested_path)
    assert new_begin is not None, "New nested path should be found after rename"
    assert len(new_entries) >= 1, "New nested path should have entries"

    # Verify that other paths are still accessible (no corruption)
    for path in [PurePosixPath("root_1/mid_1/leaf_1"), PurePosixPath("root_2/mid_0/leaf_1")]:
        begin, _ = watching.treeget(working_state, path)
        assert begin is not None, f"Other paths should remain accessible: {path}"


def test_format_update_infinite_loop_with_complex_nested_changes(setup_watcher):
    """Create a scenario that specifically triggers infinite loops in format_update.

    The hang often occurs in format_update when the diff algorithm gets
    confused by complex nested directory moves.
    """
    temp_dir = setup_watcher

    # Multiple directories with similar names and nested structures
    dirs_data = []
    for i in range(4):
        main_dir = temp_dir / f"main_{i}"
        main_dir.mkdir()

        # Subdirectories with the same name under every main directory
        sub_dir = main_dir / "common_subdir_name"
        sub_dir.mkdir()
        for j in range(15):
            (sub_dir / f"file_{j:02d}.txt").write_text(f"main_{i}_content_{j}")

        # Another level of nesting
        nested = sub_dir / "nested_level"
        nested.mkdir()
        for j in range(8):
            (nested / f"nested_{j}.txt").write_text(f"nested_{i}_{j}")

        dirs_data.append((main_dir, sub_dir))

    # Get initial state
    old_state = watching.walk(PurePosixPath())

    # Rename all subdirectories to very similar names, plus some of their files
    for i, (main_dir, sub_dir) in enumerate(dirs_data):
        new_sub_dir = main_dir / f"renamed_common_subdir_{i}"
        sub_dir.rename(new_sub_dir)

        for j in range(0, 10, 2):  # Rename every other file
            old_file = new_sub_dir / f"file_{j:02d}.txt"
            new_file = new_sub_dir / f"renamed_file_{j:02d}.txt"
            if old_file.exists():
                old_file.rename(new_file)

    # Get new state
    new_state = watching.walk(PurePosixPath())

    # The critical call, guarded by a 10-second watchdog
    start_time = time.time()
    try:
        with _alarm_timeout(10, "format_update appears to be hanging"):
            update_msg = watching.format_update(old_state, new_state)
    except TimeoutError:
        pytest.fail("format_update hung/infinite loop detected with complex nested changes")
    except Exception as e:
        pytest.fail(f"format_update failed: {e}")
    duration = time.time() - start_time

    # Even complex diffs should complete quickly
    assert duration < 8.0, f"format_update took {duration}s - possible infinite loop"

    # Verify the result is valid
    assert update_msg, "format_update should return a message"
    decoded = msgspec.json.decode(update_msg, type=UpdateMessage)
    assert decoded.update, "Update should contain operations"


def test_update_path_with_corrupted_tree_state(setup_watcher, loop):
    """Test update_path when the tree state becomes corrupted by rapid changes."""
    temp_dir = setup_watcher

    # Create a nested structure
    parent = temp_dir / "parent"
    parent.mkdir()
    child = parent / "child"
    child.mkdir()
    target = child / "target_dir"
    target.mkdir()

    # Add many files to make operations slower
    for i in range(30):
        (target / f"file_{i:03d}.txt").write_text(f"content_{i}")

    # Add nested subdirectories
    for i in range(3):
        subdir = target / f"subdir_{i}"
        subdir.mkdir()
        for j in range(10):
            (subdir / f"sub_file_{j}.txt").write_text(f"sub_content_{i}_{j}")

    # Initialize state
    watching.state.root = watching.walk(PurePosixPath())

    # Working copy that is manually corrupted below to simulate race conditions
    working_state = watching.state.root[:]

    # Rename the directory
    new_target = child / "renamed_target"
    target.rename(new_target)

    old_path = PurePosixPath("parent/child/target_dir")
    new_path = PurePosixPath("parent/child/renamed_target")

    # Drop one entry (fixed index 3) so the flat tree no longer matches the
    # filesystem, mimicking inotify events that arrived out of order.
    if len(working_state) > 5:
        del working_state[3]

    start_time = time.time()
    try:
        watching.update_path(working_state, old_path, loop)
        watching.update_path(working_state, new_path, loop)
    except Exception:
        # Errors are tolerated with a corrupted state; hanging is not.
        duration = time.time() - start_time
        assert duration < 5.0, f"update_path hung even when failing: {duration}s"
    else:
        duration = time.time() - start_time
        assert duration < 5.0, f"update_path with corrupted state took {duration}s"


def test_simulate_real_inotify_event_sequence(setup_watcher, loop):
    """Simulate the exact inotify event sequence that causes hangs."""
    temp_dir = setup_watcher

    # Create the scenario from real usage that triggers the bug
    project_dir = temp_dir / "project"
    project_dir.mkdir()
    src_dir = project_dir / "src"
    src_dir.mkdir()
    components_dir = src_dir / "components"
    components_dir.mkdir()

    # This is the directory that will be renamed
    old_component = components_dir / "OldComponent"
    old_component.mkdir()

    # Add files that exist in real projects
    for filename in ["index.tsx", "styles.css", "types.ts", "utils.ts"]:
        (old_component / filename).write_text(f"// {filename} content")

    # Add a subdirectory with more files
    sub_dir = old_component / "subcomponents"
    sub_dir.mkdir()
    for i in range(5):
        (sub_dir / f"SubComponent{i}.tsx").write_text(f"// SubComponent{i}")

    # Initialize state
    watching.state.root = watching.walk(PurePosixPath())
    working_state = watching.state.root[:]

    # This is the operation that causes hangs in real usage
    new_component = components_dir / "NewComponent"
    old_component.rename(new_component)

    # IN_MOVED_FROM for the old directory, IN_MOVED_TO for the new one
    old_path = PurePosixPath("project/src/components/OldComponent")
    new_path = PurePosixPath("project/src/components/NewComponent")

    start_time = time.time()
    try:
        with _alarm_timeout(15, "Simulated inotify sequence hung"):
            watching.update_path(working_state, old_path, loop)
            watching.update_path(working_state, new_path, loop)
    except TimeoutError:
        pytest.fail("HANG DETECTED: Simulated inotify event sequence hung!")
    except Exception as e:
        pytest.fail(f"Simulated inotify sequence failed: {e}")
    duration = time.time() - start_time

    # Real inotify operations should be fast
    assert duration < 10.0, f"Simulated inotify sequence took {duration}s"

    # Verify the final state is correct
    old_begin, _ = watching.treeget(working_state, old_path)
    assert old_begin is None, "Old component path should be removed"

    new_begin, new_entries = watching.treeget(working_state, new_path)
    assert new_begin is not None, "New component path should exist"
    assert len(new_entries) > 1, "New component should contain all files"


def test_format_update_with_nested_directory_changes(setup_watcher):
    """Test format_update with nested directory changes that could cause infinite loops."""
    temp_dir = setup_watcher

    # Create complex nested structure that has caused issues
    complex_structure = temp_dir / "complex"
    complex_structure.mkdir()

    # Create multiple levels with similar names (potential for confusion)
    level_a = complex_structure / "level_a"
    level_a.mkdir()
    sublevel_a = level_a / "sublevel"
    sublevel_a.mkdir()

    level_b = complex_structure / "level_b"
    level_b.mkdir()
    sublevel_b = level_b / "sublevel"
    sublevel_b.mkdir()

    # Add files to each sublevel
    for i in range(10):
        (sublevel_a / f"file_a_{i}.txt").write_text(f"content_a_{i}")
        (sublevel_b / f"file_b_{i}.txt").write_text(f"content_b_{i}")

    # Get initial state
    old_state = watching.walk(PurePosixPath())

    # Perform nested directory renames that could confuse the diff algorithm
    renamed_sublevel_a = level_a / "renamed_sublevel"
    sublevel_a.rename(renamed_sublevel_a)
    renamed_sublevel_b = level_b / "also_renamed_sublevel"
    sublevel_b.rename(renamed_sublevel_b)

    # Get new state
    new_state = watching.walk(PurePosixPath())

    start_time = time.time()
    try:
        update_msg = watching.format_update(old_state, new_state)
    except Exception as e:
        pytest.fail(f"format_update failed or hung with nested directory changes: {e}")
    duration = time.time() - start_time

    assert duration < 5.0, f"format_update took too long with nested changes: {duration}s"

    # Verify the update message is valid
    assert update_msg, "format_update should return valid message"
    decoded = msgspec.json.decode(update_msg, type=UpdateMessage)
    assert decoded.update, "Update should contain operations"


def test_directory_rename_race_condition(setup_watcher, loop):
    """Test that reproduces the hang when directory rename events race with updates.

    This test simulates the conditions that cause the hang:
    1. Create a directory with files
    2. Start monitoring it
    3. Rename the directory while the watcher is processing events
    4. This should cause a hang where old directory names are preserved
    """
    temp_dir = setup_watcher

    # Create test structure with many files to increase chance of race conditions
    subdir = temp_dir / "original_dir"
    subdir.mkdir()
    for i in range(50):
        (subdir / f"file_{i:03d}.txt").write_text(f"content_{i}")

    # Create nested directories
    nested = subdir / "nested"
    nested.mkdir()
    for i in range(20):
        (nested / f"nested_file_{i:03d}.txt").write_text(f"nested_content_{i}")

    # Initial scan to populate the state
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Verify initial structure
    initial_names = [entry.name for entry in initial_root]
    assert "original_dir" in initial_names

    # Work on a copy of the initial state
    original_rootmod = watching.state.root[:]

    # Rename the directory
    renamed_dir = temp_dir / "renamed_dir"
    subdir.rename(renamed_dir)

    # First update the old path (should remove it), then the new path (should add it)
    watching.update_path(original_rootmod, PurePosixPath("original_dir"), loop)
    watching.update_path(original_rootmod, PurePosixPath("renamed_dir"), loop)

    # The bug would manifest as the old name still present, the new name
    # missing, or an inconsistent state causing hangs.
    final_names = [entry.name for entry in original_rootmod]
    assert "original_dir" not in final_names, "Old directory name should be removed"
    assert "renamed_dir" in final_names, "New directory name should be present"

    # Additional check: verify we can still walk the renamed directory
    renamed_walk = watching.walk(PurePosixPath("renamed_dir"))
    assert len(renamed_walk) > 1, "Should be able to walk renamed directory"


def test_concurrent_inotify_events_simulation(setup_watcher, loop):
    """Simulate concurrent inotify events that can cause the hanging bug."""
    temp_dir = setup_watcher

    # Create several populated top-level directories
    dirs = ["dir_a", "dir_b", "dir_c"]
    created_dirs = []
    for dir_name in dirs:
        dir_path = temp_dir / dir_name
        dir_path.mkdir()
        for i in range(10):
            (dir_path / f"file_{i}.txt").write_text(f"content in {dir_name}")
        created_dirs.append(dir_path)

    # Initial state
    watching.state.root = watching.walk(PurePosixPath())
    original_state = watching.state.root[:]

    # Rename all directories before any update is processed
    for dir_name, dir_path in zip(dirs, created_dirs):
        dir_path.rename(temp_dir / f"renamed_{dir_name}")

    working_state = original_state[:]

    # Process removal events (IN_MOVED_FROM)
    for dir_name in dirs:
        try:
            watching.update_path(working_state, PurePosixPath(dir_name), loop)
        except Exception as e:
            # The bug might manifest as exceptions during updates
            pytest.fail(f"Update path failed for {dir_name}: {e}")

    # Process addition events (IN_MOVED_TO)
    for dir_name in dirs:
        new_name = f"renamed_{dir_name}"
        try:
            watching.update_path(working_state, PurePosixPath(new_name), loop)
        except Exception as e:
            pytest.fail(f"Update path failed for {new_name}: {e}")

    # Verify final state is consistent
    final_names = [entry.name for entry in working_state]

    for dir_name in dirs:
        assert dir_name not in final_names, f"Old directory {dir_name} should be removed"

    for dir_name in dirs:
        new_name = f"renamed_{dir_name}"
        assert new_name in final_names, f"New directory {new_name} should be present"


def test_format_update_with_rapid_changes(setup_watcher):
    """Test format_update with rapid directory changes that can cause hangs."""
    temp_dir = setup_watcher

    # Create initial structure
    initial_dirs = ["test1", "test2", "test3"]
    for dir_name in initial_dirs:
        dir_path = temp_dir / dir_name
        dir_path.mkdir()
        (dir_path / "file.txt").write_text("test content")

    # Get initial state
    old_state = watching.walk(PurePosixPath())

    # Perform rapid renames
    for dir_name in initial_dirs:
        (temp_dir / dir_name).rename(temp_dir / f"renamed_{dir_name}")

    # Get new state
    new_state = watching.walk(PurePosixPath())

    start_time = time.time()
    try:
        update_msg = watching.format_update(old_state, new_state)
    except Exception as e:
        pytest.fail(f"format_update failed or hung: {e}")
    duration = time.time() - start_time

    # Should complete quickly
    assert duration < 5.0, f"format_update took too long: {duration}s"

    # Decode the update to verify it's valid
    decoded = msgspec.json.decode(update_msg, type=UpdateMessage)
    assert decoded.update, "Update message should contain operations"


def test_update_path_with_missing_directory(setup_watcher, loop):
    """Test update_path when called on a directory that no longer exists.

    This simulates the race condition where update_path is called for a path
    that was just moved/deleted.
    """
    temp_dir = setup_watcher

    # Create and populate initial state
    test_dir = temp_dir / "disappearing_dir"
    test_dir.mkdir()
    (test_dir / "file.txt").write_text("content")

    initial_state = watching.walk(PurePosixPath())
    watching.state.root = initial_state
    working_state = initial_state[:]

    # Remove the directory
    shutil.rmtree(test_dir)

    # Updating a path that no longer exists should not raise or hang
    start_time = time.time()
    try:
        watching.update_path(working_state, PurePosixPath("disappearing_dir"), loop)
    except Exception as e:
        pytest.fail(f"update_path should handle missing directories gracefully: {e}")
    duration = time.time() - start_time

    assert duration < 2.0, f"update_path took too long: {duration}s"

    # Verify the directory was removed from the state
    final_names = [entry.name for entry in working_state]
    assert "disappearing_dir" not in final_names


def test_threaded_watcher_simulation(setup_watcher, loop):
    """Test that simulates the actual threaded watcher behavior with directory renames.

    This test creates a more realistic scenario where the watcher thread
    processes events while filesystem operations are happening.
    """
    temp_dir = setup_watcher

    # Create test structure
    test_dirs = []
    for i in range(5):
        dir_path = temp_dir / f"thread_test_dir_{i}"
        dir_path.mkdir()
        for j in range(5):
            (dir_path / f"file_{j}.txt").write_text(f"content_{i}_{j}")
        test_dirs.append(dir_path)

    # Initialize state
    watching.state.root = watching.walk(PurePosixPath())

    # Track state changes
    state_changes = []
    original_broadcast = watching.broadcast

    def tracking_broadcast(msg, loop_param):
        state_changes.append(msg)
        return original_broadcast(msg, loop_param)

    # Patch broadcast to track changes
    with patch("cista.watching.broadcast", side_effect=tracking_broadcast):
        start_time = time.time()

        for i, dir_path in enumerate(test_dirs):
            old_name = f"thread_test_dir_{i}"
            new_name = f"renamed_thread_test_dir_{i}"

            # Rename directory, then replay the two updates back to back
            dir_path.rename(temp_dir / new_name)
            watching.update_path(watching.state.root, PurePosixPath(old_name), loop)
            watching.update_path(watching.state.root, PurePosixPath(new_name), loop)

        duration = time.time() - start_time

    # Should complete without hanging
    assert duration < 10.0, f"Threaded operations took too long: {duration}s"

    # Verify final state is consistent
    final_names = [entry.name for entry in watching.state.root]

    for i in range(5):
        old_name = f"thread_test_dir_{i}"
        assert old_name not in final_names, f"Old directory {old_name} should be removed"

    for i in range(5):
        new_name = f"renamed_thread_test_dir_{i}"
        assert new_name in final_names, f"New directory {new_name} should be present"


def test_directory_rename_with_nested_structure(setup_watcher, loop):
    """Test renaming a directory that contains nested subdirectories."""
    temp_dir = setup_watcher

    # Create a more complex nested structure
    main_dir = temp_dir / "main_dir"
    main_dir.mkdir()

    level1 = main_dir / "level1"
    level1.mkdir()
    (level1 / "l1_file.txt").write_text("level1 content")

    level2 = level1 / "level2"
    level2.mkdir()
    (level2 / "l2_file.txt").write_text("level2 content")

    level3 = level2 / "level3"
    level3.mkdir()
    (level3 / "l3_file.txt").write_text("level3 content")

    # Initial scan
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Rename the main directory
    renamed_main = temp_dir / "renamed_main_dir"
    main_dir.rename(renamed_main)

    # Update the watching system
    watching.update_path(watching.state.root, PurePosixPath("main_dir"), loop)
    watching.update_path(watching.state.root, PurePosixPath("renamed_main_dir"), loop)

    # Verify the top-level names were updated
    updated_names = [entry.name for entry in watching.state.root]
    assert "main_dir" not in updated_names
    assert "renamed_main_dir" in updated_names

    # Verify the nested structure is still intact
    renamed_structure = watching.walk(PurePosixPath("renamed_main_dir"))
    all_names = [entry.name for entry in renamed_structure]

    # Should contain the directory itself and all nested items
    assert "renamed_main_dir" in all_names
    assert "level1" in all_names
    assert "l1_file.txt" in all_names
    assert "level2" in all_names
    assert "l2_file.txt" in all_names
    assert "level3" in all_names
    assert "l3_file.txt" in all_names


def test_directory_rename_format_update(setup_watcher):
    """Test that format_update correctly handles directory renames."""
    temp_dir = setup_watcher

    # Create test structure
    subdir, _, _ = create_test_structure(temp_dir)

    # Get initial state
    old_root = watching.walk(PurePosixPath())

    # Rename directory
    renamed_subdir = temp_dir / "renamed_subdir"
    subdir.rename(renamed_subdir)

    # Get new state
    new_root = watching.walk(PurePosixPath())

    # Generate update message
    update_msg = watching.format_update(old_root, new_root)

    # The update should not be empty and should contain proper operations
    assert update_msg
    assert "update" in update_msg

    # Decode and verify the update contains expected operations
    decoded = msgspec.json.decode(update_msg, type=UpdateMessage)
    assert decoded.update  # Should have update operations

    # The update should reflect the rename operation (delete old, insert new)
    operations = decoded.update
    assert len(operations) > 0


def test_concurrent_directory_operations(setup_watcher, loop):
    """Test behavior when multiple directory operations happen concurrently."""
    temp_dir = setup_watcher

    # Create multiple directories
    dirs_to_create = ["dir1", "dir2", "dir3"]
    created_dirs = []
    for dir_name in dirs_to_create:
        dir_path = temp_dir / dir_name
        dir_path.mkdir()
        (dir_path / f"{dir_name}_file.txt").write_text(f"content for {dir_name}")
        created_dirs.append(dir_path)

    # Initial scan
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Rename multiple directories "simultaneously"
    renamed_dirs = []
    for i, dir_path in enumerate(created_dirs):
        renamed_path = temp_dir / f"renamed_dir{i+1}"
        dir_path.rename(renamed_path)
        renamed_dirs.append(renamed_path)

    # Update for all old paths (should remove them)
    for dir_name in dirs_to_create:
        watching.update_path(watching.state.root, PurePosixPath(dir_name), loop)

    # Update for all new paths (should add them)
    for renamed_path in renamed_dirs:
        watching.update_path(watching.state.root, PurePosixPath(renamed_path.name), loop)

    # Verify final state
    final_names = [entry.name for entry in watching.state.root]

    # Old names should be gone
    for dir_name in dirs_to_create:
        assert dir_name not in final_names

    # New names should be present
    for renamed_path in renamed_dirs:
        assert renamed_path.name in final_names


@pytest.mark.slow
def test_watcher_doesnt_hang_on_directory_rename(setup_watcher, loop):
    """Test that the watcher doesn't hang when a directory is renamed.

    This test specifically addresses the reported bug where directory renames
    cause the system to hang and no more operations go through.
    """
    temp_dir = setup_watcher

    # Create test structure
    subdir, _, _ = create_test_structure(temp_dir)

    # Initialize the watcher state
    watching.state.root = watching.walk(PurePosixPath())

    # NOTE(review): time.monotonic is replaced by a finite sequence of seven
    # values; if the code under test reads the clock more often than that the
    # mock raises StopIteration. Confirm seven is enough for update_path.
    with patch("time.monotonic", side_effect=[0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6]):
        # Simulate the rename operation
        renamed_subdir = temp_dir / "renamed_test_subdir"
        subdir.rename(renamed_subdir)

        # This should complete without hanging
        start_time = time.time()
        watching.update_path(watching.state.root, PurePosixPath("test_subdir"), loop)
        watching.update_path(watching.state.root, PurePosixPath("renamed_test_subdir"), loop)
        end_time = time.time()

    # The operation should complete quickly (within 5 seconds)
    assert end_time - start_time < 5.0, "Directory rename operation took too long, possible hang detected"

    # Verify the state is consistent
    final_names = [entry.name for entry in watching.state.root]
    assert "test_subdir" not in final_names
    assert "renamed_test_subdir" in final_names

    # Verify we can still perform operations after the rename, i.e. the
    # system isn't left in a broken state
    another_dir = temp_dir / "post_rename_dir"
    another_dir.mkdir()
    watching.update_path(watching.state.root, PurePosixPath("post_rename_dir"), loop)

    final_names_after = [entry.name for entry in watching.state.root]
    assert "post_rename_dir" in final_names_after


if __name__ == "__main__":
    pytest.main([__file__, "-v"])