mirror of
https://github.com/trycua/computer.git
synced 2026-01-04 20:40:15 -06:00
Updated tests; bumped mcp deps
This commit is contained in:
@@ -13,8 +13,8 @@ authors = [
|
||||
]
|
||||
dependencies = [
|
||||
"mcp>=1.6.0,<2.0.0",
|
||||
"cua-agent[all]>=0.2.0,<0.3.0",
|
||||
"cua-computer>=0.2.0,<0.3.0",
|
||||
"cua-agent[all]>=0.3.0,<0.4.0",
|
||||
"cua-computer>=0.3.0,<0.4.0",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
||||
256
tests/files.py
256
tests/files.py
@@ -136,6 +136,262 @@ async def test_create_dir(computer):
|
||||
assert exists is True, "Directory should exist"
|
||||
await computer.interface.delete_dir(tmp_dir)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_read_bytes_basic(computer):
|
||||
"""Test basic read_bytes functionality."""
|
||||
tmp_path = "test_read_bytes.bin"
|
||||
test_data = b"Hello, World! This is binary data \x00\x01\x02\x03"
|
||||
|
||||
# Write binary data using write_text (assuming it handles bytes)
|
||||
await computer.interface.write_text(tmp_path, test_data.decode('latin-1'))
|
||||
|
||||
# Read all bytes
|
||||
read_data = await computer.interface.read_bytes(tmp_path)
|
||||
assert read_data == test_data, "Binary data should match"
|
||||
|
||||
await computer.interface.delete_file(tmp_path)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_read_bytes_with_offset_and_length(computer):
|
||||
"""Test read_bytes with offset and length parameters."""
|
||||
tmp_path = "test_read_bytes_offset.bin"
|
||||
test_data = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
||||
|
||||
# Write test data
|
||||
await computer.interface.write_text(tmp_path, test_data.decode('latin-1'))
|
||||
|
||||
# Test reading with offset only
|
||||
read_data = await computer.interface.read_bytes(tmp_path, offset=5)
|
||||
expected = test_data[5:]
|
||||
assert read_data == expected, f"Data from offset 5 should match. Got: {read_data}, Expected: {expected}"
|
||||
|
||||
# Test reading with offset and length
|
||||
read_data = await computer.interface.read_bytes(tmp_path, offset=10, length=5)
|
||||
expected = test_data[10:15]
|
||||
assert read_data == expected, f"Data from offset 10, length 5 should match. Got: {read_data}, Expected: {expected}"
|
||||
|
||||
# Test reading from beginning with length
|
||||
read_data = await computer.interface.read_bytes(tmp_path, offset=0, length=10)
|
||||
expected = test_data[:10]
|
||||
assert read_data == expected, f"Data from beginning, length 10 should match. Got: {read_data}, Expected: {expected}"
|
||||
|
||||
await computer.interface.delete_file(tmp_path)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_get_file_size(computer):
|
||||
"""Test get_file_size functionality."""
|
||||
tmp_path = "test_file_size.txt"
|
||||
test_content = "A" * 1000 # 1000 bytes
|
||||
|
||||
await computer.interface.write_text(tmp_path, test_content)
|
||||
|
||||
file_size = await computer.interface.get_file_size(tmp_path)
|
||||
assert file_size == 1000, f"File size should be 1000 bytes, got {file_size}"
|
||||
|
||||
await computer.interface.delete_file(tmp_path)
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_read_large_file(computer):
|
||||
"""Test reading a file larger than 10MB to verify chunked reading."""
|
||||
tmp_path = "test_large_file.bin"
|
||||
|
||||
# Create a file larger than 10MB (10 * 1024 * 1024 = 10,485,760 bytes)
|
||||
total_size = 12 * 1024 * 1024 # 12MB
|
||||
|
||||
print(f"Creating large file of {total_size} bytes ({total_size / (1024*1024):.1f}MB)...")
|
||||
|
||||
# Create large file content (this will test the chunked writing functionality)
|
||||
large_content = b"X" * total_size
|
||||
|
||||
# Write the large file using write_bytes (will automatically use chunked writing)
|
||||
await computer.interface.write_bytes(tmp_path, large_content)
|
||||
|
||||
# Verify file size
|
||||
file_size = await computer.interface.get_file_size(tmp_path)
|
||||
assert file_size == total_size, f"Large file size should be {total_size} bytes, got {file_size}"
|
||||
|
||||
print(f"Large file created successfully: {file_size} bytes")
|
||||
|
||||
# Test reading the entire large file (should use chunked reading)
|
||||
print("Reading large file...")
|
||||
read_data = await computer.interface.read_bytes(tmp_path)
|
||||
assert len(read_data) == total_size, f"Read data size should match file size. Got {len(read_data)}, expected {total_size}"
|
||||
|
||||
# Verify content (should be all 'X' characters)
|
||||
expected_data = b"X" * total_size
|
||||
assert read_data == expected_data, "Large file content should be all 'X' characters"
|
||||
|
||||
print("Large file read successfully!")
|
||||
|
||||
# Test reading with offset and length on large file
|
||||
offset = 5 * 1024 * 1024 # 5MB offset
|
||||
length = 2 * 1024 * 1024 # 2MB length
|
||||
read_data = await computer.interface.read_bytes(tmp_path, offset=offset, length=length)
|
||||
assert len(read_data) == length, f"Partial read size should be {length}, got {len(read_data)}"
|
||||
assert read_data == b"X" * length, "Partial read content should be all 'X' characters"
|
||||
|
||||
print("Large file partial read successful!")
|
||||
|
||||
# Clean up
|
||||
await computer.interface.delete_file(tmp_path)
|
||||
print("Large file test completed successfully!")
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_read_write_text_with_encoding(computer):
|
||||
"""Test reading and writing text files with different encodings."""
|
||||
print("Testing text file operations with different encodings...")
|
||||
|
||||
tmp_path = "test_encoding.txt"
|
||||
|
||||
# Test UTF-8 encoding (default)
|
||||
utf8_content = "Hello, 世界! 🌍 Ñoño café"
|
||||
await computer.interface.write_text(tmp_path, utf8_content, encoding='utf-8')
|
||||
read_utf8 = await computer.interface.read_text(tmp_path, encoding='utf-8')
|
||||
assert read_utf8 == utf8_content, "UTF-8 content should match"
|
||||
|
||||
# Test ASCII encoding
|
||||
ascii_content = "Hello, World! Simple ASCII text."
|
||||
await computer.interface.write_text(tmp_path, ascii_content, encoding='ascii')
|
||||
read_ascii = await computer.interface.read_text(tmp_path, encoding='ascii')
|
||||
assert read_ascii == ascii_content, "ASCII content should match"
|
||||
|
||||
# Test Latin-1 encoding
|
||||
latin1_content = "Café, naïve, résumé"
|
||||
await computer.interface.write_text(tmp_path, latin1_content, encoding='latin-1')
|
||||
read_latin1 = await computer.interface.read_text(tmp_path, encoding='latin-1')
|
||||
assert read_latin1 == latin1_content, "Latin-1 content should match"
|
||||
|
||||
# Clean up
|
||||
await computer.interface.delete_file(tmp_path)
|
||||
print("Text encoding test completed successfully!")
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_write_text_append_mode(computer):
|
||||
"""Test appending text to files."""
|
||||
print("Testing text file append mode...")
|
||||
|
||||
tmp_path = "test_append.txt"
|
||||
|
||||
# Write initial content
|
||||
initial_content = "First line\n"
|
||||
await computer.interface.write_text(tmp_path, initial_content)
|
||||
|
||||
# Append more content
|
||||
append_content = "Second line\n"
|
||||
await computer.interface.write_text(tmp_path, append_content, append=True)
|
||||
|
||||
# Read and verify
|
||||
final_content = await computer.interface.read_text(tmp_path)
|
||||
expected_content = initial_content + append_content
|
||||
assert final_content == expected_content, f"Expected '{expected_content}', got '{final_content}'"
|
||||
|
||||
# Append one more line
|
||||
third_content = "Third line\n"
|
||||
await computer.interface.write_text(tmp_path, third_content, append=True)
|
||||
|
||||
# Read and verify final result
|
||||
final_content = await computer.interface.read_text(tmp_path)
|
||||
expected_content = initial_content + append_content + third_content
|
||||
assert final_content == expected_content, f"Expected '{expected_content}', got '{final_content}'"
|
||||
|
||||
# Clean up
|
||||
await computer.interface.delete_file(tmp_path)
|
||||
print("Text append test completed successfully!")
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_large_text_file(computer):
|
||||
"""Test reading and writing large text files (>5MB) to verify chunked operations."""
|
||||
print("Testing large text file operations...")
|
||||
|
||||
tmp_path = "test_large_text.txt"
|
||||
|
||||
# Create a large text content (approximately 6MB)
|
||||
# Each line is about 100 characters, so 60,000 lines ≈ 6MB
|
||||
line_template = "This is line {:06d} with some additional text to make it longer and reach about 100 chars.\n"
|
||||
large_content = ""
|
||||
num_lines = 60000
|
||||
|
||||
print(f"Generating large text content with {num_lines} lines...")
|
||||
for i in range(num_lines):
|
||||
large_content += line_template.format(i)
|
||||
|
||||
content_size_mb = len(large_content.encode('utf-8')) / (1024 * 1024)
|
||||
print(f"Generated text content size: {content_size_mb:.2f} MB")
|
||||
|
||||
# Write the large text file
|
||||
print("Writing large text file...")
|
||||
await computer.interface.write_text(tmp_path, large_content)
|
||||
|
||||
# Read the entire file back
|
||||
print("Reading large text file...")
|
||||
read_content = await computer.interface.read_text(tmp_path)
|
||||
|
||||
# Verify content matches
|
||||
assert read_content == large_content, "Large text file content should match exactly"
|
||||
|
||||
# Test partial reading by reading as bytes and decoding specific portions
|
||||
print("Testing partial text reading...")
|
||||
|
||||
# Read first 1000 characters worth of bytes
|
||||
first_1000_chars = large_content[:1000]
|
||||
first_1000_bytes = first_1000_chars.encode('utf-8')
|
||||
read_bytes = await computer.interface.read_bytes(tmp_path, offset=0, length=len(first_1000_bytes))
|
||||
decoded_partial = read_bytes.decode('utf-8')
|
||||
assert decoded_partial == first_1000_chars, "Partial text reading should match"
|
||||
|
||||
# Test appending to large file
|
||||
print("Testing append to large text file...")
|
||||
append_text = "\n--- APPENDED CONTENT ---\nThis content was appended to the large file.\n"
|
||||
await computer.interface.write_text(tmp_path, append_text, append=True)
|
||||
|
||||
# Read and verify appended content
|
||||
final_content = await computer.interface.read_text(tmp_path)
|
||||
expected_final = large_content + append_text
|
||||
assert final_content == expected_final, "Appended large text file should match"
|
||||
|
||||
# Clean up
|
||||
await computer.interface.delete_file(tmp_path)
|
||||
print("Large text file test completed successfully!")
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_text_file_edge_cases(computer):
|
||||
"""Test edge cases for text file operations."""
|
||||
print("Testing text file edge cases...")
|
||||
|
||||
tmp_path = "test_edge_cases.txt"
|
||||
|
||||
# Test empty file
|
||||
empty_content = ""
|
||||
await computer.interface.write_text(tmp_path, empty_content)
|
||||
read_empty = await computer.interface.read_text(tmp_path)
|
||||
assert read_empty == empty_content, "Empty file should return empty string"
|
||||
|
||||
# Test file with only whitespace
|
||||
whitespace_content = " \n\t\r\n \n"
|
||||
await computer.interface.write_text(tmp_path, whitespace_content)
|
||||
read_whitespace = await computer.interface.read_text(tmp_path)
|
||||
assert read_whitespace == whitespace_content, "Whitespace content should be preserved"
|
||||
|
||||
# Test file with special characters and newlines
|
||||
special_content = "Line 1\nLine 2\r\nLine 3\tTabbed\nSpecial: !@#$%^&*()\n"
|
||||
await computer.interface.write_text(tmp_path, special_content)
|
||||
read_special = await computer.interface.read_text(tmp_path)
|
||||
assert read_special == special_content, "Special characters should be preserved"
|
||||
|
||||
# Test very long single line (no newlines)
|
||||
long_line = "A" * 10000 # 10KB single line
|
||||
await computer.interface.write_text(tmp_path, long_line)
|
||||
read_long_line = await computer.interface.read_text(tmp_path)
|
||||
assert read_long_line == long_line, "Long single line should be preserved"
|
||||
|
||||
# Clean up
|
||||
await computer.interface.delete_file(tmp_path)
|
||||
print("Text file edge cases test completed successfully!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Run tests directly
|
||||
pytest.main([__file__, "-v"])
|
||||
|
||||
Reference in New Issue
Block a user