Asynchronous denormalization and transactional messaging with MongoDB change streams
MongoDB provides a very useful set of abstractions for building a reliable service. In practice, I've found that few databases can rival its indexing, resiliency (automated failover, even between regions; sharding), and consistency (primary and secondary reads) characteristics, or the client library support for them. Of course, for all that, you give up flexible transaction boundaries (among other things). With a traditional RDBMS, you can describe the data you need to fit your business model, and worry about isolating writes among different tables later. With document stores like MongoDB, you must design your documents around sensible isolation boundaries and query patterns.
So, when joins are needed, you're typically advised to denormalize the joined data in order to keep queries to a single document. At the same time, MongoDB provides few facilities for denormalizing data!
Enter change streams: using change streams, we can effectively create eventually consistent materialized views which reliably denormalize data by listening for data changes and updating related documents as needed. Change streams also enable other asynchronous patterns like ordered, reliable, at-least-once event publishing, asynchronous callbacks, sagas, etc. They effectively allow us to build distributed transactions at the application layer. These have some of the characteristics of transactions (namely, they will eventually succeed), but we intentionally give up isolation for performance. That is, if our change listener is going to update a document B as a result of a write to document A, then queries may see B after A's write but before B's update.
This is easier said than done. Change streams provide the essential change data capture primitive to make this happen, but there is more work to be done to achieve the higher level abstractions described above. Fortunately, it's not too difficult if you know what to watch out for.
Denormalizing in MongoDB
Without change streams (or tailing the oplog), we can still try to denormalize.
The problem is race conditions. Let's say on write to a document A, we look up some information from document B.
| Time | Process | Operation | 
|---|---|---|
| t1 | p1 | Receives write of A. Looks up joined B data. Data not found. | 
| t2 | p2 | Writes related B data | 
| t3 | p1 | Writes to A without joined B data. Will not be consistent until another write! | 
This race occurs due to possible non-repeatable and phantom reads. If we try to also add a lookup of A while writing to B, we have a similar race: the related A document may not yet exist, or, if our update depends on the state of A, may be updated concurrently.
The only way you could recover in this scenario is by having a background "repair" process which periodically polls and fixes inconsistencies.
With change streams, we can do better. If we listen to both the writes of A and B, we will always eventually denormalize with consistent data.
| Time | Process | Operation | 
|---|---|---|
| t1 | p1 | Write to A | 
| t2 | pN | Receives A change. Looks up joined B data, not found. | 
| t3 | p2 | Write to joined B data | 
| t4 | pN | Receives B change. Looks up A, found. Updated. | 
The same holds when the writes arrive in the opposite order:
| Time | Process | Operation | 
|---|---|---|
| t1 | p2 | Write to joined B data | 
| t2 | pN | Receives B change. Looks up A, not found. | 
| t3 | p1 | Insert A | 
| t4 | pN | Receives A change. Looks up B, found. Updated. | 
You can still also do a lookup on write if you like, as an imperfect consistency improvement.
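To see why listening to both sides converges, here is a minimal in-memory sketch in plain Java. It is not MongoDB code: the maps stand in for collections A and B, the `onAChange` and `onBChange` methods stand in for change stream handlers, and every name in it (`DenormalizingListenerSketch`, `writeA`, `denormalizedName`, and so on) is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for two collections and their change handlers (hypothetical names). */
class DenormalizingListenerSketch {
  /** "Collection" B: id -> name. */
  private final Map<String, String> b = new HashMap<>();
  /** "Collection" A: id -> { referenced B id, denormalized B name (null until known) }. */
  private final Map<String, String[]> a = new HashMap<>();

  void writeA(String aId, String bId) {
    a.put(aId, new String[] {bId, null});
  }

  void writeB(String bId, String name) {
    b.put(bId, name);
  }

  /** Handler for a change to A: look up the joined B and copy its data if it exists. */
  void onAChange(String aId) {
    String[] doc = a.get(aId);
    if (doc != null && b.containsKey(doc[0])) {
      doc[1] = b.get(doc[0]);
    }
  }

  /** Handler for a change to B: update every A document that references it. */
  void onBChange(String bId) {
    String name = b.get(bId);
    for (String[] doc : a.values()) {
      if (doc[0].equals(bId)) {
        doc[1] = name;
      }
    }
  }

  /** Returns the B name currently denormalized into A, or null if none yet. */
  String denormalizedName(String aId) {
    String[] doc = a.get(aId);
    return doc == null ? null : doc[1];
  }
}
```

Replaying either table above against this sketch (write, then the matching handler, in the order shown) ends with A holding B's data, whichever write came first.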
Closing change stream gaps
Change streams can enable this, but there are a few problems for a production, scalable solution:
- There is no tracking of where you left off when your process restarts
- Each listening process will receive all changes, but we only want each to be acted on once, and in order
To work around problems with multiple listener processes, you might be able to isolate your listener (apart from, say, the rest of your service API), and enforce a single replica via a scheduler like Kubernetes. Personally, I prefer the cohesiveness of a single deployment. Additionally, if I deploy to multiple Kubernetes clusters for redundancy, then Kubernetes abstractions alone cannot enforce one listener replica globally across all clusters, never mind the case where I'm not using Kubernetes at all.
Instead, we put our listener behind a pessimistic lock. We can implement such a lock using MongoDB itself. Its findAndModify operation with upsert, majority write concern, and a unique index on an identifier of the locked resource will suffice.
Using the Java client, such an upsert looks something like:
    final BsonDocument found = collection
        .withWriteConcern(WriteConcern.MAJORITY)
        .findOneAndUpdate(
            and(
                eq("_id", resource),
                or(
                    // Lock is expired...
                    or(
                        eq("expiresAt", null),
                        not(exists("expiresAt")),
                        lte("expiresAt", clock.instant())),
                    // ...or we own the lock
                    eq("listenerId", listenerId))),
            combine(
                set("listenerId", listenerId),
                set("expiresAt", clock.instant().plus(leaseTime))),
            new FindOneAndUpdateOptions()
                .upsert(true));
This will extend our lock if we currently hold it, and attempt to acquire it otherwise. By using findAndModify, no two listenerIds can acquire the lock at the same time, because MongoDB applies the "find and modify" to a single document atomically, isolated from concurrent reads and writes. If the lock document does not yet exist, at most one listener will create it, because the implicit unique index on _id rejects every insert after the first. The same index is what turns away a listener when someone else holds an unexpired lock: the filter matches nothing, so the upsert attempts an insert, which fails with a duplicate key error.
So, before listening, try to lock some resource (say a collection, for simplicity). Then, listen. While listening, keep pinging the lock to extend the lease. If you lose the lock, stop listening. There's more to it than that, but that's it at a high level. We'll come back to this.
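To make the lease semantics concrete, here is a minimal in-memory sketch of the same acquire-or-refresh rule in plain Java. It is only an illustration under stated assumptions: `LeaseLockSketch` and its members are hypothetical names, `synchronized` stands in for MongoDB's atomic single-document update, and the current time is passed in explicitly so the behavior is easy to follow.

```java
import java.time.Duration;
import java.time.Instant;

/** In-memory stand-in for the lock document (hypothetical names, not the MongoDB API). */
class LeaseLockSketch {
  private final Duration leaseTime;
  private String ownerId;     // null until the lock is first acquired
  private Instant expiresAt;  // null until the lock is first acquired

  LeaseLockSketch(Duration leaseTime) {
    this.leaseTime = leaseTime;
  }

  /**
   * Acquires the lock if it is free or expired, or extends it if the caller already owns it.
   * Returns false when another listener holds an unexpired lease.
   */
  synchronized boolean acquireOrRefresh(String listenerId, Instant now) {
    // Same condition as the filter above: expiresAt is missing, or expiresAt <= now.
    boolean expired = expiresAt == null || !expiresAt.isAfter(now);
    boolean ours = listenerId.equals(ownerId);
    if (!expired && !ours) {
      return false;
    }
    ownerId = listenerId;
    expiresAt = now.plus(leaseTime);
    return true;
  }
}
```

A listener would call `acquireOrRefresh` once before listening and then periodically while listening, stopping as soon as it returns false.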
The next requirement is to track where we left off when our process inevitably restarts. Simple: after we've successfully processed a change, save the change's resume token back to our lock document. If this update fails, we must retry indefinitely or stop our listener; we must not progress to the next change until we've saved the resume token, which is what maintains ordered processing. Effectively this is a change "acknowledgement," analogous to the acknowledgements message brokers use to implement at-least-once delivery.
  public void commit(String resource, BsonDocument resumeToken) 
      throws LostLockException {
    UpdateResult result = collection
        // Again we need a majority write concern to protect our processed changes'
        // resume tokens from rollbacks.
        .withWriteConcern(WriteConcern.MAJORITY)
        .updateOne(
            and(
                eq("_id", resource),
                eq("listenerId", listenerId)),
            combine(
                set("expiresAt", clock.instant().plus(leaseTime)),
                set("resumeToken", resumeToken)));
    if (result.getMatchedCount() == 0) {
      throw new LostLockException();
    }
  }
Additionally, when initially acquiring the lock, read back the resume token to use when we start listening.
    final BsonDocument found = collection
        .withWriteConcern(WriteConcern.MAJORITY)
        .findOneAndUpdate(
            and(
                eq("_id", resource),
                or(
                    // Lock is expired...
                    or(
                        eq("expiresAt", null),
                        not(exists("expiresAt")),
                        lte("expiresAt", clock.instant())),
                    // ...or we own the lock
                    eq("listenerId", listenerId))),
            combine(
                set("listenerId", listenerId),
                set("expiresAt", clock.instant().plus(leaseTime))),
            new FindOneAndUpdateOptions()
                // Return the updated lock document, but only the field we need
                .projection(include("resumeToken"))
                .returnDocument(ReturnDocument.AFTER)
                .upsert(true));
We're almost good to go. The problem is that our lock is fragile: we may lose it at any time without knowing. A thread or its process may pause arbitrarily, at any time, due to a garbage collection pause, CPU steal, hypervisor migration, or any number of reasons beyond our control. Any synchronous remote call during change processing can also be delayed arbitrarily, during which the lock may expire. We can try to mitigate this by setting very long lease times relative to our periodic refresh, but then, when our process crashes, that whole lease must elapse before another listener can start doing useful work.
Detecting lost locks
Martin Kleppmann describes the solution well in his article about distributed locking. The solution boils down to augmenting our approach in two ways:
- Processing a change must participate in the lock
- We need a strictly monotonically increasing version number (or "fencing token") to use during processing
Let's break those down a little. As we identified already, a lease alone cannot guarantee that the work we do under the lock is always done while our thread still owns it. So it follows that the work itself (such as an update to denormalize some data) must unfortunately participate in the lock algorithm. This is where our monotonically increasing value comes in.
In our denormalization example, give each document a version number. This number starts at 0 and increases with every write to the document (you may already have such a version number to detect concurrent read-modify-write cycles on the same document). When we receive a change to document B, we update the related document A with the denormalized data and also the version of B used. Not only that, but when we do this update, we query not just on the identifiers for A but also on this version; for reference, we'll call it bVersion. That is, we only update a document that is both related to B and has a bVersion less than the B version we received in the change. If our lock has expired and, crucially, another thread has taken the lock and acted on this change, the stored bVersion will be equal to or greater than the version we are currently processing. Thus, we will not update A. In those cases where our update does not match a document, we know we have lost our lock and therefore stop our listener.
| Time | Process | Operation | 
|---|---|---|
| t1 | pN | Write to joined B data, version 2 (v2). | 
| t2 | p1 | Receives B change v2. Process pauses! | 
| t3 | - | p1's lock expires. | 
| t4 | p2 | Acquires lock. Begins listening where last acknowledged, receiving B change v2. | 
| t5 | p2 | Writes B v2 denormalized state to A. | 
| t6 | pN | Writes B v3. | 
| t7 | p2 | Receives B v3 change, writes denormalized state to A. This succeeds because version 3 is greater than version 2. | 
| t8 | p1 | Process unpauses. Attempts write of B v2 state to A. This fails because version 2 is not greater than version 3. Process stops listening. | 
As you can see, using the fencing token, even when our lock expires out from under us, we will still eventually detect it. In this way, our lock's expiration timestamp is really a simple optimization. It's not actually effective in enforcing the lock, but it is an easy way to avoid an excessive amount of contention. The enforcement of the lock ultimately comes down to an atomic single document update-if-current; essentially, the same paradigm as commonly used in optimistic locking and multi-version concurrency control (MVCC).
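The update-if-current rule can be sketched without a database. The following plain Java class is a hypothetical stand-in for document A (the names `FencedDocumentSketch` and `applyIfNewer` are mine, not from any library), with `synchronized` playing the role of MongoDB's atomic single-document update. It accepts a write only when the incoming B version is strictly greater than the one already applied.

```java
/** In-memory stand-in for document A, fenced by the version of B it was built from. */
class FencedDocumentSketch {
  private long bVersion = -1;  // -1 means no B state has been applied yet
  private String bState;

  /**
   * Applies B's state only if it is newer than what A already holds.
   * A false return corresponds to the update matching no document:
   * the caller should assume it lost the lock and stop listening.
   */
  synchronized boolean applyIfNewer(long version, String state) {
    if (version <= bVersion) {
      return false;
    }
    bVersion = version;
    bState = state;
    return true;
  }

  synchronized String bState() {
    return bState;
  }
}
```

Replaying the timeline above: p2 applies v2 and then v3, both accepted; when p1 wakes up and tries to apply v2, the call returns false and A keeps the v3 state.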
In the above example, our process is already acting on versioned state, so we can use that version. Absent that, we can generate an independent version: a version of the lock. When we acquire the lock, increment a version, and return that along with the last acknowledged resume token.
All together, this looks like:
  // Imports assumed by this listing (ImmutableMap comes from Guava)
  import static com.mongodb.ErrorCategory.DUPLICATE_KEY;
  import static com.mongodb.client.model.Filters.*;
  import static com.mongodb.client.model.Projections.include;
  import static com.mongodb.client.model.Updates.combine;
  import static com.mongodb.client.model.Updates.set;
  import static java.util.Arrays.asList;
  import static java.util.Collections.singletonList;
  import com.google.common.collect.ImmutableMap;
  import com.mongodb.ErrorCategory;
  import com.mongodb.MongoCommandException;
  import com.mongodb.WriteConcern;
  import com.mongodb.client.model.FindOneAndUpdateOptions;
  import com.mongodb.client.model.ReturnDocument;
  import com.mongodb.client.result.UpdateResult;
  import java.util.Optional;
  import org.bson.BsonDocument;
  import org.bson.Document;
  import org.bson.conversions.Bson;
  public Optional<ListenerLock> acquireOrRefreshFor(String resource) {
    try {
      final BsonDocument found = collection
          .withWriteConcern(WriteConcern.MAJORITY)
          .findOneAndUpdate(
              and(
                  eq("_id", resource),
                  or(lockIsExpired(), eq("listenerId", listenerId))),
              // This is an aggregation pipeline update (new in MongoDB 4.2), which lets us
              // increment the version only when the lock changes hands. A simple
              // always-incrementing counter works fine, too; it just gets redundantly
              // incremented on refreshes.
              singletonList(combine(
                  set("listenerId", listenerId),
                  set("version", sameIfRefreshOtherwiseIncrement()),
                  set("expiresAt", clock.instant().plus(leaseTime)))),
              new FindOneAndUpdateOptions()
                  .projection(include("resumeToken", "version"))
                  .returnDocument(ReturnDocument.AFTER)
                  .upsert(true));
      return Optional.ofNullable(found)
          .map(it -> new ListenerLock(
              it.getNumber("version"),
              it.getDocument("resumeToken", null)));
    } catch (MongoCommandException e) {
      final ErrorCategory errorCategory = ErrorCategory.fromErrorCode(e.getErrorCode());
      if (errorCategory.equals(DUPLICATE_KEY)) {
        return Optional.empty();
      }
      throw e;
    }
  }
  public void commit(String resource, BsonDocument resumeToken) 
      throws LostLockException {
    UpdateResult result = collection
        .withWriteConcern(WriteConcern.MAJORITY)
        .updateOne(
            and(
                eq("_id", resource),
                eq("listenerId", listenerId)),
            combine(
                set("expiresAt", clock.instant().plus(leaseTime)),
                set("resumeToken", resumeToken)));
    if (result.getMatchedCount() == 0) {
      throw new LostLockException();
    }
  }
  private Bson lockIsExpired() {
    return or(
        eq("expiresAt", null),
        not(exists("expiresAt")),
        lte("expiresAt", clock.instant())
    );
  }
  
  private Document sameIfRefreshOtherwiseIncrement() {
    return new Document("$cond", new Document(ImmutableMap.of(
        "if", new Document("$ne", asList("$listenerId", listenerId)),
        "then", new Document("$ifNull", asList(
            new Document("$add", asList("$version", 1)),
            0)),
        "else", "$version")));
  }
Finally, if your change processing includes other side effects, like an external API call, that call must also participate in the lock. As described above, the API must accept the fencing token, and use it while processing the request.
Bootstrapping the change listener
If you go to use this, you may quickly find there is actually a simple race we haven't solved for yet. It starts at the beginning... the very beginning.
| Time | Process | Operation | 
|---|---|---|
| t1 | pN | First write to a watched document | 
| t2 | pN | First change listener starts. No resume token saved previously, so none used. Write in t1 will be missed! | 
Starting the listener before the first write does not help if it crashes before acknowledging anything:
| Time | Process | Operation | 
|---|---|---|
| t1 | p1 | Change listener starts. No resume token saved previously, so none used. | 
| t2 | pN | First write to a watched document | 
| t3 | p1 | Change listener crashes before processing t2's change. | 
| t4 | pN | New change listener starts. No resume token saved previously, so none used. Write in t2 will be missed! | 
Without a resume token, our listener simply starts listening from now. However, nothing guarantees that a write didn't happen before this, and we need a way to process those writes as well.
When there is no resume token, it basically means that we should start from the "beginning": the first change. However, the furthest back you can resume a change stream is the oldest entry in the oplog (change streams are backed by the oplog). As of MongoDB 4.0, you can start a change stream from a timestamp; however, this timestamp must be within the range of the oplog. We can't just say, "start from the oldest entry in the oplog, whatever that is." This makes it tricky.
The oplog is just a collection in MongoDB. Above I mentioned the old "tailing the oplog" technique; reading from the oplog is an old trick. In our case, what we need is quite simple, and not as fraught as trying to use the oplog for change data capture. We just need the oldest record's ts field value. We can retrieve that like so:
    MongoDatabase local = client.getDatabase("local");
    MongoCollection<Document> oplog = local.getCollection("oplog.rs");
    // Filter out "no-op" operations like replicaset initialization, which we cannot 
    // start from
    Document first = oplog.find(ne("op", "n"))
        .sort(Sorts.ascending("$natural"))
        .limit(1)
        .first();
    return Optional.ofNullable(first).map((op) -> op.get("ts", BsonTimestamp.class));
When starting our listener, we already require a lock first. If we adjust this to also require either a resume token (resumeAfter(token)) or an oplog entry timestamp (startAtOperationTime(timestamp)) retrieved as shown above, we avoid the race conditions shown in the two tables above. That is, it no longer matters when the change stream starts relative to the first write, so long as the change stream starts while that write is still within the oplog. Effectively, our listener must act a bit like another replica set member, and so the oplog must be large enough for it to "catch up" to "now." This is true regardless of whether we have a resume token to resume from.
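The start-up decision described above can be written down as a small, driver-free sketch. Everything here is hypothetical and only illustrates the rule: `StartPositionSketch` and `choose` are names I made up, and the resume token and oplog timestamp are treated as opaque strings rather than the real BSON types. A saved resume token always wins; otherwise we fall back to the oldest oplog timestamp; with neither, the sketch returns an empty result, which I am assuming means "do not start listening yet."

```java
import java.util.Optional;

/** Hypothetical description of where to open the change stream. */
class StartPositionSketch {
  enum Kind { RESUME_AFTER, START_AT_OPERATION_TIME }

  final Kind kind;
  final String value;  // resume token or oplog timestamp, kept opaque in this sketch

  private StartPositionSketch(Kind kind, String value) {
    this.kind = kind;
    this.value = value;
  }

  /**
   * Prefers the saved resume token; falls back to the oldest oplog timestamp.
   * Empty means neither is available, so the listener should not start.
   */
  static Optional<StartPositionSketch> choose(
      Optional<String> savedResumeToken, Optional<String> oldestOplogTimestamp) {
    if (savedResumeToken.isPresent()) {
      return Optional.of(new StartPositionSketch(Kind.RESUME_AFTER, savedResumeToken.get()));
    }
    return oldestOplogTimestamp.map(
        ts -> new StartPositionSketch(Kind.START_AT_OPERATION_TIME, ts));
  }
}
```

In a real listener the two kinds would map onto the `resumeAfter` and `startAtOperationTime` options mentioned above.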
Try it out!
At Red Hat, we're using a basic version of this algorithm in one of our services. I've published an evolved fork of the code on GitHub demonstrating the techniques and code in this post. I hope you find the code useful in your own work.
Next steps
This works quite well if you're okay limiting an entire collection or database to a single thread. In Kafka terms, these are our "partitions." I am currently investigating how to implement intra-collection partitions.
Summary
- MongoDB is a solid choice for workloads that require multi-region high availability, assuming your business logic is stable enough for you to feel comfortable defining isolated write boundaries up front (such as when using Domain Driven Design "aggregates").
- Where joins are needed, we'd like to denormalize instead of using aggregation, but multi-document transactions are new, slow, and complex, and we don't need strong isolation.
- Additionally, decoupled, asynchronous integration with peer services through event messages is an effective integration pattern.
- Change streams provide the necessary core abstraction to build transactional denormalization and messaging that MongoDB does not provide out of the box.
- To use change streams for these purposes reliably, we must use a lock, fencing token, and save our resume tokens after each change is processed.
- To bootstrap the change stream listener before we have a resume token saved, we must look up the oldest entry in the oplog.
- Check out roger, an open source prototype library which demonstrates these techniques.
Thanks for reading!
 