Click to copy
Streams • Reviewed for ksqlDB 0.29
How to Remove Field from Stream in ksqlDB
In the use case below, we have a flights
stream like the one below and we want to drop the field departed_at
from it:
CREATE STREAM flights (
flight_id INT KEY,
from_country STRING,
departed_at TIMESTAMP
) WITH (
KAFKA_TOPIC = 'flights',
PARTITIONS = 10,
KEY_FORMAT = 'KAFKA',
VALUE_FORMAT = 'JSON'
);
You will get the create or replace error if you try to remove a field by running CREATE OR REPLACE
with the new schema as it would be a non-backward compatible change:
Cannot upgrade data source: DataSource 'FLIGHTS' has schema = FLIGHT_ID INTEGER KEY, FROM_COUNTRY STRING, DEPARTED_AT TIMESTAMP which is not upgradeable to FLIGHT_ID INTEGER KEY, FROM_COUNTRY STRING. (The following columns are changed, missing or reordered: [DEPARTED_AT TIMESTAMP])
To successfully remove the field, you will first need to drop the stream without deleting the underlying topic (note that we don’t use the DELETE TOPIC
modifier):
DROP STREAM flights;
Then, recreate the stream with the new schema:
CREATE STREAM flights (
flight_id INT KEY,
from_country STRING -- without departed_at
) WITH (
KAFKA_TOPIC = 'flights',
PARTITIONS = 10,
KEY_FORMAT = 'KAFKA',
VALUE_FORMAT = 'JSON'
);
INTERESTING FACT - The underlying topic would still contain the data for the dropped column, but it won’t be visible or accessible by the users of that stream.
Was this article helpful?