マルチスレッド:最小限の同期による合計

これまで、std::vector の合計には 2 つの方法を使用してきました。まず、1 つのスレッドですべての計算を行いました (シングル スレッド:ベクトルの合計)。 2 番目の複数のスレッドが結果に対して同じ変数を共有しました (マルチスレッド:ベクトルの合計)。特に、2 番目の戦略は非常に単純でした。この投稿では、両方の投稿に関する私の知識を適用します。私の目標は、スレッドが可能な限り互いに独立して合計を実行し、同期のオーバーヘッドを削減することです。

スレッドを独立して動作させ、同期を最小限に抑えるために、いくつかのアイデアを考えています。ローカル変数、スレッド ローカル データだけでなく、タスクも機能するはずです。今、私は興味があります.

私の戦略

私の戦略は同じままです。前回の投稿と同様に、4 つのコアと GCC を搭載したデスクトップ PC と、2 つのコアと cl.exe を搭載したラップトップを使用しています。最大限の最適化を行わない場合と行った場合の結果を提供します。詳細については、こちらをご覧ください:シングルトンのスレッドセーフな初期化。

ローカル変数

各スレッドにはローカル合計変数があるため、同期せずにジョブを実行できます。ローカル合計変数を合計することだけが必要です。ローカル結果の追加は、保護する必要がある重要なセクションです。これは、さまざまな方法で行うことができます。前に簡単な発言。加算は 4 回しか行われないため、どの同期を使用するかは、パフォーマンスの観点からはそれほど重要ではありません。しかし、私の発言の代わりに、std::lock_guard と、順次一貫性と緩和されたセマンティクスを持つアトミックを使用します。

std::lock_guard

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// localVariable.cpp

#include <mutex>
#include <chrono>
#include <iostream>
#include <random>
#include <thread>
#include <utility>
#include <vector>

constexpr long long size= 100000000; 

constexpr long long firBound= 25000000;
constexpr long long secBound= 50000000;
constexpr long long thiBound= 75000000;
constexpr long long fouBound= 100000000;

std::mutex myMutex;

void sumUp(unsigned long long& sum, const std::vector<int>& val, unsigned long long beg, unsigned long long end){
 unsigned long long tmpSum{};
 for (auto i= beg; i < end; ++i){
 tmpSum += val[i];
 }
 std::lock_guard<std::mutex> lockGuard(myMutex);
 sum+= tmpSum;
}

int main(){

 std::cout << std::endl;

 std::vector<int> randValues;
 randValues.reserve(size);

 std::mt19937 engine;
 std::uniform_int_distribution<> uniformDist(1,10);
 for ( long long i=0 ; i< size ; ++i) randValues.push_back(uniformDist(engine));
 
 unsigned long long sum{}; 
 auto start = std::chrono::system_clock::now();
 
 std::thread t1(sumUp,std::ref(sum),std::ref(randValues),0,firBound);
 std::thread t2(sumUp,std::ref(sum),std::ref(randValues),firBound,secBound);
 std::thread t3(sumUp,std::ref(sum),std::ref(randValues),secBound,thiBound);
 std::thread t4(sumUp,std::ref(sum),std::ref(randValues),thiBound,fouBound); 
 
 t1.join();
 t2.join();
 t3.join();
 t4.join();
 
 std::chrono::duration<double> dur= std::chrono::system_clock::now() - start;
 std::cout << "Time for addition " << dur.count() << " seconds" << std::endl;
 std::cout << "Result: " << sum << std::endl;

 std::cout << std::endl;

}

25 行目と 26 行目が重要な行です。ここで、ローカル合計結果 tmpSum がグローバル合計に追加されます。ローカル変数の例が変化する場所は正確にはどこですか?

最適化なし

最大限の最適化

順次整合性のあるアトミック操作

私の最初の最適化は、std::lock_guard で保護されたグローバル合計 sum 変数をアトミック変数に置き換えることです。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// localVariableAtomic.cpp

#include <atomic>
#include <chrono>
#include <iostream>
#include <random>
#include <thread>
#include <utility>
#include <vector>

constexpr long long size= 100000000; 

constexpr long long firBound= 25000000;
constexpr long long secBound= 50000000;
constexpr long long thiBound= 75000000;
constexpr long long fouBound= 100000000;

void sumUp(std::atomic<unsigned long long>& sum, const std::vector<int>& val, unsigned long long beg, unsigned long long end){
 unsigned int long long tmpSum{};
 for (auto i= beg; i < end; ++i){
 tmpSum += val[i];
 }
 sum+= tmpSum;
}

int main(){

 std::cout << std::endl;

 std::vector<int> randValues;
 randValues.reserve(size);

 std::mt19937 engine;
 std::uniform_int_distribution<> uniformDist(1,10);
 for ( long long i=0 ; i< size ; ++i) randValues.push_back(uniformDist(engine));
 
 std::atomic<unsigned long long> sum{}; 
 auto start = std::chrono::system_clock::now();
 
 std::thread t1(sumUp,std::ref(sum),std::ref(randValues),0,firBound);
 std::thread t2(sumUp,std::ref(sum),std::ref(randValues),firBound,secBound);
 std::thread t3(sumUp,std::ref(sum),std::ref(randValues),secBound,thiBound);
 std::thread t4(sumUp,std::ref(sum),std::ref(randValues),thiBound,fouBound); 
 
 t1.join();
 t2.join();
 t3.join();
 t4.join();
 
 std::chrono::duration<double> dur= std::chrono::system_clock::now() - start;
 std::cout << "Time for addition " << dur.count() << " seconds" << std::endl;
 std::cout << "Result: " << sum << std::endl;

 std::cout << std::endl;

}

最適化なし

最大限の最適化

セマンティックが緩和されたアトミック操作

もっとうまくやることができます。順次一貫性のある既定のメモリ モデルの代わりに、緩和されたセマンティクスを使用します。 23 行目の追加がどの順序で行われるかは問題ではないため、これは明確に定義されています。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// localVariableAtomicRelaxed.cpp

#include <atomic>
#include <chrono>
#include <iostream>
#include <random>
#include <thread>
#include <utility>
#include <vector>

constexpr long long size= 100000000; 

constexpr long long firBound= 25000000;
constexpr long long secBound= 50000000;
constexpr long long thiBound= 75000000;
constexpr long long fouBound= 100000000;

void sumUp(std::atomic<unsigned long long>& sum, const std::vector<int>& val, unsigned long long beg, unsigned long long end){
 unsigned int long long tmpSum{};
 for (auto i= beg; i < end; ++i){
 tmpSum += val[i];
 }
 sum.fetch_add(tmpSum,std::memory_order_relaxed);
}

int main(){

 std::cout << std::endl;

 std::vector<int> randValues;
 randValues.reserve(size);

 std::mt19937 engine;
 std::uniform_int_distribution<> uniformDist(1,10);
 for ( long long i=0 ; i< size ; ++i) randValues.push_back(uniformDist(engine));
 
 std::atomic<unsigned long long> sum{}; 
 auto start = std::chrono::system_clock::now();
 
 std::thread t1(sumUp,std::ref(sum),std::ref(randValues),0,firBound);
 std::thread t2(sumUp,std::ref(sum),std::ref(randValues),firBound,secBound);
 std::thread t3(sumUp,std::ref(sum),std::ref(randValues),secBound,thiBound);
 std::thread t4(sumUp,std::ref(sum),std::ref(randValues),thiBound,fouBound); 
 
 
 t1.join();
 t2.join();
 t3.join();
 t4.join();
 std::chrono::duration<double> dur= std::chrono::system_clock::now() - start;
 std::cout << "Time for addition " << dur.count() << " seconds" << std::endl;
 std::cout << "Result: " << sum << std::endl;

 std::cout << std::endl;

}

最適化なし

最大限の最適化

次の戦略も同様です。しかし、今はスレッド ローカル データを使用しています。

スレッド ローカル データ

スレッド ローカル データは、各スレッドが排他的に所有するデータです。必要に応じて作成されます。したがって、スレッド ローカル データはローカル合計変数 tmpSum に完全に適合します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// threadLocal.cpp

#include <atomic>
#include <chrono>
#include <iostream>
#include <random>
#include <thread>
#include <utility>
#include <vector>

constexpr long long size= 100000000; 

constexpr long long firBound= 25000000;
constexpr long long secBound= 50000000;
constexpr long long thiBound= 75000000;
constexpr long long fouBound= 100000000;

thread_local unsigned long long tmpSum= 0;

void sumUp(std::atomic<unsigned long long>& sum, const std::vector<int>& val, unsigned long long beg, unsigned long long end){
 for (auto i= beg; i < end; ++i){
 tmpSum += val[i];
 }
 sum.fetch_add(tmpSum,std::memory_order_relaxed);
}

int main(){

 std::cout << std::endl;

 std::vector<int> randValues;
 randValues.reserve(size);

 std::mt19937 engine;
 std::uniform_int_distribution<> uniformDist(1,10);
 for ( long long i=0 ; i< size ; ++i) randValues.push_back(uniformDist(engine));
 
 std::atomic<unsigned long long> sum{}; 
 auto start = std::chrono::system_clock::now();
 
 std::thread t1(sumUp,std::ref(sum),std::ref(randValues),0,firBound);
 std::thread t2(sumUp,std::ref(sum),std::ref(randValues),firBound,secBound);
 std::thread t3(sumUp,std::ref(sum),std::ref(randValues),secBound,thiBound);
 std::thread t4(sumUp,std::ref(sum),std::ref(randValues),thiBound,fouBound); 
 
 t1.join();
 t2.join();
 t3.join();
 t4.join();
 
 std::chrono::duration<double> dur= std::chrono::system_clock::now() - start;
 std::cout << "Time for addition " << dur.count() << " seconds" << std::endl;
 std::cout << "Result: " << sum << std::endl;

 std::cout << std::endl;

}

18 行目でスレッド ローカル変数 tmpSum を宣言し、それを 22 行目と 24 行目の加算に使用します。スレッド ローカル変数と前のプログラムのローカル変数の小さな違いは、スレッド ローカル変数の有効期間がそのスレッドの存続期間にバインドされています。ローカル変数の有効期間は、そのスコープによって異なります。

最適化なし

最大限の最適化

質問は。同期せずに合計を高速に計算することは可能ですか?はい。

タスク

タスクを使用すると、同期なしでジョブ全体を実行できます。各合計は個別のスレッドで実行され、最終的な合計は 1 つのスレッドで実行されます。タスクの詳細は次のとおりです。次のプログラムでは promise と future を使用します。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// tasks.cpp

#include <chrono>
#include <future>
#include <iostream>
#include <random>
#include <thread>
#include <utility>
#include <vector>

constexpr long long size= 100000000; 

constexpr long long firBound= 25000000;
constexpr long long secBound= 50000000;
constexpr long long thiBound= 75000000;
constexpr long long fouBound= 100000000;

void sumUp(std::promise<unsigned long long>&& prom, const std::vector<int>& val, unsigned long long beg, unsigned long long end){
 unsigned long long sum={};
 for (auto i= beg; i < end; ++i){
 sum += val[i];
 }
 prom.set_value(sum);
}

int main(){

 std::cout << std::endl;

 std::vector<int> randValues;
 randValues.reserve(size);

 std::mt19937 engine;
 std::uniform_int_distribution<> uniformDist(1,10);
 for ( long long i=0 ; i< size ; ++i) randValues.push_back(uniformDist(engine));
 
 std::promise<unsigned long long> prom1;
 std::promise<unsigned long long> prom2;
 std::promise<unsigned long long> prom3;
 std::promise<unsigned long long> prom4;
 
 auto fut1= prom1.get_future();
 auto fut2= prom2.get_future();
 auto fut3= prom3.get_future();
 auto fut4= prom4.get_future();
 
 
 auto start = std::chrono::system_clock::now();

 std::thread t1(sumUp,std::move(prom1),std::ref(randValues),0,firBound);
 std::thread t2(sumUp,std::move(prom2),std::ref(randValues),firBound,secBound);
 std::thread t3(sumUp,std::move(prom3),std::ref(randValues),secBound,thiBound);
 std::thread t4(sumUp,std::move(prom4),std::ref(randValues),thiBound,fouBound);
 
 auto sum= fut1.get() + fut2.get() + fut3.get() + fut4.get();
 
 std::chrono::duration<double> dur= std::chrono::system_clock::now() - start;
 std::cout << "Time for addition " << dur.count() << " seconds" << std::endl;
 std::cout << "Result: " << sum << std::endl;
 
 t1.join();
 t2.join();
 t3.join();
 t4.join();

 std::cout << std::endl;

}

37 ~ 45 行で 4 つのプロミスを定義し、それらから関連する先物を作成します。各 promise は、50 行目から 52 行目で別のスレッドに移動されます。プロミスは移動のみ可能です。したがって、std::move を使用します。スレッドの作業パッケージは関数 sumUp です (18 ~ 24 行目)。 sumUp は、右辺値参照によるプロミスを最初の引数として受け取ります。先物は 55 行目で結果を要求します。 get 呼び出しがブロックされています。

最適化なし

最大限の最適化

概要のすべての数値

概要

前述のとおり、数値は Linux の場合と非常によく似ています。私はいつも同じ戦略を使用しているので、それは驚くことではありません。同期せずにローカルで部分和を計算し、ローカル和を追加します。部分合計の加算は同期する必要があります。私が驚いたのは、最大限の最適化を行っても大きな違いがないことです。

Windows では、話はまったく異なります。まず、プログラムを最大限に最適化してコンパイルするか、最適化せずにコンパイルするかによって、大きな違いが生じます。 2 番目の Windows は Linux よりもはるかに遅いです。 Windows には 2 コアしかないのに、Linux 4 にはコアがないためかどうかはわかりません。

次は?

ベクトルを合計するための数値と、そこから得られる結果については、次の記事で説明します。