Click to copy
Streams • Reviewed for ksqlDB 0.29
How to Add Field to Stream in ksqlDB
If want to add a new column departed_at
to the existing stream below:
CREATE STREAM flights (
flight_id INT KEY,
from_country STRING
) WITH (
KAFKA_TOPIC = 'flights',
PARTITIONS = 10,
KEY_FORMAT = 'KAFKA',
VALUE_FORMAT = 'JSON'
);
Simply rerun the CREATE STREAM
statement by using the OR REPLACE
modifier and adding the new field definition at the end of the fields list:
CREATE OR REPLACE STREAM flights (
flight_id INT KEY,
from_country STRING,
departed_at TIMESTAMP
) WITH (
KAFKA_TOPIC = 'flights',
PARTITIONS = 10,
KEY_FORMAT = 'KAFKA',
VALUE_FORMAT = 'JSON'
);
It’s important to add it to the end of the fields list or you will get the error below:
Cannot upgrade data source: DataSource 'FLIGHTS' has schema = FLIGHT_ID INTEGER KEY, FROM_COUNTRY STRING which is not upgradeable to FLIGHT_ID INTEGER KEY, DEPARTED_AT TIMESTAMP, FROM_COUNTRY STRING. (The following columns are changed, missing or reordered: [FROM_COUNTRY STRING])
To add the field within the field list, you will first need to drop the existing stream without deleting the underlying topic, then run the CREATE
statement with the desired field order.
Was this article helpful?