FunASR/funasr/export/models/decoder/xformer_decoder.py
2023-07-14 17:43:40 +08:00

122 lines
4.0 KiB
Python

import os
import torch
import torch.nn as nn
from funasr.modules.attention import MultiHeadedAttention
from funasr.export.models.modules.decoder_layer import DecoderLayer as OnnxDecoderLayer
from funasr.export.models.language_models.embed import Embedding
from funasr.export.models.modules.multihead_att import \
OnnxMultiHeadedAttention
from funasr.export.utils.torch_function import MakePadMask, subsequent_mask
class XformerDecoder(nn.Module):
    """Export wrapper around a transformer decoder.

    The wrapped ``model`` is modified in place by ``__init__``: every entry of
    ``model.decoders`` is replaced by an ``OnnxDecoderLayer``, and any
    ``MultiHeadedAttention`` found as a layer's ``self_attn`` / ``src_attn``
    is replaced by ``OnnxMultiHeadedAttention``.

    The ``get_*`` helpers return dummy inputs, input/output names, dynamic
    axes and a config dict that points at ``<model_name>.onnx``.
    """

    def __init__(self,
                 model,
                 max_seq_len=512,
                 model_name='decoder',
                 onnx: bool = True,):
        """Wrap ``model`` and swap its attention/decoder layers.

        Args:
            model: Decoder to wrap. It must expose ``embed`` and ``decoders``;
                ``forward`` additionally reads ``normalize_before``,
                ``after_norm`` and ``output_layer``.
            max_seq_len: Passed to ``Embedding`` and to the mask helper.
            model_name: Base name used by ``get_model_config`` for the
                ``.onnx`` file.
            onnx: Selects which mask helper is stored in ``make_pad_mask``.
        """
        super().__init__()
        self.embed = Embedding(model.embed, max_seq_len)
        self.model = model

        # NOTE(review): ``make_pad_mask`` is not read anywhere in this class.
        if onnx:
            self.make_pad_mask = MakePadMask(max_seq_len, flip=False)
        else:
            # NOTE(review): forward() calls ``subsequent_mask(size)`` with a
            # single argument and treats the result as a tensor; confirm the
            # helper really accepts ``flip`` and that a tensor is wanted here.
            self.make_pad_mask = subsequent_mask(max_seq_len, flip=False)

        # Must be read before the loop below replaces the attention modules.
        first_self_attn = self.model.decoders[0].self_attn
        if isinstance(first_self_attn, MultiHeadedAttention):
            self.num_heads = first_self_attn.h
            self.hidden_size = first_self_attn.linear_out.out_features

        # Replace multi-head attention modules with the customized modules,
        # then wrap each decoder layer itself.
        for i, layer in enumerate(self.model.decoders):
            if isinstance(layer.self_attn, MultiHeadedAttention):
                layer.self_attn = OnnxMultiHeadedAttention(layer.self_attn)
            if isinstance(layer.src_attn, MultiHeadedAttention):
                layer.src_attn = OnnxMultiHeadedAttention(layer.src_attn)
            self.model.decoders[i] = OnnxDecoderLayer(layer)

        self.model_name = model_name

    def prepare_mask(self, mask):
        """Build the two mask variants handed to the decoder layers.

        Args:
            mask: 2-D or 3-D tensor in which 1 marks a kept position.

        Returns:
            Tuple ``(mask_3d_btd, mask_4d_bhlt)``:
            ``mask_3d_btd`` is ``mask`` with a new axis inserted at index 2;
            ``mask_4d_bhlt`` is ``(1 - mask)`` with a new axis inserted at
            index 1 (two new axes for a 2-D mask), scaled by ``-10000.0`` so
            that kept positions are 0 and masked positions are -10000.

        Raises:
            ValueError: If ``mask`` is neither 2-D nor 3-D.
        """
        rank = len(mask.shape)
        if rank == 2:
            expanded = mask[:, None, None, :]
        elif rank == 3:
            expanded = mask[:, None, :]
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError (or an IndexError for 1-D input).
            raise ValueError(
                f"mask must be 2-D or 3-D, got shape {tuple(mask.shape)}"
            )

        mask_3d_btd = mask[:, :, None]
        mask_4d_bhlt = (1 - expanded) * -10000.0
        return mask_3d_btd, mask_4d_bhlt

    def forward(self,
                tgt,
                memory,
                cache):
        """Run the decoder layers and score the last position.

        Args:
            tgt: Target token tensor; ``tgt.size(-1)`` sets the mask size.
            memory: Passed unchanged to every decoder layer.
            cache: Iterable with one entry per decoder layer, consumed in
                order (``zip`` stops at the shorter of cache and layers).

        Returns:
            Tuple ``(y, new_cache)``. ``y`` is taken from the last position of
            the final layer output, normalised with ``after_norm`` when
            ``normalize_before`` is set, and turned into log-probabilities
            when the model has an ``output_layer``. ``new_cache`` holds each
            layer's full output.
        """
        # presumably (1, T, T) after unsqueeze — TODO confirm against
        # ``subsequent_mask``.
        mask = subsequent_mask(tgt.size(-1)).unsqueeze(0)

        x = self.embed(tgt)
        mask = self.prepare_mask(mask)
        new_cache = []
        for c, decoder in zip(cache, self.model.decoders):
            x, mask = decoder(x, mask, memory, None, c)
            new_cache.append(x)
            # Drop the first position along axis 1 before the next layer.
            x = x[:, 1:, :]

        if self.model.normalize_before:
            y = self.model.after_norm(x[:, -1])
        else:
            y = x[:, -1]

        if self.model.output_layer is not None:
            y = torch.log_softmax(self.model.output_layer(y), dim=-1)
        return y, new_cache

    def get_dummy_inputs(self, enc_size):
        """Return example ``(tgt, memory, cache)`` inputs.

        Args:
            enc_size: Size of the last axis of the random ``memory`` tensor.

        Returns:
            ``tgt`` is a ``(1, 1)`` int64 tensor holding 0, ``memory`` is a
            random ``(1, 100, enc_size)`` tensor, and ``cache`` is a list of
            ``(1, 1, size)`` zero tensors, one per decoder layer, where
            ``size`` comes from the first decoder layer.
        """
        tgt = torch.zeros((1, 1), dtype=torch.long)
        memory = torch.randn(1, 100, enc_size)
        layer_size = self.model.decoders[0].size
        cache = [
            torch.zeros((1, 1, layer_size))
            for _ in self.model.decoders
        ]
        return (tgt, memory, cache)

    def is_optimizable(self):
        """Return ``True`` unconditionally."""
        return True

    def get_input_names(self):
        """Return ``tgt``, ``memory`` and one ``cache_<i>`` per layer."""
        cache_num = len(self.model.decoders)
        return ["tgt", "memory"] + [f"cache_{i}" for i in range(cache_num)]

    def get_output_names(self):
        """Return ``y`` followed by one ``out_cache_<i>`` per layer."""
        cache_num = len(self.model.decoders)
        return ["y"] + [f"out_cache_{i}" for i in range(cache_num)]

    def get_dynamic_axes(self):
        """Return the dynamic-axis mapping for the named inputs.

        Only the inputs (``tgt``, ``memory``, ``cache_<i>``) get entries; the
        outputs are not listed.
        """
        ret = {
            "tgt": {0: "tgt_batch", 1: "tgt_length"},
            "memory": {0: "memory_batch", 1: "memory_length"},
        }
        cache_num = len(self.model.decoders)
        # NOTE(review): the cache "length" label is attached to axis 2, while
        # get_dummy_inputs builds caches as (1, 1, size) — confirm the axis.
        ret.update(
            {
                f"cache_{d}": {0: f"cache_{d}_batch", 2: f"cache_{d}_length"}
                for d in range(cache_num)
            }
        )
        return ret

    def get_model_config(self, path):
        """Return the config dict describing this decoder.

        Args:
            path: Directory that is joined with ``<model_name>.onnx``.
        """
        return {
            "dec_type": "XformerDecoder",
            "model_path": os.path.join(path, f"{self.model_name}.onnx"),
            "n_layers": len(self.model.decoders),
            # NOTE(review): "odim" is filled with the first layer's ``size``;
            # confirm this is the intended value.
            "odim": self.model.decoders[0].size,
        }