How to Create a Stream from an Existing Topic in ksqlDB
To create a ksqlDB stream from an existing Apache Kafka topic, use the CREATE STREAM
statement.
Here is an example that parses messages with JSON values:
CREATE STREAM IF NOT EXISTS flights_stream (
    flight_id STRING KEY,
    from_airport STRING,
    to_airport STRING,
    produced_at BYTES HEADER('producedAt')
) WITH (
    KAFKA_TOPIC = 'source_topic',
    VALUE_FORMAT = 'JSON'
);
For field types, not every SQL data type is supported, because each type has to be mapped to a Java class internally. You can find the full list of supported types here.
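To illustrate the column declarations, here is a minimal sketch using a few of the commonly supported types; the topic name and columns are made up for this example:

CREATE STREAM IF NOT EXISTS bookings_stream (
    booking_id STRING KEY,                        -- record key as a string
    passenger_count INT,
    price DECIMAL(10, 2),
    checked_in BOOLEAN,
    leg_airports ARRAY<STRING>,                   -- list of airport codes
    contact STRUCT<email STRING, phone STRING>    -- nested value
) WITH (
    KAFKA_TOPIC = 'bookings_topic',               -- hypothetical topic
    VALUE_FORMAT = 'JSON'
);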
If the source topic already has a schema registered in your Schema Registry, you can skip the column definition block entirely:
CREATE STREAM IF NOT EXISTS flights_stream
WITH (
    KAFKA_TOPIC = 'source_topic'
);
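Depending on how your cluster's default formats are configured, you may still need to name a Schema Registry-aware format so that ksqlDB knows which registered schema to infer the columns from. A minimal sketch, assuming the value schema of source_topic is registered as Avro:

CREATE STREAM IF NOT EXISTS flights_stream
WITH (
    KAFKA_TOPIC = 'source_topic',
    VALUE_FORMAT = 'AVRO'    -- columns are inferred from the registered Avro value schema
);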
REMEMBER - You can create streams in three ways: by importing existing topics as streams, by creating empty streams, or by querying other streams (see the sketch below).
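For the last option, deriving a stream from a query over another stream, ksqlDB provides CREATE STREAM ... AS SELECT. A minimal sketch, reusing the flights_stream defined above with a made-up filter:

CREATE STREAM IF NOT EXISTS flights_from_jfk AS
    SELECT flight_id, to_airport
    FROM flights_stream
    WHERE from_airport = 'JFK';

The derived stream is backed by a new topic and is kept up to date by a persistent query.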
Set the stream properties inside the WITH block. Check the official docs for the full list.
KAFKA_TOPIC: the topic to read from; always required.
FORMAT, KEY_FORMAT, VALUE_FORMAT: the record deserializers; required if the underlying Apache Kafka topic doesn't have a registered schema in Schema Registry. Valid formats are:
- NONE, DELIMITED, JSON, and KAFKA for raw topics.
- JSON_SR, AVRO, and PROTOBUF for topics with schemas in Schema Registry.
VALUE_DELIMITER: how to split the message value into columns when VALUE_FORMAT = 'DELIMITED'.
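As an example of a delimited source, here is a sketch of a stream over a hypothetical pipe-separated topic (the topic name and delimiter are chosen for illustration):

CREATE STREAM IF NOT EXISTS delimited_flights_stream (
    flight_id STRING KEY,
    from_airport STRING,
    to_airport STRING
) WITH (
    KAFKA_TOPIC = 'delimited_source_topic',    -- hypothetical topic
    VALUE_FORMAT = 'DELIMITED',
    VALUE_DELIMITER = '|'                      -- each value is split on the pipe character
);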
Creating a stream is a way to tell ksqlDB that you plan to use that topic in later queries.
To run queries, ksqlDB also needs to know the schema of the messages. If you are not using a Schema Registry, you have to declare the list of fields contained in the message manually, along with their types.
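Once the stream is created, you can run queries against it, for example a simple push query over the stream defined earlier:

SELECT flight_id, from_airport, to_airport
FROM flights_stream
EMIT CHANGES;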