How to Use Connector Converters in ksqlDB
When you create a new connector, you need to set key.converter and/or value.converter to the fully-qualified class name of the converter. These two properties specify how the key and the value of a message are serialized and deserialized. In the example below, the key will be encoded or decoded as a number of type Long, while the value will be treated as a String:
"key.converter" = 'org.apache.kafka.connect.converters.LongConverter',
"value.converter" = 'org.apache.kafka.connect.storage.StringConverter',
Converters are the Kafka Connect equivalent of deserializers in a Kafka consumer, serializers in a Kafka producer, and serdes in Kafka Streams. These are the converters that ship with Apache Kafka:
org.apache.kafka.connect.json.JsonConverter
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.converters.DoubleConverter
org.apache.kafka.connect.converters.FloatConverter
org.apache.kafka.connect.converters.IntegerConverter
org.apache.kafka.connect.converters.LongConverter
org.apache.kafka.connect.converters.NumberConverter
org.apache.kafka.connect.converters.ShortConverter
If you look at the Kafka Connect source code, you can find the numeric and byte-array converters in the converters package, the StringConverter in the storage package, and the JsonConverter in the json package.
The JsonConverter deserves a special mention: it can read and write raw JSON data from topics, with or without an embedded schema.
It has two modes that can be toggled with the [key|value].converter.schemas.enable config. Most of the time you will be working with raw JSON records, so remember to set schemas.enable to false:
"value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
"value.converter.schemas.enable" = 'false',
Otherwise, you will get the following error:
Caused by: org.apache.kafka.connect.errors.DataException:
JsonConverter with schemas.enable requires "schema" and "payload"
fields and may not contain additional fields. If you are trying
to deserialize plain JSON data, set schemas.enable=false in your
converter configuration.
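To see why the flag matters, compare the two shapes the JsonConverter works with (the field names and values below are made up). With schemas.enable set to true, every record must carry a schema envelope, roughly like this:
{"schema": {"type": "struct", "fields": [{"field": "id", "type": "int64"}]}, "payload": {"id": 42}}
With schemas.enable set to false, it reads and writes the plain record:
{"id": 42}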
If you are using Confluent Schema Registry, you will be looking at one of these converters instead:
io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
io.confluent.connect.json.JsonSchemaConverter
These can be found in the Schema Registry project, in AvroConverter.java, ProtobufConverter.java, and JsonSchemaConverter.java.
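These converters also need to know where your Schema Registry lives, via the [key|value].converter.schema.registry.url property. A minimal sketch, assuming a Schema Registry running at the default local address:
"value.converter" = 'io.confluent.connect.avro.AvroConverter',
"value.converter.schema.registry.url" = 'http://localhost:8081',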