Source code for moten.extras

'''Compute total motion energy from greyscale videos.
import numpy as np

import moten
from moten.utils import (iterator_func,

# Make tqdm optional
with_tqdm = True
    from tqdm import tqdm
except ImportError:
    with_tqdm = False

[docs] def process_motion_energy_from_files(filenames, size=None, nimages=np.inf, batch_size=1000, dtype='float32', mask=None, ): ''' ''' import if not isinstance(filenames, (list, tuple)): filenames = [filenames] XTX = 0 NFRAMES = 0 for fl in filenames: generator = fl, size=size, nimages=nimages, dtype=dtype) if mask is not None: generator =, generator) nframes, xtx = pixbypix_covariance_from_frames_generator(generator, batch_size=batch_size, mask=mask) XTX += xtx NFRAMES = nframes return NFRAMES, XTX
[docs] def pixbypix_covariance_from_frames_generator(data_generator, batch_size=1000, output_nonlinearity=pointwise_square, dtype='float32'): '''Compute the pixel-by-pixel covariance from a video frame generator in batches. Parameters ---------- data_generator : generator object Yields a video frame of shape (vdim, hdim) batch_size : optional, int Number of frames to process simultaneously while output_nonlinearity : optiona, function A pointwise function applied to the pixels of each frame. Examples -------- >>> import moten >>> video_file = '' >>> small_size = (36, 64) # downsample to (vdim, hdim) 16:9 aspect ratio >>> fdiffgen =, size=small_size, nimages=333) >>> nimages, XTX = moten.extras.pixbypix_covariance_from_frames_generator(fdiffgen) # doctest: +SKIP ''' first_frame = next(data_generator) vdim, hdim = first_frame.shape npixels = vdim*hdim framediff_buffer = np.zeros((batch_size, npixels), dtype=dtype) XTX = np.zeros((npixels, npixels), dtype=dtype) nframes = 0 if with_tqdm: pbar = tqdm(desc='Computing pixel-by-pixel covariance (p=%i)'%npixels, total=0, unit='[frames]') else: print('Computing pixel-by-pixel covariance (p=%i)...'%npixels, end='') RUN = True while RUN: framediff_buffer *= 0.0 # clear buffer try: for batch_frame_idx in range(batch_size): frame_difference = next(data_generator).reshape(1, -1) framediff_buffer[batch_frame_idx] = output_nonlinearity(frame_difference) except StopIteration: RUN = False finally: if with_tqdm: pbar.update(batch_frame_idx + 1) else: print('.', end='') nframes += batch_frame_idx + 1 XTX += framediff_buffer.T @ framediff_buffer if with_tqdm: pbar.close() else: print('Finished #%i frames'%nframes) return nframes, XTX
[docs] class StimulusTotalMotionEnergy(object): '''Compute the principal components of the total motion energy. Total motion energy is defined as the squared difference between the previous and current frame. The pixel-by-pixel covariance of the total energy is computed frame-by-frame. Then, the spatial principal components are estimated. In a second pass, the temporal principal components are computed by projecting the total energy onto the spatial components. Parameters ---------- video_file : str Full path to the video file. The video must be greyscale. size : optional, tuple (vdim, hdim) The desired output image size. If specified, the image is scaled or shrunk to this size. If not specified, the original size is kept. nimages : optional, int If specified, only `nimages` frames are loaded. batch_size : optional, int Number of frames to process simultaneously while computing the pixel covariances. output_nonlinearity : optional, function Defaults to point-wise square. Notes ----- The time-by-pixel total motion energy matrix is defined as :math:`T`. Its singular value decomposition is :math:`U S V^{\intercal} = T`. The spatial components are :math:`V` and the temporal components are :math:`U`. As implemented in this class, * The spatial components computed are as above (:math:`V`). * The temporal compoonents are scaled by their singular values (:math:`US`). * The eigenvalues are the squared singular values (:math:`S^2`). Attributes ---------- decomposition_spatial_pcs : np.ndarray, (npixels, npcs) decomposition_temporal_pcs : list, (ntimepoints, npcs) decomposition_eigenvalues : np.ndarray, (min(npixels, nframes),) Examples -------- >>> import moten.extras >>> video_file = '' >>> small_size = (36, 64) # (vdim, hdim) >>> totalmoten = moten.extras.StimulusTotalMotionEnergy(video_file, small_size, nimages=300) >>> totalmoten.compute_pixel_by_pixel_covariance() >>> totalmoten.compute_spatial_pcs(npcs=10) >>> totalmoten.compute_temporal_pcs() ''' def __init__(self, video_file, size=None, nimages=np.inf, batch_size=1000, output_nonlinearity=pointwise_square, dtype='float32', mask=None, ): ''' ''' self.mask = mask self.size = size self.dtype = dtype self.nimages = nimages self.video_file = video_file self.batch_size = batch_size self.output_nonlinearity = output_nonlinearity
[docs] def get_frame_difference_generator(self): '''Return a video buffer that generates the difference between the previous and the current frame. ''' import generator = self.video_file, size=self.size, nimages=self.nimages, dtype=self.dtype) if self.mask is not None: generator =, generator) return generator
[docs] def compute_pixel_by_pixel_covariance(self, generator=None, ): '''Compute the pixel-by-pixel covariance of the total energy video. Notes ----- Covariance is estimated in batches. Parameters ---------- generator : optional The video frame difference generator. Defaults to the ``video_file`` used to instantiate the class. Attributes ---------- covariance_pixbypix : np.ndarray, (npixels, npixels) The full covarianc matrix covariance_nframes : int Number of frames used in estimating the covariance matrix. Defaults to the total number of frames in the video. npixels : int Total number of pixels in the video (after downsampling). ''' import moten.utils if generator is None: # allow the user to provide their own frame difference generator generator = self.get_frame_difference_generator() nframes, xtx = pixbypix_covariance_from_frames_generator( generator, batch_size=self.batch_size, dtype=self.dtype, output_nonlinearity=self.output_nonlinearity) self.covariance_pixbypix = xtx self.covariance_nframes = nframes self.npixels = xtx.shape[0]
[docs] def compute_spatial_pcs(self, npcs=None): '''Compute the principal components from the pixel-by-pixel total energy covariance matrix. Parameters ---------- npcs : optional, int Number of principal components to keep Attributes ---------- decomposition_spatial_pcs : np.ndarray, (npixels, npcs) decomposition_eigenvalues : np.ndarray ''' from scipy import linalg if npcs is None: npcs = min(self.npixels, self.covariance_nframes) + 1 # Recall the eigendecomposition # Q L QT = XTX # U,S,Vt = X # Q = V L, Q = linalg.eigh(self.covariance_pixbypix) # increasing order L = L[::-1] # eigenvals (npcs) Q = Q[:, ::-1] # eigenvecs (npixels, npcs) # store: (npixels, npcs) self.decomposition_spatial_pcs = np.asarray(Q[:, :npcs], dtype=self.dtype) self.decomposition_eigenvalues = np.asarray(L, dtype=self.dtype)
[docs] def compute_temporal_pcs(self, generator=None, skip_first=False): '''Extract the temporal principal components of the total motion energy. Parameters ---------- generator : optional The video frame difference generator. Defaults to the ``video_file`` used to instantiate the class. skip_first : optional, bool By default, set the first timepoint of all the PCs is set to zeros because the first timepoint corresponds to the difference between the first frame and nothing. If `skip_first=True`, then the first frame is removed from the timecourse. Attributes ---------- decomposition_temporal_pcs : list, (ntimepoints, npcs) The temporal compoonents are scaled by their singular values (:math:`US`). ''' import moten.utils if generator is None: # allow the user to provide their own frame difference generator generator = self.get_frame_difference_generator() if skip_first: # drop the first frame b/c the difference is with 0's # and so projection is with itself next(generator) self.decomposition_temporal_pcs = [] ## TODO: batch for faster performance for frame_diff in iterator_func(generator, 'projecting stimuli', unit='[frames]'): # flatten and square frame_diff = self.output_nonlinearity(frame_diff.reshape(1, -1)) projection = frame_diff @ self.decomposition_spatial_pcs self.decomposition_temporal_pcs.append(projection.squeeze()) if skip_first is False: # if not skipped, set the first frame to 0s # b/c its difference is not really defined self.decomposition_temporal_pcs[0][:] *= 0