# Source code for pyampact.alignmentUtils

"""
alignmentUtils
==============

.. autosummary::
    :toctree: generated/

    dp
    gh    
    g
    orio_simmx    
    maptimes    
    f0_est_weighted_sum
    f0_est_weighted_sum_spec
    trim_silences
    merge_grace_notes

"""

import wave
import numpy as np
import pandas as pd
import librosa
import warnings


# Public names re-exported by ``from pyampact.alignmentUtils import *``.
__all__ = [
    "dp",
    "gh",
    "g",
    "orio_simmx",    
    "maptimes",
    "f0_est_weighted_sum",
    "f0_est_weighted_sum_spec",
    "trim_silences",
    "merge_grace_notes"
]


def dp(M):
    """
    Port of Dan Ellis' dp.m dynamic-programming path finder.

    [p, q, D] = dp(M)

    Parameters
    ----------
    M : np.ndarray
        Local-cost matrix of shape (r, c), traversed from the top-left
        corner to the bottom-right corner.

    Returns
    -------
    p : np.ndarray
        Row indices of the minimum-cost path (monotone, starts at 0).
    q : np.ndarray
        Column indices of the minimum-cost path.
    D : np.ndarray
        Accumulated-cost matrix, same shape as M.
    """
    cost = np.asarray(M, dtype=float)
    n_rows, n_cols = cost.shape

    # Accumulated cost with a NaN border so off-grid moves always lose
    # the nanargmin comparison; only the (0, 0) corner is a legal start.
    acc = np.zeros((n_rows + 1, n_cols + 1), dtype=float)
    acc[0, :] = np.nan
    acc[:, 0] = np.nan
    acc[0, 0] = 0.0
    acc[1:, 1:] = cost

    # Step codes follow the MATLAB convention: 1=diagonal, 2=up, 3=left.
    steps = np.zeros((n_rows, n_cols), dtype=np.int32)
    for i in range(n_rows):
        for j in range(n_cols):
            options = np.array([acc[i, j], acc[i, j + 1], acc[i + 1, j]])
            best = int(np.nanargmin(options)) + 1  # 1..3
            acc[i + 1, j + 1] += options[best - 1]
            steps[i, j] = best

    # Trace the stored moves back from (r-1, c-1) to (0, 0).
    i, j = n_rows - 1, n_cols - 1
    p, q = [i], [j]
    while i > 0 or j > 0:
        if i == 0:
            j -= 1
        elif j == 0:
            i -= 1
        else:
            move = steps[i, j]
            if move == 1:
                i -= 1
                j -= 1
            elif move == 2:
                i -= 1
            elif move == 3:
                j -= 1
            else:
                raise RuntimeError("bad traceback")
        p.insert(0, i)
        q.insert(0, j)

    return np.asarray(p, dtype=int), np.asarray(q, dtype=int), acc[1:, 1:]
def gh(v1, i1, v2, i2, domain, frac=0.5):
    """
    Interpolate between `v1[i1]` and `v2[i2]`, with bounds checking on both
    vectors (via `g`, which uses 1-based indices and clamps to `domain`).

    Note the weighting: `frac` weights the `v1` element. `frac` of 1 returns
    the (clamped) `v1[i1]` value, `frac` of 0 returns the `v2[i2]` value, and
    the default `frac` of 0.5 returns the midpoint. The result is truncated
    to an int.

    Parameters
    ----------
    v1 : np.ndarray
        First vector.
    i1 : int
        1-based index into `v1`.
    v2 : np.ndarray
        Second vector.
    i2 : int
        1-based index into `v2`.
    domain : np.ndarray
        Set of valid points of which both vectors are subsets; out-of-range
        indices clamp to its first/last element.
    frac : float, optional
        Weight applied to the `v1` element. Default is 0.5.

    Returns
    -------
    int
        Truncated weighted combination of the two (clamped) elements.
    """
    x1 = g(v1, i1, domain)
    x2 = g(v2, i2, domain)
    return int(frac * x1 + (1 - frac) * x2)
def g(vec, idx, domain):
    """
    Fetch `vec[idx]` using 1-based (MATLAB-style) indexing, clamping
    out-of-range indices to the edges of `domain`.

    Parameters
    ----------
    vec : np.ndarray
        Input vector (a subset of the points in `domain`).
    idx : int
        1-based index of the desired element.
    domain : np.ndarray
        Set of valid points; supplies the fallback values at either edge.

    Returns
    -------
    float
        `vec[idx - 1]` when `idx` is in range; otherwise `domain[0]` for
        indices below 1 or `domain[-1]` for indices past the end of `vec`.
    """
    if idx < 1:
        return domain[0]
    if idx > len(vec):
        return domain[-1]
    return vec[idx - 1]
def orio_simmx(M, D):
    """
    Build an Orio & Schwartz-style (Peak Structure Distance) similarity
    matrix between a note mask and a spectrogram.

    Parameters
    ----------
    M : np.ndarray or pd.DataFrame
        Binary mask; each column corresponds to one row of the output S and
        marks the spectral bins expected for that MIDI event.
    D : np.ndarray or pd.DataFrame
        Spectrogram; its columns correspond to the columns of S.

    Returns
    -------
    np.ndarray
        Similarity matrix S of shape (M.shape[1], D.shape[1]).
    """
    # Accept DataFrames transparently.
    if isinstance(M, pd.DataFrame):
        M = M.values
    if isinstance(D, pd.DataFrame):
        D = D.values

    mask_sq = M ** 2
    spec_sq = D ** 2

    # Per-column spectrogram norms; zero norms become 1 to avoid div-by-zero.
    col_norm = np.sqrt(np.sum(spec_sq, axis=0))
    col_norm = col_norm + (col_norm == 0)

    S = np.zeros((mask_sq.shape[1], spec_sq.shape[1]))
    with warnings.catch_warnings():
        # Suppress imaginary-number/invalid-value warnings, for now.
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        for row in range(mask_sq.shape[1]):
            S[row, :] = np.sqrt(mask_sq[:, row] @ spec_sq) / col_norm
    return S
def maptimes(t, intime, outtime):
    """
    Map onset/offset times through a monotone piecewise-linear warp from
    `intime` to `outtime`.

    Duplicate `intime` entries (repeated frames on a DTW path) are collapsed
    by averaging their `outtime` values, and the mapped curve is forced to be
    nondecreasing. Times outside the warp range clamp to its endpoints.

    Parameters
    ----------
    t : np.ndarray
        Times (any shape) to be remapped.
    intime : np.ndarray
        Source times of the warp path.
    outtime : np.ndarray
        Target times of the warp path, aligned with `intime`.

    Returns
    -------
    np.ndarray
        Remapped times, same shape as `t`.
    """
    t = np.asarray(t, dtype=float)
    intime = np.asarray(intime, dtype=float)
    outtime = np.asarray(outtime, dtype=float)

    shape = t.shape
    flat = t.reshape(-1)

    # Put the warp path in intime order.
    order = np.argsort(intime)
    src = intime[order]
    dst = outtime[order]

    # Average dst over duplicate src values (DTW paths repeat frames).
    uniq_src, groups = np.unique(src, return_inverse=True)
    sums = np.bincount(groups, weights=dst)
    counts = np.bincount(groups)
    avg_dst = sums / np.maximum(counts, 1)

    # DTW should already be monotone; this guards against tiny inversions.
    avg_dst = np.maximum.accumulate(avg_dst)

    mapped = np.interp(flat, uniq_src, avg_dst, left=avg_dst[0], right=avg_dst[-1])
    return mapped.reshape(shape)
# These are the new functions to replace calculate_f0_est
def f0_est_weighted_sum(x, f, f0i, fMax=20000, fThresh=None):
    """
    Calculate F0, power, and per-harmonic spectra from a spectral
    representation via a harmonic-weighted sum.

    Parameters
    ----------
    x : np.ndarray, shape (F, T)
        Matrix of (possibly complex) spectrogram values; only |x|**2 is used.
    f : np.ndarray, shape (F, T)
        Frequencies (Hz) corresponding to each value in `x`. NaN entries are
        treated as 0 Hz. The input array is not modified.
    f0i : np.ndarray
        Initial F0 estimate per time frame.
    fMax : float, optional
        Maximum frequency (Hz) to include in the weighted sum.
        Default is 20000.
    fThresh : float, optional
        Maximum distance (Hz) from each harmonic to include. Defaults to
        twice the median bin spacing of the first frame of `f`.

    Returns
    -------
    f0 : np.ndarray
        Estimated F0 per frame (frequency weighted by 1/harmonic-number
        energy near each harmonic).
    p : np.ndarray
        Corresponding "power" per frame (summed masked |x|**2 below fMax).
    strips : list of np.ndarray
        Masked power spectrum for each harmonic number 1..maxI.
    """
    if fThresh is None:
        # Twice the median bin spacing of the first frame (NaN-safe).
        fThresh = 2 * np.nanmedian(np.diff(f[:, 0]))

    # Zero out NaN frequencies without mutating the caller's array
    # (the original wrote into `f` in place).
    f = np.where(np.isnan(f), 0.0, f)
    x2 = np.abs(x) ** 2

    wNum = np.zeros_like(x2)
    wDen = np.zeros_like(x2)

    # Highest harmonic number that can fall below fMax for any frame.
    maxI = np.max(fMax / f0i[f0i > 0])
    strips = []
    for i in range(1, int(maxI) + 1):
        # Bins within fThresh of the i-th harmonic of the frame's F0.
        mask = np.abs(f - (f0i * i)) < fThresh
        strip = x2 * mask
        strips.append(strip)
        wNum += 1 / i * strip  # weight energy by 1/harmonic number
        wDen += strip

    # Discard contributions above the frequency cap.
    wNum *= (f < fMax)
    wDen *= (f < fMax)

    f0 = np.sum(wNum * f, axis=0) / np.sum(wDen, axis=0)
    p = np.sum(wDen, axis=0)
    return f0, p, strips
def f0_est_weighted_sum_spec(noteStart_s, noteEnd_s, midiNote, freqs, D, sr, useIf=True):
    """
    Calculate F0, power, and spectrum for a single note.

    Parameters
    ----------
    noteStart_s : float
        Start position (in seconds) of the note to analyze.
    noteEnd_s : float
        End position (in seconds) of the note to analyze.
    midiNote : int
        MIDI note number of the note to analyze.
    freqs : np.ndarray
        Frequency matrix (e.g. instantaneous frequencies) whose columns are
        aligned with the columns of `D`.
    D : np.ndarray
        Complex spectrogram to analyze, hop-spaced as set up below.
    sr : int
        Sample rate of the audio signal.
    useIf : bool, optional
        If True, use the instantaneous frequencies in `freqs`; otherwise use
        the fixed STFT bin frequencies. Defaults to True.

    Returns
    -------
    f0 : np.ndarray
        Vector of estimated F0s from `noteStart_s` to `noteEnd_s`.
    p : np.ndarray
        Vector of corresponding "powers".
    t : np.ndarray
        Time offsets for each analyzed frame.
    M : np.ndarray
        Estimated spectrum (sum over partial strips) for the note.
    xf : np.ndarray
        Frequency matrix actually used for the estimate.
    """
    # Window/hop matching the spectrogram settings: 64 ms window, 1/8 hop.
    win_s = 0.064
    win = round(win_s * sr)
    hop = round(win / 8)

    # Convert seconds to hop-frame column indices into D.
    noteStart_hop = int(np.floor(noteStart_s * sr / hop))
    noteEnd_hop = int(np.floor(noteEnd_s * sr / hop))
    inds = range(noteStart_hop, noteEnd_hop)

    # Sixth-root compression of the magnitude spectrum for the first pass.
    x = np.abs(D[:, inds])**(1/6)
    f = np.arange(win/2 + 1) * sr / win
    if useIf:
        xf = freqs[:, inds]
    else:
        xf = np.tile(f, (x.shape[1], 1)).T

    # First pass: refine F0 around the nominal MIDI pitch.
    f0i = librosa.midi_to_hz(midiNote)
    fMax = 5000
    fThresh = 2 * np.nanmedian(np.diff(xf[:, 0]))
    f0, _, _ = f0_est_weighted_sum(x, xf, f0i, fMax, fThresh)

    # Second pass on the uncompressed power spectrum for power and partials.
    # NOTE(review): `sr` is passed in the fMax position here — presumably
    # intended as "no cap below Nyquist", but worth confirming.
    _, p, partials = f0_est_weighted_sum(x ** 6, xf, f0, sr)

    M = partials[0]
    for i in range(1, len(partials)):
        M += partials[i]

    # NOTE(review): frames are hop-spaced, yet `t` advances by the full
    # window length (win_s = 8 hops) per frame — confirm this is intended.
    t = np.arange(len(inds)) * win_s

    return f0, p, t, M, xf
# Internal utility: trims note events lying in silent audio regions, judged by an RMS threshold.
def trim_silences(nmat_dict, y, sr, rms_thresh_db=-40.0, pad=0.25):
    """
    Remove or clamp note events that fall outside the active audio region,
    as determined by an RMS energy threshold.

    Parameters
    ----------
    nmat_dict : dict of str -> pd.DataFrame
        Note matrices keyed by part name; each DataFrame must carry
        ``ONSET_SEC`` and ``OFFSET_SEC`` columns (and optionally
        ``DURATION``).
    y : np.ndarray
        Audio time series at sample rate ``sr``.
    sr : int
        Sample rate of ``y`` in Hz.
    rms_thresh_db : float, optional
        RMS threshold in dB below which frames count as silent.
        Default is ``-40.0``.
    pad : float, optional
        Reserved for future use; currently unused. Default is ``0.25``.

    Returns
    -------
    dict of str -> pd.DataFrame
        The same dictionary with each part's DataFrame rebuilt so that all
        notes lie within the active audio window.

    Raises
    ------
    ValueError
        If no frame exceeds the RMS threshold.
    """
    # Locate the active time window from the RMS envelope.
    rms = librosa.feature.rms(y=y)[0]
    times = librosa.frames_to_time(np.arange(len(rms)), sr=sr)
    active = rms > librosa.db_to_amplitude(rms_thresh_db)

    if not np.any(active):
        raise ValueError("No active audio found above threshold")

    first_active_idx = np.argmax(active)
    last_active_idx = len(active) - 1 - np.argmax(active[::-1])
    start_trim = times[first_active_idx]
    end_trim = times[last_active_idx]

    for part, df in nmat_dict.items():
        new_rows = []
        for _, row in df.iterrows():
            onset = row['ONSET_SEC']
            offset = row['OFFSET_SEC']

            # Drop notes that lie entirely in the silent regions.
            if offset < start_trim or onset > end_trim:
                continue

            # Clamp notes that straddle either edge of the active window.
            new_onset = max(onset, start_trim)
            new_offset = min(offset, end_trim)

            if new_offset > new_onset:
                new_row = row.copy()
                new_row['ONSET_SEC'] = new_onset
                new_row['OFFSET_SEC'] = new_offset
                if 'DURATION' in row:
                    new_row['DURATION'] = new_offset - new_onset
                new_rows.append(new_row)

        nmat_dict[part] = pd.DataFrame(new_rows, columns=df.columns)

    return nmat_dict
# Internal utility: merges grace notes and odd polyrhythmic values into one voice and orders them.
def merge_grace_notes(nmat, offset=0.025):
    """
    Merge grace-note sub-parts back into their parent voice and resolve any
    resulting onset overlaps.

    Parameters
    ----------
    nmat : dict of str -> pd.DataFrame
        Note matrices keyed by part name; sub-parts use keys of the form
        ``"<base>:<suffix>"`` and must carry ``ONSET_SEC``, ``OFFSET_SEC``
        and ``DURATION`` columns.
    offset : float, optional
        Time in seconds added to the ``ONSET_SEC`` and ``OFFSET_SEC`` of
        every grace-note sub-part before merging. Default is ``0.025``.

    Returns
    -------
    dict of str -> pd.DataFrame
        The updated note matrix with all grace-note sub-parts folded into
        their respective base parts and removed as separate keys.
    """
    # Group part names by their base name (text before the first ':').
    base_parts = {}
    for part in list(nmat.keys()):
        base = part.split(":")[0]
        base_parts.setdefault(base, []).append(part)

    for base, parts in base_parts.items():
        if len(parts) == 1:
            continue  # no sub-parts to merge

        dfs = []
        for part in parts:
            df = nmat[part].copy()
            if part != base:
                # Shift grace notes in real time (seconds), not beats, so
                # they sort just after their anchor note.
                df['ONSET_SEC'] += offset
                df['OFFSET_SEC'] += offset
            dfs.append(df)

        # Order by real time; drop duplicate indices that would break the
        # positional updates below.
        merged = pd.concat(dfs).sort_values('ONSET_SEC').copy()
        merged = merged[~merged.index.duplicated(keep='first')]

        # Recompute DURATION from the merged second-based timing, with a
        # 10 ms floor so no note collapses to zero length, then restore
        # OFFSET_SEC from the (possibly floored) duration.
        merged['DURATION'] = merged['OFFSET_SEC'] - merged['ONSET_SEC']
        merged['DURATION'] = merged['DURATION'].clip(lower=0.01)
        merged['OFFSET_SEC'] = merged['ONSET_SEC'] + merged['DURATION']

        # Push any overlapping onset 1 ms past the previous note's offset.
        merged = merged.sort_values('ONSET_SEC')
        onset_col = merged.columns.get_loc('ONSET_SEC')
        offset_col = merged.columns.get_loc('OFFSET_SEC')
        for i in range(1, len(merged)):
            prev_offset = merged.iloc[i - 1]['OFFSET_SEC']
            if merged.iloc[i]['ONSET_SEC'] < prev_offset:
                merged.iloc[i, onset_col] = prev_offset + 0.001
                merged.iloc[i, offset_col] = (
                    merged.iloc[i]['ONSET_SEC'] + merged.iloc[i]['DURATION']
                )

        # Replace the base part and drop the now-merged sub-parts.
        nmat[base] = merged
        for part in parts:
            if part != base:
                del nmat[part]

    return nmat