cascade.data#

class cascade.data.ApplyModifier(dataset: Dataset[T] | IteratorDataset[T], func: Callable[[T], Any], p: float | None = None, seed: int | None = None, *args: Any, **kwargs: Any)[source]#

Modifier that applies a function to the given dataset’s items in each __getitem__ call.

Can be applied to Iterators too.

__init__(dataset: Dataset[T] | IteratorDataset[T], func: Callable[[T], Any], p: float | None = None, seed: int | None = None, *args: Any, **kwargs: Any) None[source]#
Parameters:
  • dataset (Dataset) – A dataset to modify

  • func (Callable) – A function to be applied to every item of a dataset - each __getitem__ calls func on an item obtained from a previous dataset

  • p (Optional[float], by default None) – The probability [0, 1] with which to apply func

  • seed (Optional[int], by default None) – Random seed is used when p is not None

Examples

>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = cdd.ApplyModifier(ds, lambda x: x ** 2)

Now the function will only be applied when items are retrieved

>>> assert [item for item in ds] == [0, 1, 4, 9, 16]
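
When p is set, func is applied to each item with the given probability and seed makes this reproducible. A minimal sketch (which items get squared depends on the seed):

>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = cdd.ApplyModifier(ds, lambda x: x ** 2, p=0.5, seed=42)
>>> items = [item for item in ds]  # some items are squared, others pass through unchanged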
class cascade.data.BruteforceCacher(dataset: BaseDataset[T], *args: Any, **kwargs: Any)[source]#

Special modifier that calls the whole previous pipeline in __init__, loading everything into memory.

Examples

>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0 for _ in range(1000000)])
>>> ds = cdd.ApplyModifier(ds, lambda x: x + 1)
>>> ds = cdd.ApplyModifier(ds, lambda x: x + 1)
>>> ds = cdd.ApplyModifier(ds, lambda x: x + 1)

Cache heavy upstream part once

>>> ds = cdd.BruteforceCacher(ds)

Then pickle it

>>> ds = cdd.Pickler('ds', ds)

Unpickle and use further

>>> ds = cdd.Pickler('ds')
>>> ds = cdd.RandomSampler(ds, 1000)
__init__(dataset: BaseDataset[T], *args: Any, **kwargs: Any) None[source]#

Loads every item of the dataset into an internal list.

class cascade.data.Composer(datasets: List[Dataset[Any]], *args: Any, **kwargs: Any)[source]#

Unifies two or more datasets element-wise.

Example

>>> from cascade import data as cdd
>>> items = cdd.Wrapper([0, 1, 2, 3, 4])
>>> labels = cdd.Wrapper([1, 0, 0, 1, 1])
>>> ds = cdd.Composer((items, labels))
>>> assert ds[0] == (0, 1)
__init__(datasets: List[Dataset[Any]], *args: Any, **kwargs: Any) None[source]#
Parameters:

datasets (Iterable[Dataset]) – Datasets of the same length to be unified

from_meta(meta: List[Dict[Any, Any]]) None[source]#

Updates its own fields as usual and, if meta has a data key, sequentially updates the data of all its datasets

Parameters:

meta (Meta) – Meta of a single object or a pipeline

get_meta() List[Dict[Any, Any]][source]#

Composer calls get_meta() of all its datasets

class cascade.data.Concatenator(datasets: List[Dataset[T]], *args: Any, **kwargs: Any)[source]#

Unifies several Datasets under one, calling them sequentially in the provided order.

Examples

>>> from cascade.data import Wrapper, Concatenator
>>> ds_1 = Wrapper([0, 1, 2])
>>> ds_2 = Wrapper([2, 1, 0])
>>> ds = Concatenator((ds_1, ds_2))
>>> assert [item for item in ds] == [0, 1, 2, 2, 1, 0]
__init__(datasets: List[Dataset[T]], *args: Any, **kwargs: Any) None[source]#

Creates concatenated dataset from the list of datasets provided

Parameters:

datasets (Union[Iterable[Dataset], Mapping[Dataset]]) – A list or tuple of datasets to concatenate

__len__() int[source]#

The length of a Concatenator is the sum of the lengths of its datasets

from_meta(meta: List[Dict[Any, Any]]) None[source]#

Updates its own fields as usual and, if meta has a data key, sequentially updates the data of all its datasets

Parameters:

meta (Meta) – Meta of a single object or a pipeline

get_meta() List[Dict[Any, Any]][source]#

Concatenator calls get_meta() of all its datasets

class cascade.data.CyclicSampler(dataset: Dataset[T], num_samples: int, *args: Any, **kwargs: Any)[source]#

A Sampler that iterates num_samples times through an input Dataset in a cyclic manner

Example

>>> from cascade.data import CyclicSampler, Wrapper
>>> ds = Wrapper([1, 2, 3])
>>> ds = CyclicSampler(ds, 7)
>>> assert [item for item in ds] == [1, 2, 3, 1, 2, 3, 1]
class cascade.data.BaseDataset(*args: Any, data_card: DataCard | None = None, **kwargs: Any)[source]#

Base class of any object that constitutes a step in a data-pipeline

__init__(*args: Any, data_card: DataCard | None = None, **kwargs: Any) None[source]#
get_meta() List[Dict[Any, Any]][source]#
Returns:

meta – A list where the last element is this dataset’s metadata. Meta can be anything that is worth documenting about the dataset and its data. This is done in the form of a list to enable cascade-like calls in Modifiers and Samplers.

Return type:

Meta

class cascade.data.Assessor(id: str | None = None, position: str | None = None)[source]#

The container for the info on the people who were in charge of the labeling process, their experience and other properties.

This is a dataclass, so any additional fields will not be recorded if added. If it needs to be extended, please create a new class instead.

__init__(id: str | None = None, position: str | None = None) None#
id: str | None = None#
position: str | None = None#
class cascade.data.DataCard(name: str | None = None, desc: str | None = None, source: str | None = None, goal: str | None = None, labeling_info: LabelingInfo | None = None, size: int | Tuple[int] | None = None, metrics: Dict[str, Any] | None = None, schema: Dict[Any, Any] | None = None, **kwargs: Any)[source]#

The container for the information on a dataset. The set of fields here is general and can be extended by providing new keywords to __init__.

Example

>>> from cascade.data import DataCard, Assessor, LabelingInfo, DataRegistrator
>>> person = Assessor(id="0", position="Assessor")
>>> info = LabelingInfo(who=[person], process_desc="Labeling description")
>>> dc = DataCard(
...     name="Dataset",
...     desc="Example dataset",
...     source="Database",
...     goal="Every dataset should have a goal",
...     labeling_info=info,
...     size=100,
...     metrics={"quality": 100},
...     schema={"label": "value"},
...     custom_field="hello")
>>> dr = DataRegistrator('data_log.yml')
>>> dr.register(dc)
__init__(name: str | None = None, desc: str | None = None, source: str | None = None, goal: str | None = None, labeling_info: LabelingInfo | None = None, size: int | Tuple[int] | None = None, metrics: Dict[str, Any] | None = None, schema: Dict[Any, Any] | None = None, **kwargs: Any) None[source]#
Parameters:
  • name (Optional[str]) – The name of dataset

  • desc (Optional[str]) – Short description

  • source (Optional[str]) – The source of data. Can be URL or textual description of source

  • goal (Optional[str]) – The dataset’s goal - what should be achieved using this data?

  • labeling_info (Optional[LabelingInfo]) – The instance of dataclass describing labeling process placed here

  • size (Union[int, Tuple[int], None]) – The size of the dataset - the number of items or the shape of the table. This can usually be determined automatically.

  • metrics (Optional[Dict[str, Any]]) – Dictionary with names and values of metrics. Any quality metrics can be included

  • schema (Optional[Dict[Any, Any]]) – Schema dictionary describing table datasets, their columns, data types, possible values, etc. Pandera schema objects can be used when converted into dictionaries

to_dict() Dict[str, Any][source]#
class cascade.data.LabelingInfo(who: List[Assessor] | None = None, process_desc: str | None = None, docs: str | None = None)[source]#

The container for the information on the labeling process: the people involved, a description of the process, and documentation links.

This is a dataclass, so any additional fields will not be recorded if added. If it needs to be extended, please create a new class instead.

__init__(who: List[Assessor] | None = None, process_desc: str | None = None, docs: str | None = None) None#
docs: str | None = None#
process_desc: str | None = None#
who: List[Assessor] | None = None#
class cascade.data.Dataset(*args: Any, data_card: DataCard | None = None, **kwargs: Any)[source]#

An abstract class to represent a dataset with the __len__ method present. Inheriting from this class should mean the presence of length.

If your dataset does not have a defined length, you can use Iterator

See also

cascade.data.Iterator

get_meta() List[Dict[Any, Any]][source]#
Returns:

meta – A list where the last element is this dataset’s metadata. Meta can be anything that is worth documenting about the dataset and its data. This is done in the form of a list to enable cascade-like calls in Modifiers and Samplers.

Return type:

Meta

class cascade.data.IteratorDataset(*args: Any, data_card: DataCard | None = None, **kwargs: Any)[source]#

An abstract class to represent a dataset as an iterable object

class cascade.data.IteratorWrapper(data: Iterable[T], *args: Any, **kwargs: Any)[source]#

Wraps an IteratorDataset around any Iterable. Does not have a map-like interface.
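
A minimal usage sketch, assuming iteration simply yields the wrapped items in order:

>>> from cascade.data import IteratorWrapper
>>> ds = IteratorWrapper(range(3))
>>> [item for item in ds]
[0, 1, 2]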

__init__(data: Iterable[T], *args: Any, **kwargs: Any) None[source]#
get_meta() List[Dict[Any, Any]][source]#
Returns:

meta – A list where the last element is this dataset’s metadata. Meta can be anything that is worth documenting about the dataset and its data. This is done in the form of a list to enable cascade-like calls in Modifiers and Samplers.

Return type:

Meta

class cascade.data.SizedDataset(*args: Any, **kwargs: Any)[source]#
__init__(*args: Any, **kwargs: Any) None[source]#
class cascade.data.Wrapper(obj: Sequence[T], *args: Any, **kwargs: Any)[source]#

Wraps a Dataset around any list-like object.

__init__(obj: Sequence[T], *args: Any, **kwargs: Any) None[source]#
get_meta() List[Dict[Any, Any]][source]#
Returns:

meta – A list where the last element is this dataset’s metadata. Meta can be anything that is worth documenting about the dataset and its data. This is done in the form of a list to enable cascade-like calls in Modifiers and Samplers.

Return type:

Meta

class cascade.data.Filter(dataset: Dataset, filter_fn: Callable, *args: Any, **kwargs: Any)[source]#

Filter for Datasets with length. Uses a function to create a mask of items that will remain

__init__(dataset: Dataset, filter_fn: Callable, *args: Any, **kwargs: Any) None[source]#

Filters a dataset using a filter function. Does not accumulate items in memory; stores only an index mask.

Parameters:
  • dataset (Dataset) – A dataset to filter

  • filter_fn (Callable) – A function to be applied to every item of a dataset - should return bool. Will be called on every item on __init__.

Raises:

RuntimeError – If filter_fn raises an exception
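
A minimal usage sketch, assuming iteration over the filtered dataset yields only the items for which filter_fn returned True:

>>> from cascade.data import Filter, Wrapper
>>> ds = Wrapper([0, 1, 2, 3, 4])
>>> ds = Filter(ds, lambda x: x % 2 == 0)
>>> [item for item in ds]
[0, 2, 4]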

class cascade.data.IteratorFilter(dataset: IteratorDataset, filter_fn: Callable, *args: Any, **kwargs: Any)[source]#

Filter for datasets without length

Does not filter on init; returns only the items that pass the filter

__init__(dataset: IteratorDataset, filter_fn: Callable, *args: Any, **kwargs: Any) None[source]#

Constructs a Modifier. Modifier represents a step in a pipeline - some data transformation

Parameters:

dataset (BaseDataset[T]) – A dataset to modify
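
A minimal usage sketch, assuming items are filtered lazily during iteration:

>>> from cascade.data import IteratorWrapper, IteratorFilter
>>> ds = IteratorWrapper([0, 1, 2, 3, 4])
>>> ds = IteratorFilter(ds, lambda x: x % 2 == 0)
>>> [item for item in ds]
[0, 2, 4]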

class cascade.data.FolderDataset(root: str, *args: Any, **kwargs: Any)[source]#

Basic “folder of files” dataset. Accepts a root folder and considers all the files inside it. It is abstract - __getitem__ is not defined, since it is specific to each file type.

See also

cascade.utils.FolderImageDataset

__init__(root: str, *args: Any, **kwargs: Any) None[source]#
Parameters:

root (str) – A path to the folder of files

get_meta() List[Dict[Any, Any]][source]#

Returns meta containing root folder

get_names() List[str][source]#

Returns a list of full paths to the files
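
Since the class is abstract, a subclass defines __getitem__ for its specific file type. A minimal sketch for plain text files, assuming items can be read by indexing into the paths returned by get_names():

>>> from cascade.data import FolderDataset
>>> class TextFolderDataset(FolderDataset):
...     def __getitem__(self, index):
...         # Read the file at the corresponding path as text
...         with open(self.get_names()[index]) as f:
...             return f.read()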

cascade.data.dataset(f: Callable[[...], Any], do_validate_in: bool = True) Callable[[...], FunctionDataset][source]#

Thin wrapper to turn any function into a Cascade Dataset. Use this if the function is the data source.

Will return a FunctionDataset object. To get the results of the execution, use the dataset.result field.

Parameters:

f (Callable[..., Any]) – Function that produces data

Returns:

Call this to get a dataset

Return type:

Callable[…, FunctionDataset]
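
A minimal usage sketch, assuming the wrapped function runs when called and its return value is stored in the result field:

>>> from cascade.data import dataset
>>> @dataset
... def numbers():
...     return [0, 1, 2, 3]
>>> ds = numbers()
>>> ds.result
[0, 1, 2, 3]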

cascade.data.modifier(f: Callable[[...], Any], do_validate_in: bool = True) Callable[[...], FunctionModifier][source]#

Thin wrapper to turn any function into a Cascade Modifier. Pass it the return value of a function that was previously wrapped with dataset or modifier. Will replace any dataset argument with dataset.result automatically if that argument is a FunctionDataset.

Parameters:

f (Callable[..., Any]) – Function that modifies data

Returns:

Call this to get a modifier

Return type:

Callable[…, FunctionModifier]
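
A minimal sketch of chaining a modifier after a dataset, assuming the resulting FunctionModifier also exposes its output through the result field:

>>> from cascade.data import dataset, modifier
>>> @dataset
... def numbers():
...     return [0, 1, 2, 3]
>>> @modifier
... def add_one(data):
...     # data is numbers().result, substituted automatically
...     return [x + 1 for x in data]
>>> pipeline = add_one(numbers())
>>> pipeline.result
[1, 2, 3, 4]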

class cascade.data.BaseModifier(dataset: BaseDataset[T], *args: Any, **kwargs: Any)[source]#
__init__(dataset: BaseDataset[T], *args: Any, **kwargs: Any) None[source]#

Constructs a Modifier. Modifier represents a step in a pipeline - some data transformation

Parameters:

dataset (BaseDataset[T]) – A dataset to modify

from_meta(meta: List[Dict[Any, Any]]) None[source]#

Calls the same method as the base class, but does it cascade-like, which allows rolling a list of meta over a whole pipeline

Parameters:

meta (Meta) – Meta of a single object or a pipeline

get_meta() List[Dict[Any, Any]][source]#

Overrides the base method, enabling cascade-like calls to previous datasets. The metadata of a pipeline that consists of several modifiers can be easily obtained with get_meta of the last block.

class cascade.data.IteratorModifier(dataset: IteratorDataset[T], *args: Any, **kwargs: Any)[source]#

The Modifier for Iterator datasets

See also

cascade.data.Modifier, cascade.data.Iterator

__init__(dataset: IteratorDataset[T], *args: Any, **kwargs: Any) None[source]#

Constructs a Modifier. Modifier represents a step in a pipeline - some data transformation

Parameters:

dataset (BaseDataset[T]) – A dataset to modify

get_meta() List[Dict[Any, Any]][source]#

Overrides the base method, enabling cascade-like calls to previous datasets. The metadata of a pipeline that consists of several modifiers can be easily obtained with get_meta of the last block.

class cascade.data.Modifier(dataset: BaseDataset[T], *args: Any, **kwargs: Any)[source]#

Basic pipeline building block in Cascade. Every block which is not a data source should be a successor of Sampler or Modifier.

This structure enables a data pipeline that consists of uniform blocks, each of which has a reference to the previous one in its _dataset field.

Basically, Modifier defines an arbitrary transformation of every dataset item that is applied lazily on each __getitem__ call.

Applies no transformation if __getitem__ is not overridden

Does not change the length of a dataset. See Sampler for this functionality

get_meta() List[Dict[Any, Any]][source]#

Overrides the base method, enabling cascade-like calls to previous datasets. The metadata of a pipeline that consists of several modifiers can be easily obtained with get_meta of the last block.
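
A minimal sketch of a custom Modifier that overrides __getitem__ and reads from the previous dataset through _dataset:

>>> from cascade.data import Modifier, Wrapper
>>> class AddOne(Modifier):
...     def __getitem__(self, index):
...         return self._dataset[index] + 1
>>> ds = AddOne(Wrapper([0, 1, 2]))
>>> [item for item in ds]
[1, 2, 3]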

class cascade.data.Sampler(dataset: Dataset[T], num_samples: int, *args: Any, **kwargs: Any)[source]#

Defines a certain sampling over a Dataset.

Its distinctive feature is that it changes the number of items in a dataset.

Can be used to build a batch sampler, random sampler, etc.

__init__(dataset: Dataset[T], num_samples: int, *args: Any, **kwargs: Any) None[source]#

Constructs a Sampler.

Parameters:
  • dataset (Dataset) – A dataset to sample from

  • num_samples (int) – The number of samples to use as a new length
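
A minimal sketch of a custom Sampler that takes every other item, assuming the length of a Sampler equals num_samples:

>>> from cascade.data import Sampler, Wrapper
>>> class EveryOther(Sampler):
...     def __init__(self, dataset, *args, **kwargs):
...         # The new length is the number of even indices
...         super().__init__(dataset, (len(dataset) + 1) // 2, *args, **kwargs)
...     def __getitem__(self, index):
...         return self._dataset[index * 2]
>>> ds = EveryOther(Wrapper([0, 1, 2, 3, 4]))
>>> [item for item in ds]
[0, 2, 4]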

class cascade.data.Pickler(**kwargs)[source]#

Pickles input dataset or unpickles one

__getitem__(index: int) T[source]#

Forwards the call to the wrapped dataset regardless of presence of this method in it

__init__(path: str, dataset: BaseDataset[T] | None = None, *args: Any, **kwargs: Any) None[source]#

Loads a pickled dataset or dumps one, depending on the parameters passed:

  1. If only path is passed - loads the dataset from the provided path if it exists

  2. If path is provided along with a dataset - dumps the dataset to the path

Parameters:
  • path (str) – Path to the pickled dataset

  • dataset (BaseDataset, optional) – A dataset to be pickled

Raises:

FileNotFoundError – if path does not exist

__len__() int[source]#

Forwards the call to the wrapped dataset regardless of presence of this method in it

ds() BaseDataset[T][source]#

Returns pickled dataset

class cascade.data.RandomSampler(dataset: Dataset[T], num_samples: int | None = None, *args: Any, **kwargs: Any)[source]#

Shuffles a dataset

__init__(dataset: Dataset[T], num_samples: int | None = None, *args: Any, **kwargs: Any) None[source]#
Parameters:
  • dataset (Dataset[T]) – Input dataset to sample from

  • num_samples (int, optional) – If less than or equal to len(dataset), samples without repetition (shuffles indices). If greater than len(dataset), generates random integers as indices. If None, just shuffles the dataset.
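
A minimal usage sketch (the order of items is random, so no output is asserted):

>>> from cascade.data import RandomSampler, Wrapper
>>> ds = Wrapper([0, 1, 2, 3, 4])
>>> shuffled = RandomSampler(ds)         # same items, random order
>>> subset = RandomSampler(ds, 3)        # 3 random items without repetition
>>> oversampled = RandomSampler(ds, 10)  # 10 items drawn with random indices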

class cascade.data.RangeSampler(dataset: Dataset[T], start: int | None = None, stop: int | None = None, step: int = 1, *args: Any, **kwargs: Any)[source]#

Implements the interface of the standard range for a dataset.

Example

>>> from cascade.data import RangeSampler, Wrapper
>>> ds = Wrapper([1, 2, 3, 4, 5])
>>> # Define start, stop and step exactly as in range()
>>> sampler = RangeSampler(ds, 1, 5, 2)
>>> for item in sampler:
...     print(item)
...
2
4
>>> ds = Wrapper([1, 2, 3, 4, 5])
>>> sampler = RangeSampler(ds, 3)
>>> for item in sampler:
...     print(item)
...
1
2
3
__init__(dataset: Dataset[T], start: int | None = None, stop: int | None = None, step: int = 1, *args: Any, **kwargs: Any) None[source]#
Parameters:
  • dataset (SizedDataset) – A dataset to sample from

  • start (int) – Start index in range - included

  • stop (int) – Stop index in range - excluded

  • step (int, default is 1) – Step of range

Raises:

ValueError – when neither start nor stop is present, or when the given parameters produce an empty dataset

class cascade.data.SchemaModifier(dataset: BaseDataset[T], *args: Any, **kwargs: Any)[source]#

Data validation modifier

When self._dataset is accessed and self.in_schema is defined, wraps self._dataset into a validator, which is another Modifier that checks the output of __getitem__ of the wrapped dataset.

In the end it will look like this:

If in_schema is not None:

dataset = SchemaModifier(ValidationWrapper(dataset))

If in_schema is None:

dataset = SchemaModifier(dataset)

How to use it:

1. Define a pydantic schema of the input

```python
import pydantic
from typing import List, Tuple

class AnnotImage(pydantic.BaseModel):
    image: List[List[List[float]]]
    segments: List[List[int]]
    bboxes: List[Tuple[int, int, int, int]]
```

2. Use the schema as in_schema

```python
from cascade.data import SchemaModifier

class ImageModifier(SchemaModifier):
    in_schema = AnnotImage
```

3. Create a regular Modifier by subclassing ImageModifier.

```python
class IDoNothing(ImageModifier):
    def __getitem__(self, idx):
        item = self._dataset[idx]
        return item
```

4. That’s all. The schema check will be performed automatically every time self._dataset[idx] is accessed. If the item is not an AnnotImage, cascade.data.ValidationError will be raised.

__init__(dataset: BaseDataset[T], *args: Any, **kwargs: Any) None#

Constructs a Modifier. Modifier represents a step in a pipeline - some data transformation

Parameters:

dataset (BaseDataset[T]) – A dataset to modify

class cascade.data.SimpleDataloader(data: Sequence[T], batch_size: int = 1)[source]#

Simple batch builder - given a sequence and a batch size, breaks it into subsequences

>>> from cascade.data import SimpleDataloader
>>> dl = SimpleDataloader([0, 1, 2], 2)
>>> [item for item in dl]
[[0, 1], [2]]
__init__(data: Sequence[T], batch_size: int = 1) None[source]#
cascade.data.split(ds: Dataset[T], frac: float | None = 0.5, num: int | None = None) Tuple[RangeSampler[T], RangeSampler[T]][source]#

Splits a dataset into two cascade.data.RangeSamplers

Parameters:
  • ds (Dataset[T]) – A dataset to split

  • frac (float) – A fraction for the division of the dataset. For example, if frac=0.8, then the first dataset gets 80% of the items and the second gets 20%. Not used when num is specified.

  • num (int) – The number of items that the first dataset will get. The second one gets len(dataset) - num items.

Example

>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds1, ds2 = cdd.split(ds)
>>> print([item for item in ds1])
[0, 1]
>>> print([item for item in ds2])
[2, 3, 4]
>>> ds1, ds2 = cdd.split(ds, 0.6)
>>> print([item for item in ds1])
[0, 1, 2]
>>> print([item for item in ds2])
[3, 4]
>>> ds1, ds2 = cdd.split(ds, num=4)
>>> print([item for item in ds1])
[0, 1, 2, 3]
>>> print([item for item in ds2])
[4]
class cascade.data.ValidationError(message: str | None = None, error_index: int | None = None)[source]#

Base class to raise if data validation failed

Can provide additional information about the failure

__init__(message: str | None = None, error_index: int | None = None) None[source]#
cascade.data.validate_in(f: Callable[[...], Any]) Callable[[...], Any][source]#

Data validation decorator for callables. On each call, validates only the input schema using type annotations, if present. Does not check the return value.

Parameters:

f (Callable[[Any], Any]) – Function to wrap

Returns:

Decorated function

Return type:

Callable[[Any], Any]
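
A minimal sketch, assuming a call with an argument that does not match its annotation raises a validation error:

>>> from cascade.data import validate_in
>>> @validate_in
... def square(x: int) -> int:
...     return x ** 2
>>> square(3)
9
>>> # square("abc") would be expected to fail input validation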

class cascade.data.VersionAssigner(**kwargs)[source]#

Class for automatic data versioning using metadata. VersionAssigner is a simple Modifier that tracks changes in metadata and assigns the dataset a version considering those changes. The version consists of two parts, major and minor, in the format MAJOR.MINOR, just like in semantic versioning. The meaning of the parts is the following: the major number changes if there are changes in the structure of the pipeline, e.g. some dataset was added or removed; the minor number changes in case of any metadata change, e.g. new data arrived and changed the length of modifiers in the pipeline.

Example

>>> # Set up the pipeline
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = VersionAssigner(ds, 'data_log.yml') # can be any supported meta format
>>> print(ds.version)
    0.0
>>> # Changes its structure - add new modifier
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = cdd.RangeSampler(ds, 0, len(ds), 2)
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
    1.0
>>> # Revert changes - version downgrades back
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
    0.0
>>> # Update input data - minor update
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4, 5])
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
    0.1

Note

Some limitations are present. If the metadata of some dataset contains something random or run-dependent, for example the memory address of an object or the time of creation, then the version will be bumped on every run.

__init__(dataset: BaseDataset[T], path: str, verbose: bool = False, *args: Any, **kwargs: Any) None[source]#
Parameters:
  • dataset (Dataset) – A dataset to assign a version to

  • path (str) – A path to the version log file of this dataset; can be of any supported meta format

cascade.data.version(ds: BaseDataset[T], path: str) str[source]#

Returns version of a dataset using VersionAssigner

Parameters:
  • ds (Dataset[T]) – Dataset to track and version

  • path (str) – Path to the version log of a dataset; will be created if it does not exist

Returns:

Version in two parts like 2.1 or 0.1

Return type:

str
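
A minimal usage sketch, assuming a fresh version log starts at 0.0 as in the VersionAssigner example above:

>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> cdd.version(ds, 'data_log.yml')
'0.0'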