How to Generate Mock Data in ksqlDB
This Docker command runs ksql-datagen. It creates a topic called topictale_users if it doesn't exist and writes mock user records to it:
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=users \
  topic=topictale_users \
  bootstrap-server=localhost:9092 \
  msgRate=1
When you run this command, you will see one log line per record produced, at a rate of one record per second (msgRate=1):
['User_5'] --> ([ 1511261537317L | 'User_5' | 'Region_6' | 'MALE' ]) ts:1677670382971
['User_9'] --> ([ 1500373766464L | 'User_9' | 'Region_1' | 'OTHER' ]) ts:1677670383528
['User_6'] --> ([ 1488348059597L | 'User_6' | 'Region_4' | 'MALE' ]) ts:1677670384531
['User_2'] --> ([ 1503999868114L | 'User_2' | 'Region_2' | 'OTHER' ]) ts:1677670385530
['User_3'] --> ([ 1490726036883L | 'User_3' | 'Region_1' | 'FEMALE' ]) ts:1677670386533
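To check that records are reaching Kafka before you define anything in ksqlDB, you can read a few of them back with Kafka's console consumer. The sketch below assumes the confluentinc/cp-kafka image (tag 7.3.2, picked to match the examples image; that choice is an assumption) and the same broker at localhost:9092. The helper names peek_topic and run, and the DRY_RUN switch that prints the command instead of executing it, are hypothetical.

```shell
# Hypothetical helpers, not part of ksql-datagen. Source this file or paste it into bash.
# Assumes a Kafka broker on localhost:9092 and the confluentinc/cp-kafka:7.3.2 image.

# Run a command, or only print it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = 1 ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Print the first N record values (default 5) of a topic, then exit.
peek_topic() {
  local topic="$1" count="${2:-5}"
  run docker run --rm --network host confluentinc/cp-kafka:7.3.2 \
    kafka-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic "$topic" \
    --from-beginning \
    --max-messages "$count"
}
```

For example, `peek_topic topictale_users 3` prints the raw values of the first three records and returns.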
To query the data, create a stream over the topic:
CREATE STREAM topictale_users (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR
) WITH (
  KAFKA_TOPIC='topictale_users',
  VALUE_FORMAT='JSON'
);
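You can run this statement from the ksqlDB CLI, or send it to the server's REST API through the /ksql endpoint. The sketch below takes the REST route and assumes a ksqlDB server at http://localhost:8088 (overridable through a hypothetical KSQLDB_URL variable). The helper names ksql_payload and ksql_statement are illustrative, and DRY_RUN=1 prints the request instead of sending it.

```shell
# Hypothetical helpers for posting a statement to ksqlDB's /ksql REST endpoint.
# Assumes the server listens on http://localhost:8088 unless KSQLDB_URL is set.

# Build the JSON request body: flatten CR/LF/tab to spaces, then escape
# backslashes and double quotes so the SQL fits inside a JSON string.
ksql_payload() {
  local sql
  sql=$(printf '%s' "$1" | tr '\r\n\t' '   ' | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')
  printf '{"ksql": "%s", "streamsProperties": {}}' "$sql"
}

# Send one statement; with DRY_RUN=1, print the target URL and body instead.
ksql_statement() {
  local url="${KSQLDB_URL:-http://localhost:8088}/ksql" body
  body=$(ksql_payload "$1")
  if [ "${DRY_RUN:-0}" = 1 ]; then
    printf 'POST %s %s\n' "$url" "$body"
  else
    curl -sS -X POST "$url" \
      -H 'Accept: application/vnd.ksql.v1+json' \
      -H 'Content-Type: application/vnd.ksql.v1+json; charset=utf-8' \
      -d "$body"
  fi
}
```

For example, after saving the CREATE STREAM statement above to a file such as users.sql (a hypothetical name), `ksql_statement "$(cat users.sql)"` submits it. Reading the statement from a file sidesteps shell quoting of the single quotes inside it.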
The quickstart value specifies which type of record ksql-datagen generates. Although the ksql-datagen help message lists only three options (orders, users, and pageviews), we found a few more by reading the ksql-datagen source code.
Here are the snippets to generate mock data and create the stream definition for each record type we found:
We prefix every topic and stream created in these snippets with topictale_. This keeps the names unique, so they won't conflict with anything you may have created already.
- clickstream_codes
- clickstream
- clickstream_users
- orders
- ratings
- users
- users_ (an extended version of users that adds interests and contactinfo fields)
- pageviews
clickstream_codes:
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=clickstream_codes \
  topic=topictale_clickstream_codes \
  bootstrap-server=localhost:9092 \
  msgRate=1
CREATE STREAM topictale_clickstream_codes (
  code INT,
  definition VARCHAR
) WITH (
  KAFKA_TOPIC='topictale_clickstream_codes',
  VALUE_FORMAT='JSON'
);
clickstream:
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=clickstream \
  topic=topictale_clickstream \
  bootstrap-server=localhost:9092 \
  msgRate=1
CREATE STREAM topictale_clickstream (
  ip VARCHAR,
  userid INT,
  remote_user VARCHAR,
  time VARCHAR,
  _time BIGINT,
  request VARCHAR,
  status VARCHAR,
  bytes VARCHAR,
  referrer VARCHAR,
  agent VARCHAR
) WITH (
  KAFKA_TOPIC='topictale_clickstream',
  VALUE_FORMAT='JSON'
);
clickstream_users:
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=clickstream_users \
  topic=topictale_clickstream_users \
  bootstrap-server=localhost:9092 \
  msgRate=1
CREATE STREAM topictale_clickstream_users (
  user_id INT,
  username VARCHAR,
  registered_at BIGINT,
  first_name VARCHAR,
  last_name VARCHAR,
  city VARCHAR,
  level VARCHAR
) WITH (
  KAFKA_TOPIC='topictale_clickstream_users',
  VALUE_FORMAT='JSON'
);
orders:
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=orders \
  topic=topictale_orders \
  bootstrap-server=localhost:9092 \
  msgRate=1
CREATE STREAM topictale_orders (
  ordertime BIGINT,
  orderid INT,
  itemid VARCHAR,
  orderunits DOUBLE,
  address STRUCT<
    city VARCHAR,
    state VARCHAR,
    zipcode BIGINT
  >
) WITH (
  KAFKA_TOPIC='topictale_orders',
  VALUE_FORMAT='JSON'
);
ratings:
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=ratings \
  topic=topictale_ratings \
  bootstrap-server=localhost:9092 \
  msgRate=1
CREATE STREAM topictale_ratings (
  rating_id BIGINT,
  user_id INT,
  stars INT,
  route_id INT,
  rating_time BIGINT,
  channel VARCHAR,
  message VARCHAR
) WITH (
  KAFKA_TOPIC='topictale_ratings',
  VALUE_FORMAT='JSON'
);
users:
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=users \
  topic=topictale_users \
  bootstrap-server=localhost:9092 \
  msgRate=1
CREATE STREAM topictale_users (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR
) WITH (
  KAFKA_TOPIC='topictale_users',
  VALUE_FORMAT='JSON'
);
users_ (written to topictale_users_extended):
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=users_ \
  topic=topictale_users_extended \
  bootstrap-server=localhost:9092 \
  msgRate=1
CREATE STREAM topictale_users_extended (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR,
  interests ARRAY<VARCHAR>,
  contactinfo STRUCT<
    phone VARCHAR,
    city VARCHAR,
    state VARCHAR,
    zipcode VARCHAR
  >
) WITH (
  KAFKA_TOPIC='topictale_users_extended',
  VALUE_FORMAT='JSON'
);
pageviews:
docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
  quickstart=pageviews \
  topic=topictale_pageviews \
  bootstrap-server=localhost:9092 \
  msgRate=1
CREATE STREAM topictale_pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  KAFKA_TOPIC='topictale_pageviews',
  VALUE_FORMAT='JSON'
);
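If you want every record type from the list of quickstarts flowing at once, you can start one detached container per quickstart instead of keeping a terminal open for each. This is a minimal sketch that reuses the image, broker, rate, and topictale_ topic names from the snippets above. The container names (topictale-datagen-<quickstart>), the helpers start_all_generators and stop_all_generators, and the DRY_RUN switch are hypothetical additions of ours.

```shell
# Hypothetical helpers: run every ksql-datagen quickstart in its own detached container.
# Same image, broker, and msgRate as the snippets above; DRY_RUN=1 only prints commands.

QUICKSTARTS="clickstream_codes clickstream clickstream_users orders ratings users users_ pageviews"

# Run a command, or only print it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = 1 ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

start_all_generators() {
  local qs topic
  # $QUICKSTARTS is left unquoted on purpose so it splits into one word per quickstart.
  for qs in $QUICKSTARTS; do
    # users_ writes to topictale_users_extended, as in the snippet above;
    # every other quickstart writes to topictale_<quickstart>.
    case "$qs" in
      users_) topic=topictale_users_extended ;;
      *) topic="topictale_${qs}" ;;
    esac
    # -d detaches the container; --rm removes it once it stops.
    run docker run -d --rm --network host \
      --name "topictale-datagen-${qs}" \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
      "quickstart=${qs}" \
      "topic=${topic}" \
      bootstrap-server=localhost:9092 \
      msgRate=1
  done
}

stop_all_generators() {
  local qs
  for qs in $QUICKSTARTS; do
    run docker stop "topictale-datagen-${qs}"
  done
}
```

After `start_all_generators`, `docker ps --filter name=topictale-datagen-` lists the running generators. `stop_all_generators` stops them, and because of `--rm`, Docker then removes each container. Running detached containers avoids managing shell background jobs, which in a non-interactive script ignore Ctrl-C.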