This commit is contained in:
游雁 2024-03-24 00:45:45 +08:00
parent fffb628d31
commit 873cfae5c3
4 changed files with 45 additions and 7 deletions

View File

@ -149,8 +149,8 @@ def main(**kwargs):
# dataset
logging.info("Build dataloader")
dataloader_class = tables.dataloader_classes.get(kwargs["dataset_conf"].get("dataloader", "DataloaderMapStyle"))
# dataloader = dataloader_class(**kwargs)
dataloader_tr, dataloader_val = dataloader_class(**kwargs)
trainer = Trainer(local_rank=local_rank,
use_ddp=use_ddp,
use_fsdp=use_fsdp,
@ -172,15 +172,15 @@ def main(**kwargs):
except:
writer = None
if use_ddp or use_fsdp:
context = Join([model])
else:
# if use_ddp or use_fsdp:
# context = Join([model])
# else:
context = nullcontext()
for epoch in range(trainer.start_epoch, trainer.max_epoch + 1):
time1 = time.perf_counter()
with context:
# dataloader_tr, dataloader_val = dataloader.build_iter(epoch)
trainer.train_epoch(
model=model,
optim=optim,

View File

@ -212,7 +212,7 @@ class CustomDistributedBufferBatchSampler(Sampler):
def set_epoch(self, epoch):
self.epoch = epoch
class CustomDistributedDynamicBatchSampler(Sampler):
class CustomDistributedDynamicBatchSampler(DistributedSampler):
def __init__(self, dataset,
batch_size,
num_replicas=None,

View File

@ -25,6 +25,37 @@ def DataloaderMapStyle(frontend=None, tokenizer=None, **kwargs):
return dataloader_tr, dataloader_val
# @tables.register("dataloader_classes", "DataloaderMapStyle")
class DataloaderMapStyle:
def __init__(self, frontend=None, tokenizer=None, **kwargs):
# dataset
logging.info("Build dataloader")
dataset_class = tables.dataset_classes.get(kwargs.get("dataset", "AudioDataset"))
dataset_tr = dataset_class(kwargs.get("train_data_set_list"), frontend=frontend, tokenizer=tokenizer,
is_training=True, **kwargs.get("dataset_conf"))
dataset_val = dataset_class(kwargs.get("valid_data_set_list"), frontend=frontend, tokenizer=tokenizer,
is_training=False, **kwargs.get("dataset_conf"))
self.dataset_tr = dataset_tr
self.dataset_val = dataset_val
self.kwargs = kwargs
def build_iter(self, epoch=0):
# dataloader
batch_sampler = self.kwargs["dataset_conf"].get("batch_sampler", "BatchSampler")
batch_sampler_val = None
if batch_sampler is not None:
batch_sampler_class = tables.batch_sampler_classes.get(batch_sampler)
batch_sampler = batch_sampler_class(self.dataset_tr, **self.kwargs.get("dataset_conf"))
batch_sampler_val = batch_sampler_class(self.dataset_val, is_training=False, **self.kwargs.get("dataset_conf"))
batch_sampler["batch_sampler"].set_epoch(epoch)
batch_sampler_val.set_epoch(epohc)
dataloader_tr = torch.utils.data.DataLoader(self.dataset_tr, collate_fn=self.dataset_tr.collator, **batch_sampler)
dataloader_val = torch.utils.data.DataLoader(self.dataset_val, collate_fn=self.dataset_val.collator, **batch_sampler_val)
return dataloader_tr, dataloader_val
@tables.register("dataloader_classes", "DataloaderIterable")
def DataloaderIterable(frontend=None, tokenizer=None, **kwargs):

View File

@ -249,6 +249,9 @@ class Trainer:
speed_stats = {}
time5 = time.perf_counter()
iterator_stop = torch.tensor(0).to(self.device)
dist.barrier()
print(f"before iter, iterator_stop: {iterator_stop}\n")
dataloader_train.batch_sampler.set_epoch(epoch)
for batch_idx, batch in enumerate(dataloader_train):
if self.use_ddp or self.use_fsdp:
dist.all_reduce(iterator_stop, dist.ReduceOp.SUM)
@ -392,9 +395,13 @@ class Trainer:
speed_stats = {}
time5 = time.perf_counter()
iterator_stop = torch.tensor(0).to(self.device)
dist.barrier()
print(f"before iter, iterator_stop: {iterator_stop}\n")
for batch_idx, batch in enumerate(dataloader_val):
if self.use_ddp or self.use_fsdp:
dist.all_reduce(iterator_stop, dist.ReduceOp.SUM)
if epoch >= 1:
print(f"iterator_stop: {iterator_stop}\n")
if iterator_stop > 0:
break
time1 = time.perf_counter()