# Copyright (c) Alibaba, Inc. and its affiliates. # Part of the implementation is borrowed from espnet/espnet. from typing import Tuple import numpy as np import torch import torchaudio.compliance.kaldi as kaldi from funasr.models.frontend.abs_frontend import AbsFrontend from typeguard import check_argument_types from torch.nn.utils.rnn import pad_sequence import kaldifeat def load_cmvn(cmvn_file): with open(cmvn_file, 'r', encoding='utf-8') as f: lines = f.readlines() means_list = [] vars_list = [] for i in range(len(lines)): line_item = lines[i].split() if line_item[0] == '': line_item = lines[i + 1].split() if line_item[0] == '': add_shift_line = line_item[3:(len(line_item) - 1)] means_list = list(add_shift_line) continue elif line_item[0] == '': line_item = lines[i + 1].split() if line_item[0] == '': rescale_line = line_item[3:(len(line_item) - 1)] vars_list = list(rescale_line) continue means = np.array(means_list).astype(np.float) vars = np.array(vars_list).astype(np.float) cmvn = np.array([means, vars]) cmvn = torch.as_tensor(cmvn) return cmvn def apply_cmvn(inputs, cmvn_file): # noqa """ Apply CMVN with mvn data """ device = inputs.device dtype = inputs.dtype frame, dim = inputs.shape cmvn = load_cmvn(cmvn_file) means = np.tile(cmvn[0:1, :dim], (frame, 1)) vars = np.tile(cmvn[1:2, :dim], (frame, 1)) inputs += torch.from_numpy(means).type(dtype).to(device) inputs *= torch.from_numpy(vars).type(dtype).to(device) return inputs.type(torch.float32) def apply_lfr(inputs, lfr_m, lfr_n): LFR_inputs = [] T = inputs.shape[0] T_lfr = int(np.ceil(T / lfr_n)) left_padding = inputs[0].repeat((lfr_m - 1) // 2, 1) inputs = torch.vstack((left_padding, inputs)) T = T + (lfr_m - 1) // 2 for i in range(T_lfr): if lfr_m <= T - i * lfr_n: LFR_inputs.append((inputs[i * lfr_n:i * lfr_n + lfr_m]).view(1, -1)) else: # process last LFR frame num_padding = lfr_m - (T - i * lfr_n) frame = (inputs[i * lfr_n:]).view(-1) for _ in range(num_padding): frame = torch.hstack((frame, inputs[-1])) LFR_inputs.append(frame) LFR_outputs = torch.vstack(LFR_inputs) return LFR_outputs.type(torch.float32) class WavFrontend_kaldifeat(AbsFrontend): """Conventional frontend structure for ASR. """ def __init__( self, cmvn_file: str = None, fs: int = 16000, window: str = 'hamming', n_mels: int = 80, frame_length: int = 25, frame_shift: int = 10, lfr_m: int = 1, lfr_n: int = 1, dither: float = 1.0, snip_edges: bool = True, upsacle_samples: bool = True, device: str = 'cpu', **kwargs, ): super().__init__() opts = kaldifeat.FbankOptions() opts.device = device opts.frame_opts.samp_freq = fs opts.frame_opts.dither = dither opts.frame_opts.window_type = window opts.frame_opts.frame_shift_ms = float(frame_shift) opts.frame_opts.frame_length_ms = float(frame_length) opts.mel_opts.num_bins = n_mels opts.energy_floor = 0 opts.frame_opts.snip_edges = snip_edges opts.mel_opts.debug_mel = False self.opts = opts self.fbank_fn = None self.fbank_beg_idx = 0 self.reset_fbank_status() self.lfr_m = lfr_m self.lfr_n = lfr_n self.cmvn_file = cmvn_file self.upsacle_samples = upsacle_samples def output_size(self) -> int: return self.n_mels * self.lfr_m def forward_fbank( self, input: torch.Tensor, input_lengths: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: batch_size = input.size(0) feats = [] feats_lens = [] for i in range(batch_size): waveform_length = input_lengths[i] waveform = input[i][:waveform_length] waveform = waveform * (1 << 15) self.fbank_fn.accept_waveform(self.opts.frame_opts.samp_freq, waveform.tolist()) frames = self.fbank_fn.num_frames_ready frames_cur = frames - self.fbank_beg_idx mat = torch.empty([frames_cur, self.opts.mel_opts.num_bins], dtype=torch.float32).to( device=self.opts.device) for i in range(self.fbank_beg_idx, frames): mat[i, :] = self.fbank_fn.get_frame(i) self.fbank_beg_idx += frames_cur feat_length = mat.size(0) feats.append(mat) feats_lens.append(feat_length) feats_lens = torch.as_tensor(feats_lens) feats_pad = pad_sequence(feats, batch_first=True, padding_value=0.0) return feats_pad, feats_lens def reset_fbank_status(self): self.fbank_fn = kaldifeat.OnlineFbank(self.opts) self.fbank_beg_idx = 0 def forward_lfr_cmvn( self, input: torch.Tensor, input_lengths: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: batch_size = input.size(0) feats = [] feats_lens = [] for i in range(batch_size): mat = input[i, :input_lengths[i], :] if self.lfr_m != 1 or self.lfr_n != 1: mat = apply_lfr(mat, self.lfr_m, self.lfr_n) if self.cmvn_file is not None: mat = apply_cmvn(mat, self.cmvn_file) feat_length = mat.size(0) feats.append(mat) feats_lens.append(feat_length) feats_lens = torch.as_tensor(feats_lens) feats_pad = pad_sequence(feats, batch_first=True, padding_value=0.0) return feats_pad, feats_lens