Quickstart
This quickstart guide will walk you through building two fully iterative DSP pipelines in Openseize.
Pipeline 1: We will compute the Power Spectral Density (PSD) of ~ 1 hour of data recorded from a mouse. Specifically, this will demonstrate how to:
- Build a data reader
- Build a producer that iteratively yields data sections
- Build a lowpass FIR Filter that iteratively filters produced data
- Build and visualize the PSD of the lowpass-filtered produced data
Pipeline 2: We will compute the PSDs for the same mouse as in pipeline 1 but restrict the data to two behavioral states described in a separate annotation file. To do this, we will:
- Build a data reader
- Read the associated behavior annotations
- Build masked producers that produce data during two different behavioral annotations
- Downsample the data in each masked producer
- Notch filter the downsampled data in each state
- Compute the Power Spectral Density (PSD) of the notch-filtered data in each state.
- Visualize differences between the PSDs measured in each behavioral state
Imports¶
import time
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Image
from openseize import demos
from openseize import producer
from openseize.file_io import edf, annotations
from openseize.filtering import fir, iir
from openseize.spectra import estimators
from openseize.resampling import resampling
Locate Demonstration Data¶
For both pipelines we will use the demonstration data that accompanies Openseize. The data set consists of a European Data Format (EDF) EEG file and a CSV annotation file. This demo data is stored at Zenodo; Openseize will download it if it is not in your local demo data directory.
# Get access to the file's path locally, downloading if needed
# The eeg data will be used for both pipelines 1 and 2
eeg_path = demos.paths.locate('recording_001.edf')
# the annotations will be used in pipeline 2
annotes_path = demos.paths.locate('annotations_001.txt')
Build a Reader¶
To produce data from the file we need a reader that can read data samples. Openseize includes an EDF reader. Let's build this reader.
# The reader is initialized with a path to the edf file
eeg_reader = edf.Reader(eeg_path)
# printing the reader shows the metadata 'Header' from the edf file
print(eeg_reader)
Reader Object ---Attributes & Properties--- {'path': PosixPath('/home/matt/python/nri/openseize/src/openseize/demos/data/recording_001.edf'), 'mode': 'rb', 'kwargs': {}, 'header': {'version': '0', 'patient': 'PIN-42 M 11-MAR-1952 Animal', 'recording': 'Startdate 15-AUG-2020 X X X', 'start_date': '15.08.20', 'start_time': '09.59.15', 'header_bytes': 1536, 'reserved_0': 'EDF+C', 'num_records': 3775, 'record_duration': 1.0, 'num_signals': 5, 'names': ['EEG EEG_1_SA-B', 'EEG EEG_2_SA-B', 'EEG EEG_3_SA-B', 'EEG EEG_4_SA-B', 'EDF Annotations'], 'transducers': ['8401 HS:15279', '8401 HS:15279', '8401 HS:15279', '8401 HS:15279', ''], 'physical_dim': ['uV', 'uV', 'uV', 'uV', ''], 'physical_min': [-8144.31, -8144.31, -8144.31, -8144.31, -1.0], 'physical_max': [8144.319, 8144.319, 8144.319, 8144.319, 1.0], 'digital_min': [-8192.0, -8192.0, -8192.0, -8192.0, -32768.0], 'digital_max': [8192.0, 8192.0, 8192.0, 8192.0, 32767.0], 'prefiltering': ['none', 'none', 'none', 'none', ''], 'samples_per_record': [5000, 5000, 5000, 5000, 1024], 'reserved_1': ['', '', '', '', '']}, 'channels': [0, 1, 2, 3], 'shape': (4, 18875000)} Type help(Reader) for full documentation
The Header data tells us that this file has 4 channels x 18875000 samples, and each 1-second record has 5000 samples, so the sampling frequency is 5 kHz.
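If you prefer to pull the sampling rate out in code rather than reading it off the printout, a small sketch is below. It assumes the header can be indexed like a dictionary, as its printed form above suggests.
# sketch: derive the sampling rate from the header fields (assumes dict-style
# access to the header, as the printout above suggests)
samples_per_record = eeg_reader.header['samples_per_record'][0]
record_duration = eeg_reader.header['record_duration']
print('sampling rate = {} Hz'.format(samples_per_record / record_duration))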
Build a producer¶
Producers are at the heart of Openseize: they can produce samples from files, ndarrays, sequences and even generating functions. We will now build a producer that will yield 100,000 samples from the file at a time.
# build a producer that will give us 100K samples at a time from all 4 channels
pro = producer(eeg_reader, chunksize=100e3, axis=-1)
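Producers are not tied to readers. As a quick aside, the sketch below builds a producer from a small in-memory ndarray (a random array used purely for illustration) with the same call signature.
# a producer can also wrap an in-memory ndarray; here it yields 100-sample
# chunks along the last axis of a small random array (illustration only)
demo_arr = np.random.random((4, 1000))
arr_pro = producer(demo_arr, chunksize=100, axis=-1)
print(arr_pro.shape)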
Lowpass filter the producer¶
The magic of Openseize is that we can produce from producers. This allows us to construct sequences of DSP operations that only use the amount of memory needed to hold one array at a time. Let's create our first DSP operation, which will produce filtered values from our producer.
# build a Finite Impulse Response filter to lowpass the 5 kHz data to 200 Hz.
# The gpass is the acceptable ripple in the pass band in dB and the gstop is
# the minimum attenuation in the stop band. The transition band will be 30 Hz wide.
filt = fir.Kaiser(fpass=200, fstop=230, fs=5000, gpass=1, gstop=40)
Before we build a producer of filtered values, we should check that the filter looks correct. Filters in Openseize have a plot method that helps you check the following:
- Impulse Response
- Filter's gain in dB (blue) and phase response (orange)
- Unscaled Gain
filt.plot()
The filter looks correct, so we can now create a producer of filtered values from the producer of data values.
# apply the filter to the producer using 100e3 elements from each channel
# this makes a new producer! The 'same' mode adjusts for the filter's delay.
filtpro = filt(pro, chunksize=100e3, axis=-1, mode='same')
Compute the Power Spectral Density (PSD)¶
We have a filtering producer which yields 4 x 100e3 element arrays of filtered data. We will now compute the PSD of each channel in this filtering producer. At this stage in the pipeline, Openseize's PSD estimator will get each data segment from the file, filter each segment and compute its PSD. Importantly, Openseize knows how to take care of the boundary conditions that arise at the ends of each segment so the result will be exactly the same as if you had the entire data in memory. Since the PSD operation decreases the data size, the estimator will now store the final PSD result in memory.
It is a good idea to open your system monitor to watch the memory consumption. Openseize's PSD operation will request and process only a single array from the filtering producer at any one time. Thus the max memory consumption will be a single filtered array. If you use Scipy, all the data (4, 18875000) will have to be loaded, then filtered and stored before the PSD is computed. This is the power of iterative computation in Openseize.
# Openseize's PSD estimator returns the number of PSD estimates, the
# frequencies at which the PSD is estimated, and an array of PSD values for
# the 4 channels at frequencies up to 2500 Hz (i.e. the Nyquist frequency)
t0 = time.perf_counter()
cnt, freqs, results = estimators.psd(filtpro, fs=5000, axis=-1, resolution=0.5)
elapsed = time.perf_counter() - t0
msg = 'PSD computed on producer of shape ({}, {}) in {} secs'
print(msg.format(*filtpro.shape, elapsed))
PSD computed on producer of shape (4, 18875000) in 7.686323934998654 secs
Notice that this operation took time. That's because all of the operations -- data fetching, filtering, and PSD computation -- are now being executed. Remember, computations in Openseize only occur when you explicitly ask for them in a loop or ask the producer to convert itself to an ndarray. In this case, the PSD estimator performs the looping over the data to return a result, which triggers the data fetching and filtering steps in sequence.
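If you want to see this laziness in action, you can drive the filtering producer by hand. The short sketch below pulls just three chunks; each pass of the loop reads one chunk from the file and filters it, nothing more. It assumes the producer can be iterated again after the PSD estimation above.
# sketch: manually iterate the filtering producer; each iteration fetches and
# filters a single chunk (the PSD estimator normally does this looping for you)
for idx, arr in enumerate(filtpro):
    print('chunk {} has shape {}'.format(idx, arr.shape))
    if idx == 2:
        break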
Visualize the PSDs for each Channel¶
fig, ax = plt.subplots(figsize=(8,3))
for ch, arr in enumerate(results):
ax.plot(freqs, arr, label='Ch {}'.format(ch))
ax.legend()
ax.set_xlim([0,40])
ax.set_ylim(0, 1200)
ax.set_xlabel('Frequency (Hz)')
ax.set_ylabel(r'PSD ($\mu V^2 / Hz$)')
plt.show()
Pipeline 1 was simplistic and probably doesn't reflect the day-to-day analysis you want to do. This pipeline expands on pipeline 1 by computing the PSD during specific behavioral states and by adding more sophisticated DSP steps, such as downsampling and notch filtering, that you will likely care about. As before, all the DSP operations will be chained together into a fully iterative DSP pipeline that can scale to very large datasets that won't fit in memory.
Read the annotations¶
In this pipeline we want to compute the PSD during specific behavioral states. The annotations file that accompanies the demo eeg data contains three annotated states: 'rest', 'grooming' and 'exploring'. We will compute and compare the PSDs during the 'rest' and 'exploring' states.
First, we will need to read the annotations. Openseize's Pinnacle CSV reader can read specific annotations from a CSV file. It stores each annotation as an Annotation dataclass instance with a label, a time, a duration and the channel of the annotation. Let's get these annotations from the CSV file.
# Build the Pinnacle CSV reader and start reading from line 6
with annotations.Pinnacle(annotes_path, start=6) as reader:
# call the reader's read method to get all the annotations as a sequence of dataclasses
annotes = reader.read()
# Filter the rest and exploring annotations and print them
rest_annotes = [ann for ann in annotes if ann.label=='rest']
explore_annotes = [ann for ann in annotes if ann.label=='exploring']
print(*rest_annotes, sep='\n')
print(*explore_annotes, sep='\n')
Annotation(label='rest', time=574.785, duration=5.0, channel='ALL')
Annotation(label='rest', time=1074.785, duration=5.0, channel='ALL')
Annotation(label='rest', time=2193.785, duration=5.0, channel='ALL')
Annotation(label='exploring', time=320.785, duration=5.0, channel='ALL')
Annotation(label='exploring', time=394.785, duration=5.0, channel='ALL')
Annotation(label='exploring', time=658.785, duration=5.0, channel='ALL')
Convert annotations to masks¶
You are probably familiar with masking data by selecting data sections and writing out new files of "cleaned" data. With Openseize, there is no need to do that because you can construct a 1-D boolean mask that gives you control over which samples a producer will produce. In this case, we are only going to produce data values during the 15 secs of rest or exploring.
To build producers that will produce data from only the 'rest' and 'exploring' sections of the data we need to build a boolean mask. This mask should be the same length as the eeg data (18875000 samples). The annotations.as_mask function converts a sequence of Annotation data classes into a 1-D boolean mask that can then be used to construct a masked producer.
# Build masks for the rest and exploring states
rest_mask = annotations.as_mask(rest_annotes, size=eeg_reader.shape[-1], fs=5000)
explore_mask = annotations.as_mask(explore_annotes, size=eeg_reader.shape[-1], fs=5000)
print('rest mask: ', rest_mask, rest_mask.shape)
print('explore mask: ', explore_mask, explore_mask.shape)
rest mask: [False False False ... False False False] (18875000,)
explore mask: [False False False ... False False False] (18875000,)
If your data has artifacts, you could create a mask of artifacts and combine it with the state mask using numpy's logical routines.
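For example, the sketch below combines the rest mask with a hypothetical 'artifact_mask' that marks samples to exclude; the all-False placeholder simply stands in for whatever artifact detection you use.
# sketch: combine the state mask with a hypothetical artifact mask so samples
# are produced only where the state is 'rest' AND no artifact is marked
artifact_mask = np.zeros(eeg_reader.shape[-1], dtype=bool)  # placeholder: no artifacts
clean_rest_mask = np.logical_and(rest_mask, ~artifact_mask)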
Build masked producers¶
Now that we have boolean masks we can create masked producers that will only produce data during the rest and exploring states. Since the amount of time the mouse is in these states is 15 secs at 5 kHz = 75000 samples, we will set the chunksize to 10,000 samples. It's OK if the chunksize exceeds the number of samples -- it just means all 75000 samples would be loaded at once. Here we'll use a chunksize of 10,000 to force Openseize to work iteratively, simulating work with large data.
# set the chunksize to 10000 samples to load into memory at any one time
rest_pro = producer(eeg_reader, chunksize=10000, axis=-1, mask=rest_mask)
explore_pro = producer(eeg_reader, chunksize=10000, axis=-1, mask=explore_mask)
# print the rest producer
print(rest_pro)
MaskedProducer Object ---Attributes & Properties--- {'data': ReaderProducer(data, chunksize, axis, **kwargs), 'axis': -1, 'kwargs': {}, 'mask': ArrayProducer(data, chunksize, axis, **kwargs), 'chunksize': 10000, 'shape': (4, 75000)} Type help(MaskedProducer) for full documentation
Downsample the masked producers¶
Typically, downsampling is a 2-step process in which you first low-pass filter the data and then decimate it. This is inefficient because you throw away many filtered values. Openseize uses a very efficient polyphase decomposition to simultaneously filter and decimate the data in one shot. We'll use this to downsample the data from 5000 Hz to 200 Hz.
rest_down_pro = resampling.downsample(rest_pro, M=25, fs=5000, chunksize=10000)
explore_down_pro = resampling.downsample(explore_pro, M=25, fs=5000, chunksize=10000)
print(rest_down_pro)
GenProducer Object ---Attributes & Properties--- {'data': functools.partial(<function polyphase_resample at 0x7f5db8bb7be0>, MaskedProducer(pro, mask, chunksize, axis, **kwargs), 1, 25, 5000, <class 'openseize.filtering.fir.Kaiser'>, -1), 'axis': -1, 'kwargs': {}, 'chunksize': 10000, 'shape': (4, 3000)} Type help(GenProducer) for full documentation
Notice that rest_down_pro is a new producer which will run polyphase resampling on the masked producer. You can also see that the shape has gone from 75000 to 75000/25 = 3000 samples. Remember, nothing has been computed yet; we are just instructing the producers to mask the data and then downsample it. Computations in Openseize only occur when you explicitly ask for them in a loop or ask the producer to convert itself to an ndarray.
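A quick look at the producer shapes reflects this chain of instructions:
# the masked producer reports 75000 samples per channel and the downsampled
# producer reports 75000 / 25 = 3000 samples per channel
print('masked shape: ', rest_pro.shape)
print('downsampled shape: ', rest_down_pro.shape)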
Notch filter the downsampling producers¶
Next we will remove any 60 Hz noise from the downsampling producer by notch filtering the data.
# build a notch filter that will stop 60 Hz with a width of 4 Hz on the 200 Hz downsampled data
notch = iir.Notch(fstop=60, width=4, fs=200)
notch.plot()
# since the filter looks correct we can now apply it to the downsampling producer
# this gives us a new producer for each state
rpro = notch(rest_down_pro, chunksize=500, axis=-1)
epro = notch(explore_down_pro, chunksize=500, axis=-1)
Compute the PSD of each behavioral state¶
The time has now come when all of the DSP steps we have encoded -- data fetching, data masking, downsampling, and notch filtering -- will be triggered. This occurs because the PSD estimation is a reducing operation whose results can always be held in memory. As such, the PSD loop will trigger each of our DSP steps to yield arrays one at a time. Each yielded array runs through all the DSP steps and contributes to the PSD estimate stored in memory. Again, Openseize works out all the important boundary conditions so that the result exactly matches what you would get if you could store the full data in memory.
# compute the PSD in each state -- this triggers all DSP operations to run iteratively
t0 = time.perf_counter()
rest_cnt, res_freqs, rest_results = estimators.psd(rpro, fs=200, axis=-1, resolution=0.5)
exp_cnt, exp_freqs, exp_results = estimators.psd(epro, fs=200, axis=-1, resolution=0.5)
print("rest and exploring PSDs computed in {} s".format(time.perf_counter() - t0))
rest and exploring PSDs computed in 8.593423744001484 s
Visualize PSDs from each behavioral state¶
We will now plot the PSDs for the first 3 channels (the last channel is an EMG) in each state. A word of caution: this is demo data only; it is very short (15 seconds), the electrode impedances have not been compensated, and the signals are non-stationary, so very little should be concluded on the science side.
# plot the PSDs of the EEG channels 0, 1 and 2
fig, axarr = plt.subplots(1, 2, figsize=(10,3))
for ch, (rest, exp) in enumerate(zip(rest_results[:-1,:], exp_results[:-1,:])):
axarr[0].plot(res_freqs, rest)
axarr[1].plot(exp_freqs, exp, label='Ch {}'.format(ch))
axarr[1].legend()
axarr[0].set_xlim([0,100])
axarr[0].set_ylim(0, 100)
axarr[0].set_xlabel('Frequency (Hz)')
axarr[0].set_ylabel(r'PSD ($\mu V^2 / Hz$)')
axarr[0].set_title('rest state')
axarr[1].set_title('explore state')
plt.show()
Resource recovery¶
At the beginning of this quickstart, we created a reader instance. This reader instance contains an open file object attribute that is currently using resources. To release this resource, the reader has a close method.
Before closing the reader, let's ask: why doesn't the producer automatically close the file object when it is done producing? Producers do not form a lock with a specific file, meaning multiple producers could in theory be using the same file. For example, you may be running 2 different DSP pipelines that rely on the same data file. If we close this file for one producer, we close it for all producers. Since this could be bad, Openseize relies on you to tell it when it is time to close the file object of a reader instance. Let's close it now.
# show the file is still open by peeking at the secret file obj
print('Is the reader closed? {}'.format(eeg_reader._fobj.closed))
Is the reader closed? False
# to close the reader (and file object) just call the close method
eeg_reader.close()
# when we close the reader, its file object is set to None. The reason for
# this is to support multiprocessing of producers, which cannot work with
# open or closed file objects. Let's confirm the file object is closed by
# checking that it is None
print(eeg_reader._fobj)
None
In short, when you are done producing from a file, call the reader's close method to recover resources.
This guide has shown you how to construct fully iterative DSP pipelines that can scale to very large EEG datasets. The following image lists the currently available tools you can use to construct your own iterative pipelines. Openseize is a growing ecosystem of DSP tools, so stay tuned for updates!
Image(filename=Path.joinpath(Path.cwd(), "imgs/types.png"))