# Source code for pyampact.dataCompilation

"""
dataCompilation
===============

.. autosummary::
    :toctree: generated/

    data_compilation
"""

import os
import sys
import numpy as np
import librosa
import librosa.display
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
from matplotlib.patches import Patch
from scipy import signal

from pyampact.performance import estimate_perceptual_parameters
from pyampact.alignmentUtils import f0_est_weighted_sum_spec
from pyampact.symbolic import *

# Public API of this module; matches the autosummary in the module docstring
# plus the exporter/visualisation entry points.
__all__ = [
    "data_compilation",
    "export_selected_columns",
    "visualise_alignment_from_nmat",
    "plot_piano_roll",
]


def data_compilation(
    y,
    original_sr,
    hop_length,
    winms,
    tsr,
    spec,
    nmat,
    piece,
    audio_file_path,
    force_pyin=False,
):
    """
    Compile per-note perceptual descriptors from an aligned audio-score pair
    and write the results to disk in the appropriate format (.krn, .mei, or
    .csv).

    F0 estimation strategy is selected automatically based on the number of
    parts in the score: monophonic pieces use pyin directly; polyphonic
    pieces attempt pitch-separated F0 estimation via the reassigned
    spectrogram and fall back to pyin on failure. Set ``force_pyin=True`` to
    override and use pyin for all pieces regardless of polyphony.

    Parameters
    ----------
    y : ndarray
        Audio time series at the original sample rate.
    original_sr : int
        Sample rate of ``y``.
    hop_length : int
        Hop size in samples at ``tsr`` used during alignment (e.g. 32).
        Note: the effective STFT hop is re-derived below from the alignment
        spectrogram dimensions, so this value is not used directly.
    winms : float
        Analysis window size in milliseconds (e.g. 100).
    tsr : int
        Target sample rate used during alignment (e.g. 4000).
    spec : ndarray
        Magnitude spectrogram produced by alignment, shape (freq x frames).
    nmat : dict
        Note matrix dict returned by ``run_alignment``, keyed by part name.
        Each value is a DataFrame with at minimum ONSET_SEC, OFFSET_SEC, and
        MIDI columns.
    piece : Score
        Score object returned by ``run_alignment`` / ``load_score``.
    audio_file_path : str
        Path to the source audio file. Used to derive the output folder name
        (``output_files/output_<stem>/``) and filename stem.
    force_pyin : bool, optional
        If True, use pyin F0 estimation for all notes regardless of whether
        the piece is monophonic or polyphonic. Default is False (auto-detect).

    Returns
    -------
    nmat : dict
        The input note matrix dict with perceptual descriptor columns
        appended to each part DataFrame, including: f0Vals, meanf0, ppitch1,
        ppitch2, jitter, vibratoDepth, vibratoRate, pwrVals, meanPwr,
        shimmer, specCentVals, meanSpecCent, specBandwidthVals,
        meanSpecBandwidth, specContrastVals, meanSpecContrast,
        specFlatnessVals, meanSpecFlatness, specRolloffVals, meanSpecRolloff.
    fileOutput : str
        Path to the primary output file written to disk: a .krn file for
        Humdrum kern scores, a .csv file for Tony CSV scores, or a .mei file
        (with companion .csv) for all other formats.

    Notes
    -----
    Spectral features (centroid, bandwidth, contrast, flatness, rolloff) are
    precomputed once over the full spectrogram and sliced per note to avoid
    redundant computation. F0 and RMS power are computed at ``original_sr``
    using window and hop sizes derived from the alignment spectrogram
    dimensions rather than the caller-supplied ``hop_length``, which ensures
    consistency with the DTW alignment grid.
    """
    # --- Derive output folder from the audio filename ---
    # e.g. "./test_files/B063_00-01.wav" -> "output_files/output_B063_00-01/"
    audio_stem = os.path.splitext(os.path.basename(audio_file_path))[0]
    output_dir = os.path.join("output_files", f"output_{audio_stem}")
    os.makedirs(output_dir, exist_ok=True)
    # Base path for all output files (no extension - each writer appends its own)
    output_path = os.path.join(output_dir, audio_stem)

    all_note_vals = []

    # fft_len at tsr (alignment sample rate): nearest power of two to the
    # requested window length, clamped to at least 256 samples.
    fft_len_tsr = int(2 ** np.round(np.log(winms / 1000 * tsr) / np.log(2)))
    fft_len_tsr = max(256, fft_len_tsr)

    # Magnitude spectrogram (freq x frames) - produced at tsr by alignment
    S = np.abs(spec)

    # Derive the actual hop used by align_midi_wav from the spec dimensions
    # and the audio duration at tsr. This is robust regardless of what
    # hop_length value the caller passes in (which is often the
    # run_alignment default of 32, not the true STFT hop of ~100).
    n_frames = S.shape[1]
    n_samples_tsr = int(round(len(y) * tsr / original_sr))
    # align_midi_wav uses boundary=None, padded=False:
    #   n_frames = 1 + (n_samples - fft_len) // hop_samp
    #   => hop_samp = (n_samples - fft_len) // (n_frames - 1)
    if n_frames > 1:
        hop_samp_tsr = max(1, (n_samples_tsr - fft_len_tsr) // (n_frames - 1))
    else:
        hop_samp_tsr = fft_len_tsr

    # Frame time grid for spec (derived from actual hop at tsr)
    frame_times = np.arange(n_frames) * hop_samp_tsr / tsr

    # Convert to original_sr samples for librosa pyin / rms analysis
    hop_length_orig = max(1, int(round(hop_samp_tsr * original_sr / tsr)))
    fft_len_orig = max(256, int(round(fft_len_tsr * original_sr / tsr)))

    # Global f0 (pyin) at original_sr
    f0_all, _, _ = librosa.pyin(
        y,
        fmin=librosa.note_to_hz("C2"),
        fmax=librosa.note_to_hz("C7"),
        sr=original_sr,
        frame_length=fft_len_orig,
        hop_length=hop_length_orig,
    )
    f0_times = librosa.frames_to_time(
        np.arange(len(f0_all)),
        sr=original_sr,
        hop_length=hop_length_orig,
    )

    # Global power (rms) at original_sr
    pwr_all = librosa.feature.rms(
        y=y,
        frame_length=fft_len_orig,
        hop_length=hop_length_orig,
    ).flatten()
    pwr_times = librosa.frames_to_time(
        np.arange(len(pwr_all)),
        sr=original_sr,
        hop_length=hop_length_orig,
    )

    # Precompute spectral features ONCE over the whole piece.
    # Shapes:
    #   centroid/bandwidth/flatness/rolloff: (1, frames)
    #   contrast: (bands, frames)
    spec_centroid_all = librosa.feature.spectral_centroid(S=S)
    spec_bandwidth_all = librosa.feature.spectral_bandwidth(S=S)
    spec_contrast_all = librosa.feature.spectral_contrast(S=S)
    spec_flatness_all = librosa.feature.spectral_flatness(S=S)
    spec_rolloff_all = librosa.feature.spectral_rolloff(S=S)

    def slice_indices_vec(times, onsets, offsets):
        # Vectorized [onset, offset] -> [i0, i1) frame-index lookup over a
        # sorted time grid. ok[i] is False when the note covers no frames.
        i0 = np.searchsorted(times, onsets, side="left")
        i1 = np.searchsorted(times, offsets, side="right")
        ok = i1 > i0
        return i0, i1, ok

    def slice_spec_dict(i0, i1):
        # Per-note view of every precomputed spectral feature.
        return {
            "spec_centroid": spec_centroid_all[..., i0:i1],
            "spec_bandwidth": spec_bandwidth_all[..., i0:i1],
            "spec_contrast": spec_contrast_all[..., i0:i1],
            "spec_flatness": spec_flatness_all[..., i0:i1],
            "spec_rolloff": spec_rolloff_all[..., i0:i1],
        }

    # is_monophonic: true when there is only one instrument/voice part
    is_monophonic = len(nmat) == 1

    # The reassigned spectrogram is only needed for the polyphonic path.
    freqs_rs = D_rs = None
    if not is_monophonic:
        freqs_rs, _, D_rs = librosa.reassigned_spectrogram(
            y=y,
            sr=original_sr,
            hop_length=hop_length_orig,
        )

    for key, df in nmat.items():
        onsets = df["ONSET_SEC"].to_numpy(dtype=float)
        offsets = df["OFFSET_SEC"].to_numpy(dtype=float)
        midis = df["MIDI"].to_numpy(dtype=float)
        note_vals = []

        # Vectorized index lookup (cuts python overhead)
        f0_i0, f0_i1, f0_ok = slice_indices_vec(f0_times, onsets, offsets)
        pwr_i0, pwr_i1, pwr_ok = slice_indices_vec(pwr_times, onsets, offsets)
        spec_i0, spec_i1, spec_ok = slice_indices_vec(frame_times, onsets, offsets)

        for i in range(len(df)):
            # Notes too short to cover a single analysis frame get all-NaN
            # descriptors rather than raising.
            if not (f0_ok[i] and pwr_ok[i] and spec_ok[i]):
                note_vals.append(_nan_note())
                continue

            f0_seg = f0_all[f0_i0[i] : f0_i1[i]]
            pwr_seg = pwr_all[pwr_i0[i] : pwr_i1[i]]
            M_dict = slice_spec_dict(spec_i0[i], spec_i1[i])

            if is_monophonic or force_pyin:
                note_vals.append(
                    estimate_perceptual_parameters(
                        f0_seg, pwr_seg, M_dict, original_sr, hop_length_orig, 1,
                    )
                )
            else:
                # Try the pitch-separated path first; fall back to the global
                # pyin f0/rms segments when f0_est_weighted_sum_spec fails
                # (it throws on most notes, leaving descriptors all-NaN).
                try:
                    f0, pwr, _t, _M, _xf = f0_est_weighted_sum_spec(
                        onsets[i], offsets[i], midis[i], freqs_rs, D_rs, original_sr,
                    )
                    note_vals.append(
                        estimate_perceptual_parameters(
                            f0, pwr, M_dict, original_sr, hop_length_orig, 1,
                        )
                    )
                except Exception:
                    # Fall back to global pyin f0 + rms power for this note
                    note_vals.append(
                        estimate_perceptual_parameters(
                            f0_seg, pwr_seg, M_dict, original_sr, hop_length_orig, 1,
                        )
                    )

        all_note_vals.append(note_vals)

    # Append the per-note descriptors to each part's DataFrame. nmat and
    # all_note_vals share iteration order (dicts preserve insertion order).
    for df, vals in zip(nmat.values(), all_note_vals):
        df["f0Vals"] = [v["f0_vals"] for v in vals]
        df["meanf0"] = [_safe_nanmean(v) for v in df["f0Vals"]]
        df["ppitch1"] = [v["ppitch"][0] for v in vals]
        df["ppitch2"] = [v["ppitch"][1] for v in vals]
        df["jitter"] = [v["jitter"] for v in vals]
        df["vibratoDepth"] = [v["vibrato_depth"] for v in vals]
        df["vibratoRate"] = [v["vibrato_rate"] for v in vals]
        df["pwrVals"] = [v["pwr_vals"] for v in vals]
        df["meanPwr"] = [_safe_nanmean(v) for v in df["pwrVals"]]
        df["shimmer"] = [v["shimmer"] for v in vals]
        df["specCentVals"] = [v["spec_centroid"] for v in vals]
        df["meanSpecCent"] = [_safe_nanmean(v) for v in df["specCentVals"]]
        df["specBandwidthVals"] = [v["spec_bandwidth"] for v in vals]
        df["meanSpecBandwidth"] = [_safe_nanmean(v) for v in df["specBandwidthVals"]]
        df["specContrastVals"] = [v["spec_contrast"] for v in vals]
        df["meanSpecContrast"] = [_safe_nanmean(v) for v in df["specContrastVals"]]
        df["specFlatnessVals"] = [v["spec_flatness"] for v in vals]
        df["meanSpecFlatness"] = [_safe_nanmean(v) for v in df["specFlatnessVals"]]
        df["specRolloffVals"] = [v["spec_rolloff"] for v in vals]
        df["meanSpecRolloff"] = [_safe_nanmean(v) for v in df["specRolloffVals"]]

    nmat_export = _convert_nmat_for_export(nmat)
    ext = getattr(piece, "fileExtension", None)

    if ext == "krn":
        # Scalar descriptor columns to export as kern analysis spines.
        # ONSET_SEC / OFFSET_SEC give each note its audio-aligned timing.
        ANALYSIS_COLS = [
            'ONSET_SEC', 'OFFSET_SEC',
            'meanf0', 'ppitch1', 'ppitch2', 'jitter',
            'vibratoDepth', 'vibratoRate', 'meanPwr', 'shimmer',
            'meanSpecCent', 'meanSpecBandwidth', 'meanSpecContrast',
            'meanSpecFlatness', 'meanSpecRolloff',
        ]

        def _clean_spine(raw):
            # Normalize a **harm / **harte spine imported from the source
            # kern file into a forward-filled Series keyed by beat offset.
            # Returns None when the spine is absent, not a Series, or empty.
            if isinstance(raw, list) and len(raw):
                raw = raw[0]
            if raw is None or not isinstance(raw, pd.Series) or raw.empty:
                return None
            clean = raw.sort_index()
            return clean[~clean.index.duplicated(keep='last')].ffill()

        # **harte (chord labels) and **harm (roman numerals) spines, if any.
        harte_clean = _clean_spine(piece._analyses.get('harte', None))
        harm_clean = _clean_spine(piece._analyses.get('harm', None))
        has_harte = harte_clean is not None
        has_harm = harm_clean is not None

        part_dfs = []
        for part, df_part in nmat.items():
            available = [c for c in ANALYSIS_COLS if c in df_part.columns]
            dfc = df_part[available].copy()

            # Attach harm (roman numeral) labels by beat offset - prepend so
            # it appears as the first analysis spine in the output kern file.
            if has_harm and 'ONSET' in df_part.columns:
                unique_onsets = pd.Index(df_part['ONSET'].unique())
                combined_idx = harm_clean.index.union(unique_onsets)
                harm_lookup = harm_clean.reindex(combined_idx).ffill()
                onset_to_harm = harm_lookup.to_dict()
                dfc['harm'] = df_part['ONSET'].map(onset_to_harm).values
                dfc = dfc[['harm'] + [c for c in dfc.columns if c != 'harm']]

            # Attach harte labels by beat offset
            if has_harte and 'ONSET' in df_part.columns:
                unique_onsets = pd.Index(df_part['ONSET'].unique())
                combined_idx = harte_clean.index.union(unique_onsets)
                harte_lookup = harte_clean.reindex(combined_idx).ffill()
                onset_to_harte = harte_lookup.to_dict()
                dfc['harte'] = df_part['ONSET'].map(onset_to_harte).values

            # Keep harm first if both spines are present (no-op otherwise)
            front_cols = [c for c in ['harm', 'harte'] if c in dfc.columns]
            dfc = dfc[front_cols + [c for c in dfc.columns if c not in front_cols]]

            # Re-index from XML_ID -> global beat-offset
            if 'ONSET' in df_part.columns:
                dfc.index = df_part['ONSET'].values
                # Chord notes share an onset; keep last (highest MIDI = melody note)
                dfc = dfc[~dfc.index.duplicated(keep='last')]
            part_dfs.append(dfc)

        # Merge all parts: union of all beat offsets, first non-NaN wins per cell
        if part_dfs:
            combined = part_dfs[0]
            for other in part_dfs[1:]:
                combined = combined.combine_first(other)
            combined = combined.sort_index()
        else:
            combined = pd.DataFrame()

        toKern(
            piece,
            path_name=f"{output_path}.krn",
            include_lyrics=False,
            include_dynamics=False,
            analysis_dfs={'analysis': combined} if not combined.empty else None,
        )
        fileOutput = f"{output_path}.krn"
    elif ext in ("csv", "txt"):
        # CSV scores (speech/music Tony CSV) - skip MEI entirely, write nmat directly
        csv_path = f"{output_path}.csv"
        frames = list(nmat.values())
        if frames:
            pd.concat(frames).to_csv(csv_path)
        fileOutput = csv_path
    else:
        mei_path = f"{output_path}.mei"
        insertAudioAnalysis(
            piece,
            mei_path,
            nmat_export,
            mimetype="audio/aiff",
            target=audio_file_path,
        )
        fileOutput = mei_path
        # Write a CSV alongside MEI output for non-krn, non-csv formats
        csv_path = f"{output_path}.csv"
        _mei_to_csv(mei_path, csv_path)

    return nmat, fileOutput
# Utility functions for data processing, no documentation def _safe_nanmean(x): if x is None: return np.nan x = np.asarray(x) # Empty array if x.size == 0: return np.nan # All values are NaN if np.isnan(x).all(): return np.nan return np.nanmean(x) def _nan_note(): return { "f0_vals": np.nan, "ppitch": (np.nan, np.nan), "jitter": np.nan, "vibrato_depth": np.nan, "vibrato_rate": np.nan, "pwr_vals": np.nan, "shimmer": np.nan, "spec_centroid": np.nan, "spec_bandwidth": np.nan, "spec_contrast": np.nan, "spec_flatness": np.nan, "spec_rolloff": np.nan, } def _convert_nmat_for_export(nmat): list_columns = [ "f0Vals", "pwrVals", "specCentVals", "specBandwidthVals", "specContrastVals", "specFlatnessVals", "specRolloffVals", ] out = {} for part, df_part in nmat.items(): dfc = df_part.copy() for col in list_columns: if col in dfc.columns: dfc[col] = dfc[col].astype(str) out[part] = dfc return out # ---------- MEI → CSV helpers ---------- _MEI_NS = {"mei": "http://www.music-encoding.org/ns/mei"} _PITCH_NAMES = ['C', 'C#', 'D', 'Eb', 'E', 'F', 'F#', 'G', 'Ab', 'A', 'Bb', 'B'] _CSV_EXPORT_COLS = [ "xmlid", "MIDI", "pitch_from_midi", "ONSET_SEC", "OFFSET_SEC", "meanf0", "ppitch1", "ppitch2", "jitter", "vibratoDepth", "vibratoRate", "meanPwr", "shimmer", "meanSpecCent", "meanSpecBandwidth", "meanSpecContrast", "meanSpecFlatness", "meanSpecRolloff", ] def _midi_to_pitch(midi): import math if midi is None or (isinstance(midi, float) and math.isnan(midi)): return "" midi = int(round(midi)) octave = midi // 12 - 1 return f"{_PITCH_NAMES[midi % 12]}{octave}" def _extract_notes_from_mei(mei_path: str) -> "pd.DataFrame": import xml.etree.ElementTree as ET import json tree = ET.parse(mei_path) root = tree.getroot() rows = [] for when in root.findall(".//mei:performance//mei:when", _MEI_NS): ext = when.find("mei:extData", _MEI_NS) if ext is None or ext.text is None: continue text = ext.text.strip() if text.startswith("<![CDATA["): text = text.replace("<![CDATA[", "").replace("]]>", 
"").strip() try: data = json.loads(text) except Exception: continue row = dict(data) row["xmlid"] = when.attrib.get("{http://www.w3.org/XML/1998/namespace}id") rows.append(row) if not rows: return pd.DataFrame() return pd.DataFrame(rows) def _mei_to_csv(mei_path: str, csv_path: str) -> None: """Extract note-level descriptors from a pyAMPACT MEI file and write a CSV.""" df = _extract_notes_from_mei(mei_path) if df.empty: print(f" mei_to_csv: no extData found in {mei_path}, CSV not written.") return # Ensure all expected columns exist (fill missing with NA) for col in _CSV_EXPORT_COLS: if col not in df.columns: df[col] = pd.NA df["pitch_from_midi"] = df["MIDI"].apply(_midi_to_pitch) df_out = df[_CSV_EXPORT_COLS].copy() df_out = df_out.dropna(axis=1, how="all") df_out.to_csv(csv_path, index=False)
def export_selected_columns(nmat, columns, audio_file_path=None, output_path=None):
    """
    Export a user-defined subset of descriptor columns from a note matrix to CSV.

    Parameters
    ----------
    nmat : dict, pd.DataFrame, or sequence of pd.DataFrame
        Note matrix (or collection of part note matrices) containing the
        selected columns. Columns missing from a part are filled with
        ``pd.NA`` in the output.
    columns : list of str
        Column names to include in the exported CSV.
    audio_file_path : str, optional
        Path to the source audio file. When provided, the output is written to
        ``output_files/output_<stem>/<stem>_selected.csv`` alongside the other
        pyAMPACT output files for that recording.
    output_path : str, optional
        Explicit destination path for the CSV file. Takes precedence over the
        auto-derived path when both ``audio_file_path`` and ``output_path``
        are given. If neither is provided, defaults to
        ``./output_selected_data.csv``.

    Returns
    -------
    None

    Raises
    ------
    TypeError
        If ``nmat`` is not a dict, a DataFrame, or a sequence of DataFrames.
    """
    # Resolve the destination path: an explicit output_path always wins.
    if audio_file_path is not None:
        audio_stem = os.path.splitext(os.path.basename(audio_file_path))[0]
        output_dir = os.path.join("output_files", f"output_{audio_stem}")
        os.makedirs(output_dir, exist_ok=True)
        output_path = output_path or os.path.join(
            output_dir, f"{audio_stem}_selected.csv"
        )
    else:
        output_path = output_path or "./output_selected_data.csv"
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)

    # Normalize nmat into a flat list of part DataFrames.
    if isinstance(nmat, dict):
        dfs = nmat.values()
    elif isinstance(nmat, pd.DataFrame):
        dfs = [nmat]
    elif isinstance(nmat, (list, tuple, np.ndarray)) and all(
        isinstance(x, pd.DataFrame) for x in nmat
    ):
        # Generalized (backward-compatible): previously only ndarray of
        # DataFrames was accepted; lists/tuples now work too.
        dfs = list(nmat)
    else:
        raise TypeError(f"Unsupported type for nmat: {type(nmat)}")

    selected = []
    for df in dfs:
        df = df.copy()
        # Guarantee every requested column exists so the concat is rectangular.
        for col in columns:
            if col not in df.columns:
                df[col] = pd.NA
        selected.append(df[columns])

    if not selected:
        return
    combined = pd.concat(selected, ignore_index=True)
    combined.to_csv(output_path, index=False)
def midi_to_freq(midi):
    """Convert a MIDI note number to frequency in Hz (A4 = 69 = 440 Hz)."""
    return 440.0 * (2 ** ((midi - 69) / 12))


# Sharp -> flat spellings. NOTE(review): not referenced anywhere in this
# module's visible code; presumably consumed elsewhere — confirm before removal.
enharmonic_map = {
    "A#": "Bb",
    "C#": "Db",
    "D#": "Eb",
    "F#": "Gb",
    "G#": "Ab",
}


def visualise_alignment_from_nmat(
    nmat_dict,
    y,
    original_sr,
    target_sr,
    hop_length,
    winms,
    audio_file_path,
):
    """
    Overlay aligned note boxes on a log-frequency spectrogram and save a PNG.

    Computes an STFT of ``y`` at ``original_sr`` (window length derived from
    ``winms``, hop converted from ``hop_length`` at ``target_sr``), then draws
    one shaded frequency band per note (ONSET_SEC..OFFSET_SEC, centred at the
    note's MIDI frequency ±15 Hz), colour-coded per part. The figure is written
    to ``output_files/output_<stem>/<stem>.png``; nothing is returned.
    """
    # --- Derive output folder from the audio filename ---
    audio_stem = os.path.splitext(os.path.basename(audio_file_path))[0]
    output_dir = os.path.join("output_files", f"output_{audio_stem}")
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, audio_stem)

    # --- FFT length computed at ORIGINAL sample rate ---
    # Nearest power of two to the window length in samples, at least 256.
    win_sec = winms / 1000.0
    fft_len = int(2 ** np.round(np.log(win_sec * original_sr) / np.log(2)))
    fft_len = max(256, fft_len)

    # --- hop_length is in samples at target_sr; convert it to original_sr ---
    hop_sec = hop_length / target_sr
    hop_samp = int(hop_sec * original_sr)
    noverlap = fft_len - hop_samp

    # Same STFT settings as the alignment path (boundary=None, padded=False).
    freqs, times, Zxx = signal.stft(
        y,
        fs=original_sr,
        window="hamming",
        nperseg=fft_len,
        noverlap=noverlap,
        nfft=fft_len,
        boundary=None,
        padded=False,
    )
    D = np.abs(Zxx)
    # Normalize to peak magnitude 1 (guarding the all-zero signal case).
    D /= (D.max() if D.max() > 0 else 1.0)
    S = librosa.amplitude_to_db(D, ref=np.max)

    plt.figure(figsize=(12, 6))
    librosa.display.specshow(
        S,
        x_coords=times,
        y_coords=freqs,
        x_axis="time",
        y_axis="log",
        cmap="gray_r",
    )
    plt.title(f"Spectrogram + Notes: {output_path}")

    # One colour per part, cycling when there are more parts than colours.
    base_colors = ["red","blue","green","orange","purple","cyan","magenta","yellow"]
    color_cycle = (base_colors * ((len(nmat_dict) // len(base_colors)) + 1))[:len(nmat_dict)]
    legend_elements = []

    for idx, (_, notes) in enumerate(nmat_dict.items()):
        if notes.empty:
            continue
        color = color_cycle[idx]
        legend_elements.append(Patch(facecolor=color, label=f"Part-{idx+1}"))
        for _, row in notes.iterrows():
            # Skip rows lacking the columns needed to draw a note box.
            if any(k not in row.index for k in ("MIDI","ONSET_SEC","OFFSET_SEC")):
                continue
            freq = midi_to_freq(row["MIDI"])
            start = row["ONSET_SEC"]
            end = row["OFFSET_SEC"]
            # ±15 Hz band around the note's nominal frequency.
            plt.fill_between([start, end], freq - 15, freq + 15, color=color, alpha=0.4)

    plt.ylim(20, original_sr / 2)
    plt.xlim(0, times[-1])
    plt.colorbar(format="%+2.0f dB")
    plt.legend(handles=legend_elements, loc="upper right")
    plt.savefig(f"{output_path}.png", dpi=300)
    plt.close()
def plot_piano_roll(
    piece, nmat, audio_file_path, target_sr, hop_length, verbose=False
):
    """
    Build a piano-roll image from aligned note data and save it alongside the
    other output files for this audio file.

    The piano roll is painted in audio-seconds (x-axis) vs MIDI pitch
    (y-axis). Spine annotations present in the score (keys, harm,
    chord/harte, function) are extracted, remapped from quarter-note offsets
    to audio seconds, and — when ``verbose`` — printed to stdout for
    inspection.

    Parameters
    ----------
    piece : Score
        The Score object returned by run_alignment / load_score.
    nmat : dict
        The aligned note-matrix dict returned by data_compilation.
        NOTE(review): currently unused — the notes are re-derived from
        ``nmats(piece)`` below; the parameter is kept for interface
        compatibility.
    audio_file_path : str
        Path to the original audio file — used to derive the output folder
        name.
    target_sr : int
        Target sample rate used during alignment (e.g. 4000).
    hop_length : int
        Hop size in samples at target_sr used during alignment (e.g. 32).
    verbose : bool
        If True, print the remapped spine annotation series.

    Returns
    -------
    pr : ndarray (128 × n_cols, float32) or None
        The raw piano-roll matrix, in case the caller wants to post-process
        it. None when no valid notes are found.
    audio_axis : pd.Index or None
        The time axis (seconds) corresponding to the columns of pr.
    """
    # ── Derive output folder ────────────────────────────────────────────────
    audio_stem = os.path.splitext(os.path.basename(audio_file_path))[0]
    output_dir = os.path.join("output_files", f"output_{audio_stem}")
    os.makedirs(output_dir, exist_ok=True)

    # ── Flatten all parts of the symbolic nmat into one DataFrame ───────────
    nmat_symbolic = nmats(piece)
    all_notes = pd.concat(nmat_symbolic.values()).reset_index(drop=True)
    all_notes = all_notes.dropna(subset=["MIDI", "ONSET_SEC", "OFFSET_SEC"])
    all_notes = all_notes[all_notes["MIDI"] >= 0]
    if all_notes.empty:
        print("plot_piano_roll: no valid notes found, skipping.")
        return None, None

    # ── Audio time axis ─────────────────────────────────────────────────────
    audio_start = all_notes["ONSET_SEC"].min()
    audio_end = all_notes["OFFSET_SEC"].max()
    n_cols = int((audio_end - audio_start) * target_sr / hop_length) + 1
    col_times = audio_start + np.arange(n_cols) * (hop_length / target_sr)
    audio_axis = pd.Index(col_times)

    # ── QN → audio-seconds mapping (from aligned nmat) ──────────────────────
    qn_sec = (
        all_notes[["ONSET", "ONSET_SEC"]]
        .dropna()
        .drop_duplicates("ONSET")
        .sort_values("ONSET")
    )
    qn_pts = qn_sec["ONSET"].values
    sec_pts = qn_sec["ONSET_SEC"].values

    def qn_to_sec(qn_arr):
        # Piecewise-linear interpolation from quarter-note offsets to seconds.
        return np.interp(np.asarray(qn_arr, dtype=float), qn_pts, sec_pts)

    # ── Piano roll matrix (128 × n_cols) ────────────────────────────────────
    pr = np.zeros((128, n_cols), dtype=np.float32)
    for _, row in all_notes.iterrows():
        midi = int(row["MIDI"])
        i0 = int(np.searchsorted(col_times, float(row["ONSET_SEC"])))
        i1 = int(np.searchsorted(col_times, float(row["OFFSET_SEC"])))
        # Every note paints at least one column.
        i1 = max(i1, i0 + 1)
        pr[midi, max(i0, 0):min(i1, n_cols)] = 1.0

    # ── Convert spine annotations: QN offsets → audio seconds → audio_axis ──
    def spine_to_audio(raw):
        # Spines may arrive as a Series or a (possibly empty) list of Series.
        if isinstance(raw, list):
            # BUGFIX: the empty-list default from _analyses.get(...) used to
            # raise IndexError on raw[0]; treat it as "no spine present".
            ser = raw[0].copy() if raw else pd.Series(dtype=object)
        else:
            ser = raw.copy()
        if ser.empty:
            return pd.Series(dtype=object, index=audio_axis)
        # Strip humdrum dot continuation tokens
        ser = ser[ser != "."].dropna()
        if ser.empty:
            return pd.Series(dtype=object, index=audio_axis)
        # Remap QN index to audio seconds
        ser.index = pd.Index(qn_to_sec(ser.index.astype(float)))
        ser = ser[~ser.index.duplicated(keep="last")].sort_index()
        # Insert into NaN series covering full audio_axis, then ffill
        target = pd.Series(np.nan, index=audio_axis, dtype=object)
        combined = ser.combine_first(target).sort_index().ffill()
        return combined.reindex(audio_axis)

    keys_audio = spine_to_audio(piece._analyses.get("keys", pd.Series(dtype=object)))
    harm_audio = spine_to_audio(piece._analyses.get("harm", []))
    chords_audio = spine_to_audio(
        piece._analyses.get("chord", piece._analyses.get("harte", []))
    )
    functions_audio = spine_to_audio(piece._analyses.get("function", []))

    if verbose:
        print("keys:"); print(keys_audio)
        print("\nharm:"); print(harm_audio)
        print("\nchords:"); print(chords_audio)
        print("\nfunctions:"); print(functions_audio)

    # ── Plot ────────────────────────────────────────────────────────────────
    title = getattr(piece, "fileName", audio_stem)
    t_min = float(audio_axis[0])
    t_max = float(audio_axis[-1])
    fig, ax = plt.subplots(figsize=(12, 5), facecolor="#fafaf7")
    ax.imshow(
        pr,
        aspect="auto",
        origin="lower",
        extent=[t_min, t_max, 0, 128],
        cmap="Blues",
        interpolation="nearest",
        vmin=0,
        vmax=1,
    )
    ax.set_xlim(t_min, t_max)
    ax.xaxis.set_major_locator(ticker.MaxNLocator(nbins=10))
    ax.xaxis.set_major_formatter(ticker.FormatStrFormatter("%.1f"))
    ax.set_xlabel("Audio time (seconds)", fontsize=9, color="#555")
    ax.set_ylabel("MIDI pitch", fontsize=9, color="#555")
    ax.set_title(
        f"pyAMPACT \u2014 {title}",
        fontsize=11,
        fontweight="bold",
        pad=8,
        color="#1a1a1a",
    )
    ax.tick_params(labelsize=8, color="#aaa")
    for sp in ax.spines.values():
        sp.set_edgecolor("#ddd")
    fig.tight_layout()
    out_path = os.path.join(output_dir, f"{audio_stem}_piano_roll.png")
    fig.savefig(out_path, dpi=150, bbox_inches="tight")
    plt.close(fig)
    return pr, audio_axis