Parallel Linq
Parallel Linq (PLINQ) is an extension that processes a query in parallel while keeping the LINQ style. Just like the Parallel class, PLINQ is built on Task, and evaluating a query is a blocking operation as well.
- ParallelEnumerable.AsParallel: converts an enumerable to a ParallelQuery<T>
  - parallel counterparts of LINQ operators such as Select, Sum...
- ParallelEnumerable.AsSequential: converts a ParallelQuery<T> back to a normal IEnumerable<T>
- ParallelEnumerable.WithCancellation: passes a cancellation token
- ParallelEnumerable.WithMergeOptions: controls how the scheduled iterations are buffered
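As a quick sketch of the style above, a sequential LINQ chain becomes parallel just by inserting AsParallel; the query itself is illustrative:

```csharp
using System;
using System.Linq;

// Sum of squares from 1 to 100, computed by the Select/Sum
// overloads from ParallelEnumerable instead of Enumerable.
var sum = Enumerable.Range(1, 100)
    .AsParallel()
    .Select(x => x * x)
    .Sum();

Console.WriteLine(sum); // 338350, regardless of execution order
```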
ParallelQuery is IEnumerable
ParallelQuery<T> is a simple wrapper over IEnumerable<T>: some members are overridden in the class itself, and the rest are implemented as extensions on ParallelEnumerable. When the compile-time type is ParallelQuery<T>, the compiler picks the LINQ operator overloads from the ParallelEnumerable extensions instead of Enumerable.
public class ParallelQuery<TSource> : ParallelQuery, IEnumerable<TSource> { /* ... */ }
AsParallel & AsSequential & AsEnumerable
- ParallelEnumerable.AsParallel() converts an enumerable to a ParallelQuery<T>
- ParallelEnumerable.AsSequential() is an extension dedicated to ParallelQuery<T>
  - does not change the runtime type of the source
  - but tells the compiler to pick the general implementations from the Enumerable extensions
  - all subsequent operations become sequential, no longer in parallel
- AsEnumerable() is a common extension on IEnumerable<T>; however, ParallelEnumerable.AsEnumerable exists to unwrap the backing enumerable when working with ParallelQuery<T>
  - identical to ParallelEnumerable.AsSequential()
_ = Enumerable.Range(1, 100)
.AsParallel()
.Select(x => x) // ParallelEnumerable.Select
.AsEnumerable()
.Select(x => x) // Enumerable.Select
.AsSequential()
.Select(x => x); // Enumerable.Select
NOTE
They all use deferred execution; nothing runs until the query is enumerated.
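A small sketch of that deferred behavior (the `executed` flag is made up for this demo): building the parallel query runs nothing; only enumeration does.

```csharp
using System;
using System.Linq;

var executed = false; // demo-only flag
var query = Enumerable.Range(1, 10)
    .AsParallel()
    .Select(x => { executed = true; return x; });

Console.WriteLine(executed); // False: the query is only built

_ = query.ToList(); // enumeration triggers the parallel execution
Console.WriteLine(executed); // True
```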
Preserve Ordering
AsOrdered makes sure the subsequent parallel operations preserve the order of the original enumerable.
var seq = Enumerable.Range(1, 100);
var ordered = Enumerable.Range(1, 100)
.AsParallel()
.AsOrdered()
.Select(x => x);
Console.WriteLine(seq.SequenceEqual(ordered)); // always true
However, preserving ordering has a cost, so you can disable it once ordering no longer matters using ParallelEnumerable.AsUnordered.
var ordered = Enumerable.Range(1, 100)
.AsParallel()
.AsOrdered()
.Select(x => x)
.AsUnordered(); // cancel the ordering preservation
Cancellation & Exception
PLINQ is basically a functional version of a parallel loop, so exception handling and cancellation work the same as with the Parallel class.
- use ParallelEnumerable.WithCancellation to specify a cancellation token
- cancellation is special, so catch it as OperationCanceledException
- other exceptions from all iterations are wrapped in an AggregateException
- remember to evaluate the query, otherwise the cancellation or exception would never be triggered
var parallelSeq = ParallelEnumerable.Range(1, 100);
var cts = new CancellationTokenSource(2000);
var query = parallelSeq
.WithCancellation(cts.Token)
.Select(x => {
Thread.Sleep(2000);
cts.Token.ThrowIfCancellationRequested();
if (int.IsOddInteger(x)) throw new Exception("a normal exception was thrown");
return x * x;
});
try {
// you must consume query
_ = query.ToList();
} catch (AggregateException ex) {
ex.Handle(iex => {
switch (iex) {
case Exception:
Console.WriteLine(iex.Message);
return true;
default:
return false;
}
});
} catch (OperationCanceledException) {
Console.WriteLine($"{nameof(OperationCanceledException)} was thrown");
}
Merge Option
Parallel iterations are scheduled in groups, so buffering is enabled by default, and the group size is system-dependent.
- ParallelMergeOptions.AutoBuffered: the default option; buffering is determined by the internal design
- ParallelMergeOptions.Default: alias for AutoBuffered
- ParallelMergeOptions.FullyBuffered: results are not available until all iterations have finished
- ParallelMergeOptions.NotBuffered: yields each item immediately whenever it is available
var query = ParallelEnumerable.Range(1, 100)
.WithMergeOptions(ParallelMergeOptions.FullyBuffered)
.Select(x => {
Thread.Sleep(Random.Shared.Next(100));
Console.WriteLine("produced");
return x;
});
foreach (var _ in query) {
Console.WriteLine("consumed");
}
// consume only happens after all were produced when FullyBuffered is specified
// produced
// produced
// produced
// ...
// consumed
// consumed
// consumed
var query = ParallelEnumerable.Range(1, 100)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x => {
Thread.Sleep(Random.Shared.Next(100));
Console.WriteLine("produced");
return x;
});
foreach (var _ in query) {
Console.WriteLine("consumed");
}
// consuming happens as long as one was available
// produced
// consumed
// produced
// consumed
// produced
// ...
// consumed
Performance Enhancement
Local Storage
ParallelEnumerable.Aggregate is exactly the operator that plays the role of local storage; the following example uses one of its most flexible overloads.
var size = Directory.EnumerateFiles(@"c:/Users/User/Projects/nix-config", "*", SearchOption.AllDirectories)
.AsParallel()
.Aggregate(
seed: 0L,
updateAccumulatorFunc: (localSum, curr) => localSum + new FileInfo(curr).Length, // iteration
combineAccumulatorsFunc: (sum, localSum) => sum + localSum, // add up when each group was finished
resultSelector: i => i / 1024D // post action to transform the result
);
Console.WriteLine($"size in kb: {size}");