# 学习笔记TF040:多GPU并行

2017-08-12 20:12:16　来源：cnblogs.com　作者：利炳根

TensorFlow并行分为模型并行和数据并行。模型并行需要根据不同模型设计不同的并行方式，把模型的不同计算节点放在不同的硬件资源上运算。数据并行是比较通用、简便的大规模并行实现方式：同时使用多个硬件资源计算不同batch数据的梯度，再汇总梯度进行全局参数更新。

loss从最开始的4点几，到第70万步时降到0.07。平均每个batch耗时0.021s，平均每秒训练6000个样本，是单GPU的4倍。

`    import os.path    import re    import time    import numpy as np    import tensorflow as tf    import cifar10    batch_size=128    #train_dir='/tmp/cifar10_train'    max_steps=1000000    num_gpus=4    #log_device_placement=False    def tower_loss(scope):      """Calculate the total loss on a single tower running the CIFAR model.      Args:        scope: unique prefix string identifying the CIFAR tower, e.g. 'tower_0'      Returns:         Tensor of shape [] containing the total loss for a batch of data      """      # Get images and labels for CIFAR-10.      images, labels = cifar10.distorted_inputs()      # Build inference Graph.      logits = cifar10.inference(images)      # Build the portion of the Graph calculating the losses. Note that we will      # assemble the total_loss using a custom function below.      _ = cifar10.loss(logits, labels)      # Assemble all of the losses for the current tower only.      losses = tf.get_collection('losses', scope)      # Calculate the total loss for the current tower.      total_loss = tf.add_n(losses, name='total_loss')      # Compute the moving average of all individual losses and the total loss.      # loss_averages = tf.train.ExponentialMovingAverage(0.9, name='avg')      # loss_averages_op = loss_averages.apply(losses + [total_loss])      # Attach a scalar summary to all individual losses and the total loss; do the      # same for the averaged version of the losses.      # for l in losses + [total_loss]:        # Remove 'tower_[0-9]/' from the name in case this is a multi-GPU training        # session. This helps the clarity of presentation on tensorboard.        # loss_name = re.sub('%s_[0-9]*/' % cifar10.TOWER_NAME, '', l.op.name)        # Name each loss as '(raw)' and name the moving average version of the loss        # as the original loss name.        
# tf.scalar_summary(loss_name +' (raw)', l)        # tf.scalar_summary(loss_name, loss_averages.average(l))        # with tf.control_dependencies([loss_averages_op]):        # total_loss = tf.identity(total_loss)      return total_loss    def average_gradients(tower_grads):      """Calculate the average gradient for each shared variable across all towers.      Note that this function provides a synchronization point across all towers.      Args:        tower_grads: List of lists of (gradient, variable) tuples. The outer list          is over individual gradients. The inner list is over the gradient          calculation for each tower.      Returns:         List of pairs of (gradient, variable) where the gradient has been averaged         across all towers.      """      average_grads = []      for grad_and_vars in zip(*tower_grads):        # Note that each grad_and_vars looks like the following:        #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))        grads = []        for g, _ in grad_and_vars:          # Add 0 dimension to the gradients to represent the tower.          expanded_g = tf.expand_dims(g, 0)          # Append on a 'tower' dimension which we will average over below.          grads.append(expanded_g)        # Average over the 'tower' dimension.        grad = tf.concat(grads, 0)        grad = tf.reduce_mean(grad, 0)        # Keep in mind that the Variables are redundant because they are shared        # across towers. So .. we will just return the first tower's pointer to        # the Variable.        v = grad_and_vars[0][1]        grad_and_var = (grad, v)        average_grads.append(grad_and_var)      return average_grads    def train():      """Train CIFAR-10 for a number of steps."""      with tf.Graph().as_default(), tf.device('/cpu:0'):        # Create a variable to count the number of train() calls. This equals the        # number of batches processed * FLAGS.num_gpus.        
global_step = tf.get_variable(            'global_step', [],            initializer=tf.constant_initializer(0), trainable=False)        # Calculate the learning rate schedule.        num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /                             batch_size)        decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)        # Decay the learning rate exponentially based on the number of steps.        lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,                                        global_step,                                        decay_steps,                                        cifar10.LEARNING_RATE_DECAY_FACTOR,                                        staircase=True)        # Create an optimizer that performs gradient descent.        opt = tf.train.GradientDescentOptimizer(lr)        # Calculate the gradients for each model tower.        tower_grads = []        for i in range(num_gpus):          with tf.device('/gpu:%d' % i):            with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:              # Calculate the loss for one tower of the CIFAR model. This function              # constructs the entire CIFAR model but shares the variables across              # all towers.              loss = tower_loss(scope)              # Reuse variables for the next tower.              tf.get_variable_scope().reuse_variables()              # Retain the summaries from the final tower.              # summaries = tf.get_collection(tf.GraphKeys.SUMMARIES, scope)              # Calculate the gradients for the batch of data on this CIFAR tower.              grads = opt.compute_gradients(loss)              # Keep track of the gradients across all towers.              tower_grads.append(grads)        # We must calculate the mean of each gradient. Note that this is the        # synchronization point across all towers.        
grads = average_gradients(tower_grads)        # Add a summary to track the learning rate.        # summaries.append(tf.scalar_summary('learning_rate', lr))        # Add histograms for gradients.        # for grad, var in grads:        #     if grad is not None:        #         summaries.append(        #             tf.histogram_summary(var.op.name + '/gradients', grad))        # Apply the gradients to adjust the shared variables.        apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)        # Add histograms for trainable variables.        # for var in tf.trainable_variables():        #     summaries.append(tf.histogram_summary(var.op.name, var))        # Track the moving averages of all trainable variables.        # variable_averages = tf.train.ExponentialMovingAverage(        #     cifar10.MOVING_AVERAGE_DECAY, global_step)        # variables_averages_op = variable_averages.apply(tf.trainable_variables())        # Group all updates to into a single train op.        # train_op = tf.group(apply_gradient_op, variables_averages_op)        # Create a saver.        saver = tf.train.Saver(tf.all_variables())        # Build the summary operation from the last tower summaries.        # summary_op = tf.merge_summary(summaries)        # Build an initialization operation to run below.        init = tf.global_variables_initializer()        # Start running operations on the Graph. allow_soft_placement must be set to        # True to build towers on GPU, as some of the ops do not have GPU        # implementations.        sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True))        sess.run(init)        # Start the queue runners.        
tf.train.start_queue_runners(sess=sess)        # summary_writer = tf.train.SummaryWriter(train_dir, sess.graph)        for step in range(max_steps):          start_time = time.time()          _, loss_value = sess.run([apply_gradient_op, loss])          duration = time.time() - start_time          assert not np.isnan(loss_value), 'Model diverged with loss = NaN'          if step % 10 == 0:            num_examples_per_step = batch_size * num_gpus            examples_per_sec = num_examples_per_step / duration            sec_per_batch = duration / num_gpus            format_str = ('step %d, loss = %.2f (%.1f examples/sec; %.3f '                          'sec/batch)')            print (format_str % (step, loss_value,                                 examples_per_sec, sec_per_batch))            # if step % 100 == 0:            #     summary_str = sess.run(summary_op)            #     summary_writer.add_summary(summary_str, step)          # Save the model checkpoint periodically.          if step % 1000 == 0 or (step + 1) == max_steps:          # checkpoint_path = os.path.join(train_dir, 'model.ckpt')            saver.save(sess, '/tmp/cifar10_train/model.ckpt', global_step=step)    cifar10.maybe_download_and_extract()    #if tf.gfile.Exists(train_dir):    #  tf.gfile.DeleteRecursively(train_dir)    #tf.gfile.MakeDirs(train_dir)    train()`

《TensorFlow实战》