cascade.data#
- class cascade.data.ApplyModifier(dataset: Dataset[T] | IteratorDataset[T], func: Callable[[T], Any], p: float | None = None, seed: int | None = None, *args: Any, **kwargs: Any)[source]#
Modifier that applies a function to the given dataset’s items in each __getitem__ call.
Can be applied to Iterators too.
- __init__(dataset: Dataset[T] | IteratorDataset[T], func: Callable[[T], Any], p: float | None = None, seed: int | None = None, *args: Any, **kwargs: Any) None [source]#
- Parameters:
dataset (Dataset) – A dataset to modify
func (Callable) – A function to be applied to every item of a dataset - each __getitem__ call applies func to an item obtained from the previous dataset
p (Optional[float], by default None) – The probability [0, 1] with which to apply func
seed (Optional[int], by default None) – Random seed, used when p is not None
Examples
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = cdd.ApplyModifier(ds, lambda x: x ** 2)
Now the function will only be applied when items are retrieved
>>> assert [item for item in ds] == [0, 1, 4, 9, 16]
- class cascade.data.BruteforceCacher(dataset: BaseDataset[T], *args: Any, **kwargs: Any)[source]#
Special modifier that calls the whole previous pipeline in __init__, loading everything into memory.
Examples
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0 for _ in range(1000000)])
>>> ds = cdd.ApplyModifier(ds, lambda x: x + 1)
>>> ds = cdd.ApplyModifier(ds, lambda x: x + 1)
>>> ds = cdd.ApplyModifier(ds, lambda x: x + 1)
Cache the heavy upstream part once
>>> ds = cdd.BruteforceCacher(ds)
Then pickle it
>>> ds = cdd.Pickler('ds', ds)
Unpickle and use further
>>> ds = cdd.Pickler('ds')
>>> ds = cdd.RandomSampler(ds, 1000)
- __init__(dataset: BaseDataset[T], *args: Any, **kwargs: Any) None [source]#
Loads every item of the dataset into an internal list.
- class cascade.data.Composer(datasets: List[Dataset[Any]], *args: Any, **kwargs: Any)[source]#
Unifies two or more datasets element-wise.
Example
>>> from cascade import data as cdd
>>> items = cdd.Wrapper([0, 1, 2, 3, 4])
>>> labels = cdd.Wrapper([1, 0, 0, 1, 1])
>>> ds = cdd.Composer((items, labels))
>>> assert ds[0] == (0, 1)
- __init__(datasets: List[Dataset[Any]], *args: Any, **kwargs: Any) None [source]#
- Parameters:
datasets (Iterable[Dataset]) – Datasets of the same length to be unified
- class cascade.data.Concatenator(datasets: List[Dataset[T]], *args: Any, **kwargs: Any)[source]#
Unifies several Datasets under one, calling them sequentially in the provided order.
Examples
>>> from cascade.data import Wrapper, Concatenator
>>> ds_1 = Wrapper([0, 1, 2])
>>> ds_2 = Wrapper([2, 1, 0])
>>> ds = Concatenator((ds_1, ds_2))
>>> assert [item for item in ds] == [0, 1, 2, 2, 1, 0]
- __init__(datasets: List[Dataset[T]], *args: Any, **kwargs: Any) None [source]#
Creates a concatenated dataset from the list of datasets provided
- class cascade.data.CyclicSampler(dataset: Dataset[T], num_samples: int, *args: Any, **kwargs: Any)[source]#
A Sampler that iterates num_samples times through an input Dataset in a cyclic manner
Example
>>> from cascade.data import CyclicSampler, Wrapper
>>> ds = Wrapper([1, 2, 3])
>>> ds = CyclicSampler(ds, 7)
>>> assert [item for item in ds] == [1, 2, 3, 1, 2, 3, 1]
- class cascade.data.BaseDataset(*args: Any, data_card: DataCard | None = None, **kwargs: Any)[source]#
Base class of any object that constitutes a step in a data-pipeline
- class cascade.data.Assessor(id: str | None = None, position: str | None = None)[source]#
The container for the info on the people who were in charge of the labeling process, their experience, and other properties.
This is a dataclass, so any additional fields will not be recorded if added. If it needs to be extended, please create a new class instead.
- __init__(id: str | None = None, position: str | None = None) None #
- id: str | None = None#
- position: str | None = None#
- class cascade.data.DataCard(name: str | None = None, desc: str | None = None, source: str | None = None, goal: str | None = None, labeling_info: LabelingInfo | None = None, size: int | Tuple[int] | None = None, metrics: Dict[str, Any] | None = None, schema: Dict[Any, Any] | None = None, **kwargs: Any)[source]#
The container for the information on dataset. The set of fields here is general and can be extended by providing new keywords into __init__.
Example
>>> from cascade.data import DataCard, Assessor, LabelingInfo, DataRegistrator
>>> person = Assessor(id=0, position="Assessor")
>>> info = LabelingInfo(who=[person], process_desc="Labeling description")
>>> dc = DataCard(
...     name="Dataset",
...     desc="Example dataset",
...     source="Database",
...     goal="Every dataset should have a goal",
...     labeling_info=info,
...     size=100,
...     metrics={"quality": 100},
...     schema={"label": "value"},
...     custom_field="hello")
>>> dr = DataRegistrator('data_log.yml')
>>> dr.register(dc)
- __init__(name: str | None = None, desc: str | None = None, source: str | None = None, goal: str | None = None, labeling_info: LabelingInfo | None = None, size: int | Tuple[int] | None = None, metrics: Dict[str, Any] | None = None, schema: Dict[Any, Any] | None = None, **kwargs: Any) None [source]#
- Parameters:
name (Optional[str]) – The name of dataset
desc (Optional[str]) – Short description
source (Optional[str]) – The source of data. Can be URL or textual description of source
goal (Optional[str]) – The goal of the dataset - what should be achieved using this data?
labeling_info (Optional[LabelingInfo]) – An instance of the dataclass describing the labeling process
size (Union[int, Tuple[int], None]) – The size of the dataset - the number of items or the shape of the table. This can usually be filled automatically.
metrics (Optional[Dict[str, Any]]) – Dictionary with names and values of metrics. Any quality metrics can be included
schema (Optional[Dict[Any, Any]]) – Schema dictionary describing table datasets, their columns, data types, possible values, etc. Pandera schema objects can be used when converted into dictionaries
- class cascade.data.LabelingInfo(who: List[Assessor] | None = None, process_desc: str | None = None, docs: str | None = None)[source]#
The container for the information on the labeling process: the people involved, a description of the process, and documentation links.
This is a dataclass, so any additional fields will not be recorded if added. If it needs to be extended, please create a new class instead.
- __init__(who: List[Assessor] | None = None, process_desc: str | None = None, docs: str | None = None) None #
- docs: str | None = None#
- process_desc: str | None = None#
- class cascade.data.Dataset(*args: Any, data_card: DataCard | None = None, **kwargs: Any)[source]#
An abstract class to represent a dataset with the __len__ method present. Inheriting from this class should mean the presence of length.
If your dataset does not have a length defined, you can use Iterator
See also
cascade.data.Iterator
- class cascade.data.IteratorDataset(*args: Any, data_card: DataCard | None = None, **kwargs: Any)[source]#
An abstract class to represent a dataset as an iterable object
- class cascade.data.IteratorWrapper(data: Iterable[T], *args: Any, **kwargs: Any)[source]#
Wraps IteratorDataset around any Iterable. Does not have a map-like interface.
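Example
A minimal usage sketch (not from the original docs; the input iterable is illustrative):
>>> from cascade.data import IteratorWrapper
>>> ds = IteratorWrapper(range(3))
>>> [item for item in ds]
[0, 1, 2]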
- class cascade.data.Wrapper(obj: Sequence[T], *args: Any, **kwargs: Any)[source]#
Wraps Dataset around any list-like object.
- class cascade.data.Filter(dataset: Dataset, filter_fn: Callable, *args: Any, **kwargs: Any)[source]#
Filter for Datasets with length. Uses a function to create a mask of items that will remain
- __init__(dataset: Dataset, filter_fn: Callable, *args: Any, **kwargs: Any) None [source]#
Filter a dataset using a filter function. Does not accumulate items in memory, will store only an index mask.
- Parameters:
dataset (Dataset) – A dataset to filter
filter_fn (Callable) – A function to be applied to every item of a dataset - should return bool. Will be called on every item in __init__.
- Raises:
RuntimeError – If filter_fn raises an exception
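Example
A minimal sketch of typical usage (the data and predicate here are illustrative, not from the original docs):
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = cdd.Filter(ds, lambda x: x % 2 == 0)
>>> [item for item in ds]
[0, 2, 4]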
- class cascade.data.IteratorFilter(dataset: IteratorDataset, filter_fn: Callable, *args: Any, **kwargs: Any)[source]#
Filter for datasets without length
Does not filter on init; returns only the items that pass the filter
- __init__(dataset: IteratorDataset, filter_fn: Callable, *args: Any, **kwargs: Any) None [source]#
Constructs a Modifier. Modifier represents a step in a pipeline - some data transformation
- Parameters:
dataset (BaseDataset[T]) – A dataset to modify
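Example
A sketch of typical usage (illustrative data; items are filtered lazily during iteration):
>>> from cascade.data import IteratorWrapper, IteratorFilter
>>> ds = IteratorWrapper([0, 1, 2, 3, 4])
>>> ds = IteratorFilter(ds, lambda x: x > 2)
>>> [item for item in ds]
[3, 4]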
- class cascade.data.FolderDataset(root: str, *args: Any, **kwargs: Any)[source]#
Basic “folder of files” dataset. Accepts a root folder and considers all the files inside it. Is abstract - __getitem__ is not defined, since it is specific to each file type.
See also
cascade.utils.FolderImageDataset
- cascade.data.dataset(f: Callable[[...], Any], do_validate_in: bool = True) Callable[[...], FunctionDataset] [source]#
Thin wrapper to turn any function into a Cascade Dataset. Use this if the function is the data source.
Will return a FunctionDataset object. To get the results of the execution, use the dataset.result field.
- Parameters:
f (Callable[..., Any]) – Function that produces data
- Returns:
Call this to get a dataset
- Return type:
Callable[…, FunctionDataset]
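Example
A minimal sketch based on the description above (the wrapped function is illustrative):
>>> from cascade.data import dataset
>>> @dataset
... def load_numbers():
...     return [0, 1, 2]
>>> ds = load_numbers()
>>> ds.result
[0, 1, 2]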
- cascade.data.modifier(f: Callable[[...], Any], do_validate_in: bool = True) Callable[[...], FunctionModifier] [source]#
Thin wrapper to turn any function into a Cascade Modifier. Pass it the return value of a function that was previously wrapped with dataset or modifier. Will replace any dataset argument with dataset.result automatically if the function argument is a FunctionDataset.
- Parameters:
f (Callable[..., Any]) – Function that modifies data
- Returns:
Call this to get a modifier
- Return type:
Callable[…, FunctionModifier]
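Example
A sketch that continues the dataset example above (function names are illustrative):
>>> from cascade.data import dataset, modifier
>>> @dataset
... def load_numbers():
...     return [0, 1, 2]
>>> @modifier
... def add_one(numbers):
...     return [n + 1 for n in numbers]
>>> ds = add_one(load_numbers())
>>> ds.result
[1, 2, 3]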
- class cascade.data.BaseModifier(dataset: BaseDataset[T], *args: Any, **kwargs: Any)[source]#
- __init__(dataset: BaseDataset[T], *args: Any, **kwargs: Any) None [source]#
Constructs a Modifier. Modifier represents a step in a pipeline - some data transformation
- Parameters:
dataset (BaseDataset[T]) – A dataset to modify
- class cascade.data.IteratorModifier(dataset: IteratorDataset[T], *args: Any, **kwargs: Any)[source]#
The Modifier for Iterator datasets
See also
cascade.data.Modifier, cascade.data.Iterator
- __init__(dataset: IteratorDataset[T], *args: Any, **kwargs: Any) None [source]#
Constructs a Modifier. Modifier represents a step in a pipeline - some data transformation
- Parameters:
dataset (BaseDataset[T]) – A dataset to modify
- class cascade.data.Modifier(dataset: BaseDataset[T], *args: Any, **kwargs: Any)[source]#
Basic pipeline building block in Cascade. Every block which is not a data source should be a successor of Sampler or Modifier.
This structure enables having a data pipeline which consists of uniform blocks, each of which has a reference to the previous one in its _dataset field.
Basically, a Modifier defines an arbitrary transformation of every dataset’s item that is applied in a lazy manner on each __getitem__ call.
Applies no transformation if __getitem__ is not overridden.
Does not change the length of a dataset. See Sampler for this functionality.
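Example
A minimal sketch of a custom Modifier (not from the original docs):
>>> from cascade import data as cdd
>>> class Square(cdd.Modifier):
...     def __getitem__(self, index):
...         return self._dataset[index] ** 2
>>> ds = Square(cdd.Wrapper([1, 2, 3]))
>>> [item for item in ds]
[1, 4, 9]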
- class cascade.data.Sampler(dataset: Dataset[T], num_samples: int, *args: Any, **kwargs: Any)[source]#
Defines certain sampling over a Dataset.
Its distinctive feature is that it changes the number of items in dataset.
Can be used to build a batch sampler, random sampler, etc.
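Example
A hedged sketch of a custom Sampler that repeats every item twice (it assumes the base __init__ derives the new length from num_samples, as the built-in samplers above suggest):
>>> from cascade import data as cdd
>>> class Repeater(cdd.Sampler):
...     def __init__(self, dataset):
...         super().__init__(dataset, 2 * len(dataset))
...     def __getitem__(self, index):
...         return self._dataset[index // 2]
>>> ds = Repeater(cdd.Wrapper([1, 2]))
>>> [item for item in ds]
[1, 1, 2, 2]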
- class cascade.data.Pickler(**kwargs)[source]#
Pickles input dataset or unpickles one
- __getitem__(index: int) T [source]#
Forwards the call to the wrapped dataset regardless of whether this method is present in it
- __init__(path: str, dataset: BaseDataset[T] | None = None, *args: Any, **kwargs: Any) None [source]#
Loads pickled dataset or dumps one depending on parameters passed:
If only path is passed - loads the dataset from the provided path if it exists
If path is provided along with a dataset - dumps the dataset to the path
- Parameters:
path (str) – Path to the pickled dataset
dataset (BaseDataset, optional) – A dataset to be pickled
- Raises:
FileNotFoundError – if path does not exist
- __len__() int [source]#
Forwards the call to the wrapped dataset regardless of whether this method is present in it
- ds() BaseDataset[T] [source]#
Returns pickled dataset
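Example
A minimal sketch, assuming a writable path (the file name is illustrative):
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2])
>>> ds = cdd.Pickler('ds.pkl', ds)  # dumps the dataset to the path
>>> ds = cdd.Pickler('ds.pkl')      # loads it back
>>> [item for item in ds]
[0, 1, 2]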
- class cascade.data.RandomSampler(dataset: Dataset[T], num_samples: int | None = None, *args: Any, **kwargs: Any)[source]#
Shuffles a dataset
- __init__(dataset: Dataset[T], num_samples: int | None = None, *args: Any, **kwargs: Any) None [source]#
- Parameters:
dataset (Dataset[T]) – Input dataset to sample from
num_samples (int, optional) – If less than or equal to len(dataset), samples without repetition (shuffles indices). If greater than len(dataset), generates random integers as indices. If None, just shuffles the dataset.
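Example
A sketch (the resulting order is random, so only the length is shown):
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> sampled = cdd.RandomSampler(ds, 3)  # shuffle and keep 3 items
>>> len(sampled)
3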
- class cascade.data.RangeSampler(dataset: Dataset[T], start: int | None = None, stop: int | None = None, step: int = 1, *args: Any, **kwargs: Any)[source]#
Implements an interface of standard range in a dataset.
Example
>>> from cascade.data import RangeSampler, Wrapper
>>> ds = Wrapper([1, 2, 3, 4, 5])
>>> # Define start, stop and step exactly as in range()
>>> sampler = RangeSampler(ds, 1, 5, 2)
>>> for item in sampler:
...     print(item)
...
2
4
>>> ds = Wrapper([1, 2, 3, 4, 5])
>>> sampler = RangeSampler(ds, 3)
>>> for item in sampler:
...     print(item)
...
1
2
3
- __init__(dataset: Dataset[T], start: int | None = None, stop: int | None = None, step: int = 1, *args: Any, **kwargs: Any) None [source]#
- Parameters:
dataset (SizedDataset) – A dataset to sample from
start (int) – Start index in range - included
stop (int) – Stop index in range - excluded
step (int, default is 1) – Step of range
- Raises:
ValueError – when neither start nor stop is present, or when the given parameters produce an empty dataset
- class cascade.data.SchemaModifier(dataset: BaseDataset[T], *args: Any, **kwargs: Any)[source]#
Data validation modifier
When self._dataset is called and has self.in_schema defined, wraps self._dataset into a validator, which is another Modifier that checks the output of __getitem__ of the dataset that was wrapped.
In the end it will look like this:
- If in_schema is not None: dataset = SchemaModifier(ValidationWrapper(dataset))
- If in_schema is None: dataset = SchemaModifier(dataset)
How to use it:
1. Define a pydantic schema of the input:

```python
import pydantic
from typing import List, Tuple

class AnnotImage(pydantic.BaseModel):
    image: List[List[List[float]]]
    segments: List[List[int]]
    bboxes: List[Tuple[int, int, int, int]]
```

2. Use the schema as in_schema:

```python
from cascade.data import SchemaModifier

class ImageModifier(SchemaModifier):
    in_schema = AnnotImage
```

3. Create a regular Modifier by subclassing ImageModifier:

```python
class IDoNothing(ImageModifier):
    def __getitem__(self, idx):
        item = self._dataset[idx]
        return item
```

4. That’s all. The schema check will be performed automatically every time self._dataset[idx] is accessed. If the item is not an AnnotImage, cascade.data.ValidationError will be raised.
- __init__(dataset: BaseDataset[T], *args: Any, **kwargs: Any) None #
Constructs a Modifier. Modifier represents a step in a pipeline - some data transformation
- Parameters:
dataset (BaseDataset[T]) – A dataset to modify
- class cascade.data.SimpleDataloader(data: Sequence[T], batch_size: int = 1)[source]#
Simple batch builder - given a sequence and a batch size, breaks it into subsequences (batches)
>>> from cascade.data import SimpleDataloader
>>> dl = SimpleDataloader([0, 1, 2], 2)
>>> [item for item in dl]
[[0, 1], [2]]
- cascade.data.split(ds: Dataset[T], frac: float | None = 0.5, num: int | None = None) Tuple[RangeSampler[T], RangeSampler[T]] [source]#
Splits dataset into two cascade.data.RangeSampler
- Parameters:
frac (float) – A fraction for the division of the dataset. For example, if frac=0.8, then the first dataset gets 80% of the items and the second gets 20%. Is not used when num is specified.
num (int) – The number of items that the first dataset will get. The second one will get len(dataset) - num items.
Example
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds1, ds2 = cdd.split(ds)
>>> print([item for item in ds1])
[0, 1]
>>> print([item for item in ds2])
[2, 3, 4]
>>> ds1, ds2 = cdd.split(ds, 0.6)
>>> print([item for item in ds1])
[0, 1, 2]
>>> print([item for item in ds2])
[3, 4]
>>> ds1, ds2 = cdd.split(ds, num=4)
>>> print([item for item in ds1])
[0, 1, 2, 3]
>>> print([item for item in ds2])
[4]
- class cascade.data.ValidationError(message: str | None = None, error_index: int | None = None)[source]#
Base class to raise if data validation failed
Can provide additional information about the failure
- cascade.data.validate_in(f: Callable[[...], Any]) Callable[[...], Any] [source]#
Data validation decorator for callables. In each call validates only the input schema using type annotations if present. Does not check return value.
- Parameters:
f (Callable[[Any], Any]) – Function to wrap
- Returns:
Decorated function
- Return type:
Callable[[Any], Any]
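Example
A hedged sketch (the decorated function is illustrative; validation relies on the type annotations of the inputs):
>>> from cascade.data import validate_in
>>> @validate_in
... def add_one(x: int) -> int:
...     return x + 1
>>> add_one(1)
2
>>> # add_one("a") would be expected to fail input validation, since "a" is not an int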
- class cascade.data.VersionAssigner(**kwargs)[source]#
Class for automatic data versioning using metadata.
VersionAssigner is a simple Modifier that tracks changes in metadata and assigns the dataset a version considering changes in meta. The version consists of two parts, major and minor, in the format MAJOR.MINOR, just like in semantic versioning. The meaning of the parts is the following: the major number changes if there are changes in the structure of the pipeline, e.g. some dataset was added/removed; the minor number changes in case of any metadata change, e.g. new data arrived and changed the length of modifiers in the pipeline.
Example
>>> # Set up the pipeline
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = VersionAssigner(ds, 'data_log.yml')  # can be any supported meta format
>>> print(ds.version)
0.0
>>> # Changes its structure - add new modifier
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = cdd.RangeSampler(ds, 0, len(ds), 2)
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
1.0
>>> # Revert changes - version downgrades back
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
0.0
>>> # Update input data - minor update
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4, 5])
>>> ds = VersionAssigner(ds, 'data_log.yml')
>>> print(ds.version)
0.1
Note
Some limitations are present. If the metadata of some dataset contains something random or run-dependent, such as the memory address of an object or the time of creation, then the version will bump on every run.
- __init__(dataset: BaseDataset[T], path: str, verbose: bool = False, *args: Any, **kwargs: Any) None [source]#
- Parameters:
dataset (Dataset) – A dataset to assign a version to
path (str) – A path to the version log file of this dataset; can be of any supported meta format
- cascade.data.version(ds: BaseDataset[T], path: str) str [source]#
Returns version of a dataset using VersionAssigner
- Parameters:
ds (Dataset[T]) – Dataset to track and version
path (str) – Path to the version log of a dataset; will be created if it does not exist
- Returns:
Version in two parts like 2.1 or 0.1
- Return type:
str
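Example
A minimal sketch (the log file name is illustrative; the returned value depends on what is already in the log):
>>> from cascade import data as cdd
>>> ds = cdd.Wrapper([0, 1, 2, 3, 4])
>>> cdd.version(ds, 'version_log.yml')
'0.0'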