Source code for prysm.interferogram

"""tools to analyze interferometric data."""
import warnings

from .conf import config
from ._phase import OpticalPhase
from ._zernike import defocus
from .io import read_zygo_dat, read_zygo_datx
from .fttools import forward_ft_unit
from .coordinates import cart_to_polar, uniform_cart_to_polar
from .propagation import prop_pupil_plane_to_psf_plane
from .util import share_fig_ax
from .geometry import mcache

from prysm import mathops as m

[docs]class Interferogram(OpticalPhase): """Class containing logic and data for working with interferometric data.""" def __init__(self, phase, intensity=None, x=None, y=None, scale='px', phase_unit='nm', meta=None): if x is None: # assume x, y given together x = m.arange(phase.shape[1]) y = m.arange(phase.shape[0]) scale = 'px' self.lateral_res = 1 wvl = meta.get('wavelength', None) if wvl is None: wvl = meta.get('Wavelength') if wvl is not None: wvl *= 1e6 # m to um super().__init__(unit_x=x, unit_y=y, phase=phase, wavelength=wvl, phase_unit=phase_unit, spatial_unit=scale) self.xaxis_label = 'X' self.yaxis_label = 'Y' self.zaxis_label = 'Height' self.intensity = intensity self.meta = meta if scale != 'px': self.change_spatial_unit(to=scale, inplace=True) @property def dropout_percentage(self): """Percentage of pixels in the data that are invalid (NaN).""" return m.count_nonzero(~m.isfinite(self.phase)) / self.phase.size * 100 @property def pvr(self): """Peak-to-Valley residual. Notes ----- See: C. Evans, "Robust Estimation of PV for Optical Surface Specification and Testing" in Optical Fabrication and Testing, OSA Technical Digest (CD) (Optical Society of America, 2008), paper OWA4. """ from .fringezernike import fit, FringeZernike coefs, residual = fit(self.phase, terms=36, residual=True) fz = FringeZernike(coefs, samples=self.shape[0]) return fz.pv + 3 * residual
[docs] def fill(self, _with=0): """Fill invalid (NaN) values. Parameters ---------- _with : `float`, optional value to fill with Returns ------- `Interferogram` self """ nans = ~m.isfinite(self.phase) self.phase[nans] = _with return self
[docs] def crop(self): """Crop data to rectangle bounding non-NaN region.""" nans = m.isfinite(self.phase) nancols = m.any(nans, axis=0) nanrows = m.any(nans, axis=1) left, right = nanrows.argmax(), nanrows[::-1].argmax() top, bottom = nancols.argmax(), nancols[::-1].argmax() if left == right == top == bottom == 0: return self self.phase = self.phase[left:-right, top:-bottom] self.unit_y, self.unit_x = self.unit_y[left:-right], self.unit_x[top:-bottom] self.unit_x -= self.unit_x[0] self.unit_y -= self.unit_y[0] return self
[docs] def remove_piston(self): """Remove piston from the data by subtracting the mean value.""" self.phase -= self.phase[m.isfinite(self.phase)].mean() return self
[docs] def remove_tiptilt(self): """Remove tip/tilt from the data by least squares fitting and subtracting a plane.""" plane = fit_plane(self.unit_x, self.unit_y, self.phase) self.phase -= plane return self
[docs] def remove_power(self): """Remove power from the data by least squares fitting.""" sphere = fit_sphere(self.phase) self.phase -= sphere return self
[docs] def remove_piston_tiptilt(self): """Remove piston/tip/tilt from the data, see remove_tiptilt and remove_piston.""" self.remove_piston() self.remove_tiptilt() return self
[docs] def remove_piston_tiptilt_power(self): """Remove piston/tip/tilt/power from the data.""" self.remove_piston() self.remove_piston_tiptilt() self.remove_power() return self
[docs] def mask(self, shape=None, diameter=0, mask=None): """Mask the signal. The mask will be inscribed in the axis with fewer pixels. I.e., for a interferogram with 1280x1000 pixels, the mask will be 1000x1000 at largest. Parameters ---------- shape : `str` valid shape from prysm.geometry diameter : `float` diameter of the mask, in self.spatial_units mask : `numpy.ndarray` user-provided mask Returns ------- self modified Interferogram instance. """ if shape is None and mask is None: raise ValueError('must provide either a shape or a mask') if mask is None: mask = mcache(shape, min(self.shape), radius=diameter / min(self.diameter_x, self.diameter_y)) base = m.zeros(self.shape, dtype=config.precision) difference = abs(self.shape[0] - self.shape[1]) l, u = int(m.floor(difference / 2)), int(m.ceil(difference / 2)) if u is 0: # guard against nocrop scenario _slice = slice(None) else: _slice = slice(l, -u) if self.shape[0] < self.shape[1]: base[:, _slice] = mask else: base[_slice, :] = mask mask = base hitpts = mask == 0 self.phase[hitpts] = m.nan return self
[docs] def latcal(self, plate_scale, unit='mm'): """Perform lateral calibration. This probably won't do what you want if your data already has spatial units of anything but pixels (px). Parameters ---------- plate_scale : `float` center-to-center sample spacing of pixels, in (unit)s. unit : `str`, optional unit associated with the plate scale. Returns ------- self modified `Interferogram` instance. """ self.change_spatial_unit(to=unit, inplace=True) # will be 0..n spatial units # sloppy to do this here... self.unit_x *= plate_scale self.unit_y *= plate_scale return self
[docs] def spike_clip(self, nsigma=3): """Clip points in the data that exceed a certain multiple of the standard deviation. Parameters ---------- nsigma : `float` number of standard deviations to keep Returns ------- self this Interferogram instance. """ pts_over_nsigma = abs(self.phase) > nsigma * self.std self.phase[pts_over_nsigma] = m.nan return self
[docs] def psd(self): """Power spectral density of the data., units (self.phase_unit^2)/((cy/self.spatial_unit)^2). Returns ------- unit_x : `numpy.ndarray` ordinate x frequency axis unit_y : `numpy.ndarray` ordinate y frequency axis psd : `numpy.ndarray` power spectral density """ return psd(self.phase, self.sample_spacing)
[docs] def bandlimited_rms(self, wllow=None, wlhigh=None, flow=None, fhigh=None): """Calculate the bandlimited RMS of a signal from its PSD. Parameters ---------- wllow : `float` short spatial scale wlhigh : `float` long spatial scale flow : `float` low frequency fhigh : `float` high frequency Returns ------- `float` band-limited RMS value. """ return bandlimited_rms(*self.psd(), wllow=wllow, wlhigh=wlhigh, flow=flow, fhigh=fhigh)
[docs] def total_integrated_scatter(self, wavelength, incident_angle=0): """Calculate the total integrated scatter (TIS) for an angle or angles. Parameters ---------- wavelength : `float` wavelength of light in microns. incident_angle : `float` or `numpy.ndarray` incident angle(s) of light. Returns ------- `float` or `numpy.ndarray` TIS value. """ if self.spatial_unit != 'μm': raise ValueError('Use microns for spatial unit when evaluating TIS.') upper_limit = 1 / wavelength kernel = 4 * m.pi * m.cos(m.radians(incident_angle)) kernel *= self.bandlimited_rms(upper_limit, None) / wavelength return 1 - m.exp(-kernel**2)
[docs] def psd_xy_avg(self): """Power spectral density of the data., units (self.phase_unit^2)/((cy/self.spatial_unit)^2). Returns ------- `dict` with keys x, y, avg. Each containing a tuple of (unit, psd) """ x, y, _psd = self.psd() lx, ly = len(x)//2, len(y)//2 rho, phi, _psdrp = uniform_cart_to_polar(x, y, _psd) return { 'x': (x[lx:], _psd[ly, lx:]), 'y': (y[ly:], _psd[ly:, lx]), 'avg': (rho, _psdrp.mean(axis=0)), }
[docs] def plot_psd2d(self, axlim=None, clim=(1e-9, 1e2), interp_method='lanczos', fig=None, ax=None): """Plot the two dimensional PSD. Parameters ---------- axlim : `float`, optional symmetrical axis limit power : `float`, optional inverse of power to stretch image by interp_method : `str`, optional method used to interpolate the image, passed directly to matplotlib imshow fig : `matplotlib.figure.Figure` Figure containing the plot ax : `matplotlib.axes.Axis` Axis containing the plot Returns ------- fig : `matplotlib.figure.Figure` Figure containing the plot ax : `matplotlib.axes.Axis` Axis containing the plot """ from matplotlib import colors x, y, psd = self.psd() if axlim is None: lims = (None, None) else: lims = (-axlim, axlim) fig, ax = share_fig_ax(fig, ax) im = ax.imshow(psd, extent=[x[0], x[-1], y[0], y[-1]], origin='lower', cmap='Greys_r', norm=colors.LogNorm(*clim), interpolation=interp_method) ax.set(xlim=lims, xlabel=r'$\nu_x$' + f' [cy/{self.spatial_unit}]', ylim=lims, ylabel=r'$\nu_y$' + f' [cy/{self.spatial_unit}]') cb = fig.colorbar(im, label='PSD [' + self.phase_unit + r'$^2$' + f'/(cy/{self.spatial_unit})]', ax=ax, fraction=0.046, extend='both') cb.outline.set_edgecolor('k') cb.outline.set_linewidth(0.5) return fig, ax
[docs] def plot_psd_xy_avg(self, a=None, b=None, c=None, lw=3, xlim=None, ylim=None, fig=None, ax=None): """Plot the x, y, and average PSD on a linear x axis. Parameters ---------- a : `float`, optional a coefficient of Lorentzian PSD model plotted alongside data b : `float`, optional b coefficient of Lorentzian PSD model plotted alongside data c : `float`, optional c coefficient of Lorentzian PSD model plotted alongside data lw : `float`, optional linewidth provided directly to matplotlib xlim : `tuple`, optional len 2 tuple of low, high x axis limits ylim : `tuple`, optional len 2 tuple of low, high y axis limits fig : `matplotlib.figure.Figure` Figure containing the plot ax : `matplotlib.axes.Axis` Axis containing the plot Returns ------- fig : `matplotlib.figure.Figure` Figure containing the plot ax : `matplotlib.axes.Axis` Axis containing the plot Notes ----- if a, b given but not c, an AB / inverse power model will be used for the PSD. If a, b, c are given the Lorentzian model will be used. """ xyavg = self.psd_xy_avg() x, px = xyavg['x'] y, py = xyavg['y'] r, pr = xyavg['avg'] fig, ax = share_fig_ax(fig, ax) ax.loglog(x, px, lw=lw, label='x', alpha=0.4) ax.loglog(y, py, lw=lw, label='y', alpha=0.4) ax.loglog(r, pr, lw=lw*1.5, label='avg') if a is not None: if c is not None: requirement = abc_psd(a=a, b=b, c=c, nu=r) else: requirement = ab_psd(a=a, b=b, nu=r) ax.loglog(r, requirement, c='k', lw=lw*2) ax.legend(title='Orientation') ax.set(xlim=xlim, xlabel=f'Spatial Frequency [cy/{self.spatial_unit}]', ylim=ylim, ylabel=r'PSD [nm$^2$/' + f'(cy/{self.spatial_unit})$^2$]') return fig, ax
[docs] @staticmethod def from_zygo_dat(path, multi_intensity_action='first', scale='mm'): """Create a new interferogram from a zygo dat file. Parameters ---------- path : path_like path to a zygo dat file multi_intensity_action : str, optional see `io.read_zygo_dat` scale : `str`, optional, {'um', 'mm'} what xy scale to label the data with, microns or mm Returns ------- `Interferogram` new Interferogram instance """ if str(path).endswith('datx'): # datx file, use datx reader zydat = read_zygo_datx(path) res = zydat['meta']['Lateral Resolution'] else: # dat file, use dat file reader zydat = read_zygo_dat(path, multi_intensity_action=multi_intensity_action) res = zydat['meta']['lateral_resolution'] # meters phase = zydat['phase'] if res == 0.0: res = 1 scale = 'px' if scale != 'px': _scale = 'm' else: _scale = 'px' i = Interferogram(phase=phase, intensity=zydat['intensity'], x=m.arange(phase.shape[1]) * res, y=m.arange(phase.shape[0]) * res, scale=_scale, meta=zydat['meta']) return i.change_spatial_unit(to=scale.lower(), inplace=True)
def fit_plane(x, y, z): xx, yy = m.meshgrid(x, y) pts = m.isfinite(z) xx_, yy_ = xx[pts].flatten(), yy[pts].flatten() flat = m.ones(xx_.shape) coefs = m.lstsq(m.stack([xx_, yy_, flat]).T, z[pts].flatten(), rcond=None)[0] plane_fit = coefs[0] * xx + coefs[1] * yy + coefs[2] return plane_fit def fit_sphere(z): x, y = m.linspace(-1, 1, z.shape[1]), m.linspace(-1, 1, z.shape[0]) xx, yy = m.meshgrid(x, y) pts = m.isfinite(z) xx_, yy_ = xx[pts].flatten(), yy[pts].flatten() rho, phi = cart_to_polar(xx_, yy_) focus = defocus(rho, phi) coefs = m.lstsq(m.stack([focus, m.ones(focus.shape)]).T, z[pts].flatten(), rcond=None)[0] rho, phi = cart_to_polar(xx, yy) sphere = defocus(rho, phi) * coefs[0] return sphere def make_window(signal, sample_spacing, which='welch'): """Generates a window function to be used in PSD analysis. Parameters ---------- signal : `numpy.ndarray` signal or phase data sample_spacing : `float` spacing of samples in the input data which : `str,` {'welch', 'hann', 'auto'}, optional which window to produce. If auto, attempts to guess the appropriate window based on the input signal Notes ----- For 2D welch, see: Power Spectral Density Specification and Analysis of Large Optical Surfaces E. Sidick, JPL Returns ------- `numpy.ndarray` window array """ s = signal.shape if which is None: # attempt to guess best window ysamples = int(round(s[0] * 0.02, 0)) xsamples = int(round(s[1] * 0.02, 0)) corner1 = signal[:ysamples, :xsamples] == 0 corner2 = signal[-ysamples:, :xsamples] == 0 corner3 = signal[:ysamples, -xsamples:] == 0 corner4 = signal[-ysamples:, -xsamples:] == 0 if corner1.all() and corner2.all() and corner3.all() and corner4.all(): # four corners all "black" -- circular data, Welch window is best # looks wrong but 2D welch takes x, y while indices are y, x y = m.arange(s[1]) * sample_spacing x = m.arange(s[0]) * sample_spacing return window_2d_welch(y, x) else: # if not circular, square data; use Hanning window y = m.hanning(s[0]) x = m.hanning(s[1]) return m.outer(y, x) else: if type(which) is str: # known window type wl = which.lower() if wl == 'welch': y = m.arange(s[1]) * sample_spacing x = m.arange(s[0]) * sample_spacing return window_2d_welch(y, x) elif wl in ('hann', 'hanning'): y = m.hanning(s[0]) x = m.hanning(s[1]) return m.outer(y, x) else: raise ValueError('unknown window type') else: return which # window provided as ndarray def psd(height, sample_spacing, window=None): """Compute the power spectral density of a signal. Parameters ---------- height : `numpy.ndarray` height or phase data sample_spacing : `float` spacing of samples in the input data window : {'welch', 'hann'} or ndarray, optional Returns ------- unit_x : `numpy.ndarray` ordinate x frequency axis unit_y : `numpy.ndarray` ordinate y frequency axis psd : `numpy.ndarray` power spectral density Notes ----- See GH_FFT for a rigorous treatment of FFT scalings """ window = make_window(height, sample_spacing, window) fft = m.ifftshift(m.fft2(m.fftshift(height * window))) psd = abs(fft)**2 # mag squared first as per GH_FFT fs = 1 / sample_spacing S2 = (window**2).sum() coef = S2 * fs * fs psd /= coef ux = forward_ft_unit(sample_spacing, height.shape[1]) uy = forward_ft_unit(sample_spacing, height.shape[0]) return ux, uy, psd def bandlimited_rms(ux, uy, psd, wllow=None, wlhigh=None, flow=None, fhigh=None): """Calculate the bandlimited RMS of a signal from its PSD. Parameters ---------- ux : `numpy.ndarray` x spatial frequencies uy : `numpy.ndarray` y spatial frequencies psd : `numpy.ndarray` power spectral density wllow : `float` short spatial scale wlhigh : `float` long spatial scale flow : `float` low frequency fhigh : `float` high frequency Returns ------- `float` band-limited RMS value. """ if wllow is not None or wlhigh is not None: # spatial period given if wllow is None: flow = 0 else: fhigh = 1 / wllow if wlhigh is None: fhigh = max(ux[-1], uy[-1]) else: flow = 1 / wlhigh elif flow is not None or fhigh is not None: # spatial frequency given if flow is None: flow = 0 if fhigh is None: fhigh = max(ux[-1], uy[-1]) else: raise ValueError('must specify either period (wavelength) or frequency') ux2, uy2 = m.meshgrid(ux, uy) r, p = cart_to_polar(ux2, uy2) if flow is None: warnings.warn('no lower limit given, using 0 for low frequency') flow = 0 if fhigh is None: warnings.warn('no upper limit given, using limit imposed by data.') fhigh = r.max() work = psd.copy() work[r < flow] = 0 work[r > fhigh] = 0 first = m.trapz(work, uy, axis=0) second = m.trapz(first, ux, axis=0) return m.sqrt(second) def window_2d_welch(x, y, alpha=8): """Return a 2D welch window for a given alpha. Parameters ---------- x : `numpy.ndarray` x values, 1D array y : `numpy.ndarray` y values, 1D array alpha : `float` alpha (edge roll) parameter Returns ------- `numpy.ndarray` window """ xx, yy = m.meshgrid(x, y) r, _ = cart_to_polar(xx, yy) rmax = m.sqrt(x.max()**2 + y.max()**2) window = 1 - abs(r/rmax)**alpha return window def abc_psd(nu, a, b, c): """Lorentzian model of a Power Spectral Density. Parameters ---------- nu : `numpy.ndarray` or `float` spatial frequency a : `float` a coefficient b : `float` b coefficient c : `float` c coefficient Returns ------- `numpy.ndarray` value of PSD model """ return a / (1 + (nu/b)**2)**(c/2) def ab_psd(nu, a, b): """inverse power model of a Power Spectral Density. Parameters ---------- nu : `numpy.ndarray` or `float` spatial frequency a : `float` a coefficient b : `float` b coefficient Returns ------- `numpy.ndarray` value of PSD model """ return a * nu ** (-b)