C# Parallel Programming:#3 Data Parallelism

2021-12-24 • 11 min read

前面幾章我們已經針對 Task Parallel Library (TPL)有些基礎概念,在這章中將會使用別的方式針對資料層面進行平行處理(資料平行處理原則)。

以下是這章將會介紹到的部分:

  • 透過 TPL 中的System.Threading.Tasks.Parallel實現資料平行處理原則。
  • 取消資料平行處理。
  • 了解變數的儲存機制於資料平行處理原則中(分割區域變數執行緒區域變數)。

Parallel Loops

當我們想讓資料平行處理,最好的情況下是所以資料都能夠被獨立計算,這樣能使我們在使用平行處裡上獲得最佳的效率,而我們這章要介紹的System.Threading.Tasks.Parallel也比較適合用於這種情況上,下面將介紹三種基本的使用方式:

  • Parallel.Invoke
  • Parallel.For
  • Parallel.ForEach

Parallel.Invoke

這是System.Threading.Tasks.Parallel中最簡單明瞭的使用方式,看一下它所接受的參數就可一目了然:

public static void Invoke(
    params Action[] actions
)

使用這個方法有幾個特點需要注意:

  • 沒有辦法保證執行的順序是根據參數輸入的順序。
  • 這個方法不會有任何傳回值。
  • 不論因為正常終止或異常終止,都會等到所有 Action 結束。

看看一個簡單的範例:

try
{
    Parallel.Invoke(
    new Action(() => Console.WriteLine("Action 1")),
    new Action(() => Console.WriteLine("Action 2")),
    new Action(() => Console.WriteLine("Action 3")),
    new Action(() => Console.WriteLine("Action 4")),
    new Action(() => Console.WriteLine("Action 5")),
    new Action(() => Console.WriteLine("Action 6")),
    new Action(() => Console.WriteLine("Action 7")),
    new Action(() => Console.WriteLine("Action 8")),
    new Action(() => Console.WriteLine("Action 9")),
    new Action(() => Console.WriteLine("Action 10")));
}
catch (AggregateException aggregateException)
{
    foreach (var ex in aggregateException.InnerExceptions)
    {
        Console.WriteLine(ex.Message);
    }
}
Console.WriteLine("Unblocked");
Console.ReadLine();


//Action 3
//Action 1
//Action 6
//Action 2
//Action 5
//Action 4
//Action 8
//Action 7
//Action 9
//Action 10
//Unblocked

當我們需要將一些 Action 進行平行處理時,這個方法可以讓我們很簡單地達成,並且它會等到所有 Action 結束才繼續執行後續的程式碼。若有 exception 會由AggregateException來進行收集的動作,上面可以看到AggregateException.InnerExceptions是可以被迭代的。

Parallel.For

類似平行處理中的for迴圈,它會回傳一個ParallelLoopResult的 instance,其中它有兩個 properties,isCompletedLowestBreakIteration,可以讓我們確認執行的狀態,下面我們來看看此方法的宣告、可能出現的結果與例子:

public static ParallelLoopResult For
(
    Int fromIncalme,
    Int toExclusiveme,
    Action<int> action
)
IsCompleted LowestBreakIteration Reason
True N/A Run to completion
false Null Loop stopped pre-matching
false Non-null integral value Break called on the loop
Parallel.For (0, 100, (i) => Console.WriteLine(i));

此方法的前面兩個參數接受開始與結束的 index,最後面接受一個整數泛型 Action,而在此之外我們可能已經有一組資料可以透過 index 來獲取對應的資料用於此 Action。 而IsCompleted為 false 的情況 就像我們在for迴圈時會使用的break,可以用於中斷迭代,後面會在介紹。

Parallel.ForEach

它與Parallel.For一樣會回傳一個ParallelLoopResult的 instance,使用方式與Parallel.For稍有不同:

public static ParallelLoopResult ForEach<TSource>
(
    IEnumerable<TSource> source,
    Action<TSource> body
);

來看看例子:

List<string> urls = new List<string>() {"www.google.com",
                                        "www.yahoo.com",
                                        "www.bing.com" };
Parallel.ForEach(urls, url =>
{
    Ping pinger = new Ping();
    Console.WriteLine($"Ping Url {url} status is {pinger.Send(url).Status}
    by Task {Task.CurrentId}");
});

//Ping Url www.google.com status is Success By Task 1
//Ping Url www.yahoo.com status is Success By Task 2
//Ping Url www.bing.com status is Success By Task 3

概念上與Parallel.For類似,只是這裡會直接遍歷資料,而Parallel.For則能讓我們選擇資料區間。

Degree of Parallelism

平行處理莫過於想要增加執行速度,基於此情況下我們都知道控制資源的重要性,有時我們會想控制 task 的數量,不希望被過度的創建,degree of parallelism 是指能同時平行執行的數量,上面我們介紹的三種方法可以透過ParallelOptions class 中的MaxDegreeOfParallelism來進行設置,預設為 64:

public static void Invoke(
    ParallelOptions parallelOptions,
    params Action[] actions
);

public static ParallelLoopResult For(
    int fromInclusive,
    int toExclusive,
    ParallelOptions parallelOptions,
    Action<int> body
);

public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Action<TSource> body
);

看看下面例子:

Parallel.For(1, 100, new ParallelOptions { MaxDegreeOfParallelism = 4 },
index =>
{
    Console.WriteLine($"Index {index} executing on Task Id { Task.CurrentId} ");
});

//Index 25 executing on Task Id 2
//Index 1 executing on Task Id 1
//Index 49 executing on Task Id 3
//Index 73 executing on Task Id 4
//Index 26 executing on Task Id 2
//Index 2 executing on Task Id 1
//Index 3 executing on Task Id 1
//Index 4 executing on Task Id 1
//Index 27 executing on Task Id 2
//...

可以看到上面只有 task ID 1、2、3 與 4 被使用到。

中斷迴圈

由於我們無法像for迴圈那樣使用break這種特殊語法來中斷平行處理中的工作,但我們可以使用以下幾種方式來中斷:

  • ParallelLoopState.Break
  • ParallelLoopState.Stop
  • CancellationToken

ParallelLoopState.Break

ParallelLoopState.Break是一個方法,當它被呼叫時,若有成功的中斷我們可以從LowestBreakIteration來得知觸發中斷的最低迭代數,但它不會立刻停止下來,它會等到所有小於LowestBreakIteration的迭代都完成了才停止運行,來看看下面的例子:

var numbers = Enumerable.Range(1, 1000);
Parallel.ForEach(numbers, (i, parallelLoopState) =>
    {
        Console.WriteLine($"For i={i} LowestBreakIteration = {parallelLoopState.LowestBreakIteration} and Task id ={Task.CurrentId}");
        if (i >= 10)
        {
            parallelLoopState.Break();
        }
    });

//For i=6 LowestBreakIteration =  and Task id =1
//For i=4 LowestBreakIteration =  and Task id =4
//For i=1 LowestBreakIteration =  and Task id =2
//For i=2 LowestBreakIteration =  and Task id =5
//For i=3 LowestBreakIteration =  and Task id =3
//For i=5 LowestBreakIteration =  and Task id =6
//For i=8 LowestBreakIteration =  and Task id =8
//For i=7 LowestBreakIteration =  and Task id =7
//For i=17 LowestBreakIteration =  and Task id =7
//For i=10 LowestBreakIteration =  and Task id =1
//For i=11 LowestBreakIteration =  and Task id =4
//For i=12 LowestBreakIteration =  and Task id =2
//For i=13 LowestBreakIteration =  and Task id =5
//For i=14 LowestBreakIteration =  and Task id =3
//For i=15 LowestBreakIteration =  and Task id =6
//For i=16 LowestBreakIteration =  and Task id =8
//For i=9 LowestBreakIteration =  and Task id =9
//For i=21 LowestBreakIteration = 9 and Task id =10
//For i=20 LowestBreakIteration = 9 and Task id =9
//For i=19 LowestBreakIteration = 9 and Task id =1
//For i=18 LowestBreakIteration = 16 and Task id =7

ParallelLoopState.Stop

ParallelLoopState.Stop能更快的停止迭代,因為它與ParallelLoopState.Break差別在與它不會等小於LowestBreakIteration的迭代都完成才停止,多數時候我們可能更傾向於使用ParallelLoopState.Stop

CancellationToken

CancellationToken在前面幾章介紹Task時有使用到過,我們可以創建一把權杖,讓我們在某個時間點進行中斷的動作:

 try
{
    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    Task.Factory.StartNew(() =>
    {
        Thread.Sleep(5000);
        cancellationTokenSource.Cancel();
        Console.WriteLine("Token has been cancelled");
    });

    Parallel.For(0, long.MaxValue, new ParallelOptions() { CancellationToken = cancellationTokenSource.Token }, index =>
    {
        Thread.Sleep(3000);
        Console.WriteLine($"Index {index}");
    });
}
catch (OperationCanceledException)
{
    Console.WriteLine("Cancellation exception caught!");
}

//Index 1152921504606846975
//Index 0
//Index 2305843009213693950
//Index 3458764513820540925
//Index 4611686018427387900
//Index 5764607523034234875
//Index 8070450532247928825
//Index 6917529027641081850
//Index 9223372036854775800
//Index 1
//Token has been cancelled
//Index 1152921504606846976
//Index 4611686018427387901
//Index 1152921504606846977
//Index 2305843009213693951
//Index 2
//Index 3458764513820540926
//Index 5764607523034234876
//Index 6917529027641081851
//Index 8070450532247928826
//Index 2305843009213693953
//Index 9223372036854775801
//Index 4
//Index 3458764513820540928
//Cancellation exception caught!

首先我們先創建一個CancellationTokenSource的物件,接著透過Task使其在五秒後呼叫Cancel()進行中斷動作,接著我們將此物件傳給ParallelOptions.CancellationToken,告訴此平行處理所歸屬的權杖是哪個,上面看到當Cancel()被呼叫了不會立刻停止,原因在於下面那些迭代都屬於正在執行中,概念與上面提到的類似。

變數儲存機制於平行處理中

在執行平行處理時,最終結果通常會儲存於一個全域變數(或者區域變數),但通常這樣的作法會有額外的開銷(跨 thread),若我們每個迭代都直接存取全域變數,會有 race condition 的問題,就算我們透過 lock 的機制來保護,那也變得失去使用平行處理的意義,比較好的做法是各自 thread 使用自己的區域變數,最後再將結果的集合往外面送。 為了減少這些開銷,這裡要介紹如何使用 thread-local-variables(執行緒區域變數)與 partition-local-variables(分割區域變數)。

Thread local variable

這裡我們直接以例子說明,假設現在我們使用四個 Task 來計算 1 加到 60,平均每個 Task 需負責 15 個迭代,下面使用Parallel.For來實作此例子,接著我們再一一介紹:

 var numbers = Enumerable.Range(1, 60);
long sumOfNumbers = 0;
Action<long> taskFinishedMethod = (taskResult) =>
{
    Console.WriteLine($"Sum at the end of all task iterations for task { Task.CurrentId} is { taskResult }");
    Interlocked.Add(ref sumOfNumbers, taskResult);
};

Parallel.For(0, numbers.Count() + 1,
    () => 0, // initialize the thread local variable
    (j, parallelLoopState, subtotal) =>
    {
      subtotal += j;
      return subtotal;
    } // method invoked by the loop on each iteration,
    taskFinishedMethod
);
Console.WriteLine($"The total of 60 numbers is {sumOfNumbers}");

首先numberssumOfNumbers應該沒什麼問題,一個是需要被處理的資料,一個是最後儲存最後結果的變數,接著我們看taskFinishedMethod這個 Action,當Parallel.For中每個Task完成工作後就會執行此Action,其中使用Interlocked來保護變數sumOfNumbers

Parallel.For的參數部分,與前面介紹的有點不同,前兩個參數一樣是開始與結束位置,第三個變數則用於初始化 thread local variable,第四個參數與上面介紹一樣,是邏輯處理的部分,但可以看到它的第三個參數subtotal,它就是這裡所指的 thread local variable,每次迭代都會將這個值傳給下個迭代,當執行完最後一次迭代就會執行taskFinishedMethod,之後再由taskFinishedMethod來處理最後的加總部分。

Partition local variable

Partition local variable 概念上與 thread local variable 幾乎沒什麼區別,差別在於由Task的概念切換成 partition,且多個 partition 可以用在同一個 thread 上,下面使用Parallel.ForEach來實作:

var numbers = Enumerable.Range(1, 60);
long sumOfNumbers = 0;
Action<long> taskFinishedMethod = (taskResult) =>
{
    Console.WriteLine($"Sum at the end of all task iterations for task { Task.CurrentId} is { taskResult }");
    Interlocked.Add(ref sumOfNumbers, taskResult);
};

Parallel.ForEach(numbers, () => 0, (j, parallelLoopState, subtotal) =>
    {
      subtotal += j;
      return subtotal;
    },
    taskFinishedMethod
);
Console.WriteLine($"The total of 60 numbers is {sumOfNumbers}");

從官網上來看,Parallel.For使用 thread local variable,Parallel.ForEach則使用 partition local variable,目前還沒找到其他方式來直接實作類似的概念,不過當我們使用資料平行處理原則時,可以留意一下這部分。

Reference

Copyright © 2023. Papan01