FunASR/funasr/datasets/audio_datasets/samplers.py

import logging
import math

import numpy as np
import torch
import torch.distributed as dist
from torch.utils.data import BatchSampler, DistributedSampler, Sampler

from funasr.register import tables


@tables.register("batch_sampler_classes", "BatchSampler")
@tables.register("batch_sampler_classes", "CustomDistributedBatchSampler")
@tables.register("batch_sampler_classes", "CustomDistributedDynamicBatchSampler")
@tables.register("batch_sampler_classes", "DynamicBatchLocalShuffleSampler")
@tables.register("batch_sampler_classes", "RankFullLocalShuffleBatchSampler")
@tables.register("batch_sampler_classes", "RankFullLocalShuffleDynamicBatchSampler")
def CustomDistributedBatchSampler_fn(dataset, **kwargs):
    """Factory that builds DataLoader kwargs for the registered sampler names.

    `batch_type="example"` selects the fixed-size sampler; any other value
    falls back to the dynamic (length-budget) sampler.
    """
    dataloader_args = {}
    batch_type = kwargs.get("batch_type", "example")
    if batch_type == "example":
        batch_sampler = CustomDistributedBatchSampler(dataset, **kwargs)
    else:
        batch_sampler = CustomDistributedDynamicBatchSampler(dataset, **kwargs)
    dataloader_args["batch_sampler"] = batch_sampler
    dataloader_args["num_workers"] = kwargs.get("num_workers", 4)
    dataloader_args["pin_memory"] = kwargs.get("pin_memory", True)
    return dataloader_args
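
# Usage sketch (illustrative, not used by the module itself): the returned
# dict is meant to be unpacked into a torch DataLoader. `my_dataset` is a
# placeholder for any map-style dataset exposing __len__ and get_source_len().
#
#   dataloader_args = CustomDistributedBatchSampler_fn(
#       my_dataset, batch_type="example", batch_size=32
#   )
#   loader = torch.utils.data.DataLoader(my_dataset, **dataloader_args)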


class CustomDistributedBatchSampler(Sampler):
    def __init__(
        self,
        dataset,
        batch_size,
        num_replicas=None,
        rank=None,
        shuffle=True,
        drop_last=False,
        is_training: bool = True,
        **kwargs,
    ):
        # Prefer the live process group; fall back to single-process values
        # when torch.distributed has not been initialized.
        try:
            rank = dist.get_rank()
            num_replicas = dist.get_world_size()
        except Exception:
            rank = 0
            num_replicas = 1
        self.rank = rank
        self.num_replicas = num_replicas
        self.dataset = dataset
        self.batch_size = batch_size
        self.is_training = is_training
        self.shuffle = shuffle and is_training
        self.drop_last = drop_last
        if self.drop_last:
            self.total_size = (len(self.dataset) // (batch_size * num_replicas)) * (batch_size * num_replicas)
        else:
            self.total_size = math.ceil(len(self.dataset) / (batch_size * num_replicas)) * (batch_size * num_replicas)
        self.num_samples = int(self.total_size // self.num_replicas)
        self.epoch = 0
        self.max_token_length = kwargs.get("max_token_length", None)
        self.length_scale_source = kwargs.get("length_scale_source", 1.0)
    def __iter__(self):
        # Generate a deterministic permutation per epoch so that all ranks
        # see the same global order.
        if self.shuffle:
            g = torch.Generator()
            g.manual_seed(self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        # Add extra samples to make the list evenly divisible across replicas.
        padding_size = self.total_size - len(indices)
        if padding_size <= len(indices):
            indices += indices[:padding_size]
        else:
            indices += indices * (padding_size // len(indices)) + indices[: padding_size % len(indices)]
        assert len(indices) == self.total_size

        # Subsample the shard belonging to this rank.
        indices = indices[self.rank : self.total_size : self.num_replicas]
        assert len(indices) == self.num_samples

        # Filter out indices whose (scaled) source length exceeds the limit, if set.
        if self.max_token_length is not None:
            filtered_indices = []
            for idx in indices:
                source_len = self.dataset.get_source_len(idx) / self.length_scale_source
                if source_len <= self.max_token_length:
                    filtered_indices.append(idx)
            indices = filtered_indices

        # Chunk this rank's indices into fixed-size batches.
        batches = [indices[i : i + self.batch_size] for i in range(0, len(indices), self.batch_size)]

        # Drop the last batch if it is not full and drop_last is requested.
        if self.drop_last and batches and len(batches[-1]) != self.batch_size:
            batches = batches[:-1]

        return iter(batches)

    def __len__(self):
        # Approximate count: length filtering in __iter__ may remove a few batches.
        return self.num_samples // self.batch_size

    def set_epoch(self, epoch):
        self.epoch = epoch
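
# Epoch-handling sketch (illustrative): calling set_epoch() once per epoch
# reseeds the permutation so shuffling differs across epochs while staying
# identical across ranks. `train_loader`, `batch_sampler`, and `num_epochs`
# are placeholders.
#
#   for epoch in range(num_epochs):
#       batch_sampler.set_epoch(epoch)
#       for batch in train_loader:
#           ...  # forward/backward step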


class CustomDistributedBufferBatchSampler(Sampler):
    def __init__(
        self,
        dataset,
        batch_size,
        num_replicas=None,
        rank=None,
        shuffle=True,
        drop_last=False,
        is_training: bool = True,
        sort_size: int = 1024,
        **kwargs,
    ):
        # Prefer the live process group; fall back to single-process values
        # when torch.distributed has not been initialized.
        try:
            rank = dist.get_rank()
            num_replicas = dist.get_world_size()
        except Exception:
            rank = 0
            num_replicas = 1
        self.rank = rank
        self.num_replicas = num_replicas
        self.dataset = dataset
        self.batch_size = batch_size
        self.is_training = is_training
        self.shuffle = shuffle and is_training
        self.drop_last = drop_last
        if self.drop_last:
            self.total_size = (len(self.dataset) // (batch_size * num_replicas)) * (batch_size * num_replicas)
        else:
            self.total_size = math.ceil(len(self.dataset) / (batch_size * num_replicas)) * (batch_size * num_replicas)
        self.num_samples = int(self.total_size // self.num_replicas)
        self.epoch = 0
        self.max_token_length = kwargs.get("max_token_length", None)
        self.length_scale_source = kwargs.get("length_scale_source", 1.0)
        self.sort_size = sort_size
    def __iter__(self):
        # Generate a deterministic permutation per epoch.
        if self.shuffle:
            g = torch.Generator()
            g.manual_seed(self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        # Add extra samples to make the list evenly divisible across replicas.
        padding_size = self.total_size - len(indices)
        if padding_size <= len(indices):
            indices += indices[:padding_size]
        else:
            indices += indices * (padding_size // len(indices)) + indices[: padding_size % len(indices)]
        assert len(indices) == self.total_size

        # Subsample the shard belonging to this rank.
        indices = indices[self.rank : self.total_size : self.num_replicas]
        assert len(indices) == self.num_samples

        # Filter out indices whose (scaled) source length exceeds the limit, if set.
        if self.max_token_length is not None:
            filtered_indices = []
            for idx in indices:
                source_len = self.dataset.get_source_len(idx) / self.length_scale_source
                if source_len <= self.max_token_length:
                    filtered_indices.append(idx)
            indices = filtered_indices

        # Buffer sorting: collect sort_size indices, sort them by source length,
        # then cut the sorted buffer into batches.
        sorted_batches = []
        buffer = []
        for idx in indices:
            buffer.append(idx)
            if len(buffer) >= self.sort_size:
                buffer.sort(key=lambda x: self.dataset.get_source_len(x))
                sorted_batches.extend(self._create_batches_from_buffer(buffer))
                buffer = []

        # Handle the remaining items in the buffer.
        if buffer:
            buffer.sort(key=lambda x: self.dataset.get_source_len(x))
            sorted_batches.extend(self._create_batches_from_buffer(buffer))

        return iter(sorted_batches)

    def _create_batches_from_buffer(self, buffer):
        # Cut the sorted buffer into fixed-size batches.
        batched_buffer = [buffer[i : i + self.batch_size] for i in range(0, len(buffer), self.batch_size)]
        if self.drop_last and batched_buffer and len(batched_buffer[-1]) != self.batch_size:
            batched_buffer = batched_buffer[:-1]
        return batched_buffer

    def __len__(self):
        # Approximate count: length filtering in __iter__ may remove a few batches.
        return self.num_samples // self.batch_size

    def set_epoch(self, epoch):
        self.epoch = epoch
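
# Buffer-sorting note (illustrative): sorting each sort_size-sized window by
# get_source_len() groups samples of similar length into the same batch,
# which reduces padding, while the global shuffle above still randomizes
# which samples share a window. E.g. with sort_size=4 and arriving source
# lengths [9, 2, 7, 3, 8, 1], the first window [9, 2, 7, 3] is reordered to
# [2, 3, 7, 9] before being cut into batches; the remainder [8, 1] is sorted
# and batched at the end.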


class CustomDistributedDynamicBatchSampler(Sampler):
    def __init__(
        self,
        dataset,
        batch_size,
        num_replicas=None,
        rank=None,
        shuffle=True,
        drop_last=False,
        is_training: bool = True,
        **kwargs,
    ):
        # Prefer the live process group; fall back to single-process values
        # when torch.distributed has not been initialized.
        try:
            rank = dist.get_rank()
            num_replicas = dist.get_world_size()
        except Exception:
            rank = 0
            num_replicas = 1
        self.rank = rank
        self.num_replicas = num_replicas
        self.dataset = dataset
        self.batch_size = batch_size
        self.is_training = is_training
        self.shuffle = shuffle and is_training
        self.drop_last = drop_last
        self.total_size = len(self.dataset)
        self.epoch = 0
    def __iter__(self):
        if self.shuffle:
            g = torch.Generator()
            g.manual_seed(self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()
        else:
            indices = list(range(len(self.dataset)))

        # Subsample the shard belonging to this rank.
        indices = indices[self.rank : self.total_size : self.num_replicas]

        # Pack indices greedily: `batch_size` is a budget on
        # max_len_in_batch * (number of samples in the batch).
        batches = []
        batch = []
        max_len_in_batch = 0
        for idx in indices:
            sample_length = self.dataset.get_source_len(idx)
            potential_batch_length = max(max_len_in_batch, sample_length) * (len(batch) + 1)
            if potential_batch_length <= self.batch_size:
                batch.append(idx)
                max_len_in_batch = max(max_len_in_batch, sample_length)
            else:
                batches.append(batch)
                batch = [idx]
                max_len_in_batch = sample_length

        # Keep the trailing batch unless drop_last is set and it does not
        # exactly fill the budget.
        if batch and (not self.drop_last or len(batch) * max_len_in_batch == self.batch_size):
            batches.append(batch)

        return iter(batches)

    def __len__(self):
        # The number of dynamic batches is only known after iteration;
        # return a placeholder so len() stays cheap.
        return 1

    def set_epoch(self, epoch):
        self.epoch = epoch
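
# Token-budget sketch (illustrative): here batch_size is a budget on
# max_len_in_batch * len(batch) rather than a fixed sample count. For
# example, with batch_size=100 and incoming source lengths 30, 10, 50:
# after [30, 10] the cost is 30 * 2 = 60; adding the 50-length sample
# would cost 50 * 3 = 150 > 100, so it starts a new batch instead.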


class DistributedSamplerWarp(BatchSampler):
    """Thin wrapper that feeds a DistributedSampler into a standard BatchSampler."""

    def __init__(self, dataset, batch_size, num_replicas=None, rank=None, shuffle=True, drop_last=False):
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        self.dataset = dataset
        self.batch_size = batch_size
        self.num_replicas = num_replicas
        self.rank = rank
        self.shuffle = shuffle
        self.drop_last = drop_last
        self.epoch = 0
        # Create an instance of the DistributedSampler.
        self.sampler = DistributedSampler(
            self.dataset,
            num_replicas=self.num_replicas,
            rank=self.rank,
            shuffle=self.shuffle,
        )
        # Call BatchSampler's constructor.
        super().__init__(self.sampler, batch_size, drop_last)

    def __iter__(self):
        # Re-seed the underlying sampler before each pass when shuffling.
        if self.shuffle:
            self.sampler.set_epoch(self.epoch)
        # Generate batch indices using the parent class.
        return super().__iter__()

    def set_epoch(self, epoch):
        self.epoch = epoch
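

if __name__ == "__main__":
    # Minimal single-process smoke test (a sketch, not part of the training
    # pipeline). torch.distributed is left uninitialized, so the samplers fall
    # back to rank 0 / world size 1. _ToyDataset and its lengths are made up
    # purely for illustration; a real dataset must expose the same interface.
    class _ToyDataset:
        def __init__(self, lengths):
            self.lengths = lengths

        def __len__(self):
            return len(self.lengths)

        def get_source_len(self, idx):
            return self.lengths[idx]

    toy = _ToyDataset([30, 10, 50, 20, 40, 60, 15, 25])

    # Dynamic batching: batch_size acts as a length budget, not a sample count.
    dyn_sampler = CustomDistributedDynamicBatchSampler(toy, batch_size=120, shuffle=False)
    for batch in dyn_sampler:
        lens = [toy.get_source_len(i) for i in batch]
        print("dynamic batch", batch, "cost =", max(lens) * len(batch))

    # Fixed-size batching for comparison.
    fixed_sampler = CustomDistributedBatchSampler(toy, batch_size=3, shuffle=False)
    for batch in fixed_sampler:
        print("fixed batch", batch)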