Microsoft .NET Reactive Extensions and .NET Framework Task Parallel Library

In making parallel computing more accessible, Microsoft has also made asynchronous execution more accessible. Nowhere is this more apparent than in technologies built from the ground up on these new asynchronous capabilities. Take, for example, Reactive Extensions (Rx), a set of libraries built on LINQ and the Task Parallel Library (TPL).

Rx's LINQ compositional model has garnered a lot of attention in the Rx documentation. This should be no surprise; much of what a developer creates in Rx is LINQ expressions. As stated earlier, the Task Parallel Library (TPL) is also an important component in the Rx architecture. In fact, TPL underlies some of Rx's asynchronous behavior. When dealing with issues like Thread affinity, understanding how Rx leverages TPL is essential. Using an Rx sample application, this article will demonstrate how TPL fits within Rx.

Rx is relatively new, so here is a quick introduction.

Rx Introduction

As stated earlier, Rx is a set of extensions to the .NET Framework for asynchronously consuming observable collections. Rx is built on .NET Framework components like LINQ and the Task Parallel Library.

Understanding Rx begins with understanding the IObservable, IObserver, and IEnumerable interfaces. IEnumerable collections are consumed in a "pull-based" fashion, for example with a foreach loop. IObservable collections are consumed in a "push-based", or rather eventing, fashion. The IObservable and IObserver interfaces appear below.

    public interface IObservable<out T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }

    public interface IObserver<in T>
    {
        void OnCompleted();
        void OnError(Exception error);
        void OnNext(T value);
    }

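Eventing usually follows a subscribe, receive events, and unsubscribe sort of pattern. The two interfaces encapsulate this pattern: Subscribe registers an observer and returns an IDisposable, the source then calls the observer's OnNext, OnError, and OnCompleted methods, and disposing the returned IDisposable unsubscribes. Wrapping something like, for example, text output operations would allow a developer to consume text output in a LINQ query.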
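The pull versus push distinction can be sketched with nothing but the base class library interfaces. The following is a minimal illustration, not Rx itself: RangeObservable, CollectingObserver, and PullVersusPush are hypothetical names invented for this sketch, and the observable pushes its values synchronously inside Subscribe for simplicity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration type; not part of Rx.
public sealed class RangeObservable : IObservable<int>
{
    private readonly int _count;

    public RangeObservable(int count) { _count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Push every value to the observer, then signal completion.
        for (int i = 0; i < _count; i++) { observer.OnNext(i); }
        observer.OnCompleted();
        return new NoopDisposable(); // nothing is left to unsubscribe from
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records everything it is sent.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<int> Values = new List<int>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(int value) { Values.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}

public static class PullVersusPush
{
    // Pull: the consumer drives the iteration with foreach.
    public static List<int> Pull(IEnumerable<int> source)
    {
        var result = new List<int>();
        foreach (int value in source) { result.Add(value); }
        return result;
    }

    // Push: the source drives the observer by calling OnNext.
    public static List<int> Push(IObservable<int> source)
    {
        var observer = new CollectingObserver();
        using (source.Subscribe(observer)) { }
        return observer.Values;
    }
}
```

Calling PullVersusPush.Pull(new[] { 0, 1, 2 }) and PullVersusPush.Push(new RangeObservable(3)) yields the same three values; the difference is only in which side controls the flow.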

As I stated earlier, TPL underlies some of Rx's asynchronous behavior. Before showing where TPL fits into Rx, it's important to know what TPL is.

Task Parallel Library Overview

I think of TPL as a new set of classes that follow an alternative model to traditional .NET Threads and Threading data structures. The core part of TPL is the Task class. Tasks are a sort of wrapper for work to be done. Tasks are assigned a workload, which is usually a delegate or lambda expression. Tasks are a higher level of abstraction than a Thread. A Thread executes the underlying work, but a Task allows a developer to structure and compose the execution of the underlying work, by invoking a Task and even linking a Task's completion to other Tasks.
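As a rough sketch of linking one Task's completion to another, the snippet below starts a Task with a lambda workload and chains a second Task onto it with ContinueWith. TaskComposition and SumThenFormat are hypothetical names chosen for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskComposition
{
    // Starts a task that sums 1..n, then links a second task to its completion.
    public static Task<string> SumThenFormat(int n)
    {
        Task<int> sum = Task.Factory.StartNew(() =>
        {
            int total = 0;
            for (int i = 1; i <= n; i++) { total += i; }
            return total;
        });

        // The continuation runs only after the first task has finished.
        return sum.ContinueWith(t => "Sum = " + t.Result);
    }
}
```

For example, TaskComposition.SumThenFormat(10).Result evaluates to "Sum = 55". Neither lambda names a Thread; the Tasks only describe the work and its ordering.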

Task execution is actually handled by a second component called a TaskScheduler. A TaskScheduler manages the collection of Threads for a Task class workload. The .NET Framework includes a default TaskScheduler, but a developer may want to create a custom TaskScheduler to handle custom workloads.
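To make the idea of a custom TaskScheduler concrete, here is one possible sketch that runs every queued Task on a single dedicated Thread. SingleThreadTaskScheduler and SingleThreadDemo are hypothetical names for this illustration; a production scheduler would need more care around shutdown and error handling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example scheduler: runs every queued task on one dedicated thread.
public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();
    private readonly Thread _worker;

    public SingleThreadTaskScheduler()
    {
        _worker = new Thread(() =>
        {
            // Blocks until a task is queued; the loop ends after CompleteAdding.
            foreach (Task task in _queue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
        });
        _worker.IsBackground = true;
        _worker.Start();
    }

    public int WorkerThreadId { get { return _worker.ManagedThreadId; } }

    public override int MaximumConcurrencyLevel { get { return 1; } }

    protected override void QueueTask(Task task) { _queue.Add(task); }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Inline only when already on the worker thread, so tasks never leave it.
        return Thread.CurrentThread == _worker && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks() { return _queue.ToArray(); }

    public void Dispose() { _queue.CompleteAdding(); }
}

public static class SingleThreadDemo
{
    // Returns true when the task body ran on the scheduler's own thread.
    public static bool RanOnWorker()
    {
        using (var scheduler = new SingleThreadTaskScheduler())
        {
            Task<int> task = Task.Factory.StartNew(
                () => Thread.CurrentThread.ManagedThreadId,
                CancellationToken.None,
                TaskCreationOptions.None,
                scheduler);
            return task.Result == scheduler.WorkerThreadId;
        }
    }
}
```

The Task itself is created the usual way; only the TaskScheduler argument passed to StartNew changes where the work executes.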

For the most part, the TPL is hidden within Rx. In fact, guidelines mentioned in the resources below lead a developer to believe that concurrency features of TPL may not always be necessary. However, there are areas where TPL surfaces. As I mentioned earlier, I'm going to demonstrate where, using some sample code.

Rx Sample

The sample application appears below.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Concurrency;
using System.Threading;
using System.Threading.Tasks;
using System.Disposables;

namespace Test.Rx.TaskScheduler
{
    class Program
    {
        static void Main(string[] args)
        {
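            // Build an observable that pushes an increasing counter to its observer.
            IObservable<int> ob =
            Observable.CreateWithDisposable<int>(o =>
            {
                // Disposing this object cancels its CancellationToken.
                var cancel = new CancellationDisposable();

                // Run the loop on a new thread so Main stays free to read input.
                Scheduler.NewThread.Schedule(() =>
                {
                    int i = 0;
                    while (true)
                    {
                        Thread.Sleep(200);
                        if (!cancel.Token.IsCancellationRequested) { o.OnNext(i++); }
                        else
                        {
                            Console.WriteLine("Cancel event signaled");
                            o.OnCompleted();
                            break;
                        }
                    }
                }
                );

                // Handed back to Rx; disposing the subscription disposes this object.
                return cancel;
            }
            );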

            IDisposable subscription = ob.Subscribe(i => Console.WriteLine(i));
            Console.WriteLine("Press any key to cancel. . . ");
            Console.ReadKey();
            subscription.Dispose();
            Console.WriteLine("Press any key to quit. . . ");
            Console.ReadKey();
        }
    }
}

I'll cover the Scheduler class later in the article. For now, understand that Scheduler is the point where a developer can interact with the TPL.

CancellationDisposable is a wrapper for the TPL CancellationToken. The Token property allows access to the CancellationToken. As you can see in the code above, the lambda pauses before checking the cancellation token. A complete discussion of CancellationTokens is beyond the scope of this article.

The sample disposes the subscription when the user presses any key. Disposing the subscription disposes the CancellationDisposable, which cancels its token. Once the token is cancelled, IsCancellationRequested becomes true and the lambda invokes OnCompleted. OnCompleted signals the observer that there are no more events to observe.
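The same polling pattern can be sketched with the TPL types alone, without Rx. In the snippet below, CancellationSketch, CountUntilCancelled, and Run are hypothetical names for this illustration; the loop mirrors the sample's "pause, then check the token" structure using a CancellationTokenSource directly instead of a CancellationDisposable.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Counts until the token is cancelled and returns how far it got.
    public static int CountUntilCancelled(CancellationToken token)
    {
        int i = 0;
        while (!token.IsCancellationRequested)
        {
            i++;
            Thread.Sleep(10); // pause between checks, as the sample's loop does
        }
        return i;
    }

    // Starts the loop on a Task, waits briefly, then requests cancellation.
    public static int Run()
    {
        using (var source = new CancellationTokenSource())
        {
            CancellationToken token = source.Token;
            Task<int> worker = Task.Factory.StartNew(() => CountUntilCancelled(token));
            Thread.Sleep(100);
            source.Cancel(); // IsCancellationRequested becomes true
            return worker.Result;
        }
    }
}
```

Cancellation here is cooperative: nothing stops the loop from outside, so the worker only ends because it checks the token on each pass.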

The Rx Scheduler class is the most interesting part of the sample and, as mentioned before, the point where a developer can adjust how TPL works with Rx.

Rx Scheduler

The Scheduler class appears below.

    // Summary:
    //     Provides a set of static methods for creating Schedulers.
    public static class Scheduler
    {
        // Summary:
        //     Gets the scheduler that schedules work as soon as possible on the current
        //     thread.
        public static CurrentThreadScheduler CurrentThread { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work on the current Dispatcher.
        public static DispatcherScheduler Dispatcher { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work immediately on the current thread.
        public static ImmediateScheduler Immediate { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work on a new thread.
        public static NewThreadScheduler NewThread { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work on the default Task Factory.
        public static TaskPoolScheduler TaskPool { get; }
        //
        // Summary:
        //     Gets the scheduler that schedules work on the ThreadPool.
        public static ThreadPoolScheduler ThreadPool { get; }

        // Summary:
        //     Schedules action to be executed recursively.
        public static IDisposable Schedule(this IScheduler scheduler, Action<Action> action);
        //
        // Summary:
        //     Schedules action to be executed recursively at each dueTime.
        public static IDisposable Schedule(this IScheduler scheduler, Action<Action<DateTimeOffset>> action, DateTimeOffset dueTime);
        //
        // Summary:
        //     Schedules action to be executed recursively after each dueTime.
        public static IDisposable Schedule(this IScheduler scheduler, Action<Action<TimeSpan>> action, TimeSpan dueTime);
        //
        // Summary:
        //     Schedules action to be executed at dueTime.
        public static IDisposable Schedule(this IScheduler scheduler, Action action, DateTimeOffset dueTime);
    }

Each property in the Scheduler class is a portal to changing where Rx invokes the lambda expression. Deciding which Scheduler class property to use depends on the application.

The lambda in the sample is long running, so the sample uses the Scheduler that, according to the documentation, creates a new Thread. Had the Immediate or CurrentThread property been used, the application Thread would never have been available to display output or collect input.
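The difference between running work on the calling Thread and running it on a new Thread can be approximated with plain Threads. This is only an analogy for the Scheduler choices, not how the Rx schedulers are implemented; WhereWorkRuns and its methods are hypothetical names for this sketch.

```csharp
using System;
using System.Threading;

public static class WhereWorkRuns
{
    // Runs the action on the calling thread; the caller is blocked until it returns.
    public static int RunInline(Action work)
    {
        work();
        return Thread.CurrentThread.ManagedThreadId;
    }

    // Runs the action on a freshly created thread and reports that thread's id.
    public static int RunOnNewThread(Action work)
    {
        int workerId = 0;
        var thread = new Thread(() =>
        {
            workerId = Thread.CurrentThread.ManagedThreadId;
            work();
        });
        thread.Start();
        thread.Join(); // joined here only so the id can be returned
        return workerId;
    }
}
```

RunInline reports the caller's own thread id, which is why an endless loop run that way would never hand control back to Main. RunOnNewThread reports a different id; without the Join, the caller would be free to continue while the work runs.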

Dispatcher can be used with Windows Presentation Foundation (WPF). WPF controls can only be adjusted from the Thread on which they were created. At first this may appear to be a limitation, but consider what would happen if two separate Threads attempted to adjust a control at the same time.

Conclusion

Rx is a set of libraries built on LINQ and the Task Parallel Library (TPL). The Rx Scheduler class allows a developer to plug into the underlying TPL. Scheduler includes properties for working with Windows Presentation Foundation (WPF) and even a custom TPL TaskScheduler.

Sources

Reactive Framework Rx Wiki

"Understanding Tasks in .NET Framework 4.0 Task Parallel Library"



About the Author

Jeffrey Juday

Jeff is a software developer specializing in enterprise application integration solutions utilizing BizTalk, SharePoint, WCF, WF, and SQL Server. Jeff has been developing software with Microsoft tools for more than 15 years in a variety of industries including: military, manufacturing, financial services, management consulting, and computer security. Jeff is a Microsoft BizTalk MVP. Jeff spends his spare time with his wife Sherrill and daughter Alexandra.
