Producers
Imports¶
import sys
import numpy as np
import matplotlib.pyplot as plt
from openseize import producer
from openseize.core.producer import as_producer
from openseize import demos
from openseize.file_io import edf, annotations
Introduction¶
The size of an EEG dataset depends on three factors: the number of signals acquired, the sampling rate of each signal, and the duration of the measurement. Recent advances in electrode and data acquisition hardware allow for increases in each of these factors, such that the resulting dataset may not fit into the memory (RAM) of a user's computer.
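For example, a quick back-of-the-envelope calculation (the recording parameters below are hypothetical) shows how fast an EEG dataset outgrows RAM:
# hypothetical recording: 10 channels sampled at 5 kHz for 24 hours as float64
channels, fs, hours, bytes_per_sample = 10, 5000, 24, 8
size_gb = channels * fs * 3600 * hours * bytes_per_sample / 1e9
print('{:.1f} GB'.format(size_gb))  # ~34.6 GB -- more than most workstations' RAM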
To address this, openseize uses an iterable object -- the producer -- that sequentially produces numpy arrays from a data source. This data source can be a sequence, an ndarray, a file stored to disk, or even a generating function that produces the data itself. In this demo, we will cover producer creation routines, attributes, and methods.
Creation Routines¶
All producers, no matter the data source, are constructed using the producer() function. To see what arguments are needed to build a producer, we can look at the producer function's documentation.
help(producer)
Help on function producer in module openseize.core.producer:

producer(data: Union[numpy.ndarray[Any, numpy.dtype[+ScalarType]], Iterable[numpy.ndarray[Any, numpy.dtype[+ScalarType]]], openseize.file_io.edf.Reader, Callable, ForwardRef('Producer')], chunksize: int, axis: int, shape: Optional[Sequence[int]] = None, mask: Optional[numpy.ndarray[Any, numpy.dtype[numpy.bool_]]] = None, **kwargs) -> 'Producer'
    Constructs an iterable that produces ndarrays of length chunksize
    along axis during iteration.

    This constructor returns an object that is capable of producing
    ndarrays or masked ndarrays during iteration from a single ndarray,
    a sequence of ndarrays, a file Reader instance (see io.bases.Reader),
    an ndarray generating function, or a pre-existing producer of
    ndarrays. The produced ndarrays from this object will have length
    chunksize along axis.

    Args:
        data:
            An object from which ndarrays will be produced from. Supported
            types are Reader instances, ndarrays, sequences, generating
            functions yielding ndarrays, or other Producers. For sequences
            and generator functions it is required that each subarray has
            the same shape along all axes except for the axis along which
            chunks will be produced.
        chunksize:
            The desired length along axis of each produced ndarray.
        axis:
            The sample axis of data that will be partitioned into chunks
            of length chunksize.
        shape:
            The combined shape of all ndarrays from this producer. This
            parameter is only required when object is a generating
            function and will be ignored otherwise.
        mask:
            A boolean describing which values of data along axis should
            by produced. Values that are True will be produced and values
            that are False will be ignored. If None (Default), producer
            will produce all values from object.
        kwargs:
            Keyword arguments specific to data type that ndarrays will be
            produced from. For Reader instances, valid kwargs are padvalue
            (see io.bases.Readers and io.edf.Reader). For generating
            functions, all the positional and keyword arguments must be
            passed to the function through these kwargs to avoid name
            collisions with the producer func arguments.

    Returns:
        An iterable of ndarrays of shape chunksize along axis.
So the producer function needs a data source, a chunksize describing the number of samples that should be included in each produced subarray, the axis along which samples lie, and possibly a shape and mask. We will cover each of these parameters in detail in this demo.
Producers From Arrays¶
Creating a producer from an array may seem silly. Isn't the array already in memory? Yes, but that array may be consuming so much memory that there is little left to work with. By creating a producer, you can process the produced values with any of the openseize functions (downsample, filter, etc.) while still holding the array in memory.
Let's make an array and then create a producer to demonstrate this utility.
# create a reproducible random data array with 4 channels and 1 million samples along axis=1
rng = np.random.default_rng(1234)
data = rng.random((4, 1000000))
# let's also print data's memory consumption
print('data is using = {} MB'.format((data.size * data.itemsize)/1e6))
data is using = 32.0 MB
# build a producer declaring that we want the producer to yield arrays of size 300000
# using the samples along the last axis
pro = producer(data, chunksize=300000, axis=-1)
# let's check out the producer's memory consumption
print('pro is using = {} Bytes'.format(sys.getsizeof(pro)))
pro is using = 48 Bytes
This is the first important point about producers: producers do not store data, they are iterables that know how to yield data to you on-the-fly. Let's see what the producer's attributes are.
print(pro)
ArrayProducer Object
---Attributes & Properties---
{'data': array([[0.97669977, 0.38019574, 0.92324623, ..., 0.02049864, 0.84033509, 0.07061386], [0.32584251, 0.01559622, 0.16734471, ..., 0.48613722, 0.13466647, 0.78129557], [0.45169665, 0.44011763, 0.0325013 , ..., 0.86914401, 0.5904367 , 0.4616979 ], [0.84830865, 0.97995714, 0.63405179, ..., 0.7236714 , 0.80536627, 0.77495984]]), 'axis': -1, 'kwargs': {}, 'chunksize': 300000, 'shape': (4, 1000000)}
Type help(ArrayProducer) for full documentation
The producer instance is holding a reference to the data array, the sample axis, the chunksize of the subarrays that will be produced, and the shape of the referenced data. Let's try to get each subarray from the producer. Wait... how do we do that?
Since the producer is an iterable, you can access each subarray just like with any iterable. Any method that triggers Python's iteration protocol will give you the subarrays in the producer. This could be a for-loop, a list comprehension, or explicit calls to the iter and next built-ins. Let's access each produced array in a loop.
# loop to access each subarray, printing its shape and first 5 samples for each channel
for idx, subarr in enumerate(pro):
print('Array {}, shape={}'.format(idx, subarr.shape))
print(subarr[:, :5])
Array 0, shape=(4, 300000)
[[0.97669977 0.38019574 0.92324623 0.26169242 0.31909706]
 [0.32584251 0.01559622 0.16734471 0.12427613 0.25749222]
 [0.45169665 0.44011763 0.0325013  0.02906749 0.20707769]
 [0.84830865 0.97995714 0.63405179 0.71921724 0.34165105]]
Array 1, shape=(4, 300000)
[[0.19975295 0.38469445 0.31663237 0.32026263 0.85713905]
 [0.68094421 0.67678136 0.02969927 0.90235448 0.79731081]
 [0.3700237  0.60763138 0.04216831 0.57699506 0.04456521]
 [0.54071085 0.82855925 0.09775676 0.03968656 0.65453465]]
Array 2, shape=(4, 300000)
[[0.92858655 0.05528663 0.88124263 0.28606888 0.54164412]
 [0.95592965 0.80143229 0.09263899 0.72895997 0.85988591]
 [0.7104101  0.58855675 0.11348623 0.5171883  0.90972664]
 [0.48743344 0.00490091 0.20384552 0.91139126 0.04721849]]
Array 3, shape=(4, 100000)
[[0.78483056 0.93115015 0.41382943 0.38030702 0.75412888]
 [0.4725766  0.14425412 0.15515715 0.71459954 0.30351422]
 [0.34821652 0.89459182 0.1399783  0.21133067 0.58058115]
 [0.78146378 0.0234853  0.10318636 0.26773979 0.75606789]]
Be sure not to miss that the last array the producer yielded was smaller than the previous 3. Why? Remember the data shape is (4, 1e6) and 1e6 is not perfectly divisible by 300,000. The last yielded array is, of course, 1e6 % 300,000 = 100,000 samples long. Important question: is the producer exhausted?
# test if producer can produce again
for idx, subarr in enumerate(pro):
print('Array {}, shape={}'.format(idx, subarr.shape))
Array 0, shape=(4, 300000)
Array 1, shape=(4, 300000)
Array 2, shape=(4, 300000)
Array 3, shape=(4, 100000)
This is critical: the producer is an iterable, not a one-shot iterator. It can go through the data as many times as you need.
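Under the hood, each pass calls the producer's __iter__ to build a fresh iterator. Here is a minimal sketch of that protocol using the built-in iter and next on the pro from above:
# each call to iter() returns a new, independent iterator over the producer
it = iter(pro)
first = next(it)
print(first.shape)  # (4, 300000)
# a second iterator starts from the beginning again
it2 = iter(pro)
print(np.allclose(next(it2), first))  # True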
Now, if you are skeptical (like any good scientist), you are probably wondering: how do I know that the produced values exactly match the original data source? Let's demonstrate that all the produced data exactly matches the original.
# demonstrate that the produced arrays match the original data source 'data'
# concatenate all produced subarrays along the last sample axis.
produced_array = np.concatenate([subarr for subarr in pro], axis=1)
# now test if the combined produced arrays match the original data array
print('Fingers crossed.. Do they match? -> {}'.format(np.allclose(produced_array, data)))
Fingers crossed.. Do they match? -> True
Our method of testing array equality required us to concatenate the produced arrays. Since converting a producer to an ndarray is something you'll likely need often, each producer instance provides a method for it called to_array. Let's call this important method and repeat our test.
# demonstrate that the produced arrays match the original data source 'data'
# concatenate all produced subarrays along the last sample axis using the producer's to_array method.
produced_array = pro.to_array()
# now test if the combined produced arrays match the original data array
print('Match? -> {}'.format(np.allclose(produced_array, data)))
Match? -> True
Of course, this was just one test. If you need more tests to be convinced, please see openseize.core.tests.producer_tests for the formal pytest suite.
Producers From Sequences¶
As you might guess from our discussion of producers built from arrays, producers can be built from any sequence. Let's show this with a short example.
# make a fun sequence from Monty Python
my_seq = [["(Knight) Tis but a scratch."],
["(Arthur) A scratch? Your arm's off!"],
["(Knight) No, it isn't."],
["(Arthur) Well, what's that then?"]]
# convert it to a scene producer
scene = producer(my_seq, chunksize=1, axis=-1)
# play the scene out
for dialog in scene:
print(dialog)
['(Knight) Tis but a scratch.']
["(Arthur) A scratch? Your arm's off!"]
["(Knight) No, it isn't."]
["(Arthur) Well, what's that then?"]
There is no restriction on the datatype that can be produced, as the above snippet demonstrates. As such, you might find producers useful for other large tasks that you need to break into subproblems.
Producers From Files¶
Producing data from a file stored to disk that is too large to fit into memory is one of the most important use cases for producers. Here we are going to open a European Data Format (EDF+) binary file and produce arrays from it. A detailed demo of this important file reader can be found in the file_reading demo.
Demo Data¶
In order to produce from a file, we will need demo data. Openseize includes a sample EDF file called recording_001.edf. Where is this file?
When we imported the demos module, we got a paths object that has two methods. The available method lists all the datasets available in the local demos/data directory as well as the demo data available in a remote Zenodo repository. The locate method returns a local filepath to a named dataset. To do this, locate may need to download the data first, depending on whether the data file is already on your system.
# check out the available demo data included with openseize
# this will include any local data in demos/data and remote data stored at openseize's Zenodo repository.
demos.paths.available
---Available demo data files & location---
------------------------------------------
annotations_001.txt          '/home/matt/python...nnotations_001.txt'
recording_001.edf            '/home/matt/python.../recording_001.edf'
5872_Left_group A.txt        '/home/matt/python...2_Left_group A.txt'
split0.edf                   '/home/matt/python...os/data/split0.edf'
5872_Left_group A-D.edf      '/home/matt/python...Left_group A-D.edf'
irregular_write_test.edf     '/home/matt/python...lar_write_test.edf'
write_test.edf               '/home/matt/python...ata/write_test.edf'
CW0259_SWDs.npy              '/home/matt/python...ta/CW0259_SWDs.npy'
subset_001.edf               '/home/matt/python...ata/subset_001.edf'
split1.edf                   '/home/matt/python...os/data/split1.edf'
# if not on your machine, a confirmation box will open to confirm your download
recording_path = demos.paths.locate('recording_001.edf')
# now confirm the recording_001.edf file is on your machine
demos.paths.available
---Available demo data files & location---
------------------------------------------
annotations_001.txt          '/home/matt/python...nnotations_001.txt'
recording_001.edf            '/home/matt/python.../recording_001.edf'
5872_Left_group A.txt        '/home/matt/python...2_Left_group A.txt'
split0.edf                   '/home/matt/python...os/data/split0.edf'
5872_Left_group A-D.edf      '/home/matt/python...Left_group A-D.edf'
irregular_write_test.edf     '/home/matt/python...lar_write_test.edf'
write_test.edf               '/home/matt/python...ata/write_test.edf'
CW0259_SWDs.npy              '/home/matt/python...ta/CW0259_SWDs.npy'
subset_001.edf               '/home/matt/python...ata/subset_001.edf'
split1.edf                   '/home/matt/python...os/data/split1.edf'
Building Data Readers¶
OK, so we have a path to a data file. Can we make a producer using this path directly? No. Openseize is a highly extensible package that supports many different file types, and each file type must be read according to its own protocol. For reading EDF files, openseize has an EDF reader that reads... well, EDF files. We discuss file readers in the file_reading demo. For now, we will build the EDF reader and explain the steps as we go.
# how do we build an edf reader -- ask for help!
help(edf.Reader)
Help on class Reader in module openseize.file_io.edf:

class Reader(openseize.file_io.bases.Reader)
 |  Reader(path: Union[str, pathlib.Path]) -> None
 |
 |  A reader of European Data Format (EDF/EDF+) files.
 |
 |  This reader supports reading EEG data and metadata from an EDF file
 |  with and without context management (see Introduction). If opened
 |  outside of context management, you should close this Reader's instance
 |  manually by calling the 'close' method to recover open file resources
 |  when you finish processing a file.
 |
 |  Attributes:
 |      header (dict):
 |          A dictionary representation of the EDFs header.
 |      shape (tuple):
 |          A (channels, samples) shape tuple.
 |      channels (Sequence):
 |          The channels to be returned from the 'read' method call.
 |
 |  Examples:
 |      >>> from openseize.demos import paths
 |      >>> filepath = paths.locate('recording_001.edf')
 |      >>> from openseize.io.edf import Reader
 |      >>> # open a reader using context management and reading 120 samples
 |      >>> # from all 4 channels
 |      >>> with Reader(filepath) as infile:
 |      >>>     x = infile.read(start=0, stop=120)
 |      >>> print(x.shape)
 |      ... (4, 120)
 |
 |  Method resolution order:
 |      Reader
 |      openseize.file_io.bases.Reader
 |      abc.ABC
 |      openseize.core.mixins.ViewInstance
 |      builtins.object
 |
 |  Methods defined here:
 |
 |  __init__(self, path: Union[str, pathlib.Path]) -> None
 |      Extends the Reader ABC with a header attribute.
 |
 |  read(self, start: int, stop: Optional[int] = None, padvalue: float = nan) -> numpy.ndarray[typing.Any, numpy.dtype[numpy.float64]]
 |      Reads samples from this EDF from this Reader's channels.
 |
 |      Args:
 |          start:
 |              The start sample index to read.
 |          stop:
 |              The stop sample index to read (exclusive). If None, samples
 |              will be read until the end of file.
 |          padvalue:
 |              Value to pad to channels that run out of samples to return.
 |              Only applicable if sample rates of channels differ.
 |
 |      Returns:
 |          A float64 array of shape len(chs) x (stop-start) samples.
 |
 |  ----------------------------------------------------------------------
 |  Readonly properties defined here:
 |
 |  shape
 |      Returns a 2-tuple containing the number of channels and
 |      number of samples in this EDF.
 |
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |
 |  channels
 |      Returns the channels that this Reader will read.
 |
 |  ----------------------------------------------------------------------
 |  Data and other attributes defined here:
 |
 |  __abstractmethods__ = frozenset()
 |
 |  ----------------------------------------------------------------------
 |  Methods inherited from openseize.file_io.bases.Reader:
 |
 |  __enter__(self)
 |      Return reader instance as target variable of this context.
 |
 |  __exit__(self, exc_type, exc_value, traceback)
 |      On context exit, close this reader's file object and propagate
 |      errors by returning None.
 |
 |  close(self)
 |      Close this reader instance's opened file object and destroy the
 |      reference to the file object.
 |
 |      File descriptors whether opened or closed are not serializable. To
 |      support concurrent processing we close & remove all references to
 |      the file descriptor on close.
 |
 |  open(self)
 |      Opens the file at path for reading & stores the file descriptor to
 |      this Reader's '_fobj' attribute.
 |
 |  ----------------------------------------------------------------------
 |  Data descriptors inherited from openseize.file_io.bases.Reader:
 |
 |  __dict__
 |      dictionary for instance variables (if defined)
 |
 |  __weakref__
 |      list of weak references to the object (if defined)
 |
 |  ----------------------------------------------------------------------
 |  Methods inherited from openseize.core.mixins.ViewInstance:
 |
 |  __repr__(self)
 |      Returns the __init__'s signature as the echo representation.
 |
 |      Returns: str
 |
 |  __str__(self)
 |      Returns this instances print representation.
Under the 'Methods' section, we see that to make a reader all we need is an EDF path.
# build the reader using the recording path we fetched
reader = edf.Reader(recording_path)
Also notice that under the methods there is a single data-access method called read. It reads samples from the EDF file between the start and stop sample indices for each channel in the channels list. Let's see what it does.
values = reader.read(start=0, stop=10) # channels unspecified means all channels are read
print(values)
[[-19.87908032   7.95793213  19.88808032  18.89390131  18.89390131  43.74837671   6.96375311  30.8240495    5.9695741   22.87061737]
 [-86.4890744   51.70180884  63.63195703  88.48643243  63.63195703  61.643599    54.68434589  43.74837671  55.6785249   64.62613605]
 [-85.49489539  44.74255573  29.82987048  79.53882129  52.69598785  42.75419769  42.75419769  21.87643835  60.64941998  66.61449408]
 [ 62.63777802  95.44568555  77.55046326  36.7891236  109.36419177 118.31180292 122.28851898 115.32926587  76.55628424  44.74255573]]
So we got an array of shape (4,10) back. Is that right?
# examine the reader with print
print(reader)
Reader Object
---Attributes & Properties---
{'path': PosixPath('/home/matt/python/nri/openseize/src/openseize/demos/data/recording_001.edf'), 'mode': 'rb', 'kwargs': {}, 'header': {'version': '0', 'patient': 'PIN-42 M 11-MAR-1952 Animal', 'recording': 'Startdate 15-AUG-2020 X X X', 'start_date': '15.08.20', 'start_time': '09.59.15', 'header_bytes': 1536, 'reserved_0': 'EDF+C', 'num_records': 3775, 'record_duration': 1.0, 'num_signals': 5, 'names': ['EEG EEG_1_SA-B', 'EEG EEG_2_SA-B', 'EEG EEG_3_SA-B', 'EEG EEG_4_SA-B', 'EDF Annotations'], 'transducers': ['8401 HS:15279', '8401 HS:15279', '8401 HS:15279', '8401 HS:15279', ''], 'physical_dim': ['uV', 'uV', 'uV', 'uV', ''], 'physical_min': [-8144.31, -8144.31, -8144.31, -8144.31, -1.0], 'physical_max': [8144.319, 8144.319, 8144.319, 8144.319, 1.0], 'digital_min': [-8192.0, -8192.0, -8192.0, -8192.0, -32768.0], 'digital_max': [8192.0, 8192.0, 8192.0, 8192.0, 32767.0], 'prefiltering': ['none', 'none', 'none', 'none', ''], 'samples_per_record': [5000, 5000, 5000, 5000, 1024], 'reserved_1': ['', '', '', '', '']}, 'channels': [0, 1, 2, 3], 'shape': (4, 18875000)}
Type help(Reader) for full documentation
Awesome! Printing the reader gives us the EDF's header, which tells us everything we need to know. The file contains 4 channels named [EEG EEG_1_SA-B, ... EEG EEG_4_SA-B]. So our array shape makes sense.
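Since the header prints like a dictionary, individual fields can be pulled out directly. A quick sketch, assuming dictionary-style access consistent with the printout above:
# grab the channel names and per-record sample counts from the header
print(reader.header['names'])
print(reader.header['samples_per_record'])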
Can we finally build a producer? YES. Producers can produce from any reader type as long as the reader has a method called read. Lucky for us, the EDF reader we just built has a read method.
# build a producer from our edf reader instance
# the chunksize will be set to 100k samples
# the axis should be 1 since the reader has samples along this axis
rpro = producer(reader, chunksize=100000, axis=1)
# let's check out the data and attributes of this producer
print(rpro)
ReaderProducer Object
---Attributes & Properties---
{'data': Reader(path: Union[str, pathlib.Path]) -> None, 'axis': 1, 'kwargs': {}, 'chunksize': 100000, 'shape': (4, 18875000)}
Type help(ReaderProducer) for full documentation
# and let's check out this producer's memory consumption
print('rpro is using = {} Bytes'.format(sys.getsizeof(rpro)))
rpro is using = 48 Bytes
# how much memory would we use if we loaded all the data in at once?
size = np.multiply(*rpro.shape)
itemsize = 8 #8 bytes in a float64
print('data would be = {} MB'.format((size * itemsize)/1e6))
data would be = 604.0 MB
# let's also get the shape of each produced array
shapes = [arr.shape for arr in rpro]
print(shapes)
[(4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 100000), (4, 75000)]
So this ReaderProducer is giving us access to a little over 600 MB of data using only 48 bytes! Let's test to make sure the produced values equal the values from the file read in as a single array.
# this will consume 604 MB of memory
produced_array = rpro.to_array()
# test if this matches the array from reading all the data
read_array = reader.read(0, stop=None) # if stop is None the reader reads to the end of file
print('Do they match...', np.allclose(produced_array, read_array))
Do they match... True
Masked Producers¶
Masking with Annotations¶
While a producer will, by default, produce sequential chunks of data from the start of a file to its end, researchers commonly want to analyze only specific sections of the data. To support non-contiguous production of arrays from a data source, producers can be initialized with a mask. Let's see how to use an annotation file to create a mask so that the producer only produces the data we want.
# again, let's check the available demo data
demos.paths.available
---Available demo data files & location---
------------------------------------------
annotations_001.txt          '/home/matt/python...nnotations_001.txt'
recording_001.edf            '/home/matt/python.../recording_001.edf'
5872_Left_group A.txt        '/home/matt/python...2_Left_group A.txt'
split0.edf                   '/home/matt/python...os/data/split0.edf'
5872_Left_group A-D.edf      '/home/matt/python...Left_group A-D.edf'
irregular_write_test.edf     '/home/matt/python...lar_write_test.edf'
write_test.edf               '/home/matt/python...ata/write_test.edf'
CW0259_SWDs.npy              '/home/matt/python...ta/CW0259_SWDs.npy'
subset_001.edf               '/home/matt/python...ata/subset_001.edf'
split1.edf                   '/home/matt/python...os/data/split1.edf'
# let's fetch annotations_001.txt. This file contains user annotations for recording_001.edf
filepath = demos.paths.locate('annotations_001.txt')
# let's have a look at this file's contents
with open(filepath, 'r') as infile:
for idx, line in enumerate(infile):
print(idx, line)
0 Experiment ID    Experiment
1 Animal ID    Animal
2 Researcher    Test
3 Directory path
4
5
6 Number    Start Time    End Time    Time From Start    Channel    Annotation
7 0    08/15/20 09:59:15.215    08/15/20 09:59:15.215    0.0000    ALL    Started Recording
8 1    08/15/20 10:00:00.000    08/15/20 10:00:00.000    44.7850    ALL    Qi_start
9 2    08/15/20 10:00:25.000    08/15/20 10:00:30.000    69.7850    ALL    grooming
10 3    08/15/20 10:00:45.000    08/15/20 10:00:50.000    89.7850    ALL    grooming
11 4    08/15/20 10:02:15.000    08/15/20 10:02:20.000    179.7850    ALL    grooming
12 5    08/15/20 10:04:36.000    08/15/20 10:04:41.000    320.7850    ALL    exploring
13 6    08/15/20 10:05:50.000    08/15/20 10:05:55.000    394.7850    ALL    exploring
14 7    08/15/20 10:08:50.000    08/15/20 10:08:55.000    574.7850    ALL    rest
15 8    08/15/20 10:10:14.000    08/15/20 10:10:19.000    658.7850    ALL    exploring
16 9    08/15/20 10:17:10.000    08/15/20 10:17:15.000    1074.7850    ALL    rest
17 10    08/15/20 10:35:49.000    08/15/20 10:35:54.000    2193.7850    ALL    rest
18 11    08/15/20 10:40:00.000    08/15/20 10:40:00.000    2444.7850    ALL    Qi_stop
19 12    08/15/20 11:02:09.879    08/15/20 11:02:09.879    3774.6640    ALL    Stopped Recording
This file contains 13 annotations; these include user start and stop markers as well as rest, grooming, and exploring labels. Our goal is to use these annotations to build a producer that produces values for only the rest and exploring times.
Openseize has a set of annotation file readers. This file is in the Pinnacle format so we will use the Pinnacle annotations reader (this is described further in the file_reading demo).
# for details on reading Pinnacle files see the file_reading demo
# Pinnacle files need a path & the column header line (start)
with annotations.Pinnacle(path=filepath, start=6) as areader:
# read creates a list of annotation dataclass instances that contain the label,
# time (in secs), duration (in secs) and the channel(s) the annotation was marked
# for.
annotes = areader.read(labels=['rest','exploring'])
# let's print each annotation instance in the annotes list
for an in annotes:
print(an)
#you can see the times match the time from the start of recording in the text file.
Annotation(label='exploring', time=320.785, duration=5.0, channel='ALL')
Annotation(label='exploring', time=394.785, duration=5.0, channel='ALL')
Annotation(label='rest', time=574.785, duration=5.0, channel='ALL')
Annotation(label='exploring', time=658.785, duration=5.0, channel='ALL')
Annotation(label='rest', time=1074.785, duration=5.0, channel='ALL')
Annotation(label='rest', time=2193.785, duration=5.0, channel='ALL')
So how do we take these Annotation dataclass instances and turn them into a mask for a producer?
The annotations module in openseize has a function called as_mask for exactly this purpose.
help(annotations.as_mask)
Help on function as_mask in module openseize.file_io.annotations:

as_mask(annotations: Sequence[openseize.file_io.bases.Annotation], size: int, fs: float, include: bool = True) -> numpy.ndarray[typing.Any, numpy.dtype[numpy.bool_]]
    Creates a boolean mask from a sequence of annotation dataclass
    instances.

    Producers of EEG data may receive an optional boolean array mask.
    This function creates a boolean mask from a sequence of annotations
    and is therefore useful for filtering EEG data by annotation label
    during processing.

    Args:
        annotations:
            A sequence of annotation dataclass instances to convert to
            a mask.
        size:
            The length of the boolean array to return.
        fs:
            The sampling rate in Hz of the digital system.
        include:
            Boolean determining if annotations should be set to True or
            False in the returned array. True means all values are False
            in the returned array except for samples where the
            annotations are located.

    Returns:
        A 1-D boolean array of length size.

    Examples:
        >>> # read the annotations from the demo annotation file
        >>> from openseize.demos import paths
        >>> filepath = paths.locate('annotations_001.txt')
        >>> from openseize.io.annotations import Pinnacle
        >>> # read the 'rest' annotations
        >>> with Pinnacle(filepath, start=6) as pinnacle:
        >>>     annotations = pinnacle.read(labels=['rest'])
        >>> # create a mask measuring 3700 secs at 5000 Hz
        >>> mask = as_mask(annotations, size=3700*5000, fs=5000)
        >>> # measure the total time in secs of 'rest' annotation
        >>> print(np.count_nonzero(mask) / 5000)
        15.0
To make a mask we need the annotation dataclasses, the length of the mask, and the sampling rate. The sampling rate is needed to convert the time of each annotation into a sample number.
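For example, the first annotation above ('exploring' at 320.785 secs) maps to sample indices as follows:
# convert the first annotation's start time and duration into sample indices at fs=5000
start = int(annotes[0].time * 5000)             # 1603925
stop = start + int(annotes[0].duration * 5000)  # 1628925, i.e. 5 secs later
print(start, stop)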
# make a mask the same size as the number of samples in the reader & set values at the annotes to True
mask = annotations.as_mask(annotes, size=reader.shape[-1], fs=5000, include=True)
print(mask.shape)
(18875000,)
Note that the size of the mask matches the number of samples in the reader. We are now ready to make a masked producer.
masked_pro = producer(reader, chunksize=100000, axis=-1, mask=mask)
print(masked_pro)
MaskedProducer Object
---Attributes & Properties---
{'data': ReaderProducer(data, chunksize, axis, **kwargs), 'axis': -1, 'kwargs': {}, 'mask': ArrayProducer(data, chunksize, axis, **kwargs), 'chunksize': 100000, 'shape': (4, 150000)}
Type help(MaskedProducer) for full documentation
Notice the shape of the masked producer along the sample axis is now 150K. This is because we are keeping only the rest and exploring periods of the EEG, which amounts to 30 secs of data @ 5 kHz = 150K samples.
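As a quick sanity check, we can count the True samples in the mask we built; the count should match the producer's sample-axis length:
# 6 annotations x 5 secs each x 5000 Hz = 150000 True samples
print(np.count_nonzero(mask))  # 150000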
# as before, we can loop over the producer with a mask
# this will give us the shape of the produced arrays ONLY during periods of rest and exploring
shapes = [arr.shape for arr in masked_pro]
print(shapes)
[(4, 100000), (4, 50000)]
In the above example we built a mask from a list of annotation dataclass instances, but a masked producer can be built using any numpy boolean array. This allows for sophisticated masking conditions. For example, to restrict production to certain hours of the EEG while filtering out periods like grooming, you can construct two masks and take their intersection as the mask applied to the producer, as sketched below.
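Here is a minimal sketch of that idea (the one-hour window below is hypothetical):
# hypothetical: keep only the annotated periods that fall within the first hour of recording
fs = 5000
hour_mask = np.zeros(reader.shape[-1], dtype=bool)
hour_mask[:fs * 3600] = True
# intersect the annotation mask with the time-window mask
combined = np.logical_and(mask, hour_mask)
combined_pro = producer(reader, chunksize=100000, axis=-1, mask=combined)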
Producers From Generating Functions¶
A major question that the developers of Openseize needed to address is: what good is a producer if you have to convert it into an array in order to compute something?
To address this, Openseize relies on converting generating functions into producers. This allows computations to be carried out on-the-fly inside the body of a generating function and then wrapped into a multitransversal producer iterable. This is a lot to take in, so we are going to go through a few simple examples.
To wield openseize while taking advantage of its high memory efficiency, you'll need to familiarize yourself with generating functions: generating functions tutorial
# let's start by re-examining our producer built from the values in the EDF file
print(rpro)
ReaderProducer Object
---Attributes & Properties---
{'data': Reader(path: Union[str, pathlib.Path]) -> None, 'axis': 1, 'kwargs': {}, 'chunksize': 100000, 'shape': (4, 18875000)}
Type help(ReaderProducer) for full documentation
Now let's say that we want to take every value in this producer and square it. To do this we will need to loop over the producer, but we don't want to store all the squared values to an array since that would take a lot of memory. The solution is to build a generating function.
def squared(pro):
"""A generating function that yields the squared values of each subarray in a producer."""
for arr in pro:
yield arr**2
Now let's make our generator and run it. We will compute the largest squared value in the data.
# compute the max squared value across all chunks
gen = squared(rpro) #make a generator
extreme = 0
cnt = 0
for arr in gen:
# max of this sub arr
m = np.max(arr)
cnt += 1
if m > extreme:
extreme = m
print('The largest squared value of the {} arrays is {}'.format(cnt, extreme))
The largest squared value of the 189 arrays is 1061277445.6032624
The problem with this approach is that the generator 'gen' has now been exhausted. So anytime we need the squared values, we have to make a new generator from the generating function. To see this, try to get the generator's next value...
try:
next(gen)
except StopIteration:
print ("Can't do it-- this gen is empty")
Can't do it-- this gen is empty
Another problem is that a generator cannot truly be copied. The closest tool, itertools.tee, returns linked iterators that must buffer every value between the least- and most-advanced copy, so advancing one copy far ahead of another consumes the very memory we are trying to save. This is a huge problem because we need independent copies in order to solve problems where the computed value at a sample depends on the surrounding sample values.
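To see that plain generators resist copying, here is a quick illustration (independent of openseize):
import copy

def naturals():
    """A generating function yielding 0, 1, 2, ..."""
    n = 0
    while True:
        yield n
        n += 1

g = naturals()
try:
    copy.deepcopy(g)  # generators fall back to pickling, which fails
except TypeError as e:
    print('generators cannot be copied:', e)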
The way out of these problems is to make producers from generating functions. Let's go through this.
# let's make a producer of squared values using the squared generating function
# notice we pass in the generating function, a chunksize, an axis, and the shape of all arrays
# that will be yielded by the generating function. Since it just squares values, the shape is the
# same as rpro's shape. Lastly, squared is a function that needs a producer, so we pass the
# producer as a keyword argument
squared_pro = producer(squared, chunksize=120000, axis=-1, shape=rpro.shape, pro=rpro)
# lastly, notice that we gave a chunksize of 120k, different from rpro's chunksize of 100k. That's OK;
# producers are smart enough to collect values from the generating function until the
# new chunksize is reached.
# lets again compute the maximum squared value
extreme = 0
cnt = 0
for arr in squared_pro:
# max of this sub arr
m = np.max(arr)
cnt += 1
if m > extreme:
extreme = m
# the number of arrays will be smaller because we increased chunksize to 120k
print('The largest squared value of the {} arrays is {}'.format(cnt, extreme))
The largest squared value of the 158 arrays is 1061277445.6032624
# since squared_pro is a producer, it is multitransversal
cnt = 0
for sub_arr in squared_pro:
cnt += 1
print('squared_pro has {} subarrays.'.format(cnt))
squared_pro has 158 subarrays.
# making a copy of the producer is easy because openseize supports producer creation from producers.
squared_pro2 = producer(squared_pro, chunksize=10000, axis=-1) # these move independently of each other
To recap, we can convert generating functions into producers. These producers are both multitransversal and support copying, features that are absent from generators. This makes them far more usable in numerical code.
To simplify creating producers from generating functions, Openseize provides a decorator called as_producer. This decorator turns generating functions into producers without the need to explicitly call the producer constructor "producer(...)". Let's see this in action.
# again we will make a squaring producer, but this time using the as_producer decorator
@as_producer
def square_deco(pro):
for arr in pro:
yield arr**2
# lets again compute the maximum squared value
extreme = 0
cnt = 0
for arr in square_deco(rpro):
# max of this sub arr
m = np.max(arr)
cnt += 1
if m > extreme:
extreme = m
print('The largest squared value of the {} subarrays is {}'.format(cnt, extreme))
The largest squared value of the 189 subarrays is 1061277445.6032624
# since square_deco(rpro) returns a producer, it is multitransversal
cnt = 0
for sub_arr in square_deco(rpro):
cnt += 1
print('square_deco producer has {} subarrays.'.format(cnt))
# did we get the right number of subarrays?
samples = rpro.shape[-1]
csize = rpro.chunksize
print('We expected {} subarrays'.format(samples // csize + bool(samples % csize)))
square_deco producer has 189 subarrays.
We expected 189 subarrays
This section brought us into the deep topic of converting generating functions into producers. Not all users will need this. Openseize provides many tools that you can call like regular functions; just know that under-the-hood, these functions use this conversion. If you need to compute quantities that are not part of openseize and your data is too large for memory, then creating producers from generating functions is the way to go.
Producers From Producers¶
In the last section, we noted that you can copy producers and then use them independently. The construction of producer copies is straightforward.
# print our EDF data producer
print('Original Producer\n',rpro)
print('\n')
# make a new producer changing the chunksize
newpro = producer(rpro, chunksize=150000, axis=-1)
print('New Producer\n', newpro)
Original Producer
 ReaderProducer Object
---Attributes & Properties---
{'data': Reader(path: Union[str, pathlib.Path]) -> None, 'axis': 1, 'kwargs': {}, 'chunksize': 100000, 'shape': (4, 18875000)}
Type help(ReaderProducer) for full documentation


New Producer
 ReaderProducer Object
---Attributes & Properties---
{'data': Reader(path: Union[str, pathlib.Path]) -> None, 'axis': -1, 'kwargs': {}, 'chunksize': 150000, 'shape': (4, 18875000)}
Type help(ReaderProducer) for full documentation
These producers can be iterated over independently.
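For example, here is a minimal sketch drawing independent iterators from each producer:
# each producer hands out its own fresh iterators
it_orig, it_new = iter(rpro), iter(newpro)
print(next(it_orig).shape)  # (4, 100000)
print(next(it_new).shape)   # (4, 150000)
print(next(it_orig).shape)  # (4, 100000) -- unaffected by advancing it_new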
Resource Recovery¶
Throughout the last half of this tutorial, we have used a reader instance. This instance holds open-file resources to provide access to the opened EDF file. It is important to close these instances when you are done to recover those resources. In the file_reading demo, we show how to create a reader using a context manager, which automatically closes the file when you are done with it.
# close the open reader instance
reader.close()
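As a preview of the file_reading demo, the same reader can be built under context management so the file closes automatically on exit:
# the with-statement recovers the file resources when the block exits
with edf.Reader(recording_path) as infile:
    x = infile.read(start=0, stop=100)
print(x.shape)  # (4, 100)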
Summary¶
Producers are at the heart of openseize. All available methods in openseize's modules can accept a producer of arrays. This means openseize can compute quantities even when the input or output does not fit into memory. Your data may not require this level of memory efficiency; if not, you can also supply ndarrays to openseize methods. Be aware that some methods may run faster with producer inputs and some with full arrays. The goal is to give users options for how they want to use their machine's memory. Lastly, chunksizes can also affect the speed of computations. There is a Goldilocks zone that depends on the available memory of your machine. Openseize gives you the flexibility to adjust this parameter so you decide how much memory a process takes.