WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
64 changes: 40 additions & 24 deletions upath/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import posixpath
import sys
import warnings
from abc import ABCMeta
Expand All @@ -11,6 +10,7 @@
from collections.abc import Mapping
from collections.abc import Sequence
from copy import copy
from pathlib import PurePath
from types import MappingProxyType
from typing import IO
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -336,7 +336,7 @@ def path(self) -> str:
if (self_path := str(self)) == ".":
path = str(current_dir)
else:
path = current_dir.parser.join(str(self), self_path)
path = current_dir.parser.join(str(current_dir), self_path)
return self.parser.strip_protocol(path)
return self._chain.active_path

Expand Down Expand Up @@ -1809,6 +1809,12 @@ def rename(

Returns the new Path instance pointing to the target path.

Info
----
For filesystems that don't have a root character, i.e. for which
relative paths can be ambiguous, you can explicitly indicate a
relative path via prefixing with `./`

Warning
-------
This method is non-standard compared to pathlib.Path.rename(),
Expand All @@ -1819,43 +1825,53 @@ def rename(
running into future compatibility issues.

"""
# check protocol compatibility
target_protocol = get_upath_protocol(target)
if target_protocol and target_protocol != self.protocol:
raise ValueError(
f"expected protocol {self.protocol!r}, got: {target_protocol!r}"
)
if not isinstance(target, UPath):
target = str(target)
if target_protocol or (self.anchor and target.startswith(self.anchor)):
target = self.with_segments(target)
# ensure target is an absolute UPath
if not isinstance(target, type(self)):
if isinstance(target, (UPath, PurePath)):
target_str = target.as_posix()
else:
target_str = str(target)
if target_protocol:
# target protocol provided indicates absolute path
target = self.with_segments(target_str)
elif self.anchor and target_str.startswith(self.anchor):
# self.anchor can be used to indicate absolute path
target = self.with_segments(target_str)
elif not self.anchor and target_str.startswith("./"):
# indicate relative via "./"
target = (
self.cwd()
.joinpath(target_str.removeprefix("./"))
.relative_to(self.cwd())
)
else:
target = UPath(target)
# all other cases
target = self.cwd().joinpath(target_str).relative_to(self.cwd())
# return early if renaming to same path
if target == self:
return self
if self._relative_base is not None:
self = self.absolute()
target_protocol = get_upath_protocol(target)
if target_protocol:
target_ = target
# avoid calling .resolve for subclasses of UPath
if ".." in target_.parts or "." in target_.parts:
target_ = target_.resolve()
else:
parent = self.parent
# avoid calling .resolve for subclasses of UPath
if ".." in parent.parts or "." in parent.parts:
parent = parent.resolve()
target_ = parent.joinpath(posixpath.normpath(target.path))
# ensure source and target are absolute
source_abs = self.absolute()
target_abs = target.absolute()
# avoid calling .resolve for if not needed
if ".." in target_abs.parts or "." in target_abs.parts:
target_abs = target_abs.resolve()
if recursive is not UNSET_DEFAULT:
kwargs["recursive"] = recursive
if maxdepth is not UNSET_DEFAULT:
kwargs["maxdepth"] = maxdepth
self.fs.mv(
self.path,
target_.path,
source_abs.path,
target_abs.path,
**kwargs,
)
return self.with_segments(target_)
return target

def replace(self, target: WritablePathLike) -> Self:
"""
Expand Down
32 changes: 29 additions & 3 deletions upath/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Callable
from typing import Literal
from typing import TextIO
from typing import TypeVar
from typing import overload
from urllib.parse import SplitResult

Expand Down Expand Up @@ -40,6 +41,28 @@
"ProxyUPath",
]

T = TypeVar("T")


class classmethod_or_method(classmethod):
"""A decorator that can be used as a classmethod or an instance method.

When called on the class, it behaves like a classmethod.
When called on an instance, it behaves like an instance method.

"""

def __get__(
self,
instance: T | None,
owner: type[T] | None = None,
/,
) -> Callable[..., T]:
if instance is None:
return self.__func__.__get__(owner)
else:
return self.__func__.__get__(instance)


class ProxyUPath:
"""ProxyUPath base class
Expand Down Expand Up @@ -380,9 +403,12 @@ def as_posix(self) -> str:
def samefile(self, other_path) -> bool:
return self.__wrapped__.samefile(other_path)

@classmethod
def cwd(cls) -> Self:
raise UnsupportedOperation(".cwd() not supported")
@classmethod_or_method
def cwd(cls_or_self) -> Self: # noqa: B902
if isinstance(cls_or_self, type):
raise UnsupportedOperation(".cwd() not supported")
else:
return cls_or_self._from_upath(cls_or_self.__wrapped__.cwd())

@classmethod
def home(cls) -> Self:
Expand Down
17 changes: 17 additions & 0 deletions upath/implementations/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
from typing import TYPE_CHECKING

from upath.core import UPath
from upath.types import UNSET_DEFAULT
from upath.types import JoinablePathLike

if TYPE_CHECKING:
from typing import Any
from typing import Literal

if sys.version_info >= (3, 11):
Expand All @@ -19,6 +21,7 @@
from typing_extensions import Unpack

from upath._chain import FSSpecChainParser
from upath.types import WritablePathLike
from upath.types.storage_options import FTPStorageOptions

__all__ = ["FTPPath"]
Expand Down Expand Up @@ -55,3 +58,17 @@ def iterdir(self) -> Iterator[Self]:
raise NotADirectoryError(str(self))
else:
return super().iterdir()

def rename(
self,
target: WritablePathLike,
*, # note: non-standard compared to pathlib
recursive: bool = UNSET_DEFAULT,
maxdepth: int | None = UNSET_DEFAULT,
**kwargs: Any,
) -> Self:
t = super().rename(target, recursive=recursive, maxdepth=maxdepth, **kwargs)
self_dir = self.parent.path
t.fs.invalidate_cache(self_dir)
self.fs.invalidate_cache(self_dir)
return t
13 changes: 13 additions & 0 deletions upath/implementations/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from upath.types import StatResultType
from upath.types import SupportsPathLike
from upath.types import WritablePath
from upath.types import WritablePathLike

if TYPE_CHECKING:
from typing import IO
Expand Down Expand Up @@ -133,6 +134,18 @@ def _raw_urlpaths(self) -> Sequence[JoinablePathLike]:
def _raw_urlpaths(self, value: Sequence[JoinablePathLike]) -> None:
pass

if sys.version_info >= (3, 14):

def rename(
self,
target: WritablePathLike,
) -> Self:
t = super().rename(target) # type: ignore[arg-type]
if not isinstance(target, type(self)):
return self.with_segments(t)
else:
return t

if sys.version_info >= (3, 12):

def __init__(
Expand Down
121 changes: 96 additions & 25 deletions upath/tests/cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from upath import UnsupportedOperation
from upath import UPath
from upath._protocol import get_upath_protocol
from upath._stat import UPathStatResult
from upath.types import StatResultType

Expand Down Expand Up @@ -275,31 +276,101 @@ def test_readlink(self):
self.path.readlink()

def test_rename(self):
upath = self.path.joinpath("file1.txt")
target = upath.parent.joinpath("file1_renamed.txt")
moved = upath.rename(target)
assert target == moved
assert not upath.exists()
assert moved.exists()
# reverse with an absolute path as str
back = moved.rename(upath.path)
assert back == upath
assert not moved.exists()
assert back.exists()

def test_rename2(self):
upath = self.path.joinpath("folder1/file2.txt")
target = "file2_renamed.txt"
moved = upath.rename(target)
target_path = upath.parent.joinpath(target).resolve()
assert target_path == moved
assert not upath.exists()
assert moved.exists()
# reverse with a relative path as UPath
back = moved.rename(UPath("file2.txt"))
assert back == upath
assert not moved.exists()
assert back.exists()
p_source = self.path.joinpath("file1.txt")
p_target = self.path.joinpath("file1_renamed.txt")

p_moved = p_source.rename(p_target)
assert p_target == p_moved
assert not p_source.exists()
assert p_moved.exists()

p_revert = p_moved.rename(p_source)
assert p_revert == p_source
assert not p_moved.exists()
assert p_revert.exists()

@pytest.fixture
def supports_cwd(self):
# intentionally called on the instance to support ProxyUPath().cwd()
try:
self.path.cwd()
except UnsupportedOperation:
return False
else:
return True

@pytest.mark.parametrize(
"target_factory",
[
lambda obj, name: name,
lambda obj, name: UPath(name),
lambda obj, name: Path(name),
lambda obj, name: obj.joinpath(name).relative_to(obj),
],
ids=[
"str_relative",
"plain_upath_relative",
"plain_path_relative",
"self_upath_relative",
],
)
def test_rename_with_target_relative(
self, request, monkeypatch, supports_cwd, target_factory, tmp_path
):
source = self.path.joinpath("folder1/file2.txt")
target = target_factory(self.path, "file2_renamed.txt")

source_text = source.read_text()
if supports_cwd:
cid = request.node.callspec.id
cwd = tmp_path.joinpath(cid)
cwd.mkdir(parents=True, exist_ok=True)
monkeypatch.chdir(cwd)

t = source.rename(target)
assert (t.protocol == UPath(target).protocol) or UPath(
target
).protocol == ""
assert (t.path == UPath(target).path) or (
t.path == UPath(target).absolute().path
)
assert t.exists()
assert t.read_text() == source_text

else:
with pytest.raises(UnsupportedOperation):
source.rename(target)

@pytest.mark.parametrize(
"target_factory",
[
lambda obj, name: obj.joinpath(name).absolute().as_posix(),
lambda obj, name: UPath(obj.absolute().joinpath(name).path),
lambda obj, name: Path(obj.absolute().joinpath(name).path),
lambda obj, name: obj.absolute().joinpath(name),
],
ids=[
"str_absolute",
"plain_upath_absolute",
"plain_path_absolute",
"self_upath_absolute",
],
)
def test_rename_with_target_absolute(self, target_factory):
from upath._chain import Chain
from upath._chain import FSSpecChainParser

source = self.path.joinpath("folder1/file2.txt")
target = target_factory(self.path, "file2_renamed.txt")

source_text = source.read_text()
t = source.rename(target)
assert get_upath_protocol(target) in {t.protocol, ""}
assert t.path == Chain.from_list(
FSSpecChainParser().unchain(str(target))
).active_path.replace("\\", "/")
assert t.exists()
assert t.read_text() == source_text

def test_replace(self):
pass
Expand Down
Loading