Challenges with ksqlDB non-joined events (Part 1)
Introduction
Change Data Capture (CDC) is often used as a relatively straightforward way of publishing events to Kafka based on changes to an application’s database. However, the published events reflect the internal table schemas, so turning them into meaningful business events can prove challenging.
In this article, we’ll delve into a banking scenario where an account processing system utilises a database to store account details across two tables. With the first approach we’ll attempt to use ksqlDB stream to stream joins.
ksqlDB Stream to Stream Joins
ksqlDB can be used to join two streams to form a single stream with combined data. The join is established on matching values within a time interval. In some scenarios an event in one stream has no matching event in the other stream. This is referred to as a non-joined event. In this section we will illustrate this scenario to observe ksqlDB’s behaviour.
The example will include two streams representing events related to banking accounts. Specifically this would be driven by CDC publishing events from changes to two tables containing account information. This internal schema may suit its application but proves to be challenging when attempting to derive business events from changes to the tables.
The objective is to publish account business events to a single stream reflecting account creation and account update events in strict order.

The table changes are published to their associated ksqlDB streams. When an account is created, an insert is made to both tables, each publishing an event to its associated stream. Updates differ, however: the Account Info could be updated alone, or the Account Fraud could be updated alone, publishing a single event to one stream only.
Two streams are created representing the feeds from CDC. The join condition will use the account identifier to match the events, but we also need the CDC ‘operations’ to match. That is, an event representing an insert operation in stream ‘account_info’ must be matched to an insert event in stream ‘account_fraud_info’ and not to an update event.
As ksqlDB does not support multiple join conditions, the stream key is set to the account identifier and forms the join condition, whilst the WHERE clause is used to keep only events whose operations match.
CREATE STREAM account_info
(
accountId STRING KEY,
operation STRING,
productCode STRING,
balance DECIMAL(12, 2),
changeDateTime TIMESTAMP
)
WITH (
KAFKA_TOPIC='account_info_topic',
KEY_FORMAT='JSON',
VALUE_FORMAT='JSON',
PARTITIONS=3,
TIMESTAMP='changeDateTime'
);
CREATE STREAM account_fraud_info
(
accountId STRING KEY,
operation STRING,
fraudStatus STRING,
accountRestriction STRING,
changeDateTime TIMESTAMP
)
WITH (
KAFKA_TOPIC='fraud_info_topic',
KEY_FORMAT='JSON',
VALUE_FORMAT='JSON',
PARTITIONS=3,
TIMESTAMP='changeDateTime'
);
The streams are joined using ksqlDB’s CREATE STREAM AS SELECT statement (1). CDC includes an operation field in the metadata it passes along with events it generates. The operation field indicates either an insert or update in the table. When matching events for the join, both the account identifier and the operation must match.
The statement uses a FULL OUTER JOIN to allow for either left-side or right-side nulls for non-joined events. That is, when an event is published to a stream where there is no joined event in the other stream it will be published to the joined stream with null values for the other side.
In addition, we include a GRACE PERIOD, which alters the behaviour of the OUTER join by preventing non-joined events from being emitted eagerly, that is, as soon as they are received rather than once the join window has closed (2).
CREATE STREAM accounts WITH (KAFKA_TOPIC='_accounts') AS
SELECT
ROWKEY as joinKey,
COALESCE(a.accountId, f.accountId) as accountId,
a.productCode,
a.operation as a_operation,
a.balance,
f.fraudStatus,
f.accountRestriction,
f.operation as f_operation,
FORMAT_TIMESTAMP(FROM_UNIXTIME(a.rowtime), 'yyyy-MM-dd HH:mm:ss') AS accountEventTime,
FORMAT_TIMESTAMP(FROM_UNIXTIME(f.rowtime), 'yyyy-MM-dd HH:mm:ss') AS fraudEventTime
FROM account_info a
FULL OUTER JOIN
account_fraud_info f
WITHIN 30 SECONDS GRACE PERIOD 0 SECONDS
ON a.accountId = f.accountId
WHERE (a.operation IS NULL OR f.operation IS NULL OR a.operation = f.operation);
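To make the effect of the WHERE clause concrete, the following Python sketch mirrors its predicate outside ksqlDB. The function name passes_where is hypothetical and Python's None stands in for SQL NULL. It illustrates the filter logic only, not how ksqlDB evaluates it.

```python
def passes_where(a_operation, f_operation):
    """Mirror of the join's WHERE clause, with None standing in for SQL NULL."""
    return (
        a_operation is None            # non-joined event from account_fraud_info
        or f_operation is None         # non-joined event from account_info
        or a_operation == f_operation  # joined event with matching CDC operations
    )


# Joined pair from account creation: kept.
print(passes_where("insert", "insert"))   # True
# An insert paired with an update inside the window: filtered out.
print(passes_where("insert", "update"))   # False
# Non-joined update on the account_info side: kept, fraud columns are null.
print(passes_where("update", None))       # True
```

The two IS NULL checks are what let non-joined events through. Without them, an equality test against a null operation would discard every non-joined row.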
Open ksqlDB CLI to observe the streams:
SET 'auto.offset.reset' = 'earliest';
SELECT * FROM accounts EMIT CHANGES;
The example scenario will follow the timeline in the image below. This sequence includes non-joined events in both the account info stream and account fraud stream in addition to joined events across both streams.

Insert the initial record for account 8001 that results in a join between the account info and fraud details.
INSERT INTO account_info (accountId, operation, productCode, balance, changeDateTime)
VALUES ('8001', 'insert','10', 1000.00, '2023-08-20T09:00:04');
INSERT INTO account_fraud_info (accountId, operation, fraudStatus, accountRestriction, changeDateTime)
VALUES ('8001', 'insert','CLEARED', 'NONE', '2023-08-20T09:00:05');

The next record for account 8002 is inserted and that results in a join between the account info and fraud details as well.

Updates are then inserted for accounts 8003, 8004 and 8005; however, none of them results in a non-joined event being published to the joined stream at this point.
INSERT INTO account_info (accountId, operation, productCode, balance, changeDateTime)
VALUES ('8003', 'update', '12', 1500.00, '2023-08-20T09:00:45');
INSERT INTO account_fraud_info (accountId, operation, fraudStatus, accountRestriction, changeDateTime)
VALUES ('8004', 'update', 'REVIEW', 'FROZEN','2023-08-20T09:01:50');
INSERT INTO account_info (accountId, operation, productCode, balance, changeDateTime)
VALUES ('8005', 'update', '10', 1500.00, '2023-08-20T09:01:55');
When an update event for account 8006 is inserted a non-joined event for account 8003 is published.
INSERT INTO account_fraud_info (accountId, operation, fraudStatus, accountRestriction, changeDateTime)
VALUES ('8006', 'update','REVIEW', 'FROZEN','2023-08-20T09:02:50');

The event for account 8003 comprises a right-side null, that is the fraud information does not exist and so all fraud attributes are set to null.
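As a rough illustration, a consumer of the joined stream could tell joined rows from non-joined ones by checking which operation column is null. The sketch below assumes the a_operation and f_operation columns from the join's SELECT list. The function name join_side and the returned labels are hypothetical.

```python
def join_side(a_operation, f_operation):
    """Label a row of the joined stream by which side(s) supplied data."""
    if a_operation is not None and f_operation is not None:
        return "joined"
    if a_operation is not None:
        return "right-side null"   # only account_info contributed
    if f_operation is not None:
        return "left-side null"    # only account_fraud_info contributed
    return "empty"                 # not expected from the join; kept for completeness


# The account 8003 update carries no fraud attributes.
print(join_side("update", None))   # right-side null
```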
Whilst it is acceptable to receive null values for those attributes, the issue is that the event was “held” until the update event for account 8006 was inserted. The updates for accounts 8004 and 8005 carry timestamps well outside the window of account 8003’s update event at 09:00:45, yet they did not release it because they were published to different partitions.
It appears that ksqlDB’s window processing is applied per partition. Since account 8006 is published to the same partition that account 8003 resides in, its arrival essentially unsticks account 8003’s update event.
A final event is inserted for an update to account 8002 at 09:03:30. This event occurs well after the account 8006 update.
INSERT INTO account_info (accountId, operation, productCode, balance, changeDateTime)
VALUES ('8002', 'update', '12', 1500.00, '2023-08-20T09:03:30');
The result is the previous updates for accounts 8004 and 8005 are unstuck given they are on the same partition as the account 8002 update.

This sequence illustrates two key aspects of stream-stream FULL OUTER joins in relation to non-joined events:
- Non-joined events are only published after the join window has closed. This is expected, since the GRACE period prevents them from being emitted eagerly (2).
- Non-joined events are only published when subsequent events arrive on the same partition (in the example, an event on either stream was enough). This is due to our use of event-time semantics (3) (4): ksqlDB relies on the timestamps within the events to determine when a join window has closed, so a window is not closed by the passage of wall-clock time alone. If no other events arrive on that partition for some time, the non-joined event won’t be published to the joined stream for that time.
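The behaviour described in the two points above can be approximated with a small Python model. This is a simplified sketch under stated assumptions, not ksqlDB's actual implementation. It tracks one "stream time" per partition, advances it only when an event arrives on that partition, and releases a pending non-joined event once its timestamp plus window plus grace lies behind that stream time. The partition numbers 0 and 1 are hypothetical, chosen only to reflect the observation that accounts 8003 and 8006 share one partition while 8002, 8004 and 8005 share another. The names PartitionState and process are likewise illustrative.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=30)  # WITHIN 30 SECONDS
GRACE = timedelta(seconds=0)    # GRACE PERIOD 0 SECONDS


@dataclass
class PartitionState:
    """Simplified per-partition join state (an assumption, not ksqlDB internals)."""
    stream_time: datetime = datetime.min
    pending: list = field(default_factory=list)  # unmatched (account_id, timestamp)


def process(states, partition, account_id, ts):
    """Feed one unmatched event; return the account ids released as non-joined."""
    state = states.setdefault(partition, PartitionState())
    # Stream time only advances when this partition receives an event.
    state.stream_time = max(state.stream_time, ts)
    closed = [(a, t) for a, t in state.pending if t + WINDOW + GRACE < state.stream_time]
    state.pending = [p for p in state.pending if p not in closed]
    state.pending.append((account_id, ts))
    return [a for a, _ in closed]


def at(hms):
    return datetime.fromisoformat(f"2023-08-20T{hms}")


# (assumed partition, account, event time) for the update events in the example.
timeline = [
    (0, "8003", at("09:00:45")),
    (1, "8004", at("09:01:50")),
    (1, "8005", at("09:01:55")),
    (0, "8006", at("09:02:50")),
    (1, "8002", at("09:03:30")),
]

states = {}
released_by = {acct: process(states, p, acct, ts) for p, acct, ts in timeline}
for acct, released in released_by.items():
    print(acct, "releases", released)
```

Under these assumptions the model reproduces the observed sequence: the 8006 event releases 8003, and the 8002 event releases 8004 and 8005. It also shows that the 8006 and 8002 updates themselves remain pending until a later event arrives on their partitions.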
Materialised Views
In the second part of this article series we’ll explore an alternative approach that uses materialised views to create a stream of the targeted business events. Hopefully, by leveraging a materialised view’s push capability, we can have non-joined events published without needing other events to unstick them. We’ll add in support for joining some data from other sources for bonus points.
References
(1) Stream Joins, ksqlDB Documentation
(2) ksqlDB Grace Period Announcement
(4) Time operations