Supervise su clúster de Apache Kafka autogestionado que se ejecuta en Kubernetes desplegando el OpenTelemetry Collector para recopilar y reenviar métricas a New Relic.
Arquitectura
New Relic admite dos enfoques para el monitoreo de Kubernetes Kafka autoadministrado: el agente de Java de OpenTelemetry o el Prometheus JMX Exporter. Los siguientes diagramas ilustran el flujo de datos para cada enfoque.

Pasos de instalación
Siga estos pasos para configurar un monitoreo integral de Kafka instalando el agente de Java de OpenTelemetry en sus brokers y desplegando un recolector para recopilar y enviar métricas y logs a New Relic.
Antes de que empieces
Asegúrese de tener:
- Una cuenta de New Relic con un
- Clúster de Kubernetes con acceso
kubectl - Kafka desplegado como un StatefulSet
- Capacidad para modificar y redesplegar el StatefulSet de Kafka
Desplegar OpenTelemetry Collector
Despliegue el recolector OpenTelemetry en su clúster. Este paso también crea el ConfigMap kafka-jmx-config que define qué métricas JMX recopila el agente de Java de cada pod de broker. El recolector debe estar en ejecución antes de reiniciar los brokers de Kafka en el siguiente paso.
Paso 1. Crear secreto de credenciales de New Relic
Sugerencia
Para otras configuraciones de endpoints, consulte Configure su endpoint OTLP.
Paso 2. Crear values.yaml con la configuración del recolector
Tanto los recolectores NRDOT como los de OpenTelemetry utilizan una configuración idéntica. Seleccione su imagen de recolector preferida:
Para opciones de configuración avanzadas, consulte:
Documentación del receptor de métricas de Kafka
Paso 3. Instalar OpenTelemetry Collector con Helm
bash$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \>--install \>--namespace newrelic \>--create-namespace \>-f values.yamlPaso 4. Verifique el despliegue
bash$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50
Paso 1. Crear secreto de credenciales de New Relic
Sugerencia
Para otras configuraciones de endpoints, consulte Configure su endpoint OTLP.
Paso 2. Crear archivos de manifiesto
Tanto los recolectores NRDOT como los de OpenTelemetry utilizan una configuración idéntica. Solo difiere la imagen del contenedor. Ambos también requieren el ConfigMap kafka-jmx-config aplicado a su namespace de Kafka.
Crear kafka-jmx-config.yaml - configuración de métricas JMX para el agente de Java (aplicar a su namespace de Kafka):
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-config namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-jmx-config.yaml: | --- rules: # Per-topic custom metrics - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
- bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec mapping: Count: metric: kafka.message.count type: counter desc: The number of messages received by the broker unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.count type: &type counter desc: &desc The number of requests received by the broker unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.failed type: &type counter desc: &desc The number of requests to the broker resulting in a failure unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: 99thPercentile: metric: kafka.request.time.99p type: gauge desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize mapping: Value: metric: kafka.request.queue type: gauge desc: Size of the request queue unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec metricAttribute: direction: const(in) mapping: Count: metric: &metric kafka.network.io type: &type counter desc: &desc The bytes received or sent by the broker unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec metricAttribute: direction: const(out) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch metricAttribute: type: param(delayedOperation) mapping: Value: metric: kafka.purgatory.size type: gauge desc: The number of requests waiting in purgatory unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount mapping: Value: metric: kafka.partition.count type: gauge desc: The number of partitions on the broker unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount mapping: Value: metric: kafka.partition.offline type: gauge desc: The number of partitions offline unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions mapping: Value: metric: kafka.partition.under_replicated type: gauge desc: The number of under replicated partitions unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec metricAttribute: operation: const(shrink) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec metricAttribute: operation: const(expand) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica mapping: Value: metric: kafka.max.lag type: gauge desc: The max lag in messages between follower and leader replicas unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount mapping: Value: metric: kafka.controller.active.count type: gauge desc: Number of active controllers in the cluster unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs mapping: Count: metric: kafka.leader.election.rate type: counter desc: The leader election count unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec mapping: Count: metric: kafka.unclean.election.rate type: counter desc: Unclean leader election count unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: Count: metric: kafka.request.time.total type: counter desc: The total time the broker has taken to service requests 50thPercentile: metric: kafka.request.time.50p type: gauge desc: The 50th percentile time the broker has taken to service requests Mean: metric: kafka.request.time.avg type: gauge desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs unit: ms type: gauge prefix: kafka.logs.flush. mapping: Count: metric: count unit: '{flush}' type: counter desc: Log flush count 50thPercentile: metric: time.50p desc: Log flush time - 50th percentile 99thPercentile: metric: time.99p desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=* mapping: CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: Committed heap memory type: gauge
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GCPaso 3. Despliegue los manifiestos
$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$
$# Apply JMX ConfigMap to the Kafka namespace$kubectl apply -f kafka-jmx-config.yaml$
$# Apply collector ConfigMap$kubectl apply -f collector-configmap.yaml$
$# Apply Deployment and Service$kubectl apply -f collector-deployment.yamlPaso 4. Verifique el despliegue
$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$
$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app=otel-collector --tail=50Configurar el StatefulSet de Kafka para el agente de Java
Ahora que el recolector se está ejecutando, aplique un parche a su StatefulSet de Kafka para agregar un contenedor de inicio que descargue el JAR del agente de Java de OpenTelemetry, y luego adjúntelo a la JVM del broker de Kafka a través de KAFKA_OPTS.
Agregue las siguientes secciones a su manifiesto de StatefulSet de Kafka existente:
spec: template: spec: # 1. Init container: downloads OTel Java agent JAR before Kafka starts initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - | wget -O /otel-agent/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach OTel Java agent to the Kafka broker JVM env: - name: KAFKA_OPTS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.jmx.enabled=true -Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.runtime-telemetry.enabled=false -Dotel.metric.export.interval=30000 volumeMounts: - name: otel-agent mountPath: /otel-agent - name: jmx-config mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules volumes: - name: otel-agent emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-config # Deployed with the collector in the previous stepSugerencia
El ConfigMap kafka-jmx-config se desplegó con el recolector en el paso anterior. El valor otel.exporter.otlp.endpoint http://otel-collector.newrelic.svc.cluster.local:4317 asume que el recolector está desplegado en el namespace newrelic con el nombre de servicio otel-collector. Actualícelo para que coincida con el DNS real de su servicio recolector si es diferente.
Aplique su StatefulSet actualizado y espere a que los pods se actualicen:
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace(Opcional) Instrumentar aplicaciones productoras o consumidoras
Importante
Soporte de lenguajes: actualmente, solo se admiten aplicaciones Java para la instrumentación de clientes Kafka mediante el agente de Java de OpenTelemetry.
Para recopilar telemetría a nivel de aplicación de sus aplicaciones de productor y consumidor de Kafka que se ejecutan en Kubernetes, agregue el agente de Java de OpenTelemetry a esos pods de aplicación.
Agregue un contenedor init y variables de entorno al despliegue de su aplicación:
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.service.name=order-process-service -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-agent mountPath: /otel-agent
volumes: - name: otel-agent emptyDir: {}Parámetro de configuración
La siguiente tabla describe los parámetros de configuración clave:
Parámetro | Descripción |
|---|---|
| Reemplace con un nombre único para su aplicación productora o consumidora |
| Reemplace con el mismo nombre de clúster utilizado en la configuración de su broker |
| Reemplace con el nombre DNS real de su servicio recolector (
) |
El agente de Java proporciona instrumentación de Kafka lista para usar sin cambios de código, capturando latencias de solicitudes, métricas de rendimiento, tasas de errores y rastreo distribuido. Para una configuración avanzada, consulte la documentación de instrumentación de Kafka.
Siga estos pasos para configurar un monitoreo integral de Kafka instalando Prometheus JMX Exporter en sus pods de broker y desplegando un recolector para recopilar y enviar métricas a New Relic.
Antes de que empieces
Asegúrese de tener:
- Una cuenta de New Relic con un
- Clúster de Kubernetes con acceso
kubectl - Kafka desplegado como un StatefulSet con un servicio headless (para nombres DNS de pod estables)
- Capacidad para modificar y redesplegar el StatefulSet de Kafka
Crear ConfigMap de métricas JMX
Cree un ConfigMap que contenga la configuración de JMX Exporter que defina qué métricas de Kafka recolectar. Este ConfigMap se montará en cada pod del broker de Kafka.
Guardar como kafka-jmx-config.yaml. Aplíquelo al namespace donde está desplegado Kafka:
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-metrics namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-metrics-config.yml: | startDelaySeconds: 0 lowercaseOutputName: true lowercaseOutputLabelNames: true
rules: # Cluster-level controller metrics - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value' name: kafka_cluster_topic_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value' name: kafka_cluster_partition_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value' name: kafka_broker_fenced_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value' name: kafka_partition_non_preferred_leader type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value' name: kafka_partition_offline type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value' name: kafka_controller_active_count type: GAUGE
# Broker-level replica metrics - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value' name: kafka_partition_under_min_isr type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value' name: kafka_broker_leader_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value' name: kafka_partition_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value' name: kafka_partition_under_replicated type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value' name: kafka_max_lag type: GAUGE
# Broker topic metrics (totals) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count' name: kafka_message_count type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "out"
# Per-topic metrics (only appear after traffic flows) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count' name: kafka_prod_msg_count type: COUNTER labels: topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "out"
# Request metrics - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile' name: kafka_request_time_99p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value' name: kafka_request_queue type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value' name: kafka_purgatory_size type: GAUGE labels: type: "$1"
# Controller stats - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count' name: kafka_leader_election_rate type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count' name: kafka_unclean_election_rate type: COUNTER
# JVM Garbage Collection - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount' name: jvm_gc_collections_count type: COUNTER labels: name: "$1"
# JVM Memory - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max' name: jvm_memory_heap_max type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used' name: jvm_memory_heap_used type: GAUGE
# JVM Threading and System - pattern: 'java.lang<type=Threading><>ThreadCount' name: jvm_thread_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad' name: jvm_system_cpu_utilization type: GAUGE
# Broker uptime - pattern: 'java.lang<type=Runtime><>Uptime' name: kafka_broker_uptime type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above) - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count' name: kafka_request_time_total type: COUNTER labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile' name: kafka_request_time_50p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean' name: kafka_request_time_avg type: GAUGE labels: type: "$1"
# Log flush metrics - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count' name: kafka_logs_flush_count type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile' name: kafka_logs_flush_time_50p type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile' name: kafka_logs_flush_time_99p type: GAUGE
# JVM GC elapsed time - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime' name: jvm_gc_collections_elapsed type: COUNTER labels: name: "$1"
# JVM Memory heap committed - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed' name: jvm_memory_heap_committed type: GAUGE
# JVM class loading - pattern: 'java.lang<type=ClassLoading><>LoadedClassCount' name: jvm_class_count type: GAUGE
# Additional JVM OS metrics - pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage' name: jvm_system_cpu_load_1m type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors' name: jvm_cpu_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad' name: jvm_cpu_recent_utilization type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount' name: jvm_file_descriptor_count type: GAUGE
# JVM Memory Pool - pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used' name: jvm_memory_pool_used type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max' name: jvm_memory_pool_max type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used' name: jvm_memory_pool_used_after_last_gc type: GAUGE labels: name: "$1"Sugerencia
Personalizar métricas: puede agregar o modificar patrones consultando los ejemplos de Prometheus JMX Exporter y la documentación de Kafka MBean.
Aplique el ConfigMap:
$kubectl apply -f kafka-jmx-config.yamlConfigurar el StatefulSet de Kafka para JMX Exporter
Aplique un parche a su StatefulSet de Kafka para agregar un contenedor init que descargue el JAR de Prometheus JMX Exporter, luego adjúntelo a la JVM del broker de Kafka a través de KAFKA_OPTS.
Paso 1. Agregue las siguientes secciones a su manifiesto de StatefulSet de Kafka existente:
spec: template: spec: # 1. Init container: downloads JMX Exporter JAR before Kafka starts initContainers: - name: download-jmx-exporter image: busybox:latest command: - sh - -c - | # Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases. JMX_EXPORTER_VERSION="1.5.0" wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \ "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar" volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach JMX Exporter as Java agent on port 9404 env: - name: KAFKA_OPTS value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml" # 3. Expose port 9404 for Prometheus scraping ports: - name: jmx-metrics containerPort: 9404 protocol: TCP volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx - name: jmx-config mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config volumes: - name: prometheus-jmx emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2Paso 2. Aplique su StatefulSet actualizado y espere a que los pod roten:
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespacePaso 3. Una vez que se complete el despliegue, verifique que las métricas estén expuestas en cada pod del broker:
$# Replace kafka-0 and kafka with your pod name and namespace$kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20Importante
Clústeres de múltiples brokers: la configuración del contenedor init y de KAFKA_OPTS se aplica a todos los pods en el StatefulSet automáticamente. Verifique que cada pod del broker exponga métricas después del despliegue.
Desplegar OpenTelemetry Collector
Despliegue el OpenTelemetry Collector en su clúster. El recolector recopila datos de los pods del broker de Kafka utilizando objetivos DNS estáticos y escucha en el puerto 4317 los datos OTLP de las aplicaciones instrumentadas.
El método de instalación con Helm es el enfoque recomendado para desplegar OpenTelemetry Collector en Kubernetes.
Paso 1. Crear secreto de credenciales de New Relic
Sugerencia
Para otras configuraciones de endpoints, consulte Configure su endpoint OTLP.
Paso 2. Crear values.yaml con la configuración del recolector
Tanto los recolectores NRDOT como los de OpenTelemetry utilizan una configuración idéntica. Seleccione su imagen de recolector preferida:
Para opciones de configuración avanzadas, consulte estas páginas de documentación del receptor:
Documentación del receptor de métricas de Kafka
Paso 3. Instalar OpenTelemetry Collector con Helm
bash$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \>--install \>--namespace newrelic \>--create-namespace \>-f values.yamlPaso 4. Verifique el despliegue:
bash$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$$# View logs to verify metrics collection$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50Debería ver logs que indiquen un scraping exitoso de los pods de los brokers de Kafka en el puerto
9404.
El método de instalación de manifiesto proporciona control directo sobre los recursos de Kubernetes sin usar Helm.
Paso 1. Crear secreto de credenciales de New Relic
Sugerencia
Para otras configuraciones de endpoints, consulte Configure su endpoint OTLP.
Paso 2. Crear archivos de manifiesto
Tanto los recolectores NRDOT como los de OpenTelemetry utilizan una configuración idéntica. Solo la imagen del contenedor difiere.
Para opciones de configuración avanzadas, consulte estas páginas de documentación del receptor:
Documentación del receptor de métricas de Kafka
Paso 3. Despliegue los manifiestos
bash$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$$# Apply ConfigMap$kubectl apply -f collector-configmap.yaml$$# Apply Deployment (includes ServiceAccount)$kubectl apply -f collector-deployment.yamlPaso 4. Verifique el despliegue:
bash$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$$# View logs to verify metrics collection$kubectl logs -n newrelic -l app=otel-collector --tail=50Debería ver logs que indiquen un scraping exitoso de los pods de los brokers de Kafka en el puerto
9404.
(Opcional) Instrumentar aplicaciones productoras o consumidoras
Importante
Soporte de lenguajes: las aplicaciones Java admiten la instrumentación del cliente Kafka lista para usar mediante el agente de Java de OpenTelemetry.
Para recopilar telemetría a nivel de aplicación de sus aplicaciones de productor y consumidor de Kafka, use el agente de Java de OpenTelemetry con un contenedor init:
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-java-agent image: busybox:latest command: - sh - -c - | wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar -Dotel.service.name=my-kafka-app -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
volumes: - name: otel-auto-instrumentation emptyDir: {}Parámetro de configuración
La siguiente tabla describe los parámetros de configuración clave:
| Parámetro | Descripción |
|---|---|
service.name | Reemplace my-kafka-app con un nombre único para su aplicación productora o consumidora |
kafka.cluster.name | Reemplace my-kafka-cluster con el mismo nombre de clúster utilizado en la configuración de su colector |
otlp.endpoint | El extremo http://otel-collector.newrelic.svc.cluster.local:4317 asume que el recolector está desplegado en el namespace newrelic como otel-collector |
El agente de Java proporciona instrumentación de Kafka lista para usar sin cambios de código, capturando latencias de solicitudes, métricas de rendimiento, tasas de errores y rastreo distribuido. Para una configuración avanzada, consulte la documentación de instrumentación de Kafka.
(Opcional) Enviar logs del broker de Kafka
Para recopilar logs del broker de Kafka y enviarlos a New Relic, agregue un receptor filelog a la configuración de su recolector.
Encuentra tus datos
Después de unos minutos, sus datos de Kafka deberían aparecer en New Relic. Consulta Encuentra tus datos para obtener instrucciones detalladas sobre cómo explorar tus datos de Kafka en las diferentes vistas de la UI de New Relic.
La siguiente tabla resume dónde se almacena cada tipo de señal. Reemplace my-kafka-cluster por su valor KAFKA_CLUSTER_NAME en todas las consultas a continuación:
| Señal | Tipo de evento | Qué incluye |
|---|---|---|
| Métrica | Metric | Métricas de broker, topic, partición, grupo de consumidores y JVM |
| Logs | Log | Logs de aplicaciones de productor y consumidor (mediante el agente de Java de OTel) y logs del broker recopilados mediante el agente de Java |
| Traza | Span | Spans de productor y consumidor, incluidas las operaciones publish y receive por mensaje en todos los temas |
Métrica
Las métricas de broker, tema, partición, grupo de consumidores y JVM se almacenan en el tipo de evento Metric:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoLogs
Los logs de las aplicaciones de productor y consumidor instrumentadas con el agente de Java de OpenTelemetry, y los logs del broker recopilados a través del agente de Java en el broker, se almacenan en el tipo de evento Log:
FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoTraza
Los spans de productor y consumidor, incluidas las operaciones publish y receive por mensaje en todos los temas, se almacenan en el tipo de evento Span:
FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoEjemplo
Un ejemplo funcional completo con manifiestos de StatefulSet de Kafka, valores de Helm, configuración de OTel Collector y aplicaciones de productor/consumidor de muestra está disponible en el respositorio de New Relic OpenTelemetry Examples.
Resolución de problemas
Próximos pasos
- Explora las métricas de Kafka - Consulta la referencia completa de métricas
- Crear dashboards personalizados - Construir visualizaciones para sus datos de Kafka
- Configurar alertas - Monitorea métricas críticas como el retraso del consumidor y las particiones subreplicadas
Recursos relacionados
- Kafka autohospedado - Monitoreo de Kafka para entornos autohospedados (no Kubernetes)
- Kubernetes Strimzi - monitoreo de Kafka para Kafka administrado por Strimzi en Kubernetes
- Agente de Java de OpenTelemetry - Documentación oficial del agente de Java de OTel
- Prometheus JMX Exporter - agente de Java que expone métricas de JMX en formato Prometheus
- Receptor de Prometheus - receptor de OTel Collector para recopilar extremos de métricas de Prometheus
- receptor kafkametrics - Documentación del receptor de métricas de temas y retraso del consumidor