Thoughts on Messaging and MVC
When I start to learn a new style of programming I like to experiment by creating my own versions of things. This has lead to writing my own ORM framework and Mime parser just to name a few of my coding accomplishments for which I pride my self. Writing each was a valuable learning experience that allowed me a much better understanding of similar software systems.
So when I started reading about messaging and Command Query Separation I became interested in how they could be implemented in an ASP.NET MVC style application. The biggest issue I faced was the fact that messaging is an asynchronous environment. You publish your message and can go off and do other things while waiting for a response. But in a controller action your command message seemed to be the summation of action call. For instance say you wanted to add an item to a shopping cart. You create an AddItemToCartCommand and publish it to the message bus. Now you need to wait for the response message that the item was added to the cart, or otherwise failed to be added (say the item was discontinued). With most of the service bus implementations handling this situation seemed overly complex. So I coded up a spike to see how I could address the problem while trying to keep things simple and discoverable. What is displayed below is working code and I will publish the implementation details in a follow up post. For now I want to focus on the syntax of how its used.
using System;using iLude.Messaging.Core;using iLude.Messaging.Engine;using iLude.Messaging.Tests.Support.Messages;using NUnit.Framework;using NUnit.Framework.SyntaxHelpers;namespace iLude.Messaging.Tests {[TestFixture]
public class ExampleTests {
private IServiceBus bus;[SetUp]
public void Setup() {
bus = new ServiceBus();}
[Test]
public void When_request_is_sent_should_receive_response() {
var messageReceived = false; var timeoutCalled = false; bus.Subscribe(this).To<RequestMessage>().HandledBy((x, message) => x.ProcessRequest(message)); using(IMessagingSession session = bus.GetSession()) {session.Publish( new RequestMessage { Body = "Hello World" } )
.WaitFor<ResponseMessage>(message => { messageReceived = true; }).OrTimeoutAfter(1.Seconds(), () => { timeoutCalled = true;});
}
Assert.That(messageReceived, Is.True);
Assert.That(timeoutCalled, Is.False);
}
private void ProcessRequest(RequestMessage message) {
bus.Publish(new ResponseMessage(message.ID));}
}
}
First off lets look at the setup of a message handler.
bus.Subscribe(this).To<RequestMessage>().HandledBy((x, message) => x.ProcessRequest(message));For this I could have gone the traditional interface route, but I decided against it for a number of reasons. Chief among those reason is that it limits future convention of configuration possibilities. Instead I opted to use a lambda function to specify the method I wished to be called to handle the message. In this case I also choose to pass in a specific instance that will be notified, But a generic override is also available so that only a class type need specified. The bus will then use an IOC container to fetch an instance of that type to handle the published message.
In this case the handling method is pretty simple.
private void ProcessRequest(RequestMessage message) {
bus.Publish(new ResponseMessage(message.ID));}
We receive a copy of the published message and sent out a response. The ResponseMessage is passed the original message id, which it will use as the correlative ID.
Now we can get to the meat and potatoes:
using(IMessagingSession session = bus.GetSession()) {session.Publish( new RequestMessage { Body = "Hello World" } )
.WaitFor<ResponseMessage>(message => { messageReceived = true; }).OrTimeoutAfter(1.Seconds(), () => { timeoutCalled = true;});
}
First we create a new MessagingSession that is disposable. This session is our Unit of Work. I use it to provide synchronization around this asynchronous operation. The operation is to publish a request message and wait for a response. If we receive one we process the anonymous function which sets messageReceived = true. Otherwise if we have not gotten a response after 1 second we should set timeoutCalled = true. The publish method and its child methods return nearly instantly and execution in the current thread continues to the end of the using statement.
This is where the magic happens, in the session’s dispose method.
public void Dispose() {
bus.Publish(request);
if(resetEvent.WaitOne(timeout)) {action.DynamicInvoke(response);
}
else if(timeoutAction != null) {
timeoutAction.DynamicInvoke();
}
}
At this point the message has not yet been published by the session. When the session’s Dispose method is called it will publish the message to the bus and then blocks the thread using a ManualResetEvent to wait for a response. The session does this by subscribing to the message bus for the requested response message type. If it received a response message with the proper correlative ID it sets the ManualResetEvent. This unblocks the dispose method to invoke the anonymous function. Otherwise if the timeout expires the specified timeout function is called.
I would be very interested in hearing your thoughts on this idea. As I stated this is only a spike and I have not pushed it much beyond this stage. I am also very new to messaging and as such may be doing more than my fair share of bad practices. So all feedback is welcome.