高性能计算 第一次作业 运行方法 进入到code/build
文件夹下,输入如下命令。
在code/build
会生成9个可执行文件,名字形如test_x.exe
,x
从1到9,分别对应下面每一节的测例,如下所示。
test_1.exe
:通用矩阵乘法
test_2.exe
:strassen
test_3.exe
:内存重排
test_4.exe
:AVX指令集
test_5.exe
:多线程
test_6.exe
:编译器优化
test_7.exe
:大规模通用矩阵乘法
test_8.exe
:大规模稀疏矩阵乘法
test_9.exe
:大规模矩阵乘法比较
test_x.exe
的测试代码是test_x.cpp
。
对于MKL库,进入文件夹code/mkl
,然后输入如下命令。
. ~/intel/oneapi/setvars.sh make
即可运行使用MKL库的例子。
硬件数据 运行环境:VMware 16.x虚拟机
操作系统:Ubuntu 18.04
内存:8G
CPU:6个逻辑CPU
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 vendor_id : GenuineIntelcpu family : 6 model : 166 model name : Intel(R) Core(TM) i5 -10210 U CPU @ 1 .60 GHzstepping : 0 microcode : 0 xb4 cpu MHz : 2112 .000 cache size : 6144 KBphysical id : 0 siblings : 6 core id : 5 cpu cores : 6 apicid : 5 initial apicid : 5 fpu : yesfpu_exception : yescpuid level : 22 wp : yesflags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1 gb rdtscp lm constant_tsc arch_perfmon nopl xtopology tsc_reliable nonstop_tsc cpuid pni pclmulqdq ssse3 fma cx16 pcid sse4 _1 sse4 _2 x2 apic movbe popcnt tsc_deadline_timer aes xsave avx f16 c rdrand hypervisor lahf_lm abm 3 dnowprefetch cpuid_fault invpcid_single ssbd ibrs ibpb stibp ibrs_enhanced fsgsbase tsc_adjust bmi1 avx2 smep bmi2 invpcid rdseed adx smap clflushopt xsaveopt xsavec xgetbv1 xsaves arat md_clear flush_l1 d arch_capabilitiesbugs : spectre_v1 spectre_v2 spec_store_bypass swapgs itlb_multihitbogomips : 4224 .00 clflush size : 64 cache_alignment : 64 address sizes : 45 bits physical, 48 bits virtualpower management:
通用矩阵乘法 对于一个规模为$M\times K$的矩阵$A$和一个规模为$K\times N$的矩阵$B$,通用矩阵乘法(GEMM)定义为 $$ C=AB $$ 其中,$A$和$B$的乘积$C$的每一项为 $$ C_{i,j}=\sum_{k=1}^{K}(A_{i,k}B_{k,j}) $$ 首先,为了方便地从代码中表示矩阵和矩阵之间的运算,定义一个矩阵类Matrix
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 class Matrix { private : float **mat; int num_row; int num_col; public : Matrix (); Matrix (int num_row, int num_col); Matrix (const Matrix &another_mat); Matrix &operator =(const Matrix &another_mat); ~Matrix (); Matrix dot (const Matrix &another_mat) ; float &operator () (int i, int j) ; float operator () (int i, int j) const ; Matrix operator +(const Matrix &another_mat); Matrix operator -(const Matrix &another_mat); static Matrix zeros (int num_row, int num_col) ; static Matrix randn (int num_row, int num_col) ; friend std::ostream &operator <<(std::ostream &out, const Matrix &mat); int get_num_row () const ; int get_num_col () const ; Matrix transpose () const ; float mean () const ; float variance () const ;private : static float **allocate_memory (int num_row, int num_col) ; static void free_memory (float **mat, int num_row, int num_col) ; };
矩阵类的定义中包含了矩阵的大部分操作,如下所示。
计算类型:矩阵乘法、加法、减法、转置、均值和方差。
初始化类型:constructor、destructor、copy constructor、赋值运算符重载、访问矩阵元素、生成零矩阵和生成服从$(0,1^2)$正态分布随机矩阵。
其他类型:获取矩阵的行、列数目,分配和释放内存空间。
通用矩阵乘法的实现如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Matrix Matrix::dot (const Matrix &another_mat) { if (num_col != another_mat.num_row) { return Matrix (); } int new_row = num_row; int new_col = another_mat.num_col; Matrix res = Matrix (new_row, new_col); for (int i = 0 ; i < new_row; ++i) { for (int j = 0 ; j < new_col; ++j) { res.mat[i][j] = 0 ; for (int k = 0 ; k < num_col; ++k) { res.mat[i][j] += mat[i][k] * another_mat.mat[k][j]; } } } return res; }
第3-6行,判断是否满足矩阵乘法的条件,如果不满足,则返回空矩阵。
第12-22行,按照通用矩阵乘法公式实现。
实现了矩阵乘法后,可以做一个简单的测试检验实现的正确性。首先,生成两个随机矩阵a
和b
。a
的大小是$5\times4$,b
的大小是$4\times 3$。
Matrix a = Matrix::randn (5 , 4 ); Matrix b = Matrix::randn (4 , 3 ); std::cout << "a = " << a << std::endl; std::cout << "b = " << b << std::endl;
计算通用矩阵乘法,得到乘积c
。
Matrix c = a.dot (b); std::cout << "c = " << c << std::endl;
运行结果如下。
可以看到计算的结果是正确的,完整代码保存在code/test/test_1.cpp
下。
通用矩阵乘法优化
显然,优化的前提是保证正确性。而下面的优化方法是经过了正确性验证的,但限于篇幅,不再给出验证方法和结果。
计算时间的方法 优化的结果可以通过优化前后的矩阵乘法的运行时间长短来衡量。运行时间包括了两个主要方面,一个是算法的计算时间,另外一个是分配和释放存储中间计算结果的内存所消耗的时间。
由于C函数库的时间计算只能精确到秒级别,并不能满足本次实验的需要。所以,需要借助于Linux的函数库中的gettimeofday
函数来得到微秒级别的时间,方法如下。
#include <sys/time.h> double Utils::current_time_us () { struct timeval tv ; gettimeofday (&tv, NULL ); return tv.tv_sec + tv.tv_usec * 1e-6 ; }
第6行,利用函数gettimeofday
得到微秒级别的时间,获取的时间保存在struct timeval
类型的变量tv
中。
第7行,struct timeval
包含两个域,tv_sec
和tv_usec
。tv_sec
存储了当前时间的秒级别的部分,tv_usec
存储了当前时间的微秒部分。因此,当前时间的计算公式为 $$ time=tv_sec+tv_usec\times10^{-6} $$ 计算出当前时间后,只需要在计算矩阵乘法前后分别调用Utils::current_time_us
,得到的结果之差即为计算时间。
基于算法分析的方法
基于算法分析的方法只实现了strassen算法。
假设矩阵$A$和$B$都是$N\times N$的方阵,且$N$是$2$的$n$次幂。即存在整数$n$,使得 $$ N=2^n $$ 对于矩阵乘法 $$ C=AB $$ 首先将$A,B,C$分成相等的4部分。 $$ A=\begin{bmatrix} A_{1,1}&A_{1,2}\ A_{2,1}&A_{2,2}\ \end{bmatrix},\ B=\begin{bmatrix} B_{1,1}&B_{1,2}\ B_{2,1}&B_{2,2}\ \end{bmatrix},\ C=\begin{bmatrix} C_{1,1}&C_{1,2}\ C_{2,1}&C_{2,2}\ \end{bmatrix} $$ 利用分块矩阵乘法可以得到 $$ \begin{align} C_{1,1}=A_{1,1}B_{1,1}+A_{1,2}B_{2,1}\ C_{1,2}=A_{1,1}B_{1,2}+A_{1,2}B_{2,2}\ C_{2,1}=A_{2,1}B_{1,1}+A_{2,2}B_{2,1}\ C_{2,2}=A_{2,1}B_{1,2}+A_{2,2}B_{2,2}\ \end{align} $$ 引入中间变量 $$ \begin{align} &M_1=(A_{1,1}+A_{2,2})(B_{1,1}+B_{2,2})\ &M_2=(A_{2,1}+A_{2,2})B_{1,1}\ &M_3=A_{1,1}(B_{1,2}-B_{2,2})\ &M_4=A_{2,2}(B_{2,1}-B_{1,1})\ &M_5=(A_{1,1}+A_{1,2})B_{2,2}\ &M_6=(A_{2,1}-A_{1,1})(B_{1,1}+B_{1,2})\ &M_7=(A_{1,2}-A_{2,2})(B_{2,1}+B_{2,2}) \end{align} $$ 可以将上述乘法化简为 $$ \begin{align} &C_{1,1}=M_1+M_4-M_5+M_7\ &C_{1,2}=M_3+M_5\ &C_{2,1}=M_2+M_4\ &C_{2,2}=M_1-M_2+M_3+M_6 \end{align} $$ 代码实现如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 Matrix Matrix::strassen_dot (const Matrix &another_mat) { if (num_row != num_col || another_mat.num_row != another_mat.num_col || num_row != another_mat.num_row) { return Matrix (); } int size = 0 ; for (int i = 0 ; i < 11 ; ++i) { if (pow2[i] >= num_row) { size = pow2[i]; break ; } } if (!size) { return Matrix (); } Matrix a = Matrix::zeros (size, size).set_block (*this , 0 , 0 ); Matrix b = Matrix::zeros (size, size).set_block (another_mat, 0 , 0 ); int half_size = size / 2 ; Matrix a11 = a.get_block (0 , half_size - 1 , 0 , half_size - 1 ); Matrix a12 = a.get_block (0 , half_size - 1 , half_size, size - 1 ); Matrix a21 = a.get_block (half_size, size - 1 , 0 , half_size - 1 ); Matrix a22 = a.get_block (half_size, size - 1 , half_size, size - 1 ); Matrix b11 = b.get_block (0 , half_size - 1 , 0 , half_size - 1 ); Matrix b12 = b.get_block (0 , half_size - 1 , half_size, size - 1 ); Matrix b21 = b.get_block (half_size, size - 1 , 0 , half_size - 1 ); Matrix b22 = b.get_block (half_size, size - 1 , half_size, size - 1 ); Matrix m1 = (a11 + a22).dot (b11 + b22); Matrix m2 = (a21 + a22).dot (b11); Matrix m3 = a11.dot (b12 - b22); Matrix m4 = a22.dot (b21 - b11); Matrix m5 = (a11 + a12).dot (b22); Matrix m6 = (a21 - a11).dot (b11 + b12); Matrix m7 = (a12 - a22).dot (b21 + b22); Matrix c = Matrix::zeros (size, size); c.set_block (m1 + m4 - m5 + m7, 0 , 0 ); c.set_block (m3 + m5, 0 , half_size); c.set_block (m2 + m4, half_size, 0 ); c.set_block (m1 - m2 + m3 + m6, half_size, half_size); return c.get_block (0 , num_row - 1 , 0 , num_col - 1 ); }
第3-8行,检查是否满足矩阵乘法和方阵的条件,如果不满足,则返回空矩阵。
第10-26行,将矩阵补齐至$2$的整数幂的形式,多余部分补零。
第18-37行,对矩阵a
和b
分块。
第39-51行,实现strassen算法的计算公式。
在代码中,对矩阵中的块进行处理的函数如下。
Matrix get_block (int row_start, int row_end, int col_start, int col_end) ;Matrix &set_block (const Matrix &block, int row_start, int col_start) ;
实现了strassen算法后,测试strassen算法的运算时间。测试的用例分别是$128$$, $$256, $$512,$$1024,$$2048$的方阵。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 void strassen_test (int N) { Matrix a = Matrix::randn (N, N); Matrix b = Matrix::randn (N, N); double time_start = Utils::current_time_us (); Matrix c = a.strassen_dot (b); double time_end = Utils::current_time_us (); cout << "[" << a.get_num_row () << ", " << a.get_num_col () << "]" << " x [" << b.get_num_row () << ", " << b.get_num_col () << "]" << " => [" << c.get_num_row () << ", " << c.get_num_col () << "]" << ", cost: " << time_end - time_start << "s." << endl; }int main () { strassen_test (128 ); strassen_test (256 ); strassen_test (512 ); strassen_test (1024 ); strassen_test (2048 ); return 0 ; }
结果如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$2ms$,$23ms$,$187ms$,$1220ms$,$10448ms$。
完整代码在code/test/test_2.cpp
中。
基于软件优化的方法 内存重排 对于通用矩阵乘法公式 $$ C_{i,j}=\sum_{k=1}^K(A_{i,k}B_{k,j}) $$ 在实际计算的过程当中,需要使用三重循环来分别枚举$i,j,k$,且$i,j,k$的枚举顺序并不会影响最终结果的计算。前面的通用矩阵乘法的实现方法是先枚举$i$,然后枚举$j$,最后枚举$k$。这样的实现方法是最为直观的,也是直接反映了矩阵乘法的计算公式。
但是,对于一个使用了行优先存储的CPU来说,之前的实现方法并不是效率最高的。因为对于$i,j,k$的枚举顺序,对每一个$i,j$来说,都需要枚举全部的$k$。此时,对于矩阵$B$的访问是按照下面的序列的顺序访问的。 $$ B_{1,j},\ B_{2,j},\ \cdots B_{K,j} $$ 显然,对矩阵$B$的访问是列优先的,在访问过程中会造成较多的cache失效的情况。为了提高内存访问的效率,对矩阵$B$的访问也需要遵循行优先的方式。注意到对矩阵$A,B,C$的访问方式分别是$(i,j),(i,k),(k,j)$,为了体现行优先,只需要$i$在$j$之前枚举,$i$在$k$之前枚举,$k$在$j$之前枚举即可,即遵循$i,k,j$的枚举方式。
实现代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Matrix Matrix::ikj_dot (const Matrix &another_mat) { if (num_col != another_mat.num_row) { return Matrix (); } int new_row = num_row; int new_col = another_mat.num_col; Matrix res = Matrix::zeros (new_row, new_col); for (int i = 0 ; i < new_row; ++i) { for (int k = 0 ; k < num_col; ++k) { for (int j = 0 ; j < new_col; ++j) { res.mat[i][j] += mat[i][k] * another_mat.mat[k][j]; } } } return res; }
和Matrix::dot
相比,这里只是简单地调整了第14行和第16行的顺序,其余均保持不变。
使用$128$$, $$256, $$512,$$1024,$$2048$的方阵进行测试,结果如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$1ms$,$3ms$,$27ms$,$194ms$,$1817ms$。
完整代码在code/test/test_3.cpp
中。
AVX指令集 SIMD(Single Instruction Multiple Data,单指令多数据流),是一种实现空间上的并行性的技术。这种技术使用一个控制器控制多个处理单元,同时对一组数据中的每一个数据执行相同的操作。在 SIMD 指令执行期间,任意时刻都只有一个进程在运行,即 SIMD 没有并发性,仅仅只是同时进行计算。
在 Intel 的 x86 微架构处理器中,SIMD 指令集有 MMX、SSE、SSE2、SSE3、SSSE3、SSE4.1、SSE4.2、AVX、AVX2、AVX512。
AVX 是 SSE 架构的延伸,将 SSE 的 XMM 128bit 寄存器升级成了 YMM 256bit 寄存器,同时浮点运算命令扩展至 256 位,运算效率提升了一倍。另外,AVX 还添加了三操作数指令,以减少在编码时先复制再运算的动作。
AVX2 将大多数整数运算命令扩展至 256 位,同时支持 FMA(Fused Multiply-Accumulate,融合乘法累加)运算,可以在提高运算效率的同时减少运算时的精度损失。
AVX512 将 AVX 指令进一步扩展至 512 位。
本次实验使用的是AV2指令集。
由于AVX指令集进行的是向量运算,需要将原本的矩阵乘法向量化。为了简述矩阵乘法向量化的思想,这里首先使用$4\times 4$的方阵来解释。
对于两个$4\times 4$的方阵$A,B$的矩阵乘法 $$ \begin{bmatrix} a_{1,1}&a_{1,2}\ a_{2,1}&a_{2,2} \end{bmatrix}\cdot \begin{bmatrix} b_{1,1}&b_{1,2}\ b_{2,1}&b_{2,2} \end{bmatrix}= \begin{bmatrix} c_{1,1}&c_{1,2}\ c_{2,1}&c_{2,2} \end{bmatrix} $$ 对于矩阵$C$的第一行,计算过程为 $$ \begin{align} c_{1,1}=a_{1,1}b_{1,1}+a_{1,2}b_{2,1}\ c_{1,2}=a_{1,1}b_{1,2}+a_{1,2}b_{2,2} \end{align} $$ 可向量化为 $$ \begin{bmatrix} c_{1,1}\c_{1,2} \end{bmatrix}= a_{1,1}\cdot \begin{bmatrix} b_{1,1}\b_{1,2} \end{bmatrix}+a_{1,2}\cdot\begin{bmatrix} b_{2,1}\c_{2,2} \end{bmatrix} $$ 同理 $$ \begin{bmatrix} c_{2,1}\c_{2,2} \end{bmatrix}= a_{2,1}\cdot \begin{bmatrix} b_{1,1}\b_{1,2} \end{bmatrix}+a_{2,2}\cdot\begin{bmatrix} b_{2,1}\b_{2,2} \end{bmatrix} $$ 上面的情况不难推广到一般情况。
假设一个AVX向量只能够存储$T$个矩阵的元素,对于矩阵乘法 $$ C=AB $$ 矩阵$C$的第$i$行第$j$列开始的、按行遍历的$T$个元素可以按如下公式计算。 $$ \begin{bmatrix} c_{i,j}\c_{i,j+1}\\vdots\c_{i,j+T-1} \end{bmatrix}= \sum_{k=1}^{K}a_{i,k}\begin{bmatrix} b_{k,j}\c_{k,j+1}\\vdots\c_{k,j+T-1} \end{bmatrix} $$ 实现代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <immintrin.h> Matrix Matrix::avx_dot (const Matrix &another_mat) { if (num_col != another_mat.num_row) { return Matrix (); } int align_col = another_mat.num_col / 8 * 8 ; if (align_col < another_mat.num_col) { align_col += 8 ; } Matrix m = Matrix::zeros (another_mat.num_row, align_col); m.set_block (another_mat, 0 , 0 ); Matrix res = Matrix::zeros (num_row, m.num_col); __m256 a, b, c; for (int i = 0 ; i < num_row; ++i) { for (int k = 0 ; k < num_col; ++k) { a = _mm256_set1_ps(mat[i][k]); for (int j = 0 ; j < m.num_col; j += 8 ) { b = _mm256_loadu_ps(&m.mat[k][j]); c = _mm256_loadu_ps(&res.mat[i][j]); c = _mm256_fmadd_ps(a, b, c); _mm256_storeu_ps(&res.mat[i][j], c); } } } return res.get_block (0 , num_row - 1 , 0 , another_mat.num_col - 1 ); }
第5-8行,判断是否满足矩阵乘法的条件,如果不满足,则返回空矩阵。
第16-18行,由于这里使用了256位的AVX向量,矩阵的每个元素是float
格式,32位。每个AVX向量只能够存下8个矩阵的元素。对于矩阵乘法 $$ C=AB $$ 为了计算方便,需要将矩阵$B$的行对齐到8的倍数,多余部分补0即可。
第22-35行,AVX矩阵乘法的实现。这里使用了$i,k,j$的遍历方式,首先需要将矩阵的元素加载到AVX向量,然后计算,最后写回到结果矩阵res
即可。
代码使用了AVX的四个函数。
extern __m256 _mm256_set1_ps(float );extern __m256 _mm256_loadu_ps(float const *a);extern __m256 _mm256_fmadd_ps(__m256 a, __m256 b, __m256 c);extern void _mm256_storeu_ps(float *a, __m256 b);
第37行,由于之前进行了内存对齐,在返回结果之前需要将对齐部分去除。
最后的编译参数需要加上-mfma
和-mavx
。
使用$128$$, $$256, $$512,$$1024,$$2048$的方阵进行测试,结果如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$0.5ms$,$6.1ms$,$58.0ms$,$249.2ms$,$2511.9ms$。
完整代码在code/test/test_4.cpp
中。
多线程 注意到矩阵乘法的计算过程中 $$ C_{i,j}=\sum_{k=1}^K(A_{i,k}B_{k,j}) $$ 只有对矩阵$C$的计算才涉及写操作,对矩阵$A$和$B$的运算都是读操作,并且矩阵$C$的各个元素的计算是相互独立的。因此,矩阵$C$的计算不会产生race condition的现象,可以容易地进行并行化处理。
可以使用OpenMP来进行并行化处理,实现代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #include <omp.h> Matrix Matrix::multithread_dot (const Matrix &another_mat) { if (num_col != another_mat.num_row) { return Matrix (); } int new_row = num_row; int new_col = another_mat.num_col; Matrix res = Matrix::zeros (new_row, new_col);#pragma omp parallel for for (int i = 0 ; i < new_row; ++i) { for (int k = 0 ; k < num_col; ++k) { for (int j = 0 ; j < new_col; ++j) { res.mat[i][j] += mat[i][k] * another_mat.mat[k][j]; } } } return res; }
实现代码只是在原来的内存重排的优化方法的基础上加上了一条语句即可。
最后的编译参数需要加上-fopenmp
。
使用$128$$, $$256, $$512,$$1024,$$2048$的方阵进行测试,结果如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$0.4ms$,$3.1ms$,$20.2ms$,$99.1ms$,$424.5ms$。
完整代码在code/test/test_5.cpp
中。
编译器优化 g++
编译选项中有4个编译优化选项。
-O0
:关闭所有优化选项。
-O1
:最基本的优化等级。
-O2
:推荐的优化等级,除非你有特殊的需求。-O2会比-O1启用多一些标记。设置了-O2后,编译器会试图提高代码性能而不会增大体积和大量占用的编译时间。
-O3
:最高的优化等级。但用-O3来编译所有的软件包将产生更大体积更耗内存的二进制文件,大大增加编译失败的机会或不可预知的程序行为。
分别使用上面4个优化选项测试在通用矩阵乘法上的性能
-O0
的性能如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$10ms$,$127ms$,$1056ms$,$77323ms$,$131530ms$。
-O1
的性能如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$8ms$,$70ms$,$592ms$,$4211ms$,$100477ms$。
-O2
的性能如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$2ms$,$28ms$,$271ms$,$2160ms$,$100289ms$。
-O3
的性能如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$3ms$,$22ms$,$314s$,$2109ms$,$100259ms$。
综合方法 可以将多线程、内存重排、编译器优化综合起来优化,即在strassen算法中的矩阵乘法的实现使用了多线程+内存重排的方式,最后的代码编译使用-O3
优化。
实现代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 Matrix Matrix::s_mt_ikj_dot (const Matrix &another_mat) { if (num_row != num_col || another_mat.num_row != another_mat.num_col || num_row != another_mat.num_row) { return Matrix (); } int size = 0 ; for (int i = 0 ; i < 11 ; ++i) { if (pow2[i] >= num_row) { size = pow2[i]; break ; } } if (!size) { return Matrix (); } Matrix a = Matrix::zeros (size, size).set_block (*this , 0 , 0 ); Matrix b = Matrix::zeros (size, size).set_block (another_mat, 0 , 0 ); int half_size = size / 2 ; Matrix a11 = a.get_block (0 , half_size - 1 , 0 , half_size - 1 ); Matrix a12 = a.get_block (0 , half_size - 1 , half_size, size - 1 ); Matrix a21 = a.get_block (half_size, size - 1 , 0 , half_size - 1 ); Matrix a22 = a.get_block (half_size, size - 1 , half_size, size - 1 ); Matrix b11 = b.get_block (0 , half_size - 1 , 0 , half_size - 1 ); Matrix b12 = b.get_block (0 , half_size - 1 , half_size, size - 1 ); Matrix b21 = b.get_block (half_size, size - 1 , 0 , half_size - 1 ); Matrix b22 = b.get_block (half_size, size - 1 , half_size, size - 1 ); Matrix m1 = (a11 + a22).multithread_dot (b11 + b22); Matrix m2 = (a21 + a22).multithread_dot (b11); Matrix m3 = a11.multithread_dot (b12 - b22); Matrix m4 = a22.multithread_dot (b21 - b11); Matrix m5 = (a11 + a12).multithread_dot (b22); Matrix m6 = (a21 - a11).multithread_dot (b11 + b12); Matrix m7 = (a12 - a22).multithread_dot (b21 + b22); Matrix c = Matrix::zeros (size, size); c.set_block (m1 + m4 - m5 + m7, 0 , 0 ); c.set_block (m3 + m5, 0 , half_size); c.set_block (m2 + m4, half_size, 0 ); c.set_block (m1 - m2 + m3 + m6, half_size, half_size); return c.get_block (0 , num_row - 1 , 0 , num_col - 1 ); }
使用$128$$, $$256, $$512,$$1024,$$2048$的方阵进行测试,结果如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$1ms$,$25ms$,$46ms$,$100ms$,$422ms$。
完整代码在code/test/test_6.cpp
中。
MKL 安装了Intel的oneAPI Base Toolkit后,使用oneapi-cli
命令创建一个MKL矩阵乘法的例子,无需自己编写。
使用$128$$, $$256, $$512,$$1024,$$2048$的方阵进行测试,结果如下。
从结果中可以看到,5种不同大小的方阵的运行时间分别是$0.09ms$,$0.07ms$,$0.11ms$,$0.10ms$,$0.09ms$。
MKL的代码保存在code/mkl
下。
综合比较 综合比较结果如下。
可以看到MKL库的加速比远超于其他的所有优化方法,达到了$10^{5}$和$10^{6}$的数量级,原因可能是MKL库可能利用了包括上面的所有优化方法,且利用了许多CPU特性和矩阵本身的特征。
去掉MKL项后的比较结果如下。
从结果中可以看到,每一种加速方法均有明显的加速效果。
从平均加速效果来看,利用多线程加速的效果最好,内存重排的效果次之,AVX的效果稍逊,而利用strassen算法的效果稍差,O3优化的效果最差。可能的原因是AVX和strassen算法中都涉及到许多的内存复制操作,消耗了部分时间。内存重排并不会涉及到内存复制操作,因此加速效果比前者显著。多线程本身使用了内存重排又利用了CPU的6个核心,因此效果最好。代码默认使用O3优化,因此前面的优化效果是叠加了O3优化的效果的,相比之下O3优化的效果最差。
对于综合方法,strassen算法涉及的内存复制操作削减了综合方法内部多线程的加速效果,因此当矩阵规模较小时,加速效果接近于单纯的strassen算法。但当矩阵规模增大时,矩阵的计算成为性能的主要影响因素后,strassen算法的作用便可以发挥出来,使得综合方法的加速效果优于单纯的多线程加速。
大规模矩阵计算优化 在给出大规模矩阵算法与实现之前,需要作出如下假设。
矩阵很大,内存中不仅无法保存任何一个矩阵,还无法保存矩阵的任意一行或任意一列。
外存很大,外存可以保存任意多个矩阵。
矩阵的行和列数分别使用一个4字节的整数表示。
矩阵的每个元素使用一个4字节的单精度浮点数表示。
矩阵保存在二进制文件中,保存格式为先存储行数和列数,然后按行优先的方式存储矩阵的元素,即。
大规模通用矩阵乘法 基于假设1,假设内存中只可以存储下两个行向量和少量的变量。每个行向量可以保存ROW_MAX
个矩阵的元素。基于假设5,采用行优先的方式读取矩阵可以加快矩阵的读取速度,并且可以一次读入ROW_MAX
。此时不难想到,大规模通用矩阵乘法应该类似于AVX的向量化计算方法。在AVX的计算方法中,每次计算前需要将行向量从内存中载入AVX向量。类似地,只需要在每次计算前将行向量从外存中载入内存中,然后使用和AVX计算相同的计算过程即可实现大规模通用矩阵乘法。
实现代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 void Matrix::huge_matrix_dot (std::string path_a, std::string path_b, std::string path_c) { std::ifstream a_in (path_a.c_str(), std::ios::in | std::ios::binary) ; std::ifstream b_in (path_b.c_str(), std::ios::in | std::ios::binary) ; std::fstream c_out (path_c.c_str(), std::ios::out) ; if (!(a_in.is_open () && b_in.is_open () && c_out.is_open ())) return ; c_out.close (); c_out.open (path_c.c_str (), std::ios::out | std::ios::binary | std::ios::in); if (!c_out.is_open ()) { a_in.close (); b_in.close (); return ; } int a_num_row, a_num_col, b_num_row, b_num_col, c_num_row, c_num_col; a_in.read ((char *)&a_num_row, sizeof (int )); a_in.read ((char *)&a_num_col, sizeof (int )); b_in.read ((char *)&b_num_row, sizeof (int )); b_in.read ((char *)&b_num_col, sizeof (int )); if (a_num_col != b_num_row) { a_in.close (); b_in.close (); c_out.close (); return ; } c_num_row = a_num_row; c_num_col = b_num_col; c_out.write ((char *)&c_num_row, sizeof (int )); c_out.write ((char *)&c_num_col, sizeof (int )); float tmp = 0 ; for (int i = 0 ; i < c_num_row; ++i) { for (int j = 0 ; j < c_num_col; ++j) { c_out.write ((char *)&tmp, sizeof (float )); } } float a = 0.0 ; float *b = new float [ROW_MAX]; float *c = new float [ROW_MAX]; if (b == nullptr || c == nullptr ) { exit (-1 ); } int num_element = 0 ; int b_start_byte = 0 ; int c_start_byte = 0 ; int data_start_byte = sizeof (int ) + sizeof (int ); for (int i = 0 ; i < c_num_row; ++i) { for (int k = 0 ; k < a_num_col; ++k) { a_in.read ((char *)&a, sizeof (float )); for (int j = 0 ; j < c_num_col; j += ROW_MAX) { b_start_byte = data_start_byte + (k * b_num_col + j) * sizeof (float ); c_start_byte = data_start_byte + (i * c_num_col + j) * sizeof (float ); num_element = (j + ROW_MAX < c_num_col) ? ROW_MAX : c_num_col - j; get_huge_matrix_line (b_in, b_start_byte, num_element, b); get_huge_matrix_line (c_out, c_start_byte, num_element, c); for (int element = 0 ; element < num_element; ++element) { c[element] += a * b[element]; } set_huge_matrix_line (c_out, c_start_byte, num_element, c); } } } a_in.close (); b_in.close (); c_out.close (); }
假设矩阵乘法是 $$ C=AB $$ 矩阵$A,B,C$的保存路径分别是path_a
,path_b
和path_c
。
第3-18行,打开文件进行读写。
第20-33行,读入矩阵$A,B$的行数和列数,判断是否满足矩阵乘法的条件。
第35-39行,计算矩阵$C$的行数和列数并写入结果文件。
第43-49行,初始化矩阵$C$的结果文件为零,写入的数量和矩阵$C$的大小相同,为后面矩阵计算时随机访问矩阵$C$的结果文件保留空间。
第51-53行,向内存申请两个大小为ROW_MAX
的向量。基于前面的假设,内存中只能存储下两个固定大小的行向量和若干变量。此后,不会向内存中申请任何内存。
第65-88行,执行向量化计算,计算公式如下。 $$ \begin{bmatrix} c_{i,j}\c_{i,j+1}\\vdots\c_{i,j+T-1} \end{bmatrix}= \sum_{k=1}^{K}a_{i,k}\begin{bmatrix} b_{k,j}\c_{k,j+1}\\vdots\c_{k,j+T-1} \end{bmatrix} $$ 第69行,读入矩阵$A$的元素$a_{i,k}$。
第73-74行,计算矩阵$B,C$的读取位置,读取行向量的函数如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void get_huge_matrix_line (std::ifstream &in, int start_byte, int num_element, float *v) { in.seekg (start_byte, std::ios::beg); for (int i = 0 ; i < num_element; ++i) { in.read ((char *)&v[i], sizeof (float )); } }void get_huge_matrix_line (std::fstream &in, int start_byte, int num_element, float *v) { in.seekg (start_byte, std::ios::beg); for (int i = 0 ; i < num_element; ++i) { in.read ((char *)&v[i], sizeof (float )); } }
第75行,由于矩阵$B$的列数并不一定是8的倍数,需要做边缘处理。
第77-78行,读入$B,C$的行向量。
第80-83行,执行向量化计算。
第85行,将计算结果写回矩阵$C$的对应部分,写入行向量的函数如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void set_huge_matrix_line (std::ofstream &out, int start_byte, int num_element, const float *v) { out.seekp (start_byte, std::ios::beg); for (int i = 0 ; i < num_element; ++i) { out.write ((char *)&v[i], sizeof (float )); } }void set_huge_matrix_line (std::fstream &out, int start_byte, int num_element, const float *v) { out.seekp (start_byte, std::ios::beg); for (int i = 0 ; i < num_element; ++i) { out.write ((char *)&v[i], sizeof (float )); } }
为了测试上述大规模矩阵计算方法的正确性,需要将ROW_MAX
设置为一个较小的值。
同时,矩阵乘法可以通过之前的一般方法计算。通过比较一般方法和大规模计算方法的结果即可验证结果的正确性。
完整代码保存在code/src/huge_matrix.cpp
中。
分别使用三个测例进行测试,下面的矩阵$A,B$都是随机矩阵。
$(4,4)$的矩阵$A$和$(4,4)$的矩阵$B$。
$(12,4)$的矩阵$A$和$(4,40)$的矩阵$B$。
$(10,100)$的矩阵$A$和$(100,50)$的矩阵$B$。
然后比较大规模矩阵乘法的结果和一般方法的结果,结果如下。
可以看到三个测例中,大规模矩阵乘法的结果和一般方法的结果差异为0,均通过正确性验证。由于算法并不依赖于具体的ROW_MAX
,在实际过程中将ROW_MAX
调大后,即可推广到一般情况。
测试代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void huge_matrix_test (int m, int n, int k) { Utils::generate_matrix_to_file (m, n, "a.bin" ); Utils::generate_matrix_to_file (n, k, "b.bin" ); Matrix::huge_matrix_dot ("a.bin" , "b.bin" , "c.bin" ); Matrix result = Utils::read_matrix_from_file ("c.bin" ); Matrix a = Utils::read_matrix_from_file ("a.bin" ); Matrix b = Utils::read_matrix_from_file ("b.bin" ); Matrix real = a.ikj_dot (b); cout << "[" << a.get_num_row () << ", " << a.get_num_col () << "]" << " x [" << b.get_num_row () << ", " << b.get_num_col () << "]" << " => [" << result.get_num_row () << ", " << result.get_num_col () << "]" << ", diff: " << (real - result).variance () << endl; }
完整代码保存在code/test/test_7.cpp
大规模稀疏矩阵乘法 在给出大规模稀疏矩阵乘法之前,需要做出如下假设。
假设矩阵足够稀疏,内存中可以放下矩阵所有的非零项。
在满足上述假设的前提下,基于MapReduce编程模型给出大规模稀疏矩阵乘法的思路。假设矩阵$A,B$的规模分别为$M\times N,N\times K$,则矩阵乘法如下 $$ C=AB $$
稀疏化处理:master从文件中按若干个元素批量读取的方式读入矩阵$A,B$,对于矩阵中的非零项,按$(i,j,type,val)$的方式保存在内存中。零项则不做处理。其中,$(i,j)$是矩阵的行和列,$type$是矩阵的类型,$type=0$表示该项是矩阵$A$的元素,$type=1$表示该项是矩阵$B$的元素,$val$是元素的值。此时,矩阵$A,B$可以用$(i,j,type,val)$序列来表示。由于矩阵足够稀疏,序列的规模应该远远小于矩阵的规模。
Map:master将前面稀疏化处理后得到的$(i,j,type,val)$序列发到mapper中,mapper会返回一个Key-Value的数据结构。其中,Key用二元组$(i,j)$表示,代表$C$中的第$i$行第$j$列。Value是$(i,j,type,val)$序列,表示参与到计算$C_{i,j}$过程中所有的矩阵$A,B$的非零元素。对于$(i,j,type,val)$序列中的每一项,需要根据$type$的值分两种情况处理
$type = 0$: 表示该项属于矩阵$A$。由于矩阵$A$的$(i,j)$项会参与到$C_{i,0},C_{i,1},\cdots C_{i,K-1}$的计算过程当中,因此需要将该项放入到Key分别为$(i,0),(i,1),\cdots,(i,K-1)$的Value中。
$type = 1$: 表示该项属于矩阵$B$。由于矩阵$B$的$(i,j)$项会参与到$C_{0,j},C_{1,j},\cdots C_{M-1,j}$的计算过程当中,因此需要将该项放入到Key分别为$(0,j),(1,j),\cdots,(M-1,j)$的Value中。
遍历完$(i,j,type,val)$序列即可得到最终的Key-Value集合。
Reduce:master将每一个Key对应的Value分别发到Reducer中。假设Key=$(i,j)$,根据Map的规则,Key对应的Value保存了用于计算$C_{i,j}$的元素。此时,将Value按照每一项的$type$的值分成两个序列$S_A,S_B$。其中,$S_A$保存了$type=0$的项,$S_B$保存了$type=1$的项。然后,按照规则对$S_A$和$S_B$进行排序。排序规则为:对于保存在$S_A,S_B$中的项,如果$(i_1,j_1,type,val_1)$排列在$(i_2,j_2,type,val_2)$之前,则有 $$ i_1<i_2\ or\ i_1=i_2,\ j_1<j_2 $$ 排序完成后,对$k$进行从$0$到$N-1$进行遍历。对每一个$k$,若存在$i,j$,使得 $$ \begin{align} (i,k,0,val_i)\in S_A\ (k,j,1,val_j)\in S_B \end{align} $$ 则把$val_i\times val_j$加到最终的结果中。
后处理:master收集每一个Key=$(i,j)$的Reduce结果,将结果写入$C_{i,j}$,若Key不存在于Map的结果中,则写入0。
代码实现如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 void Matrix::sparse_huge_matrix_dot (std::string path_a, std::string path_b, std::string path_c) { std::string path_a_sparse = path_a + ".sparse" ; std::string path_b_sparse = path_b + ".sparse" ; bool flag = false ; flag = get_sparse_representation (path_a, path_a_sparse); if (!flag) return ; flag = get_sparse_representation (path_b, path_b_sparse); if (!flag) return ; std::ifstream a_in (path_a_sparse.c_str(), std::ios::binary | std::ios::in) ; std::ifstream b_in (path_b_sparse.c_str(), std::ios::binary | std::ios::in) ; if (!(a_in.is_open () && b_in.is_open ())) return ; int a_num_row, a_num_col, b_num_row, b_num_col; a_in.read ((char *)&a_num_row, sizeof (int )); a_in.read ((char *)&a_num_col, sizeof (int )); b_in.read ((char *)&b_num_row, sizeof (int )); b_in.read ((char *)&b_num_col, sizeof (int )); a_in.close (); b_in.close (); if (a_num_col != b_num_row) { return ; } int c_num_row = a_num_row; int c_num_col = b_num_col; std::map<std::pair<int , int >, std::vector<MapReduceType>> maps = sparse_matrix_mapper (path_a_sparse, path_b_sparse); std::vector<SparseItem> result; SparseItem item; for (auto &iter : maps) { item.row = iter.first.first; item.col = iter.first.second; item.val = sparse_matrix_reducer (a_num_col, iter.second); result.push_back (item); } std::ofstream c_out (path_c.c_str(), std::ios::trunc | std::ios::binary) ; if (!c_out.is_open ()) { return ; } c_out.write ((char *)&c_num_row, sizeof (int )); c_out.write ((char *)&c_num_col, sizeof (int )); size_t k = 0 ; float zero_float = 0.0f ; for (int i = 0 ; i < c_num_row; ++i) { for (int j = 0 ; j < c_num_col; ++j) { if (k < result.size () && i == result[k].row && j == result[k].col) { c_out.write ((char *)&result[k].val, sizeof (float )); ++k; } else { c_out.write ((char *)&zero_float, sizeof (float )); } } } c_out.close (); }
假设矩阵$A, B, C$分别存储在path_a
,path_b
,path_c
中。
第3-15行,分别对矩阵$A,B$做稀疏化处理,结果分别保存在中间文件中。
第16-41行,读入稀疏化处理结果,发送到mapper中,进行Map操作,得到Key-Value数据。
第46-52行,根据Map返回的结果,对每一个Key,将Value发送到reducer中,进行Reduce操作。
第54-86行,根据Reduce结果,进行后处理。
进行稀疏化处理的函数如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 bool get_sparse_representation (std::string in_path, std::string out_path) { std::ifstream in (in_path.c_str(), std::ios::in | std::ios::binary) ; std::ofstream out (out_path.c_str(), std::ios::trunc | std::ios::binary) ; if (!(in.is_open () && out.is_open ())) { return false ; } int num_row, num_col; in.read ((char *)&num_row, sizeof (int )); in.read ((char *)&num_col, sizeof (int )); out.write ((char *)&num_row, sizeof (int )); out.write ((char *)&num_col, sizeof (int )); float tmp = 0.0 ; SparseItem item; for (int i = 0 ; i < num_row; ++i) { for (int j = 0 ; j < num_col; ++j) { in.read ((char *)&tmp, sizeof (float )); if (tmp != 0.0f ) { item.row = i; item.col = j; item.val = tmp; out.write ((char *)&item, sizeof (SparseItem)); } } } in.close (); out.close (); return true ; }
进行Map操作的函数如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 #include <map> #include <vector> struct MapReduceType { int row; int col; float value; int type; }; std::map<std::pair<int , int >, std::vector<MapReduceType>>sparse_matrix_mapper (std::string path_a_sparse, std::string path_b_sparse) { std::ifstream a_in (path_a_sparse.c_str(), std::ios::binary | std::ios::in) ; std::ifstream b_in (path_b_sparse.c_str(), std::ios::binary | std::ios::in) ; if (!(a_in.is_open () && b_in.is_open ())) return std::map<std::pair<int , int >, std::vector<MapReduceType>>(); SparseItem item; MapReduceType mr_item; std::pair<int , int > key; std::map<std::pair<int , int >, std::vector<MapReduceType>> res; int a_num_row, a_num_col, b_num_row, b_num_col; a_in.read ((char *)&a_num_row, sizeof (int )); a_in.read ((char *)&a_num_col, sizeof (int )); b_in.read ((char *)&b_num_row, sizeof (int )); b_in.read ((char *)&b_num_col, sizeof (int )); a_in.read ((char *)&item, sizeof (SparseItem)); while (!a_in.eof ()) { mr_item.row = item.row; mr_item.col = item.col; mr_item.type = 0 ; mr_item.value = item.val; key.first = item.row; for (int i = 0 ; i < b_num_col; ++i) { key.second = i; if (res.find (key) == res.end ()) { res[key] = std::vector<MapReduceType>(); } res[key].push_back (mr_item); } a_in.read ((char *)&item, sizeof (SparseItem)); } b_in.read ((char *)&item, sizeof (SparseItem)); while (!b_in.eof ()) { mr_item.row = item.row; mr_item.col = item.col; mr_item.type = 1 ; mr_item.value = item.val; key.second = item.col; for (int i = 0 ; i < a_num_row; ++i) { key.first = i; if (res.find (key) == res.end ()) { res[key] = std::vector<MapReduceType>(); } res[key].push_back (mr_item); } b_in.read ((char *)&item, sizeof (SparseItem)); } a_in.close (); b_in.close (); return res; }
进行Reduce操作的函数如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 bool mr_value_compare_function (const MapReduceType &a, const MapReduceType &b) { return (a.row <= b.row) && (a.col <= b.col); }#include <algorithm> float sparse_matrix_reducer (int num_mid, const std::vector<MapReduceType> &value) { std::vector<MapReduceType> a; std::vector<MapReduceType> b; for (auto &iter : value) { if (iter.type == 0 ) { a.push_back (iter); } else { b.push_back (iter); } } std::sort (a.begin (), a.end (), mr_value_compare_function); std::sort (b.begin (), b.end (), mr_value_compare_function); size_t i = 0 ; size_t j = 0 ; int k = 0 ; float res = 0.0f ; while (k < num_mid && i < a.size () && j < b.size ()) { if (k == a[i].col && k == b[j].row) { res += a[i].value * b[j].value; ++i; ++j; } else if (k == a[i].col) { ++i; } else if (k == b[j].row) { ++j; } ++k; } return res; }
上面三个函数的代码均是对算法的直接实现,较为直观,因此不再赘述。
完整代码保存在code/src/huge_matrix.cpp
中。
分别使用三个测例进行测试,下面的矩阵$A,B$都是随机稀疏化矩阵。
$(4,4)$的矩阵$A$和$(4,4)$的矩阵$B$。
$(12,4)$的矩阵$A$和$(4,40)$的矩阵$B$。
$(10,100)$的矩阵$A$和$(100,50)$的矩阵$B$。
然后比较大规模稀疏矩阵乘法的结果的一般方法的结果,结果如下。
可以看到三个测例中,大规模矩阵乘法的结果的一般方法的结果差异为0,均通过正确性验证。上面的方法虽然只是在单机上测试,但不难迁移到Hadoop的MapReduce框架,不再赘述。
测试代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void sparse_huge_matrix_test (int m, int n, int k) { Matrix a = Matrix::sparse_randn (m, n); Matrix b = Matrix::sparse_randn (n, k); Utils::save_matrix_to_file (a, "sparse_a.bin" ); Utils::save_matrix_to_file (b, "sparse_b.bin" ); Matrix::sparse_huge_matrix_dot ("sparse_a.bin" , "sparse_b.bin" , "sparse_c.bin" ); Matrix real = a.ikj_dot (b); Matrix result = Utils::read_matrix_from_file ("sparse_c.bin" ); cout << "[" << a.get_num_row () << ", " << a.get_num_col () << "]" << " x [" << b.get_num_row () << ", " << b.get_num_col () << "]" << " => [" << result.get_num_row () << ", " << result.get_num_col () << "]" << ", diff: " << (real - result).variance () << endl; }
代码保存在code/test/test_8.cpp
中。
其中,随机稀疏矩阵的生成代码如下。
Matrix Matrix::sparse_randn (int num_row, int num_col, float p) { srand (time (NULL )); Matrix res = Matrix::randn (num_row, num_col); for (int i = 0 ; i < res.get_num_row (); ++i) { for (int j = 0 ; j < res.get_num_col (); ++j) { if ((rand () % 999 ) / 1000.0f < p) { res (i, j) = 0 ; } } } return res; }
例如,生成一个$8\times 8$的稀疏矩阵如下。
std::cout << Matrix::sparse_randn (4 , 4 , 0.8 ) << std::endl;
结果如下。
p
用于调整稀疏率,越大越稀疏。
综合比较 分别使用五个测试用例进行测试,每个用例都是随机稀疏化方阵,大小分别是$4\times4$,$16\times 16$,$64\times64$,$256\times256$,$512\times 512$。
对于大规模通用矩阵乘法,用时分别是$0.2ms$,$2.9ms$,$254.7ms$,$15121ms$,$123534ms$。
对于大规模稀疏矩阵乘法,用时分别是$0.4ms$,$0.8ms$,$34.7s$,$1496.8ms$,$16806.1ms$。
加速比为
可以看到大规模稀疏矩阵算法可以加速稀疏矩阵的运算速度,最高的加速比可以达到10。
完整代码放置在code/test/test_9.cpp
。