Source code for rxn.utilities.files

import errno
import os
import random
import shutil
import sys
import tempfile
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import Iterable, Iterator, List, Tuple, Union

from typing_extensions import TypeAlias

from .basic import temporary_random_seed
from .containers import all_identical

PathLike: TypeAlias = Union[str, os.PathLike]


[docs]def load_list_from_file(filename: PathLike) -> List[str]: return list(iterate_lines_from_file(filename))
[docs]def iterate_lines_from_file(filename: PathLike) -> Iterator[str]: with open(filename, "rt") as f: for line in f: yield line.rstrip("\r\n")
[docs]def dump_list_to_file(values: Iterable[str], filename: PathLike) -> None: """Write an iterable of strings to a file. Args: values: values to write to the file. filename: file to write to. Will be overwritten if it exists already. """ with open(filename, "wt") as f: for v in values: f.write(f"{v}\n")
[docs]def append_to_file(values: Iterable[str], filename: PathLike) -> None: """Append an iterable of strings to a file. Args: values: values to append to the file. filename: file to append to. """ with open(filename, "at") as f: for v in values: f.write(f"{v}\n")
[docs]def count_lines(filename: PathLike) -> int: return sum(1 for _ in open(filename))
[docs]def iterate_tuples_from_files( filenames: List[PathLike], ) -> Iterator[Tuple[str, ...]]: """ Read from several files at once, and put the values from the same lines numbers into tuples. Args: filenames: files to read. Returns: iterator over the generated tuples. """ # Make sure the files have the same lengths. This is not the optimal solution # and in principle, one could detect unequal lengths when reading the files. # However, an easy solution is available only from Python 3.10: # https://stackoverflow.com/q/32954486 if not all_identical([count_lines(file) for file in filenames]): raise ValueError("Not all the files have identical lengths") # Opening several files at once; # See https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack with ExitStack() as stack: files = [stack.enter_context(open(fname, "rt")) for fname in filenames] iterators = [(line.rstrip("\r\n") for line in f) for f in files] yield from zip(*iterators)
[docs]def dump_tuples_to_files( values: Iterable[Tuple[str, ...]], filenames: List[PathLike] ) -> None: """Write tuples to multiple files (1st tuple value ends up in 1st file, etc.). Args: values: tuples to write to files. filenames: files to create. """ # Opening several files at once; # See https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack with ExitStack() as stack: files = [stack.enter_context(open(fname, "wt")) for fname in filenames] number_files = len(files) for value_tuple in values: if len(value_tuple) != number_files: raise ValueError( f"Tuple {value_tuple} has incorrect size (expected: {number_files})." ) for value, f in zip(value_tuple, files): f.write(f"{value}\n")
[docs]def stable_shuffle( input_file: PathLike, output_file: PathLike, seed: int, is_csv: bool = False ) -> None: """ Shuffle a file in a deterministic order (the same seed always reorders files of the same number of lines identically). Useful, as an example, to shuffle a source and target files identically. Args: input_file: file to shuffle. output_file: where to save the shuffled file. is_csv: if True, the first line will not be shuffled. """ # Note we use the context manager to avoid side effects of setting the seed. with temporary_random_seed(seed): line_iterator = iterate_lines_from_file(input_file) # Get the header, if it's a CSV. We store it as a list, which will have 0 or 1 element. header = [] if is_csv: header = [next(line_iterator)] # Get actual content and shuffle it lines = list(line_iterator) random.shuffle(lines) # Write header (if there is no header, it will empty the file) dump_list_to_file(header, output_file) # Write the shuffled lines append_to_file(lines, output_file)
[docs]@contextmanager def named_temporary_path(delete: bool = True) -> Iterator[Path]: """ Get the path for a temporary file or directory, without creating it (can be especially useful in tests). This is similar to tempfile.NamedTemporaryFile, when the file is not to be actually opened, and one is just interested in obtaining a writable / readable path to optionally delete at the end of the context. This function was originally created to bypass a limitation of NamedTemporaryFile on Windows (https://stackoverflow.com/q/23212435), which becomes relevant when one does not want the file to be opened automatically. The solution is inspired by https://stackoverflow.com/a/58955530. Args: delete: whether to delete the file when exiting the context Examples: >>> with named_temporary_path() as temporary_path: ... # do something on the temporary path. ... # The file or directory at that path will be deleted at the ... # end of the context, except if delete=False. """ base_temp_dir = Path(tempfile.gettempdir()) temporary_path = base_temp_dir / os.urandom(24).hex() try: yield temporary_path finally: if delete and temporary_path.exists(): if temporary_path.is_file(): temporary_path.unlink() else: shutil.rmtree(temporary_path)
[docs]@contextmanager def named_temporary_directory(delete: bool = True) -> Iterator[Path]: """ Get the path for a temporary directory and create it. Relies on ``named_temporary_path`` to provide a context manager that will automatically delete the directory when leaving the context. Args: delete: whether to delete the file when exiting the context Examples: >>> with named_temporary_directory() as temporary_directory: ... # do something with the temporary directory. ... # The directory will be deleted at the ... # end of the context, except if delete=False. """ with named_temporary_path(delete=delete) as path: path.mkdir() yield path
[docs]def is_pathname_valid(pathname: PathLike) -> bool: """ `True` if the passed pathname is a valid pathname for the current OS; `False` otherwise. Copied from https://stackoverflow.com/a/34102855. More details there. """ pathname = str(pathname) try: if not isinstance(pathname, str) or not pathname: return False _, pathname = os.path.splitdrive(pathname) root_dirname = ( os.environ.get("HOMEDRIVE", "C:") if sys.platform == "win32" else os.path.sep ) assert os.path.isdir(root_dirname) root_dirname = root_dirname.rstrip(os.path.sep) + os.path.sep for pathname_part in pathname.split(os.path.sep): try: os.lstat(root_dirname + pathname_part) except OSError as exc: if hasattr(exc, "winerror"): error_invalid_name = 123 if exc.winerror == error_invalid_name: return False elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}: return False except TypeError: return False else: return True
[docs]def is_path_creatable(pathname: PathLike) -> bool: """ `True` if the current user has sufficient permissions to create the passed pathname; `False` otherwise. Copied from https://stackoverflow.com/a/34102855. More details there. """ pathname = str(pathname) dirname = os.path.dirname(pathname) or os.getcwd() return os.access(dirname, os.W_OK)
[docs]def is_path_exists_or_creatable(pathname: PathLike) -> bool: """ `True` if the passed pathname is a valid pathname for the current OS _and_ either currently exists or is hypothetically creatable; `False` otherwise. This function is guaranteed to _never_ raise exceptions. Copied from https://stackoverflow.com/a/34102855. More details there. """ pathname = str(pathname) try: return is_pathname_valid(pathname) and ( os.path.exists(pathname) or is_path_creatable(pathname) ) except OSError: return False
[docs]def paths_are_identical(*paths: PathLike) -> bool: """Whether paths, possibly given in a mix of absolute and relative formats, point to the same file.""" real_paths = {os.path.realpath(p) for p in paths} return len(real_paths) == 1
[docs]def raise_if_paths_are_identical(*paths: PathLike) -> None: """ Raise an exception if input and output paths point to the same file. """ if paths_are_identical(*paths): paths_str = ", ".join(f'"{p}"' for p in paths) raise ValueError(f"The paths {paths_str} must be different.")
[docs]def ensure_directory_exists_and_is_empty(directory: Path) -> None: """Create a directory if it does not exist already, and raise if not empty.""" directory.mkdir(parents=True, exist_ok=True) directory_contains_files = any(directory.iterdir()) if directory_contains_files: raise RuntimeError(f'The directory "{directory}" is required to be empty.')
[docs]def get_file_size_as_string(file: PathLike) -> str: """Get the file size as a readable string. Adapted from https://stackoverflow.com/a/39988702. Args: file: File to get the size for. Raises: ValueError: if the given path is not a file. Returns: Readable string for the file size (such as "1000.0 bytes", "2.3 KB", or "1.1 MB"). """ if not isinstance(file, Path): file = Path(file) if file.is_dir(): raise ValueError(f'"{file} should be a file, but it is a directory.') # Get the size in bytes size: Union[int, float] = file.stat().st_size for unit in ["bytes", "KB", "MB", "GB", "TB"]: if size < 1024.0: return f"{size:3.1f} {unit}" size /= 1024.0 raise RuntimeError(f'The file "{file}" is too big to determine the size.')