Thread Coordination Primitives
Barrier
Barrier
allows multiple threads to execute in same phase and not moving to next phase until all participant signaled.
Barrier
must have a specified count of participant threadsbarrier.SignalAndWait()
increases the inner counter inBarrier
- phase is completed when the number of threads signaled reaches the count of barrier
- new phase is started when any thread signaled made the counter increased from 0 to 1
- each thread would not continue until sufficient count of threads have signaled(participated)
// you may register a event on phase finished
Barrier barrier = new(3, b => {
Console.WriteLine($"phase {b.CurrentPhaseNumber} has finished");
});
var tasks = Enumerable.Range(1, 3).Select(_ => {
return Task.Run(() => {
// notifying barrier that this thread has entered
// the counter inside increments
// and all participants would continue when the count hits limit
// and the count would reset to the sepcified
barrier.SignalAndWait(); // signal for the first time to enter phase 0
Console.WriteLine($"Task {Task.CurrentId} is handling phase {barrier.CurrentPhaseNumber}");
barrier.SignalAndWait(); // signal again indicating to enter phase 1
});
});
Task.WaitAll(tasks);
// phase 0 has finished
// Task 1 is handling phase 1
// Task 3 is handling phase 1
// Task 2 is handling phase 1
// phase 1 has finished
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CountDownEvent
CountDownEvent
is a primitive to block certain threads until a count down has reached
CountdownEvent @event = new(3);
var waiting = Task.Run(() => {
Console.WriteLine("This task is waiting for count down completion");
@event.Wait(); // blocking until count down finished
Console.WriteLine("Task finished after count down");
});
var countdown = Enumerable.Range(1, 3).Select(_ => {
return Task.Run(() => {
Thread.Sleep(1000);
Console.WriteLine("count down");
@event.Signal();
});
});
Task.WaitAll([waiting, .. countdown]);
// This task is waiting for count down finished
// count down
// count down
// count down
// Task finished after count down
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
NOTE
You may increase a count dynamically using AddCount
or TryAddCount
ManualResetEventSlim
ManualResetEventSlim
behaves somewhat like a one-time counter that counts up to 1, it can be used to implement a continuation.
// optionally set initialState impling whther the event is already signaled
ManualResetEventSlim @event = new(initialState: false);
var waiting = Task.Run(() => {
Console.WriteLine("waiting for one time signal");
@event.Wait();
Console.WriteLine("post action triggered");
@event.Wait();
if (@event.Wait(1000)) { }
Console.WriteLine("still reachable since the event has to be reset manually to regain blocking");
// reset the count to regain blocking
@event.Reset();
@event.Wait(); // infinite blocking since now no signal would be sent again
});
var count = Task.Run(() => {
Console.WriteLine("perform something and then signal");
@event.Set();
});
waiting.Wait();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
NOTE
Use AutoResetEvent
if auto reset on count is required which automatically reset state when wait succeeded
AutoResetEvent @event = new(initialState: false);
var waiting = Task.Run(() => {
Console.WriteLine("waiting for one time signal");
_ = @event.WaitOne();
Console.WriteLine("post action triggered");
_ = @event.WaitOne(); // would hang here forever since state was reset automatically
Console.WriteLine("not reachable!!");
});
var countdown = Task.Run(() => {
Console.WriteLine("perform something and then signal");
_ = @event.Set();
});
waiting.Wait();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SemaphoreSlim
SemaphoreSlim
allows limited count of multiple threads to execute concurrently and release the hold dynamically.
semaphore.Wait(...)
: would block current thread when count is 0, so you should release to increase the count somewhere to make sure it continuessemaphore.Release(int?)
: to release specific count so other threads could continue- returns a previous count of semaphore
maxCount
on constructor came into play since release is a manual control.- when
maxCount
was reached,SemaphoreFullException
would be thrown.
So the count is essentially a limit of how many threads could continue for now, if the count reaches 0, all other threads not entered semaphore have to wait. Remember to release to get the count back so other thread could enter semaphore later.
using System.Diagnostics;
using System.Runtime.ConstrainedExecution;
// three in a row, can dynamically increase the count to 10 at most
SemaphoreSlim semaphore = new(initialCount: 3, maxCount: 10);
int shared = 0;
var tasks = Enumerable.Range(1, 100).Select(n => {
return Task.Run(() => {
semaphore.Wait(); // would block when count is 0
Thread.Sleep(1000);
Console.WriteLine(n); // order is not guaranteed by semaphore
Interlocked.Add(ref shared, 1); // multiple thread would still come in so protection is neeeded
_ = semaphore.Release();
});
});
Task.WaitAll(tasks);
Debug.Assert(shared is 100);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24