Streaming Integration Events to Graph Change Data Capture (CDC) Events

In Domain-Driven Design (DDD), one domain can publish integration events for further processing in another domain. However, different domains may model the same entities differently, so we need a translation layer in between to keep a consistent mapping between the two.

In this blog post, we show one way of converting an integration event into a series of Change Data Capture (CDC) events that are easier for a Graph DB to consume. The code associated with this post is published on GitHub: https://github.com/ahassany/tutorials/tree/main/consistent-graph.

Transforming integration event context to Graph CDC events

Prerequisites:

This blog post is very hands-on, rapidly exploring different solutions and their limitations. I assume deep knowledge of building streaming applications. Particularly:

Our Domains

Let’s assume that our fictional company has two domains: Orders and Analytics.

The Orders Domain handles everything related to customers’ orders and sends out order events. Every event contains the complete information about the order: order ID, customer info, the ordered items and their prices, etc. The customer is still free to return items or cancel the whole order within a specified timeframe (let’s say 30 days). When a customer changes or cancels an order, the Orders Domain sends another integration event with the new order information. However, the Orders Domain does not indicate what changed, just the latest state of the order. When an order is deleted, the Orders Domain sends a tombstone message (a key with a null value).

The Analytics Domain post-processes these orders to gain more insights. The analytics team, for various reasons, decided to use a Graph DB for their analysis. In their DB, each order is represented as a node, as are each ordered item and each customer. These nodes are linked with relations such as HAS_ITEM and ORDERED_BY. Additionally, the analytics team is expected to keep an eventually consistent state with the Orders Domain, i.e., to handle deletes and updates coming from the Orders Domain.

All events are streamed using Kafka.

The Problem

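The Analytics Domain cannot consume the Orders Domain integration events directly for the following reasons:

  1. The model used in the Orders Domain differs from the graph model used in the Analytics Domain.
  2. Order mutations are not apparent: each event carries only the latest state of the order, not what changed.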

Solution 1: Stateless flatmap

In this solution, we deploy a stateless Kafka Streams application that does the following:

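// Assumes a StreamsBuilder `builder` and the serdes are defined elsewhere
final KStream<String, Order> inputStream =
    builder.stream(
        "orders",
        Consumed.with(stringSerde, ordersSerde));
inputStream
    // Expand each order into its graph nodes and relations
    .flatMap(graphMapper)
    .to("graph-events", Produced.with(stringSerde, graphSerde));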

In this case, we define graphMapper as a function that accepts one Order message and outputs a list of nodes and relations corresponding to the entities in the Analytics Domain. The Graph DB consumes those events directly off the Kafka topic graph-events and inserts them into the DB.
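To make the mapping concrete, here is a minimal sketch of what such a mapper could do, written in plain Java without Kafka dependencies. The Order, Item, and GraphEvent types are hypothetical simplifications for illustration, not the schemas used in the repository. In the actual Kafka Streams topology, the returned events would be wrapped in KeyValue pairs so they can be returned from flatMap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified types; the real Order schema is not shown in this post.
final class GraphMapperSketch {
    record Item(String sku, double price) {}
    record Order(String orderId, String customerId, List<Item> items) {}

    // One graph event: a node (label + id) or a relation (type + endpoints).
    record GraphEvent(String kind, String label, String id, String from, String to) {
        static GraphEvent node(String label, String id) {
            return new GraphEvent("NODE", label, id, null, null);
        }
        static GraphEvent relation(String type, String from, String to) {
            return new GraphEvent("RELATION", type, from + "->" + to, from, to);
        }
    }

    // Expands one order into the nodes and relations of the graph model.
    static List<GraphEvent> toGraphEvents(Order order) {
        List<GraphEvent> events = new ArrayList<>();
        events.add(GraphEvent.node("Order", order.orderId()));
        events.add(GraphEvent.node("Customer", order.customerId()));
        events.add(GraphEvent.relation("ORDERED_BY", order.orderId(), order.customerId()));
        for (Item item : order.items()) {
            events.add(GraphEvent.node("Item", item.sku()));
            events.add(GraphEvent.relation("HAS_ITEM", order.orderId(), item.sku()));
        }
        return events;
    }
}
```

An order with two items yields seven events under this sketch: three nodes and one relation for the order and customer, plus one node and one relation per item. Note that nothing here says whether an entity is new, changed, or gone.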

Solution 1 solves the first problem of translating the events from one domain to another. But the Analytics DB quickly becomes out of sync, since this solution doesn’t handle mutations on orders.

Solution 2: Stateful flatmap

To make order mutations explicit, we need a state store in the middle so that we can diff each incoming order against its previous state. Furthermore, this state store can be limited by a time window (e.g., the maximum amount of time customers are allowed to change or cancel their order).

final KStream<String, Order> inputStream =
    builder.stream(
        "orders",
        Consumed.with(stringSerde, ordersSerde));
// Windowed store holding the latest known state of each order
final StoreBuilder<WindowStore<String, Order>> ordersStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
            ordersStoreName, retentionPeriod, windowSize, false),
        stringSerde,
        ordersSerde);
// Diff each incoming order against its stored state
final KStream<String, OrderWithState> ordersWithState =
    inputStream
        .transform(
            new OrderDiffTransformerSupplier(
                ordersStoreName,
                windowSize.toMillis(),
                ordersStoreBuilder));
ordersWithState
    // Expand each annotated order into graph CDC events
    .flatMap(graphCDCMapper)
    .to(
        "graph-cdc-events",
        Produced.with(stringSerde, graphCDCSerde));

Here, we improved our solution by adding an OrderDiffTransformer and a WindowStore. The order diff is very simple: it stores the latest order info in the windowed store. If an order is mutated, it diffs the new state against the stored one and figures out what was added or removed. Then we output the results to an intermediate stream, ordersWithState, that annotates each order and order sub-entity (ordered item, customer, etc.) with its state.

This allows the flatMap function from the Orders Domain to the Analytics Domain to output full CDC events. These events can be consumed directly by a Neo4j DB.
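The core of the diff can be sketched in plain Java. This is only an illustration under assumptions: the State enum, the diffItems name, and the representation of an order's items as a map from item ID to payload are all hypothetical, and the window store lookup is replaced by passing the previous state as an argument.

```java
import java.util.Map;
import java.util.TreeMap;

final class OrderDiffSketch {
    // Hypothetical state annotations attached to each sub-entity.
    enum State { CREATED, UPDATED, UNCHANGED, DELETED }

    // previous/current map item ID -> payload; null stands for "no order",
    // i.e. nothing stored yet (previous) or a tombstone (current).
    static Map<String, State> diffItems(Map<String, String> previous,
                                        Map<String, String> current) {
        Map<String, String> before = previous == null ? Map.of() : previous;
        Map<String, String> after = current == null ? Map.of() : current;
        Map<String, State> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : after.entrySet()) {
            String id = entry.getKey();
            if (!before.containsKey(id)) {
                result.put(id, State.CREATED);
            } else if (before.get(id).equals(entry.getValue())) {
                result.put(id, State.UNCHANGED);
            } else {
                result.put(id, State.UPDATED);
            }
        }
        // Anything stored before but missing now was removed from the order.
        for (String id : before.keySet()) {
            if (!after.containsKey(id)) {
                result.put(id, State.DELETED);
            }
        }
        return result;
    }
}
```

With a previous state of {A=10, B=20} and a new state of {B=25, C=5}, this sketch marks A as DELETED, B as UPDATED, and C as CREATED. A tombstone (null current state) marks every stored item as DELETED.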

Let’s assume that the orders department exceptionally extended the time window for canceling a given customer’s order. For example, a product recall might require canceling a set of orders. Such late changes fall outside the time window covered by our state store, so we need observability into any skipped records.

Solution 3: Stateful flatmap with Dead-Letter Queue

When a processing error happens in our pipeline, we need to be able to observe it and react to it. In the streaming world, that’s often handled with Dead-Letter Queues (DLQs).

final KStream<String, Order> inputStream =
    builder.stream(
        "orders",
        Consumed.with(stringSerde, ordersSerde));
final StoreBuilder<WindowStore<String, Order>> ordersStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
            ordersStoreName, retentionPeriod, windowSize, false),
        stringSerde,
        ordersSerde);
// branch() returns an array of streams, one per predicate
final KStream<String, DLQRecord>[] ordersWithDLQ =
    inputStream
        .transform(
            new OrderDiffTransformerSupplier(
                ordersStoreName,
                windowSize.toMillis(),
                ordersStoreBuilder))
        // Split stream based on whether an error was observed
        .branch(
            (key, value) -> value.getIsError(),
            (key, value) -> !value.getIsError());
// Output to Dead Letter Queue topic
ordersWithDLQ[0].to("dlq", Produced.with(stringSerde, dlqSerde));
// Intermediate transformation for data type
final KStream<String, OrderWithState> orderStateStream =
    ordersWithDLQ[1].map(
        (key, value) ->
            new KeyValue<>(key, value.getOrderWithState()));
orderStateStream
    .flatMap(graphCDCMapper)
    .to(
        "graph-cdc-events",
        Produced.with(stringSerde, graphCDCSerde));

Here we encapsulate the output of the transformation in a DLQRecord. The DLQRecord carries a flag indicating whether the transformation encountered an error; if it did not, it also carries the OrderWithState (as in Solution 2). We then branch on isError: good records go to the next stage, while erroneous ones go to the DLQ.

As with any software, any of the intermediate transformations could have a bug that would require us to redeploy the Kafka Streams application and reprocess all events from the beginning. In this case, the graph-cdc-events topic cannot be safely used as a source of truth, since it will contain messages produced by multiple versions of the transformation (some of which are buggy).
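The wrapper idea can be sketched independently of Kafka. The DlqRecord type and the wrap helper below are hypothetical stand-ins for the DLQRecord used in the topology; the sketch only shows the principle of capturing a failure as data instead of letting the exception stop the stream.

```java
import java.util.function.Function;

final class DlqSketch {
    // Holds either a successful value or an error description.
    record DlqRecord<T>(T value, String error) {
        static <T> DlqRecord<T> ok(T value) {
            return new DlqRecord<>(value, null);
        }
        static <T> DlqRecord<T> failed(String error) {
            return new DlqRecord<>(null, error);
        }
        boolean isError() {
            return error != null;
        }
    }

    // Runs the transformation and turns any runtime exception into an
    // error record, so downstream stages can branch on isError().
    static <I, O> DlqRecord<O> wrap(I input, Function<I, O> transformation) {
        try {
            return DlqRecord.ok(transformation.apply(input));
        } catch (RuntimeException e) {
            return DlqRecord.failed(e.getClass().getSimpleName() + ": " + e.getMessage());
        }
    }
}
```

Because the error is carried in the record itself, the branching step stays a simple predicate, and the DLQ topic receives enough context to investigate the failure later.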

To overcome this limitation, we can either recreate the graph-cdc-events topic every time we change the transformation or, even better, tag each change with the version of the transformation, so that on the ingestion side we can filter for only the messages with the desired version.
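Version tagging could look roughly like the following sketch. The VersionedEvent type, its field names, and the onlyVersion helper are assumptions made for illustration; in a Kafka setup the version might equally be carried in a record header rather than in the payload.

```java
import java.util.List;
import java.util.stream.Collectors;

final class VersionTagSketch {
    // Hypothetical CDC event tagged with the transformation version that produced it.
    record VersionedEvent(String transformationVersion, String payload) {}

    // Ingestion-side filter: keep only events produced by the wanted version.
    static List<VersionedEvent> onlyVersion(List<VersionedEvent> events,
                                            String wantedVersion) {
        return events.stream()
            .filter(event -> wantedVersion.equals(event.transformationVersion()))
            .collect(Collectors.toList());
    }
}
```

After a buggy v1 is replaced by v2 and the input is reprocessed, the consumer asks only for v2 events and ignores whatever v1 left behind in the topic.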
