Building an Azure Queuing Message Pipeline

All Azure Queuing mechanisms support a byte array payload.  Serialization and deserialization, as demonstrated in this article (http://www.codeguru.com/columns/experts/.net-azure-queuing-and-serialization.htm), convert between that payload and a concrete class.  However, as the number of distinct message payloads grows, a developer eventually needs to build layers of indirection into the serialization and deserialization process.  Often the approach requires a series of stages applied to the message.

A Pipeline is a common pattern for processing something in stages.  Developers following a Pipeline pattern join the output of one class or method to the input of the next.  To facilitate a Pipeline implementation, a message should be wrapped in layers, essentially a message within a message.  Each stage in the Pipeline then operates on a different layer.  Envelope patterns are a common way to create message layers: outer, more general layers wrap inner, more specific layers.  What follows is an approach to implementing a message Pipeline using a message Envelope pattern.
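
To make the stage-joining idea concrete, here is a minimal sketch that is not part of the original sample code.  The PipelineSketch class, its stage methods, and the "outer:" and "inner:" prefixes are all hypothetical names chosen for illustration; the only point is that each stage consumes the output of the one before it and removes one layer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PipelineSketch
{
    // Joins the stages so that the output of each stage feeds the next one.
    public static Func<string, string> Compose(IEnumerable<Func<string, string>> stages)
    {
        return input => stages.Aggregate(input, (current, stage) => stage(current));
    }

    // Hypothetical outer stage: removes the general "outer:" layer if present.
    public static string UnwrapOuter(string message)
    {
        return StripPrefix(message, "outer:");
    }

    // Hypothetical inner stage: removes the more specific "inner:" layer.
    public static string UnwrapInner(string message)
    {
        return StripPrefix(message, "inner:");
    }

    private static string StripPrefix(string message, string prefix)
    {
        return message.StartsWith(prefix, StringComparison.Ordinal)
            ? message.Substring(prefix.Length)
            : message;
    }
}
```

Composing UnwrapOuter followed by UnwrapInner and running the result on "outer:inner:hello" peels both layers and leaves "hello".  Reversing the stage order would leave the message untouched by the inner stage, which is why the outer, more general layer has to be handled first.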

The Envelope

The Envelope pattern is exactly what a developer would assume it is: a wrapper for varied contents.  The sample Envelope design follows.

using System;
using System.Reflection;
using System.Runtime.Serialization;

[DataContract(Namespace = "http://schemas.datacontract.org/2004/07/Message.Services", Name = "MessageEnvelope")]
public sealed class MessageEnvelope
{
    #region Properties
    [DataMember]
    public DateTime CreateDateTime { get; set; }
    [DataMember]
    public string InternalVersion { get; set; }
    [DataMember]
    public string MessageType { get; set; }
    [DataMember]
    public string Payload { get; set; }
 
    #endregion
 
    #region Initialization and Construction
 
    private void InitProperties()
    {
        this.Payload = null;
        this.CreateDateTime = DateTime.Now;
        var assm = Assembly.GetExecutingAssembly().GetName();
        this.InternalVersion = assm.FullName;
 
        this.MessageType = "";
    }
 
    public MessageEnvelope(string payload)
    {
        this.InitProperties();
        this.Payload = payload;
    }
 
    #endregion
}
 
 

There are two parts to the Envelope: a MessageType identifying the payload, and a Payload property containing the message payload.  The MessageType is a string, though it could easily have been an integer, Enum, or URI value.  A string can be converted into any of the other datatypes, so it's the most flexible choice.  The Payload is a string because the message contents will be serialized to XML.  A complete review of the Serialization involved in the process is beyond the scope of this article; however, you'll find a good review in the resources at the end of the article.  Serialization and Deserialization are captured in the following code.

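using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.Text;
using System.Xml;

public static class PayloadTransformServices
{
    // Deserializes XML into T.  internalTypes[0] is treated as the root type and
    // the whole list is handed to the serializer as known types.
    public static T DeSerializeToClass<T>(string xmlData, List<Type> internalTypes)
    {
        var serializer = new DataContractSerializer(internalTypes[0], internalTypes);
        using (var reader = new XmlTextReader(new StringReader(xmlData)))
        {
            return (T)serializer.ReadObject(reader);
        }
    }

    // Serializes obj to XML.  The root type is the list entry matching obj's
    // runtime type, falling back to the first entry when none matches.
    public static StringBuilder SerializeToStringBuilder(object obj, List<Type> internalTypes)
    {
        var sb = new StringBuilder();
        var mainTypeIndex = 0;

        for (int n = 0; n < internalTypes.Count; ++n)
        {
            if (obj.GetType() == internalTypes[n])
            {
                mainTypeIndex = n;
            }
        }

        var serializer = new DataContractSerializer(internalTypes[mainTypeIndex], internalTypes);
        // Disposing the writer flushes any pending output into sb before it is returned.
        using (var xmlWrite = new XmlTextWriter(new StringWriter(sb)))
        {
            serializer.WriteObject(xmlWrite, obj);
        }
        return sb;
    }
}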
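
As a small illustration of why a list of types is passed to the serializer, the following sketch round-trips a contract whose member is declared as object.  OrderContract, AddressContract, and KnownTypesSketch are hypothetical names invented for this example and are not part of the sample code; the sketch assumes the standard DataContractSerializer known-types behavior.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.Text;
using System.Xml;

[DataContract]
public class AddressContract
{
    [DataMember]
    public string City { get; set; }
}

[DataContract]
public class OrderContract
{
    [DataMember]
    public int Id { get; set; }

    // Declared as object, so the serializer cannot infer the concrete type.
    [DataMember]
    public object Detail { get; set; }
}

public static class KnownTypesSketch
{
    public static string ToXml(OrderContract order, IEnumerable<Type> knownTypes)
    {
        var serializer = new DataContractSerializer(typeof(OrderContract), knownTypes);
        var sb = new StringBuilder();
        using (var writer = XmlWriter.Create(sb))
        {
            serializer.WriteObject(writer, order);
        }
        return sb.ToString();
    }

    public static OrderContract FromXml(string xml, IEnumerable<Type> knownTypes)
    {
        var serializer = new DataContractSerializer(typeof(OrderContract), knownTypes);
        using (var reader = XmlReader.Create(new StringReader(xml)))
        {
            return (OrderContract)serializer.ReadObject(reader);
        }
    }
}
```

When Detail holds an AddressContract and typeof(AddressContract) is in the known-types list, the order should survive the round trip.  With an empty list, the serializer is expected to reject the unexpected type with a SerializationException.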
 

The internalTypes parameter is required because a class may be composed of other classes.  Beyond primitives and the root type, the serializer must be told which additional classes to expect.  Developers working with WCF and Serialization should also know that, for security reasons, WCF Messages have size limits.  There are default settings on the message and on the message contents.  For example, the Payload property could easily exceed the default limit for string content.  Developers may need to adjust settings, as in the following sample, if message sizes exceed these defaults.

// Requires: using System.ServiceModel;
const int KB = 1024;
int maxMessageSize = 256 * KB;
var testBinding = new BasicHttpBinding();

// Raise the buffer, total message, and string content limits together.
testBinding.MaxBufferSize = maxMessageSize;
testBinding.MaxReceivedMessageSize = testBinding.MaxBufferSize;
testBinding.ReaderQuotas.MaxStringContentLength = testBinding.MaxBufferSize;
 

Contents like MessageType could have been stored in the underlying message header and, indeed, a developer could lift this information out of the Envelope into the system message header.  However, it often helps with testing and reusability to make the message independent of the underlying messaging system.  For example, if a developer wanted to reuse code across Azure Queues and Azure Service Bus, implementing the Envelope in a separate class would allow code sharing across the messaging systems.  The reusable code would work on the message after unpacking it to the more generic Envelope class.
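
One way to picture that independence is a thin transport abstraction that only moves byte arrays, with the Envelope XML encoded on the way in and decoded on the way out.  The sketch below is an assumption for illustration only: IEnvelopeTransport, InMemoryTransport, and EnvelopeTransportSketch are hypothetical names, and a real adapter would wrap an Azure Queue or Service Bus client rather than an in-memory queue.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical abstraction over a messaging system that only moves byte arrays.
public interface IEnvelopeTransport
{
    void Send(byte[] body);
    byte[] Receive();
}

// In-memory stand-in used here so the sketch runs without any Azure dependency.
public sealed class InMemoryTransport : IEnvelopeTransport
{
    private readonly Queue<byte[]> _messages = new Queue<byte[]>();

    public void Send(byte[] body)
    {
        _messages.Enqueue(body);
    }

    public byte[] Receive()
    {
        return _messages.Dequeue();
    }
}

public static class EnvelopeTransportSketch
{
    // Shared code: the serialized Envelope XML becomes the byte array payload.
    public static void SendEnvelope(IEnvelopeTransport transport, string envelopeXml)
    {
        transport.Send(Encoding.UTF8.GetBytes(envelopeXml));
    }

    // Shared code: the byte array payload becomes Envelope XML again.
    public static string ReceiveEnvelope(IEnvelopeTransport transport)
    {
        return Encoding.UTF8.GetString(transport.Receive());
    }
}
```

Because SendEnvelope and ReceiveEnvelope depend only on the interface, the same Envelope handling code could sit in front of either messaging system, and tests can run against the in-memory version.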

When an application shares messages it’s often helpful to separate out the shared format from the message manipulation and transformation operations.  One approach to separation is to move message operations into extension methods.

Envelope Extension Methods

Extension Methods allow a developer to add methods to a class without implementing the methods in the class itself.  For a good demonstration of Extension Methods, a developer should examine the LINQ components in .NET.  Following are Extension Methods that operate on the Envelope.

// Extension methods must be declared in a static class.
// Requires: using System.Collections.Generic;
public static class MessageEnvelopeExtensions
{
    // Runs the raw payload through each transformation in order and stores
    // the final result in the Envelope as a string.
    public static void EncodePayload(this MessageEnvelope envelope, object rawPayload, List<IPayloadTransformation> encodeTrans)
    {
        object obj = rawPayload;

        foreach (var trans in encodeTrans)
        {
            obj = trans.RunTransformation(obj);
        }

        envelope.Payload = obj.ToString();
    }

    // Runs the Envelope payload through each transformation in order and
    // casts the final result to T.
    public static T DecodePayload<T>(this MessageEnvelope envelope, List<IPayloadTransformation> decodeTrans)
    {
        object obj = envelope.Payload;

        foreach (var trans in decodeTrans)
        {
            obj = trans.RunTransformation(obj);
        }

        return (T)obj;
    }
}
 

Both methods rely on an IPayloadTransformation interface, shown in the following code.

public interface IPayloadTransformation
{
    object RunTransformation(object input);
}
 

An IPayloadTransformation interface implementation follows.  The implementation leverages the Serialization code demonstrated earlier in the article.

 
// Inferred from its usage below; the original listing does not show this enum.
internal enum TransformDirection
{
    None,
    RawToResult,
    ResultToRaw
}

internal sealed class SerializedTransform : IPayloadTransformation
{
    private readonly List<Type> _sendTypes = new List<Type>();
    private readonly TransformDirection _direction = TransformDirection.None;

    public SerializedTransform(TransformDirection direction, Type resultType = null)
    {
        _direction = direction;

        if (_direction == TransformDirection.RawToResult)
        {
            // AnotherContract and Another2Contract are the sample's contract types.
            _sendTypes.Add(typeof(AnotherContract));
            _sendTypes.Add(typeof(Another2Contract));
        }
        else
        {
            // The caller supplies the type to deserialize into.
            _sendTypes.Add(resultType);
        }
    }

    #region IPayloadTransformation Members

    public object RunTransformation(object input)
    {
        object obj = null;

        switch (_direction)
        {
            case TransformDirection.RawToResult:
                // Object in, serialized XML (StringBuilder) out.
                obj = PayloadTransformServices.SerializeToStringBuilder(input, _sendTypes);
                break;
            case TransformDirection.ResultToRaw:
                // XML string in, deserialized object out.
                obj = PayloadTransformServices.DeSerializeToClass<object>(input as string, _sendTypes);
                break;
        }

        return obj;
    }

    #endregion
}
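
Because each transformation feeds the next, the decode list generally has to undo the encode list in reverse order.  The following sketch illustrates that ordering with two toy stages.  Everything in it is hypothetical: ISketchTransformation stands in for IPayloadTransformation so the block compiles on its own, and the version prefix and Base64 stages are illustrative choices, not part of the sample code.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Stand-in for the article's IPayloadTransformation so the sketch compiles alone.
public interface ISketchTransformation
{
    object RunTransformation(object input);
}

// Hypothetical stage: tags the payload with a version prefix.
public sealed class AddVersionPrefix : ISketchTransformation
{
    public object RunTransformation(object input)
    {
        return "v1:" + input;
    }
}

// Hypothetical stage: removes the version prefix if it is present.
public sealed class RemoveVersionPrefix : ISketchTransformation
{
    public object RunTransformation(object input)
    {
        var text = (string)input;
        return text.StartsWith("v1:", StringComparison.Ordinal) ? text.Substring(3) : text;
    }
}

// Hypothetical stage: encodes the text as Base64 over its UTF-8 bytes.
public sealed class Base64Encode : ISketchTransformation
{
    public object RunTransformation(object input)
    {
        return Convert.ToBase64String(Encoding.UTF8.GetBytes((string)input));
    }
}

// Hypothetical stage: decodes Base64 back to the original text.
public sealed class Base64Decode : ISketchTransformation
{
    public object RunTransformation(object input)
    {
        return Encoding.UTF8.GetString(Convert.FromBase64String((string)input));
    }
}

public static class TransformChainSketch
{
    // Applies the stages in list order, feeding each result to the next stage.
    public static object Run(object input, IEnumerable<ISketchTransformation> stages)
    {
        object current = input;
        foreach (var stage in stages)
        {
            current = stage.RunTransformation(current);
        }
        return current;
    }
}
```

Encoding "hello" with the list AddVersionPrefix, Base64Encode and then decoding with the list Base64Decode, RemoveVersionPrefix returns the original text.  Running the decode stages in the other order would try to strip the prefix from Base64 text and leave the payload wrapped.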
 
 

Extension Methods were not entirely necessary here.  The main benefit to using an Extension Method is that the developer could, for example, vary message transformation operations across different platforms and provide a Mobile and Server version.

Extension Methods also force a developer to keep the Envelope class simple.  Because Extension Methods cannot “see” private members, the methods won’t be working on internal parts of the message.  Therefore a message assembly can remain unchanged even as operations on the message vary.

The Envelope and Envelope Extension Methods work with another set of classes.

Pipeline and Factory

The Factory pattern is often useful when a developer needs to hide complicated class creation.  There may be internal details to building a Pipeline that shouldn’t bleed into the core Envelope components.  Separating creation into a Factory also allows building different Pipelines depending on the MessageType in the Envelope.  The Pipeline and Factory interfaces follow.

public interface IMessageEnvelopePipeline
{
    MessageEnvelope GetResponse();
}
 
public interface IMessageEnvelopePipelineFactory
{
    IMessageEnvelopePipeline Create(Uri originatingEndpoint,MessageEnvelope envelope);
}
 

The approach enforced by the interfaces encourages a developer to store Envelopes within Envelopes.  So, for example, a developer may want to implement multiple components: a more general encryption component and a more specific internal component.  The approach, when coupled with the IPayloadTransformation interface, encourages layering.  Layering does add overhead.  However, that overhead is a small percentage of the total time required to move the message across the network or across the Internet.

The URI parameter differentiates messages arriving from multiple endpoints.  Internal code would parse the URI or use something like a UriTemplate class to dispatch processing into deeper parts of the application.  The following code demonstrates how a developer implements the interfaces.

internal sealed class UnhandledEnvelopePipeline : IMessageEnvelopePipeline
{
    private MessageEnvelope _envelope = null;
 
    internal UnhandledEnvelopePipeline(MessageEnvelope envelope)
    {
        _envelope = envelope;
    }
 
    public MessageEnvelope GetResponse()
    {
        // The MessageEnvelope constructor shown earlier takes a string, so the error text is passed directly.
        var msg = new MessageEnvelope("EXCEPTION: Unhandled envelope of type " + _envelope.MessageType);
            
        return msg;
    }
}
 
internal sealed class UnhandledEnvelopePipelineFactory : IMessageEnvelopePipelineFactory
{
    public IMessageEnvelopePipeline Create(Uri originatingEndpoint,MessageEnvelope envelope)
    {
        return new UnhandledEnvelopePipeline(envelope);
    }
}
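
As a rough idea of how a Factory might inspect the originatingEndpoint parameter before deciding what to build, the sketch below pulls the last path segment out of an absolute URI using the standard Uri.Segments property.  EndpointDispatchSketch and the example addresses are hypothetical, and a UriTemplate-based approach would be an alternative on platforms where that class is available.

```csharp
using System;

public static class EndpointDispatchSketch
{
    // Returns the last path segment of an absolute URI, or an empty string
    // when the path is just "/".  Uri.Segments is not valid for relative URIs.
    public static string GetLastSegment(Uri originatingEndpoint)
    {
        string[] segments = originatingEndpoint.Segments;
        return segments[segments.Length - 1].Trim('/');
    }
}
```

For an endpoint such as https://example.invalid/queues/orders this returns "orders", which a Factory could compare against the queue or topic names it knows how to handle.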
 

A service may have multiple Factories that, in turn, build multiple Pipelines.  A developer could, for example, leverage MEF to instantiate the Factories.  A Factory that does not handle a given Envelope would return null, allowing processing to move on to the next Factory.  The code may look something like the following.

[Export(typeof(IMessageEnvelopePipelineFactory))]
public class DataTransferPipelineFactory : IMessageEnvelopePipelineFactory

A full explanation of using MEF is beyond the scope of this article, but you’ll find a good explanation here: http://www.codeguru.com/csharp/article.php/c20017/NET-Workload-Indirection-with-Managed-Extensibility-Framework-and-Task-Parallel-Library.htm

The example Exception implementation would become the default if no Factory were found to handle the Envelope for the particular MessageType.
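
The null-then-fallback behavior described above could be sketched roughly as follows.  This is an illustration under assumptions, not the article's implementation: the Sketch-prefixed types are minimal stand-ins so the block compiles on its own, the "Echo" message type is invented, and the Factories are passed in as a plain list rather than being discovered through MEF.

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-ins for the article's types so the sketch compiles on its own.
public sealed class SketchEnvelope
{
    public string MessageType { get; set; }
    public string Payload { get; set; }
}

public interface ISketchPipeline
{
    SketchEnvelope GetResponse();
}

public interface ISketchPipelineFactory
{
    ISketchPipeline Create(Uri originatingEndpoint, SketchEnvelope envelope);
}

// Hypothetical pipeline that echoes the payload back to the caller.
public sealed class EchoPipeline : ISketchPipeline
{
    private readonly SketchEnvelope _envelope;

    public EchoPipeline(SketchEnvelope envelope)
    {
        _envelope = envelope;
    }

    public SketchEnvelope GetResponse()
    {
        return new SketchEnvelope { MessageType = "EchoResponse", Payload = _envelope.Payload };
    }
}

// Handles only "Echo" messages and returns null so another Factory can try.
public sealed class EchoPipelineFactory : ISketchPipelineFactory
{
    public ISketchPipeline Create(Uri originatingEndpoint, SketchEnvelope envelope)
    {
        return envelope.MessageType == "Echo" ? new EchoPipeline(envelope) : null;
    }
}

// Default pipeline used when no Factory recognizes the MessageType.
public sealed class UnhandledPipeline : ISketchPipeline
{
    private readonly SketchEnvelope _envelope;

    public UnhandledPipeline(SketchEnvelope envelope)
    {
        _envelope = envelope;
    }

    public SketchEnvelope GetResponse()
    {
        return new SketchEnvelope
        {
            MessageType = "Unhandled",
            Payload = "EXCEPTION: Unhandled envelope of type " + _envelope.MessageType
        };
    }
}

public sealed class UnhandledPipelineFactory : ISketchPipelineFactory
{
    public ISketchPipeline Create(Uri originatingEndpoint, SketchEnvelope envelope)
    {
        return new UnhandledPipeline(envelope);
    }
}

public static class PipelineResolverSketch
{
    // Asks each Factory in turn; the first non-null Pipeline wins, otherwise the fallback is used.
    public static ISketchPipeline Resolve(
        IEnumerable<ISketchPipelineFactory> factories,
        ISketchPipelineFactory fallback,
        Uri originatingEndpoint,
        SketchEnvelope envelope)
    {
        foreach (var factory in factories)
        {
            var pipeline = factory.Create(originatingEndpoint, envelope);
            if (pipeline != null)
            {
                return pipeline;
            }
        }

        return fallback.Create(originatingEndpoint, envelope);
    }
}
```

With this arrangement an Envelope whose MessageType is "Echo" gets an EchoPipeline, while any other MessageType falls through to the unhandled default.  Adding support for a new MessageType then means adding a Factory to the list rather than editing the resolver.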

Conclusion

A developer building messaging solutions with Azure Queuing and Azure Service Bus should follow a Pipeline pattern when processing a significant number of different messages.  One approach to building a Pipeline is to layer a message following an Envelope pattern.  Pipeline components then operate on the layers in stages.  This article demonstrated an approach that also utilized the Factory pattern, .NET Serialization, and Extension methods.

Resources

Envelope pattern

UriTemplate class

Serialization and Queuing


