Using aggregation functions in ksqlDB materialised views

Tony Kambourakis
3 min read · Dec 16, 2022

An event stream records changes over time, like an accounting journal: each event represents a recorded change. One example is a customer granting consent for their data to be shared, as with Open Banking. The customer gives or withholds consent separately for each specific data set.

In this example we’ll use two data sets, or scopes: the customer’s list of accounts (AccountList) and the customer’s profile information (CustomerProfile). The customer can grant or decline consent at any time, and may initially grant consent but decline it at a later time. The latest consent selection could be stored as a single value in a database, but that value would typically be overwritten with each change the customer makes, losing the historical sequence of changes. A stream captures the full history of consent changes, which can then be used to derive other perspectives and insights.

To determine the final state of a customer’s consent for a scope, a materialised view (1) can be created from the stream. In ksqlDB a materialised view is created as a table whose query applies an aggregation function to an attribute (2). An aggregation function performs a calculation across the values from multiple events that share the same grouping key. In this example, the desired outcome is the final, or latest, state of the customer’s consent for a specific scope. This can be determined using the LATEST_BY_OFFSET aggregation function, which returns, for each group, the most recent value of the attribute (column) in the underlying stream, ordered by offset.

First, create the consent stream:

-- Instruct ksqlDB to begin all queries from the earliest point in the topic.

SET 'auto.offset.reset' = 'earliest';

-- Create the stream

create stream DataSharingConsentStream (customerId STRING KEY, scope STRING KEY, consent STRING) WITH (kafka_topic='DataSharingConsentTopic', key_format='DELIMITED', partitions=1, value_format='JSON');

Add sample values to the stream, including updates from multiple customers and multiple updates from an individual customer.

insert into DataSharingConsentStream (customerId, scope, consent) values ('1001','CustomerProfile','GRANTED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1001','AccountList','GRANTED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1001','AccountList','DECLINED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1002','CustomerProfile','GRANTED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1003','CustomerProfile','GRANTED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1003','AccountList','DECLINED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1003','AccountList','GRANTED');

Confirm the data has been added by running the following query. The query returns every change each customer has made to a scope’s consent. For example, customer 1001 initially granted consent to AccountList but later declined it.

SET CLI COLUMN-WIDTH 20;

ksql> select * from DataSharingConsentStream;
+-------------------------+-------------------------+-------------------------+
|CUSTOMERID |SCOPE |CONSENT |
+-------------------------+-------------------------+-------------------------+
|1001 |CustomerProfile |GRANTED |
|1001 |AccountList |GRANTED |
|1001 |AccountList |DECLINED |
|1002 |CustomerProfile |GRANTED |
|1003 |CustomerProfile |GRANTED |
|1003 |AccountList |DECLINED |
|1003 |AccountList |GRANTED |

Create the materialised view that provides the latest consent for each customer’s scope. Because the query groups by customerId and scope, the LATEST_BY_OFFSET aggregation function returns only the latest consent for each customer and scope combination.

create table DataSharingConsentMV as 
select customerId, scope,
LATEST_BY_OFFSET(consent) as latestConsent
FROM DataSharingConsentStream
group by customerId, scope
emit changes;
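
To watch the view being maintained as events arrive, a push query can be left running in a separate CLI session. This is a minimal sketch against the table created above; the rows it emits depend on the events present in the topic at the time.

```sql
-- Push query: stays open and emits a row each time the latest consent
-- for a customerId/scope pair changes. Stop it with Ctrl+C in the CLI.
SELECT customerId, scope, latestConsent
FROM DataSharingConsentMV
EMIT CHANGES;
```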

Run a query on the materialised view to show the consent for all customers. Only the latest consent for each customer’s scope is returned.

select * from DataSharingConsentMV;
+-------------------------+-------------------------+-------------------------+
|CUSTOMERID |SCOPE |LATESTCONSENT |
+-------------------------+-------------------------+-------------------------+
|1001 |AccountList |DECLINED |
|1001 |CustomerProfile |GRANTED |
|1002 |CustomerProfile |GRANTED |
|1003 |AccountList |GRANTED |
|1003 |CustomerProfile |GRANTED |
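
Because the stream keeps every consent change, other views can be derived from it alongside the latest state. The sketch below is one possible variation, not part of the original walkthrough: the table name DataSharingConsentSummaryMV and the column aliases firstConsent and changeCount are hypothetical, chosen for illustration. It combines LATEST_BY_OFFSET with the EARLIEST_BY_OFFSET and COUNT aggregation functions.

```sql
-- Hypothetical summary view: first and latest consent, plus the number of
-- consent events recorded, for each customerId/scope pair.
CREATE TABLE DataSharingConsentSummaryMV AS
  SELECT customerId, scope,
         EARLIEST_BY_OFFSET(consent) AS firstConsent,
         LATEST_BY_OFFSET(consent) AS latestConsent,
         COUNT(*) AS changeCount
  FROM DataSharingConsentStream
  GROUP BY customerId, scope
  EMIT CHANGES;
```

With the sample events inserted earlier, the row for customer 1001 and AccountList should show GRANTED as the first consent, DECLINED as the latest, and a count of 2.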

Run a query for a specific customer’s scope.

select * from DataSharingConsentMV where customerId='1001' and scope='AccountList';
+-------------------------+-------------------------+-------------------------+
|CUSTOMERID |SCOPE |LATESTCONSENT |
+-------------------------+-------------------------+-------------------------+
|1001 |AccountList |DECLINED |
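
The view is kept up to date as new events arrive on the stream. As a small sketch of this, assuming the stream and table created earlier, insert one more consent change for the same customer and scope and then repeat the query.

```sql
-- Customer 1001 grants consent to AccountList again.
INSERT INTO DataSharingConsentStream (customerId, scope, consent) VALUES ('1001','AccountList','GRANTED');

-- Re-run the query for this customer and scope. Once the new event has been
-- processed, latestConsent should read GRANTED rather than DECLINED.
SELECT * FROM DataSharingConsentMV WHERE customerId='1001' AND scope='AccountList';
```

The earlier DECLINED event is still held in the stream; only the materialised view changes to reflect the most recent value.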

References

  1. Materialised cache, ksqlDB Documentation, Confluent
  2. Aggregation functions, ksqlDB Documentation, Confluent

Written by Tony Kambourakis

IT Architect at IBM. Obsessed with space, animation, visual effects & shiny Apple toys. Dabble with iOS dev & hug my tech gadgets every day. Views are my own.
