【分布式】tensorflow 1 分布式代码实战与说明;单个节点上运行 2 个分布式worker工作线程

tensorflow.python.framework.errors_impl.UnknowError: Could not start
gRPC server

1. tf分布式

一台电脑=服务器=server是一个节点,包含了多个GPU。首先分布式的方式就是让多台电脑上的gpu共同干活。

分布式工作分为两个部分,parameter server(ps) 以及worker。眼熟ps与worker,因为这个是工作,每个server,都得干活,所以只能是从这两个工作里面选择。ps的工作类似于存储参数,而损失的计算,梯度的决定都是有worker进行的。这个对代码的影响就是,ps节点其实完全可以由cpu来做。worker必须由gpu做。

整体结构如图:一共四个sever,每个sever假设包含4个GPU,下图一共16个GPU。两个server工作是ps,两个sever的工作是worker,这个name其实没有在代码中配置,所以不用理会。server同做一个工作,也需要区分的,所以又引入了task,并且有task id。这里只是演示一下job(ps,worker)和server(节点)的关系。
在这里插入图片描述

2. 代码

代码的讲解是踩点来的。就是怎么用代码互相交流。

从理论上看,我们需要一些节点,并且给他们分配工作。

所以做一下程序入口接受参数(节点都是是谁,给什么工作了),我比较喜欢接收参数,不喜欢在代码里面写死。因为flags是tf基础,不想解释增加长度。

每个节点都得被单独通知,并且单独运行,这意味着如果你有一个ps,两个worker(一般用一个ps即可),你得在bash命令里:

python train.py --ps都谁(ps_hosts) --worker都谁(woker_hosts) --我被分配干啥(job_name) --我是第几个干这活的(task_index)

python train.py --ps都谁(ps_hosts) --worker都谁(woker_hosts) --我被分配干啥(job_name) --我是第几个干这活的(task_index)

python train.py --ps都谁(ps_hosts) --worker都谁(woker_hosts) --我被分配干啥(job_name) --我是第几个干这活的(task_index)

就是输入三次,跑三次,同时。ps和worker都会等你输完了在一起工作,毕竟要等同伴。

3. 示例

https://gist.github.com/yaroslavvb/1124bb02a9fd4abce3d86caf2f950cb2

"""Benchmark tensorflow distributed by adding vector of ones on worker2
to variable on worker1 as fast as possible.
On 2014 macbook, TensorFlow 0.10 this shows
Local rate:       2175.28 MB per second
Distributed rate: 107.13 MB per second
"""

import subprocess
import tensorflow as tf
import time
import sys

flags = tf.flags
flags.DEFINE_integer("iters", 10, "Maximum number of additions")
flags.DEFINE_integer("data_mb", 100, "size of vector in MBs")
flags.DEFINE_string("port1", "12222", "port of worker1")
flags.DEFINE_string("port2", "12223", "port of worker2")
flags.DEFINE_string("task", "", "internal use")
FLAGS = flags.FLAGS

# setup local cluster from flags
host = "127.0.0.1:"
cluster = {"worker": [host+FLAGS.port1, host+FLAGS.port2]}
clusterspec = tf.train.ClusterSpec(cluster).as_cluster_def()

def default_config():
  optimizer_options = tf.OptimizerOptions(opt_level=tf.OptimizerOptions.L0)
  config = tf.ConfigProto(
    graph_options=tf.GraphOptions(optimizer_options=optimizer_options))
  config.log_device_placement = False
  config.allow_soft_placement = False
  return config

def create_graph(device1, device2):
  """Create graph that keeps variable on device1 and
  vector of ones/addition op on device2"""
  
  tf.reset_default_graph()
  dtype=tf.int32
  params_size = 250*1000*FLAGS.data_mb # 1MB is 250k integers

  with tf.device(device1):
    params = tf.get_variable("params", [params_size], dtype,
                             initializer=tf.zeros_initializer)
  with tf.device(device2):
    # constant node gets placed on device1 because of simple_placer
    #    update = tf.constant(1, shape=[params_size], dtype=dtype)
    update = tf.get_variable("update", [params_size], dtype,
                             initializer=tf.ones_initializer)
    add_op = params.assign_add(update)
    
  init_op = tf.initialize_all_variables()
  return init_op, add_op

def run_benchmark(sess, init_op, add_op):
  """Returns MB/s rate of addition."""
  
  sess.run(init_op)
  sess.run(add_op.op)  # warm-up
  start_time = time.time()
  for i in range(FLAGS.iters):
    # change to add_op.op to make faster
    sess.run(add_op)
  elapsed_time = time.time() - start_time
  return float(FLAGS.iters)*FLAGS.data_mb/elapsed_time


def run_benchmark_local():
  ops = create_graph(None, None)
  sess = tf.Session(config=default_config())
  return run_benchmark(sess, *ops)


def run_benchmark_distributed():
  ops = create_graph("/job:worker/task:0", "/job:worker/task:1")

  # launch distributed service
  def runcmd(cmd): subprocess.Popen(cmd, shell=True, stderr=subprocess.STDOUT)
  runcmd("python %s --task=0"%(sys.argv[0]))
  runcmd("python %s --task=1"%(sys.argv[0]))
  time.sleep(1)

  sess = tf.Session("grpc://"+host+FLAGS.port1, config=default_config())
  return run_benchmark(sess, *ops)
  
if __name__=='__main__':
  if not FLAGS.task:

    rate1 = run_benchmark_local()
    rate2 = run_benchmark_distributed()

    print("Adding data in %d MB chunks" %(FLAGS.data_mb))
    print("Local rate:       %.2f MB per second" %(rate1,))
    print("Distributed rate: %.2f MB per second" %(rate2,))

  else: # Launch TensorFlow server
    server = tf.train.Server(clusterspec, config=default_config(),
                             job_name="worker",
                             task_index=int(FLAGS.task))
    server.join()

4. 单个节点上运行 2 个分布式worker工作线程

https://stackoverflow.com/questions/40877246/distributed-tensorflow-not-working-with-simple-example

https://github.com/ischlag/distributed-tensorflow-example/blob/master/example.py

'''
Distributed Tensorflow 1.2.0 example of using data parallelism and share model parameters.
Trains a simple sigmoid neural network on mnist for 20 epochs on three machines using one parameter server. 

Change the hardcoded host urls below with your own hosts. 
Run like this: 

pc-01$ python example.py --job_name="ps" --task_index=0 
pc-02$ python example.py --job_name="worker" --task_index=0 
pc-03$ python example.py --job_name="worker" --task_index=1 
pc-04$ python example.py --job_name="worker" --task_index=2 

More details here: ischlag.github.io
'''

from __future__ import print_function

import tensorflow as tf
import sys
import time

# cluster specification
parameter_servers = ["pc-01:2222"]
workers = [	"pc-02:2222", 
			"pc-03:2222",
			"pc-04:2222"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})

# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

# start a server for a specific task
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)

# config
batch_size = 100
learning_rate = 0.0005
training_epochs = 20
logs_path = "/tmp/mnist/1"

# load mnist data set
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":

	# Between-graph replication
	with tf.device(tf.train.replica_device_setter(
		worker_device="/job:worker/task:%d" % FLAGS.task_index,
		cluster=cluster)):

		# count the number of updates
		global_step = tf.get_variable(
            'global_step',
            [],
            initializer = tf.constant_initializer(0),
			trainable = False)

		# input images
		with tf.name_scope('input'):
		  # None -> batch size can be any size, 784 -> flattened mnist image
		  x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
		  # target 10 output classes
		  y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")

		# model parameters will change during training so we use tf.Variable
		tf.set_random_seed(1)
		with tf.name_scope("weights"):
			W1 = tf.Variable(tf.random_normal([784, 100]))
			W2 = tf.Variable(tf.random_normal([100, 10]))

		# bias
		with tf.name_scope("biases"):
			b1 = tf.Variable(tf.zeros([100]))
			b2 = tf.Variable(tf.zeros([10]))

		# implement model
		with tf.name_scope("softmax"):
			# y is our prediction
			z2 = tf.add(tf.matmul(x,W1),b1)
			a2 = tf.nn.sigmoid(z2)
			z3 = tf.add(tf.matmul(a2,W2),b2)
			y  = tf.nn.softmax(z3)

		# specify cost function
		with tf.name_scope('cross_entropy'):
			# this is our cost
			cross_entropy = tf.reduce_mean(
                -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))

		# specify optimizer
		with tf.name_scope('train'):
			# optimizer is an "operation" which we can execute in a session
			grad_op = tf.train.GradientDescentOptimizer(learning_rate)
			'''
			rep_op = tf.train.SyncReplicasOptimizer(
                grad_op,
			    replicas_to_aggregate=len(workers),
 				replica_id=FLAGS.task_index, 
 			    total_num_replicas=len(workers),
 				use_locking=True)
 			train_op = rep_op.minimize(cross_entropy, global_step=global_step)
 			'''
			train_op = grad_op.minimize(cross_entropy, global_step=global_step)
			
		'''
		init_token_op = rep_op.get_init_tokens_op()
		chief_queue_runner = rep_op.get_chief_queue_runner()
		'''

		with tf.name_scope('Accuracy'):
			# accuracy
			correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
			accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

		# create a summary for our cost and accuracy
		tf.summary.scalar("cost", cross_entropy)
		tf.summary.scalar("accuracy", accuracy)

		# merge all summaries into a single "operation" which we can execute in a session 
		summary_op = tf.summary.merge_all()
		init_op = tf.global_variables_initializer()
		print("Variables initialized ...")

	sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
														global_step=global_step,
														init_op=init_op)

	begin_time = time.time()
	frequency = 100
	with sv.prepare_or_wait_for_session(server.target) as sess:
		'''
		# is chief
		if FLAGS.task_index == 0:
			sv.start_queue_runners(sess, [chief_queue_runner])
			sess.run(init_token_op)
		'''
		# create log writer object (this will log on every machine)
		writer = tf.summary.FileWriter(logs_path, graph=tf.get_default_graph())
				
		# perform training cycles
		start_time = time.time()
		for epoch in range(training_epochs):

			# number of batches in one epoch
			batch_count = int(mnist.train.num_examples/batch_size)

			count = 0
			for i in range(batch_count):
				batch_x, batch_y = mnist.train.next_batch(batch_size)
				
				# perform the operations we defined earlier on batch
				_, cost, summary, step = sess.run(
												[train_op, cross_entropy, summary_op, global_step], 
												feed_dict={x: batch_x, y_: batch_y})
				writer.add_summary(summary, step)

				count += 1
				if count % frequency == 0 or i+1 == batch_count:
					elapsed_time = time.time() - start_time
					start_time = time.time()
					print("Step: %d," % (step+1), 
								" Epoch: %2d," % (epoch+1), 
								" Batch: %3d of %3d," % (i+1, batch_count), 
								" Cost: %.4f," % cost, 
								" AvgTime: %3.2fms" % float(elapsed_time*1000/frequency))
					count = 0


		print("Test-Accuracy: %2.2f" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
		print("Total Time: %3.2fs" % float(time.time() - begin_time))
		print("Final Cost: %.4f" % cost)

	sv.stop()
	print("done")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/132893.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

RetroMAE论文阅读

1. Introduction 在NLP常用的预训练模型通常是由token级别的任务进行训练的,如MLM和Seq2Seq,但是密集检索任务更倾向于句子级别的表示,需要捕捉句子的信息和之间的关系,一般主流的策略是自对比学习(self-contrastive …

人工智能基础——Python:Pillow与图像处理

人工智能的学习之路非常漫长,不少人因为学习路线不对或者学习内容不够专业而举步难行。不过别担心,我为大家整理了一份600多G的学习资源,基本上涵盖了人工智能学习的所有内容。点击下方链接,0元进群领取学习资源,让你的学习之路更加顺畅!记得…

vscode 终端进程启动失败: shell 可执行文件“C:\Windows\System32\WindowsPower

vscode 终端进程启动失败: shell 可执行文件“C:\Windows\System32\WindowsPower 第一次用vscode,然后遇到这个问题,在设置里搜索 terminal.integrated.defaultProfile.windows 将这里的null改成"Command Prompt" 重启就可以了

pyqt环境搭建

创建虚拟环境 # 用管理员身份运行 conda create --prefixE:\Python\envs\pyqt5stu python3.6 # 激活虚拟环境 conda activate E:\Python\envs\pyqt5stu # 退出虚拟环境 conda deactivate安装包 pip install PyQt5 -i https://pypi.douban.com/simple pip install PyQt5-tools…

水果音乐编曲软件 FL Studio v21.1.1.3750 中文免费破解版下载(附中文设置教程)

FL studio21中文别名水果编曲软件,是一款全能的音乐制作软件,包括编曲、录音、剪辑和混音等诸多功能,让你的电脑编程一个全能的录音室,它为您提供了一个集成的开发环境,使用起来非常简单有效,您的工作会变得…

Snipaste截图工具--------开机后自启动设置

1,找到安装Snipaste的目录,创建快捷方式 2,按winR打开运行框(输入shell:startup) 3,将刚才创建的快捷方式拖入此文件夹

58基于matlab的采样的运动规划算法-RRT(Rapidly-exploring Random Trees)

基于matlab的采样的运动规划算法-RRT(Rapidly-exploring Random Trees),3D和2D,原始的RRT算法中将搜索的起点位置作为根节点,然后通过随机采样增加叶子节点的方式,生成一个随机扩展树,当随机树的叶子节点进入目标区域,…

数据结构与算法—搞懂队列

csdn专栏:数据结构与算法 前言 栈和队列是一对紧密相关的数据结构。之前已经介绍过栈(它遵循后进先出的原则),栈的机制相对简单,就像你进入一个狭窄的山洞,山洞只有一个出入口,因此你只能按照后…

Shopee的折扣活动怎么分类?shopee设置折扣注意事项

旺季到来,Shopee会举办一些折扣活动来吸引客户,那么shopee的折扣活动怎么分类,shopee设置折扣注意事项? shopee的折扣活动怎么分类? 满减活动:满减活动是虾皮常见的一种折扣形式。在这种活动中&#xff0…

Citespace的使用

CiteSpace CiteSpace的相关介绍运行CiteSpace CiteSpace的相关介绍 CiteSpace作为一款优秀的文献计量学软件,能够将文献之间的关系以科学知识图谱的方式可视化地展现在我们面前。简单来说,面对海量的文献,CiteSpace能够迅速锁定自己需要关注…

数据结构与算法C语言版学习笔记(6)-树、二叉树、赫夫曼树

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、树的定义1.结点的度、树的度2.结点的逻辑关系3.树的深度4.有序树和无序树5.森林 二、树的存储结构(1)双亲表示法(2&…

Android 使用.9图 NinePatchDrawable实现动态聊天气泡

最近一段时间,在做一个需求,需要实现一个聊天气泡的动画效果,如下图所示: GitHub源码demo ,建议下载demo,运行查看。 动态聊天气泡动画 静态聊天气泡 经过一段时间调研,实现方案如下: 实现方…

FM3793A-高性能PWM控制芯片 超低成本18W-20W 恒功率PD快充

产品描述: FM3793A是一款应用于离线反激式转换器中的高性能电流模式PWM控制器。在 FM3793A中,PWM开关频率最大为65KHz。在轻载和空载条件下,该FM3793A启动间歇模式从而降低开关频率。FM3793A具有丰富的芯片异常状况保护功能,如欠压…

力扣:160. 相交链表(Python3)

题目: 给你两个单链表的头节点 headA 和 headB ,请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点,返回 null 。 图示两个链表在节点 c1 开始相交: 题目数据 保证 整个链式结构中不存在环。 注意,…

基于insightface实现的人脸检测,人脸识别,insightface源码讲解。

目录 1.搭建insightface需要的环境 2.下载insightface工程 3.代码工程文件讲解 3.1 python-package 3.2 进行测试 3.3 examples 4. 人脸识别 5.代码理解: 1.搭建insightface需要的环境 埋个坑,后续再写,笔者在安装过程中遇到了一些问题。…

人工智能基础——Python:运行效率与时间复杂度

人工智能的学习之路非常漫长,不少人因为学习路线不对或者学习内容不够专业而举步难行。不过别担心,我为大家整理了一份600多G的学习资源,基本上涵盖了人工智能学习的所有内容。点击下方链接,0元进群领取学习资源,让你的学习之路更加顺畅!记得…

浅析CC中的点云配准为什么效果好于PCL?

公众号致力于分享点云处理,SLAM,三维视觉,高精地图相关的文章与技术,欢迎各位加入我们,一起交流一起进步,有兴趣的可联系微信:cloudpoint9527。本文来自点云PCL博主的分享,未经作者允许请勿转载…

最新大麦订单生成器 大麦订单图一键生成

1、8.6全新版 本次更新了四种订单模板生成 多模板自由切换 2、在软件中输入生成的信息,这里输入的是商品信息,选择生成的商品图片,最后生成即可 新版大麦订单生成 四种模板图样式展示 这个样式图就是在大麦生成完的一个订单截图&#xff…

大数据毕业设计选题推荐-生产大数据平台-Hadoop-Spark-Hive

✨作者主页:IT毕设梦工厂✨ 个人简介:曾从事计算机专业培训教学,擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

内网安全-基础设施构建-cobaltstrike远控工具beacon使用

kali在CS文件目录下,打开终端,运行命令: /teamserver 192.168.77.128 123456 在windows中双击bat文件: 填写图下信息: 双击运行,CS上线 自查方法:1、kali与物理机可互通 2、物理机与windows10跳板…