diff --git a/funasr/runtime/python/websocket/wss_client_asr.py b/funasr/runtime/python/websocket/wss_client_asr.py index 0dd236d62..2ea8a1623 100644 --- a/funasr/runtime/python/websocket/wss_client_asr.py +++ b/funasr/runtime/python/websocket/wss_client_asr.py @@ -1,7 +1,7 @@ # -*- encoding: utf-8 -*- import os import time -import websockets,ssl +import websockets, ssl import asyncio # import threading import argparse @@ -12,6 +12,7 @@ from funasr.fileio.datadir_writer import DatadirWriter import logging +SUPPORT_AUDIO_TYPE_SETS = ['.wav', '.pcm'] logging.basicConfig(level=logging.ERROR) parser = argparse.ArgumentParser() @@ -53,7 +54,7 @@ parser.add_argument("--output_dir", type=str, default=None, help="output_dir") - + parser.add_argument("--ssl", type=int, default=1, @@ -68,22 +69,25 @@ args.chunk_size = [int(x) for x in args.chunk_size.split(",")] print(args) # voices = asyncio.Queue() from queue import Queue -voices = Queue() +voices = Queue() +offline_msg_done=False + ibest_writer = None if args.output_dir is not None: writer = DatadirWriter(args.output_dir) ibest_writer = writer[f"1best_recog"] + async def record_microphone(): is_finished = False import pyaudio - #print("2") - global voices + # print("2") + global voices FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 16000 - chunk_size = 60*args.chunk_size[1]/args.chunk_interval + chunk_size = 60 * args.chunk_size[1] / args.chunk_interval CHUNK = int(RATE / 1000 * chunk_size) p = pyaudio.PyAudio() @@ -94,19 +98,16 @@ async def record_microphone(): input=True, frames_per_buffer=CHUNK) - message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "wav_name": "microphone", "is_speaking": True}) + message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, + "wav_name": "microphone", "is_speaking": True}) voices.put(message) while True: - data = stream.read(CHUNK) - message = data - + message = data voices.put(message) - await asyncio.sleep(0.005) -async def record_from_scp(chunk_begin,chunk_size): - import wave +async def record_from_scp(chunk_begin, chunk_size): global voices is_finished = False if args.audio_in.endswith(".scp"): @@ -114,91 +115,98 @@ async def record_from_scp(chunk_begin,chunk_size): wavs = f_scp.readlines() else: wavs = [args.audio_in] - if chunk_size>0: - wavs=wavs[chunk_begin:chunk_begin+chunk_size] + if chunk_size > 0: + wavs = wavs[chunk_begin:chunk_begin + chunk_size] for wav in wavs: wav_splits = wav.strip().split() + wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo" wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0] - - # bytes_f = open(wav_path, "rb") - # bytes_data = bytes_f.read() - with wave.open(wav_path, "rb") as wav_file: - params = wav_file.getparams() - # header_length = wav_file.getheaders()[0][1] - # wav_file.setpos(header_length) - frames = wav_file.readframes(wav_file.getnframes()) + if not len(wav_path.strip())>0: + continue + if wav_path.endswith(".pcm"): + with open(wav_path, "rb") as f: + audio_bytes = f.read() + elif wav_path.endswith(".wav"): + import wave + with wave.open(wav_path, "rb") as wav_file: + params = wav_file.getparams() + frames = wav_file.readframes(wav_file.getnframes()) + audio_bytes = bytes(frames) + else: + raise NotImplementedError( + f'Not supported audio type') - audio_bytes = bytes(frames) # stride = int(args.chunk_size/1000*16000*2) - stride = int(60*args.chunk_size[1]/args.chunk_interval/1000*16000*2) - chunk_num = (len(audio_bytes)-1)//stride + 1 + stride = int(60 * args.chunk_size[1] / args.chunk_interval / 1000 * 16000 * 2) + chunk_num = (len(audio_bytes) - 1) // stride + 1 # print(stride) - + # send first time - message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, "wav_name": wav_name,"is_speaking": True}) - voices.put(message) + message = json.dumps({"mode": args.mode, "chunk_size": args.chunk_size, "chunk_interval": args.chunk_interval, + "wav_name": wav_name, "is_speaking": True}) + #voices.put(message) + await websocket.send(message) is_speaking = True for i in range(chunk_num): - beg = i*stride - data = audio_bytes[beg:beg+stride] - message = data - voices.put(message) - if i == chunk_num-1: + beg = i * stride + data = audio_bytes[beg:beg + stride] + message = data + #voices.put(message) + await websocket.send(message) + if i == chunk_num - 1: is_speaking = False message = json.dumps({"is_speaking": is_speaking}) - voices.put(message) - # print("data_chunk: ", len(data_chunk)) - # print(voices.qsize()) - sleep_duration = 0.001 if args.send_without_sleep else 60*args.chunk_size[1]/args.chunk_interval/1000 + #voices.put(message) + await websocket.send(message) + + sleep_duration = 0.001 if args.send_without_sleep else 60 * args.chunk_size[1] / args.chunk_interval / 1000 await asyncio.sleep(sleep_duration) + # when all data sent, we need to close websocket + while not voices.empty(): + await asyncio.sleep(1) + await asyncio.sleep(3) + # offline model need to wait for message recved + + if args.mode=="offline": + global offline_msg_done + while not offline_msg_done: + await asyncio.sleep(1) + + await websocket.close() + + + - -async def ws_send(): - global voices - global websocket - print("started to sending data!") - while True: - while not voices.empty(): - data = voices.get() - voices.task_done() - try: - await websocket.send(data) - except Exception as e: - print('Exception occurred:', e) - traceback.print_exc() - exit(0) - await asyncio.sleep(0.005) - await asyncio.sleep(0.005) - - - + + async def message(id): - global websocket + global websocket,voices,offline_msg_done text_print = "" text_print_2pass_online = "" text_print_2pass_offline = "" - while True: - try: + try: + while True: + meg = await websocket.recv() meg = json.loads(meg) wav_name = meg.get("wav_name", "demo") - # print(wav_name) text = meg["text"] if ibest_writer is not None: ibest_writer["text"][wav_name] = text - + if meg["mode"] == "online": text_print += "{}".format(text) text_print = text_print[-args.words_max_print:] os.system('clear') - print("\rpid"+str(id)+": "+text_print) + print("\rpid" + str(id) + ": " + text_print) elif meg["mode"] == "offline": text_print += "{}".format(text) text_print = text_print[-args.words_max_print:] os.system('clear') - print("\rpid"+str(id)+": "+text_print) + print("\rpid" + str(id) + ": " + text_print) + offline_msg_done=True else: if meg["mode"] == "2pass-online": text_print_2pass_online += "{}".format(text) @@ -211,10 +219,12 @@ async def message(id): os.system('clear') print("\rpid" + str(id) + ": " + text_print) - except Exception as e: + except Exception as e: print("Exception:", e) - traceback.print_exc() - exit(0) + #traceback.print_exc() + #await websocket.close() + + async def print_messge(): global websocket @@ -225,72 +235,87 @@ async def print_messge(): print(meg) except Exception as e: print("Exception:", e) - traceback.print_exc() + #traceback.print_exc() exit(0) -async def ws_client(id,chunk_begin,chunk_size): - global websocket - if args.ssl==1: - ssl_context = ssl.SSLContext() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - uri = "wss://{}:{}".format(args.host, args.port) +async def ws_client(id, chunk_begin, chunk_size): + if args.audio_in is None: + chunk_begin=0 + chunk_size=1 + global websocket,voices,offline_msg_done + + for i in range(chunk_begin,chunk_begin+chunk_size): + offline_msg_done=False + voices = Queue() + if args.ssl == 1: + ssl_context = ssl.SSLContext() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + uri = "wss://{}:{}".format(args.host, args.port) else: - uri = "ws://{}:{}".format(args.host, args.port) - ssl_context=None - print("connect to",uri) - async for websocket in websockets.connect(uri, subprotocols=["binary"], ping_interval=None,ssl=ssl_context): + uri = "ws://{}:{}".format(args.host, args.port) + ssl_context = None + print("connect to", uri) + async with websockets.connect(uri, subprotocols=["binary"], ping_interval=None, ssl=ssl_context) as websocket: if args.audio_in is not None: - task = asyncio.create_task(record_from_scp(chunk_begin,chunk_size)) + task = asyncio.create_task(record_from_scp(i, 1)) else: task = asyncio.create_task(record_microphone()) - task2 = asyncio.create_task(ws_send()) - task3 = asyncio.create_task(message(id)) - await asyncio.gather(task, task2, task3) - -def one_thread(id,chunk_begin,chunk_size): - asyncio.get_event_loop().run_until_complete(ws_client(id,chunk_begin,chunk_size)) - asyncio.get_event_loop().run_forever() + #task2 = asyncio.create_task(ws_send()) + task3 = asyncio.create_task(message(str(id)+"_"+str(i))) #processid+fileid + await asyncio.gather(task, task3) + exit(0) + +def one_thread(id, chunk_begin, chunk_size): + asyncio.get_event_loop().run_until_complete(ws_client(id, chunk_begin, chunk_size)) + asyncio.get_event_loop().run_forever() if __name__ == '__main__': - # for microphone - if args.audio_in is None: - p = Process(target=one_thread,args=(0, 0, 0)) - p.start() - p.join() - print('end') - else: - # calculate the number of wavs for each preocess - if args.audio_in.endswith(".scp"): - f_scp = open(args.audio_in) - wavs = f_scp.readlines() - else: - wavs = [args.audio_in] - total_len=len(wavs) - if total_len>=args.test_thread_num: - chunk_size=int((total_len)/args.test_thread_num) - remain_wavs=total_len-chunk_size*args.test_thread_num - else: - chunk_size=1 - remain_wavs=0 + # for microphone + if args.audio_in is None: + p = Process(target=one_thread, args=(0, 0, 0)) + p.start() + p.join() + print('end') + else: + # calculate the number of wavs for each preocess + if args.audio_in.endswith(".scp"): + f_scp = open(args.audio_in) + wavs = f_scp.readlines() + else: + wavs = [args.audio_in] + for wav in wavs: + wav_splits = wav.strip().split() + wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo" + wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0] + audio_type = os.path.splitext(wav_path)[-1].lower() + if audio_type not in SUPPORT_AUDIO_TYPE_SETS: + raise NotImplementedError( + f'Not supported audio type: {audio_type}') - process_list = [] - chunk_begin=0 - for i in range(args.test_thread_num): - now_chunk_size= chunk_size - if remain_wavs>0: - now_chunk_size=chunk_size+1 - remain_wavs=remain_wavs-1 - # process i handle wavs at chunk_begin and size of now_chunk_size - p = Process(target=one_thread,args=(i,chunk_begin,now_chunk_size)) - chunk_begin=chunk_begin+now_chunk_size - p.start() - process_list.append(p) + total_len = len(wavs) + if total_len >= args.test_thread_num: + chunk_size = int(total_len / args.test_thread_num) + remain_wavs = total_len - chunk_size * args.test_thread_num + else: + chunk_size = 1 + remain_wavs = 0 - for i in process_list: - p.join() - - print('end') + process_list = [] + chunk_begin = 0 + for i in range(args.test_thread_num): + now_chunk_size = chunk_size + if remain_wavs > 0: + now_chunk_size = chunk_size + 1 + remain_wavs = remain_wavs - 1 + # process i handle wavs at chunk_begin and size of now_chunk_size + p = Process(target=one_thread, args=(i, chunk_begin, now_chunk_size)) + chunk_begin = chunk_begin + now_chunk_size + p.start() + process_list.append(p) + for i in process_list: + p.join() + print('end') diff --git a/funasr/runtime/websocket/CMakeLists.txt b/funasr/runtime/websocket/CMakeLists.txt index 58ca97248..c1715d828 100644 --- a/funasr/runtime/websocket/CMakeLists.txt +++ b/funasr/runtime/websocket/CMakeLists.txt @@ -6,12 +6,10 @@ set(CMAKE_CXX_STANDARD 14 CACHE STRING "The C++ version to be used.") set(CMAKE_POSITION_INDEPENDENT_CODE ON) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) - option(ENABLE_WEBSOCKET "Whether to build websocket server" ON) if(ENABLE_WEBSOCKET) # cmake_policy(SET CMP0135 NEW) - include(FetchContent) FetchContent_Declare(websocketpp GIT_REPOSITORY https://github.com/zaphoyd/websocketpp.git @@ -22,7 +20,6 @@ if(ENABLE_WEBSOCKET) FetchContent_MakeAvailable(websocketpp) include_directories(${PROJECT_SOURCE_DIR}/third_party/websocket) - FetchContent_Declare(asio URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-24-0.tar.gz SOURCE_DIR ${PROJECT_SOURCE_DIR}/third_party/asio @@ -38,8 +35,6 @@ if(ENABLE_WEBSOCKET) FetchContent_MakeAvailable(json) include_directories(${PROJECT_SOURCE_DIR}/third_party/json/include) - - endif() @@ -61,8 +56,8 @@ add_subdirectory(${PROJECT_SOURCE_DIR}/../onnxruntime/third_party/glog glog) # install openssl first apt-get install libssl-dev find_package(OpenSSL REQUIRED) -add_executable(websocketmain "websocketmain.cpp" "websocketsrv.cpp") -add_executable(websocketclient "websocketclient.cpp") +add_executable(funasr-ws-server "funasr-ws-server.cpp" "websocket-server.cpp") +add_executable(funasr-ws-client "funasr-ws-client.cpp") -target_link_libraries(websocketclient PUBLIC funasr ssl crypto) -target_link_libraries(websocketmain PUBLIC funasr ssl crypto) +target_link_libraries(funasr-ws-client PUBLIC funasr ssl crypto) +target_link_libraries(funasr-ws-server PUBLIC funasr ssl crypto) diff --git a/funasr/runtime/websocket/funasr-ws-client.cpp b/funasr/runtime/websocket/funasr-ws-client.cpp new file mode 100644 index 000000000..4a3c7516d --- /dev/null +++ b/funasr/runtime/websocket/funasr-ws-client.cpp @@ -0,0 +1,366 @@ +/** + * Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights + * Reserved. MIT License (https://opensource.org/licenses/MIT) + */ +/* 2022-2023 by zhaomingwork */ + +// client for websocket, support multiple threads +// ./funasr-ws-client --server-ip +// --port +// --wav-path +// [--thread-num ] +// [--is-ssl ] [--] +// [--version] [-h] +// example: +// ./funasr-ws-client --server-ip 127.0.0.1 --port 8889 --wav-path test.wav --thread-num 1 --is-ssl 0 + +#define ASIO_STANDALONE 1 +#include +#include +#include +#include +#include +#include + +#include "audio.h" +#include "nlohmann/json.hpp" +#include "tclap/CmdLine.h" + +/** + * Define a semi-cross platform helper method that waits/sleeps for a bit. + */ +void WaitABit() { + #ifdef WIN32 + Sleep(1000); + #else + sleep(1); + #endif +} +std::atomic wav_index(0); + +bool IsTargetFile(const std::string& filename, const std::string target) { + std::size_t pos = filename.find_last_of("."); + if (pos == std::string::npos) { + return false; + } + std::string extension = filename.substr(pos + 1); + return (extension == target); +} + +typedef websocketpp::config::asio_client::message_type::ptr message_ptr; +typedef websocketpp::lib::shared_ptr context_ptr; +using websocketpp::lib::bind; +using websocketpp::lib::placeholders::_1; +using websocketpp::lib::placeholders::_2; +context_ptr OnTlsInit(websocketpp::connection_hdl) { + context_ptr ctx = websocketpp::lib::make_shared( + asio::ssl::context::sslv23); + + try { + ctx->set_options( + asio::ssl::context::default_workarounds | asio::ssl::context::no_sslv2 | + asio::ssl::context::no_sslv3 | asio::ssl::context::single_dh_use); + + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + } + return ctx; +} + +// template for tls or not config +template +class WebsocketClient { + public: + // typedef websocketpp::client client; + // typedef websocketpp::client + // wss_client; + typedef websocketpp::lib::lock_guard scoped_lock; + + WebsocketClient(int is_ssl) : m_open(false), m_done(false) { + // set up access channels to only log interesting things + m_client.clear_access_channels(websocketpp::log::alevel::all); + m_client.set_access_channels(websocketpp::log::alevel::connect); + m_client.set_access_channels(websocketpp::log::alevel::disconnect); + m_client.set_access_channels(websocketpp::log::alevel::app); + + // Initialize the Asio transport policy + m_client.init_asio(); + + // Bind the handlers we are using + using websocketpp::lib::bind; + using websocketpp::lib::placeholders::_1; + m_client.set_open_handler(bind(&WebsocketClient::on_open, this, _1)); + m_client.set_close_handler(bind(&WebsocketClient::on_close, this, _1)); + // m_client.set_close_handler(bind(&WebsocketClient::on_close, this, _1)); + + m_client.set_message_handler( + [this](websocketpp::connection_hdl hdl, message_ptr msg) { + on_message(hdl, msg); + }); + + m_client.set_fail_handler(bind(&WebsocketClient::on_fail, this, _1)); + m_client.clear_access_channels(websocketpp::log::alevel::all); + } + + void on_message(websocketpp::connection_hdl hdl, message_ptr msg) { + const std::string& payload = msg->get_payload(); + switch (msg->get_opcode()) { + case websocketpp::frame::opcode::text: + total_num=total_num+1; + LOG(INFO)<& wav_list, const std::vector& wav_ids) { + // Create a new connection to the given URI + websocketpp::lib::error_code ec; + typename websocketpp::client::connection_ptr con = + m_client.get_connection(uri, ec); + if (ec) { + m_client.get_alog().write(websocketpp::log::alevel::app, + "Get Connection Error: " + ec.message()); + return; + } + // Grab a handle for this connection so we can talk to it in a thread + // safe manor after the event loop starts. + m_hdl = con->get_handle(); + + // Queue the connection. No DNS queries or network connections will be + // made until the io_service event loop is run. + m_client.connect(con); + + // Create a thread to run the ASIO io_service event loop + websocketpp::lib::thread asio_thread(&websocketpp::client::run, + &m_client); + while(true){ + int i = wav_index.fetch_add(1); + if (i >= wav_list.size()) { + break; + } + send_wav_data(wav_list[i], wav_ids[i]); + } + WaitABit(); + + asio_thread.join(); + + } + + // The open handler will signal that we are ready to start sending data + void on_open(websocketpp::connection_hdl) { + m_client.get_alog().write(websocketpp::log::alevel::app, + "Connection opened, starting data!"); + + scoped_lock guard(m_lock); + m_open = true; + } + + // The close handler will signal that we should stop sending data + void on_close(websocketpp::connection_hdl) { + m_client.get_alog().write(websocketpp::log::alevel::app, + "Connection closed, stopping data!"); + + scoped_lock guard(m_lock); + m_done = true; + } + + // The fail handler will signal that we should stop sending data + void on_fail(websocketpp::connection_hdl) { + m_client.get_alog().write(websocketpp::log::alevel::app, + "Connection failed, stopping data!"); + + scoped_lock guard(m_lock); + m_done = true; + } + // send wav to server + void send_wav_data(string wav_path, string wav_id) { + uint64_t count = 0; + std::stringstream val; + + funasr::Audio audio(1); + int32_t sampling_rate = 16000; + if(IsTargetFile(wav_path.c_str(), "wav")){ + int32_t sampling_rate = -1; + if(!audio.LoadWav(wav_path.c_str(), &sampling_rate)) + return ; + }else if(IsTargetFile(wav_path.c_str(), "pcm")){ + if (!audio.LoadPcmwav(wav_path.c_str(), &sampling_rate)) + return ; + }else{ + printf("Wrong wav extension"); + exit(-1); + } + + float* buff; + int len; + int flag = 0; + bool wait = false; + while (1) { + { + scoped_lock guard(m_lock); + // If the connection has been closed, stop generating data + if (m_done) { + break; + } + // If the connection hasn't been opened yet wait a bit and retry + if (!m_open) { + wait = true; + } else { + break; + } + } + if (wait) { + LOG(INFO) << "wait.." << m_open; + WaitABit(); + continue; + } + } + websocketpp::lib::error_code ec; + + nlohmann::json jsonbegin; + nlohmann::json chunk_size = nlohmann::json::array(); + chunk_size.push_back(5); + chunk_size.push_back(0); + chunk_size.push_back(5); + jsonbegin["chunk_size"] = chunk_size; + jsonbegin["chunk_interval"] = 10; + jsonbegin["wav_name"] = wav_id; + jsonbegin["is_speaking"] = true; + m_client.send(m_hdl, jsonbegin.dump(), websocketpp::frame::opcode::text, + ec); + + // fetch wav data use asr engine api + while (audio.Fetch(buff, len, flag) > 0) { + short iArray[len]; + + // convert float -1,1 to short -32768,32767 + for (size_t i = 0; i < len; ++i) { + iArray[i] = (short)(buff[i] * 32767); + } + // send data to server + m_client.send(m_hdl, iArray, len * sizeof(short), + websocketpp::frame::opcode::binary, ec); + LOG(INFO) << "sended data len=" << len * sizeof(short); + // The most likely error that we will get is that the connection is + // not in the right state. Usually this means we tried to send a + // message to a connection that was closed or in the process of + // closing. While many errors here can be easily recovered from, + // in this simple example, we'll stop the data loop. + if (ec) { + m_client.get_alog().write(websocketpp::log::alevel::app, + "Send Error: " + ec.message()); + break; + } + // WaitABit(); + } + nlohmann::json jsonresult; + jsonresult["is_speaking"] = false; + m_client.send(m_hdl, jsonresult.dump(), websocketpp::frame::opcode::text, + ec); + // WaitABit(); + } + websocketpp::client m_client; + + private: + websocketpp::connection_hdl m_hdl; + websocketpp::lib::mutex m_lock; + bool m_open; + bool m_done; + int total_num=0; +}; + +int main(int argc, char* argv[]) { + google::InitGoogleLogging(argv[0]); + FLAGS_logtostderr = true; + + TCLAP::CmdLine cmd("funasr-ws-client", ' ', "1.0"); + TCLAP::ValueArg server_ip_("", "server-ip", "server-ip", true, + "127.0.0.1", "string"); + TCLAP::ValueArg port_("", "port", "port", true, "8889", "string"); + TCLAP::ValueArg wav_path_("", "wav-path", + "the input could be: wav_path, e.g.: asr_example.wav; pcm_path, e.g.: asr_example.pcm; wav.scp, kaldi style wav list (wav_id \t wav_path)", + true, "", "string"); + TCLAP::ValueArg thread_num_("", "thread-num", "thread-num", + false, 1, "int"); + TCLAP::ValueArg is_ssl_( + "", "is-ssl", "is-ssl is 1 means use wss connection, or use ws connection", + false, 0, "int"); + + cmd.add(server_ip_); + cmd.add(port_); + cmd.add(wav_path_); + cmd.add(thread_num_); + cmd.add(is_ssl_); + cmd.parse(argc, argv); + + std::string server_ip = server_ip_.getValue(); + std::string port = port_.getValue(); + std::string wav_path = wav_path_.getValue(); + int threads_num = thread_num_.getValue(); + int is_ssl = is_ssl_.getValue(); + + std::vector client_threads; + std::string uri = ""; + if (is_ssl == 1) { + uri = "wss://" + server_ip + ":" + port; + } else { + uri = "ws://" + server_ip + ":" + port; + } + + // read wav_path + std::vector wav_list; + std::vector wav_ids; + string default_id = "wav_default_id"; + if(IsTargetFile(wav_path, "wav") || IsTargetFile(wav_path, "pcm")){ + wav_list.emplace_back(wav_path); + wav_ids.emplace_back(default_id); + } + else if(IsTargetFile(wav_path, "scp")){ + ifstream in(wav_path); + if (!in.is_open()) { + printf("Failed to open scp file"); + return 0; + } + string line; + while(getline(in, line)) + { + istringstream iss(line); + string column1, column2; + iss >> column1 >> column2; + wav_list.emplace_back(column2); + wav_ids.emplace_back(column1); + } + in.close(); + }else{ + printf("Please check the wav extension!"); + exit(-1); + } + + for (size_t i = 0; i < threads_num; i++) { + client_threads.emplace_back([uri, wav_list, wav_ids, is_ssl]() { + if (is_ssl == 1) { + WebsocketClient c(is_ssl); + + c.m_client.set_tls_init_handler(bind(&OnTlsInit, ::_1)); + + c.run(uri, wav_list, wav_ids); + } else { + WebsocketClient c(is_ssl); + + c.run(uri, wav_list, wav_ids); + } + }); + } + + for (auto& t : client_threads) { + t.join(); + } +} \ No newline at end of file diff --git a/funasr/runtime/websocket/websocketmain.cpp b/funasr/runtime/websocket/funasr-ws-server.cpp similarity index 97% rename from funasr/runtime/websocket/websocketmain.cpp rename to funasr/runtime/websocket/funasr-ws-server.cpp index fabf6d81f..872f6a1a9 100644 --- a/funasr/runtime/websocket/websocketmain.cpp +++ b/funasr/runtime/websocket/funasr-ws-server.cpp @@ -5,12 +5,12 @@ /* 2022-2023 by zhaomingwork */ // io server -// Usage:websocketmain [--model_thread_num ] [--decoder_thread_num ] +// Usage:funasr-ws-server [--model_thread_num ] [--decoder_thread_num ] // [--io_thread_num ] [--port ] [--listen_ip // ] [--punc-quant ] [--punc-dir ] // [--vad-quant ] [--vad-dir ] [--quantize // ] --model-dir [--] [--version] [-h] -#include "websocketsrv.h" +#include "websocket-server.h" using namespace std; void GetValue(TCLAP::ValueArg& value_arg, string key, @@ -25,7 +25,7 @@ int main(int argc, char* argv[]) { google::InitGoogleLogging(argv[0]); FLAGS_logtostderr = true; - TCLAP::CmdLine cmd("websocketmain", ' ', "1.0"); + TCLAP::CmdLine cmd("funasr-ws-server", ' ', "1.0"); TCLAP::ValueArg model_dir( "", MODEL_DIR, "the asr model path, which contains model.onnx, config.yaml, am.mvn", diff --git a/funasr/runtime/websocket/readme.md b/funasr/runtime/websocket/readme.md index 99255c8bb..4a1a9d42c 100644 --- a/funasr/runtime/websocket/readme.md +++ b/funasr/runtime/websocket/readme.md @@ -51,7 +51,7 @@ make ```shell cd bin - ./websocketmain [--model_thread_num ] [--decoder_thread_num ] + ./funasr-ws-server [--model_thread_num ] [--decoder_thread_num ] [--io_thread_num ] [--port ] [--listen_ip ] [--punc-quant ] [--punc-dir ] [--vad-quant ] [--vad-dir ] [--quantize @@ -88,19 +88,38 @@ Where: If use vad, please add: --vad-dir If use punc, please add: --punc-dir example: - websocketmain --model-dir /FunASR/funasr/runtime/onnxruntime/export/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch + funasr-ws-server --model-dir /FunASR/funasr/runtime/onnxruntime/export/damo/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch ``` ## Run websocket client test ```shell -Usage: ./websocketclient server_ip port wav_path threads_num is_ssl +./funasr-ws-client --server-ip + --port + --wav-path + [--thread-num ] + [--is-ssl ] [--] + [--version] [-h] -is_ssl is 1 means use wss connection, or use ws connection +Where: + --server-ip + (required) server-ip + + --port + (required) port + + --wav-path + (required) the input could be: wav_path, e.g.: asr_example.wav; + pcm_path, e.g.: asr_example.pcm; wav.scp, kaldi style wav list (wav_id \t wav_path) + + --thread-num + thread-num + + --is-ssl + is-ssl is 1 means use wss connection, or use ws connection example: - -websocketclient 127.0.0.1 8889 funasr/runtime/websocket/test.pcm.wav 64 0 +./funasr-ws-client --server-ip 127.0.0.1 --port 8889 --wav-path test.wav --thread-num 1 --is-ssl 0 result json, example like: {"mode":"offline","text":"欢迎大家来体验达摩院推出的语音识别模型","wav_name":"wav2"} diff --git a/funasr/runtime/websocket/websocketsrv.cpp b/funasr/runtime/websocket/websocket-server.cpp similarity index 88% rename from funasr/runtime/websocket/websocketsrv.cpp rename to funasr/runtime/websocket/websocket-server.cpp index eb3c8db5b..a311c2355 100644 --- a/funasr/runtime/websocket/websocketsrv.cpp +++ b/funasr/runtime/websocket/websocket-server.cpp @@ -10,7 +10,7 @@ // pools, one for handle network data and one for asr decoder. // now only support offline engine. -#include "websocketsrv.h" +#include "websocket-server.h" #include #include @@ -22,12 +22,11 @@ context_ptr WebSocketServer::on_tls_init(tls_mode mode, std::string& s_keyfile) { namespace asio = websocketpp::lib::asio; - std::cout << "on_tls_init called with hdl: " << hdl.lock().get() << std::endl; - std::cout << "using TLS mode: " + LOG(INFO) << "on_tls_init called with hdl: " << hdl.lock().get(); + LOG(INFO) << "using TLS mode: " << (mode == MOZILLA_MODERN ? "Mozilla Modern" - : "Mozilla Intermediate") - << std::endl; - + : "Mozilla Intermediate"); + context_ptr ctx = websocketpp::lib::make_shared( asio::ssl::context::sslv23); @@ -49,7 +48,7 @@ context_ptr WebSocketServer::on_tls_init(tls_mode mode, ctx->use_private_key_file(s_keyfile, asio::ssl::context::pem); } catch (std::exception& e) { - std::cout << "Exception: " << e.what() << std::endl; + LOG(INFO) << "Exception: " << e.what(); } return ctx; } @@ -86,8 +85,7 @@ void WebSocketServer::do_decoder(const std::vector& buffer, ec); } - std::cout << "buffer.size=" << buffer.size() - << ",result json=" << jsonresult.dump() << std::endl; + LOG(INFO) << "buffer.size=" << buffer.size() << ",result json=" << jsonresult.dump(); if (!isonline) { // close the client if it is not online asr // server_->close(hdl, websocketpp::close::status::normal, "DONE", ec); @@ -110,14 +108,14 @@ void WebSocketServer::on_open(websocketpp::connection_hdl hdl) { data_msg->samples = std::make_shared>(); data_msg->msg = nlohmann::json::parse("{}"); data_map.emplace(hdl, data_msg); - std::cout << "on_open, active connections: " << data_map.size() << std::endl; + LOG(INFO) << "on_open, active connections: " << data_map.size(); } void WebSocketServer::on_close(websocketpp::connection_hdl hdl) { scoped_lock guard(m_lock); data_map.erase(hdl); // remove data vector when connection is closed - std::cout << "on_close, active connections: " << data_map.size() << std::endl; + LOG(INFO) << "on_close, active connections: " << data_map.size(); } // remove closed connection @@ -143,7 +141,7 @@ void WebSocketServer::check_and_clean_connection() { } for (auto hdl : to_remove) { data_map.erase(hdl); - std::cout << "remove one connection " << std::endl; + LOG(INFO)<< "remove one connection "; } } void WebSocketServer::on_message(websocketpp::connection_hdl hdl, @@ -161,7 +159,7 @@ void WebSocketServer::on_message(websocketpp::connection_hdl hdl, lock.unlock(); if (sample_data_p == nullptr) { - std::cout << "error when fetch sample data vector" << std::endl; + LOG(INFO) << "error when fetch sample data vector"; return; } @@ -176,7 +174,7 @@ void WebSocketServer::on_message(websocketpp::connection_hdl hdl, if (jsonresult["is_speaking"] == false || jsonresult["is_finished"] == true) { - std::cout << "client done" << std::endl; + LOG(INFO) << "client done"; if (isonline) { // do_close(ws); @@ -225,9 +223,9 @@ void WebSocketServer::initAsr(std::map& model_path, // init model with api asr_hanlde = FunOfflineInit(model_path, thread_num); - std::cout << "model ready" << std::endl; + LOG(INFO) << "model successfully inited"; } catch (const std::exception& e) { - std::cout << e.what() << std::endl; + LOG(INFO) << e.what(); } } diff --git a/funasr/runtime/websocket/websocketsrv.h b/funasr/runtime/websocket/websocket-server.h similarity index 98% rename from funasr/runtime/websocket/websocketsrv.h rename to funasr/runtime/websocket/websocket-server.h index 3cb881692..198af1cfd 100644 --- a/funasr/runtime/websocket/websocketsrv.h +++ b/funasr/runtime/websocket/websocket-server.h @@ -10,8 +10,8 @@ // pools, one for handle network data and one for asr decoder. // now only support offline engine. -#ifndef WEBSOCKETSRV_SERVER_H_ -#define WEBSOCKETSRV_SERVER_H_ +#ifndef WEBSOCKET_SERVER_H_ +#define WEBSOCKET_SERVER_H_ #include #include @@ -134,4 +134,4 @@ class WebSocketServer { websocketpp::lib::mutex m_lock; // mutex for sample_map }; -#endif // WEBSOCKETSRV_SERVER_H_ +#endif // WEBSOCKET_SERVER_H_ diff --git a/funasr/runtime/websocket/websocketclient.cpp b/funasr/runtime/websocket/websocketclient.cpp deleted file mode 100644 index e9f8f1dfd..000000000 --- a/funasr/runtime/websocket/websocketclient.cpp +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights - * Reserved. MIT License (https://opensource.org/licenses/MIT) - */ -/* 2022-2023 by zhaomingwork */ - -// client for websocket, support multiple threads -// Usage: websocketclient server_ip port wav_path threads_num - -#define ASIO_STANDALONE 1 -#include -#include -#include - -#include "audio.h" -#include "nlohmann/json.hpp" - -/** - * Define a semi-cross platform helper method that waits/sleeps for a bit. - */ -void wait_a_bit() { -#ifdef WIN32 - Sleep(1000); -#else - sleep(1); -#endif -} -typedef websocketpp::config::asio_client::message_type::ptr message_ptr; -typedef websocketpp::lib::shared_ptr - context_ptr; -using websocketpp::lib::bind; -using websocketpp::lib::placeholders::_1; -using websocketpp::lib::placeholders::_2; -context_ptr on_tls_init(websocketpp::connection_hdl) { - context_ptr ctx = websocketpp::lib::make_shared( - asio::ssl::context::sslv23); - - try { - ctx->set_options( - asio::ssl::context::default_workarounds | asio::ssl::context::no_sslv2 | - asio::ssl::context::no_sslv3 | asio::ssl::context::single_dh_use); - - } catch (std::exception& e) { - std::cout << e.what() << std::endl; - } - return ctx; -} -// template for tls or not config -template -class websocket_client { - public: - // typedef websocketpp::client client; - // typedef websocketpp::client - // wss_client; - typedef websocketpp::lib::lock_guard scoped_lock; - - websocket_client(int is_ssl) : m_open(false), m_done(false) { - // set up access channels to only log interesting things - - m_client.clear_access_channels(websocketpp::log::alevel::all); - m_client.set_access_channels(websocketpp::log::alevel::connect); - m_client.set_access_channels(websocketpp::log::alevel::disconnect); - m_client.set_access_channels(websocketpp::log::alevel::app); - - // Initialize the Asio transport policy - m_client.init_asio(); - - // Bind the handlers we are using - using websocketpp::lib::bind; - using websocketpp::lib::placeholders::_1; - m_client.set_open_handler(bind(&websocket_client::on_open, this, _1)); - m_client.set_close_handler(bind(&websocket_client::on_close, this, _1)); - m_client.set_close_handler(bind(&websocket_client::on_close, this, _1)); - - m_client.set_message_handler( - [this](websocketpp::connection_hdl hdl, message_ptr msg) { - on_message(hdl, msg); - }); - - m_client.set_fail_handler(bind(&websocket_client::on_fail, this, _1)); - m_client.clear_access_channels(websocketpp::log::alevel::all); - } - void on_message(websocketpp::connection_hdl hdl, message_ptr msg) { - const std::string& payload = msg->get_payload(); - switch (msg->get_opcode()) { - case websocketpp::frame::opcode::text: - std::cout << "on_message=" << payload << std::endl; - } - } - // This method will block until the connection is complete - - void run(const std::string& uri, const std::string& wav_path) { - // Create a new connection to the given URI - websocketpp::lib::error_code ec; - typename websocketpp::client::connection_ptr con = - m_client.get_connection(uri, ec); - if (ec) { - m_client.get_alog().write(websocketpp::log::alevel::app, - "Get Connection Error: " + ec.message()); - return; - } - this->wav_path = std::move(wav_path); - // Grab a handle for this connection so we can talk to it in a thread - // safe manor after the event loop starts. - m_hdl = con->get_handle(); - - // Queue the connection. No DNS queries or network connections will be - // made until the io_service event loop is run. - m_client.connect(con); - - // Create a thread to run the ASIO io_service event loop - websocketpp::lib::thread asio_thread(&websocketpp::client::run, - &m_client); - - send_wav_data(); - asio_thread.join(); - } - - // The open handler will signal that we are ready to start sending data - void on_open(websocketpp::connection_hdl) { - m_client.get_alog().write(websocketpp::log::alevel::app, - "Connection opened, starting data!"); - - scoped_lock guard(m_lock); - m_open = true; - } - - // The close handler will signal that we should stop sending data - void on_close(websocketpp::connection_hdl) { - m_client.get_alog().write(websocketpp::log::alevel::app, - "Connection closed, stopping data!"); - - scoped_lock guard(m_lock); - m_done = true; - } - - // The fail handler will signal that we should stop sending data - void on_fail(websocketpp::connection_hdl) { - m_client.get_alog().write(websocketpp::log::alevel::app, - "Connection failed, stopping data!"); - - scoped_lock guard(m_lock); - m_done = true; - } - // send wav to server - void send_wav_data() { - uint64_t count = 0; - std::stringstream val; - - funasr::Audio audio(1); - int32_t sampling_rate = 16000; - - if (!audio.LoadPcmwav(wav_path.c_str(), &sampling_rate)) { - std::cout << "error in load wav" << std::endl; - return; - } - - float* buff; - int len; - int flag = 0; - bool wait = false; - while (1) { - { - scoped_lock guard(m_lock); - // If the connection has been closed, stop generating data - if (m_done) { - break; - } - - // If the connection hasn't been opened yet wait a bit and retry - if (!m_open) { - wait = true; - } else { - break; - } - } - - if (wait) { - std::cout << "wait.." << m_open << std::endl; - wait_a_bit(); - - continue; - } - } - websocketpp::lib::error_code ec; - - nlohmann::json jsonbegin; - nlohmann::json chunk_size = nlohmann::json::array(); - chunk_size.push_back(5); - chunk_size.push_back(0); - chunk_size.push_back(5); - jsonbegin["chunk_size"] = chunk_size; - jsonbegin["chunk_interval"] = 10; - jsonbegin["wav_name"] = "damo"; - jsonbegin["is_speaking"] = true; - m_client.send(m_hdl, jsonbegin.dump(), websocketpp::frame::opcode::text, - ec); - - // fetch wav data use asr engine api - while (audio.Fetch(buff, len, flag) > 0) { - short iArray[len]; - - // convert float -1,1 to short -32768,32767 - for (size_t i = 0; i < len; ++i) { - iArray[i] = (short)(buff[i] * 32767); - } - // send data to server - m_client.send(m_hdl, iArray, len * sizeof(short), - websocketpp::frame::opcode::binary, ec); - std::cout << "sended data len=" << len * sizeof(short) << std::endl; - // The most likely error that we will get is that the connection is - // not in the right state. Usually this means we tried to send a - // message to a connection that was closed or in the process of - // closing. While many errors here can be easily recovered from, - // in this simple example, we'll stop the data loop. - if (ec) { - m_client.get_alog().write(websocketpp::log::alevel::app, - "Send Error: " + ec.message()); - break; - } - - wait_a_bit(); - } - nlohmann::json jsonresult; - jsonresult["is_speaking"] = false; - m_client.send(m_hdl, jsonresult.dump(), websocketpp::frame::opcode::text, - ec); - wait_a_bit(); - } - websocketpp::client m_client; - - private: - websocketpp::connection_hdl m_hdl; - websocketpp::lib::mutex m_lock; - std::string wav_path; - bool m_open; - bool m_done; -}; - -int main(int argc, char* argv[]) { - if (argc < 6) { - printf("Usage: %s server_ip port wav_path threads_num is_ssl\n", argv[0]); - exit(-1); - } - std::string server_ip = argv[1]; - std::string port = argv[2]; - std::string wav_path = argv[3]; - int threads_num = atoi(argv[4]); - int is_ssl = atoi(argv[5]); - std::vector client_threads; - std::string uri = ""; - if (is_ssl == 1) { - uri = "wss://" + server_ip + ":" + port; - } else { - uri = "ws://" + server_ip + ":" + port; - } - - for (size_t i = 0; i < threads_num; i++) { - client_threads.emplace_back([uri, wav_path, is_ssl]() { - if (is_ssl == 1) { - websocket_client c(is_ssl); - - c.m_client.set_tls_init_handler(bind(&on_tls_init, ::_1)); - - c.run(uri, wav_path); - } else { - websocket_client c(is_ssl); - - c.run(uri, wav_path); - } - }); - } - - for (auto& t : client_threads) { - t.join(); - } -} \ No newline at end of file diff --git a/tests/test_asr_inference_pipeline.py b/tests/test_asr_inference_pipeline.py index 9098ea62d..2b21acf99 100644 --- a/tests/test_asr_inference_pipeline.py +++ b/tests/test_asr_inference_pipeline.py @@ -87,6 +87,7 @@ class TestParaformerInferencePipelines(unittest.TestCase): rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_hotword.wav') logger.info("asr inference result: {0}".format(rec_result)) + assert rec_result["text"] == "国务院发展研究中心市场经济研究所副所长邓郁松认为" def test_paraformer_large_aishell1(self): inference_pipeline = pipeline( @@ -95,6 +96,7 @@ class TestParaformerInferencePipelines(unittest.TestCase): rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) + assert rec_result["text"] == "欢迎大家来体验达摩院推出的语音识别模型" def test_paraformer_large_aishell2(self): inference_pipeline = pipeline( @@ -103,6 +105,7 @@ class TestParaformerInferencePipelines(unittest.TestCase): rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) + assert rec_result["text"] == "欢迎大家来体验达摩院推出的语音识别模型" def test_paraformer_large_common(self): inference_pipeline = pipeline( @@ -111,6 +114,7 @@ class TestParaformerInferencePipelines(unittest.TestCase): rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) + assert rec_result["text"] == "欢迎大家来体验达摩院推出的语音识别模型" def test_paraformer_large_online_common(self): inference_pipeline = pipeline( @@ -119,6 +123,7 @@ class TestParaformerInferencePipelines(unittest.TestCase): rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) + assert rec_result["text"] == "欢迎大 家来 体验达 摩院推 出的 语音识 别模 型" def test_paraformer_online_common(self): inference_pipeline = pipeline( @@ -127,6 +132,7 @@ class TestParaformerInferencePipelines(unittest.TestCase): rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr inference result: {0}".format(rec_result)) + assert rec_result["text"] == "欢迎 大家来 体验达 摩院推 出的 语音识 别模 型" def test_paraformer_tiny_commandword(self): inference_pipeline = pipeline( diff --git a/tests/test_asr_vad_punc_inference_pipeline.py b/tests/test_asr_vad_punc_inference_pipeline.py index 628b256c0..f86f23dab 100644 --- a/tests/test_asr_vad_punc_inference_pipeline.py +++ b/tests/test_asr_vad_punc_inference_pipeline.py @@ -26,6 +26,7 @@ class TestParaformerInferencePipelines(unittest.TestCase): rec_result = inference_pipeline( audio_in='https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ASR/test_audio/asr_example_zh.wav') logger.info("asr_vad_punc inference result: {0}".format(rec_result)) + assert rec_result["text"] == "欢迎大家来体验达摩院推出的语音识别模型。" if __name__ == '__main__':