- 背景
Reduce是几乎所有多线程技术的基础和关键,同样也是诸如深度学习等领域的核心,简单如卷积运算,复杂如梯度聚合、分布式训练等,了解CUDA实现reduce,以及优化reduce是理解CUDA软硬件连接点的很好切入点。
硬件环境:
- 结果展示
chapter2 reduce test
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 0: 8389334.000, 4.3356 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 1: 8389335.000, 1.3475 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 2: 8389335.000, 1.3096 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 3: 8389335.000, 1.3098 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 4: 8389335.000, 1.3093 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 5: 8389335.000, 1.3119 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 6: 8389335.000, 1.3132 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 7: 8389335.000, 1.3157 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, L1 v7: 8389335.000, 1.3086 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, L1 Co: 8389335.000, 2.6103 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, L any: 8389335.000, 1.6096 ms
sum of 16777216 random nums, host: 8389334.731, 18.6554 ms, GPU 8: 8389335.000, 1.3160 ms
- 源码
#include"../include/utils/cx.h"
#include"../include/utils/cxtimers.h"
#include"cooperative_groups.h" // for cg namespace
#include"cooperative_groups/reduce.h" // for cg::reduce
#include"../include/utils/helper_math.h" // for overload += operator for reinterpret (CYDA SDK)
#include<random>
namespace cg = cooperative_groups;
__global__ void reduce0(float* x, int n) {
int tid = blockDim.x * blockIdx.x + threadIdx.x;
x[tid] += x[tid+n];
}
__global__ void reduce1(float *x,int N)
{
int tid = blockDim.x*blockIdx.x+threadIdx.x;
float tsum = 0.0f;
int stride = gridDim.x*blockDim.x;
for(int k=tid; k<N; k += stride) tsum += x[k];
x[tid] = tsum;
}
__global__ void reduce2(float *y,float *x,int N)
{
extern __shared__ float tsum[]; // Dynamically Allocated Shared Mem
int id = threadIdx.x;
int tid = blockDim.x*blockIdx.x+threadIdx.x;
int stride = gridDim.x*blockDim.x;
tsum[id] = 0.0f;
for(int k=tid;k<N;k+=stride) tsum[id] += x[k];
__syncthreads();
for(int k=blockDim.x/2; k>0; k /= 2){ // power of 2 reduction loop
if(id<k) tsum[id] += tsum[id+k];
__syncthreads();
}
if(id==0) y[blockIdx.x] = tsum[0]; // store one value per block
}
__global__ void reduce3(float *y,float *x,int N)
{
extern __shared__ float tsum[];
int id = threadIdx.x;
int tid = blockDim.x*blockIdx.x+threadIdx.x;
int stride = gridDim.x*blockDim.x;
tsum[id] = 0.0f;
for(int k=tid;k<N;k+=stride) tsum[id] += x[k];
__syncthreads();
int block2 = cx::pow2ceil(blockDim.x); // next higher power of 2
for(int k=block2/2; k>0; k >>= 1){ // power of 2 reduction loop
if(id<k && id+k < blockDim.x) tsum[id] += tsum[id+k];
__syncthreads();
}
if(id==0) y[blockIdx.x] = tsum[0]; // store one value per block
}
__global__ void reduce4(float * y,float * x,int N)
{
extern __shared__ float tsum[];
int id = threadIdx.x;
int tid = blockDim.x*blockIdx.x+threadIdx.x;
int stride = gridDim.x*blockDim.x;
tsum[id] = 0.0f;
for(int k=tid;k<N;k+=stride) tsum[id] += x[k];
__syncthreads();
if(id<256 && id+256 < blockDim.x) tsum[id] += tsum[id+256]; __syncthreads();
if(id<128) tsum[id] += tsum[id+128]; __syncthreads();
if(id< 64) tsum[id] += tsum[id+ 64]; __syncthreads();
if(id< 32) tsum[id] += tsum[id+ 32]; __syncthreads();
// warp 0 only from here
if(id< 16) tsum[id] += tsum[id+16]; __syncwarp();
if(id< 8) tsum[id] += tsum[id+ 8]; __syncwarp();
if(id< 4) tsum[id] += tsum[id+ 4]; __syncwarp();
if(id< 2) tsum[id] += tsum[id+ 2]; __syncwarp();
if(id==0) y[blockIdx.x] = tsum[0]+tsum[1];
}
template <int blockSize>
__global__ void reduce5(r_Ptr<float> sums,cr_Ptr<float> data,int n)
{
// This template kernel assumes that blockDim.x = blockSize,
// that blockSize is power of 2 between 64 and 1024
__shared__ float s[blockSize];
int id = threadIdx.x; // rank in block
s[id] = 0;
for(int tid = blockSize*blockIdx.x+threadIdx.x;
tid < n; tid += blockSize*gridDim.x) s[id] += data[tid];
__syncthreads();
if(blockSize > 512 && id < 512 && id+512 < blockSize)s[id] += s[id+512];
__syncthreads();
if(blockSize > 256 && id < 256 && id+256 < blockSize)s[id] += s[id+256];
__syncthreads();
if(blockSize > 128 && id < 128 && id+128 < blockSize)s[id] += s[id+128];
__syncthreads();
if(blockSize > 64 && id < 64 && id+ 64 < blockSize)s[id] += s[id+64];
__syncthreads();
if(id < 32) { // single warp from here
s[id] += s[id + 32]; __syncwarp(); // the syncwarps
if(id < 16) s[id] += s[id + 16]; __syncwarp(); // are required
if(id < 8) s[id] += s[id + 8]; __syncwarp(); // for devices of
if(id < 4) s[id] += s[id + 4]; __syncwarp(); // CC = 7 and above
if(id < 2) s[id] += s[id + 2]; __syncwarp(); // for CC < 7
if(id < 1) s[id] += s[id + 1]; __syncwarp(); // they do nothing
if(id == 0) sums[blockIdx.x] = s[0]; // store block sum
}
}
// using warp_shrl functions
template<int blockSize>
__global__ void reduce6(r_Ptr<float> sums, cr_Ptr<float> data, int n) {
__shared__ float s[blockSize];
auto grid = cg::this_grid();
auto block = cg::this_thread_block();
auto warp = cg::tiled_partition<32>(block);
int id = block.thread_rank();
s[id] = 0.0f;
for (int tid = grid.thread_rank(); tid < n; tid += grid.size()) {
s[id] += data[tid];
}
block.sync();
if (blockSize > 512 && id < 512 && id + 512 < blockSize) {
s[id] += s[id + 512];
}
block.sync();
if (blockSize > 256 && id < 256 && id + 256 < blockSize) {
s[id] += s[id+256];
}
block.sync();
if (blockSize > 128 && id < 128 && id + 128 < blockSize) {
s[id] += s[id+128];
}
block.sync();
if (blockSize > 64 && id < 64 && id + 64 < blockSize) {
s[id] += s[id+64];
}
block.sync();
if (warp.meta_group_rank() == 0) {
s[id] += s[id+32];warp.sync();
s[id] += warp.shfl_down(s[id], 16);
s[id] += warp.shfl_down(s[id], 8);
s[id] += warp.shfl_down(s[id], 4);
s[id] += warp.shfl_down(s[id], 2);
s[id] += warp.shfl_down(s[id], 1);
if (id == 0){
sums[blockIdx.x] = s[0];
}
}
}
// warp-only reduce function
__global__ void reduce7(r_Ptr<float> sums,cr_Ptr<float> data,int n)
{
// This kernel assumes the array sums is set to zeros on entry
// also blockSize is multiple of 32 (should always be true)
auto grid = cg::this_grid();
auto block = cg::this_thread_block();
auto warp = cg::tiled_partition<32>(block);
float v = 0.0f; // accumulate thread sums in register variable v
for(int tid=grid.thread_rank(); tid<n; tid+=grid.size()) v += data[tid];
warp.sync();
v += warp.shfl_down(v,16); // |
v += warp.shfl_down(v,8); // | warp level
v += warp.shfl_down(v,4); // | reduce here
v += warp.shfl_down(v,2); // |
v += warp.shfl_down(v,1); // |
// use atomicAdd here to sum over warps
if(warp.thread_rank()==0) atomicAdd(&sums[block.group_index().x],v);
}
// warp-only and L1 cache function
__global__ void reduce7_L1(r_Ptr<float> sums, cr_Ptr<float> data, int n) {
auto grid = cg::this_grid();
auto block = cg::this_thread_block();
auto warp = cg::tiled_partition<32>(block);
float4 v4 = {0.0f, 0.0f, 0.0f, 0.0f};
for(int tid = grid.thread_rank(); tid < n/4; tid += grid.size()) {
v4 += reinterpret_cast<const float4 *>(data)[tid];
}
float v = v4.x + v4.y + v4.z + v4.w;
warp.sync();
v += warp.shfl_down(v, 16);
v += warp.shfl_down(v, 8);
v += warp.shfl_down(v, 4);
v += warp.shfl_down(v, 2);
v += warp.shfl_down(v, 1);
if (warp.thread_rank() == 0){
atomicAdd(&sums[block.group_index().x], v);
}
}
__device__ void reduce7_L1_coal(r_Ptr<float>sums,cr_Ptr<float>data,int n)
{
// This function assumes that a.size() is power of 2 in [1,32]
// and that n is a multiple of 4
auto g = cg::this_grid();
auto b = cg::this_thread_block();
auto a = cg::coalesced_threads(); // active threads in warp
float4 v4 ={0,0,0,0};
for(int tid = g.thread_rank(); tid < n/4; tid += g.size())
v4 += reinterpret_cast<const float4 *>(data)[tid];
float v = v4.x + v4.y + v4.z + v4.w;
a.sync();
if(a.size() > 16) v += a.shfl_down(v,16); // NB no new
if(a.size() > 8) v += a.shfl_down(v,8); // thread
if(a.size() > 4) v += a.shfl_down(v,4); // divergence
if(a.size() > 2) v += a.shfl_down(v,2); // allowed
if(a.size() > 1) v += a.shfl_down(v,1); // here // rank here is within coal group therefore
if(a.thread_rank() == 0) atomicAdd(&sums[b.group_index().x],v); // rank==0 OK even for odd only threads
}
__global__ void reduce7_coal_even_odd(r_Ptr<float>sumeven,r_Ptr<float>sumodd,cr_Ptr<float>data,int n)
{
// divergent code here
if(threadIdx.x%2==0) reduce7_L1_coal(sumeven,data,n);
else reduce7_L1_coal(sumodd,data,n);
}
// reduce L1 coal any
__device__ void reduce7_L1_coal_any(r_Ptr<float>sums,cr_Ptr<float>data,int n)
{
// This function works for any value of a.size() in [1,32]
// it assumes that n is a multiple of 4
auto g = cg::this_grid();
auto b = cg::this_thread_block();
auto w = cg::tiled_partition<32>(b); // whole warp
auto a = cg::coalesced_threads(); // active threads in warp
int warps = g.group_dim().x*w.meta_group_size(); // number of warps in grid
// divide data into contiguous parts, with one part per warp
int part_size = ((n/4)+warps-1)/warps;
int part_start = (b.group_index().x*w.meta_group_size() +
w.meta_group_rank())*part_size;
int part_end = min(part_start+part_size,n/4);
// get part sub-sums into threads of a
float4 v4 ={0,0,0,0};
int id = a.thread_rank();
for(int k=part_start+id; k<part_end; k+=a.size()) // adjacent adds within
v4 += reinterpret_cast<const float4 *>(data)[k]; // the warp
float v = v4.x + v4.y + v4.z + v4.w;
a.sync();
// now reduce over a
// first deal with items held by ranks >= kstart
int kstart = 1 << (31 - __clz(a.size())); // max power of 2 <= a.size()
if(a.size() > kstart) {
float w = a.shfl_down(v,kstart);
if(a.thread_rank() < a.size()-kstart) v += w;// only update v for
a.sync(); // valid low ranking threads
}
// then do power of 2 reduction
for(int k = kstart/2; k>0; k /= 2) v += a.shfl_down(v,k);
if(a.thread_rank() == 0) atomicAdd(&sums[b.group_index().x],v);
}
__global__ void reduce7_any(r_Ptr<float>sums,cr_Ptr<float>data,int n)
{
if(threadIdx.x % 3 == 0) reduce7_L1_coal_any(sums,data,n);
}
// cg warp-level function
__global__ void reduce8(r_Ptr<float> sums, cr_Ptr<float> data, int n) {
auto grid = cg::this_grid();
auto block = cg::this_thread_block();
auto warp = cg::tiled_partition<32>(block);
float v = 0.0f;
for(int tid = grid.thread_rank(); tid < n; tid += grid.size()) {
v += data[tid];
}
v = cg::reduce(warp, v, cg::plus<float>());
warp.sync();
if (warp.thread_rank() == 0) {
atomicAdd(&sums[block.group_index().x], v);
}
}
void test_reduce(const int N) {
// const int N = 1 << 24;
const int blocks = 256;
const int threads = 256;
const int nreps = 1000;
thrust::host_vector<float> x(N);
thrust::device_vector<float> dx(N);
// init data
std::default_random_engine gen(42);
std::uniform_real_distribution<float> fran(0.0, 1.0);
for (int k = 0; k < N; k++) {
x[k] = fran(gen);
}
// host to device
dx = x;
cx::timer tim;
// cpu time test
double host_sum = 0.0;
for (int k = 0; k < N; k++) {
host_sum += x[k];
}
double t1 = tim.lap_ms();
// gpu test reduce0
double gpu_sum = 0.0;
tim.reset();
for (int m = N/2; m > 0; m /= 2) {
int threads = std::min(256, m);
int blocks = std::max(m / 256, 1);
reduce0<<<blocks, threads>>> (dx.data().get(), m);
}
cudaDeviceSynchronize();
double t2 = tim.lap_ms();
// device to host
gpu_sum = dx[0];
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 0: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t2);
// gpu test reduce1
dx = x;
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce1<<<blocks, threads>>> (dx.data().get(), N);
reduce1<<<1, threads>>> (dx.data().get(), blocks * threads);
reduce1<<<1, 1>>> (dx.data().get(), threads);
if (rep == 0) gpu_sum = dx[0];
}
cudaDeviceSynchronize();
double t3 = tim.lap_ms() / nreps;
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 1: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t3);
// gpu test reduce2
dx = x;
thrust::device_vector<float> dy(blocks);
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce2<<<blocks, threads, threads * sizeof(float)>>> (dy.data().get(), dx.data().get(), N);
reduce2<<<1, threads, blocks * sizeof(float)>>> (dx.data().get(), dy.data().get(), blocks);
if (rep == 0) gpu_sum = dx[0];
}
cudaDeviceSynchronize();
double t4 = tim.lap_ms() / nreps;
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 2: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t4);
// gpu test reduce3
dx = x;
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce3<<<blocks, threads, threads * sizeof(float)>>> (dy.data().get(), dx.data().get(), N);
reduce3<<<1, threads, blocks * sizeof(float)>>> (dx.data().get(), dy.data().get(), blocks);
if (rep == 0) gpu_sum = dx[0];
}
cudaDeviceSynchronize();
double t5 = tim.lap_ms() / nreps;
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 3: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t5);
// gpu test reduce4
dx = x;
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce4<<<blocks, threads, threads * sizeof(float)>>> (dy.data().get(), dx.data().get(), N);
reduce4<<<1, threads, blocks * sizeof(float)>>> (dx.data().get(), dy.data().get(), blocks);
if (rep == 0) gpu_sum = dx[0];
}
cudaDeviceSynchronize();
double t6 = tim.lap_ms() / nreps;
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 4: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t6);
// gpu test reduce5
dx = x;
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce5<256><<<blocks,threads,threads*sizeof(float)>>>(dy.data().get(),dx.data().get(),N);
}
reduce4<<<1, blocks, blocks*sizeof(float)>>>(dx.data().get(), dy.data().get(), blocks);
cudaDeviceSynchronize();
double t7 = tim.lap_ms() / nreps;
gpu_sum = dx[0];
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 5: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t7);
// gpu tst reduce6
dx = x;
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce6<256><<<blocks, threads, threads*sizeof(float)>>>(dy.data().get(), dx.data().get(), N);
}
reduce4<<<1, blocks, blocks*sizeof(float)>>>(dx.data().get(), dy.data().get(), blocks);
cudaDeviceSynchronize();
double t8 = tim.lap_ms() / nreps;
gpu_sum = dx[0];
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 6: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t8);
// gpu test reduce7
dx = x;
thrust::device_vector<float> dz(blocks);
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce7<<<blocks, threads, threads*sizeof(float)>>>(dz.data().get(), dx.data().get(), N);
reduce7<<<1, blocks, blocks*sizeof(float)>>>(dx.data().get(), dz.data().get(), blocks);
if (rep == 0) gpu_sum = dx[0];
}
cudaDeviceSynchronize();
double t9 = tim.lap_ms() / nreps;
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 7: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t9);
// gpu test reduce7_L1
dx = x;
thrust::fill(dz.begin(), dz.end(), 0.0);
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce7_L1<<<blocks, threads, threads*sizeof(float)>>>(dz.data().get(), dx.data().get(), N);
reduce7_L1<<<1, blocks, blocks*sizeof(float)>>>(dx.data().get(), dz.data().get(), blocks);
if (rep == 0) gpu_sum = dx[0];
}
cudaDeviceSynchronize();
double t91 = tim.lap_ms() / nreps;
printf("sum of %d random nums, host: %.3f, %.4f ms, L1 v7: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t91);
// gpu test reduce7_L1 coal
thrust::device_vector<float> dy_even(blocks); // only even elements are used
thrust::device_vector<float> dy_odd(blocks); // only odd elements are used
dx = x;
tim.reset();
for(int rep=0;rep<nreps;rep++){
reduce7_coal_even_odd<<<blocks,threads>>>(dy_even.data().get(),dy_odd.data().get(),dx.data().get(),N);
if (rep == 0) {
// use reduce7_L1 for final step
// dx[0] = 0.0f; // clear output buffer
reduce7_L1<<<1,blocks>>>(dx.data().get(),dy_even.data().get(),blocks);
reduce7_L1<<<1,blocks>>>(dx.data().get(),dy_odd.data().get(),blocks);
gpu_sum = dx[0];
}
}
cudaDeviceSynchronize();
double t92 = tim.lap_ms()/nreps;
// gpu_sum = dx[0]; // D2H copy (1 word)
printf("sum of %d random nums, host: %.3f, %.4f ms, L1 Co: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t92);
// gpu test reduce 7 L1 coal any
dx = x;
thrust::fill(dy.begin(), dy.end(), 0.0);
tim.reset();
for(int rep=0;rep<nreps;rep++){
reduce7_any<<<blocks,threads>>>(dy.data().get(),dx.data().get(),N);
if (rep == 0) {
reduce7_L1<<<1,blocks>>>(dx.data().get(),dy.data().get(),blocks);
gpu_sum = dx[0];
}
}
cudaDeviceSynchronize();
double t93 = tim.lap_ms() / nreps;
printf("sum of %d random nums, host: %.3f, %.4f ms, L any: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t93);
// gpu test reduce8
dx = x;
thrust::fill(dy.begin(), dy.end(), 0.0);
tim.reset();
for (int rep = 0; rep < nreps; rep++) {
reduce8<<<blocks, threads, threads*sizeof(float)>>>(dy.data().get(), dx.data().get(), N);
reduce8<<<1, blocks, blocks*sizeof(float)>>>(dx.data().get(), dy.data().get(), blocks);
if (rep == 0) gpu_sum = dx[0];
}
cudaDeviceSynchronize();
double t10 = tim.lap_ms() / nreps;
printf("sum of %d random nums, host: %.3f, %.4f ms, GPU 8: %.3f, %.4f ms \n", N, host_sum, t1, gpu_sum, t10);
}
- 小结
1)尝试了9+1中CUDA中最基本的reduce方法,从基本方法到高级库,从精度和速度两方面进行的对比,可以看到与CPU的reduce算法相比,GPU算法明显快很多,更好的显卡,速度应该会更快,同时要注意部分精度损失,这也是CUDA编程中应注意到是否在误差允许范围内;
2)理论上reduce7_L1_coal应该比reduce7_L1,reduce7速度更快,实测本地电脑测试反而更慢了,猜测原因可能是机器的GPU性能受限导致的;
3)以上测试结果在不同机子上的总体趋势较一致,细微有差别,与GPU的架构、block\threads的数量、数据计算的总量等都有关系,实际应用中可能需要进一步微调。