Kafka Topic Links (*-topic)

Links to Kafka topic sub-components enable event streaming for applications with high-throughput messaging

📡 What These Links Do

Inject Kafka bootstrap servers into environment
Configure topic name and consumer group
Set up SASL authentication credentials
Enable SSL/TLS for secure connections

🔗 Supported Link Types

Link Type Use Case Generated Files
image_base-topic Application event streaming 📁 src/kafka_module/
fastapi-topic FastAPI event producer/consumer 📁 src/kafka_module/
nodejs-topic Node.js Kafka client 📁 src/kafka/
keda-topic Auto-scaling based on consumer lag 📄 scaledobject.yaml
airflow-topic Airflow DAG triggering 📄 values.yaml

⚙️ Generated Environment Variables

# Automatically configured when you create a Kafka topic link
KAFKA_BOOTSTRAP_SERVERS=auto-configured
KAFKA_TOPIC={topic_name}
KAFKA_CONSUMER_GROUP={consumer_group}
KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
KAFKA_SASL_MECHANISM=SCRAM-SHA-512
KAFKA_SASL_USERNAME={topic_user}
KAFKA_SASL_PASSWORD={topic_password}

🔧 Topic Storage Links (Tiered Storage)

Link Type Purpose Generated Files
topic-minio Tiered storage to MinIO 📄 tiered-storage.yaml
topic-seaweedfs Tiered storage to SeaweedFS 📄 tiered-storage.yaml
topic-database Sink to PostgreSQL (Kafka Connect) 📄 connector.yaml
topic-mongo_db Sink to MongoDB (Kafka Connect) 📄 connector.yaml
topic-apicurio_schema_registry Schema validation (overrides sink converters to Avro/JSON) 📄 *-sink.yaml converter override

📋 Schema Registry Link (topic-apicurio_schema_registry)

Link a topic to Apicurio Schema Registry to override value.converter on all sink connectors consuming from that topic. The converter JAR is included as a default in kafka_connector_additional_artifacts — no component-level link needed for this.

Link Attribute: schema_format

Value Converter Class Use When
AVRO (default) AvroConverter Producers use Confluent Avro serializer
JSON ExtJsonConverter Producers use Confluent JSON Schema serializer

What Gets Auto-Configured on Sink Connectors

value.converter: AvroConverter | ExtJsonConverter # based on schema_format
value.converter.apicurio.registry.url: http://{registry}-app-service.{ns}.svc:8080
value.converter.apicurio.registry.auto-register: true
value.converter.apicurio.registry.id-handler: Legacy4ByteIdHandler
# Camel sinks only (PostgreSQL, MinIO, SeaweedFS):
transforms: toJson # converts Struct to JSON string

BREAKING CHANGE: Old JSON Method Stops Working

Once this link exists, plain JSON messages from Bridge, k6, console producer, or any non-Avro producer will crash ALL sink connectors consuming from this topic.

Producer Schema-Linked Topic Normal Topic
Bridge (plain JSON) CONNECTOR CRASH Works
k6 load test CONNECTOR CRASH Works
Console producer CONNECTOR CRASH Works
Avro wire format producer Works N/A

The AvroConverter expects Confluent binary envelope (magic byte + 4-byte schema ID + binary payload). Plain JSON fails immediately. Since errors.tolerance defaults to none, one bad message crashes the connector task. This is per-topic — topics without the schema link are unaffected.

📈 KEDA Topic Scaling (keda-topic)

When KEDA links to a topic, it creates a ScaledObject for auto-scaling based on consumer lag

Attribute Description
keda_lagthreshold Consumer lag threshold to trigger scaling
keda_pollinginterval Polling interval (seconds)
keda_cooldownperiod Cooldown period (seconds)
keda_minreplicacount / keda_maxreplicacount Min/Max replica bounds