epymorph.adrio.processing

Data processing utilities for ADRIOs.

DataT module-attribute

DataT = TypeVar('DataT', bound=generic)

A numpy array dtype.

FixLikeInt module-attribute

FixLikeInt = (
    Fix[int64] | int | Callable[[], int] | Literal[False]
)

A value which can be coerced into a Fix for integers.

FixLikeFloat module-attribute

FixLikeFloat = (
    Fix[float64]
    | float
    | Callable[[], float]
    | Literal[False]
)

A value which can be coerced into a Fix for floats.

FillLikeInt module-attribute

FillLikeInt = (
    Fill[int64] | int | Callable[[], int] | Literal[False]
)

A value which can be coerced into a Fill for integers.

FillLikeFloat module-attribute

FillLikeFloat = (
    Fill[float64]
    | float
    | Callable[[], float]
    | Literal[False]
)

A value which can be coerced into a Fill for floats.

HasRandomness

Bases: Protocol

Protocol for an object containing a numpy random number generator.

rng abstractmethod property

rng: Generator

The random number generator instance.
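Because HasRandomness is a Protocol, any object that exposes an rng property should satisfy it structurally. The sketch below does not import epymorph; it re-declares a local stand-in protocol (HasRandomnessLike) and a hypothetical SeededContext class, both of which are illustrative assumptions rather than part of this module.

```python
from typing import Protocol, runtime_checkable

import numpy as np


@runtime_checkable
class HasRandomnessLike(Protocol):
    """Local stand-in for the HasRandomness protocol (an assumption)."""

    @property
    def rng(self) -> np.random.Generator: ...


class SeededContext:
    """Hypothetical object that carries a seeded numpy generator."""

    def __init__(self, seed: int) -> None:
        self._rng = np.random.default_rng(seed)

    @property
    def rng(self) -> np.random.Generator:
        return self._rng


ctx = SeededContext(seed=42)
# Structural check only: verifies that an `rng` attribute is present.
is_compatible = isinstance(ctx, HasRandomnessLike)
# Draw one integer from the half-open range [0, 10).
sample = int(ctx.rng.integers(0, 10))
```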

Fix

Bases: ABC, Generic[DataT]

A method for fixing data issues as part of a DataPipeline. Fix instances act as functions (they have call semantics).

Fix is generic in the dtype of the data it fixes (DataT).

__call__ abstractmethod

__call__(
    rng: HasRandomness,
    replace: DataT,
    columns: tuple[str, ...],
    data_df: DataFrame,
) -> DataFrame

Apply this fix to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • replace (DataT) –

    The value to replace.

  • columns (tuple[str, ...]) –

    The names of the columns to fix.

  • data_df (DataFrame) –

    The data to fix.

Returns:

  • DataFrame

    The data with the fix applied (a copy if modified).
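The call contract above (replace a given value in the named columns, returning a copy only if something changed) can be sketched with plain pandas. The function name constant_fix and its with_value parameter are hypothetical, and the rng argument is omitted because this sketch needs no randomness; the library's own implementation may differ.

```python
import pandas as pd


def constant_fix(data_df, replace, columns, with_value):
    """Replace `replace` with `with_value` in `columns`; copy only if needed."""
    is_bad = data_df[list(columns)] == replace
    if not is_bad.any().any():
        return data_df  # nothing to fix: return the input unchanged
    fixed_df = data_df.copy()
    for col in columns:
        fixed_df.loc[is_bad[col], col] = with_value
    return fixed_df


df = pd.DataFrame({"geoid": ["04", "35"], "value": [-999, 21]})
fixed = constant_fix(df, -999, ("value",), 0)
untouched = constant_fix(df, -1, ("value",), 0)
```

Here fixed is a new DataFrame with the sentinel replaced, while untouched is the very same object as df because no value matched.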

of_int64 staticmethod

of_int64(fix: FixLikeInt) -> Fix[int64]

Convenience constructor for a Fix for int64 data. The type of Fix returned depends on the type of argument provided.

Parameters:

  • fix (FixLikeInt) –

    A value which implies the type of fix to apply:

    • Fix[np.int64]: returned unchanged (no-op)
    • int: returns a ConstantFix, to replace bad values with a constant
    • Callable[[], int]: returns a FunctionFix, to replace bad values with values obtained from the given callable
    • False: returns a DontFix, indicating not to replace bad values

Returns:

  • Fix[int64]

    A Fix instance as determined by the type of the argument.
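The type-based dispatch described above can be illustrated without the library. This is a sketch under assumptions: classify_fix_like is a hypothetical helper that only names the kind of Fix each plain value would map to, and the order of checks is a guess at one workable ordering. A real dispatcher would also need to recognize an existing Fix instance before testing for callables, since Fix instances are themselves callable.

```python
from typing import Callable, Union


def classify_fix_like(value: Union[int, Callable[[], int], bool]) -> str:
    """Name the kind of Fix a plain value would map to (illustrative only)."""
    # Check False first: bool is a subclass of int in Python,
    # so an int check alone would treat False as the constant 0.
    if value is False:
        return "DontFix"
    if isinstance(value, int):
        return "ConstantFix"
    if callable(value):
        return "FunctionFix"
    raise TypeError(f"not a fix-like value: {value!r}")


kinds = [classify_fix_like(v) for v in (False, 0, 7, lambda: 3)]
```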

of_float64 staticmethod

of_float64(fix: FixLikeFloat) -> Fix[float64]

Convenience constructor for a Fix for float64 data. The type of Fix returned depends on the type of argument provided.

Parameters:

  • fix (FixLikeFloat) –

    A value which implies the type of fix to apply:

    • Fix[np.float64]: returned unchanged (no-op)
    • float: returns a ConstantFix, to replace bad values with a constant
    • Callable[[], float]: returns a FunctionFix, to replace bad values with values obtained from the given callable
    • False: returns a DontFix, indicating not to replace bad values

Returns:

  • Fix[float64]

    A Fix instance as determined by the type of the argument.

ConstantFix dataclass

ConstantFix(with_value: DataT)

Bases: Fix[DataT]

A Fix which replaces values with a constant value.

ConstantFix is generic in the dtype of the data it fixes (DataT).

Parameters:

  • with_value (DataT) –

    The value to use to replace bad values.

with_value instance-attribute

with_value: DataT

The value to use to replace bad values.

__call__

__call__(rng, replace, columns, data_df)

Apply this fix to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • replace (DataT) –

    The value to replace.

  • columns (tuple[str, ...]) –

    The names of the columns to fix.

  • data_df (DataFrame) –

    The data to fix.

Returns:

  • DataFrame

    The data with the fix applied (a copy if modified).

FunctionFix dataclass

FunctionFix(with_function: Callable[[], DataT])

Bases: Fix[DataT]

A Fix which replaces values with values generated by the given function.

FunctionFix is generic in the dtype of the data it fixes (DataT).

Parameters:

  • with_function (Callable[[], DataT]) –

    The function that generates replacement values.

with_function instance-attribute

with_function: Callable[[], DataT]

The function that generates replacement values.

__call__

__call__(rng, replace, columns, data_df)

Apply this fix to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • replace (DataT) –

    The value to replace.

  • columns (tuple[str, ...]) –

    The names of the columns to fix.

  • data_df (DataFrame) –

    The data to fix.

Returns:

  • DataFrame

    The data with the fix applied (a copy if modified).

apply staticmethod

apply(
    data_df: DataFrame,
    replace: DataT,
    columns: tuple[str, ...],
    with_function: Callable[[], DataT],
) -> DataFrame

A static method that performs the work of a FunctionFix. This method can be useful in creating other Fix instances, when their replacement value logic can be expressed as a no-parameter function.

Parameters:

  • data_df (DataFrame) –

    The data to fix.

  • replace (DataT) –

    The value to replace.

  • columns (tuple[str, ...]) –

    The names of the columns to fix.

  • with_function (Callable[[], DataT]) –

    The function used to generate replacement values.

Returns:

  • DataFrame

    A copy of the data with bad values fixed.

RandomFix dataclass

RandomFix(with_random: Callable[[Generator], DataT])

Bases: Fix[DataT]

A Fix which replaces values with randomly-generated values.

RandomFix is generic in the dtype of the data it fixes (DataT).

Parameters:

  • with_random (Callable[[Generator], DataT]) –

    A function for generating replacement values using the given numpy random number generator.

with_random instance-attribute

with_random: Callable[[Generator], DataT]

A function for generating replacement values using the given numpy random number generator.

__call__

__call__(rng, replace, columns, data_df)

Apply this fix to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • replace (DataT) –

    The value to replace.

  • columns (tuple[str, ...]) –

    The names of the columns to fix.

  • data_df (DataFrame) –

    The data to fix.

Returns:

  • DataFrame

    The data with the fix applied (a copy if modified).

from_range staticmethod

from_range(low: int, high: int) -> RandomFix[int64]

Convenience constructor for a RandomFix which replaces values with values sampled uniformly from a discrete range of integers.

Parameters:

  • low (int) –

    The lowest replacement value.

  • high (int) –

    The highest replacement value.

Returns:

  • RandomFix[int64]

    A RandomFix which samples replacement values from the given range.

from_range_float staticmethod

from_range_float(
    low: float, high: float
) -> RandomFix[float64]

Convenience constructor for a RandomFix which replaces values with values sampled uniformly from a continuous range.

Parameters:

  • low (float) –

    The low end of the range of replacement values.

  • high (float) –

    The high end of the range of replacement values. (Not included in the possible values.)

Returns:

  • RandomFix[float64]

    A RandomFix which samples replacement values from the given range.
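Both range constructors can be understood as building a callable of the form Callable[[Generator], DataT]. The sketch below writes such callables by hand with numpy only. The helper names int_range_sampler and float_range_sampler are hypothetical, and treating the integer range as inclusive of high is an assumption based on the "highest replacement value" wording above.

```python
import numpy as np


def int_range_sampler(low: int, high: int):
    """Sample integers uniformly from low..high, both ends included (assumed)."""
    return lambda rng: np.int64(rng.integers(low, high, endpoint=True))


def float_range_sampler(low: float, high: float):
    """Sample floats uniformly from the half-open interval [low, high)."""
    return lambda rng: np.float64(rng.uniform(low, high))


rng = np.random.default_rng(42)
sample_int = int_range_sampler(1, 3)
sample_float = float_range_sampler(0.0, 1.0)

int_draws = [int(sample_int(rng)) for _ in range(100)]
float_draws = [float(sample_float(rng)) for _ in range(100)]
```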

DontFix dataclass

DontFix()

Bases: Fix[Any]

A special Fix which does not fix values and simply returns the data as-is (no-op).

__call__

__call__(rng, replace, columns, data_df)

Apply this fix to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • replace (DataT) –

    The value to replace.

  • columns (tuple[str, ...]) –

    The names of the columns to fix.

  • data_df (DataFrame) –

    The data to fix.

Returns:

  • DataFrame

    The data with the fix applied (a copy if modified).

Fill

Bases: ABC, Generic[DataT]

A method for filling-in missing data as part of a DataPipeline. Fill instances act as functions (they have call semantics).

Fill is generic in the dtype of the data it fills (DataT).

__call__ abstractmethod

__call__(
    rng: HasRandomness,
    data_np: NDArray[DataT],
    missing_mask: NDArray[bool_],
) -> tuple[NDArray[DataT], NDArray[bool_] | None]

Apply this fill to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • data_np (NDArray[DataT]) –

    The data to fill.

  • missing_mask (NDArray[bool_]) –

    A mask indicating values which should be considered missing.

Returns:

  • tuple[NDArray[DataT], NDArray[bool_] | None]

    A tuple containing two values:

    • the data with the fill applied (a copy if modified),
    • an updated missing values mask. Fill methods may or may not replace all missing values; if all missing values have been filled, this will be None.
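The two-part return value can be sketched with numpy alone. The function constant_fill below is a hypothetical stand-in that follows the contract as described here (it is not the library's ConstantFill, and it omits the rng argument): it returns the filled data plus either an updated mask or None when nothing remains missing.

```python
import numpy as np


def constant_fill(data_np, missing_mask, with_value):
    """Fill masked positions with a constant and report what is still missing."""
    if not missing_mask.any():
        return data_np, None  # nothing missing: no copy needed
    filled = data_np.copy()
    filled[missing_mask] = with_value
    return filled, None  # every missing value was filled


data = np.array([[11, 12, 13], [21, 22, -1]], dtype=np.int64)
mask = np.array([[False, False, False], [False, False, True]])
filled, remaining = constant_fill(data, mask, 0)
```

A fill that can only resolve some positions would instead return a boolean array marking the positions it left unfilled.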

of_int64 staticmethod

of_int64(fill: FillLikeInt) -> Fill[int64]

Convenience constructor for a Fill for int64 data. The type of Fill returned depends on the type of argument provided.

Parameters:

  • fill (FillLikeInt) –

    A value which implies the type of fill to apply:

    • Fill[np.int64]: returned unchanged (no-op)
    • int: returns a ConstantFill, to replace missing values with a constant
    • Callable[[], int]: returns a FunctionFill, to replace missing values with values obtained from the given callable
    • False: returns a DontFill, indicating not to replace missing values

Returns:

  • Fill[int64]

    A Fill instance as determined by the type of the argument.

of_float64 staticmethod

of_float64(fill: FillLikeFloat) -> Fill[float64]

Convenience constructor for a Fill for float64 data. The type of Fill returned depends on the type of argument provided.

Parameters:

  • fill (FillLikeFloat) –

    A value which implies the type of fill to apply:

    • Fill[np.float64]: returned unchanged (no-op)
    • float or int: returns a ConstantFill, to replace missing values with a constant
    • Callable[[], float]: returns a FunctionFill, to replace missing values with values obtained from the given callable
    • False: returns a DontFill, indicating not to replace missing values

Returns:

  • Fill[float64]

    A Fill instance as determined by the type of the argument.

ConstantFill dataclass

ConstantFill(with_value: DataT)

Bases: Fill[DataT]

A Fill which replaces missing values with a constant value.

ConstantFill is generic in the dtype of the data it fills (DataT).

Parameters:

  • with_value (DataT) –

    The value to use to replace missing values.

with_value instance-attribute

with_value: DataT

The value to use to replace missing values.

__call__

__call__(
    rng, data_np, missing_mask
) -> tuple[NDArray[DataT], NDArray[bool_] | None]

Apply this fill to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • data_np (NDArray[DataT]) –

    The data to fill.

  • missing_mask (NDArray[bool_]) –

    A mask indicating values which should be considered missing.

Returns:

  • tuple[NDArray[DataT], NDArray[bool_] | None]

    A tuple containing two values:

    • the data with the fill applied (a copy if modified),
    • an updated missing values mask. Fill methods may or may not replace all missing values; if all missing values have been filled, this will be None.

FunctionFill dataclass

FunctionFill(with_function: Callable[[], DataT])

Bases: Fill[DataT]

A Fill which replaces missing values with values generated by the given function.

FunctionFill is generic in the dtype of the data it fills (DataT).

Parameters:

  • with_function (Callable[[], DataT]) –

    The function that generates replacement values.

with_function instance-attribute

with_function: Callable[[], DataT]

The function that generates replacement values.

__call__

__call__(
    rng, data_np, missing_mask
) -> tuple[NDArray[DataT], NDArray[bool_] | None]

Apply this fill to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • data_np (NDArray[DataT]) –

    The data to fill.

  • missing_mask (NDArray[bool_]) –

    A mask indicating values which should be considered missing.

Returns:

  • tuple[NDArray[DataT], NDArray[bool_] | None]

    A tuple containing two values:

    • the data with the fill applied (a copy if modified),
    • an updated missing values mask. Fill methods may or may not replace all missing values; if all missing values have been filled, this will be None.

apply staticmethod

apply(
    data_np: NDArray[DataT],
    missing_mask: NDArray[bool_],
    with_function: Callable[[], DataT],
) -> tuple[NDArray[DataT], NDArray[bool_] | None]

A static method that performs the work of a FunctionFill. This method can be useful in creating other Fill instances, when their replacement value logic can be expressed as a no-parameter function.

Parameters:

  • data_np (NDArray[DataT]) –

    The data to fill.

  • missing_mask (NDArray[bool_]) –

    A mask indicating values which should be considered missing.

  • with_function (Callable[[], DataT]) –

    The function used to generate replacement values.

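Returns:

  • tuple[NDArray[DataT], NDArray[bool_] | None]

    The filled data and the updated missing values mask, as described for __call__.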

RandomFill dataclass

RandomFill(with_random: Callable[[Generator], DataT])

Bases: Fill[DataT]

A Fill which replaces missing values with randomly-generated values.

RandomFill is generic in the dtype of the data it fills (DataT).

Parameters:

  • with_random (Callable[[Generator], DataT]) –

    A function for generating replacement values using the given numpy random number generator.

with_random instance-attribute

with_random: Callable[[Generator], DataT]

A function for generating replacement values using the given numpy random number generator.

__call__

__call__(rng, data_np, missing_mask)

Apply this fill to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • data_np (NDArray[DataT]) –

    The data to fill.

  • missing_mask (NDArray[bool_]) –

    A mask indicating values which should be considered missing.

Returns:

  • tuple[NDArray[DataT], NDArray[bool_] | None]

    A tuple containing two values:

    • the data with the fill applied (a copy if modified),
    • an updated missing values mask. Fill methods may or may not replace all missing values; if all missing values have been filled, this will be None.

from_range staticmethod

from_range(low: int, high: int) -> RandomFill[int64]

Convenience constructor for a RandomFill which replaces values with values sampled uniformly from a discrete range of integers.

Parameters:

  • low (int) –

    The lowest replacement value.

  • high (int) –

    The highest replacement value.

Returns:

  • RandomFill[int64]

    A RandomFill which samples replacement values from the given range.

from_range_float staticmethod

from_range_float(
    low: float, high: float
) -> RandomFill[float64]

Convenience constructor for a RandomFill which replaces values with values sampled uniformly from a continuous range.

Parameters:

  • low (float) –

    The low end of the range of replacement values.

  • high (float) –

    The high end of the range of replacement values. (Not included in the possible values.)

Returns:

  • RandomFill[float64]

    A RandomFill which samples replacement values from the given range.

DontFill dataclass

DontFill()

Bases: Fill[DataT]

A special Fill which does not replace missing values and simply returns the data as-is (no-op).

DontFill is generic in the dtype of the data it fills (DataT).

__call__

__call__(
    rng, data_np, missing_mask
) -> tuple[NDArray[DataT], NDArray[bool_] | None]

Apply this fill to some data.

Parameters:

  • rng (HasRandomness) –

    A source of randomness.

  • data_np (NDArray[DataT]) –

    The data to fill.

  • missing_mask (NDArray[bool_]) –

    A mask indicating values which should be considered missing.

Returns:

  • tuple[NDArray[DataT], NDArray[bool_] | None]

    A tuple containing two values:

    • the data with the fill applied (a copy if modified),
    • an updated missing values mask. Fill methods may or may not replace all missing values; if all missing values have been filled, this will be None.

PipelineResult dataclass

PipelineResult(
    value: NDArray[DataT],
    issues: Mapping[str, NDArray[bool_]],
)

Bases: Generic[DataT]

An object containing the result of processing data through a DataPipeline.

PipelineResult is generic in the dtype of the resulting numpy array (DataT).

Parameters:

  • value (NDArray[DataT]) –

    The resulting numpy array. In this form, the array will never be masked, even if there are issues. If you want a masked array, see the value_as_masked property.

  • issues (Mapping[str, NDArray[bool_]]) –

    The set of outstanding issues in the underlying data, with issue-specific masks.

value instance-attribute

value: NDArray[DataT]

The resulting numpy array. In this form, the array will never be masked, even if there are issues. If you want a masked array, see the value_as_masked property.

issues instance-attribute

issues: Mapping[str, NDArray[bool_]]

The set of outstanding issues in the underlying data, with issue-specific masks.

value_as_masked property

value_as_masked: NDArray[DataT]

The resulting numpy array which will be masked if-and-only-if there are issues. The mask is computed as the logical union of the individual issue masks.
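The "logical union of the individual issue masks" can be sketched with numpy's masked arrays. This is an illustration of the described behavior, not the property's actual implementation; the issue names and data are made up for the example.

```python
import numpy as np

value = np.array([10, 20, 30, 40], dtype=np.int64)
issues = {
    "insufficient_data": np.array([False, True, False, False]),
    "missing": np.array([False, False, False, True]),
}

if issues:
    # Union of all issue masks: masked wherever any issue applies.
    combined = np.logical_or.reduce(list(issues.values()))
    result = np.ma.masked_array(value, mask=combined)
else:
    result = value  # no issues: plain, unmasked array

masked_positions = np.ma.getmaskarray(result).tolist()
usable_total = int(result.sum())  # masked entries are skipped: 10 + 30
```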

with_issue

with_issue(
    issue_name: str, issue_mask: NDArray[bool_] | None
) -> Self

Updates the result by adding a data issue.

Parameters:

  • issue_name (str) –

    The name of the issue.

  • issue_mask (NDArray[bool_] | None) –

    The mask indicating which values are affected by the issue. For convenience, the mask may be None or "no mask" to indicate that the data does not in fact have the named issue, in which case the issue will not be added.

Returns:

  • Self

    The updated copy of the result.
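The copy-on-update style of with_issue can be sketched with a frozen dataclass. ResultSketch is a hypothetical stand-in that tracks issues only. Returning the same instance when the mask is None is an assumption; the real method may return a copy instead.

```python
from dataclasses import dataclass, field, replace
from typing import Mapping, Optional, Tuple


@dataclass(frozen=True)
class ResultSketch:
    """Hypothetical stand-in for PipelineResult that tracks issues only."""

    issues: Mapping[str, Tuple[bool, ...]] = field(default_factory=dict)

    def with_issue(
        self, issue_name: str, issue_mask: Optional[Tuple[bool, ...]]
    ) -> "ResultSketch":
        if issue_mask is None:
            return self  # nothing to record (assumed behavior)
        # Build a new instance instead of mutating the existing one.
        return replace(self, issues={**self.issues, issue_name: issue_mask})


clean = ResultSketch()
unchanged = clean.with_issue("missing", None)
flagged = clean.with_issue("missing", (False, True))
```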

to_date_value

to_date_value(
    dates: NDArray[datetime64],
) -> PipelineResult[DateValueType]

Converts the result to a date-value-tuple array.

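Parameters:

  • dates (NDArray[datetime64]) –

    The dates to combine with the result values.

Returns:

  • PipelineResult[DateValueType]

    The result with its values converted to a date-value-tuple array.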

See Also

epymorph.util.to_date_value_array for more detail on how dates and values are combined, and epymorph.util.extract_date_value for a convenient way to separate the dates and values when needed.

sum staticmethod

sum(
    left: PipelineResult[DataT],
    right: PipelineResult[DataT],
    *,
    left_prefix: str,
    right_prefix: str,
) -> PipelineResult[DataT]

Combines two PipelineResults by summing unmasked data values. The result includes the data issues from both sides, with issue names prefixed to tell them apart.

Parameters:

  • left (PipelineResult[DataT]) –

    The first addend.

  • right (PipelineResult[DataT]) –

    The second addend.

  • left_prefix (str) –

    A prefix to assign to any left-side issues.

  • right_prefix (str) –

    A prefix to assign to any right-side issues.

Returns:

  • PipelineResult[DataT]

    The combined result.

PivotAxis

Bases: NamedTuple

Describes an axis on which a DataFrame will be pivoted to become a numpy array.

column instance-attribute

column: str

The name of the column in a DataFrame.

values instance-attribute

values: list | NDArray

The set of values we expect to find in the column. This will be used to expand and reorder the resulting pivot table. If values are in this set and not in the data, the table will contain missing values -- which is better than not knowing which values are missing!
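The effect of declaring expected values can be reproduced with plain pandas: pivot the long-form data, then reindex both axes against the expected sets. This is a sketch of the idea only; the pipeline's internal steps are not shown here and may differ.

```python
import pandas as pd

# Long-form data in arbitrary order; the ("35", "c") pair is absent.
df = pd.DataFrame(
    {
        "geoid": ["35", "04", "04", "35", "04"],
        "variable": ["a", "c", "a", "b", "b"],
        "value": [21, 13, 11, 22, 12],
    }
)

expected_geoids = ["04", "35"]
expected_variables = ["a", "b", "c"]

# Pivot, then expand and reorder both axes to the expected values.
table = df.pivot(index="geoid", columns="variable", values="value").reindex(
    index=expected_geoids, columns=expected_variables
)

# Pairs that were expected but not present show up as NaN.
missing_mask = table.isna().to_numpy()
```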

DataPipeline dataclass

DataPipeline(
    axes: tuple[PivotAxis, PivotAxis],
    ndims: Literal[1, 2],
    dtype: type[DataT],
    rng: HasRandomness,
    pipeline_steps: Sequence[_PipelineStep] = list(),
)

Bases: Generic[DataT]

DataPipeline is a factory class for assembling data processing pipelines.

Using builder-style syntax you define the processing steps that the data should flow through. Finalizing the pipeline yields a function that takes a DataFrame, executes the pipeline steps in sequence, and returns a PipelineResult containing the processed data and any unresolved data issues discovered along the way. The DataPipeline instance itself can be discarded after the processing function is finalized.

DataPipeline is designed to produce arrays with one or two dimensions. When there is more than one value in the "columns" dimension, the result must be a 2D array. But when there is only one column, both a 1D and a 2D array layout are valid. Because of this ambiguity, you must specify the number of dimensions you expect. Specifying ndims as 1 when the data has more than one column results in an error.
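The single-column ambiguity can be seen with numpy alone. In this sketch, squeeze_columns is a hypothetical helper that mirrors the ndims choice; the ValueError it raises is an assumption, since the text only says that an error occurs.

```python
import numpy as np


def squeeze_columns(table: np.ndarray, ndims: int) -> np.ndarray:
    """Return `table` as a 1D or 2D array, mirroring the ndims choice."""
    if ndims == 2:
        return table
    if table.shape[1] != 1:
        raise ValueError("ndims=1 requires exactly one column")
    return table[:, 0]


single = np.array([[11], [21], [31]], dtype=np.int64)  # 3 rows, 1 column
shape_1d = squeeze_columns(single, ndims=1).shape
shape_2d = squeeze_columns(single, ndims=2).shape
```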

DataPipeline is generic in the dtype of the data it processes (DataT).

Parameters:

  • axes (tuple[PivotAxis, PivotAxis]) –

    The definition of the axes which will be used to tabulate the data. The first axis represents rows in the result, and the second columns.

  • ndims (Literal[1, 2]) –

    The number of dimensions expected in the result: 1 or 2.

  • dtype (type[DataT]) –

    The dtype of the data values in the result.

  • rng (HasRandomness) –

    A source of randomness.

  • pipeline_steps (Sequence[_PipelineStep], default: list() ) –

    The accumulated pipeline steps.

Examples:

This example uses a DataPipeline to process a simple DataFrame:

import numpy as np
import pandas as pd
from epymorph.adrio.processing import DataPipeline, Fill, PivotAxis, RandomFix
from epymorph.simulation import Context
from epymorph.kit import *

# Example data: integer values for each pair of 2 places and 3 variables.
raw_data_df = pd.DataFrame(
    {
        "geoid": ["04", "04", "04", "35", "35"],
        "variable": ["a", "b", "c", "a", "b"],
        "value": [11, -999, 13, 21, 22],
    }
)

# Usually we'd be doing this with a real simulation context.
context = Context.of(rng=np.random.default_rng(42))

# Define the pipeline...
pipeline = (
    DataPipeline(
        # `axes` defines the axes of the result array,
        # as well as the set of values that should be in each axis.
        axes=(
            PivotAxis("geoid", ["04", "35"]),  # first axis
            PivotAxis("variable", ["a", "b", "c"]),  # second axis
        ),
        ndims=2,
        dtype=np.int64,
        rng=context,
    )
    # Replace sentinel values (-999) with a random value from 1 to 3.
    .strip_sentinel(
        "insufficient_data",
        np.int64(-999),
        RandomFix.from_range(1, 3),
    )
    # Fill missing values with 0.
    .finalize(Fill.of_int64(0))
)

# Run the data through the pipeline.
result = pipeline(raw_data_df)

result.value
# array([[11,  1, 13],
#        [21, 22,  0]])

axes instance-attribute

axes: tuple[PivotAxis, PivotAxis]

The definition of the axes which will be used to tabulate the data. The first axis represents rows in the result, and the second columns.

ndims instance-attribute

ndims: Literal[1, 2]

The number of dimensions expected in the result: 1 or 2.

dtype instance-attribute

dtype: type[DataT]

The dtype of the data values in the result.

rng instance-attribute

rng: HasRandomness

A source of randomness.

pipeline_steps class-attribute instance-attribute

pipeline_steps: Sequence[_PipelineStep] = field(
    default_factory=list
)

The accumulated pipeline steps.

map_series

map_series(
    column: str,
    map_fn: Callable[[Series], Series] | None = None,
) -> Self

Add a pipeline step that transforms a column of the DataFrame by applying a mapping function to the series.

Parameters:

  • column (str) –

    The name of the column to transform.

  • map_fn (Callable[[Series], Series] | None, default: None ) –

    The series mapping function. As a convenience you may pass None, in which case this is a no-op.

Returns:

  • Self

    A copy of this pipeline with the step added.

map_column

map_column(
    column: str, map_fn: Callable | None = None
) -> Self

Add a pipeline step that transforms a column of the DataFrame by applying a mapping function to all values in the series.

Parameters:

  • column (str) –

    The name of the column to transform.

  • map_fn (Callable | None, default: None ) –

    The value mapping function. As a convenience you may pass None, in which case this is a no-op.

Returns:

  • Self

    A copy of this pipeline with the step added.
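The difference between map_series and map_column lies in what the mapping function receives: the whole Series in the first case, individual values in the second. The pandas sketch below shows the two styles directly on a DataFrame; pad_geoid is a hypothetical function, and the pipeline itself is not involved.

```python
import pandas as pd

df = pd.DataFrame({"geoid": [4, 35], "value": ["11", "22"]})


def pad_geoid(series: pd.Series) -> pd.Series:
    """Series-level function: receives and returns the whole column."""
    return series.astype(str).str.zfill(2)


# map_series-style: call the function once with the entire column.
df["geoid"] = pad_geoid(df["geoid"])

# map_column-style: apply a scalar function to each value in the column.
df["value"] = df["value"].map(int)
```

Series-level functions are a natural fit for vectorized operations, while value-level functions suit simple per-element conversions.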

strip_sentinel

strip_sentinel(
    sentinel_name: str,
    sentinel_value: DataT,
    fix: Fix[DataT],
) -> Self

Add a pipeline step for dealing with sentinel values in the DataFrame. First we apply the given Fix, then check for any remaining sentinel values. If sentinel values still remain in the data, these are recorded as a data issue with an associated mask.

Parameters:

  • sentinel_name (str) –

    The name used for the data issue if any sentinel values remain.

  • sentinel_value (DataT) –

    The value considered a sentinel.

  • fix (Fix[DataT]) –

    The fix to apply to attempt to replace sentinel values.

Returns:

  • Self

    A copy of this pipeline with the step added.
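The "fix first, then record whatever remains" behavior can be sketched on a plain numpy array. Here strip_sentinel_sketch is a hypothetical helper and the fixes are simple lambdas; in the real pipeline the fix is a Fix instance applied to a DataFrame, so this only illustrates the control flow.

```python
import numpy as np


def strip_sentinel_sketch(values, sentinel, fix):
    """Apply `fix`, then report a mask of any sentinel values left over."""
    fixed = fix(values)
    remaining = fixed == sentinel
    # Only report an issue if at least one sentinel survived the fix.
    return fixed, (remaining if remaining.any() else None)


values = np.array([11, -999, 13], dtype=np.int64)

# A no-op fix leaves the sentinel in place, so an issue mask is produced.
_, issue_unfixed = strip_sentinel_sketch(values, -999, lambda v: v)

# A constant replacement removes every sentinel, so no issue is reported.
fixed, issue_fixed = strip_sentinel_sketch(
    values, -999, lambda v: np.where(v == -999, 0, v)
)
```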

strip_na_as_sentinel

strip_na_as_sentinel(
    column: str,
    sentinel_name: str,
    sentinel_value: DataT,
    fix: Fix[DataT],
) -> Self

Add a pipeline step for dealing with NaN/NA/null values in the DataFrame. First replace NA values with a user-defined sentinel value, then apply the given Fix. Finally check for any remaining such values. If sentinel values still remain in the data, these are recorded as a data issue with an associated mask.

Parameters:

  • column (str) –

    The name of the column to transform.

  • sentinel_name (str) –

    The name used for the data issue if any NA/sentinel values remain.

  • sentinel_value (DataT) –

    The value to use to replace NA values. We want to replace NAs so that we can universally convert the data column to the desired type -- np.int64 doesn't support NA values like np.float64 does, so this allows the input DataFrame to start with something like Pandas' "Int64" data type while the pipeline produces np.int64 results. The sentinel value chosen for this must not already exist in the data.

  • fix (Fix[DataT]) –

    The fix to apply to attempt to replace NA/sentinel values.

Returns:

  • Self

    A copy of this pipeline with the step added.

Raises:

  • Exception

    If the data naturally contains the chosen sentinel value.
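The reason for the NA-to-sentinel step can be shown with pandas directly: a nullable "Int64" column holding NA cannot be cast to np.int64 until the NA values are replaced. The sketch below also includes the collision check described under Raises; using ValueError is an assumption, since the text only specifies Exception.

```python
import numpy as np
import pandas as pd

raw = pd.Series([11, None, 13], dtype="Int64")  # nullable integers with one NA
sentinel = -999

# The sentinel must not already occur in the data, or NAs and real values
# would become indistinguishable after replacement.
if (raw == sentinel).any():
    raise ValueError("sentinel value already present in the data")

# Replace NA with the sentinel, then convert to a plain numpy integer dtype.
as_int64 = raw.fillna(sentinel).astype(np.int64)
```

After this step the sentinel can be handled like any other sentinel value: fixed where possible and otherwise recorded as a data issue.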

finalize

finalize(
    fill_missing: Fill[DataT],
) -> Callable[[DataFrame], PipelineResult[DataT]]

Completes construction of the pipeline.

Parameters:

  • fill_missing (Fill[DataT]) –

    A method for filling in missing data.

Returns: