# Source code for pyampact.speechDescriptors

"""
speechDescriptors
=================

.. autosummary::
    :toctree: generated/

    get_speech_descriptors
"""

import parselmouth
import librosa
import pandas as pd
import numpy as np

from pyampact.speechDescriptorsUtils import (
    compute_correlation_dimension,
    compute_dfa,
    compute_emd_features,
    compute_mfcc,
    compute_ppe,
    compute_recurrence_period_and_rpde,
    compute_spectral_spread,
    compute_tqwt_features,
    compute_wt_features,
    feature_spectral_flux,
    compute_vot,
    compute_bark_band_energies,
    compute_dpi,
    compute_avqi,
)

__all__ = ["get_speech_descriptors"]


def createDataframe(speech_file):
    """Load a force-align CSV (no header row) into a DataFrame.

    Parameters
    ----------
    speech_file : str or file-like
        Path/buffer of the CSV produced by the force-align notebook.

    Returns
    -------
    pandas.DataFrame
        Columns: onset_sec, pitch_est, duration, word.
    """
    column_names = ["onset_sec", "pitch_est", "duration", "word"]
    return pd.read_csv(speech_file, header=None, names=column_names)


def get_speech_descriptors(audio, df, speaker_type="female"):
    """
    Calculates list of speech descriptors with audio file and CSV provided
    by the force-align notebook
    https://github.com/pyampact/pyAMPACTtutorials/blob/main/Tutorial_05_pyAMPACT_ForceAlign.ipynb

    Parameters
    ----------
    audio : ndarray
        Audio time series of the speech file
    df : DataFrame
        df of onset, pitch_est(Hz), duration, and word of each word in passage,
        output by force align
    speaker_type : str
        Determine the resonance ceiling for formant and CPPS calculation:
        - male = 5000
        - female = 5500
        - child = 8000

    Returns
    -------
    globals_df : Pandas DataFrame
        The global inputs and values of the whole audio sample
    df : Pandas Dataframe
        All speech descriptors analyzed and output in the process
    """
    # df is a dict of DataFrames — get first part
    df = list(df.values())[0]
    df = df[["ONSET_SEC", "OFFSET_SEC", "AVG PITCH IN HZ", "DURATION", "WORD"]]
    snd = parselmouth.Sound(audio)

    # Analysis parameters (must match Praat script for primitives)
    time_step = 0.01
    pitch_floor = 120
    pitch_ceiling = 600
    octave_cost = 0.01
    octave_jump_cost = 0.35
    voicing_threshold = 0.45
    silence_threshold = 0.03

    # resonance ceiling (Praat script uses 5500 for DB/female)
    # Unknown speaker_type strings silently fall back to the female default.
    resonance_ceiling = {"male": 5000, "female": 5500, "child": 8000}.get(
        speaker_type.lower(), 5500
    )

    # Global pitch metrics (metadata only) — computed on the whole sound,
    # reported in globals_df, never per-word.
    pitch_cc_global = parselmouth.praat.call(snd, "To Pitch...", time_step, pitch_floor, pitch_ceiling)
    min_pitch_global = parselmouth.praat.call(pitch_cc_global, "Get minimum", 0, 0, "Hertz", "Parabolic")
    max_pitch_global = parselmouth.praat.call(pitch_cc_global, "Get maximum", 0, 0, "Hertz", "Parabolic")
    voiced_global = pitch_cc_global.selected_array["frequency"]
    voiced_global = voiced_global[voiced_global > 0]  # 0 Hz frames are unvoiced
    pitch_range = float(voiced_global.max() - voiced_global.min()) if voiced_global.size else 0.0

    # Output columns (keep your existing schema)
    out_cols = [
        "mean_pitch", "pitch_min", "pitch_max", "mean_intensity", "mean_f1", "mean_f2",
        "mean_f3", "mean_f4", "jitter_local", "jitter_percent", "shimmer_local",
        "shimmer_percent", "pulse_energy", "cpps", "cpp", "mean_spec_centroid",
        "mean_spec_flux", "gne_mean", "gne_max", "hnr_mean_db", "nhr_mean_db",
        "noise_intensity_db", "vfer", "ppe", "vot_ms", "ac_strength", "voicing",
        "mean_bark", "d2", "dfa_alpha", "pause_durations", "mean_dpi", "emd_n_imfs",
        "emd_energy_ratio", "emd_entropy", "energy", "mfcc", "rp", "rpde",
        "mean_spec_spread", "tqwt_e1", "tqwt_e2", "tqwt_e3", "tqwt_e4", "tqwt_e5",
        "tqwt_entropy", "tqwt_centroid", "tqwt_low_high_ratio", "wt_entropy",
        "wt_low_high_ratio",
    ]

    rows = []
    # Shortest segment the pitch analysis can handle: 6.4 periods of the floor.
    min_required = 6.4 / pitch_floor  # 0.053333...

    for i in df.index:
        onset = float(df.at[i, "ONSET_SEC"])
        offset = float(df.at[i, "OFFSET_SEC"])
        dur = offset - onset

        # default undefined (Praat uses undefined)
        row = {k: np.nan for k in out_cols}
        row["pause_durations"] = []
        row["mfcc"] = None

        # Too-short / degenerate segments get the all-NaN row and are skipped.
        if not np.isfinite(dur) or dur <= 0 or dur < min_required:
            rows.append(row)
            continue

        # segment with LOCAL time axis [0..dur] (preserve_times=False)
        seg = snd.extract_part(from_time=onset, to_time=offset, preserve_times=False)
        sr = int(seg.sampling_frequency)
        mid = dur / 2.0

        # ========= PRIMITIVES (MATCH PRAAT SCRIPT) =========
        # Pitch (cc)
        p = parselmouth.praat.call(seg, "To Pitch...", time_step, pitch_floor, pitch_ceiling)
        row["pitch_min"] = parselmouth.praat.call(p, "Get minimum", 0, 0, "Hertz", "Parabolic")
        row["pitch_max"] = parselmouth.praat.call(p, "Get maximum", 0, 0, "Hertz", "Parabolic")

        # Mean pitch (not in Praat output table, but keep your column)
        f0 = p.selected_array["frequency"]
        f0v = f0[f0 > 0]  # drop unvoiced (0 Hz) frames
        row["mean_pitch"] = float(np.mean(f0v)) if f0v.size else np.nan

        # Intensity
        w_int = parselmouth.praat.call(seg, "To Intensity...", pitch_floor, time_step, "no")
        row["mean_intensity"] = parselmouth.praat.call(w_int, "Get mean", 0, 0, "dB")

        # PointProcess + jitter/shimmer/pulse energy
        w_pp = parselmouth.praat.call(seg, "To PointProcess (periodic, cc)", pitch_floor, pitch_ceiling)
        # Period bounds derived from the pitch ceiling/floor.
        minPeriod = 1.0 / pitch_ceiling
        maxPeriod = 1.0 / pitch_floor
        row["jitter_local"] = parselmouth.praat.call(
            w_pp, "Get jitter (local)", 0, 0, minPeriod, maxPeriod, 1.3
        )
        row["jitter_percent"] = row["jitter_local"] * 100.0 if np.isfinite(row["jitter_local"]) else np.nan
        row["shimmer_local"] = parselmouth.praat.call(
            [seg, w_pp], "Get shimmer (local)", 0, 0, minPeriod, maxPeriod, 1.3, 1.6
        )
        row["shimmer_percent"] = row["shimmer_local"] * 100.0 if np.isfinite(row["shimmer_local"]) else np.nan
        pulse = parselmouth.praat.call(w_pp, "To Sound (pulse train)", 44100, 1.0, 0.05, 2000)
        row["pulse_energy"] = parselmouth.praat.call(pulse, "Get energy", 0, 0)

        # Formants (midpoint sample, burg params must match)
        w_form = parselmouth.praat.call(seg, "To Formant (burg)...", time_step, 5, resonance_ceiling, 0.025, 50)
        # Keep your mean_f* column names, but fill with Praat midpoint semantics
        row["mean_f1"] = parselmouth.praat.call(w_form, "Get value at time", 1, mid, "Hertz", "Linear")
        row["mean_f2"] = parselmouth.praat.call(w_form, "Get value at time", 2, mid, "Hertz", "Linear")
        row["mean_f3"] = parselmouth.praat.call(w_form, "Get value at time", 3, mid, "Hertz", "Linear")
        row["mean_f4"] = parselmouth.praat.call(w_form, "Get value at time", 4, mid, "Hertz", "Linear")

        # CPPS (only if dur >= 0.1)
        if dur >= 0.1:
            pcg = parselmouth.praat.call(seg, "To PowerCepstrogram...", pitch_floor, time_step, resonance_ceiling, 50)
            minQ = 1.0 / pitch_ceiling
            maxQ = 1.0 / pitch_floor
            row["cpps"] = parselmouth.praat.call(
                pcg, "Get CPPS", "yes", 0.02, 0.0005, pitch_floor, pitch_ceiling, 0.05,
                "parabolic", minQ, maxQ, "Straight", "Robust"
            )
        else:
            pcg = None  # reused below by the custom CPP calculation

        # GNE (only if dur >= 0.1 in Praat script)
        if dur >= 0.1:
            gneObj = parselmouth.praat.call(seg, "To Harmonicity (gne)...", 500, 4500, 1000, 80)
            row["gne_max"] = parselmouth.praat.call(gneObj, "Get maximum")
            gneVals = np.array(gneObj.values, dtype=float)
            row["gne_mean"] = float(np.nanmean(gneVals)) if gneVals.size else np.nan
        else:
            gneVals = np.array([], dtype=float)  # keeps the d2/DFA/EMD section a no-op

        # HNR / NHR (segment, cc)
        harm = parselmouth.praat.call(seg, "To Harmonicity (cc)...", time_step, pitch_floor, 0.1, 1.0)
        row["hnr_mean_db"] = parselmouth.praat.call(harm, "Get mean", 0, 0)
        # NHR here is simply the dB negation of HNR, not a ratio in linear units.
        row["nhr_mean_db"] = -row["hnr_mean_db"] if np.isfinite(row["hnr_mean_db"]) else np.nan

        # Voicing (AC pitch mean) — exact Praat params
        p_ac = parselmouth.praat.call(
            seg, "To Pitch (ac)", time_step, pitch_floor, 15, "off", silence_threshold,
            voicing_threshold, octave_cost, octave_jump_cost, 0.14, pitch_ceiling
        )
        row["voicing"] = parselmouth.praat.call(p_ac, "Get mean", 0, 0, "Hertz")

        # Energy (segment)
        row["energy"] = parselmouth.praat.call(seg, "Get energy", 0, 0)

        # CPP (custom calc) only if you built pcg
        if pcg is not None:
            pcg_mat = np.array(parselmouth.praat.call(pcg, "To Matrix"))
            mean_ceps = np.mean(pcg_mat, axis=1)
            # Floor at 1e-12 before log10 to avoid -inf on zero cepstral bins.
            mean_ceps_db = 10 * np.log10(np.maximum(mean_ceps, 1e-12))
            n_quef = mean_ceps_db.size
            quef = np.linspace(0, 1.0 / pitch_floor, n_quef)
            # Search the peak only inside the plausible quefrency band.
            mask_peak = (quef >= 1.0 / pitch_ceiling) & (quef <= 1.0 / pitch_floor)
            if np.any(mask_peak):
                peak_idx = int(np.argmax(mean_ceps_db[mask_peak]))
                peak_q = float(quef[mask_peak][peak_idx])
                peak_val = float(mean_ceps_db[mask_peak][peak_idx])
                # Linear trend fit over 1–20 ms; CPP = peak above the trend line.
                mask_trend = (quef >= 0.001) & (quef <= 0.02)
                if np.sum(mask_trend) >= 2:
                    slope, intercept = np.polyfit(quef[mask_trend], mean_ceps_db[mask_trend], 1)
                    row["cpp"] = peak_val - (slope * peak_q + intercept)

        # Spectral features (librosa) on seg
        y = seg.values[0].astype(float, copy=False)
        if y.size:
            spec_centroid = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
            row["mean_spec_centroid"] = float(np.mean(spec_centroid)) if spec_centroid.size else np.nan
            S = np.abs(librosa.stft(y, n_fft=2048, hop_length=512))
            spec_flux = feature_spectral_flux(S, sr)
            row["mean_spec_flux"] = float(np.mean(spec_flux)) if np.size(spec_flux) else np.nan
            row["mean_bark"] = compute_bark_band_energies(S, sr, n_bands=24)
            freqs = np.linspace(0, sr / 2, S.shape[0])
            row["mean_spec_spread"] = compute_spectral_spread(S, freqs)
        else:
            S = None

        # Noise intensity + VFER from HNR (keep your definitions)
        if np.isfinite(row["hnr_mean_db"]) and np.isfinite(row["mean_intensity"]):
            hnr_lin = 10 ** (row["hnr_mean_db"] / 10.0)  # dB -> linear power ratio
            noise_fraction = 1.0 / (1.0 + hnr_lin)
            row["noise_intensity_db"] = row["mean_intensity"] + 10 * np.log10(noise_fraction)
            row["vfer"] = hnr_lin / (1.0 + hnr_lin)

        # PPE (use segment pitch object p on local grid)
        t_values = np.arange(0.0, dur, time_step)
        pitch_segment = np.array([p.get_value_at_time(t) for t in t_values], dtype=float)
        row["ppe"] = compute_ppe(pitch_segment)

        # VOT (use seg + p)
        row["vot_ms"] = compute_vot(seg, p, sr)

        # AC strength (Praat Pitch (ac) confidence)
        # NOTE(review): if selected_array has no named fields, dtype.names is
        # None and the `in` test raises TypeError — confirm parselmouth always
        # returns a structured array here.
        if "strength" in p_ac.selected_array.dtype.names:
            strength = p_ac.selected_array["strength"].astype(float, copy=False)
            strength = strength[~np.isnan(strength)]
            row["ac_strength"] = float(np.mean(strength)) if strength.size else np.nan
        else:
            row["ac_strength"] = np.nan

        # Correlation dimension / DFA / EMD on gneVals (as you had)
        if gneVals.size:
            row["d2"] = compute_correlation_dimension(gneVals)
            row["dfa_alpha"] = compute_dfa(gneVals)
            row["emd_n_imfs"], row["emd_energy_ratio"], row["emd_entropy"] = compute_emd_features(gneVals)

        # DPI (use seg + seg intensity + segment pitch p)
        if dur >= min_required:
            seg_intensity = seg.to_intensity(minimum_pitch=pitch_floor, time_step=time_step, subtract_mean=False)
            # Pause threshold: 10 dB below the segment's mean intensity.
            pause_durations, mean_dpi = compute_dpi(
                seg, p, seg_intensity, intensity_thresh=row["mean_intensity"] - 10
            )
            row["pause_durations"] = pause_durations
            row["mean_dpi"] = mean_dpi

        # RP / RPDE (use segment point process w_pp, local times)
        rp, rpde = compute_recurrence_period_and_rpde(w_pp, 0.0, dur)
        row["rp"] = rp
        row["rpde"] = rpde

        # TQWT / WT (keep)
        if y.size:
            (
                row["tqwt_e1"], row["tqwt_e2"], row["tqwt_e3"], row["tqwt_e4"], row["tqwt_e5"],
                row["tqwt_entropy"], row["tqwt_centroid"], row["tqwt_low_high_ratio"],
            ) = compute_tqwt_features(y=y, sr=sr, Q=1.0, r=3.0, J=8, n_keep=5)
            row["wt_entropy"], row["wt_low_high_ratio"] = compute_wt_features(y)

        # MFCC optional
        row["mfcc"] = None  # keep as you had (disabled)

        rows.append(row)

    # attach results — rows align with df.index by construction
    out_df = pd.DataFrame(rows, index=df.index)
    df[out_cols] = out_df[out_cols]

    # ---------- REORDER COLUMNS FOR CSV OUTPUT ONLY ----------
    COLUMN_ORDER = [
        # identity / segmentation
        "WORD", "ONSET_SEC", "OFFSET_SEC", "DURATION",
        # pitch & voicing
        "mean_pitch", "pitch_min", "pitch_max", "voicing", "ac_strength", "ppe", "vot_ms",
        # intensity & energy
        "mean_intensity", "pulse_energy", "energy", "noise_intensity_db", "vfer",
        # perturbation
        "jitter_local", "jitter_percent", "shimmer_local", "shimmer_percent",
        # formants
        "mean_f1", "mean_f2", "mean_f3", "mean_f4",
        # cepstral / harmonic
        "cpps", "cpp", "gne_max", "gne_mean", "hnr_mean_db", "nhr_mean_db",
        # spectral
        "mean_spec_centroid", "mean_spec_flux", "mean_spec_spread", "mean_bark",
        # temporal / nonlinear
        "rp", "rpde", "d2", "dfa_alpha", "pause_durations", "mean_dpi",
        # decomposition
        "emd_n_imfs", "emd_energy_ratio", "emd_entropy",
        # transforms
        "tqwt_e1", "tqwt_e2", "tqwt_e3", "tqwt_e4", "tqwt_e5",
        "tqwt_entropy", "tqwt_centroid", "tqwt_low_high_ratio",
        "wt_entropy", "wt_low_high_ratio",
        # heavy payloads
        "mfcc",
    ]

    # keep only columns that exist (Praat vs Python compatibility)
    # NOTE(review): "AVG PITCH IN HZ" is not in COLUMN_ORDER, so it is dropped
    # here — confirm that is intentional.
    ordered_cols = [c for c in COLUMN_ORDER if c in df.columns]
    df = df.loc[:, ordered_cols]

    # One-row frame of the analysis parameters + whole-sample pitch extremes.
    globals_df = pd.DataFrame([{
        "pitch_floor": pitch_floor,
        "pitch_ceiling": pitch_ceiling,
        "pitch_range": pitch_range,
        "octave_cost": octave_cost,
        "octave_jump_cost": octave_jump_cost,
        "voicing_threshold": voicing_threshold,
        "silence_threshold": silence_threshold,
        "formant_ceiling": resonance_ceiling,
        "time_step": time_step,
        "speaker_type": speaker_type,
        "global_pitch_min": float(min_pitch_global) if np.isfinite(min_pitch_global) else np.nan,
        "global_pitch_max": float(max_pitch_global) if np.isfinite(max_pitch_global) else np.nan,
    }])

    return globals_df, df