1. Revised code
To check the correctness of the result exhaustively (element by element), the code has been modified.
The main steps are:
step1: generate random numbers into each host buffer pointed to by data_p;
step2: copy each host buffer to the corresponding sendbuff in GPU memory;
step3: run ncclAllReduce with ncclSum;
step4: copy the data back from each recvbuff;
step5: allreduce-sum the data_p buffers on the host (into data_p[0]);
step6: compare the data copied back from recvbuff against data_p[0] for consistency;
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <omp.h>
// seeding scheme: seed_base plus a per-tile offset
// conceptually: void gen_512(seed, A, start_idx, N) fills one 512-element tile of A, so
// gen_512(seed_base+0, A, 0*512, N); gen_512(seed_base+1, A, 1*512, N); ... gen_512(seed_base+n, A, n*512, N); ...
// (in the code below the tile length is NN = 312 rather than 512)
#if 1
//NN is tile length
#define NN 312
#define MM 156
#define MATRIX_A 0xB5026F5AA96619E9ULL
#define UM 0xFFFFFFFF80000000ULL /* Most significant 33 bits */
#define LM 0x7FFFFFFFULL /* Least significant 31 bits */
class thread_mt199937{
public:
void srand(unsigned long long seed)
{
init_genrand64(seed);
}
/* generates a random number on [0, 2^63-1]-interval */
long long genrand64_int63(void)
{
return (long long)(genrand64_int64() >> 1);
}
/* generates a random number on [0,1]-real-interval */
double genrand64_real1(void)
{
return (genrand64_int64() >> 11) * (1.0/9007199254740991.0);
}
/* generates a random number on [0,1)-real-interval */
double genrand64_real2(void)
{
return (genrand64_int64() >> 11) * (1.0/9007199254740992.0);
}
/* generates a random number on (0,1)-real-interval */
double genrand64_real3(void)
{
return ((genrand64_int64() >> 12) + 0.5) * (1.0/4503599627370496.0);
}
private:
/* The array for the state vector */
unsigned long long mt[NN];
/* mti==NN+1 means mt[NN] is not initialized */
int mti=NN+1;
/* initializes mt[NN] with a seed */
void init_genrand64(unsigned long long seed)
{
mt[0] = seed;
for (mti=1; mti<NN; mti++)
mt[mti] = (6364136223846793005ULL * (mt[mti-1] ^ (mt[mti-1] >> 62)) + mti);
}
/* initialize by an array with array-length */
/* init_key is the array for initializing keys */
/* key_length is its length */
void init_by_array64(unsigned long long init_key[],
unsigned long long key_length)
{
unsigned long long i, j, k;
init_genrand64(19650218ULL);
i=1; j=0;
k = (NN>key_length ? NN : key_length);
for (; k; k--) {
mt[i] = (mt[i] ^ ((mt[i-1] ^ (mt[i-1] >> 62)) * 3935559000370003845ULL))
+ init_key[j] + j; /* non linear */
i++; j++;
if (i>=NN) { mt[0] = mt[NN-1]; i=1; }
if (j>=key_length) j=0;
}
for (k=NN-1; k; k--) {
mt[i] = (mt[i] ^ ((mt[i-1] ^ (mt[i-1] >> 62)) * 2862933555777941757ULL))
- i; /* non linear */
i++;
if (i>=NN) { mt[0] = mt[NN-1]; i=1; }
}
mt[0] = 1ULL << 63; /* MSB is 1; assuring non-zero initial array */
}
/* generates a random number on [0, 2^64-1]-interval */
unsigned long long genrand64_int64(void)
{
int i;
unsigned long long x;
static unsigned long long mag01[2]={0ULL, MATRIX_A};
if (mti >= NN) { /* generate NN words at one time */
/* if init_genrand64() has not been called, */
/* a default initial seed is used */
if (mti == NN+1)
init_genrand64(5489ULL);
for (i=0;i<NN-MM;i++) {
x = (mt[i]&UM)|(mt[i+1]&LM);
mt[i] = mt[i+MM] ^ (x>>1) ^ mag01[(int)(x&1ULL)];
}
for (;i<NN-1;i++) {
x = (mt[i]&UM)|(mt[i+1]&LM);
mt[i] = mt[i+(MM-NN)] ^ (x>>1) ^ mag01[(int)(x&1ULL)];
}
x = (mt[NN-1]&UM)|(mt[0]&LM);
mt[NN-1] = mt[MM-1] ^ (x>>1) ^ mag01[(int)(x&1ULL)];
mti = 0;
//printf("LL::\n");
}
x = mt[mti++];
x ^= (x >> 29) & 0x5555555555555555ULL;
x ^= (x << 17) & 0x71D67FFFEDA60000ULL;
x ^= (x << 37) & 0xFFF7EEE000000000ULL;
x ^= (x >> 43);
return x;
}
};
#endif
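// Illustrative note (not used by the program): MT19937-64 is fully determined by its seed,
// so two thread_mt199937 objects seeded with the same value produce identical sequences, e.g.
//   thread_mt199937 g0, g1; g0.srand(2024 + 7); g1.srand(2024 + 7);
//   // g0.genrand64_real2() == g1.genrand64_real2() for every draw
// This is what lets parallel_mt19937 below seed each tile independently and still produce
// the same data regardless of the OpenMP thread count.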
//#define NN 312
// multi-threaded accelerating wrapper; could be turned into a singleton
class parallel_mt19937{
public:
// int _base_seed;
void srand(unsigned long long seed)
{
_base_seed = seed;
}
void rand_float(float* A, unsigned long len);
private:
unsigned long long _base_seed = 0;
};
//int pmt19937::_base_seed = 0;
void parallel_mt19937::rand_float(float* A, unsigned long len)
{
#pragma omp parallel
{
unsigned long block_dim = omp_get_num_threads();
unsigned long thid = omp_get_thread_num();
if(thid == block_dim -1)
std::cout << "Here are " << block_dim << " threads generating random numbers ..."<<std::endl;
unsigned long tile_count = (len+NN-1)/NN;
thread_mt199937 *t_mt_p = new thread_mt199937();// to be singleton
for(unsigned long tile_id=thid; tile_id<tile_count; tile_id+=block_dim){
//each tile has its own seed (_base_seed + tile_id), so the generated data is independent of the thread count
unsigned long tile_seed = _base_seed + tile_id;
t_mt_p->srand(tile_seed);
//if(thid == 35) std::cout << "Hello from thread " << thid << std::endl;
unsigned long tile_idx_start = tile_id*NN;
unsigned long tile_idx_end = (((tile_id+1)*NN) <= len)? (tile_id+1)*NN: len;
for(unsigned long idx = tile_idx_start; idx<tile_idx_end; idx++)
A[idx] = float(t_mt_p->genrand64_real2());
}
delete t_mt_p;
}
}
//
//#define BUILD_MAIN
//
#ifdef BUILD_MAIN
void print_matrix(unsigned long m, unsigned long n, float* A, unsigned long lda)
{
for(unsigned long i=m-7; i<m; i++){
for(unsigned long j=n-7; j<n; j++){
printf("%7.4f ", A[i + j*lda]);
}
printf("\n");
}
}
#include <string.h>
int main(){
unsigned long m = 312*4*32*32*128;
unsigned long n = 36;
unsigned long lda = m;
float* A = nullptr;
printf("LL: 00\n");
A = (float*)malloc(lda * n * sizeof(float));
//memset(A, 0x7F, lda*n*sizeof(float));
printf("LL:: 01\n");
parallel_mt19937 gen_rand;
gen_rand.srand(2024);
gen_rand.rand_float(A, lda*n);
printf("LL:: 02\n");
print_matrix(m, n, A, lda);
free(A);
return 0;
}
#endif
void init_data_float(float** data_p, int nDev, int size, unsigned long seed)
{
for(int idx=0; idx<nDev; idx++){
*(data_p+idx) = nullptr;
*(data_p+idx) = (float*)malloc(size*sizeof(float));
}
parallel_mt19937 gen_rand;
for(int idx =0; idx<nDev; idx++){
gen_rand.srand(seed+idx);
gen_rand.rand_float(*(data_p + idx), size);
}
}
void host_all_reduce_origin(float** sendbuff, int nDev, int size)
{
for(int idx=1; idx<nDev; idx++){
#pragma omp parallel for
for(int i=0; i<size; i++)
sendbuff[0][i] += sendbuff[idx][i];
}
}
void check_equal(float** result_recv_data_buff, float* sendbuff_0, int nDev, int size)
{
for(int idx=0; idx<nDev; idx++){
long mismatch = 0;
#pragma omp parallel for reduction(+:mismatch)
for(int i=0; i<size; i++){
if(result_recv_data_buff[idx][i] != sendbuff_0[i]){
if(mismatch < 8) /* print at most a few messages per thread */
printf("ERROR: %7.4f != %7.4f idx=%d, i=%d\n", result_recv_data_buff[idx][i], sendbuff_0[i], idx, i);
mismatch++;
}
}
printf("check: device %d has %ld mismatching elements\n", idx, mismatch);
}
}
void free_buff(float** buffArray, int n)
{
for(int i=0; i<n; i++){
if(buffArray[i] != nullptr){
free(buffArray[i]);
}
}
}
//
#include <stdlib.h>
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#define CUDACHECK(cmd) do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("Failed: Cuda error %s:%d '%s'\n", \
__FILE__,__LINE__,cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define NCCLCHECK(cmd) do { \
ncclResult_t res = cmd; \
if (res != ncclSuccess) { \
printf("Failed, NCCL error %s:%d '%s'\n", \
__FILE__,__LINE__,ncclGetErrorString(res)); \
exit(EXIT_FAILURE); \
} \
} while(0)
int main(int argc, char* argv[])
{
ncclComm_t comms[4];
//managing 4 devices
//int nDev = 4;
int nDev = 2;
int size = 32*1024*1024*16;
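// 32*1024*1024*16 = 512 Mi floats, i.e. 2 GiB per buffer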
int devs[4] = { 0, 1, 2, 3 };
float** data_p = (float**)malloc(nDev*sizeof(float*));
init_data_float(data_p, nDev, size, 2024);
//allocating and initializing device buffers
float** sendbuff = (float**)malloc(nDev * sizeof(float*));
float** recvbuff = (float**)malloc(nDev * sizeof(float*));
cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);
for (int i = 0; i < nDev; ++i) {
CUDACHECK(cudaSetDevice(i));
CUDACHECK(cudaMalloc((void**)sendbuff + i, size * sizeof(float)));
CUDACHECK(cudaMalloc((void**)recvbuff + i, size * sizeof(float)));
//CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
CUDACHECK(cudaMemcpy(sendbuff[i], data_p[i], size*sizeof(float), cudaMemcpyHostToDevice));
//CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
CUDACHECK(cudaStreamCreate(s+i));
}
printf("LL:: 04\n");
//initializing NCCL
NCCLCHECK(ncclCommInitAll(comms, nDev, devs));
printf("LL:: 05\n");
//calling NCCL communication API. Group API is required when using
//multiple devices per thread
NCCLCHECK(ncclGroupStart());
for (int i = 0; i < nDev; ++i)
NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
comms[i], s[i]));
NCCLCHECK(ncclGroupEnd());
printf("LL:: 06\n");
/*LL::
In a sum allreduce operation between k ranks,
each rank will provide an array in of N values,
and receive identical results in array out of N values,
where out[i] = in0[i]+in1[i]+…+in(k-1)[i]
*/
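// e.g. with nDev == 2: every recvbuff[r][i] (r = 0, 1) should end up equal to data_p[0][i] + data_p[1][i]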
//synchronizing on CUDA streams to wait for completion of NCCL operation
for (int i = 0; i < nDev; ++i) {
CUDACHECK(cudaSetDevice(i));
CUDACHECK(cudaStreamSynchronize(s[i]));
}
printf("LL:: 07\n");
/*check:
after the allreduce, result_recv_data_buff[idx] (copied back from recvbuff[idx]) should match
the host-side reference sum of all data_p buffers, accumulated into data_p[0] below;
*/
float** result_recv_data_buff;
result_recv_data_buff = (float**)malloc(nDev*sizeof(float*));
init_data_float(result_recv_data_buff, nDev, size, 2024+nDev);
printf("LL:: 08\n");
for(int idx=0; idx<nDev; idx++){
CUDACHECK(cudaMemcpy(result_recv_data_buff[idx], recvbuff[idx], size*sizeof(float), cudaMemcpyDeviceToHost));
}
printf("LL:: 09\n");
host_all_reduce_origin(data_p, nDev, size);
printf("LL:: 10\n");
// exhaustive element-by-element check
check_equal(result_recv_data_buff, data_p[0], nDev, size);
printf("LL:: 11\n");
free_buff(data_p, nDev);
free_buff(result_recv_data_buff, nDev);
//free device buffers
for (int i = 0; i < nDev; ++i) {
CUDACHECK(cudaSetDevice(i));
CUDACHECK(cudaFree(sendbuff[i]));
CUDACHECK(cudaFree(recvbuff[i]));
}
//finalizing NCCL
for(int i = 0; i < nDev; ++i)
ncclCommDestroy(comms[i]);
printf("Success \n");
return 0;
}
Makefile
EXE := ex_1_1_SingleProcessSingleThreadMultipleDevices
all: $(EXE)
INC := -I /usr/local/cuda/include -I /home/hipper/ex_nccl_20240701/local/include/
LD_FLAGS := -L /usr/local/cuda/lib64 -L /home/hipper/ex_nccl_20240701/local/lib -lcudart -lnccl -fopenmp
%: %.cpp
g++ -g $< -o $@ $(INC) $(LD_FLAGS)
.PHONY: clean
clean:
-rm -rf $(EXE)
2. Build and run
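Assuming the program above is saved as ex_1_1_SingleProcessSingleThreadMultipleDevices.cpp next to the Makefile, and that the CUDA and NCCL paths in INC/LD_FLAGS match the local installation, a typical session is simply:
make
./ex_1_1_SingleProcessSingleThreadMultipleDevices
The program prints the LL:: progress markers, the per-device mismatch report from check_equal, and finally "Success".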
3. Problem
The observed GPU memory usage is about 7 GB and 8 GB on the two devices, but the actual user data should only be 2 GB for sendbuff plus 2 GB for recvbuff, i.e. 4 GB in total per device.
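One way to narrow this down (a minimal sketch, not part of the program above): query cudaMemGetInfo on every device right after the cudaMalloc/cudaMemcpy loop and again after ncclCommInitAll, so the usage beyond the two 2 GiB user buffers can be attributed to the per-device CUDA context and to whatever the NCCL communicators allocate internally; measuring at each stage makes the split concrete. The helper name report_mem is made up for illustration.
#include <stdio.h>
#include "cuda_runtime.h"
// Hypothetical helper: print the used/total device memory at a labelled point.
// Call it e.g. after the allocation loop and again after ncclCommInitAll.
static void report_mem(const char* tag, int nDev)
{
    for (int i = 0; i < nDev; ++i) {
        size_t free_b = 0, total_b = 0;
        cudaSetDevice(i);
        cudaMemGetInfo(&free_b, &total_b);
        printf("%s: dev %d used %.2f GiB of %.2f GiB\n", tag, i,
               (total_b - free_b) / (1024.0 * 1024.0 * 1024.0),
               total_b / (1024.0 * 1024.0 * 1024.0));
    }
}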