615 lines
21 KiB
Python
Raw Normal View History

"""
Low-level OS functionality wrappers used by pathlib.
"""
from errno import *
from stat import S_ISDIR, S_ISREG, S_ISLNK, S_IMODE
import io
import os
import sys
try:
import fcntl
except ImportError:
fcntl = None
try:
import posix
except ImportError:
posix = None
try:
import _winapi
except ImportError:
_winapi = None
def _get_copy_blocksize(infd):
"""Determine blocksize for fastcopying on Linux.
Hopefully the whole file will be copied in a single call.
The copying itself should be performed in a loop 'till EOF is
reached (0 return) so a blocksize smaller or bigger than the actual
file size should not make any difference, also in case the file
content changes while being copied.
"""
try:
blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8 MiB
except OSError:
blocksize = 2 ** 27 # 128 MiB
# On 32-bit architectures truncate to 1 GiB to avoid OverflowError,
# see gh-82500.
if sys.maxsize < 2 ** 32:
blocksize = min(blocksize, 2 ** 30)
return blocksize
if fcntl and hasattr(fcntl, 'FICLONE'):
def _ficlone(source_fd, target_fd):
"""
Perform a lightweight copy of two files, where the data blocks are
copied only when modified. This is known as Copy on Write (CoW),
instantaneous copy or reflink.
"""
fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd)
else:
_ficlone = None
if posix and hasattr(posix, '_fcopyfile'):
def _fcopyfile(source_fd, target_fd):
"""
Copy a regular file content using high-performance fcopyfile(3)
syscall (macOS).
"""
posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA)
else:
_fcopyfile = None
if hasattr(os, 'copy_file_range'):
def _copy_file_range(source_fd, target_fd):
"""
Copy data from one regular mmap-like fd to another by using a
high-performance copy_file_range(2) syscall that gives filesystems
an opportunity to implement the use of reflinks or server-side
copy.
This should work on Linux >= 4.5 only.
"""
blocksize = _get_copy_blocksize(source_fd)
offset = 0
while True:
sent = os.copy_file_range(source_fd, target_fd, blocksize,
offset_dst=offset)
if sent == 0:
break # EOF
offset += sent
else:
_copy_file_range = None
if hasattr(os, 'sendfile'):
def _sendfile(source_fd, target_fd):
"""Copy data from one regular mmap-like fd to another by using
high-performance sendfile(2) syscall.
This should work on Linux >= 2.6.33 only.
"""
blocksize = _get_copy_blocksize(source_fd)
offset = 0
while True:
sent = os.sendfile(target_fd, source_fd, offset, blocksize)
if sent == 0:
break # EOF
offset += sent
else:
_sendfile = None
if _winapi and hasattr(_winapi, 'CopyFile2'):
def copyfile(source, target):
"""
Copy from one file to another using CopyFile2 (Windows only).
"""
_winapi.CopyFile2(source, target, 0)
else:
copyfile = None
def copyfileobj(source_f, target_f):
"""
Copy data from file-like object source_f to file-like object target_f.
"""
try:
source_fd = source_f.fileno()
target_fd = target_f.fileno()
except Exception:
pass # Fall through to generic code.
else:
try:
# Use OS copy-on-write where available.
if _ficlone:
try:
_ficlone(source_fd, target_fd)
return
except OSError as err:
if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV):
raise err
# Use OS copy where available.
if _fcopyfile:
try:
_fcopyfile(source_fd, target_fd)
return
except OSError as err:
if err.errno not in (EINVAL, ENOTSUP):
raise err
if _copy_file_range:
try:
_copy_file_range(source_fd, target_fd)
return
except OSError as err:
if err.errno not in (ETXTBSY, EXDEV):
raise err
if _sendfile:
try:
_sendfile(source_fd, target_fd)
return
except OSError as err:
if err.errno != ENOTSOCK:
raise err
except OSError as err:
# Produce more useful error messages.
err.filename = source_f.name
err.filename2 = target_f.name
raise err
# Last resort: copy with fileobj read() and write().
read_source = source_f.read
write_target = target_f.write
while buf := read_source(1024 * 1024):
write_target(buf)
def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
newline=None):
"""
Open the file pointed to by this path and return a file object, as
the built-in open() function does.
"""
try:
return io.open(path, mode, buffering, encoding, errors, newline)
except TypeError:
pass
cls = type(path)
text = 'b' not in mode
mode = ''.join(sorted(c for c in mode if c not in 'bt'))
if text:
try:
attr = getattr(cls, f'__open_{mode}__')
except AttributeError:
pass
else:
return attr(path, buffering, encoding, errors, newline)
try:
attr = getattr(cls, f'__open_{mode}b__')
except AttributeError:
pass
else:
stream = attr(path, buffering)
if text:
stream = io.TextIOWrapper(stream, encoding, errors, newline)
return stream
raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
class CopyReader:
"""
Class that implements the "read" part of copying between path objects.
An instance of this class is available from the ReadablePath._copy_reader
property.
"""
__slots__ = ('_path',)
def __init__(self, path):
self._path = path
_readable_metakeys = frozenset()
def _read_metadata(self, metakeys, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise NotImplementedError
class CopyWriter:
"""
Class that implements the "write" part of copying between path objects. An
instance of this class is available from the WritablePath._copy_writer
property.
"""
__slots__ = ('_path',)
def __init__(self, path):
self._path = path
_writable_metakeys = frozenset()
def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise NotImplementedError
def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
self._ensure_distinct_path(source)
if preserve_metadata:
metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
else:
metakeys = None
if not follow_symlinks and source.is_symlink():
self._create_symlink(source, metakeys)
elif source.is_dir():
self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
else:
self._create_file(source, metakeys)
return self._path
def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst._copy_writer._create_symlink(src, metakeys)
elif src.is_dir():
dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
else:
dst._copy_writer._create_file(src, metakeys)
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)
def _create_file(self, source, metakeys):
"""Copy the given file to our path."""
self._ensure_different_file(source)
with magic_open(source, 'rb') as source_f:
try:
with magic_open(self._path, 'wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)
def _create_symlink(self, source, metakeys):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)
def _ensure_different_file(self, source):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
pass
def _ensure_distinct_path(self, source):
"""
Raise OSError(EINVAL) if the other path is within this path.
"""
# Note: there is no straightforward, foolproof algorithm to determine
# if one directory is within another (a particularly perverse example
# would be a single network share mounted in one location via NFS, and
# in another location via CIFS), so we simply checks whether the
# other path is lexically equal to, or within, this path.
if source == self._path:
err = OSError(EINVAL, "Source and target are the same path")
elif source in self._path.parents:
err = OSError(EINVAL, "Source path is a parent of target path")
else:
return
err.filename = str(source)
err.filename2 = str(self._path)
raise err
class LocalCopyReader(CopyReader):
"""This object implements the "read" part of copying local paths. Don't
try to construct it yourself.
"""
__slots__ = ()
_readable_metakeys = {'mode', 'times_ns'}
if hasattr(os.stat_result, 'st_flags'):
_readable_metakeys.add('flags')
if hasattr(os, 'listxattr'):
_readable_metakeys.add('xattrs')
_readable_metakeys = frozenset(_readable_metakeys)
def _read_metadata(self, metakeys, *, follow_symlinks=True):
metadata = {}
if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
st = self._path.stat(follow_symlinks=follow_symlinks)
if 'mode' in metakeys:
metadata['mode'] = S_IMODE(st.st_mode)
if 'times_ns' in metakeys:
metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
if 'flags' in metakeys:
metadata['flags'] = st.st_flags
if 'xattrs' in metakeys:
try:
metadata['xattrs'] = [
(attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
except OSError as err:
if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
return metadata
class LocalCopyWriter(CopyWriter):
"""This object implements the "write" part of copying local paths. Don't
try to construct it yourself.
"""
__slots__ = ()
_writable_metakeys = LocalCopyReader._readable_metakeys
def _write_metadata(self, metadata, *, follow_symlinks=True):
def _nop(*args, ns=None, follow_symlinks=None):
pass
if follow_symlinks:
# use the real function if it exists
def lookup(name):
return getattr(os, name, _nop)
else:
# use the real function only if it exists
# *and* it supports follow_symlinks
def lookup(name):
fn = getattr(os, name, _nop)
if fn in os.supports_follow_symlinks:
return fn
return _nop
times_ns = metadata.get('times_ns')
if times_ns is not None:
lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
xattrs = metadata.get('xattrs')
if xattrs is not None:
for attr, value in xattrs:
try:
os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
mode = metadata.get('mode')
if mode is not None:
try:
lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
# * lchown() is unavailable, and
# * either
# * fchownat() is unavailable or
# * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW.
# (it returned ENOSUP.)
# therefore we're out of options--we simply cannot chown the
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass
flags = metadata.get('flags')
if flags is not None:
try:
lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise
if copyfile:
# Use fast OS routine for local file copying where available.
def _create_file(self, source, metakeys):
"""Copy the given file to the given target."""
try:
source = os.fspath(source)
except TypeError:
super()._create_file(source, metakeys)
else:
copyfile(source, os.fspath(self._path))
if os.name == 'nt':
# Windows: symlink target might not exist yet if we're copying several
# files, so ensure we pass is_dir to os.symlink().
def _create_symlink(self, source, metakeys):
"""Copy the given symlink to the given target."""
self._path.symlink_to(source.readlink(), source.is_dir())
if metakeys:
metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)
def _ensure_different_file(self, source):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
try:
if not self._path.samefile(source):
return
except (OSError, ValueError):
return
err = OSError(EINVAL, "Source and target are the same file")
err.filename = str(source)
err.filename2 = str(self._path)
raise err
class _PathInfoBase:
__slots__ = ()
def __repr__(self):
path_type = "WindowsPath" if os.name == "nt" else "PosixPath"
return f"<{path_type}.info>"
class _WindowsPathInfo(_PathInfoBase):
"""Implementation of pathlib.types.PathInfo that provides status
information for Windows paths. Don't try to construct it yourself."""
__slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink')
def __init__(self, path):
self._path = str(path)
def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
if not follow_symlinks and self.is_symlink():
return True
try:
return self._exists
except AttributeError:
if os.path.exists(self._path):
self._exists = True
return True
else:
self._exists = self._is_dir = self._is_file = False
return False
def is_dir(self, *, follow_symlinks=True):
"""Whether this path is a directory."""
if not follow_symlinks and self.is_symlink():
return False
try:
return self._is_dir
except AttributeError:
if os.path.isdir(self._path):
self._is_dir = self._exists = True
return True
else:
self._is_dir = False
return False
def is_file(self, *, follow_symlinks=True):
"""Whether this path is a regular file."""
if not follow_symlinks and self.is_symlink():
return False
try:
return self._is_file
except AttributeError:
if os.path.isfile(self._path):
self._is_file = self._exists = True
return True
else:
self._is_file = False
return False
def is_symlink(self):
"""Whether this path is a symbolic link."""
try:
return self._is_symlink
except AttributeError:
self._is_symlink = os.path.islink(self._path)
return self._is_symlink
class _PosixPathInfo(_PathInfoBase):
"""Implementation of pathlib.types.PathInfo that provides status
information for POSIX paths. Don't try to construct it yourself."""
__slots__ = ('_path', '_mode')
def __init__(self, path):
self._path = str(path)
self._mode = [None, None]
def _get_mode(self, *, follow_symlinks=True):
idx = bool(follow_symlinks)
mode = self._mode[idx]
if mode is None:
try:
st = os.stat(self._path, follow_symlinks=follow_symlinks)
except (OSError, ValueError):
mode = 0
else:
mode = st.st_mode
if follow_symlinks or S_ISLNK(mode):
self._mode[idx] = mode
else:
# Not a symlink, so stat() will give the same result
self._mode = [mode, mode]
return mode
def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
return self._get_mode(follow_symlinks=follow_symlinks) > 0
def is_dir(self, *, follow_symlinks=True):
"""Whether this path is a directory."""
return S_ISDIR(self._get_mode(follow_symlinks=follow_symlinks))
def is_file(self, *, follow_symlinks=True):
"""Whether this path is a regular file."""
return S_ISREG(self._get_mode(follow_symlinks=follow_symlinks))
def is_symlink(self):
"""Whether this path is a symbolic link."""
return S_ISLNK(self._get_mode(follow_symlinks=False))
PathInfo = _WindowsPathInfo if os.name == 'nt' else _PosixPathInfo
class DirEntryInfo(_PathInfoBase):
"""Implementation of pathlib.types.PathInfo that provides status
information by querying a wrapped os.DirEntry object. Don't try to
construct it yourself."""
__slots__ = ('_entry', '_exists')
def __init__(self, entry):
self._entry = entry
def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
if not follow_symlinks:
return True
try:
return self._exists
except AttributeError:
try:
self._entry.stat()
except OSError:
self._exists = False
else:
self._exists = True
return self._exists
def is_dir(self, *, follow_symlinks=True):
"""Whether this path is a directory."""
try:
return self._entry.is_dir(follow_symlinks=follow_symlinks)
except OSError:
return False
def is_file(self, *, follow_symlinks=True):
"""Whether this path is a regular file."""
try:
return self._entry.is_file(follow_symlinks=follow_symlinks)
except OSError:
return False
def is_symlink(self):
"""Whether this path is a symbolic link."""
try:
return self._entry.is_symlink()
except OSError:
return False