C# Parallel Programming:#5 Synchronization Primitives

2022-07-18 • 19 min read

在我接觸到 Synchronization Primitives 這個名詞之前,我還不太清楚它指的是什麼東西,在.NET 官網的中文翻譯稱為「同步處理原始物件」,所以在這篇會來介紹什麼是Synchronization Primitives以及其用途。

什麼是 Synchronization Primitives

在講解Synchronization Primitives之前,必須先來聊解什麼是臨界區間(Critical Section),若有讀過或上過作業系統(Operating System)可能對此名詞不陌生,當在 multi-thread 的環境下,由於會有存取共享記憶體(share memory)的部分,若每個 thread 在同時間進行存取某個記憶位置,肯定會有資料上的不確定性,也就是 race condition 的問題,所以臨界區間是為了解決此問題而存在,它會存於在每個 thread 中,利用一些機制保證當存取某段記憶體位置時,同時間只能有一個 thread 進行,在程式碼中會分成三個區段:

  • 進入區(entry section):進去臨界區間前的那段控制程式碼。
  • 離開區(exit section): 離開臨界區間前的那段控制程式碼。
  • 剩餘區(remainder section): 不是進入區、臨界區間與離開區的都是剩餘區。
while(true) {
  進入區
    臨界區間
  離開區
    剩餘區
}

臨界區間的成立必須有三個必要條件:

  • 互斥(mutual exclusion) : 同時間最多只能有一個 thread 進入臨界區間。
  • 能持續運作(progress): 若臨界區間沒有任何 thread 在其中,只有不再剩餘區間的 thread 可以競爭進入臨界區間且必須在有限時間內產出優勝者。
  • 有限等待(bounded waiting): 當某個 thread 到進入區等待進入時,其它 thread 進入臨界區間的次數必須有個上限值,既不能使某個 thread 無限等待。

而這篇要說的Synchronization Primitives其實就是用來同步存取共享記憶體與協調 thread 之間的互動所提供的一些類別、方法或語法,我們將類型分為五種:

  • Interlocked Operations
  • Locking Primitives
  • Signaling Primitives
  • Lightweight Primitives
  • Barrier 與 Countdown Events
  • SpinWait

Interlocked Operations

Interlocked這個 class 能讓我們在不同 thread 上透過 atomic operations 操作共用變數,atomic operations 操作的背後理念就是永遠不會在同一時間發生兩件事情,例如當我們同時按下兩顆鍵盤上的按鍵,無論如何一定有一個先後,不論我們按的有多快多同時,這意味著我們不需要等待或者產生任何碰撞。看看下面的例子:

int _counter = 0;
Parallel.For(1, 1000, i =>
{
    Thread.Sleep(100);
    _counter++;
});
Console.WriteLine($"Value for counter should be 999 and is { _counter }");

//Value for counter should be 999 and is 959

可以看到結果的值不如我們預期,這是 race condition 造成的,因為當前 thread 所讀取的值並不保證是最新的結果。

為了避免這種情況,我們可以使用Interlocked所提供方法來達到 thread-safe 的效果:

int _counter = 0;
Parallel.For(1, 1000, i =>
{
    Thread.Sleep(100);
    Interlocked.Increment(ref _counter);
});
Console.WriteLine($"Value for counter should be 999 and is { _counter }");

//Value for counter should be 999 and is 999

上面範例使用了Increment將變數遞增,除了此方法外還提供了AddAndCompareExchangeDecrementExchangeMemoryBarrierOrRead等方法。

Locking Primitives

為了實現Critical Section的機制,.NET 提供了一些類別或語法能夠使我們達到此目的,這些類別或語法能夠locking(鎖定)某些資源,使其同時間只能讓單一 thread 進行存取。

再介紹這些類別或語法之前,我們先來了解ThreadState,它能夠使用我們了解當下 thread 的狀態,下圖為 thread 的 life cycle:

Thread Life Cycle

  • Unstarted: Thread.Start方法還沒被該 thread 呼叫。
  • Running: Thread.Start已經被呼叫,並且沒有處於任何阻塞、暫停或停止的狀態。
  • WaitSleepJoin: 當 thread 呼叫了Wait()Sleep()Join(),就會進入阻塞狀態。
  • StopRequested: thread 正被要求停止中。
  • Stopped: thread 已經停止。
  • SuspendRequested: thread 正被要求暫停中。
  • Suspended: thread 已經暫停。
  • AbortRequested: Abort()已經被呼叫,但 thread 還沒有收到會嘗試終止它的 ThreadAbortException。
  • Aborted: 目前 thread 已經無作用,但狀態還未切成Stopped

lockMutexSemaphoreSemaphoreSlim

lockMutex只允許單一 thread 存取受保護的資源,而lock是類別Monitor所包裝過的語法糖,而SemaphoreSemaphoreSlim可以讓指定的 thread 數量同時存取受保護的資源。

lockSemaphore不能夠跨 process,而MutexSemaphoreSlim能夠跨 process。

Locking Primitives

Lock

先來看看以下程式碼:

var range = Enumerable.Range(1, 1000);
Stopwatch watch = Stopwatch.StartNew();
for(int i = 0; i < range.Count(); i++)
{
    Thread.Sleep(10);
    File.AppendAllText("test.txt", i.ToString());
}
watch.Stop();
Console.WriteLine($"Total time to write file is {watch.ElapsedMilliseconds}");

//Total time  to write file is 13303

我們嘗試將數字 1 到 1000 寫入某個檔案中,並且每次停滯 10 milliseconds, 再加上 IO 的時間,所以可以看到最後花費了 13303 milliseconds,若我們嘗試用AsParallel()AsOrdered()使其能平行寫入並且又保持順序:

var range = Enumerable.Range(1, 1000);
Stopwatch watch = Stopwatch.StartNew();
range.AsParallel().AsOrdered().ForAll(i =>
{
    Thread.Sleep(10);
    File.AppendAllText("test.txt", i.ToString());
});
watch.Stop();
Console.WriteLine($"Total time to write file is {watch.ElapsedMilliseconds}");

此時會拋出System.IO.IOException: The process cannot access the file 'test.txt' because it is being used by another process

由於我們嘗試使多個 thread 同時間寫入檔案,所以系統拋出了這個例外訊息,這意味著檔案的寫入必須再受保護範圍內,所以我們可以使用lock來達成這項功能,由於lock需要接收一個object作為識別的媒介,所以需要先宣告一個靜態變數:

static object _lock = new object();
...

var range = Enumerable.Range(1, 1000);
Stopwatch watch = Stopwatch.StartNew();
range.AsParallel().AsOrdered().ForAll(i =>
{
    Thread.Sleep(10);
    lock(_lock)
    {
        File.AppendAllText("test.txt", i.ToString());
    }
});
watch.Stop();
Console.WriteLine($"Total time to write file is {watch.ElapsedMilliseconds}");

//Total time to write file is 2345

由於我們已經使用lock來保護檔案寫入的部分,Thread.Sleep(10)可以當作是每個 thread 各自完成各自工作的時間,此時就能把它抽出來,這樣就能有效提升速度。

上面有提到lock是類別Monitor所包裝的語法糖,所以我們也能使用Monitor來實作:

static object _lock = new object();
...

var range = Enumerable.Range(1, 1000);
Stopwatch watch = Stopwatch.StartNew();
range.AsParallel().AsOrdered().ForAll(i =>
{
    Thread.Sleep(10);
    Monitor.Enter(_lock);
    try
    {
        File.WriteAllText("test.txt", i.ToString());
    }
    finally
    {
        Monitor.Exit(_lock);
    }
});
watch.Stop();
Console.WriteLine($"Total time to write file is {watch.ElapsedMilliseconds}");

//Total time  to write file is 1771

Mutex

若有兩個以上的 process 同時對檔案進行寫入的動作,若使用上面的lock依舊會拋出System.IO.IOException: The process cannot access the file 'test.txt' because it is being used by another process這個例外,由於lock不支援跨 process,所以此時我們可以改用Mutex替代:

private static Mutex mutex = new Mutex();
...

var range = Enumerable.Range(1, 1000);
Stopwatch watch = Stopwatch.StartNew();
range.AsParallel().AsOrdered().ForAll(i =>
{
    Thread.Sleep(10);
    mutex.WaitOne();
    File.AppendAllText("test.txt", i.ToString());
    mutex.ReleaseMutex();
});
watch.Stop();
Console.WriteLine($"Total time to write file is {watch.ElapsedMilliseconds}");

//Total time  to write file is 1965

透過宣告Mutex物件,使用WaitOne()ReleaseMutex來鎖定與釋放共享資源,WaitOne()還可以讓我們指定時間內若未收到釋放的訊號,則自動放棄進入受保護的區域,例如:

private static Mutex mutex = new Mutex();
...

var range = Enumerable.Range(1, 1000);
Stopwatch watch = Stopwatch.StartNew();
range.AsParallel().AsOrdered().ForAll(i =>
{
    Thread.Sleep(10);
    if (mutex.WaitOne(3000))
    {
        File.AppendAllText("test.txt", i.ToString());
        mutex.ReleaseMutex();
    }
    else
    {
        Console.WriteLine($"Timeout");
    };
});
watch.Stop();
Console.WriteLine($"Total time to write file is {watch.ElapsedMilliseconds}");

//Total time  to write file is 1971

Semaphore

上面提到的lockMutex同時間都只能使單一 thread 存取共享資源,而接下來要介紹的Semaphore可以讓我們設定允許通過的數量,它可以接受兩個參數,第一個參數為initialCount,可以設定多少 thread 在初始化後進入,第二個參數為maximumCount,可以設定可進入區間的最大 thread 數量。

看下面範例程式:

Semaphore semaphore = new Semaphore(3, 3);
var range = Enumerable.Range(1, 1000);
range.AsParallel().AsOrdered().ForAll(i =>
{
    semaphore.WaitOne();
    Console.WriteLine($"Index {i} making service call using Task {Task.CurrentId}");
    Thread.Sleep(1000);
    Console.WriteLine($"Index {i} releasing semaphore using Task {Task.CurrentId}");
    semaphore.Release();
});

//Index 3 making service call using Task 3
//Index 2 making service call using Task 4
//Index 1 making service call using Task 6
//Index 1 releasing semaphore using Task 6
//Index 2 releasing semaphore using Task 4
//Index 3 releasing semaphore using Task 3
//Index 6 making service call using Task 8
//Index 7 making service call using Task 5
//Index 8 making service call using Task 9
//Index 6 releasing semaphore using Task 8
//Index 8 releasing semaphore using Task 9
//Index 7 releasing semaphore using Task 5
//Index 9 making service call using Task 3
//Index 11 making service call using Task 4
//Index 10 making service call using Task 6
//Index 10 releasing semaphore using Task 6
//Index 11 releasing semaphore using Task 4
//Index 5 making service call using Task 7
//...

可以看到同時間會有三個 thread 進入區間,接著被釋放後,又會緊接著讓下個 thread 進入。

Semaphore可以分 local semaphore 與 global semaphore,我們可以替它指定一個名子,若有指定它就會被創建為 global semaphore,若未指定,則為 local semaphore:

Semaphore semaphore = new Semaphore(1,10, "Globalsemaphore");

Signaling Primitives

Signaling Primitives(訊號處理原始物件)表示 thread 必須等待其他 thread 的信號(訊號)才能繼續執行,接下來會介紹幾種.NET 提供的類別或方法讓我們可以達到等待的功用:

  • Thread.Join
  • EventWaitHandle

Thread.Join

直接來看一個簡單的例子:

int result = 0;
Thread childThread = new Thread(() =>
{
    Thread.Sleep(5000);
    result = 10;
});
childThread.Start();
Console.WriteLine($"Result is {result}");

//Result is 0

上面的例子我們期待 result 可以等於 10,但由於 main thread 執行的太快,還沒等到 childThread 執行完成就已經先把 result 打印出來了,所以這時我們可以使用Join()來等待childThread執行完成:

int result = 0;
Thread childThread = new Thread(() =>
{
    Thread.Sleep(5000);
    result = 10;
});
childThread.Start();
childThread.Join();
Console.WriteLine($"Result is {result}");

//Result is 10

EventWaitHandle

EventWaitHandle 類別用於 thread 的同步處理,在介紹它之前,先來說說AutoResetEventManualResetEvent這兩個類別,它們都可以使用WaitOne方法將 thread 的狀態變成 WaitSleepJoin,使用Set方法可以將狀態變回 Running 狀態,而這兩個類別的差別在於AutoResetEvent在呼叫Set之後會自動設回未收到信號的狀態,而ManualResetEvent在呼叫Set之後需要在呼叫Reset才會回到未收到信號的狀態,看看下面的範例

AutoResetEvent:

static AutoResetEvent autoResetEvent = new AutoResetEvent(false);
static void Main(string[] args)
{
    int sum = 0;
    try
    {
        Task signallingTask = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                Thread.Sleep(1000);
                autoResetEvent.Set();
                Console.WriteLine($"AutoResetEvent call set()");
            }
        });

        Parallel.For(1, 10, (i) => {
            Console.WriteLine($"Task with id {Task.CurrentId} waiting for signal to enter");
            autoResetEvent.WaitOne();
            Console.WriteLine($"Task with id {Task.CurrentId} received signal to enter");
            sum += i;
        });
    }
    catch (AggregateException ex)
    {
        foreach (var inner in ex.InnerExceptions)
        {
            Console.WriteLine(inner.Message);
        }
    }
    Console.ReadLine();
}

//Task with id 3 waiting for signal to enter
//Task with id 2 waiting for signal to enter
//Task with id 4 waiting for signal to enter
//Task with id 5 waiting for signal to enter
//Task with id 6 waiting for signal to enter
//Task with id 7 waiting for signal to enter
//Task with id 8 waiting for signal to enter
//Task with id 9 waiting for signal to enter
//Task with id 10 waiting for signal to enter
//AutoResetEvent call set()
//Task with id 2 received signal to enter
//AutoResetEvent call set()
//Task with id 3 received signal to enter
//AutoResetEvent call set()
//Task with id 4 received signal to enter
//AutoResetEvent call set()
//Task with id 5 received signal to enter
//AutoResetEvent call set()
//Task with id 6 received signal to enter
//AutoResetEvent call set()
//Task with id 7 received signal to enter
//AutoResetEvent call set()
//Task with id 8 received signal to enter
//AutoResetEvent call set()
//Task with id 9 received signal to enter
//AutoResetEvent call set()
//Task with id 10 received signal to enter
//AutoResetEvent call set()

在一開始我們建立了發送信號的 task,使其每秒會呼叫Set方法來釋放 thread,接著透過Parallel建了 10 個 tasks,其中我們使用了WaitOne方法來阻塞每個 task 繼續進行下去,所以在一開始所有 task 的會處於阻塞的狀態,直到發送信號的 task 開始呼叫Set

ManualResetEvent:

static ManualResetEvent manualResetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
    int sum = 0;
    try
    {
        Task signalOffTask = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                Thread.Sleep(1000);
                manualResetEvent.Reset();
                Console.WriteLine("Signal Off");
            }
        });

        Task signalOnTask = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                Thread.Sleep(3000);
                manualResetEvent.Set();
                Console.WriteLine("Signal On");
            }
        });


        for (int i = 0; i < 3; i++)
        {
            Parallel.For(0, 5, (i) => {
                Console.WriteLine($"Task with id {Task.CurrentId} waiting for signal to enter");
                manualResetEvent.WaitOne();
                Console.WriteLine($"Task with id {Task.CurrentId} received signal to enter");
                sum += i;
            });
            Thread.Sleep(2000);
        }
    }
    catch (AggregateException ex)
    {
        foreach (var inner in ex.InnerExceptions)
        {
            Console.WriteLine(inner.Message);
        }
    }
    Console.ReadLine();
}

//Task with id 7 waiting for signal to enter
//Task with id 3 waiting for signal to enter
//Task with id 6 waiting for signal to enter
//Task with id 5 waiting for signal to enter
//Task with id 4 waiting for signal to enter
//Task with id 8 waiting for signal to enter
//Task with id 9 waiting for signal to enter
//Task with id 10 waiting for signal to enter
//Signal Off
//Task with id 11 waiting for signal to enter
//Signal Off
//Signal On
//Task with id 6 received signal to enter
//Task with id 7 received signal to enter
//Task with id 5 received signal to enter
//Task with id 3 received signal to enter
//Task with id 4 received signal to enter
//Task with id 11 received signal to enter
//Signal Off
//Task with id 8 received signal to enter
//Task with id 9 received signal to enter
//Task with id 10 received signal to enter
//Signal Off
//Signal Off
//Signal On

AutoResetEvent類似,但在上面多了一組 SignalOff 的 task,當Set被呼叫後,可以看到所有被阻塞的 task 都立刻被釋放了,直到Reset再次被呼叫才會再次阻塞。

這兩個類別皆繼承EventWaitHandleAutoResetEvent等於使用EventResetMode.AutoReset來建立EventWaitHandle,而ManualResetEvent等於使用EventResetMode.ManualReset來建立EventWaitHandle

Lightweight Primitives

ReaderWriterLockSlim

ReaderWriterLockSlim是輕量級版的ReaderWriterLock,它們允許多個 thread 進行讀取,在寫入時只允許單一 thread 進行,在官方網站已經建議使用ReaderWriterLockSlim來代替ReaderWriterLockReaderWriterLockSlim可以避免產生 deadlock(死結)且效能上也比ReaderWriterLock好,下面的例子使用了三個 reader thread 與一個 writer thread:

static ReaderWriterLockSlim _readerWriterLockSlim = new ReaderWriterLockSlim();
static List<int> _list = new List<int>();
static void WriterTask()
{
    for (int i = 0; i < 4; i++)
    {
        try
        {
            _readerWriterLockSlim.EnterWriteLock();
            Console.WriteLine($"Entered WriteLock on Task {Task.CurrentId}");
            int random = new Random().Next(1, 10);
            _list.Add(random);
            Console.WriteLine($"Added {random} to list on Task {Task.CurrentId}");
            Console.WriteLine($"Exiting WriteLock on Task {Task.CurrentId}");
        }
        finally
        {
            _readerWriterLockSlim.ExitWriteLock();
        }
        Thread.Sleep(1000);
    }
}

static void ReaderTask()
{
    for (int i = 0; i < 2; i++)
    {
        _readerWriterLockSlim.EnterReadLock();
        Console.WriteLine($"Entered ReadLock on Task {Task.CurrentId}");
        Console.WriteLine($"Items: {_list.Select(j => j.ToString()).Aggregate((a, b) => a + "," + b)} on Task{Task.CurrentId}");
        Console.WriteLine($"Exiting ReadLock on Task {Task.CurrentId}");
        _readerWriterLockSlim.ExitReadLock();
        Thread.Sleep(1000);
    }
}

private static void ReaderWriteLockSlim()
{
    Task writerTask = Task.Factory.StartNew(WriterTask);
    for (int i = 0; i < 3; i++)
    {
        Task readerTask = Task.Factory.StartNew(ReaderTask);
    }
}

static void Main(string[] args)
{
    try
    {
        ReaderWriteLockSlim();
    }
    catch (AggregateException ex)
    {
        foreach (var inner in ex.InnerExceptions)
        {
            Console.WriteLine(inner.Message);
        }
    }
    Console.ReadLine();
}

// Entered WriteLock on Task 1
// Added 6 to list on Task 1
// Exiting WriteLock on Task 1
// Entered ReadLock on Task 4
// Entered ReadLock on Task 3
// Items: 6 on Task4
// Exiting ReadLock on Task 4
// Items: 6 on Task3
// Entered ReadLock on Task 2
// Exiting ReadLock on Task 3
// Items: 6 on Task2
// Exiting ReadLock on Task 2
// Entered WriteLock on Task 1
// Added 7 to list on Task 1
// Exiting WriteLock on Task 1
// Entered ReadLock on Task 2
// Entered ReadLock on Task 4
// Items: 6,7 on Task4
// Exiting ReadLock on Task 4
// Entered ReadLock on Task 3
// Items: 6,7 on Task2
// Exiting ReadLock on Task 2
// Items: 6,7 on Task3
// Exiting ReadLock on Task 3
// Entered WriteLock on Task 1
// Added 2 to list on Task 1
// Exiting WriteLock on Task 1
// Entered WriteLock on Task 1
// Added 6 to list on Task 1
// Exiting WriteLock on Task 1

SemaphoreSlim

SemaphoreSlim是輕量級版的Semaphore,在前面有稍微提到過,它能允許指定數量的 thread 通過,看看下面的例子:

 static void Main(string[] args)
{
    try
    {
        var range = Enumerable.Range(1, 12);
        SemaphoreSlim semaphore = new SemaphoreSlim(3, 3);
        range.AsParallel().AsOrdered().ForAll(i =>
        {
            try
            {
                semaphore.Wait();
                Console.WriteLine($"Index {i} making service call using Task {Task.CurrentId}");
                Thread.Sleep(1000);
                Console.WriteLine($"Index {i} releasing semaphore using Task {Task.CurrentId}");
            }
            finally
            {
                semaphore.Release();
            }
        });
    }
    catch (AggregateException ex)
    {
        foreach (var inner in ex.InnerExceptions)
        {
            Console.WriteLine(inner.Message);
        }
    }
    Console.ReadLine();
}

// Index 5 making service call using Task 7
// Index 1 making service call using Task 6
// Index 2 making service call using Task 8
// Index 5 releasing semaphore using Task 7
// Index 7 making service call using Task 5
// Index 1 releasing semaphore using Task 6
// Index 2 releasing semaphore using Task 8
// Index 10 making service call using Task 6
// Index 11 making service call using Task 8
// Index 11 releasing semaphore using Task 8
// Index 10 releasing semaphore using Task 6
// Index 7 releasing semaphore using Task 5
// Index 8 making service call using Task 9
// Index 3 making service call using Task 3
// Index 9 making service call using Task 7
// Index 9 releasing semaphore using Task 7
// Index 3 releasing semaphore using Task 3
// Index 8 releasing semaphore using Task 9
// Index 12 making service call using Task 8
// Index 6 making service call using Task 4
// Index 4 making service call using Task 2
// Index 12 releasing semaphore using Task 8
// Index 4 releasing semaphore using Task 2
// Index 6 releasing semaphore using Task 4

每次至多只會有三個 thread 在其中運行,每當其中一個釋放下一個就會補上。

ManualResetEventSlim

ManualResetEventSlim是輕量級版的ManualResetEvent,它擁有比ManualResetEvent較好的 performance 與更少的開銷,我們可以像使用ManualResetEvent一樣:

ManualResetEventSlim manualResetEvent = new ManualResetEventSlim(false);

只是需要使用Wait()來代替WaitOne(),與前面幾個輕量級版的類似。

Barrier 與 Countdown Events

BarrierCountdownEvent與前面介紹的 Synchronization Primitives 用途有些不同,此兩種比較偏向在多個任務不同階段的處理流程。

Barrier

若我們有四個 Task,每個 Task 須完成四個階段,每一階段都需等待其他Task完成後才能進行下個階段,那麼我們就可以使用Barrier來達成此目的:

Barrier

static int _TaskNum = 4;
static Task[] _Tasks;
static Barrier _Barrier;


static void PhaseZero(int taskId)
{
    Console.WriteLine("Task: #{0} ===== Phase 0", taskId);
}

static void PhaseOne(int taskId)
{
    Console.WriteLine("Task: #{0} ***** Phase 1", taskId);
}

static void PhaseTwo(int taskId)
{
    Console.WriteLine("Task: #{0} $$$$$ Phase 2", taskId);
}

static void PhaseThree(int taskId)
{
    Console.WriteLine("Task: #{0} @@@@@ Phase 3", taskId);
}

static void Main(string[] args)
{
    try
    {
        _Tasks = new Task[_TaskNum];
        _Barrier = new Barrier(_TaskNum, (barrier) =>
        {
            Console.WriteLine("----------------{0} Phase Completed--------------------------", barrier.CurrentPhaseNumber);
        });

        for (int i = 0; i < _TaskNum; i++)
        {
            _Tasks[i] = Task.Factory.StartNew((num) =>
            {
                var task_id = (int)num;

                PhaseZero(task_id);
                _Barrier.SignalAndWait();

                PhaseOne(task_id);
                _Barrier.SignalAndWait();

                PhaseTwo(task_id);
                _Barrier.SignalAndWait();

                PhaseThree(task_id);
                _Barrier.SignalAndWait();
            }, i);
        }

        var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
        {
            Task.WaitAll(_Tasks);
            Console.WriteLine("=============================");
            Console.WriteLine("All Phase is completed");
            _Barrier.Dispose();

        });

        finalTask.Wait();
        Console.ReadLine();
    }
    catch (AggregateException ex)
    {
        foreach (var inner in ex.InnerExceptions)
        {
            Console.WriteLine(inner.Message);
        }
    }
}

//Task: #1 ===== Phase 0
//Task: #0 ===== Phase 0
//Task: #2 ===== Phase 0
//Task: #3 ===== Phase 0
//----------------0 Phase Completed--------------------------
//Task: #3 ***** Phase 1
//Task: #1 ***** Phase 1
//Task: #2 ***** Phase 1
//Task: #0 ***** Phase 1
//----------------1 Phase Completed--------------------------
//Task: #0 $$$$$ Phase 2
//Task: #3 $$$$$ Phase 2
//Task: #2 $$$$$ Phase 2
//Task: #1 $$$$$ Phase 2
//----------------2 Phase Completed--------------------------
//Task: #1 @@@@@ Phase 3
//Task: #3 @@@@@ Phase 3
//Task: #2 @@@@@ Phase 3
//Task: #0 @@@@@ Phase 3
//----------------3 Phase Completed--------------------------
//=============================
//All Phase is completed

BarrierSignalAndWait()可以發出訊號,表示自己已抵達屏障,並且等待其他所有參加的 Task 到來,一旦等到所設定的數量,就會進入下一個階段。

CountdownEvent

CountdownEvent需要接收到一定數量的訊號,才會繼續往下進行,下面我們建立五個Task,來測試:

static int _TaskNum = 5;
static Task[] _Tasks;
static CountdownEvent countdownEvent = new CountdownEvent(5);

static void ProcessSomething(int taskId)
{
    Console.WriteLine("Task: #{0} ===== Do Something......", taskId);
}

static void Main(string[] args)
{
    try
    {
        _Tasks = new Task[_TaskNum];
        for (int i = 0; i < _TaskNum; i++)
        {
            _Tasks[i] = Task.Factory.StartNew((num) =>
            {
                var task_id = (int)num;
                ProcessSomething(task_id);
                countdownEvent.Signal();
            }, i);
        }

        countdownEvent.Wait();
        Console.WriteLine("=============================");
        Console.WriteLine("All Task is completed");
        Console.ReadLine();
    }
    catch (AggregateException ex)
    {
        foreach (var inner in ex.InnerExceptions)
        {
            Console.WriteLine(inner.Message);
        }
    }
}

//Task: #1 ===== Do Something......
//Task: #0 ===== Do Something......
//Task: #3 ===== Do Something......
//Task: #4 ===== Do Something......
//Task: #2 ===== Do Something......
//=============================
//All Task is completed

CountdownEvent的建構式可以指定需等待訊號的數量,每當Signal()被呼叫,CountdownEvent中的CurrentCount就會減一,而Wait()則會等待計數到達才釋放當前thread。

SpinWait

SpinWait 是一個輕量型同步處理類型,多數時候我們會像Thread.Sleep一樣來使用它,但它比Thread.Sleep產生更少kernel間接費用在Context Switch上,SpinWait比較偏向在CPU進行等待,若等待時間較長或者條件不滿足時,則會自動轉換到kernel環境進行等待。

網路有些文章會說SpinWait的資源利用度會比Thread.Sleep來的好,但我自己測試的結果覺得差異性不大,但根據官方說法,在等待時間較短時,選擇使用SpinWait會是比較好的選擇,至少不需耗費資源再Context Switch上。

Copyright © 2023. Papan01