声音信号的基频检测(python版本)

news2025/6/8 10:20:12

import math
import wave
import array
import functools
from abc import ABC, abstractmethod
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import os
import sys

# ====================== 设计模式部分 ======================
class PreprocessStrategy(ABC):
    """预处理策略基类"""
    @abstractmethod
    def process(self, signal):
        pass

class FrameStrategy(ABC):
    """分帧策略基类"""
    @abstractmethod
    def frame(self, signal, sample_rate):
        pass

class PitchDetector(ABC):
    """基频检测器基类"""
    @abstractmethod
    def detect(self, frames, sample_rate):
        pass

class Visualizer(ABC):
    """可视化器基类"""
    @abstractmethod
    def visualize(self, analysis_data):
        pass

class ProcessorFactory:
    """处理器工厂(工厂模式)"""
    @staticmethod
    def create_preprocessor(strategy_type):
        if strategy_type == "preemphasis":
            return PreEmphasisStrategy()
        raise ValueError("未知的预处理策略")

    @staticmethod
    def create_framer(strategy_type):
        if strategy_type == "fixed":
            return FixedFrameStrategy()
        raise ValueError("未知的分帧策略")

    @staticmethod
    def create_detector(strategy_type):
        if strategy_type == "autocorrelation":
            return AutoCorrelationDetector()
        elif strategy_type == "cepstrum":
            return CepstrumDetector()
        raise ValueError("未知的检测策略")
    
    @staticmethod
    def create_visualizer(strategy_type):
        if strategy_type == "matplotlib":
            return MatplotlibVisualizer()
        elif strategy_type == "text":
            return TextVisualizer()
        raise ValueError("未知的可视化策略")

# ====================== 信号处理部分 ======================
class PreEmphasisStrategy(PreprocessStrategy):
    """预加重策略(提升高频分量)"""
    def process(self, signal):
        return [signal[i] - 0.97 * signal[i-1] for i in range(1, len(signal))]

class FixedFrameStrategy(FrameStrategy):
    """固定分帧策略(策略模式)"""
    def __init__(self, frame_ms=25, overlap_ratio=0.5):
        self.frame_ms = frame_ms
        self.overlap_ratio = overlap_ratio
    
    def frame(self, signal, sample_rate):
        frame_length = int(sample_rate * self.frame_ms / 1000)
        step = int(frame_length * (1 - self.overlap_ratio))
        frames = []
        for start in range(0, len(signal) - frame_length, step):
            frames.append(signal[start:start+frame_length])
        return frames

class AutoCorrelationDetector(PitchDetector):
    """自相关基频检测(核心算法)"""
    def detect(self, frames, sample_rate):
        pitches = []
        acf_values = []  # 存储每帧的自相关值用于可视化
        
        for frame in frames:
            # 计算自相关函数
            acf = self._autocorrelation(frame)
            acf_values.append(acf)
            # 寻找基频峰值
            pitch = self._find_pitch(acf, sample_rate)
            pitches.append(pitch)
            
        return pitches, acf_values  # 返回基频和自相关值

    def _autocorrelation(self, frame):
        n = len(frame)
        acf = []
        for lag in range(n//2):  # 只计算一半延迟
            total = 0.0
            for i in range(n - lag):
                total += frame[i] * frame[i + lag]
            acf.append(total)
        return acf

    def _find_pitch(self, acf, sample_rate):
        # 忽略前10个样本(避免直流分量影响)
        if len(acf) < 20:  # 确保有足够的数据点
            return 0
            
        search_range = acf[10:]
        if not search_range:
            return 0
        
        # 寻找最大峰值位置
        max_index = search_range.index(max(search_range)) + 10
        
        # 二次插值提高精度
        if 1 < max_index < len(acf)-1:
            alpha = acf[max_index-1]
            beta = acf[max_index]
            gamma = acf[max_index+1]
            
            # 避免除以零
            denominator = alpha - 2*beta + gamma
            if abs(denominator) < 1e-10:  # 防止分母接近零
                peak_index = max_index
            else:
                delta = 0.5 * (alpha - gamma) / denominator
                peak_index = max_index + delta
        else:
            peak_index = max_index
        
        # 计算基频
        if peak_index > 0:
            return sample_rate / peak_index
        return 0

class CepstrumDetector(PitchDetector):
    """倒谱法基频检测(备选策略)"""
    def detect(self, frames, sample_rate):
        # 实现类似自相关法,使用倒谱峰值检测
        # 此处省略具体实现
        return [0] * len(frames), [[]] * len(frames)

# ====================== 可视化部分 ======================
class MatplotlibVisualizer(Visualizer):
    """使用matplotlib进行可视化(需要matplotlib库)"""
    def __init__(self):
        # 解决中文显示问题
        self._configure_matplotlib()
    
    def _configure_matplotlib(self):
        """配置matplotlib以支持中文显示"""
        plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans', 'Arial Unicode MS', 'sans-serif']
        plt.rcParams['axes.unicode_minus'] = False
    
    def visualize(self, analysis_data):
        # 解包分析数据
        original_signal = analysis_data['original_signal']
        processed_signal = analysis_data['processed_signal']
        frames = analysis_data['frames']
        windowed_frames = analysis_data['windowed_frames']
        pitches = analysis_data['pitches']
        acf_values = analysis_data['acf_values']
        sample_rate = analysis_data['sample_rate']
        
        # 创建画布
        fig = plt.figure(figsize=(15, 12))
        gs = GridSpec(4, 2, figure=fig)
        
        # 1. 原始信号和预处理信号对比
        ax1 = plt.subplot(gs[0, :])
        time_original = [i / sample_rate for i in range(len(original_signal))]
        time_processed = [i / sample_rate for i in range(len(processed_signal))]
        ax1.plot(time_original, original_signal, label='原始信号')
        ax1.plot(time_processed, processed_signal, label='预处理后信号', alpha=0.7)
        ax1.set_title('原始信号 vs 预处理信号')
        ax1.set_xlabel('时间 (秒)')
        ax1.set_ylabel('振幅')
        ax1.legend()
        ax1.grid(True)
        
        # 2. 第一帧的原始和加窗信号
        ax2 = plt.subplot(gs[1, 0])
        frame_index = 0
        frame_time = [i / sample_rate * 1000 for i in range(len(frames[frame_index]))]  # 毫秒
        ax2.plot(frame_time, frames[frame_index], label='原始帧')
        ax2.plot(frame_time, windowed_frames[frame_index], label='加窗帧')
        ax2.set_title(f'第{frame_index+1}帧信号 (原始 vs 加窗)')
        ax2.set_xlabel('时间 (毫秒)')
        ax2.set_ylabel('振幅')
        ax2.legend()
        ax2.grid(True)
        
        # 3. 第一帧的自相关函数
        ax3 = plt.subplot(gs[1, 1])
        lags = [i * 1000 / sample_rate for i in range(len(acf_values[frame_index]))]  # 毫秒
        ax3.plot(lags, acf_values[frame_index])
        ax3.set_title(f'第{frame_index+1}帧的自相关函数')
        ax3.set_xlabel('延迟 (毫秒)')
        ax3.set_ylabel('自相关值')
        ax3.grid(True)
        
        # 标记基音周期
        pitch = pitches[frame_index]
        if pitch > 0:
            period = 1000 / pitch  # 周期(毫秒)
            ax3.axvline(period, color='r', linestyle='--', 
                         label=f'基音周期: {period:.2f}ms')
            ax3.legend()
        
        # 4. 基频检测结果
        ax4 = plt.subplot(gs[2, :])
        frame_times = [i * (len(frames[0]) * (1-0.5) / sample_rate) 
                      for i in range(len(pitches))]  # 帧中心时间
        ax4.plot(frame_times, pitches)
        ax4.set_title('基频检测结果')
        ax4.set_xlabel('时间 (秒)')
        ax4.set_ylabel('频率 (Hz)')
        ax4.grid(True)
        
        # 5. 频谱图
        ax5 = plt.subplot(gs[3, 0])
        frame_to_show = windowed_frames[frame_index]
        n = len(frame_to_show)
        # 使用DFT计算频谱
        spectrum = [abs(self._dft(frame_to_show, k)) for k in range(n//2)]
        freqs = [k * sample_rate / n for k in range(n//2)]
        ax5.plot(freqs, spectrum)
        ax5.set_title(f'第{frame_index+1}帧频谱')
        ax5.set_xlabel('频率 (Hz)')
        ax5.set_ylabel('幅度')
        ax5.set_xlim(0, 2000)
        ax5.grid(True)
        
        # 标记基频和谐波
        if pitch > 0:
            ax5.axvline(pitch, color='r', linestyle='--', label=f'基频: {pitch:.1f}Hz')
            # 标记前5个谐波
            for i in range(2, 6):
                harmonic = i * pitch
                if harmonic < 2000:
                    ax5.axvline(harmonic, color='g', linestyle=':', 
                                label=f'{i}次谐波' if i==2 else None)
            ax5.legend()
        
        # 6. 原始信号频谱图
        ax6 = plt.subplot(gs[3, 1])
        # 对整段信号进行DFT
        full_spectrum = [abs(self._dft(original_signal, k)) for k in range(len(original_signal)//2)]
        full_freqs = [k * sample_rate / len(original_signal) for k in range(len(original_signal)//2)]
        ax6.plot(full_freqs, full_spectrum)
        ax6.set_title('整段信号频谱')
        ax6.set_xlabel('频率 (Hz)')
        ax6.set_ylabel('幅度')
        ax6.set_xlim(0, 2000)
        ax6.grid(True)
        
        plt.tight_layout()
        plt.show()
    
    def _dft(self, x, k):
        """离散傅里叶变换 (不使用第三方库)"""
        N = len(x)
        real = 0.0
        imag = 0.0
        for n in range(N):
            angle = 2 * math.pi * k * n / N
            real += x[n] * math.cos(angle)
            imag -= x[n] * math.sin(angle)
        return math.sqrt(real*real + imag*imag) / N

class TextVisualizer(Visualizer):
    """文本可视化器(用于无图形界面环境)"""
    def visualize(self, analysis_data):
        pitches = analysis_data['pitches']
        print("\n基频检测结果摘要:")
        print(f"分析帧数: {len(pitches)}")
        
        # 计算有效基频统计
        valid_pitches = [p for p in pitches if 50 < p < 800]
        if valid_pitches:
            avg_pitch = sum(valid_pitches) / len(valid_pitches)
            min_pitch = min(valid_pitches)
            max_pitch = max(valid_pitches)
            print(f"平均基频: {avg_pitch:.2f} Hz")
            print(f"最小基频: {min_pitch:.2f} Hz")
            print(f"最大基频: {max_pitch:.2f} Hz")
        else:
            print("未检测到有效基频")
        
        # 打印前5帧的基频
        print("\n前5帧基频值:")
        for i, pitch in enumerate(pitches[:5]):
            print(f"帧 {i+1}: {pitch:.2f} Hz")

# ====================== 工具函数 ======================
def normalize_signal(signal):
    """信号归一化到[-1, 1]范围"""
    max_val = max(abs(x) for x in signal)
    return [x / max_val for x in signal] if max_val > 0 else signal

def hamming_window(frame):
    """应用汉明窗减少频谱泄露"""
    N = len(frame)
    return [frame[i] * (0.54 - 0.46 * math.cos(2 * math.pi * i / (N - 1))) 
            for i in range(N)]

# ====================== 核心处理流程 ======================
class PitchAnalyzer:
    """基频分析主流程(门面模式)"""
    def __init__(self, config):
        self.preprocessor = ProcessorFactory.create_preprocessor(config["preprocess"])
        self.framer = ProcessorFactory.create_framer(config["frame"])
        self.detector = ProcessorFactory.create_detector(config["detect"])
        self.visualizer = ProcessorFactory.create_visualizer(config.get("visualize", "text"))
        self.config = config

    def analyze(self, signal, sample_rate, visualize=False):
        # 保存原始信号用于可视化
        original_signal = signal.copy()
        
        # 1. 预处理
        processed = self.preprocessor.process(signal)
        
        # 2. 分帧
        frames = self.framer.frame(processed, sample_rate)
        
        # 3. 加窗处理
        windowed_frames = [hamming_window(frame) for frame in frames]
        
        # 4. 基频检测
        pitches, acf_values = self.detector.detect(windowed_frames, sample_rate)
        
        # 准备可视化数据
        analysis_data = {
            'original_signal': original_signal,
            'processed_signal': processed,
            'frames': frames,
            'windowed_frames': windowed_frames,
            'pitches': pitches,
            'acf_values': acf_values,
            'sample_rate': sample_rate
        }
        
        # 5. 可视化
        if visualize:
            self.visualizer.visualize(analysis_data)
        
        return pitches, analysis_data

# ====================== 文件处理 ======================
def read_wav_file(filename):
    """读取WAV文件返回信号和采样率"""
    with wave.open(filename, 'rb') as wav:
        n_frames = wav.getnframes()
        sample_rate = wav.getframerate()
        data = wav.readframes(n_frames)
        # 将字节数据转换为浮点数
        samples = array.array('h', data)
        return [s / 32768.0 for s in samples], sample_rate

# ====================== 测试用例 ======================
def test_pitch_analysis(show_plot=True):
    """测试基频检测功能"""
    # 生成440Hz正弦波(A4标准音)
    sample_rate = 44100
    duration = 0.5  # 0.5秒
    freq = 440.0
    t = [i / sample_rate for i in range(int(sample_rate * duration))]
    signal = [math.sin(2 * math.pi * freq * t_i) for t_i in t]
    
    # 添加噪声模拟真实环境
    signal = [s + 0.1 * math.sin(2 * math.pi * 3000 * t_i) for s, t_i in zip(signal, t)]
    
    # 配置分析器
    config = {
        "preprocess": "preemphasis",
        "frame": "fixed",
        "detect": "autocorrelation",
        "visualize": "matplotlib" if show_plot else "text"
    }
    analyzer = PitchAnalyzer(config)
    
    # 执行分析并可视化
    pitches, analysis_data = analyzer.analyze(signal, sample_rate, visualize=show_plot)
    
    # 验证结果(取有效帧的平均值)
    valid_pitches = [p for p in pitches if 50 < p < 800]
    if valid_pitches:
        avg_pitch = sum(valid_pitches) / len(valid_pitches)
        # 允许±5Hz误差
        assert 435 < avg_pitch < 445, f"检测失败:期望440Hz,得到{avg_pitch:.2f}Hz"
        print(f"测试通过!检测基频:{avg_pitch:.2f}Hz (期望440Hz)")
    else:
        print("测试失败:未检测到有效基频")

def test_zero_division():
    """测试除以零异常处理"""
    # 创建一个全零信号,会触发除以零异常
    sample_rate = 44100
    signal = [0.0] * 1000  # 1秒的静音
    
    # 配置分析器
    config = {
        "preprocess": "preemphasis",
        "frame": "fixed",
        "detect": "autocorrelation",
        "visualize": "text"
    }
    analyzer = PitchAnalyzer(config)
    
    # 执行分析
    pitches, _ = analyzer.analyze(signal, sample_rate)
    
    # 验证结果应为0或接近0
    assert all(p < 1e-6 for p in pitches), "全零信号处理失败"
    print("除以零测试通过!")

def test_real_audio(filename=r"E:\temp\sample.wav"):
    """测试真实音频文件"""
    try:
        signal, sample_rate = read_wav_file(filename)
    except FileNotFoundError:
        print(f"文件 {filename} 未找到,使用测试信号代替")
        test_pitch_analysis(show_plot=True)
        return
    
    # 只取前1秒音频
    if len(signal) > sample_rate:
        signal = signal[:sample_rate]
    
    # 配置分析器
    config = {
        "preprocess": "preemphasis",
        "frame": "fixed",
        "detect": "autocorrelation",
        "visualize": "matplotlib"
    }
    analyzer = PitchAnalyzer(config)
    
    # 执行分析并可视化
    analyzer.analyze(signal, sample_rate, visualize=True)

if __name__ == "__main__":
    #print("1. 测试合成信号")
    #test_pitch_analysis(show_plot=True)
    
    #print("\n2. 测试除以零处理")
    #test_zero_division()
    
    print("\n3. 测试真实音频(需要sample.wav文件)")
    test_real_audio()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2404015.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32 控制12VRGB灯带颜色亮度调节,TFTLCD显示

接了一个同学的小项目&#xff0c;要实现控制一个实体&#xff0c;控制灯带的亮度为红/绿/蓝/白/黄以及亮度的叠加。 时间要的比较急&#xff0c;要两天实现&#xff0c;因此不能打板&#xff0c;只能采用现有模块拼接。 一. 实施方案 一开始觉得很简单&#xff0c;就是使用五…

【JJ斗地主-注册安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞 …

《绩效管理》要点总结与分享

目录 绩效管理与目标设定 绩效管理的循环&#xff1a;PDCA 绩效目标的设定要点 绩效设定的工具&#xff1a;SMART法则 绩效跟踪与评估 刻板印象&#xff1a;STAR法 晕轮效应&#xff1a;对比评价法 近因效应&#xff1a;关键事项评估表 绩效面谈 面谈前准备工作 汉堡…

Microsoft前后端不分离编程新风向:cshtml

文章目录 什么是CSHTML&#xff1f;基础语法内联表达式代码块控制结构 布局页面_ViewStart.cshtml_Layout.cshtml使用布局 模型绑定强类型视图模型集合 HTML辅助方法基本表单验证 局部视图创建局部视图使用局部视图 高级特性视图组件依赖注入Tag Helpers 性能优化缓存捆绑和压缩…

【评测】用Flux的图片文本修改的PS效果

【评测】Flux的图片文本修改的PS效果 1. 百度图库找一张有英文的图片 2. 打开https://playground.bfl.ai/image/edit上传图片 3. 输入提示词 “change brarfant to goodbeer” 图片的文字被修改了

数据库管理-第334期 Oracle Database 23ai测试版RAC部署文档(20250607)

数据库管理334期 2024-06-07 数据库管理-第334期 Oracle Database 23ai测试版RAC部署文档&#xff08;20240607&#xff09;1 环境与安装介质2 操作标准系统配置2.1 关闭防火墙2.2 关闭SELinux2.3 关闭avahi-daemon2.4 时间同步配置 3 存储服务器配置3.1 配置本地yum源3.2 安装…

AI生成的基于html+marked.js实现的Markdown转html工具,离线使用,可实时预览 [

有一个markdown格式的文档&#xff0c;手头只有notepad的MarkdownPanel插件可以预览&#xff0c;但是只能预览&#xff0c;不能直接转换为html文件下载&#xff0c;直接复制预览的内效果又不太好&#xff0c;度娘也能找到很多工具&#xff0c;但是都需要在线使用。所以考虑用AI…

机器学习:load_predict_project

本文目录&#xff1a; 一、project目录二、utils里的两个工具包&#xff08;一&#xff09;common.py&#xff08;二&#xff09;log.py 三、src文件夹代码&#xff08;一&#xff09;模型训练&#xff08;train.py&#xff09;&#xff08;二&#xff09;模型预测&#xff08;…

【storage】

文章目录 1、RAM and ROM2、DRAM and SRAM2、Flash Memory&#xff08;闪存&#xff09;4、DDR and SPI NOR Flash5、eMMC6、SPI NOR vs SPI NAND vs eMMC vs SD附录——prototype and demo board附录——U盘、SD卡、TF卡、SSD参考 1、RAM and ROM RAM&#xff08;Random Acce…

JVM 垃圾回收器 详解

垃圾收集器 SerialSerial Old&#xff1a;单线程回收&#xff0c;适用于单核CPU场景ParNewCMS&#xff1a;暂停时间较短&#xff0c;适用于大型互联网应用中与用户交互的部分Paraller ScavengeParallel Old&#xff1a;吞吐量高&#xff0c;适用于后台进行大量数据操作G1&#…

FreeRTOS任务之深入篇

目录 1.Tick1.1 Tick的概念1.2 Tick与任务调度1.3 Tick与延时函数 2.任务状态2.1 运行状态 (Running)2.2 就绪状态 (Ready)2.3 阻塞状态 (Blocked)5.4 暂停状态 (Suspended)2.5 特殊状态&#xff1a;删除状态 (Deleted)5.6 任务状态转换2.7 实验 3.Delay函数3.1 两个函数3.2 实…

Linux 系统、代码与服务器进阶知识深度解析

在数字化时代&#xff0c;Linux 系统凭借其开源、稳定、安全的特性&#xff0c;成为服务器领域和软件开发的核心支柱。除了算法优化技巧&#xff0c;Linux 系统在网络服务、容器化技术、服务器安全等方面也蕴含着丰富的知识和实用技术。接下来&#xff0c;我们将深入探讨这些领…

人工智能--AI换脸

本文实现了一个简易的人脸交换程序&#xff0c;主要功能包括&#xff1a;1)检查所需的模型文件是否存在&#xff1b;2)使用预训练的Caffe模型检测图像中的人脸&#xff1b;3)将源图像的人脸区域通过泊松融合无缝地替换到目标图像上。程序通过OpenCV的DNN模块加载人脸检测模型&a…

NLP学习路线图(二十七):Transformer编码器/解码器

一、Transformer概览&#xff1a;抛弃循环&#xff0c;拥抱注意力 传统RNN及其变体&#xff08;如LSTM、GRU&#xff09;处理序列数据时存在顺序依赖的瓶颈&#xff1a;必须逐个处理序列元素&#xff0c;难以并行计算&#xff0c;且对长程依赖建模能力较弱。Transformer的革命…

【机器学习】支持向量机实验报告——基于SVM进行分类预测

目录 一、实验题目描述 二、实验步骤 三、Python代码实现基于SVM进行分类预测 四、我的收获 五、我的感受 一、实验题目描述 实验题目&#xff1a;基于SVM进行分类预测 实验要求&#xff1a;通过给定数据&#xff0c;使用支持向量机算法&#xff08;SVM&#xff09;实现分…

HA: Wordy靶场

HA: Wordy 来自 <HA: Wordy ~ VulnHub> 1&#xff0c;将两台虚拟机网络连接都改为NAT模式 2&#xff0c;攻击机上做namp局域网扫描发现靶机 nmap -sn 192.168.23.0/24 那么攻击机IP为192.168.23.128&#xff0c;靶场IP192.168.23.130 3&#xff0c;对靶机进行端口服务探…

中国移动6周年!

基站超过250万个 网络规模全球最大、质量最优 覆盖全国96%人口 在全国率先实现乡乡双千兆 服务用户超5.7亿 网络上下行均值接入速率均居行业首位 行业应用快速推广&#xff0c;数量超5万个 3CC、RedCap、通感一体、 无线AI改造等技术成熟商用 客户品牌持续升级&#x…

408第一季 - 数据结构 - 树与二叉树II

二叉树的先中后序遍历 理解 那主播&#xff0c;请问你有没有更快的遍历方式呢 有的&#xff0c;兄弟有的 以中序遍历为例啊 找左边有没有东西&#xff0c;左边没东西那它就自由了&#xff0c;就按上面的图举例子 A左边有东西&#xff0c;是B&#xff0c;B左边没东西&#xf…

从上下文学习和微调看语言模型的泛化:一项对照研究

大型语言模型表现出令人兴奋的能力&#xff0c;但也可以从微调中表现出令人惊讶的狭窄泛化。例如&#xff0c;他们可能无法概括为简单的关系反转&#xff0c;或者无法根据训练信息进行简单的逻辑推理。这些未能从微调中概括出来的失败可能会阻碍这些模型的实际应用。另一方面&a…

智慧城市建设方案

第1章 总体说明 1.1 建设背景 1.2 建设目标 1.3 项目建设主要内容 1.4 设计原则 第2章 对项目的理解 2.1 现状分析 2.2 业务需求分析 2.3 功能需求分析 第3章 大数据平台建设方案 3.1 大数据平台总体设计 3.2 大数据平台功能设计 3.3 平台应用 第4章 政策标准保障…