Source code for rxn.utilities.csv.streaming_csv_editor

from inspect import Signature, signature
from typing import Any, Callable, List, Tuple, Type, Union

from attr import define
from tqdm import tqdm
from typing_extensions import TypeAlias

from ..files import PathLike, count_lines
from . import CsvIterator

# Transformation function as actually used under the hood: the user-provided
# callable is wrapped by _callback_handler into a function that receives the
# list of values of the input columns and returns the list of values for the
# output columns.
_TransformationFunction: TypeAlias = Callable[[List[str]], List[str]]


class StreamingCsvEditor:
    """
    Line-by-line editor for CSV files.

    Every row is transformed as it is read, so the whole file never needs to be
    held in memory (contrary to what loading it into a pandas DataFrame would
    require).
    """

    def __init__(
        self,
        columns_in: List[str],
        columns_out: List[str],
        transformation: Callable[..., Any],
        line_terminator: str = "\n",
    ):
        """
        Args:
            columns_in: columns whose values are passed to the transformation.
            columns_out: columns receiving the values returned by the
                transformation.
            transformation: callable applied to the values of the input
                columns. It must be type-annotated, with these admissible
                annotations:
                - parameters: one or several ``str``, or a single list of
                  strings, or a single tuple of strings (one or more elements);
                - return value: one ``str``, a list of strings, or a tuple of
                  strings (one or more elements).
            line_terminator: line terminator used when writing the CSV.
        """
        # Adapt the user callable to the list-of-strings -> list-of-strings form.
        wrapped_fn = _callback_handler(transformation)
        self.transformation = _CsvTransformation(
            columns_in=columns_in,
            columns_out=columns_out,
            fn=wrapped_fn,
        )
        self.line_terminator = line_terminator

    def process(self, csv_iterator: CsvIterator) -> CsvIterator:
        """
        Edit the rows provided by a CSV iterator.

        The rows are transformed lazily, i.e. only as the returned iterator is
        consumed.

        Args:
            csv_iterator: CSV iterator to edit.

        Returns:
            A new CsvIterator with the edited header and rows.
        """
        helper = _Helper(csv_iterator.columns, transformation=self.transformation)
        edited_rows = (helper.process_line(line) for line in csv_iterator.rows)
        return CsvIterator(columns=helper.output_columns, rows=edited_rows)

    def process_paths(
        self, path_in: PathLike, path_out: PathLike, verbose: bool = False
    ) -> None:
        """
        Read a CSV file, edit it, and save the result to another file.

        Args:
            path_in: CSV file to read.
            path_out: where to write the edited CSV.
            verbose: whether to display a tqdm progress bar; its total is the
                line count of ``path_in``, computed before processing.
        """
        with open(path_in, "rt") as source, open(path_out, "wt") as target:
            reader = CsvIterator.from_stream(source)

            if verbose:
                total_lines = count_lines(path_in)
                progress = tqdm(reader.rows, total=total_lines)
                reader = CsvIterator(
                    reader.columns,
                    rows=(line for line in progress),
                )

            edited = self.process(reader)
            edited.to_stream(target, line_terminator=self.line_terminator)
@define
class _CsvTransformation:
    """Helper class containing the details of a transformation for one CSV file."""

    # Names of the columns whose values are given to the transformation.
    columns_in: List[str]
    # Names of the columns receiving the results (existing or to be created).
    columns_out: List[str]
    # Wrapped transformation: list of input values -> list of output values.
    fn: _TransformationFunction


class _Helper:
    """Helper class that does the actual row-by-row processing."""

    def __init__(
        self,
        input_columns: List[str],
        transformation: _CsvTransformation,
    ):
        """
        Args:
            input_columns: header of the CSV being processed.
            transformation: transformation to apply to every row.

        Raises:
            RuntimeError: if an input column of the transformation is not in
                ``input_columns``.
        """
        self.fn = transformation.fn
        self.indices_in = self._determine_column_indices(
            input_columns, transformation.columns_in
        )

        # Output columns that do not exist yet are appended after the existing
        # ones. dict.fromkeys deduplicates while keeping the order, so that a
        # new column listed twice in columns_out yields a single header entry
        # (instead of an extra column that would always stay empty).
        new_columns = list(
            dict.fromkeys(
                c for c in transformation.columns_out if c not in input_columns
            )
        )
        self.n_new_columns = len(new_columns)
        self.output_columns = input_columns + new_columns
        # Number of values the transformation must return for each row.
        self.n_results = len(transformation.columns_out)
        self.indices_out = self._determine_column_indices(
            self.output_columns, transformation.columns_out
        )

    def _determine_column_indices(
        self, all_columns: List[str], target_columns: List[str]
    ) -> List[int]:
        """Get the position of each of ``target_columns`` in ``all_columns``.

        Raises:
            RuntimeError: if one of the target columns is not found.
        """
        indices: List[int] = []
        for c in target_columns:
            try:
                indices.append(all_columns.index(c))
            except ValueError:
                # The ValueError from list.index adds no information.
                raise RuntimeError(f'"{c}" not found in {all_columns}.') from None
        return indices

    def process_line(self, row: List[str]) -> List[str]:
        """Process one line from the CSV.

        Args:
            row: content of one CSV line. It is not modified.

        Returns:
            Content of the line after applying the function, as a new list.

        Raises:
            ValueError: if the transformation does not return exactly one
                value per output column.
        """
        # Process the values
        input_items = [row[i] for i in self.indices_in]
        results = self.fn(input_items)

        # zip() below would otherwise silently drop extra results, or leave
        # some output columns empty/stale when too few are returned.
        if len(results) != self.n_results:
            raise ValueError(
                f"The transformation returned {len(results)} value(s), "
                f"but {self.n_results} output column(s) were expected."
            )

        # Work on a copy so that the caller's row is left untouched, with space
        # for the new columns (if needed).
        new_row = list(row)
        new_row.extend("" for _ in range(self.n_new_columns))

        # overwrite the results
        for index, result in zip(self.indices_out, results):
            new_row[index] = result

        return new_row


def _parameter_is_tuple(parameter_type: Type[Any]) -> bool:
    """Heuristically check, from its string representation, whether an
    annotation is a tuple type (e.g. ``tuple``, ``Tuple[str, str]``)."""
    return any(v in str(parameter_type) for v in ["Tuple", "tuple"])


def _parameter_is_list(parameter_type: Type[Any]) -> bool:
    """Heuristically check, from its string representation, whether an
    annotation is a list type (e.g. ``list``, ``List[str]``)."""
    return any(v in str(parameter_type) for v in ["List", "list"])


def _parameter_is_list_or_tuple(parameter_type: Type[Any]) -> bool:
    """Whether an annotation looks like either a list or a tuple type."""
    return _parameter_is_list(parameter_type) or _parameter_is_tuple(parameter_type)


def _postprocessing_fn(fn: Callable[..., Any]) -> Callable[..., List[str]]:
    """From the user-given function, wrap it so that the result is converted
    to a list of strings.

    Args:
        fn: user-provided transformation; only its return annotation is used.

    Returns:
        Adapter converting a value returned by ``fn`` to a list of strings.

    Raises:
        ValueError: if ``fn`` has no return annotation, or if the annotation is
            neither ``str`` nor a list/tuple type.
    """
    sig = signature(fn)
    return_type = sig.return_annotation
    if return_type is Signature.empty:
        raise ValueError(
            "Make sure that the function you provided has a return annotation."
        )

    # Declared once so that type checkers accept the conditional definitions.
    adapter: Callable[..., List[str]]

    if return_type is str:

        def adapter(x: str) -> List[str]:
            return [x]

        return adapter

    if _parameter_is_list_or_tuple(return_type):

        def adapter(x: Union[List[str], Tuple[str, ...]]) -> List[str]:
            return list(x)

        return adapter

    raise ValueError(f"Unsupported return type: {return_type}")


def _preprocessing_fn(fn: Callable[..., Any]) -> Callable[[List[str]], Any]:
    """From the user-given function, wrap it so that it can ingest a list of
    strings.

    Supported parameter annotations: only ``str`` parameters (the values are
    passed positionally), or a single list parameter, or a single tuple
    parameter.

    Raises:
        ValueError: if a parameter is not annotated, or if the parameter
            annotations do not match one of the supported forms.
    """
    sig = signature(fn)
    parameter_types = [p.annotation for p in sig.parameters.values()]
    if any(p is Signature.empty for p in parameter_types):
        raise ValueError(
            "Make sure that the function you provided is fully type-annotated."
        )

    # Declared once so that type checkers accept the conditional definitions.
    adapter: Callable[[List[str]], Any]

    parameters_are_strs = all(p is str for p in parameter_types)
    if parameters_are_strs:

        def adapter(inputs: List[str]) -> Any:
            return fn(*inputs)

        return adapter

    parameters_is_list = len(parameter_types) == 1 and _parameter_is_list(
        parameter_types[0]
    )
    if parameters_is_list:

        def adapter(inputs: List[str]) -> Any:
            return fn(inputs)

        return adapter

    parameters_is_tuple = len(parameter_types) == 1 and _parameter_is_tuple(
        parameter_types[0]
    )
    if parameters_is_tuple:

        def adapter(inputs: List[str]) -> Any:
            return fn(tuple(inputs))

        return adapter

    raise ValueError(
        f"Cannot process parameter types of function with signature {sig}."
    )


def _callback_handler(fn: Callable[..., Any]) -> _TransformationFunction:
    """From the user-provided callback, convert it to a function converting a
    list of strings to a list of strings.

    Raises:
        ValueError: if ``fn`` is not fully annotated or uses unsupported
            parameter / return annotations.
    """
    # Checked here first (also done in _preprocessing_fn) so that a missing
    # parameter annotation is reported before any return-annotation problem.
    sig = signature(fn)
    parameter_types = [p.annotation for p in sig.parameters.values()]
    if any(p is Signature.empty for p in parameter_types):
        raise ValueError(
            "Make sure that the function you provided is fully type-annotated."
        )

    postprocessing_fn = _postprocessing_fn(fn)
    preprocessing_fn = _preprocessing_fn(fn)

    def new_fn(inputs: List[str]) -> List[str]:
        return postprocessing_fn(preprocessing_fn(inputs))

    return new_fn