Monitore seu cluster Apache Kafka autogerenciado em execução no Kubernetes implantando o OpenTelemetry Collector para coletar e encaminhar métricas para o New Relic.
Arquitetura
O New Relic oferece suporte a duas abordagens para o monitoramento do Kubernetes Kafka autogerenciado: o agente Java do OpenTelemetry ou o Prometheus JMX Exporter. Os diagramas a seguir ilustram o fluxo de dados para cada abordagem.

Etapas de instalação
Siga estas etapas para configurar o monitoramento abrangente do Kafka instalando o agente Java do OpenTelemetry em seus brokers e implantando um coletor para reunir e enviar métricas e logs para o New Relic.
Antes de você começar
Certifique-se de ter:
- Uma conta New Relic com uma
- Cluster do Kubernetes com acesso
kubectl - Kafka implantado como um StatefulSet
- Capacidade de modificar e reimplantar o StatefulSet do Kafka
Implantar o OpenTelemetry Collector
Implante o coletor OpenTelemetry em seu cluster. Esta etapa também cria o ConfigMap kafka-jmx-config que define quais métricas JMX o agente Java coleta de cada pod de broker. O coletor deve estar em execução antes de reiniciar os brokers do Kafka na próxima etapa.
Passo 1. Criar segredo de credenciais da New Relic
Dica
Para outras configurações de endpoint, consulte Configure seu endpoint OTLP.
Passo 2. Crie o values.yaml com a configuração do coletor
Tanto o NRDOT quanto os coletores OpenTelemetry usam configuração idêntica. Escolha sua imagem de coletor preferida:
Para opções de configuração avançadas, veja:
Documentação do receptor de métricas do Kafka
Etapa 3. Instalar o OpenTelemetry Collector com o Helm
bash$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \>--install \>--namespace newrelic \>--create-namespace \>-f values.yamlEtapa 4. Verifique a implantação
bash$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50
Passo 1. Criar segredo de credenciais da New Relic
Dica
Para outras configurações de endpoint, consulte Configure seu endpoint OTLP.
Passo 2. Crie arquivos de manifesto
Tanto o NRDOT quanto os coletores OpenTelemetry usam configuração idêntica. Apenas a imagem do contêiner difere. Ambos também requerem o ConfigMap kafka-jmx-config aplicado ao seu namespace do Kafka.
Criar kafka-jmx-config.yaml - configuração de métricas JMX para o agente Java (aplicar ao seu namespace Kafka):
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-config namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-jmx-config.yaml: | --- rules: # Per-topic custom metrics - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
- bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
- bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec mapping: Count: metric: kafka.message.count type: counter desc: The number of messages received by the broker unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.count type: &type counter desc: &desc The number of requests received by the broker unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.failed type: &type counter desc: &desc The number of requests to the broker resulting in a failure unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: 99thPercentile: metric: kafka.request.time.99p type: gauge desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize mapping: Value: metric: kafka.request.queue type: gauge desc: Size of the request queue unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec metricAttribute: direction: const(in) mapping: Count: metric: &metric kafka.network.io type: &type counter desc: &desc The bytes received or sent by the broker unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec metricAttribute: direction: const(out) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch metricAttribute: type: param(delayedOperation) mapping: Value: metric: kafka.purgatory.size type: gauge desc: The number of requests waiting in purgatory unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount mapping: Value: metric: kafka.partition.count type: gauge desc: The number of partitions on the broker unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount mapping: Value: metric: kafka.partition.offline type: gauge desc: The number of partitions offline unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions mapping: Value: metric: kafka.partition.under_replicated type: gauge desc: The number of under replicated partitions unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec metricAttribute: operation: const(shrink) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec metricAttribute: operation: const(expand) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica mapping: Value: metric: kafka.max.lag type: gauge desc: The max lag in messages between follower and leader replicas unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount mapping: Value: metric: kafka.controller.active.count type: gauge desc: Number of active controllers in the cluster unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs mapping: Count: metric: kafka.leader.election.rate type: counter desc: The leader election count unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec mapping: Count: metric: kafka.unclean.election.rate type: counter desc: Unclean leader election count unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: Count: metric: kafka.request.time.total type: counter desc: The total time the broker has taken to service requests 50thPercentile: metric: kafka.request.time.50p type: gauge desc: The 50th percentile time the broker has taken to service requests Mean: metric: kafka.request.time.avg type: gauge desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs unit: ms type: gauge prefix: kafka.logs.flush. mapping: Count: metric: count unit: '{flush}' type: counter desc: Log flush count 50thPercentile: metric: time.50p desc: Log flush time - 50th percentile 99thPercentile: metric: time.99p desc: Log flush time - 99th percentile
- bean: java.lang:type=GarbageCollector,name=* mapping: CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: Committed heap memory type: gauge
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GCPasso 3. Implante os manifestos
$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$
$# Apply JMX ConfigMap to the Kafka namespace$kubectl apply -f kafka-jmx-config.yaml$
$# Apply collector ConfigMap$kubectl apply -f collector-configmap.yaml$
$# Apply Deployment and Service$kubectl apply -f collector-deployment.yamlEtapa 4. Verifique a implantação
$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$
$# View logs to verify metrics are being received from broker pods$kubectl logs -n newrelic -l app=otel-collector --tail=50Configurar o StatefulSet do Kafka para o agente Java
Agora que o coletor está em execução, aplique um patch no seu StatefulSet do Kafka para adicionar um contêiner de inicialização que baixa o JAR do agente Java do OpenTelemetry e, em seguida, anexe-o à JVM do broker do Kafka via KAFKA_OPTS.
Adicione as seguintes seções ao seu manifesto do StatefulSet do Kafka existente:
spec: template: spec: # 1. Init container: downloads OTel Java agent JAR before Kafka starts initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - | wget -O /otel-agent/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach OTel Java agent to the Kafka broker JVM env: - name: KAFKA_OPTS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.jmx.enabled=true -Dotel.jmx.config=/jmx-config/kafka-jmx-config.yaml -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.runtime-telemetry.enabled=false -Dotel.metric.export.interval=30000 volumeMounts: - name: otel-agent mountPath: /otel-agent - name: jmx-config mountPath: /jmx-config
# 3. Volumes: emptyDir for JAR, ConfigMap for JMX rules volumes: - name: otel-agent emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-config # Deployed with the collector in the previous stepDica
O ConfigMap kafka-jmx-config foi implantado com o coletor na etapa anterior. O valor otel.exporter.otlp.endpoint http://otel-collector.newrelic.svc.cluster.local:4317 assume que o coletor está implantado no namespace newrelic com o nome de serviço otel-collector. Atualize-o para corresponder ao DNS real do serviço do coletor, caso seja diferente.
Aplique seu StatefulSet atualizado e aguarde os pods serem atualizados:
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespace(Opcional) Instrumente aplicações produtoras ou consumidoras
Importante
Suporte a linguagens: atualmente, apenas aplicativos Java são suportados para a instrumentação do cliente Kafka usando o agente Java do OpenTelemetry.
Para coletar telemetria em nível de aplicativo de seus aplicativos produtores e consumidores Kafka em execução no Kubernetes, adicione o agente Java do OpenTelemetry a esses pods de aplicativo.
Adicione um contêiner init e variáveis de ambiente à implantação do seu aplicativo:
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-otel-agent image: busybox:latest command: - sh - -c - wget -O /otel-agent/opentelemetry-javaagent.jar https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-agent mountPath: /otel-agent
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-agent/opentelemetry-javaagent.jar -Dotel.service.name=order-process-service -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-agent mountPath: /otel-agent
volumes: - name: otel-agent emptyDir: {}Parâmetro de configuração
A tabela a seguir descreve os principais parâmetros de configuração:
Parâmetro | Descrição |
|---|---|
| Substitua por um nome exclusivo para seu aplicativo produtor ou consumidor |
| Substitua pelo mesmo nome do cluster usado na configuração do seu broker |
| Substitua pelo nome DNS real do seu serviço de coletor (
) |
O agente Java fornece instrumentação Kafka pronta para uso com zero alterações de código, capturando latência de solicitações, métrica de taxas de transferência, taxa de erros e distributed trace. Para configuração avançada, consulte a documentação de instrumentação do Kafka.
Siga estas etapas para configurar o monitoramento abrangente do Kafka instalando o Prometheus JMX Exporter em seus pods de broker e implantando um coletor para reunir e enviar métricas para o New Relic.
Antes de você começar
Certifique-se de ter:
- Uma conta New Relic com uma
- Cluster do Kubernetes com acesso
kubectl - Kafka implantado como um StatefulSet com um serviço headless (para nomes DNS de pod estáveis)
- Capacidade de modificar e reimplantar o StatefulSet do Kafka
Criar ConfigMap de métricas JMX
Crie um ConfigMap contendo a configuração do JMX Exporter que define quais métricas do Kafka coletar. Este ConfigMap será montado em cada pod de broker do Kafka.
Salvar como kafka-jmx-config.yaml. Aplique-o ao namespace onde o Kafka está implantado:
apiVersion: v1kind: ConfigMapmetadata: name: kafka-jmx-metrics namespace: kafka # TODO: Replace with your Kafka namespacedata: kafka-metrics-config.yml: | startDelaySeconds: 0 lowercaseOutputName: true lowercaseOutputLabelNames: true
rules: # Cluster-level controller metrics - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value' name: kafka_cluster_topic_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value' name: kafka_cluster_partition_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value' name: kafka_broker_fenced_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value' name: kafka_partition_non_preferred_leader type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value' name: kafka_partition_offline type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value' name: kafka_controller_active_count type: GAUGE
# Broker-level replica metrics - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value' name: kafka_partition_under_min_isr type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value' name: kafka_broker_leader_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value' name: kafka_partition_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value' name: kafka_partition_under_replicated type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value' name: kafka_max_lag type: GAUGE
# Broker topic metrics (totals) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count' name: kafka_message_count type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "out"
# Per-topic metrics (only appear after traffic flows) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count' name: kafka_prod_msg_count type: COUNTER labels: topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "out"
# Request metrics - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile' name: kafka_request_time_99p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value' name: kafka_request_queue type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value' name: kafka_purgatory_size type: GAUGE labels: type: "$1"
# Controller stats - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count' name: kafka_leader_election_rate type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count' name: kafka_unclean_election_rate type: COUNTER
# JVM Garbage Collection - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount' name: jvm_gc_collections_count type: COUNTER labels: name: "$1"
# JVM Memory - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max' name: jvm_memory_heap_max type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used' name: jvm_memory_heap_used type: GAUGE
# JVM Threading and System - pattern: 'java.lang<type=Threading><>ThreadCount' name: jvm_thread_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad' name: jvm_system_cpu_utilization type: GAUGE
# Broker uptime - pattern: 'java.lang<type=Runtime><>Uptime' name: kafka_broker_uptime type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above) - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count' name: kafka_request_time_total type: COUNTER labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile' name: kafka_request_time_50p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean' name: kafka_request_time_avg type: GAUGE labels: type: "$1"
# Log flush metrics - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count' name: kafka_logs_flush_count type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile' name: kafka_logs_flush_time_50p type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile' name: kafka_logs_flush_time_99p type: GAUGE
# JVM GC elapsed time - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime' name: jvm_gc_collections_elapsed type: COUNTER labels: name: "$1"
# JVM Memory heap committed - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed' name: jvm_memory_heap_committed type: GAUGE
# JVM class loading - pattern: 'java.lang<type=ClassLoading><>LoadedClassCount' name: jvm_class_count type: GAUGE
# Additional JVM OS metrics - pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage' name: jvm_system_cpu_load_1m type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors' name: jvm_cpu_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad' name: jvm_cpu_recent_utilization type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount' name: jvm_file_descriptor_count type: GAUGE
# JVM Memory Pool - pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used' name: jvm_memory_pool_used type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max' name: jvm_memory_pool_max type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used' name: jvm_memory_pool_used_after_last_gc type: GAUGE labels: name: "$1"Dica
Personalizar métricas: você pode adicionar ou modificar padrões consultando os exemplos do Prometheus JMX Exporter e a documentação do Kafka MBean.
Aplique o ConfigMap:
$kubectl apply -f kafka-jmx-config.yamlConfigurar o StatefulSet do Kafka para o JMX Exporter
Aplique um patch no seu StatefulSet do Kafka para adicionar um contêiner de inicialização que baixa o JAR do Prometheus JMX Exporter e, em seguida, anexe-o à JVM do broker do Kafka via KAFKA_OPTS.
Etapa 1. Adicione as seguintes seções ao seu manifesto Kafka StatefulSet existente:
spec: template: spec: # 1. Init container: downloads JMX Exporter JAR before Kafka starts initContainers: - name: download-jmx-exporter image: busybox:latest command: - sh - -c - | # Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases. JMX_EXPORTER_VERSION="1.5.0" wget -O /prometheus-jmx/jmx_prometheus_javaagent.jar \ "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar" volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx
containers: - name: kafka # TODO: Replace with your Kafka container name # 2. Attach JMX Exporter as Java agent on port 9404 env: - name: KAFKA_OPTS value: "-javaagent:/prometheus-jmx/jmx_prometheus_javaagent.jar=9404:/jmx-config/kafka-metrics-config.yml" # 3. Expose port 9404 for Prometheus scraping ports: - name: jmx-metrics containerPort: 9404 protocol: TCP volumeMounts: - name: prometheus-jmx mountPath: /prometheus-jmx - name: jmx-config mountPath: /jmx-config
# 4. Volumes: emptyDir for JAR, ConfigMap for metrics config volumes: - name: prometheus-jmx emptyDir: {} - name: jmx-config configMap: name: kafka-jmx-metrics # Must match the ConfigMap name from Step 2Passo 2. Aplique seu StatefulSet atualizado e aguarde a atualização dos pod:
$kubectl apply -f kafka-statefulset.yaml$kubectl rollout status statefulset/kafka -n kafka # TODO: Replace with your StatefulSet name and namespacePasso 3. Após a conclusão do rollout, verifique se as métricas estão expostas em cada pod do broker:
$# Replace kafka-0 and kafka with your pod name and namespace$kubectl exec -n kafka kafka-0 -- curl -s http://localhost:9404/metrics | grep kafka_ | head -20Importante
Clusters multi-broker: a configuração do contêiner init e KAFKA_OPTS se aplica a todos os pods no StatefulSet automaticamente. Verifique se cada pod de broker expõe métricas após o rollout.
Implantar o OpenTelemetry Collector
Implante o OpenTelemetry Collector em seu cluster. O coletor faz a raspagem dos pods do broker Kafka usando destinos DNS estáticos e escuta na porta 4317 por dados OTLP de aplicativos instrumentados.
O método de instalação via Helm é a abordagem recomendada para implantar o OpenTelemetry Collector no Kubernetes.
Passo 1. Criar segredo de credenciais da New Relic
Dica
Para outras configurações de endpoint, consulte Configure seu endpoint OTLP.
Passo 2. Crie o values.yaml com a configuração do coletor
Tanto o NRDOT quanto os coletores OpenTelemetry usam configuração idêntica. Escolha sua imagem de coletor preferida:
Para opções de configuração avançadas, consulte estas páginas de documentação do receptor:
Documentação do receptor de métricas do Kafka
Etapa 3. Instalar o OpenTelemetry Collector com o Helm
bash$helm repo add open-telemetry https://open-telemetry.github.io/opentelemetry-helm-charts$helm upgrade kafka-monitoring open-telemetry/opentelemetry-collector \>--install \>--namespace newrelic \>--create-namespace \>-f values.yamlPasso 4. Verifique a implantação:
bash$# Check pod status$kubectl get pods -n newrelic -l app.kubernetes.io/name=opentelemetry-collector$$# View logs to verify metrics collection$kubectl logs -n newrelic -l app.kubernetes.io/name=opentelemetry-collector --tail=50Você deve ver logs indicando uma coleta bem-sucedida dos pods de broker Kafka na porta
9404.
O método de instalação via manifesto oferece controle direto sobre os recursos do Kubernetes sem usar o Helm.
Passo 1. Criar segredo de credenciais da New Relic
Dica
Para outras configurações de endpoint, consulte Configure seu endpoint OTLP.
Passo 2. Crie arquivos de manifesto
Tanto o NRDOT quanto os coletores OpenTelemetry usam configuração idêntica. Apenas a imagem do contêiner difere.
Para opções de configuração avançadas, consulte estas páginas de documentação do receptor:
Documentação do receptor de métricas do Kafka
Passo 3. Implante os manifestos
bash$# Create namespace if it doesn't exist$kubectl create namespace newrelic --dry-run=client -o yaml | kubectl apply -f -$$# Apply ConfigMap$kubectl apply -f collector-configmap.yaml$$# Apply Deployment (includes ServiceAccount)$kubectl apply -f collector-deployment.yamlPasso 4. Verifique a implantação:
bash$# Check pod status$kubectl get pods -n newrelic -l app=otel-collector$$# View logs to verify metrics collection$kubectl logs -n newrelic -l app=otel-collector --tail=50Você deve ver logs indicando uma coleta bem-sucedida dos pods de broker Kafka na porta
9404.
(Opcional) Instrumente aplicações produtoras ou consumidoras
Importante
Suporte a linguagens: aplicativos Java suportam instrumentação de cliente Kafka pronta para uso utilizando o agente Java do OpenTelemetry.
Para coletar telemetria em nível de aplicativo de seus aplicativos produtores e consumidores Kafka, use o agente Java do OpenTelemetry com um contêiner init:
apiVersion: apps/v1kind: Deploymentmetadata: name: kafka-producer-appspec: template: spec: initContainers: - name: download-java-agent image: busybox:latest command: - sh - -c - | wget -O /otel-auto-instrumentation/opentelemetry-javaagent.jar \ https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
containers: - name: app image: your-kafka-app:latest env: - name: JAVA_TOOL_OPTIONS value: >- -javaagent:/otel-auto-instrumentation/opentelemetry-javaagent.jar -Dotel.service.name=my-kafka-app -Dotel.resource.attributes=kafka.cluster.name=my-kafka-cluster -Dotel.exporter.otlp.endpoint=http://otel-collector.newrelic.svc.cluster.local:4317 -Dotel.exporter.otlp.protocol=grpc -Dotel.metrics.exporter=otlp -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.instrumentation.kafka.experimental-span-attributes=true -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -Dotel.instrumentation.kafka.producer-propagation.enabled=true -Dotel.instrumentation.kafka.enabled=true -Dotel.instrumentation.runtime-telemetry.enabled=false volumeMounts: - name: otel-auto-instrumentation mountPath: /otel-auto-instrumentation
volumes: - name: otel-auto-instrumentation emptyDir: {}Parâmetro de configuração
A tabela a seguir descreve os principais parâmetros de configuração:
| Parâmetro | Descrição |
|---|---|
service.name | Substitua my-kafka-app por um nome exclusivo para sua aplicação produtora ou consumidora |
kafka.cluster.name | Substitua my-kafka-cluster pelo mesmo nome do cluster usado na configuração do seu coletor |
otlp.endpoint | O endpoint http://otel-collector.newrelic.svc.cluster.local:4317 pressupõe que o coletor esteja implantado no namespace newrelic como otel-collector |
O agente Java fornece instrumentação Kafka pronta para uso com zero alterações de código, capturando latência de solicitações, métrica de taxas de transferência, taxa de erros e distributed trace. Para configuração avançada, consulte a documentação de instrumentação do Kafka.
(Opcional) Encaminhar logs do broker Kafka
Para coletar logs do broker Kafka e enviá-los para o New Relic, adicione um receiver filelog à configuração do seu coletor.
Encontre seus dados
Após alguns minutos, seus dados do Kafka devem aparecer no New Relic. Consulte Encontre seus dados para obter instruções detalhadas sobre como explorar seus dados do Kafka em diferentes visualizações na interface do New Relic.
A tabela a seguir resume onde cada tipo de sinal é armazenado. Substitua my-kafka-cluster pelo seu valor de KAFKA_CLUSTER_NAME em todas as consultas abaixo:
| Sinal | Tipo de evento | O que está incluído |
|---|---|---|
| Métrica | Metric | Métricas de broker, tópico, partição, grupo de consumidores e JVM |
| Registro | Log | Logs de aplicativos de produtor e consumidor (via agente Java OTel) e logs do broker coletados via agente Java |
| Traces | Span | Spans de produtor e consumidor, incluindo operações publish e receive por mensagem em tópicos |
Métrica
As métricas de broker, tópico, partição, grupo de consumidores e JVM são armazenadas no tipo de evento Metric:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoRegistro
Logs de aplicativos produtores e consumidores instrumentados com o agente Java do OpenTelemetry, e logs do broker coletados pelo agente Java no broker, são armazenados no tipo de evento Log:
FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoTraces
Os spans de produtor e consumidor, incluindo operações publish e receive por mensagem entre tópicos, são armazenados no tipo de evento Span:
FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoExemplo
Um exemplo completo e funcional com manifestos Kafka StatefulSet, valores do Helm, configuração do OTel Collector e aplicativos de exemplo de produtor/consumidor está disponível no repositório New Relic OpenTelemetry Examples.
Resolução de problemas
Próximos passos
- Explore as métricas do Kafka - Visualize a referência completa de métricas
- Criar dashboards personalizados - Crie visualizações para seus dados do Kafka
- Configurar alertas - Monitore métricas críticas como lag do consumidor e partições sub-replicadas
Recursos relacionados
- Kafka auto-hospedado - monitoramento do Kafka para ambientes auto-hospedados (não Kubernetes)
- Kubernetes Strimzi - Monitoramento do Kafka para Kafka gerenciado pelo Strimzi no Kubernetes
- Agente Java do OpenTelemetry - Documentação oficial do agente Java do OTel
- Prometheus JMX Exporter - agente Java que expõe métricas JMX no formato Prometheus
- Receptor do Prometheus - Receptor do OTel Collector para coletar endpoints de métricas do Prometheus
- kafkametrics receiver - Documentação do receiver de lag do consumidor e métricas de tópico