Merge pull request #2269 from modelscope/dev_sx2

update sensevoice small with timestamp
This commit is contained in:
Shi Xian 2024-12-05 19:30:30 +08:00 committed by GitHub
commit 22b928dd3f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 153 additions and 3 deletions

View File

@ -29,6 +29,21 @@ res = model.generate(
text = rich_transcription_postprocess(res[0]["text"])
print(text)
# en with timestamp
res = model.generate(
input=f"{model.model_path}/example/en.mp3",
cache={},
language="auto", # "zn", "en", "yue", "ja", "ko", "nospeech"
use_itn=True,
batch_size_s=60,
merge_vad=True, #
merge_length_s=15,
output_timestamp=True,
)
print(res)
text = rich_transcription_postprocess(res[0]["text"])
print(text)
# zh
res = model.generate(
input=f"{model.model_path}/example/zh.mp3",
@ -42,6 +57,21 @@ res = model.generate(
text = rich_transcription_postprocess(res[0]["text"])
print(text)
# zh with timestamp
res = model.generate(
input=f"{model.model_path}/example/zh.mp3",
cache={},
language="auto", # "zn", "en", "yue", "ja", "ko", "nospeech"
use_itn=True,
batch_size_s=60,
merge_vad=True, #
merge_length_s=15,
output_timestamp=True,
)
print(res)
text = rich_transcription_postprocess(res[0]["text"])
print(text)
# yue
res = model.generate(
input=f"{model.model_path}/example/yue.mp3",

View File

@ -19,6 +19,7 @@ from funasr.register import tables
from funasr.models.paraformer.search import Hypothesis
from funasr.models.sense_voice.utils.ctc_alignment import ctc_forced_align
class SinusoidalPositionEncoder(torch.nn.Module):
@ -857,6 +858,8 @@ class SenseVoiceSmall(nn.Module):
use_itn = kwargs.get("use_itn", False)
textnorm = kwargs.get("text_norm", None)
output_timestamp = kwargs.get("output_timestamp", False)
if textnorm is None:
textnorm = "withitn" if use_itn else "woitn"
textnorm_query = self.embed(
@ -905,18 +908,70 @@ class SenseVoiceSmall(nn.Module):
# Change integer-ids to tokens
text = tokenizer.decode(token_int)
result_i = {"key": key[i], "text": text}
results.append(result_i)
# result_i = {"key": key[i], "text": text}
# results.append(result_i)
if ibest_writer is not None:
ibest_writer["text"][key[i]] = text
if output_timestamp:
from itertools import groupby
timestamp = []
tokens = tokenizer.text2tokens(text)[4:]
logits_speech = self.ctc.softmax(encoder_out)[i, 4:encoder_out_lens[i].item(), :]
pred = logits_speech.argmax(-1).cpu()
logits_speech[pred==self.blank_id, self.blank_id] = 0
align = ctc_forced_align(
logits_speech.unsqueeze(0).float(),
torch.Tensor(token_int[4:]).unsqueeze(0).long().to(logits_speech.device),
(encoder_out_lens-4).long(),
torch.tensor(len(token_int)-4).unsqueeze(0).long().to(logits_speech.device),
ignore_id=self.ignore_id,
)
pred = groupby(align[0, :encoder_out_lens[0]])
_start = 0
token_id = 0
ts_max = encoder_out_lens[i] - 4
for pred_token, pred_frame in pred:
_end = _start + len(list(pred_frame))
if pred_token != 0:
ts_left = max((_start*60-30)/1000, 0)
ts_right = min((_end*60-30)/1000, (ts_max*60-30)/1000)
timestamp.append([tokens[token_id], ts_left, ts_right])
token_id += 1
_start = _end
timestamp = self.post(timestamp)
result_i = {"key": key[i], "text": text, "timestamp": timestamp}
results.append(result_i)
else:
result_i = {"key": key[i], "text": text}
results.append(result_i)
return results, meta_data
def post(self, timestamp):
timestamp_new = []
for i, t in enumerate(timestamp):
word, start, end = t
if word == '':
continue
if i == 0:
# timestamp_new.append([word, start, end])
timestamp_new.append([int(start*1000), int(end*1000)])
elif word.startswith("") or len(word) == 1 or not word[1].isalpha():
word = word[1:]
# timestamp_new.append([word, start, end])
timestamp_new.append([int(start*1000), int(end*1000)])
else:
# timestamp_new[-1][0] += word
timestamp_new[-1][1] = int(end*1000)
return timestamp_new
def export(self, **kwargs):
from .export_meta import export_rebuild_model
from export_meta import export_rebuild_model
if "max_seq_len" not in kwargs:
kwargs["max_seq_len"] = 512
models = export_rebuild_model(model=self, **kwargs)
return models
return results, meta_data

View File

@ -0,0 +1,65 @@
import torch
def ctc_forced_align(
log_probs: torch.Tensor,
targets: torch.Tensor,
input_lengths: torch.Tensor,
target_lengths: torch.Tensor,
blank: int = 0,
ignore_id: int = -1,
) -> torch.Tensor:
"""Align a CTC label sequence to an emission.
Args:
log_probs (Tensor): log probability of CTC emission output.
Tensor of shape `(B, T, C)`. where `B` is the batch size, `T` is the input length,
`C` is the number of characters in alphabet including blank.
targets (Tensor): Target sequence. Tensor of shape `(B, L)`,
where `L` is the target length.
input_lengths (Tensor):
Lengths of the inputs (max value must each be <= `T`). 1-D Tensor of shape `(B,)`.
target_lengths (Tensor):
Lengths of the targets. 1-D Tensor of shape `(B,)`.
blank_id (int, optional): The index of blank symbol in CTC emission. (Default: 0)
ignore_id (int, optional): The index of ignore symbol in CTC emission. (Default: -1)
"""
targets[targets == ignore_id] = blank
batch_size, input_time_size, _ = log_probs.size()
bsz_indices = torch.arange(batch_size, device=input_lengths.device)
_t_a_r_g_e_t_s_ = torch.cat(
(
torch.stack((torch.full_like(targets, blank), targets), dim=-1).flatten(start_dim=1),
torch.full_like(targets[:, :1], blank),
),
dim=-1,
)
diff_labels = torch.cat(
(
torch.as_tensor([[False, False]], device=targets.device).expand(batch_size, -1),
_t_a_r_g_e_t_s_[:, 2:] != _t_a_r_g_e_t_s_[:, :-2],
),
dim=1,
)
neg_inf = torch.tensor(float("-inf"), device=log_probs.device, dtype=log_probs.dtype)
padding_num = 2
padded_t = padding_num + _t_a_r_g_e_t_s_.size(-1)
best_score = torch.full((batch_size, padded_t), neg_inf, device=log_probs.device, dtype=log_probs.dtype)
best_score[:, padding_num + 0] = log_probs[:, 0, blank]
best_score[:, padding_num + 1] = log_probs[bsz_indices, 0, _t_a_r_g_e_t_s_[:, 1]]
backpointers = torch.zeros((batch_size, input_time_size, padded_t), device=log_probs.device, dtype=targets.dtype)
for t in range(1, input_time_size):
prev = torch.stack(
(best_score[:, 2:], best_score[:, 1:-1], torch.where(diff_labels, best_score[:, :-2], neg_inf))
)
prev_max_value, prev_max_idx = prev.max(dim=0)
best_score[:, padding_num:] = log_probs[:, t].gather(-1, _t_a_r_g_e_t_s_) + prev_max_value
backpointers[:, t, padding_num:] = prev_max_idx
l1l2 = best_score.gather(
-1, torch.stack((padding_num + target_lengths * 2 - 1, padding_num + target_lengths * 2), dim=-1)
)
path = torch.zeros((batch_size, input_time_size), device=best_score.device, dtype=torch.long)
path[bsz_indices, input_lengths - 1] = padding_num + target_lengths * 2 - 1 + l1l2.argmax(dim=-1)
for t in range(input_time_size - 1, 0, -1):
target_indices = path[:, t]
prev_max_idx = backpointers[bsz_indices, t, target_indices]
path[:, t - 1] += target_indices - prev_max_idx
alignments = _t_a_r_g_e_t_s_.gather(dim=-1, index=(path - padding_num).clamp(min=0))
return alignments