C# Parallel Programming:#2 Task Parallelism

2021-09-09 • 17 min read

在前面一章我們介紹了.NET 中 Task Parallel Library (TPL)一些 thread 的基本用法,其目的是希望能夠使開發者容易使用,除了ThreadThreadPool 之外,還有這篇要介紹的System.Threading.Tasks,其中有涵蓋許多類別,執行方式類似於 javascript 中的Promise,且涵蓋了ThreadThreadPool的一些優點,我們先來看看這三者的差異性。

ThreadThreadPoolTask

這三個在使用上的選擇很容易混淆,且我們平常使用時,都認為只要有達到目的就好,這樣做雖然沒什麼錯,但若我們能深入了解或許能避免產生 bug 與改善 performance。

Thread

Thread屬於 OS-level thread,它擁有自己的 stack 與 kernel resources,且提供較靈活的操作空間,例如我們能使用Abort()Suspend()Resume()等方法來操作此 thread,但其缺點就在創建的成本,上一篇有提到,每次建立一個 thread 都必須耗費一些 memory 與 CPU time 用於它的 stack 上以及 threads 間 context-switch 的成本,所以較好的做法是利用 thread pool 來執行程式碼,交給 CLR 管理這一切。但有時候你別無選擇,因為你希望能替 thread 設置 name 好讓你 debug 容易些,或者你需要一個 apartment state 用來 show 你的 UI,又或者你希望有個 object 只能在特定的 thread 裡面被使用,那麼使用Thread能簡單的達到這些效果。

ThreadPool

ThreadPool是由 CLR 管理 threads 的容器,你能操作的空間有限,像上面Thread能做的事ThreadPool幾乎都不能,但它的好處就是可以避免我們過度創建過多的 thread,但缺點就是你無法得知你送進去的工作是否已經完成,也沒辦法拿到任何 result,很適合丟了就不管的簡短工作。

Task

最後是我們這章要介紹的Task,它不像Thread一樣創建了一個 OS-level thread,而是將工作交由TaskScheduler來負責執行,而TaskScheduler預設的行為就像ThreadPool一樣。 Task不像ThreadPool,它能夠讓我們知道工作是否已經完成,以及拿到最後的 result,所以在大部分時候,都建議使用Task來代替上面兩個方案。底下將介紹它有哪些功能,

建立與執行

TPL 提供了很多方式讓我們建立與執行 parallelism 的工作,而這篇的主軸System.Threading.Tasks這個 namespace 下有許多類別與方法能達到此目的,這裡先介紹以下三種比較常見的用法:

  • System.Threading.Tasks.Task 類別
  • System.Threading.Tasks.Task.Factory.StartNew 方法
  • System.Threading.Tasks.Task.Run 方法

System.Threading.Tasks.Task

Task基於Task-Based Asynchronous Pattern(TAP)的類別之一,透過方法Start來執行非同步的作業,若需要 return 值則需要使用 generic 的版本Task<T>,我們可以使用以下幾種方式來執行:

  • lambda expression
Task task = new Task (() => PrintNumber());
task.Start();
  • Action delegate
Task task = new Task (new Action (PrintNumber));
task.Start();
  • delegate
Task task = new Task (delegate {PrintNumber();});
task.Start();

System.Threading.Tasks.Task.Factory.StartNew

我們也可以使用Task.Factory類別中的StartNew方法來執行我們的非同步工作,它允許我們帶入一些參數來控制其行為,這裡先看看簡單的用法,後面會在詳細的介紹:

  • lambda expression
Task.Factory.StartNew(() => PrintNumber());
  • Action delegate
Task.Factory.StartNew(new Action (PrintNumber));
  • delegate
Task.Factory.StartNew(delegate {PrintNumber();});

System.Threading.Tasks.Task.Run

Task.RunTasks.Task.Factory.StartNew帶固定參數的包裝方法:

Task.Factory.StartNew(someAction,
    CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

用法也跟上面雷同:

  • lambda expression
Task.Run(() => PrintNumber());
  • Action delegate
Task.Run(new Action (PrintNumber));
  • delegate
Task.Run(delegate {PrintNumber();});

獲取 return 值

上述介紹的三個方法與類別皆可以透過 generic 來獲取非同步執行完後的結果,在執行完後會返回以下類別的物件:

  • Task<T>
  • Task.Factory.StartNew<T>
  • Task.Run<T>

我們能透過Result property 來獲取返回的值,它也有阻塞功能的功能,等同於使用Task.Wait:

static void Main(string[] args)
{
    var t1 = Task.Run(() =>
    {
        return Sum(100000);
    });

    Console.WriteLine(t1.Result);

    var t2 = Task.Factory.StartNew(() =>
    {
        return Sum(100000);
    });

    Console.WriteLine(t2.Result);

    var t3 = Task.Run(() =>
    {
        return Sum(100000);
    });

    Console.WriteLine(t3.Result);
}

private static int Sum(int n)
{
    int sum = 0;
    for (int i = 0; i < n; i++)
    {
        sum += i;
    }
    return sum;
}

//704982704
//704982704
//704982704

取消工作

有些時候我們可能需要中斷非同步工作,我們可以透過CancellationTokenSource來創建一把取消非同工作的權杖,上述的三種都有提供多載方法,下面以Task.Factory.StarNew為例:

static void Main(string[] args)
{
  CancellationTokenSource source = new CancellationTokenSource();
  CancellationToken token = source.Token;

  Task.Factory.StartNew(async () =>
  {
      for(int i = 0; i < 1000000; i++)
      {
          await Task.Delay(1000);
          Console.WriteLine(i);
          if (token.IsCancellationRequested)
          {
            token.ThrowIfCancellationRequested();
          }
      }
  }, token);

  Console.ReadLine();
  Console.WriteLine("Cancel Job");
  source.Cancel();
}

透過創建CancellationTokenSource來取得它的 token property,它可以給我們用來取消非同步工作用。上面的例子中,非同步工作會每一秒 print 當前迭代數字,我們可以透過按下任意鍵取消此非同步工作,token.IsCancellationRequested在接受到取消的訊息會被設為 true,token.ThrowIfCancellationRequested則用於跳出此非同步作業。

Tasks.Task.Factory.StartNew 與 Tasks.Task.Run

前面提到Tasks.Task.Run是固定參數的Tasks.Task.Factory.StartNew的包裝方法,這裡就來介紹一下Tasks.Task.Factory.StartNew的多載用法:

Task.Factory.StartNew(someAction,
    CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

第一個參數我想沒什麼問題,就是帶入需要非同步的函式,第二個CancellationToken在上面也提到過,用於取消非同步工作,再來我們先看看TaskCreationOptions,它主要用於創建TaskScheduler時,它有以下七種選項:

  • None(此為預設值)
  • AttachedToParent
  • DenyChildAttach
  • HideScheduler
  • LongRunning
  • PreferFairness
  • RunContinuationsAsynchronously

AttachedToParent

若在一個Task中在創建另外一個Task,預設行為是不會等子Task結束工作:

static void Main(string[] args)
{
    var parent = Task.Factory.StartNew(() => {
        Console.WriteLine("parent task executing.");

        var child = Task.Factory.StartNew(() => {
            Console.WriteLine("child task starting.");
            Thread.SpinWait(500000);
            Console.WriteLine("child task completing.");
        });
    });

    parent.Wait();
    Console.WriteLine("parent has completed.");
}

//parent task executing.
//parent has completed.
//child task starting.

上面範例中,還沒等到 child 程式就已經結束了,我們可以透過AttachedToParent,讓 parent 等待 child 完成:

static void Main(string[] args)
{
    var parent = Task.Factory.StartNew(() => {
        Console.WriteLine("parent task executing.");

        var child = Task.Factory.StartNew(() => {
            Console.WriteLine("child task starting.");
            Thread.SpinWait(500000);
            Console.WriteLine("child task completing.");
        }, TaskCreationOptions.AttachedToParent);    });

    parent.Wait();
    Console.WriteLine("parent has completed.");
}

//parent task executing.
//child task starting.
//child task completing.
//parent has completed.

DenyChildAttach

若父Task不希望它的子Task使用 AttachedToParent 到它身上,則可以使用此選項,那麼子Task的 AttachedToParent 將會失效。

HideScheduler

Task將會使用預設的TaskScheduler

LongRunning

Task中的 thread 都是由 thread pool 來管理,thread pool 的 thread 數量則根據電腦核心數來決定,當有一個需要長時間的工作進入到 thread pool 時,若沒有足夠的 thread 可用時(或者因為這個工作可能會使 thread pool 為了其他非同步工作配置新的 thread),這個選項提供了一個超額訂閱的功能,為它創建一個額外的 thread,避免它去阻塞當前的 thread pool,至於多久才算是 long running 呢?我在網路上搜尋到的建議是超過半秒就可以使用此參數。

PreferFairness

告訴TaskScheduler以公平的方式來安排工作執行的先後順序,越早安排的工作將可能越早被執行。

RunContinuationsAsynchronously

此選項是避免在某些情況下程式造成 deadlocks(死結),會以非同步(不同 thread 上)的方式執行後續的工作:

static void Main(string[] args)
{
    ThreadPool.SetMinThreads(100, 100);
    Console.WriteLine("Main CurrentManagedThreadId:" + Environment.CurrentManagedThreadId);
    var tcs = new TaskCompletionSource<bool>();
    ContinueWith(1, tcs.Task);
    ContinueWith(2, tcs.Task);
    Task.Run(() =>
    {
        Console.WriteLine("Task Run CurrentManagedThreadId:" + Environment.CurrentManagedThreadId);
        tcs.TrySetResult(true);
    });
    Console.ReadLine();
}

static void print(int id) => Console.WriteLine($"continuation:{id}\tCurrentManagedThread:{Environment.CurrentManagedThreadId}");

static Task ContinueWith(int id, Task task)
{
    return task.ContinueWith(
         t => print(id),
         CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
//Main CurrentManagedThreadId:1
//Task Run CurrentManagedThreadId:4
//continuation:1  CurrentManagedThread:4
//continuation:2  CurrentManagedThread:4

在未設置TaskCreationOption.RunContinuationAsyncchronously情況下時,使用ContinueWith來接續前一個工作都還是會使用在同一個 thread 上,所以我們可以使用TaskCreationOption.RunContinuationAsyncchronously來解決這類問題:

static void Main(string[] args)
{
    ThreadPool.SetMinThreads(100, 100);
    Console.WriteLine("Main CurrentManagedThreadId:" + Environment.CurrentManagedThreadId);
    var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);    ContinueWith(1, tcs.Task);
    ContinueWith(2, tcs.Task);
    Task.Run(() =>
    {
        Console.WriteLine("Task Run CurrentManagedThreadId:" + Environment.CurrentManagedThreadId);
        tcs.TrySetResult(true);
    });
    Console.ReadLine();
}

static void print(int id) => Console.WriteLine($"continuation:{id}\tCurrentManagedThread:{Environment.CurrentManagedThreadId}");

static Task ContinueWith(int id, Task task)
{
    return task.ContinueWith(
         t => print(id),
         CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}

//Main CurrentManagedThreadId:1
//Task Run CurrentManagedThreadId:4
//continuation:1  CurrentManagedThread:5
//continuation:2  CurrentManagedThread:7

等待工作完成

上面提到我們可以透過Task.Result等待 return 值回來,TPL 還提供了其他種 API 供我們使用:

  • Task.Wait
  • Task.WaitAll
  • Task.WaitAny
  • Task.WhenAll
  • Task.WhenAny

Task.Wait

此方法會阻塞當前 thread 直到非同步工作完成,它還有幾種多載方法:

  • Wait(CancellationToken): 等待非同步工作完成或者 cancellation token 接受到取消的命令。
  • Wait(int): 限制非同步工作在有限時間內完成(milliseconds),否則取消等待。
  • Wait(TimeSpan): 等待非同步工作在指定的時間間隔內完成。
  • Wait(int, CancellationToken): 限制非同步工作在有限時間內完成或者 cancellation token 接受到取消的命令,否則取消等待。

Task.WaitAll

此方法可接受多Task,等待每個非同步工作都完成。

Task taskA = Task.Factory.StartNew(() =>
Console.WriteLine("TaskA finished"));
Task taskB = Task.Factory.StartNew(() =>
Console.WriteLine("TaskB finished"));
Task.WaitAll(taskA, taskB);
Console.WriteLine("Calling method finishes");

Task.WaitAny

此方法與Task.WaitAll類似,差別在於只要任一個非同步工作完成,就會取消等待。

Task taskA = Task.Factory.StartNew(() =>
Console.WriteLine("TaskA finished"));
Task taskB = Task.Factory.StartNew(() =>
Console.WriteLine("TaskB finished"));
Task.WaitAny(taskA, taskB);
Console.WriteLine("Calling method finishes");

Task.WhenAll

此方法與Task.WaitAll類似,但它不會阻塞當前 thread,取而代之會 return 一個Task類別的物件。

Task taskA = Task.Factory.StartNew(() =>
Console.WriteLine("TaskA finished"));
Task taskB = Task.Factory.StartNew(() =>
Console.WriteLine("TaskB finished"));
var t = Task.WhenAll(taskA, taskB);
Console.WriteLine("Calling method finishes");
t.Wait();

Task.WhenAny

此方法與Task.WaitAny類似,但它不會阻塞當前 thread,取而代之會 return 一個Task類別的物件。

Task taskA = Task.Factory.StartNew(() =>
Console.WriteLine("TaskA finished"));
Task taskB = Task.Factory.StartNew(() =>
Console.WriteLine("TaskB finished"));
var t = Task.WhenAny(taskA, taskB);
Console.WriteLine("Calling method finishes");
t.Wait();

異常處理

我們來看一段程式碼,一個簡單捕獲非同步工作的 exception:

static void Main(string[] args)
{
    try
    {
        var task = Task.Run(() =>
        {
            int n1 = 0, n2 = 1;
            var result = n2 / n1;
        });

        task.Wait();
    } catch(AggregateException ex)
    {
        Console.WriteLine($"Task has finished with exception { ex.InnerException.Message}");
    }
    Console.ReadLine();
}

//Task has finished with exception Attempted to divide by zero.

AggregateException 用來將多個錯誤合併成單一的 throwable 例外狀況物件。 它常被運用於 TPL 與 PLINQ 中,我們可以透過InnerException.Message來獲取異常拋出的訊息。 但有時候我們可能會用WaitAll組合多個非同步工作,這時我們可以透過AggregateException中的InnerExceptions來獲取拋出的異常訊息:

static void Main(string[] args)
{
    Task taskA = Task.Factory.StartNew(() => throw new DivideByZeroException());
    Task taskB = Task.Factory.StartNew(() => throw new ArithmeticException());
    Task taskC = Task.Factory.StartNew(() => throw new NullReferenceException());

    try
    {
        Task.WaitAll(taskA, taskB, taskC);
    } catch( AggregateException ex)
    {
        foreach (Exception innerException in ex.InnerExceptions)
        {
            Console.WriteLine(innerException.Message);
        }
    }
    Console.ReadLine();
}

//Attempted to divide by zero.
//Overflow or underflow in the arithmetic operation.
//Object reference not set to an instance of an object.

若需要針對特定的 exception 進行處理可以使用AggregateException.Handle,當拋出異常時 return true,代表有在我們能夠處理的範圍內:

static void Main(string[] args)
{
    Task taskA = Task.Factory.StartNew(() => throw new DivideByZeroException());
    Task taskB = Task.Factory.StartNew(() => throw new ArithmeticException());
    Task taskC = Task.Factory.StartNew(() => throw new NullReferenceException());

    try
    {
        Task.WaitAll(taskA, taskB, taskC);
    } catch( AggregateException ex)
    {
        ex.Handle(innerEx =>
        {
            if(innerEx is DivideByZeroException)
            {
                Console.WriteLine(innerEx.Message);
                return true;
            }
            return false;
        });
    }
    Console.ReadLine();
}
//Attempted to divide by zero.
//Unhandled exception. System.AggregateException: One or more errors occurred...

Continuation tasks

若我們需要在非同步工作完成之後執行其它的工作,TPL 提供了一些有用的方法:

  • Task.ContinueWith
  • Task.Factory.ContinueWhenAll
  • Task.Factory.ContinueWhenAll<T>
  • Task.Factory.ContinueWhenAny
  • Task.Factory.ContinueWhenAny<T>

Task.ContinueWith

此方法可以讓我們接續前面的非同步工作,使用的方式與前面介紹的那些差異不大:

static void Main(string[] args)
{
    Task.Run(() =>
    {
        Console.WriteLine("Fetching Data");
        return 200;
    }).ContinueWith((e) =>
    {
        Console.WriteLine("GetData {0}", e.Result);
    });
    Console.ReadLine();
}

//Fetching Data
//GetData 200

ContinueWith可以串連多個工作:

Task.Run(() =>
{
    Console.WriteLine("Fetching Data");
    return 200;
}).ContinueWith((e) =>
{
    Console.WriteLine("GetData {0}", e.Result);
}).ContinueWith(()=>{
    Console.WriteLine("END");
});

TPL 還提供了Threading.Tasks.TaskContinuationOptions這個列舉讓我們控制 continue task 的行為:

  • None: 預設值,只要主要的非同步工作完成(不論是被取消或者拋出異常)就會接續執行。
  • OnlyOnRanToCompletion: 當主要的非同步工作成功的被完成才會執行(既不是被取消或者拋出異常)。
  • NotOnRanToCompletion:: 當主要的非同步工作被取消或者拋出異常時,才會接續執行。
  • OnlyOnFaulted: 只有當主要的非同步工作拋出異常時,才會接續執行。
  • NotOnFaulted: 當主要的非同步工作沒有拋出異常,就會接續執行。
  • OnlyOnCancelled: 只有當主要的非同步工作被取消時,才會接續執行。
  • NotOnCancelled: 當主要的非同步工作沒有被取消,就會接續執行。

Task.Factory.ContinueWhenAllTask.Factory.ContinueWhenAll<T>

ContinueWhenAll可以讓我們同時間執行多個非同步工作,等到所有非同步工作都完成再接續執行另外的工作,與WhenAll類似,它會 return 一個Task類別的物件:

int a = 2, b = 3;
Task<int> taskA = Task.Factory.StartNew<int>(() => a * a);
Task<int> taskB = Task.Factory.StartNew<int>(() => b * b);
Task<int> taskC = Task.Factory.StartNew<int>(() => 2 * a * b);
var t = Task.Factory.ContinueWhenAll<int>(new Task[]
{ taskA, taskB, taskC }, (tasks)
=> tasks.Sum(t => (t as Task<int>).Result));
Console.WriteLine(t.Result);

//25

Task.Factory.ContinueWhenAnyTask.Factory.ContinueWhenAny<T>

ContinueWhenAny可以讓我們同時間執行多個非同步工作,當有任一個非同步工作完成,就會馬上執行,與WhenAll類似,它會 return 一個Task類別的物件:

int number = 13;
Task<bool> taskA = Task.Factory.StartNew<bool>(() =>
number / 2 != 0);
Task<bool> taskB = Task.Factory.StartNew<bool>(() =>
(number / 2) * 2 != number);
Task<bool> taskC = Task.Factory.StartNew<bool>(() =>
(number & 1) != 0);
Task.Factory.ContinueWhenAny<bool>(new Task<bool>[]
{ taskA, taskB, taskC }, (task) =>
{
    Console.WriteLine((task as Task<bool>).Result);
});
Console.ReadLine();

Work Stealing Queues

Work Stealing Queues 這個名詞不只用於 C#當中,它是用於解決 parallel 程序中 multithreaded 分配工作的問題,而 queue 這個資料結構的特徵就是 FIFO,在前面我們提過 TPL 是基於ThreadPoolThreadPool擁有一個 global queue 與多個 threads,每個 thread 又各自擁有自己的 local queue,每個 thread 都有訪問 global queue 的權利:

Thread Pool

當我們在主 thread 創建新的非同步工作時,就會將其排入 global queue 中:

Thread Pool

local queue 的工作來自於該 thread 自己產生的工作,當 thread 把所有工作都完成時,會去 global queue 尋找有無新的工作,若沒有則會再去其他 thread 的 local queue 尋找,我們稱此最佳化技術為 Work-Stealing,且在竊取其他 thread 的工作同時,也會遵循 FIFO 的方式:

Thread Pool

Reference

Copyright © 2023. Papan01