• /
  • EnglishEspañolFrançais日本語한국어Português
  • EntrarComeçar agora

Esta tradução de máquina é fornecida para sua comodidade.

Caso haja alguma divergência entre a versão em inglês e a traduzida, a versão em inglês prevalece. Acesse esta página para mais informações.

Criar um problema

Monitore o Kafka autogerenciado no Kubernetes com OpenTelemetry

Monitore seu cluster Apache Kafka autogerenciado em execução no Kubernetes implantando o OpenTelemetry Collector para coletar e encaminhar métricas para o New Relic.

Arquitetura

O New Relic oferece suporte a duas abordagens para o monitoramento do Kubernetes Kafka autogerenciado: o agente Java do OpenTelemetry ou o Prometheus JMX Exporter. Os diagramas a seguir ilustram o fluxo de dados para cada abordagem.

Kubernetes self-managed Kafka monitoring architecture

Etapas de instalação

Siga estas etapas para configurar o monitoramento abrangente do Kafka instalando o agente Java do OpenTelemetry em seus brokers e implantando um coletor para reunir e enviar métricas e logs para o New Relic.

Antes de você começar

Certifique-se de ter:

  • Uma conta New Relic com uma
  • Cluster do Kubernetes com acesso kubectl
  • Kafka implantado como um StatefulSet
  • Capacidade de modificar e reimplantar o StatefulSet do Kafka

Implantar o OpenTelemetry Collector

Implante o coletor OpenTelemetry em seu cluster. Esta etapa também cria o ConfigMap kafka-jmx-config que define quais métricas JMX o agente Java coleta de cada pod de broker. O coletor deve estar em execução antes de reiniciar os brokers do Kafka na próxima etapa.

Passo 1. Criar segredo de credenciais da New Relic

Dica

Para outras configurações de endpoint, consulte Configure seu endpoint OTLP.

Passo 2. Crie o values.yaml com a configuração do coletor

Tanto o NRDOT quanto os coletores OpenTelemetry usam configuração idêntica. Escolha sua imagem de coletor preferida:

Para opções de configuração avançadas, veja:

  • Documentação do receiver OTLP

  • Documentação do receptor de métricas do Kafka

    Etapa 3. Instalar o OpenTelemetry Collector com o Helm

    bash
    $
    helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
    $
    helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \
    >
    --install \
    >
    --namespace newrelic \
    >
    --create-namespace \
    >
    -f values.yaml

    Etapa 4. Verifique a implantação

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector
    $
    $
    # View logs to verify metrics are being received from broker pods
    $
    kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50

Passo 1. Criar segredo de credenciais da New Relic

Dica

Para outras configurações de endpoint, consulte Configure seu endpoint OTLP.

Passo 2. Crie arquivos de manifesto

Tanto o NRDOT quanto os coletores OpenTelemetry usam configuração idêntica. Apenas a imagem do contêiner difere. Ambos também requerem o ConfigMap kafka-jmx-config aplicado ao seu namespace do Kafka.

Criar kafka-jmx-config.yaml - configuração de métricas JMX para o agente Java (aplicar ao seu namespace Kafka):

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-jmx-config
namespace: kafka # TODO: Replace with your Kafka namespace
data:
kafka-jmx-config.yaml: |
---
rules:
# Per-topic custom metrics
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
mapping:
Count:
metric: kafka.message.count
type: counter
desc: The number of messages received by the broker
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.count
type: &type counter
desc: &desc The number of requests received by the broker
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.failed
type: &type counter
desc: &desc The number of requests to the broker resulting in a failure
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
99thPercentile:
metric: kafka.request.time.99p
type: gauge
desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize
mapping:
Value:
metric: kafka.request.queue
type: gauge
desc: Size of the request queue
unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
metricAttribute:
direction: const(in)
mapping:
Count:
metric: &metric kafka.network.io
type: &type counter
desc: &desc The bytes received or sent by the broker
unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
metricAttribute:
direction: const(out)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
metricAttribute:
type: param(delayedOperation)
mapping:
Value:
metric: kafka.purgatory.size
type: gauge
desc: The number of requests waiting in purgatory
unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount
mapping:
Value:
metric: kafka.partition.count
type: gauge
desc: The number of partitions on the broker
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
mapping:
Value:
metric: kafka.partition.offline
type: gauge
desc: The number of partitions offline
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
mapping:
Value:
metric: kafka.partition.under_replicated
type: gauge
desc: The number of under replicated partitions
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
metricAttribute:
operation: const(shrink)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
metricAttribute:
operation: const(expand)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
mapping:
Value:
metric: kafka.max.lag
type: gauge
desc: The max lag in messages between follower and leader replicas
unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
mapping:
Value:
metric: kafka.controller.active.count
type: gauge
desc: Number of active controllers in the cluster
unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
mapping:
Count:
metric: kafka.leader.election.rate
type: counter
desc: The leader election count
unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
mapping:
Count:
metric: kafka.unclean.election.rate
type: counter
desc: Unclean leader election count
unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
Count:
metric: kafka.request.time.total
type: counter
desc: The total time the broker has taken to service requests
50thPercentile:
metric: kafka.request.time.50p
type: gauge
desc: The 50th percentile time the broker has taken to service requests
Mean:
metric: kafka.request.time.avg
type: gauge
desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
unit: ms
type: gauge
prefix: kafka.logs.flush.
mapping:
Count:
metric: count
unit: '{flush}'
type: counter
desc: Log flush count
50thPercentile:
metric: time.50p
desc: Log flush time - 50th percentile
99thPercentile:
metric: time.99p
desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: Committed heap memory
type: gauge
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute)
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC

Passo 3. Implante os manifestos

bash
$
# Create namespace if it doesn't exist
$
kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -
$
$
# Apply JMX ConfigMap to the Kafka namespace
$
kubectl apply -f kafka-jmx-config.yaml
$
$
# Apply collector ConfigMap
$
kubectl apply -f collector-configmap.yaml
$
$
# Apply Deployment and Service
$
kubectl apply -f collector-deployment.yaml

Etapa 4. Verifique a implantação

bash
$
# Check pod status
$
kubectl get pods -n newrelic -l app=otel-collector
$
$
# View logs to verify metrics are being received from broker pods
$
kubectl logs -n newrelic -l app=otel-collector --tail=50

Configurar o StatefulSet do Kafka para o agente Java

Agora que o coletor está em execução, aplique um patch no seu StatefulSet do Kafka para adicionar um contêiner de inicialização que baixa o JAR do agente Java do OpenTelemetry e, em seguida, anexe-o à JVM do broker do Kafka via KAFKA_OPTS.

Adicione as seguintes seções ao seu manifesto do StatefulSet do Kafka existente:

spec:
template:
spec:
# 1. Init container: downloads OTel Java agent JAR before Kafka starts
initContainers:
- name: download-otel-agent
image: busybox:latest
command:
- sh
- -c
- |
wget -O /otel-agent/opentelemetry-javaagent.jar \
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
containers:
- name: kafka # TODO: Replace with your Kafka container name
# 2. Attach OTel Java agent to the Kafka broker JVM
env:
- name: KAFKA_OPTS
value: >-
-javaagent:/otel-agent/opentelemetry-javaagent.jar
-Dotel.jmx.enabled=true
-Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.runtime-telemetry.enabled=false
-Dotel.metric.export.interval=30000
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
- name: jmx-config
mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules
volumes:
- name: otel-agent
emptyDir: {}
- name: jmx-config
configMap:
name: kafka-jmx-config # Deployed with the collector in the previous step

Dica

O ConfigMap kafka-jmx-config foi implantado com o coletor na etapa anterior. O valor otel.exporter.otlp.endpoint http://otel-collector.newrelic.svc.cluster.local:4317 assume que o coletor está implantado no namespace newrelic com o nome de serviço otel-collector. Atualize-o para corresponder ao DNS real do serviço do coletor, caso seja diferente.

Aplique seu StatefulSet atualizado e aguarde os pods serem atualizados:

bash
$
kubectl apply -f kafka-statefulset.yaml
$
kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace

(Opcional) Instrumente aplicações produtoras ou consumidoras

Importante

Suporte a linguagens: atualmente, apenas aplicativos Java são suportados para a instrumentação do cliente Kafka usando o agente Java do OpenTelemetry.

Para coletar telemetria em nível de aplicativo de seus aplicativos produtores e consumidores Kafka em execução no Kubernetes, adicione o agente Java do OpenTelemetry a esses pods de aplicativo.

Adicione um contêiner init e variáveis de ambiente à implantação do seu aplicativo:

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-producer-app
spec:
template:
spec:
initContainers:
- name: download-otel-agent
image: busybox:latest
command:
- sh
- -c
- wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
containers:
- name: app
image: your-kafka-app:latest
env:
- name: JAVA_TOOL_OPTIONS
value: >-
-javaagent:/otel-agent/opentelemetry-javaagent.jar
-Dotel.service.name=order-process-service
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.traces.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.kafka.experimental-span-attributes=true
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true
-Dotel.instrumentation.kafka.producer-propagation.enabled=true
-Dotel.instrumentation.kafka.enabled=true
-Dotel.instrumentation.runtime-telemetry.enabled=false
volumeMounts:
- name: otel-agent
mountPath: /otel-agent
volumes:
- name: otel-agent
emptyDir: {}

Parâmetro de configuração

A tabela a seguir descreve os principais parâmetros de configuração:

Parâmetro

Descrição

order-process-service

Substitua por um nome exclusivo para seu aplicativo produtor ou consumidor

my-kafka-cluster

Substitua pelo mesmo nome do cluster usado na configuração do seu broker

otel-collector.newrelic.svc.cluster.local

Substitua pelo nome DNS real do seu serviço de coletor (

<service-name>.<namespace>.svc.cluster.local

)

O agente Java fornece instrumentação Kafka pronta para uso com zero alterações de código, capturando latência de solicitações, métrica de taxas de transferência, taxa de erros e distributed trace. Para configuração avançada, consulte a documentação de instrumentação do Kafka.

Siga estas etapas para configurar o monitoramento abrangente do Kafka instalando o Prometheus JMX Exporter em seus pods de broker e implantando um coletor para reunir e enviar métricas para o New Relic.

Antes de você começar

Certifique-se de ter:

  • Uma conta New Relic com uma
  • Cluster do Kubernetes com acesso kubectl
  • Kafka implantado como um StatefulSet com um serviço headless (para nomes DNS de pod estáveis)
  • Capacidade de modificar e reimplantar o StatefulSet do Kafka

Criar ConfigMap de métricas JMX

Crie um ConfigMap contendo a configuração do JMX Exporter que define quais métricas do Kafka coletar. Este ConfigMap será montado em cada pod de broker do Kafka.

Salvar como kafka-jmx-config.yaml. Aplique-o ao namespace onde o Kafka está implantado:

apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-jmx-metrics
namespace: kafka # TODO: Replace with your Kafka namespace
data:
kafka-metrics-config.yml: |
startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Cluster-level controller metrics
- pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
name: kafka_cluster_topic_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
name: kafka_cluster_partition_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
name: kafka_broker_fenced_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
name: kafka_partition_non_preferred_leader
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
name: kafka_partition_offline
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
name: kafka_controller_active_count
type: GAUGE
# Broker-level replica metrics
- pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
name: kafka_partition_under_min_isr
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
name: kafka_broker_leader_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
name: kafka_partition_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
name: kafka_partition_under_replicated
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
name: kafka_max_lag
type: GAUGE
# Broker topic metrics (totals)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
name: kafka_message_count
type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "out"
# Per-topic metrics (only appear after traffic flows)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
name: kafka_prod_msg_count
type: COUNTER
labels:
topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "out"
# Request metrics
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
name: kafka_request_time_99p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
name: kafka_request_queue
type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
name: kafka_purgatory_size
type: GAUGE
labels:
type: "$1"
# Controller stats
- pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
name: kafka_leader_election_rate
type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
name: kafka_unclean_election_rate
type: COUNTER
# JVM Garbage Collection
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
name: jvm_gc_collections_count
type: COUNTER
labels:
name: "$1"
# JVM Memory
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
name: jvm_memory_heap_max
type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
name: jvm_memory_heap_used
type: GAUGE
# JVM Threading and System
- pattern: 'java.lang<type=Threading><>ThreadCount'
name: jvm_thread_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
name: jvm_system_cpu_utilization
type: GAUGE
# Broker uptime
- pattern: 'java.lang<type=Runtime><>Uptime'
name: kafka_broker_uptime
type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above)
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
name: kafka_request_time_total
type: COUNTER
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
name: kafka_request_time_50p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
name: kafka_request_time_avg
type: GAUGE
labels:
type: "$1"
# Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
name: kafka_logs_flush_count
type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
name: kafka_logs_flush_time_50p
type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
name: kafka_logs_flush_time_99p
type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

Dica

Personalizar métricas: você pode adicionar ou modificar padrões consultando os exemplos do Prometheus JMX Exporter e a documentação do Kafka MBean.

Aplique o ConfigMap:

bash
$
kubectl apply -f kafka-jmx-config.yaml

Configurar o StatefulSet do Kafka para o JMX Exporter

Aplique um patch no seu StatefulSet do Kafka para adicionar um contêiner de inicialização que baixa o JAR do Prometheus JMX Exporter e, em seguida, anexe-o à JVM do broker do Kafka via KAFKA_OPTS.

Etapa 1. Adicione as seguintes seções ao seu manifesto Kafka StatefulSet existente:

spec:
template:
spec:
# 1. Init container: downloads JMX Exporter JAR before Kafka starts
initContainers:
- name: download-jmx-exporter
image: busybox:latest
command:
- sh
- -c
- |
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
JMX_EXPORTER_VERSION="1.5.0"
wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \
"https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"
volumeMounts:
- name: prometheus-jmx
mountPath: /prometheus-jmx
containers:
- name: kafka # TODO: Replace with your Kafka container name
# 2. Attach JMX Exporter as Java agent on port 9404
env:
- name: KAFKA_OPTS
value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml"
# 3. Expose port 9404 for Prometheus scraping
ports:
- name: jmx-metrics
containerPort: 9404
protocol: TCP
volumeMounts:
- name: prometheus-jmx
mountPath: /prometheus-jmx
- name: jmx-config
mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config
volumes:
- name: prometheus-jmx
emptyDir: {}
- name: jmx-config
configMap:
name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2

Passo 2. Aplique seu StatefulSet atualizado e aguarde a atualização dos pod:

bash
$
kubectl apply -f kafka-statefulset.yaml
$
kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace

Passo 3. Após a conclusão do rollout, verifique se as métricas estão expostas em cada pod do broker:

bash
$
# Replace kafka-0 and kafka with your pod name and namespace
$
kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20

Importante

Clusters multi-broker: a configuração do contêiner init e KAFKA_OPTS se aplica a todos os pods no StatefulSet automaticamente. Verifique se cada pod de broker expõe métricas após o rollout.

Implantar o OpenTelemetry Collector

Implante o OpenTelemetry Collector em seu cluster. O coletor faz a raspagem dos pods do broker Kafka usando destinos DNS estáticos e escuta na porta 4317 por dados OTLP de aplicativos instrumentados.

O método de instalação via Helm é a abordagem recomendada para implantar o OpenTelemetry Collector no Kubernetes.

Passo 1. Criar segredo de credenciais da New Relic

Dica

Para outras configurações de endpoint, consulte Configure seu endpoint OTLP.

Passo 2. Crie o values.yaml com a configuração do coletor

Tanto o NRDOT quanto os coletores OpenTelemetry usam configuração idêntica. Escolha sua imagem de coletor preferida:

Para opções de configuração avançadas, consulte estas páginas de documentação do receptor:

  • Documentação do receptor Prometheus

  • Documentação do receptor de métricas do Kafka

    Etapa 3. Instalar o OpenTelemetry Collector com o Helm

    bash
    $
    helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts
    $
    helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \
    >
    --install \
    >
    --namespace newrelic \
    >
    --create-namespace \
    >
    -f values.yaml

    Passo 4. Verifique a implantação:

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector
    $
    $
    # View logs to verify metrics collection
    $
    kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50

    Você deve ver logs indicando uma coleta bem-sucedida dos pods de broker Kafka na porta 9404.

O método de instalação via manifesto oferece controle direto sobre os recursos do Kubernetes sem usar o Helm.

Passo 1. Criar segredo de credenciais da New Relic

Dica

Para outras configurações de endpoint, consulte Configure seu endpoint OTLP.

Passo 2. Crie arquivos de manifesto

Tanto o NRDOT quanto os coletores OpenTelemetry usam configuração idêntica. Apenas a imagem do contêiner difere.

Para opções de configuração avançadas, consulte estas páginas de documentação do receptor:

  • Documentação do receptor Prometheus

  • Documentação do receptor de métricas do Kafka

    Passo 3. Implante os manifestos

    bash
    $
    # Create namespace if it doesn't exist
    $
    kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -
    $
    $
    # Apply ConfigMap
    $
    kubectl apply -f collector-configmap.yaml
    $
    $
    # Apply Deployment (includes ServiceAccount)
    $
    kubectl apply -f collector-deployment.yaml

    Passo 4. Verifique a implantação:

    bash
    $
    # Check pod status
    $
    kubectl get pods -n newrelic -l app=otel-collector
    $
    $
    # View logs to verify metrics collection
    $
    kubectl logs -n newrelic -l app=otel-collector --tail=50

    Você deve ver logs indicando uma coleta bem-sucedida dos pods de broker Kafka na porta 9404.

(Opcional) Instrumente aplicações produtoras ou consumidoras

Importante

Suporte a linguagens: aplicativos Java suportam instrumentação de cliente Kafka pronta para uso utilizando o agente Java do OpenTelemetry.

Para coletar telemetria em nível de aplicativo de seus aplicativos produtores e consumidores Kafka, use o agente Java do OpenTelemetry com um contêiner init:

apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-producer-app
spec:
template:
spec:
initContainers:
- name: download-java-agent
image: busybox:latest
command:
- sh
- -c
- |
wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
volumeMounts:
- name: otel-auto-instrumentation
mountPath: /otel-auto-instrumentation
containers:
- name: app
image: your-kafka-app:latest
env:
- name: JAVA_TOOL_OPTIONS
value: >-
-javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar
-Dotel.service.name=my-kafka-app
-Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster
-Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317
-Dotel.exporter.otlp.protocol=grpc
-Dotel.metrics.exporter=otlp
-Dotel.traces.exporter=otlp
-Dotel.logs.exporter=otlp
-Dotel.instrumentation.kafka.experimental-span-attributes=true
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true
-Dotel.instrumentation.kafka.producer-propagation.enabled=true
-Dotel.instrumentation.kafka.enabled=true
-Dotel.instrumentation.runtime-telemetry.enabled=false
volumeMounts:
- name: otel-auto-instrumentation
mountPath: /otel-auto-instrumentation
volumes:
- name: otel-auto-instrumentation
emptyDir: {}

Parâmetro de configuração

A tabela a seguir descreve os principais parâmetros de configuração:

ParâmetroDescrição
service.nameSubstitua my-kafka-app por um nome exclusivo para sua aplicação produtora ou consumidora
kafka.cluster.nameSubstitua my-kafka-cluster pelo mesmo nome do cluster usado na configuração do seu coletor
otlp.endpointO endpoint http://otel-collector.newrelic.svc.cluster.local:4317 pressupõe que o coletor esteja implantado no namespace newrelic como otel-collector

O agente Java fornece instrumentação Kafka pronta para uso com zero alterações de código, capturando latência de solicitações, métrica de taxas de transferência, taxa de erros e distributed trace. Para configuração avançada, consulte a documentação de instrumentação do Kafka.

(Opcional) Encaminhar logs do broker Kafka

Para coletar logs do broker Kafka e enviá-los para o New Relic, adicione um receiver filelog à configuração do seu coletor.

Encontre seus dados

Após alguns minutos, seus dados do Kafka devem aparecer no New Relic. Consulte Encontre seus dados para obter instruções detalhadas sobre como explorar seus dados do Kafka em diferentes visualizações na interface do New Relic.

A tabela a seguir resume onde cada tipo de sinal é armazenado. Substitua my-kafka-cluster pelo seu valor de KAFKA_CLUSTER_NAME em todas as consultas abaixo:

SinalTipo de eventoO que está incluído
MétricaMetricMétricas de broker, tópico, partição, grupo de consumidores e JVM
RegistroLogLogs de aplicativos de produtor e consumidor (via agente Java OTel) e logs do broker coletados via agente Java
TracesSpanSpans de produtor e consumidor, incluindo operações publish e receive por mensagem em tópicos

Métrica

As métricas de broker, tópico, partição, grupo de consumidores e JVM são armazenadas no tipo de evento Metric:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Registro

Logs de aplicativos produtores e consumidores instrumentados com o agente Java do OpenTelemetry, e logs do broker coletados pelo agente Java no broker, são armazenados no tipo de evento Log:

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Traces

Os spans de produtor e consumidor, incluindo operações publish e receive por mensagem entre tópicos, são armazenados no tipo de evento Span:

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Exemplo

Um exemplo completo e funcional com manifestos Kafka StatefulSet, valores do Helm, configuração do OTel Collector e aplicativos de exemplo de produtor/consumidor está disponível no repositório New Relic OpenTelemetry Examples.

Resolução de problemas

Próximos passos

Copyright © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.