详解python的webrtc库实现语音端点检测
引言
语音端点检测最早应用于电话传输和检测系统当中,用于通信信道的时间分配,提高传输线路的利用效率.端点检测属于语音处理系统的前端操作,在语音检测领域意义重大.
但是目前的语音端点检测,尤其是检测 人声 开始和结束的端点始终是属于技术难点,各家公司始终处于 能判断,但是不敢保证 判别准确性 的阶段.
现在基于云端语义库的聊天机器人层出不穷,其中最著名的当属amazon的 Alexa/Echo 智能音箱.
国内如雨后春笋般出现了各种搭载语音聊天的智能音箱(如前几天在知乎上广告的若琪机器人)和各类智能机器人产品.国内语音服务提供商主要面对中文语音服务,由于语音不像图像有分辨率等等较为客观的指标,很多时候凭主观判断,所以较难判断各家语音识别和合成技术的好坏.但是我个人认为,国内的中文语音服务和国外的英文语音服务,在某些方面已经有超越的趋势.
通常搭建机器人聊天系统主要包括以下三个方面:
- 语音转文字(ASR/STT)
- 语义内容(NLU/NLP)
- 文字转语音(TTS)
语音转文字(ASR/STT)
在将语音传给云端API之前,是本地前端的语音采集,这部分主要包括如下几个方面:
- 麦克风降噪
- 声源定位
- 回声消除
- 唤醒词
- 语音端点检测
- 音频格式压缩
python 端点检测
由于实际应用中,单纯依靠能量检测特征检测等方法很难判断人声说话的起始点,所以市面上大多数的语音产品都是使用唤醒词判断语音起始.另外加上声音回路,还可以做语音打断.这样的交互方式可能有些傻,每次必须喊一下 唤醒词 才能继续聊天.这种方式聊多了,个人感觉会嘴巴疼:-O .现在github上有snowboy唤醒词的开源库,大家可以登录snowboy官网训练自己的唤醒词模型.
考虑到用唤醒词嘴巴会累,所以大致调研了一下,Python拥有丰富的库,直接import就能食用.这种方式容易受强噪声干扰,适合一个人在家玩玩.
- pyaudio: pip install pyaudio 可以从设备节点读取原始音频流数据,音频编码是PCM格式;
- webrtcvad: pip install webrtcvad 检测判断一组语音数据是否为空语音;
当检测到持续时间长度 T1 vad检测都有语音活动,可以判定为语音起始;
当检测到持续时间长度 T2 vad检测都没有有语音活动,可以判定为语音结束;
完整程序代码可以从我的github下载
程序很简单,相信看一会儿就明白了
''' Requirements: + pyaudio - `pip install pyaudio` + py-webrtcvad - `pip install webrtcvad` ''' import webrtcvad import collections import sys import signal import pyaudio from array import array from struct import pack import wave import time FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 16000 CHUNK_DURATION_MS = 30 # supports 10, 20 and 30 (ms) PADDING_DURATION_MS = 1500 # 1 sec jugement CHUNK_SIZE = int(RATE CHUNK_DURATION_MS / 1000) # chunk to read CHUNK_BYTES = CHUNK_SIZE 2 # 16bit = 2 bytes, PCM NUM_PADDING_CHUNKS = int(PADDING_DURATION_MS / CHUNK_DURATION_MS) # NUM_WINDOW_CHUNKS = int(240 / CHUNK_DURATION_MS) NUM_WINDOW_CHUNKS = int(400 / CHUNK_DURATION_MS) # 400 ms/ 30ms ge NUM_WINDOW_CHUNKS_END = NUM_WINDOW_CHUNKS 2 START_OFFSET = int(NUM_WINDOW_CHUNKS CHUNK_DURATION_MS 0.5 RATE) vad = webrtcvad.Vad(1) pa = pyaudio.PyAudio() stream = pa.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, start=False, # input_device_index=2, frames_per_buffer=CHUNK_SIZE) got_a_sentence = False leave = False def handle_int(sig, chunk): global leave, got_a_sentence leave = True got_a_sentence = True def record_to_file(path, data, sample_width): "Records from the microphone and outputs the resulting data to 'path'" # sample_width, data = record() data = pack('<' + ('h' len(data)), data) wf = wave.open(path, 'wb') wf.setnchannels(1) wf.setsampwidth(sample_width) wf.setframerate(RATE) wf.writeframes(data) wf.close() def normalize(snd_data): "Average the volume out" MAXIMUM = 32767 # 16384 times = float(MAXIMUM) / max(abs(i) for i in snd_data) r = array('h') for i in snd_data: r.append(int(i times)) return r signal.signal(signal.SIGINT, handle_int) while not leave: ring_buffer = collections.deque(maxlen=NUM_PADDING_CHUNKS) triggered = False voiced_frames = [] ring_buffer_flags = [0] NUM_WINDOW_CHUNKS ring_buffer_index = 0 ring_buffer_flags_end = [0] NUM_WINDOW_CHUNKS_END ring_buffer_index_end = 0 buffer_in = '' # WangS raw_data = array('h') index = 0 start_point = 0 StartTime = time.time() print(" recording: ") stream.start_stream() while not got_a_sentence and not leave: chunk = stream.read(CHUNK_SIZE) # add WangS raw_data.extend(array('h', chunk)) index += CHUNK_SIZE TimeUse = time.time() - StartTime active = vad.is_speech(chunk, RATE) sys.stdout.write('1' if active else '_') ring_buffer_flags[ring_buffer_index] = 1 if active else 0 ring_buffer_index += 1 ring_buffer_index %= NUM_WINDOW_CHUNKS ring_buffer_flags_end[ring_buffer_index_end] = 1 if active else 0 ring_buffer_index_end += 1 ring_buffer_index_end %= NUM_WINDOW_CHUNKS_END # start point detection if not triggered: ring_buffer.append(chunk) num_voiced = sum(ring_buffer_flags) if num_voiced > 0.8 NUM_WINDOW_CHUNKS: sys.stdout.write(' Open ') triggered = True start_point = index - CHUNK_SIZE 20 # start point # voiced_frames.extend(ring_buffer) ring_buffer.clear() # end point detection else: # voiced_frames.append(chunk) ring_buffer.append(chunk) num_unvoiced = NUM_WINDOW_CHUNKS_END - sum(ring_buffer_flags_end) if num_unvoiced > 0.90 NUM_WINDOW_CHUNKS_END or TimeUse > 10: sys.stdout.write(' Close ') triggered = False got_a_sentence = True sys.stdout.flush() sys.stdout.write('\n') # data = b''.join(voiced_frames) stream.stop_stream() print(" done recording") got_a_sentence = False # write to file raw_data.reverse() for index in range(start_point): raw_data.pop() raw_data.reverse() raw_data = normalize(raw_data) record_to_file("recording.wav", raw_data, 2) leave = True stream.close()
程序运行方式sudo python vad.py