1. The Message Passing Model
MPI (Message Passing Interface) is the standard for distributed-memory parallel programming. Each process has its own private memory; data is exchanged explicitly via messages.
┌──────────────┐ message ┌──────────────┐
│ Process 0 │ ────────────→ │ Process 1 │
│ ┌────────┐ │ │ ┌────────┐ │
│ │ Memory │ │ │ │ Memory │ │
│ └────────┘ │ ←──────────── │ └────────┘ │
└──────────────┘ message └──────────────┘
Key characteristics:
- Processes are independent programs (SPMD: Single Program, Multiple Data)
- No shared state—all communication is explicit
- Scales to thousands of nodes across networks
2. MPI Basics
Initialization and Finalization
Every MPI program begins with MPI_Init and ends with MPI_Finalize:
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    printf("Hello from rank %d of %d\n", rank, size);

    MPI_Finalize();
    return 0;
}
Compile and run:
mpicxx -o program program.cpp
mpirun -np 4 ./program
Communicators
A communicator defines a group of processes that can communicate. MPI_COMM_WORLD includes all processes. You can create subgroups for hierarchical algorithms.
| Function | Description |
|---|---|
| MPI_Comm_rank(comm, &rank) | Get process ID within communicator |
| MPI_Comm_size(comm, &size) | Get number of processes |
| MPI_Comm_split(...) | Create sub-communicators |
3. Point-to-Point Communication
Blocking Send/Receive
The most basic operations: MPI_Send and MPI_Recv.
if (rank == 0) {
    double data = 3.14;
    MPI_Send(&data, 1, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);
} else if (rank == 1) {
    double data;
    MPI_Recv(&data, 1, MPI_DOUBLE, 0, 0,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
Parameters:
- count: Number of elements (not bytes)
- tag: Message identifier (use 0 if you don't need it)
- source/dest: Rank of the partner process
Common MPI Datatypes
| MPI Type | C Type |
|---|---|
| MPI_CHAR | char |
| MPI_INT | int |
| MPI_LONG | long |
| MPI_FLOAT | float |
| MPI_DOUBLE | double |
Deadlock
Deadlock occurs when processes wait for each other indefinitely:
if (rank == 0) {
    MPI_Recv(..., 1, ...);  // waits for a message from rank 1
    MPI_Send(..., 1, ...);
} else {
    MPI_Recv(..., 0, ...);  // waits for a message from rank 0
    MPI_Send(..., 0, ...);  // neither send is ever reached
}
Solution: Use MPI_Sendrecv or non-blocking operations:
MPI_Sendrecv(&send_data, 1, MPI_DOUBLE, partner, 0,
             &recv_data, 1, MPI_DOUBLE, partner, 0,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
Non-Blocking Communication
Non-blocking calls return immediately; use MPI_Wait to complete:
MPI_Request request;
MPI_Isend(&data, n, MPI_DOUBLE, dest, tag, comm, &request);
MPI_Wait(&request, MPI_STATUS_IGNORE);
This enables overlapping computation and communication.
4. Collective Operations
Collectives involve all processes in a communicator. They are optimized and easier to use than manual point-to-point patterns.
Broadcast
One process sends data to all others:
Before: [A B C D] [ - - - - ] [ - - - - ] [ - - - - ]
↓ ↓ ↓ ↓
After: [A B C D] [A B C D] [A B C D] [A B C D]
double data[4];
if (rank == 0) {
    // ... fill data on the root ...
}
MPI_Bcast(data, 4, MPI_DOUBLE, 0, MPI_COMM_WORLD);
Reduce
Combine data from all processes using an operation:
Process 0: [3] ─┐
Process 1: [1] ─┼─→ MPI_SUM → [10] on root
Process 2: [4] ─┤
Process 3: [2] ─┘
double local_sum = compute_partial();
double global_sum;
MPI_Reduce(&local_sum, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0,
MPI_COMM_WORLD);
Reduction operations: MPI_SUM, MPI_PROD, MPI_MAX, MPI_MIN, MPI_LAND, MPI_LOR
Allreduce
Like Reduce, but result goes to all processes:
MPI_Allreduce(&local, &global, 1, MPI_DOUBLE, MPI_SUM,
MPI_COMM_WORLD);
Gather and Scatter
Gather: Collect data from all processes to root:
Before: [A] [B] [C] [D]
↘ ↓ ↙
After: [A B C D] on root
Scatter: Distribute data from root to all:
Before: [A B C D] on root
↙ ↓ ↘
After: [A] [B] [C] [D]
double local_val = rank * 1.0;
double* gathered = (rank == 0) ? new double[size] : nullptr;
MPI_Gather(&local_val, 1, MPI_DOUBLE, gathered, 1, MPI_DOUBLE, 0, comm);

double recv_val;
MPI_Scatter(gathered, 1, MPI_DOUBLE, &recv_val, 1, MPI_DOUBLE, 0, comm);
delete[] gathered;  // delete[] on nullptr is a no-op on non-root ranks
Allgather
Every process gets the gathered result:
double local = rank;
std::vector<double> all_data(size);  // avoid a VLA: not standard C++
MPI_Allgather(&local, 1, MPI_DOUBLE, all_data.data(), 1, MPI_DOUBLE, comm);
Summary Table
| Collective | Description | Complexity |
|---|---|---|
| MPI_Bcast | One-to-all | \(O(\log p)\) |
| MPI_Reduce | All-to-one with operation | \(O(\log p)\) |
| MPI_Allreduce | Reduce + Broadcast | \(O(\log p)\) |
| MPI_Gather | Collect to root | \(O(p)\) |
| MPI_Scatter | Distribute from root | \(O(p)\) |
| MPI_Allgather | Gather + Broadcast | \(O(p)\) |
| MPI_Alltoall | Complete exchange | \(O(p)\) |
5. Distributed Linear Algebra with MPI
Distributed Vector
Partition a vector of length \(n\) across \(p\) processes:
int local_n = n / size;
std::vector<double> local_x(local_n);
Distributed Dot Product
\[\mathbf{x}^T \mathbf{y} = \sum_{i=0}^{p-1} \underbrace{\sum_{j \in \text{local}_i} x_j y_j}_{\text{local dot}}\]
double local_dot = 0.0;
for (int i = 0; i < local_n; ++i) {
local_dot += local_x[i] * local_y[i];
}
double global_dot;
MPI_Allreduce(&local_dot, &global_dot, 1, MPI_DOUBLE, MPI_SUM, comm);
Communication: One Allreduce = \(O(\log p)\) messages.
Distributed Norm
\[\|\mathbf{x}\|_2 = \sqrt{\sum_i x_i^2} = \sqrt{\text{Allreduce}\left(\sum_{j \in \text{local}} x_j^2\right)}\]
double local_sq = 0.0;
for (int i = 0; i < local_n; ++i) {
local_sq += local_x[i] * local_x[i];
}
double global_sq;
MPI_Allreduce(&local_sq, &global_sq, 1, MPI_DOUBLE, MPI_SUM, comm);
double norm = std::sqrt(global_sq);
Distributed Matrix-Vector Product
For \(\mathbf{y} = A\mathbf{x}\) with row-distributed \(A\):
Process 0: A[0:m/p, :] needs all of x → computes y[0:m/p]
Process 1: A[m/p:2m/p, :] needs all of x → computes y[m/p:2m/p]
...
Algorithm:
- Allgather the distributed vector \(\mathbf{x}\)
- Each process computes its local rows
// local_A holds this rank's local_m x n row block (row-major);
// local_y has length local_m.
std::vector<double> full_x(n);
MPI_Allgather(local_x.data(), local_n, MPI_DOUBLE,
              full_x.data(), local_n, MPI_DOUBLE, comm);

for (int i = 0; i < local_m; ++i) {
    local_y[i] = 0.0;
    for (int j = 0; j < n; ++j) {
        local_y[i] += local_A[i * n + j] * full_x[j];
    }
}
Communication cost: \(O(n)\) for the Allgather.
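Combining computation and communication gives a rough total time per product. Here \(t_c\) denotes the time for one multiply-add (an extra parameter, not defined elsewhere in these notes; \(t_s\) and \(t_w\) are the machine parameters of Section 6), and we assume a recursive-doubling Allgather:

\[T_{\text{matvec}} \approx \underbrace{\frac{n^2}{p}\, t_c}_{\text{local row block}} + \underbrace{t_s \log p + n \cdot t_w}_{\text{Allgather of } \mathbf{x}}\]

The computation term shrinks with \(p\) while the \(n \cdot t_w\) term does not, so the 1D row distribution scales well only when \(n/p\) stays large.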
6. Performance Considerations
Communication vs. Computation
Let \(t_s\) = latency (startup time), \(t_w\) = time per word.
- Point-to-point: \(T = t_s + n \cdot t_w\)
- Broadcast (tree-based): \(T = t_s \log p + n \cdot t_w \log p\)
- Allreduce: \(T = 2 t_s \log p + 2n \cdot t_w\)
Hiding Latency
Use non-blocking collectives (MPI-3):
MPI_Request req;
MPI_Iallreduce(&local, &global, 1, MPI_DOUBLE, MPI_SUM, comm, &req);
do_local_work();
MPI_Wait(&req, MPI_STATUS_IGNORE);
Load Balancing
Uneven work distribution kills parallel efficiency. For \(n\) not divisible by \(p\):
int local_n = n / size;
int remainder = n % size;
if (rank < remainder) local_n++;
7. Example: Parallel Conjugate Gradient
The CG algorithm for solving \(A\mathbf{x} = \mathbf{b}\) uses only:
- Matrix-vector products
- Dot products
- Vector updates (AXPY)
r = b - A*x;
p = r;
rsold = dot(r, r);
for (int i = 0; i < max_iter; ++i) {
Ap = A * p;
alpha = rsold / dot(p, Ap);
x = x + alpha * p;
r = r - alpha * Ap;
rsnew = dot(r, r);
if (sqrt(rsnew) < tol) break;
p = r + (rsnew/rsold) * p;
rsold = rsnew;
}
Communication per iteration: 2 Allreduce + 1 Allgather.
8. Our Library's MPI Interface
The num::mpi namespace provides:
void init(int *argc, char ***argv)
Initialize MPI (call once).
void finalize()
Finalize MPI.
int rank(MPI_Comm comm=MPI_COMM_WORLD)
Get communicator rank.
int size(MPI_Comm comm=MPI_COMM_WORLD)
Get communicator size.
void broadcast(real *data, idx n, int root=0, MPI_Comm comm=MPI_COMM_WORLD)
Broadcast from root.
void allreduce_sum(real *data, idx n, MPI_Comm comm=MPI_COMM_WORLD)
Allreduce sum.
real dot(const Vector &x, const Vector &y, MPI_Comm comm=MPI_COMM_WORLD)
Distributed dot product (each rank holds a partial vector).
real norm(const Vector &x, MPI_Comm comm=MPI_COMM_WORLD)
Distributed norm.
BasicVector<real> Vector
Real-valued dense vector with full backend dispatch (CPU + GPU).
Usage:
#include "numerics/numerics.hpp"
int main(int argc, char** argv) {
    num::mpi::init(&argc, &argv);

    int local_n = 1000;
    Vector x(local_n), y(local_n);   // local block on each rank (size constructor assumed)
    // ... fill local blocks ...
    real d  = num::mpi::dot(x, y);   // global dot product
    real nx = num::mpi::norm(x);     // global 2-norm

    num::mpi::finalize();
    return 0;
}
Exercises
- Write an MPI program where each process computes its rank squared, then use MPI_Reduce to find the sum of all squares.
- Implement a parallel vector addition \(\mathbf{z} = \mathbf{x} + \mathbf{y}\) where vectors are distributed across processes.
- Analyze the communication cost of a distributed matrix-vector product with an \(n \times n\) matrix on \(p\) processes (1D row distribution).
- Modify the dot product to use non-blocking MPI_Iallreduce. What computation could you overlap?
- What happens if you call MPI_Recv with source = MPI_ANY_SOURCE? When is this useful?
References
- Gropp, Lusk & Skjellum, Using MPI
- MPI Forum, MPI Standard (mpi-forum.org)
- Pacheco, Parallel Programming with MPI