feat: 新增Python调用麦克风录音示例代码
This commit is contained in:
parent
40a82dd860
commit
0de121e7ff
96
soundcard_demo.py
Normal file
96
soundcard_demo.py
Normal file
@ -0,0 +1,96 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
'''
|
||||
# @Author :FXB
|
||||
# @Description : 示例代码
|
||||
# @Function :语音识别示例
|
||||
'''
|
||||
|
||||
import json
|
||||
import time
|
||||
import rospy
|
||||
import base64
|
||||
import requests
|
||||
import soundcard as sc
|
||||
import soundfile as sf
|
||||
|
||||
|
||||
class DemoSpeechVoice:
|
||||
def __init__(self):
|
||||
self.voice_server_url = "http://127.0.0.1:10086"
|
||||
self.voice_duration = 6
|
||||
self.recording_status = False
|
||||
self.continuous_recording = False
|
||||
|
||||
# 检查服务端状态
|
||||
def server_status(self):
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
try:
|
||||
response = requests.get(self.voice_server_url + "/status", headers=headers, timeout=10)
|
||||
response.raise_for_status()
|
||||
return json.loads(response.text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
rospy.logerr(f"Error1: {e}")
|
||||
time.sleep(0.5)
|
||||
return {"code":1,"msg":"error_msg","status":"Request to the server error."}
|
||||
except Exception as e:
|
||||
rospy.logerr(f"Error2: {e}")
|
||||
time.sleep(0.5)
|
||||
return {"code":1,"msg":"error_msg","status":"Other Error"}
|
||||
|
||||
def record_with_soundcard(self, filename="temp_audio.wav", duration=5, samplerate=44100):
|
||||
"""使用soundcard库录制"""
|
||||
mic = sc.default_microphone()
|
||||
print("Recording...")
|
||||
|
||||
with mic.recorder(samplerate=samplerate) as recorder:
|
||||
data = recorder.record(numframes=samplerate*duration)
|
||||
|
||||
sf.write(filename, data, samplerate)
|
||||
return filename
|
||||
|
||||
# 发送至服务端进行识别并返回识别结果
|
||||
def asr_demo_api(self, wav_path):
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
try:
|
||||
with open(wav_path, "rb") as f:
|
||||
wav_data = base64.b64encode(f.read()).decode()
|
||||
data = {"wav": wav_data}
|
||||
# 设置超时时间,避免长时间等待
|
||||
response = requests.post(self.voice_server_url + "/asr", headers=headers, json=data, timeout=5)
|
||||
response.raise_for_status()
|
||||
response = response.json()
|
||||
if response['code'] == 0:
|
||||
res = response['res']
|
||||
self.recording_status = False
|
||||
return res
|
||||
else:
|
||||
return f"API Error: {response['msg']}"
|
||||
except requests.exceptions.RequestException as e:
|
||||
return f"Network Error: Failed to connect to the server. Details: {str(e)}"
|
||||
except Exception as e:
|
||||
return f"Unexpected Error: {str(e)}"
|
||||
|
||||
def asr_start(self):
|
||||
wav_path = self.record_with_soundcard(duration=self.voice_duration)
|
||||
res = self.asr_demo_api(wav_path)
|
||||
print(f"Recognition Result: {res}")
|
||||
|
||||
def run(self):
|
||||
status = self.server_status()
|
||||
if status["code"] == 0 and self.recording_status == False:
|
||||
self.asr_start()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
rospy.init_node('ASR_node')
|
||||
print("[ASR_node] init start.")
|
||||
|
||||
try:
|
||||
# 创建实例并运行
|
||||
voice = DemoSpeechVoice()
|
||||
voice.run()
|
||||
rospy.spin()
|
||||
except rospy.ROSInterruptException:
|
||||
pass
|
||||
Loading…
Reference in New Issue
Block a user