diff --git a/cista/protocol.py b/cista/protocol.py index 86617f6..ff76abe 100644 --- a/cista/protocol.py +++ b/cista/protocol.py @@ -120,6 +120,9 @@ class FileEntry(msgspec.Struct, array_like=True): size: int isfile: int + def __repr__(self): + return self.key or "FileEntry()" + class Update(msgspec.Struct, array_like=True): ... @@ -137,6 +140,10 @@ class UpdIns(Update, tag="i"): items: list[FileEntry] +class UpdateMessage(msgspec.Struct): + update: list[UpdKeep | UpdDel | UpdIns] + + class Space(msgspec.Struct): disk: int free: int diff --git a/cista/watching.py b/cista/watching.py index ab62ab6..b8d4ae8 100644 --- a/cista/watching.py +++ b/cista/watching.py @@ -282,13 +282,11 @@ def format_update(old, new): del_count = 0 rest = new[nidx:] - while old[oidx] not in rest: + while oidx < len(old) and old[oidx] not in rest: del_count += 1 oidx += 1 - if del_count: update.append(UpdDel(del_count)) - oidx += 1 continue insert_items = [] diff --git a/tests/test_watching.py b/tests/test_watching.py new file mode 100644 index 0000000..8e65faf --- /dev/null +++ b/tests/test_watching.py @@ -0,0 +1,77 @@ +import msgspec + +from cista.protocol import FileEntry, UpdateMessage, UpdDel, UpdIns, UpdKeep +from cista.watching import format_update + + +def decode(data: str): + return msgspec.json.decode(data, type=UpdateMessage).update + + +# Helper function to create a list of FileEntry objects +def f(count, start=0): + return [FileEntry(i, str(i), str(i), 0, 0, 0) for i in range(start, start + count)] + + +def test_identical_lists(): + old_list = f(3) + new_list = old_list.copy() + expected = [UpdKeep(3)] + assert decode(format_update(old_list, new_list)) == expected + + +def test_completely_different_lists(): + old_list = f(3) + new_list = f(3, 3) # Different entries + expected = [UpdDel(3), UpdIns(new_list)] + assert decode(format_update(old_list, new_list)) == expected + + +def test_insertions(): + old_list = f(3) + new_list = old_list[:2] + f(1, 10) + old_list[2:] + expected = [UpdKeep(2), UpdIns(f(1, 10)), UpdKeep(1)] + assert decode(format_update(old_list, new_list)) == expected + + +def test_deletions(): + old_list = f(3) + new_list = [old_list[0], old_list[2]] + expected = [UpdKeep(1), UpdDel(1), UpdKeep(1)] + assert decode(format_update(old_list, new_list)) == expected + + +def test_mixed_operations(): + old_list = f(4) + new_list = [old_list[0], old_list[2], *f(1, 10)] + expected = [UpdKeep(1), UpdDel(1), UpdKeep(1), UpdDel(1), UpdIns(f(1, 10))] + assert decode(format_update(old_list, new_list)) == expected + + +def test_empty_old_list(): + old_list = [] + new_list = f(3) + expected = [UpdIns(new_list)] + assert decode(format_update(old_list, new_list)) == expected + + +def test_empty_new_list(): + old_list = f(3) + new_list = [] + expected = [UpdDel(3)] + assert decode(format_update(old_list, new_list)) == expected + + +def test_longer_lists(): + old_list = f(6) + new_list = f(1, 6) + old_list[1:3] + old_list[4:5] + f(2, 7) + expected = [ + UpdDel(1), + UpdIns(f(1, 6)), + UpdKeep(2), + UpdDel(1), + UpdKeep(1), + UpdDel(1), + UpdIns(f(2, 7)), + ] + assert decode(format_update(old_list, new_list)) == expected