• /
  • EnglishEspañolFrançais日本語한국어Português
  • Inicia sesiónComenzar ahora

Te ofrecemos esta traducción automática para facilitar la lectura.

En caso de que haya discrepancias entre la versión en inglés y la versión traducida, se entiende que prevalece la versión en inglés. Visita esta página para obtener más información.

Crea una propuesta

Monitorear Kafka autogestionado en Kubernetes con OpenTelemetry

Supervise su clúster de Apache Kafka autogestionado que se ejecuta en Kubernetes desplegando el OpenTelemetry Collector para recopilar y reenviar métricas a New Relic.

Arquitectura

New Relic admite dos enfoques para el monitoreo de Kubernetes Kafka autoadministrado: el agente de Java de OpenTelemetry o el Prometheus JMX Exporter. Los siguientes diagramas ilustran el flujo de datos para cada enfoque.

Kubernetes self-managed Kafka monitoring architecture

Pasos de instalación

Siga estos pasos para configurar un monitoreo integral de Kafka instalando el agente de Java de OpenTelemetry en sus brokers y desplegando un recolector para recopilar y enviar métricas y logs a New Relic.

Antes de que empieces

Asegúrese de tener:

  • Una cuenta de New Relic con un
  • Clúster de Kubernetes con acceso kubectl
  • Kafka desplegado como un StatefulSet
  • Capacidad para modificar y redesplegar el StatefulSet de Kafka

Desplegar OpenTelemetry Collector

Despliegue el recolector OpenTelemetry en su clúster. Este paso también crea el ConfigMap kafka-jmx-config que define qué métricas JMX recopila el agente de Java de cada pod de broker. El recolector debe estar en ejecución antes de reiniciar los brokers de Kafka en el siguiente paso.

Paso 1. Crear secreto de credenciales de New Relic

Sugerencia

Para otras configuraciones de endpoints, consulte Configure su endpoint OTLP.

Paso 2. Crear values.yaml con la configuración del recolector

Tanto los recolectores NRDOT como los de OpenTelemetry utilizan una configuración idéntica. Seleccione su imagen de recolector preferida:

Para opciones de configuración avanzadas, consulte:

  • Documentación del receptor OTLP

  • Documentación del receptor de métricas de Kafka

    Paso 3. Instalar OpenTelemetry Collector con Helm

    bash
    $
    helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
    $
    helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \
    >
    --install \
    >
    --namespace newrelic \
    >
    --create-namespace \
    >
    -f values.yaml

    Paso 4. Verifique el despliegue

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector
    $
    $
    # View logs to verify metrics are being received from broker pods
    $
    kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50

Paso 1. Crear secreto de credenciales de New Relic

Sugerencia

Para otras configuraciones de endpoints, consulte Configure su endpoint OTLP.

Paso 2. Crear archivos de manifiesto

Tanto los recolectores NRDOT como los de OpenTelemetry utilizan una configuración idéntica. Solo difiere la imagen del contenedor. Ambos también requieren el ConfigMap kafka-jmx-config aplicado a su namespace de Kafka.

Crear kafka-jmx-config.yaml - configuración de métricas JMX para el agente de Java (aplicar a su namespace de Kafka):

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-jmx-config
namespace: kafka # TODO: Replace with your Kafka namespace
data:
kafka-jmx-config.yaml: |
---
rules:
# Per-topic custom metrics
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
mapping:
Count:
metric: kafka.message.count
type: counter
desc: The number of messages received by the broker
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.count
type: &type counter
desc: &desc The number of requests received by the broker
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.failed
type: &type counter
desc: &desc The number of requests to the broker resulting in a failure
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
99thPercentile:
metric: kafka.request.time.99p
type: gauge
desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize
mapping:
Value:
metric: kafka.request.queue
type: gauge
desc: Size of the request queue
unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
metricAttribute:
direction: const(in)
mapping:
Count:
metric: &metric kafka.network.io
type: &type counter
desc: &desc The bytes received or sent by the broker
unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
metricAttribute:
direction: const(out)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
metricAttribute:
type: param(delayedOperation)
mapping:
Value:
metric: kafka.purgatory.size
type: gauge
desc: The number of requests waiting in purgatory
unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount
mapping:
Value:
metric: kafka.partition.count
type: gauge
desc: The number of partitions on the broker
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
mapping:
Value:
metric: kafka.partition.offline
type: gauge
desc: The number of partitions offline
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
mapping:
Value:
metric: kafka.partition.under_replicated
type: gauge
desc: The number of under replicated partitions
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
metricAttribute:
operation: const(shrink)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
metricAttribute:
operation: const(expand)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
mapping:
Value:
metric: kafka.max.lag
type: gauge
desc: The max lag in messages between follower and leader replicas
unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
mapping:
Value:
metric: kafka.controller.active.count
type: gauge
desc: Number of active controllers in the cluster
unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
mapping:
Count:
metric: kafka.leader.election.rate
type: counter
desc: The leader election count
unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
mapping:
Count:
metric: kafka.unclean.election.rate
type: counter
desc: Unclean leader election count
unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
Count:
metric: kafka.request.time.total
type: counter
desc: The total time the broker has taken to service requests
50thPercentile:
metric: kafka.request.time.50p
type: gauge
desc: The 50th percentile time the broker has taken to service requests
Mean:
metric: kafka.request.time.avg
type: gauge
desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
unit: ms
type: gauge
prefix: kafka.logs.flush.
mapping:
Count:
metric: count
unit: '{flush}'
type: counter
desc: Log flush count
50thPercentile:
metric: time.50p
desc: Log flush time - 50th percentile
99thPercentile:
metric: time.99p
desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: Committed heap memory
type: gauge
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute)
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC

Paso 3. Despliegue los manifiestos

bash
$
# Create namespace if it doesn't exist
$
kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -
$
$
# Apply JMX ConfigMap to the Kafka namespace
$
kubectl apply -f kafka-jmx-config.yaml
$
$
# Apply collector ConfigMap
$
kubectl apply -f collector-configmap.yaml
$
$
# Apply Deployment and Service
$
kubectl apply -f collector-deployment.yaml

Paso 4. Verifique el despliegue

bash
$
# Check pod status
$
kubectl get pods -n newrelic -l app=otel-collector
$
$
# View logs to verify metrics are being received from broker pods
$
kubectl logs -n newrelic -l app=otel-collector --tail=50

Configurar el StatefulSet de Kafka para el agente de Java

Ahora que el recolector se está ejecutando, aplique un parche a su StatefulSet de Kafka para agregar un contenedor de inicio que descargue el JAR del agente de Java de OpenTelemetry, y luego adjúntelo a la JVM del broker de Kafka a través de KAFKA_OPTS.

Agregue las siguientes secciones a su manifiesto de StatefulSet de Kafka existente:

spec:
template:
spec:
# 1. Init container: downloads OTel Java agent JAR before Kafka starts
initContainers:
- name: download-otel-agent
image: busybox:latest
command:
- sh
- -c
- |
wget -O /otel-agent/opentelemetry-javaagent.jar \
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
containers:
- name: kafka # TODO: Replace with your Kafka container name
# 2. Attach OTel Java agent to the Kafka broker JVM
env:
- name: KAFKA_OPTS
value: >-
-javaagent:/otel-agent/opentelemetry-javaagent.jar
-Dotel.jmx.enabled=true
-Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.runtime-telemetry.enabled=false
-Dotel.metric.export.interval=30000
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
- name: jmx-config
mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules
volumes:
- name: otel-agent
emptyDir: {}
- name: jmx-config
configMap:
name: kafka-jmx-config # Deployed with the collector in the previous step

Sugerencia

El ConfigMap kafka-jmx-config se desplegó con el recolector en el paso anterior. El valor otel.exporter.otlp.endpoint http://otel-collector.newrelic.svc.cluster.local:4317 asume que el recolector está desplegado en el namespace newrelic con el nombre de servicio otel-collector. Actualícelo para que coincida con el DNS real de su servicio recolector si es diferente.

Aplique su StatefulSet actualizado y espere a que los pods se actualicen:

bash
$
kubectl apply -f kafka-statefulset.yaml
$
kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace

(Opcional) Instrumentar aplicaciones productoras o consumidoras

Importante

Soporte de lenguajes: actualmente, solo se admiten aplicaciones Java para la instrumentación de clientes Kafka mediante el agente de Java de OpenTelemetry.

Para recopilar telemetría a nivel de aplicación de sus aplicaciones de productor y consumidor de Kafka que se ejecutan en Kubernetes, agregue el agente de Java de OpenTelemetry a esos pods de aplicación.

Agregue un contenedor init y variables de entorno al despliegue de su aplicación:

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-producer-app
spec:
template:
spec:
initContainers:
- name: download-otel-agent
image: busybox:latest
command:
- sh
- -c
- wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
containers:
- name: app
image: your-kafka-app:latest
env:
- name: JAVA_TOOL_OPTIONS
value: >-
-javaagent:/otel-agent/opentelemetry-javaagent.jar
-Dotel.service.name=order-process-service
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.traces.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.kafka.experimental-span-attributes=true
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true
-Dotel.instrumentation.kafka.producer-propagation.enabled=true
-Dotel.instrumentation.kafka.enabled=true
-Dotel.instrumentation.runtime-telemetry.enabled=false
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
volumes:
- name: otel-agent
emptyDir: {}

Parámetro de configuración

La siguiente tabla describe los parámetros de configuración clave:

Parámetro

Descripción

order-process-service

Reemplace con un nombre único para su aplicación productora o consumidora

my-kafka-cluster

Reemplace con el mismo nombre de clúster utilizado en la configuración de su broker

otel-collector.newrelic.svc.cluster.local

Reemplace con el nombre DNS real de su servicio recolector (

<service-name>.<namespace>.svc.cluster.local

)

El agente de Java proporciona instrumentación de Kafka lista para usar sin cambios de código, capturando latencias de solicitudes, métricas de rendimiento, tasas de errores y rastreo distribuido. Para una configuración avanzada, consulte la documentación de instrumentación de Kafka.

Siga estos pasos para configurar un monitoreo integral de Kafka instalando Prometheus JMX Exporter en sus pods de broker y desplegando un recolector para recopilar y enviar métricas a New Relic.

Antes de que empieces

Asegúrese de tener:

  • Una cuenta de New Relic con un
  • Clúster de Kubernetes con acceso kubectl
  • Kafka desplegado como un StatefulSet con un servicio headless (para nombres DNS de pod estables)
  • Capacidad para modificar y redesplegar el StatefulSet de Kafka

Crear ConfigMap de métricas JMX

Cree un ConfigMap que contenga la configuración de JMX Exporter que defina qué métricas de Kafka recolectar. Este ConfigMap se montará en cada pod del broker de Kafka.

Guardar como kafka-jmx-config.yaml. Aplíquelo al namespace donde está desplegado Kafka:

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-jmx-metrics
namespace: kafka # TODO: Replace with your Kafka namespace
data:
kafka-metrics-config.yml: |
startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Cluster-level controller metrics
- pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
name: kafka_cluster_topic_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
name: kafka_cluster_partition_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
name: kafka_broker_fenced_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
name: kafka_partition_non_preferred_leader
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
name: kafka_partition_offline
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
name: kafka_controller_active_count
type: GAUGE
# Broker-level replica metrics
- pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
name: kafka_partition_under_min_isr
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
name: kafka_broker_leader_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
name: kafka_partition_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
name: kafka_partition_under_replicated
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
name: kafka_max_lag
type: GAUGE
# Broker topic metrics (totals)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
name: kafka_message_count
type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "out"
# Per-topic metrics (only appear after traffic flows)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
name: kafka_prod_msg_count
type: COUNTER
labels:
topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "out"
# Request metrics
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
name: kafka_request_time_99p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
name: kafka_request_queue
type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
name: kafka_purgatory_size
type: GAUGE
labels:
type: "$1"
# Controller stats
- pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
name: kafka_leader_election_rate
type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
name: kafka_unclean_election_rate
type: COUNTER
# JVM Garbage Collection
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
name: jvm_gc_collections_count
type: COUNTER
labels:
name: "$1"
# JVM Memory
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
name: jvm_memory_heap_max
type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
name: jvm_memory_heap_used
type: GAUGE
# JVM Threading and System
- pattern: 'java.lang<type=Threading><>ThreadCount'
name: jvm_thread_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
name: jvm_system_cpu_utilization
type: GAUGE
# Broker uptime
- pattern: 'java.lang<type=Runtime><>Uptime'
name: kafka_broker_uptime
type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above)
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
name: kafka_request_time_total
type: COUNTER
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
name: kafka_request_time_50p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
name: kafka_request_time_avg
type: GAUGE
labels:
type: "$1"
# Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
name: kafka_logs_flush_count
type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
name: kafka_logs_flush_time_50p
type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
name: kafka_logs_flush_time_99p
type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

Sugerencia

Personalizar métricas: puede agregar o modificar patrones consultando los ejemplos de Prometheus JMX Exporter y la documentación de Kafka MBean.

Aplique el ConfigMap:

bash
$
kubectl apply -f kafka-jmx-config.yaml

Configurar el StatefulSet de Kafka para JMX Exporter

Aplique un parche a su StatefulSet de Kafka para agregar un contenedor init que descargue el JAR de Prometheus JMX Exporter, luego adjúntelo a la JVM del broker de Kafka a través de KAFKA_OPTS.

Paso 1. Agregue las siguientes secciones a su manifiesto de StatefulSet de Kafka existente:

spec:
template:
spec:
# 1. Init container: downloads JMX Exporter JAR before Kafka starts
initContainers:
- name: download-jmx-exporter
image: busybox:latest
command:
- sh
- -c
- |
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
JMX_EXPORTER_VERSION="1.5.0"
wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \
"https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"
volumeMounts:
- name: prometheus-jmx
mountPath: /prometheus-jmx
containers:
- name: kafka # TODO: Replace with your Kafka container name
# 2. Attach JMX Exporter as Java agent on port 9404
env:
- name: KAFKA_OPTS
value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml"
# 3. Expose port 9404 for Prometheus scraping
ports:
- name: jmx-metrics
containerPort: 9404
protocol: TCP
volumeMounts:
- name: prometheus-jmx
mountPath: /prometheus-jmx
- name: jmx-config
mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config
volumes:
- name: prometheus-jmx
emptyDir: {}
- name: jmx-config
configMap:
name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2

Paso 2. Aplique su StatefulSet actualizado y espere a que los pod roten:

bash
$
kubectl apply -f kafka-statefulset.yaml
$
kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace

Paso 3. Una vez que se complete el despliegue, verifique que las métricas estén expuestas en cada pod del broker:

bash
$
# Replace kafka-0 and kafka with your pod name and namespace
$
kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20

Importante

Clústeres de múltiples brokers: la configuración del contenedor init y de KAFKA_OPTS se aplica a todos los pods en el StatefulSet automáticamente. Verifique que cada pod del broker exponga métricas después del despliegue.

Desplegar OpenTelemetry Collector

Despliegue el OpenTelemetry Collector en su clúster. El recolector recopila datos de los pods del broker de Kafka utilizando objetivos DNS estáticos y escucha en el puerto 4317 los datos OTLP de las aplicaciones instrumentadas.

El método de instalación con Helm es el enfoque recomendado para desplegar OpenTelemetry Collector en Kubernetes.

Paso 1. Crear secreto de credenciales de New Relic

Sugerencia

Para otras configuraciones de endpoints, consulte Configure su endpoint OTLP.

Paso 2. Crear values.yaml con la configuración del recolector

Tanto los recolectores NRDOT como los de OpenTelemetry utilizan una configuración idéntica. Seleccione su imagen de recolector preferida:

Para opciones de configuración avanzadas, consulte estas páginas de documentación del receptor:

  • Documentación del receptor Prometheus

  • Documentación del receptor de métricas de Kafka

    Paso 3. Instalar OpenTelemetry Collector con Helm

    bash
    $
    helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
    $
    helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \
    >
    --install \
    >
    --namespace newrelic \
    >
    --create-namespace \
    >
    -f values.yaml

    Paso 4. Verifique el despliegue:

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector
    $
    $
    # View logs to verify metrics collection
    $
    kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50

    Debería ver logs que indiquen un scraping exitoso de los pods de los brokers de Kafka en el puerto 9404.

El método de instalación de manifiesto proporciona control directo sobre los recursos de Kubernetes sin usar Helm.

Paso 1. Crear secreto de credenciales de New Relic

Sugerencia

Para otras configuraciones de endpoints, consulte Configure su endpoint OTLP.

Paso 2. Crear archivos de manifiesto

Tanto los recolectores NRDOT como los de OpenTelemetry utilizan una configuración idéntica. Solo la imagen del contenedor difiere.

Para opciones de configuración avanzadas, consulte estas páginas de documentación del receptor:

  • Documentación del receptor Prometheus

  • Documentación del receptor de métricas de Kafka

    Paso 3. Despliegue los manifiestos

    bash
    $
    # Create namespace if it doesn't exist
    $
    kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -
    $
    $
    # Apply ConfigMap
    $
    kubectl apply -f collector-configmap.yaml
    $
    $
    # Apply Deployment (includes ServiceAccount)
    $
    kubectl apply -f collector-deployment.yaml

    Paso 4. Verifique el despliegue:

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app=otel-collector
    $
    $
    # View logs to verify metrics collection
    $
    kubectl logs -n newrelic -l app=otel-collector --tail=50

    Debería ver logs que indiquen un scraping exitoso de los pods de los brokers de Kafka en el puerto 9404.

(Opcional) Instrumentar aplicaciones productoras o consumidoras

Importante

Soporte de lenguajes: las aplicaciones Java admiten la instrumentación del cliente Kafka lista para usar mediante el agente de Java de OpenTelemetry.

Para recopilar telemetría a nivel de aplicación de sus aplicaciones de productor y consumidor de Kafka, use el agente de Java de OpenTelemetry con un contenedor init:

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-producer-app
spec:
template:
spec:
initContainers:
- name: download-java-agent
image: busybox:latest
command:
- sh
- -c
- |
wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-auto-instrumentation
mountPath: /otel-auto-instrumentation
containers:
- name: app
image: your-kafka-app:latest
env:
- name: JAVA_TOOL_OPTIONS
value: >-
-javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar
-Dotel.service.name=my-kafka-app
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.traces.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.kafka.experimental-span-attributes=true
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true
-Dotel.instrumentation.kafka.producer-propagation.enabled=true
-Dotel.instrumentation.kafka.enabled=true
-Dotel.instrumentation.runtime-telemetry.enabled=false
volumeMounts:
- name: otel-auto-instrumentation
mountPath: /otel-auto-instrumentation
volumes:
- name: otel-auto-instrumentation
emptyDir: {}

Parámetro de configuración

La siguiente tabla describe los parámetros de configuración clave:

ParámetroDescripción
service.nameReemplace my-kafka-app con un nombre único para su aplicación productora o consumidora
kafka.cluster.nameReemplace my-kafka-cluster con el mismo nombre de clúster utilizado en la configuración de su colector
otlp.endpointEl extremo http://otel-collector.newrelic.svc.cluster.local:4317 asume que el recolector está desplegado en el namespace newrelic como otel-collector

El agente de Java proporciona instrumentación de Kafka lista para usar sin cambios de código, capturando latencias de solicitudes, métricas de rendimiento, tasas de errores y rastreo distribuido. Para una configuración avanzada, consulte la documentación de instrumentación de Kafka.

(Opcional) Enviar logs del broker de Kafka

Para recopilar logs del broker de Kafka y enviarlos a New Relic, agregue un receptor filelog a la configuración de su recolector.

Encuentra tus datos

Después de unos minutos, sus datos de Kafka deberían aparecer en New Relic. Consulta Encuentra tus datos para obtener instrucciones detalladas sobre cómo explorar tus datos de Kafka en las diferentes vistas de la UI de New Relic.

La siguiente tabla resume dónde se almacena cada tipo de señal. Reemplace my-kafka-cluster por su valor KAFKA_CLUSTER_NAME en todas las consultas a continuación:

SeñalTipo de eventoQué incluye
MétricaMetricMétricas de broker, topic, partición, grupo de consumidores y JVM
LogsLogLogs de aplicaciones de productor y consumidor (mediante el agente de Java de OTel) y logs del broker recopilados mediante el agente de Java
TrazaSpanSpans de productor y consumidor, incluidas las operaciones publish y receive por mensaje en todos los temas

Métrica

Las métricas de broker, tema, partición, grupo de consumidores y JVM se almacenan en el tipo de evento Metric:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Logs

Los logs de las aplicaciones de productor y consumidor instrumentadas con el agente de Java de OpenTelemetry, y los logs del broker recopilados a través del agente de Java en el broker, se almacenan en el tipo de evento Log:

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Traza

Los spans de productor y consumidor, incluidas las operaciones publish y receive por mensaje en todos los temas, se almacenan en el tipo de evento Span:

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Ejemplo

Un ejemplo funcional completo con manifiestos de StatefulSet de Kafka, valores de Helm, configuración de OTel Collector y aplicaciones de productor/consumidor de muestra está disponible en el respositorio de New Relic OpenTelemetry Examples.

Resolución de problemas

Próximos pasos

Copyright © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.