Source code for pyampact.alignmentUtils

"""
alignmentUtils
==============

.. autosummary::
    :toctree: generated/

    dpcore
    dp
    gh    
    g
    orio_simmx
    simmx
    maptimes    
    f0_est_weighted_sum
    f0_est_weighted_sum_spec
    durations_from_midi_ticks
    load_audiofile

"""

import wave
import numpy as np
import pandas as pd
import librosa
import mido
import warnings


__all__ = [
    "dpcore",
    "dp",
    "gh",
    "g",
    "orio_simmx",
    "simmx",
    "maptimes",
    "f0_est_weighted_sum",
    "f0_est_weighted_sum_spec",
    "durations_from_midi_ticks",
    "load_audiofile"
]


[docs] def dpcore(M, pen): """Core dynamic programming calculation of best path. M[r,c] is the array of local costs. Create D[r,c] as the array of costs-of-best-paths to r,c, and phi[r,c] as the indicator of the point preceding [r,c] to allow traceback; 0 = (r-1,c-1), 1 = (r,c-1), 2 = (r-1, c) Parameters ---------- M : np.ndarray A 2D array of local costs, where M[r, c] represents the cost at position (r, c). pen : float A penalty value applied for non-diagonal movements in the path. Returns ------- D : np.ndarray A 2D array of cumulative best costs to each point (r,c), starting from (0,0). phi : np.ndarray A 2D array of integers used for traceback, where: - 0 indicates the previous point was (r-1, c-1). - 1 indicates the previous point was (r, c-1). - 2 indicates the previous point was (r-1, c). """ # Pure python equivalent D = np.zeros(M.shape, dtype=float) phi = np.zeros(M.shape, dtype=int) # bottom edge can only come from preceding column D[0, 1:] = M[0, 0]+np.cumsum(M[0, 1:]+pen) phi[0, 1:] = 1 # left edge can only come from preceding row D[1:, 0] = M[0, 0]+np.cumsum(M[1:, 0]+pen) phi[1:, 0] = 2 # initialize bottom left D[0, 0] = M[0, 0] phi[0, 0] = 0 # Calculate the rest recursively for c in range(1, np.shape(M)[1]): for r in range(1, np.shape(M)[0]): best_preceding_costs = [ D[r-1, c-1], pen+D[r, c-1], pen+D[r-1, c]] tb = np.argmin(best_preceding_costs) D[r, c] = best_preceding_costs[tb] + M[r, c] phi[r, c] = tb return D, phi
[docs] def dp(local_costs, penalty=0.1, gutter=0.0, G=0.5): """ Use dynamic programming to find a min-cost path through a matrix of local costs. Parameters ---------- local_costs : np.ndarray A 2D matrix of local costs, where each cell represents the cost associated with that specific position. penalty : float, optional An additional cost incurred for moving in the horizontal or vertical direction (i.e., (0,1) and (1,0) steps). Default is 0.1. gutter : float, optional A proportion of edge length that allows for deviations away from the bottom-left corner (-1,-1) in the optimal path. Default is 0.0, meaning the path must reach the top-right corner. G : float, optional A proportion of the edge length considered for identifying gulleys in the cost matrix. Default is 0.5. Returns ------- p : np.ndarray An array of row indices corresponding to the best path. q : np.ndarray An array of column indices corresponding to the best path. total_costs : np.ndarray A 2D array of minimum costs to reach each cell in the local costs matrix. phi : np.ndarray A traceback matrix indicating the preceding best-path step for each cell, where: - 0 indicates a diagonal predecessor. - 1 indicates the previous column (same row). - 2 indicates the previous row (same column). """ rows, cols = np.shape(local_costs) total_costs = np.zeros((rows + 1, cols + 1), float) total_costs[0, :] = np.inf total_costs[:, 0] = np.inf total_costs[0, 0] = 0 total_costs[1:(rows + 1), 1:(cols + 1)] = local_costs phi = np.zeros((rows, cols), int) for i in range(1, rows + 1): for j in range(1, cols + 1): choices = [total_costs[i - 1, j - 1], total_costs[i - 1, j] + penalty, total_costs[i, j - 1] + penalty] tb = np.argmin(choices) total_costs[i, j] += choices[tb] phi[i - 1, j - 1] = tb + 1 total_costs = total_costs[1:, 1:] phi = phi[1:, 1:] if gutter == 0: i, j = rows - 1, cols - 1 else: best_top_pt = np.argmin(total_costs[-1, :int(G * cols)]) best_right_pt = np.argmin(total_costs[:int(G * rows), -1]) if total_costs[-1, best_top_pt] < total_costs[best_right_pt, -1]: i, j = rows - 1, best_top_pt else: i, j = best_right_pt, cols - 1 p, q = [i], [j] while i > 0 and j > 0: tb = phi[i - 1, j - 1] if tb == 1: i -= 1 j -= 1 elif tb == 2: i -= 1 elif tb == 3: j -= 1 p.insert(0, i) q.insert(0, j) return np.array(p[1:]), np.array(q[1:]), total_costs, phi
[docs] def gh(v1, i1, v2, i2, domain, frac=0.5): """ Get an element that is `frac` fraction of the way between `v1[i1]` and `v2[i2]`, but check bounds on both vectors. `frac` of 0 returns `v1[i1]`, `frac` of 1 returns `v2[i2]`, `frac` of 0.5 (the default) returns halfway between them. Parameters ---------- v1 : np.ndarray A 1D numpy array representing the first vector. i1 : int The index in `v1` from which to retrieve the value. v2 : np.ndarray A 1D numpy array representing the second vector. i2 : int The index in `v2` from which to retrieve the value. domain : tuple A tuple representing the valid bounds for both vectors. This should define the minimum and maximum allowable indices. frac : float, optional A fraction indicating how far between the two specified elements to interpolate. Default is 0.5. Returns ------- float The element that is `frac` fraction of the way between `v1[i1]` and `v2[i2]`, clipped to the specified domain bounds. """ x1 = g(v1, i1, domain) x2 = g(v2, i2, domain) return int(frac * x1 + (1 - frac) * x2)
[docs] def g(vec, idx, domain): """ Get an element from `vec`, checking bounds. `Domain` is the set of points that `vec` is a subset of. Parameters ---------- vec : np.ndarray A 1D numpy array representing the input vector. idx : int The index of the desired element in `vec`. domain : np.ndarray A 1D numpy array representing the set of valid points, of which `vec` is a subset. Returns ------- float The element from `vec` at index `idx` if it is within bounds; otherwise, the first element of `domain` if `idx` is less than 0, or the last element of `domain` if `idx` exceeds the bounds of `vec`. """ if idx < 1: return domain[0] elif idx > len(vec): return domain[-1] else: return vec[idx - 1]
[docs] def orio_simmx(M, D): """ Calculate an Orio&Schwartz-style (Peak Structure Distance) similarity matrix. Parameters ---------- M : np.ndarray A binary mask where each column corresponds to a row in the output similarity matrix S. The mask indicates the presence or absence of MIDI notes or relevant features. D : np.ndarray The regular spectrogram where the columns of the similarity matrix S correspond to the columns of D. This spectrogram represents the audio signal over time and frequency. Returns ------- np.ndarray The similarity matrix S, calculated based on the Peak Structure Distance between the binary mask M and the spectrogram D. """ # Convert to NumPy arrays if input is DataFrame M = M.values if isinstance(M, pd.DataFrame) else M D = D.values if isinstance(D, pd.DataFrame) else D # Calculate the similarities S = np.zeros((M.shape[1], D.shape[1])) D = D**2 M = M**2 nDc = np.sqrt(np.sum(D, axis=0)) nDc = nDc + (nDc == 0) # Evaluate one row at a time with warnings.catch_warnings(): # Suppress imaginary number warnings, for now warnings.filterwarnings("ignore", category=RuntimeWarning) for r in range(M.shape[1]): S[r, :] = np.sqrt(M[:, r] @ D) / nDc return S
[docs] def simmx(A, B): """ Calculate a similarity matrix between feature matrices A and B. Parameters ---------- A : np.ndarray The first feature matrix, where each row represents a sample and each column represents a feature. B : np.ndarray, optional The second feature matrix. If not provided, B will be set to A, allowing for self-similarity calculation. Returns ------- np.ndarray The similarity matrix between A and B, where the element at (i, j) represents the similarity between the i-th sample of A and the j-th sample of B. """ A = A.values if isinstance(A, pd.DataFrame) else A B = B.values if isinstance(B, pd.DataFrame) else B EA = np.sqrt(np.sum(A**2, axis=0)) EB = np.sqrt(np.sum(B**2, axis=0)) M = (A.T @ B) / (EA[:, None] @ EB[None, :]) return M
[docs] def maptimes(t, intime, outtime): """ Map the times in t according to the mapping that each point in intime corresponds to that value in outtime. Parameters ---------- t : array-like The input times to be mapped. This can be a list or a numpy array of time values. intime : array-like The reference input time points. Each point corresponds to a value in `outtime`. outtime : array-like The corresponding output time points that map to the values in `intime`. Returns ------- np.ndarray An array of the mapped times, where each time in `t` is replaced by its corresponding value in `outtime` based on the mapping provided by `intime`. If a time in `t` does not have a corresponding `intime`, it will be returned unchanged. """ t = np.array(t) intime = np.array(intime) outtime = np.array(outtime) # Ensure that intime and outtime are sorted sort_idx = np.argsort(intime) intime = intime[sort_idx] outtime = outtime[sort_idx] u = np.interp(t, intime, outtime, left=outtime[0], right=outtime[-1]) u = np.round(u, decimals=3) return u
# These are the new functions to replace calculate_f0_est
[docs] def f0_est_weighted_sum(x, f, f0i, fMax=5000, fThresh=None): """ Calculate F0, power, and spectrum for an inputted spectral representation. Parameters ---------- x : np.ndarray, shape (F, T) Matrix of complex spectrogram values, where F is the number of frequency bins and T is the number of time frames. f : np.ndarray, shape (F, T) Matrix of frequencies corresponding to each of the spectrogram values in `x`. f0i : np.ndarray, shape (1, T) Initial estimates of F0 for each time frame. This should be a 1D array containing the F0 estimates for each time point. fMax : float, optional Maximum frequency to consider in the weighted sum. Defaults to 5000 Hz. fThresh : float, optional Maximum distance in Hz from each harmonic to consider. If not specified, no threshold will be applied. Returns ------- f0 : np.ndarray Vector of estimated F0s from the beginning to the end of the input time series. p : np.ndarray Vector of corresponding "powers" derived from the weighted sum of the estimated F0. strips : np.ndarray Estimated spectrum for each partial frequency based on the weighted contributions. """ # if ~exist('fMax', 'var') || isempty(fMax), fMax = 5000; end # if ~exist('fThresh', 'var') || isempty(fThresh), fThresh = 2*median(diff(f(:,1))); end # if fMax is None: # fMax = 5000 if fThresh is None: fThresh = 2 * np.nanmedian(np.diff(f[:, 0])) f[np.isnan(f)] = 0 x2 = np.abs(x) ** 2 np.isnan(x2) wNum = np.zeros_like(x2) wDen = np.zeros_like(x2) maxI = np.max(fMax / f0i[f0i > 0]) strips = [] for i in range(1, int(maxI) + 1): mask = np.abs(f - (f0i * i)) < fThresh strip = x2 * mask strips.append(strip) wNum += 1 / i * strip # .toarray() wDen += strip # .toarray() wNum *= (f < fMax) wDen *= (f < fMax) f0 = np.sum(wNum * f, axis=0) / np.sum(wDen, axis=0) pow = np.sum(wDen, axis=0) return f0, pow, strips
[docs] def f0_est_weighted_sum_spec(filename, noteStart_s, noteEnd_s, midiNote, y, sr, useIf=True): """ Calculate F0, power, and spectrum for a single note. Parameters ---------- filename : str Name of the WAV file to analyze. noteStart_s : float Start position (in seconds) of the note to analyze. noteEnd_s : float End position (in seconds) of the note to analyze. midiNote : int MIDI note number of the note to analyze. y : np.ndarray Audio time series data from the WAV file. sr : int Sample rate of the audio signal. useIf : bool, optional If true, use instantaneous frequency; otherwise, use spectrogram frequencies. Defaults to True. Returns ------- f0 : np.ndarray Vector of estimated F0s from `noteStart_s` to `noteEnd_s`. p : np.ndarray Vector of corresponding "powers" derived from the weighted sum of the estimated F0. M : np.ndarray Estimated spectrum for the analyzed note. """ # set window and hop win_s = 0.064 win = round(win_s * sr) hop = round(win / 8) # load if gram freqs, times, D = librosa.reassigned_spectrogram( y=y, sr=sr, hop_length=hop) # indices for indexing into ifgram (D) noteStart_hop = int(np.floor(noteStart_s * sr / hop)) noteEnd_hop = int(np.floor(noteEnd_s * sr / hop)) inds = range(noteStart_hop, noteEnd_hop) x = np.abs(D[:, inds])**(1/6) f = np.arange(win/2 + 1) * sr / win if useIf: xf = freqs[:, inds] else: xf = np.tile(f, (x.shape[1], 1)).T f0i = librosa.midi_to_hz(midiNote) fMax = 5000 fThresh = 2 * np.nanmedian(np.diff(xf[:, 0])) f0, _, _ = f0_est_weighted_sum(x, xf, f0i, fMax, fThresh) _, p, partials = f0_est_weighted_sum(x ** 6, xf, f0, sr) M = partials[0] for i in range(1, len(partials)): M += partials[i] t = np.arange(len(inds)) * win_s return f0, p, t, M, xf
[docs] def durations_from_midi_ticks(filename): """ Extract note durations from a MIDI file using MIDI ticks. This function processes a MIDI file, calculates note onset and offset times based on MIDI ticks and tempo, and returns the duration matrix (nmat). It handles tempo changes and computes times by converting MIDI ticks to seconds. Assumes a default pulses-per-quarter-note (PPQN) value of 96. Parameters ---------- filename : str Path to the MIDI file to be processed. Returns ------- np.ndarray A numpy array where each row contains the start and end times of notes (in seconds) based on MIDI ticks and tempo changes. """ # symbolic.py converts the files beforehand mid = mido.MidiFile(filename) nmat = [] # Set PPQN to 96 ppqn = 96 # Convert ticks per beat to seconds per tick seconds_per_tick = 60 / (500000 / ppqn) # Default tempo in microseconds per quarter note (500000 µs = 120 BPM) current_tempo = 500000 for track in mid.tracks: cum_time = 0 for msg in track: cum_time += msg.time if msg.type == 'set_tempo': current_tempo = msg.tempo # Update seconds per tick based on new tempo seconds_per_tick = current_tempo / (1_000_000 * ppqn) if msg.type == 'note_on' and msg.velocity > 0: note = msg.note velocity = msg.velocity start_time = cum_time * seconds_per_tick nmat.append([start_time, 0]) if msg.type == 'note_off' or (msg.type == 'note_on' and msg.velocity == 0): for event in reversed(nmat): if event[1] == 0 and event[0] <= cum_time * seconds_per_tick: end_time = cum_time * seconds_per_tick event[1] = end_time break # Convert nmat to np.array and return only the 3rd and 4th columns return np.array(nmat)
[docs] def load_audiofile(audio_file): """ Loads file for analysis. Currently only the WAV file format is allowed. AIFF, MP3, etc. must be converted prior. Parameters ---------- audio_file : str The path to the wav file to be loaded. Returns ------- audio_data : The loaded wav file as a numpy array. sr : The original sample rate of the audio file. """ with wave.open(audio_file, 'rb') as wf: sr = wf.getframerate() n_channels = wf.getnchannels() n_frames = wf.getnframes() # Read the raw audio data raw_audio = wf.readframes(n_frames) # Convert raw audio to numpy array audio_data = np.frombuffer(raw_audio, dtype=np.int16) # If stereo, reshape to (n_frames, n_channels) if n_channels > 1: audio_data = audio_data.reshape(-1, n_channels) # Flatten to a single channel if needed audio_data = audio_data.flatten() return audio_data, sr