
• Reviewed for ksqlDB 0.29

How to Add a Field to a Stream in ksqlDB

Suppose you want to add a new column, departed_at, to the existing stream below:

CREATE STREAM flights (
  flight_id    INT KEY,
  from_country STRING
) WITH (
  KAFKA_TOPIC  = 'flights',
  PARTITIONS   = 10,
  KEY_FORMAT   = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);

Simply rerun the CREATE STREAM statement with the OR REPLACE modifier, adding the new field definition at the end of the field list:

CREATE OR REPLACE STREAM flights (
  flight_id    INT KEY,
  from_country STRING,
  departed_at  TIMESTAMP
) WITH (
  KAFKA_TOPIC  = 'flights',
  PARTITIONS   = 10,
  KEY_FORMAT   = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);

The new field must go at the end of the field list. If you place it anywhere else, ksqlDB rejects the statement with the error below:

Cannot upgrade data source: DataSource 'FLIGHTS' has schema = FLIGHT_ID INTEGER KEY, FROM_COUNTRY STRING which is not upgradeable to FLIGHT_ID INTEGER KEY, DEPARTED_AT TIMESTAMP, FROM_COUNTRY STRING. (The following columns are changed, missing or reordered: [FROM_COUNTRY STRING])
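
Once the CREATE OR REPLACE statement succeeds, the new schema can be checked from the ksqlDB CLI. This is a minimal sketch, not part of the original steps: it assumes the flights stream from above and that the topic already holds a few records. Records written before the change carry no departed_at field in their JSON value, so that column should come back as NULL for them.

```sql
-- List the stream's columns; DEPARTED_AT should appear last.
DESCRIBE flights;

-- Read from the start of the topic rather than only new records.
SET 'auto.offset.reset' = 'earliest';

-- Sample a few rows. The query ends after 5 rows; with fewer records
-- in the topic it keeps waiting until it is cancelled.
SELECT * FROM flights EMIT CHANGES LIMIT 5;
```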

Adding the field within the field list

To add the field within the field list, you will first need to drop the existing stream without deleting the underlying topic, then run the CREATE statement with the desired field order.
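
The two steps can be sketched as follows, reusing the stream definition from above and placing departed_at second as an example order. The sketch assumes no persistent queries still read from or write to flights: ksqlDB refuses to drop a stream that is in use, so any such queries would have to be terminated first.

```sql
-- Drop only the stream definition. Without the DELETE TOPIC clause,
-- the underlying 'flights' Kafka topic and its records are kept.
DROP STREAM flights;

-- Recreate the stream over the same topic with the desired column order.
CREATE STREAM flights (
  flight_id    INT KEY,
  departed_at  TIMESTAMP,
  from_country STRING
) WITH (
  KAFKA_TOPIC  = 'flights',
  PARTITIONS   = 10,
  KEY_FORMAT   = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);
```

With VALUE_FORMAT = 'JSON', value fields are matched by name, so changing the declared column order should not affect how existing records are read.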
