
Linear Algebra and Analytic Geometry in Space (Part 2)

Problem

Accelerate matrix operations through parallel computing.

Solution

Multithreading

Let's start with simple multithreading:

#include <iostream>
#include <string>
#include <functional>
#include <chrono>
#include <vector>
#include <thread>
#include <future>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cmath>
using namespace std;


double timeit(std::function<void()> test) {
    auto start = std::chrono::system_clock::now();
    test();
    auto stop = std::chrono::system_clock::now();
    std::chrono::duration<double, std::milli> time = stop - start;
    return time.count();
}

class matrix {
    size_t m, n;
    std::vector<int64_t> e;
public:
    explicit matrix(size_t m, size_t n): m(m), n(n), e(m * n) {}
    void random() {
        for (size_t i = 0; i < m; ++i)
            for (size_t j = 0; j < n; ++j)
                at(i, j) = rand();
    }
    matrix(matrix const& that) = default;
    matrix(matrix&&) = default;
    matrix& operator=(matrix const& that) = default;
    matrix& operator=(matrix&& that) = default;
    // Row-major storage: element (i, j) lives at offset i * n + j.
    int64_t& at(size_t i, size_t j) { return e[i * n + j]; }
    int64_t const& at(size_t i, size_t j) const { return e[i * n + j]; }

    matrix const& operator+=(matrix const& that) {
        assert(m == that.m && n == that.n);
        for (size_t i = 0; i < m; ++i)
            for (size_t j = 0; j < n; ++j)
                at(i, j) += that.at(i, j);
        return *this;
    }

    matrix operator+(matrix that) const { return that += *this; }

    // Copy the u x v block whose top-left corner is at (z, w).
    matrix submat(size_t z, size_t w, size_t u, size_t v) const {
        matrix sub(u, v);
        for (size_t i = 0; i < u; ++i)
            for (size_t j = 0; j < v; ++j)
                sub.at(i, j) = at(z + i, w + j);
        return sub;
    }

    // Textbook O(n^3) multiplication, used as the sequential baseline.
    matrix mul_plain(matrix const& that) const {
        size_t p = that.m;
        assert(n == p);
        matrix product(m, that.n);
        for (size_t i = 0; i < m; ++i)
            for (size_t j = 0; j < that.n; ++j)
                for (size_t k = 0; k < p; ++k)
                    product.at(i, j) += at(i, k) * that.at(k, j);
        return product;
    }

    // Split both operands into 2 x 2 blocks and compute the four result
    // blocks concurrently; inner_product computes a * b + c * d.
    // For simplicity, this assumes both operands are square and equally sized.
    matrix mul_recursive(matrix const& that, matrix inner_product(matrix const&, matrix const&, matrix const&, matrix const&)) const {
        assert(m == that.m && n == that.n);
        if (m == 1 && n == 1) {
            matrix r(1, 1);
            r.at(0, 0) = at(0, 0) * that.at(0, 0);
            return r;
        }
        matrix A11 = submat(0, 0, m / 2, n / 2);
        matrix A12 = submat(0, n / 2, m / 2, n - n / 2);
        matrix A21 = submat(m / 2, 0, m - m / 2, n / 2);
        matrix A22 = submat(m / 2, n / 2, m - m / 2, n - n / 2);
        matrix B11 = that.submat(0, 0, m / 2, n / 2);
        matrix B12 = that.submat(0, n / 2, m / 2, n - n / 2);
        matrix B21 = that.submat(m / 2, 0, m - m / 2, n / 2);
        matrix B22 = that.submat(m / 2, n / 2, m - m / 2, n - n / 2);
        // std::launch::async forces a new thread rather than deferred execution.
        auto C11_ = std::async(std::launch::async, inner_product, A11, B11, A12, B21);
        auto C12_ = std::async(std::launch::async, inner_product, A11, B12, A12, B22);
        auto C21_ = std::async(std::launch::async, inner_product, A21, B11, A22, B21);
        auto C22_ = std::async(std::launch::async, inner_product, A21, B12, A22, B22);
        matrix C11 = C11_.get();
        matrix C12 = C12_.get();
        matrix C21 = C21_.get();
        matrix C22 = C22_.get();
        // Stitch the four blocks back into the full result.
        matrix r(m, n);
        for (size_t i = 0; i < C11.m; ++i)
            for (size_t j = 0; j < C11.n; ++j)
                r.at(i, j) = C11.at(i, j);
        for (size_t i = 0; i < C12.m; ++i)
            for (size_t j = 0; j < C12.n; ++j)
                r.at(i, C11.n + j) = C12.at(i, j);
        for (size_t i = 0; i < C21.m; ++i)
            for (size_t j = 0; j < C21.n; ++j)
                r.at(C11.m + i, j) = C21.at(i, j);
        for (size_t i = 0; i < C22.m; ++i)
            for (size_t j = 0; j < C22.n; ++j)
                r.at(C11.m + i, C11.n + j) = C22.at(i, j);
        return r;
    }

    // One level of splitting: the four result blocks run in parallel,
    // each block product is computed sequentially.
    matrix mul_parallel(matrix const& that) const {
        return mul_recursive(that, [](matrix const& a, matrix const& b, matrix const& c, matrix const& d) {
            return a.mul_plain(b) + c.mul_plain(d);
        });
    }

    // Two levels of splitting, and so on for the variants below.
    matrix mul_parallel2(matrix const& that) const {
        return mul_recursive(that, [](matrix const& a, matrix const& b, matrix const& c, matrix const& d) {
            return a.mul_parallel(b) + c.mul_parallel(d);
        });
    }

    matrix mul_parallel3(matrix const& that) const {
        return mul_recursive(that, [](matrix const& a, matrix const& b, matrix const& c, matrix const& d) {
            return a.mul_parallel2(b) + c.mul_parallel2(d);
        });
    }

    matrix mul_parallel4(matrix const& that) const {
        return mul_recursive(that, [](matrix const& a, matrix const& b, matrix const& c, matrix const& d) {
            return a.mul_parallel3(b) + c.mul_parallel3(d);
        });
    }

    bool operator==(matrix const& that) const {
        return m == that.m && n == that.n && e == that.e;
    }
};

void verify(matrix (matrix::* std)(matrix const&) const, matrix (matrix::* test)(matrix const&) const, const char* impl) {
    srand(1);
    for (int i = 0; i < 1000; ++i) {
        matrix m1(16, 16), m2(16, 16);
        m1.random(); m2.random();
        if (!((m1.*std)(m2) == (m1.*test)(m2))) {
            fprintf(stderr, "Invalid implementation: %s\n", impl);
            exit(-1);
        }
    }
    fprintf(stdout, "Valid implementation: %s\n", impl);
}

double timeit(matrix (matrix::* test)(matrix const&) const, int size, int times, const char* impl, double base = 0) {
    double time = timeit([=] {
        srand(1);
        for (int i = 0; i < times; ++i) {
            matrix m1(size, size), m2(size, size);
            m1.random(); m2.random();
            (m1.*test)(m2);
        }
    });
    printf("'%s' algorithm took %.2f ms to complete %d times %d x %d matrices multiplication", impl, time, times, size, size);
    if (base && base != time) {
        printf(", %.2f%% ", fabs(base - time) / base * 100);
        if (base > time) printf("faster");
        else printf("slower");
    }
    puts("");
    return time;
}


int main() {
    verify(&matrix::mul_plain, &matrix::mul_parallel, "parallel");
    double base = timeit(&matrix::mul_plain, 1024, 1, "sequence");
    timeit(&matrix::mul_parallel, 1024, 1, "parallel", base);
    timeit(&matrix::mul_parallel2, 1024, 1, "parallel2", base);
    timeit(&matrix::mul_parallel3, 1024, 1, "parallel3", base);
    timeit(&matrix::mul_parallel4, 1024, 1, "parallel4", base);
}

Output:

Valid implementation: parallel
'sequence' algorithm took 3920.91 ms to complete 1 times 1024 x 1024 matrices multiplication
'parallel' algorithm took 905.67 ms to complete 1 times 1024 x 1024 matrices multiplication, 76.90% faster
'parallel2' algorithm took 239.29 ms to complete 1 times 1024 x 1024 matrices multiplication, 93.90% faster
'parallel3' algorithm took 434.92 ms to complete 1 times 1024 x 1024 matrices multiplication, 88.91% faster
'parallel4' algorithm took 1317.48 ms to complete 1 times 1024 x 1024 matrices multiplication, 66.40% faster

The first round of optimization is clearly effective: I split each matrix into four sub-blocks, multiply the blocks, and assemble each sub-matrix of the result from the corresponding block products. Splitting one more level gives the best result, but splitting further makes things slower rather than faster. Spawning threads is not free, and each extra level multiplies the number of tasks by four, so the constant factor quickly gets out of hand.
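
Concretely, partitioning both operands into 2 x 2 blocks turns the product into four independent block-level inner products, which is exactly what mul_recursive hands to std::async:

$$
\begin{pmatrix} A_{11} & A_{12} \\ A_{21} & A_{22} \end{pmatrix}
\begin{pmatrix} B_{11} & B_{12} \\ B_{21} & B_{22} \end{pmatrix}
=
\begin{pmatrix}
A_{11}B_{11} + A_{12}B_{21} & A_{11}B_{12} + A_{12}B_{22} \\
A_{21}B_{11} + A_{22}B_{21} & A_{21}B_{12} + A_{22}B_{22}
\end{pmatrix}
$$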

Thread Pooling

Another important multithreading optimization is thread pooling. Since spawning a thread carries a large overhead, we can create a group of threads ahead of time and keep them in a pool; whenever there is work to do, we hand the task to the pool, which finds an idle thread to run it.

// ...

// use https://github.com/progschj/ThreadPool
#include "ThreadPool.h"

// A generous pool size: nested tasks block on their sub-tasks,
// so the pool must be large enough that sub-tasks can still run.
ThreadPool pool(100);
bool pooled;

// Dispatch either to the thread pool or to a fresh std::async thread.
template<typename Fn, typename... Args>
[[nodiscard]] inline auto my_async(Fn&& fn, Args&&... args) {
    return pooled
        ? pool.enqueue(std::forward<Fn>(fn), std::forward<Args>(args)...)
        : std::async(std::launch::async, std::forward<Fn>(fn), std::forward<Args>(args)...);
}

// ...
// In mul_recursive, std::async is replaced by my_async:
auto C11_ = my_async(inner_product, A11, B11, A12, B21);
auto C12_ = my_async(inner_product, A11, B12, A12, B22);
auto C21_ = my_async(inner_product, A21, B11, A22, B21);
auto C22_ = my_async(inner_product, A21, B12, A22, B22);
// ...

int main() {
    double base = timeit(&matrix::mul_plain, 1024, 1, "sequence");
    timeit(&matrix::mul_parallel, 1024, 1, "unpooled", base);
    timeit(&matrix::mul_parallel2, 1024, 1, "unpooled2", base);
    timeit(&matrix::mul_parallel3, 1024, 1, "unpooled3", base);
    timeit(&matrix::mul_parallel4, 1024, 1, "unpooled4", base);
    pooled = true;
    timeit(&matrix::mul_parallel, 1024, 1, "pooled", base);
    timeit(&matrix::mul_parallel2, 1024, 1, "pooled2", base);
    timeit(&matrix::mul_parallel3, 1024, 1, "pooled3", base);
    timeit(&matrix::mul_parallel4, 1024, 1, "pooled4", base);
}

Output:

'sequence' algorithm took 4011.29 ms to complete 1 times 1024 x 1024 matrices multiplication
'unpooled' algorithm took 895.97 ms to complete 1 times 1024 x 1024 matrices multiplication, 77.66% faster
'unpooled2' algorithm took 241.15 ms to complete 1 times 1024 x 1024 matrices multiplication, 93.99% faster
'unpooled3' algorithm took 439.58 ms to complete 1 times 1024 x 1024 matrices multiplication, 89.04% faster
'unpooled4' algorithm took 1384.14 ms to complete 1 times 1024 x 1024 matrices multiplication, 65.49% faster
'pooled' algorithm took 880.25 ms to complete 1 times 1024 x 1024 matrices multiplication, 78.06% faster
'pooled2' algorithm took 233.02 ms to complete 1 times 1024 x 1024 matrices multiplication, 94.19% faster
'pooled3' algorithm took 329.14 ms to complete 1 times 1024 x 1024 matrices multiplication, 91.79% faster
'pooled4' algorithm took 832.20 ms to complete 1 times 1024 x 1024 matrices multiplication, 79.25% faster

In fact, threads are not the only resource that can be pooled: memory can be pooled too (the repeated splitting into sub-matrices here allocates quite a lot of it), which cuts down the constant factor spent on allocating and freeing memory.
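
As a sketch of the idea only (this is not part of the measured code above; buffer_pool, acquire, and release are hypothetical names), a pool that recycles matrix buffers by size might look like this:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>
#include <vector>

// Hypothetical buffer pool: freed matrix buffers are kept in buckets
// keyed by element count, so later matrices of the same size reuse them
// instead of hitting the allocator again.
class buffer_pool {
    std::mutex mtx;
    std::unordered_map<size_t, std::vector<std::vector<int64_t>>> free_list;
public:
    std::vector<int64_t> acquire(size_t count) {
        std::lock_guard<std::mutex> lock(mtx);
        auto& bucket = free_list[count];
        if (!bucket.empty()) {
            std::vector<int64_t> buf = std::move(bucket.back());
            bucket.pop_back();
            std::fill(buf.begin(), buf.end(), 0); // matrices start out zeroed
            return buf;
        }
        return std::vector<int64_t>(count);       // no spare buffer: allocate fresh
    }
    void release(std::vector<int64_t> buf) {
        std::lock_guard<std::mutex> lock(mtx);
        free_list[buf.size()].push_back(std::move(buf));
    }
};

The matrix class could then acquire its storage from such a pool in its constructor and release it in its destructor, instead of letting std::vector allocate and free on every split.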

CUDA

But when it comes to parallel matrix computation, the GPU is the real answer. With CUDA, I only need to write the formula for a single element of the result matrix; the parallelization itself is handled entirely for me.

#include <iostream>
#include <string>
#include <functional>
#include <chrono>
#include <vector>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cmath>
using namespace std;


#include <cuda_runtime.h>

double timeit(std::function<void()> test) {
    auto start = std::chrono::system_clock::now();
    test();
    auto stop = std::chrono::system_clock::now();
    std::chrono::duration<double, std::milli> time = stop - start;
    return time.count();
}


// Each thread computes one element of C = A * B, where A is m x p,
// B is p x n, and C is m x n, all stored row-major.
__global__ void matProduct(int64_t* C, int64_t const* A, int64_t const* B, size_t m, size_t p, size_t n) {
    size_t idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx >= m * n) return;

    size_t i = idx / n;
    size_t j = idx % n;
    int64_t r = 0;
    for (size_t k = 0; k < p; ++k) {
        r += A[i * p + k] * B[k * n + j];
    }
    C[i * n + j] = r;
}


class matrix {
    size_t m, n;
    std::vector<int64_t> e;
public:
    explicit matrix(size_t m, size_t n): m(m), n(n), e(m * n) {}
    void random() {
        for (size_t i = 0; i < m; ++i)
            for (size_t j = 0; j < n; ++j)
                at(i, j) = rand();
    }
    matrix(matrix const& that) = default;
    matrix(matrix&&) = default;
    matrix& operator=(matrix const& that) = default;
    matrix& operator=(matrix&& that) = default;
    // Row-major storage: element (i, j) lives at offset i * n + j.
    int64_t& at(size_t i, size_t j) { return e[i * n + j]; }
    int64_t const& at(size_t i, size_t j) const { return e[i * n + j]; }

    matrix mul_plain(matrix const& that) const {
        size_t p = that.m;
        assert(n == p);
        matrix product(m, that.n);
        for (size_t i = 0; i < m; ++i)
            for (size_t j = 0; j < that.n; ++j)
                for (size_t k = 0; k < p; ++k)
                    product.at(i, j) += at(i, k) * that.at(k, j);
        return product;
    }


    matrix mul_cuda(matrix const& that) const {
        size_t p = that.m;
        assert(n == p);
        matrix product(m, that.n);
        int64_t *A, *B, *C;
        size_t sizeA = sizeof(int64_t) * m * n;
        size_t sizeB = sizeof(int64_t) * p * that.n;
        size_t sizeC = sizeof(int64_t) * m * that.n;
        cudaMalloc(&A, sizeA);
        cudaMalloc(&B, sizeB);
        cudaMalloc(&C, sizeC);
        cudaMemcpy(A, e.data(), sizeA, cudaMemcpyHostToDevice);
        cudaMemcpy(B, that.e.data(), sizeB, cudaMemcpyHostToDevice);
        // One thread per output element; a fixed block size keeps every
        // launch within the 1024-threads-per-block hardware limit.
        unsigned threads = 256;
        unsigned blocks = (unsigned)((m * that.n + threads - 1) / threads);
        matProduct<<<blocks, threads>>>(C, A, B, m, p, that.n);
        cudaDeviceSynchronize();
        cudaMemcpy(product.e.data(), C, sizeC, cudaMemcpyDeviceToHost);
        cudaFree(A);
        cudaFree(B);
        cudaFree(C);
        return product;
    }

    bool operator==(matrix const& that) const {
        return m == that.m && n == that.n && e == that.e;
    }
};

void verify(matrix (matrix::* std)(matrix const&) const, matrix (matrix::* test)(matrix const&) const, const char* impl) {
    srand(1);
    for (int i = 0; i < 1000; ++i) {
        matrix m1(16, 16), m2(16, 16);
        m1.random(); m2.random();
        if (!((m1.*std)(m2) == (m1.*test)(m2))) {
            fprintf(stderr, "Invalid implementation: %s\n", impl);
            exit(-1);
        }
    }
    fprintf(stdout, "Valid implementation: %s\n", impl);
}

double timeit(matrix (matrix::* test)(matrix const&) const, int size, int times, const char* impl, double base = 0) {
    double time = timeit([=] {
        srand(1);
        for (int i = 0; i < times; ++i) {
            matrix m1(size, size), m2(size, size);
            m1.random(); m2.random();
            (m1.*test)(m2);
        }
    });
    printf("'%s' algorithm took %.2f ms to complete %d times %d x %d matrices multiplication", impl, time, times, size, size);
    if (base && base != time) {
        printf(", %.2f%% ", fabs(base - time) / base * 100);
        if (base > time) printf("faster");
        else printf("slower");
    }
    puts("");
    return time;
}


int main() {
    verify(&matrix::mul_plain, &matrix::mul_cuda, "cuda");
    double base1 = timeit(&matrix::mul_plain, 1024, 1, "plain");
    double base2 = timeit(&matrix::mul_plain, 4096, 1, "plain");
    timeit(&matrix::mul_cuda, 1024, 1, "cuda", base1);
    timeit(&matrix::mul_cuda, 4096, 1, "cuda", base2);
}

Output:

Valid implementation: cuda
'plain' algorithm took 4126.10 ms to complete 1 times 1024 x 1024 matrices multiplication
'plain' algorithm took 400662.51 ms to complete 1 times 4096 x 4096 matrices multiplication
'cuda' algorithm took 119.77 ms to complete 1 times 1024 x 1024 matrices multiplication, 97.10% faster
'cuda' algorithm took 950.01 ms to complete 1 times 4096 x 4096 matrices multiplication, 99.76% faster

I prepared a dedicated 4096 x 4096 test case: with the side length quadrupled, the plain method took nearly a hundred times longer, while the CUDA version took less than eight times longer.
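
This matches what the asymptotics predict. Multiplying two n x n matrices this way costs O(n^3), so quadrupling n should multiply the work by 64; the CPU overshoots that (likely cache misses once the matrices no longer fit in cache), while the GPU, far from saturated at n = 1024, scales much more gently:

$$
\left(\tfrac{4096}{1024}\right)^3 = 64, \qquad \tfrac{400662.51}{4126.10} \approx 97 \ \text{(plain)}, \qquad \tfrac{950.01}{119.77} \approx 7.9 \ \text{(CUDA)}
$$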