
• Reviewed for ksqlDB 0.29

How to Use Connector Converters in ksqlDB

When you create a new connector, you need to set key.converter and value.converter to the fully qualified class name of a converter. These two properties specify how to encode or decode the key and value of a message.

In the example below, the key will be encoded or decoded as a number of type Long, while the value will be treated as a String value:

  "key.converter" = 'org.apache.kafka.connect.converters.LongConverter',
  "value.converter" = 'org.apache.kafka.connect.storage.StringConverter',

Converters are the Kafka Connect equivalent to deserializers in Kafka Consumer, serializers in Kafka Producer, and serdes in Kafka Streams.
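
For context, the two properties sit inside the WITH clause of a CREATE CONNECTOR statement. The following is a minimal sketch, not a definitive setup: the connector name, the topic, and the output file are hypothetical, and it assumes the FileStreamSinkConnector plugin is available on the Connect worker.

```sql
-- Hypothetical sink connector: name, topic, and file path are placeholders.
-- Assumes the file sink plugin is installed on the Connect worker.
-- Keys are decoded as 64-bit integers, values as plain strings.
CREATE SINK CONNECTOR `orders-file-sink` WITH (
  "connector.class" = 'org.apache.kafka.connect.file.FileStreamSinkConnector',
  "topics"          = 'orders',
  "file"            = '/tmp/orders.txt',
  "key.converter"   = 'org.apache.kafka.connect.converters.LongConverter',
  "value.converter" = 'org.apache.kafka.connect.storage.StringConverter'
);
```

In a sink connector the converters decode records read from the topic. In a source connector they encode records written to it.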

Base Converters

org.apache.kafka.connect.json.JsonConverter
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.converters.DoubleConverter
org.apache.kafka.connect.converters.FloatConverter
org.apache.kafka.connect.converters.IntegerConverter
org.apache.kafka.connect.converters.LongConverter
org.apache.kafka.connect.converters.NumberConverter (abstract base class of the numeric converters, not meant to be configured directly)
org.apache.kafka.connect.converters.ShortConverter

In the Kafka Connect source code, the primitive converters live in the converters package, the StringConverter in the storage package, and the JsonConverter in the json package.

The JsonConverter

The JsonConverter deserves a special mention. This converter can read and write raw JSON data from topics without a given schema.

It has two modes, selected with the [key|value].converter.schemas.enable config. When it is enabled, which is the default, every message must be an envelope with "schema" and "payload" fields. When it is disabled, the message is treated as plain JSON.

Most of the time you will be working with raw JSON records, so remember to set schemas.enable to false:

  "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
  "value.converter.schemas.enable" = 'false',

Otherwise, you will get this error:

Caused by: org.apache.kafka.connect.errors.DataException: 
JsonConverter with schemas.enable requires "schema" and "payload" 
fields and may not contain additional fields. If you are trying 
to deserialize plain JSON data, set schemas.enable=false in your 
converter configuration.
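
To make the two modes concrete, the sketch below shows the same record in both shapes as comments, followed by a connector configuration for the plain form. The record content, connector name, topic, and file path are all hypothetical.

```sql
-- With schemas.enable=true, JsonConverter expects an envelope like:
--   {"schema": {"type": "struct", "optional": false,
--               "fields": [{"field": "id", "type": "int64", "optional": false}]},
--    "payload": {"id": 42}}
--
-- With schemas.enable=false, it expects only the plain record:
--   {"id": 42}
--
-- Hypothetical sink connector reading plain JSON values.
CREATE SINK CONNECTOR `events-file-sink` WITH (
  "connector.class" = 'org.apache.kafka.connect.file.FileStreamSinkConnector',
  "topics"          = 'events',
  "file"            = '/tmp/events.txt',
  "key.converter"   = 'org.apache.kafka.connect.storage.StringConverter',
  "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
  "value.converter.schemas.enable" = 'false'
);
```

If the topic holds plain records like the second shape and schemas.enable is left at its default, the converter fails with the DataException shown above.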

Schema-Based Converters

If you are using Confluent Schema Registry, you will use one of these converters:

io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
io.confluent.connect.json.JsonSchemaConverter

These can be found in the Schema Registry project, in AvroConverter.java, ProtobufConverter.java, and JsonSchemaConverter.java.
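
These converters also need to know where Schema Registry runs, which is set with the [key|value].converter.schema.registry.url property. Below is a minimal sketch, assuming a registry reachable at the hypothetical address http://schema-registry:8081. The connector name, topic, and file path are placeholders as well.

```sql
-- Hypothetical sink connector reading Avro values through Schema Registry.
-- The registry URL is an assumption; replace it with your own.
CREATE SINK CONNECTOR `users-file-sink` WITH (
  "connector.class" = 'org.apache.kafka.connect.file.FileStreamSinkConnector',
  "topics"          = 'users',
  "file"            = '/tmp/users.txt',
  "key.converter"   = 'org.apache.kafka.connect.storage.StringConverter',
  "value.converter" = 'io.confluent.connect.avro.AvroConverter',
  "value.converter.schema.registry.url" = 'http://schema-registry:8081'
);
```

The same pattern applies to the ProtobufConverter and the JsonSchemaConverter. Only the converter class changes.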