• Reviewed for ksqlDB 0.29

How to Remove a Field from a Stream in ksqlDB

In this example, we have the flights stream defined below, and we want to drop the departed_at field from it:

CREATE STREAM flights (
  flight_id    INT KEY,
  from_country STRING,
  departed_at  TIMESTAMP
) WITH (
  KAFKA_TOPIC  = 'flights',
  PARTITIONS   = 10,
  KEY_FORMAT   = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);

Attempt #1: create or replace - error

If you try to remove the field by running CREATE OR REPLACE with the new schema, the statement fails, because removing a column is not a backward-compatible change:

Cannot upgrade data source: DataSource 'FLIGHTS' has schema = FLIGHT_ID INTEGER KEY, FROM_COUNTRY STRING, DEPARTED_AT TIMESTAMP which is not upgradeable to FLIGHT_ID INTEGER KEY, FROM_COUNTRY STRING. (The following columns are changed, missing or reordered: [DEPARTED_AT TIMESTAMP])
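
For reference, a statement along the following lines would trigger the error above. This is a minimal sketch that assumes the same topic and format settings as the original definition; the article does not show the exact statement that was run.

```sql
-- Sketch: attempt to redefine the stream in place without departed_at.
-- ksqlDB rejects this, since dropping a column is not an upgradeable change.
CREATE OR REPLACE STREAM flights (
  flight_id    INT KEY,
  from_country STRING
) WITH (
  KAFKA_TOPIC  = 'flights',
  PARTITIONS   = 10,
  KEY_FORMAT   = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);
```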

Attempt #2: drop and create - success

To successfully remove the field, you will first need to drop the stream without deleting the underlying topic (note that we don’t use the DELETE TOPIC modifier):

DROP STREAM flights;

Then, recreate the stream with the new schema:

CREATE STREAM flights (
  flight_id    INT KEY,
  from_country STRING -- without departed_at
) WITH (
  KAFKA_TOPIC  = 'flights',
  PARTITIONS   = 10,
  KEY_FORMAT   = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);

INTERESTING FACT: The underlying topic still contains the departed_at data for the records written before the change. The recreated stream simply ignores that field when reading the JSON values, so it is neither visible nor accessible to users of the stream.
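
To check the result, one could inspect the recreated stream and compare it with the raw topic. The statements below are a sketch rather than part of the original walkthrough: they assume the flights topic already holds some records, and the LIMIT values are arbitrary.

```sql
-- The schema of the recreated stream should no longer list departed_at.
DESCRIBE flights;

-- Read from the start of the topic rather than only new records.
SET 'auto.offset.reset' = 'earliest';

-- Rows returned through the stream expose only flight_id and from_country.
SELECT * FROM flights EMIT CHANGES LIMIT 3;

-- The raw topic records are unchanged, so older JSON values
-- would still carry the departed_at field.
PRINT 'flights' FROM BEGINNING LIMIT 3;
```

Comparing the output of the SELECT with the output of PRINT shows the difference between what the stream exposes and what the topic stores.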