Source code for hmc_mir.tsm_tools.tsm_tools

"""Time Series Modulation

Code is from https://github.com/HMC-MIR/PianoConcertoAccompaniment/blob/main/tsm_tools.ipynb and https://www.mdpi.com/128016

"""

import numpy as np
import librosa as lb
from scipy.signal import medfilt

[docs]def harmonic_percussive_separation(x, sr=22050, fft_size = 2048, hop_length=512, lh=6, lp=6): '''Performs harmonic-percussive separation on a given audio sample. Args x: the input audio waveform sr: sample rate of the audio data fft_size: specifies the FFT size to use in computing an STFT hop_length: specifies the hop length in samples to use in computing an STFT lh: the harmonic spectrotemporal median filter will be of size (1, 2*lh+1) lp: the percussive spectrotemporal median filter will be of size (2*lp+1, 1) Returns: xh: audio waveform of the estimated harmonic component xp: audio waveform of the estimated percussive component Xh: an STFT of the input signal with percussive components masked out Xp: an STFT of the input signal with harmonic components masked out ''' window = hann_window(fft_size) X = lb.core.stft(x, n_fft=fft_size, hop_length=512, window=window, center=False) Y = np.abs(X) Yh = medfilt(Y, (1, 2*lh+1)) Yp = medfilt(Y, (2*lp+1, 1)) Mh = (Yh > Yp) Mp = np.logical_not(Mh) Xh = X * Mh Xp = X * Mp xh = invert_stft(Xh, hop_length, window) xp = invert_stft(Xp, hop_length, window) return xh, xp, Xh, Xp
[docs]def hann_window(L): '''Returns a Hann window of a specified length. Args: L: length of window to return ''' w = .5 * (1 - np.cos(2*np.pi * np.arange(L)/ L)) return w
[docs]def invert_stft(S, hop_length, window): '''Reconstruct a signal from a modified STFT matrix. Args: S: modified STFT matrix hop_length: the synthesis hop size in samples window: an array specifying the window used for FFT analysis Returns: A time-domain signal y whose STFT is closest to S in squared error distance. ''' L = len(window) # construct full stft matrix fft_size = (S.shape[0] - 1) * 2 Sfull = np.zeros((fft_size, S.shape[1]), dtype=np.complex64) Sfull[0:S.shape[0],:] = S Sfull[S.shape[0]:,:] = np.conj(np.flipud(S[1:fft_size//2,:])) # compute inverse FFTs frames = np.zeros_like(Sfull) for i in range(frames.shape[1]): frames[:,i] = np.fft.ifft(Sfull[:,i]) frames = np.real(frames) # remove imaginary components due to numerical roundoff # synthesis frames num = window.reshape((-1,1)) den = calc_sum_squared_window(window, hop_length) #den = np.square(window) + np.square(np.roll(window, hop_length)) frames = frames * window.reshape((-1,1)) / den.reshape((-1,1)) #frames = frames * window.reshape((-1,1)) # reconstruction y = np.zeros(hop_length*(frames.shape[1]-1) + L) for i in range(frames.shape[1]): offset = i * hop_length y[offset:offset+L] += frames[:,i] return y
[docs]def calc_sum_squared_window(window, hop_length): '''Calculates the denominator term for computing synthesis frames. Inputs: window: array specifying the window used in FFT analysis hop_length: the synthesis hop size in samples Returns: An array specifying the normalization factor. ''' assert (len(window) % hop_length == 0), "Hop length does not divide the window evenly." numShifts = len(window) // hop_length den = np.zeros_like(window) for i in range(numShifts): den += np.roll(np.square(window), i*hop_length) return den
[docs]def tsm_phase_vocoder(x, alpha = 1.0, L = 2048, sr = 22050): '''Time stretches the input signal using the phase vocoder method. Uses a synthesis hop size that is one-fourth the value of L. Note that this implementation allows for a non-integer analysis hop size (in samples), which ensures that the time-scale modification factor is exactly as specified. Args: x: the input signal alpha: the time stretch factor, which is defined as the ratio of the synthesis hop size to the analysis hop size L: the length of each analysis frame in samples sr: sampling rate Returns: the time-stretched signal y. ''' assert(L % 4 == 0), "Frame length must be divisible by four." Hs = L // 4 # compute STFT Ha = Hs/alpha # allow non-integer values window = hann_window(L) X, analysis_frame_offsets = my_stft(x, L, Ha, window) # custom implementation to handle non-integer hop size # compute modified STFT w_if = estimateIF_var(X, sr, analysis_frame_offsets/sr) # custom implementation to handle non-constant frame locations phase_mod = np.zeros(X.shape) phase_mod[:,0] = np.angle(X[:,0]) for i in range(1, phase_mod.shape[1]): phase_mod[:,i] = phase_mod[:,i-1] + w_if[:,i-1] * Hs / sr Xmod = np.abs(X) * np.exp(1j * phase_mod) # signal reconstruction y = invert_stft(Xmod, Hs, window) #y = lb.core.istft(Xmod, hop_length=Hs, center=False) return y
[docs]def my_stft(x, N, hop_length, window): '''A custom implementation of the STFT that allows for non-integer hop lengths (in samples). Args: x: the input audio waveform N: the FFT size hop_length: the hop size specified in samples, can be a non-integer window: the window to apply to the analysis frames Returns: X: the computed STFT matrix frame_offsets: a list specifying the offsets (in samples) of each analysis frame ''' assert len(window) == N # get analysis frames numFrames = int((len(x) - N) // hop_length) + 1 analysisFrames = np.zeros((numFrames, N)) frame_offsets = np.rint(np.arange(numFrames) * hop_length).astype(int) for i, offset in enumerate(frame_offsets): analysisFrames[i,:] = x[offset: offset + N] # compute STFT analysisFrames = analysisFrames * window.reshape((1,-1)) Xfull = np.fft.fft(analysisFrames, axis=1) halfLen = N//2 + 1 X = Xfull[:,0:halfLen].T return X, frame_offsets
[docs]def estimateIF(S, sr, hop_samples): '''Estimates the instantaneous frequencies in a STFT matrix. This function is not actually used in our custom implementation of the phase vocoder -- it is included here as a contrast to estimateIF_var() below. Args: S: the STFT matrix, should only contain the lower half of the frequency bins sr: sampling rate hop_samples: the hop size of the STFT analysis in samples Returns: A matrix containing the estimated instantaneous frequency at each time-frequency bin. This matrix should contain one less column than S. ''' hop_sec = hop_samples / sr fft_size = (S.shape[0] - 1) * 2 w_nom = np.arange(S.shape[0]) * sr / fft_size * 2 * np.pi w_nom = w_nom.reshape((-1,1)) unwrapped = np.angle(S[:,1:]) - np.angle(S[:,0:-1]) - w_nom * hop_sec wrapped = (unwrapped + np.pi) % (2 * np.pi) - np.pi w_if = w_nom + wrapped / hop_sec return w_if
[docs]def tsm_overlap_add(x, alpha = 1.0, L = 220): '''Time stretches the input signal using the overlap-add method. Uses a synthesis hop size that is half the value of L. Note that this implementation allows for a non-integer analysis hop size (in samples), which ensures that the time-scale modification factor is exactly as specified. Args: x: the input signal alpha: the time stretch factor, which is defined as the ratio of the synthesis hop size to the analysis hop size L: the length of each analysis frame in samples Returns: the time-stretched signal y ''' assert(L % 2 == 0), "Frame length must be even." Hs = L // 2 # compute analysis frames Ha = Hs/alpha # allow non-integer analysis hop size numFrames = int((len(x) - L) // Ha) + 1 analysisFrames = np.zeros((L, numFrames)) for i in range(numFrames): offset = int(np.round(i * Ha)) analysisFrames[:, i] = x[offset: offset + L] # reconstruction synthesisFrames = analysisFrames * hann_window(L).reshape((-1,1)) # use broadcasting y = np.zeros(Hs * (numFrames-1) + L) for i in range(numFrames): offset = i * Hs y[offset:offset+L] += synthesisFrames[:,i] return y
[docs]def mix_recordings(x1, x2): '''Mixes two audio waveforms together. Args: x1: first audio waveform x2: second audio waveform Returns: An audio waveform that is an average of the two waveforms. The length of the returned waveform is the minimum of the two lengths. ''' min_length = min(len(x1), len(x2)) y = .5 * (x1[0:min_length] + x2[0:min_length]) return y
[docs]def tsm_hybrid(x, alpha=1.0, sr=22050): '''Time stretches the input signal using a hybrid method that combines overlap-add and phase vocoding. Args: x: the input signal alpha: the time stretch factor, which is defined as the ratio of the synthesis hop size to the analysis hop size sr: sampling rate Returns: the time-stretched signal y. ''' xh, xp, _, _ = harmonic_percussive_separation(x) xh_stretched = tsm_phase_vocoder(xh, alpha) xp_stretched = tsm_overlap_add(xp, alpha) y = mix_recordings(xh_stretched, xp_stretched) return y
[docs]def estimateIF_var(S, sr, timestamps): ''' Estimates the instantaneous frequencies in an STFT-like matrix when the analysis frames are not evenly spaced. Args: S: the STFT-like matrix, should only contain the lower half of the frequency bins sr: sampling rate timestamps: timestamps corresponding to each STFT column (in sec) Returns: A matrix containing the estimated instantaneous frequency at each time-frequency bin. This matrix should contain one less column than S. ''' assert S.shape[1] == len(timestamps) # hop_sec = hop_samples / sr fft_size = (S.shape[0] - 1) * 2 w_nom = np.arange(S.shape[0]) * sr / fft_size * 2 * np.pi w_nom = w_nom.reshape((-1,1)) unwrapped = np.angle(S[:,1:]) - np.angle(S[:,0:-1]) - w_nom * (timestamps[1:] - timestamps[0:-1]).reshape((1,-1)) wrapped = (unwrapped + np.pi) % (2 * np.pi) - np.pi w_if = w_nom + wrapped / (timestamps[1:] - timestamps[0:-1]).reshape((1,-1)) return w_if
[docs]def tsmvar_overlap_add(x, alignment, L = 220, fs = 22050): ''' Time stretches the input signal using the overlap-add method according to a given alignment. Uses a synthesis hop size that is half the value of L. Args: x: the input signal (orchestra only) alignment: a 2xN matrix specifying the desired alignment in seconds. The first row indicates the timestamp in the input signal, and the last row indicates where in the output signal the instant should occur. L: the length of each analysis frame in samples fs: sample rate of input signal Returns: The variable time-stretched signal y ''' assert(L % 2 == 0), "Frame length must be even." Hs = L // 2 # determine interpolation points target_dur = alignment[1,-1] # in sec target_start = alignment[1,0] # if a subsequence alignment, output will be zero until target_start (in sec) numFrames = int((target_dur * fs - L) // Hs) + 1 analysisFrames = np.zeros((L, numFrames)) interp_pts = np.interp(np.arange(numFrames)*Hs/fs, alignment[1,:], alignment[0,:]) # left edge of analysis windows # compute analysis frames for i in range(numFrames): if i*Hs/fs >= target_start: offset = int(np.round(interp_pts[i] * fs)) offset = min(offset, len(x) - L) analysisFrames[:, i] = x[offset: offset + L] # reconstruction synthesisFrames = analysisFrames * hann_window(L).reshape((-1,1)) # use broadcasting y = np.zeros(Hs * (numFrames-1) + L) for i in range(numFrames): offset = i * Hs y[offset:offset+L] += synthesisFrames[:,i] return y
[docs]def tsmvar_phase_vocoder(x, alignment, L = 2048, fs = 22050): ''' Time stretches the input signal using a phase vocoder according to a given alignment. Uses a synthesis hop size that is one-fourth the value of L. Args: x: the input signal alignment: a 2xN matrix specifying the desired alignment in seconds. The first row indicates the timestamp in the input signal, and the last row indicates where in the output signal the instant should occur. L: the length of each analysis frame in samples fs: sampling rate Return: The variable time-stretched signal y ''' assert(L % 4 == 0), "Frame length must be divisible by four." Hs = L // 4 # determine interpolation points target_dur = alignment[1,-1] # in sec target_start_frm = int(np.ceil(alignment[1,0] * fs / Hs)) # if a subsequence alignment, output will be zero until target_start_frm numFrames = int((target_dur * fs - L) // Hs) + 1 analysisFrames = np.zeros((numFrames, L)) interp_pts = np.interp(np.arange(numFrames)*Hs/fs, alignment[1,:], alignment[0,:]) # left edge of analysis windows # compute analysis frames for i in range(numFrames): if i >= target_start_frm: offset = int(np.round(interp_pts[i] * fs)) offset = min(offset, len(x) - L) analysisFrames[i,:] = x[offset: offset + L] # compute STFT window = hann_window(L) analysisFrames = analysisFrames * window.reshape((1,-1)) Xfull = np.fft.fft(analysisFrames, axis=1) halfLen = L//2 + 1 X = Xfull[:,0:halfLen].T # compute modified STFT w_if = estimateIF_var(X[:,target_start_frm:], fs, interp_pts[target_start_frm:]) # only for active frames phase_mod = np.zeros(X.shape) phase_mod[:,target_start_frm] = np.angle(X[:,target_start_frm]) for i in range(target_start_frm + 1, phase_mod.shape[1]): phase_mod[:,i] = phase_mod[:,i-1] + w_if[:,i-target_start_frm-1] * Hs / fs Xmod = np.abs(X) * np.exp(1j * phase_mod) # signal reconstruction y = invert_stft(Xmod, Hs, window) #y = lb.core.istft(Xmod, hop_length=Hs, center=False) return y
[docs]def tsmvar_hybrid(x, alignment, sr=22050): ''' Time stretches the input signal using a hybrid method that combines overlap-add and phase vocoding. The time stretch factor is specified at each time instant by the provided alignment. Args: x: the input signal alignment: a 2xN matrix specifying the desired alignment in seconds. The first row indicates the timestamp in the input signal, and the last row indicates where in the output signal the instant should occur. sr: sampling rate Returns: The variable time-stretched signal y ''' xh, xp, _, _ = harmonic_percussive_separation(x) xh_stretched = tsmvar_phase_vocoder(xh, alignment) xp_stretched = tsmvar_overlap_add(xp, alignment) y = mix_recordings(xh_stretched, xp_stretched) return y