Source code for moten.core

'''
'''
#
# Adapted from MATLAB code written by S. Nishimoto (see Nishimoto, et al., 2011).
# Anwar O. Nunez-Elizalde (Jan, 2016)
#
# Updates:
#  Anwar O. Nunez-Elizalde (Apr, 2020)
#
import itertools
import math
from PIL import Image

import numpy as np

from moten.backend import get_backend
from moten.utils import (DotDict,
                         iterator_func,
                         log_compress,
                         sqrt_sum_squares,
                         pointwise_square,
                         )


##############################
#
##############################

[docs] def raw_project_stimulus(stimulus, filters, vhsize=(), dtype='float32'): '''Obtain responses to the stimuli from all filter quadrature-pairs. Parameters ---------- stimulus : np.ndarray, (nimages, vdim, hdim) or (nimages, npixels) The movie frames. If `stimulus` is two-dimensional with shape (nimages, npixels), then `vhsize=(vdim,hdim)` is required and `npixels == vdim*hdim`. Returns ------- output_sin : np.ndarray, (nimages, nfilters) output_cos : np.ndarray, (nimages, nfilters) ''' # parameters if stimulus.ndim == 3: nimages, vdim, hdim = stimulus.shape stimulus = stimulus.reshape(stimulus.shape[0], -1) vhsize = (vdim, hdim) backend = get_backend() stimulus = backend.asarray(stimulus) # checks for 2D stimuli assert stimulus.ndim == 2 # (nimages, pixels) assert isinstance(vhsize, tuple) and len(vhsize) == 2 # (hdim, vdim) assert vhsize[0] * vhsize[1] == stimulus.shape[1] # hdim*vdim == pixels # Compute responses nfilters = len(filters) nimages = stimulus.shape[0] sin_responses = backend.zeros((nimages, nfilters), dtype=dtype) cos_responses = backend.zeros((nimages, nfilters), dtype=dtype) for gaborid, gabor_parameters in iterator_func(enumerate(filters), 'project_stimulus', total=len(filters)): sgabor0, sgabor90, tgabor0, tgabor90 = mk_3d_gabor(vhsize, **gabor_parameters) channel_sin, channel_cos = dotdelay_frames(sgabor0, sgabor90, tgabor0, tgabor90, stimulus) sin_responses[:, gaborid] = channel_sin cos_responses[:, gaborid] = channel_cos return sin_responses, cos_responses
[docs] def project_stimulus(stimulus, filters, quadrature_combination=sqrt_sum_squares, output_nonlinearity=log_compress, vhsize=(), dtype='float32'): '''Compute the motion energy filter responses to the stimuli. Parameters ---------- stimulus : np.ndarray, (nimages, vdim, hdim) or (nimages, npixels) The movie frames. If `stimulus` is two-dimensional with shape (nimages, npixels), then `vhsize=(vdim,hdim)` is required and `npixels == vdim*hdim`. Returns ------- filter_responses : np.ndarray, (nimages, nfilters) ''' # parameters if stimulus.ndim == 3: nimages, vdim, hdim = stimulus.shape stimulus = stimulus.reshape(stimulus.shape[0], -1) vhsize = (vdim, hdim) backend = get_backend() stimulus = backend.asarray(stimulus) # checks for 2D stimuli assert stimulus.ndim == 2 # (nimages, pixels) assert isinstance(vhsize, tuple) and len(vhsize) == 2 # (hdim, vdim) assert vhsize[0] * vhsize[1] == stimulus.shape[1] # hdim*vdim == pixels # Compute responses nfilters = len(filters) nimages = stimulus.shape[0] filter_responses = backend.zeros((nimages, nfilters), dtype=dtype) for gaborid, gabor_parameters in iterator_func(enumerate(filters), 'project_stimulus', total=len(filters)): sgabor0, sgabor90, tgabor0, tgabor90 = mk_3d_gabor(vhsize, **gabor_parameters) channel_sin, channel_cos = dotdelay_frames(sgabor0, sgabor90, tgabor0, tgabor90, stimulus) channel_response = quadrature_combination(channel_sin, channel_cos) channel_response = output_nonlinearity(channel_response) filter_responses[:, gaborid] = channel_response return filter_responses
############################## # core functionality ##############################
[docs] def mk_3d_gabor(vhsize, stimulus_fps, aspect_ratio='auto', filter_temporal_width='auto', centerh=0.5, centerv=0.5, direction=45.0, spatial_freq=16.0, spatial_env=0.3, temporal_freq=2.0, temporal_env=0.3, spatial_phase_offset=0.0, ): '''Make a motion energy filter. A motion energy filter is a 3D gabor with two spatial and one temporal dimension. Each dimension is defined by two sine waves which differ in phase by 90 degrees. The sine waves are then multiplied by a gaussian. Parameters ---------- vhsize : tuple of ints, (vdim, hdim) Size of the stimulus in pixels (vdim, hdim) `vdim` : vertical dimension `hdim` : horizontal dimension stimulus_fps : scalar, [Hz] Stimulus playback speed in frames per second. centerv : scalar Vertical filter position from top of frame (min=0, max=1.0). centerh : scalar Horizontal filter position from left of frame (min=0, max=aspect_ratio). direction : scalar, [degrees] Direction of filter motion. Degree position corresponds to standard unit-circle coordinates (i.e. 0=right, 180=left). spatial_freq : float, [cycles-per-image] Spatial frequency of the filter. temporal_freq : float, [Hz] Temporal frequency of the filter filter_temporal_width : int Temporal window of the motion energy filter (e.g. 10). Defaults to approximately 0.666[secs] (`floor(stimulus_fps*(2/3))`). aspect_ratio : optional, 'auto' or float-like, Defaults to stimulus aspect ratio: hdim/vdim Useful for preserving the spatial gabors circular even when images have non-square aspect ratios. For example, a 16:9 image would have `aspect_ratio`=16/9. spatial_env : float Spatial envelope (s.d. of the gaussian) temporal_env : float Temporal envelope (s.d. of gaussian) spatial_phase_offset : float, [degrees Phase offset for the spatial sinusoid Returns ------- spatial_gabor_sin : 2D np.ndarray, (vdim, hdim) spatial_gabor_cos : 2D np.ndarray, (vdim, hdim) Spatial gabor quadrature pair. ``spatial_gabor_cos`` has a 90 degree phase offset relative to ``spatial_gabor_sin`` temporal_gabor_sin : 1D np.ndarray, (`filter_temporal_width`,) temporal_gabor_cos : 1D np.ndarray, (`filter_temporal_width`,) Temporal gabor quadrature pair. ``temporal_gabor_cos`` has a 90 degree phase offset relative to ``temporal_gabor_sin`` Notes ----- Same method as Nishimoto, et al., 2011. ''' backend = get_backend() vdim, hdim = vhsize if aspect_ratio == 'auto': aspect_ratio = hdim/float(vdim) if filter_temporal_width == 'auto': filter_temporal_width = int(stimulus_fps*(2/3.)) # cast filter width to integer frames assert math.isclose(filter_temporal_width, int(filter_temporal_width)) filter_temporal_width = int(filter_temporal_width) dh = backend.linspace(0, aspect_ratio, hdim, endpoint=True) dv = backend.linspace(0, 1, vdim, endpoint=True) dt = backend.linspace(0, 1, filter_temporal_width, endpoint=False) # AN: Actually, `dt` should include endpoint. # Currently, the center of the filter width is +(1./fps)/2. # However, this would break backwards compatibility. # TODO: Allow for `dt_endpoint` as an argument # and set default to False. ihs, ivs = backend.meshgrid(dh, dv) fh = -spatial_freq*backend.cos(backend.asarray(direction/180.*backend.pi))*2*backend.pi fv = spatial_freq*backend.sin(backend.asarray(direction/180.*backend.pi))*2*backend.pi # normalize temporal frequency to wavelet size ft = backend.real(backend.asarray(temporal_freq*(filter_temporal_width/float(stimulus_fps))))*2*backend.pi # spatial filters spatial_gaussian = backend.exp(-((ihs - centerh)**2 + (ivs - centerv)**2)/(2*spatial_env**2)) spatial_grating_sin = backend.sin((ihs - centerh)*fh + (ivs - centerv)*fv + spatial_phase_offset) spatial_grating_cos = backend.cos((ihs - centerh)*fh + (ivs - centerv)*fv + spatial_phase_offset) spatial_gabor_sin = spatial_gaussian * spatial_grating_sin spatial_gabor_cos = spatial_gaussian * spatial_grating_cos ############################## temporal_gaussian = backend.exp(-(dt - 0.5)**2/(2*temporal_env**2)) temporal_grating_sin = backend.sin((dt - 0.5)*ft) temporal_grating_cos = backend.cos((dt - 0.5)*ft) temporal_gabor_sin = temporal_gaussian*temporal_grating_sin temporal_gabor_cos = temporal_gaussian*temporal_grating_cos return spatial_gabor_sin, spatial_gabor_cos, temporal_gabor_sin, temporal_gabor_cos
[docs] def generate_3dgabor_array(vhsize=(576,1024), stimulus_fps=24, aspect_ratio='auto', filter_temporal_width='auto', centerh=0.5, centerv=0.5, direction=45.0, spatial_freq=16.0, spatial_env=0.3, temporal_freq=2.0, temporal_env=0.3, phase_offset=0.0): ''' ''' vdim, hdim = vhsize if aspect_ratio == 'auto': aspect_ratio = hdim/float(vdim) if filter_temporal_width == 'auto': filter_temporal_width = int(stimulus_fps*(2/3.)) gabor_components = mk_3d_gabor(vhsize, stimulus_fps=stimulus_fps, aspect_ratio=aspect_ratio, filter_temporal_width=filter_temporal_width, centerh=centerh, centerv=centerv, direction=direction, spatial_freq=spatial_freq, spatial_env=spatial_env, temporal_freq=temporal_freq, temporal_env=temporal_env, phase_offset=phase_offset, ) gabor_video = mk_spatiotemporal_gabor(*gabor_components) return gabor_video
[docs] def dotspatial_frames(spatial_gabor_sin, spatial_gabor_cos, stimulus, masklimit=0.001): '''Dot the spatial gabor filters filter with the stimulus Parameters ---------- spatial_gabor_sin : np.array, (vdim,hdim) spatial_gabor_cos : np.array, (vdim,hdim) Spatial gabor quadrature pair stimulus : 2D np.array (nimages, vdim*hdim) The movie frames with the spatial dimension collapsed. masklimit : float-like Threshold to find the non-zero filter region Returns ------- channel_sin : np.ndarray, (nimages, ) channel_cos : np.ndarray, (nimages, ) The filter response to each stimulus The quadrature pair can be combined: (x^2 + y^2)^0.5 ''' backend = get_backend() gabors = backend.stack([spatial_gabor_sin.reshape(-1), spatial_gabor_cos.reshape(-1)]) # dot the gabors with the stimulus mask = backend.abs(gabors).sum(0) > masklimit gabor_prod = (gabors[:, mask] @ stimulus.T[mask]).T gabor_sin, gabor_cos = gabor_prod[:,0], gabor_prod[:,1] return gabor_sin, gabor_cos
[docs] def dotdelay_frames(spatial_gabor_sin, spatial_gabor_cos, temporal_gabor_sin, temporal_gabor_cos, stimulus, masklimit=0.001): '''Convolve the motion energy filter with a stimulus Parameters ---------- spatial_gabor_sin : np.array, (vdim,hdim) spatial_gabor_cos : np.array, (vdim,hdim) Spatial gabor quadrature pair temporal_gabor_sin : np.array, (temporal_filter_width,) temporal_gabor_cos : np.array, (temporal_filter_width,) Temporal gabor quadrature pair stimulus : 2D np.array (nimages, vdim*hdim) The movie frames with the spatial dimension collapsed. Returns ------- channel_sin : np.ndarray, (nimages, ) channel_cos : np.ndarray, (nimages, ) The filter response to the stimulus at each time point The quadrature pair can be combined: (x^2 + y^2)^0.5 ''' backend = get_backend() gabor_sin, gabor_cos = dotspatial_frames(spatial_gabor_sin, spatial_gabor_cos, stimulus, masklimit=masklimit) gabor_prod = backend.column_stack([gabor_sin, gabor_cos]) temporal_gabors = backend.stack([temporal_gabor_sin, temporal_gabor_cos]) # dot the product with the temporal gabors outs = gabor_prod[:, [0]] @ temporal_gabors[[1]] + gabor_prod[:, [1]] @ temporal_gabors[[0]] outc = -gabor_prod[:, [0]] @ temporal_gabors[[0]] + gabor_prod[:, [1]] @ temporal_gabors[[1]] # sum across delays nouts = backend.zeros_like(outs) noutc = backend.zeros_like(outc) tdxc = int(math.ceil(outs.shape[1]/2.0)) delays = range(outs.shape[1]) for ddx in delays: num = ddx - tdxc + 1 if num == 0: nouts[:, ddx] = outs[:,ddx] noutc[:, ddx] = outc[:,ddx] elif num > 0: nouts[num:, ddx] = outs[:-num,ddx] noutc[num:, ddx] = outc[:-num,ddx] elif num < 0: nouts[:num, ddx] = outs[abs(num):,ddx] noutc[:num, ddx] = outc[abs(num):,ddx] channel_sin = nouts.sum(-1) channel_cos = noutc.sum(-1) return channel_sin, channel_cos
[docs] def mk_spatiotemporal_gabor(spatial_gabor_sin, spatial_gabor_cos, temporal_gabor_sin, temporal_gabor_cos): '''Make 3D motion energy filter defined by the spatial and temporal gabors. Takes the output of :func:`mk_3d_gabor` and constructs the 3D filter. This is useful for visualization. Parameters ---------- spatial_gabor_sin : np.array, (vdim,hdim) spatial_gabor_cos : np.array, (vdim,hdim) Spatial gabor quadrature pair temporal_gabor_sin : np.array, (filter_temporal_width,) temporal_gabor_cos : np.array, (filter_temporal_width,) Temporal gabor quadrature pair Returns ------- motion_energy_filter : np.array, (vdim, hdim, filter_temporal_width) The motion energy filter ''' a = -spatial_gabor_sin.ravel()[...,None] @ temporal_gabor_sin[...,None].T b = spatial_gabor_cos.ravel()[...,None] @ temporal_gabor_cos[...,None].T x,y = spatial_gabor_sin.shape t = temporal_gabor_sin.shape[0] return (a+b).reshape(x,y,t)
[docs] def compute_spatial_gabor_responses(stimulus, aspect_ratio='auto', spatial_frequencies=[0,2,4,8,16,32], quadrature_combination=sqrt_sum_squares, output_nonlinearity=log_compress, dtype='float64', dozscore=True): """Compute the spatial gabor filters' response to each stimulus. Parameters ---------- stimulus : 3D np.array (n, vdim, hdim) The stimulus frames. spatial_frequencies : array-like The spatial frequencies to compute. The spatial envelope is determined by this. quadrature_combination : function, optional Specifies how to combine the channel reponses quadratures. The function must take the sin and cos as arguments in order. Defaults to: (sin^2 + cos^2)^1/2 output_nonlinearity : function, optional Passes the channels (after `quadrature_combination`) through a non-linearity. The function input is the (`n`,`nfilters`) array. Defaults to: ln(x + 1e-05) dozscore : bool, optional Whether to z-score the channel responses in time dtype : str or dtype Defaults to 'float64' Returns ------- filter_responses : np.array, (n, nfilters) """ backend = get_backend() nimages, vdim, hdim = stimulus.shape vhsize = (vdim, hdim) if aspect_ratio == 'auto': aspect_ratio = hdim/float(vdim) stimulus = stimulus.reshape(stimulus.shape[0], -1) parameter_names, gabor_parameters = mk_moten_pyramid_params( 1., # fps filter_temporal_width=1., aspect_ratio=aspect_ratio, temporal_frequencies=[0.], spatial_directions=[0.], spatial_frequencies=spatial_frequencies, ) ngabors = gabor_parameters.shape[0] filters = [{name : gabor_parameters[idx, pdx] for pdx, name \ in enumerate(parameter_names)} \ for idx in range(ngabors)] info = 'Computing responses for #%i filters across #%i images (aspect_ratio=%0.03f)' print(info%(len(gabor_parameters), nimages, aspect_ratio)) channels = backend.zeros((nimages, len(gabor_parameters)), dtype=dtype) for idx, gabor_param_dict in iterator_func(enumerate(filters), '%s.compute_spatial_gabor_responses'%__name__, total=len(gabor_parameters)): sgabor_sin, sgabor_cos, _, _ = mk_3d_gabor(vhsize, **gabor_param_dict) channel_sin, channel_cos = dotspatial_frames(sgabor_sin, sgabor_cos, stimulus) channel = quadrature_combination(channel_sin, channel_cos) channels[:, idx] = channel channels = output_nonlinearity(channels) if dozscore: from scipy.stats import zscore channels = backend.to_numpy(channels) channels = zscore(channels) return channels
[docs] def compute_filter_responses(stimulus, stimulus_fps, aspect_ratio='auto', filter_temporal_width='auto', quadrature_combination=sqrt_sum_squares, output_nonlinearity=log_compress, dozscore=True, dtype='float64', pyramid_parameters={}): """Compute the motion energy filters' response to the stimuli. Parameters ---------- stimulus : 3D np.array (n, vdim, hdim) The movie frames. stimulus_fps : scalar The temporal frequency of the stimulus aspect_ratio : bool, or scalar Defaults to hdim/vdim. Otherwise, pass as scalar filter_temporal_width : int, None The number of frames in one filter. Defaults to approximately 0.666[secs] (floor(stimulus_fps*(2/3))). quadrature_combination : function, optional Specifies how to combine the channel reponses quadratures. The function must take the sin and cos as arguments in order. Defaults to: (sin^2 + cos^2)^1/2 output_nonlinearity : function, optional Passes the channels (after `quadrature_combination`) through a non-linearity. The function input is the (`n`,`nfilters`) array. Defaults to: ln(x + 1e-05) dozscore : bool, optional Whether to z-score the channel responses in time dtype : str or dtype Defaults to 'float64' pyramid_parameters: dict See :func:`mk_moten_pyramid_params` for details on parameters specifiying a motion energy pyramid. Returns ------- filter_responses : np.array, (n, nfilters) """ backend = get_backend() nimages, vdim, hdim = stimulus.shape stimulus = stimulus.reshape(stimulus.shape[0], -1) vhsize = (vdim, hdim) if aspect_ratio == 'auto': aspect_ratio = hdim/float(vdim) if filter_temporal_width == 'auto': filter_temporal_width = int(stimulus_fps*(2./3.)) # pass parameters pkwargs = dict(aspect_ratio=aspect_ratio, filter_temporal_width=filter_temporal_width) pkwargs.update(**pyramid_parameters) parameter_names, gabor_parameters = mk_moten_pyramid_params(stimulus_fps, **pkwargs) ngabors = gabor_parameters.shape[0] filters = [{name : gabor_parameters[idx, pdx] for pdx, name \ in enumerate(parameter_names)} \ for idx in range(ngabors)] info = 'Computing responses for #%i filters across #%i images (aspect_ratio=%0.03f)' print(info%(len(gabor_parameters), nimages, aspect_ratio)) channels = backend.zeros((nimages, len(gabor_parameters)), dtype=dtype) for idx, gabor_param_dict in iterator_func(enumerate(filters), '%s.compute_filter_responses'%__name__, total=len(filters)): gabor = mk_3d_gabor(vhsize, **gabor_param_dict) gabor0, gabor90, tgabor0, tgabor90 = gabor channel_sin, channel_cos = dotdelay_frames(gabor0, gabor90, tgabor0, tgabor90, stimulus, ) channel = quadrature_combination(channel_sin, channel_cos) channels[:,idx] = channel channels = output_nonlinearity(channels) if dozscore: from scipy.stats import zscore channels = backend.to_numpy(channels) channels = zscore(channels) return channels
############################## # batched computation ##############################
[docs] def mk_3d_gabor_batched(vhsize, filters_batch): '''Build spatial and temporal gabor filter banks for a batch of filters. Vectorizes :func:`mk_3d_gabor` across multiple filters so that all gabor arrays are constructed with batched tensor operations instead of a Python for-loop. Parameters ---------- vhsize : tuple of ints, (vdim, hdim) filters_batch : list of dicts Each dict has the same keys accepted by :func:`mk_3d_gabor`. Returns ------- spatial_gabors_sin : array, (B, npixels) spatial_gabors_cos : array, (B, npixels) temporal_gabors_sin : array, (B, T) temporal_gabors_cos : array, (B, T) ''' backend = get_backend() B = len(filters_batch) vdim, hdim = vhsize npixels = vdim * hdim f0 = filters_batch[0] aspect_ratio = f0.get('aspect_ratio', 'auto') if aspect_ratio == 'auto': aspect_ratio = hdim / float(vdim) stimulus_fps = f0['stimulus_fps'] filter_temporal_width = int(f0['filter_temporal_width']) # Validate that shared parameters are consistent across the batch for f in filters_batch[1:]: assert int(f['filter_temporal_width']) == filter_temporal_width, \ "All filters in a batch must share the same filter_temporal_width" assert f['stimulus_fps'] == stimulus_fps, \ "All filters in a batch must share the same stimulus_fps" # Extract per-filter parameters as 1-D arrays centerh = backend.asarray([f['centerh'] for f in filters_batch]) centerv = backend.asarray([f['centerv'] for f in filters_batch]) direction = backend.asarray([f['direction'] for f in filters_batch]) spatial_freq = backend.asarray([f['spatial_freq'] for f in filters_batch]) spatial_env = backend.asarray([f['spatial_env'] for f in filters_batch]) temporal_freq = backend.asarray([f['temporal_freq'] for f in filters_batch]) temporal_env = backend.asarray([f['temporal_env'] for f in filters_batch]) spatial_phase_offset = backend.asarray( [f.get('spatial_phase_offset', 0.0) for f in filters_batch]) # Shared spatial grid -- (vdim, hdim) dh = backend.linspace(0, aspect_ratio, hdim, endpoint=True) dv = backend.linspace(0, 1, vdim, endpoint=True) ihs, ivs = backend.meshgrid(dh, dv) # (vdim, hdim) # Reshape for broadcasting: params → (B, 1, 1), grid → (1, vdim, hdim) dir_rad = direction / 180.0 * backend.pi fh = (-spatial_freq * backend.cos(dir_rad) * 2 * backend.pi).reshape(B, 1, 1) fv = (spatial_freq * backend.sin(dir_rad) * 2 * backend.pi).reshape(B, 1, 1) ch = centerh.reshape(B, 1, 1) cv = centerv.reshape(B, 1, 1) se = spatial_env.reshape(B, 1, 1) spo = spatial_phase_offset.reshape(B, 1, 1) ihs_3d = ihs.reshape(1, vdim, hdim) # broadcast over batch ivs_3d = ivs.reshape(1, vdim, hdim) dih = ihs_3d - ch # (B, vdim, hdim) div = ivs_3d - cv spatial_gaussian = backend.exp(-(dih ** 2 + div ** 2) / (2 * se ** 2)) phase = dih * fh + div * fv + spo spatial_gabors_sin = (spatial_gaussian * backend.sin(phase)).reshape(B, npixels) spatial_gabors_cos = (spatial_gaussian * backend.cos(phase)).reshape(B, npixels) # Temporal filters -- (B, T) dt = backend.linspace(0, 1, filter_temporal_width, endpoint=False) # (T,) ft = (temporal_freq * (filter_temporal_width / float(stimulus_fps)) * 2 * backend.pi).reshape(B, 1) te = temporal_env.reshape(B, 1) dt_3d = dt.reshape(1, filter_temporal_width) temporal_gaussian = backend.exp(-(dt_3d - 0.5) ** 2 / (2 * te ** 2)) temporal_gabors_sin = temporal_gaussian * backend.sin((dt_3d - 0.5) * ft) temporal_gabors_cos = temporal_gaussian * backend.cos((dt_3d - 0.5) * ft) return spatial_gabors_sin, spatial_gabors_cos, temporal_gabors_sin, temporal_gabors_cos
def _compute_temporal_pad(filters): '''Compute the number of padding frames needed for temporal batching. The temporal convolution shifts frames by up to ``filter_temporal_width // 2`` positions in either direction. To avoid edge artifacts when processing temporal batches, each batch is conservatively padded by ``filter_temporal_width - 1`` frames on each side. The filter width is negligible compared to typical stimulus durations, so the extra overlap imposes little overhead. Parameters ---------- filters : list of dicts Filter parameter dictionaries. Returns ------- pad : int Number of frames to pad on each side of a temporal batch. ''' # All filters in a pyramid share the same filter_temporal_width, # but compute the max across all filters to be safe. max_filter_temporal_width = 0 for f in filters: filter_temporal_width = f.get('filter_temporal_width', 'auto') if filter_temporal_width == 'auto': filter_temporal_width = int(f['stimulus_fps'] * (2 / 3.)) max_filter_temporal_width = max(max_filter_temporal_width, int(filter_temporal_width)) # The delay shifting in project_stimulus_batched makes the output at # frame t depend on input frames in [t - (T - tdxc), t + (tdxc - 1)], # where tdxc = ceil(T / 2), i.e. at most T // 2 frames away. Pad by # T - 1 frames to stay conservative; the extra overlap is negligible # compared to typical stimulus durations. return max_filter_temporal_width - 1 if max_filter_temporal_width > 0 else 0
[docs] def project_stimulus_batched(stimulus, filters, quadrature_combination=sqrt_sum_squares, output_nonlinearity=log_compress, vhsize=(), dtype='float32', batch_size=128, stimulus_batch_size=None, masklimit=0.001): '''Compute motion energy responses using batched operations. Functionally equivalent to :func:`project_stimulus` but constructs spatial and temporal gabor filter banks in batches and uses a single large matrix multiply per batch instead of per-filter dot products. This is significantly faster on GPU backends and can also be faster on CPU for large filter sets. Parameters ---------- stimulus : array, (nimages, vdim, hdim) or (nimages, npixels) The movie frames. filters : list of dicts Filter parameter dictionaries (as produced by a pyramid). quadrature_combination : callable, optional Defaults to ``sqrt_sum_squares``. output_nonlinearity : callable, optional Defaults to ``log_compress``. vhsize : tuple of ints ``(vdim, hdim)`` required when stimulus is 2-D. dtype : str Output dtype. batch_size : int Number of filters to process simultaneously. Larger values use more memory but reduce Python-loop overhead. stimulus_batch_size : int or None Number of stimulus frames to process at a time. When ``None`` (default), all frames are processed together, preserving the original behaviour. When set, the stimulus is split into overlapping temporal batches to reduce VRAM usage for long stimuli. The overlap is computed automatically from the temporal filter width to avoid edge artifacts. masklimit : float Threshold for zeroing near-zero gabor pixels. Matches the ``masklimit`` parameter of :func:`dotspatial_frames`. Returns ------- filter_responses : array, (nimages, nfilters) ''' if stimulus.ndim == 3: nimages, vdim, hdim = stimulus.shape # reshape with explicit sizes (not -1) so empty stimuli also work stimulus = stimulus.reshape(nimages, vdim * hdim) vhsize = (vdim, hdim) backend = get_backend() stimulus = backend.asarray(stimulus) assert stimulus.ndim == 2 assert isinstance(vhsize, tuple) and len(vhsize) == 2 assert vhsize[0] * vhsize[1] == stimulus.shape[1] if batch_size <= 0: raise ValueError(f"batch_size must be positive, got {batch_size}") if stimulus_batch_size is not None and stimulus_batch_size <= 0: raise ValueError( f"stimulus_batch_size must be positive or None, got {stimulus_batch_size}") nfilters = len(filters) nimages = stimulus.shape[0] # Determine temporal padding for stimulus batching if stimulus_batch_size is not None: temporal_pad = _compute_temporal_pad(filters) else: temporal_pad = 0 # Process all frames at once (max with 1 so that range() below # gets a nonzero step when the stimulus is empty). stimulus_batch_size = max(nimages, 1) filter_responses = backend.zeros((nimages, nfilters), dtype=dtype) # Iterate over filter batches first so that gabor banks are computed # once per batch and reused across all temporal chunks. for batch_start in range(0, nfilters, batch_size): batch_end = min(batch_start + batch_size, nfilters) batch_filters = filters[batch_start:batch_end] B = len(batch_filters) # Build gabor filter banks for this batch (computed once) sg_sin, sg_cos, tg_sin, tg_cos = mk_3d_gabor_batched(vhsize, batch_filters) # sg_sin: (B, npixels), tg_sin: (B, T) # Apply per-filter mask: zero out pixels where the gabor # amplitude is below threshold (matches dotspatial_frames # masklimit behaviour without breaking the batched matmul). gabor_mask = (backend.abs(sg_sin) + backend.abs(sg_cos)) > masklimit sg_sin = sg_sin * gabor_mask sg_cos = sg_cos * gabor_mask T = tg_sin.shape[1] tdxc = int(math.ceil(T / 2.0)) # Iterate over temporal batches of the stimulus for t_start in range(0, nimages, stimulus_batch_size): t_end = min(t_start + stimulus_batch_size, nimages) # Compute padded window boundaries pad_start = max(t_start - temporal_pad, 0) pad_end = min(t_end + temporal_pad, nimages) # Extract the padded stimulus chunk stim_chunk = stimulus[pad_start:pad_end] # (chunk_len, npixels) chunk_len = stim_chunk.shape[0] # Offsets to slice valid (non-padded) results from the chunk output valid_start = t_start - pad_start valid_end = valid_start + (t_end - t_start) # stimulus transpose for this chunk -- (npixels, chunk_len) stim_T = stim_chunk.T # Spatial dot product -- single matmul per batch # (B, npixels) @ (npixels, chunk_len) → (B, chunk_len) → transpose spatial_sin = (sg_sin @ stim_T).T # (chunk_len, B) spatial_cos = (sg_cos @ stim_T).T # (chunk_len, B) # Temporal convolution + delay shifting. channel_sin = backend.zeros((chunk_len, B), dtype=dtype) channel_cos = backend.zeros((chunk_len, B), dtype=dtype) for ddx in range(T): num = ddx - tdxc + 1 curr_outs = spatial_sin * tg_cos[:, ddx] + spatial_cos * tg_sin[:, ddx] curr_outc = -spatial_sin * tg_sin[:, ddx] + spatial_cos * tg_cos[:, ddx] if num == 0: channel_sin += curr_outs channel_cos += curr_outc elif num > 0: channel_sin[num:, :] += curr_outs[:-num, :] channel_cos[num:, :] += curr_outc[:-num, :] elif num < 0: channel_sin[:num, :] += curr_outs[abs(num):, :] channel_cos[:num, :] += curr_outc[abs(num):, :] # Extract only the valid (non-padded) portion channel_sin = channel_sin[valid_start:valid_end] channel_cos = channel_cos[valid_start:valid_end] channel_response = quadrature_combination(channel_sin, channel_cos) channel_response = output_nonlinearity(channel_response) filter_responses[t_start:t_end, batch_start:batch_end] = channel_response return filter_responses
[docs] def mk_moten_pyramid_params(stimulus_fps, filter_temporal_width='auto', aspect_ratio='auto', temporal_frequencies=[0,2,4], spatial_frequencies=[0,2,4,8,16,32], spatial_directions=[0,45,90,135,180,225,270,315], sf_gauss_ratio=0.6, max_spatial_env=0.3, gabor_spacing=3.5, tf_gauss_ratio=10., max_temp_env=0.3, spatial_phase_offset=0.0, include_edges=False, ): """Parametrize a motion energy pyramid that tiles the stimulus. Parameters ---------- stimulus_fps : scalar, [Hz] Stimulus playback speed in frames per second. spatial_frequencies : array-like, [cycles-per-image] Spatial frequencies for the filters spatial_directions : array-like, [degrees] Direction of filter motion. Degree position corresponds to standard unit-circle coordinates (i.e. 0=right, 180=left). temporal_frequencies : array-like, [Hz] Temporal frequencies of the filters filter_temporal_width : int Temporal window of the motion energy filter (e.g. 10). Defaults to approximately 0.666[secs] (`floor(stimulus_fps*(2/3))`). aspect_ratio : optional, 'auto' or float-like, Defaults to stimulus aspect ratio: hdim/vdim Useful for preserving the spatial gabors circular even when images have non-square aspect ratios. For example, a 16:9 image would have `aspect_ratio`=16/9. sf_gauss_ratio : scalar The ratio of spatial frequency to gaussian s.d. This controls the number of cycles in a filter max_spatial_env : scalar Defines the maximum s.d. of the gaussian gabor_spacing : scalar Defines the spacing between spatial gabors (in s.d. units) tf_gauss_ratio : scalar The ratio of temporal frequency to gaussian s.d. This controls the number of temporal cycles max_temp_env : scalar Defines the maximum s.d. of the temporal gaussian include_edges : bool Determines whether to include filters at the edge of the image which might be partially outside the stimulus field-of-view Returns ------- parameter_names : list of strings The name of the parameters gabor_parameters : 2D np.ndarray, (nfilters, 11) Parameters that define the motion energy filter Each of the `nfilters` has the following parameters: * centerv,centerh : y:vertical and x:horizontal position ('0,0' is top left) * direction : direction of motion [degrees] * spatial_freq : spatial frequency [cpi] * spatial_env : spatial envelope (gaussian s.d.) * temporal_freq : temporal frequency [Hz] * temporal_env : temporal envelope (gaussian s.d.) * filter_temporal_width : temporal window of filter [frames] * aspect_ratio : width/height * stimulus_fps : stimulus playback speed in frames per second * spatial_phase_offset : filter phase offset in [degrees] Notes ----- Same method as Nishimoto, et al., 2011. """ assert isinstance(aspect_ratio, (int, float)) or (hasattr(aspect_ratio, 'ndim')) def compute_envelope(freq, ratio): return float('inf') if freq == 0 else (1.0/freq)*ratio # mk_moten_pyramid_params always uses numpy for parameter construction # since it produces metadata (filter parameters), not computed signals. spatial_frequencies = np.asarray(spatial_frequencies) spatial_directions = np.asarray(spatial_directions) temporal_frequencies = np.asarray(temporal_frequencies) include_edges = int(include_edges) # We have to deal with zero frequency spatial filters differently include_local_dc = True if 0 in spatial_frequencies else False spatial_frequencies = np.asarray([t for t in spatial_frequencies if t != 0]) # add temporal envelope max params = list(itertools.product(spatial_frequencies, spatial_directions)) gabor_parameters = [] for spatial_freq, spatial_direction in params: spatial_env = min(compute_envelope(spatial_freq, sf_gauss_ratio), max_spatial_env) # compute the number of gaussians that will fit in the FOV vertical_space = np.floor(((1.0 - spatial_env*gabor_spacing)/(gabor_spacing*spatial_env))/2.0) horizontal_space = np.floor(((aspect_ratio - spatial_env*gabor_spacing)/(gabor_spacing*spatial_env))/2.0) # include the edges of screen? vertical_space = max(vertical_space, 0) + include_edges horizontal_space = max(horizontal_space, 0) + include_edges # get the spatial gabor locations ycenters = spatial_env*gabor_spacing*np.arange(-vertical_space, vertical_space+1) + 0.5 xcenters = spatial_env*gabor_spacing*np.arange(-horizontal_space, horizontal_space+1) + aspect_ratio/2. for ii, (cx, cy) in enumerate(itertools.product(xcenters,ycenters)): for temp_freq in temporal_frequencies: temp_env = min(compute_envelope(temp_freq, tf_gauss_ratio), max_temp_env) if temp_freq == 0 and spatial_direction >= 180: # 0Hz temporal filter doesn't have motion, so # 0 and 180 degrees orientations are the same filters continue gabor_parameters.append([cx, cy, spatial_direction, spatial_freq, spatial_env, temp_freq, temp_env, filter_temporal_width, aspect_ratio, stimulus_fps, spatial_phase_offset, ]) if spatial_direction == 0 and include_local_dc: # add local 0 spatial frequency non-directional temporal filter gabor_parameters.append([cx, cy, spatial_direction, 0., # zero spatial freq spatial_env, temp_freq, temp_env, filter_temporal_width, aspect_ratio, stimulus_fps, spatial_phase_offset, ]) parameter_names = ('centerh', 'centerv', 'direction', 'spatial_freq', 'spatial_env', 'temporal_freq', 'temporal_env', 'filter_temporal_width', 'aspect_ratio', 'stimulus_fps', 'spatial_phase_offset', ) gabor_parameters = np.asarray(gabor_parameters) return parameter_names, gabor_parameters