# Regression tests for cista.watching: directory rename/move event handling,
# exercising the hang/state-corruption scenarios around update_path,
# treeget, and format_update.
import asyncio
|
|
import queue
|
|
import shutil
|
|
import signal
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
from pathlib import Path, PurePosixPath
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import msgspec
|
|
import pytest
|
|
|
|
from cista import config, watching
|
|
from cista.protocol import UpdateMessage
|
|
|
|
|
|
@pytest.fixture
def temp_dir():
    """Yield a freshly created temporary directory as a ``Path``.

    The directory and everything inside it is removed on teardown.
    """
    tmp = tempfile.TemporaryDirectory()
    try:
        yield Path(tmp.name)
    finally:
        tmp.cleanup()
|
|
|
|
|
|
@pytest.fixture
def setup_watcher(temp_dir):
    """Point the cista watcher globals at *temp_dir*, restoring them afterwards."""
    # Remember the module-level globals so teardown can put them back.
    saved_rootpath = watching.rootpath
    saved_state = watching.state
    saved_quit = watching.quit

    # Reconfigure the watcher to use the temporary directory with fresh state.
    config.config = config.Config(path=temp_dir, listen=":0")
    watching.rootpath = temp_dir
    watching.state = watching.State()
    watching.quit = threading.Event()

    yield temp_dir

    # Teardown: signal any watcher threads to stop, then restore the globals.
    watching.quit.set()
    watching.rootpath = saved_rootpath
    watching.state = saved_state
    watching.quit = saved_quit
|
|
|
|
|
|
def create_test_structure(base_path: Path):
    """Populate *base_path* with the small directory tree used by the tests.

    Layout created:
        test_subdir/{file1.txt, file2.txt, nested/nested_file.txt}
        other_dir/other_file.txt

    Returns the tuple ``(subdir, nested, other_dir)`` of created directories.
    """
    subdir = base_path / "test_subdir"
    nested = subdir / "nested"
    other_dir = base_path / "other_dir"

    # Main subdirectory with a couple of plain files.
    subdir.mkdir()
    (subdir / "file1.txt").write_text("content1")
    (subdir / "file2.txt").write_text("content2")

    # One nested level inside the main subdirectory.
    nested.mkdir()
    (nested / "nested_file.txt").write_text("nested content")

    # An unrelated sibling directory used as a reference point.
    other_dir.mkdir()
    (other_dir / "other_file.txt").write_text("other content")

    return subdir, nested, other_dir
|
|
|
|
|
|
def test_nested_directory_rename_causes_hang(setup_watcher) -> None:
    """Test renaming deeply nested directories - this is where the hang typically occurs.

    The bug manifests when renaming directories that are nested within other directories,
    not just top-level directories.
    """
    temp_dir = setup_watcher

    # Create a complex nested structure that mirrors real-world usage
    # parent/child/grandchild/target_dir/files...
    parent = temp_dir / "parent_folder"
    parent.mkdir()

    child = parent / "child_folder"
    child.mkdir()

    grandchild = child / "grandchild_folder"
    grandchild.mkdir()

    # This is the directory we'll rename - it's deeply nested
    target_dir = grandchild / "target_to_rename"
    target_dir.mkdir()

    # Add files to make the directory scan more complex
    for i in range(20):
        (target_dir / f"file_{i:03d}.txt").write_text(f"content_{i}")

    # Add another nested level inside target
    deep_nested = target_dir / "even_deeper"
    deep_nested.mkdir()
    for i in range(10):
        (deep_nested / f"deep_file_{i}.txt").write_text(f"deep_content_{i}")

    # Initialize watcher state with a full scan from the root
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Verify the nested structure exists
    target_path = PurePosixPath(
        "parent_folder/child_folder/grandchild_folder/target_to_rename"
    )
    initial_begin, initial_entries = watching.treeget(initial_root, target_path)
    assert initial_begin is not None, (
        "Target directory should be found in initial state"
    )
    assert len(initial_entries) > 1, "Target directory should contain files"

    # Now rename the deeply nested directory
    new_target = grandchild / "renamed_target"
    target_dir.rename(new_target)

    loop = asyncio.new_event_loop()
    # Work on a shallow copy of the tree so the global state is untouched
    working_state = watching.state.root[:]

    # This is where the hang likely occurs - updating a deeply nested path
    old_nested_path = PurePosixPath(
        "parent_folder/child_folder/grandchild_folder/target_to_rename"
    )
    new_nested_path = PurePosixPath(
        "parent_folder/child_folder/grandchild_folder/renamed_target"
    )

    start_time = time.time()

    # Update the old path (should remove it)
    watching.update_path(working_state, old_nested_path, loop)

    # Update the new path (should add it)
    watching.update_path(working_state, new_nested_path, loop)

    end_time = time.time()

    # Check for hang - nested operations should still be fast
    duration = end_time - start_time
    assert duration < 3.0, (
        f"Nested directory rename took too long: {duration}s - possible hang"
    )

    # Verify the old nested path is gone
    old_begin, old_entries = watching.treeget(working_state, old_nested_path)
    assert old_begin is None, "Old nested directory should be removed from tree"

    # Verify the new nested path exists
    new_begin, new_entries = watching.treeget(working_state, new_nested_path)
    assert new_begin is not None, "New nested directory should exist in tree"
    assert len(new_entries) > 1, "New nested directory should contain all the files"
|
|
|
|
|
|
def test_move_directory_across_nested_parents(setup_watcher) -> None:
    """Test moving a directory from one nested location to another - high hang risk scenario."""
    temp_dir = setup_watcher

    # Create source nested structure
    source_parent = temp_dir / "source_area"
    source_parent.mkdir()
    source_child = source_parent / "source_child"
    source_child.mkdir()

    # Create the directory to move
    movable_dir = source_child / "movable_directory"
    movable_dir.mkdir()

    # Add content to make it more complex
    for i in range(15):
        (movable_dir / f"file_{i}.txt").write_text(f"movable_content_{i}")

    # Create a subdirectory within the movable directory
    sub_movable = movable_dir / "sub_directory"
    sub_movable.mkdir()
    for i in range(5):
        (sub_movable / f"sub_file_{i}.txt").write_text(f"sub_content_{i}")

    # Create destination nested structure (one level deeper than the source)
    dest_parent = temp_dir / "destination_area"
    dest_parent.mkdir()
    dest_child = dest_parent / "dest_child"
    dest_child.mkdir()
    dest_grandchild = dest_child / "dest_grandchild"
    dest_grandchild.mkdir()

    # Initialize state and take a working copy of the tree
    watching.state.root = watching.walk(PurePosixPath())
    working_state = watching.state.root[:]

    # Move the directory to the deeply nested destination
    dest_movable = dest_grandchild / "moved_directory"
    movable_dir.rename(dest_movable)

    loop = asyncio.new_event_loop()

    # These paths represent the complex nested move operation
    old_path = PurePosixPath("source_area/source_child/movable_directory")
    new_path = PurePosixPath(
        "destination_area/dest_child/dest_grandchild/moved_directory"
    )

    start_time = time.time()

    # This sequence is where hangs typically occur with cross-directory moves
    try:
        # Remove from old location
        watching.update_path(working_state, old_path, loop)

        # Add to new location
        watching.update_path(working_state, new_path, loop)

    except Exception as e:
        pytest.fail(f"Nested directory move failed: {e}")

    end_time = time.time()
    duration = end_time - start_time

    # Should complete without hanging
    assert duration < 5.0, f"Cross-nested move took too long: {duration}s"

    # Verify old location is empty
    old_begin, old_entries = watching.treeget(working_state, old_path)
    assert old_begin is None, "Directory should be removed from old nested location"

    # Verify new location has the directory
    new_begin, new_entries = watching.treeget(working_state, new_path)
    assert new_begin is not None, "Directory should exist in new nested location"
    assert len(new_entries) > 1, "Moved directory should retain all its contents"
|
|
|
|
|
|
def test_rapid_nested_directory_operations_cause_corruption(setup_watcher) -> None:
    """Test rapid operations on nested directories that can cause state corruption."""
    temp_dir = setup_watcher

    # Create multiple nested structures: level1_i/level2_i/level3_i/target_i
    structures = []
    for i in range(3):
        level1 = temp_dir / f"level1_{i}"
        level1.mkdir()
        level2 = level1 / f"level2_{i}"
        level2.mkdir()
        level3 = level2 / f"level3_{i}"
        level3.mkdir()
        target = level3 / f"target_{i}"
        target.mkdir()

        # Add files
        for j in range(10):
            (target / f"file_{j}.txt").write_text(f"content_{i}_{j}")

        structures.append((level1, level2, level3, target))

    # Initialize state and take a working copy of the tree
    watching.state.root = watching.walk(PurePosixPath())
    working_state = watching.state.root[:]

    loop = asyncio.new_event_loop()

    # Perform rapid nested operations that can cause race conditions
    operations = []

    for i, (level1, level2, level3, target) in enumerate(structures):
        # Rename the deeply nested target
        new_target = level3 / f"renamed_target_{i}"
        target.rename(new_target)

        old_path = PurePosixPath(f"level1_{i}/level2_{i}/level3_{i}/target_{i}")
        new_path = PurePosixPath(f"level1_{i}/level2_{i}/level3_{i}/renamed_target_{i}")
        operations.append((old_path, new_path))

    start_time = time.time()

    # Process all operations rapidly - this can cause state corruption/hangs
    for old_path, new_path in operations:
        try:
            watching.update_path(working_state, old_path, loop)
            watching.update_path(working_state, new_path, loop)
        except Exception as e:
            pytest.fail(
                f"Rapid nested operations failed for {old_path} -> {new_path}: {e}"
            )

    end_time = time.time()
    duration = end_time - start_time

    # Should complete without hanging even with rapid operations
    assert duration < 10.0, f"Rapid nested operations took too long: {duration}s"

    # Verify final state consistency
    for i, (old_path, new_path) in enumerate(operations):
        # Old paths should be gone
        old_begin, old_entries = watching.treeget(working_state, old_path)
        assert old_begin is None, f"Old path {old_path} should be removed"

        # New paths should exist
        new_begin, new_entries = watching.treeget(working_state, new_path)
        assert new_begin is not None, f"New path {new_path} should exist"
|
|
|
|
|
|
def test_nested_directory_treeget_corruption(setup_watcher) -> None:
    """Test that treeget function handles nested path operations correctly without corruption."""
    temp_dir = setup_watcher

    # Create a complex tree structure: root_i/mid_j/leaf_k with 5 files per leaf
    root_dirs = []
    for i in range(3):
        root_dir = temp_dir / f"root_{i}"
        root_dir.mkdir()

        for j in range(2):
            mid_dir = root_dir / f"mid_{j}"
            mid_dir.mkdir()

            for k in range(2):
                leaf_dir = mid_dir / f"leaf_{k}"
                leaf_dir.mkdir()

                # Add files to leaf directories
                for l in range(5):
                    (leaf_dir / f"file_{l}.txt").write_text(f"content_{i}_{j}_{k}_{l}")

        root_dirs.append(root_dir)

    # Initialize state
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Test treeget with various nested paths (every depth level)
    test_paths = [
        PurePosixPath("root_0"),
        PurePosixPath("root_0/mid_0"),
        PurePosixPath("root_0/mid_0/leaf_0"),
        PurePosixPath("root_1/mid_1/leaf_1"),
        PurePosixPath("root_2/mid_0/leaf_1"),
    ]

    # Verify treeget works correctly for all paths
    for path in test_paths:
        begin, entries = watching.treeget(initial_root, path)
        assert begin is not None, f"treeget should find existing path: {path}"
        assert len(entries) >= 1, f"treeget should return entries for: {path}"

    # Now rename a nested directory and test treeget consistency
    old_leaf = temp_dir / "root_0" / "mid_0" / "leaf_0"
    new_leaf = temp_dir / "root_0" / "mid_0" / "renamed_leaf"
    old_leaf.rename(new_leaf)

    # Update the state on a working copy
    loop = asyncio.new_event_loop()
    working_state = initial_root[:]

    old_nested_path = PurePosixPath("root_0/mid_0/leaf_0")
    new_nested_path = PurePosixPath("root_0/mid_0/renamed_leaf")

    # Update paths (remove the old entry, then insert the new one)
    watching.update_path(working_state, old_nested_path, loop)
    watching.update_path(working_state, new_nested_path, loop)

    # Verify treeget consistency after the update
    old_begin, old_entries = watching.treeget(working_state, old_nested_path)
    assert old_begin is None, "Old nested path should not be found after rename"

    new_begin, new_entries = watching.treeget(working_state, new_nested_path)
    assert new_begin is not None, "New nested path should be found after rename"
    assert len(new_entries) >= 1, "New nested path should have entries"

    # Verify that other paths are still accessible (no corruption)
    for path in [
        PurePosixPath("root_1/mid_1/leaf_1"),
        PurePosixPath("root_2/mid_0/leaf_1"),
    ]:
        begin, entries = watching.treeget(working_state, path)
        assert begin is not None, f"Other paths should remain accessible: {path}"
|
|
|
|
|
|
def test_format_update_infinite_loop_with_complex_nested_changes(setup_watcher) -> None:
    """Create a scenario that specifically triggers infinite loops in format_update.

    The hang often occurs in format_update when the diff algorithm gets confused
    by complex nested directory moves.
    """
    temp_dir = setup_watcher

    # Create a complex scenario that can confuse the diff algorithm
    # Multiple directories with similar names and nested structures
    dirs_data = []

    for i in range(4):
        # Create main directory
        main_dir = temp_dir / f"main_{i}"
        main_dir.mkdir()

        # Create subdirectories with similar patterns (same name in each main_i)
        sub_dir = main_dir / "common_subdir_name"
        sub_dir.mkdir()

        # Create files with varying content
        for j in range(15):
            (sub_dir / f"file_{j:02d}.txt").write_text(f"main_{i}_content_{j}")

        # Add another level of nesting
        nested = sub_dir / "nested_level"
        nested.mkdir()
        for j in range(8):
            (nested / f"nested_{j}.txt").write_text(f"nested_{i}_{j}")

        dirs_data.append((main_dir, sub_dir, nested))

    # Get initial state
    old_state = watching.walk(PurePosixPath())

    # Perform complex renames that can confuse the diff algorithm
    # Rename all subdirectories to have even more similar names
    for i, (main_dir, sub_dir, nested) in enumerate(dirs_data):
        # Rename the subdirectory to a name that's very similar to others
        new_sub_name = f"renamed_common_subdir_{i}"
        new_sub_dir = main_dir / new_sub_name
        sub_dir.rename(new_sub_dir)

        # Also rename some files to create more confusion
        for j in range(0, 10, 2):  # Rename every other file
            old_file = new_sub_dir / f"file_{j:02d}.txt"
            new_file = new_sub_dir / f"renamed_file_{j:02d}.txt"
            if old_file.exists():
                old_file.rename(new_file)

    # Get new state
    new_state = watching.walk(PurePosixPath())

    # This is the critical test - format_update with complex nested changes
    # that have caused infinite loops in the past
    start_time = time.time()

    try:
        # Set a more aggressive timeout via SIGALRM so a hang fails the test
        # instead of blocking the whole run.
        def timeout_handler(signum, frame):
            raise TimeoutError("format_update appears to be hanging")

        # Set a 10-second timeout
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(10)

        try:
            update_msg = watching.format_update(old_state, new_state)
            signal.alarm(0)  # Cancel the alarm

            end_time = time.time()
            duration = end_time - start_time

            # Even complex diffs should complete quickly
            assert duration < 8.0, (
                f"format_update took {duration}s - possible infinite loop"
            )

            # Verify the result is valid
            assert update_msg, "format_update should return a message"
            decoded = msgspec.json.decode(update_msg, type=UpdateMessage)
            assert decoded.update, "Update should contain operations"

        except TimeoutError:
            signal.alarm(0)
            pytest.fail(
                "format_update hung/infinite loop detected with complex nested changes"
            )

    except Exception as e:
        # NOTE: this outer handler also catches assertion failures from above
        # and converts them into pytest.fail with the original message.
        signal.alarm(0)
        pytest.fail(f"format_update failed: {e}")
|
|
|
|
|
|
def test_update_path_with_corrupted_tree_state(setup_watcher) -> None:
    """Test update_path when the tree state becomes corrupted by rapid changes."""
    temp_dir = setup_watcher

    # Create a nested structure: parent/child/target_dir
    parent = temp_dir / "parent"
    parent.mkdir()
    child = parent / "child"
    child.mkdir()
    target = child / "target_dir"
    target.mkdir()

    # Add many files to make operations slower
    for i in range(30):
        (target / f"file_{i:03d}.txt").write_text(f"content_{i}")

    # Add nested subdirectories
    for i in range(3):
        subdir = target / f"subdir_{i}"
        subdir.mkdir()
        for j in range(10):
            (subdir / f"sub_file_{j}.txt").write_text(f"sub_content_{i}_{j}")

    # Initialize state
    watching.state.root = watching.walk(PurePosixPath())

    # Create a working copy that we'll manually corrupt to simulate race conditions
    working_state = watching.state.root[:]

    loop = asyncio.new_event_loop()

    # Rename the directory
    new_target = child / "renamed_target"
    target.rename(new_target)

    # Simulate the race condition by manually corrupting the tree state
    # This mimics what happens when inotify events arrive out of order

    # First, try to update a path that should exist
    old_path = PurePosixPath("parent/child/target_dir")

    # Manually remove an entry to simulate corruption
    if len(working_state) > 5:
        # Remove a random entry to corrupt the tree structure
        del working_state[3]

    start_time = time.time()

    try:
        # This should handle corrupted state gracefully
        watching.update_path(working_state, old_path, loop)

        # Now add the new path
        new_path = PurePosixPath("parent/child/renamed_target")
        watching.update_path(working_state, new_path, loop)

        end_time = time.time()
        duration = end_time - start_time

        # Should complete without hanging even with corrupted state
        assert duration < 5.0, f"update_path with corrupted state took {duration}s"

    except Exception as e:
        # Some exceptions are expected with corrupted state, but shouldn't hang;
        # only the elapsed time is checked here.
        end_time = time.time()
        duration = end_time - start_time
        assert duration < 5.0, f"update_path hung even when failing: {duration}s"
|
|
|
|
|
|
def test_simulate_real_inotify_event_sequence(setup_watcher) -> None:
    """Simulate the exact inotify event sequence that causes hangs."""
    temp_dir = setup_watcher

    # Create the exact scenario from real usage that triggers the bug:
    # project/src/components/OldComponent with typical project files.
    project_dir = temp_dir / "project"
    project_dir.mkdir()

    src_dir = project_dir / "src"
    src_dir.mkdir()

    components_dir = src_dir / "components"
    components_dir.mkdir()

    # This is the directory that will be renamed
    old_component = components_dir / "OldComponent"
    old_component.mkdir()

    # Add files that exist in real projects
    for filename in ["index.tsx", "styles.css", "types.ts", "utils.ts"]:
        (old_component / filename).write_text(f"// {filename} content")

    # Add a subdirectory with more files
    sub_dir = old_component / "subcomponents"
    sub_dir.mkdir()
    for i in range(5):
        (sub_dir / f"SubComponent{i}.tsx").write_text(f"// SubComponent{i}")

    # Initialize state and take a working copy of the tree
    watching.state.root = watching.walk(PurePosixPath())
    working_state = watching.state.root[:]

    loop = asyncio.new_event_loop()

    # This is the exact operation that causes hangs in real usage
    new_component = components_dir / "NewComponent"
    old_component.rename(new_component)

    # Simulate the inotify event sequence that causes problems:
    # IN_MOVED_FROM event for the old directory,
    # IN_MOVED_TO event for the new directory.
    old_path = PurePosixPath("project/src/components/OldComponent")
    new_path = PurePosixPath("project/src/components/NewComponent")

    # Track how long the operations take
    start_time = time.time()

    # Set up timeout detection so a hang fails the test instead of blocking CI
    def timeout_handler(signum, frame):
        raise TimeoutError("Simulated inotify sequence hung")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(15)  # 15 second timeout

    try:
        # This sequence is where the hang occurs in real usage
        watching.update_path(working_state, old_path, loop)
        watching.update_path(working_state, new_path, loop)

        # If we get here without hanging, cancel the alarm
        signal.alarm(0)

        end_time = time.time()
        duration = end_time - start_time

        # Real inotify operations should be fast
        assert duration < 10.0, f"Simulated inotify sequence took {duration}s"

        # Verify the final state is correct
        old_begin, old_entries = watching.treeget(working_state, old_path)
        assert old_begin is None, "Old component path should be removed"

        new_begin, new_entries = watching.treeget(working_state, new_path)
        assert new_begin is not None, "New component path should exist"
        assert len(new_entries) > 1, "New component should contain all files"

    except TimeoutError:
        signal.alarm(0)
        pytest.fail("HANG DETECTED: Simulated inotify event sequence hung!")

    except Exception as e:
        signal.alarm(0)
        pytest.fail(f"Simulated inotify sequence failed: {e}")

    finally:
        signal.alarm(0)  # Ensure alarm is cancelled


def test_format_update_with_nested_directory_changes(setup_watcher) -> None:
    """Test format_update with nested directory changes that could cause infinite loops."""
    temp_dir = setup_watcher

    # Create complex nested structure that has caused issues
    complex_structure = temp_dir / "complex"
    complex_structure.mkdir()

    # Create multiple levels with similar names (potential for confusion):
    # both level_a and level_b contain a child named "sublevel".
    level_a = complex_structure / "level_a"
    level_a.mkdir()
    sublevel_a = level_a / "sublevel"
    sublevel_a.mkdir()

    level_b = complex_structure / "level_b"
    level_b.mkdir()
    sublevel_b = level_b / "sublevel"
    sublevel_b.mkdir()

    # Add files to each sublevel
    for i in range(10):
        (sublevel_a / f"file_a_{i}.txt").write_text(f"content_a_{i}")
        (sublevel_b / f"file_b_{i}.txt").write_text(f"content_b_{i}")

    # Get initial state
    old_state = watching.walk(PurePosixPath())

    # Perform nested directory renames that could confuse the diff algorithm
    renamed_sublevel_a = level_a / "renamed_sublevel"
    sublevel_a.rename(renamed_sublevel_a)

    renamed_sublevel_b = level_b / "also_renamed_sublevel"
    sublevel_b.rename(renamed_sublevel_b)

    # Get new state
    new_state = watching.walk(PurePosixPath())

    # This is where infinite loops or hangs can occur in format_update
    start_time = time.time()

    try:
        update_msg = watching.format_update(old_state, new_state)
        end_time = time.time()

        duration = end_time - start_time
        assert duration < 5.0, (
            f"format_update took too long with nested changes: {duration}s"
        )

        # Verify the update message is valid
        assert update_msg, "format_update should return valid message"
        decoded = msgspec.json.decode(update_msg, type=UpdateMessage)
        assert decoded.update, "Update should contain operations"

    except Exception as e:
        pytest.fail(f"format_update failed or hung with nested directory changes: {e}")


def test_directory_rename_race_condition(setup_watcher) -> None:
    """Test that reproduces the hang when directory rename events race with updates.

    This test simulates the exact conditions that cause the hang:
    1. Create a directory with files
    2. Start monitoring it
    3. Rename the directory while the watcher is processing events
    4. This should cause a hang where old directory names are preserved
    """
    temp_dir = setup_watcher

    # Create test structure with many files to increase chance of race conditions
    subdir = temp_dir / "original_dir"
    subdir.mkdir()

    # Create many files to make the directory scan take longer
    for i in range(50):
        (subdir / f"file_{i:03d}.txt").write_text(f"content_{i}")

    # Create nested directories
    nested = subdir / "nested"
    nested.mkdir()
    for i in range(20):
        (nested / f"nested_file_{i:03d}.txt").write_text(f"nested_content_{i}")

    # Initial scan to populate the state
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Verify initial structure
    initial_names = [entry.name for entry in initial_root]
    assert "original_dir" in initial_names

    # Create a mock event loop for testing
    loop = asyncio.new_event_loop()

    # Simulate the problematic sequence:
    # 1. Start processing the original directory
    # 2. Rename it while processing
    # 3. Try to update both old and new paths

    # Start by getting the initial state
    original_rootmod = watching.state.root[:]

    # Rename the directory
    renamed_dir = temp_dir / "renamed_dir"
    subdir.rename(renamed_dir)

    # Now simulate what happens in the inotify watcher:
    # multiple rapid updates that can cause race conditions.

    # First, try to update the old path (should remove it)
    watching.update_path(original_rootmod, PurePosixPath("original_dir"), loop)

    # Then try to update the new path (should add it)
    watching.update_path(original_rootmod, PurePosixPath("renamed_dir"), loop)

    # Check if the state is consistent. The bug would manifest as:
    # 1. Old directory name still present (should be gone)
    # 2. New directory name missing (should be there)
    # 3. Inconsistent state causing hangs
    final_names = [entry.name for entry in original_rootmod]

    # This is the expected correct behavior
    assert "original_dir" not in final_names, "Old directory name should be removed"
    assert "renamed_dir" in final_names, "New directory name should be present"

    # Additional check: verify we can still walk the renamed directory
    renamed_walk = watching.walk(PurePosixPath("renamed_dir"))
    assert len(renamed_walk) > 1, "Should be able to walk renamed directory"
|
|
|
|
|
|
def test_concurrent_inotify_events_simulation(setup_watcher) -> None:
    """Simulate concurrent inotify events that can cause the hanging bug."""
    temp_dir = setup_watcher

    # Create a complex directory structure
    dirs = ["dir_a", "dir_b", "dir_c"]
    created_dirs = []

    for dir_name in dirs:
        dir_path = temp_dir / dir_name
        dir_path.mkdir()
        # Add files to each directory
        for i in range(10):
            (dir_path / f"file_{i}.txt").write_text(f"content in {dir_name}")
        created_dirs.append(dir_path)

    # Initial state plus a pristine copy to work on
    watching.state.root = watching.walk(PurePosixPath())
    original_state = watching.state.root[:]

    loop = asyncio.new_event_loop()

    # Simulate rapid concurrent operations that happen in real usage
    # This mimics what happens when multiple filesystem events arrive rapidly

    # Rename all directories simultaneously (as might happen with mv commands)
    renamed_paths = []
    for i, dir_path in enumerate(created_dirs):
        new_path = temp_dir / f"renamed_{dirs[i]}"
        dir_path.rename(new_path)
        renamed_paths.append(new_path)

    # Now simulate the inotify event processing that causes issues
    # In the real code, these updates happen in rapid succession
    # and can cause race conditions

    working_state = original_state[:]

    # Process removal events (IN_MOVED_FROM)
    for dir_name in dirs:
        try:
            watching.update_path(working_state, PurePosixPath(dir_name), loop)
        except Exception as e:
            # The bug might manifest as exceptions during updates
            pytest.fail(f"Update path failed for {dir_name}: {e}")

    # Process addition events (IN_MOVED_TO)
    for i, dir_name in enumerate(dirs):
        try:
            new_name = f"renamed_{dir_name}"
            watching.update_path(working_state, PurePosixPath(new_name), loop)
        except Exception as e:
            pytest.fail(f"Update path failed for {new_name}: {e}")

    # Verify final state is consistent
    final_names = [entry.name for entry in working_state]

    # Check that old names are gone
    for dir_name in dirs:
        assert dir_name not in final_names, (
            f"Old directory {dir_name} should be removed"
        )

    # Check that new names are present
    for i, dir_name in enumerate(dirs):
        new_name = f"renamed_{dir_name}"
        assert new_name in final_names, f"New directory {new_name} should be present"
|
|
|
|
|
|
def test_format_update_with_rapid_changes(setup_watcher) -> None:
    """Exercise format_update after several quick directory renames."""
    root = setup_watcher

    # Build three directories, each holding a single file.
    names = ["test1", "test2", "test3"]
    for name in names:
        directory = root / name
        directory.mkdir()
        (directory / "file.txt").write_text("test content")

    # Snapshot the tree before any renames.
    old_state = watching.walk(PurePosixPath())

    # Rename every directory in quick succession.
    for name in names:
        (root / name).rename(root / f"renamed_{name}")

    # Snapshot the tree after the renames.
    new_state = watching.walk(PurePosixPath())

    # Computing the diff is the suspected hang point - time it.
    started = time.time()
    try:
        update_msg = watching.format_update(old_state, new_state)
        duration = time.time() - started

        # The diff must come back promptly.
        assert duration < 5.0, f"format_update took too long: {duration}s"

        # The message must round-trip through the protocol decoder.
        decoded = msgspec.json.decode(update_msg, type=UpdateMessage)
        assert decoded.update, "Update message should contain operations"

    except Exception as e:
        pytest.fail(f"format_update failed or hung: {e}")
|
|
|
|
|
|
def test_update_path_with_missing_directory(setup_watcher) -> None:
    """update_path must cope with a path whose directory has vanished.

    Simulates the race where an update arrives for a directory that was
    just moved or deleted from disk.
    """
    root = setup_watcher

    # Populate a directory that will be deleted before the update runs.
    doomed = root / "disappearing_dir"
    doomed.mkdir()
    (doomed / "file.txt").write_text("content")

    snapshot = watching.walk(PurePosixPath())
    watching.state.root = snapshot
    tree = snapshot[:]

    # Delete the directory from disk while the tree still references it.
    shutil.rmtree(doomed)

    loop = asyncio.new_event_loop()

    # The stale update must neither hang nor raise.
    started = time.time()
    try:
        watching.update_path(tree, PurePosixPath("disappearing_dir"), loop)
        duration = time.time() - started
        assert duration < 2.0, f"update_path took too long: {duration}s"

        # The vanished directory must have been dropped from the tree.
        remaining = [entry.name for entry in tree]
        assert "disappearing_dir" not in remaining

    except Exception as e:
        pytest.fail(f"update_path should handle missing directories gracefully: {e}")
|
|
|
|
|
|
def test_threaded_watcher_simulation(setup_watcher) -> None:
    """Test that simulates the actual threaded watcher behavior with directory renames.

    This test creates a more realistic scenario where the watcher thread
    processes events while filesystem operations are happening.
    """
    temp_dir = setup_watcher

    # Create test structure
    test_dirs = []
    for i in range(5):
        dir_path = temp_dir / f"thread_test_dir_{i}"
        dir_path.mkdir()
        # Add some files
        for j in range(5):
            (dir_path / f"file_{j}.txt").write_text(f"content_{i}_{j}")
        test_dirs.append(dir_path)

    # Initialize state
    watching.state.root = watching.walk(PurePosixPath())

    # Create an event loop for the simulation
    loop = asyncio.new_event_loop()

    # Track state changes emitted via broadcast
    state_changes = []
    original_broadcast = watching.broadcast

    def tracking_broadcast(msg, loop_param):
        # Record the message, then delegate to the real broadcast.
        state_changes.append(msg)
        return original_broadcast(msg, loop_param)

    # Patch broadcast to track changes
    with patch("cista.watching.broadcast", side_effect=tracking_broadcast):
        # Simulate rapid directory operations
        start_time = time.time()

        for i, dir_path in enumerate(test_dirs):
            # Rename directory
            new_path = temp_dir / f"renamed_thread_test_dir_{i}"
            dir_path.rename(new_path)

            # Update the watcher state (simulating inotify events)
            old_name = f"thread_test_dir_{i}"
            new_name = f"renamed_thread_test_dir_{i}"

            # Simulate the race condition: rapid updates
            watching.update_path(watching.state.root, PurePosixPath(old_name), loop)
            watching.update_path(watching.state.root, PurePosixPath(new_name), loop)

        end_time = time.time()

    # Should complete without hanging
    duration = end_time - start_time
    assert duration < 10.0, f"Threaded operations took too long: {duration}s"

    # Verify final state is consistent
    final_names = [entry.name for entry in watching.state.root]

    # Old names should be gone
    for i in range(5):
        old_name = f"thread_test_dir_{i}"
        assert old_name not in final_names, (
            f"Old directory {old_name} should be removed"
        )

    # New names should be present
    for i in range(5):
        new_name = f"renamed_thread_test_dir_{i}"
        assert new_name in final_names, (
            f"New directory {new_name} should be present"
        )
|
|
|
|
|
|
def test_directory_rename_with_nested_structure(setup_watcher):
    """Test renaming a directory that contains nested subdirectories."""
    temp_dir = setup_watcher

    # Create a more complex nested structure: main_dir/level1/level2/level3,
    # with one file at each level.
    main_dir = temp_dir / "main_dir"
    main_dir.mkdir()

    level1 = main_dir / "level1"
    level1.mkdir()
    (level1 / "l1_file.txt").write_text("level1 content")

    level2 = level1 / "level2"
    level2.mkdir()
    (level2 / "l2_file.txt").write_text("level2 content")

    level3 = level2 / "level3"
    level3.mkdir()
    (level3 / "l3_file.txt").write_text("level3 content")

    # Initial scan
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Rename the main directory
    renamed_main = temp_dir / "renamed_main_dir"
    main_dir.rename(renamed_main)

    # Update the watching system. The loop is closed in a finally block so it
    # is not leaked if an assertion fails.
    loop = asyncio.new_event_loop()
    try:
        watching.update_path(watching.state.root, PurePosixPath("main_dir"), loop)
        watching.update_path(watching.state.root, PurePosixPath("renamed_main_dir"), loop)

        # Verify the entire nested structure is properly updated
        updated_root = watching.state.root
        updated_names = [entry.name for entry in updated_root]

        assert "main_dir" not in updated_names
        assert "renamed_main_dir" in updated_names

        # Verify the nested structure is still intact
        renamed_structure = watching.walk(PurePosixPath("renamed_main_dir"))

        # Extract all the names from the renamed structure
        all_names = [entry.name for entry in renamed_structure]

        # Should contain the directory itself and all nested items
        assert "renamed_main_dir" in all_names
        assert "level1" in all_names
        assert "l1_file.txt" in all_names
        assert "level2" in all_names
        assert "l2_file.txt" in all_names
        assert "level3" in all_names
        assert "l3_file.txt" in all_names
    finally:
        # Release event-loop resources regardless of test outcome.
        loop.close()
|
def test_directory_rename_format_update(setup_watcher):
    """Test that format_update correctly handles directory renames."""
    root_dir = setup_watcher

    # Build the standard fixture layout; only the first directory is renamed.
    source_dir, _, _ = create_test_structure(root_dir)

    # Snapshot the tree before the rename.
    snapshot_before = watching.walk(PurePosixPath())

    # Perform the rename on disk.
    target_dir = root_dir / "renamed_subdir"
    source_dir.rename(target_dir)

    # Snapshot the tree after the rename.
    snapshot_after = watching.walk(PurePosixPath())

    # Ask the watcher to diff the two snapshots into an update message.
    payload = watching.format_update(snapshot_before, snapshot_after)

    # The message must be non-empty and carry update operations.
    assert payload
    assert "update" in payload

    # Round-trip through the protocol decoder and confirm the rename produced
    # at least one operation (delete of the old entry, insert of the new one).
    message = msgspec.json.decode(payload, type=UpdateMessage)
    assert message.update
    assert len(message.update) > 0
|
def test_concurrent_directory_operations(setup_watcher):
    """Test behavior when multiple directory operations happen concurrently."""
    temp_dir = setup_watcher

    # Create multiple directories, each with a single file inside.
    dirs_to_create = ["dir1", "dir2", "dir3"]
    created_dirs = []

    for dir_name in dirs_to_create:
        dir_path = temp_dir / dir_name
        dir_path.mkdir()
        (dir_path / f"{dir_name}_file.txt").write_text(f"content for {dir_name}")
        created_dirs.append(dir_path)

    # Initial scan
    initial_root = watching.walk(PurePosixPath())
    watching.state.root = initial_root

    # Rename multiple directories "simultaneously"
    renamed_dirs = []
    for i, dir_path in enumerate(created_dirs):
        renamed_path = temp_dir / f"renamed_dir{i + 1}"
        dir_path.rename(renamed_path)
        renamed_dirs.append(renamed_path)

    # Update the watching system for all changes. Close the loop in a finally
    # block so its resources are released even on assertion failure.
    loop = asyncio.new_event_loop()
    try:
        # Update for all old paths (should remove them)
        for dir_name in dirs_to_create:
            watching.update_path(watching.state.root, PurePosixPath(dir_name), loop)

        # Update for all new paths (should add them)
        for i in range(len(renamed_dirs)):
            watching.update_path(
                watching.state.root, PurePosixPath(f"renamed_dir{i + 1}"), loop
            )

        # Verify final state
        final_root = watching.state.root
        final_names = [entry.name for entry in final_root]

        # Old names should be gone
        for dir_name in dirs_to_create:
            assert dir_name not in final_names

        # New names should be present
        for i in range(len(renamed_dirs)):
            assert f"renamed_dir{i + 1}" in final_names
    finally:
        loop.close()
|
@pytest.mark.slow
def test_watcher_doesnt_hang_on_directory_rename(setup_watcher):
    """Test that the watcher doesn't hang when a directory is renamed.

    This test specifically addresses the reported bug where directory renames
    cause the system to hang and no more operations go through.
    """
    temp_dir = setup_watcher

    # Create test structure
    subdir, _, _ = create_test_structure(temp_dir)

    # Initialize the watcher state
    watching.state.root = watching.walk(PurePosixPath())

    # Mock the inotify events to simulate what happens during a rename.
    # This simulates the problematic scenario described in the bug report.
    # NOTE(review): the side_effect list caps time.monotonic at 7 calls; if the
    # code under test polls it more often the mock raises StopIteration.
    with patch("time.monotonic", side_effect=[0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6]):
        # Simulate the rename operation
        renamed_subdir = temp_dir / "renamed_test_subdir"
        subdir.rename(renamed_subdir)

        # Create a simple event loop for testing; close it in a finally block
        # so the loop's resources are not leaked across tests.
        loop = asyncio.new_event_loop()
        try:
            # This should complete without hanging
            start_time = time.time()

            # Update the path - this is where the hang might occur
            watching.update_path(watching.state.root, PurePosixPath("test_subdir"), loop)
            watching.update_path(
                watching.state.root, PurePosixPath("renamed_test_subdir"), loop
            )

            end_time = time.time()

            # The operation should complete quickly (within 5 seconds)
            assert end_time - start_time < 5.0, (
                "Directory rename operation took too long, possible hang detected"
            )

            # Verify the state is consistent
            final_names = [entry.name for entry in watching.state.root]
            assert "test_subdir" not in final_names
            assert "renamed_test_subdir" in final_names

            # Verify we can still perform operations after the rename.
            # This tests that the system isn't in a broken state.
            another_dir = temp_dir / "post_rename_dir"
            another_dir.mkdir()

            # This should work without issues
            watching.update_path(
                watching.state.root, PurePosixPath("post_rename_dir"), loop
            )
            final_names_after = [entry.name for entry in watching.state.root]
            assert "post_rename_dir" in final_names_after
        finally:
            loop.close()
|
# Allow running this test module directly (python <file>) with verbose output,
# in addition to the usual pytest discovery.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
|