This commit is contained in:
lzc
2025-07-02 13:54:05 +08:00
commit 4b3870440c
6351 changed files with 282880 additions and 0 deletions

4
data_utils/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
"""
数据处理工具包
包含数据加载和特征提取功能
"""

Binary file not shown.

Binary file not shown.

362
data_utils/data_loader.py Normal file
View File

@@ -0,0 +1,362 @@
"""
数据加载模块,用于读取并处理语音情感数据集
支持CASIA、SAVEE和RAVDESS三种数据集
"""
import os
import glob
import numpy as np
import pandas as pd
import librosa
import re
from sklearn.model_selection import train_test_split
import warnings
# 定义常量
SAMPLE_RATE = 22050 # 统一采样率
MAX_DURATION = 5 # 最大音频长度(秒)
MAX_SAMPLES = SAMPLE_RATE * MAX_DURATION # 最大样本数
# 情绪映射字典
EMOTION_MAPPING = {
# CASIA 情感映射
'angry': 'angry',
'fear': 'fear',
'happy': 'happy',
'neutral': 'neutral',
'sad': 'sad',
'surprise': 'surprise',
# SAVEE 情感映射
'a': 'angry',
'f': 'fear',
'h': 'happy',
'n': 'neutral',
'sa': 'sad',
'su': 'surprise',
'd': 'disgust', # 注意SAVEE有厌恶情绪但CASIA没有
# RAVDESS 情感映射
'01': 'neutral',
'02': 'calm', # 注意RAVDESS有平静情绪但CASIA没有
'03': 'happy',
'04': 'sad',
'05': 'angry',
'06': 'fear',
'07': 'disgust', # 注意RAVDESS有厌恶情绪但CASIA没有
'08': 'surprise'
}
# 语言映射
LANGUAGE_MAPPING = {
'casia': 'zh', # 中文
'savee': 'en', # 英文
'ravdess': 'en' # 英文
}
def load_casia(data_path):
"""
加载CASIA中文情感语音数据集
Args:
data_path: CASIA数据集路径
Returns:
data_list: 包含(音频数据, 情感标签, 语言标签)的列表
"""
data_list = []
# 确保路径存在
if not os.path.exists(data_path):
print(f"警告: CASIA数据路径不存在: {data_path}")
return data_list
# 尝试获取演员列表
try:
actors = os.listdir(data_path)
except Exception as e:
print(f"错误: 无法读取CASIA目录: {e}")
return data_list
success_count = 0
error_count = 0
for actor in actors:
# 跳过隐藏文件或非目录
if actor.startswith('_') or not os.path.isdir(os.path.join(data_path, actor)):
continue
actor_path = os.path.join(data_path, actor)
emotions = os.listdir(actor_path)
for emotion in emotions:
# 跳过隐藏文件或非目录
if emotion.startswith('_') or not os.path.isdir(os.path.join(actor_path, emotion)):
continue
emotion_path = os.path.join(actor_path, emotion)
audio_files = glob.glob(os.path.join(emotion_path, "*.wav"))
for audio_file in audio_files:
# 读取音频文件
try:
audio, sr = librosa.load(audio_file, sr=SAMPLE_RATE, res_type='kaiser_fast')
# 统一音频长度
if len(audio) < MAX_SAMPLES:
# 音频太短用0填充
padding = MAX_SAMPLES - len(audio)
audio = np.pad(audio, (0, padding), 'constant')
else:
# 音频太长,截断
audio = audio[:MAX_SAMPLES]
data_list.append((audio, EMOTION_MAPPING[emotion], LANGUAGE_MAPPING['casia']))
success_count += 1
except Exception as e:
error_count += 1
if error_count < 10: # 只显示前10个错误避免日志过多
print(f"Error loading {audio_file}: {e}")
elif error_count == 10:
print("过多加载错误,后续错误将不再显示...")
print(f"CASIA数据集: 成功加载 {success_count} 个文件,失败 {error_count} 个文件")
return data_list
def load_savee(data_path):
"""
加载SAVEE英文情感语音数据集
Args:
data_path: SAVEE数据集路径
Returns:
data_list: 包含(音频数据, 情感标签, 语言标签)的列表
"""
data_list = []
# 确保路径存在
if not os.path.exists(data_path):
print(f"警告: SAVEE数据路径不存在: {data_path}")
return data_list
audio_path = os.path.join(data_path, "AudioData")
if not os.path.exists(audio_path):
print(f"警告: SAVEE AudioData路径不存在: {audio_path}")
return data_list
# SAVEE数据集中的四个说话者
actors = ['DC', 'JE', 'JK', 'KL']
success_count = 0
error_count = 0
for actor in actors:
actor_path = os.path.join(audio_path, actor)
if not os.path.isdir(actor_path):
print(f"警告: SAVEE演员目录不存在: {actor_path}")
continue
audio_files = glob.glob(os.path.join(actor_path, "*.wav"))
for audio_file in audio_files:
file_name = os.path.basename(audio_file)
# 提取情感标签SAVEE使用文件名的前1-2个字母作为情感标签
if file_name.startswith("sa"):
emotion = "sa"
elif file_name.startswith("su"):
emotion = "su"
else:
emotion = file_name[0]
try:
audio, sr = librosa.load(audio_file, sr=SAMPLE_RATE, res_type='kaiser_fast')
# 统一音频长度
if len(audio) < MAX_SAMPLES:
padding = MAX_SAMPLES - len(audio)
audio = np.pad(audio, (0, padding), 'constant')
else:
audio = audio[:MAX_SAMPLES]
data_list.append((audio, EMOTION_MAPPING[emotion], LANGUAGE_MAPPING['savee']))
success_count += 1
except Exception as e:
error_count += 1
if error_count < 10: # 只显示前10个错误
print(f"Error loading {audio_file}: {e}")
elif error_count == 10:
print("过多加载错误,后续错误将不再显示...")
print(f"SAVEE数据集: 成功加载 {success_count} 个文件,失败 {error_count} 个文件")
return data_list
def load_ravdess(data_path):
"""
加载RAVDESS英文情感语音数据集
Args:
data_path: RAVDESS数据集路径
Returns:
data_list: 包含(音频数据, 情感标签, 语言标签)的列表
"""
data_list = []
# 确保路径存在
if not os.path.exists(data_path):
print(f"警告: RAVDESS数据路径不存在: {data_path}")
return data_list
# 获取所有演员目录
try:
actor_dirs = glob.glob(os.path.join(data_path, "Actor_*"))
except Exception as e:
print(f"错误: 无法获取RAVDESS演员目录: {e}")
return data_list
if not actor_dirs:
print(f"警告: RAVDESS演员目录为空: {data_path}")
success_count = 0
error_count = 0
for actor_dir in actor_dirs:
audio_files = glob.glob(os.path.join(actor_dir, "*.wav"))
for audio_file in audio_files:
file_name = os.path.basename(audio_file)
# RAVDESS文件名格式: 03-01-05-01-02-01-12.wav
# 05 表示情感类别 (angry)
parts = file_name.split('-')
if len(parts) >= 3:
emotion = parts[2]
try:
audio, sr = librosa.load(audio_file, sr=SAMPLE_RATE, res_type='kaiser_fast')
# 统一音频长度
if len(audio) < MAX_SAMPLES:
padding = MAX_SAMPLES - len(audio)
audio = np.pad(audio, (0, padding), 'constant')
else:
audio = audio[:MAX_SAMPLES]
data_list.append((audio, EMOTION_MAPPING[emotion], LANGUAGE_MAPPING['ravdess']))
success_count += 1
except Exception as e:
error_count += 1
if error_count < 10: # 只显示前10个错误
print(f"Error loading {audio_file}: {e}")
elif error_count == 10:
print("过多加载错误,后续错误将不再显示...")
print(f"RAVDESS数据集: 成功加载 {success_count} 个文件,失败 {error_count} 个文件")
return data_list
def load_all_data(casia_path, savee_path, ravdess_path, selected_emotions=None):
"""
加载所有数据集
Args:
casia_path: CASIA数据集路径
savee_path: SAVEE数据集路径
ravdess_path: RAVDESS数据集路径
selected_emotions: 要使用的情感列表如果为None则使用所有共有的情感
Returns:
X: 音频数据列表
y_emotion: 情感标签列表
y_language: 语言标签列表
"""
print("加载CASIA数据集...")
casia_data = load_casia(casia_path)
print("加载SAVEE数据集...")
savee_data = load_savee(savee_path)
print("加载RAVDESS数据集...")
ravdess_data = load_ravdess(ravdess_path)
# 合并所有数据
all_data = casia_data + savee_data + ravdess_data
# 检查是否有数据被加载
if not all_data:
raise ValueError("没有成功加载任何数据!请检查数据路径和文件格式。")
# 如果指定了要使用的情感列表,筛选数据
if selected_emotions:
filtered_data = [item for item in all_data if item[1] in selected_emotions]
if not filtered_data:
print(f"警告: 筛选后没有匹配的情感数据。可用的情感标签: {set(item[1] for item in all_data)}")
print(f"您请求的情感标签: {selected_emotions}")
# 回退到使用所有数据
filtered_data = all_data
all_data = filtered_data
print(f"总共加载了 {len(all_data)} 个有效音频文件")
# 显示各情感类别的数据分布
emotion_counts = {}
for item in all_data:
emotion = item[1]
if emotion in emotion_counts:
emotion_counts[emotion] += 1
else:
emotion_counts[emotion] = 1
print("数据分布:")
for emotion, count in emotion_counts.items():
print(f" {emotion}: {count} 个样本")
# 分离数据、情感标签和语言标签
X = [item[0] for item in all_data]
y_emotion = [item[1] for item in all_data]
y_language = [item[2] for item in all_data]
return X, y_emotion, y_language
def prepare_data(X, y_emotion, y_language, test_size=0.2, val_size=0.2, random_state=42):
"""
准备训练集、验证集和测试集
Args:
X: 音频数据
y_emotion: 情感标签
y_language: 语言标签
test_size: 测试集比例
val_size: 验证集比例
random_state: 随机种子
Returns:
训练集、验证集和测试集数据和标签
"""
# 确保数据不为空
if len(X) == 0:
raise ValueError("数据集为空,无法进行划分!请确保至少加载了一些有效的音频文件。")
# 先划分出测试集
X_train_val, X_test, y_emotion_train_val, y_emotion_test, y_language_train_val, y_language_test = train_test_split(
X, y_emotion, y_language, test_size=test_size, random_state=random_state, stratify=y_emotion
)
# 从剩余数据中划分训练集和验证集
val_ratio = val_size / (1 - test_size)
X_train, X_val, y_emotion_train, y_emotion_val, y_language_train, y_language_val = train_test_split(
X_train_val, y_emotion_train_val, y_language_train_val,
test_size=val_ratio, random_state=random_state, stratify=y_emotion_train_val
)
# 打印数据集大小
print(f"数据集划分: 训练集 {len(X_train)} 个样本, 验证集 {len(X_val)} 个样本, 测试集 {len(X_test)} 个样本")
return (
X_train, y_emotion_train, y_language_train,
X_val, y_emotion_val, y_language_val,
X_test, y_emotion_test, y_language_test
)

View File

@@ -0,0 +1,302 @@
"""
特征提取模块,用于从音频信号中提取声学特征
"""
import numpy as np
import pandas as pd
import librosa
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler
import pickle
import os
def extract_features(audio, sr=22050):
"""
从单个音频信号提取特征(兼容旧函数名)
Args:
audio: 音频信号
sr: 采样率
Returns:
features: 特征字典
"""
return extract_features_single(audio, sr)
def extract_features_single(audio, sr=22050):
"""
从单个音频信号提取特征
Args:
audio: 音频信号
sr: 采样率
Returns:
features: 特征字典
"""
# 确保音频长度统一
max_samples = sr * 5 # 最大5秒
if len(audio) < max_samples:
# 音频太短用0填充
padding = max_samples - len(audio)
audio = np.pad(audio, (0, padding), 'constant')
else:
# 音频太长,截断
audio = audio[:max_samples]
# 初始化特征字典
features = {}
# 提取ZCR过零率
zcr = librosa.feature.zero_crossing_rate(audio)[0]
features['zero_crossing_rate_mean'] = np.mean(zcr)
features['zero_crossing_rate_std'] = np.std(zcr)
features['zero_crossing_rate_max'] = np.max(zcr)
features['zero_crossing_rate_min'] = np.min(zcr)
# 提取RMS均方根能量
rms = librosa.feature.rms(y=audio)[0]
features['rms_mean'] = np.mean(rms)
features['rms_std'] = np.std(rms)
features['rms_max'] = np.max(rms)
features['rms_min'] = np.min(rms)
# 提取MFCC梅尔频率倒谱系数
mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
for i in range(1, 14):
features[f'mfcc_{i}_mean'] = np.mean(mfccs[i-1])
features[f'mfcc_{i}_std'] = np.std(mfccs[i-1])
features[f'mfcc_{i}_max'] = np.max(mfccs[i-1])
features[f'mfcc_{i}_min'] = np.min(mfccs[i-1])
# 提取频谱质心
spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=sr)[0]
features['spectral_centroid_mean'] = np.mean(spectral_centroid)
features['spectral_centroid_std'] = np.std(spectral_centroid)
features['spectral_centroid_max'] = np.max(spectral_centroid)
features['spectral_centroid_min'] = np.min(spectral_centroid)
# 提取频谱带宽
spectral_bandwidth = librosa.feature.spectral_bandwidth(y=audio, sr=sr)[0]
features['spectral_bandwidth_mean'] = np.mean(spectral_bandwidth)
features['spectral_bandwidth_std'] = np.std(spectral_bandwidth)
features['spectral_bandwidth_max'] = np.max(spectral_bandwidth)
features['spectral_bandwidth_min'] = np.min(spectral_bandwidth)
# 提取频谱衰减
spectral_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=sr)[0]
features['spectral_rolloff_mean'] = np.mean(spectral_rolloff)
features['spectral_rolloff_std'] = np.std(spectral_rolloff)
features['spectral_rolloff_max'] = np.max(spectral_rolloff)
features['spectral_rolloff_min'] = np.min(spectral_rolloff)
# 提取色度特征
chroma = librosa.feature.chroma_stft(y=audio, sr=sr)
features['chroma_1_mean'] = np.mean(chroma[0])
features['chroma_2_mean'] = np.mean(chroma[1])
features['chroma_3_mean'] = np.mean(chroma[2])
features['chroma_4_mean'] = np.mean(chroma[3])
features['chroma_5_mean'] = np.mean(chroma[4])
# 提取声音谱Mel频谱
mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr)
features['mel_spec_mean'] = np.mean(mel_spec)
features['mel_spec_std'] = np.std(mel_spec)
# 提取对数功率谱
log_power = librosa.amplitude_to_db(mel_spec)
features['log_power_mean'] = np.mean(log_power)
features['log_power_std'] = np.std(log_power)
# 添加统计矩
features['audio_mean'] = np.mean(audio)
features['audio_std'] = np.std(audio)
features['audio_skew'] = np.mean((audio - np.mean(audio))**3) / (np.std(audio)**3)
features['audio_kurtosis'] = np.mean((audio - np.mean(audio))**4) / (np.std(audio)**4) - 3
# 估计音高
pitches, magnitudes = librosa.piptrack(y=audio, sr=sr)
pitches_mean = []
for t in range(pitches.shape[1]):
idx = np.argmax(magnitudes[:, t])
pitch = pitches[idx, t]
if pitch > 0: # 过滤掉静音帧
pitches_mean.append(pitch)
if pitches_mean: # 确保有有效的音高值
features['pitch_mean'] = np.mean(pitches_mean)
features['pitch_std'] = np.std(pitches_mean) if len(pitches_mean) > 1 else 0
features['pitch_max'] = np.max(pitches_mean)
features['pitch_min'] = np.min(pitches_mean) if len(pitches_mean) > 0 else 0
else:
features['pitch_mean'] = 0
features['pitch_std'] = 0
features['pitch_max'] = 0
features['pitch_min'] = 0
# 提取调谐偏差
tuning_offset = librosa.estimate_tuning(y=audio, sr=sr)
features['tuning_offset'] = tuning_offset
# 新增特征 2提取光谱平坦度指标
spectral_flatness = librosa.feature.spectral_flatness(y=audio)[0]
features['spectral_flatness_mean'] = np.mean(spectral_flatness)
features['spectral_flatness_std'] = np.std(spectral_flatness)
features['spectral_flatness_max'] = np.max(spectral_flatness)
features['spectral_flatness_min'] = np.min(spectral_flatness)
# 新增特征 3提取光谱对比度指标
spectral_contrast = librosa.feature.spectral_contrast(y=audio, sr=sr)
# 为每个频带提取统计特征
for i in range(spectral_contrast.shape[0]):
features[f'spectral_contrast_{i+1}_mean'] = np.mean(spectral_contrast[i])
features[f'spectral_contrast_{i+1}_std'] = np.std(spectral_contrast[i])
# 光谱对比度的总体统计
features['spectral_contrast_mean'] = np.mean(spectral_contrast)
features['spectral_contrast_std'] = np.std(spectral_contrast)
# 新增特征 4梅尔频率特征扩展针对图片中提到的梅尔频率
mfcc_delta = librosa.feature.delta(mfccs)
mfcc_delta2 = librosa.feature.delta(mfccs, order=2)
# 添加一阶差分特征
for i in range(1, 14):
features[f'mfcc_{i}_delta_mean'] = np.mean(mfcc_delta[i-1])
features[f'mfcc_{i}_delta_std'] = np.std(mfcc_delta[i-1])
# 添加二阶差分特征
for i in range(1, 14):
features[f'mfcc_{i}_delta2_mean'] = np.mean(mfcc_delta2[i-1])
features[f'mfcc_{i}_delta2_std'] = np.std(mfcc_delta2[i-1])
return features
def extract_features_batch(audio_list):
"""
批量提取音频特征
Args:
audio_list: 音频信号列表
Returns:
features_list: 特征字典列表
"""
features_list = []
for i, audio in enumerate(tqdm(audio_list, desc="提取特征")):
try:
features = extract_features_single(audio)
features_list.append(features)
except Exception as e:
print(f"处理第{i}个音频时出错: {e}")
# 添加空特征字典,避免索引错误
features_list.append({})
return features_list
def features_to_matrix(features_list, feature_names=None):
"""
将特征字典列表转换为特征矩阵
Args:
features_list: 特征字典列表
feature_names: 特征名称列表如果为None则从第一个非空字典中获取
Returns:
X: 特征矩阵
feature_names: 特征名称列表
"""
# 如果没有提供特征名称,从第一个非空字典中获取
if feature_names is None:
for features in features_list:
if features: # 非空字典
feature_names = list(features.keys())
break
if feature_names is None:
raise ValueError("所有特征字典都是空的,无法确定特征名称")
# 创建特征矩阵
X = np.zeros((len(features_list), len(feature_names)))
for i, features in enumerate(features_list):
if not features: # 空字典
# 填充为0或者可以使用平均值等
continue
for j, name in enumerate(feature_names):
if name in features:
X[i, j] = features[name]
return X, feature_names
def normalize_features(X_train, X_val, X_test):
"""
标准化特征
Args:
X_train: 训练集特征矩阵
X_val: 验证集特征矩阵
X_test: 测试集特征矩阵
Returns:
X_train_norm: 标准化后的训练集
X_val_norm: 标准化后的验证集
X_test_norm: 标准化后的测试集
"""
# 初始化标准化器
scaler = StandardScaler()
# 使用训练集拟合标准化器
scaler.fit(X_train)
# 转换所有数据集
X_train_norm = scaler.transform(X_train)
X_val_norm = scaler.transform(X_val)
X_test_norm = scaler.transform(X_test)
# 确保输出目录存在
output_dir = 'output/emotion_model'
os.makedirs(output_dir, exist_ok=True)
# 保存标准化器
with open(os.path.join(output_dir, 'feature_scaler.pkl'), 'wb') as f:
pickle.dump(scaler, f)
return X_train_norm, X_val_norm, X_test_norm
def normalize_features_with_params(X, scaler):
"""
使用给定的缩放参数标准化特征
Args:
X: 特征矩阵
scaler: 已经拟合的标准化器
Returns:
X_norm: 标准化后的特征矩阵
"""
return scaler.transform(X)
def reshape_for_lstm(X):
"""
将特征矩阵重塑为LSTM输入格式
Args:
X: 特征矩阵,或特征矩阵列表
Returns:
X_reshaped: 重塑后的特征矩阵
"""
# 如果输入是列表,转换为数组
if isinstance(X, list):
X = np.array(X)
# 添加时间步维度
if len(X.shape) == 2:
return X.reshape(X.shape[0], 1, X.shape[1])
# 如果已经是3D直接返回
return X