Using aggregation functions in a ksqlDB materialised view
An event stream records changes over time, much like an accounting journal: each event represents a single recorded change. One example is a customer granting consent for their data to be shared, as with Open Banking. Consent is given per data set, so each consent decision applies to one specific data set.
In this example we’ll use two data sets, or scopes: the customer’s list of accounts (AccountList) and the customer’s profile information (CustomerProfile). The customer can grant or decline consent at any time, and may grant consent initially but decline it at a later time. The latest consent selection could be stored as a single value in a database, but that value would typically be overwritten with each change the customer makes, losing the historical sequence of changes. With a stream, the history of consent changes is captured and can be used to derive other perspectives and insights.
To determine the final state of a customer’s consent for a scope, a materialised view (1) can be created from the stream. In ksqlDB, a materialised view is created as a table whose query applies an aggregation function to an attribute (2). An aggregation function performs a calculation across the values from multiple events and returns a single result for each group. In this example, the desired outcome is the latest state of the customer’s consent for a specific scope, which the LATEST_BY_OFFSET aggregation function provides. It returns the latest value of the attribute (column) in the underlying stream, where "latest" is determined by offset.
First, create the consent stream:
-- Instruct ksqlDB to begin all queries from the earliest point in the topic.
SET 'auto.offset.reset' = 'earliest';
-- Create the stream
create stream DataSharingConsentStream (customerId STRING KEY, scope STRING KEY, consent STRING) WITH (kafka_topic='DataSharingConsentTopic', key_format='DELIMITED', partitions=1, value_format='JSON');
Add sample values to the stream, including updates from multiple customers and multiple updates from an individual customer.
insert into DataSharingConsentStream (customerId, scope, consent) values ('1001','CustomerProfile','GRANTED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1001','AccountList','GRANTED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1001','AccountList','DECLINED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1002','CustomerProfile','GRANTED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1003','CustomerProfile','GRANTED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1003','AccountList','DECLINED');
insert into DataSharingConsentStream (customerId, scope, consent) values ('1003','AccountList','GRANTED');
Confirm the data has been added by running the following query. The query returns each change the customer has made to a scope’s consent. Customer 1001 initially granted consent to AccountList but then declined it later.
SET CLI COLUMN-WIDTH 20;
select * from DataSharingConsentStream;
+-------------------------+-------------------------+-------------------------+
|CUSTOMERID               |SCOPE                    |CONSENT                  |
+-------------------------+-------------------------+-------------------------+
|1001                     |CustomerProfile          |GRANTED                  |
|1001                     |AccountList              |GRANTED                  |
|1001                     |AccountList              |DECLINED                 |
|1002                     |CustomerProfile          |GRANTED                  |
|1003                     |CustomerProfile          |GRANTED                  |
|1003                     |AccountList              |DECLINED                 |
|1003                     |AccountList              |GRANTED                  |
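Because the stream keeps every change, other aggregations can be derived from the same history. As a minimal sketch, the following push query counts how many consent events each customer has recorded per scope. The changeCount alias is an illustrative name, not part of the original example. With the sample data above, customer 1001 has two AccountList events and one CustomerProfile event.

```sql
-- Sketch: count consent events per customer and scope.
-- changeCount is a hypothetical column alias chosen for illustration.
SELECT customerId, scope, COUNT(*) AS changeCount
FROM DataSharingConsentStream
GROUP BY customerId, scope
EMIT CHANGES;
```

As a push query, this keeps running and emits updated counts as new events arrive, so it has to be cancelled manually in the CLI.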
Create the materialised view that provides the latest consent for each customer’s scope. The LATEST_BY_OFFSET aggregation function ensures that only the latest consent is returned for each combination of customer and scope.
create table DataSharingConsentMV as
select customerId, scope,
LATEST_BY_OFFSET(consent) as latestConsent
FROM DataSharingConsentStream
group by customerId, scope
emit changes;
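LATEST_BY_OFFSET is one of several offset-based aggregation functions. The sketch below is a variation on the view above, assuming that the first recorded consent and the previous consent value are also of interest. The table name DataSharingConsentHistoryMV and the column aliases are hypothetical. It uses EARLIEST_BY_OFFSET for the first value and the two-argument form of LATEST_BY_OFFSET, which returns an array of up to the given number of most recent values. It is worth checking that the ksqlDB version in use supports the two-argument form.

```sql
-- Hypothetical companion view: table name and aliases are illustrative only.
CREATE TABLE DataSharingConsentHistoryMV AS
  SELECT customerId, scope,
         -- First consent value recorded for this customer and scope.
         EARLIEST_BY_OFFSET(consent) AS firstConsent,
         -- Array holding up to the two most recent consent values.
         LATEST_BY_OFFSET(consent, 2) AS lastTwoConsents
  FROM DataSharingConsentStream
  GROUP BY customerId, scope
  EMIT CHANGES;
```

Keeping the last two values makes it possible to see what a consent was changed from, without re-reading the whole stream.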
Run a query on the materialised view to show the consent for all customers. Only the latest consent for each customer’s scope is returned.
select * from DataSharingConsentMV;
+-------------------------+-------------------------+-------------------------+
|CUSTOMERID               |SCOPE                    |LATESTCONSENT            |
+-------------------------+-------------------------+-------------------------+
|1001                     |AccountList              |DECLINED                 |
|1001                     |CustomerProfile          |GRANTED                  |
|1002                     |CustomerProfile          |GRANTED                  |
|1003                     |AccountList              |GRANTED                  |
|1003                     |CustomerProfile          |GRANTED                  |
Run a query for a specific customer’s scope.
select * from DataSharingConsentMV where customerId='1001' and scope='AccountList';
+-------------------------+-------------------------+-------------------------+
|CUSTOMERID               |SCOPE                    |LATESTCONSENT            |
+-------------------------+-------------------------+-------------------------+
|1001                     |AccountList              |DECLINED                 |
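The queries against the materialised view above are pull queries: they return the current state and then finish. To be notified whenever a consent changes, a push query can be run against the same view by adding EMIT CHANGES. The following is a sketch. The final insert is a made-up event, added only to trigger an update.

```sql
-- Subscribe to consent changes for customer 1001 (runs until cancelled).
SELECT * FROM DataSharingConsentMV
WHERE customerId = '1001'
EMIT CHANGES;

-- In a second CLI session, add a hypothetical new event. The push query
-- above should then emit a row with the updated consent for AccountList.
INSERT INTO DataSharingConsentStream (customerId, scope, consent)
VALUES ('1001', 'AccountList', 'GRANTED');
```

A pull query suits a request/response lookup of the current consent, while a push query suits a consumer that needs to react to each change as it happens.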
References
- (1) Materialised cache, ksqlDB Documentation, Confluent
- (2) Aggregation functions, ksqlDB Documentation, Confluent