Streaming Integration Events to Graph Capture-Data-Change (CDC) Events

Transforming integration event context to Graph CDC events

Our Domains

The Problem

  1. The model used in the Orders Domain is different.
  2. Orders mutations are not apparent.

Solution 1: Stateless flatmap

final KStream<String, Order> inputStream =
builder.stream(
"orders",
Consumed.with(stringSerde, ordersSerde));
inputStream
.flatmap(graphMapper)
.to("graph-events", Produced.with(stringSerde, graphSerde))

Solution 1: Limitations

Solution 2: Stateful flatmap

final KStream<String, Order> inputStream =
builder.stream(
"orders",
Consumed.with(stringSerde, ordersSerde));
final StoreBuilder<WindowStore<String, Order>> ordersStoreBuilder =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(
ordersStoreName, retentionPeriod, windowSize, false),
stringSerde,
ordersSerde);
final KStream<String, OrderWithSate> ordersWithState =
inputStream
.transform(
new OrderDiffTransformerSupplier(
ordersStoreName,
windowSize.toMillis(),
ordersStoreBuilder))
ordersWithState
.flatmap(graphCDCMapper)
.to(
"graph-cdc-events",
Produced.with(stringSerde, graphCDCSerde))

Solution 2: limitations

Solution 3: Stateful flatmap with Dead-Letter Queue

final KStream<String, Order> inputStream =
builder.stream(
"orders",
Consumed.with(stringSerde, ordersSerde));
final StoreBuilder<WindowStore<String, Order>> ordersStoreBuilder =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(
ordersStoreName, retentionPeriod, windowSize, false),
stringSerde,
ordersSerde);
final KStream<String, DLQRecord> ordersWithDLQ =
inputStream
.transform(
new OrderDiffTransformerSupplier(
ordersStoreName,
windowSize.toMillis(),
ordersStoreBuilder))
// Split stream based on whether an error was observed
.branch(
(key, value) -> value.getIsError(),
(key, value) -> !value.getIsError());
// Output to Dead Letter Queue topic
ordersWithDLQ[0].to("dlq", Produced.with(stringSerde, dlqSerde));
// Intermediate transformation for data type
final KStream<String, OrderWithState> orderStateStream =
ordersWithDLQ[1].map(
(key, value) ->
new KeyValue<>(key, value.getOrderWithState()));
ordersWithState
.flatmap(graphCDCMapper)
.to(
"graph-cdc-events",
Produced.with(stringSerde, graphCDCSerde))

Solution 3: limitations

--

--

--

Just another nomad in love with technology

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

UK’s largest poultry supplier automates with RPA and Low-code

Automating Swords & Souls Training — Part 3

A Tour of Google Cloud

Payments in Flutter with Razorpay Payment Gateway

Create Searchable Audio Using Python

A woman listening to audio recordings

OpenBSD 6.4: Installing a Seriously Underrated OS in a Virtual Machine

openbsd desktop wallpaper with puffy logo

The Grid Fill Problem

10 Best Free Full stack Java development courses for Beginners and Experienced Programmers in 2022

10 Best Free Full stack Java development courses for Beginners and Experienced Programmers

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Ahmed Elhassany

Ahmed Elhassany

Just another nomad in love with technology

More from Medium

Real-Time Analytics on Kinesis Event Streams Using Rockset, Druid, Elasticsearch and Redshift

Real-Time Analytics on Kinesis Event Streams

Analyze Bank Transaction Data using Graph (Part 3/3)

Sailing through Kafka Streams

Build Serverless Streaming Architectures with Upstash Kafka