MPI
OpenMPI is an open-source implementation of the MPI standard.
Every process starts executing from main().
Data is isolated between processes; sharing data requires message passing.
Communication must be managed explicitly (every send must be matched by a receive).
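A minimal sketch of this model (the file name e0.cpp and the launch line are just for illustration): every rank runs the same main(), learns its own rank id and the total number of ranks, and shuts MPI down before exiting.
// mpiCC --std=c++17 -stdlib=libc++ e0.cpp -lc++ -o e0 && mpirun -np 2 ./e0
#include <cstdio>
#include <mpi.h>
int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);               // every process enters main() and initializes MPI
    int rank = -1, size = -1;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank); // this process's id within the communicator
    MPI_Comm_size(MPI_COMM_WORLD, &size); // total number of processes
    std::printf("hello from rank %d of %d\n", rank, size);
    MPI_Finalize();
    return 0;
}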
Installation
# macOS
brew install open-mpi
mpicc --version
mpirun --version
Usage
# Compile. Note the link order: -lc++ has to come after the source file.
mpiCC --std=c++17 -stdlib=libc++ e1.cpp -lc++ -o e1
# Run
# The runtime environment must have OpenMPI set up, and the program is launched via mpirun (an MPI application depends on the MPI Runtime Environment (RTE)).
# -np 2: launch 2 processes
mpirun -np 2 ./e1
Synchronous and asynchronous Send/Recv
// plain (blocking) Send/Recv
MPI_Send(buf, count, type, dest, tag, comm);
MPI_Recv(buf, count, type, source, tag, comm, &status);
// MPI_Send/MPI_Recv are blocking: they return once the local buffer is safe to reuse.
// Small messages are typically buffered (eager protocol), so MPI_Send may return before the
// matching receive is posted; large messages typically wait for the receiver (rendezvous).
// MPI_Ssend forces synchronous completion; MPI_Isend/MPI_Irecv are the nonblocking variants.
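A minimal sketch of the nonblocking variants (assumptions: exactly 2 ranks, an illustrative file name, and a fixed message size). Both requests are posted first and completed together with MPI_Waitall; the buffers must not be reused until then.
// mpiCC --std=c++17 -stdlib=libc++ e_isend.cpp -lc++ -o e_isend && mpirun -np 2 ./e_isend
#include <cstdio>
#include <vector>
#include <mpi.h>
int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank = -1;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    const int count = 1024;
    std::vector<int> send_buf(count, rank), recv_buf(count, -1);
    int peer = (rank == 0) ? 1 : 0;            // assumes exactly 2 ranks
    MPI_Request reqs[2];
    // post both operations; neither call blocks
    MPI_Isend(send_buf.data(), count, MPI_INT, peer, 0, MPI_COMM_WORLD, &reqs[0]);
    MPI_Irecv(recv_buf.data(), count, MPI_INT, peer, 0, MPI_COMM_WORLD, &reqs[1]);
    // ... independent work could overlap with the communication here ...
    MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE); // buffers are safe to touch only after this
    std::printf("rank %d received %d\n", rank, recv_buf[0]);
    MPI_Finalize();
    return 0;
}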
CUDA-aware MPI
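With a CUDA-aware build of OpenMPI, GPU device pointers can be passed directly to MPI_Send/MPI_Recv, so data does not have to be staged through host memory by hand. A minimal sketch, assuming such a build, one GPU per rank, and an illustrative build line (error handling omitted):
// nvcc e_cuda.cu -lmpi -o e_cuda && mpirun -np 2 ./e_cuda
#include <mpi.h>
#include <cuda_runtime.h>
int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank = -1;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    const int n = 1 << 20;
    unsigned char *d_buf = nullptr;
    cudaMalloc(&d_buf, n);  // device memory, not host memory
    if (rank == 0) {
        cudaMemset(d_buf, 42, n);
        MPI_Send(d_buf, n, MPI_UNSIGNED_CHAR, 1, 0, MPI_COMM_WORLD);  // device pointer passed directly
    } else if (rank == 1) {
        MPI_Recv(d_buf, n, MPI_UNSIGNED_CHAR, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    }
    cudaFree(d_buf);
    MPI_Finalize();
    return 0;
}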
sample code
speed test
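The program below measures point-to-point bandwidth: rank 0 sends a buffer of a given size to rank 1, each transfer is bracketed by barriers so that MPI_Wtime spans the whole exchange, the elapsed time is averaged over several iterations, and rank 0 writes the results to speed_test_result.csv.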
// mpiCC --std=c++17 -stdlib=libc++ e1.cpp -lc++ -o e1
// mpirun -np 2 ./e1
#include <algorithm>
#include <iostream>
#include <cstdint>
#include <cstdio> // FILE, freopen, fclose
#include <vector>
#include <map>
#include <mpi.h>
int32_t this_rank = -1;
// run_id: (data_length, elapsed_time)
std::map<int32_t, std::pair<int64_t, double> > run_log;
inline void speed_test(int32_t run_id, int64_t data_length, int32_t test_count = 8) {
MPI_Barrier(MPI_COMM_WORLD);
std::vector<uint8_t> buffer(data_length, 0);
double start = 0, finish = 0, elapsed_time_sum = 0;
for(int32_t i = 0; i < test_count; i++) {
MPI_Barrier(MPI_COMM_WORLD);
start = MPI_Wtime();
if(this_rank == 0) {
MPI_Send(buffer.data(), data_length, MPI_UNSIGNED_CHAR, 1, 0, MPI_COMM_WORLD);
} else if(this_rank == 1) {
MPI_Recv(
buffer.data(), data_length, MPI_UNSIGNED_CHAR, 0, 0,
MPI_COMM_WORLD, MPI_STATUS_IGNORE
);
}
MPI_Barrier(MPI_COMM_WORLD);
finish = MPI_Wtime();
elapsed_time_sum += finish - start;
}
if(this_rank == 0) {
run_log[run_id] = std::make_pair(data_length, elapsed_time_sum / test_count);
}
}
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &this_rank);
speed_test(1, (int64_t)1024 * 1024 * 32, 16); // warmup; this entry is overwritten by run 1 below
speed_test( 1, (int64_t)1024 * 64); // 64KB
speed_test( 2, (int64_t)1024 * 128); // 128KB
speed_test( 3, (int64_t)1024 * 256); // 256KB
speed_test( 4, (int64_t)1024 * 512); // 512KB
speed_test( 5, (int64_t)1024 * 1024); // 1MB
speed_test( 6, (int64_t)1024 * 1024 * 2); // 2MB
speed_test( 7, (int64_t)1024 * 1024 * 4); // 4MB
speed_test( 8, (int64_t)1024 * 1024 * 8); // 8MB
speed_test( 9, (int64_t)1024 * 1024 * 16); // 16MB
speed_test(10, (int64_t)1024 * 1024 * 32); // 32MB
speed_test(11, (int64_t)1024 * 1024 * 64); // 64MB
speed_test(12, (int64_t)1024 * 1024 * 128); // 128MB
speed_test(13, (int64_t)1024 * 1024 * 256); // 256MB
speed_test(14, (int64_t)1024 * 1024 * 512); // 512MB
speed_test(15, (int64_t)1024 * 1024 * 1024); // 1GB
if(this_rank == 0) { // save the result
FILE *fp = freopen("speed_test_result.csv", "w", stdout);
for (auto &kv : run_log) {
double size_MB = kv.second.first / 1024.0 / 1024.0;
double size_GB = size_MB / 1024.0;
double time_s = kv.second.second;
double bw_GBps = size_GB / time_s;
std::cout << kv.first << ", " << size_MB << "MB, "
<< time_s << "s, " << bw_GBps << "GB/s" << std::endl;
}
fclose(fp);
}
MPI_Finalize();
return 0;
}
Gaussian elimination
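The program below solves A x = b with a row-cyclic distribution: rank 0 generates a diagonally dominant system with a known solution x, the rows are dealt round-robin across a sub-communicator of the requested size, each elimination step broadcasts the pivot row, back substitution broadcasts each solved component, and rank 0 records the elapsed time and the average error against the known x in gaussian_elimination_result.csv.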
// mpiCC --std=c++17 -stdlib=libc++ e2.cpp -lc++ -o e2
// mpirun -np 32 ./e2
#include <algorithm>
#include <iostream>
#include <cstdint>
#include <cstring>
#include <ctime>
#include <cstdio> // FILE, freopen, fclose
#include <cstdlib> // rand, srand
#include <cmath> // std::abs(double)
#include <vector>
#include <map>
#include <mpi.h>
struct RunResult {
int32_t nodes_count;
int32_t matrix_size;
double elapsed_time;
double average_error;
};
int32_t this_rank = -1;
std::map<int32_t, RunResult> run_log;
inline void generate_matrix(
const int32_t matrix_size,
std::vector< std::vector<double> > &A,
std::vector<double> &b,
std::vector<double> &x
) {
A = std::vector< std::vector<double> >(
matrix_size, std::vector<double>(matrix_size, 0)
);
b.resize(matrix_size);
x.resize(matrix_size);
for(int32_t i = 0; i < matrix_size; i++) {
x[i] = rand() % 1024 + 1;
}
for(int32_t i = 0; i < matrix_size; i++) {
double row_sum = 0;
for(int32_t j = 0; j < matrix_size; j++) {
if(i == j) continue;
double v = rand() % 1024 + 1;
A[i][j] = v;
row_sum += std::abs(v);
}
A[i][i] = row_sum + 1;
}
for(int32_t i = 0; i < matrix_size; i++) {
double s = 0;
for(int32_t j = 0; j < matrix_size; j++) {
s += A[i][j] * x[j];
}
b[i] = s;
}
}
inline void gaussian_elimination(int32_t run_id, int32_t nodes_count, int32_t matrix_size) {
MPI_Barrier(MPI_COMM_WORLD);
// to run multiple test cases at once
int32_t color = (this_rank < nodes_count) ? 0 : MPI_UNDEFINED;
MPI_Comm subcomm;
MPI_Comm_split(MPI_COMM_WORLD, color, this_rank, &subcomm);
if (color == MPI_UNDEFINED) {
return;
}
int32_t sub_rank, sub_size;
MPI_Comm_rank(subcomm, &sub_rank);
MPI_Comm_size(subcomm, &sub_size);
// generate a random matrix on node 0
std::vector< std::vector<double> > A;
std::vector<double> b;
std::vector<double> x;
if(sub_rank == 0) {
generate_matrix(matrix_size, A, b, x);
}
// send / recv the init data
std::map<int32_t, std::vector<double> > rows_on_this_rank;
for(int32_t row_index = 0; row_index < matrix_size; row_index++) {
int32_t target_rank = row_index % sub_size;
if(sub_rank == target_rank) {
rows_on_this_rank[row_index] = std::vector<double>(matrix_size + 1, 0);
}
if(sub_rank == 0) { // send to target
if(target_rank == 0) { // target is self
memcpy(
rows_on_this_rank[row_index].data(), A[row_index].data(),
sizeof(double) * matrix_size
);
rows_on_this_rank[row_index][matrix_size] = b[row_index];
} else { // target is others
MPI_Send(
A[row_index].data(), matrix_size, MPI_DOUBLE, target_rank,
row_index, subcomm
);
MPI_Send(
&b[row_index], 1, MPI_DOUBLE, target_rank, row_index, subcomm
);
}
} else if(sub_rank == target_rank) { // recv from node 0
MPI_Recv(
rows_on_this_rank[row_index].data(), matrix_size, MPI_DOUBLE, 0,
row_index, subcomm, MPI_STATUS_IGNORE
);
MPI_Recv(
&(rows_on_this_rank[row_index][matrix_size]), 1, MPI_DOUBLE, 0,
row_index, subcomm, MPI_STATUS_IGNORE
);
}
}
MPI_Barrier(subcomm);
double start = MPI_Wtime();
// Forward elimination
std::vector<double> pivot_row(matrix_size + 1);
for(int32_t k = 0; k < matrix_size; k++) {
int32_t owner_rank = k % sub_size;
if(sub_rank == owner_rank) {
pivot_row = rows_on_this_rank[k];
}
MPI_Bcast(
pivot_row.data(),
matrix_size + 1,
MPI_DOUBLE,
owner_rank,
subcomm
);
for(auto &kv : rows_on_this_rank) {
if(kv.first <= k) continue;
std::vector<double> &row = kv.second;
double factor = row[k] / pivot_row[k];
if(std::abs(factor) < 1e-8) continue;
for(int32_t j = k; j < matrix_size + 1; j++) {
row[j] -= factor * pivot_row[j];
}
row[k] = 0;
}
}
// Back substitution
std::vector<double> result_x(matrix_size, 0);
for(int32_t i = matrix_size - 1; i >= 0; i--) {
int32_t owner_rank = i % sub_size;
double xi = 0;
if(sub_rank == owner_rank) {
std::vector<double> &row = rows_on_this_rank[i];
double rhs = row[matrix_size];
for(int32_t j = i + 1; j < matrix_size; j++) {
rhs -= row[j] * result_x[j];
}
xi = rhs / row[i];
}
MPI_Bcast(&xi, 1, MPI_DOUBLE, owner_rank, subcomm);
result_x[i] = xi;
}
MPI_Barrier(subcomm);
double finish = MPI_Wtime();
MPI_Comm_free(&subcomm);
// save the result
if(sub_rank == 0) {
// calc the error
double error_sum = 0;
for(int32_t i = 0; i < matrix_size; i++) {
error_sum += std::abs(x[i] - result_x[i]);
}
run_log[run_id] = RunResult{
nodes_count, matrix_size,
finish - start, error_sum / matrix_size
};
}
}
int main(int argc, char **argv) {
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &this_rank);
srand((unsigned)time(nullptr));
std::vector<int32_t> matrix_size_v({512, 1024, 2048, 4096, 8192, 16384});
std::vector<int32_t> nodes_count_v({1, 2, 4, 8, 16, 32});
int32_t run_id = 0;
for(int32_t m_size : matrix_size_v) {
for(int32_t n_count : nodes_count_v) {
gaussian_elimination(++run_id, n_count, m_size);
}
}
if(this_rank == 0) { // save the result
FILE *fp = freopen("gaussian_elimination_result.csv", "w", stdout);
for (auto &kv : run_log) {
int32_t nodes_count = kv.second.nodes_count;
int32_t matrix_size = kv.second.matrix_size;
double time_s = kv.second.elapsed_time;
double avg_err = kv.second.average_error;
// Gaussian elimination costs roughly (2/3) * n^3 floating-point operations
int64_t FLOPS = static_cast<int64_t>(
(2.0 / 3.0) * matrix_size * matrix_size * matrix_size / time_s
);
std::cout << kv.first << ", " << nodes_count << ", "
<< matrix_size << ", " << time_s << "s, " << FLOPS
<< ", " << avg_err << std::endl;
}
fclose(fp);
}
MPI_Finalize();
return 0;
}