From 8738d3d007b2faec4f5dd3c73065cd00b89130f8 Mon Sep 17 00:00:00 2001 From: John Calderon Date: Mon, 25 Nov 2024 16:13:53 -0500 Subject: [PATCH 1/2] added pytorch lightning support --- deepview_profile/export_converter.py | 153 ++++++++++++++++++++++ deepview_profile/pl/deepview_callback.py | 92 +++++++++++++ deepview_profile/pl/deepview_interface.py | 141 ++++++++++++++++++++ deepview_profile/profiler/utilization.py | 14 +- examples/pytorch_lightning/example.py | 64 +++++++++ pyproject.toml | 1 + 6 files changed, 455 insertions(+), 10 deletions(-) create mode 100644 deepview_profile/export_converter.py create mode 100644 deepview_profile/pl/deepview_callback.py create mode 100644 deepview_profile/pl/deepview_interface.py create mode 100644 examples/pytorch_lightning/example.py diff --git a/deepview_profile/export_converter.py b/deepview_profile/export_converter.py new file mode 100644 index 0000000..1cd0c22 --- /dev/null +++ b/deepview_profile/export_converter.py @@ -0,0 +1,153 @@ +import json + + +def convert(message): + new_message = {} + with open("message.json", "w") as fp: + json.dump(message, fp, indent=4) + + new_message["ddp"] = {} + new_message["message_type"] = message["message_type"] + new_message["project_root"] = message["project_root"] + new_message["project_entry_point"] = message["project_entry_point"] + + new_message["hardware_info"] = { + "hostname": message["hardware_info"]["hostname"], + "os": message["hardware_info"]["os"], + "gpus": message["hardware_info"]["gpus"], + } + + new_message["throughput"] = { + "samples_per_second": message["throughput"]["samples_per_second"], + "predicted_max_samples_per_second": message["throughput"][ + "predicted_max_samples_per_second" + ], + "run_time_ms": ( + [ + message["throughput"]["run_time_ms"]["slope"], + message["throughput"]["run_time_ms"]["bias"], + ] + if "run_time_ms" in message["throughput"] + else [0, 0] + ), + "peak_usage_bytes": ( + [ + message["throughput"]["peak_usage_bytes"]["slope"], + message["throughput"]["peak_usage_bytes"]["bias"], + ] + if "peak_usage_bytes" in message["throughput"] + else [0, 0] + ), + "batch_size_context": None, + "can_manipulate_batch_size": False, + } + + new_message["utilization"] = message["utilization"] + + def fix(a): + for d in ["cpu", "gpu"]: + for s in ["Forward", "Backward"]: + if f"{d}_{s.lower()}" in a: + a[f"{d}{s}"] = a[f"{d}_{s.lower()}"] + del a[f"{d}_{s.lower()}"] + else: + a[f"{d}{s}"] = 0 + + if f"{d}_{s.lower()}_span" in a: + a[f"{d}{s}Span"] = a[f"{d}_{s.lower()}_span"] + del a[f"{d}_{s.lower()}_span"] + else: + a[f"{d}{s}Span"] = 0 + + if "children" not in a: + a["children"] = [] + return + + if a: + for c in a["children"]: + fix(c) + + fix(new_message["utilization"]["rootNode"]) if new_message["utilization"].get('rootNode', None) else None + try: + new_message["utilization"]["tensor_core_usage"] = message["utilization"][ + "tensor_utilization" + ] + except: + new_message["utilization"]["tensor_core_usage"] = 0 + + new_message["habitat"] = { + "predictions": [ + ( + [prediction["device_name"], prediction["runtime_ms"]] + if prediction["device_name"] != "unavailable" + else ["default_device", 0] + ) + for prediction in message["habitat"]["predictions"] + ] + } + + new_message["breakdown"] = { + "peak_usage_bytes": int(message["breakdown"]["peak_usage_bytes"]), + "memory_capacity_bytes": int(message["breakdown"]["memory_capacity_bytes"]), + "iteration_run_time_ms": message["breakdown"]["iteration_run_time_ms"], + # TODO change these hardcoded numbers + "batch_size": 48, + "num_nodes_operation_tree": len(message["breakdown"]["operation_tree"]), + "num_nodes_weight_tree": 0, + "operation_tree": [ + { + "name": op["name"], + "num_children": op["num_children"] if "num_children" in op else 0, + "forward_ms": op["operation"]["forward_ms"], + "backward_ms": op["operation"]["backward_ms"], + "size_bytes": ( + int(op["operation"]["size_bytes"]) + if "size_bytes" in op["operation"] + else 0 + ), + "file_refs": ( + [ + { + "path": "/".join(ctx["context"]["file_path"]["components"]), + "line_no": ctx["context"]["line_number"], + "run_time_ms": ctx["run_time_ms"], + "size_bytes": ( + int(ctx["size_bytes"]) if "size_bytes" in ctx else 0 + ), + } + for ctx in op["operation"]["context_info_map"] + ] + if "context_info_map" in op["operation"] + else list() + ), + } + for op in message["breakdown"]["operation_tree"] + ], + } + + def fix_components(m): + for c in m["components"]: + if "consumption_joules" not in c: + c["consumption"] = 0 + else: + c["consumption"] = c["consumption_joules"] + del c["consumption_joules"] + c["type"] = c["component_type"] + if c["type"] == "ENERGY_NVIDIA": + c["type"] = "ENERGY_GPU" + del c["component_type"] + + new_message["energy"] = { + "current": { + "total_consumption": message["energy"]["total_consumption"], + "components": message["energy"]["components"], + "batch_size": 48, + }, + "past_measurements": message["energy"]["past_measurements"], + } + + fix_components(new_message["energy"]["current"]) + for m in new_message["energy"]["past_measurements"]: + fix_components(m) + + return new_message diff --git a/deepview_profile/pl/deepview_callback.py b/deepview_profile/pl/deepview_callback.py new file mode 100644 index 0000000..aa64d78 --- /dev/null +++ b/deepview_profile/pl/deepview_callback.py @@ -0,0 +1,92 @@ +from typing import Callable, Tuple + +import time +import os +import json +import torch +import sys + +try: + import pytorch_lightning as pl +except ImportError: + sys.exit("Please install pytorch-lightning:\nuse: pip install lightning\nExiting...") + +from termcolor import colored +from deepview_profile.pl.deepview_interface import trigger_profiling + + +class DeepViewProfilerCallback(pl.Callback): + def __init__(self, profile_name: str): + super().__init__() + self.profiling_triggered = False + self.output_filename = f"{profile_name}_{int(time.time())}.json" + + def on_train_batch_end( + self, + trainer: pl.Trainer, + pl_module: pl.LightningModule, + outputs, + batch, + batch_idx, + ): + + # only do this once + if self.profiling_triggered: + return + + print(colored("DeepViewProfiler: Running profiling.", "green")) + + """ + need 3 things: + + input_provider: just return batch + model_provider: just return pl_module + iteration_provider: a lambda function that (a) calls pl_module.forward_step and (b) calls loss.backward + """ + initial_batch_size = batch[0].shape[0] + + def input_provider(batch_size: int = initial_batch_size) -> Tuple: + model_inputs = list() + for elem in batch: + # we assume the first dimension is the batch dimension + model_inputs.append( + elem[:1].repeat([batch_size] + [1 for _ in elem.shape[1:]]) + ) + return (tuple(model_inputs), 0) + + model_provider = lambda: pl_module + + def iteration_provider(module: torch.nn.Module) -> Callable: + def iteration(*args, **kwargs): + loss = module.training_step(*args, **kwargs) + loss.backward() + + return iteration + + project_root = os.getcwd() + + output = trigger_profiling( + project_root, + "entry_point.py", + initial_batch_size, + input_provider, + model_provider, + iteration_provider, + ) + + with open(self.output_filename, "w") as fp: + json.dump(output, fp, indent=4) + + print( + colored( + f"DeepViewProfiler: Profiling complete! Report written to ", "green" + ) + + colored(self.output_filename, "green", attrs=["bold"]) + ) + print( + colored( + f"DeepViewProfiler: View your report at https://deepview.centml.ai", + "green", + ) + ) + self.profiling_triggered = True diff --git a/deepview_profile/pl/deepview_interface.py b/deepview_profile/pl/deepview_interface.py new file mode 100644 index 0000000..4e9742c --- /dev/null +++ b/deepview_profile/pl/deepview_interface.py @@ -0,0 +1,141 @@ +import sys +from typing import Callable +import platform + +from deepview_profile.analysis.session import AnalysisSession +from deepview_profile.exceptions import AnalysisError +from deepview_profile.nvml import NVML + +# from deepview_profile.utils import release_memory, next_message_to_dict, files_encoded_unique +from deepview_profile.utils import release_memory, files_encoded_unique +from deepview_profile.error_printing import print_analysis_error + +from google.protobuf.json_format import MessageToDict + + +def measure_breakdown(session, nvml): + print("analysis: running measure_breakdown()") + yield session.measure_breakdown(nvml) + release_memory() + + +def measure_throughput(session): + print("analysis: running measure_throughput()") + yield session.measure_throughput() + release_memory() + + +def habitat_predict(session): + print("analysis: running deepview_predict()") + yield session.habitat_predict() + release_memory() + + +def measure_utilization(session): + print("analysis: running measure_utilization()") + yield session.measure_utilization() + release_memory() + + +def energy_compute(session): + print("analysis: running energy_compute()") + yield session.energy_compute() + release_memory() + + +def ddp_analysis(session): + print("analysis: running ddp_computation()") + yield session.ddp_computation() + release_memory() + + +def hardware_information(nvml): + hardware_info = { + "hostname": platform.node(), + "os": " ".join(list(platform.uname())), + "gpus": nvml.get_device_names(), + } + return hardware_info + + +class DummyStaticAnalyzer: + def batch_size_location(self): + return None + + +def next_message_to_dict(a): + message = next(a) + return MessageToDict(message, preserving_proto_field_name=True) + + +def trigger_profiling( + project_root: str, + entry_point: str, + initial_batch_size: int, + input_provider: Callable, + model_provider: Callable, + iteration_provider: Callable, +): + try: + data = { + "analysis": { + "message_type": "analysis", + "project_root": project_root, + "project_entry_point": entry_point, + "hardware_info": {}, + "throughput": {}, + "breakdown": {}, + "habitat": {}, + "additionalProviders": "", + "energy": {}, + "utilization": {}, + "ddp": {}, + }, + "epochs": 50, + "iterations": 1000, + "encodedFiles": [], + } + + session = AnalysisSession( + project_root, + entry_point, + project_root, + model_provider, + input_provider, + iteration_provider, + initial_batch_size, + DummyStaticAnalyzer(), + ) + release_memory() + + exclude_source = False + + with NVML() as nvml: + data["analysis"]["hardware_info"] = hardware_information(nvml) + data["analysis"]["breakdown"] = next_message_to_dict( + measure_breakdown(session, nvml) + ) + + operation_tree = data["analysis"]["breakdown"]["operation_tree"] + if not exclude_source and operation_tree is not None: + data["encodedFiles"] = files_encoded_unique(operation_tree) + + data["analysis"]["throughput"] = next_message_to_dict( + measure_throughput(session) + ) + data["analysis"]["habitat"] = next_message_to_dict(habitat_predict(session)) + data["analysis"]["utilization"] = next_message_to_dict( + measure_utilization(session) + ) + data["analysis"]["energy"] = next_message_to_dict(energy_compute(session)) + # data['analysis']['ddp'] = next_message_to_dict(ddp_analysis(session)) + + from deepview_profile.export_converter import convert + + data["analysis"] = convert(data["analysis"]) + + return data + + except AnalysisError as ex: + print_analysis_error(ex) + sys.exit(1) diff --git a/deepview_profile/profiler/utilization.py b/deepview_profile/profiler/utilization.py index 000abe1..a66cd1d 100644 --- a/deepview_profile/profiler/utilization.py +++ b/deepview_profile/profiler/utilization.py @@ -395,19 +395,11 @@ def _deepview_analysis(self, filepath): startTime = time.time() tp = self._get_perfetto_object(filepath) - profilerStepStart = tp.query_dict( - "select * from slices where name like '%ProfilerStep%'" - ) - main_track = profilerStepStart[0]["track_id"] - start = profilerStepStart[0]["ts"] - end = start + profilerStepStart[0]["dur"] - profilerStartDepth = profilerStepStart[0]["depth"] - rootQuery = tp.query_dict( f""" select * from slices where name like '%nn.Module:%' and depth = - (SELECT MIN(depth) from slices where name like '%nn.Module%' and depth>{profilerStartDepth} and ts between {start} and {end} and track_id = {main_track}) + (SELECT MIN(depth) from slices where name like '%nn.Module%') """ )[0] @@ -545,7 +537,9 @@ def serialize_response(respNode, rootNode): _serialize_node(respNode, rootNode) -def utilization_analysis(model_provider, input_provider, iteration_provider): +def utilization_analysis( + model_provider, input_provider, iteration_provider +): model = model_provider() inputs = input_provider() iteration = iteration_provider(model) diff --git a/examples/pytorch_lightning/example.py b/examples/pytorch_lightning/example.py new file mode 100644 index 0000000..2cb1a8b --- /dev/null +++ b/examples/pytorch_lightning/example.py @@ -0,0 +1,64 @@ +import torch +import torch.nn as nn +from torch.utils.data import DataLoader +from torchvision import datasets, transforms, models +import pytorch_lightning as pl + +from deepview_profile.pl.deepview_callback import DeepViewProfilerCallback + +class ResNetModel(pl.LightningModule): + def __init__(self, num_classes=10, learning_rate=1e-3): + super(ResNetModel, self).__init__() + self.model = models.resnet18(pretrained=True) + self.model.conv1 = nn.Conv2d( + 1, 64, kernel_size=7, stride=2, padding=3, bias=False + ) + self.model.fc = nn.Linear(self.model.fc.in_features, num_classes) + self.learning_rate = learning_rate + self.criterion = nn.CrossEntropyLoss() + + def forward(self, x): + return self.model(x) + + def training_step(self, batch, batch_idx): + x, y = batch + y_hat = self(x) + loss = torch.nn.functional.cross_entropy(y_hat, y) + return loss + + def validation_step(self, batch, batch_idx): + x, y = batch + y_hat = self(x) + loss = torch.nn.functional.cross_entropy(y_hat, y) + acc = (y_hat.argmax(dim=1) == y).float().mean() + self.log('val_loss', loss) + self.log('val_acc', acc) + + def configure_optimizers(self): + optimizer = torch.optim.Adam(self.parameters(), lr=1e-3) + return optimizer + +def mnist_dataloader(batch_size=32): + transform = transforms.Compose([transforms.Resize(224), + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,))]) + + mnist_train = datasets.MNIST(root='mnist_data', train=True, download=True, transform=transform) + mnist_val = datasets.MNIST(root='mnist_data', train=False, download=True, transform=transform) + + train_loader = DataLoader(mnist_train, batch_size=batch_size, shuffle=True) + val_loader = DataLoader(mnist_val, batch_size=batch_size) + + return train_loader, val_loader + +if __name__ == '__main__': + train_loader, val_loader = mnist_dataloader(batch_size=16) + model = ResNetModel() + + dv_callback = DeepViewProfilerCallback("example") + + trainer = pl.Trainer( + max_epochs=2, accelerator='gpu', devices=1, + callbacks=[dv_callback] + ) + trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 105b8fd..d8650ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ orjson = "*" torch-tb-profiler = "*" pymongo = "*" scipy = "*" +termcolor = "*" [tool.poetry.dev-dependencies] From 346e099647de26defa3f20881b3e330457db8142 Mon Sep 17 00:00:00 2001 From: John Calderon Date: Tue, 26 Nov 2024 11:20:57 -0500 Subject: [PATCH 2/2] fix format errors --- deepview_profile/export_converter.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/deepview_profile/export_converter.py b/deepview_profile/export_converter.py index 1cd0c22..a5e33fe 100644 --- a/deepview_profile/export_converter.py +++ b/deepview_profile/export_converter.py @@ -67,7 +67,11 @@ def fix(a): for c in a["children"]: fix(c) - fix(new_message["utilization"]["rootNode"]) if new_message["utilization"].get('rootNode', None) else None + ( + fix(new_message["utilization"]["rootNode"]) + if new_message["utilization"].get("rootNode", None) + else None + ) try: new_message["utilization"]["tensor_core_usage"] = message["utilization"][ "tensor_utilization" @@ -143,11 +147,12 @@ def fix_components(m): "components": message["energy"]["components"], "batch_size": 48, }, - "past_measurements": message["energy"]["past_measurements"], + "past_measurements": message["energy"].get("past_measurements", None), } fix_components(new_message["energy"]["current"]) - for m in new_message["energy"]["past_measurements"]: - fix_components(m) + if new_message["energy"].get("past_measurements", None): + for m in new_message["energy"]["past_measurements"]: + fix_components(m) return new_message