AutoGPT/autogpt/tests/unit/test_gcs_file_storage.py

202 lines
6.6 KiB
Python

import os
import uuid
from pathlib import Path
import pytest
import pytest_asyncio
from forge.file_storage.gcs import GCSFileStorage, GCSFileStorageConfiguration
from google.auth.exceptions import GoogleAuthError
from google.cloud import storage
from google.cloud.exceptions import NotFound
try:
storage.Client()
except GoogleAuthError:
pytest.skip("Google Cloud Authentication not configured", allow_module_level=True)
pytestmark = pytest.mark.slow
@pytest.fixture(scope="module")
def gcs_bucket_name() -> str:
return f"test-bucket-{str(uuid.uuid4())[:8]}"
@pytest.fixture(scope="module")
def gcs_root() -> Path:
return Path("/workspaces/AutoGPT-some-unique-task-id")
@pytest.fixture(scope="module")
def gcs_storage_uninitialized(gcs_bucket_name: str, gcs_root: Path):
os.environ["STORAGE_BUCKET"] = gcs_bucket_name
storage_config = GCSFileStorageConfiguration.from_env()
storage_config.root = gcs_root
storage = GCSFileStorage(storage_config)
yield storage # type: ignore
del os.environ["STORAGE_BUCKET"]
def test_initialize(gcs_bucket_name: str, gcs_storage_uninitialized: GCSFileStorage):
gcs = gcs_storage_uninitialized._gcs
# test that the bucket doesn't exist yet
with pytest.raises(NotFound):
gcs.get_bucket(gcs_bucket_name)
gcs_storage_uninitialized.initialize()
# test that the bucket has been created
bucket = gcs.get_bucket(gcs_bucket_name)
# clean up
bucket.delete(force=True)
@pytest.fixture(scope="module")
def gcs_storage(gcs_storage_uninitialized: GCSFileStorage):
(gcs_storage := gcs_storage_uninitialized).initialize()
yield gcs_storage # type: ignore
# Empty & delete the test bucket
gcs_storage._bucket.delete(force=True)
def test_workspace_bucket_name(
gcs_storage: GCSFileStorage,
gcs_bucket_name: str,
):
assert gcs_storage._bucket.name == gcs_bucket_name
NESTED_DIR = "existing/test/dir"
TEST_FILES: list[tuple[str | Path, str]] = [
("existing_test_file_1", "test content 1"),
("existing_test_file_2.txt", "test content 2"),
(Path("existing_test_file_3"), "test content 3"),
(Path(f"{NESTED_DIR}/test_file_4"), "test content 4"),
]
@pytest_asyncio.fixture
async def gcs_storage_with_files(gcs_storage: GCSFileStorage):
for file_name, file_content in TEST_FILES:
gcs_storage._bucket.blob(
str(gcs_storage.get_path(file_name))
).upload_from_string(file_content)
yield gcs_storage # type: ignore
@pytest.mark.asyncio
async def test_read_file(gcs_storage_with_files: GCSFileStorage):
for file_name, file_content in TEST_FILES:
content = gcs_storage_with_files.read_file(file_name)
assert content == file_content
with pytest.raises(NotFound):
gcs_storage_with_files.read_file("non_existent_file")
def test_list_files(gcs_storage_with_files: GCSFileStorage):
# List at root level
assert (
files := gcs_storage_with_files.list_files()
) == gcs_storage_with_files.list_files()
assert len(files) > 0
assert set(files) == set(Path(file_name) for file_name, _ in TEST_FILES)
# List at nested path
assert (
nested_files := gcs_storage_with_files.list_files(NESTED_DIR)
) == gcs_storage_with_files.list_files(NESTED_DIR)
assert len(nested_files) > 0
assert set(nested_files) == set(
p.relative_to(NESTED_DIR)
for file_name, _ in TEST_FILES
if (p := Path(file_name)).is_relative_to(NESTED_DIR)
)
def test_list_folders(gcs_storage_with_files: GCSFileStorage):
# List recursive
folders = gcs_storage_with_files.list_folders(recursive=True)
assert len(folders) > 0
assert set(folders) == {
Path("existing"),
Path("existing/test"),
Path("existing/test/dir"),
}
# List non-recursive
folders = gcs_storage_with_files.list_folders(recursive=False)
assert len(folders) > 0
assert set(folders) == {Path("existing")}
@pytest.mark.asyncio
async def test_write_read_file(gcs_storage: GCSFileStorage):
await gcs_storage.write_file("test_file", "test_content")
assert gcs_storage.read_file("test_file") == "test_content"
@pytest.mark.asyncio
async def test_overwrite_file(gcs_storage_with_files: GCSFileStorage):
for file_name, _ in TEST_FILES:
await gcs_storage_with_files.write_file(file_name, "new content")
assert gcs_storage_with_files.read_file(file_name) == "new content"
def test_delete_file(gcs_storage_with_files: GCSFileStorage):
for file_to_delete, _ in TEST_FILES:
gcs_storage_with_files.delete_file(file_to_delete)
assert not gcs_storage_with_files.exists(file_to_delete)
def test_exists(gcs_storage_with_files: GCSFileStorage):
for file_name, _ in TEST_FILES:
assert gcs_storage_with_files.exists(file_name)
assert not gcs_storage_with_files.exists("non_existent_file")
def test_rename_file(gcs_storage_with_files: GCSFileStorage):
for file_name, _ in TEST_FILES:
new_name = str(file_name) + "_renamed"
gcs_storage_with_files.rename(file_name, new_name)
assert gcs_storage_with_files.exists(new_name)
assert not gcs_storage_with_files.exists(file_name)
def test_rename_dir(gcs_storage_with_files: GCSFileStorage):
gcs_storage_with_files.rename(NESTED_DIR, "existing/test/dir_renamed")
assert gcs_storage_with_files.exists("existing/test/dir_renamed")
assert not gcs_storage_with_files.exists(NESTED_DIR)
def test_clone(gcs_storage_with_files: GCSFileStorage, gcs_root: Path):
cloned = gcs_storage_with_files.clone_with_subroot("existing/test")
assert cloned.root == gcs_root / Path("existing/test")
assert cloned._bucket.name == gcs_storage_with_files._bucket.name
assert cloned.exists("dir")
assert cloned.exists("dir/test_file_4")
@pytest.mark.asyncio
async def test_copy_file(storage: GCSFileStorage):
await storage.write_file("test_file.txt", "test content")
storage.copy("test_file.txt", "test_file_copy.txt")
storage.make_dir("dir")
storage.copy("test_file.txt", "dir/test_file_copy.txt")
assert storage.read_file("test_file_copy.txt") == "test content"
assert storage.read_file("dir/test_file_copy.txt") == "test content"
@pytest.mark.asyncio
async def test_copy_dir(storage: GCSFileStorage):
storage.make_dir("dir")
storage.make_dir("dir/sub_dir")
await storage.write_file("dir/test_file.txt", "test content")
await storage.write_file("dir/sub_dir/test_file.txt", "test content")
storage.copy("dir", "dir_copy")
assert storage.read_file("dir_copy/test_file.txt") == "test content"
assert storage.read_file("dir_copy/sub_dir/test_file.txt") == "test content"