diff --git a/upath/core.py b/upath/core.py index e60b2325..5c9c9f2c 100644 --- a/upath/core.py +++ b/upath/core.py @@ -2,7 +2,6 @@ from __future__ import annotations -import posixpath import sys import warnings from abc import ABCMeta @@ -11,6 +10,7 @@ from collections.abc import Mapping from collections.abc import Sequence from copy import copy +from pathlib import PurePath from types import MappingProxyType from typing import IO from typing import TYPE_CHECKING @@ -336,7 +336,7 @@ def path(self) -> str: if (self_path := str(self)) == ".": path = str(current_dir) else: - path = current_dir.parser.join(str(self), self_path) + path = current_dir.parser.join(str(current_dir), self_path) return self.parser.strip_protocol(path) return self._chain.active_path @@ -1809,6 +1809,12 @@ def rename( Returns the new Path instance pointing to the target path. + Info + ---- + For filesystems that don't have a root character, i.e. for which + relative paths can be ambiguous, you can explicitly indicate a + relative path via prefixing with `./` + Warning ------- This method is non-standard compared to pathlib.Path.rename(), @@ -1819,43 +1825,53 @@ def rename( running into future compatibility issues. """ + # check protocol compatibility target_protocol = get_upath_protocol(target) if target_protocol and target_protocol != self.protocol: raise ValueError( f"expected protocol {self.protocol!r}, got: {target_protocol!r}" ) - if not isinstance(target, UPath): - target = str(target) - if target_protocol or (self.anchor and target.startswith(self.anchor)): - target = self.with_segments(target) + # ensure target is an absolute UPath + if not isinstance(target, type(self)): + if isinstance(target, (UPath, PurePath)): + target_str = target.as_posix() + else: + target_str = str(target) + if target_protocol: + # target protocol provided indicates absolute path + target = self.with_segments(target_str) + elif self.anchor and target_str.startswith(self.anchor): + # self.anchor can be used to indicate absolute path + target = self.with_segments(target_str) + elif not self.anchor and target_str.startswith("./"): + # indicate relative via "./" + target = ( + self.cwd() + .joinpath(target_str.removeprefix("./")) + .relative_to(self.cwd()) + ) else: - target = UPath(target) + # all other cases + target = self.cwd().joinpath(target_str).relative_to(self.cwd()) + # return early if renaming to same path if target == self: return self - if self._relative_base is not None: - self = self.absolute() - target_protocol = get_upath_protocol(target) - if target_protocol: - target_ = target - # avoid calling .resolve for subclasses of UPath - if ".." in target_.parts or "." in target_.parts: - target_ = target_.resolve() - else: - parent = self.parent - # avoid calling .resolve for subclasses of UPath - if ".." in parent.parts or "." in parent.parts: - parent = parent.resolve() - target_ = parent.joinpath(posixpath.normpath(target.path)) + # ensure source and target are absolute + source_abs = self.absolute() + target_abs = target.absolute() + # avoid calling .resolve for if not needed + if ".." in target_abs.parts or "." in target_abs.parts: + target_abs = target_abs.resolve() if recursive is not UNSET_DEFAULT: kwargs["recursive"] = recursive if maxdepth is not UNSET_DEFAULT: kwargs["maxdepth"] = maxdepth self.fs.mv( - self.path, - target_.path, + source_abs.path, + target_abs.path, **kwargs, ) - return self.with_segments(target_) + return target def replace(self, target: WritablePathLike) -> Self: """ diff --git a/upath/extensions.py b/upath/extensions.py index 3450e10c..84d56798 100644 --- a/upath/extensions.py +++ b/upath/extensions.py @@ -11,6 +11,7 @@ from typing import Callable from typing import Literal from typing import TextIO +from typing import TypeVar from typing import overload from urllib.parse import SplitResult @@ -40,6 +41,28 @@ "ProxyUPath", ] +T = TypeVar("T") + + +class classmethod_or_method(classmethod): + """A decorator that can be used as a classmethod or an instance method. + + When called on the class, it behaves like a classmethod. + When called on an instance, it behaves like an instance method. + + """ + + def __get__( + self, + instance: T | None, + owner: type[T] | None = None, + /, + ) -> Callable[..., T]: + if instance is None: + return self.__func__.__get__(owner) + else: + return self.__func__.__get__(instance) + class ProxyUPath: """ProxyUPath base class @@ -380,9 +403,12 @@ def as_posix(self) -> str: def samefile(self, other_path) -> bool: return self.__wrapped__.samefile(other_path) - @classmethod - def cwd(cls) -> Self: - raise UnsupportedOperation(".cwd() not supported") + @classmethod_or_method + def cwd(cls_or_self) -> Self: # noqa: B902 + if isinstance(cls_or_self, type): + raise UnsupportedOperation(".cwd() not supported") + else: + return cls_or_self._from_upath(cls_or_self.__wrapped__.cwd()) @classmethod def home(cls) -> Self: diff --git a/upath/implementations/ftp.py b/upath/implementations/ftp.py index 7009c617..69698441 100644 --- a/upath/implementations/ftp.py +++ b/upath/implementations/ftp.py @@ -6,9 +6,11 @@ from typing import TYPE_CHECKING from upath.core import UPath +from upath.types import UNSET_DEFAULT from upath.types import JoinablePathLike if TYPE_CHECKING: + from typing import Any from typing import Literal if sys.version_info >= (3, 11): @@ -19,6 +21,7 @@ from typing_extensions import Unpack from upath._chain import FSSpecChainParser + from upath.types import WritablePathLike from upath.types.storage_options import FTPStorageOptions __all__ = ["FTPPath"] @@ -55,3 +58,17 @@ def iterdir(self) -> Iterator[Self]: raise NotADirectoryError(str(self)) else: return super().iterdir() + + def rename( + self, + target: WritablePathLike, + *, # note: non-standard compared to pathlib + recursive: bool = UNSET_DEFAULT, + maxdepth: int | None = UNSET_DEFAULT, + **kwargs: Any, + ) -> Self: + t = super().rename(target, recursive=recursive, maxdepth=maxdepth, **kwargs) + self_dir = self.parent.path + t.fs.invalidate_cache(self_dir) + self.fs.invalidate_cache(self_dir) + return t diff --git a/upath/implementations/local.py b/upath/implementations/local.py index bcfb1fae..7506b435 100644 --- a/upath/implementations/local.py +++ b/upath/implementations/local.py @@ -32,6 +32,7 @@ from upath.types import StatResultType from upath.types import SupportsPathLike from upath.types import WritablePath +from upath.types import WritablePathLike if TYPE_CHECKING: from typing import IO @@ -133,6 +134,18 @@ def _raw_urlpaths(self) -> Sequence[JoinablePathLike]: def _raw_urlpaths(self, value: Sequence[JoinablePathLike]) -> None: pass + if sys.version_info >= (3, 14): + + def rename( + self, + target: WritablePathLike, + ) -> Self: + t = super().rename(target) # type: ignore[arg-type] + if not isinstance(target, type(self)): + return self.with_segments(t) + else: + return t + if sys.version_info >= (3, 12): def __init__( diff --git a/upath/tests/cases.py b/upath/tests/cases.py index b64654ac..08557fe2 100644 --- a/upath/tests/cases.py +++ b/upath/tests/cases.py @@ -12,6 +12,7 @@ from upath import UnsupportedOperation from upath import UPath +from upath._protocol import get_upath_protocol from upath._stat import UPathStatResult from upath.types import StatResultType @@ -275,31 +276,101 @@ def test_readlink(self): self.path.readlink() def test_rename(self): - upath = self.path.joinpath("file1.txt") - target = upath.parent.joinpath("file1_renamed.txt") - moved = upath.rename(target) - assert target == moved - assert not upath.exists() - assert moved.exists() - # reverse with an absolute path as str - back = moved.rename(upath.path) - assert back == upath - assert not moved.exists() - assert back.exists() - - def test_rename2(self): - upath = self.path.joinpath("folder1/file2.txt") - target = "file2_renamed.txt" - moved = upath.rename(target) - target_path = upath.parent.joinpath(target).resolve() - assert target_path == moved - assert not upath.exists() - assert moved.exists() - # reverse with a relative path as UPath - back = moved.rename(UPath("file2.txt")) - assert back == upath - assert not moved.exists() - assert back.exists() + p_source = self.path.joinpath("file1.txt") + p_target = self.path.joinpath("file1_renamed.txt") + + p_moved = p_source.rename(p_target) + assert p_target == p_moved + assert not p_source.exists() + assert p_moved.exists() + + p_revert = p_moved.rename(p_source) + assert p_revert == p_source + assert not p_moved.exists() + assert p_revert.exists() + + @pytest.fixture + def supports_cwd(self): + # intentionally called on the instance to support ProxyUPath().cwd() + try: + self.path.cwd() + except UnsupportedOperation: + return False + else: + return True + + @pytest.mark.parametrize( + "target_factory", + [ + lambda obj, name: name, + lambda obj, name: UPath(name), + lambda obj, name: Path(name), + lambda obj, name: obj.joinpath(name).relative_to(obj), + ], + ids=[ + "str_relative", + "plain_upath_relative", + "plain_path_relative", + "self_upath_relative", + ], + ) + def test_rename_with_target_relative( + self, request, monkeypatch, supports_cwd, target_factory, tmp_path + ): + source = self.path.joinpath("folder1/file2.txt") + target = target_factory(self.path, "file2_renamed.txt") + + source_text = source.read_text() + if supports_cwd: + cid = request.node.callspec.id + cwd = tmp_path.joinpath(cid) + cwd.mkdir(parents=True, exist_ok=True) + monkeypatch.chdir(cwd) + + t = source.rename(target) + assert (t.protocol == UPath(target).protocol) or UPath( + target + ).protocol == "" + assert (t.path == UPath(target).path) or ( + t.path == UPath(target).absolute().path + ) + assert t.exists() + assert t.read_text() == source_text + + else: + with pytest.raises(UnsupportedOperation): + source.rename(target) + + @pytest.mark.parametrize( + "target_factory", + [ + lambda obj, name: obj.joinpath(name).absolute().as_posix(), + lambda obj, name: UPath(obj.absolute().joinpath(name).path), + lambda obj, name: Path(obj.absolute().joinpath(name).path), + lambda obj, name: obj.absolute().joinpath(name), + ], + ids=[ + "str_absolute", + "plain_upath_absolute", + "plain_path_absolute", + "self_upath_absolute", + ], + ) + def test_rename_with_target_absolute(self, target_factory): + from upath._chain import Chain + from upath._chain import FSSpecChainParser + + source = self.path.joinpath("folder1/file2.txt") + target = target_factory(self.path, "file2_renamed.txt") + + source_text = source.read_text() + t = source.rename(target) + assert get_upath_protocol(target) in {t.protocol, ""} + assert t.path == Chain.from_list( + FSSpecChainParser().unchain(str(target)) + ).active_path.replace("\\", "/") + assert t.exists() + assert t.read_text() == source_text def test_replace(self): pass diff --git a/upath/tests/conftest.py b/upath/tests/conftest.py index 841bee8a..41f6dba3 100644 --- a/upath/tests/conftest.py +++ b/upath/tests/conftest.py @@ -48,6 +48,22 @@ def clear_registry(): _registry.clear() +@pytest.fixture(scope="function") +def windows_working_directory_drive_sync(monkeypatch, tmp_path, tmp_path_factory): + cwd_old = os.getcwd() + drive_cwd = os.path.splitdrive(cwd_old)[0] + drive_tmp = os.path.splitdrive(tmp_path)[0] + if drive_tmp != drive_cwd: + cwd_new = tmp_path_factory.mktemp("cwd_on_tmp_drive") + os.chdir(cwd_new) + try: + yield + finally: + os.chdir(cwd_old) + else: + yield + + @pytest.fixture(scope="function") def clear_fsspec_memory_cache(): fs_cls = get_filesystem_class("memory") @@ -652,7 +668,7 @@ def hf_fixture_with_readonly_mocked_hf_api( @pytest.fixture(scope="module") -def ftp_server(tmp_path_factory): +def ftp_server_process(tmp_path_factory): """Fixture providing a writable FTP filesystem.""" pytest.importorskip("pyftpdlib") @@ -674,7 +690,7 @@ def ftp_server(tmp_path_factory): ) try: time.sleep(1) - yield { + yield str(tmp_path), { "host": "localhost", "port": 2121, "username": "user", @@ -687,3 +703,23 @@ def ftp_server(tmp_path_factory): shutil.rmtree(tmp_path) except Exception: pass + + +@pytest.fixture(scope="function") +def ftp_server(ftp_server_process): + """Fixture providing a writable FTP filesystem.""" + tmp_path, storage_options = ftp_server_process + + try: + yield storage_options + finally: + for filename in os.listdir(tmp_path): + file_path = os.path.join(tmp_path, filename) + if os.path.isdir(file_path): + del_func = shutil.rmtree + else: + del_func = os.unlink + try: + del_func(file_path) + except Exception: + pass diff --git a/upath/tests/implementations/test_data.py b/upath/tests/implementations/test_data.py index f5306fc4..1d55c471 100644 --- a/upath/tests/implementations/test_data.py +++ b/upath/tests/implementations/test_data.py @@ -126,8 +126,13 @@ def test_rename(self): with pytest.raises(NotImplementedError): self.path.rename("newname") - def test_rename2(self): - self.path.rename(self.path) + @pytest.mark.skip("DataPath does not support rename") + def test_rename_with_target_relative(self): + pass + + @pytest.mark.skip("DataPath does not support rename") + def test_rename_with_target_absolute(self): + pass def test_rglob(self, pathlib_base): with pytest.raises(NotImplementedError): diff --git a/upath/tests/implementations/test_github.py b/upath/tests/implementations/test_github.py index 994df83f..66326c92 100644 --- a/upath/tests/implementations/test_github.py +++ b/upath/tests/implementations/test_github.py @@ -152,3 +152,7 @@ def test_move_memory(self, clear_fsspec_memory_cache): @pytest.mark.skip(reason="Only testing read on GithubPath") def test_move_into_memory(self, clear_fsspec_memory_cache): pass + + @pytest.mark.skip(reason="Only testing read on GithubPath") + def test_rename_with_target_absolute(self, target_factory): + return super().test_rename_with_target_str_absolute(target_factory) diff --git a/upath/tests/implementations/test_hf.py b/upath/tests/implementations/test_hf.py index 813e3648..30f5cccb 100644 --- a/upath/tests/implementations/test_hf.py +++ b/upath/tests/implementations/test_hf.py @@ -128,3 +128,7 @@ def test_iterdir(self, local_testdir): @pytest.mark.skip(reason="HfPath does not support listing repositories") def test_iterdir2(self, local_testdir): pass + + @pytest.mark.skip(reason="HfPath does not currently test write") + def test_rename_with_target_absolute(self, target_factory): + return super().test_rename_with_target_absolute(target_factory) diff --git a/upath/tests/implementations/test_http.py b/upath/tests/implementations/test_http.py index 9ca5d10e..f52d8bf9 100644 --- a/upath/tests/implementations/test_http.py +++ b/upath/tests/implementations/test_http.py @@ -173,6 +173,10 @@ def test_move_memory(self, clear_fsspec_memory_cache): def test_move_into_memory(self, clear_fsspec_memory_cache): pass + @pytest.mark.skip(reason="Only testing read on HttpPath") + def test_rename_with_target_absolute(self, target_factory): + return super().test_rename_with_target_absolute(target_factory) + @pytest.mark.parametrize( "args,parts", diff --git a/upath/tests/implementations/test_tar.py b/upath/tests/implementations/test_tar.py index 6f4834e5..9b7326ce 100644 --- a/upath/tests/implementations/test_tar.py +++ b/upath/tests/implementations/test_tar.py @@ -83,6 +83,10 @@ def test_move_memory(self, clear_fsspec_memory_cache): def test_move_into_memory(self, clear_fsspec_memory_cache): pass + @pytest.mark.skip(reason="Only testing read on TarPath") + def test_rename_with_target_absolute(self, target_factory): + return super().test_rename_with_target_str_absolute(target_factory) + @pytest.fixture(scope="function") def tarred_testdir_file_in_memory(tarred_testdir_file, clear_fsspec_memory_cache): diff --git a/upath/tests/implementations/test_webdav.py b/upath/tests/implementations/test_webdav.py index de56cec7..9ac9778c 100644 --- a/upath/tests/implementations/test_webdav.py +++ b/upath/tests/implementations/test_webdav.py @@ -1,3 +1,5 @@ +from pathlib import Path + import pytest from upath import UPath @@ -27,3 +29,27 @@ def test_read_with_fsspec(self): # run the BaseTests, the upath.implementations.webdav module is # imported, which registers the webdav implementation in fsspec. super().test_read_with_fsspec() + + @pytest.mark.parametrize( + "target_factory", + [ + lambda obj, name: str(obj.joinpath(name).absolute()), + pytest.param( + lambda obj, name: UPath(obj.absolute().joinpath(name).path), + marks=pytest.mark.xfail(reason="webdav has no root..."), + ), + pytest.param( + lambda obj, name: Path(obj.absolute().joinpath(name).path), + marks=pytest.mark.xfail(reason="webdav has no root..."), + ), + lambda obj, name: obj.absolute().joinpath(name), + ], + ids=[ + "str_absolute", + "plain_upath_absolute", + "plain_path_absolute", + "self_upath_absolute", + ], + ) + def test_rename_with_target_absolute(self, target_factory): + super().test_rename_with_target_absolute(target_factory) diff --git a/upath/tests/implementations/test_zip.py b/upath/tests/implementations/test_zip.py index 656b9057..72956bb6 100644 --- a/upath/tests/implementations/test_zip.py +++ b/upath/tests/implementations/test_zip.py @@ -42,7 +42,10 @@ def path(self, zipped_testdir_file, request): except (ValueError, TypeError, AttributeError): mode = "r" self.path = UPath("zip://", fo=zipped_testdir_file, mode=mode) - # self.prepare_file_system() done outside of UPath + try: + yield + finally: + self.path.fs.clear_instance_cache() def test_is_ZipPath(self): assert isinstance(self.path, ZipPath) @@ -81,10 +84,6 @@ def test_rename(self): with pytest.raises(NotImplementedError): super().test_rename() # delete is not implemented in fsspec - def test_rename2(self): - with pytest.raises(NotImplementedError): - super().test_rename2() # delete is not implemented in fsspec - def test_move_local(self, tmp_path): with pytest.raises(NotImplementedError): super().test_move_local(tmp_path) # delete is not implemented in fsspec @@ -146,6 +145,10 @@ def test_write_text(self): def test_fsspec_compat(self): pass + @pytest.mark.skip(reason="fsspec zipfile filesystem is either read xor write mode") + def test_rename_with_target_absolute(self, target_factory): + return super().test_rename_with_target_absolute(target_factory) + @pytest.fixture(scope="function") def zipped_testdir_file_in_memory(zipped_testdir_file, clear_fsspec_memory_cache): diff --git a/upath/tests/test_extensions.py b/upath/tests/test_extensions.py index e114092b..c3976957 100644 --- a/upath/tests/test_extensions.py +++ b/upath/tests/test_extensions.py @@ -43,6 +43,11 @@ def test_is_not_FilePath(self): def test_chmod(self): self.path.joinpath("file1.txt").chmod(777) + def test_cwd(self): + self.path.cwd() + with pytest.raises(UnsupportedOperation): + type(self.path).cwd() + class TestProxyPathlibPath(BaseTests): @pytest.fixture(autouse=True) @@ -97,10 +102,6 @@ def test_protocol(self): def test_as_uri(self): assert self.path.as_uri().startswith("file://") - @pytest.mark.xfail(reason="need to revisit relative path .rename") - def test_rename2(self): - super().test_rename2() - if sys.version_info < (3, 10): def test_lstat(self): @@ -124,6 +125,11 @@ def test_relative_to(self): relative = child.relative_to(base) assert str(relative) == f"folder1{os.sep}file1.txt" + def test_cwd(self): + self.path.cwd() + with pytest.raises(UnsupportedOperation): + type(self.path).cwd() + def test_custom_subclass():