Anton Liauchuk


Developing Custom Transformations in Kafka Connect

Custom transformations, also known as Single Message Transforms (SMTs), let a Kafka Connect connector modify each record as it passes through, for example to replace, filter, or enrich messages. The full code, along with other examples, is available in the repository.

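To define a custom transformation, implement the org.apache.kafka.connect.transforms.Transformation interface. Its main method is apply(R record), which receives each record and returns the transformed record (or null to drop it); the interface also requires config(), configure() and close(). In our example, the transformation replaces the original message with a new one that contains a randomly generated id and a name field set to the pageid field of the original message. The generated id is also used as the record key, and records without a value schema get the name default_name.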

Below is the implementation of the MessageTransformation class:

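package com.uuidable.transformation;

import java.util.Map;
import java.util.UUID;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;

public class MessageTransformation<R extends ConnectRecord<R>> implements Transformation<R> {

  private static final String ID_FIELD = "id";
  private static final String NAME = "name";
  private static final String PURPOSE = "message model";
  private static final String NAME_DEFAULT_VALUE = "default_name";
  private static final String PAGEID_KEY = "pageid";

  // Schema of the new message value: {id: string, name: string}.
  // It never changes, so it is built once rather than on every record.
  private static final Schema TRANSFORMED_SCHEMA = SchemaBuilder.struct()
    .field(ID_FIELD, Schema.STRING_SCHEMA)
    .field(NAME, Schema.STRING_SCHEMA)
    .build();

  @Override
  public R apply(R record) {
    String name;
    if (record.valueSchema() == null) {
      // Schemaless record: there is no Struct to read pageid from, so use the default name.
      name = NAME_DEFAULT_VALUE;
    } else {
      // Throws a DataException if the value is not a Struct.
      var value = Requirements.requireStruct(record.value(), PURPOSE);
      Object pageId = value.get(PAGEID_KEY);
      // Fall back to the default name instead of failing on a null pageid.
      name = pageId != null ? pageId.toString() : NAME_DEFAULT_VALUE;
    }

    String id = UUID.randomUUID().toString();
    var transformed = new Struct(TRANSFORMED_SCHEMA)
      .put(ID_FIELD, id)
      .put(NAME, name);

    // Keep topic, partition and timestamp; the generated id becomes the key
    // and the new Struct replaces the original value.
    return record.newRecord(
      record.topic(),
      record.kafkaPartition(),
      Schema.STRING_SCHEMA,
      id,
      TRANSFORMED_SCHEMA,
      transformed,
      record.timestamp()
    );
  }

  @Override
  public ConfigDef config() {
    // This transformation has no configuration options yet.
    return new ConfigDef();
  }

  @Override
  public void close() {
    // No resources to release.
  }

  @Override
  public void configure(Map<String, ?> configs) {
    // Nothing to configure.
  }
}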
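The field mapping inside apply() can also be expressed without any Kafka types, which makes it easy to unit-test without building ConnectRecord instances. The sketch below is a stdlib-only model, assuming the message value is available as a java.util.Map (schemaless JSON values usually reach a transformation in that form); the class name MessageMapping is hypothetical and not part of the repository.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper: mirrors the field mapping of MessageTransformation
// using plain Maps instead of Kafka Connect's Struct/Schema types.
public final class MessageMapping {
    static final String ID_FIELD = "id";
    static final String NAME = "name";
    static final String PAGEID_KEY = "pageid";
    static final String NAME_DEFAULT_VALUE = "default_name";

    private MessageMapping() {}

    // Builds the replacement message: a fresh random UUID as "id" and the
    // original "pageid" (or a default when it is missing) as "name".
    // All other fields of the original message are dropped.
    public static Map<String, String> map(Map<String, ?> original) {
        Object pageId = original == null ? null : original.get(PAGEID_KEY);
        Map<String, String> result = new LinkedHashMap<>();
        result.put(ID_FIELD, UUID.randomUUID().toString());
        result.put(NAME, pageId != null ? pageId.toString() : NAME_DEFAULT_VALUE);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> original = Map.of("pageid", 12345, "comment", "edit");
        // Prints something like {id=<random uuid>, name=12345}
        System.out.println(map(original));
        // Prints something like {id=<random uuid>, name=default_name}
        System.out.println(map(null));
    }
}
```

With this split, apply() would only convert between Struct and the mapped fields, while the naming rules are covered by plain unit tests.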

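The MessageTransformation class is located in the kafka-connect-transformation module. To use it, the compiled module must be available to the Connect worker (for example via its plugin.path), and the connector configuration should contain the following. Here MessageTransformation in the first line is an alias, and the type property maps that alias to the implementing class: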

transforms=MessageTransformation
transforms.MessageTransformation.type=com.uuidable.transformation.MessageTransformation
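The relationship between the transforms alias list and the per-alias type keys can be illustrated with java.util.Properties. This is a simplified, hypothetical model (TransformChain is not a Kafka Connect class, and Connect's own config parsing does more validation); it only shows that transforms is a comma-separated list of aliases, applied in the listed order, each resolved through transforms.<alias>.type.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical model, not Kafka Connect's parser: resolves each alias in
// "transforms" to the class named by "transforms.<alias>.type".
public final class TransformChain {
    private TransformChain() {}

    // Returns alias -> fully qualified class name, preserving the listed order.
    public static Map<String, String> resolve(Properties config) {
        Map<String, String> chain = new LinkedHashMap<>();
        String aliases = config.getProperty("transforms", "");
        for (String raw : aliases.split(",")) {
            String alias = raw.trim();
            if (alias.isEmpty()) {
                continue; // no transforms configured, or a stray comma
            }
            String type = config.getProperty("transforms." + alias + ".type");
            if (type == null) {
                throw new IllegalArgumentException("Missing transforms." + alias + ".type");
            }
            chain.put(alias, type);
        }
        return chain;
    }

    public static void main(String[] args) throws IOException {
        Properties config = new Properties();
        config.load(new StringReader(
            "transforms=MessageTransformation\n"
                + "transforms.MessageTransformation.type="
                + "com.uuidable.transformation.MessageTransformation\n"));
        // Prints {MessageTransformation=com.uuidable.transformation.MessageTransformation}
        System.out.println(resolve(config));
    }
}
```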

MessageTransformation is a basic example of how transformations work in Kafka Connect. It can be developed further with transformation-specific configuration (declared in config() and read in configure()) and combined with predicates, so that it is applied only to records that match a condition.
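As a starting point for transformation-specific configuration, the hard-coded pageid and default_name could become settings. The sketch below assumes two hypothetical keys, source.field and default.name, and uses only the JDK. In a real transformation these keys would be declared in the ConfigDef returned by config() and parsed with it; this class only models the lookup-with-defaults logic that configure(Map<String, ?> configs) would perform on the properties Connect passes in (with the transforms.<alias>. prefix removed).

```java
import java.util.Map;

// Hypothetical settings for a configurable MessageTransformation.
public final class MessageTransformationSettings {
    static final String SOURCE_FIELD_CONFIG = "source.field"; // hypothetical key
    static final String DEFAULT_NAME_CONFIG = "default.name"; // hypothetical key

    final String sourceField;
    final String defaultName;

    private MessageTransformationSettings(String sourceField, String defaultName) {
        this.sourceField = sourceField;
        this.defaultName = defaultName;
    }

    // Reads the settings, falling back to the values hard-coded in the article.
    static MessageTransformationSettings from(Map<String, ?> configs) {
        String sourceField = stringOrDefault(configs, SOURCE_FIELD_CONFIG, "pageid");
        String defaultName = stringOrDefault(configs, DEFAULT_NAME_CONFIG, "default_name");
        if (sourceField.isBlank()) {
            throw new IllegalArgumentException(SOURCE_FIELD_CONFIG + " must not be blank");
        }
        return new MessageTransformationSettings(sourceField, defaultName);
    }

    private static String stringOrDefault(Map<String, ?> configs, String key, String fallback) {
        Object value = configs.get(key);
        return value == null ? fallback : value.toString();
    }

    public static void main(String[] args) {
        MessageTransformationSettings settings = from(Map.of(SOURCE_FIELD_CONFIG, "title"));
        // Prints "title / default_name"
        System.out.println(settings.sourceField + " / " + settings.defaultName);
    }
}
```

With such keys, the connector configuration would set, for example, transforms.MessageTransformation.source.field alongside the type property.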