Speed optimization training from scratch (#305)
* Adding changes for DDP + AMP

* remove initialize_device_settings from example script

* enable checkpointing again. change params

* WIP fix checkpointing for DDP. fix StreamingDataSilo for non-DDP. switch to WrappedDDP

* WIP reproducibility of runs. add seeds

* clean up

* update params in example

* WIP adjust sagemaker script

* improve logging for sagemaker

* update example scripts

* update trainer_state_dict and eval only in main proc

* fix epoch in tqdm bar

* catch failing reports

* fix desynchronization issue in distributed training w/ unequal num of batches per rank (see the sketch after this list)

* fix numbering of steps for saving / resuming

* move all_reduce sync into separate fn

* minor cleaning

* add heuristic estimate of samples

* simplify grouper in StreamingDataSet

* update example scripts

* don't allow more docs for estimate than for actual training

* fix estimate for max_docs=None

* Add shuffling of data for StreamingDataSilo

* Add randomization of file

* Fix filepath conversion

* Write remainder docs to file in randomize_and_split_file()

* Remove #TODO

* smaller fixes: gradient clipping, AMP support

* change args. fix file splitting in distributed mode. log learning rate

* fix filepath for splitting

* add dockerfile for sagemaker training from scratch

* simplify calculation of optimization steps

* Add option to disable gradient clipping

* simplify dockerfile

* Update docs and default for gradient clipping
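
The desynchronization fix above is worth a note: in distributed training, ranks can receive unequal numbers of batches per epoch, so one rank exhausts its dataloader and then blocks forever in the next collective call while the others keep going. A common remedy, sketched below, is to all_reduce a "has data" flag on every step so that all ranks stop together as soon as any rank runs dry. This is an illustrative sketch of the technique only, not FARM's exact implementation; the helper name and the loop snippet are assumptions.

import torch
import torch.distributed as dist

def all_ranks_have_data(local_has_data: bool, device: torch.device) -> bool:
    # all_reduce with MIN: the reduced flag drops to 0 as soon as
    # any single rank has run out of batches.
    flag = torch.tensor(1 if local_has_data else 0, device=device)
    dist.all_reduce(flag, op=dist.ReduceOp.MIN)
    return bool(flag.item())

# Usage inside a training loop (sketch):
#   batch = next(train_iter, None)
#   if not all_ranks_have_data(batch is not None, device):
#       break  # every rank leaves the epoch together instead of one rank hanging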

Co-authored-by: Abhinav Sharma <abhinav0301@gmail.com>
Co-authored-by: Tanay Soni <tanaysoni12@gmail.com>
Co-authored-by: Timo Moeller <timo.moeller@deepset.ai>
4 people authored Jun 19, 2020
1 parent 06e45f9 commit 5151b36
Showing 11 changed files with 527 additions and 163 deletions.
29 changes: 25 additions & 4 deletions Dockerfile-GPU
@@ -1,11 +1,32 @@
-FROM nvidia/cuda:10.2-base-ubuntu18.04
-RUN apt-get update && apt-get install -y python3-pip && apt-get install -y python3-pip && apt-get install -y git
+FROM pytorch/pytorch:1.5-cuda10.1-cudnn7-devel
+
+RUN apt-get update && apt-get install -y git
+
+# Setup locales
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends \
+       locales
+RUN locale-gen en_US.UTF-8
+ENV LANG en_US.UTF-8
+ENV LANGUAGE en_US:en
+ENV LC_ALL en_US.UTF-8
+
+WORKDIR /home/user
+
+# Install apex
+RUN git clone https://github.com/NVIDIA/apex \
+    && cd apex \
+    && pip install -v --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" ./
+
+
 
 # Install FARM
 COPY setup.py requirements.txt readme.rst /home/user/
-RUN pip3 install -r requirements.txt
+RUN pip install -r requirements.txt
 COPY farm farm
-RUN pip3 install -e .
+RUN pip install -e .
 
 
+# Copy Training Scripts
+COPY examples examples
 
6 changes: 6 additions & 0 deletions Dockerfile-SageMaker
@@ -0,0 +1,6 @@
+FROM deepset/farm-gpu:latest
+COPY examples examples
+#COPY data/test data/test
+
+# ENV SAGEMAKER_PROGRAM train.py
+ENTRYPOINT ["python3","-m", "torch.distributed.launch", "--nproc_per_node=4", "examples/train_from_scratch_with_sagemaker.py"]
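
For orientation, a container built from this image could be launched with the SageMaker Python SDK roughly as sketched below (SDK v2 parameter names). Everything concrete here is a placeholder assumption — the image URI, role ARN, S3 path, and instance type — and the hyperparameter keys must match whatever examples/train_from_scratch_with_sagemaker.py actually reads from its args dict:

from sagemaker.estimator import Estimator

estimator = Estimator(
    image_uri="123456789012.dkr.ecr.us-east-1.amazonaws.com/farm-sagemaker:latest",  # placeholder
    role="arn:aws:iam::123456789012:role/SageMakerRole",  # placeholder
    instance_count=1,
    instance_type="ml.p3.8xlarge",  # 4x V100, matching --nproc_per_node=4 above
    hyperparameters={  # assumed keys, forwarded to the training script's args dict
        "train_file": "train.txt",
        "vocab_file": "vocab.txt",
        "do_lower_case": 1,
        "max_seq_len": 128,
        "per_gpu_batch_size": 8,
        "gradient_accumulation_steps": 4,
        "learning_rate": 1e-4,
        "warmup_proportion": 0.05,
        "n_epochs": 2,
        "evaluate_every": 15000,
    },
)
# "input_channel" matches data_dir = /opt/ml/input/data/input_channel in the script
estimator.fit({"input_channel": "s3://my-bucket/pretraining-data/"})  # placeholder bucket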
110 changes: 78 additions & 32 deletions examples/train_from_scratch.py
@@ -1,72 +1,109 @@
@@ -1,72 +1,109 @@
 # fmt: off
+import argparse
 import logging
 from pathlib import Path
 
-from transformers.tokenization_bert import BertTokenizer
-
-from farm.data_handler.data_silo import StreamingDataSilo
+from farm.modeling.tokenization import Tokenizer
+from farm.data_handler.data_silo import StreamingDataSilo, DataSilo
 from farm.data_handler.processor import BertStyleLMProcessor
+from farm.data_handler.utils import randomize_and_split_file
 from farm.modeling.adaptive_model import AdaptiveModel
 from farm.modeling.language_model import LanguageModel
 from farm.modeling.optimization import initialize_optimizer
 from farm.modeling.prediction_head import BertLMHead, NextSentenceHead
 from farm.train import Trainer
 from farm.utils import set_all_seeds, MLFlowLogger, initialize_device_settings
+import torch
 
+# To get the best speed in a multi-GPU environment, launch the script via
+# python -m torch.distributed.launch --nproc_per_node=<NUM_GPUS> train_from_scratch.py
+
+def parse_arguments():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--local_rank",
+                        type=int,
+                        default=-1,
+                        help="local_rank for distributed training on GPUs")
+    args = parser.parse_args()
+    return args
+
 
 def train_from_scratch():
+    args = parse_arguments()
+    use_amp = "O2"  # using "O2" here allows roughly 30% larger batch_sizes and 45% speed up
+
     logging.basicConfig(
         format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
         datefmt="%m/%d/%Y %H:%M:%S",
         level=logging.INFO,
     )
 
-    ml_logger = MLFlowLogger(tracking_uri="")
-    ml_logger.init_experiment(experiment_name="from_scratch", run_name="debug")
+    # Only the main process should log here
+    if args.local_rank in [-1, 0]:
+        ml_logger = MLFlowLogger(tracking_uri="https://public-mlflow.deepset.ai/")
+        ml_logger.init_experiment(experiment_name="train_from_scratch", run_name="run")
 
     #########################
     ######## Settings
     ########################
     set_all_seeds(seed=39)
-    device, n_gpu = initialize_device_settings(use_cuda=True)
-    evaluate_every = 5000
-    vocab_size = 30522
-    # dev_filename = None
+    device, n_gpu = initialize_device_settings(use_cuda=True, local_rank=args.local_rank, use_amp=use_amp)
 
     save_dir = Path("saved_models/train_from_scratch")
+    data_dir = Path("data/test")
 
-    n_epochs = 10
-    learning_rate = 1e-4
-    warmup_proportion = 0.05
-    batch_size = 16 # (probably only possible via gradient accumulation steps)
-    max_seq_len = 64
+    # Option A) just using a single file
+    # train_filename = "train.txt"
 
+    # Option B) (recommended when using StreamingDataSilo):
+    # split and shuffle that file to have random order within and across epochs
+    randomize_and_split_file(data_dir / "train.txt", output_dir=Path("data/split_files"), docs_per_file=1000)
+    train_filename = Path("data/split_files")
 
-    data_dir = Path("data/lm_finetune_nips")
-    train_filename = "train.txt"
     dev_filename = "dev.txt"
 
+    distributed = args.local_rank != -1
+    max_seq_len = 128
+    batch_size = 8  # if distributed: this is per_gpu
+    grad_acc = 1
+    learning_rate = 1e-4
+    warmup_proportion = 0.05
+    n_epochs = 2
+    evaluate_every = 15000
+    log_loss_every = 2
+    checkpoint_every = 500
+    checkpoint_root_dir = Path("checkpoints")
+    checkpoints_to_keep = 4
+    next_sent_pred_style = "bert-style"  # or "sentence"
+    max_docs = None
+
+    # Choose enough workers to queue sufficient batches during training.
+    # Optimal number depends on your GPU speed, CPU speed and number of cores.
+    # 16 works well on a 4x V100 machine with 16 cores (AWS: p3.8xlarge). For a single GPU you will need less.
+    data_loader_workers = 1
+
     # 1. Create a tokenizer
-    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
+    tokenizer = Tokenizer.load("bert-base-uncased", do_lower_case=True)
 
-    # 2. Create a DataProcessor that handles all the conversion from raw text into a pytorch Dataset
+    # 2. Create a DataProcessor that handles all the conversion from raw text into a PyTorch Dataset
     processor = BertStyleLMProcessor(
         data_dir=data_dir,
         tokenizer=tokenizer, max_seq_len=max_seq_len,
         train_filename=train_filename,
         dev_filename=dev_filename,
         test_filename=None,
+        next_sent_pred_style=next_sent_pred_style,
+        max_docs=max_docs
    )
 
     # 3. Create a DataSilo that loads several datasets (train/dev/test), provides DataLoaders for them and
     # calculates a few descriptive statistics of our datasets
-    stream_data_silo = StreamingDataSilo(processor=processor, batch_size=batch_size)
+    # stream_data_silo = DataSilo(processor=processor, batch_size=batch_size, distributed=distributed)
+    stream_data_silo = StreamingDataSilo(processor=processor, batch_size=batch_size, distributed=distributed,
+                                         dataloader_workers=data_loader_workers)
 
     # 4. Create an AdaptiveModel
     # a) which consists of a pretrained language model as a basis
-    language_model = LanguageModel.from_scratch("bert", vocab_size)
+    language_model = LanguageModel.from_scratch("bert", tokenizer.vocab_size)
 
     # b) and *two* prediction heads on top that are suited for our task => Language Model finetuning
-    lm_prediction_head = BertLMHead(768, vocab_size)
-    next_sentence_head = NextSentenceHead([768, 2], task_name="nextsentence")
+    lm_prediction_head = BertLMHead(768, tokenizer.vocab_size)
+    next_sentence_head = NextSentenceHead(num_labels=2, task_name="nextsentence")
 
     model = AdaptiveModel(
         language_model=language_model,
@@ -84,7 +121,10 @@ def train_from_scratch():
         n_batches=len(stream_data_silo.get_data_loader("train")),
         n_epochs=n_epochs,
         device=device,
-        grad_acc_steps=8,
+        grad_acc_steps=grad_acc,
+        distributed=distributed,
+        use_amp=use_amp,
+        local_rank=args.local_rank
     )
 
     # 6. Feed everything to the Trainer, which takes care of growing our model and evaluates it from time to time
@@ -96,17 +136,23 @@ def train_from_scratch():
         n_gpu=n_gpu,
         lr_schedule=lr_schedule,
         evaluate_every=evaluate_every,
+        log_loss_every=log_loss_every,
         device=device,
-        grad_acc_steps=8,
-        checkpoint_root_dir=Path("saved_models/train_from_scratch/checkpoints"),
+        grad_acc_steps=grad_acc,
+        local_rank=args.local_rank,
+        checkpoint_every=checkpoint_every,
+        checkpoint_root_dir=checkpoint_root_dir,
+        checkpoints_to_keep=checkpoints_to_keep,
+        use_amp=use_amp
    )
     # 7. Let it grow! Watch the tracked metrics live on the public mlflow server: https://public-mlflow.deepset.ai
     trainer.train()
 
     # 8. Hooray! You have a model. Store it:
     model.save(save_dir)
     processor.save(save_dir)
 
+    if args.local_rank != -1:
+        torch.distributed.destroy_process_group()
+
 if __name__ == "__main__":
-    train_from_scratch()
\ No newline at end of file
+    train_from_scratch()
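
randomize_and_split_file, used by both example scripts, shuffles the documents of one large training file and shards them into smaller files so that StreamingDataSilo sees a random order within and across epochs. A minimal sketch of the idea follows — an illustration, not FARM's implementation: the blank-line document delimiter and the shard naming are assumptions, and the remainder documents simply land in a final, smaller shard.

import random
from pathlib import Path

def randomize_and_split_file_sketch(filepath, output_dir, docs_per_file=1000, seed=42):
    # Assumes documents are separated by blank lines, as in BERT-style pretraining data.
    # Reads the whole file into memory, which is fine for a sketch.
    text = Path(filepath).read_text(encoding="utf-8")
    docs = [d for d in text.split("\n\n") if d.strip()]

    random.seed(seed)
    random.shuffle(docs)  # random document order across the whole corpus

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    for i in range(0, len(docs), docs_per_file):
        shard = docs[i:i + docs_per_file]  # the last slice keeps the remainder docs
        out_file = output_dir / f"train_split_{i // docs_per_file}.txt"
        out_file.write_text("\n\n".join(shard) + "\n", encoding="utf-8")

randomize_and_split_file_sketch("data/test/train.txt", "data/split_files", docs_per_file=1000)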
111 changes: 87 additions & 24 deletions examples/train_from_scratch_with_sagemaker.py
@@ -4,14 +4,27 @@
 
 from transformers.tokenization_bert import BertTokenizer
 
-from farm.data_handler.data_silo import StreamingDataSilo
+from farm.data_handler.data_silo import StreamingDataSilo, DataSilo
 from farm.data_handler.processor import BertStyleLMProcessor
 from farm.modeling.adaptive_model import AdaptiveModel
 from farm.modeling.language_model import LanguageModel
 from farm.modeling.optimization import initialize_optimizer
 from farm.modeling.prediction_head import BertLMHead, NextSentenceHead
+from farm.data_handler.utils import randomize_and_split_file
 from farm.train import Trainer
-from farm.utils import set_all_seeds, MLFlowLogger, initialize_device_settings
+from farm.utils import set_all_seeds, StdoutLogger, initialize_device_settings
+import argparse
+import torch
+
+def parse_arguments():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--local_rank",
+                        type=int,
+                        default=-1,
+                        help="local_rank for distributed training on GPUs")
+    args, unknownargs = parser.parse_known_args()
+    logging.info(f"Got unknown arguments: {unknownargs}")
+    return args
 
 
 def train_from_scratch(args):
@@ -21,57 +34,100 @@ def train_from_scratch(args):
         level=logging.INFO,
     )
 
-    ml_logger = MLFlowLogger(tracking_uri=args.get("mlflow_tracking_uri", "file:/opt/ml/model/mlflow"))
-    ml_logger.init_experiment(experiment_name="train_from_scratch", run_name="run")
+    #TODO prettify this loading of params from two sources (cmd + json)
+    cmd_args = parse_arguments()
+    args["local_rank"] = cmd_args.local_rank
+    logging.info(f'local_rank: {args["local_rank"]}')
+
+    next_sent_task = bool(int(args.get("next_sent_task", 1)))
+    distributed = True
+    use_amp = args.get("use_amp", None)
+    use_amp = None if use_amp == "" else use_amp
+
+    # Only the main process should log here
+    if args["local_rank"] in [-1, 0]:
+        ml_logger = StdoutLogger(tracking_uri=None)
+        ml_logger.init_experiment(experiment_name="train_from_scratch", run_name="run")
 
     set_all_seeds(seed=39)
-    device, n_gpu = initialize_device_settings(use_cuda=True)
-    evaluate_every = int(args["evaluate_every"])
 
+    device, n_gpu = initialize_device_settings(use_cuda=True, local_rank=args["local_rank"], use_amp=use_amp)
+    effective_batch_size = int(args["per_gpu_batch_size"]) * int(args["gradient_accumulation_steps"]) * torch.distributed.get_world_size()
+
+    logging.info(
+        f'Training with effective batch size of {effective_batch_size} '
+        f'(per_gpu_batch_size = {int(args["per_gpu_batch_size"])}, gradient_accumulation_steps={int(args["gradient_accumulation_steps"])}, n_gpus = {torch.distributed.get_world_size()} )')
 
     save_dir = Path("/opt/ml/model")
     data_dir = Path("/opt/ml/input/data/input_channel")
 
+    # Split and shuffle training data
+    if args["local_rank"] in [-1, 0]:
+        randomize_and_split_file(data_dir / args["train_file"], output_dir=data_dir / "split_files")
+    # let other processes wait for split files from rank 0
+    torch.distributed.barrier()
+
+
+    args["train_file"] = data_dir / "split_files"
+
     # 1. Create a tokenizer
-    tokenizer = BertTokenizer(data_dir/args["vocab_file"], do_lower_case=args["do_lower_case"])
+    tokenizer = BertTokenizer(data_dir/args["vocab_file"], do_lower_case=bool(int(args["do_lower_case"])))
 
     # 2. Create a DataProcessor that handles all the conversion from raw text into a PyTorch Dataset
     processor = BertStyleLMProcessor(
         data_dir=data_dir,
         tokenizer=tokenizer, max_seq_len=int(args["max_seq_len"]),
-        train_filename=args["train_file"],
+        train_filename=args.get("train_file"),
         dev_filename=args.get("dev_file", None),
         test_filename=args.get("test_file", None),
+        next_sent_pred_style=args.get("next_sent_pred_style", "bert-style"),
+        max_docs=args.get("max_docs", None),
+        next_sent_pred=next_sent_task
    )
 
-    # 3. Create a DataSilo that loads several datasets (train/dev/test), provides DataLoaders for them and
-    # calculates a few descriptive statistics of our datasets
-    stream_data_silo = StreamingDataSilo(processor=processor, batch_size=int(args["batch_size"]))
+    # 3. Create a DataSilo that loads several datasets (train/dev/test) and provides DataLoaders for them
+    data_silo = StreamingDataSilo(processor=processor, batch_size=int(args["per_gpu_batch_size"]),
+                                  dataloader_workers=int(args.get("data_loader_workers", 8)),
+                                  distributed=distributed)
 
     # 4. Create an AdaptiveModel
    # a) which consists of a pretrained language model as a basis
     language_model = LanguageModel.from_scratch("bert", tokenizer.vocab_size)
 
     # b) and *two* prediction heads on top that are suited for our task => Language Model finetuning
     lm_prediction_head = BertLMHead(768, tokenizer.vocab_size)
-    next_sentence_head = NextSentenceHead([768, 2], task_name="nextsentence")
-
-    model = AdaptiveModel(
-        language_model=language_model,
-        prediction_heads=[lm_prediction_head, next_sentence_head],
-        embeds_dropout_prob=0.1,
-        lm_output_types=["per_token", "per_sequence"],
-        device=device,
-    )
+    if next_sent_task:
+        next_sentence_head = NextSentenceHead(num_labels=2, task_name="nextsentence")
+        model = AdaptiveModel(
+            language_model=language_model,
+            prediction_heads=[lm_prediction_head, next_sentence_head],
+            embeds_dropout_prob=0.1,
+            lm_output_types=["per_token", "per_sequence"],
+            device=device,
+        )
+    else:
+        model = AdaptiveModel(
+            language_model=language_model,
+            prediction_heads=[lm_prediction_head],
+            embeds_dropout_prob=0.1,
+            lm_output_types=["per_token"],
+            device=device,
+        )
 
     # 5. Create an optimizer
     model, optimizer, lr_schedule = initialize_optimizer(
         model=model,
         learning_rate=float(args["learning_rate"]),
         schedule_opts={"name": "LinearWarmup", "warmup_proportion": float(args["warmup_proportion"])},
-        n_batches=len(stream_data_silo.get_data_loader("train")),
+        n_batches=len(data_silo.get_data_loader("train")),
         n_epochs=int(args["n_epochs"]),
         device=device,
         grad_acc_steps=int(args["gradient_accumulation_steps"]),
+        distributed=distributed,
+        use_amp=use_amp,
+        local_rank=args["local_rank"]
    )
 
     # 6. Feed everything to the Trainer, which takes care of growing our model and evaluates it from time to time
@@ -85,17 +141,24 @@ def train_from_scratch(args):
     trainer = Trainer.create_or_load_checkpoint(
         model=model,
         optimizer=optimizer,
-        data_silo=stream_data_silo,
+        data_silo=data_silo,
         epochs=int(args["n_epochs"]),
         n_gpu=n_gpu,
         lr_schedule=lr_schedule,
-        evaluate_every=evaluate_every,
+        evaluate_every=int(args["evaluate_every"]),
+        log_loss_every=int(args.get("log_loss_every", 500)),
+        log_learning_rate=bool(int(args.get("log_learning_rate", 0))),
         device=device,
+        local_rank=args["local_rank"],
         grad_acc_steps=int(args["gradient_accumulation_steps"]),
         checkpoint_every=checkpoint_every,
         checkpoint_root_dir=checkpoint_root_dir,
+        checkpoints_to_keep=int(args.get("checkpoints_to_keep", 10)),
+        disable_tqdm=True,
+        use_amp=use_amp,
    )
-    # 7. Let it grow! Watch the tracked metrics live on the public mlflow server: https://public-mlflow.deepset.ai
 
+    # 7. Let it grow!
     trainer.train()
 
     # 8. Hooray! You have a model. Store it:
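
As a quick sanity check of the effective_batch_size computation in this script: it multiplies per-GPU batch size, gradient accumulation steps, and world size, so with per_gpu_batch_size = 8, gradient_accumulation_steps = 4, and 4 GPUs (illustrative values), each optimizer step aggregates gradients from 8 × 4 × 4 = 128 sequences.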