FunASR/funasr/datasets/audio_datasets/preprocessor.py
2024-02-19 21:26:25 +08:00

83 lines
2.2 KiB
Python

import os
import json
import torch
import logging
import concurrent.futures
import librosa
import torch.distributed as dist
from typing import Collection
import torch
import torchaudio
from torch import nn
import random
import re
from funasr.tokenizer.cleaner import TextCleaner
from funasr.register import tables
@tables.register("preprocessor_classes", "SpeechPreprocessSpeedPerturb")
class SpeechPreprocessSpeedPerturb(nn.Module):
def __init__(self, speed_perturb: list=None, **kwargs):
super().__init__()
self.speed_perturb = speed_perturb
def forward(self, waveform, fs, **kwargs):
if self.speed_perturb is None:
return waveform
speed = random.choice(self.speed_perturb)
if speed != 1.0:
waveform, _ = torchaudio.sox_effects.apply_effects_tensor(
torch.tensor(waveform).view(1, -1), fs, [['speed', str(speed)], ['rate', str(fs)]])
waveform = waveform.view(-1)
return waveform
@tables.register("preprocessor_classes", "TextPreprocessSegDict")
class TextPreprocessSegDict(nn.Module):
def __init__(self, seg_dict: str = None,
text_cleaner: Collection[str] = None,
split_with_space: bool = False,
**kwargs):
super().__init__()
self.seg_dict = None
if seg_dict is not None:
self.seg_dict = {}
with open(seg_dict, "r", encoding="utf8") as f:
lines = f.readlines()
for line in lines:
s = line.strip().split()
key = s[0]
value = s[1:]
self.seg_dict[key] = " ".join(value)
self.text_cleaner = TextCleaner(text_cleaner)
self.split_with_space = split_with_space
def forward(self, text, **kwargs):
if self.seg_dict is not None:
text = self.text_cleaner(text)
if self.split_with_space:
tokens = text.strip().split(" ")
if self.seg_dict is not None:
text = seg_tokenize(tokens, self.seg_dict)
return text
def seg_tokenize(txt, seg_dict):
pattern = re.compile(r'^[\u4E00-\u9FA50-9]+$')
out_txt = ""
for word in txt:
word = word.lower()
if word in seg_dict:
out_txt += seg_dict[word] + " "
else:
if pattern.match(word):
for char in word:
if char in seg_dict:
out_txt += seg_dict[char] + " "
else:
out_txt += "<unk>" + " "
else:
out_txt += "<unk>" + " "
return out_txt.strip().split()