inference

This commit is contained in:
游雁 2024-06-24 17:05:43 +08:00
parent 06839ef605
commit b84a203d16
3 changed files with 6 additions and 540 deletions

View File

@ -484,226 +484,6 @@ class SANMEncoder(nn.Module):
return xs_pad, ilens, None
@tables.register("encoder_classes", "SANMTPEncoder")
class SANMTPEncoder(nn.Module):
"""
Author: Speech Lab of DAMO Academy, Alibaba Group
SCAMA: Streaming chunk-aware multihead attention for online end-to-end speech recognition
https://arxiv.org/abs/2006.01713
"""
def __init__(
self,
input_size: int,
output_size: int = 256,
attention_heads: int = 4,
linear_units: int = 2048,
num_blocks: int = 6,
tp_blocks: int = 0,
dropout_rate: float = 0.1,
positional_dropout_rate: float = 0.1,
attention_dropout_rate: float = 0.0,
stochastic_depth_rate: float = 0.0,
input_layer: Optional[str] = "conv2d",
pos_enc_class=SinusoidalPositionEncoder,
normalize_before: bool = True,
concat_after: bool = False,
positionwise_layer_type: str = "linear",
positionwise_conv_kernel_size: int = 1,
padding_idx: int = -1,
kernel_size: int = 11,
sanm_shfit: int = 0,
selfattention_layer_type: str = "sanm",
):
super().__init__()
self._output_size = output_size
if input_layer == "linear":
self.embed = torch.nn.Sequential(
torch.nn.Linear(input_size, output_size),
torch.nn.LayerNorm(output_size),
torch.nn.Dropout(dropout_rate),
torch.nn.ReLU(),
eval(pos_enc_class)(output_size, positional_dropout_rate),
)
elif input_layer == "linear_no_pos":
self.embed = torch.nn.Sequential(
torch.nn.Linear(input_size, output_size),
torch.nn.LayerNorm(output_size),
torch.nn.Dropout(dropout_rate),
eval(pos_enc_class)(output_size, positional_dropout_rate, use_pos=False),
)
elif input_layer == "conv2d":
self.embed = Conv2dSubsampling(input_size, output_size, dropout_rate)
elif input_layer == "conv2d2":
self.embed = Conv2dSubsampling2(input_size, output_size, dropout_rate)
elif input_layer == "conv2d6":
self.embed = Conv2dSubsampling6(input_size, output_size, dropout_rate)
elif input_layer == "conv2d8":
self.embed = Conv2dSubsampling8(input_size, output_size, dropout_rate)
elif input_layer == "embed":
self.embed = torch.nn.Sequential(
torch.nn.Embedding(input_size, output_size, padding_idx=padding_idx),
eval(pos_enc_class)(output_size, positional_dropout_rate),
)
elif input_layer is None:
if input_size == output_size:
self.embed = None
else:
self.embed = torch.nn.Linear(input_size, output_size)
elif input_layer == "pe":
self.embed = SinusoidalPositionEncoder()
elif input_layer == "pe_online":
self.embed = StreamSinusoidalPositionEncoder()
else:
raise ValueError("unknown input_layer: " + input_layer)
self.normalize_before = normalize_before
if positionwise_layer_type == "linear":
positionwise_layer = PositionwiseFeedForward
positionwise_layer_args = (
output_size,
linear_units,
dropout_rate,
)
elif positionwise_layer_type == "conv1d":
positionwise_layer = MultiLayeredConv1d
positionwise_layer_args = (
output_size,
linear_units,
positionwise_conv_kernel_size,
dropout_rate,
)
elif positionwise_layer_type == "conv1d-linear":
positionwise_layer = Conv1dLinear
positionwise_layer_args = (
output_size,
linear_units,
positionwise_conv_kernel_size,
dropout_rate,
)
else:
raise NotImplementedError("Support only linear or conv1d.")
if selfattention_layer_type == "selfattn":
encoder_selfattn_layer = MultiHeadedAttention
encoder_selfattn_layer_args = (
attention_heads,
output_size,
attention_dropout_rate,
)
elif selfattention_layer_type == "sanm":
encoder_selfattn_layer = MultiHeadedAttentionSANM
encoder_selfattn_layer_args0 = (
attention_heads,
input_size,
output_size,
attention_dropout_rate,
kernel_size,
sanm_shfit,
)
encoder_selfattn_layer_args = (
attention_heads,
output_size,
output_size,
attention_dropout_rate,
kernel_size,
sanm_shfit,
)
self.encoders0 = repeat(
1,
lambda lnum: EncoderLayerSANM(
input_size,
output_size,
encoder_selfattn_layer(*encoder_selfattn_layer_args0),
positionwise_layer(*positionwise_layer_args),
dropout_rate,
normalize_before,
concat_after,
),
)
self.encoders = repeat(
num_blocks - 1,
lambda lnum: EncoderLayerSANM(
output_size,
output_size,
encoder_selfattn_layer(*encoder_selfattn_layer_args),
positionwise_layer(*positionwise_layer_args),
dropout_rate,
normalize_before,
concat_after,
stochastic_depth_rate,
),
)
self.tp_encoders = repeat(
tp_blocks,
lambda lnum: EncoderLayerSANM(
output_size,
output_size,
encoder_selfattn_layer(*encoder_selfattn_layer_args),
positionwise_layer(*positionwise_layer_args),
dropout_rate,
normalize_before,
concat_after,
stochastic_depth_rate,
),
)
if self.normalize_before:
self.after_norm = LayerNorm(output_size)
self.tp_blocks = tp_blocks
if self.tp_blocks > 0:
self.tp_norm = LayerNorm(output_size)
def output_size(self) -> int:
return self._output_size
def forward(
self,
xs_pad: torch.Tensor,
ilens: torch.Tensor,
) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]:
"""Embed positions in tensor.
Args:
xs_pad: input tensor (B, L, D)
ilens: input length (B)
prev_states: Not to be used now.
Returns:
position embedded tensor and mask
"""
masks = (~make_pad_mask(ilens)[:, None, :]).to(xs_pad.device)
xs_pad *= self.output_size() ** 0.5
if self.embed is None:
xs_pad = xs_pad
elif (
isinstance(self.embed, Conv2dSubsampling)
or isinstance(self.embed, Conv2dSubsampling2)
or isinstance(self.embed, Conv2dSubsampling6)
or isinstance(self.embed, Conv2dSubsampling8)
):
short_status, limit_size = check_short_utt(self.embed, xs_pad.size(1))
if short_status:
raise TooShortUttError(
f"has {xs_pad.size(1)} frames and is too short for subsampling "
+ f"(it needs more than {limit_size} frames), return empty results",
xs_pad.size(1),
limit_size,
)
xs_pad, masks = self.embed(xs_pad, masks)
else:
xs_pad = self.embed(xs_pad)
# forward encoder1
mask_shfit_chunk, mask_att_chunk_encoder = None, None
encoder_outs = self.encoders0(xs_pad, masks, None, mask_shfit_chunk, mask_att_chunk_encoder)
xs_pad, masks = encoder_outs[0], encoder_outs[1]
encoder_outs = self.encoders(xs_pad, masks, None, mask_shfit_chunk, mask_att_chunk_encoder)
xs_pad, masks = encoder_outs[0], encoder_outs[1]
if self.normalize_before:
xs_pad = self.after_norm(xs_pad)
# forward encoder2
olens = masks.squeeze(1).sum(1)
mask_shfit_chunk2, mask_att_chunk_encoder2 = None, None
for layer_idx, encoder_layer in enumerate(self.tp_encoders):
encoder_outs = encoder_layer(xs_pad, masks, None, mask_shfit_chunk2, mask_att_chunk_encoder2)
xs_pad, masks = encoder_outs[0], encoder_outs[1]
if self.tp_blocks > 0:
xs_pad = self.tp_norm(xs_pad)
return xs_pad, olens
class EncoderLayerSANMExport(nn.Module):
def __init__(
self,

View File

@ -1392,323 +1392,3 @@ class SenseVoiceSANM(nn.Module):
from funasr.models.paraformer.search import Hypothesis
from funasr.utils import postprocess_utils
@tables.register("model_classes", "SenseVoiceSANMCTC")
class SenseVoiceSANMCTC(nn.Module):
"""CTC-attention hybrid Encoder-Decoder model"""
def __init__(
self,
specaug: str = None,
specaug_conf: dict = None,
normalize: str = None,
normalize_conf: dict = None,
encoder: str = None,
encoder_conf: dict = None,
ctc_conf: dict = None,
input_size: int = 80,
vocab_size: int = -1,
ignore_id: int = -1,
blank_id: int = 0,
sos: int = 1,
eos: int = 2,
length_normalized_loss: bool = False,
**kwargs,
):
super().__init__()
if specaug is not None:
specaug_class = tables.specaug_classes.get(specaug)
specaug = specaug_class(**specaug_conf)
if normalize is not None:
normalize_class = tables.normalize_classes.get(normalize)
normalize = normalize_class(**normalize_conf)
encoder_class = tables.encoder_classes.get(encoder)
encoder = encoder_class(input_size=input_size, **encoder_conf)
encoder_output_size = encoder.output_size()
if ctc_conf is None:
ctc_conf = {}
ctc = CTC(odim=vocab_size, encoder_output_size=encoder_output_size, **ctc_conf)
self.blank_id = blank_id
self.sos = sos if sos is not None else vocab_size - 1
self.eos = eos if eos is not None else vocab_size - 1
self.vocab_size = vocab_size
self.ignore_id = ignore_id
self.specaug = specaug
self.normalize = normalize
self.encoder = encoder
self.length_normalized_loss = length_normalized_loss
self.error_calculator = None
self.criterion_att = LabelSmoothingLoss(
size=self.vocab_size,
padding_idx=self.ignore_id,
smoothing=kwargs.get("lsm_weight", 0.0),
normalize_length=self.length_normalized_loss,
)
self.ctc = ctc
self.length_normalized_loss = length_normalized_loss
self.encoder_output_size = encoder_output_size
self.lid_dict = {"auto": 0, "zh": 3, "en": 4, "yue": 7, "ja": 11, "ko": 12, "nospeech": 13}
self.lid_int_dict = {24884: 3, 24885: 4, 24888: 7, 24892: 11, 24896: 12, 24992: 13}
self.textnorm_dict = {"withitn": 14, "woitn": 15}
self.textnorm_int_dict = {25016: 14, 25017: 15}
self.embed = torch.nn.Embedding(7 + len(self.lid_dict) + len(self.textnorm_dict), input_size)
def forward(
self,
speech: torch.Tensor,
speech_lengths: torch.Tensor,
text: torch.Tensor,
text_lengths: torch.Tensor,
**kwargs,
):
"""Encoder + Decoder + Calc loss
Args:
speech: (Batch, Length, ...)
speech_lengths: (Batch, )
text: (Batch, Length)
text_lengths: (Batch,)
"""
# import pdb;
# pdb.set_trace()
if len(text_lengths.size()) > 1:
text_lengths = text_lengths[:, 0]
if len(speech_lengths.size()) > 1:
speech_lengths = speech_lengths[:, 0]
batch_size = speech.shape[0]
# 1. Encoder
encoder_out, encoder_out_lens = self.encode(speech, speech_lengths, text)
loss_ctc, cer_ctc = None, None
loss_rich, acc_rich = None, None
stats = dict()
loss_ctc, cer_ctc = self._calc_ctc_loss(
encoder_out[:, 4:, :], encoder_out_lens - 4, text[:, 4:], text_lengths - 4
)
loss_rich, acc_rich = self._calc_rich_ce_loss(
encoder_out[:, :4, :], text[:, :4]
)
loss = loss_ctc
# Collect total loss stats
stats["loss"] = torch.clone(loss.detach()) if loss_ctc is not None else None
stats["loss_rich"] = torch.clone(loss_rich.detach()) if loss_rich is not None else None
stats["acc_rich"] = acc_rich
# force_gatherable: to-device and to-tensor if scalar for DataParallel
if self.length_normalized_loss:
batch_size = int((text_lengths + 1).sum())
loss, stats, weight = force_gatherable((loss, stats, batch_size), loss.device)
return loss, stats, weight
def encode(
self,
speech: torch.Tensor,
speech_lengths: torch.Tensor,
text: torch.Tensor,
**kwargs,
):
"""Frontend + Encoder. Note that this method is used by asr_inference.py
Args:
speech: (Batch, Length, ...)
speech_lengths: (Batch, )
ind: int
"""
# Data augmentation
if self.specaug is not None and self.training:
speech, speech_lengths = self.specaug(speech, speech_lengths)
# Normalization for feature: e.g. Global-CMVN, Utterance-CMVN
if self.normalize is not None:
speech, speech_lengths = self.normalize(speech, speech_lengths)
lids = torch.LongTensor([[self.lid_int_dict[int(lid)] if torch.rand(1) > 0.2 and int(lid) in self.lid_int_dict else 0 ] for lid in text[:, 0]]).to(speech.device)
language_query = self.embed(lids)
styles = torch.LongTensor([[self.textnorm_int_dict[int(style)]] for style in text[:, 3]]).to(speech.device)
style_query = self.embed(styles)
speech = torch.cat((style_query, speech), dim=1)
speech_lengths += 1
event_emo_query = self.embed(torch.LongTensor([[1, 2]]).to(speech.device)).repeat(speech.size(0), 1, 1)
input_query = torch.cat((language_query, event_emo_query), dim=1)
speech = torch.cat((input_query, speech), dim=1)
speech_lengths += 3
# Forward encoder
# feats: (Batch, Length, Dim)
# -> encoder_out: (Batch, Length2, Dim2)
encoder_out, encoder_out_lens = self.encoder(speech, speech_lengths)
return encoder_out, encoder_out_lens
def _calc_ctc_loss(
self,
encoder_out: torch.Tensor,
encoder_out_lens: torch.Tensor,
ys_pad: torch.Tensor,
ys_pad_lens: torch.Tensor,
):
# Calc CTC loss
loss_ctc = self.ctc(encoder_out, encoder_out_lens, ys_pad, ys_pad_lens)
# Calc CER using CTC
cer_ctc = None
if not self.training and self.error_calculator is not None:
ys_hat = self.ctc.argmax(encoder_out).data
cer_ctc = self.error_calculator(ys_hat.cpu(), ys_pad.cpu(), is_ctc=True)
return loss_ctc, cer_ctc
def _calc_rich_ce_loss(
self,
encoder_out: torch.Tensor,
ys_pad: torch.Tensor,
):
decoder_out = self.ctc.ctc_lo(encoder_out)
# 2. Compute attention loss
loss_rich = self.criterion_att(decoder_out, ys_pad.contiguous())
acc_rich = th_accuracy(
decoder_out.view(-1, self.vocab_size),
ys_pad.contiguous(),
ignore_label=self.ignore_id,
)
return loss_rich, acc_rich
def inference(
self,
data_in,
data_lengths=None,
key: list = None,
tokenizer=None,
frontend=None,
**kwargs,
):
if kwargs.get("batch_size", 1) > 1:
raise NotImplementedError("batch decoding is not implemented")
meta_data = {}
if (
isinstance(data_in, torch.Tensor) and kwargs.get("data_type", "sound") == "fbank"
): # fbank
speech, speech_lengths = data_in, data_lengths
if len(speech.shape) < 3:
speech = speech[None, :, :]
if speech_lengths is None:
speech_lengths = speech.shape[1]
else:
# extract fbank feats
time1 = time.perf_counter()
audio_sample_list = load_audio_text_image_video(
data_in,
fs=frontend.fs,
audio_fs=kwargs.get("fs", 16000),
data_type=kwargs.get("data_type", "sound"),
tokenizer=tokenizer,
)
time2 = time.perf_counter()
meta_data["load_data"] = f"{time2 - time1:0.3f}"
speech, speech_lengths = extract_fbank(
audio_sample_list, data_type=kwargs.get("data_type", "sound"), frontend=frontend
)
time3 = time.perf_counter()
meta_data["extract_feat"] = f"{time3 - time2:0.3f}"
meta_data["batch_data_time"] = (
speech_lengths.sum().item() * frontend.frame_shift * frontend.lfr_n / 1000
)
speech = speech.to(device=kwargs["device"])
speech_lengths = speech_lengths.to(device=kwargs["device"])
language = kwargs.get("language", None)
if language is not None:
language_query = self.embed(
torch.LongTensor(
[[self.lid_dict[language] if language in self.lid_dict else 0]]
).to(speech.device)
).repeat(speech.size(0), 1, 1)
else:
language_query = self.embed(torch.LongTensor([[0]]).to(speech.device)).repeat(
speech.size(0), 1, 1
)
textnorm = kwargs.get("text_norm", "woitn")
textnorm_query = self.embed(
torch.LongTensor([[self.textnorm_dict[textnorm]]]).to(speech.device)
).repeat(speech.size(0), 1, 1)
speech = torch.cat((textnorm_query, speech), dim=1)
speech_lengths += 1
event_emo_query = self.embed(torch.LongTensor([[1, 2]]).to(speech.device)).repeat(
speech.size(0), 1, 1
)
input_query = torch.cat((language_query, event_emo_query), dim=1)
speech = torch.cat((input_query, speech), dim=1)
speech_lengths += 3
# Encoder
encoder_out, encoder_out_lens = self.encoder(speech, speech_lengths)
if isinstance(encoder_out, tuple):
encoder_out = encoder_out[0]
# c. Passed the encoder result and the beam search
ctc_logits = self.ctc.log_softmax(encoder_out)
results = []
b, n, d = encoder_out.size()
if isinstance(key[0], (list, tuple)):
key = key[0]
if len(key) < b:
key = key * b
for i in range(b):
x = ctc_logits[i, : encoder_out_lens[i], :]
yseq = x.argmax(dim=-1)
yseq = torch.unique_consecutive(yseq, dim=-1)
yseq = torch.tensor([self.sos] + yseq.tolist() + [self.eos], device=yseq.device)
nbest_hyps = [Hypothesis(yseq=yseq)]
for nbest_idx, hyp in enumerate(nbest_hyps):
ibest_writer = None
if kwargs.get("output_dir") is not None:
if not hasattr(self, "writer"):
self.writer = DatadirWriter(kwargs.get("output_dir"))
ibest_writer = self.writer[f"{nbest_idx + 1}best_recog"]
# remove sos/eos and get results
last_pos = -1
if isinstance(hyp.yseq, list):
token_int = hyp.yseq[1:last_pos]
else:
token_int = hyp.yseq[1:last_pos].tolist()
# remove blank symbol id, which is assumed to be 0
token_int = list(
filter(
lambda x: x != self.eos and x != self.sos and x != self.blank_id, token_int
)
)
# Change integer-ids to tokens
text = tokenizer.decode(token_int)
result_i = {"key": key[i], "text": text}
results.append(result_i)
if ibest_writer is not None:
ibest_writer["token"][key[i]] = " ".join(token)
ibest_writer["text"][key[i]] = text_postprocessed
return results, meta_data

View File

@ -49,3 +49,9 @@ class SentencepiecesTokenizer(BaseTokenizer):
def get_vocab_size(self):
return self.sp.GetPieceSize()
def ids2tokens(self, *args, **kwargs):
return self.decode(*args, **kwargs)
def tokens2ids(self, *args, **kwargs):
return self.encode(*args, **kwargs)