Parallel Linq
Parallel Linq (PLINQ) is an extension that processes your query in parallel while keeping the Linq style. Just like the Parallel class, Parallel Linq is based on Task, and evaluating a query is a blocking operation.
- ParallelEnumerable.AsParallel: converts an enumerable to a ParallelQuery<T>
- parallel counterparts of Linq operators such as Select, Sum...
- ParallelEnumerable.AsSequential: converts a ParallelQuery<T> back to a normal IEnumerable<T>
- ParallelEnumerable.WithCancellation: passes a cancellation token
- ParallelEnumerable.WithMergeOptions: controls how the scheduled iterations are buffered
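As a minimal sketch, the operators above compose into a single pipeline (the range and the selector here are arbitrary):

```csharp
using System;
using System.Linq;
using System.Threading;

var cts = new CancellationTokenSource();

var total = Enumerable.Range(1, 1_000)
    .AsParallel()                // IEnumerable<int> -> ParallelQuery<int>
    .WithCancellation(cts.Token) // propagate a cancellation token into the query
    .Select(x => x * 2)          // ParallelEnumerable.Select runs across partitions
    .Sum();                      // blocks until every partition has finished

Console.WriteLine(total); // 1001000
```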
ParallelQuery is IEnumerable
ParallelQuery<T> is a simple wrapper over IEnumerable<T>; some of the implementations are overridden in the class itself, while the rest are implemented as extensions on ParallelEnumerable. The compiler chooses the ParallelEnumerable version of a Linq operator when the compile-time type is ParallelQuery<T>.
public class ParallelQuery<TSource> : ParallelQuery, IEnumerable<TSource> { /* ... */ }
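To see that compile-time dispatch in action, the same object can bind to either extension depending on its static type (a small sketch):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

ParallelQuery<int> parallel = Enumerable.Range(1, 10).AsParallel();

// Static type is ParallelQuery<int>, so ParallelEnumerable.Select is chosen
// and the result stays a ParallelQuery<int>.
var stillParallel = parallel.Select(x => x + 1);

// After an upcast the compiler binds to Enumerable.Select instead,
// even though the runtime object is unchanged.
IEnumerable<int> upcast = parallel;
var sequential = upcast.Select(x => x + 1);

Console.WriteLine(stillParallel is ParallelQuery<int>); // True
Console.WriteLine(sequential is ParallelQuery<int>);    // False
```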
AsParallel & AsSequential & AsEnumerable
ParallelEnumerable.AsParallel() converts an enumerable to a ParallelQuery<T>.
ParallelEnumerable.AsSequential() is an extension dedicated to ParallelQuery<T>:
- it does not change the runtime type of the source
- but it tells the compiler to pick the general implementations from the Enumerable extensions
- so all subsequent operations become sequential instead of parallel
ParallelEnumerable.AsEnumerable() is a common extension on IEnumerable<T>; however, ParallelEnumerable.AsEnumerable exists to unwrap the backing enumerable when working with ParallelQuery<T>:
- it is identical to ParallelEnumerable.AsSequential()
_ = Enumerable.Range(1, 100)
.AsParallel()
.Select(x => x) // ParallelEnumerable.Select
.AsEnumerable()
.Select(x => x) // Enumerable.Select
.AsSequential()
.Select(x => x); // Enumerable.Select
NOTE
They all use deferred execution
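A quick way to confirm the deferred behavior: the selector does not run until the query is enumerated (a small sketch with an arbitrary range of 5):

```csharp
using System;
using System.Linq;
using System.Threading;

var evaluated = 0;

var query = Enumerable.Range(1, 5)
    .AsParallel()
    .Select(x => { Interlocked.Increment(ref evaluated); return x; });

// Defining the query executes nothing.
Console.WriteLine(evaluated); // 0

_ = query.ToList(); // enumeration triggers the parallel execution

Console.WriteLine(evaluated); // 5
```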
Preserve Ordering
AsOrdered makes sure that subsequent parallel operations preserve the order of the original enumerable.
var seq = Enumerable.Range(1, 100);
var ordered = Enumerable.Range(1, 100)
.AsParallel()
.AsOrdered()
.Select(x => x);
Console.WriteLine(seq.SequenceEqual(ordered)); // always true
However, preserving ordering has a cost, so you can disable it with ParallelEnumerable.AsUnordered once ordering no longer matters.
var ordered = Enumerable.Range(1, 100)
.AsParallel()
.AsOrdered()
.Select(x => x)
.AsUnordered(); // cancel the ordering preservation
Cancellation & Exception
It's basically a functional version of a parallel loop, so exception handling and cancellation work the same as with the Parallel class.
- use ParallelEnumerable.WithCancellation to specify a cancellation token
- cancellation is special, so catch it as OperationCanceledException
- other exceptions from the iterations are wrapped in an AggregateException
- remember to evaluate the query, otherwise the cancellation or exception would never be triggered
var parallelSeq = ParallelEnumerable.Range(1, 100);
var cts = new CancellationTokenSource(2000);
var query = parallelSeq
.WithCancellation(cts.Token)
.Select(x => {
Thread.Sleep(2000);
cts.Token.ThrowIfCancellationRequested();
if (int.IsOddInteger(x)) throw new Exception("a normal exception was thrown");
return x * x;
});
try {
// you must consume query
_ = query.ToList();
} catch (AggregateException ex) {
ex.Handle(iex => {
switch (iex) {
case Exception:
Console.WriteLine(iex.Message);
return true;
default:
return false;
}
});
} catch (OperationCanceledException) {
Console.WriteLine($"{nameof(OperationCanceledException)} was thrown");
}
Merge Option
Parallel iterations are scheduled in groups, so buffering is enabled by default and the group size depends on the system.
- ParallelMergeOptions.AutoBuffered: buffers a system-chosen number of results before yielding them
- ParallelMergeOptions.Default: alias for AutoBuffered
- ParallelMergeOptions.FullyBuffered: no result is available until all iterations have finished
- ParallelMergeOptions.NotBuffered: yields each item immediately whenever it is available
var query = ParallelEnumerable.Range(1, 100)
.WithMergeOptions(ParallelMergeOptions.FullyBuffered)
.Select(x => {
Thread.Sleep(Random.Shared.Next(100));
Console.WriteLine("produced");
return x;
});
foreach (var _ in query) {
Console.WriteLine("consumed");
}
// consumption only starts after everything was produced when FullyBuffered is specified
// produced
// produced
// produced
// ...
// consumed
// consumed
// consumed
var query = ParallelEnumerable.Range(1, 100)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x => {
Thread.Sleep(Random.Shared.Next(100));
Console.WriteLine("produced");
return x;
});
foreach (var _ in query) {
Console.WriteLine("consumed");
}
// consumption happens as soon as an item is available
// produced
// consumed
// produced
// consumed
// produced
// ...
// consumed
Performance Enhancement
Local Storage
ParallelEnumerable.Aggregate plays exactly the role of local storage; the following example uses one of its most flexible overloads.
var size = Directory.EnumerateFiles(@"c:/Users/User/Projects/nix-config", "*", SearchOption.AllDirectories)
.AsParallel()
.Aggregate(
seed: 0L,
updateAccumulatorFunc: (localSum, curr) => localSum + new FileInfo(curr).Length, // iteration
combineAccumulatorsFunc: (sum, localSum) => sum + localSum, // add up when each group was finished
resultSelector: i => i / 1024D // post action to transform the result
);
Console.WriteLine($"size in kb: {size}");
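The same four-argument overload works with plain numbers too, which makes the roles of the delegates easier to see; the result is deterministic regardless of how the partitions split the range, because addition is associative and commutative (the range and divisor here are arbitrary):

```csharp
using System;
using System.Linq;

var average = ParallelEnumerable.Range(1, 100)
    .Aggregate(
        seed: 0L,                                                   // one local sum per partition
        updateAccumulatorFunc: (localSum, x) => localSum + x,       // runs lock-free within a partition
        combineAccumulatorsFunc: (sum, localSum) => sum + localSum, // merge partition results
        resultSelector: total => total / 100D);                     // final transformation

Console.WriteLine(average);
```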