Source code for pyampact.alignmentUtils

"""
alignmentUtils
==============

.. autosummary::
    :toctree: generated/

    dpcore
    dp
    gh    
    g
    orio_simmx    
    maptimes    
    f0_est_weighted_sum
    f0_est_weighted_sum_spec
    durations_from_midi_ticks
    load_audiofile
    trim_silences
    merge_grace_notes

"""

import wave
import numpy as np
import pandas as pd
import librosa
import mido
import warnings


__all__ = [
    "dpcore",
    "dp",
    "gh",
    "g",
    "orio_simmx",    
    "maptimes",
    "f0_est_weighted_sum",
    "f0_est_weighted_sum_spec",
    "durations_from_midi_ticks",
    "load_audiofile",
    "trim_silences",
    "merge_grace_notes"
]


[docs] def dpcore(M, pen): """Core dynamic programming calculation of best path. M[r,c] is the array of local costs. Create D[r,c] as the array of costs-of-best-paths to r,c, and phi[r,c] as the indicator of the point preceding [r,c] to allow traceback; 0 = (r-1,c-1), 1 = (r,c-1), 2 = (r-1, c) Parameters ---------- M : np.ndarray A 2D array of local costs, where M[r, c] represents the cost at position (r, c). pen : float A penalty value applied for non-diagonal movements in the path. Returns ------- D : np.ndarray A 2D array of cumulative best costs to each point (r,c), starting from (0,0). phi : np.ndarray A 2D array of integers used for traceback, where: - 0 indicates the previous point was (r-1, c-1). - 1 indicates the previous point was (r, c-1). - 2 indicates the previous point was (r-1, c). """ # Pure python equivalent D = np.zeros(M.shape, dtype=float) phi = np.zeros(M.shape, dtype=int) # bottom edge can only come from preceding column D[0, 1:] = M[0, 0]+np.cumsum(M[0, 1:]+pen) phi[0, 1:] = 1 # left edge can only come from preceding row D[1:, 0] = M[0, 0]+np.cumsum(M[1:, 0]+pen) phi[1:, 0] = 2 # initialize bottom left D[0, 0] = M[0, 0] phi[0, 0] = 0 # Calculate the rest recursively for c in range(1, np.shape(M)[1]): for r in range(1, np.shape(M)[0]): best_preceding_costs = [ D[r-1, c-1], pen+D[r, c-1], pen+D[r-1, c]] tb = np.argmin(best_preceding_costs) D[r, c] = best_preceding_costs[tb] + M[r, c] phi[r, c] = tb return D, phi
[docs] def dp(local_costs, penalty=0.1, gutter=0.0, G=0.5): """ Use dynamic programming to find a min-cost path through a matrix of local costs. Parameters ---------- local_costs : np.ndarray A 2D matrix of local costs, where each cell represents the cost associated with that specific position. penalty : float, optional An additional cost incurred for moving in the horizontal or vertical direction (i.e., (0,1) and (1,0) steps). Default is 0.1. gutter : float, optional A proportion of edge length that allows for deviations away from the bottom-left corner (-1,-1) in the optimal path. Default is 0.0, meaning the path must reach the top-right corner. G : float, optional A proportion of the edge length considered for identifying gulleys in the cost matrix. Default is 0.5. Returns ------- p : np.ndarray An array of row indices corresponding to the best path. q : np.ndarray An array of column indices corresponding to the best path. total_costs : np.ndarray A 2D array of minimum costs to reach each cell in the local costs matrix. phi : np.ndarray A traceback matrix indicating the preceding best-path step for each cell, where: - 0 indicates a diagonal predecessor. - 1 indicates the previous column (same row). - 2 indicates the previous row (same column). """ rows, cols = np.shape(local_costs) total_costs = np.zeros((rows + 1, cols + 1), float) total_costs[0, :] = np.inf total_costs[:, 0] = np.inf total_costs[0, 0] = 0 total_costs[1:(rows + 1), 1:(cols + 1)] = local_costs phi = np.zeros((rows, cols), int) for i in range(1, rows + 1): for j in range(1, cols + 1): choices = [total_costs[i - 1, j - 1], total_costs[i - 1, j] + penalty, total_costs[i, j - 1] + penalty] tb = np.argmin(choices) total_costs[i, j] += choices[tb] phi[i - 1, j - 1] = tb + 1 total_costs = total_costs[1:, 1:] phi = phi[1:, 1:] if gutter == 0: i, j = rows - 1, cols - 1 else: best_top_pt = np.argmin(total_costs[-1, :int(G * cols)]) best_right_pt = np.argmin(total_costs[:int(G * rows), -1]) if total_costs[-1, best_top_pt] < total_costs[best_right_pt, -1]: i, j = rows - 1, best_top_pt else: i, j = best_right_pt, cols - 1 p, q = [i], [j] while i > 0 and j > 0: tb = phi[i - 1, j - 1] if tb == 1: i -= 1 j -= 1 elif tb == 2: i -= 1 elif tb == 3: j -= 1 p.insert(0, i) q.insert(0, j) return np.array(p[1:]), np.array(q[1:]), total_costs
[docs] def gh(v1, i1, v2, i2, domain, frac=0.5): """ Get an element that is `frac` fraction of the way between `v1[i1]` and `v2[i2]`, but check bounds on both vectors. `frac` of 0 returns `v1[i1]`, `frac` of 1 returns `v2[i2]`, `frac` of 0.5 (the default) returns halfway between them. Parameters ---------- v1 : np.ndarray A 1D numpy array representing the first vector. i1 : int The index in `v1` from which to retrieve the value. v2 : np.ndarray A 1D numpy array representing the second vector. i2 : int The index in `v2` from which to retrieve the value. domain : tuple A tuple representing the valid bounds for both vectors. This should define the minimum and maximum allowable indices. frac : float, optional A fraction indicating how far between the two specified elements to interpolate. Default is 0.5. Returns ------- float The element that is `frac` fraction of the way between `v1[i1]` and `v2[i2]`, clipped to the specified domain bounds. """ x1 = g(v1, i1, domain) x2 = g(v2, i2, domain) return int(frac * x1 + (1 - frac) * x2)
[docs] def g(vec, idx, domain): """ Get an element from `vec`, checking bounds. `Domain` is the set of points that `vec` is a subset of. Parameters ---------- vec : np.ndarray A 1D numpy array representing the input vector. idx : int The index of the desired element in `vec`. domain : np.ndarray A 1D numpy array representing the set of valid points, of which `vec` is a subset. Returns ------- float The element from `vec` at index `idx` if it is within bounds; otherwise, the first element of `domain` if `idx` is less than 0, or the last element of `domain` if `idx` exceeds the bounds of `vec`. """ if idx < 1: return domain[0] elif idx > len(vec): return domain[-1] else: return vec[idx - 1]
[docs] def orio_simmx(M, D): """ Calculate an Orio&Schwartz-style (Peak Structure Distance) similarity matrix. Parameters ---------- M : np.ndarray A binary mask where each column corresponds to a row in the output similarity matrix S. The mask indicates the presence or absence of MIDI notes or relevant features. D : np.ndarray The regular spectrogram where the columns of the similarity matrix S correspond to the columns of D. This spectrogram represents the audio signal over time and frequency. Returns ------- np.ndarray The similarity matrix S, calculated based on the Peak Structure Distance between the binary mask M and the spectrogram D. """ # Convert to NumPy arrays if input is DataFrame M = M.values if isinstance(M, pd.DataFrame) else M D = D.values if isinstance(D, pd.DataFrame) else D # Calculate the similarities S = np.zeros((M.shape[1], D.shape[1])) D = D**2 M = M**2 nDc = np.sqrt(np.sum(D, axis=0)) nDc = nDc + (nDc == 0) # Evaluate one row at a time with warnings.catch_warnings(): # Suppress imaginary number warnings, for now warnings.filterwarnings("ignore", category=RuntimeWarning) for r in range(M.shape[1]): S[r, :] = np.sqrt(M[:, r] @ D) / nDc return S
[docs] def maptimes(t, intime, outtime): """ Map onset/offset times using linear interpolation from `intime` to `outtime`. Parameters ---------- t : np.ndarray of shape (N, 2) Each row contains [onset, offset]. Returns ------- np.ndarray of shape (N, 2) Mapped onset and offset times. """ t = np.asarray(t) intime = np.asarray(intime) outtime = np.asarray(outtime) original_shape = t.shape t = t.flatten() u = np.empty_like(t, dtype=outtime.dtype) for i, ti in enumerate(t): idx = np.searchsorted(intime, ti, side='right') if idx >= len(outtime): idx = len(outtime) - 1 u[i] = outtime[idx] return u.reshape(original_shape)
# These are the new functions to replace calculate_f0_est
[docs] def f0_est_weighted_sum(x, f, f0i, fMax=20000, fThresh=None): """ Calculate F0, power, and spectrum for an inputted spectral representation. Parameters ---------- x : np.ndarray, shape (F, T) Matrix of complex spectrogram values, where F is the number of frequency bins and T is the number of time frames. f : np.ndarray, shape (F, T) Matrix of frequencies corresponding to each of the spectrogram values in `x`. f0i : np.ndarray, shape (1, T) Initial estimates of F0 for each time frame. This should be a 1D array containing the F0 estimates for each time point. fMax : float, optional Maximum frequency to consider in the weighted sum. Defaults to 5000 Hz. fThresh : float, optional Maximum distance in Hz from each harmonic to consider. If not specified, no threshold will be applied. Returns ------- f0 : np.ndarray Vector of estimated F0s from the beginning to the end of the input time series. p : np.ndarray Vector of corresponding "powers" derived from the weighted sum of the estimated F0. strips : np.ndarray Estimated spectrum for each partial frequency based on the weighted contributions. """ # if ~exist('fMax', 'var') || isempty(fMax), fMax = 5000; end # if ~exist('fThresh', 'var') || isempty(fThresh), fThresh = 2*median(diff(f(:,1))); end # if fMax is None: # fMax = 5000 if fThresh is None: fThresh = 2 * np.nanmedian(np.diff(f[:, 0])) f[np.isnan(f)] = 0 x2 = np.abs(x) ** 2 np.isnan(x2) wNum = np.zeros_like(x2) wDen = np.zeros_like(x2) maxI = np.max(fMax / f0i[f0i > 0]) strips = [] for i in range(1, int(maxI) + 1): mask = np.abs(f - (f0i * i)) < fThresh strip = x2 * mask strips.append(strip) wNum += 1 / i * strip wDen += strip wNum *= (f < fMax) wDen *= (f < fMax) f0 = np.sum(wNum * f, axis=0) / np.sum(wDen, axis=0) pow = np.sum(wDen, axis=0) return f0, pow, strips
[docs] def f0_est_weighted_sum_spec(noteStart_s, noteEnd_s, midiNote, freqs, D, sr, useIf=True): """ Calculate F0, power, and spectrum for a single note. Parameters ---------- filename : str Name of the WAV file to analyze. noteStart_s : float Start position (in seconds) of the note to analyze. noteEnd_s : float End position (in seconds) of the note to analyze. midiNote : int MIDI note number of the note to analyze. y : np.ndarray Audio time series data from the WAV file. sr : int Sample rate of the audio signal. useIf : bool, optional If true, use instantaneous frequency; otherwise, use spectrogram frequencies. Defaults to True. Returns ------- f0 : np.ndarray Vector of estimated F0s from `noteStart_s` to `noteEnd_s`. p : np.ndarray Vector of corresponding "powers" derived from the weighted sum of the estimated F0. M : np.ndarray Estimated spectrum for the analyzed note. """ # set window and hop win_s = 0.064 win = round(win_s * sr) hop = round(win / 8) # indices for indexing into ifgram (D) noteStart_hop = int(np.floor(noteStart_s * sr / hop)) noteEnd_hop = int(np.floor(noteEnd_s * sr / hop)) # # Cap noteEnd_hop to the maximum allowed index # noteEnd_hop = min(noteEnd_hop, D.shape[1]) inds = range(noteStart_hop, noteEnd_hop) x = np.abs(D[:, inds])**(1/6) f = np.arange(win/2 + 1) * sr / win if useIf: xf = freqs[:, inds] else: xf = np.tile(f, (x.shape[1], 1)).T f0i = librosa.midi_to_hz(midiNote) fMax = 5000 fThresh = 2 * np.nanmedian(np.diff(xf[:, 0])) f0, _, _ = f0_est_weighted_sum(x, xf, f0i, fMax, fThresh) _, p, partials = f0_est_weighted_sum(x ** 6, xf, f0, sr) M = partials[0] for i in range(1, len(partials)): M += partials[i] t = np.arange(len(inds)) * win_s return f0, p, t, M, xf
[docs] def durations_from_midi_ticks(filename): """ Extract note durations from a MIDI file using MIDI ticks. This function processes a MIDI file, calculates note onset and offset times based on MIDI ticks and tempo, and returns the duration matrix (nmat). It handles tempo changes and computes times by converting MIDI ticks to seconds. Assumes a default pulses-per-quarter-note (PPQN) value of 96. Parameters ---------- filename : str Path to the MIDI file to be processed. Returns ------- np.ndarray A numpy array where each row contains the start and end times of notes (in seconds) based on MIDI ticks and tempo changes. """ # symbolic.py converts the files beforehand mid = mido.MidiFile(filename) nmat = [] # Set PPQN to 96 ppqn = 96 # Convert ticks per beat to seconds per tick seconds_per_tick = 60 / (500000 / ppqn) # Default tempo in microseconds per quarter note (500000 µs = 120 BPM) current_tempo = 500000 for track in mid.tracks: cum_time = 0 for msg in track: cum_time += msg.time if msg.type == 'set_tempo': current_tempo = msg.tempo # Update seconds per tick based on new tempo seconds_per_tick = current_tempo / (1_000_000 * ppqn) if msg.type == 'note_on' and msg.velocity > 0: note = msg.note velocity = msg.velocity start_time = cum_time * seconds_per_tick nmat.append([start_time, 0]) if msg.type == 'note_off' or (msg.type == 'note_on' and msg.velocity == 0): for event in reversed(nmat): if event[1] == 0 and event[0] <= cum_time * seconds_per_tick: end_time = cum_time * seconds_per_tick event[1] = end_time break # Convert nmat to np.array and return only the 3rd and 4th columns return np.array(nmat)
[docs] def load_audiofile(audio_file, mono_mode="mean"): """ Parameters ---------- audio_file : str The path to the wav file to be loaded. Returns ------- audio_data : The loaded wav file as a numpy array. sr : The original sample rate of the audio file. Load WAV and (optionally) mix to mono while preserving loudness. mono_mode: - "mean": average channels (librosa-style; safest headroom) - do librosa.to_mono() - "equal_power": sum channels and divide by sqrt(C) (~keeps perceived power) """ with wave.open(audio_file, 'rb') as wf: sr = wf.getframerate() n_channels = wf.getnchannels() n_frames = wf.getnframes() sampwidth = wf.getsampwidth() # bytes per sample raw = wf.readframes(n_frames) # ---- decode to float32 in [-1, 1] (16-bit PCM assumed, like your code) ---- if sampwidth != 2: raise ValueError(f"Only 16-bit PCM supported here (got {8*sampwidth}-bit).") x = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0 # shape to (n_frames, n_channels) if multichannel if n_channels > 1: y = librosa.to_mono(x) else: y = x # already mono (shape: (n_frames,)) return y, sr
# Internal utility function, no documentation
[docs] def trim_silences(nmat_dict, y, sr, rms_thresh_db=-40.0, pad=0.25): # Compute RMS and active time window rms = librosa.feature.rms(y=y)[0] times = librosa.frames_to_time(np.arange(len(rms)), sr=sr) active = rms > librosa.db_to_amplitude(rms_thresh_db) if not np.any(active): raise ValueError("No active audio found above threshold") first_active_idx = np.argmax(active) last_active_idx = len(active) - 1 - np.argmax(active[::-1]) start_trim = times[first_active_idx] end_trim = times[last_active_idx] for part, df in nmat_dict.items(): new_rows = [] for _, row in df.iterrows(): onset = row['ONSET_SEC'] offset = row['OFFSET_SEC'] # Discard completely silent notes if offset < start_trim or onset > end_trim: continue # Clamp only edges new_onset = onset if onset >= start_trim else start_trim new_offset = offset if offset <= end_trim else end_trim if new_offset > new_onset: new_row = row.copy() new_row['ONSET_SEC'] = new_onset new_row['OFFSET_SEC'] = new_offset if 'DURATION' in row: new_row['DURATION'] = new_offset - new_onset new_rows.append(new_row) nmat_dict[part] = pd.DataFrame(new_rows, columns=df.columns) return nmat_dict
# Internal utility function, no documentation # Merges grace notes and odd polyrhythmic values into one voice and orders them
[docs] def merge_grace_notes(nmat, offset=0.025): base_parts = {} for part in list(nmat.keys()): base = part.split(":")[0] base_parts.setdefault(base, []).append(part) for base, parts in base_parts.items(): if len(parts) == 1: continue # no merging needed dfs = [] for part in parts: df = nmat[part].copy() if part != base: df['ONSET'] += offset dfs.append(df) merged = pd.concat(dfs).sort_values('ONSET').copy() # Recalculate DURATION merged['DURATION'] = merged['ONSET'].shift(-1) - merged['ONSET'] merged['DURATION'] = merged['DURATION'].fillna(0.25) # Recalculate ONSET_SEC and OFFSET_SEC directly from ONSET and DURATION merged['ONSET_SEC'] = merged['ONSET'] merged['OFFSET_SEC'] = merged['ONSET'] + merged['DURATION'] # Replace in dict nmat[base] = merged for part in parts: if part != base: del nmat[part] return nmat