Edit: If somebody read this before Monday, Dec 30: I made a mistake in the matrix formulation below; it has since been corrected.
Motivation
Thank you very much for this amazing package! I'm still in the process of diving deeper into it, but I would like to apply it for problems that involve 1D-interpolation.
Unfortunately, pylops.signalprocessing.Interp only offers interpolation methods that are either not accurate enough ("linear", "nearest") or too computationally expensive ("sinc").
Proposed Feature
I would like to propose to incorporate a cubic spline interpolation into Interp which would be a good middle ground in terms of accuracy and computational load.
While I managed to implement it, I don't know how easily this can be cast into the backend-agnostic design of PyLops. On top of that, I'm not 100% sure whether it can easily be generalised to arbitrary dimensions, although I think that a 1D-only implementation would already be great.
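For reference, this is roughly how Interp is used today and where a spline option would slot in. The snippet is only an illustration: the signal, sizes, and sample positions are made up, and the spline kind itself does not exist yet.

```python
# Illustration of the current Interp API with the existing kinds
# ("nearest", "linear", "sinc"); data and sizes are made up.
import numpy as np
from pylops.signalprocessing import Interp

y = np.sin(np.linspace(0.0, 3.0 * np.pi, 50))   # regularly sampled signal
iava = np.linspace(0.0, 48.9, 1001)             # fractional sample positions
Op, iava = Interp(dims=50, iava=iava, kind="linear", dtype="float64")
y_interp = Op @ y                               # forward: evaluate at iava
y_adj = Op.H @ y_interp                         # adjoint
```

A new kind (e.g. "cubic-spline", name to be decided) would be used in exactly the same way.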
Theoretical Background
I derived the following matrix notation from the German Wikipedia article on spline interpolation. It was a bit of tedious work with index-bookkeeping and reformulation of the equations, but I got it to work.
The matrix formulation of the interpolation is

$$\mathbf{y}_{interp} = \mathbf{O} \, \mathbf{y}_{fit} = \mathbf{X} \, \mathbf{Q} \, \mathbf{y}_{fit}$$

where
- $\mathbf{X}$ is the design matrix that depends on the x-coordinates $x_{fit}$ of the data points to be fitted as well as the x-coordinates $x_{interp}$ of the data points to be interpolated,
- $\mathbf{Q}$ is the matrix that obtains the coefficients for the cubic spline interpolation,
- $\mathbf{y}_{fit}$ are the $n$ data points to be fitted,
- $\mathbf{y}_{interp}$ are the interpolated data points,
- $\mathbf{O}$ is the interpolation operator.
For natural boundary conditions (the second derivative is zero at the boundaries), the matrix $\mathbf{Q}$ takes the block form

$$\mathbf{Q} = \begin{bmatrix} \mathbf{I}_{n} \\ \mathbf{A}^{-1} \, \mathbf{D}_{0+} \end{bmatrix}$$

where
- $\mathbf{I}_{n}$ is the identity matrix of size $n \times n$,
- $\mathbf{A}$ is a tridiagonal matrix (see below) of size $n \times n$,
- $\mathbf{D}_{0+}$ is a second order finite difference matrix with zero-padding (see below) of size $n \times n$.
The matrix $\mathbf{A}$ is diagonally dominant and tridiagonal,

$$\mathbf{A} = \begin{bmatrix}
1 & 0 & & & \\
\frac{1}{6} & \frac{2}{3} & \frac{1}{6} & & \\
 & \ddots & \ddots & \ddots & \\
 & & \frac{1}{6} & \frac{2}{3} & \frac{1}{6} \\
 & & & 0 & 1
\end{bmatrix}.$$

The matrix $\mathbf{D}_{0+}$ applies second order finite differences to the interior points and pads the result with a leading and a trailing zero,

$$\mathbf{D}_{0+} = \begin{bmatrix}
0 & 0 & \cdots & 0 & 0 \\
1 & -2 & 1 & & \\
 & \ddots & \ddots & \ddots & \\
 & & 1 & -2 & 1 \\
0 & 0 & \cdots & 0 & 0
\end{bmatrix}.$$

Consequently, the transpose operation is

$$\mathbf{O}^{T} = \mathbf{Q}^{T} \, \mathbf{X}^{T} = \begin{bmatrix} \mathbf{I}_{n} & \mathbf{D}_{0+}^{T} \, \mathbf{A}^{-T} \end{bmatrix} \mathbf{X}^{T}.$$

In both the forward and the transpose operation, only one tridiagonal system (with $\mathbf{A}$ or $\mathbf{A}^{T}$, respectively) has to be solved per matrix-vector product, which can be done efficiently with LAPACK's tridiagonal routines.
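As a sanity check of this formulation, here is a minimal dense sketch (assuming unit spacing between the fitted samples and natural boundary conditions; all names are ad hoc and this is not the proposed operator itself) that builds $\mathbf{Q}$ and $\mathbf{X}$ explicitly for a tiny problem and compares $\mathbf{X} \mathbf{Q} \mathbf{y}_{fit}$ against scipy.interpolate.CubicSpline:

```python
# Sanity-check sketch (dense, tiny problem): build Q = [I; A^{-1} D_0+] and the
# evaluation matrix X explicitly, then compare X @ Q @ y_fit against
# scipy.interpolate.CubicSpline with natural boundary conditions.
import numpy as np
from scipy.interpolate import CubicSpline

n = 8
rng = np.random.default_rng(0)
y_fit = rng.standard_normal(n)
x_fit = np.arange(n, dtype=np.float64)
x_interp = np.linspace(0.0, n - 1.001, 25)           # stay inside the last interval

# A (tridiagonal) and D_0+ (zero-padded second differences), both n x n
A = np.zeros((n, n))
D = np.zeros((n, n))
A[0, 0] = A[-1, -1] = 1.0                            # natural BCs: m[0] = m[-1] = 0
for i in range(1, n - 1):
    A[i, i - 1:i + 2] = [1.0 / 6.0, 2.0 / 3.0, 1.0 / 6.0]
    D[i, i - 1:i + 2] = [1.0, -2.0, 1.0]

# Q stacks the identity on top of A^{-1} D_0+, so that Q @ y_fit = [y_fit; m]
Q = np.vstack([np.eye(n), np.linalg.solve(A, D)])

# X evaluates the piecewise cubic at x_interp from the stacked vector [y; m]
base = np.clip(x_interp.astype(np.int64), 0, n - 2)  # left knot of each interval
t = x_interp - base                                  # local coordinate in [0, 1)
rows = np.arange(x_interp.size)
X = np.zeros((x_interp.size, 2 * n))
X[rows, base] = 1.0 - t
X[rows, base + 1] = t
X[rows, base + n] = ((1.0 - t) ** 3 - (1.0 - t)) / 6.0
X[rows, base + n + 1] = (t ** 3 - t) / 6.0

reference = CubicSpline(x_fit, y_fit, bc_type="natural")(x_interp)
assert np.allclose(X @ Q @ y_fit, reference)
```

The implementation below uses the same building blocks, but matrix-free: $\mathbf{A}$ is handled as a factorized tridiagonal system, $\mathbf{D}_{0+}$ as a Numba kernel, and only $\mathbf{X}$ is stored explicitly as a sparse CSR matrix.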
Implementation
I would like to share my implementation, which currently relies on scipy.sparse.linalg.LinearOperator because I didn't manage to inherit from Interp yet. Besides, it relies on SciPy further for handling the sparse matrix X and on LAPACK for solving the banded systems.
I also implemented a custom finite difference operator with Numba to keep it fast.
The code was written in a Jupyter notebook and all the %magic operations are commented out. The timing results can be found below.
Edit: I adapted the code so that it reuses the factorization computed by gttrf across calls to gttrs. So, instead of a factorization + solve call for every matrix-vector product, the factorization is computed only once and its result is reused in a single solve per matrix-vector product.
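A minimal standalone sketch of that factor-once / solve-many pattern with SciPy's LAPACK wrappers (using the spline left-hand side for an arbitrary n = 6; this is only an illustration, not part of the implementation below):

```python
# Factor-once / solve-many with LAPACK's ?gttrf / ?gttrs via SciPy.
import numpy as np
from scipy.linalg import get_lapack_funcs

n = 6
d = np.full(n, 2.0 / 3.0)
d[0] = d[-1] = 1.0                      # main diagonal of A
du = np.full(n - 1, 1.0 / 6.0)
du[0] = 0.0                             # super-diagonal of A
dl = np.full(n - 1, 1.0 / 6.0)
dl[-1] = 0.0                            # sub-diagonal of A

gttrf, gttrs = get_lapack_funcs(("gttrf", "gttrs"), dtype=np.float64)
dl_lu, d_lu, du_lu, du2, ipiv, info = gttrf(dl, d, du)    # factorize once
assert info == 0

A_dense = np.diag(d) + np.diag(du, 1) + np.diag(dl, -1)
for rhs in (np.ones(n), np.arange(n, dtype=np.float64)):  # reuse factors for every solve
    m, info = gttrs(dl_lu, d_lu, du_lu, du2, ipiv, rhs)
    assert info == 0
    assert np.allclose(A_dense @ m, rhs)
```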
```python
# === Imports ===
from dataclasses import dataclass
from typing import Callable, Tuple, Type
import numpy as np
from numba import jit
from matplotlib import pyplot as plt
from pylops.signalprocessing import Interp
from pylops.utils import dottest
from scipy.linalg import get_lapack_funcs
from scipy.sparse import csr_matrix
from scipy.sparse.linalg import LinearOperator
# %matplotlib widget
# === Second Order Finite Differences ===
@jit(nopython=True)
def second_order_finite_differences_zero_padded(x: np.ndarray) -> np.ndarray:
"""
Computes the second order finite differences of ``x`` and pads the result with a
leading and trailing zero.
Parameters
----------
x : :class:`numpy.ndarray` of shape (n,)
The input array.
Returns
-------
x_diffs : :class:`numpy.ndarray` of shape (n,)
The second order finite differences of ``x`` padded with a leading and trailing
zero.
"""
assert x.ndim == 1
assert x.size >= 3
x_diffs = np.empty(shape=(x.size,), dtype=x.dtype)
x_diffs[0] = 0.0
for index in range(1, x.size - 1):
x_diffs[index] = x[index + 1] - 2.0 * x[index] + x[index - 1]
x_diffs[x.size - 1] = 0.0
return x_diffs
@jit(nopython=True)
def second_order_finite_differences_zero_padded_transposed(x: np.ndarray) -> np.ndarray:
"""
Computes the transposed operation of the second order finite differences operator
with subsequent zero padding, i.e.,
- ``x[0]`` and ``x[-1]`` are not considered,
- ``x[1:-1]`` is padded with 2 leading and 2 trailing zero,
- the second order finite differences of the padded array are computed
Parameters
----------
x : :class:`numpy.ndarray` of shape (n,)
The input array.
Returns
-------
x_diffs : :class:`numpy.ndarray` of shape (n,)
The result of the transposed second order finite differences operator with
subsequent zero padding.
"""
assert x.ndim == 1
assert x.size >= 3
if x.size == 3:
return np.array((x[1], -2.0 * x[1], x[1]))
if x.size == 4:
return np.array((x[1], -2.0 * x[1] + x[2], x[1] - 2.0 * x[2], x[2]))
x_diffs = np.empty(shape=(x.size,), dtype=x.dtype)
x_diffs[0] = x[1]
x_diffs[1] = -2.0 * x[1] + x[2]
for index in range(2, x.size - 2):
x_diffs[index] = x[index - 1] - 2.0 * x[index] + x[index + 1]
x_diffs[x.size - 2] = x[x.size - 3] - 2.0 * x[x.size - 2]
x_diffs[x.size - 1] = x[x.size - 2]
return x_diffs
# === Tridiagonal Matrices ===
@dataclass
class TridiagonalMatrix:
"""
Represents a tridiagonal matrix with
- the main diagonal ``main_diagonal``,
- the super-diagonal ``super_diagonal``,
- the sub-diagonal ``sub_diagonal``.
"""
main_diagonal: np.ndarray
super_diagonal: np.ndarray
sub_diagonal: np.ndarray
def __post_init__(self) -> None:
"""
Validates the input.
"""
assert self.main_diagonal.ndim == 1
assert self.super_diagonal.ndim == 1
assert self.sub_diagonal.ndim == 1
assert self.main_diagonal.size == self.super_diagonal.size + 1
assert self.main_diagonal.size == self.sub_diagonal.size + 1
def __iter__(self):
"""
Returns an iterator over the sub-diagonal, main diagonal and super-diagonal
(in that order) as required for the LAPACK routines ``?gttrf``.
"""
return iter((self.sub_diagonal, self.main_diagonal, self.super_diagonal))
@property
def T(self) -> "TridiagonalMatrix":
"""
Returns the transpose of the tridiagonal matrix.
"""
return TridiagonalMatrix(
main_diagonal=self.main_diagonal,
super_diagonal=self.sub_diagonal,
sub_diagonal=self.super_diagonal,
)
@dataclass
class TridiagonalLUDecomposition:
"""
Represents the LU decomposition of a tridiagonal matrix as performed by the LAPACK
routines ``?gttrf``.
"""
sub_diagonal_lu: np.ndarray
main_diagonal_lu: np.ndarray
super_diagonal_lu: np.ndarray
super_two_diagonal_lu: np.ndarray
pivot_indices: np.ndarray
def __iter__(self):
"""
Returns an iterator over the sub-diagonal, main diagonal, super-diagonal,
the second super-diagonal (filled by pivoting) and the pivot indices
(in that order) as required for the LAPACK routines ``?gttrs``.
"""
return iter(
(
self.sub_diagonal_lu,
self.main_diagonal_lu,
self.super_diagonal_lu,
self.super_two_diagonal_lu,
self.pivot_indices,
)
)
@staticmethod
def from_tridiagonal_matrix(
matrix: TridiagonalMatrix,
lapack_factorizer: Callable,
) -> "TridiagonalLUDecomposition":
"""
Computes the LU decomposition of a tridiagonal matrix using the LAPACK routine
``?gttrf``.
Parameters
----------
matrix : :class:`TridiagonalMatrix`
The tridiagonal matrix to decompose.
lapack_factorizer : callable
The LAPACK routine ``?gttrf`` to use for the decomposition.
Returns
-------
lu_decomposition : :class:`TridiagonalLUDecomposition`
The LU decomposition of the tridiagonal matrix.
"""
(
sub_diagonal_lu,
main_diagonal_lu,
super_diagonal_lu,
super_two_diagonal_lu,
pivot_indices,
info,
) = lapack_factorizer(*matrix)
assert info == 0
return TridiagonalLUDecomposition(
sub_diagonal_lu=sub_diagonal_lu,
main_diagonal_lu=main_diagonal_lu,
super_diagonal_lu=super_diagonal_lu,
super_two_diagonal_lu=super_two_diagonal_lu,
pivot_indices=pivot_indices,
)
def solve(self, rhs: np.ndarray, lapack_solver: Callable) -> np.ndarray:
"""
Solves the linear system of equations ``A @ x = rhs`` where ``A`` is the
tridiagonal matrix represented by the LU decomposition. For this, the LAPACK
routine ``?gttrs`` is used.
Parameters
----------
rhs : :class:`numpy.ndarray` of shape (n,)
The right-hand side of the linear system of equations.
lapack_solver : callable
The LAPACK routine ``?gttrs`` to use for solving the system.
Returns
-------
x : :class:`numpy.ndarray` of shape (n,)
The solution of the linear system of equations.
"""
x, info = lapack_solver(*self, rhs)
assert info == 0
return x
def _make_cubic_spline_left_hand_side(
dims: int,
) -> TridiagonalMatrix:
"""
Constructs the banded matrix ``A`` for the linear system of equations ``A @ m = b``
where
- ``A`` is a diagonally dominant tridiagonal matrix with the main diagonal
being ``[1, 2/3, 2/3, ..., 2/3, 2/3, 1]``, the super-diagonal being
``[0, 1/6, 1/6, ..., 1/6, 1/6]`` and the sub-diagonal being
``[1/6, 1/6, ..., 1/6, 1/6, 0]``,
- ``m`` is the unknown vector of spline coefficients,
- ``b`` contains the second order finite differences of the ``y`` values that the
spline should interpolate.
Parameters
----------
dims : :class:`int`
The number of points the spline should interpolate.
Returns
-------
matrix_a : :class:`TridiagonalMatrix`
The tridiagonal matrix ``A``.
"""
main_diag = np.empty(shape=(dims,), dtype=np.float64)
super_diag = np.empty(shape=(dims - 1,), dtype=np.float64)
main_diag[0] = 1.0
main_diag[1 : dims - 1] = 0.6666666666666666
main_diag[dims - 1] = 1.0
super_diag[0] = 0.0
super_diag[1 : dims - 1] = 0.16666666666666666
return TridiagonalMatrix(
main_diagonal=main_diag,
super_diagonal=super_diag,
sub_diagonal=np.flip(super_diag).copy(), # to avoid a view
)
# === Custom Cubic Spline Interpolator ===
@jit(nopython=True)
def _make_cubic_spline_x_csr_specs(
dims: int,
iava: np.ndarray,
base_indices: np.ndarray,
iava_remainders: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Constructs the specifications ``data``, ``indices``, and ``indptr`` for a
:class:`scipy.sparse.csr_matrix` ``X`` that can be used to interpolate the
equally spaced ``y`` values with a cubic spline like ``X @ Q @ y`` where
- ``X`` is the interpolation matrix to be constructed,
- ``Q`` is the linear operator that maps ``y`` to the vertical concatenation of
the ``y`` values and the spline coefficients ``m``,
- ``y`` are the values to be interpolated.
Parameters
----------
dims : :class:`int`
The number of points the spline should interpolate.
iava : :class:`numpy.ndarray` of shape (n,)
Floating indices of the locations to which the spline should interpolate.
base_indices : :class:`numpy.ndarray` of shape (n,)
The indices of the respective first data point in ``y`` for the intervals
in which the corresponding ``iava`` values lie.
iava_remainders : :class:`numpy.ndarray` of shape (n,)
The remainders of the ``iava`` values after subtracting the respective
``base_indices``.
Returns
-------
data : :class:`numpy.ndarray` of shape (4 * ``iava.size``,)
The data array of the CSR matrix ``X``
indices : :class:`numpy.ndarray` of shape (4 * ``iava.size``,)
The column indices of the CSR matrix ``X``
indptr : :class:`numpy.ndarray` of shape (``iava.size`` + 1,)
The row pointers of the CSR matrix ``X``
"""
# some auxiliary variables are required
iava_remainders_cubic = ( # (x - x[i])**3
iava_remainders * iava_remainders * iava_remainders
)
one_minus_iava_remainders = 1.0 - iava_remainders # (x[i + 1] - x)
one_minus_iava_remainders_cubic = (  # (x[i + 1] - x)**3
one_minus_iava_remainders
* one_minus_iava_remainders
* one_minus_iava_remainders
)
# each interpolation point needs 4 entries that multiply with ``y[i]``,
# ``y[i + 1]``, ``m[i]``, and ``m[i + 1]``;
data = np.empty(shape=(4 * iava.size,), dtype=np.float64)
indices = np.empty(shape=(4 * iava.size,), dtype=np.intp)
indptr = np.arange(0, 4 * (iava.size + 1), 4, dtype=np.int64)
# the rows are specified by means of a loop
for row_index in range(0, iava.size):
index_offset = 4 * row_index
data[index_offset] = one_minus_iava_remainders[row_index]
data[index_offset + 1] = iava_remainders[row_index]
data[index_offset + 2] = 0.16666666666666666 * (
one_minus_iava_remainders_cubic[row_index]
- one_minus_iava_remainders[row_index]
)
data[index_offset + 3] = 0.16666666666666666 * (
iava_remainders_cubic[row_index] - iava_remainders[row_index]
)
indices[index_offset] = base_indices[row_index]
indices[index_offset + 1] = base_indices[row_index] + 1
indices[index_offset + 2] = base_indices[row_index] + dims
indices[index_offset + 3] = base_indices[row_index] + dims + 1
return data, indices, indptr
class CubicSplineInterp(LinearOperator):
"""
Custom cubic spline interpolator.
Parameters
----------
dims : :class:`int`
The number of points the spline should interpolate.
iava : :class:`numpy.ndarray` of shape (n,)
Floating indices of the locations to which the spline should interpolate.
dtype : type
The data type of the input and output arrays.
"""
def __init__(
self,
dims: int,
iava: np.ndarray,
dtype: Type = np.float64,
) -> None:
self.dims: int = dims
self.iava: np.ndarray = iava
self.shape: Tuple[int, int] = (iava.size, dims)
self.dtype: Type = dtype
self.base_indices = np.clip(
iava.astype(np.int64),
a_min=0,
a_max=dims - 2,
)
self.iava_remainders = iava - self.base_indices
self._tridiag_factorize = get_lapack_funcs(("gttrf",), dtype=np.float64)[0]
self._tridiag_lu_solve = get_lapack_funcs(("gttrs",), dtype=self.dtype)[0]
self.lhs_matrix: TridiagonalMatrix = (
_make_cubic_spline_left_hand_side(dims=dims)
)
self.lhs_matrix_lu = TridiagonalLUDecomposition.from_tridiagonal_matrix(
matrix=self.lhs_matrix,
lapack_factorizer=self._tridiag_factorize,
)
self.lhs_matrix_transposed_lu = (
TridiagonalLUDecomposition.from_tridiagonal_matrix(
matrix=self.lhs_matrix.T,
lapack_factorizer=self._tridiag_factorize,
)
)
self.X_matrix: csr_matrix = csr_matrix(
_make_cubic_spline_x_csr_specs(
dims=dims,
iava=iava,
base_indices=self.base_indices,
iava_remainders=self.iava_remainders,
),
shape=(iava.size, 2 * dims),
)
self.X_matrix_transposed: csr_matrix = self.X_matrix.transpose().tocsr() # type: ignore
def _matvec(self, x: np.ndarray) -> np.ndarray:
m_coeffs = self.lhs_matrix_lu.solve(
rhs=second_order_finite_differences_zero_padded(x),
lapack_solver=self._tridiag_lu_solve,
)
return self.X_matrix @ np.concatenate((x, m_coeffs))
def _rmatvec(self, x: np.ndarray) -> np.ndarray:
x_mod = self.X_matrix_transposed @ x
return (
x_mod[0:self.dims]
+ second_order_finite_differences_zero_padded_transposed(
self.lhs_matrix_transposed_lu.solve(
rhs=x_mod[self.dims: x_mod.size],
lapack_solver=self._tridiag_lu_solve,
)
)
)
# === Interpolators Test ===
# A complex signal is created to test the interpolators
x = np.arange(50)
x_new = np.linspace(0, x.max() - np.spacing(x.max()), 50_001)
y = np.sin(6.0 * np.pi * (x / x.size)) + 1.0j * np.cos(4.0 * np.pi * (x / x.size))
y = y.astype(np.complex128)
linop_linear = Interp(
dims=x.size,
iava=x_new,
kind="linear",
)[0]
y_new_linear = linop_linear @ y
print("Timing PyLops Linear Interpolator")
print("Creation")
# %timeit linop_linear = Interp(dims=x.size, iava=x_new, kind="linear", dtype="complex128",)
print("Matrix-Vector Multiplication")
# %timeit y_new_linear = linop_linear @ y
print("Transpose Operation")
# %timeit linop_linear.T @ y_new_linear
linop_sinc = Interp(
dims=x.size,
iava=x_new,
kind="sinc",
)[0]
y_new_sinc = linop_sinc @ y
print("\nTiming PyLops Sinc Interpolator")
print("Creation")
# %timeit linop_sinc = Interp(dims=x.size, iava=x_new, kind="sinc", dtype="complex128",)
print("Matrix-Vector Multiplication")
# %timeit y_new_sinc = linop_sinc @ y
print("Transpose Operation")
# %timeit linop_sinc.T @ y_new_sinc
linop_spline = CubicSplineInterp(dims=x.size, iava=x_new, dtype=np.complex128,)
print("\nTesting custom Spline Interpolator")
assert dottest(linop_spline, nr=x_new.size, nc=x.size, complexflag=3)
print("Passed!")
y_new_spline = linop_spline @ y
print("\nTiming custom Spline Interpolator")
print("Creation")
# %timeit linop_spline = CubicSplineInterp(dims=x.size, iava=x_new, dtype=np.complex128,)
print("Matrix-Vector Multiplication")
# %timeit y_new_spline = linop_spline @ y
print("Transpose Operation")
# %timeit linop_spline.T @ y_new_spline
# === Plotting ===
fig, ax = plt.subplots(nrows=2, sharex=True, figsize=(10, 6))
ax[0].plot(x, y.real, "o", label="Original data")
ax[0].plot(x_new, np.sin(6.0 * np.pi * (x_new / x.size)), label="Ground truth")
ax[0].plot(x_new, y_new_spline.real, label="Cubic spline interpolation")
ax[0].plot(x_new, y_new_linear.real, label="Linear interpolation", linestyle=":")
ax[0].plot(x_new, y_new_sinc.real, label="Sinc interpolation", linestyle="--")
ax[0].set_ylabel("Real part")
ax[0].legend()
ax[1].plot(x, y.imag, "o", label="Original data")
ax[1].plot(x_new, np.cos(4.0 * np.pi * (x_new / x.size)), label="Ground truth")
ax[1].plot(x_new, y_new_spline.imag, label="Cubic spline interpolation")
ax[1].plot(x_new, y_new_linear.imag, label="Linear interpolation", linestyle=":")
ax[1].plot(x_new, y_new_sinc.imag, label="Sinc interpolation", linestyle="--")
ax[1].set_xlabel("Index")
ax[1].set_ylabel("Imaginary part")
ax[1].legend()
plt.show()
```

Results
Correctness and Timing
The timings for this test with a relatively large complex array show that the cubic spline interpolation is competitive even in this basic version. It passes the dottest and its runtime is comparable to that of the linear interpolation.
```
Timing PyLops Linear Interpolator
Creation
2.04 ms ± 106 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Matrix-Vector Multiplication
314 µs ± 4.53 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Transpose Operation
218 µs ± 11.4 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Timing PyLops Sinc Interpolator
Creation
12.7 ms ± 228 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Matrix-Vector Multiplication
3.67 ms ± 146 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Transpose Operation
5.43 ms ± 389 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Testing custom Spline Interpolator
Passed!
Timing custom Spline Interpolator
Creation
750 µs ± 14.7 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Matrix-Vector Multiplication
260 µs ± 12.1 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Transpose Operation
290 µs ± 4.53 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
```

Visualisation
The real and imaginary parts of the complex test data are interpolated in one go and, as intended, handled independently. For this example, the cubic spline gives higher accuracy than the linear method. Besides, it does not suffer from the ringing introduced by the sinc interpolation of the imaginary part.
