diff --git a/funasr/runtime/python/websocket/wss_client_asr.py b/funasr/runtime/python/websocket/wss_client_asr.py index dec598abf..bd9e89f5b 100644 --- a/funasr/runtime/python/websocket/wss_client_asr.py +++ b/funasr/runtime/python/websocket/wss_client_asr.py @@ -71,7 +71,6 @@ print(args) from queue import Queue voices = Queue() - ibest_writer = None if args.output_dir is not None: writer = DatadirWriter(args.output_dir) @@ -118,9 +117,11 @@ async def record_from_scp(chunk_begin, chunk_size): wavs = wavs[chunk_begin:chunk_begin + chunk_size] for wav in wavs: wav_splits = wav.strip().split() + wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo" wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0] - + if not len(wav_path.strip())>0: + continue if wav_path.endswith(".pcm"): with open(wav_path, "rb") as f: audio_bytes = f.read() @@ -142,47 +143,43 @@ async def record_from_scp(chunk_begin, chunk_size): # send first time message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "wav_name": wav_name, "is_speaking": True}) - voices.put(message) + #voices.put(message) + await websocket.send(message) is_speaking = True for i in range(chunk_num): beg = i * stride data = audio_bytes[beg:beg + stride] message = data - voices.put(message) + #voices.put(message) + await websocket.send(message) if i == chunk_num - 1: is_speaking = False message = json.dumps({"is_speaking": is_speaking}) - voices.put(message) + #voices.put(message) + await websocket.send(message) # print("data_chunk: ", len(data_chunk)) # print(voices.qsize()) sleep_duration = 0.001 if args.send_without_sleep else 60 * args.chunk_size[1] / args.chunk_interval / 1000 await asyncio.sleep(sleep_duration) + while not voices.empty(): + await asyncio.sleep(1) + await asyncio.sleep(3) + await websocket.close() + + + -async def ws_send(): - global voices - global websocket - print("started to sending data!") - while True: - while not voices.empty(): - data = voices.get() - voices.task_done() - try: - await websocket.send(data) - except Exception as e: - print('Exception occurred:', e) - traceback.print_exc() - exit(0) - await asyncio.sleep(0.005) - await asyncio.sleep(0.005) - + + async def message(id): - global websocket + global websocket,voices text_print = "" text_print_2pass_online = "" text_print_2pass_offline = "" - while True: - try: + try: + while True: + meg = await websocket.recv() meg = json.loads(meg) wav_name = meg.get("wav_name", "demo") @@ -213,10 +210,11 @@ async def message(id): os.system('clear') print("\rpid" + str(id) + ": " + text_print) - except Exception as e: + except Exception as e: print("Exception:", e) - traceback.print_exc() - exit(0) + #traceback.print_exc() + #await websocket.close() + async def print_messge(): @@ -228,11 +226,16 @@ async def print_messge(): print(meg) except Exception as e: print("Exception:", e) - traceback.print_exc() + #traceback.print_exc() exit(0) async def ws_client(id, chunk_begin, chunk_size): - global websocket + if args.audio_in is None: + chunk_begin=0 + chunk_size=1 + global websocket,voices + for i in range(chunk_begin,chunk_begin+chunk_size): + voices = Queue() if args.ssl == 1: ssl_context = ssl.SSLContext() ssl_context.check_hostname = False @@ -242,14 +245,16 @@ async def ws_client(id, chunk_begin, chunk_size): uri = "ws://{}:{}".format(args.host, args.port) ssl_context = None print("connect to", uri) - async for websocket in websockets.connect(uri, subprotocols=["binary"], ping_interval=None, ssl=ssl_context): + async with websockets.connect(uri, subprotocols=["binary"], ping_interval=None, ssl=ssl_context) as websocket: if args.audio_in is not None: - task = asyncio.create_task(record_from_scp(chunk_begin, chunk_size)) + task = asyncio.create_task(record_from_scp(i, 1)) else: task = asyncio.create_task(record_microphone()) - task2 = asyncio.create_task(ws_send()) + #task2 = asyncio.create_task(ws_send()) task3 = asyncio.create_task(message(id)) - await asyncio.gather(task, task2, task3) + await asyncio.gather(task, task3) + exit(0) + def one_thread(id, chunk_begin, chunk_size): asyncio.get_event_loop().run_until_complete(ws_client(id, chunk_begin, chunk_size))