• Reviewed for ksqlDB 0.29

How to Create Stream from Existing Topic in ksqlDB

To create a ksqlDB stream from an existing Apache Kafka topic, use the CREATE STREAM statement.

Here is an example that parses messages with JSON values:

CREATE STREAM IF NOT EXISTS flights_stream (
  flight_id    STRING KEY,
  from_airport STRING,
  to_airport   STRING,
  produced_at  BYTES HEADER('producedAt')
) WITH (
  KAFKA_TOPIC  = 'source_topic',
  VALUE_FORMAT = 'JSON'
);

Not every SQL data type is supported for fields, because each type must map to a Java class internally. The ksqlDB data types reference has the full list of supported types.
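
As a rough illustration of the kinds of types a column can take, here is a sketch that mixes primitive, temporal, and nested types. The topic name `flight_events` and every column in it are hypothetical and only serve to show the syntax; check the data types reference before relying on any of them.

```sql
-- Hypothetical stream: topic and column names are made up for illustration.
CREATE STREAM IF NOT EXISTS flight_events_stream (
  flight_id    STRING KEY,                                 -- record key
  passengers   INT,                                        -- 32-bit integer
  ticket_price DECIMAL(10, 2),                             -- fixed precision and scale
  departs_at   TIMESTAMP,                                  -- point in time
  tags         ARRAY<STRING>,                              -- list of values
  gate         STRUCT<terminal_name STRING, gate_no INT>   -- nested object
) WITH (
  KAFKA_TOPIC  = 'flight_events',
  VALUE_FORMAT = 'JSON'
);
```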

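If the source topic has an associated schema in your Schema Registry, skip the fields definition block and let ksqlDB infer the columns. Still name a Schema Registry format (AVRO is assumed here) unless your server configures a default value format.

CREATE STREAM IF NOT EXISTS flights_stream
WITH (
  KAFKA_TOPIC  = 'source_topic',
  VALUE_FORMAT = 'AVRO'
);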

REMEMBER: You can create a stream in three ways: by importing an existing topic, by creating an empty stream, or by querying another stream.
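
The other two ways could look roughly like the sketch below. The names `flights_empty` and `flights_from_lhr` and the airport code 'LHR' are hypothetical; the second statement assumes the `flights_stream` defined earlier already exists.

```sql
-- Empty stream: the topic may not exist yet, so PARTITIONS tells ksqlDB
-- how to create it. Topic and stream names are hypothetical.
CREATE STREAM IF NOT EXISTS flights_empty (
  flight_id    STRING KEY,
  from_airport STRING,
  to_airport   STRING
) WITH (
  KAFKA_TOPIC  = 'flights_empty',
  VALUE_FORMAT = 'JSON',
  PARTITIONS   = 1
);

-- Stream derived from a query on another stream.
CREATE STREAM IF NOT EXISTS flights_from_lhr AS
  SELECT flight_id, from_airport, to_airport
  FROM flights_stream
  WHERE from_airport = 'LHR'
  EMIT CHANGES;
```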

Stream properties

Set the stream properties inside the WITH block. Check the official docs for the full list.

KAFKA_TOPIC: topic to read — always required.

FORMAT, KEY_FORMAT, VALUE_FORMAT: record deserializer. Set the value format explicitly unless your server defines a default (ksql.persistence.default.format.value); a registered schema in Schema Registry supplies the columns, not the format. Valid formats are:

  • NONE, DELIMITED, JSON, and KAFKA for raw topics.
  • JSON_SR, AVRO, and PROTOBUF for topics with Schema Registry types.

VALUE_DELIMITER: how to split the message value into columns when VALUE_FORMAT='DELIMITED'.
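
To see these properties working together, here is a minimal sketch for a delimited topic. It assumes a hypothetical topic `flights_csv` whose values look like `LHR|JFK`; the topic, stream name, and delimiter are illustrative choices, not part of the original example.

```sql
-- Hypothetical topic with pipe-separated values such as: LHR|JFK
CREATE STREAM IF NOT EXISTS flights_csv_stream (
  flight_id    STRING KEY,   -- read from the record key
  from_airport STRING,       -- first value column
  to_airport   STRING        -- second value column
) WITH (
  KAFKA_TOPIC     = 'flights_csv',
  KEY_FORMAT      = 'KAFKA',
  VALUE_FORMAT    = 'DELIMITED',
  VALUE_DELIMITER = '|'
);
```

With DELIMITED values, columns are matched by position, so the order in the fields block should follow the order in the message.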

Understanding streams

Creating a stream is a way to tell ksqlDB that you plan to use that topic in later queries.

To run queries, ksqlDB also needs to know the schema of the messages. If you are not using Schema Registry, you have to list the message fields and their types manually.
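
Once the stream is registered, later queries can refer to it by name. The sketch below assumes the `flights_stream` from the first example; the filter value 'JFK' is hypothetical.

```sql
-- Read the topic from the beginning rather than only new records.
SET 'auto.offset.reset' = 'earliest';

-- Show the columns and types ksqlDB registered for the stream.
DESCRIBE flights_stream;

-- Push query: emit matching records as they arrive, stop after five.
SELECT flight_id, from_airport, to_airport
FROM flights_stream
WHERE to_airport = 'JFK'
EMIT CHANGES
LIMIT 5;
```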
