Kafka Integration

Running producers and consumers with transparent Avro logical type conversion

Overview

One of the most powerful features of custom logical types is the ability to handle data transformation transparently at the driver level. This guide shows how to run the included Kafka demos where encryption and normalization happen automatically.

The "Magic" Explained

How does the application see plaintext while Kafka stores ciphertext? The conversion happens transparently in the Avro logical type layer during serialization and deserialization.

Producer Flow (Encryption)

    sequenceDiagram
        participant App as Application
        participant Avro as Avro Serializer
        participant Logic as LogicalType (AES)
        participant Kafka
        
        Note over App, Kafka: Writing a Record
        
        App->>Avro: write(QueryRecord)
        Note right of App: "secret-value"
        
        Avro->>Logic: conversion.toCharSequence()
        Logic->>Logic: Encrypt(value, key)
        Logic-->>Avro: "encrypted-ciphertext"
        
        Avro->>Kafka: Produce Message
        Note right of Kafka: Stored as ciphertext
    

Consumer Flow (Decryption)

    sequenceDiagram
        participant Kafka
        participant Avro as Avro Deserializer
        participant Logic as LogicalType (AES)
        participant App as Application
        
        Note over Kafka, App: Reading a Record
        
        Kafka->>Avro: Consume Message
        Note right of Kafka: "encrypted-ciphertext"
        
        Avro->>Logic: conversion.fromCharSequence()
        Logic->>Logic: Decrypt(value, key)
        Logic-->>Avro: "secret-value"
        
        Avro->>App: read(QueryRecord)
        Note right of App: Plaintext restored!
    
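The encrypt/decrypt steps inside the logical type can be sketched as a symmetric AES round trip. This is a self-contained illustration using the JDK's javax.crypto; the class name, key, and cipher mode here are assumptions for the sketch, and the actual demo's key handling may differ.

    import javax.crypto.Cipher;
    import javax.crypto.spec.SecretKeySpec;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class AesRoundTrip {
        // Illustrative 128-bit key; the real demo would load its key from configuration.
        private static final byte[] KEY = "0123456789abcdef".getBytes(StandardCharsets.UTF_8);

        // What the logical type does on the producer side (toCharSequence).
        static String encrypt(String plaintext) throws Exception {
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(KEY, "AES"));
            byte[] ct = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(ct); // stored in Kafka as ciphertext
        }

        // What the logical type does on the consumer side (fromCharSequence).
        static String decrypt(String ciphertext) throws Exception {
            Cipher cipher = Cipher.getInstance("AES");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(KEY, "AES"));
            byte[] pt = cipher.doFinal(Base64.getDecoder().decode(ciphertext));
            return new String(pt, StandardCharsets.UTF_8);
        }

        public static void main(String[] args) throws Exception {
            String stored = encrypt("secret-value");   // ciphertext on the wire
            String restored = decrypt(stored);         // plaintext in the application
            System.out.println(stored);
            System.out.println(restored);
        }
    }

The application only ever calls write and read; the encrypt/decrypt pair runs inside the serializer, which is why neither the producer nor consumer code mentions cryptography.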

Prerequisites

You need a running Kafka cluster. You can use the included Docker Compose file or a local Confluent Platform installation.

Option A: Docker Sandbox

This is the easiest way to run the full stack (Kafka + Schema Registry + ZooKeeper).

# Start services and demos
docker-compose up demo-producer demo-consumer

# View logs
docker-compose logs -f demo-producer
docker-compose logs -f demo-consumer

Defaults

  • Bootstrap: kafka:9092
  • Schema Registry: http://schema-registry:8081
  • Topic: first_topic
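The defaults above map onto client configuration roughly as follows; this is a minimal sketch using plain java.util.Properties, and the class name is hypothetical (the property keys are the standard Kafka client and Confluent serializer names):

    import java.util.Properties;

    public class DemoDefaults {
        static final String TOPIC = "first_topic";

        // Connection settings matching the Docker sandbox defaults.
        static Properties clientConfig() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092");
            props.put("schema.registry.url", "http://schema-registry:8081");
            return props;
        }
    }

When running against a local installation instead of the Docker network, swap in localhost hostnames and ports.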

Option B: Local Gradle Run

If you have Confluent CLI installed or a cluster running locally:

# 1. Start services
confluent local services start

# 2. Create topic
./gradlew createTopicDemo

# 3. Run Producer
./gradlew runProducerDemo

# 4. Run Consumer
./gradlew runConsumerDemo

What to Observe

👀 CLI Consumer (Raw)

If you consume the topic without the custom logical types on the classpath, you see the encrypted data:

{"secretName": {"string": "X3Glf/VIN92Ks..."}}

🔓 Java Consumer (Decrypted)

The demo consumer has the conversion logic, so it automatically decrypts the data:

Secret Name: yoshiko.kshlerin