Joining multiple materialised views using ksqlDB
The example scenario is a retail banking loan solution. A customer first completes a needs analysis interview in an online, web-form-based application that identifies the right product for them.
The customer is then given a quote for the loan, including rates and fees, and may choose to tailor the loan characteristics.
Finally, by accepting the quote, the customer submits a loan application that triggers the account origination process. These three phases of the customer enquiry are handled by separate bounded contexts: Needs Analysis, Quoting and Loan Application. A stream from each bounded context feeds into a further bounded context that oversees the customer’s loan enquiry: the Enquiry bounded context.
There are two serverless functions. One creates the customer enquiry and the other retrieves the enquiry’s details. These functions integrate directly with ksqlDB and provide an API for a front-end web view.

In this example we’ll build a materialised view (2) from a base materialised view joined to two other materialised views, one of which is built from multiple merged streams.
Setting up the tables and streams
First create the streams from each of the Needs Analysis, Quoting and Loan Application bounded contexts. These drive the resume point and status of the enquiry. The resume point tells the consuming web view where the customer has reached in the enquiry journey. If the customer stops and later resumes their enquiry, the web view can then engage the right bounded context.
CREATE STREAM NeedsAnalysisInterviewStarted (enquiryId VARCHAR KEY, customerId VARCHAR) WITH (kafka_topic='NeedsAnalysisInterviewStartedTopic', value_format='JSON', partitions=3);
CREATE STREAM QuoteIssued (enquiryId VARCHAR KEY, customerId VARCHAR) WITH (kafka_topic='QuoteIssuedTopic', value_format='JSON', partitions=3);
CREATE STREAM LendingApplicationInterviewStarted (enquiryId VARCHAR KEY, customerId VARCHAR) WITH (kafka_topic='LendingApplicationInterviewStartedTopic', value_format='JSON', partitions=3);
CREATE STREAM LendingApplicationInterviewSubmitted (enquiryId VARCHAR KEY, customerId VARCHAR) WITH (kafka_topic='LendingApplicationInterviewSubmittedTopic', value_format='JSON', partitions=3);
Create the ResumePoint stream to capture the changes to the enquiry resume point.
CREATE STREAM ResumePoint (enquiryId VARCHAR KEY, resumePoint VARCHAR) WITH (kafka_topic='ResumePointTopic', value_format='JSON', partitions=3);
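The INSERT INTO and CREATE TABLE ... AS SELECT statements that follow each start a persistent query. In the ksqlDB CLI these read their source topics from the latest offset by default, so any events already in a topic when a query starts are skipped. This walkthrough inserts events only after the queries exist, so the default is fine here. If your topics may already hold events, one option is to set the offset reset property in the same CLI session before running the statements below. This is a sketch of that optional step:

```sql
-- Session property for the ksqlDB CLI: queries started after this
-- statement begin reading from the earliest available offset.
SET 'auto.offset.reset' = 'earliest';
```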
Merge the Needs Analysis, Quoting and Loan Application bounded context streams into the ResumePoint stream. Each INSERT INTO statement is a continuously running query deployed into ksqlDB.
INSERT INTO ResumePoint select enquiryId, 'INTERVIEW' as resumePoint FROM NeedsAnalysisInterviewStarted;
INSERT INTO ResumePoint select enquiryId, 'QUOTE' as resumePoint FROM QuoteIssued;
INSERT INTO ResumePoint select enquiryId, 'LOANAPPLY' as resumePoint FROM LendingApplicationInterviewStarted;
The ResumePointMV materialised view table utilises the LATEST_BY_OFFSET aggregation function to return the latest resume point value for an enquiry. This will be joined with the main customer enquiry materialised view table.
CREATE TABLE ResumePointMV as
select enquiryId,
LATEST_BY_OFFSET(resumepoint) as latestResumePoint
from ResumePoint
group by enquiryId
emit changes;
The CustomerEnquiryStatusUpdated stream holds the changes to the enquiry status.
CREATE STREAM CustomerEnquiryStatusUpdated (enquiryId VARCHAR KEY, enquiryStatus VARCHAR)
WITH (kafka_topic='CustomerEnquiryStatusUpdatedTopic', value_format='JSON', partitions=3);
The CustomerEnquiryStatusUpdatedMV materialised view table utilises the LATEST_BY_OFFSET aggregation function to return the latest status of the enquiry.
CREATE TABLE CustomerEnquiryStatusUpdatedMV as
select enquiryId,
LATEST_BY_OFFSET(enquiryStatus) AS latestEnquiryStatus
FROM CustomerEnquiryStatusUpdated
GROUP BY enquiryId
emit changes;
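Because of the GROUP BY, each materialised view table so far is keyed by enquiryId, and that key is what the later joins rely on. To check the schema ksqlDB derived for the table, including which column is the primary key, you can describe it:

```sql
-- Lists the table's columns and types; ENQUIRYID should be marked as the primary key.
DESCRIBE CustomerEnquiryStatusUpdatedMV;
```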
The CustomerEnquiryCreated stream holds the event that represents the creation of an enquiry. The Create Customer Enquiry function is responsible for generating a unique enquiry identifier, which is passed in the event. There is only ever one event per enquiryId; however, a customer may have multiple enquiries.
CREATE STREAM CustomerEnquiryCreated (enquiryId VARCHAR KEY, customerId VARCHAR, enquiryType VARCHAR, enquiryChannel VARCHAR)
WITH (kafka_topic='CustomerEnquiryCreatedTopic', value_format='JSON', partitions=3);
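The CustomerEnquiryBaseMV materialised view table applies the LATEST_BY_OFFSET aggregation function to the customerId, enquiryType and enquiryChannel columns. This turns the CustomerEnquiryCreated stream into a table keyed by enquiryId that can take part in the table-table joins below.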
CREATE TABLE CustomerEnquiryBaseMV AS
SELECT enquiryId,
LATEST_BY_OFFSET(customerId) AS customerId,
LATEST_BY_OFFSET(enquiryType) AS enquiryType,
LATEST_BY_OFFSET(enquiryChannel) AS enquiryChannel
FROM CustomerEnquiryCreated
GROUP BY enquiryId
EMIT CHANGES;
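A customer may have several enquiries, so it can be useful to list them by customerId. customerId is not the table's key, which means this pull query needs a table scan. This sketch assumes a ksqlDB version where table-scan pull queries are supported and enabled through the ksql.query.pull.table.scan.enabled setting. Older versions reject pull queries that do not filter on the key.

```sql
-- Pull query filtering on a non-key column: ksqlDB has to scan the table.
SELECT enquiryId, enquiryType, enquiryChannel
FROM CustomerEnquiryBaseMV
WHERE customerId = '1001';
```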
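The CustomerEnquiryMV materialised view (table) left joins CustomerEnquiryBaseMV to the ResumePointMV and CustomerEnquiryStatusUpdatedMV materialised views (tables) on enquiryId. This adds the latestResumePoint and latestEnquiryStatus columns to each enquiry. Using LEFT JOIN keeps an enquiry in the view even before a resume point or status exists for it.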
CREATE TABLE CustomerEnquiryMV AS
SELECT c.enquiryId AS enquiryId, c.customerId, c.enquiryType, c.enquiryChannel, r.latestResumePoint, s.latestEnquiryStatus
FROM CustomerEnquiryBaseMV c
LEFT JOIN ResumePointMV r ON c.enquiryId = r.enquiryId
LEFT JOIN CustomerEnquiryStatusUpdatedMV s ON c.enquiryId = s.enquiryId
EMIT CHANGES;
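The retrieve function described earlier would most likely read from this view with a pull query on the key. The following is a sketch of such a lookup, using the enquiry ID from the run-through below. How the function actually issues the query, for example through the ksqlDB REST API, is an assumption and is not covered here.

```sql
-- Key lookup against the joined view: returns the current row for one enquiry.
SELECT enquiryId, customerId, latestResumePoint, latestEnquiryStatus
FROM CustomerEnquiryMV
WHERE enquiryId = '33050';
```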
Establish a running query that merges the CustomerEnquiryCreated stream into the CustomerEnquiryStatusUpdated stream. It sets the status to ACTIVE as soon as an event for an enquiryId is added to the CustomerEnquiryCreated stream.
INSERT INTO CustomerEnquiryStatusUpdated
select enquiryId, 'ACTIVE' AS enquiryStatus
FROM CustomerEnquiryCreated;
Similarly, the LendingApplicationInterviewSubmitted stream is merged into the CustomerEnquiryStatusUpdated stream. The status is set to COMPLETED when an event for that enquiryId is published to the LendingApplicationInterviewSubmitted stream.
INSERT INTO CustomerEnquiryStatusUpdated
select enquiryId, 'COMPLETED' AS enquiryStatus
FROM LendingApplicationInterviewSubmitted;
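With both merges running, CustomerEnquiryStatusUpdatedMV is fed from two sources. As an optional check, a push query on the table lets you watch each status change as it happens. EMIT CHANGES keeps the query open until you cancel it.

```sql
-- Push query: emits a new row each time an enquiry's latest status changes.
SELECT enquiryId, latestEnquiryStatus
FROM CustomerEnquiryStatusUpdatedMV
EMIT CHANGES;
```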
Establish a running query that merges in the HOME resume point when an enquiry is created.
INSERT INTO ResumePoint
select enquiryId, 'HOME' AS resumePoint
FROM CustomerEnquiryCreated;
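The setup is now complete. There should be one persistent query for each CREATE TABLE ... AS SELECT and INSERT INTO statement above, which makes ten in total: four tables and six merges. The plain CREATE STREAM statements do not start queries. You can confirm the queries are running before inserting any data:

```sql
-- Lists the running persistent queries with their IDs, status and sinks.
SHOW QUERIES;
```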
Running the example
Create a new customer enquiry.
INSERT INTO CustomerEnquiryCreated (enquiryId, customerId, enquiryType, enquiryChannel)
VALUES ('33050','1001','LOAN','ONLINEWEB');
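As an optional check, the ksqlDB CLI can print the underlying Kafka topic directly. This shows the raw JSON record that the insert wrote:

```sql
-- Print records from the topic backing the CustomerEnquiryCreated stream,
-- starting at the beginning and stopping after one record.
PRINT 'CustomerEnquiryCreatedTopic' FROM BEGINNING LIMIT 1;
```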
Confirm the enquiry has been added.
SELECT * FROM CustomerEnquiryMV;
+----------+-----------+------------+--------------+------------------+--------------------+
|ENQUIRYID |CUSTOMERID |ENQUIRYTYPE |ENQUIRYCHANNEL|LATESTRESUMEPOINT |LATESTENQUIRYSTATUS |
+----------+-----------+------------+--------------+------------------+--------------------+
|33050     |1001       |LOAN        |ONLINEWEB     |HOME              |ACTIVE              |
+----------+-----------+------------+--------------+------------------+--------------------+
Note that LATESTRESUMEPOINT is set to HOME and LATESTENQUIRYSTATUS is set to ACTIVE, even though only the CustomerEnquiryCreated event has been inserted. The running INSERT INTO queries merged that event into both the ResumePoint stream (as HOME) and the CustomerEnquiryStatusUpdated stream (as ACTIVE). The table-table joins then brought those values into CustomerEnquiryMV.
The customer begins the Needs Analysis phase. The Needs Analysis bounded context publishes an event to the NeedsAnalysisInterviewStarted stream to indicate that the interview has started.
This started event drives the resume point to INTERVIEW.
INSERT INTO NeedsAnalysisInterviewStarted (enquiryId, customerid)
VALUES ('33050','1001');
Confirm the LATESTRESUMEPOINT is set to INTERVIEW.
SELECT * FROM CustomerEnquiryMV;
+----------+-----------+------------+--------------+------------------+--------------------+
|ENQUIRYID |CUSTOMERID |ENQUIRYTYPE |ENQUIRYCHANNEL|LATESTRESUMEPOINT |LATESTENQUIRYSTATUS |
+----------+-----------+------------+--------------+------------------+--------------------+
|33050     |1001       |LOAN        |ONLINEWEB     |INTERVIEW         |ACTIVE              |
+----------+-----------+------------+--------------+------------------+--------------------+
Now the customer enters the quote phase.
INSERT INTO QuoteIssued (enquiryId, customerid) VALUES ('33050','1001');
Confirm the LATESTRESUMEPOINT is set to QUOTE.
SELECT * FROM CustomerEnquiryMV;
+----------+-----------+------------+--------------+------------------+--------------------+
|ENQUIRYID |CUSTOMERID |ENQUIRYTYPE |ENQUIRYCHANNEL|LATESTRESUMEPOINT |LATESTENQUIRYSTATUS |
+----------+-----------+------------+--------------+------------------+--------------------+
|33050     |1001       |LOAN        |ONLINEWEB     |QUOTE             |ACTIVE              |
+----------+-----------+------------+--------------+------------------+--------------------+
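CustomerEnquiryMV only ever shows the latest resume point, but the full history remains in the ResumePoint stream. The following is a sketch of a push query that replays that history for this enquiry from the start of the topic.

```sql
-- Read from the start of the topic for queries run in this session.
SET 'auto.offset.reset' = 'earliest';
-- Every resume point recorded so far for enquiry 33050 (which should include
-- HOME, INTERVIEW and QUOTE), then any new ones until the query is cancelled.
SELECT enquiryId, resumePoint
FROM ResumePoint
WHERE enquiryId = '33050'
EMIT CHANGES;
```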
Next, the customer enters the loan application phase.
INSERT INTO LendingApplicationInterviewStarted (enquiryId, customerid)
VALUES ('33050','1001');
Check whether the LATESTRESUMEPOINT has moved to LOANAPPLY.
SELECT * FROM CustomerEnquiryMV;
+----------+-----------+------------+--------------+------------------+--------------------+
|ENQUIRYID |CUSTOMERID |ENQUIRYTYPE |ENQUIRYCHANNEL|LATESTRESUMEPOINT |LATESTENQUIRYSTATUS |
+----------+-----------+------------+--------------+------------------+--------------------+
|33050     |1001       |LOAN        |ONLINEWEB     |QUOTE             |ACTIVE              |
+----------+-----------+------------+--------------+------------------+--------------------+
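If the query is run immediately after the insert, the LATESTRESUMEPOINT may still show QUOTE, as above. Before the new event reaches the view, it has to pass through several independently running persistent queries:
- the INSERT INTO merge into ResumePoint
- the LATEST_BY_OFFSET aggregation into ResumePointMV
- the table-table join into CustomerEnquiryMV

As a result, the view is only eventually consistent with its inputs (1).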
Running the query again after a couple of seconds returns the LATESTRESUMEPOINT set to LOANAPPLY.
SELECT * FROM CustomerEnquiryMV;
+----------+-----------+------------+--------------+------------------+--------------------+
|ENQUIRYID |CUSTOMERID |ENQUIRYTYPE |ENQUIRYCHANNEL|LATESTRESUMEPOINT |LATESTENQUIRYSTATUS |
+----------+-----------+------------+--------------+------------------+--------------------+
|33050     |1001       |LOAN        |ONLINEWEB     |LOANAPPLY         |ACTIVE              |
+----------+-----------+------------+--------------+------------------+--------------------+
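Rather than re-running the pull query until the joins catch up, you can subscribe to the view with a push query. It emits each change as it is applied, which makes the eventual consistency visible. This is an optional sketch:

```sql
-- Push query: stays open and emits a row whenever enquiry 33050 changes in the view.
SELECT * FROM CustomerEnquiryMV WHERE enquiryId = '33050' EMIT CHANGES;
```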
Finally, once the loan application has been submitted, the enquiry status is set to COMPLETED.
INSERT INTO LendingApplicationInterviewSubmitted (enquiryId, customerid)
VALUES ('33050','1001');
Confirm the LATESTENQUIRYSTATUS is set to COMPLETED.
SELECT * FROM CustomerEnquiryMV;
+----------+-----------+------------+--------------+------------------+--------------------+
|ENQUIRYID |CUSTOMERID |ENQUIRYTYPE |ENQUIRYCHANNEL|LATESTRESUMEPOINT |LATESTENQUIRYSTATUS |
+----------+-----------+------------+--------------+------------------+--------------------+
|33050     |1001       |LOAN        |ONLINEWEB     |LOANAPPLY         |COMPLETED           |
+----------+-----------+------------+--------------+------------------+--------------------+
References
(1) Joining collections, ksqlDB Documentation, Confluent
(2) Materialised cache, ksqlDB Documentation, Confluent