from inspect import Signature, signature
from typing import Any, Callable, List, Tuple, Type, Union
from attr import define
from tqdm import tqdm
from typing_extensions import TypeAlias
from ..files import PathLike, count_lines
from . import CsvIterator
# Transformation function as actually used under the hood
_TransformationFunction: TypeAlias = Callable[[List[str]], List[str]]
[docs]class StreamingCsvEditor:
"""
Edit the content of a CSV with a specified transformation, line-by-line.
This class avoids loading the whole file into memory as would be done
with a pandas DataFrame.
"""
[docs] def __init__(
self,
columns_in: List[str],
columns_out: List[str],
transformation: Callable[..., Any],
line_terminator: str = "\n",
):
"""
Args:
columns_in: names for the columns acting as input for the transformation.
columns_out: names for the columns where to write the result of the
transformation.
transformation: function to call on the values from the input columns,
with the results being written to the output columns.
The function should be annotated, and the following are admissible:
- For the parameters:
- one or several strings
- a list of strings (with one or more elements)
- a tuple of strings (with one or more elements)
- For the return type:
- one string
- a list of strings (with one or more elements)
- a tuple of strings (with one or more elements)
line_terminator: line terminator to use for writing the CSV.
"""
self.transformation = _CsvTransformation(
columns_in=columns_in,
columns_out=columns_out,
fn=_callback_handler(transformation),
)
self.line_terminator = line_terminator
[docs] def process(self, csv_iterator: CsvIterator) -> CsvIterator:
"""
Process and edit a CSV file.
Args:
csv_iterator: Input CSV iterator.
Returns:
an edited instance of a CsvIterator.
"""
helper = _Helper(csv_iterator.columns, transformation=self.transformation)
return CsvIterator(
columns=helper.output_columns,
rows=(helper.process_line(row) for row in csv_iterator.rows),
)
[docs] def process_paths(
self, path_in: PathLike, path_out: PathLike, verbose: bool = False
) -> None:
"""
Process and edit a CSV file.
Args:
path_in: path to the existing CSV.
path_out: path to the edited CSV (to be saved).
verbose: whether to write the progress with tqdm.
"""
with open(path_in, "rt") as f_in, open(path_out, "wt") as f_out:
input_iterator = CsvIterator.from_stream(f_in)
if verbose:
row_count = count_lines(path_in)
input_iterator = CsvIterator(
input_iterator.columns,
rows=(row for row in tqdm(input_iterator.rows, total=row_count)),
)
output_iterator = self.process(input_iterator)
output_iterator.to_stream(f_out, line_terminator=self.line_terminator)
@define
class _CsvTransformation:
"""Helper class containing the details of a transformation for one CSV file."""
columns_in: List[str]
columns_out: List[str]
fn: _TransformationFunction
class _Helper:
"""Helper class that does the actual row-by-row processing."""
def __init__(
self,
input_columns: List[str],
transformation: _CsvTransformation,
):
self.fn = transformation.fn
self.indices_in = self._determine_column_indices(
input_columns, transformation.columns_in
)
new_columns = [c for c in transformation.columns_out if c not in input_columns]
self.n_new_columns = len(new_columns)
self.output_columns = input_columns + new_columns
self.indices_out = self._determine_column_indices(
self.output_columns, transformation.columns_out
)
def _determine_column_indices(
self, all_columns: List[str], target_columns: List[str]
) -> List[int]:
indices: List[int] = []
for c in target_columns:
try:
indices.append(all_columns.index(c))
except ValueError:
raise RuntimeError(f'"{c}" not found in {all_columns}.')
return indices
def process_line(self, row: List[str]) -> List[str]:
"""Process one line from the CSV.
Args:
row: content of one CSV line.
Returns:
Content of the line after applying the function
"""
# Process the values
input_items = [row[i] for i in self.indices_in]
results = self.fn(input_items)
# Extend the row object to make space for the new values (if needed)
row.extend("" for _ in range(self.n_new_columns))
# overwrite the results
for index, result in zip(self.indices_out, results):
row[index] = result
return row
def _parameter_is_tuple(parameter_type: Type[Any]) -> bool:
return any(v in str(parameter_type) for v in ["Tuple", "tuple"])
def _parameter_is_list(parameter_type: Type[Any]) -> bool:
return any(v in str(parameter_type) for v in ["List", "list"])
def _parameter_is_list_or_tuple(parameter_type: Type[Any]) -> bool:
return _parameter_is_list(parameter_type) or _parameter_is_tuple(parameter_type)
def _postprocessing_fn(fn: Callable[..., Any]) -> Callable[..., List[str]]:
"""From the user-given function, wrap it so that the result is converted
to a list of strings."""
sig = signature(fn)
return_type = sig.return_annotation
if return_type is Signature.empty:
raise ValueError(
"Make sure that the function you provided has a return annotation."
)
adapter: Callable[..., List[str]]
if return_type is str:
def adapter(x: str) -> List[str]:
return [x]
return adapter
if _parameter_is_list_or_tuple(return_type):
def adapter(x: Union[List[str], Tuple[str]]) -> List[str]:
return list(x)
return adapter
raise ValueError(f"Unsupported return type: {return_type}")
def _preprocessing_fn(fn: Callable[..., Any]) -> Callable[[List[str]], Any]:
"""From the user-given function, wrap it so that it can ingest a list of strings."""
sig = signature(fn)
parameter_types = [p.annotation for p in sig.parameters.values()]
if any(p is Signature.empty for p in parameter_types):
raise ValueError(
"Make sure that the function you provided is fully type-annotated."
)
# Necessary for the below
adapter: Callable[[List[str]], Any]
parameters_are_strs = all(p is str for p in parameter_types)
if parameters_are_strs:
def adapter(inputs: List[str]) -> Any:
return fn(*inputs)
return adapter
parameters_is_list = len(parameter_types) == 1 and _parameter_is_list(
parameter_types[0]
)
if parameters_is_list:
def adapter(inputs: List[str]) -> Any:
return fn(inputs)
return adapter
parameters_is_tuple = len(parameter_types) == 1 and _parameter_is_tuple(
parameter_types[0]
)
if parameters_is_tuple:
def adapter(inputs: List[str]) -> Any:
return fn(tuple(inputs))
return adapter
raise ValueError(
f"Cannot process parameter types of function with signature {sig}."
)
def _callback_handler(fn: Callable[..., Any]) -> _TransformationFunction:
"""From the user-provided callback, convert it to a function converting
a list of strings to a list of strings."""
sig = signature(fn)
parameter_types = [p.annotation for p in sig.parameters.values()]
if any(p is Signature.empty for p in parameter_types):
raise ValueError(
"Make sure that the function you provided is fully type-annotated."
)
postprocessing_fn = _postprocessing_fn(fn)
preprocessing_fn = _preprocessing_fn(fn)
def new_fn(inputs: List[str]) -> List[str]:
return postprocessing_fn(preprocessing_fn(inputs))
return new_fn