Strimzi-managed Kafka cluster running in KRaft mode. Topics are sub-components. Link a topic to a sink (MongoDB, PostgreSQL, ClickHouse, MinIO, SeaweedFS) and both Kafka Connect and the matching connector are auto-configured.

| Attribute | Example | Description |
|---|---|---|
| namespace REQ | kafka | Kubernetes namespace for all Kafka resources |
| replicas | 3 | Broker + controller count (both node pools) |
| kafka_version | 4.1.1 | Kafka version in Strimzi CR |
| broker_storage / controller_storage | 100Gi | PVC size per broker / controller node |
| broker_cpu_request / broker_mem_request | 500m / 1Gi | Broker node pool resources |
| controller_cpu_request / controller_mem_request | 250m / 512Mi | Controller node pool resources |
| kafka_connector_additional_artifacts | url: ... sha512sum: ... | Extra connector JARs to include in the KafkaConnect build image (textarea, one artifact per url+sha512sum pair) |

Each topic generates a KafkaTopic CR. Topics can link to sink destinations (MongoDB, PostgreSQL, ClickHouse, MinIO, SeaweedFS) to auto-configure Kafka Connect connectors.

| Attribute | Example | Impact |
|---|---|---|
| kafka_partition_size REQ | 3 | Number of partitions in the KafkaTopic spec |
| kafka_replication_factor | 2 | Replication factor in the KafkaTopic spec |
| errors_handling | true | Enables errors.tolerance: all + logging on connectors linked to this topic |

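As a rough illustration of how the topic attributes above could map onto a KafkaTopic CR, the sketch below builds the manifest as a plain dict. The function name, the topic name `orders`, and the cluster name `my-cluster` are hypothetical; the `kafka.strimzi.io/v1beta2` API version is an assumption that depends on the installed Strimzi release, and the generator's real output may differ.

```python
import json


def kafka_topic_manifest(name, cluster, namespace, partitions, replication_factor):
    """Build a Strimzi KafkaTopic manifest as a plain dict (illustrative only)."""
    return {
        # Assumption: v1beta2 is served by the installed Strimzi version.
        "apiVersion": "kafka.strimzi.io/v1beta2",
        "kind": "KafkaTopic",
        "metadata": {
            "name": name,
            "namespace": namespace,
            # Strimzi uses this label to bind the topic to a Kafka cluster.
            "labels": {"strimzi.io/cluster": cluster},
        },
        "spec": {
            "partitions": partitions,        # kafka_partition_size
            "replicas": replication_factor,  # kafka_replication_factor
        },
    }


# Hypothetical names; the numbers are the example values from the table.
manifest = kafka_topic_manifest("orders", "my-cluster", "kafka", 3, 2)
print(json.dumps(manifest, indent=2))
```

In the KafkaTopic spec, `replicas` is the replication factor of the topic, not a pod count.
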
The bridge sub-component generates a KafkaBridge CR and a KafkaUser with TLS auth, and exposes Kafka via HTTP REST on port 8080. If Prometheus is linked, bridge metrics are enabled. If a gateway is linked, an Ingress is generated for external access.
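For orientation, producing through the bridge is a plain HTTP POST to `/topics/{topic}` using the Kafka Bridge JSON content type. The sketch below only builds the request and does not send it. The bridge hostname and the topic name are hypothetical, and the example assumes a topic that is not linked to the schema registry.

```python
import json
import urllib.request


def build_produce_request(bridge_url, topic, values):
    """Build (but do not send) a Kafka Bridge produce request for JSON records."""
    body = json.dumps({"records": [{"value": v} for v in values]}).encode("utf-8")
    return urllib.request.Request(
        url=f"{bridge_url}/topics/{topic}",
        data=body,
        method="POST",
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
    )


# Hypothetical in-cluster service address; port 8080 is the bridge HTTP port.
req = build_produce_request("http://kafka-bridge.kafka.svc:8080", "orders", [{"id": 1}])
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req)
```
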

| Link Type | Direction | What Gets Automated |
|---|---|---|
| topic-mongo_db | Topic → MongoDB | KafkaConnect + MongoSinkConnector auto-configured |
| topic-db | Topic → CNPG (PostgreSQL) | KafkaConnect + CamelPostgresqlSink auto-configured |
| topic-clickhouse_db | Topic → ClickHouse | KafkaConnect + ClickHouseSinkConnector auto-configured |
| topic-bucket | Topic → MinIO | KafkaConnect + CamelMinioSink auto-configured |
| topic-swbucket | Topic → SeaweedFS | KafkaConnect + CamelMinioSink (S3-compatible) auto-configured |
| mongo_db-topic | MongoDB → Topic | MongoDB CDC source (change data capture into topic) |
| prometheus-kafka | Prometheus → Kafka | JMX metrics on Kafka + KafkaExporter + Connect metrics (if connectors exist) |
| gateway-kafka | Gateway → Kafka | Ingress for bridge sub-component (external HTTP access) |
| apicurio_schema_registry-kafka | Apicurio → Kafka | KafkaSQL storage for Apicurio (no impact on KafkaConnect) |
| topic-apicurio_schema_registry | Topic → Schema Registry | Per-topic converter override on sink connectors (Avro/JSON). Breaks plain JSON producers. |

When any topic links to a sink, KafkaConnect is auto-generated with a custom-built image containing the required connector JARs. Each link also generates a KafkaConnector CR. Connector JARs are pulled from the link's kafka_connector_image + sha512sum attributes.
| Sink | Connector Class | Auto-Pulled from Target |
|---|---|---|
| MongoDB | MongoSinkConnector | connection.uri, database, collection from mongo_db sub-component |
| PostgreSQL (CNPG) | CamelPostgresqlSink | serverName, port, databaseName, username, password from db sub-component |
| ClickHouse | ClickHouseSinkConnector | hostname, port (8123), database, username, password from clickhouse_db sub-component |
| MinIO | CamelMinioSink | accessKey, secretKey, endpoint, bucketName from bucket sub-component |
| SeaweedFS | CamelMinioSink (S3-compatible) | accessKey, secretKey, endpoint, bucketName from swbucket sub-component |
| Generated | Description |
|---|---|
| KafkaConnect CR | Generated once (shared by all connectors). Builds custom image with all required JARs. TLS auth to Kafka cluster. |
| KafkaConnector CR | One per sink link. Topic name set to the source topic's name. Connection details auto-pulled from target sub-component. |
| Connect internal topics | KafkaTopics for offsets, configs, status (auto-created when any connector exists) |
| Sink credentials in cloud.env | Database/bucket credentials and host endpoints stored in cloud.env (SOPS encrypted) |
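To make the "one KafkaConnector CR per sink link" row concrete, here is a minimal sketch of what a MongoDB sink connector resource could look like, built as a dict. The function name, connector name, Connect cluster name, and connection URI are all hypothetical, and the two `errors.log.*` keys are an assumption about what "logging" means for `errors_handling`; only `errors.tolerance: all` is stated in this document.

```python
def mongo_sink_connector(name, connect_cluster, topic, uri, database,
                         collection, errors_handling=False):
    """Build a Strimzi KafkaConnector manifest for a MongoDB sink (illustrative)."""
    config = {
        "topics": topic,           # set to the source topic's name
        "connection.uri": uri,     # pulled from the mongo_db sub-component
        "database": database,
        "collection": collection,
    }
    if errors_handling:
        config.update({
            "errors.tolerance": "all",
            # Assumption: standard Kafka Connect error-logging options.
            "errors.log.enable": True,
            "errors.log.include.messages": True,
        })
    return {
        "apiVersion": "kafka.strimzi.io/v1beta2",  # assumption, version-dependent
        "kind": "KafkaConnector",
        "metadata": {
            "name": name,
            # Binds the connector to the shared KafkaConnect cluster.
            "labels": {"strimzi.io/cluster": connect_cluster},
        },
        "spec": {
            "class": "com.mongodb.kafka.connect.MongoSinkConnector",
            "tasksMax": 1,
            "config": config,
        },
    }


connector = mongo_sink_connector(
    "orders-mongodb-sink", "kafka-connect", "orders",
    "mongodb://user:pass@mongo.example:27017", "shop", "orders",
    errors_handling=True,
)
print(connector["spec"]["config"]["errors.tolerance"])
```
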
These attributes on the topic sink link control the KafkaConnect build and connector behavior.

| Attribute | Example | Effect |
|---|---|---|
| kafka_connector_image REQ | Maven JAR/TGZ URL | Connector artifact URL, included in KafkaConnect build image |
| sha512sum REQ | SHA-512 hash | Integrity verification for the connector artifact |

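The two link attributes above correspond naturally to an artifact entry in the KafkaConnect `build.plugins` list, where each artifact carries a `type`, a `url`, and a `sha512sum`. The sketch below shows one possible mapping. The helper names, plugin name, and URL are hypothetical, and inferring the artifact type from the file extension is an assumption, not necessarily what the generator does.

```python
import hashlib


def artifact_from_link(url, sha512sum):
    """Map kafka_connector_image + sha512sum onto a build artifact entry."""
    lowered = url.lower()
    if lowered.endswith(".jar"):
        kind = "jar"
    elif lowered.endswith((".tgz", ".tar.gz")):
        kind = "tgz"
    else:
        raise ValueError(f"unsupported artifact extension: {url}")
    return {"type": kind, "url": url, "sha512sum": sha512sum}


def build_plugins(links):
    """links: {plugin_name: (url, sha512sum)} -> list for spec.build.plugins."""
    return [
        {"name": name, "artifacts": [artifact_from_link(url, digest)]}
        for name, (url, digest) in links.items()
    ]


def sha512_hex(data):
    """Digest in the hex form expected by the sha512sum attribute."""
    return hashlib.sha512(data).hexdigest()


# Hypothetical artifact; the digest here is of dummy bytes, not a real JAR.
digest = sha512_hex(b"dummy connector bytes")
plugins = build_plugins({"mongodb-sink": ("https://example.org/mongo-connector.jar", digest)})
print(plugins[0]["artifacts"][0]["type"])
```

A SHA-512 digest in hex is always 128 characters long, which gives a cheap sanity check before a build is triggered.
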

| File | Condition | Contains |
|---|---|---|
| k8s/deploy/base/kafka-cluster.yaml | Always | KafkaNodePool (broker + controller) + Kafka CR |
| k8s/deploy/base/strimzi-control.yaml | Always | Strimzi feature gates and operator config |
| k8s/deploy/base/topic.yaml | If topic sub-components exist | KafkaTopic CRs + Connect internal topics (offsets, configs, status) if any connector exists |
| k8s/deploy/base/bridge.yaml | If bridge sub-component | KafkaBridge CR + KafkaUser with TLS auth |
| k8s/deploy/base/bridge-ingress.yaml | If gateway linked | Ingress for bridge HTTP access with TLS |
| k8s/deploy/base/connectors/kafka-connect.yaml | If any sink link | KafkaConnect CR: builds custom image with connector JARs from link attributes |
| k8s/deploy/base/connectors/mongodb-sink.yaml | If topic-mongo_db link | MongoSinkConnector KafkaConnector CR |
| k8s/deploy/base/connectors/db-sink.yaml | If topic-db link | CamelPostgresqlSink KafkaConnector CR |
| k8s/deploy/base/connectors/clickhouse-sink.yaml | If topic-clickhouse_db link | ClickHouseSinkConnector KafkaConnector CR |
| k8s/deploy/base/connectors/minio-sink.yaml | If topic-bucket link | CamelMinioSink KafkaConnector CR |
| k8s/deploy/base/connectors/swbucket-sink.yaml | If topic-swbucket link | CamelMinioSink (S3-compatible) KafkaConnector CR |
| k8s/deploy/base/monitor/cm-kafka-metrics.yaml | If prometheus linked | Kafka JMX metrics ConfigMap |
| k8s/deploy/base/monitor/pod-monitor.yaml | If prometheus linked | PodMonitor CR for Prometheus scraping |
| k8s/deploy/base/monitor/cm-connect-metrics.yaml | If prometheus + any connector | KafkaConnect JMX metrics ConfigMap |
| k8s/deploy/base/secret/cloud.env | Always | Sink credentials: accessKey, secretKey, host, port, connection URIs per linked sink |

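The conditions in the file table can be read as a small decision procedure. The sketch below restates them as a function that returns the expected file list for a given set of links. The function and its parameters are hypothetical, and "any connector" is simplified here to "any sink link"; the generator's real logic is not shown in this document.

```python
BASE = "k8s/deploy/base"

# Sink link type -> connector file, as listed in the table.
SINK_FILES = {
    "topic-mongo_db": "connectors/mongodb-sink.yaml",
    "topic-db": "connectors/db-sink.yaml",
    "topic-clickhouse_db": "connectors/clickhouse-sink.yaml",
    "topic-bucket": "connectors/minio-sink.yaml",
    "topic-swbucket": "connectors/swbucket-sink.yaml",
}


def expected_files(links, has_topics=False, has_bridge=False):
    """Return the files the table says are generated for these links."""
    links = set(links)
    sink_links = [link for link in SINK_FILES if link in links]

    # Generated unconditionally.
    files = ["kafka-cluster.yaml", "strimzi-control.yaml", "secret/cloud.env"]
    if has_topics:
        files.append("topic.yaml")
    if has_bridge:
        files.append("bridge.yaml")
    if "gateway-kafka" in links:
        # The table lists only the gateway link as the condition.
        files.append("bridge-ingress.yaml")
    if sink_links:
        files.append("connectors/kafka-connect.yaml")
        files.extend(SINK_FILES[link] for link in sink_links)
    if "prometheus-kafka" in links:
        files += ["monitor/cm-kafka-metrics.yaml", "monitor/pod-monitor.yaml"]
        if sink_links:
            files.append("monitor/cm-connect-metrics.yaml")
    return [f"{BASE}/{name}" for name in files]


result = expected_files({"topic-mongo_db", "prometheus-kafka"}, has_topics=True)
for path in result:
    print(path)
```
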
When Prometheus links to Kafka, the following are auto-enabled:
| What | Effect |
|---|---|
| JMX Prometheus Exporter on Kafka | metricsConfig added to Kafka CR + kafka-metrics ConfigMap |
| KafkaExporter | Consumer lag, topic/partition metrics exported for all topics and groups |
| PodMonitor | Prometheus scraping configured via PodMonitor CR |
| Connect metrics (if connectors exist) | JMX Prometheus Exporter on KafkaConnect + connect-metrics ConfigMap |
| Bridge metrics (if bridge exists) | enableMetrics: true on KafkaBridge CR |
Only one link is needed: topic-apicurio_schema_registry (per-topic). The converter JAR is included as a default in kafka_connector_additional_artifacts. The apicurio_schema_registry-kafka component link is for KafkaSQL storage only — it has no impact on KafkaConnect.
Link a topic sub-component to the schema registry. Only sink connectors consuming from that specific topic get their converter overridden. Global value.converter stays StringConverter — Bridge, k6, and console producers continue to work on non-schema topics.

| Link Attribute | Values | Converter Class |
|---|---|---|
| schema_format | AVRO (default) | io.apicurio.registry.utils.converter.AvroConverter |
| schema_format | JSON | io.apicurio.registry.utils.converter.ExtJsonConverter |

| Sink | Converter Override | ToJSON SMT | Why |
|---|---|---|---|
| PostgreSQL (Camel) | AvroConverter / ExtJsonConverter | Yes | Camel expects JSON string, not Struct |
| MinIO (Camel) | AvroConverter / ExtJsonConverter | Yes | Camel expects JSON string, not Struct |
| SeaweedFS (Camel) | AvroConverter / ExtJsonConverter | Yes | Camel expects JSON string, not Struct |
| ClickHouse (native) | AvroConverter / ExtJsonConverter | No | Native connector handles Struct |
| MongoDB (native) | AvroConverter / ExtJsonConverter | No | Native connector handles Struct |
Wire format: All converters use Legacy4ByteIdHandler for Confluent wire format compatibility (4-byte schema ID). Registry URL is base URL only (http://host:8080). Camel sinks (PostgreSQL, MinIO, SeaweedFS) additionally get a ToJSON SMT to convert Struct to JSON string. The converter JAR and ToJSON packages are included as defaults in kafka_connector_additional_artifacts.
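The two tables above combine into a simple lookup: `schema_format` picks the converter class, and the sink type decides whether a ToJSON SMT is added. The sketch below encodes that lookup. The function name, the sink keys (taken from the link-type suffixes), and the returned dict shape are hypothetical; the SMT's own class name is not given in this document, so it is represented only as a flag.

```python
CONVERTERS = {
    "AVRO": "io.apicurio.registry.utils.converter.AvroConverter",
    "JSON": "io.apicurio.registry.utils.converter.ExtJsonConverter",
}

# Camel-based sinks expect a JSON string, so they get the ToJSON SMT.
CAMEL_SINKS = {"db", "bucket", "swbucket"}
# Native connectors handle Struct values directly.
NATIVE_SINKS = {"mongo_db", "clickhouse_db"}


def converter_plan(sink, schema_format="AVRO"):
    """Return the converter override and SMT requirement for one sink link."""
    if sink not in CAMEL_SINKS | NATIVE_SINKS:
        raise ValueError(f"unknown sink type: {sink}")
    fmt = schema_format.upper()
    if fmt not in CONVERTERS:
        raise ValueError(f"unsupported schema_format: {schema_format}")
    return {
        "value.converter": CONVERTERS[fmt],
        "needs_to_json_smt": sink in CAMEL_SINKS,
    }


print(converter_plan("db"))
print(converter_plan("clickhouse_db", "JSON"))
```
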
Once a topic is linked to the schema registry via topic-apicurio_schema_registry, the old method of sending plain JSON messages no longer works for that topic. ALL producers MUST use the Confluent Avro wire format.
| Producer Method | Schema-Linked Topic | Normal Topic |
|---|---|---|
| Plain JSON (Bridge, k6, console producer) | ALL SINK CONNECTORS CRASH | Works normally |
| Avro wire format (Confluent serializer) | Works — converters auto-deserialize | N/A (use plain JSON) |
Why it breaks: The AvroConverter expects the Confluent binary envelope (magic byte 0x00 + 4-byte schema ID + binary payload). Plain JSON does not start with that envelope, so deserialization fails immediately. Because errors.tolerance defaults to none, a single malformed message crashes the entire connector task.
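The envelope check can be reproduced in a few lines. The sketch below frames a payload with the magic byte and a big-endian 4-byte schema ID, then shows that a plain JSON message is rejected at the first byte. The function names and the schema ID 42 are hypothetical, and the payload is treated as opaque bytes rather than real Avro-encoded data.

```python
import json
import struct

MAGIC_BYTE = 0x00


def frame(schema_id, payload):
    """Prefix a serialized payload with magic byte + 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def parse(message):
    """Split a framed message into (schema_id, payload); reject anything else."""
    if len(message) < 5 or message[0] != MAGIC_BYTE:
        raise ValueError("not in Confluent wire format (missing magic byte 0x00)")
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


framed = frame(42, b"\x02")
print(parse(framed))

plain_json = json.dumps({"id": 1}).encode("utf-8")
try:
    parse(plain_json)
except ValueError as exc:
    # A JSON object starts with '{' (0x7b), so the magic-byte check fails.
    print("rejected:", exc)
```
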
This is per-topic, not system-wide. Topics without a schema registry link continue to work with plain JSON normally. Only topics with topic-apicurio_schema_registry links are affected.