Kafka

A Strimzi-managed Kafka cluster running in KRaft mode. Topics are sub-components. Link a topic to a sink (MongoDB, PostgreSQL, ClickHouse, MinIO, SeaweedFS) and Kafka Connect and the matching connector are configured automatically.

Architecture

Kafka Brokers - KafkaNodePool (broker role) with persistent storage
KRaft Controllers - KafkaNodePool (controller role), ZooKeeper-less
Kafka Connect - Auto-generated when any topic links to a sink. Builds custom image with connector JARs.
Topics - Sub-components, each generates a KafkaTopic CR
Bridge - Sub-component, HTTP REST interface to Kafka (KafkaBridge CR)

Attributes

| Attribute | Example | Description |
|---|---|---|
| namespace (required) | kafka | Kubernetes namespace for all Kafka resources |
| replicas | 3 | Broker and controller count (applied to both node pools) |
| kafka_version | 4.1.1 | Kafka version in the Strimzi CR |
| broker_storage / controller_storage | 100Gi | PVC size per broker / controller node |
| broker_cpu_request / broker_mem_request | 500m / 1Gi | Broker node pool resources |
| controller_cpu_request / controller_mem_request | 250m / 512Mi | Controller node pool resources |
| kafka_connector_additional_artifacts | url: ... sha512sum: ... | Extra connector JARs to include in the KafkaConnect build image (textarea, one artifact per url + sha512sum pair) |
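
To make the attribute mapping concrete, here is a minimal sketch of what a broker KafkaNodePool could look like with the example values above. The pool name broker, the cluster name my-cluster, and the v1beta2 API version are assumptions for illustration, not confirmed generator output. A controller pool would presumably have the same shape with the controller role and the controller_* values.

```yaml
# Sketch only: names and apiVersion are assumptions, not generator output.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker                      # hypothetical pool name
  namespace: kafka                  # namespace attribute
  labels:
    strimzi.io/cluster: my-cluster  # hypothetical Kafka CR name
spec:
  replicas: 3                       # replicas attribute
  roles:
    - broker
  storage:
    type: persistent-claim
    size: 100Gi                     # broker_storage
  resources:
    requests:
      cpu: 500m                     # broker_cpu_request
      memory: 1Gi                   # broker_mem_request
```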

Sub-Components

topic - KafkaTopic

Each topic generates a KafkaTopic CR. Topics can link to sink destinations (MongoDB, PostgreSQL, ClickHouse, MinIO, SeaweedFS) to auto-configure Kafka Connect connectors.

| Attribute | Example | Impact |
|---|---|---|
| kafka_partition_size (required) | 3 | Number of partitions in the KafkaTopic spec |
| kafka_replication_factor | 2 | Replication factor in the KafkaTopic spec |
| errors_handling | true | Enables errors.tolerance: all plus error logging on connectors linked to this topic |
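
As an illustration, a topic sub-component with the example values above would plausibly produce a KafkaTopic CR along these lines. The topic name orders and the cluster name my-cluster are hypothetical, and the v1beta2 API version is an assumption.

```yaml
# Sketch only: topic and cluster names are hypothetical.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders                      # hypothetical topic name
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster  # hypothetical Kafka CR name
spec:
  partitions: 3                     # kafka_partition_size
  replicas: 2                       # kafka_replication_factor
```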

bridge - KafkaBridge (HTTP REST)

Generates a KafkaBridge CR + KafkaUser with TLS auth. Exposes Kafka via HTTP REST on port 8080. If Prometheus is linked, enables bridge metrics. If gateway is linked, generates an Ingress for external access.
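
A minimal sketch of the bridge resources described above, assuming a cluster named my-cluster with a TLS listener on port 9093 and a user named my-bridge-user (all hypothetical names). The secret names follow Strimzi's usual conventions for the cluster CA and KafkaUser secrets. The enableMetrics line would only be present when Prometheus is linked.

```yaml
# Sketch only: resource names and the listener port are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-bridge-user              # hypothetical user name
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster  # hypothetical Kafka CR name
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: my-bridge                   # hypothetical bridge name
  namespace: kafka
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: my-bridge-user    # secret created for the KafkaUser
      certificate: user.crt
      key: user.key
  http:
    port: 8080                      # HTTP REST port from the description
  enableMetrics: true               # only when Prometheus is linked
```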

Links

| Link Type | Direction | What Gets Automated |
|---|---|---|
| topic-mongo_db | Topic → MongoDB | KafkaConnect + MongoSinkConnector auto-configured |
| topic-db | Topic → CNPG (PostgreSQL) | KafkaConnect + CamelPostgresqlSink auto-configured |
| topic-clickhouse_db | Topic → ClickHouse | KafkaConnect + ClickHouseSinkConnector auto-configured |
| topic-bucket | Topic → MinIO | KafkaConnect + CamelMinioSink auto-configured |
| topic-swbucket | Topic → SeaweedFS | KafkaConnect + CamelMinioSink (S3-compatible) auto-configured |
| mongo_db-topic | MongoDB → Topic | MongoDB CDC source (change data capture into topic) |
| prometheus-kafka | Prometheus → Kafka | JMX metrics on Kafka + KafkaExporter + Connect metrics (if connectors exist) |
| gateway-kafka | Gateway → Kafka | Ingress for bridge sub-component (external HTTP access) |
| apicurio_schema_registry-kafka | Apicurio → Kafka | KafkaSQL storage for Apicurio (no impact on KafkaConnect) |
| topic-apicurio_schema_registry | Topic → Schema Registry | Per-topic converter override on sink connectors (Avro/JSON). Breaks plain JSON producers. |

Topic → Sink Links (Kafka Connect)

When any topic links to a sink, KafkaConnect is auto-generated with a custom-built image containing the required connector JARs. Each link also generates a KafkaConnector CR. Connector JARs are pulled from the link's kafka_connector_image + sha512sum attributes.

Auto-Configured per Sink Type

| Sink | Connector Class | Auto-Pulled from Target |
|---|---|---|
| MongoDB | MongoSinkConnector | connection.uri, database, collection from mongo_db sub-component |
| PostgreSQL (CNPG) | CamelPostgresqlSink | serverName, port, databaseName, username, password from db sub-component |
| ClickHouse | ClickHouseSinkConnector | hostname, port (8123), database, username, password from clickhouse_db sub-component |
| MinIO | CamelMinioSink | accessKey, secretKey, endpoint, bucketName from bucket sub-component |
| SeaweedFS | CamelMinioSink (S3-compatible) | accessKey, secretKey, endpoint, bucketName from swbucket sub-component |

What Gets Generated per Sink Link

| Generated | Description |
|---|---|
| KafkaConnect CR | Generated once (shared by all connectors). Builds custom image with all required JARs. TLS auth to Kafka cluster. |
| KafkaConnector CR | One per sink link. Topic name set to the source topic's name. Connection details auto-pulled from target sub-component. |
| Connect internal topics | KafkaTopics for offsets, configs, status (auto-created when any connector exists) |
| Sink credentials in cloud.env | Database/bucket credentials and host endpoints stored in cloud.env (SOPS encrypted) |
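
For orientation, a per-link KafkaConnector CR for a MongoDB sink might look roughly like the sketch below. The resource names, database, collection, and connection URI are placeholders, and how credentials from cloud.env are injected is not shown because the source does not describe it. The two errors.* keys are the standard Kafka Connect settings that errors_handling presumably enables; the exact set the generator emits is an assumption.

```yaml
# Sketch only: names and connection values are placeholders.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: orders-mongodb-sink         # hypothetical connector name
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect  # hypothetical KafkaConnect CR name
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 1
  config:
    topics: orders                  # source topic's name
    connection.uri: mongodb://<user>:<password>@<host>:27017  # placeholder
    database: <database>            # pulled from the mongo_db sub-component
    collection: <collection>        # pulled from the mongo_db sub-component
    # Assumed to appear only when errors_handling is true on the topic:
    errors.tolerance: all
    errors.log.enable: true
```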

Connector Link Attributes

These attributes on the topic sink link control the KafkaConnect build and connector behavior.

| Attribute | Example | Effect |
|---|---|---|
| kafka_connector_image (required) | Maven JAR/TGZ URL | Connector artifact URL, included in KafkaConnect build image |
| sha512sum (required) | SHA-512 hash | Integrity verification for the connector artifact |
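
The following sketch shows how these two link attributes could map onto the build section of a Strimzi KafkaConnect CR. The registry image, resource names, artifact URL, and checksum are placeholders, and the internal topic names are assumptions. The strimzi.io/use-connector-resources annotation is what lets Strimzi manage connectors through KafkaConnector CRs.

```yaml
# Sketch only: image, names, URL, and checksum are placeholders.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect                  # hypothetical name
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: my-connect-user   # hypothetical KafkaUser secret
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-offsets   # assumed internal topic names
    config.storage.topic: connect-configs
    status.storage.topic: connect-status
    value.converter: org.apache.kafka.connect.storage.StringConverter
  build:
    output:
      type: docker
      image: registry.example.com/kafka/connect:latest  # placeholder
    plugins:
      - name: mongodb-sink
        artifacts:
          - type: jar
            url: https://example.com/path/to/connector.jar  # kafka_connector_image
            sha512sum: <sha512-of-the-artifact>             # sha512sum
```

Strimzi checks the downloaded artifact against sha512sum during the image build, so a mismatched hash would be expected to fail the build rather than produce a broken image.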

Generated Files

| File | Condition | Contains |
|---|---|---|
| k8s/deploy/base/kafka-cluster.yaml | Always | KafkaNodePool (broker + controller) + Kafka CR |
| k8s/deploy/base/strimzi-control.yaml | Always | Strimzi feature gates and operator config |
| k8s/deploy/base/topic.yaml | If topic sub-components exist | KafkaTopic CRs + Connect internal topics (offsets, configs, status) if any connector exists |
| k8s/deploy/base/bridge.yaml | If bridge sub-component | KafkaBridge CR + KafkaUser with TLS auth |
| k8s/deploy/base/bridge-ingress.yaml | If gateway linked | Ingress for bridge HTTP access with TLS |
| k8s/deploy/base/connectors/kafka-connect.yaml | If any sink link | KafkaConnect CR: builds custom image with connector JARs from link attributes |
| k8s/deploy/base/connectors/mongodb-sink.yaml | If topic-mongo_db link | MongoSinkConnector KafkaConnector CR |
| k8s/deploy/base/connectors/db-sink.yaml | If topic-db link | CamelPostgresqlSink KafkaConnector CR |
| k8s/deploy/base/connectors/clickhouse-sink.yaml | If topic-clickhouse_db link | ClickHouseSinkConnector KafkaConnector CR |
| k8s/deploy/base/connectors/minio-sink.yaml | If topic-bucket link | CamelMinioSink KafkaConnector CR |
| k8s/deploy/base/connectors/swbucket-sink.yaml | If topic-swbucket link | CamelMinioSink (S3-compatible) KafkaConnector CR |
| k8s/deploy/base/monitor/cm-kafka-metrics.yaml | If prometheus linked | Kafka JMX metrics ConfigMap |
| k8s/deploy/base/monitor/pod-monitor.yaml | If prometheus linked | PodMonitor CR for Prometheus scraping |
| k8s/deploy/base/monitor/cm-connect-metrics.yaml | If prometheus + any connector | KafkaConnect JMX metrics ConfigMap |
| k8s/deploy/base/secret/cloud.env | Always | Sink credentials: accessKey, secretKey, host, port, connection URIs per linked sink |

Prometheus → Kafka Link

When Prometheus links to Kafka, the following are auto-enabled:

| What | Effect |
|---|---|
| JMX Prometheus Exporter on Kafka | metricsConfig added to Kafka CR + kafka-metrics ConfigMap |
| KafkaExporter | Consumer lag, topic/partition metrics exported for all topics and groups |
| PodMonitor | Prometheus scraping configured via PodMonitor CR |
| Connect metrics (if connectors exist) | JMX Prometheus Exporter on KafkaConnect + connect-metrics ConfigMap |
| Bridge metrics (if bridge exists) | enableMetrics: true on KafkaBridge CR |
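
As a rough sketch, the first two entries above correspond to the following fragment of the Kafka CR. The cluster name and the ConfigMap key are assumptions, and the other required fields of the Kafka CR (listeners, version, and so on) are omitted for brevity.

```yaml
# Fragment only: not a complete Kafka CR; names and key are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster                  # hypothetical Kafka CR name
  namespace: kafka
spec:
  kafka:
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics               # ConfigMap named in the table above
          key: kafka-metrics-config.yml     # hypothetical key name
  kafkaExporter:
    topicRegex: ".*"                # all topics
    groupRegex: ".*"                # all consumer groups
```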

Schema Registry Integration (Apicurio)

Only one link is needed for connectors: the per-topic topic-apicurio_schema_registry link. The converter JAR is included as a default in kafka_connector_additional_artifacts. The apicurio_schema_registry-kafka component link provides KafkaSQL storage only and has no impact on KafkaConnect.

Per-Topic Link (topic-apicurio_schema_registry)

Link a topic sub-component to the schema registry. Only sink connectors consuming from that specific topic get their converter overridden. The global value.converter stays StringConverter, so Bridge, k6, and console producers continue to work on non-schema topics.

| Link Attribute | Values | Converter Class |
|---|---|---|
| schema_format | AVRO (default) | io.apicurio.registry.utils.converter.AvroConverter |
| schema_format | JSON | io.apicurio.registry.utils.converter.ExtJsonConverter |

Converter Override per Sink Type

| Sink | Converter Override | ToJSON SMT | Why |
|---|---|---|---|
| PostgreSQL (Camel) | AvroConverter / ExtJsonConverter | Yes | Camel expects JSON string, not Struct |
| MinIO (Camel) | AvroConverter / ExtJsonConverter | Yes | Camel expects JSON string, not Struct |
| SeaweedFS (Camel) | AvroConverter / ExtJsonConverter | Yes | Camel expects JSON string, not Struct |
| ClickHouse (native) | AvroConverter / ExtJsonConverter | No | Native connector handles Struct |
| MongoDB (native) | AvroConverter / ExtJsonConverter | No | Native connector handles Struct |

Wire format: All converters use Legacy4ByteIdHandler for Confluent wire format compatibility (4-byte schema ID). Registry URL is base URL only (http://host:8080). Camel sinks (PostgreSQL, MinIO, SeaweedFS) additionally get a ToJSON SMT to convert Struct to JSON string. The converter JAR and ToJSON packages are included as defaults in kafka_connector_additional_artifacts.
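
A hedged sketch of what the per-connector converter override might look like inside a KafkaConnector's config for schema_format AVRO. The property keys and the package of Legacy4ByteIdHandler are assumptions based on Apicurio Registry's 2.x serde configuration and are not confirmed by this document. The ToJSON SMT is left out because its class name is not given here.

```yaml
# Fragment of a KafkaConnector spec; property keys are assumptions.
spec:
  config:
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    # Base URL only, as noted above
    value.converter.apicurio.registry.url: http://host:8080
    # Assumed key and package for the 4-byte schema ID handler
    value.converter.apicurio.registry.id-handler: io.apicurio.registry.serde.Legacy4ByteIdHandler
```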

BREAKING CHANGE: Schema-Linked Topics Reject Plain JSON

Once a topic is linked to the schema registry via topic-apicurio_schema_registry, the old method of sending plain JSON messages no longer works for that topic. ALL producers MUST use the Confluent Avro wire format.

| Producer Method | Schema-Linked Topic | Normal Topic |
|---|---|---|
| Plain JSON (Bridge, k6, console producer) | All sink connectors on the topic crash | Works normally |
| Avro wire format (Confluent serializer) | Works; converters deserialize automatically | N/A (use plain JSON) |

Why it breaks: The AvroConverter expects the Confluent binary envelope (magic byte 0x00 + 4-byte schema ID + binary payload). Plain JSON fails deserialization immediately. Since errors.tolerance defaults to none, a single malformed message crashes the entire connector task.

This is per-topic, not system-wide. Topics without a schema registry link continue to work with plain JSON normally. Only topics with topic-apicurio_schema_registry links are affected.