import time
import collections
import os

import numpy as np
import pyaudio
import tensorflow as tf
from pygame import mixer


class RingBuffer(object):
    """Ring buffer holding raw 16-bit audio bytes delivered by PortAudio."""

    def __init__(self, size=4096):
        # Deque of individual byte values; oldest bytes are dropped once full.
        self._buf = collections.deque(maxlen=size)

    def extend(self, data):
        """Append raw audio bytes to the end of the buffer."""
        self._buf.extend(data)

    def getNumpyArrayData(self):
        """Return the buffered audio as an int16 numpy array.

        When the capture rate differs from RATE_44100 the audio is
        resampled up to 44.1 kHz (the rate the model expects).
        Returns an empty int16 array when nothing has been captured yet.
        """
        raw = bytes(bytearray(self._buf))
        if not raw:
            return np.empty(0, dtype=np.int16)
        samples = np.frombuffer(raw, dtype=np.int16)
        if SAMPLE_RATE != RATE_44100:
            # Deferred import: librosa is heavy and only needed here.
            import librosa
            # librosa >= 0.10 requires the sample rates as keyword args.
            samples = librosa.resample(
                samples.astype(np.float32),
                orig_sr=SAMPLE_RATE,
                target_sr=RATE_44100,
            ).astype(np.int16)
        return samples


modelPath = '/home/pi/model/voice_detection/model.tflite'
labels_path = '/home/pi/model/voice_detection/labels.txt'

RATE_44100 = 44100   # sample rate the model expects
SAMPLE_RATE = 16000  # microphone capture rate
CHANNELS = 1
CHUNK = 1024

# Holds one second of 16-bit mono audio (2 bytes per sample).
ring_buffer_for_self_trained_model = RingBuffer(CHANNELS * SAMPLE_RATE * 2)


def load_labels(path):
    """Load the label file: one label per line, keyed by line index."""
    with open(path, 'r') as f:
        return {i: line.strip() for i, line in enumerate(f)}


def audio_callback(in_data, frame_count, time_info, status):
    """PortAudio stream callback: stash captured bytes into the ring buffer."""
    ring_buffer_for_self_trained_model.extend(in_data)
    # Dummy output payload; the stream is opened input-only so it is unused.
    play_data = chr(0) * len(in_data)
    return play_data, pyaudio.paContinue


def play_mp3(path):
    """Load and start playing an mp3 through the pygame mixer."""
    mixer.music.load(path)
    mixer.music.play()


pa = pyaudio.PyAudio()
stream = pa.open(
    input=True,
    output=False,
    input_device_index=0,
    format=pa.get_format_from_width(2),  # 16-bit samples
    channels=CHANNELS,
    rate=SAMPLE_RATE,
    frames_per_buffer=CHUNK,
    stream_callback=audio_callback)

labels = load_labels(labels_path)

interpreter = tf.lite.Interpreter(model_path=modelPath)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

mixer.init()  # initialise the pygame mixer for music playback

cry_times = 0    # consecutive "crying" detections
calm_times = 0   # consecutive "not crying" detections
status = ''      # '' (idle) or 'playing'
print('-----------------start listening----------------')
try:
    while True:
        xs = ring_buffer_for_self_trained_model.getNumpyArrayData()
        if xs.size == 0:
            # Nothing captured yet; poll again shortly.
            time.sleep(0.1)
            continue

        # Start timing before inference so the 0.5 s pacing below covers the
        # whole detection step, not just the post-inference bookkeeping.
        t1 = time.time()

        # Pad or truncate to the fixed input length the model expects.
        input_length = 44032
        if len(xs) >= input_length:
            xs = xs[:input_length]
        else:
            xs = np.resize(xs, (input_length,))

        # Normalise int16 samples to [-1, 1) floats and run inference.
        in_tensor = tf.constant(xs, shape=(1, input_length), dtype=tf.float32) / 32768.0
        interpreter.set_tensor(input_details[0]['index'], in_tensor)
        interpreter.invoke()
        output_data = np.squeeze(interpreter.get_tensor(output_details[0]['index']))

        # argsort is ascending, so the last index is the top-scoring class
        # (index 0 is the background-noise class).
        sorted_indexs = np.argsort(output_data)
        maxIndex = sorted_indexs[-1]

        if labels[maxIndex] == '婴儿哭' and output_data[maxIndex] > 0.8:
            # Require more than 5 consecutive cry detections before playing
            # music, to guard against spurious detections.
            calm_times = 0
            cry_times += 1
            print('crying+', cry_times)
            if cry_times > 5 and status != 'playing':
                print('playing music===========')
                play_mp3('/home/pi/Music/摇篮曲.mp3')
                status = 'playing'
        elif labels[maxIndex] != '婴儿哭':
            cry_times = 0
            calm_times += 1
            # Require more than 5 consecutive calm detections before stopping
            # the music, again to avoid reacting to a single misdetection.
            if calm_times > 5 and status == 'playing':
                mixer.music.stop()
                print('stop play music=======')
                status = ''

        # Run one detection roughly every 0.5 s to avoid busy-looping.
        t2 = time.time()
        if t2 - t1 < 0.5:
            time.sleep(0.5 - (t2 - t1))
finally:
    print('finally========')
    stream.stop_stream()
    stream.close()
    pa.terminate()