【分布式通信】NPKit,NCCL的Profiling工具

NPKit介绍

NPKit (Networking Profiling Kit) is a profiling framework designed for popular collective communication libraries (CCLs), including Microsoft MSCCL, NVIDIA NCCL and AMD RCCL.
It enables users to insert customized profiling events into different CCL components, especially into giant GPU kernels.
These events are then automatically placed onto a unified timeline in Google Trace Event Format, which users can then leverage trace viewer to understand CCLs’ workflow and performance.

以NCCL为例,如何使用?

Usage

  1. NCCL 2.17.1-1版本,将文件夹下的 npkit-for-nccl-2.17.1-1.diff 添加到你的nccl源文件中。
    实测nccl2.17和2.18都可以用。使用方法:
    git apply ../NPKit/nccl_sample/npkit-for-nccl-2.17.1-1.diff
    如果是对应的RCCL和MSCCL版本,方法类似。但如果是自己维护的其他分支,最好还是自己看看。

  2. NPKit只有在CPU和GPU没以后overlap的时候使用,所以 NPKIT_FLAGS 也要遵从这个规则。同时 npkit_launcher.sh里面的参数也要对应正确。
    我发现这个仓库提供的bash脚本有一点问题,编译的时候容易失败。所以我把bash脚本中的编译部分注释掉了,在外面编译。编译时记得检查对应的NPKIT_FLAGS
    这个仓库通过编译宏来控制profiling监控的对象,所以如果你这次做了all_reduce,下次想测alltoall,就得换一个NPKIT_FLAGS重新编译。

  3. nccl_testnpkit_runner.sh对应参数正确. 仅支持每个线程有1个GPU, 因此nccl_test运行参数记得是 -g 1

  4. 运行bash npkit_launcher.sh. 这个脚本会调用npkit_runner.

  5. 生成文件 npkit_event_trace.json ,可以用谷歌浏览器打开看。在浏览器那一栏输入chrome://tracing, 然后打开对应文件即可。

在这里插入图片描述

以下是nccl_sample中解析dump为trace的代码。

import argparse
import os
import json

from queue import Queue

def parse_npkit_event_header(npkit_event_header_path):
    npkit_event_def = {'id_to_type': {}, 'type_to_id': {}}
    with open(npkit_event_header_path, 'r') as f:
        lines = [x.strip() for x in f.readlines() if len(x.strip()) != 0]
        line_idx = 0
        while line_idx < len(lines):
            if lines[line_idx].startswith('#define NPKIT_EVENT_'):
                fields = lines[line_idx].split()
                if len(fields) == 3:
                    event_type = fields[1]
                    event_id = int(fields[2], 0)
                    npkit_event_def['type_to_id'][event_type] = event_id
                    npkit_event_def['id_to_type'][event_id] = event_type
            line_idx += 1
    return npkit_event_def

def parse_gpu_clock_scale(gpu_clock_file_path):
    with open(gpu_clock_file_path, 'r') as f:
        freq_in_khz = f.read()
        return float(freq_in_khz) * 1e3 / 1e6

def parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path):
    with open(cpu_clock_num_file_path, 'r') as f:
        num = float(f.read())
    with open(cpu_clock_den_file_path, 'r') as f:
        den = float(f.read())
    return den / num / 1e6

def parse_gpu_event(event_bytes):
    return {
        'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False),
        'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False),
        'rsvd': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False),
        'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False)
    }

def parse_cpu_event(event_bytes):
    return {
        'id': int.from_bytes(event_bytes[0:1], byteorder='little', signed=False),
        'size': int.from_bytes(event_bytes[1:5], byteorder='little', signed=False),
        'slot': int.from_bytes(event_bytes[5:8], byteorder='little', signed=False),
        'timestamp': int.from_bytes(event_bytes[8:16], byteorder='little', signed=False)
    }

def parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale):
    gpu_event_file_path = os.path.join(npkit_dump_dir, 'gpu_events_rank_%d_buf_%d' % (rank, buf_idx))
    raw_event_size = 16
    curr_cpu_base_time = None
    curr_gpu_base_time = None
    gpu_events = []
    event_type_to_seq = {}
    with open(gpu_event_file_path, 'rb') as f:
        raw_content = f.read()
        raw_content_size = len(raw_content)
        raw_content_idx = 0
        while raw_content_idx < raw_content_size:
            parsed_gpu_event = parse_gpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])
            if npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_CPU':
                curr_cpu_base_time = parsed_gpu_event['timestamp'] / cpu_clock_scale
                curr_gpu_base_time = None
            elif npkit_event_def['id_to_type'][parsed_gpu_event['id']] == 'NPKIT_EVENT_TIME_SYNC_GPU':
                if curr_gpu_base_time is None:
                    curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale
            else:
                if curr_gpu_base_time is None:
                    curr_gpu_base_time = parsed_gpu_event['timestamp'] / gpu_clock_scale
                event_type = npkit_event_def['id_to_type'][parsed_gpu_event['id']]
                phase = 'B' if event_type.endswith('_ENTRY') else 'E'
                gpu_events.append({
                    'ph': phase,
                    'ts': curr_cpu_base_time + parsed_gpu_event['timestamp'] / gpu_clock_scale - curr_gpu_base_time,
                    'pid': rank,
                    'tid': buf_idx + 1
                })
                if phase == 'B':
                    if event_type not in event_type_to_seq:
                        event_type_to_seq[event_type] = 0
                    gpu_events[-1].update({
                        'name': event_type,
                        'cat': 'GPU',
                        'args': {
                            'rank': rank,
                            'buf_idx': buf_idx,
                            'seq': event_type_to_seq[event_type],
                            'rsvd_0': parsed_gpu_event['rsvd'],
                            'size_0': parsed_gpu_event['size']
                        }
                    })
                    event_type_to_seq[event_type] += 1
                else:
                    gpu_events[-1]['args'] = {'size': parsed_gpu_event['size'], 'rsvd': parsed_gpu_event['rsvd']}
                    delta_time = gpu_events[-1]['ts'] - gpu_events[-2]['ts']
                    gpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else gpu_events[-1]['args']['size'] / delta_time / 1e3
            raw_content_idx += raw_event_size
    return gpu_events

def parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale):
    cpu_event_file_path = os.path.join(npkit_dump_dir, 'cpu_events_rank_%d_channel_%d' % (rank, channel))
    raw_event_size = 16
    cpu_events = []
    event_type_to_seq = {}

    fiber_is_usable = []
    fiber_open_ts = []
    slot_to_fiber_id = {}
    channel_shift = 1000

    with open(cpu_event_file_path, 'rb') as f:
        raw_content = f.read()
        raw_content_size = len(raw_content)
        raw_content_idx = 0
        while raw_content_idx < raw_content_size:
            parsed_cpu_event = parse_cpu_event(raw_content[raw_content_idx : raw_content_idx + raw_event_size])
            event_type = npkit_event_def['id_to_type'][parsed_cpu_event['id']]
            phase = 'B' if event_type.endswith('_ENTRY') else 'E'
            cpu_events.append({
                'ph': phase,
                'ts': parsed_cpu_event['timestamp'] / cpu_clock_scale,
                'pid': rank
            })
            slot = parsed_cpu_event['slot']
            if phase == 'B':
                # Open fiber event
                fiber_id = 0
                while fiber_id < len(fiber_is_usable):
                    if fiber_is_usable[fiber_id]:
                        break
                    fiber_id += 1
                if fiber_id == len(fiber_is_usable):
                    fiber_is_usable.append(True)
                    fiber_open_ts.append(0.0)
                slot_to_fiber_id[slot] = fiber_id
                fiber_open_ts[fiber_id] = cpu_events[-1]['ts']
                fiber_is_usable[fiber_id] = False

                if event_type not in event_type_to_seq:
                    event_type_to_seq[event_type] = 0
                cpu_events[-1].update({
                    'name': event_type,
                    'cat': 'CPU',
                    'args': {
                        'rank': rank,
                        'channel': channel,
                        'slot': parsed_cpu_event['slot'],
                        'seq': event_type_to_seq[event_type],
                        'size_0': parsed_cpu_event['size']
                    }
                })
                event_type_to_seq[event_type] += 1
            else:
                # Close fiber event
                fiber_id = slot_to_fiber_id[slot]
                slot_to_fiber_id.pop(slot)
                last_ts = fiber_open_ts[fiber_id]
                fiber_is_usable[fiber_id] = True

                delta_time = max(0.001, cpu_events[-1]['ts'] - last_ts)
                cpu_events[-1]['args'] = {'size': parsed_cpu_event['size']}
                cpu_events[-1]['args']['bw (GB/s)'] = 0. if delta_time == 0. else cpu_events[-1]['args']['size'] / delta_time / 1e3

            cpu_events[-1]['tid'] = fiber_id + (channel + 1) * channel_shift

            raw_content_idx += raw_event_size
    return cpu_events

def convert_npkit_dump_to_trace(npkit_dump_dir, output_dir, npkit_event_def):
    files_in_dump_dir = next(os.walk(npkit_dump_dir))[2]
    gpu_event_files = [x for x in files_in_dump_dir if x.startswith('gpu_events_rank_')]
    cpu_event_files = [x for x in files_in_dump_dir if x.startswith('cpu_events_rank_')]

    ranks = list(set([int(x.split('_rank_')[1].split('_')[0]) for x in gpu_event_files]))
    buf_indices = list(set([int(x.split('_buf_')[1].split('_')[0]) for x in gpu_event_files]))
    channels = list(set([int(x.split('_channel_')[1].split('_')[0]) for x in cpu_event_files]))

    trace = {'traceEvents': []}

    for rank in ranks:
        cpu_clock_den_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_den_rank_%d' % rank)
        cpu_clock_num_file_path = os.path.join(npkit_dump_dir, 'cpu_clock_period_num_rank_%d' % rank)
        cpu_clock_scale = parse_cpu_clock_scale(cpu_clock_den_file_path, cpu_clock_num_file_path)

        gpu_clock_file_path = os.path.join(npkit_dump_dir, 'gpu_clock_rate_rank_%d' % rank)
        gpu_clock_scale = parse_gpu_clock_scale(gpu_clock_file_path)

        for buf_idx in buf_indices:
            gpu_events = parse_gpu_event_file(npkit_dump_dir, npkit_event_def, rank, buf_idx, gpu_clock_scale, cpu_clock_scale)
            trace['traceEvents'].extend(gpu_events)

        for channel in channels:
            cpu_events = parse_cpu_event_file(npkit_dump_dir, npkit_event_def, rank, channel, cpu_clock_scale)
            trace['traceEvents'].extend(cpu_events)

    trace['traceEvents'].sort(key=lambda x : x['ts'])
    trace['displayTimeUnit'] = 'ns'

    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, 'npkit_event_trace.json'), 'w') as f:
        json.dump(trace, f)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--npkit_dump_dir', type=str, required=True, help='NPKit dump directory.')
    parser.add_argument('--npkit_event_header_path', type=str, required=True, help='Path to npkit_event.h.')
    parser.add_argument('--output_dir', type=str, required=True, help='Path to output directory.')
    args = parser.parse_args()

    npkit_event_def = parse_npkit_event_header(args.npkit_event_header_path)
    convert_npkit_dump_to_trace(args.npkit_dump_dir, args.output_dir, npkit_event_def)

测试效果

我在8卡A100上跑了一下ring。
也不知道对不对。
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/587177.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java项目:88 springboot104学生网上请假系统设计与实现

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 本学生网上请假系统管理员&#xff0c;教师&#xff0c;学生。 管理员功能有个人中心&#xff0c;学生管理&#xff0c;教师管理&#xff0c;班级信息…

程序员与土地的关系

目录 一、土地对人类的重要性 二、程序员与土地的关系 二、程序员如何利用GIS技术改变土地管理效率&#xff1f; 四、GIS技术有哪些运用&#xff1f; 五、shapely库计算多边形面积的例子 一、土地对人类的重要性 土地资源对人类是至关重要的。土地是人类赖…

力扣HOT100 - 131. 分割回文串

解题思路&#xff1a; class Solution {List<List<String>> res new ArrayList<>();List<String> pathnew ArrayList<>();public List<List<String>> partition(String s) {backtrack(s,0);return res;}public void backtrack(Str…

Windows下面源码安装PostgreSQL

目录 一、环境&#xff1a; 二、安装MSYS2 三、安装PG 四、初始化数据库 五、启停数据库 六、调试PG 平时我们在LINUX下&#xff0c;使用源码安装PG的比较多&#xff0c;但在WINDOWS下安装&#xff0c;一般是使用二机制安装包来安装&#xff0c;能否使用源码来安装呢&…

力扣82-链表、迭代 的思考

题目解读 给定一个已排序的链表的头 head &#xff0c; 删除原始链表中所有重复数字的节点&#xff0c;只留下不同的数字 。返回 已排序的链表 。 两个示范 思考 返回链表&#xff1a;返回更新链表后的头结点&#xff1b; 更新链表&#xff1a;判断重复元素&#xff0c;改变指针…

政府统计中如何使用大数据

当今世界&#xff0c;科技进步日新月异&#xff0c;互联网、云计算、大数据等现代信息技术深刻改变着人类的思维、生产、生活、学习方式。信息技术与经济社会的交汇融合引发了数据爆发式增长&#xff0c;数据已成为重要生产要素和国家基础性战略资源。近年来&#xff0c;国家统…

AI家居设备的未来:智能家庭的下一个大步

&#x1f512;目录 ☂️智能家居设备的发展和AI技术的作用 ❤️AI技术实现智能家居设备的自动化控制和智能化交互的依赖 AI家居设备的未来应用场景 &#x1f4a3;智能家庭在未来的发展和应用前景 &#x1f4a5;智能家居设备的发展和AI技术的作用 智能家居设备的发展和AI技术的…

【webrtc】MessageHandler 9: 基于线程的消息处理:执行Port销毁自己

Port::Port 构造的时候,就触发了一个异步操作,但是这个操作是要在 thread 里执行的,因此要通过post 消息 MSG_DESTROY_IF_DEAD 到thread跑:port的创建并米有要求在thread中 但是port的析构却在thread里 这是为啥呢?

【C# IO操作专题】

FileStream 是一个在多种编程语言中常见的概念&#xff0c;它代表了一个用于读写文件的流。在不同的编程语言中&#xff0c;FileStream 的实现和使用方式可能会有所不同&#xff0c;但基本概念是相似的&#xff1a;它允许程序以流的形式访问文件&#xff0c;即可以顺序地读取或…

二分图--判定以及最大匹配

水了个圈钱杯省一&#xff0c;不过估计国赛也拿不了奖&#xff0c;但还是小小挣扎一下。 什么是二分图&#xff1a;G(V,E)是一个无向图&#xff0c;若顶点V可以分为两个互不相交的子集A,B&#xff0c;并图中的每一条边&#xff08;i,j)所关联的ij属于不同的顶点集&#xff0c;…

2024 华东杯高校数学建模邀请赛(A题)| 比赛出场顺序 | 建模秘籍文章代码思路大全

铛铛&#xff01;小秘籍来咯&#xff01; 小秘籍团队独辟蹊径&#xff0c;以图匹配&#xff0c;多目标规划等强大工具&#xff0c;构建了这一题的详细解答哦&#xff01; 为大家量身打造创新解决方案。小秘籍团队&#xff0c;始终引领着建模问题求解的风潮。 抓紧小秘籍&#x…

24 JavaScript学习:this

this在对象方法中 在 JavaScript 中&#xff0c;this 的值取决于函数被调用的方式。在对象方法中&#xff0c;this 引用的是调用该方法的对象。 让我们看一个简单的例子&#xff1a; const person {firstName: John,lastName: Doe,fullName: function() {return this.firstN…

批处理优化

1.4、总结 Key的最佳实践 固定格式&#xff1a;[业务名]:[数据名]:[id]足够简短&#xff1a;不超过44字节不包含特殊字符 Value的最佳实践&#xff1a; 合理的拆分数据&#xff0c;拒绝BigKey选择合适数据结构Hash结构的entry数量不要超过1000设置合理的超时时间 2、批处理优…

cnPuTTY 0.81.0.1—PuTTY Release 0.81中文版本简单说明~~

2024-04-15 官方发布PuTTY 0.81本次发布主要修复了使用521位ECDSA密钥时的一个严重漏洞(CVE-2024-31497)。 如果您使用521位ECDSA私钥与任何早期版本的PuTTY组合&#xff0c;请考虑私钥已泄露的问题。强烈建议从相关文件中删除公钥&#xff0c;并使用新版本程序重新生成密钥对。…

6.C++模板(超全)

目录 1. 泛型编程 2. 函数模板 2.1 函数模板概念 2.1 函数模板格式 2.2 函数模板的原理 2.3 函数模板的实例化 2.4 模板参数的匹配原则 3. 类模板 1. 泛型编程 如何实现一个通用的交换函数呢&#xff1f; void Swap(int& left, int& right) {int temp left;…

【大模型学习】Transformer(学习笔记)

Transformer介绍 word2vec Word2Vec是一种用于将词语映射到连续向量空间的技术&#xff0c;它是由Google的Tomas Mikolov等人开发的。Word2Vec模型通过学习大量文本数据中的词语上下文信息&#xff0c;将每个词语表示为高维空间中的向量。在这个向量空间中&#xff0c;具有相似…

关于用户体验和设计思维

介绍 要开发有效的原型并为用户提供出色的体验&#xff0c;了解用户体验 (UX) 和设计思维的原则至关重要。 用户体验是用户与产品、服务或系统交互并获得相应体验的过程。 设计思维是一种解决问题的方法&#xff0c;侧重于创新和创造。 在启动期实现用户体验和设计思维时&#…

Chinese-CLIP使用教程

目录 一&#xff1a;运行环境 二&#xff1a;代码架构 三&#xff1a;数据集准备 1. 文本数据处理 训练集文本处理 测试集文本处理 2. 图像数据处理 3. 生成LMDB数据库 四、模型微调 五&#xff1a;模型验证与测试 1. 提取图文特征 2. 图文检索 3. 计算召回率 六…

23 JavaScript学习:验证API

JavaScript验证API 举例&#xff1a; <input id"id1" type"number" min"100" max"300" required> <button onclick"myFunction()">验证</button><p id"demo"></p><script>f…

pinctrl和gpio子系统

文章目录 一、pinctrl 子系统简介二、pinctrl子系统的配置形式分析1.主要功能2.配置格式3.pinctrl驱动匹配 三、gpio子系统1.gpio系统使用流程 四、程序举例-led五、总结 一、pinctrl 子系统简介 在led操作设备树的实验中&#xff0c;对于gpio的初始化是直接操作的寄存器&…