"""tools to analyze interferometric data."""
import warnings
from .conf import config
from ._phase import OpticalPhase
from ._zernike import defocus
from .io import read_zygo_dat, read_zygo_datx
from .fttools import forward_ft_unit
from .coordinates import cart_to_polar, uniform_cart_to_polar
from .propagation import prop_pupil_plane_to_psf_plane
from .util import share_fig_ax
from .geometry import mcache
from prysm import mathops as m
[docs]class Interferogram(OpticalPhase):
"""Class containing logic and data for working with interferometric data."""
def __init__(self, phase, intensity=None, x=None, y=None, scale='px', phase_unit='nm', meta=None):
if x is None: # assume x, y given together
x = m.arange(phase.shape[1])
y = m.arange(phase.shape[0])
scale = 'px'
self.lateral_res = 1
wvl = meta.get('wavelength', None)
if wvl is None:
wvl = meta.get('Wavelength')
if wvl is not None:
wvl *= 1e6 # m to um
super().__init__(unit_x=x, unit_y=y, phase=phase,
wavelength=wvl, phase_unit=phase_unit,
spatial_unit=scale)
self.xaxis_label = 'X'
self.yaxis_label = 'Y'
self.zaxis_label = 'Height'
self.intensity = intensity
self.meta = meta
if scale != 'px':
self.change_spatial_unit(to=scale, inplace=True)
@property
def dropout_percentage(self):
"""Percentage of pixels in the data that are invalid (NaN)."""
return m.count_nonzero(~m.isfinite(self.phase)) / self.phase.size * 100
@property
def pvr(self):
"""Peak-to-Valley residual.
Notes
-----
See:
C. Evans, "Robust Estimation of PV for Optical Surface Specification and Testing"
in Optical Fabrication and Testing, OSA Technical Digest (CD)
(Optical Society of America, 2008), paper OWA4.
http://www.opticsinfobase.org/abstract.cfm?URI=OFT-2008-OWA4
"""
from .fringezernike import fit, FringeZernike
coefs, residual = fit(self.phase, terms=36, residual=True)
fz = FringeZernike(coefs, samples=self.shape[0])
return fz.pv + 3 * residual
[docs] def fill(self, _with=0):
"""Fill invalid (NaN) values.
Parameters
----------
_with : `float`, optional
value to fill with
Returns
-------
`Interferogram`
self
"""
nans = ~m.isfinite(self.phase)
self.phase[nans] = _with
return self
[docs] def crop(self):
"""Crop data to rectangle bounding non-NaN region."""
nans = m.isfinite(self.phase)
nancols = m.any(nans, axis=0)
nanrows = m.any(nans, axis=1)
left, right = nanrows.argmax(), nanrows[::-1].argmax()
top, bottom = nancols.argmax(), nancols[::-1].argmax()
if left == right == top == bottom == 0:
return self
self.phase = self.phase[left:-right, top:-bottom]
self.unit_y, self.unit_x = self.unit_y[left:-right], self.unit_x[top:-bottom]
self.unit_x -= self.unit_x[0]
self.unit_y -= self.unit_y[0]
return self
[docs] def remove_piston(self):
"""Remove piston from the data by subtracting the mean value."""
self.phase -= self.phase[m.isfinite(self.phase)].mean()
return self
[docs] def remove_tiptilt(self):
"""Remove tip/tilt from the data by least squares fitting and subtracting a plane."""
plane = fit_plane(self.unit_x, self.unit_y, self.phase)
self.phase -= plane
return self
[docs] def remove_power(self):
"""Remove power from the data by least squares fitting."""
sphere = fit_sphere(self.phase)
self.phase -= sphere
return self
[docs] def remove_piston_tiptilt(self):
"""Remove piston/tip/tilt from the data, see remove_tiptilt and remove_piston."""
self.remove_piston()
self.remove_tiptilt()
return self
[docs] def remove_piston_tiptilt_power(self):
"""Remove piston/tip/tilt/power from the data."""
self.remove_piston()
self.remove_piston_tiptilt()
self.remove_power()
return self
[docs] def mask(self, shape=None, diameter=0, mask=None):
"""Mask the signal.
The mask will be inscribed in the axis with fewer pixels. I.e., for
a interferogram with 1280x1000 pixels, the mask will be 1000x1000 at
largest.
Parameters
----------
shape : `str`
valid shape from prysm.geometry
diameter : `float`
diameter of the mask, in self.spatial_units
mask : `numpy.ndarray`
user-provided mask
Returns
-------
self
modified Interferogram instance.
"""
if shape is None and mask is None:
raise ValueError('must provide either a shape or a mask')
if mask is None:
mask = mcache(shape, min(self.shape), radius=diameter / min(self.diameter_x, self.diameter_y))
base = m.zeros(self.shape, dtype=config.precision)
difference = abs(self.shape[0] - self.shape[1])
l, u = int(m.floor(difference / 2)), int(m.ceil(difference / 2))
if u is 0: # guard against nocrop scenario
_slice = slice(None)
else:
_slice = slice(l, -u)
if self.shape[0] < self.shape[1]:
base[:, _slice] = mask
else:
base[_slice, :] = mask
mask = base
hitpts = mask == 0
self.phase[hitpts] = m.nan
return self
[docs] def latcal(self, plate_scale, unit='mm'):
"""Perform lateral calibration.
This probably won't do what you want if your data already has spatial
units of anything but pixels (px).
Parameters
----------
plate_scale : `float`
center-to-center sample spacing of pixels, in (unit)s.
unit : `str`, optional
unit associated with the plate scale.
Returns
-------
self
modified `Interferogram` instance.
"""
self.change_spatial_unit(to=unit, inplace=True) # will be 0..n spatial units
# sloppy to do this here...
self.unit_x *= plate_scale
self.unit_y *= plate_scale
return self
[docs] def spike_clip(self, nsigma=3):
"""Clip points in the data that exceed a certain multiple of the standard deviation.
Parameters
----------
nsigma : `float`
number of standard deviations to keep
Returns
-------
self
this Interferogram instance.
"""
pts_over_nsigma = abs(self.phase) > nsigma * self.std
self.phase[pts_over_nsigma] = m.nan
return self
[docs] def psd(self):
"""Power spectral density of the data., units (self.phase_unit^2)/((cy/self.spatial_unit)^2).
Returns
-------
unit_x : `numpy.ndarray`
ordinate x frequency axis
unit_y : `numpy.ndarray`
ordinate y frequency axis
psd : `numpy.ndarray`
power spectral density
"""
return psd(self.phase, self.sample_spacing)
[docs] def bandlimited_rms(self, wllow=None, wlhigh=None, flow=None, fhigh=None):
"""Calculate the bandlimited RMS of a signal from its PSD.
Parameters
----------
wllow : `float`
short spatial scale
wlhigh : `float`
long spatial scale
flow : `float`
low frequency
fhigh : `float`
high frequency
Returns
-------
`float`
band-limited RMS value.
"""
return bandlimited_rms(*self.psd(),
wllow=wllow,
wlhigh=wlhigh,
flow=flow,
fhigh=fhigh)
[docs] def total_integrated_scatter(self, wavelength, incident_angle=0):
"""Calculate the total integrated scatter (TIS) for an angle or angles.
Parameters
----------
wavelength : `float`
wavelength of light in microns.
incident_angle : `float` or `numpy.ndarray`
incident angle(s) of light.
Returns
-------
`float` or `numpy.ndarray`
TIS value.
"""
if self.spatial_unit != 'μm':
raise ValueError('Use microns for spatial unit when evaluating TIS.')
upper_limit = 1 / wavelength
kernel = 4 * m.pi * m.cos(m.radians(incident_angle))
kernel *= self.bandlimited_rms(upper_limit, None) / wavelength
return 1 - m.exp(-kernel**2)
[docs] def psd_xy_avg(self):
"""Power spectral density of the data., units (self.phase_unit^2)/((cy/self.spatial_unit)^2).
Returns
-------
`dict`
with keys x, y, avg. Each containing a tuple of (unit, psd)
"""
x, y, _psd = self.psd()
lx, ly = len(x)//2, len(y)//2
rho, phi, _psdrp = uniform_cart_to_polar(x, y, _psd)
return {
'x': (x[lx:], _psd[ly, lx:]),
'y': (y[ly:], _psd[ly:, lx]),
'avg': (rho, _psdrp.mean(axis=0)),
}
[docs] def plot_psd2d(self, axlim=None, clim=(1e-9, 1e2), interp_method='lanczos', fig=None, ax=None):
"""Plot the two dimensional PSD.
Parameters
----------
axlim : `float`, optional
symmetrical axis limit
power : `float`, optional
inverse of power to stretch image by
interp_method : `str`, optional
method used to interpolate the image, passed directly to matplotlib
imshow
fig : `matplotlib.figure.Figure`
Figure containing the plot
ax : `matplotlib.axes.Axis`
Axis containing the plot
Returns
-------
fig : `matplotlib.figure.Figure`
Figure containing the plot
ax : `matplotlib.axes.Axis`
Axis containing the plot
"""
from matplotlib import colors
x, y, psd = self.psd()
if axlim is None:
lims = (None, None)
else:
lims = (-axlim, axlim)
fig, ax = share_fig_ax(fig, ax)
im = ax.imshow(psd,
extent=[x[0], x[-1], y[0], y[-1]],
origin='lower',
cmap='Greys_r',
norm=colors.LogNorm(*clim),
interpolation=interp_method)
ax.set(xlim=lims, xlabel=r'$\nu_x$' + f' [cy/{self.spatial_unit}]',
ylim=lims, ylabel=r'$\nu_y$' + f' [cy/{self.spatial_unit}]')
cb = fig.colorbar(im,
label='PSD [' + self.phase_unit + r'$^2$' + f'/(cy/{self.spatial_unit})]',
ax=ax, fraction=0.046, extend='both')
cb.outline.set_edgecolor('k')
cb.outline.set_linewidth(0.5)
return fig, ax
[docs] def plot_psd_xy_avg(self, a=None, b=None, c=None, lw=3,
xlim=None, ylim=None, fig=None, ax=None):
"""Plot the x, y, and average PSD on a linear x axis.
Parameters
----------
a : `float`, optional
a coefficient of Lorentzian PSD model plotted alongside data
b : `float`, optional
b coefficient of Lorentzian PSD model plotted alongside data
c : `float`, optional
c coefficient of Lorentzian PSD model plotted alongside data
lw : `float`, optional
linewidth provided directly to matplotlib
xlim : `tuple`, optional
len 2 tuple of low, high x axis limits
ylim : `tuple`, optional
len 2 tuple of low, high y axis limits
fig : `matplotlib.figure.Figure`
Figure containing the plot
ax : `matplotlib.axes.Axis`
Axis containing the plot
Returns
-------
fig : `matplotlib.figure.Figure`
Figure containing the plot
ax : `matplotlib.axes.Axis`
Axis containing the plot
Notes
-----
if a, b given but not c, an AB / inverse power model will be used for the PSD.
If a, b, c are given the Lorentzian model will be used.
"""
xyavg = self.psd_xy_avg()
x, px = xyavg['x']
y, py = xyavg['y']
r, pr = xyavg['avg']
fig, ax = share_fig_ax(fig, ax)
ax.loglog(x, px, lw=lw, label='x', alpha=0.4)
ax.loglog(y, py, lw=lw, label='y', alpha=0.4)
ax.loglog(r, pr, lw=lw*1.5, label='avg')
if a is not None:
if c is not None:
requirement = abc_psd(a=a, b=b, c=c, nu=r)
else:
requirement = ab_psd(a=a, b=b, nu=r)
ax.loglog(r, requirement, c='k', lw=lw*2)
ax.legend(title='Orientation')
ax.set(xlim=xlim, xlabel=f'Spatial Frequency [cy/{self.spatial_unit}]',
ylim=ylim, ylabel=r'PSD [nm$^2$/' + f'(cy/{self.spatial_unit})$^2$]')
return fig, ax
[docs] @staticmethod
def from_zygo_dat(path, multi_intensity_action='first', scale='mm'):
"""Create a new interferogram from a zygo dat file.
Parameters
----------
path : path_like
path to a zygo dat file
multi_intensity_action : str, optional
see `io.read_zygo_dat`
scale : `str`, optional, {'um', 'mm'}
what xy scale to label the data with, microns or mm
Returns
-------
`Interferogram`
new Interferogram instance
"""
if str(path).endswith('datx'):
# datx file, use datx reader
zydat = read_zygo_datx(path)
res = zydat['meta']['Lateral Resolution']
else:
# dat file, use dat file reader
zydat = read_zygo_dat(path, multi_intensity_action=multi_intensity_action)
res = zydat['meta']['lateral_resolution'] # meters
phase = zydat['phase']
if res == 0.0:
res = 1
scale = 'px'
if scale != 'px':
_scale = 'm'
else:
_scale = 'px'
i = Interferogram(phase=phase, intensity=zydat['intensity'],
x=m.arange(phase.shape[1]) * res, y=m.arange(phase.shape[0]) * res,
scale=_scale, meta=zydat['meta'])
return i.change_spatial_unit(to=scale.lower(), inplace=True)
def fit_plane(x, y, z):
xx, yy = m.meshgrid(x, y)
pts = m.isfinite(z)
xx_, yy_ = xx[pts].flatten(), yy[pts].flatten()
flat = m.ones(xx_.shape)
coefs = m.lstsq(m.stack([xx_, yy_, flat]).T, z[pts].flatten(), rcond=None)[0]
plane_fit = coefs[0] * xx + coefs[1] * yy + coefs[2]
return plane_fit
def fit_sphere(z):
x, y = m.linspace(-1, 1, z.shape[1]), m.linspace(-1, 1, z.shape[0])
xx, yy = m.meshgrid(x, y)
pts = m.isfinite(z)
xx_, yy_ = xx[pts].flatten(), yy[pts].flatten()
rho, phi = cart_to_polar(xx_, yy_)
focus = defocus(rho, phi)
coefs = m.lstsq(m.stack([focus, m.ones(focus.shape)]).T, z[pts].flatten(), rcond=None)[0]
rho, phi = cart_to_polar(xx, yy)
sphere = defocus(rho, phi) * coefs[0]
return sphere
def make_window(signal, sample_spacing, which='welch'):
"""Generates a window function to be used in PSD analysis.
Parameters
----------
signal : `numpy.ndarray`
signal or phase data
sample_spacing : `float`
spacing of samples in the input data
which : `str,` {'welch', 'hann', 'auto'}, optional
which window to produce. If auto, attempts to guess the appropriate
window based on the input signal
Notes
-----
For 2D welch, see:
Power Spectral Density Specification and Analysis of Large Optical Surfaces
E. Sidick, JPL
Returns
-------
`numpy.ndarray`
window array
"""
s = signal.shape
if which is None:
# attempt to guess best window
ysamples = int(round(s[0] * 0.02, 0))
xsamples = int(round(s[1] * 0.02, 0))
corner1 = signal[:ysamples, :xsamples] == 0
corner2 = signal[-ysamples:, :xsamples] == 0
corner3 = signal[:ysamples, -xsamples:] == 0
corner4 = signal[-ysamples:, -xsamples:] == 0
if corner1.all() and corner2.all() and corner3.all() and corner4.all():
# four corners all "black" -- circular data, Welch window is best
# looks wrong but 2D welch takes x, y while indices are y, x
y = m.arange(s[1]) * sample_spacing
x = m.arange(s[0]) * sample_spacing
return window_2d_welch(y, x)
else:
# if not circular, square data; use Hanning window
y = m.hanning(s[0])
x = m.hanning(s[1])
return m.outer(y, x)
else:
if type(which) is str:
# known window type
wl = which.lower()
if wl == 'welch':
y = m.arange(s[1]) * sample_spacing
x = m.arange(s[0]) * sample_spacing
return window_2d_welch(y, x)
elif wl in ('hann', 'hanning'):
y = m.hanning(s[0])
x = m.hanning(s[1])
return m.outer(y, x)
else:
raise ValueError('unknown window type')
else:
return which # window provided as ndarray
def psd(height, sample_spacing, window=None):
"""Compute the power spectral density of a signal.
Parameters
----------
height : `numpy.ndarray`
height or phase data
sample_spacing : `float`
spacing of samples in the input data
window : {'welch', 'hann'} or ndarray, optional
Returns
-------
unit_x : `numpy.ndarray`
ordinate x frequency axis
unit_y : `numpy.ndarray`
ordinate y frequency axis
psd : `numpy.ndarray`
power spectral density
Notes
-----
See GH_FFT for a rigorous treatment of FFT scalings
https://holometer.fnal.gov/GH_FFT.pdf
"""
window = make_window(height, sample_spacing, window)
fft = m.ifftshift(m.fft2(m.fftshift(height * window)))
psd = abs(fft)**2 # mag squared first as per GH_FFT
fs = 1 / sample_spacing
S2 = (window**2).sum()
coef = S2 * fs * fs
psd /= coef
ux = forward_ft_unit(sample_spacing, height.shape[1])
uy = forward_ft_unit(sample_spacing, height.shape[0])
return ux, uy, psd
def bandlimited_rms(ux, uy, psd, wllow=None, wlhigh=None, flow=None, fhigh=None):
"""Calculate the bandlimited RMS of a signal from its PSD.
Parameters
----------
ux : `numpy.ndarray`
x spatial frequencies
uy : `numpy.ndarray`
y spatial frequencies
psd : `numpy.ndarray`
power spectral density
wllow : `float`
short spatial scale
wlhigh : `float`
long spatial scale
flow : `float`
low frequency
fhigh : `float`
high frequency
Returns
-------
`float`
band-limited RMS value.
"""
if wllow is not None or wlhigh is not None:
# spatial period given
if wllow is None:
flow = 0
else:
fhigh = 1 / wllow
if wlhigh is None:
fhigh = max(ux[-1], uy[-1])
else:
flow = 1 / wlhigh
elif flow is not None or fhigh is not None:
# spatial frequency given
if flow is None:
flow = 0
if fhigh is None:
fhigh = max(ux[-1], uy[-1])
else:
raise ValueError('must specify either period (wavelength) or frequency')
ux2, uy2 = m.meshgrid(ux, uy)
r, p = cart_to_polar(ux2, uy2)
if flow is None:
warnings.warn('no lower limit given, using 0 for low frequency')
flow = 0
if fhigh is None:
warnings.warn('no upper limit given, using limit imposed by data.')
fhigh = r.max()
work = psd.copy()
work[r < flow] = 0
work[r > fhigh] = 0
first = m.trapz(work, uy, axis=0)
second = m.trapz(first, ux, axis=0)
return m.sqrt(second)
def window_2d_welch(x, y, alpha=8):
"""Return a 2D welch window for a given alpha.
Parameters
----------
x : `numpy.ndarray`
x values, 1D array
y : `numpy.ndarray`
y values, 1D array
alpha : `float`
alpha (edge roll) parameter
Returns
-------
`numpy.ndarray`
window
"""
xx, yy = m.meshgrid(x, y)
r, _ = cart_to_polar(xx, yy)
rmax = m.sqrt(x.max()**2 + y.max()**2)
window = 1 - abs(r/rmax)**alpha
return window
def abc_psd(nu, a, b, c):
"""Lorentzian model of a Power Spectral Density.
Parameters
----------
nu : `numpy.ndarray` or `float`
spatial frequency
a : `float`
a coefficient
b : `float`
b coefficient
c : `float`
c coefficient
Returns
-------
`numpy.ndarray`
value of PSD model
"""
return a / (1 + (nu/b)**2)**(c/2)
def ab_psd(nu, a, b):
"""inverse power model of a Power Spectral Density.
Parameters
----------
nu : `numpy.ndarray` or `float`
spatial frequency
a : `float`
a coefficient
b : `float`
b coefficient
Returns
-------
`numpy.ndarray`
value of PSD model
"""
return a * nu ** (-b)