Negotiation

All of the methods for handling versioning covered thus far have assumed single-direction traffic from a producer to N consumers. This is a good model for messaging systems, but it comes with some limitations, such as the need for every consumer to understand what the producer publishes.

In the weak-schema and basic versioning chapters, we looked at strategies for making changes to messages in a versioning-friendly way. In particular, with weak and hybrid schema, the focus was on making changes such that they were version-insensitive for consumers.

There are other models available that do not make this assumption. Some transports support bi-directional communication, which can be used to negotiate the format in which a message will arrive. The most common such transport in use is HTTP.

Atom

Atom feeds, though designed for blogs, are a common distribution method for event streams. The Atom protocol has been around for a long time and is supported by a huge number of things on the Internet; Chrome even has a custom Atom viewer built in. Overall, the protocol is relatively simple, but it can be a great tool when dealing with versioning.

When an Atom feed is requested, a document is returned containing the latest events, as well as rel links that allow paging through the feed.

"links": [
  {
    "uri": "http://127.0.0.1:2113/streams/newstream",
    "relation": "self"
  },
  {
    "uri": "http://127.0.0.1:2113/streams/newstream/head/backward/20",
    "relation": "first"
  },
  {
    "uri": "http://127.0.0.1:2113/streams/newstream/3/forward/20",
    "relation": "previous"
  },
  {
    "uri": "http://127.0.0.1:2113/streams/newstream/metadata",
    "relation": "metadata"
  }
]

Note: The JSON version of the feed is being used here, as it is easier on the eyes than the XML version.

These links are defined in RFC 5005 and allow for the paging of a larger feed via a set of smaller documents. As an example, to read the entire feed, start from the last link and follow the previous link until it disappears (at the head of the feed). This allows a subscriber to navigate through the overall feed.
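
To make the navigation concrete, here is a minimal sketch of a feed reader in C#, assuming the JSON link format shown above and using HttpClient and System.Text.Json; the class and method names are illustrative.

using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static class FeedReader
{
    // Walk the feed from the oldest page to the head by starting at the "last"
    // link and following "previous" links until none remains.
    public static async Task ReadEntireFeed(HttpClient client, string feedUri)
    {
        var page = await GetPage(client, feedUri);
        var current = FindLink(page, "last") ?? feedUri;

        while (current != null)
        {
            page = await GetPage(client, current);
            // ... process the entries on this page ...
            current = FindLink(page, "previous");
        }
    }

    static async Task<JsonElement> GetPage(HttpClient client, string uri)
    {
        var json = await client.GetStringAsync(uri);
        return JsonDocument.Parse(json).RootElement;
    }

    static string FindLink(JsonElement page, string relation)
    {
        foreach (var link in page.GetProperty("links").EnumerateArray())
            if (link.GetProperty("relation").GetString() == relation)
                return link.GetProperty("uri").GetString();
        return null;
    }
}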

Each page of the feed is then a document that contains links back to documents that describe the individual events.

{
  "title": "1@newstream",
  "id": "http://127.0.0.1:2113/streams/newstream/1",
  "updated": "2017-02-06T15:23:24.30135Z",
  "author": {
    "name": "EventStore"
  },
  "summary": "SomeEvent",
  "links": [
    {
      "uri": "http://127.0.0.1:2113/streams/newstream/1",
      "relation": "edit"
    },
    {
      "uri": "http://127.0.0.1:2113/streams/newstream/1",
      "relation": "alternate"
    }
  ]
},
{
  "title": "0@newstream",
  "id": "http://127.0.0.1:2113/streams/newstream/0",
  "updated": "2017-02-06T15:23:20.986492Z",
  "author": {
    "name": "EventStore"
  },
  "summary": "SomeEvent",
  "links": [
    {
      "uri": "http://127.0.0.1:2113/streams/newstream/0",
      "relation": "edit"
    },
    {
      "uri": "http://127.0.0.1:2113/streams/newstream/0",
      "relation": "alternate"
    }
  ]
}
The alternate rel link points back to an individual event. A client would then fetch http://127.0.0.1:2113/streams/newstream/0 and http://127.0.0.1:2113/streams/newstream/1 to retrieve the events that make up the feed. As new events arrive in the feed, new pages and URIs for events will be created. This allows a client to follow the feed and continually receive events as they occur in the system.

An interesting side benefit worth noting here is that the URIs and the pages are both immutable. As such, they can be infinitely cached, allowing systems to benefit from heavy use of off-the-shelf caching to enable better scalability and geographic distribution.

From a versioning perspective, the most important feature is that the client is requesting the event from the URI. As the client is making this request, it is also free to include an Accept header with it. This tells the backend which formats the client understands so it can provide the event in an appropriate one. Client A may specify that it understands vnd.company.com+event_v3 and Client B may specify that it understands vnd.company.com+event_v5. Each client could then receive the event in the format it understands.
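
As a rough sketch, and assuming the server is willing to render the event in the requested representation, the request from Client A might look like the following in C#. The exact media type string is illustrative; use whatever your producer actually publishes.

using System.Net.Http;
using System.Threading.Tasks;

public static class EventClient
{
    public static async Task<string> GetEventAsV3(HttpClient client)
    {
        var request = new HttpRequestMessage(HttpMethod.Get,
            "http://127.0.0.1:2113/streams/newstream/0");
        // Ask for the version of the event this client understands.
        request.Headers.Accept.ParseAdd("application/vnd.company.com.event_v3+json");

        var response = await client.SendAsync(request);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }
}

Client B would send the same request with its own media type; the backend decides how to satisfy each one.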

While HTTP in general and Atom in particular are not the only protocols to support such an interaction, they are, by far, the most popular. A similar protocol could, however, be implemented over nearly any transport, even message queues such as RabbitMQ.

In such a case, the key to the protocol is that the payload is not sent via the main feed; a URI or some other accessor is sent instead, following which the client determines how it would like to receive the payload. As part of this process, the client can determine which version of the payload it can understand and ensure it gets that one. This is implemented by sending a message over the transport such as:

MessageAvailable {
    id : '0dd5976d-ad99-4333-b4e1-6b3b87af2f8f',
    type : 'fooOccurred',
    uri : 'http://mycompany.com/messages/0dd5976d-ad99-4333-b4e1-6b3b87af2f8f'
}

Upon receipt, the client could then do an HTTP GET to the provided URI and content-type negotiate the message. Note that there is a unique URI for each message. Often, this is implemented by converting the message on request; however, it could also be that the producer has prepopulated a few versions into a key/value store and provides multiple URIs as rel links in the message to let the clients know where the messages are. For example:

<MessageAvailable id="0dd5976d-ad99-4333-b4e1-6b3b87af2f8f">
    <link rel="data" type="application/xml" href="http://kv/0dd5976d-ad99-4333-b4e1-6b3b87af2f8f/xml"/>
    <link rel="data" type="application/json" href="http://kv/0dd5976d-ad99-4333-b4e1-6b3b87af2f8f/json"/>
</MessageAvailable>

Custom media types can also be used.
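
On the consumer side, this variant amounts to picking the link whose media type the consumer understands and fetching the payload from it. The sketch below assumes the link layout of the XML example above; the types and supported media types are illustrative.

using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

// One rel link from the announcement: the media type it offers and where it lives.
public record MessageLink(string MediaType, string Href);

public static class PayloadFetcher
{
    // The formats this particular consumer is able to read.
    static readonly string[] Supported = { "application/json" };

    public static async Task<string> Fetch(HttpClient client, MessageLink[] links)
    {
        var match = links.FirstOrDefault(l => Supported.Contains(l.MediaType));
        if (match == null) return null; // no format this consumer understands
        return await client.GetStringAsync(match.Href);
    }
}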

An obvious issue that arises with this style of message interchange is the resulting flow of a message. An announcement message is sent over the queue or, as shown earlier, over the Atom feed. Upon receipt of the announcement, the client must then fetch the message in the format it prefers. While quite flexible, this method is less than optimal in terms of performance, as it requires a request per message.

Move Negotiation Forward

An astute reader will notice that, in most systems, a consumer that wants version 2 of a message will continue to want version 2 for a long period of time. Handling the content-type negotiation on a per-message basis is therefore wasteful, as the same version is requested every time.

Some message transports and middleware support the concept of moving content-type negotiation forward. A consumer registers the content types it is interested in ahead of time, and the publisher then pushes the events in one of the pre-determined formats, as opposed to sending an announcement and requiring the consumer to negotiate per message.

While many transports support this, it can also be implemented relatively easily on top of any existing transport, provided a control channel is available (assuming a unidirectional transport, such as a message queue). In such a case, the consumer sends all of the content types it is capable of receiving beforehand. The publisher then converts a given message to the best-matching content type on its end before sending it to the consumer.
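
A minimal sketch of the producer side of this handshake, assuming a registration message arrives over the control channel, might look like the following; the names and media types are illustrative.

using System.Linq;

// What a consumer announces over the control channel, once, ahead of time.
public record Subscription(string ConsumerId, string[] AcceptedTypes);

public static class Negotiator
{
    // Formats this producer can render, in order of preference (newest first).
    static readonly string[] Available =
    {
        "application/vnd.company.com.event_v5+json",
        "application/vnd.company.com.event_v3+json"
    };

    // Picked once per consumer; every subsequent push reuses the result.
    public static string BestMatch(Subscription subscription) =>
        Available.FirstOrDefault(t => subscription.AcceptedTypes.Contains(t));
}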

There are a few benefits here. Obviously, not needing to negotiate per message when changes only happen rarely, often on the order of months, has some performance advantages. Another less frequently noted benefit is that, if using type-based schema, only the translation code needs to be updated. Consumers can be updated as needed and can still get the old version of the message they understand. This is particularly valuable when there are a large number of consumers, often managed by a large number of teams. If a new field is to be added to an event, only the teams that need to use it have to support it.

In practice, for all content-type negotiation there is also often a deprecation period, during which very old versions of messages gradually cease to be supported.

Although it requires more work and is slightly less flexible than negotiation per request, moving negotiation forward can be useful in a large number of scenarios. The main circumstance in which you might consider using it is if you need to push more than roughly 100 messages per second to your consumers. When pushing larger quantities of messages, the ability to not negotiate on a per-message basis significantly improves performance.

How to Translate

For both per-message content-type negotiation and moving the negotiation forward, the producer will need to translate between versions of the message. The translation can happen using either type-based schema or weak schema.

The code will receive a message of version Vx. It will then need to translate that message to the version that the consumer wants. This process is very similar to the upcasting of events using type-based schema that was looked at in the basic versioning chapter.

InventoryItemDeactivated_v2 ConvertFrom(InventoryItemDeactivated_v1 e) {
    return new InventoryItemDeactivated_v2(e.Id, "Before we cared about reason.");
}

One major difference from the upcasting seen with type-based schema is that it is often also necessary to downcast. Given an event at version 12, it is common to need to convert it to, say, version 9 for a given consumer. To support this, two methods are needed per version pair: one to convert from v1 to v2, and one to convert from v2 back to v1.

InventoryItemDeactivated_v2 ConvertFrom(InventoryItemDeactivated_v1 e) {
    return new InventoryItemDeactivated_v2(e.Id, "Before we cared about reason.");
}

InventoryItemDeactivated_v1 ConvertFrom(InventoryItemDeactivated_v2 e) {
    return new InventoryItemDeactivated_v1(e.Id);
}

These methods then get registered, allowing the conversion from one version to any other. Occasionally, when there are many versions of events, it can be preferable to also allow skipping methods: for example, converting from v1 to v9 directly as opposed to chaining through every intermediate version.
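
One way to wire this up is a small registry keyed by version pair, which prefers a direct (skipping) conversion when one has been registered and otherwise chains the single-step converters. This is only a sketch; the registry and its types are illustrative.

using System;
using System.Collections.Generic;

public static class EventConverters
{
    // Key: (fromVersion, toVersion); value: a conversion over untyped events.
    static readonly Dictionary<(int From, int To), Func<object, object>> Converters = new();

    public static void Register(int from, int to, Func<object, object> convert) =>
        Converters[(from, to)] = convert;

    public static object Convert(object @event, int from, int to)
    {
        // Use a direct (skipping) conversion when one exists.
        if (Converters.TryGetValue((from, to), out var direct))
            return direct(@event);

        // Otherwise walk one version at a time towards the target,
        // upcasting or downcasting as needed.
        var step = from < to ? 1 : -1;
        var current = @event;
        for (var v = from; v != to; v += step)
            current = Converters[(v, v + step)](current);
        return current;
    }
}

The two ConvertFrom methods above would be registered as the (1, 2) and (2, 1) entries respectively.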

Weak schema follows a similar pattern, though the conversion can often be done in a single step rather than version by version, because weak schema already requires that changes adhere to a set of compatibility rules.

Overall, the conversion between versions of events is relatively straightforward code, though it can quickly become large with a large number of events and a large number of versions. If you find you are encountering a bit of a code explosion, it might be worth considering deprecating old versions of events.

Use with Structural Data

Thus far, the discussion has revolved around the versioning of messages. The same pattern, however, can be used for structural data. It is quite common in integration scenarios to have a large number of consumers that are interested in getting an updated structural view of data as it changes.

A perfect example of this came up a few years ago when I was looking at an integration scenario at a financial institution. The system in question exists in almost every financial organization and is known as “security master”. Security master holds all the metadata associated with CUSIPs (unique identifiers of securities). Almost everything at the institution needs access to this metadata. For each security, it contains information such as market cap, links to other securities, and whether or not it is tradeable. It is common for a financial institution to have hundreds or even thousands of integrations to security master.

Before discussing how a similar pattern to the message versioning was used here, it is worth discussing how the existing system was implemented. The company was largely focused on Oracle databases. The system for managing the data was a relatively simple CRUD application that updated the data in a few tables in the Oracle database. Consumers, for the most part, were interested in one of three views. They either wanted summary information, a detailed view, or raw data. The tables in the master system were then replicated to all other databases, with the master system being the single source. As such, these tables would live in the database of every consumer. If you wanted access to the information, you would open a ticket with IT and they would set up a new replication to your database, with the tables then updating accordingly.

While this replication scheme seems reasonable for a small-scale integration, remember there were more than 1000 integrations done this way! A new design was devised that operated very similarly to the message negotiation system above. Changes to security master were written as events. Consumers all received an Atom feed representing the changes that had occurred. As in the message system, these links pointed back to a small service that would handle the requests.

//TODO insert diagram

Rather than converting the messages to different views, this service would replay all events up to a given event. An example URI might be http://bank.com/securities/AAPL/5, in which AAPL is the security and 5 is the event up to which you’ve requested information. This code looked very similar to any other projection and would just return the state as of event 5.
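
The projection itself does nothing special; a sketch of the idea, with illustrative event and state types, might look like this.

using System.Collections.Generic;
using System.Linq;

// Illustrative events and read model; a real security master has far more fields.
public record MarketCapChanged(decimal Value);
public record TradeabilityChanged(bool Tradeable);
public record SecuritySummary(string Symbol, decimal? MarketCap, bool Tradeable);

public static class SecurityProjection
{
    // Replay the stream for a security up to and including event N,
    // returning the state as of that event.
    public static SecuritySummary Project(string symbol, IEnumerable<object> events, int upToEvent)
    {
        var state = new SecuritySummary(symbol, null, false);
        foreach (var e in events.Take(upToEvent + 1))
        {
            state = e switch
            {
                MarketCapChanged m => state with { MarketCap = m.Value },
                TradeabilityChanged t => state with { Tradeable = t.Tradeable },
                _ => state
            };
        }
        return state;
    }
}

The detailed and raw views are simply different projections over the same events, selected by the media type the consumer asks for.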

One added benefit of this URI scheme is that, given the immutability of URIs, AAPL at event 5 will always be the same, allowing for caching. Another is that consumers receive a state per change rather than only the latest value, so intermediate changes are not missed when several occur in quick succession.

It is important to note that the service responsible for generating the varying structural views of the state at a given point is at most a few hundred lines of code. It basically contains a few small in-memory projections that build those views and return them.

Upon receiving a new link via the Atom feed, consumers could then content-type negotiate what they wanted to receive. Most clients just needed summary information and could use vnd.bank.com.security_summary+json. Others who needed the detailed view could use vnd.bank.com.security_detail+json. On top of this, tickets were no longer required in order to get access to the information. The security metadata was not secured information, and so URIs containing the information could be published. You could get per security, per market, or all changes via Atom feeds. If you needed information on a new staging server, you could just point to the URI that matches what you need and you were off.

Where this method really shines, though, is when a change eventually happens. The United States government, for example, might decide to track new terrorism information. Very few systems will actually care about this, but some may. By introducing a new vnd.bank.com.security_detail_2+json, the systems that need access to this information can be updated while all of the other systems will continue working with the first version of the security detail without needing an update.

While this pattern is not directly related to versioning of Event Sourced systems, it is a useful application of the same pattern. It is also another style of integration for Event Sourced systems where structure is used instead of shipping events. It is a good tool to have in your toolbox.