Understanding .NET Framework Task Parallel Library Cancellations

Microsoft .NET Parallel
Computing
Task Parallel
Library (TPL)
includes mechanisms for composing and scheduling work on
a ThreadPool. In an ideal software world composing and scheduling would be all
that is needed. Unfortunately, a developer must deal with exceptions and may
want to interrupt work for any number of reasons.

Luckily, TPL includes Cancellations and AggregateExceptions
for interrupting work and dealing with collections of exceptions often
generated when multiple Tasks are interrupted. There are patterns to
implementing Cancellations and this article demonstrates some of the more
common patterns.

Overview

Running asynchronous operations requires some unique data
structures. Code is often packaged in one part of a subroutine, but executed
elsewhere in the .NET
infrastructure. Separate data structures must handle the: packaging,
execution, and mediation between the packaging and execution.

For example: aborting some scheduled work is not as simple
as executing a break. Concurrent work can create concurrent exceptions.
Something is needed to collect exceptions and funnel them back to the
appropriate “catch” statement. Aborting executing or scheduled work is the
realm of the Cancellation.

Below is an example that demonstrates a common way to
implement a Cancellation.

var cts = new CancellationTokenSource();
var token = cts.Token;

var task = Task.Factory.StartNew<string>(() =>
	{
		while (true)
		{
			Thread.Sleep(50);
			Console.WriteLine("Loop tick");
			token.ThrowIfCancellationRequested();
		}

		return "Did you get here?";
	}
,token);

try
{
	task.Wait(500);
	cts.Cancel();
	task.Wait();
}
catch (AggregateException aexp)
{
	Console.WriteLine("Aggregate Exception was thrown");
}

Console.WriteLine("IsCancelled == " + task.IsCanceled.ToString() + " Status == " +
task.Status.ToString());
Console.WriteLine("Press any key to quit...");
Console.ReadKey();

The code works like this:

  • A CancellationSource class creates a new CancellationToken.
  • TaskFactory creates and starts a new Task. The Task runs a while
    loop that simulates work, putting the Thread to sleep every 50 milliseconds. The
    Task loops and checks the status of the CancellationToken.
  • The Task throws an OperationCancelledException when the
    CancellationToken status changes to cancelled. The OperationCancelledException
    is translated to a TaskCancelException by TPL.
  • Task.Wait allows a few iterations of the Task class loop to
    execute.
  • The CancellationTokenSource.Cancel method sets the
    CancellationToken status.
  • The TPL packages the TaskCancelException into an
    AggregateException and funnels the exception back to the “try” statement around
    the Task.Wait code.

The sample makes extensive use of Llamda
expressions
. In particular, as you’ll see in the other samples in this
article, the code creates a Closure. The Llamda is a function defined within the Main method. The compiler ensures
that the variables in the Llamda are in-scope even if, for example, the Main
function may have exited.

CancellationToken is the focal point for Cancellations.

CancellationTokens

Methods in the CancellationToken class appear below.

public struct CancellationToken
{
public CancellationToken(bool canceled);
	public static bool operator ==(CancellationToken left, CancellationToken right);
	public bool CanBeCanceled { get; }

	public bool IsCancellationRequested { get; }
	public static CancellationToken None { get; }
	public WaitHandle WaitHandle { get; }
	public bool Equals(CancellationToken other);

	public override bool Equals(object other);
	public CancellationTokenRegistration Register(Action callback);

	public CancellationTokenRegistration Register(Action<object> callback, object state);
	public CancellationTokenRegistration Register(Action callback, bool useSynchronizationContext);
	public CancellationTokenRegistration Register(Action<object> callback, object state, bool useSynchronizationContext);
	public void ThrowIfCancellationRequested();
}

 

The sample used the ThrowIfCancelRequested method, but could
have checked the IsCancellationRequested and manually thrown an
OperationCancelledException. Developers who have been doing .NET development
for some time find code above a bit odd.

The first oddity is that StartNew accepts a
CancellationToken parameter, yet the Task must check for a status change. Why
not just delete the Task from memory? Consider this; a running Task may have
allocated resources and aborting a Task my not properly release those
resources. Passing the CancellationToken allows TPL to check the status of the
Token before scheduling and executing the Task.

It may also seem odd that exceptions are used to abort a
running process, but, as the documentation in the resources point out;
exceptions should be generated when code fails to complete and a cancelled Task
is incomplete.

Dealing with Exceptions

The sample below extends the original example and
demonstrates one way to handle the TPL generated exceptions.

var cts = new CancellationTokenSource();
var token  = cts.Token;

var exHandler = new Func<System.Exception,bool>  ( e =>
{
	Console.WriteLine(
    e.Message
    );
  return true;
}
);

var task = Task.Factory.StartNew<string>(() =>
{
	while (true)
  {
		Thread.Sleep(50);
  	Console.WriteLine("Loop tick");
		if (token.IsCancellationRequested) { throw new OperationCanceledException("Hey!! I was cancelled..."); }
  }

  return "Did you get here?";
}
, token);
try
{
	task.Wait(500);
  cts.Cancel();
  task.Wait();
} catch (AggregateException aexp)
{
  	aexp.Flatten().Handle(exHandler);
		Console.WriteLine("Aggregate     Exception was thrown..");
}

Console.WriteLine("IsCancelled == " + task.IsCanceled.ToString() + " Status == " + task.Status.ToString()); 
Console.WriteLine("Press any key to quit..."); 
Console.ReadKey();

Exceptions often contain collections of InnerExceptions.
AggregateException extends the Exception class adding more internal Exception
iteration and processing capabilities to the Exception class.

The example invokes the Flatten and then the Handle method. Flatten recursively consolidates the InnerExceptions of all of the contained
AggregateExeceptions, returning the union of all the InnerException classes. Handle
executes an Action on each Exception contained in the AggregateException.

Thus far the article demonstrated CancellationTokens used in
conjunction with the Task class. CancellationToken usage extends beyond the
Task class.

CancellationTokens Elsewhere

The code below demonstrates how a developer can use the
CancellationToken Register as a Callback mechanism.

var cts = new CancellationTokenSource();
var token = cts.Token;
var keepRunning = true;
 

var task = Task.Factory.StartNew<string>(() =>
{
	while (keepRunning)
	{
		Thread.Sleep(50);
		Console.WriteLine("Loop tick");
	}

 return "Did you get here?";
}
, token);

token.Register(new Action(() =>
{
	keepRunning = false;
}
)
);

token.Register(new Action(() =>
{
	Console.WriteLine("Looks like it's called twice..");
}
)
);

try
{
	task.Wait(500);
	cts.Cancel();
	task.Wait();
}

catch (AggregateException aexp)
{
	Console.WriteLine(aexp.Message);
}

Console.WriteLine("IsCancelled == " + task.IsCanceled.ToString() + " Status == " + task.Status.ToString());
Console.WriteLine("Press any key to quit...");
Console.ReadKey();

The Register method is called twice. When the
CancellationToken is cancelled the Registered code is invoked; once to
terminate the while loop and another time to print some text. Also, because
the Task exited normally, there was no Exception to handle.

CancellationTokens parameters also appear in other TPL data
structures. The example below demonstrates how the BlockingCollection Take
method can break in response to a Cancellation.

var cts = new CancellationTokenSource();
var token = cts.Token;
var blockingCol = new BlockingCollection<string>();

var task = Task.Factory.StartNew<string>(() =>
{
	var curVal = 0;

	while (curVal < 10)
{
		Thread.Sleep(50);
		Console.WriteLine("Loop tick");
		++curVal;
		blockingCol.Add(curVal.ToString());
}

	cts.Cancel();

	return "Did you get here?";
}
);

try
{
	while (true)
	{
		Console.WriteLine(blockingCol.Take(token));
	}
}
catch (OperationCanceledException ex)
{
	Console.WriteLine(ex.Message);
}

Console.WriteLine("Press any key to quit...");
Console.ReadKey();


Like CancellationToken behavior elsewhere, an Exception is
generated. A complete introduction to all the BlockingCollection features is
beyond the scope of this article. However, Introducing
the .NET Framework 4.0 Task Parallel Library BlockingCollection
contains
helpful material.

Conclusion

.NET Framework Task Parallel Library (TPL) Cancellations are
used throughout the TPL. Much of the TPL Cancellation functionality is handled
by the CancellationToken and CancellationTokenSource. Patterns to implementing
Cancellations require a developer to rethink traditional Exception handling.

Resources

Llamda
Expressions

Aggregate
Exception Class

Using
Cancellation Support in .NET Framework 4.0

More by Author

Must Read