torch分布式通信基础

torch分布式通信基础

  • 1. 点到点通信
  • 2. 集群通信

官网文档:WRITING DISTRIBUTED APPLICATIONS WITH PYTORCH

1. 点到点通信

在这里插入图片描述

# 同步,peer-2-peer数据传递
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def test_send_recv_sync(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        dist.send(tensor=tensor, dst=1) # 需要指定dst,发送的目标
    else:
        dist.recv(tensor=tensor, src=0) # 需要指定src,从哪儿接收
    print('Rank ', rank, ' has data ', tensor[0])

# 异步
def test_send_recv_async(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        req = dist.isend(tensor=tensor, dst=1)
    else:
        req = dist.irecv(tensor=tensor, src=0)
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

def init_process(rank, size, backend='gloo'):
    """ 这里初始化分布式环境,设定Master机器以及端口号 """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29598'
    dist.init_process_group(backend, rank=rank, world_size=size)
    #test_send_recv_sync(rank, size)
    test_send_recv_async(rank, size)

if __name__ == "__main__":
    size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

2. 集群通信

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def test_broadcast(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
      tensor += 2
    else:
      tensor += 1
    dist.broadcast(tensor=tensor,src=0) # src指定broad_cast的源
    print("******test_broadcast******")
    print('Rank ', rank, ' has data ', tensor) # 结果都是 2

def test_scatter(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
      tensor_list = [torch.tensor([1.0]), torch.tensor([2.0]), torch.tensor([3.0]), torch.tensor([4.0])]
      dist.scatter(tensor, scatter_list = tensor_list, src = 0)
    else:
      dist.scatter(tensor, scatter_list = [], src = 0)
    print("******test_scatter******")
    print('Rank ', rank, ' has data ', tensor) # 结果是[[1], [2], [3], [4]]

def test_reduce(rank, size):
    tensor = torch.ones(1)
    dist.reduce(tensor=tensor, dst=0) # dst指定哪个进程进行reduce, 默认操作是加法
    print("******test_reduce******")
    print('Rank ', rank, ' has data ', tensor)

def test_all_reduce(rank, size):
    tensor = torch.ones(1)
    dist.all_reduce(tensor=tensor,op=dist.ReduceOp.SUM)
    print("******test_all_reduce******")
    print('Rank ', rank, ' has data ', tensor)  # 结果都是 4

def test_gather(rank, size):
    tensor = torch.ones(1)
    if rank == 0:
      output = [torch.zeros(1) for _ in range(size)]
      dist.gather(tensor, gather_list=output, dst=0)
    else:
      dist.gather(tensor, gather_list=[], dst=0)
    if rank == 0:
      print("******test_gather******")
      print('Rank ', rank, ' has data ', output)  # 结果是 [[1,1,1,1]]

def test_all_gather(rank, size):
    output = [torch.zeros(1) for _ in range(size)]
    tensor = torch.ones(1)
    dist.all_gather(output, tensor)
    print("******test_all_gather******")
    print('Rank ', rank, ' has data ', output)  # 结果都是 [1,1,1,1]

def init_process(rank, size, backend='gloo'):
    """ 这里初始化分布式环境,设定Master机器以及端口号 """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29596'
    dist.init_process_group(backend, rank=rank, world_size=size)
    test_reduce(rank, size)
    test_all_reduce(rank, size)
    test_gather(rank, size)
    test_all_gather(rank, size)
    test_broadcast(rank, size)
    test_scatter(rank, size)

if __name__ == "__main__":
    size = 4
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

需要注意的一点是:
这里面的调用都是同步的,可以理解为,每个进程都调用到通信api时,真正的有效数据传输才开始,然后通信完成之后,代码继续往下跑。实际上有些通信进程并不获取数据,这些进程可能并不会被阻塞。

文档最后,提供了一个简单的类似 DDP 的实现,里面核心的部分就是:
在这里插入图片描述
在这里插入图片描述
这也进一步阐释了DDP的核心逻辑:
反向计算完成之后,汇总梯度信息(求均值),然后再更新参数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/41866.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CSS:给子元素设置了浮动,页面缩放的时候,子元素往下掉

前言 给子元素设置了浮动&#xff0c;页面缩放的时候&#xff0c;子元素往下掉 html代码&#xff1a; <div class"father"><div class"child1"></div><div class"child2"></div> </div>css代码 .child1…

动态内存管理(C语言)

动态内存管理 1. 为什么存在动态内存管理2. 动态内存函数的介绍2.1 malloc函数和free函数2.2 calloc函数2.3 realloc函数 3. 常见的动态内存错误3.1 对NULL指针的解引用操作3.2 对动态开辟空间的越界访问3.3 对非动态开辟内存使用free函数3.4 使用free释放动态开辟内存的一部分…

旅游卡小程序软件招商加盟代理

旅游卡小程序软件招商加盟代理 我国人民生活水平的提高&#xff0c;旅游业成为了人们生活中必不可少的一部分。旅游卡小程序软件作为旅游行业的重要组成部分&#xff0c;也日益受到人们的关注。如今&#xff0c;旅游卡小程序软件招商加盟代理已经成为了一个热门的投资创业项…

详解c++---特殊类设计

目录标题 设计一个不能被拷贝的类设计一个只能从堆上创建对象的类设计一个只能在栈上创建对象的类设计一个无法被继承的类什么是单例模式饿汉模式饿汉模式的缺点懒汉模式懒汉模式的优点懒汉模式的缺点特殊的懒汉 设计一个不能被拷贝的类 拷贝只会放生在两个场景中&#xff1a;…

Jmeter+Maven+jenkins+eclipse搭建自动化测试平台

背景&#xff1a; 首先用jmeter录制或者书写性能测试的脚本&#xff0c;用maven添加相关依赖&#xff0c;把性能测试的代码提交到github&#xff0c;在jenkins配置git下载性能测试的代码&#xff0c;配置运行脚本和测试报告&#xff0c;配置运行失败自动发邮件通知&#xff0c…

Go-新手速成-流程语句

1if Go的if不建议写&#xff08;&#xff09;&#xff0c;over //if条件判断age : 16if age < 18 {fmt.Println("未成年")} 2for循环 Go摈弃了while和do while 循环&#xff0c;因为他做到了极简(也不要括号) 这么写可以 total : 0for i : 0; i < 100; i {…

Goby 漏洞发布|天擎终端安全管理系统 YII_CSRF_TOKEN 远程代码执行漏洞

漏洞名称&#xff1a;天擎终端安全管理系统 YII_CSRF_TOKEN 远程代码执行漏洞 English Name&#xff1a;Tianqing terminal security management system YII_CSRF_TOKEN remote code execution vulnerability CVSS core: 9.8 影响资产数&#xff1a;875 漏洞描述&#xff1…

2023婴幼儿奶粉市场数据分析(天猫数据中心)

我国婴幼儿奶粉市场一直保持着相当大的规模&#xff0c;虽然近几年新生人口数量不断下降&#xff0c;但伴随消费者的消费升级不断加速、大龄孩童吃奶粉的时间延长等&#xff0c;整体来看&#xff0c;婴幼儿奶粉行业市场规模保持平稳。 根据鲸参谋电商数据分析平台的相关数据显示…

centos升级龙蜥

centos升级龙蜥 龙蜥简介龙蜥官方社区centos升级龙蜥首先确认自己的centos版本下载迁移镜像源安装epel源迁移工具安装i686包查看执行迁移脚本结果查看重启机器查看系统信息 龙蜥简介 2021年10月19日的大会上&#xff0c;阿里云发布全新操作系统“龙蜥”并宣布开源。龙蜥操作系…

【实战篇】docker-compose部署go项目

一、场景&#xff1a; 二、需求 三、实操 Stage 1&#xff1a;GoLand 中 build 生成二进制文件 Stage 2&#xff1a;编写 Dockerfile Stage 3&#xff1a;编写 docker-compose.yaml Stage 4&#xff1a;文件上传到 ubuntu 服务器上&#xff0c;并设置文件读写权限 Stage…

写给后端开发的『vue3』请求后端接口

本文分享一下在vue3前端项目中请求后端接口获取数据。比较简单&#xff0c;内容如下&#xff1a; 1、使用axios请求后端接口 首先npm install axios&#xff0c;添加axios依赖&#xff0c;就靠它来请求后端接口了&#xff0c;基本等同于使用jquery发ajax。 # src/main.js i…

ElasticSearch文档(document)在index上的增删改查

文章目录 一、document定义&#xff1a;二、单条增删改查1、创建索引&#xff1a;2、添加文档&#xff1a;3、获取文档&#xff1a;4、更新文档&#xff1a;5、删除文档&#xff1a; 三、批量增删改查&#xff1a;1、批量添加文档&#xff1a;2、批量更新文档&#xff1a;3、批…

将 QtPropertyBrowser 加入到 vs2015 qt版中工程中

我是直接将QtPropertyBrowser的所有文件当子文件夹全部加入到工程里的 加完之后&#xff0c;ok&#xff0c;编译错误 出现一堆以下错误&#xff0c;我只拿出一条 moc_qtbuttonpropertybrowser.cpp(94): error C2027: 使用了未定义类型“QtButtonPropertyBrowserPrivate” 然…

grpc中间件之链路追踪(otel+jaeger)

参考文档 https://github.com/grpc-ecosystem/go-grpc-middleware/blob/main/examples/client/main.go https://github.com/grpc-ecosystem/go-grpc-middleware/blob/main/examples/server/main.go https://github.com/open-telemetry/opentelemetry-go/blob/main/example/jaeg…

学会项目成本管理计算,PMP计算题就是送分题

学会项目成本管理计算&#xff0c;PMP计算题就是送分题 PMP中的计算主要在 <项目成本管理> 的控制成本部分&#xff0c;服务于挣值管理&#xff08;EVM&#xff0c;Earned Value Management&#xff09;、挣值分析&#xff08;EVA&#xff0c;Earned Value Analysis&…

基于SSM的校园二手交易平台

零、源码获取&#xff1a; 链接点击直达&#xff1a;下载链接 一、设计概要 本次设计的是一个校园二手交易平台&#xff08;C2C&#xff09;&#xff0c;C2C指个人与个人之间的电子商务&#xff0c;买家可以查看所有卖家发布的商品&#xff0c;并且根据分类进行商品过滤&…

微信怎么自动加好友,通过好友后自动打招呼

很多客户朋友每天花大量的时间用手机搜索添加好友&#xff0c;这样的添加很集中也容易频繁&#xff0c;而且效率还低。对方通过后&#xff0c;有时也不能及时和客户搭建链接&#xff0c;导致客户也流失了。 现在可以实现自动添加和自动打招呼哦&#xff0c;只需要导入数据、设置…

SpringCloud(三)LoadBalancer负载均衡

一、负载均衡 实际上&#xff0c;在添加LoadBalanced注解之后&#xff0c;会启用拦截器对我们发起的服务调用请求进行拦截&#xff08;注意这里是针对我们发起的请求进行拦截&#xff09;&#xff0c;叫做LoadBalancerInterceptor&#xff0c;它实现ClientHttpRequestIntercep…

店铺记账用什么软件好?应该如何选购?

店铺记账过程中&#xff0c;会遇到各种问题&#xff1a;手写记账容易出错、效率低下、数据容易丢失&#xff1b;手动整理数据导致实际库存和账面库存不匹配&#xff0c;影响补货和订单管理。 而借助专业的店铺记账软件&#xff0c;可以有效解决上面这些问题&#xff0c;通过自动…

redis穿透问题

1.概述 一个热点数据在高并发情况下过期时间到了&#xff0c;会导致大量流量查询redis为null&#xff0c;进而请求数据库进行更新数据&#xff0c;从流量上来说请求打到了数据库上&#xff0c;这种情况可能会造成mysql服务崩溃。 2. 解决方式之一&#xff08;加锁解决之本地锁&…