Hello there, welcome to The Little Messaging Book. This is a book that can serve as an introduction to messaging, and it might also contain little nuggets of inspiration for fairly seasoned messaging implementors too.
The book is targeted at software developers, who are interested in getting started building systems using messaging – most likely in .NET using Rebus – but parts of it are written without specific technologies or platforms in mind, so there might be some words of more general wisdom in it too.
After reading this book, you should have a general understanding of which mechanisms are involved in a messaging-based system, and you should know what to Bingle for, when seeking further enlightenment.
I will try to keep this book short 🙂 and I will use emojis here and there 🚀🧀🎺 and throughout this book I will refer to myself as “I” and “me”.
The book is written by me 🤓 My name is Mogens Heller Grabe. I was born in 1979 and I live in Horsens in Denmark with my wife and our two sons. In addition to messing with computers, I like to run, play squash, brew, drink, and monger beers, listen to music and play the guitar, usually only a few of them simultaneously.
Throughout my career as a software developer, I have had an intense, yet pragmatic interest in distributed systems, which means that I have always thought that message queues and databases were pretty cool, and I have always tried to use them to produce simpler and more wholesome software.
In 2011, when I needed a “service bus” (I’ll get back to that later on 🙂) I went and made my own: Rebus. I will talk a lot about Rebus in this book.
After having enjoyed working with Rebus professionally as a consultant for 5-6 years, helping clients support their businesses with messaging, I decided to start a company primarily devoted to helping software developers get a luxurious Rebus experience: Rebus FM – “Rebus Fleet Management”.
This means I make a living out of selling Rebus Pro subscriptions, which consist of a support agreement and a “Fleet Manager” (for managing your fleet of buses…), which in turn means that I have a commercial interest in spreading the word about Rebus.
I am an open source hippie 🌻 at heart though. Rebus and all of its integration libraries would not be where they are today without the 100+ contributors, who have contributed with their pull requests, suggestions, comments, etc., so I promise that Rebus will always be free for everyone to use! 🚌
The last thing I want to say in this introduction is this: I didn’t discover or invent anything.
Rebus has drawn lots of inspiration from NServiceBus, which is another great .NET service bus implementation, and all of the messaging patterns implemented by Rebus are described in the literature, where “Enterprise Integration Patterns” by Gregor Hohpe and Bobby Woolf in my view is the most prominent place to look for these things.
Some great writing about architecture quality attributes can be found in “Software Architecture In Practice” by Len Bass, Paul Clements, and Rick Kazman.
Throughout this book I will assert and claim various things, and I will not bother to put in formal references. Chances are, if something I say sounds like it could be true and/or clever in some way, then it probably comes from one of the aforementioned books. 🙃
A few times I might postulate something and say “In MY experience, bla bla bla (…)”, which is just a work-related injury that stems from having worked as a consultant for many years… – I apologize in advance. 🙄
This introduction is long enough already. Let’s go!
Before we dive into messaging patterns, I think it makes sense to set up a tiny framework that we can use later on to describe properties and inherent qualities of the patterns discussed.
The thing is: I am going to casually throw around words like “availability”, “scalability”, “maintainability”, etc.
These words are “architecture quality attributes”, which are words that describe the non-functional properties of a system.
There exist formal methods for evaluating these attributes, but since many of them are pretty fluffy in nature, my own personal experience is that it’s often sufficient to use the words to be able to describe trade-offs in verbal and written communication, and not necessarily evaluate them formally.
Many of the quality attributes end with “ility”, e.g. “availability”, “durability”, etc., which is why they’re sometimes referred to as “ilities”.
When you come across “ilities” in text, it could be an indication that someone actually took the time to put some thought into why certain decisions were made. 😉
When you’re designing distributed systems, it’s nice to be able to communicate clearly what your design consists of. That’s what messaging patterns are for: Communication!
You could also think of messaging patterns as blocks of Lego, but that’s not the primary angle, in my opinion, because it’s the wrong way around.
Messaging patterns are better thought of as condensed descriptions of patterns, which have emerged from anthropological studies of successful software systems. They’re just a way to explain which concepts and contraptions were involved in solving some particular problem in a good way, thus contributing to building successful software.
It doesn’t mean that software will automatically be successful, if you can recognize a long list of patterns in it. But if your system IS successful from a technical point of view, I bet you can find a couple. 🙂
In this book, several well-known messaging patterns will be mentioned and/or explained to some degree, with concrete examples of how they are manifested in code, using Rebus as an example.
Armed with the vocabularies provided by these two areas, I can make statements like “Using competing consumers in this situation improves availability and scalability at the expense of making deployability slightly more involved”, and then this pretty condensed sentence will actually make sense to the right audience.
Being able to do that can be pretty great, because then you can document why certain decisions have been made, and you can explain the layout of areas of your architecture, sometimes entire subsystems, in a tight, condensed, and unambiguous language.
When you’re designing distributed systems, it’s nice if the independent components that make up your system are able to communicate.
This usually involves some kind of remoting, e.g. in the form of HTTP APIs and message brokers. Another way to look at it would be to divide the communication into whether it’s synchronous or asynchronous.
Synchronous communication can be had in many ways, e.g. in the form of various Remote Procedure Call (RPC) protocols, or in the form of the ubiquitous HTTP protocol.
Even though your concrete RPC protocol or HTTP API might offer an asynchronous programming API, they’re still synchronous for the sake of this discussion in that they all require the caller to stay put somehow, until a response is received.
Another way to see it is this: If a client holds transient state after having sent a request, then the protocol is synchronous.
Truly asynchronous communication can only be had if some kind of messaging middleware is involved, i.e. a mediating party must somehow help with preserving state in the face of failures like coding errors, resource depletion, network interruptions, hardware failures, power loss, etc etc etc. 😱
Messaging middleware often provides its service in the form of durable asynchronous message queues as its most basic communication channel, and that is exactly the type of channel this book is about.
Over the recent years, a variation of the traditional message queue has become popular in the form of the log-based brokers, Kafka, Pulsar, + probably a couple more I don’t know about. They offer asynchronous communication too, and if you squint a little, they may also seem like they’re very similar to message queues. Once you get closer though, you will find a bunch of differences, which I’ll dive a little bit into in Queues vs. logs.
To be entirely clear, let’s go through each word and see what they imply:
“Durable” because the queue’s data is not lost, even though processes crash or someone accidentally pulls the power cord out of the socket. This implies that messages are stored in persistent storage somehow, usually in a database or in the file system.
“Asynchronous” because you can put a message into the queue, and you do NOT have to wait until the receiver gets the message – you’re free to send as many messages as you like, usually as fast as you can produce them, and the middleware will simply enqueue the messages. This also means that if you want to get a reply, you need to wait for it using another channel.
“Message” because the queue’s data consists of little meaningful data nuggets, which we call “messages” (more about that later), to which we add whichever semantic feels appropriate in a given context.
“Queue” because the nuggets go into one end and come out of the other, usually in the order they entered. You can also call that a First-In-First-Out (FIFO) queue. Logically you can think of the queue as an unbounded buffer that accepts messages as fast as they’re produced, and delivers messages as fast as they’re consumed.
Durable asynchronous message queues can be had in different technological flavors, where each specific implementation carries with it certain properties around the level of availability they provide, which operations story they offer, etc.
Moreover, if you’re hosting your software in a cloud somewhere (e.g. AWS or Azure) it can be beneficial to make use of a message queue implementation provided by that particular cloud.
For example: If you’re running in Azure, you probably want to take a look at Azure Service Bus or Azure Storage Queues.
If your system is running in AWS, you probably want to use Amazon SQS.
If your software is running on a machine in your closet or in a hosting center somewhere, you might want to use MSMQ (if you’re in a classic Windows Server domain-based environment) or RabbitMQ.
Or maybe your message queue requirements are modest, and you care more about operational simplicity, and the only thing you have in the world is a SQL Server/PostgreSQL/Oracle installation – then those databases can luckily also be used as if they were “real” message queues.
The concrete technology involved does not matter much – what matters is that you have durable asynchronous message queues (in one form or another) at your disposal, because with those we can build some pretty magnificent pieces of software 🙏
The following sections will just briefly introduce a couple of the popular queuing systems.
Azure Storage Queues is the most basic queue implementation integrated in the Azure platform. It is part of an ordinary storage account, along with tables and blobs, and as such it is very inexpensive to work with, and capable of storing many many GBs of messages.
It is also pretty crude in some regards – e.g. its basic API involves polling for messages, meaning that the consuming code probably needs to implement some kind of back-off behavior to avoid paying for too many unnecessary HTTP requests.
Moreover, it doesn’t support any kind of multicast messages (i.e. sending one message to multiple queues), so if you e.g. want to implement the classic publish/subscribe (“pub/sub”) pattern directly on top of Azure Storage Queues, then you need to either a) implement your own handling of subscriptions, or b) use a library like Rebus, that provides this particular mechanism.
At the same time, it has some pretty sophisticated features – e.g. it has native support for “visibility delay”, i.e. being able to send a message “into the future”.
Azure Storage Queues has lease-based message transactions, which means that a message is not deleted when it is received, it is just invisible to other consumers for a while. If the recipient then acknowledges (“ACKs”) the message within that timeframe, the message gets deleted. If not, the message becomes visible again for other consumers to receive.
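The lease-based receive/ACK cycle described above can be sketched with a small in-memory queue. This is only an illustration of the idea under simplifying assumptions: `LeaseQueue` and all of its members are hypothetical names made up for this sketch, not the Azure Storage Queues API, and time is passed in explicitly to keep the behavior easy to follow.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory queue illustrating lease-based message transactions.
// NOT the Azure Storage Queues API; every name here is made up for the sketch.
public sealed class LeaseQueue
{
    private sealed class Entry
    {
        public int Id;
        public string Body = "";
        public DateTime VisibleAt; // the message is invisible until this point in time
    }

    private readonly List<Entry> _entries = new List<Entry>();
    private int _nextId;

    public void Send(string body, DateTime now) =>
        _entries.Add(new Entry { Id = _nextId++, Body = body, VisibleAt = now });

    // Returns the first visible message and hides it for the duration of the lease.
    // The message is NOT deleted here.
    public (int Id, string Body)? Receive(DateTime now, TimeSpan lease)
    {
        foreach (var entry in _entries)
        {
            if (entry.VisibleAt > now) continue; // currently leased by someone else
            entry.VisibleAt = now + lease;
            return (entry.Id, entry.Body);
        }
        return null;
    }

    // ACK: only now is the message actually deleted.
    // Simplification: this sketch does not reject an ACK that arrives after the lease expired.
    public bool Ack(int id) => _entries.RemoveAll(e => e.Id == id) > 0;
}
```

A consumer that crashes between `Receive` and `Ack` simply never ACKs, so the message shows up again once the lease has run out.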
Azure Service Bus is the next tier in Azure’s messaging offerings, bringing even more sophisticated routing capabilities and better APIs to the table.
For example, it has native support for publish/subscribe messaging in the form of a topic-based model, features for dead-lettering, etc.
Its API has two modes: A binary protocol (which is preferred, because it is “slimmer” and more optimized) and HTTPS (which may be used in scenarios where local IP filtering prevents reaching out to arbitrary TCP/IP things on the internet).
The “Azure Service Bus” thing on Azure comes with a bunch of other stuff too, like e.g. the “Relay Service”, which is irrelevant in this context.
Azure Service Bus also has some pretty non-standard features, like e.g. the ability to filter messages received on a subscription for messages that satisfy some criteria, thus adding an extra level of filtering beyond what is provided by the topic-based pub/sub model.
Similar to Azure Storage Queues, Azure Service Bus implements lease-based message transactions, meaning that messages must be ACKed within some specific timeframe to be deleted from the queue, otherwise they will become visible again.
MSMQ (or “Microsoft Message Queuing”) is a queuing system, which is built into Windows. It has been available since Windows 2000, and it is still being updated from time to time. Unfortunately, Microsoft chose to skip the entire System.Messaging namespace when porting code from old .NET Framework to .NET Core/.NET, so there’s no longer support in .NET for using MSMQ.
This is a shame, because MSMQ is a pretty nice queuing system, and I know of many companies around the world that are still using it.
To enable it in Windows, visit the appropriate “Add/Remove Windows Features” area of the Windows version you are using, and put a checkmark next to “Message Queuing”.
MSMQ works best when all of the participating machines are part of the same Windows Domain. Actually, I have only heard bad things about people trying to make MSMQ communicate across network boundaries, e.g. in/out of a DMZ. This is particularly troublesome, because it might be possible to make MSMQ communicate properly for a while by disabling security on all queues and opening a bunch of ports, but then messages often stop flowing after a while because nobody knows why!! 😩
My advice is: Don’t do it. Only use MSMQ when all machines are on the same Windows Domain.
The good thing with MSMQ is that – provided that all machines are on the same domain – it has great availability (because there is no central server and all communication with the queuing system is local), it has great scalability (because there is no central server and messages are copied from machine to machine on a peer-to-peer basis), and it is extremely easy to operate because it’s basically self-tending.
RabbitMQ is one of the most popular message brokers. It’s built in Erlang, it’s completely free, it has some pretty flexible features that allow for building queuing systems with high availability, and it is also pretty fast. Moreover, it delivers many ways for you to shoot your own leg off, so it’s a pretty interesting tool in that regard. 😜
No, seriously: RabbitMQ is great, no doubt about it – but it has some pitfalls too, especially around its master-master cluster configuration. You should read what Kyle Kingsbury says before diving into this mode of operation.
Also it provides many non-durable ways of sending messages, which often gets people hooked on it (“Wow, RabbitMQ is FAST!!”) and then it turns out that what they thought was safely handled by the broker was actually just stored in memory of the receiving node and would have been lost if it had crashed.
To me, RabbitMQ is the quintessential AMQP implementation, because the messaging model is built around the notion of “exchanges”, “topics”, “bindings”, and “queues”. It should be noted though that it implements AMQP 0.9.1, which is a pretty different thing from AMQP 1.0.
An exchange is where you deliver your messages, and each message then has a “routing key” attached to it. The type of exchange then decides how the routing key is interpreted.
If e.g. the exchange is of the “topic” type, then the routing key is interpreted as a topic, meaning that a copy of the message gets sent to all queues subscribed to that topic.
That’s basically it. 🙂 RabbitMQ can do more stuff, and even more if you extend it with plugins.
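The exchange/binding/queue model can be illustrated with a toy model. This is a deliberately simplified sketch and not how RabbitMQ is implemented: `TopicExchange`, `Bind`, and `Publish` are hypothetical names, and the routing key is matched exactly, whereas real topic exchanges in RabbitMQ also support wildcard patterns in bindings.

```csharp
using System.Collections.Generic;

// Hypothetical, simplified model of a "topic"-type exchange.
// Exact-match routing only; names are made up for this sketch.
public sealed class TopicExchange
{
    // Bindings: topic -> the queues bound to that topic.
    private readonly Dictionary<string, List<Queue<string>>> _bindings =
        new Dictionary<string, List<Queue<string>>>();

    public void Bind(string topic, Queue<string> queue)
    {
        if (!_bindings.TryGetValue(topic, out var queues))
        {
            queues = new List<Queue<string>>();
            _bindings[topic] = queues;
        }
        queues.Add(queue);
    }

    // Delivers a copy of the message to every queue bound to the routing key.
    // Returns the number of copies delivered (0 if nobody is subscribed).
    public int Publish(string routingKey, string message)
    {
        if (!_bindings.TryGetValue(routingKey, out var queues)) return 0;
        foreach (var queue in queues) queue.Enqueue(message);
        return queues.Count;
    }
}
```

Note that the publisher only knows the exchange and the routing key; which queues end up with a copy is decided entirely by the bindings.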
Amazon SQS is a pretty simple queuing system, which is part of AWS. It has a REST-based API and it supports the most basic queue operations, and one sophisticated feature: The ability to delay message delivery into the future.
It does not have native publish/subscribe support, so you would have to manage subscriptions and message distribution yourself, if you wanted to do pub/sub messaging with it.
If all you have is a relational database (e.g. if your operations department are not keen on introducing message brokers and stuff into their pen), then you can use tables as if they were message queues.
A message is just a chunk of data meant for some specific recipient to receive, so it’s pretty easy to come up with a model where e.g. tables represent queues and rows are messages, and then each recipient simply queries its own table for messages intended for it.
In order to send a message to a specific recipient, you then simply insert a row into the recipient’s table.
It requires another level of sophistication of course, if you want to allow for multiple recipients to take messages out of the same “queue” without accidentally handling the same message more than once, and you need some kind of protocol to handle “transactions” (i.e. ensuring that a message is never deleted before it has been handled successfully, no matter how the handling might fail).
Also, if you have high message throughput requirements, an RDBMS might not be the best fit for the task, because the transaction log can quickly become quite big because of message churn.
Now you have come this far. If you have read everything in the previous sections, then you might be filled with the impression that even though most queuing systems have many traits in common, they are also quite different in some regards, and they all come with their own best practices, caveats, etc.
If that’s your impression, then you are absolutely correct.
This is one of the areas where Rebus finds its justification 😎, because Rebus lets you use a sensible subset of the functionality provided by the queuing systems, possibly “polyfilling” in a few places where features are missing, thus enabling a fairly consistent experience, in turn making your code portable.
If you think this sounds cool, read on! 👇
Rebus is a .NET library that comes in the form of a NuGet package.
Rebus provides a programming model, where you – as a software developer interested in satisfying functional requirements – get to program message handlers and send and publish messages without worrying too much about the specifics of whichever queuing platform your system is actually running on.
It does this by providing an abstraction based on common messaging patterns, and then you can make it run on some specific queues by configuring it to do that.
For example, you can bring in the Rebus.Msmq NuGet package or the Rebus.AmazonSQS NuGet package and then decide whether to run on local MSMQ or Amazon SQS in AWS by means of changing this line:
.Transport(t => t.UseMsmq("my-queue"))
to this line:
.Transport(t => t.UseAmazonSQS(..., "my-queue"))
and back. Everywhere else in the code, there’s nothing that smells remotely like MSMQ or Amazon SQS, which is neat 🤗
And then, when you want to exercise your code in a realistic integration test, but you want it to run on the build server too, you can change it to:
.Transport(t => t.UseInMemoryTransport(..., "my-queue"))
because Rebus can run with in-memory queues too, without any additional NuGet packages.
This way, systems built with Rebus can be made to be completely portable, allowing them to be moved between on-premise hosting and cloud hosting environments with minimal change, thus reducing the degree of lock-in to your cloud vendor.
Rebus provides mechanisms to handle subscriptions too, making it possible to use publish/subscribe messaging and not care about whether subscriptions are stored in a SQL Server somewhere, or whether the chosen queuing technology has native support for pub/sub messaging.
Same thing with messages “sent into the future” – with Rebus, you can just
await bus.Defer(TimeSpan.FromMinutes(5), yourMessage);
to send your message and not have it delivered until after 5 minutes have passed, and then Rebus either uses the queuing system’s native ability to set the correct visibility timeout, or stores the message in a database somewhere to be delivered later.
In addition to these things, Rebus provides a model for handling long-running processes (“sagas”, known in the literature as “process managers”) by taking care of message correlation, persistence, and concurrency.
It handles serialization and deserialization of messages. It can GZIP message contents, if the messages are big. It can use its “data bus” to transfer huge attachments, if the messages would otherwise become too big to transfer over queues. It can encrypt message contents, if the messages are secret.
It can perform a ridiculous amount of I/O work, using only one single thread and a tiny fraction of your CPU, because it is based on .NET Tasks to the core, relying on .NET’s Task Parallel Library and the .NET thread pool to schedule their execution.
Last, but not least: It’s quite pleasant to work with. Its error messages are explanatory and helpful. 😇
All this comes at a price, of course: $ 0. Rebus is MIT-licensed, and you can do with it whatever you want. Of course I would personally love it if you became a Rebus Pro subscriber 😉, but I also believe that many good things will happen in the future when more people are using Rebus and are happy in doing so.
While Rebus is pretty flexible and extremely extensible in some regards, it does build on an opinionated view of the world.
Specifically, the following sections explain how some of its quality attributes have been prioritized, which can help users better understand which scenarios it is good for.
In most cases where a trade-off was required between performance and some other factor, Rebus has opted for optimizing the experience for the developer.
In other words, on the scale between “extreme performance” and “safety of use”, Rebus has positioned itself around here:
                                               ↓
[============================================================]
Extreme                                                 Safety
performance                                             of use
This means that many things are taken care of automatically, and messages are durable by default. It’s possible with some transports to relax durability by adding the rbs2-express header to a message, but it’s a thing you would have to be pretty explicit about.
On the scale between “extreme performance” and “developer friendliness”, Rebus has positioned itself around here:
                                               ↓
[============================================================]
Extreme                                              Developer
performance                                       friendliness
meaning that Rebus exposes programming APIs that are not necessarily the most efficient from a performance point of view; they are meant to be friendly to the developer and thus pleasant to use.
On this last scale between “extreme performance” and “clean and testable code”, Rebus has positioned itself around here:
                                               ↓
[============================================================]
Extreme                                              Clean and
performance                                      testable code
because Rebus goes to great lengths to prevent abstractions from leaking, and to keep you from programming your way into a corner by tying yourself to the concrete technologies underneath Rebus just to gain better performance.
Summary: If you’re in the messaging game for extreme performance, you should probably not pick Rebus.
It does not mean that Rebus is slow, it just means that if your objective is to push the queuing system to its MAX, then you should probably not use Rebus for that… If, on the other hand, you care about solving business problems and writing maintainable and portable code, then you should definitely take a look at Rebus.
One of the initial design decisions in Rebus was that it should not ignore the CAP theorem.
The CAP theorem states (roughly) that: When there’s trouble with the network communication in a distributed system, you can either have AVAILABILITY or CONSISTENCY.
Here, choosing “availability” means that your system keeps working, but data in one place may be out of sync with data in another place.
Conversely, choosing “consistency” would mean that your system would NOT keep working, because it would NOT be able to keep data in sync across all places without the network communication that failed, so therefore it has no other choice than to STOP THE WORLD.
The choice between AVAILABILITY and CONSISTENCY must be made in all situations where two or more parties are involved in a transaction of some kind – e.g. in a scenario where a message is received (1st transaction involves the queuing system) and some data in a database is updated (2nd transaction involves the database) – and it is a mathematical fact that both cannot be had.
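The two-phase commit protocol tries to achieve CONSISTENCY by means of implementing a protocol that consists of the following two steps after having done some work in multiple transactions:
1. PREPARE: The transaction coordinator asks every party whether it is ready to commit its transaction.
2. COMMIT: If all parties answered yes, the coordinator tells each of them to commit.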
This way, the risk that e.g. one transaction is committed and another one gets rolled back is minimized. The problem arises when something fails during the COMMIT phase, like e.g. the last party can suddenly not be communicated with, and therefore the transaction coordinator cannot tell it to commit.
In this case, the coordinator has no other choice than to leave the transaction hanging with any locks associated with it, i.e. it sacrifices AVAILABILITY in an attempt to achieve CONSISTENCY.
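To make the failure mode a little more tangible, here is a minimal sketch of a two-phase commit coordinator. It is a toy model under heavy assumptions: `IParticipant`, `FakeParticipant`, and `TwoPhaseCommit` are hypothetical names invented for illustration, there is no logging or recovery, and a real coordinator would be considerably more involved.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical participant contract for this sketch.
public interface IParticipant
{
    bool Prepare();  // phase 1: "are you ready to commit?"
    void Commit();   // phase 2: "go ahead and commit"
    void Rollback();
}

public static class TwoPhaseCommit
{
    public static bool Run(IReadOnlyList<IParticipant> participants)
    {
        // Phase 1: everybody must vote yes, otherwise everybody rolls back.
        foreach (var participant in participants)
        {
            if (participant.Prepare()) continue;
            foreach (var other in participants) other.Rollback();
            return false;
        }

        // Phase 2: if Commit throws here, earlier participants have already
        // committed, while the rest are left hanging in the "prepared" state.
        foreach (var participant in participants) participant.Commit();
        return true;
    }
}

// Fake participant that can simulate being unreachable during the COMMIT phase.
public sealed class FakeParticipant : IParticipant
{
    private readonly bool _failOnCommit;
    public string State { get; private set; } = "working";

    public FakeParticipant(bool failOnCommit = false) => _failOnCommit = failOnCommit;

    public bool Prepare() { State = "prepared"; return true; }

    public void Commit()
    {
        if (_failOnCommit) throw new InvalidOperationException("participant unreachable");
        State = "committed";
    }

    public void Rollback() => State = "rolled back";
}
```

Running the coordinator with one healthy participant followed by one created with `failOnCommit: true` leaves the first one "committed" and the second one stuck as "prepared", which is exactly the hanging transaction described above.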
When I say that Rebus should not ignore the CAP theorem, I mean that this brutal fact should not be ignored, and therefore it is better that developers do NOT rely on two-phase commit to do their work.
With Rebus, you are encouraged to be conscious about the fact that the transactions are nested like this:
/ begin queue tx (i.e. receive message – make it invisible to others)
|
| / begin "work tx" (your code defines what this is)
| |
| | perform operations in "work tx" in here
| |
| \ commit "work tx"
|
\ commit queue tx (i.e. delete message from queue)
This also means that there is a small, but very real, risk that the following thing happens: After committing your “work tx” successfully (like e.g. incrementing a counter in your database), someone pulls the power cord out of the wall socket, and the system dies right before getting to commit the queue transaction – it would look like this:
/ begin queue tx (i.e. receive message – make it invisible to others)
|
| / begin "work tx" (your code defines what this is)
| |
| | increment counter in database
| |
| \ commit "work tx"
|
X (...power cord was pulled!)
When the machine boots again and your system starts running again, the message will be received once more, and your counter will be incremented again – UNLESS YOU SPECIFICALLY ENSURE THAT THAT CANNOT HAPPEN. 🤔
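The crash scenario can be simulated in a few lines. This is a sketch under simplifying assumptions: a plain in-memory `Queue<string>` stands in for the queuing system, an `int` stands in for the counter in the database, and an exception stands in for the power cord being pulled. `RedeliveryDemo` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class RedeliveryDemo
{
    // Processes ONE message, with a simulated crash between committing the
    // "work tx" and committing the queue tx. Returns the final counter value.
    public static int Run()
    {
        var queue = new Queue<string>();
        queue.Enqueue("increment");

        var counter = 0;       // stands in for a counter stored in the work database
        var crashOnce = true;  // the power cord gets pulled exactly once

        while (queue.Count > 0)
        {
            var message = queue.Peek(); // begin queue tx: receive WITHOUT deleting
            try
            {
                if (message == "increment") counter++; // work tx is committed here

                if (crashOnce)
                {
                    crashOnce = false;
                    throw new InvalidOperationException("power cord was pulled");
                }

                queue.Dequeue(); // commit queue tx: delete the message
            }
            catch (InvalidOperationException)
            {
                // "Reboot": the message is still in the queue and gets received again.
            }
        }

        return counter;
    }
}
```

Only one message was ever sent, but `RedeliveryDemo.Run()` returns 2, because the increment was committed both before the crash and again when the message was re-delivered.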
If you’ve read all the text up until this point, you have come across several examples of how message consumption is made safe, such that messages cannot be lost, even when message processing fails.
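Most queuing systems support some kind of protocol that allows for receiving each message in two steps:
1. Receive the message, which makes it invisible to other consumers.
2. Acknowledge (“ACK”) the message, which deletes it from the queue.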
This way, if you don’t acknowledge (either by explicitly telling the queuing system to roll back the message transaction, or implicitly by just being silent about it), the message will become visible again to other consumers.
With this simple protocol, we can ensure that the only way messages can disappear, is if they have been handled with success. We can do this by arranging the transaction the way we have seen before:
/ begin queue tx (i.e. receive message – make it invisible to others)
|
| / begin "work tx" (your code defines what this is)
| |
| |
| |
| \ commit "work tx"
|
\ commit queue tx (i.e. delete message from queue)
But it also means that there is a risk that your “work tx” can be executed more than once, as we have seen.
This is called “at least once”-delivery, which means that messages are guaranteed to be delivered once, but if something fails, you may get to receive them two or more times.
Out of all of the available delivery guarantees (“at most once”, “at least once”, and “exactly once”), we prefer “at least once”, because “at most once” means that messages will be LOST if something fails, and “exactly once” is IMPOSSIBLE (in the general case) as per the CAP theorem. 🤥
With “at least once”-delivery, we are guaranteed to get at least one chance to handle each message, so we just need to gracefully handle the case where a message gets handled twice (or more… 🤯)
By now, it’s probably clear that “at least once”-delivery is preferred, because that is the only delivery guarantee that allows us to be sure that we get to handle each message and not risk losing one.
What’s left now is to ensure that the world (i.e. our system) does not end up in the wrong state if we get to experience a re-delivered message. So how do we do that?
In math, an “idempotent” operation is one that can be applied multiple times without changing the result beyond the first application. We borrow this word, and then we aim for making our message handling idempotent!
How we exactly go about doing that certainly depends a lot on which kind of work we are doing.
In most cases, we generate a bunch of side effects that we can consciously choose to ignore.
If e.g. your message handler outputs log statements to a set of files or a log aggregator, then the fact that the log statements go into the files or the aggregator is a side effect. But chances are that we only use this logging for debugging and statistics, so it doesn’t bring our system into a bad state that the same (or different?) log statements are generated during the processing of a re-delivered message - in fact, it’s nice if your trace logs are repeated, so you can see afterwards that a message delivery was retried.
In other cases, we may be so lucky that the nature of the work we are doing is naturally idempotent, or maybe we determine that the consequences of processing the same message twice are so insignificant that we don’t care about it.
An example could be a handler that receives a DeliveryAddressChanged event and updates the delivery address of a portion of a specific order – if we processed this event twice, we would simply overwrite the delivery address with the same address, thus not actually changing the state of the order at all.
Of course we can add more nuance to this scenario, e.g. if the handler published an OrderChanged event after processing the DeliveryAddressChanged event – because semantically it would be wrong: The order would not actually have been changed!
In other cases, we might be so lucky (and pragmatic 😎) that our idempotency can be had simply by virtue of our data containing sufficiently relevant information to be able to discard redundant/expired information.
For example if we look at the DeliveryAddressChanged event from before – now imagine that the event contains a timestamp of when the event was generated, and we stored the timestamp along with the delivery address on the order in our database.
Our logic could then look like this:
if (timestampFromEvent <= timestampFromOrder) {
// ignore the event
return;
}
thus ignoring re-delivered and outdated events.
In fact, this solution has the nice property that it also handles reordered messages: Even though most message queues are FIFO, you should never count on FIFO in a strict mathematical sense, because many things in the real world can break that. For example, receiving messages concurrently (multiple processes and/or multiple threads), messages travelling over different routes thus taking different times to reach their destination, messages having spent time in “dead-letter queues”, etc.
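Spelled out a bit more, the timestamp check could look like the sketch below. The `Order` class, the shape of the `DeliveryAddressChanged` event, and the `DeliveryAddressHandler.Apply` method are assumptions made for illustration; a real handler would load and save the order from a database.

```csharp
using System;

// Hypothetical order model: the timestamp of the last applied change is stored with the address.
public sealed class Order
{
    public string DeliveryAddress { get; set; } = "";
    public DateTimeOffset AddressChangedAt { get; set; } = DateTimeOffset.MinValue;
}

// Hypothetical event shape: carries the time at which the event was generated.
public sealed record DeliveryAddressChanged(string NewAddress, DateTimeOffset Timestamp);

public static class DeliveryAddressHandler
{
    // Returns true if the event was applied, false if it was ignored.
    public static bool Apply(Order order, DeliveryAddressChanged evt)
    {
        // Re-delivered (same timestamp) and outdated (older timestamp) events are ignored.
        if (evt.Timestamp <= order.AddressChangedAt) return false;

        order.DeliveryAddress = evt.NewAddress;
        order.AddressChangedAt = evt.Timestamp;
        return true;
    }
}
```

Applying the same event twice changes the order only the first time, and an older event arriving late cannot overwrite a newer address.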
In some cases we need our handler to be truly idempotent.
This can be implemented in several ways, e.g. the “brute force”-approach where we simply store message IDs of handled messages in our work database (with a unique constraint on the ID), so we can insert a row into this table when handling a message.
We could then detect a re-delivered message by looking up the message ID, and then we can take appropriate measures if we detect a re-delivered message.
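Here is a rough sketch of the “brute force” approach, again in plain C# without Rebus. A HashSet stands in for the database table with the unique constraint, and removing the ID on failure stands in for a transaction rollback; in a real system the insert and the actual work would have to be committed in the same database transaction. All class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for a database table with a unique constraint on the message ID.
public class HandledMessages
{
    readonly HashSet<string> _ids = new HashSet<string>();

    // Mimics an INSERT: returns false when the ID is already present, which is
    // where a real database would report a unique constraint violation.
    public bool TryInsert(string messageId) => _ids.Add(messageId);

    // Mimics rolling back the INSERT when the work fails.
    public void Remove(string messageId) => _ids.Remove(messageId);
}

public class IdempotentProcessor
{
    readonly HandledMessages _handled = new HandledMessages();

    public void Process(string messageId, Action work)
    {
        if (!_handled.TryInsert(messageId))
        {
            // re-delivered message: the work has already been done
            return;
        }

        try
        {
            work();
        }
        catch
        {
            // the work failed, so the message must be allowed to be retried
            _handled.Remove(messageId);
            throw;
        }
    }
}
```

Calling Process twice with the same message ID only runs the work once, while a failed attempt leaves no trace behind and can therefore be retried.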
Even though most (if not all) queueing systems are FIFO, i.e. they adhere to the “first in first out” principle, you should not count too much on it, as there are several normal and good reasons why messages can be more or less reordered.
To achieve higher message processing rates, it’s pretty normal to allow for some kind of parallelism when processing incoming messages. Rebus defaults to allowing 5 handlers to run concurrently, so the order in which handlers are going to do their things is going to be pretty non-deterministic simply because of that. Factor in the ability to do competing consumers, distributing the load across multiple nodes, and message handlers end up running in an even more unpredictable order.
Lastly, it’s pretty normal for message handling to sometimes fail, which ultimately ends up moving the problematic message to a dead-letter queue. At a later time, the message is probably moved back to its source queue, which - as you can imagine - has potential for messing up the ordering even more!
In summary: Build your systems with tolerance for some degree of message reordering, maybe even to tolerate messages in any order. If you think of each message as an operand in algebra, you could say that systems need to be commutative.
Another design decision made in Rebus is that you should not handle (most) errors, you should let exceptions occur.
Rebus will log exceptions as they happen, and if they keep happening when processing a specific message, that message will be moved to a “dead-letter queue” (also known as an “error queue”, and messages that cannot be consumed because of errors are often called “poison messages”) – this way, the message does not clog up the pipes, while it remains safely stored in a queue somewhere.
When you want to configure Rebus in an application, the general pattern looks like this (assuming Microsoft’s generic host):
services.AddRebus(
    configure => configure
        .Transport(t => t.Use(...))
        .(...)
);
or something like this (anywhere else):
Configure.With(...)
    .Transport(t => t.Use(...))
    .(...)
    .Start();
Configuring Rebus with a transport (i.e. a queuing system) is the minimum amount of configuration you can get away with.
Of course the two previous code samples do not compile, but they show the necessary parts. If you’re using Azure Service Bus in a console application, a fully functional configuration could look like this (only omitting the actual connection string):
using var activator = new BuiltinHandlerActivator();

Configure.With(activator)
    .Transport(t => t.UseAzureServiceBus("Endpoint=sb://(...)", "your-queue"))
    .Start();

Console.WriteLine("Press ENTER to quit");
Console.ReadLine();
which will start a Rebus instance that receives messages from the your-queue Azure Service Bus queue. It will automatically pick some sensible defaults, like e.g. using JSON message serialization, using one single thread to process incoming messages, etc.
If you were using Microsoft’s generic host and therefore would use Microsoft DI as your application container, your configuration could look like this:
services.AddRebus(
    configure => configure
        .Transport(t => t.UseAzureServiceBus("Endpoint=sb://(...)", "your-queue"))
);
Rebus’ configuration mechanism relies on C#’s extension methods a lot, so whenever you
install-package <some-kind-of-rebus-package>
chances are (at least if the package adheres to the usual patterns found within most Rebus packages) there will be imported extension methods into the Rebus.Config namespace. This way, if you’ve added
using Rebus.Config;
at the top of your code file, the imported extension methods automatically become available. All of the official Rebus packages adhere to this pattern.
Rebus can run in two modes
The configuration shown above uses the transport configuration extension
t.UseAzureServiceBus(connectionString, "your-queue")
which implies that there’s an input queue called your-queue, hence it is capable of receiving messages. To configure a one-way client with Azure Service Bus, I would use the
t.UseAzureServiceBusAsOneWayClient(connectionString)
extension instead. Most transports, if not all, follow this pattern: Use(...) for the normal mode, and Use(...)AsOneWayClient for the one-way client mode.
To show a more realistic example, one could imagine a Rebus endpoint hosted with Microsoft’s generic host in a web app, using RabbitMQ as the transport, Microsoft DI to instantiate message handlers, enabling compression, using SQL Server to store sagas and timeouts – it could look like this:
var builder = WebApplication.CreateBuilder(args);

// add other things, e.g.
builder.Services.AddRazorPages();

// add Rebus
var cfg = builder.Configuration;
var rabbit = cfg.GetConnectionString("RabbitMq");
var mssql = cfg.GetConnectionString("SqlServer");

builder.Services.AddRebus(
    configure => configure
        .Transport(t => t.UseRabbitMq(rabbit, "your-queue"))
        .Sagas(s => s.StoreInSqlServer(mssql, "Sagas", "SagaIndex"))
        .Timeouts(t => t.StoreInSqlServer(mssql, "Timeouts"))
        .Options(o => {
            o.EnableCompression(bodySizeThresholdBytes: 32768);
            o.SetNumberOfWorkers(numberOfWorkers: 4);
            o.SetMaxParallelism(maxParallelism: 25);
        })
);

builder.Services.AddRebusHandler<MyMessageHandler>();
While the example in the previous section might be fairly realistic in what it configures, you will probably want to pull out various things into the application’s configuration file.
In doing this, the configuration can quickly become hard to keep an overview of, because so many things end up being configured.
var builder = WebApplication.CreateBuilder(args);

// add other things, e.g.
builder.Services.AddRazorPages();

// add Rebus
var cfg = builder.Configuration;
var rabbit = cfg.GetConnectionString("RabbitMq");
var mssql = cfg.GetConnectionString("SqlServer");
var settings = cfg.GetSection("AppSettings");
var inputQueueName = settings.GetValue<string>("RebusQueueName");
var gzipThreshold = settings.GetValue<int>("GzipThresholdBytes");
var numWorkers = settings.GetValue<int>("RebusNumberOfWorkers");
var maxParallelism = settings.GetValue<int>("RebusMaxParallelism");
var timeoutsTableName = settings.GetValue<string>("RebusTimeoutsTableName");
var sagasTable = settings.GetValue<string>("RebusSagasTableName");
var indexTable = settings.GetValue<string>("RebusSagaIndexTableName");

builder.Services.AddRebus(
    configure => configure
        .Transport(t => t.UseRabbitMq(rabbit, inputQueueName))
        .Sagas(s => s.StoreInSqlServer(mssql, sagasTable, indexTable))
        .Timeouts(t => t.StoreInSqlServer(mssql, timeoutsTableName))
        .Options(o => {
            o.EnableCompression(bodySizeThresholdBytes: gzipThreshold);
            o.SetNumberOfWorkers(numberOfWorkers: numWorkers);
            o.SetMaxParallelism(maxParallelism: maxParallelism);
        })
);

builder.Services.AddRebusHandler<MyMessageHandler>();
It’s pretty simple: Move most of the Rebus configuration bits out into extension methods, which you then share between your applications using a NuGet package.
In countless projects, I’ve had extension methods in place that would allow me to do this:
var services = builder.Services;
var configuration = builder.Configuration;

services.AddRebus(
    configure => configure
        .AsServer("my-queue-name", configuration)
);
where AsServer could then look like this (pretending to be using RabbitMQ + SQL Server as in the previous example):
public static class ServiceCollectionRebusExtensions
{
    public static RebusConfigurer AsServer(this RebusConfigurer configurer,
        string queueName, IConfiguration configuration)
    {
        // connection strings and settings are read from the application's configuration
        var rabbit = configuration.GetConnectionString("RabbitMq");
        var mssql = configuration.GetConnectionString("SqlServer");
        var settings = configuration.GetSection("AppSettings");
        var gzipThreshold = settings.GetValue<int>("GzipThresholdBytes");
        var numWorkers = settings.GetValue<int>("RebusNumberOfWorkers");
        var maxParallelism = settings.GetValue<int>("RebusMaxParallelism");
        var timeoutsTableName = settings["RebusTimeoutsTableName"];
        var sagasTable = settings["RebusSagasTableName"];
        var indexTable = settings["RebusSagaIndexTableName"];

        return configurer
            .Transport(t => t.UseRabbitMq(rabbit, queueName))
            .Sagas(s => s.StoreInSqlServer(mssql, sagasTable, indexTable))
            .Timeouts(t => t.StoreInSqlServer(mssql, timeoutsTableName))
            .Options(o => {
                o.EnableCompression(bodySizeThresholdBytes: gzipThreshold);
                o.SetNumberOfWorkers(numberOfWorkers: numWorkers);
                o.SetMaxParallelism(maxParallelism: maxParallelism);
            });
    }
}
Of course this extension method might look a little bit overwhelming, but it’s neat in the sense that it helps with keeping Rebus configurations consistent across projects, and it hides all of the nitty-gritty parts of applying configuration.
At the time of writing this (it’s 2022 🙂), I would say the recommended IoC/DI container is Microsoft’s own, aptly named “Service Provider” (or “Microsoft.Extensions.DependencyInjection”), as it has quickly become very widespread. You will automatically be using it if you’re starting out with any of the new project templates for .NET Core/.NET for either web apps, daemon apps, or console jobs, because Microsoft’s generic host uses it internally. Also, it’s a pretty neat container that can do most of what even seasoned DI container users want.
The code samples shown so far with
services.AddRebus(
    configure => configure
        .Transport(t => t.(...))
);
have been based on the Rebus.ServiceProvider NuGet package, which is Rebus’ integration with Microsoft’s DI container. In the following sections a few alternative containers will be presented.
Using Rebus together with Autofac is as simple as installing the Rebus.Autofac NuGet package, and then you can configure Rebus by going
builder.RegisterRebus(
    configure => configure
        .Transport(t => t.(...))
);
on the container builder. If you need to resolve something from the container to configure the bus, there’s an overload that passes IComponentContext to the configuration function, so you can do something like this:
builder.RegisterRebus(
    (configure, context) => configure
        .Logging(l => l.Serilog(context.Resolve<Serilog.ILogger>()))
        .Transport(t => {
            var settings = context.Resolve<IBusSettings>();
            t.UseAzureServiceBus(settings.ConnectionString, settings.InputQueueName);
        })
);
Please note that Rebus will start itself (i.e. it will begin consuming messages) when the container is built, so
var container = builder.Build();
is where the action begins. If you want to delay starting the bus to some other time, you can set startAutomatically: false when registering it
builder.RegisterRebus(
    configure => configure
        .Transport(t => t.(...)),
    startAutomatically: false
);
and then you can build the container
var container = builder.Build();
without starting the bus. The bus will be created and everything, it will just have 0 workers active. When you then think it’s time to get started, you can call the
container.StartBus();
extension method.
Using Rebus together with SimpleInjector is as simple as importing the Rebus.SimpleInjector NuGet package, and then you can configure Rebus by adding code like
container.RegisterRebus(
    configure => configure
        .Transport(t => t.(...))
);
to your application. When using SimpleInjector, it’s customary to call
container.Verify();
at some point after having made all of the necessary registrations, but before the application starts resolving anything from it. This will make SimpleInjector iterate over all of its registrations and resolve them to see if they can in fact be resolved.
By default, Rebus will start itself at this point! If this is not what you want, set startAutomatically: false when registering the bus:
container.RegisterRebus(
    configure => configure
        .Transport(t => t.(...)),
    startAutomatically: false
);
and then call the
container.StartBus();
extension method when the app is ready to start receiving messages.
While I don’t use it for new projects anymore, this IoC container is near and dear to my heart, because it’s the first one I was introduced to, and the first one I learned to use.
In contrast to other more modern containers, Castle Windsor does not have a clear separation between the registration phase and the resolution phase, a separation which seems to have become best practice for most sensible containers.
This also means that the container provides no obvious time for Rebus to start itself, and so you must take care of that yourself. You can use Castle Windsor together with Rebus by importing the Rebus.Castle.Windsor NuGet package, and then you configure it like this:
// container is IWindsorContainer
Configure.With(new CastleWindsorContainerAdapter(container))
    .Transport(t => t.(...))
    .Start();
which will configure Rebus and start it immediately! If this is not what you want, you can delay starting the bus to some other time by calling Create at the end like this:
// container is IWindsorContainer
var starter = Configure.With(new CastleWindsorContainerAdapter(container))
    .Transport(t => t.(...))
    .Create();
// starter is IBusStarter
and then call
starter.Start();
when it suits you.
The Rebus.Castle.Windsor NuGet package can also help you register your Rebus handlers, e.g. with the RegisterHandler<> method:
container.RegisterHandler<SomeMessageHandler>();
which will register SomeMessageHandler in the correct way. A couple of assembly-scanning autoregistration helpers are also available, so e.g.
container.AutoRegisterHandlersFromAssemblyOf<SomeMessageHandler>();
will scan the entire assembly that SomeMessageHandler resides in, registering all of the found Rebus handlers in the container.
If you make use of any of the assembly-scanning autoregistration methods, please back your code up by automated tests that verify that all of the expected message handlers have been found and registered as expected.
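One way such a test could get started is by independently finding the handler types that ought to be registered. The sketch below does that with plain reflection. To keep it self-contained it declares its own IHandleMessages<T> stand-in with the same shape as the Rebus interface; HandlerScanner and the example handlers are hypothetical names, and comparing the result with what your container actually contains is left out, because that part depends on the container.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;

// Stand-in with the same shape as Rebus' handler interface, so the sketch compiles on its own.
public interface IHandleMessages<in TMessage>
{
    Task Handle(TMessage message);
}

public static class HandlerScanner
{
    // Finds all concrete classes in the assembly that implement IHandleMessages<> for some message type.
    public static List<Type> FindHandlerTypes(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .Where(t => t.GetInterfaces().Any(i =>
                i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandleMessages<>)))
            .OrderBy(t => t.FullName)
            .ToList();
}

// Hypothetical messages and handlers that the scan is expected to find.
public class OrderPlaced { }
public class OrderCancelled { }

public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    public Task Handle(OrderPlaced message) => Task.CompletedTask;
}

public class OrderCancelledHandler : IHandleMessages<OrderCancelled>
{
    public Task Handle(OrderCancelled message) => Task.CompletedTask;
}

// Not a handler, so the scan should leave it out.
public class NotAHandler { }
```

A test could then assert that every type returned by HandlerScanner.FindHandlerTypes(typeof(OrderPlacedHandler).Assembly) can also be resolved from the container.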
Regardless of which IoC container integration package you are using, the package will make the following registrations in the container:
IBus: Rebus’ main interface which you’ll resolve to send/publish messages, etc.
ISyncBus: Rebus’ synchronous API – holds synchronous versions of all IBus methods, i.e. instead of returning Task they’re just void methods
IMessageContext: Rebus’ message context – can ONLY be resolved when inside a Rebus message handler
Rebus (actually: the RebusBus instance, which is Rebus’ implementation of IBus) is meant to be a singleton, which is created once and then disposed when your application shuts down. It’s important that it’s disposed properly, because it’ll allow for all currently active message handlers to finish what they’re doing before shutting down (of course without accepting any new messages in…)
All of the available IoC container integration packages help with enforcing this, as the registered IBus implementation will be configured to be a singleton. Also, if the container provides any way of doing this, it will also automatically dispose the bus properly, when the container is disposed.
In this chapter I’ll go through a couple of concepts and explain what they are, and then for each concept I will show how it’s represented in Rebus.
What is a “message”?
It’s pretty simple, really: A message is a discrete little nugget of information that lends itself well to being transported from a sender to a recipient.
The sender and the recipient then agree, somehow, on some kind of protocol around this message – usually, this will include some kind of agreement that:
In order to implement this in a way that is practical when machines are also involved, you will almost always come across messages in the form of headers and a body, where the headers are stored in a general format (thus allowing them to be read by anyone), and the body is just raw data encoded in a format that is then specified by one of the headers.
The thing that differentiates headers from the body of a message, is that the headers can be read and interpreted by anyone, because they’re represented in an already agreed-upon format, which is usually also supported by the queuing system.
Headers are often as simple as a list of key-value pairs.
Typical headers could be information like:
but also more esoteric user-provided information like:
or whatever the developers feel like.
Leveraging the headers of messages flowing through systems can be a powerful way of extending the system with new cross-cutting features, but since it’s possible to mess up things in hard-to-debug ways, it is generally advised to leave the headers alone. ☠️
All of the normal Rebus header keys can be found as constants on the Headers class, e.g. like Headers.MessageId and Headers.ContentType.
Here’s an example of some headers found on a typical Rebus message along with an explanation of each:
rbs2-msg-id: 29f9b537-1bf2-42e9-9049-86b3f38836b0
The message ID is automatically provided by Rebus whenever you send or publish a message. It defaults to Guid.NewGuid().ToString(), which means that you are sure to get a globally unique ID for every single message.
If you want, you can override this header, by setting it to something yourself – but please be careful that you understand how Rebus tracks failed messages (and how Fleet Manager correlates activity for each message, if you’re using Fleet Manager!), before you get too creative with customized message IDs. 👻
When you publish an event, and the event is delivered to multiple subscribers (either because the transport supports multicast and delivers a copy to each, or because Rebus does the hard work of delivering each copy) the ID will be the same for all copies of that message.
rbs2-intent: p2p
The “intent” is an indication of why the message was sent.
Possible values (automatically provided by Rebus) are p2p (“point-to-point”, meaning that the message was SENT to one specific recipient) and pub (“publish”, meaning that the message was published).
This information is not used by Rebus, but it can be useful in debugging situations to understand the origin of a consumed message.
rbs2-return-address: payment-processor-backend@PRODSRV003
This header is the “return address” for a message, which will be the queue that Rebus replies back to if the handler calls await bus.Reply(...).
The value defaults to the input queue of the sender, but it’s possible (and entirely legal) to set the return address to another endpoint if needed.
The message body then contains the actual contents of the message.
It is usually necessary to decode the headers and use information about the message type, encoding, etc. to be able to correctly decode the body. This way, it’s possible to encode messages differently (e.g. use different serializers, GZip some of the messages, maybe even encrypt them etc.), and then still be able to decode the message as a recipient.
By default, Rebus will serialize messages as UTF8-encoded JSON and set their rbs2-content-type header to application/json;charset=utf-8 (which is the official MIME-standardized way of expressing that).
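The interplay between headers and body can be sketched in a few lines of plain C#. This is not how Rebus implements it: WireMessage, WireFormat, the header keys and the OrderId property on the ProcessOrder record are all assumptions made up for this example. What it shows is the general idea that the recipient reads a header first, and only then knows how to decode the body.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical wire format: string headers that anyone can read, plus an opaque body.
public record WireMessage(Dictionary<string, string> Headers, byte[] Body);

// Hypothetical command carried in the body.
public record ProcessOrder(string OrderId);

public static class WireFormat
{
    // Header keys and values chosen for this sketch only.
    public const string ContentTypeHeader = "content-type";
    public const string JsonUtf8 = "application/json;charset=utf-8";

    public static WireMessage Encode(ProcessOrder message)
    {
        var headers = new Dictionary<string, string>
        {
            ["message-id"] = Guid.NewGuid().ToString(),
            [ContentTypeHeader] = JsonUtf8,
        };
        var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
        return new WireMessage(headers, body);
    }

    public static ProcessOrder Decode(WireMessage wireMessage)
    {
        // The header tells us how the body was encoded; without it the body is just bytes.
        if (!wireMessage.Headers.TryGetValue(ContentTypeHeader, out var contentType) || contentType != JsonUtf8)
        {
            throw new FormatException($"Cannot decode body with content type '{contentType}'");
        }

        var json = Encoding.UTF8.GetString(wireMessage.Body);
        return JsonSerializer.Deserialize<ProcessOrder>(json)
            ?? throw new FormatException("Body did not contain a message");
    }
}
```

Supporting another serializer, compression or encryption would then be a matter of adding more header values that Decode knows how to interpret.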
When working with messaging systems, the most pervasive concept is that of a “queue”. You probably already have a pretty good grasp of the functionality that a queue can provide, but we’ll highlight the important properties here anyway, because they are key to building robust software.
When we are talking about queues in this document, we mean queues that are ASYNCHRONOUS and DURABLE. “ASYNCHRONOUS” in the sense that they do not wait for anything else when accepting a sent message, and “DURABLE” in the sense that when they accept a sent message (and program execution is returned back to you, as a message sender) then the message is persisted somewhere (most likely on disk in a database file somewhere, or by virtue of having been replicated a number of times to multiple nodes in a cluster).
These two properties lay the foundation for the most important guarantee that the queuing system provides:
👉 Messages – once sent – reach their destination 👈
This is really important, because it makes it possible for us to build programs that communicate with other programs, possibly running in another process/on another machine/in another cloud, and expect their correspondence to remain unbroken, even when network connections come and go, and sometimes something explodes somewhere, but you have a backup, and your programs just resume where they left off when you reboot the machines or restore the backup. Cool!
Most queues are FIFO (First-In-First-Out) by nature, i.e. messages end up being received in the order they were put in. While this property can be counted on as a loose property of the queue, it should NEVER be counted on in a strict sense, as many things can lead to messages being received and processed out-of-order.
When you’re working with Rebus, you will come across the concept of an “input queue”. There is nothing special about that queue, it’s just what we call the queue that a bus instance will receive messages from.
A Rebus instance always has at most 1 input queue. If it doesn’t have an input queue, then it’s a “one-way client”, which is a bus instance that can only be used to send and publish messages.
When a Rebus instance is started, it will check that its own input queue exists and create it if it doesn’t. It will then start consuming messages from it. It will go through ALL messages in the queue and try to handle them as best as it can.
A common misconception is that Rebus can “filter” messages somehow, i.e. that it will only consume messages which it can handle. There’s no way (at least not generally with queuing systems) to skip receiving certain messages, and so Rebus will receive each and every message in the queue.
Once received, it will try to process the message. Rebus will not know beforehand whether it can
so, as you can see, receiving a message means that Rebus must also try to handle it.
Another queue with significance (but without any special properties – it’s just a normal queue), is the “error queue” – or the “dead-letter queue”, which is probably a more common term for it. For legacy reasons, Rebus calls it the “error queue”, and by default it will automatically create it in the form of a queue named “error”.
When Rebus, for whatever reason, cannot handle a message it has received, it will move the message to the error queue to avoid having the message “clog the pipes”, so to speak. Rebus will default to make 5 attempts at trying to handle the message before moving it to the error queue, so many transient errors can be overcome by this little bit of extra persistence.
The attempts will happen very quickly though, as quickly as it can, 5 times in a row, and then – the 6th time it sees the same message – Rebus will attach a header to it containing all of the caught exception details, and move the message to the error queue, this way making it possible to later diagnose what went wrong with handling this particular message.
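The retry-then-dead-letter behaviour can be illustrated with a small simulation in plain C#. It is a simplification and not Rebus’ implementation: a simple loop stands in for the repeated deliveries from the queue, a list stands in for the error queue, and QueuedMessage, RetryingReceiver and the error-details header key are names invented for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shape: headers plus a body.
public record QueuedMessage(Dictionary<string, string> Headers, string Body);

public class RetryingReceiver
{
    const int MaxDeliveryAttempts = 5;

    // Stand-in for the error queue.
    public List<QueuedMessage> ErrorQueue { get; } = new List<QueuedMessage>();

    // Tries the handler up to 5 times in quick succession. Returns true if the message was handled.
    public bool Receive(QueuedMessage message, Action<QueuedMessage> handler)
    {
        var exceptions = new List<Exception>();

        for (var attempt = 1; attempt <= MaxDeliveryAttempts; attempt++)
        {
            try
            {
                handler(message);
                return true;
            }
            catch (Exception exception)
            {
                // remember what went wrong and try again right away
                exceptions.Add(exception);
            }
        }

        // Too many failures: attach the exception details and park the message in the error queue.
        message.Headers["error-details"] = string.Join(Environment.NewLine, exceptions);
        ErrorQueue.Add(message);
        return false;
    }
}
```

A handler that fails twice and then succeeds never reaches the error queue, while one that always throws ends up there after 5 attempts with the exception details attached.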
Queues are the basic vehicle of delivery for messages. Some queuing systems make it possible to receive messages from other “things”, e.g. from “topics” or from “subscriptions” (looking at you, Azure Service Bus!), but that’s kind of wonky, and Rebus will not help you do that.
In the case of Azure Service Bus, Rebus will treat topics as topics (i.e. as “named hubs of interest”) and subscriptions as exactly that (which is also what RabbitMQ calls “binding”), which in turn means that a subscription BINDS a topic to a queue.
In other words, with a suitable subscription in place, a queue can get to receive copies of messages published to a topic.
We’ll talk some more about that in the chapter about routing!
One of the bigger questions when working with messaging is this: Where do the messages go?
To answer that, we should talk about message types. And not just “message types” as in “which .NET type does this particular data layout correspond to”, more like ARCHEtypes: Overarching categories that all message types can be divided into. From now on, we’ll call them archetypes, just to be clear about it.
In the literature, different archetypes are described, where the most prominent examples are command messages, event messages, and document messages.
These message archetypes are usually used in different places in the code, and for different purposes. In the following section they will be described further.
Let’s start at the end: Document messages are data carriers, i.e. messages that simply transfer data. The idea behind a document message is not that the sender has any special agenda with it, it’s just that the sender wishes to make the data available, and so the data is transferred in one or more messages. It is then up to the receiver to decide what to do about it.
In a way, document messages could be considered orthogonal to the command/event distinction, as commands could carry a lot of data, and so can events, so we’ll talk a little bit about them again in a bit.
Command messages are messages that tell someone to do something. Therefore, they will most often be used e.g. when someone needs someone else’s help with achieving a task.
They are usually named in the imperative form, e.g. ProcessOrder, FulfillPayment, MoveCaseToArchive, etc.
Generally, command messages will naturally be sent to one single – and only 1 – recipient, as that’s the one that provides the necessary service. E.g. if the command is ProcessOrder, the recipient could be the “Order Processor Service”, which has the “order_processor” input queue.
Rebus has an operation that is meant for routing commands: SEND. In code it looks like this:
await bus.Send(new ProcessOrder(...));
As you can see, there’s no destination specified in the code above, so Rebus needs to be told BEFOREHAND where to send messages of type ProcessOrder – that’s what “endpoint mappings” are for.
Here’s how it’s done:
services.AddRebus(
    configure => configure
        .(...)
        .Routing(t => t.TypeBased()
            .Map<ProcessOrder>("order_processor"))
);
With this endpoint mapping (ProcessOrder => order_processor) in place, the bus now knows which queue should be the destination when you send ProcessOrder commands.
In some situations, it’s beneficial to cut the processing of something up into smaller increments each represented by a command for internal use in a service – for that purpose, Rebus has a special SEND LOCAL operation, which always sends to the sender’s own input queue:
await bus.SendLocal(new ProcessOrder(...));
Since the destination is always the sender’s own input queue, no endpoint mappings are necessary to do this.
In other situations, it can be nice to be able to specify the destination more dynamically – that’s what the routing API is for! With it, you can specify the destination queue explicitly for each send operation:
var routing = bus.Advanced.Routing;
await routing.Send("alternative_order_processor", new ProcessOrder(...));
(here sending the ProcessOrder command to the “alternative_order_processor” queue).
Event messages are messages that tell the recipient that something has happened. Therefore, they will most often be sent when something did in fact happen, and this knowledge should be broadcast to the world.
They are usually named in the past tense, e.g. OrderProcessed, PaymentFulfilled, CaseMovedToArchive, etc. As you can probably also see by now, it’s not uncommon that events follow commands, e.g. like
ProcessOrder to “order_processor”
OrderProcessed
thus “surrounding” the business operation that they both refer to.
Routing events is slightly different from commands, as commands are sent directly to a queue. Events are PUBLISHED, and then a copy of the event is distributed to the input queue of anyone who SUBSCRIBED. This way, the routing of events is conceptually slightly higher level than routing commands: There’s a mapping table involved, and copies are distributed to 0..n destination queues.
Here’s how it’s done with Rebus: Either use a transport that has native support for publish/subscribe (e.g. like Azure Service Bus or RabbitMQ), or configure a shared subscription storage somewhere (e.g. if you’re using AmazonSQS, MSMQ, Azure Storage Queues, etc.). Exactly how you configure a subscription storage is described in the next chapter.
To subscribe to a message, the subscriber needs to call
await bus.Subscribe<OrderProcessed>();
which is usually done whenever the application starts up.
Subscriptions are always persistent, so once this call has been made, the subscriber will receive all published OrderProcessed events in its input queue, for eternity, even when the subscriber is not running. It’s still good practice though to leave the call to await bus.Subscribe<OrderProcessed>() as part of the startup sequence of the subscriber, because this way it’s clear which events it has subscribed to.
When the Order Processor Service has then processed an order, it should
await bus.Publish(new OrderProcessed(...));
which will then have the bus distribute copies of the event to all subscribers.
That’s basically all there is to it. 🙂 Underneath it all, Rebus uses “topic-based routing” to map published events to subscribers. Internally, Rebus will deduce a TOPIC like
OrderProcessorService.Messages.OrderProcessed, OrderProcessorService.Messages
from the OrderProcessed event type mentioned above – it’s a string that defaults to the simple, assembly-qualified name of the .NET type (i.e. full type name, including namespace and assembly name, just stripped of version, culture, and public key token).
This string then identifies the topic, which can then be referenced by both the publisher and the subscriber, in turn allowing them to find each other without either one of them knowing that the other one exists.
This is called
👉 Loose coupling 👈
and it’s a great way for systems to communicate.
Some queuing systems (e.g. like Azure Service Bus and Rabbit) support topic-based routing out of the box. This way, you can provide your Rebus instance with a connection string for any of them and just let the transport handle the bindings between topics and queues.
Other, more crude queuing systems, like AmazonSQS and MSMQ, do NOT support topic-based routing!
But if you’re using Rebus, you’re in luck, because Rebus can “polyfill” this capability by configuring it with a subscription storage. Rebus will then use the subscription storage to map between topics and subscriber queues, thus enabling the exact same
// anywhere you like:
await bus.Subscribe<SomeEvent>();

// also anywhere you like:
await bus.Publish(new SomeEvent(...));
experience with these transports too. Btw. a “subscription storage” is just a central database, that all your Rebus instances can then use as a central registry for mappings from topics to subscriber queues. Configuring one can be as simple as importing the Rebus.SqlServer NuGet package and then calling StoreInSqlServer in the Subscriptions configurer like so:
services.AddRebus(
    configure => configure
        .Transport(t => t.UseAzureStorageQueues(...))
        .Subscriptions(s => s.StoreInSqlServer(..., isCentralized: true))
);
PLEASE NOTE the use of isCentralized: true in the configuration – it means that all Rebus instances can expect to reach the same SQL Server instance, thus enabling subscribers to register themselves directly in the database when they call
await bus.Subscribe<SomeEvent>();
// 👆 will insert row like 👇
// | TOPIC | QUEUE |
// | Messages.SomeEvent, Messages | subscriber_queue |
//
// in the subscription database...
This is by far the simplest way of doing it, because the database simply works as a central registry of subscribers. This also provides a place to go look, if you want to see who is currently subscribed to what.
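Conceptually, a centralized subscription storage is not much more than a lookup table from topics to subscriber queues. The following in-memory sketch illustrates that idea in plain C#; it is not the Rebus.SqlServer implementation, InMemorySubscriptionRegistry and the two event classes are hypothetical, and the topic format simply follows the description given earlier (full type name plus assembly name).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for a centralized subscription storage: topic -> subscriber queues.
public class InMemorySubscriptionRegistry
{
    readonly Dictionary<string, HashSet<string>> _subscribers = new Dictionary<string, HashSet<string>>();

    // Topic naming as described in the text: full type name plus assembly name.
    public static string TopicFor(Type eventType) =>
        $"{eventType.FullName}, {eventType.Assembly.GetName().Name}";

    public void Subscribe(Type eventType, string subscriberQueue)
    {
        var topic = TopicFor(eventType);

        if (!_subscribers.TryGetValue(topic, out var queues))
        {
            queues = new HashSet<string>();
            _subscribers[topic] = queues;
        }

        // subscribing the same queue twice has no further effect
        queues.Add(subscriberQueue);
    }

    // The queues that should each receive a copy of a published event (0..n of them).
    public IReadOnlyList<string> GetDestinations(Type eventType) =>
        _subscribers.TryGetValue(TopicFor(eventType), out var queues)
            ? queues.OrderBy(q => q).ToList()
            : new List<string>();
}

// Hypothetical event types used to try out the registry.
public class InvoiceSent { }
public class InvoicePaid { }
```

Publishing then amounts to looking up the destinations for the event type and sending one copy of the message to each queue, which may well be none at all.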
Another mode is the “fully decentralized mode”, where subscribers do NOT have access to a subscription storage. In this mode, the queuing system is the only thing that is shared between publishers and subscribers, and so a subscriber must register its subscription by sending a message to the publisher.
To be able to do that, the subscriber must know the input queue name of the publisher, which you can tell it with an endpoint mapping:
services.AddRebus(
    configure => configure
        .(...)
        .Routing(t => t.TypeBased()
            .Map<OrderProcessed>("order_processor"))
);
Subscribers equipped with the endpoint mapping shown above can then go call:
await bus.Subscribe<OrderProcessed>();
// but now this 👆 will send
// an internal SubscribeRequest to
// the publisher, asking it to sign
// up the subscriber
thus making it possible for publishers to keep their subscription storage private.
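If it helps, the decentralized flow can be simulated in a few lines of plain C#. This is a conceptual sketch only: the queues are in-memory collections, and the `SubscribeRequest` and `OrderProcessed` records are simplified shapes I made up for the example, not the actual types Rebus sends over the wire.

```csharp
using System;
using System.Collections.Generic;

// Simulated queues, keyed by queue name (illustration only)
var queues = new Dictionary<string, Queue<object>>
{
    ["order_processor"] = new(),   // the publisher's input queue
    ["subscriber_queue"] = new(),  // the subscriber's input queue
};

// The publisher's PRIVATE subscription storage: topic -> subscriber queues
var publisherSubscriptions = new Dictionary<string, List<string>>();

// 1. The subscriber uses its endpoint mapping to find the publisher's queue
//    and sends a subscribe request there
queues["order_processor"].Enqueue(new SubscribeRequest("OrderProcessed", "subscriber_queue"));

// 2. The publisher receives the request and records the subscription
while (queues["order_processor"].TryDequeue(out var message))
{
    if (message is SubscribeRequest request)
    {
        if (!publisherSubscriptions.TryGetValue(request.Topic, out var subscribers))
        {
            subscribers = new List<string>();
            publisherSubscriptions[request.Topic] = subscribers;
        }

        if (!subscribers.Contains(request.SubscriberQueue))
        {
            subscribers.Add(request.SubscriberQueue);
        }
    }
}

// 3. When publishing, the publisher consults only its own storage
foreach (var subscriberQueue in publisherSubscriptions["OrderProcessed"])
{
    queues[subscriberQueue].Enqueue(new OrderProcessed("order-123"));
}

var waiting = queues["subscriber_queue"].Count;
Console.WriteLine($"Messages waiting in subscriber_queue: {waiting}");

// Hypothetical message shapes, made up for this sketch
record SubscribeRequest(string Topic, string SubscriberQueue);
record OrderProcessed(string OrderId);
```

Note how the subscriber never touches `publisherSubscriptions` directly. The only thing the two parties share is the queuing system.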
Remember the distinction between “business” and “technical” level sagas.
Hybrid applications etc etc.
FakeBus to record bus events
SagaFixture to test your sagas
Integration testing Rebus endpoints using in-mem persistence and queues.
Implement routing of transport messages.
Something about implementing an alternative to “competing consumers” with Rebus.
Move arbitrarily large chunks of data with the data bus.
Using Rebus from deep within a call hierarchy e.g. in ASP.NET and WPF.
Hosting Rebus-based applications in various environments. Remember point about some kind of manager on the host that ensures that crashed applications be restarted.
From afar, queue-based brokers and log-based brokers may seem interchangeable, and in theory you could model one with the other – but they have a bunch of differences that would make it difficult and pretty wonky outside of an academic setting to try to switch from queues to logs or from logs to queues in most realistic scenarios!
The thing is, they have some very different properties, which end up being reflected in how you will model your messages and message correspondences.
Log-based brokers provide a strict guarantee of the ordering of messages, messages are not deleted as they are read, and consumers themselves keep track of which messages they have read, which they can easily do because messages are numbered sequentially. This means that consumers can be absolutely sure that they have never processed one particular message before, especially if they choose to commit their read positions transactionally along with whatever work they’re doing.
To distribute work among multiple processes, log-based brokers offer the concept of “consumer groups”, which means that the broker helps with distributing messages among multiple consumer instances. Since this introduces concurrency and non-determinism between individual consumer instances, it requires that the message producer provides each message with a “partition key”, which the broker then hashes and uses to assign the message to a “partition”, which in turn gets assigned/revoked to individual consumer instances as they come and go, thus guaranteeing that messages within each partition key are consumed by the same consumer instance. Therefore, consumers can rely on message ordering within each partition key.
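The partition assignment can be sketched like this. The `Partitioner` class is hypothetical, and I picked FNV-1a as the hash simply because it is short and stable across processes. Real brokers use their own hash functions, so treat this as an illustration of the principle and not as what any particular broker does.

```csharp
using System;
using System.Text;

const int partitionCount = 4;

foreach (var key in new[] { "customer-1", "customer-2", "customer-1" })
{
    // The same key always lands in the same partition
    Console.WriteLine($"{key} -> partition {Partitioner.GetPartition(key, partitionCount)}");
}

// Illustration only: not the algorithm of any specific broker
static class Partitioner
{
    // FNV-1a is stable across processes and machines
    // (string.GetHashCode() is not, so it would be a poor fit here)
    public static int GetPartition(string partitionKey, int partitionCount)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;

        var hash = offsetBasis;

        unchecked
        {
            foreach (var b in Encoding.UTF8.GetBytes(partitionKey))
            {
                hash ^= b;
                hash *= prime; // wraps around on overflow by design
            }
        }

        return (int)(hash % (uint)partitionCount);
    }
}
```

Because every message with the same partition key ends up in the same partition, and each partition is consumed by one consumer instance at a time, ordering is preserved per key even though the overall work is spread out.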
Because of the strict ordering, if a log-based consumer encounters an error (e.g. a NullReferenceException 😅), it has no other choice than to stop processing messages if ordering must be preserved. Of course it could write the poison message to an error topic, or it could simply skip processing the message, but that would ruin the ordering.
Queue-based brokers do NOT provide a strict guarantee of message ordering. Or, to be more precise: They often do, but you are advised to not actually count on it too much. The thing is, you’ll almost always want to have some kind of parallelism enabled, either within each consumer instance, or by deploying competing consumers, which causes non-deterministic ordering of messages, at least within messages that are close to each other in the queue.
…And then a message fails to be processed and gets moved to the dead-letter queue. 😨 Then the consumers happily continue processing all the other messages, and then after an hour or so, when you have fixed the bug in the consumer, you then move the message back into the consumer’s queue (the message’s “source queue”), which causes an hour-old message to be processed AFTER all the other messages.
Queue-based systems must be able to handle this!
In practice, if you want to be VERY strict about it, it can be quite difficult to guarantee across the board, and doing so would require sacrificing some of the nice things about queue-based messaging. So, in practice, you’ll usually be strict about it in a few places and then tolerate reordered messages to an acceptable degree in all the other places.
Which degree is then acceptable? Only you, The Modeler Of The System And Its Message Correspondences, can know that. 😉 Sometimes you can get away with structuring your domain model so that ordered message processing is not even required – this is the best thing to do, because then each message can be processed individually, and you don’t have to worry. Other tricks, like comparing state, timestamps, revision numbers, etc. can also be employed.
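As an example of the revision number trick, here is a small sketch of a handler that simply ignores messages that are older than what it has already seen. All names (`CustomerAddressChanged`, `CustomerAddressProjection`) are invented for the example, and the state is kept in memory where a real handler would use a database.

```csharp
using System;
using System.Collections.Generic;

var projection = new CustomerAddressProjection();

// Revision 2 arrives first, then a delayed revision 1 shows up
// (e.g. because it took a detour via the dead-letter queue)
projection.Handle(new CustomerAddressChanged("customer-1", Revision: 2, Address: "New Street 2"));
projection.Handle(new CustomerAddressChanged("customer-1", Revision: 1, Address: "Old Street 1"));

Console.WriteLine(projection.GetAddress("customer-1")); // prints "New Street 2"

// Hypothetical event carrying a revision number assigned by its producer
record CustomerAddressChanged(string CustomerId, int Revision, string Address);

class CustomerAddressProjection
{
    readonly Dictionary<string, (int Revision, string Address)> _state = new();

    public void Handle(CustomerAddressChanged message)
    {
        var alreadySeenNewer =
            _state.TryGetValue(message.CustomerId, out var current)
            && current.Revision >= message.Revision;

        // Stale or duplicate message: ignore it instead of overwriting newer state
        if (alreadySeenNewer) return;

        _state[message.CustomerId] = (message.Revision, message.Address);
    }

    public string GetAddress(string customerId) => _state[customerId].Address;
}
```

This only works when the newest message carries everything needed to replace the older state, which is one of those modeling decisions that is yours to make.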
Queue-based consumers will usually receive and handle one single message at a time, and then concurrency is used to increase throughput by increasing parallelism, but the individual consumer deals with a single message at a time.
This is neat, because the handling of the message can be treated as a single unit of work: Either it is successfully completed, or it fails!
It lends itself very well to the concepts available in C#, for example we can
which is neat when writing message handling code, as data (the message) and logic (the message handler) are clearly represented, can be easily understood, can be exercised in automated tests, etc.
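A rough sketch of what that can look like: the message is a plain record, the handler is a class with a single method, and the outcome of that method decides the fate of the message. To keep the example free of dependencies, the `IHandle<TMessage>` interface below is a stand-in I made up (Rebus has its own handler interface), and the message and handler names are invented too.

```csharp
using System;
using System.Threading.Tasks;

var welcomeHandler = new SendWelcomeEmailHandler();

try
{
    await welcomeHandler.Handle(new CustomerCreated("customer-1", "jane@example.com"));
    Console.WriteLine("Unit of work completed: the message can be ACKed");
}
catch (Exception exception)
{
    Console.WriteLine($"Unit of work failed: retry or dead-letter the message ({exception.Message})");
}

// Data: the message
record CustomerCreated(string CustomerId, string Email);

// Stand-in handler interface, made up for this sketch
interface IHandle<TMessage>
{
    Task Handle(TMessage message);
}

// Logic: the message handler
class SendWelcomeEmailHandler : IHandle<CustomerCreated>
{
    public Task Handle(CustomerCreated message)
    {
        if (string.IsNullOrWhiteSpace(message.Email))
        {
            throw new ArgumentException($"Customer {message.CustomerId} has no email address");
        }

        Console.WriteLine($"Sending welcome email to {message.Email}");
        return Task.CompletedTask;
    }
}
```

An automated test can then new up the handler, pass it a message, and assert on what happened, without any broker being involved.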
With log-based brokers, consumers will usually be able to receive messages in batches, which makes it possible to achieve much higher throughput than if messages were received individually – but then the consumer has a BATCH of messages to handle, e.g. 10, 100, 1000, or 100k, which then requires that the consumer makes up its mind about how to model its unit of work boundary.
Does it handle the messages sequentially, one at a time? This way, each message can succeed individually, and if the last one fails, at least the preceding messages were processed. This will most likely be slow though, because there’s no parallelism.
Does it handle the messages in parallel? This will mess up the ordering of the messages, so probably no.
Does it create one large unit of work for the entire batch (or for chunks of the batch)? This can be great, but then if the last message fails, the entire batch (or the chunk) could not be processed.
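The first option (sequential, one unit of work per message) could be sketched like this. The batch, the offsets, and the `Handle` function are all made up, and "committing" is just an assignment here, where a real consumer would store its read position somewhere durable.

```csharp
using System;
using System.Collections.Generic;

// A made-up batch of (offset, payload) pairs, as received from a log
var batch = new List<(long Offset, string Payload)>
{
    (10, "ok"), (11, "ok"), (12, "boom"), (13, "ok"),
};

long? lastCommittedOffset = null;

foreach (var (offset, payload) in batch)
{
    try
    {
        Handle(payload);

        // Each message is its own unit of work, so progress is kept
        lastCommittedOffset = offset;
    }
    catch (Exception exception)
    {
        // To preserve ordering, the consumer stops instead of skipping ahead
        Console.WriteLine($"Stopping at offset {offset}: {exception.Message}");
        break;
    }
}

Console.WriteLine($"Last committed offset: {lastCommittedOffset}");

// Hypothetical message handling logic
static void Handle(string payload)
{
    if (payload == "boom") throw new InvalidOperationException("could not handle message");
}
```

With the "one large unit of work" option, the commit would instead happen once after the loop, and a failure at offset 12 would mean that nothing from the batch had been committed.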
In practice, you’ll probably find that message handling code looks very different between queue-based consumers and log-based consumers. 🙂
When you’re deploying a new subscriber into a system with event-based messages, you almost always have to put some thought into how the subscriber should handle the fact that so much history has gone by without it receiving one single event.
In a queue-based environment, the subscriber has no way to get the history of events, so it may require that you seed its database (if it has one) with a one-off job or some artificially generated event messages.
With log-based brokers, depending on the configuration, topics may contain all of the history that a new subscriber needs, and so it can often be fairly easy, maybe even trivial, to introduce new subscribers.
…which leads me to:
This is probably more on the OPS side of the “Queues vs. logs”-discussion, but in these DevOps days we should also talk about this. 🙂 + it also has a connection to the subscriber bootstrapping problem.
With queue-based brokers you’ll probably have to think only a little bit about storage, maybe by deciding that “OK we’ll just say that the maximum size of each queue is 10 GB”, or something like that, and then that will be enough in practice to satisfy all of your storage needs. When messages are put into the queue they subtract a little bit from the queue’s available storage, and once a consumer takes a message out of the queue and ACKs having processed it, it adds a little bit back to the available storage for that queue again.
With log-based brokers you will have to decide how much history you allocate space for (probably per topic), e.g. you can decide to store 7 days of messages, or you can decide to store all messages since the beginning of time. To complement this, you can also decide on a “compaction strategy”, which basically consists of deciding on a key of the message within which the broker guarantees to always keep at least the last one – this way the broker can optimise storage use by removing old messages within a specific key value.
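Compaction can be illustrated with a tiny sketch that keeps only the latest entry per key. The `LogEntry` record and the `Compaction` class are invented for the example. Real brokers typically compact in the background and may keep more than the bare minimum at any given time, so this only shows the guarantee, not the mechanics.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var log = new List<LogEntry>
{
    new(0, "customer-1", "address A"),
    new(1, "customer-2", "address B"),
    new(2, "customer-1", "address C"), // supersedes offset 0
};

foreach (var entry in Compaction.Compact(log))
{
    Console.WriteLine($"{entry.Offset}: {entry.Key} = {entry.Value}");
}

// Hypothetical shape of a message stored in the log
record LogEntry(long Offset, string Key, string Value);

static class Compaction
{
    // Keeps the last entry for each key, preserving the original offsets and order
    public static List<LogEntry> Compact(IEnumerable<LogEntry> log) =>
        log.GroupBy(entry => entry.Key)
           .Select(group => group.OrderBy(entry => entry.Offset).Last())
           .OrderBy(entry => entry.Offset)
           .ToList();
}
```

A new subscriber reading the compacted log would see the current address of every customer, which is often all the "history" it needs.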
As you can probably see, you can remove the bootstrapping problem completely, or you can reduce it by providing some limited history of events to new subscribers. Or you can of course still have situations where the stored history is not enough, and you will need to seed the databases of subscribers manually somehow. YMMV 😉