Table of Contents

MPI

OpenMPI是MPI的一份开源实现

  1. 每个进程都从 main() 开始运行。
  2. 进程间数据是隔离的,必须通过消息传递共享数据。
  3. 通信必须显式管理(发送方和接收方必须匹配)。

安装

# MacOS
brew install open-mpi

mpicc --version
mpirun --version

使用

# 编译,注意这里的link顺序,【-lc++】要放在源文件后面
mpiCC --std=c++17 -stdlib=libc++ e1.cpp -lc++ -o e1

# 运行
# 运行环境需要配置好OpenMPI,启动程序也要借助mpirun(MPI 应用依赖于 MPI 运行时环境(Runtime Environment,RTE))
# -np 2:启动2个【进程】
mpirun -np 2 ./e1

Send/Recv的同步和异步

// 普通的Send/Recv
MPI_Send(buf, count, type, dest, tag, comm);
MPI_Recv(buf, count, type, source, tag, comm, &status);
 
// TODO:同步/异步

CUDA‑aware MPI

sample code

speed test

// mpiCC --std=c++17 -stdlib=libc++ e1.cpp -lc++ -o e1
// mpirun -np 2 ./e1
#include <algorithm>
#include <iostream>
#include <cstdint>
#include <vector>
#include <map>
 
#include <mpi.h>
 
int32_t this_rank = -1;
 
// run_id: (data_length, elapsed_time)
std::map<int32_t, std::pair<int64_t, double> > run_log;
 
inline void speed_test(int32_t run_id, int64_t data_length, int32_t test_count = 8) {
    MPI_Barrier(MPI_COMM_WORLD);
 
    std::vector<uint8_t> buffer(data_length, 0);
    double start = 0, finish = 0, elapsed_time_sum = 0;
 
    for(int32_t i = 0; i < test_count; i++) {
        MPI_Barrier(MPI_COMM_WORLD);
        start = MPI_Wtime();
 
        if(this_rank == 0) {
            MPI_Send(buffer.data(), data_length, MPI_UNSIGNED_CHAR, 1, 0, MPI_COMM_WORLD);
        } else if(this_rank == 1) {
            MPI_Recv(
                buffer.data(), data_length, MPI_UNSIGNED_CHAR, 0, 0,
                MPI_COMM_WORLD, MPI_STATUS_IGNORE
            );
        }
 
        MPI_Barrier(MPI_COMM_WORLD);
        finish = MPI_Wtime();
        elapsed_time_sum += finish - start;
    }
 
    if(this_rank == 0) {
        run_log[run_id] = std::make_pair(data_length, elapsed_time_sum / test_count);
    }
}
 
int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &this_rank);
 
    speed_test(1, (int64_t)1024 * 1024 * 32, 16); // warmup
 
    speed_test( 1, (int64_t)1024 * 64); // 64KB
    speed_test( 2, (int64_t)1024 * 128); // 128KB
    speed_test( 3, (int64_t)1024 * 256); // 256KB
    speed_test( 4, (int64_t)1024 * 512); // 512KB
    speed_test( 5, (int64_t)1024 * 1024); // 1MB
    speed_test( 6, (int64_t)1024 * 1024 * 2); // 2MB
    speed_test( 7, (int64_t)1024 * 1024 * 4); // 4MB
    speed_test( 8, (int64_t)1024 * 1024 * 8); // 8MB
    speed_test( 9, (int64_t)1024 * 1024 * 16); // 16MB
    speed_test(10, (int64_t)1024 * 1024 * 32); // 32MB
    speed_test(11, (int64_t)1024 * 1024 * 64); // 64MB
    speed_test(12, (int64_t)1024 * 1024 * 128); // 128MB
    speed_test(13, (int64_t)1024 * 1024 * 256); // 256MB
    speed_test(14, (int64_t)1024 * 1024 * 512); // 512MB
    speed_test(15, (int64_t)1024 * 1024 * 1024); // 1GB
 
    if(this_rank == 0) { // save the result
        FILE *fp = freopen("speed_test_result.csv", "w", stdout);
        for (auto &kv : run_log) {
            double size_MB = kv.second.first / 1024.0 / 1024.0;
            double size_GB = size_MB / 1024.0;
            double time_s = kv.second.second;
            double bw_GBps = size_GB / time_s;
            std::cout << kv.first << ", " << size_MB << "MB, "
                << time_s << "s, " << bw_GBps << "GB/s" << std::endl;
        }
        fclose(fp);
    }
 
    MPI_Finalize();
    return 0;
}

gaussian elimination

// mpiCC --std=c++17 -stdlib=libc++ e2.cpp -lc++ -o e2
// mpirun -np 32 ./e2
#include <algorithm>
#include <iostream>
#include <cstdint>
#include <cstring>
#include <ctime>
#include <vector>
#include <map>
 
#include <mpi.h>
 
struct RunResult {
    int32_t nodes_count;
    int32_t matrix_size;
    double elapsed_time;
    double average_error;
};
 
int32_t this_rank = -1;
 
std::map<int32_t, RunResult> run_log;
 
inline void generate_matrix(
    const int32_t matrix_size,
    std::vector< std::vector<double> > &A,
    std::vector<double> &b,
    std::vector<double> &x
) {
    A = std::vector< std::vector<double> >(
        matrix_size, std::vector<double>(matrix_size, 0)
    );
    b.resize(matrix_size);
    x.resize(matrix_size);
 
    for(int32_t i = 0; i < matrix_size; i++) {
        x[i] = rand() % 1024 + 1;
    }
 
    for(int32_t i = 0; i < matrix_size; i++) {
        double row_sum = 0;
        for(int32_t j = 0; j < matrix_size; j++) {
            if(i == j) continue;
            double v = rand() % 1024 + 1;
            A[i][j] = v;
            row_sum += std::abs(v);
        }
        A[i][i] = row_sum + 1;
    }
 
    for(int32_t i = 0; i < matrix_size; i++) {
        double s = 0;
        for(int32_t j = 0; j < matrix_size; j++) {
            s += A[i][j] * x[j];
        }
        b[i] = s;
    }
}
 
inline void gaussian_elimination(int32_t run_id, int32_t nodes_count, int32_t matrix_size) {
    MPI_Barrier(MPI_COMM_WORLD);
 
    // to run multiple test cases at once
    int32_t color = (this_rank < nodes_count) ? 0 : MPI_UNDEFINED;
    MPI_Comm subcomm;
    MPI_Comm_split(MPI_COMM_WORLD, color, this_rank, &subcomm);
    if (color == MPI_UNDEFINED) {
        return;
    }
 
    int32_t sub_rank, sub_size;
    MPI_Comm_rank(subcomm, &sub_rank);
    MPI_Comm_size(subcomm, &sub_size);
 
    // generate a random matrix on node 0
    std::vector< std::vector<double> > A;
    std::vector<double> b;
    std::vector<double> x;
    if(sub_rank == 0) {
        generate_matrix(matrix_size, A, b, x);
    }
 
    // send / recv the init data
    std::map<int32_t, std::vector<double> > rows_on_this_rank;
    for(int32_t row_index = 0; row_index < matrix_size; row_index++) {
        int32_t target_rank = row_index % sub_size;
        if(sub_rank == target_rank) {
            rows_on_this_rank[row_index] = std::vector<double>(matrix_size + 1, 0);
        }
 
        if(sub_rank == 0) { // send to target
            if(target_rank == 0) { // target is self
                memcpy(
                    rows_on_this_rank[row_index].data(), A[row_index].data(),
                    sizeof(double) * matrix_size
                );
                rows_on_this_rank[row_index][matrix_size] = b[row_index];
            } else { // target is others
                MPI_Send(
                    A[row_index].data(), matrix_size, MPI_DOUBLE, target_rank,
                    row_index, subcomm
                );
                MPI_Send(
                    &b[row_index], 1, MPI_DOUBLE, target_rank, row_index, subcomm
                );
            }
        } else if(sub_rank == target_rank) { // recv from node 0
            MPI_Recv(
                rows_on_this_rank[row_index].data(), matrix_size, MPI_DOUBLE, 0,
                row_index, subcomm, MPI_STATUS_IGNORE
            );
            MPI_Recv(
                &(rows_on_this_rank[row_index][matrix_size]), 1, MPI_DOUBLE, 0,
                row_index, subcomm, MPI_STATUS_IGNORE
            );
        }
    }
 
    MPI_Barrier(subcomm);
    double start = MPI_Wtime();
 
    // Forward elimination
    std::vector<double> pivot_row(matrix_size + 1);
    for(int32_t k = 0; k < matrix_size; k++) {
        int32_t owner_rank = k % sub_size;
 
        if(sub_rank == owner_rank) {
            pivot_row = rows_on_this_rank[k];
        }
 
        MPI_Bcast(
            pivot_row.data(),
            matrix_size + 1,
            MPI_DOUBLE,
            owner_rank,
            subcomm
        );
 
        for(auto &kv : rows_on_this_rank) {
            if(kv.first <= k) continue;
            std::vector<double> &row = kv.second;
 
            double factor = row[k] / pivot_row[k];
            if(std::abs(factor) < 1e-8) continue;
 
            for(int32_t j = k; j < matrix_size + 1; j++) {
                row[j] -= factor * pivot_row[j];
            }
            row[k] = 0;
        }
    }
 
    // Back substitution
    std::vector<double> result_x(matrix_size, 0);
    for(int32_t i = matrix_size - 1; i >= 0; i--) {
        int32_t owner_rank = i % sub_size;
        double xi = 0;
 
        if(sub_rank == owner_rank) {
            std::vector<double> &row = rows_on_this_rank[i];
            double rhs = row[matrix_size];
            for(int32_t j = i + 1; j < matrix_size; j++) {
                rhs -= row[j] * result_x[j];
            }
            xi = rhs / row[i];
        }
 
        MPI_Bcast(&xi, 1, MPI_DOUBLE, owner_rank, subcomm);
        result_x[i] = xi;
    }
 
    MPI_Barrier(subcomm);
    double finish = MPI_Wtime();
 
    MPI_Comm_free(&subcomm);
 
    // save the result
    if(sub_rank == 0) {
        // calc the error
        double error_sum = 0;
        for(int32_t i = 0; i < matrix_size; i++) {
            error_sum += std::abs(x[i] - result_x[i]);
        }
 
        run_log[run_id] = (RunResult){
            nodes_count, matrix_size,
            finish - start, error_sum / matrix_size
        };
    }
}
 
int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &this_rank);
 
    srand((unsigned)time(nullptr));
 
    std::vector<int32_t> matrix_size_v({512, 1024, 2048, 4096, 8192, 16384});
    std::vector<int32_t> nodes_count_v({1, 2, 4, 8, 16, 32});
 
    int32_t run_id = 0;
    for(int32_t m_size : matrix_size_v) {
        for(int32_t n_count : nodes_count_v) {
            gaussian_elimination(++run_id, n_count, m_size);
        }
    }
 
    if(this_rank == 0) { // save the result
        FILE *fp = freopen("gaussian_elimination_result.csv", "w", stdout);
        for (auto &kv : run_log) {
            int32_t nodes_count = kv.second.nodes_count;
            int32_t matrix_size = kv.second.matrix_size;
            double time_s = kv.second.elapsed_time;
            double avg_err = kv.second.average_error;
            int64_t FLOPS = static_cast<int64_t>(
                (2.0 / 3.0) * matrix_size * matrix_size * matrix_size / time_s
            );
            std::cout << kv.first << ", " << nodes_count << ", "
                << matrix_size << ", " << time_s << "s, " << FLOPS
                << ", " << avg_err << std::endl;
        }
        fclose(fp);
    }
 
    MPI_Finalize();
    return 0;
}