Click to copy

• Reviewed for ksqlDB 0.29

How to Generate Mock Data in ksqlDB

This docker command creates a topic called topictale_users if it doesn’t exist and generates user mock data into it:

docker run --rm \
  --network host \
  confluentinc/ksqldb-examples:7.3.2 \
  ksql-datagen \
    quickstart=users \
    topic=topictale_users \
    bootstrap-server=localhost:9092 \
    msgRate=1

When you run this command, you will see the logs of records being produced at a rate of one per second:

['User_5'] --> ([ 1511261537317L | 'User_5' | 'Region_6' | 'MALE' ]) ts:1677670382971
['User_9'] --> ([ 1500373766464L | 'User_9' | 'Region_1' | 'OTHER' ]) ts:1677670383528
['User_6'] --> ([ 1488348059597L | 'User_6' | 'Region_4' | 'MALE' ]) ts:1677670384531
['User_2'] --> ([ 1503999868114L | 'User_2' | 'Region_2' | 'OTHER' ]) ts:1677670385530
['User_3'] --> ([ 1490726036883L | 'User_3' | 'Region_1' | 'FEMALE' ]) ts:1677670386533

To query the data, you can create a new stream for it like so:

CREATE STREAM topictale_users (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR
) WITH (
  KAFKA_TOPIC='topictale_users',
  VALUE_FORMAT='JSON'
);

Snippets for each type of record

The quickstart value specifies which type of record it will generate. Although the help message for ksql-datagen only list 3 different options – orders, users, and pageviews –, we discovered a few more by looking at the source code of ksql-datagen.

You can find the code snippets – to generate mock data and create the stream definition – for each of the record types we found:

We append the prefix topictale_ to any topic and stream being created in these code snippets. This helps ensure that the name is unique and doesn’t conflict with anything you may have created so far.
  • clickstream_codes
  • docker run --rm \
      --network host \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
        quickstart=clickstream_codes \
        topic=topictale_clickstream_codes \
        bootstrap-server=localhost:9092 \
        msgRate=1
    
    CREATE STREAM topictale_clickstream_codes (
        code INT,
        definition VARCHAR
    ) WITH (
        KAFKA_TOPIC='topictale_clickstream_codes',
        VALUE_FORMAT='JSON'
    );
    
  • clickstream
  • docker run --rm \
      --network host \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
        quickstart=clickstream \
        topic=topictale_clickstream \
        bootstrap-server=localhost:9092 \
        msgRate=1
    
    CREATE STREAM topictale_clickstream (
        ip varchar,
        userid int,
        remote_user varchar,
        time varchar,
        _time bigint,
        request varchar,
        status varchar,
        bytes varchar,
        referrer varchar,
        agent varchar
    ) WITH (
        KAFKA_TOPIC = 'topictale_clickstream',
        VALUE_FORMAT = 'JSON'
    );
    
  • clickstream_users
  • docker run --rm \
      --network host \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
        quickstart=clickstream_users \
        topic=topictale_clickstream_users \
        bootstrap-server=localhost:9092 \
        msgRate=1
    
    CREATE STREAM topictale_clickstream_users (
      user_id INT,
      username VARCHAR,
      registered_at BIGINT,
      first_name VARCHAR,
      last_name VARCHAR,
      city VARCHAR,
      level VARCHAR
    )
    WITH (
      KAFKA_TOPIC='topictale_clickstream_users',
      VALUE_FORMAT='JSON'
    );
    
  • orders
  • docker run --rm \
      --network host \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
        quickstart=orders \
        topic=topictale_orders \
        bootstrap-server=localhost:9092 \
        msgRate=1
    
    CREATE STREAM topictale_orders (
      ordertime BIGINT,
      orderid INT,
      itemid VARCHAR,
      orderunits DOUBLE,
      address STRUCT<
        city VARCHAR,
        state VARCHAR,
        zipcode BIGINT
      >
    ) WITH (
      KAFKA_TOPIC='topictale_orders',
      VALUE_FORMAT='JSON'
    );
    
  • ratings
  • docker run --rm \
      --network host \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
        quickstart=ratings \
        topic=topictale_ratings \
        bootstrap-server=localhost:9092 \
        msgRate=1
    
    CREATE STREAM topictale_ratings (
      rating_id BIGINT,
      user_id INT,
      stars INT,
      route_id INT,
      rating_time BIGINT,
      channel VARCHAR,
      message VARCHAR
    ) WITH (
      KAFKA_TOPIC='topictale_ratings',
      VALUE_FORMAT='JSON'
    );
    
  • users
  • docker run --rm \
      --network host \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
        quickstart=users \
        topic=topictale_users \
        bootstrap-server=localhost:9092 \
        msgRate=1
    
    CREATE STREAM topictale_users (
      registertime BIGINT,
      userid VARCHAR,
      regionid VARCHAR,
      gender VARCHAR
    ) WITH (
      KAFKA_TOPIC='topictale_users',
      VALUE_FORMAT='JSON'
    );
    
  • users_ — sort of an extended version of users.
  • docker run --rm \
      --network host \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
        quickstart=users_ \
        topic=topictale_users_extended \
        bootstrap-server=localhost:9092 \
        msgRate=1
    
    CREATE STREAM topictale_users_extended (
        registertime BIGINT,
        userid VARCHAR,
        regionid VARCHAR,
        gender VARCHAR,
        interests ARRAY<VARCHAR>,
        contactinfo STRUCT<
            phone VARCHAR,
            city VARCHAR,
            state VARCHAR,
            zipcode VARCHAR
        >
    ) WITH (
        KAFKA_TOPIC='topictale_users_extended',
        VALUE_FORMAT='JSON'
    );
    
  • pageviews
  • docker run --rm \
      --network host \
      confluentinc/ksqldb-examples:7.3.2 \
      ksql-datagen \
        quickstart=pageviews \
        topic=topictale_pageviews \
        bootstrap-server=localhost:9092 \
        msgRate=1
    
    CREATE STREAM topictale_pageviews (
        viewtime BIGINT,
        userid VARCHAR,
        pageid VARCHAR
    ) WITH (
        KAFKA_TOPIC='topictale_pageviews',
        VALUE_FORMAT='JSON'
    );
    
Discover what readers are saying
topictale
Get easy to digest how-tos on ksqlDB
Sign up
Please read our Privacy Policy to understand how we protect and manage your data.
You may also like