From ec05a7cb1f530662ca253002ebbe8ce675ce1da6 Mon Sep 17 00:00:00 2001 From: cgisky1980 Date: Tue, 21 Mar 2023 20:09:33 +0800 Subject: [PATCH 1/5] Create vad_asr_websocket_client.py --- .../vad_asr_websocket_client.py | 197 ++++++++++++++++++ 1 file changed, 197 insertions(+) create mode 100644 funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py diff --git a/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py b/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py new file mode 100644 index 000000000..c5096cb5c --- /dev/null +++ b/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py @@ -0,0 +1,197 @@ +#""" from https://github.com/cgisky1980/550W_AI_Assistant """ + +from modelscope.pipelines import pipeline +from modelscope.utils.constant import Tasks +from modelscope.utils.logger import get_logger +import logging +logger = get_logger(log_level=logging.CRITICAL) +logger.setLevel(logging.CRITICAL) +import websocket +import pyaudio +import time +import json +import threading + + +# ---------WebsocketClient相关 主要处理 on_message on_open 已经做了断线重连处理 +class WebsocketClient(object): + def __init__(self, address, message_callback=None): + super(WebsocketClient, self).__init__() + self.address = address + self.message_callback = None + + def on_message(self, ws, message): + try: + messages = json.loads( + (message.encode("raw_unicode_escape")).decode() + ) # 收到WS消息后的处理 + if messages.get("type") == "ping": + self.ws.send('{"type":"pong"}') + except json.JSONDecodeError as e: + print(f"JSONDecodeError: {e}") + except KeyError: + print("KeyError!") + + def on_error(self, ws, error): + print("client error:", error) + + def on_close(self, ws): + print("### client closed ###") + self.ws.close() + self.is_running = False + + def on_open(self, ws): # 连上ws后发布登录信息 + self.is_running = True + self.ws.send( + '{"type":"login","uid":"asr","pwd":"tts9102093109"}' + ) # WS链接上后的登陆处理 + + def close_connect(self): + self.ws.close() + + def send_message(self, message): + try: + self.ws.send(message) + except BaseException as err: + pass + + def run(self): # WS初始化 + websocket.enableTrace(True) + self.ws = websocket.WebSocketApp( + self.address, + on_message=lambda ws, message: self.on_message(ws, message), + on_error=lambda ws, error: self.on_error(ws, error), + on_close=lambda ws: self.on_close(ws), + ) + websocket.enableTrace(False) # 要看ws调试信息,请把这行注释掉 + self.ws.on_open = lambda ws: self.on_open(ws) + self.is_running = False + # WS断线重连判断 + while True: + if not self.is_running: + self.ws.run_forever() + time.sleep(3) # 3秒检测一次 + + +class WSClient(object): + def __init__(self, address, call_back): + super(WSClient, self).__init__() + self.client = WebsocketClient(address, call_back) + self.client_thread = None + + def run(self): + self.client_thread = threading.Thread(target=self.run_client) + self.client_thread.start() + + def run_client(self): + self.client.run() + + def send_message(self, message): + self.client.send_message(message) + + +def vad(data): # VAD推理 + segments_result = vad_pipline(audio_in=data) + if segments_result["text"] == "[]": + return False + else: + return True + + +# 创建一个VAD对象 +vad_pipline = pipeline( + task=Tasks.voice_activity_detection, + model="damo/speech_fsmn_vad_zh-cn-16k-common-pytorch", + model_revision="v1.2.0", + output_dir=None, + batch_size=1, +) + +param_dict = dict() +param_dict["hotword"] = "小五 小五月" # 设置热词,用空格隔开 + + +# 创建一个ASR对象 +inference_pipeline2 = pipeline( + task=Tasks.auto_speech_recognition, + model="damo/speech_paraformer-large-contextual_asr_nat-zh-cn-16k-common-vocab8404", + param_dict=param_dict, +) + +# 创建一个PyAudio对象 +p = pyaudio.PyAudio() + +# 定义一些参数 +FORMAT = pyaudio.paInt16 # 采样格式 +CHANNELS = 1 # 单声道 +RATE = 16000 # 采样率 +CHUNK = int(RATE / 1000 * 300) # 每个片段的帧数(300毫秒) +RECORD_NUM = 0 # 录制时长(片段) + +# 打开输入流 +stream = p.open( + format=FORMAT, + channels=CHANNELS, + rate=RATE, + input=True, + frames_per_buffer=CHUNK, +) + +print("开始...") + +# 创建一个WS连接 +ws_client = WSClient("ws://localhost:7272", None) +ws_client.run() + +frames = [] # 存储所有的帧数据 +buffer = [] # 存储缓存中的帧数据(最多两个片段) +silence_count = 0 # 统计连续静音的次数 +speech_detected = False # 标记是否检测到语音 + +# 循环读取输入流中的数据 +while True: + data = stream.read(CHUNK) # 读取一个片段的数据 + buffer.append(data) # 将当前数据添加到缓存中 + + if len(buffer) > 2: + buffer.pop(0) # 如果缓存超过两个片段,则删除最早的一个 + + if speech_detected: + frames.append(data) + RECORD_NUM += 1 + # print(str(RECORD_NUM)+ "\r") + + if vad(data): # VAD 判断是否有声音 + if not speech_detected: + print("开始录音...") + speech_detected = True # 标记为检测到语音 + frames = [] + frames.extend(buffer) # 把之前2个语音数据快加入 + silence_count = 0 # 重置静音次数 + + else: + silence_count += 1 # 增加静音次数 + #检测静音次数4次 或者已经录了50个数据块,则录音停止 + if speech_detected and (silence_count > 4 or RECORD_NUM > 50): + print("停止录音...") + audio_in = b"".join(frames) + rec_result = inference_pipeline2(audio_in=audio_in) # ws播报数据 + rec_result["type"] = "nlp" # 添加ws播报数据 + ws_client.send_message( + json.dumps(rec_result, ensure_ascii=False) + ) # ws发送到服务端 + print(rec_result) + frames = [] # 清空所有的帧数据 + buffer = [] # 清空缓存中的帧数据(最多两个片段) + silence_count = 0 # 统计连续静音的次数清零 + speech_detected = False # 标记是否检测到语音 + # RECORD_NUM = 0 + +print("结束录制...") + +# 停止并关闭输入流 +stream.stop_stream() +stream.close() + +# 关闭PyAudio对象 +p.terminate() From e11618bbb49f6742f6ecde1985b6a096934bea66 Mon Sep 17 00:00:00 2001 From: cgisky1980 Date: Tue, 21 Mar 2023 20:33:13 +0800 Subject: [PATCH 2/5] Create ws_server_demo.py --- .../vad_asr_websocket_client/ws_server_demo.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 funasr/runtime/python/vad_asr_websocket_client/ws_server_demo.py diff --git a/funasr/runtime/python/vad_asr_websocket_client/ws_server_demo.py b/funasr/runtime/python/vad_asr_websocket_client/ws_server_demo.py new file mode 100644 index 000000000..55d0315ce --- /dev/null +++ b/funasr/runtime/python/vad_asr_websocket_client/ws_server_demo.py @@ -0,0 +1,13 @@ +# server.py +import asyncio +import websockets + +async def echo(websocket, path): + async for message in websocket: + print(message) # 打印收到的消息 + await websocket.send(message) + +start_server = websockets.serve(echo, "localhost", 7272) + +asyncio.get_event_loop().run_until_complete(start_server) +asyncio.get_event_loop().run_forever() \ No newline at end of file From b1a5fbd433da55291f0a3d9df3fa1e85e6fcbc66 Mon Sep 17 00:00:00 2001 From: cgisky1980 Date: Tue, 21 Mar 2023 20:42:40 +0800 Subject: [PATCH 3/5] Update vad_asr_websocket_client.py --- .../python/vad_asr_websocket_client/vad_asr_websocket_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py b/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py index c5096cb5c..7f5ed539b 100644 --- a/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py +++ b/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py @@ -185,7 +185,7 @@ while True: buffer = [] # 清空缓存中的帧数据(最多两个片段) silence_count = 0 # 统计连续静音的次数清零 speech_detected = False # 标记是否检测到语音 - # RECORD_NUM = 0 + RECORD_NUM = 0 print("结束录制...") From 61089908a0e8e7a8b6a05493b35a1c42854dd8d9 Mon Sep 17 00:00:00 2001 From: cgisky1980 Date: Thu, 23 Mar 2023 00:36:04 +0800 Subject: [PATCH 4/5] websocket runtime --- funasr/runtime/websocket/ASR_client.py | 73 +++++++++++++ funasr/runtime/websocket/ASR_server.py | 143 +++++++++++++++++++++++++ 2 files changed, 216 insertions(+) create mode 100644 funasr/runtime/websocket/ASR_client.py create mode 100644 funasr/runtime/websocket/ASR_server.py diff --git a/funasr/runtime/websocket/ASR_client.py b/funasr/runtime/websocket/ASR_client.py new file mode 100644 index 000000000..7dce8803f --- /dev/null +++ b/funasr/runtime/websocket/ASR_client.py @@ -0,0 +1,73 @@ +import pyaudio +import websocket #区别服务端这里是 websocket-client库 +import time +import websockets +import asyncio +from queue import Queue +import threading +voices = Queue() +async def hello(): + global ws # 定义一个全局变量ws,用于保存websocket连接对象 + uri = "ws://localhost:8899" + ws = await websockets.connect(uri, subprotocols=["binary"]) # 创建一个长连接 + ws.max_size = 1024 * 1024 * 20 + print("connected ws server") +async def send(data): + global ws # 引用全局变量ws + try: + await ws.send(data) # 通过ws对象发送数据 + except Exception as e: + print('Exception occurred:', e) + + + +asyncio.get_event_loop().run_until_complete(hello()) # 启动协程 + + +# 其他函数可以通过调用send(data)来发送数据,例如: +async def test(): + #print("2") + global voices + FORMAT = pyaudio.paInt16 + CHANNELS = 1 + RATE = 16000 + CHUNK = int(RATE / 1000 * 300) + + p = pyaudio.PyAudio() + + stream = p.open(format=FORMAT, + channels=CHANNELS, + rate=RATE, + input=True, + frames_per_buffer=CHUNK) + + while True: + + data = stream.read(CHUNK) + + voices.put(data) + #print(voices.qsize()) + await asyncio.sleep(0.01) + + + + + +async def ws_send(): + global voices + print("started to sending data!") + while True: + while not voices.empty(): + data = voices.get() + voices.task_done() + await send(data) + await asyncio.sleep(0.01) + await asyncio.sleep(0.01) + +async def main(): + task = asyncio.create_task(test()) # 创建一个后台任务 + task2 = asyncio.create_task(ws_send()) # 创建一个后台任务 + + await asyncio.gather(task, task2) + +asyncio.run(main()) \ No newline at end of file diff --git a/funasr/runtime/websocket/ASR_server.py b/funasr/runtime/websocket/ASR_server.py new file mode 100644 index 000000000..3627d3a9b --- /dev/null +++ b/funasr/runtime/websocket/ASR_server.py @@ -0,0 +1,143 @@ +# server.py 注意本例仅处理单个clent发送的语音数据,并未对多client连接进行判断和处理 +from modelscope.pipelines import pipeline +from modelscope.utils.constant import Tasks +from modelscope.utils.logger import get_logger +import logging + +logger = get_logger(log_level=logging.CRITICAL) +logger.setLevel(logging.CRITICAL) +import asyncio +import websockets #区别客户端这里是 websockets库 +import time +from queue import Queue +import threading + +print("model loading") +voices = Queue() +speek = Queue() +# 创建一个VAD对象 +vad_pipline = pipeline( + task=Tasks.voice_activity_detection, + model="damo/speech_fsmn_vad_zh-cn-16k-common-pytorch", + model_revision="v1.2.0", + output_dir=None, + batch_size=1, +) + +# 创建一个ASR对象 +param_dict = dict() +param_dict["hotword"] = "小五 小五月" # 设置热词,用空格隔开 +inference_pipeline2 = pipeline( + task=Tasks.auto_speech_recognition, + model="damo/speech_paraformer-large-contextual_asr_nat-zh-cn-16k-common-vocab8404", + param_dict=param_dict, +) +print("model loaded") + + + +async def echo(websocket, path): + global voices + try: + async for message in websocket: + voices.put(message) + #print("put") + except websockets.exceptions.ConnectionClosedError as e: + print('Connection closed with exception:', e) + except Exception as e: + print('Exception occurred:', e) + +start_server = websockets.serve(echo, "localhost", 8899, subprotocols=["binary"],ping_interval=None) + + +def vad(data): # 推理 + global vad_pipline + #print(type(data)) + segments_result = vad_pipline(audio_in=data) + #print(segments_result) + if len(segments_result) == 0: + return False + else: + return True + +def asr(): # 推理 + global inference_pipeline2 + global speek + while True: + while not speek.empty(): + audio_in = speek.get() + speek.task_done() + rec_result = inference_pipeline2(audio_in=audio_in) + print(rec_result) + time.sleep(0.1) + time.sleep(0.1) + + +def main(): # 推理 + frames = [] # 存储所有的帧数据 + buffer = [] # 存储缓存中的帧数据(最多两个片段) + silence_count = 0 # 统计连续静音的次数 + speech_detected = False # 标记是否检测到语音 + RECORD_NUM = 0 + global voices + global speek + while True: + while not voices.empty(): + + data = voices.get() + #print("队列排队数",voices.qsize()) + voices.task_done() + buffer.append(data) + if len(buffer) > 2: + buffer.pop(0) # 如果缓存超过两个片段,则删除最早的一个 + + if speech_detected: + frames.append(data) + RECORD_NUM += 1 + + if vad(data): + if not speech_detected: + print("检测到人声...") + speech_detected = True # 标记为检测到语音 + frames = [] + frames.extend(buffer) # 把之前2个语音数据快加入 + silence_count = 0 # 重置静音次数 + else: + silence_count += 1 # 增加静音次数 + + if speech_detected and (silence_count > 4 or RECORD_NUM > 50): #这里 50 可根据需求改为合适的数据快数量 + print("说话结束或者超过设置最长时间...") + audio_in = b"".join(frames) + #asrt = threading.Thread(target=asr,args=(audio_in,)) + #asrt.start() + speek.put(audio_in) + #rec_result = inference_pipeline2(audio_in=audio_in) # ASR 模型里跑一跑 + frames = [] # 清空所有的帧数据 + buffer = [] # 清空缓存中的帧数据(最多两个片段) + silence_count = 0 # 统计连续静音的次数清零 + speech_detected = False # 标记是否检测到语音 + RECORD_NUM = 0 + time.sleep(0.01) + time.sleep(0.01) + + + +s = threading.Thread(target=main) +s.start() +s = threading.Thread(target=asr) +s.start() + +asyncio.get_event_loop().run_until_complete(start_server) +asyncio.get_event_loop().run_forever() + + + + + + + + + + + + From 60d5ac64c1415d7cfd4e9e96f14a89eb20bc9074 Mon Sep 17 00:00:00 2001 From: cgisky1980 Date: Thu, 23 Mar 2023 00:48:09 +0800 Subject: [PATCH 5/5] websocket runtime --- .../vad_asr_websocket_client.py | 197 ------------------ .../ws_server_demo.py | 13 -- .../{ => python}/websocket/ASR_client.py | 0 .../{ => python}/websocket/ASR_server.py | 0 4 files changed, 210 deletions(-) delete mode 100644 funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py delete mode 100644 funasr/runtime/python/vad_asr_websocket_client/ws_server_demo.py rename funasr/runtime/{ => python}/websocket/ASR_client.py (100%) rename funasr/runtime/{ => python}/websocket/ASR_server.py (100%) diff --git a/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py b/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py deleted file mode 100644 index 7f5ed539b..000000000 --- a/funasr/runtime/python/vad_asr_websocket_client/vad_asr_websocket_client.py +++ /dev/null @@ -1,197 +0,0 @@ -#""" from https://github.com/cgisky1980/550W_AI_Assistant """ - -from modelscope.pipelines import pipeline -from modelscope.utils.constant import Tasks -from modelscope.utils.logger import get_logger -import logging -logger = get_logger(log_level=logging.CRITICAL) -logger.setLevel(logging.CRITICAL) -import websocket -import pyaudio -import time -import json -import threading - - -# ---------WebsocketClient相关 主要处理 on_message on_open 已经做了断线重连处理 -class WebsocketClient(object): - def __init__(self, address, message_callback=None): - super(WebsocketClient, self).__init__() - self.address = address - self.message_callback = None - - def on_message(self, ws, message): - try: - messages = json.loads( - (message.encode("raw_unicode_escape")).decode() - ) # 收到WS消息后的处理 - if messages.get("type") == "ping": - self.ws.send('{"type":"pong"}') - except json.JSONDecodeError as e: - print(f"JSONDecodeError: {e}") - except KeyError: - print("KeyError!") - - def on_error(self, ws, error): - print("client error:", error) - - def on_close(self, ws): - print("### client closed ###") - self.ws.close() - self.is_running = False - - def on_open(self, ws): # 连上ws后发布登录信息 - self.is_running = True - self.ws.send( - '{"type":"login","uid":"asr","pwd":"tts9102093109"}' - ) # WS链接上后的登陆处理 - - def close_connect(self): - self.ws.close() - - def send_message(self, message): - try: - self.ws.send(message) - except BaseException as err: - pass - - def run(self): # WS初始化 - websocket.enableTrace(True) - self.ws = websocket.WebSocketApp( - self.address, - on_message=lambda ws, message: self.on_message(ws, message), - on_error=lambda ws, error: self.on_error(ws, error), - on_close=lambda ws: self.on_close(ws), - ) - websocket.enableTrace(False) # 要看ws调试信息,请把这行注释掉 - self.ws.on_open = lambda ws: self.on_open(ws) - self.is_running = False - # WS断线重连判断 - while True: - if not self.is_running: - self.ws.run_forever() - time.sleep(3) # 3秒检测一次 - - -class WSClient(object): - def __init__(self, address, call_back): - super(WSClient, self).__init__() - self.client = WebsocketClient(address, call_back) - self.client_thread = None - - def run(self): - self.client_thread = threading.Thread(target=self.run_client) - self.client_thread.start() - - def run_client(self): - self.client.run() - - def send_message(self, message): - self.client.send_message(message) - - -def vad(data): # VAD推理 - segments_result = vad_pipline(audio_in=data) - if segments_result["text"] == "[]": - return False - else: - return True - - -# 创建一个VAD对象 -vad_pipline = pipeline( - task=Tasks.voice_activity_detection, - model="damo/speech_fsmn_vad_zh-cn-16k-common-pytorch", - model_revision="v1.2.0", - output_dir=None, - batch_size=1, -) - -param_dict = dict() -param_dict["hotword"] = "小五 小五月" # 设置热词,用空格隔开 - - -# 创建一个ASR对象 -inference_pipeline2 = pipeline( - task=Tasks.auto_speech_recognition, - model="damo/speech_paraformer-large-contextual_asr_nat-zh-cn-16k-common-vocab8404", - param_dict=param_dict, -) - -# 创建一个PyAudio对象 -p = pyaudio.PyAudio() - -# 定义一些参数 -FORMAT = pyaudio.paInt16 # 采样格式 -CHANNELS = 1 # 单声道 -RATE = 16000 # 采样率 -CHUNK = int(RATE / 1000 * 300) # 每个片段的帧数(300毫秒) -RECORD_NUM = 0 # 录制时长(片段) - -# 打开输入流 -stream = p.open( - format=FORMAT, - channels=CHANNELS, - rate=RATE, - input=True, - frames_per_buffer=CHUNK, -) - -print("开始...") - -# 创建一个WS连接 -ws_client = WSClient("ws://localhost:7272", None) -ws_client.run() - -frames = [] # 存储所有的帧数据 -buffer = [] # 存储缓存中的帧数据(最多两个片段) -silence_count = 0 # 统计连续静音的次数 -speech_detected = False # 标记是否检测到语音 - -# 循环读取输入流中的数据 -while True: - data = stream.read(CHUNK) # 读取一个片段的数据 - buffer.append(data) # 将当前数据添加到缓存中 - - if len(buffer) > 2: - buffer.pop(0) # 如果缓存超过两个片段,则删除最早的一个 - - if speech_detected: - frames.append(data) - RECORD_NUM += 1 - # print(str(RECORD_NUM)+ "\r") - - if vad(data): # VAD 判断是否有声音 - if not speech_detected: - print("开始录音...") - speech_detected = True # 标记为检测到语音 - frames = [] - frames.extend(buffer) # 把之前2个语音数据快加入 - silence_count = 0 # 重置静音次数 - - else: - silence_count += 1 # 增加静音次数 - #检测静音次数4次 或者已经录了50个数据块,则录音停止 - if speech_detected and (silence_count > 4 or RECORD_NUM > 50): - print("停止录音...") - audio_in = b"".join(frames) - rec_result = inference_pipeline2(audio_in=audio_in) # ws播报数据 - rec_result["type"] = "nlp" # 添加ws播报数据 - ws_client.send_message( - json.dumps(rec_result, ensure_ascii=False) - ) # ws发送到服务端 - print(rec_result) - frames = [] # 清空所有的帧数据 - buffer = [] # 清空缓存中的帧数据(最多两个片段) - silence_count = 0 # 统计连续静音的次数清零 - speech_detected = False # 标记是否检测到语音 - RECORD_NUM = 0 - -print("结束录制...") - -# 停止并关闭输入流 -stream.stop_stream() -stream.close() - -# 关闭PyAudio对象 -p.terminate() diff --git a/funasr/runtime/python/vad_asr_websocket_client/ws_server_demo.py b/funasr/runtime/python/vad_asr_websocket_client/ws_server_demo.py deleted file mode 100644 index 55d0315ce..000000000 --- a/funasr/runtime/python/vad_asr_websocket_client/ws_server_demo.py +++ /dev/null @@ -1,13 +0,0 @@ -# server.py -import asyncio -import websockets - -async def echo(websocket, path): - async for message in websocket: - print(message) # 打印收到的消息 - await websocket.send(message) - -start_server = websockets.serve(echo, "localhost", 7272) - -asyncio.get_event_loop().run_until_complete(start_server) -asyncio.get_event_loop().run_forever() \ No newline at end of file diff --git a/funasr/runtime/websocket/ASR_client.py b/funasr/runtime/python/websocket/ASR_client.py similarity index 100% rename from funasr/runtime/websocket/ASR_client.py rename to funasr/runtime/python/websocket/ASR_client.py diff --git a/funasr/runtime/websocket/ASR_server.py b/funasr/runtime/python/websocket/ASR_server.py similarity index 100% rename from funasr/runtime/websocket/ASR_server.py rename to funasr/runtime/python/websocket/ASR_server.py