Streaming Integration Events to Graph Change Data Capture (CDC) Events

Transforming integration event context to Graph CDC events

Our Domains

Let’s assume that our fictional company has two domains: Orders and Analytics.

The Problem

The Analytics Domain cannot consume the Orders Domain integration events directly, for the following reasons:

  1. The model used in the Orders Domain differs from the one the Analytics Domain uses.
  2. Order mutations are not apparent from the events themselves.

Solution 1: Stateless flatmap

In this solution, we deploy a stateless Kafka Streams application that does the following:

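// Read order integration events from the "orders" topic
final KStream<String, Order> inputStream =
    builder.stream(
        "orders",
        Consumed.with(stringSerde, ordersSerde));

// Map each order to zero or more graph events and publish them
inputStream
    .flatMap(graphMapper)
    .to("graph-events", Produced.with(stringSerde, graphSerde));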
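
The article does not show what graphMapper does, so here is a minimal sketch of the idea in plain Java, with no Kafka dependency. The Order, OrderLine, and GraphEvent types, and the node and edge labels, are all assumptions made for illustration. The point is only that one order fans out into several graph events, which is why flatMap is used rather than map.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types: the article does not show Order or the graph event model.
final class GraphMapperSketch {

    record OrderLine(String productId, int quantity) {}

    record Order(String orderId, String customerId, List<OrderLine> lines) {}

    // Assumed shape: a graph event is either a node upsert or an edge upsert.
    record GraphEvent(String kind, String label, String id, String fromId, String toId) {
        static GraphEvent node(String label, String id) {
            return new GraphEvent("NODE", label, id, null, null);
        }

        static GraphEvent edge(String label, String fromId, String toId) {
            return new GraphEvent("EDGE", label, fromId + "->" + toId, fromId, toId);
        }
    }

    // One order becomes several graph events: three for the customer/order
    // relationship, plus a node and an edge per order line.
    static List<GraphEvent> toGraphEvents(Order order) {
        List<GraphEvent> events = new ArrayList<>();
        events.add(GraphEvent.node("Customer", order.customerId()));
        events.add(GraphEvent.node("Order", order.orderId()));
        events.add(GraphEvent.edge("PLACED", order.customerId(), order.orderId()));
        for (OrderLine line : order.lines()) {
            events.add(GraphEvent.node("Product", line.productId()));
            events.add(GraphEvent.edge("CONTAINS", order.orderId(), line.productId()));
        }
        return events;
    }

    public static void main(String[] args) {
        Order order = new Order("o-1", "c-1",
            List.of(new OrderLine("p-1", 2), new OrderLine("p-2", 1)));
        toGraphEvents(order).forEach(System.out::println);
    }
}
```

In the Kafka Streams application, logic like this would sit inside a KeyValueMapper that returns an Iterable of KeyValue pairs, since that is what flatMap expects.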

Solution 1: Limitations

Solution 1 solves the first problem of translating events from one domain to the other. But the Analytics DB quickly falls out of sync, since this solution doesn’t handle mutations on orders.

Solution 2: Stateful flatmap

To make order mutations explicit, we need a state store in the middle so that we can diff each incoming order against its previous state. Furthermore, this state store can be bounded by a time window (e.g., the maximum amount of time customers are allowed to change or cancel their order).

// Read order integration events from the "orders" topic
final KStream<String, Order> inputStream =
    builder.stream(
        "orders",
        Consumed.with(stringSerde, ordersSerde));

// Windowed store that keeps the previous state of each order
final StoreBuilder<WindowStore<String, Order>> ordersStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
            ordersStoreName, retentionPeriod, windowSize, false),
        stringSerde,
        ordersSerde);

// Diff each incoming order against the stored state
final KStream<String, OrderWithState> ordersWithState =
    inputStream
        .transform(
            new OrderDiffTransformerSupplier(
                ordersStoreName,
                windowSize.toMillis(),
                ordersStoreBuilder));

// Map each order-with-state to graph CDC events and publish them
ordersWithState
    .flatMap(graphCDCMapper)
    .to(
        "graph-cdc-events",
        Produced.with(stringSerde, graphCDCSerde));
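
The diff step itself is hidden inside OrderDiffTransformerSupplier, which the article does not show. The following plain-Java sketch illustrates what such a diff could look like. It is an assumption, not the author's implementation: a HashMap stands in for the window store (so nothing ever expires), and the Order fields and the State enum are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the diff logic; not the author's transformer.
final class OrderDiffSketch {

    enum State { CREATED, UPDATED, CANCELLED, UNCHANGED }

    record Order(String orderId, String status, int totalCents) {}

    record OrderWithState(Order order, State state) {}

    // Stand-in for the window store: last seen order per key, with no expiry.
    private final Map<String, Order> store = new HashMap<>();

    // Compare the incoming order with the previously stored one and
    // classify the change, then remember the incoming order.
    OrderWithState diff(Order incoming) {
        Order previous = store.put(incoming.orderId(), incoming);
        State state;
        if (previous == null) {
            state = State.CREATED;            // nothing stored for this key yet
        } else if (previous.equals(incoming)) {
            state = State.UNCHANGED;          // duplicate delivery, nothing to emit
        } else if ("CANCELLED".equals(incoming.status())) {
            state = State.CANCELLED;
        } else {
            state = State.UPDATED;
        }
        return new OrderWithState(incoming, state);
    }

    public static void main(String[] args) {
        OrderDiffSketch sketch = new OrderDiffSketch();
        System.out.println(sketch.diff(new Order("o-1", "PLACED", 1000)));
        System.out.println(sketch.diff(new Order("o-1", "PLACED", 1500)));
        System.out.println(sketch.diff(new Order("o-1", "CANCELLED", 1500)));
    }
}
```

Note that in this sketch an event with no stored predecessor is always classified as CREATED, even if it is really a mutation whose earlier state is no longer available.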

Solution 2: Limitations

Let’s assume that the order department exceptionally extended the time window for canceling a given customer’s order. For example, a certain product recall required canceling a set of orders. Mutations that arrive after the store’s window has passed can no longer be diffed against the previous state, so we need observability into any skipped records.

Solution 3: Stateful flatmap with Dead-Letter Queue

When a processing error happens in our pipeline, we need to be able to observe it and react to it. In the streaming world, that’s often handled by Dead-Letter Queues (DLQs).

// Read order integration events from the "orders" topic
final KStream<String, Order> inputStream =
    builder.stream(
        "orders",
        Consumed.with(stringSerde, ordersSerde));

// Windowed store that keeps the previous state of each order
final StoreBuilder<WindowStore<String, Order>> ordersStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore(
            ordersStoreName, retentionPeriod, windowSize, false),
        stringSerde,
        ordersSerde);

// branch() returns an array of streams, one per predicate
// (branch() is deprecated since Kafka Streams 2.8 in favor of split())
final KStream<String, DLQRecord>[] ordersWithDLQ =
    inputStream
        .transform(
            new OrderDiffTransformerSupplier(
                ordersStoreName,
                windowSize.toMillis(),
                ordersStoreBuilder))
        // Split stream based on whether an error was observed
        .branch(
            (key, value) -> value.getIsError(),
            (key, value) -> !value.getIsError());

// Output to Dead Letter Queue topic
ordersWithDLQ[0].to("dlq", Produced.with(stringSerde, dlqSerde));

// Intermediate transformation for data type
final KStream<String, OrderWithState> orderStateStream =
    ordersWithDLQ[1].map(
        (key, value) ->
            new KeyValue<>(key, value.getOrderWithState()));

// Map each order-with-state to graph CDC events and publish them
orderStateStream
    .flatMap(graphCDCMapper)
    .to(
        "graph-cdc-events",
        Produced.with(stringSerde, graphCDCSerde));
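
To make the error flag more concrete, here is a plain-Java sketch of how a record could end up marked as an error and how the two branches separate. Everything in it is an assumption for illustration: the DlqRecord shape, the rule that a mutation without stored state inside the window counts as an error, and the HashMap that stands in for the window store. It mimics the branching with Collectors.partitioningBy instead of Kafka Streams.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of error flagging and DLQ routing; not the author's code.
final class DlqRoutingSketch {

    record Order(String orderId, String status, long timestampMs) {}

    record DlqRecord(Order order, boolean isError, String reason) {}

    // Stand-in for the window store: last accepted event per order.
    private final Map<String, Order> store = new HashMap<>();
    private final long windowMs;

    DlqRoutingSketch(Duration window) {
        this.windowMs = window.toMillis();
    }

    // A mutation is flagged as an error when there is no stored state for the
    // order, or when the stored state is older than the window. As a
    // simplification, the window is measured from the last accepted event.
    DlqRecord process(Order incoming) {
        Order previous = store.get(incoming.orderId());
        boolean isCreate = "CREATED".equals(incoming.status());
        boolean withinWindow = previous != null
            && incoming.timestampMs() - previous.timestampMs() <= windowMs;
        if (!isCreate && !withinWindow) {
            return new DlqRecord(incoming, true, "no prior state within window");
        }
        store.put(incoming.orderId(), incoming);
        return new DlqRecord(incoming, false, null);
    }

    // Equivalent of the two branches: true -> DLQ, false -> normal processing.
    static Map<Boolean, List<DlqRecord>> route(List<DlqRecord> records) {
        return records.stream().collect(Collectors.partitioningBy(DlqRecord::isError));
    }

    public static void main(String[] args) {
        DlqRoutingSketch sketch = new DlqRoutingSketch(Duration.ofMinutes(30));
        long minute = Duration.ofMinutes(1).toMillis();
        List<DlqRecord> results = List.of(
            sketch.process(new Order("o-1", "CREATED", 0)),
            sketch.process(new Order("o-1", "CANCELLED", 45 * minute)));
        route(results).forEach((isError, records) ->
            System.out.println((isError ? "dlq: " : "ok: ") + records));
    }
}
```

With this shape, a cancellation that arrives after the window is not silently dropped; it lands in the error partition together with a reason, which is the observability Solution 3 is after.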

Solution 3: Limitations

As with any software, any of the intermediate transformations could have a bug that would require us to redeploy the Kafka Streams application and reprocess all events from the beginning. In this case, the graph-cdc-events topic cannot safely be used as a source of truth, since it will contain messages produced by multiple versions of the transformation (some of which are buggy).
