Using queues in Microsoft Azure for a scalable application

Any website starts with a handful of users and (hopefully) grows to many thousands. Azure lets you control the amount of server resources your application uses, and hence the cost of hosting – starting low and growing as the site's requirements change.

One part of the application that has to work harder as usage grows is writing to your database. Your database access code needs to react to varying levels of usage. By using a combination of Azure's queue storage and worker roles we can increase or decrease the resources allocated to database writes depending on demand.

Essentially what we need to do is pipe all database access requests through a queue and let the worker role do the actual database writes.

Initialising the queue storage.

To set up queue storage we need to set the configuration within the Azure startup solution. Expand the startup project of your solution and look for two files:

ServiceDefinition.csdef
ServiceConfiguration.cscfg

Starting with the .csdef file, open it in Visual Studio and look for the ConfigurationSettings tag:

    <ConfigurationSettings>
      <Setting name="DiagnosticsConnectionString" />
      <Setting name="MessageConnectionString" />
    </ConfigurationSettings>

The above tag shows two settings. There needs to be one set of these for each of the roles in your Azure application. These entries are then echoed in the .cscfg file:

    <ConfigurationSettings>
      <Setting name="DiagnosticsConnectionString" value="UseDevelopmentStorage=true" />
      <Setting name="MessageConnectionString" value="UseDevelopmentStorage=true" />
    </ConfigurationSettings>

For the moment I am using development storage, so that is what the current settings point to. That completes the storage setup.

The base Messaging Object

All the objects I am going to write to the database are based on a single object that implements IDisposable.

    public class MessagingObject : IMessagingObject, IDisposable
    {
        public string IP
        {
            get;
            set;
        }

        public MessageAction Action
        {
            get;
            set;
        }

        public void Dispose()
        {
            IP = String.Empty;
        }
    }

Going quickly through the object: first off, it implements IDisposable. Apart from the obvious benefit of disposing of objects properly, this also allows us to use the following construct when using any of our messaging objects:

            using (MyObject myInstance = new MyObject())
            {
            }

The object only exists between the curly brackets and is disposed of automatically by the .Net runtime at the end of the block.

As part of the security of the application I am going to record the IP address. In a WCF service this can be found by running the following code:

            OperationContext context = OperationContext.Current;
            MessageProperties prop = context.IncomingMessageProperties;
            RemoteEndpointMessageProperty endpoint = prop[RemoteEndpointMessageProperty.Name] as RemoteEndpointMessageProperty;
            string ip = endpoint.Address;

Next we have the action. This is how we are going to tell the worker role what we want done with the object. I've used an enumerated type, which may grow in the future:

public enum MessageAction
{
    Create,
    Write
}

At the bottom is the Dispose method.

Finally, this object implements the IMessagingObject interface, which forces the provision of these base elements:

    public interface IMessagingObject
    {
        String IP { get; set; }
        MessageAction Action { get; set; }
    }

Posting our objects to the queue.

Queue storage requires that messages are text (or a byte array), so we now need to convert our objects to a string representation. As it happens, there is a way of converting objects to a string built into .Net's serialization support. To use it, ensure your project has a reference to the System.Xml assembly (where XmlSerializer lives) and place a using statement at the top of your module:

using System.Xml.Serialization;

We first have to initialize an object to do our serialization.

XmlSerializer serializer = new XmlSerializer(typeof(MyObject));

The XmlSerializer writes the serialized object to a stream, so we have to set that up as well:

 MemoryStream ms = new MemoryStream();
 XmlTextWriter xmlTextWriter = new XmlTextWriter(ms, Encoding.ASCII);

Serializing the object is fairly easy. We just use the Serialize method

serializer.Serialize(xmlTextWriter, myObject);

This takes the TextWriter and the object as parameters.
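Putting those steps together, a helper along these lines produces the bytes we need for the queue. This is only a sketch – the SerializeToXml name is my own, and MyObject stands in for any of the messaging objects:

```csharp
using System.IO;
using System.Text;
using System.Xml;
using System.Xml.Serialization;

public static class MessageSerializer
{
    // Serialize a messaging object to an XML byte array ready for queue storage.
    // MyObject stands in for any concrete type derived from MessagingObject.
    public static byte[] SerializeToXml(MyObject myObject)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(MyObject));
        using (MemoryStream ms = new MemoryStream())
        {
            XmlTextWriter xmlTextWriter = new XmlTextWriter(ms, Encoding.ASCII);
            serializer.Serialize(xmlTextWriter, myObject);
            xmlTextWriter.Flush(); // make sure everything has reached the stream
            return ms.ToArray();
        }
    }
}
```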

Next we need to connect to the queue to store the message

                var storageAccount = CloudStorageAccount.FromConfigurationSetting("MessageConnectionString");

                // retrieve a reference to the messages queue
                var queueClient = storageAccount.CreateCloudQueueClient();
                var queue = queueClient.GetQueueReference("messagequeue");

                queue.CreateIfNotExist();

So here we have a message and a queue – now to write that message to the queue.

                var cloudMessage = new CloudQueueMessage(ms.ToArray());
                queue.AddMessage(cloudMessage);

First the MemoryStream is converted to a Byte Array and then this is pushed to the queue.

That now leaves the message on the queue – the next stage is to write the message to the database – and this is done by a worker role.

Initialising the connection to the queue to read it.

Queues are described as FIFO – First In, First Out – which is why they are a good fit for the current task, since we need everything written to the database in the same order as it was requested.

We first need to open a connection to the queue

            var storageAccount = CloudStorageAccount.FromConfigurationSetting("MessageConnectionString");

            // retrieve a reference to the messages queue
            var queueClient = storageAccount.CreateCloudQueueClient();
            var queue = queueClient.GetQueueReference("messagequeue");

Reading entries from the queue.

A worker role just sits in the background – it sleeps for a pre-determined period and then wakes up to do its task, in this instance reading the queue.

The first activity is to get the message at the front of the queue:

 var msg = queue.GetMessage();
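The loop around that call isn't shown here; a minimal sketch of the worker role's Run method might look like the following. The ten-second polling interval is an assumption, and ProcessMessage is a hypothetical helper wrapping the deserialization code that follows:

```csharp
public override void Run()
{
    // Connect to the same queue the web role writes to.
    var storageAccount = CloudStorageAccount.FromConfigurationSetting("MessageConnectionString");
    var queue = storageAccount.CreateCloudQueueClient().GetQueueReference("messagequeue");
    queue.CreateIfNotExist();

    while (true)
    {
        var msg = queue.GetMessage();
        if (msg != null)
        {
            ProcessMessage(msg);      // hypothetical helper: deserialize and write to the database
            queue.DeleteMessage(msg); // remove it, or it becomes visible again and is processed twice
        }
        else
        {
            Thread.Sleep(10000);      // assumed polling interval - tune to your traffic
        }
    }
}
```

Note that GetMessage only hides the message for a visibility timeout; DeleteMessage is what actually removes it.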

Now this code is going to have to work for any type of object we throw at it, so we need a way of telling what kind of object it is, so that we can then initialise the correct deserialization object.

    string sMsg = msg.AsString;
    var doc = new System.Xml.XmlDocument();

    doc.LoadXml(sMsg);
    var ms = new MemoryStream(Encoding.ASCII.GetBytes(sMsg));

    string documentElement = doc.DocumentElement.Name;

All this code is doing is reading the message into an XML document and then reading the name of the root element – this tells us the type of the object.

Once we have done this a simple switch statement can then be used to select the correct deserialization.

    switch (documentElement.ToLower())
    {
        case "myobject":
            serializer = new XmlSerializer(typeof(MyObject));
            MessageMarshaller.Execute((MyObject)serializer.Deserialize(ms));
            break;

        default:
            serializer = new XmlSerializer(typeof(Object));
            break;
    }

The real magic here is the use of overloading to select the correct MessageMarshaller method – .Net does all the hard work of picking the right one for us. The Deserialize method returns an object and we cast it to the correct type. All the MessageMarshaller then does is use the MessageAction to select the correct database action.

A version of the Marshaller is then created for all objects in our database.
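The MessageMarshaller itself isn't listed in this post; a sketch of the overloading idea could look like the following. The method bodies and the OtherObject type are assumptions, and the actual data-access code is omitted:

```csharp
public static class MessageMarshaller
{
    // One Execute overload per messaging type - the compiler picks the
    // right one from the static type of the deserialized argument.
    public static void Execute(MyObject obj)
    {
        switch (obj.Action)
        {
            case MessageAction.Create:
                // INSERT a new row for this object (data-access code omitted)
                break;
            case MessageAction.Write:
                // UPDATE the existing row (data-access code omitted)
                break;
        }
    }

    public static void Execute(OtherObject obj)
    {
        // The same pattern is repeated for every other messaging object.
    }
}
```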

A few things to tidy up.

There are a few things missing from this, the most important being error handling. If a message cannot be decoded, and so cannot be written to the database, that must be handled. In this instance such messages should be written to a quarantine table, with a function provided to write them to the database manually.
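A sketch of that quarantine step, wrapped around the decoding code – the QuarantineStore helper and its Save method are assumptions, standing in for whatever data-access layer you use:

```csharp
try
{
    // Attempt to decode the message as before.
    var doc = new System.Xml.XmlDocument();
    doc.LoadXml(msg.AsString);   // throws XmlException if the XML is garbled
    // ... select a serializer, deserialize and marshal to the database ...
}
catch (Exception ex)
{
    // Park the raw message in a quarantine table so it can be
    // inspected and written to the database manually later.
    QuarantineStore.Save(msg.AsString, ex.Message); // hypothetical helper
}
finally
{
    queue.DeleteMessage(msg); // remove it either way so it is not retried forever
}
```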

How this helps in scalability.

There can be any number of instances of a particular role running in an application, and this is how the scalability works. By careful analysis of the queue size we can see how many instances should be running and increase or decrease the number as required. Since more instances mean higher cost, and fewer mean slower response times, a careful balance needs to be maintained between cost and responsiveness.
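The queue length itself can drive that decision – the storage client can report an approximate message count. A sketch, with purely illustrative thresholds:

```csharp
// Ask the queue service for the approximate number of waiting messages.
int backlog = queue.RetrieveApproximateMessageCount();

if (backlog > 1000)
{
    // Scale up: increase the worker role instance count (via the
    // Service Management API or the portal) to drain the queue faster.
}
else if (backlog < 100)
{
    // Scale down to reduce running costs.
}
```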
