2024-06-14 17:15:49 +01:00
|
|
|
"""
|
|
|
|
Low-level OS functionality wrappers used by pathlib.
|
|
|
|
"""
|
|
|
|
|
2024-07-06 17:18:39 +01:00
|
|
|
from errno import *
|
2025-02-08 01:16:45 +00:00
|
|
|
from stat import S_ISDIR, S_ISREG, S_ISLNK
|
2024-06-14 17:15:49 +01:00
|
|
|
import os
|
|
|
|
import sys
|
|
|
|
try:
|
|
|
|
import fcntl
|
|
|
|
except ImportError:
|
|
|
|
fcntl = None
|
|
|
|
try:
|
|
|
|
import posix
|
|
|
|
except ImportError:
|
|
|
|
posix = None
|
|
|
|
try:
|
|
|
|
import _winapi
|
|
|
|
except ImportError:
|
|
|
|
_winapi = None
|
|
|
|
|
|
|
|
|
2024-08-24 15:11:39 +01:00
|
|
|
def _get_copy_blocksize(infd):
    """Choose the chunk size to request from an OS-assisted copy call.

    The size is the larger of the length reported by fstat() for *infd*
    and 8 MiB, so that the whole file is hopefully transferred by a single
    call. If *infd* cannot be stat'ed, 128 MiB is used instead.

    Callers are expected to keep copying until the call reports 0 bytes
    (EOF), so a chunk size smaller or larger than the real file size makes
    no difference, even when the file content changes during the copy.
    """
    try:
        reported_size = os.fstat(infd).st_size
    except OSError:
        blocksize = 2 ** 27  # 128 MiB
    else:
        blocksize = max(reported_size, 2 ** 23)  # min 8 MiB
    if sys.maxsize < 2 ** 32:
        # On 32-bit architectures truncate to 1 GiB to avoid OverflowError,
        # see gh-82500.
        blocksize = min(blocksize, 2 ** 30)
    return blocksize
|
|
|
|
|
|
|
|
|
|
|
|
if not (fcntl and hasattr(fcntl, 'FICLONE')):
    # fcntl is unavailable, or it does not expose the FICLONE request.
    _ficlone = None
else:
    def _ficlone(source_fd, target_fd):
        """
        Make target_fd a lightweight copy of source_fd via the FICLONE ioctl.

        Data blocks are copied only when modified. This is known as Copy
        on Write (CoW), instantaneous copy or reflink.
        """
        fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd)
|
2024-06-14 17:15:49 +01:00
|
|
|
|
|
|
|
|
|
|
|
if not (posix and hasattr(posix, '_fcopyfile')):
    # posix is unavailable, or it does not provide _fcopyfile.
    _fcopyfile = None
else:
    def _fcopyfile(source_fd, target_fd):
        """
        Copy the content of a regular file with the high-performance
        fcopyfile(3) syscall (macOS).

        Only the file data is requested (posix._COPYFILE_DATA).
        """
        posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA)
|
|
|
|
|
|
|
|
|
|
|
|
if not hasattr(os, 'copy_file_range'):
    _copy_file_range = None
else:
    def _copy_file_range(source_fd, target_fd):
        """
        Copy data from one regular mmap-like fd to another with the
        high-performance copy_file_range(2) syscall, which gives
        filesystems an opportunity to use reflinks or server-side copy.

        The source is read from its current file position; the target is
        written at an explicit offset that starts at 0 and advances by the
        number of bytes each call reports. Copying stops when a call
        reports 0 bytes (EOF).

        This should work on Linux >= 4.5 only.
        """
        blocksize = _get_copy_blocksize(source_fd)
        written = 0
        while chunk := os.copy_file_range(source_fd, target_fd, blocksize,
                                          offset_dst=written):
            written += chunk
|
|
|
|
|
|
|
|
|
|
|
|
if not hasattr(os, 'sendfile'):
    _sendfile = None
else:
    def _sendfile(source_fd, target_fd):
        """Copy data from one regular mmap-like fd to another with the
        high-performance sendfile(2) syscall.

        The source is read at an explicit offset that starts at 0 and
        advances by the number of bytes each call reports. Copying stops
        when a call reports 0 bytes (EOF).

        This should work on Linux >= 2.6.33 only.
        """
        blocksize = _get_copy_blocksize(source_fd)
        position = 0
        while moved := os.sendfile(target_fd, source_fd, position, blocksize):
            position += moved
|
2024-06-14 17:15:49 +01:00
|
|
|
|
|
|
|
|
2024-08-11 22:43:18 +01:00
|
|
|
if not (_winapi and hasattr(_winapi, 'CopyFile2')):
    # _winapi is unavailable, or it does not provide CopyFile2.
    copyfile = None
else:
    def copyfile(source, target):
        """
        Copy from one file to another using CopyFile2 (Windows only).

        *source* and *target* are handed to _winapi.CopyFile2 unchanged,
        with a flags argument of 0.
        """
        _winapi.CopyFile2(source, target, 0)
|
|
|
|
|
|
|
|
|
|
|
|
def copyfileobj(source_f, target_f):
    """
    Copy data from file-like object source_f to file-like object target_f.

    When both objects provide a file descriptor through fileno(), the
    OS-assisted helpers defined for this platform are tried in this order:
    _ficlone, _fcopyfile, _copy_file_range, _sendfile. The first helper
    that completes ends the copy. A helper that fails with one of the
    errno values listed next to it is skipped and the next one is tried.

    If no descriptors are available, or no helper completed the copy, the
    data is transferred with source_f.read() and target_f.write() in
    chunks of 1 MiB.

    Raises:
        OSError: a helper failed with an errno that is not in its skip
            list. The exception's ``filename`` and ``filename2`` are set
            to the ``name`` attributes of source_f and target_f, or to
            None for an object that has no ``name``.
    """
    try:
        source_fd = source_f.fileno()
        target_fd = target_f.fileno()
    except Exception:
        # Deliberately broad: any failure to obtain descriptors simply
        # means the fast paths cannot be used.
        pass  # Fall through to generic code.
    else:
        try:
            # Use OS copy-on-write where available.
            if _ficlone:
                try:
                    _ficlone(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV):
                        raise

            # Use OS copy where available.
            if _fcopyfile:
                try:
                    _fcopyfile(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (EINVAL, ENOTSUP):
                        raise
            if _copy_file_range:
                try:
                    _copy_file_range(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (ETXTBSY, EXDEV):
                        raise
            if _sendfile:
                try:
                    _sendfile(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno != ENOTSOCK:
                        raise
        except OSError as err:
            # Produce more useful error messages. A file-like object may
            # offer fileno() without a ``name``; look the attribute up
            # defensively so that a missing name cannot replace the
            # OSError being reported with an AttributeError.
            err.filename = getattr(source_f, 'name', None)
            err.filename2 = getattr(target_f, 'name', None)
            raise

    # Last resort: copy with fileobj read() and write().
    read_source = source_f.read
    write_target = target_f.write
    while buf := read_source(1024 * 1024):
        write_target(buf)
|
2025-02-08 01:16:45 +00:00
|
|
|
|
|
|
|
|
|
|
|
class _PathInfoBase:
    """Base class of the path-info classes in this module; supplies repr()."""
    __slots__ = ()

    def __repr__(self):
        """Return '<WindowsPath.info>' when os.name is 'nt', otherwise
        '<PosixPath.info>'."""
        flavour = "PosixPath" if os.name != "nt" else "WindowsPath"
        return f"<{flavour}.info>"
|
|
|
|
|
|
|
|
|
|
|
|
class _WindowsPathInfo(_PathInfoBase):
    """Implementation of pathlib.types.PathInfo that provides status
    information for Windows paths. Don't try to construct it yourself.

    Each answer is computed with the matching os.path function on first
    request and then kept in a slot; a slot that has not been assigned yet
    means "not computed".
    """
    __slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink')

    def __init__(self, path):
        self._path = str(path)

    def exists(self, *, follow_symlinks=True):
        """Whether this path exists.

        With follow_symlinks=False a symlink counts as existing without
        its target being checked.
        """
        if not follow_symlinks and self.is_symlink():
            return True
        if hasattr(self, '_exists'):
            return self._exists
        found = os.path.exists(self._path)
        if found:
            self._exists = True
        else:
            # A path that does not exist is neither a directory nor a file,
            # so those answers are recorded at the same time.
            self._exists = self._is_dir = self._is_file = False
        return found

    def is_dir(self, *, follow_symlinks=True):
        """Whether this path is a directory.

        With follow_symlinks=False a symlink is never reported as a
        directory.
        """
        if not follow_symlinks and self.is_symlink():
            return False
        if not hasattr(self, '_is_dir'):
            if os.path.isdir(self._path):
                # Being a directory also implies that the path exists.
                self._is_dir = self._exists = True
            else:
                self._is_dir = False
        return self._is_dir

    def is_file(self, *, follow_symlinks=True):
        """Whether this path is a regular file.

        With follow_symlinks=False a symlink is never reported as a
        regular file.
        """
        if not follow_symlinks and self.is_symlink():
            return False
        if not hasattr(self, '_is_file'):
            if os.path.isfile(self._path):
                # Being a regular file also implies that the path exists.
                self._is_file = self._exists = True
            else:
                self._is_file = False
        return self._is_file

    def is_symlink(self):
        """Whether this path is a symbolic link."""
        if not hasattr(self, '_is_symlink'):
            self._is_symlink = os.path.islink(self._path)
        return self._is_symlink
|
|
|
|
|
|
|
|
|
|
|
|
class _PosixPathInfo(_PathInfoBase):
    """Implementation of pathlib.types.PathInfo that provides status
    information for POSIX paths. Don't try to construct it yourself."""
    __slots__ = ('_path', '_mode')

    def __init__(self, path):
        self._path = str(path)
        # Cached st_mode values: index 0 is the result without following
        # symlinks, index 1 the result with symlinks followed. None means
        # "not fetched yet"; 0 means the stat call failed.
        self._mode = [None, None]

    def _get_mode(self, *, follow_symlinks=True):
        """Return the st_mode of this path, or 0 if it cannot be stat'ed.

        The value is fetched with os.stat() on first use and served from
        the cache afterwards.
        """
        slot = bool(follow_symlinks)
        cached = self._mode[slot]
        if cached is not None:
            return cached
        try:
            mode = os.stat(self._path, follow_symlinks=follow_symlinks).st_mode
        except (OSError, ValueError):
            mode = 0
        if not follow_symlinks and not S_ISLNK(mode):
            # Not a symlink, so stat() will give the same result
            self._mode = [mode, mode]
        else:
            self._mode[slot] = mode
        return mode

    def exists(self, *, follow_symlinks=True):
        """Whether this path exists."""
        mode = self._get_mode(follow_symlinks=follow_symlinks)
        return mode > 0

    def is_dir(self, *, follow_symlinks=True):
        """Whether this path is a directory."""
        mode = self._get_mode(follow_symlinks=follow_symlinks)
        return S_ISDIR(mode)

    def is_file(self, *, follow_symlinks=True):
        """Whether this path is a regular file."""
        mode = self._get_mode(follow_symlinks=follow_symlinks)
        return S_ISREG(mode)

    def is_symlink(self):
        """Whether this path is a symbolic link."""
        mode = self._get_mode(follow_symlinks=False)
        return S_ISLNK(mode)
|
|
|
|
|
|
|
|
|
|
|
|
# Path-info implementation for the host OS: the Windows variant when
# os.name is 'nt', the POSIX variant otherwise.
PathInfo = _WindowsPathInfo if os.name == 'nt' else _PosixPathInfo
|
|
|
|
|
|
|
|
|
|
|
|
class DirEntryInfo(_PathInfoBase):
    """Implementation of pathlib.types.PathInfo that provides status
    information by querying a wrapped os.DirEntry object. Don't try to
    construct it yourself.

    An OSError raised by the wrapped entry is reported as a False answer.
    """
    __slots__ = ('_entry', '_exists')

    def __init__(self, entry):
        self._entry = entry

    def exists(self, *, follow_symlinks=True):
        """Whether this path exists.

        With follow_symlinks=False the answer is always True and the entry
        is not queried. Otherwise the entry's stat() is called once and
        the outcome is remembered.
        """
        if not follow_symlinks:
            return True
        if not hasattr(self, '_exists'):
            try:
                self._entry.stat()
            except OSError:
                self._exists = False
            else:
                self._exists = True
        return self._exists

    def is_dir(self, *, follow_symlinks=True):
        """Whether this path is a directory."""
        try:
            answer = self._entry.is_dir(follow_symlinks=follow_symlinks)
        except OSError:
            answer = False
        return answer

    def is_file(self, *, follow_symlinks=True):
        """Whether this path is a regular file."""
        try:
            answer = self._entry.is_file(follow_symlinks=follow_symlinks)
        except OSError:
            answer = False
        return answer

    def is_symlink(self):
        """Whether this path is a symbolic link."""
        try:
            answer = self._entry.is_symlink()
        except OSError:
            answer = False
        return answer
|