Monitor self-hosted Kafka with OpenTelemetry

Monitor your self-hosted Apache Kafka cluster by installing the OpenTelemetry Collector directly on Linux hosts. Choose either the OpenTelemetry Java agent or the Prometheus JMX Exporter to collect JMX metrics from your Kafka brokers.

Architecture

New Relic supports two approaches for monitoring self-hosted Kafka: the OpenTelemetry Java agent or the Prometheus JMX Exporter. The following diagram illustrates the data flow for each approach.

Self-hosted Kafka monitoring architecture

Installation steps

Follow these steps to set up comprehensive Kafka monitoring by installing the OpenTelemetry Java agent on your brokers and deploying a collector that gathers metrics and logs and sends them to New Relic.

Before you begin

Make sure you have:

  • A New Relic account with a license key
  • Network access from the collector to the Kafka bootstrap server port (typically 9092)

Create the collector configuration

Create the main OpenTelemetry Collector configuration at ~/opentelemetry/collector-kafka-config.yaml on a monitoring host.

receivers:
  # OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
  # Kafka metrics receiver for cluster-level metrics
  kafkametrics:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    # Exclude internal Kafka topics (prefixed with __) at the source
    topic_match: "^[^_].*$"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false
processors:
  batch/aggregation:
    send_batch_size: 1024
    timeout: 30s
  resourcedetection:
    detectors: [env, ec2, system]
    system:
      resource_attributes:
        host.name:
          enabled: true
        host.id:
          enabled: true
  resource:
    attributes:
      - action: insert
        key: kafka.cluster.name
        value: ${env:KAFKA_CLUSTER_NAME}
  transform/remove_broker_id:
    metric_statements:
      # Remove broker.id for cluster-level metrics: these represent the whole cluster,
      # not a specific broker. broker.id is retained on broker-level metrics pipelines.
      - context: resource
        statements:
          - delete_key(attributes, "broker.id")
  transform/remove_extra_attributes:
    metric_statements:
      - context: resource
        statements:
          # Delete all attributes starting with "process."
          - delete_matching_keys(attributes, "^process\\..*")
          # Delete all attributes starting with "telemetry."
          - delete_matching_keys(attributes, "^telemetry\\..*")
          - delete_key(attributes, "host.arch")
          - delete_key(attributes, "os.description")
          - delete_key(attributes, "host.image.id")
          - delete_key(attributes, "host.type")
          - delete_matching_keys(attributes, "^cloud\\..*")
          - delete_key(attributes, "service.instance.id") where IsMatch(attributes["service.name"], "^unknown_service:")
          - delete_key(attributes, "service.name") where IsMatch(attributes["service.name"], "^unknown_service:")
  # Filter internal Kafka topics as a safety net (kafkametrics topic_match handles the receiver side)
  filter/internal_topics:
    metrics:
      datapoint:
        - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"
  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"
  transform/des_units:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""
  cumulativetodelta:
  metricstransform/kafka_topic_sum_aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
  filter/remove_partition_level_replicas:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync
  groupbyattrs/cluster:
    keys: [kafka.cluster.name]
  metricstransform/cluster_max:
    transforms:
      - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels
            aggregation_type: max
            label_set: []
exporters:
  otlp/newrelic:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    compression: gzip
    timeout: 30s
service:
  pipelines:
    # Broker metrics pipeline (excludes cluster-level metrics)
    metrics/broker:
      receivers: [otlp, kafkametrics]
      processors:
        - resourcedetection
        - resource
        - filter/exclude_cluster_metrics
        - filter/internal_topics
        - transform/remove_extra_attributes
        - transform/des_units
        - cumulativetodelta
        - metricstransform/kafka_topic_sum_aggregation
        - filter/remove_partition_level_replicas
        - batch/aggregation
      exporters: [otlp/newrelic]
    # Cluster metrics pipeline (controller-emitted metrics like offline partitions, topic/partition counts; no broker.id)
    metrics/cluster:
      receivers: [otlp]
      processors:
        - resourcedetection
        - resource
        - filter/include_cluster_metrics
        - transform/remove_broker_id
        - transform/remove_extra_attributes
        - transform/des_units
        - cumulativetodelta
        - groupbyattrs/cluster
        - metricstransform/cluster_max
        - batch/aggregation
      exporters: [otlp/newrelic]
    # APM traces pipeline (producer + consumer spans via OTel Java agent)
    traces/apps:
      receivers: [otlp]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]
    # APM logs pipeline (producer + consumer logs via OTel Java agent)
    logs/apps:
      receivers: [otlp]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]

Set environment variables

Set the required environment variables on the monitoring host before installing the collector:

bash
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
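The collector configuration reads these values through ${env:...} references, so a variable that was never set only surfaces once the collector starts. The following is a minimal pre-flight sketch for a POSIX shell. The function name missing_env_vars is hypothetical and is not part of any New Relic or OpenTelemetry tooling, and the check looks at shell variables without verifying that they are exported.

```shell
# Hypothetical helper: print the name of every listed variable that is unset or empty.
# Pass only literal variable names, because the lookup uses eval.
missing_env_vars() {
  for name in "$@"; do
    # Indirect lookup through eval keeps the function POSIX-compatible.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "$name"
    fi
  done
}
```

Calling missing_env_vars NEW_RELIC_LICENSE_KEY KAFKA_CLUSTER_NAME KAFKA_BOOTSTRAP_BROKER_ADDRESSES NEW_RELIC_OTLP_ENDPOINT prints nothing when all four values are present.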

Configuration parameters

The following table describes the main configuration parameters:

Variable | Description
NEW_RELIC_LICENSE_KEY | Your New Relic license key, for example YOUR_LICENSE_KEY
KAFKA_CLUSTER_NAME | A unique name for your Kafka cluster, for example my-kafka-cluster
KAFKA_BOOTSTRAP_BROKER_ADDRESSES | The addresses of your Kafka bootstrap brokers, for example broker1-host:9092,broker2-host:9092,broker3-host:9092
NEW_RELIC_OTLP_ENDPOINT | OTLP ingest endpoint. Use https://otlp.nr-data.net:4317 for the US region or https://otlp.eu01.nr-data.net:4317 for the EU region. For other configurations, see Configure your OTLP endpoint.
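The choice between the two regional endpoints can be scripted so that a deployment script does not hard-code a URL. This is a small sketch only: the function name otlp_endpoint_for_region and the us/eu labels are assumptions made for illustration, while the two URLs are the ones listed in the table.

```shell
# Hypothetical helper: map a region label to the OTLP endpoint documented above.
otlp_endpoint_for_region() {
  case "$1" in
    us|US) echo "https://otlp.nr-data.net:4317" ;;
    eu|EU) echo "https://otlp.eu01.nr-data.net:4317" ;;
    *)
      echo "unknown region: $1 (expected us or eu)" >&2
      return 1
      ;;
  esac
}
```

For example, export NEW_RELIC_OTLP_ENDPOINT="$(otlp_endpoint_for_region eu)" selects the EU endpoint. Any other endpoint still has to be set by hand.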

Install and start the collector

Install and run the collector on the monitoring host. Choose either the NRDOT Collector (New Relic's distribution) or the OpenTelemetry Collector:

Tip

NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector and is backed by New Relic support.

Step 1. Download and install the binary

Download and install the NRDOT Collector binary for the host operating system. The example below is for the linux_amd64 architecture:

bash
# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64" # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version

Important

For other operating systems and architectures, go to NRDOT Collector releases and download the appropriate binary for your system.
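On Linux, the ARCH value used in the download commands can be derived from the machine type instead of being typed by hand. The sketch below assumes the usual uname -m outputs (x86_64 and aarch64) and covers only the amd64 and arm64 values mentioned in the example. The function name release_arch is hypothetical.

```shell
# Hypothetical helper: translate a machine type (as printed by `uname -m`)
# into the architecture suffix used in the release file names.
release_arch() {
  case "$1" in
    x86_64|amd64) echo "amd64" ;;
    aarch64|arm64) echo "arm64" ;;
    *)
      echo "unsupported machine type: $1" >&2
      return 1
      ;;
  esac
}
```

A typical call is ARCH="$(release_arch "$(uname -m)")". The function fails for any other machine type, so check the releases page for an appropriate binary in that case.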

Step 2. Start the collector

Run the collector with your configuration file to start monitoring:

bash
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

The collector is now running and ready to receive data. Metrics appear in New Relic only after you complete the remaining steps and attach the Java agent to your Kafka brokers.

Step 1. Download and install the binary

Download and install the OpenTelemetry Collector Contrib binary for your host operating system. The example below is for the linux_amd64 architecture:

bash
# Set version and architecture
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"
ARCH="amd64"

# Download the collector
curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

# Extract the binary
tar -xzf otelcol-contrib.tar.gz

# Move to a location in PATH (optional)
sudo mv otelcol-contrib /usr/local/bin/

# Verify installation
otelcol-contrib --version

For other operating systems, go to the OpenTelemetry Collector releases page.

Step 2. Start the collector

Run the collector with your configuration file to start monitoring:

bash
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

The collector is now running and ready to receive data. Metrics appear in New Relic only after you complete the remaining steps and attach the Java agent to your Kafka brokers.

Download the OpenTelemetry Java agent

Important

Make sure your OpenTelemetry Collector is running before you start or restart the Kafka brokers with the Java agent attached. The agent begins sending metrics as soon as the broker starts, so the collector must already be available to receive them.
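One way to enforce this ordering in a restart script is to wait until the collector's OTLP port accepts TCP connections. The following is a sketch under stated assumptions: it relies on the /dev/tcp redirection, which is a bash feature rather than POSIX, and the function name wait_for_port and its default retry values are hypothetical. A successful connection only shows that something is listening on the port, not that the collector pipelines are healthy.

```shell
# Hypothetical helper: retry a TCP connection until it succeeds or attempts run out.
# Usage: wait_for_port HOST PORT [ATTEMPTS] [DELAY_SECONDS]
wait_for_port() {
  host="$1"; port="$2"; attempts="${3:-10}"; delay="${4:-3}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # bash-only /dev/tcp redirection, run in a subshell so the socket closes on exit
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    i=$((i + 1))
    # Sleep only when another attempt follows
    if [ "$i" -le "$attempts" ]; then
      sleep "$delay"
    fi
  done
  return 1
}
```

For example, wait_for_port collector-host-ip 4317 && bin/kafka-server-start.sh config/server.properties starts the broker only after the port used in this guide becomes reachable.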

The OpenTelemetry Java agent runs as a Java agent attached to your Kafka brokers. It collects Kafka and JMX metrics and sends them over OTLP to the collector:

bash
# Create directory for OpenTelemetry components
mkdir -p ~/opentelemetry

# Download OpenTelemetry Java agent
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

Create a custom JMX configuration

Create an OpenTelemetry Java agent JMX configuration file to collect Kafka metrics from JMX MBeans.

Create the file ~/opentelemetry/kafka-jmx-config.yaml on each broker host with the following configuration:

---
rules:
  # Per-topic custom metrics using custom MBean commands
  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
    mapping:
      Count:
        metric: kafka.prod.msg.count
        type: counter
        desc: The number of messages per topic
        unit: "{message}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(in)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By
  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(out)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By
  # Cluster-level metrics using controller-based MBeans
  - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
    mapping:
      Value:
        metric: kafka.cluster.topic.count
        type: gauge
        desc: The total number of global topics in the cluster
        unit: "{topic}"
  - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
    mapping:
      Value:
        metric: kafka.cluster.partition.count
        type: gauge
        desc: The total number of global partitions in the cluster
        unit: "{partition}"
  - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
    mapping:
      Value:
        metric: kafka.broker.fenced.count
        type: gauge
        desc: The number of fenced brokers in the cluster
        unit: "{broker}"
  - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
    mapping:
      Value:
        metric: kafka.partition.non_preferred_leader
        type: gauge
        desc: The count of topic partitions for which the leader is not the preferred leader
        unit: "{partition}"
  # Broker-level metrics using ReplicaManager MBeans
  - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
    mapping:
      Value:
        metric: kafka.partition.under_min_isr
        type: gauge
        desc: The number of partitions where the number of in-sync replicas is less than the minimum
        unit: "{partition}"
  # Broker uptime metric using JVM Runtime
  - bean: java.lang:type=Runtime
    mapping:
      Uptime:
        metric: kafka.broker.uptime
        type: gauge
        desc: Broker uptime in milliseconds
        unit: ms
  # Leader count per broker
  - bean: kafka.server:type=ReplicaManager,name=LeaderCount
    mapping:
      Value:
        metric: kafka.broker.leader.count
        type: gauge
        desc: Number of partitions for which this broker is the leader
        unit: "{partition}"
  # JVM metrics
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionCount:
        metric: jvm.gc.collections.count
        type: counter
        unit: "{collection}"
        desc: total number of collections that have occurred
        metricAttribute:
          name: param(name)
  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.max:
        metric: heap.max
        desc: current heap usage
        type: gauge
      HeapMemoryUsage.used:
        metric: heap.used
        desc: current heap usage
        type: gauge
  - bean: java.lang:type=Threading
    mapping:
      ThreadCount:
        metric: jvm.thread.count
        type: gauge
        unit: "{thread}"
        desc: Total thread count (Kafka typical range 100-300 threads)
  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemCpuLoad:
        metric: system.cpu.utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for whole system (0.0 to 1.0)
  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
    mapping:
      Count:
        metric: kafka.message.count
        type: counter
        desc: The number of messages received by the broker
        unit: "{message}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.count
        type: &type counter
        desc: &desc The number of requests received by the broker
        unit: &unit "{request}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit
  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.failed
        type: &type counter
        desc: &desc The number of requests to the broker resulting in a failure
        unit: &unit "{request}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit
  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      99thPercentile:
        metric: kafka.request.time.99p
        type: gauge
        desc: The 99th percentile time the broker has taken to service requests
  - bean: kafka.network:type=RequestChannel,name=RequestQueueSize
    mapping:
      Value:
        metric: kafka.request.queue
        type: gauge
        desc: Size of the request queue
        unit: "{request}"
  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
    metricAttribute:
      direction: const(in)
    mapping:
      Count:
        metric: &metric kafka.network.io
        type: &type counter
        desc: &desc The bytes received or sent by the broker
        unit: &unit By
  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
    metricAttribute:
      direction: const(out)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit
  - beans:
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
    metricAttribute:
      type: param(delayedOperation)
    mapping:
      Value:
        metric: kafka.purgatory.size
        type: gauge
        desc: The number of requests waiting in purgatory
        unit: "{request}"
  - bean: kafka.server:type=ReplicaManager,name=PartitionCount
    mapping:
      Value:
        metric: kafka.partition.count
        type: gauge
        desc: The number of partitions on the broker
        unit: "{partition}"
  - bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
    mapping:
      Value:
        metric: kafka.partition.offline
        type: gauge
        desc: The number of partitions offline
        unit: "{partition}"
  - bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
    mapping:
      Value:
        metric: kafka.partition.under_replicated
        type: gauge
        desc: The number of under replicated partitions
        unit: "{partition}"
  - bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
    metricAttribute:
      operation: const(shrink)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"
  - bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
    metricAttribute:
      operation: const(expand)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"
  - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
    mapping:
      Value:
        metric: kafka.max.lag
        type: gauge
        desc: The max lag in messages between follower and leader replicas
        unit: "{message}"
  - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
    mapping:
      Value:
        metric: kafka.controller.active.count
        type: gauge
        desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
        unit: "{controller}"
  - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
    mapping:
      Count:
        metric: kafka.leader.election.rate
        type: counter
        desc: The leader election count
        unit: "{election}"
  - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
    mapping:
      Count:
        metric: kafka.unclean.election.rate
        type: counter
        desc: Unclean leader election count - increasing indicates broker failures
        unit: "{election}"
  # Additional metrics: remove this section to reduce data ingest
  # Request latency: total count, 50th percentile, and average (99p kept above)
  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      Count:
        metric: kafka.request.time.total
        type: counter
        desc: The total time the broker has taken to service requests
      50thPercentile:
        metric: kafka.request.time.50p
        type: gauge
        desc: The 50th percentile time the broker has taken to service requests
      Mean:
        metric: kafka.request.time.avg
        type: gauge
        desc: The average time the broker has taken to service requests
  - bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
    unit: ms
    type: gauge
    prefix: kafka.logs.flush.
    mapping:
      Count:
        metric: count
        unit: '{flush}'
        type: counter
        desc: Log flush count
      50thPercentile:
        metric: time.50p
        desc: Log flush time - 50th percentile
      99thPercentile:
        metric: time.99p
        desc: Log flush time - 99th percentile
  # GC elapsed time (cumulative collection time in ms)
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionTime:
        metric: jvm.gc.collections.elapsed
        type: counter
        unit: ms
        desc: the approximate accumulated collection elapsed time in milliseconds
        metricAttribute:
          name: param(name)
  # JVM class loading
  - bean: java.lang:type=ClassLoading
    mapping:
      LoadedClassCount:
        metric: jvm.class.count
        type: gauge
        unit: "{class}"
        desc: Currently loaded class count
  # JVM heap committed (in addition to heap.used and heap.max)
  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.committed:
        metric: heap.committed
        desc: Committed heap memory
        type: gauge
  # Additional JVM CPU and system metrics
  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemLoadAverage:
        metric: system.cpu.load_1m
        type: gauge
        unit: "{run_queue_item}"
        desc: System load average (1 minute) - alert if > CPU count
      AvailableProcessors:
        metric: cpu.count
        type: gauge
        unit: "{cpu}"
        desc: Number of processors available
      ProcessCpuLoad:
        metric: cpu.recent_utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for JVM process (0.0 to 1.0)
      OpenFileDescriptorCount:
        metric: file_descriptor.count
        type: gauge
        unit: "{file_descriptor}"
        desc: Number of open file descriptors - alert if > 80% of ulimit
  # JVM memory pool breakdown (by generation: G1 Old Gen, Eden, Survivor, etc.)
  - bean: java.lang:type=MemoryPool,name=*
    type: gauge
    unit: By
    metricAttribute:
      name: param(name)
    mapping:
      Usage.used:
        metric: jvm.memory.pool.used
        desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
      Usage.max:
        metric: jvm.memory.pool.max
        desc: Maximum memory pool size
      CollectionUsage.used:
        metric: jvm.memory.pool.used_after_last_gc
        desc: Memory used after last GC (shows retained memory baseline)
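Because the collector's cluster filters match on metric names, it can help to list the names a rules file declares and compare them with those filters. The sketch below is a rough text scan, not a YAML parser: the function name list_jmx_metric_names is hypothetical, alias references such as *metric are skipped, and a rule-level prefix (for example jvm.memory.) is not prepended to the short names it applies to.

```shell
# Hypothetical helper: print the distinct values of "metric:" keys in a JMX rules file.
list_jmx_metric_names() {
  # Keep only the value of each "metric:" key, drop a leading YAML anchor (&name),
  # skip alias references (*name), then de-duplicate.
  sed -n 's/^[[:space:]]*metric:[[:space:]]*//p' "$1" \
    | sed 's/^&[^[:space:]]*[[:space:]]*//' \
    | grep -v '^\*' \
    | sort -u
}
```

For example, list_jmx_metric_names ~/opentelemetry/kafka-jmx-config.yaml | grep '^kafka\.cluster\.' shows the cluster-level names that the include filter is expected to match.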

Configure the Kafka broker

Attach the OpenTelemetry Java agent to your Kafka broker by setting the KAFKA_OPTS environment variable before starting Kafka.

Single-broker example:

bash
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"

nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
  -Dotel.jmx.enabled=true \
  -Dotel.jmx.config=$JMX_CONFIG \
  -Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol=grpc \
  -Dotel.metrics.exporter=otlp \
  -Dotel.logs.exporter=otlp \
  -Dotel.instrumentation.runtime-telemetry.enabled=false \
  -Dotel.metric.export.interval=30000" \
  bin/kafka-server-start.sh config/server.properties &

Important

Multi-broker clusters: use the same configuration on every broker, but give each broker a unique broker.id value (for example broker.id=1, broker.id=2, broker.id=3) in the -Dotel.resource.attributes parameter.
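Since only broker.id changes from one broker to the next, the option string can be generated rather than edited by hand on each host. The sketch below uses only the flags from the single-broker example. The function name kafka_otel_opts and its argument order are hypothetical, and the default file locations assume the ~/opentelemetry paths used in this guide.

```shell
# Hypothetical helper: assemble the KAFKA_OPTS value for one broker.
# Usage: kafka_otel_opts BROKER_ID CLUSTER_NAME COLLECTOR_ENDPOINT
kafka_otel_opts() {
  broker_id="$1"; cluster="$2"; endpoint="$3"
  # Defaults follow the file locations used earlier in this guide
  agent="${OTEL_AGENT:-$HOME/opentelemetry/opentelemetry-javaagent.jar}"
  jmx_config="${JMX_CONFIG:-$HOME/opentelemetry/kafka-jmx-config.yaml}"
  # printf repeats the format for every argument, joining the flags with spaces
  printf '%s ' \
    "-javaagent:$agent" \
    "-Dotel.jmx.enabled=true" \
    "-Dotel.jmx.config=$jmx_config" \
    "-Dotel.resource.attributes=broker.id=$broker_id,kafka.cluster.name=$cluster" \
    "-Dotel.exporter.otlp.endpoint=$endpoint" \
    "-Dotel.exporter.otlp.protocol=grpc" \
    "-Dotel.metrics.exporter=otlp" \
    "-Dotel.logs.exporter=otlp" \
    "-Dotel.instrumentation.runtime-telemetry.enabled=false" \
    "-Dotel.metric.export.interval=30000"
  echo
}
```

On the second broker, for instance, KAFKA_OPTS="$(kafka_otel_opts 2 my-kafka-cluster http://collector-host-ip:4317)" bin/kafka-server-start.sh config/server.properties starts Kafka with broker.id=2 in the resource attributes.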

Tip

Broker logs are enabled automatically by the -Dotel.logs.exporter=otlp flag above. To disable broker log collection, set -Dotel.logs.exporter=none instead.

Configuration parameters

The following table describes the main configuration parameters:

Parameter | Description
otlp.endpoint | Replace with the IP address or hostname of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317
broker.id | Replace 1 with the unique broker ID of each broker, for example broker.id=1, broker.id=2, broker.id=3
kafka.cluster.name | Replace my-kafka-cluster with the name of your Kafka cluster. It must match the value set in the collector configuration.
logs.exporter | Enables broker log collection when set to otlp. Set to none to disable broker log forwarding.

For the full set of configuration options, see the Java agent configuration guide.

(Optional) Instrument producer or consumer applications

Important

Language support: currently, only Java applications are supported for Kafka client instrumentation with the OpenTelemetry Java agent.

To collect application-level telemetry from your Kafka producer and consumer applications, download the OpenTelemetry Java agent as described in the Download the OpenTelemetry Java agent step above.

Start your application with the agent:

bash
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"

java \
  -javaagent:$OTEL_AGENT \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -Dotel.instrumentation.runtime-telemetry.enabled="false" \
  -jar your-kafka-application.jar

Configuration parameters

The following table describes the main configuration parameters:

Parameter | Description
service.name | Replace with a unique name for your producer or consumer application, for example order-process-service
kafka.cluster.name | Replace with the same cluster name used in your collector configuration, for example my-kafka-cluster
otlp.endpoint | Replace with the hostname or IP address of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317

Tip

The configuration above sends telemetry to an OpenTelemetry Collector running at collector-host-ip:4317. If you want a separate collector dedicated to application telemetry, create one with the following configuration:

The Java agent provides out-of-the-box Kafka instrumentation with zero code changes, capturing request latencies, throughput metrics, error rates, and distributed traces.

For advanced configuration, see the Kafka instrumentation documentation.

Follow these steps to set up comprehensive Kafka monitoring by installing the Prometheus JMX Exporter on your brokers and deploying a collector that gathers metrics and sends them to New Relic.

Before you begin

Make sure you have:

  • A New Relic account with a license key
  • Network access from the collector host to each broker on port 9404
  • Network access from the collector to the Kafka bootstrap port (typically 9092)
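The list of exporter addresses to check can be derived from the bootstrap address string, assuming that each listed broker host also runs the JMX Exporter on port 9404. That is an assumption of this sketch, as are the function name jmx_exporter_targets and the handling of plain host:port entries only (IPv6 literals are not supported).

```shell
# Hypothetical helper: rewrite "host1:9092,host2:9092" into one "host:PORT" line per broker.
# Usage: jmx_exporter_targets BROKER_LIST [EXPORTER_PORT]
jmx_exporter_targets() {
  brokers="$1"; exporter_port="${2:-9404}"
  echo "$brokers" | tr ',' '\n' | while IFS= read -r entry; do
    # Skip empty entries such as a trailing comma
    [ -n "$entry" ] || continue
    # Drop everything from the first colon onward, then append the exporter port
    echo "${entry%%:*}:$exporter_port"
  done
}
```

For example, jmx_exporter_targets "$KAFKA_BOOTSTRAP_BROKER_ADDRESSES" prints one host:9404 line per broker, which can then be fed to a reachability check from the collector host.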

Download the Prometheus JMX Exporter

Download the Prometheus JMX Exporter JAR on each Kafka broker host:

bash
# Create directory for Prometheus components
mkdir -p ~/opentelemetry

# Download the Prometheus JMX Exporter agent JAR
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
JMX_EXPORTER_VERSION="1.5.0"
curl -L -o ~/opentelemetry/jmx_prometheus_javaagent.jar \
  "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"

Create the JMX metrics configuration

Create the JMX Exporter configuration file that defines which Kafka metrics to collect. Save it as ~/opentelemetry/kafka-jmx-config.yaml on each broker host:

startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # Cluster-level controller metrics
  - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
    name: kafka_cluster_topic_count
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
    name: kafka_cluster_partition_count
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
    name: kafka_broker_fenced_count
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
    name: kafka_partition_non_preferred_leader
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_partition_offline
    type: GAUGE
  - pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
    name: kafka_controller_active_count
    type: GAUGE
  # Broker-level replica metrics
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
    name: kafka_partition_under_min_isr
    type: GAUGE
  - pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
    name: kafka_broker_leader_count
    type: GAUGE
  - pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
    name: kafka_partition_count
    type: GAUGE
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
    name: kafka_partition_under_replicated
    type: GAUGE
  - pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
    name: kafka_isr_operation_count
    type: COUNTER
    labels:
      operation: "shrink"
  - pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
    name: kafka_isr_operation_count
    type: COUNTER
    labels:
      operation: "expand"
  - pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
    name: kafka_max_lag
    type: GAUGE
  # Broker topic metrics (totals)
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
    name: kafka_message_count
    type: COUNTER
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
    name: kafka_request_count
    type: COUNTER
    labels:
      type: "fetch"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
    name: kafka_request_count
    type: COUNTER
    labels:
      type: "produce"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
    name: kafka_request_failed
    type: COUNTER
    labels:
      type: "fetch"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
    name: kafka_request_failed
    type: COUNTER
    labels:
      type: "produce"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
    name: kafka_network_io
    type: COUNTER
    labels:
      direction: "in"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
    name: kafka_network_io
    type: COUNTER
    labels:
      direction: "out"
  # Per-topic metrics (only appear after traffic flows)
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
    name: kafka_prod_msg_count
    type: COUNTER
    labels:
      topic: "$1"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
    name: kafka_topic_io
    type: COUNTER
    labels:
      topic: "$1"
      direction: "in"
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
    name: kafka_topic_io
    type: COUNTER
    labels:
      topic: "$1"
      direction: "out"
  # Request metrics
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
    name: kafka_request_time_99p
    type: GAUGE
    labels:
      type: "$1"
  - pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
    name: kafka_request_queue
    type: GAUGE
  - pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
    name: kafka_purgatory_size
    type: GAUGE
    labels:
      type: "$1"
  # Controller stats
  - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
    name: kafka_leader_election_rate
    type: COUNTER
  - pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
    name: kafka_unclean_election_rate
    type: COUNTER
  # JVM Garbage Collection
  - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
    name: jvm_gc_collections_count
    type: COUNTER
    labels:
      name: "$1"
  # JVM Memory
  - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
    name: jvm_memory_heap_max
    type: GAUGE
  - pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
    name: jvm_memory_heap_used
    type: GAUGE
  # JVM Threading and System
  - pattern: 'java.lang<type=Threading><>ThreadCount'
    name: jvm_thread_count
    type: GAUGE
  - pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
    name: jvm_system_cpu_utilization
    type: GAUGE
  # Broker uptime
  - pattern: 'java.lang<type=Runtime><>Uptime'
    name: kafka_broker_uptime
    type: GAUGE
  # Additional metrics: remove this section to reduce data ingest
  # Request latency: total count, 50th percentile, and average (99p kept above)
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
    name: kafka_request_time_total
    type: COUNTER
    labels:
      type: "$1"
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
    name: kafka_request_time_50p
    type: GAUGE
    labels:
      type: "$1"
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
    name: kafka_request_time_avg
    type: GAUGE
    labels:
      type: "$1"
  # Log flush metrics
  - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
    name: kafka_logs_flush_count
    type: COUNTER
  - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
    name: kafka_logs_flush_time_50p
    type: GAUGE
  - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
    name: kafka_logs_flush_time_99p
    type: GAUGE
  # JVM GC elapsed time
  - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

Tip

Customizing metrics: you can add or modify patterns by consulting the Prometheus JMX Exporter examples and the Kafka MBean documentation. See the JMX Exporter rules documentation for additional configuration options.

Configure Kafka brokers to use the JMX Exporter

Attach the Prometheus JMX Exporter as a Java agent to each Kafka broker by adding it to the broker's Kafka startup options.

Single-broker example:

bash
JMX_JAR="$HOME/opentelemetry/jmx_prometheus_javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"

nohup env KAFKA_OPTS="-javaagent:${JMX_JAR}=9404:${JMX_CONFIG}" \
  bin/kafka-server-start.sh config/server.properties &

Each broker now exposes Prometheus metrics on port 9404. Verify:

bash
curl http://localhost:9404/metrics | grep kafka_

Important

Multi-broker clusters: apply the same KAFKA_OPTS configuration to every broker. Each broker exposes metrics on port 9404 of its own host IP.
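
Checking each broker by hand gets tedious in a multi-broker cluster. The following is a minimal sketch of a helper that probes every broker's exporter endpoint in one pass. The function name check_jmx_endpoints and the hostnames are illustrative assumptions; port 9404 matches the KAFKA_OPTS example above.

```shell
# Probe the JMX Exporter endpoint of each broker passed as an argument.
# Hypothetical helper: the function name and hostnames are placeholders.
check_jmx_endpoints() {
  local port=9404 host count failed=0
  for host in "$@"; do
    # Count sample lines that start with kafka_ (HELP/TYPE lines start with '#').
    # '|| true' absorbs grep's non-zero exit status when nothing matches.
    count=$(curl -sf --max-time 5 "http://${host}:${port}/metrics" | grep -c '^kafka_' || true)
    if [ "${count:-0}" -gt 0 ]; then
      echo "${host}:${port} OK (${count} kafka_ series)"
    else
      echo "${host}:${port} FAILED (no kafka_ metrics returned)"
      failed=1
    fi
  done
  # Non-zero exit status if any broker failed the check.
  return "$failed"
}

# Usage (replace with your broker hostnames):
# check_jmx_endpoints broker1-host broker2-host broker3-host
```

Keep in mind that per-topic series only appear after traffic flows, so a freshly started broker may report fewer kafka_ series than a busy one.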

Create the collector configuration

Create the OpenTelemetry Collector configuration at ~/opentelemetry/collector-kafka-config.yaml on a monitoring host.

The Prometheus receiver scrapes every broker endpoint. In addition to scraping the Prometheus endpoints, the collector listens on 0.0.0.0:4317 for any OTLP data (application traces, logs).

receivers:
  # OTLP receiver for application traces, metrics, and logs (listens on port 4317)
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
  # Prometheus receiver scrapes JMX metrics from Kafka brokers
  prometheus/kafka-jmx:
    config:
      scrape_configs:
        - job_name: 'kafka-jmx-metrics'
          metrics_path: /metrics
          scrape_interval: 30s
          static_configs:
            # TODO: Replace each target with your broker hostname or IP, and set a unique broker.id per broker
            - targets: ['broker1-host:9404']
              labels:
                broker.id: '0'
            - targets: ['broker2-host:9404']
              labels:
                broker.id: '1'
            - targets: ['broker3-host:9404']
              labels:
                broker.id: '2'
  # Kafka metrics receiver for cluster-level consumer lag, topic, and partition metrics
  kafkametrics/cluster:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    # Exclude internal Kafka topics (prefixed with __) at the source
    topic_match: "^[^_].*$"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false
exporters:
  otlp/backend:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    tls:
      insecure: false
    sending_queue:
      num_consumers: 12
      queue_size: 5000
    retry_on_failure:
      enabled: true
processors:
  # Batch processor for efficient export
  batch/export:
    send_batch_size: 1024
    timeout: 30s
  # Memory limiter to prevent OOM
  memory_limiter:
    limit_percentage: 80
    spike_limit_percentage: 30
    check_interval: 1s
  # Transform metric naming conventions (underscore to dot, normalize special names)
  transform/metric-naming:
    metric_statements:
      - context: metric
        statements:
          - replace_pattern(name, "_", ".")
          - replace_pattern(name, "\\.load\\.1", ".load_1")
          - replace_pattern(name, "\\.recent\\.util", ".recent_util")
          - replace_pattern(name, "file\\.descriptor\\.count", "file_descriptor.count")
          - replace_pattern(name, "\\.memory\\.pool\\.used\\.bytes$", ".memory.pool.used")
          - replace_pattern(name, "\\.memory\\.pool\\.max\\.bytes$", ".memory.pool.max")
          - replace_pattern(name, "\\.memory\\.pool\\.collection\\.used\\.bytes$", ".memory.pool.used_after_last_gc")
          - replace_pattern(name, "\\.non\\.preferred\\.leader", ".non_preferred_leader")
          - replace_pattern(name, "\\.under\\.min\\.isr", ".under_min_isr")
          - replace_pattern(name, "\\.under\\.replicated", ".under_replicated")
          - replace_pattern(name, "\\.total$", "") where name != "kafka.request.time.total"
      - context: datapoint
        statements:
          - set(attributes["name"], attributes["gc"]) where attributes["gc"] != nil
          - delete_key(attributes, "gc") where attributes["gc"] != nil
          - set(attributes["name"], attributes["pool"]) where attributes["pool"] != nil
          - delete_key(attributes, "pool") where attributes["pool"] != nil
  # Add cluster name to all metrics
  resource/cluster-name:
    attributes:
      - key: kafka.cluster.name
        # TODO: Replace with your Kafka cluster name
        value: ${env:KAFKA_CLUSTER_NAME}
        action: upsert
  # Remove broker.id for cluster-level metrics
  transform/remove_broker_id:
    metric_statements:
      - context: datapoint
        statements:
          - delete_key(attributes, "broker.id")
  # Filter out scrape overhead metrics
  filter/scrape-overhead:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "^jmx_.*"
          - "^process_.*"
          - "^jvm_buffer_pool_.*"
          - "^jvm_threads_.*"
          - "^jvm_classes_.*"
          - "^jvm_memory_(heap|non_heap)_(committed|init|max|used)_bytes$"
          - "^jvm_compilation_.*"
          - "^jvm_(runtime|info).*"
          - "^jvm_memory_pool_(allocated_bytes_total|committed_bytes|init_bytes|collection_(committed|init|max)_bytes)$"
  # Include only cluster-level metrics for the cluster pipeline
  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "^kafka\\.partition\\.offline$"
          - "^kafka\\.(leader|unclean)\\.election\\.rate$"
          - "^kafka\\.partition\\.non_preferred_leader$"
          - "^kafka\\.broker\\.fenced\\.count$"
          - "^kafka\\.cluster\\.partition\\.count$"
          - "^kafka\\.cluster\\.topic\\.count$"
  # Exclude cluster-level metrics from the broker pipeline
  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "^kafka\\.partition\\.offline$"
          - "^kafka\\.(leader|unclean)\\.election\\.rate$"
          - "^kafka\\.partition\\.non_preferred_leader$"
          - "^kafka\\.broker\\.fenced\\.count$"
          - "^kafka\\.cluster\\.partition\\.count$"
          - "^kafka\\.cluster\\.topic\\.count$"
  # Remove unnecessary attributes
  transform/remove_attributes:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""
      - context: resource
        statements:
          - delete_key(attributes, "server.address")
          - delete_key(attributes, "server.port")
          - delete_key(attributes, "service.instance.id")
          - delete_key(attributes, "host.name")
          - delete_key(attributes, "url.scheme")
  # Aggregate partition metrics to topic level
  metricstransform/topic-aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
  # Filter out original partition replicas metric
  filter/exclude_partition_replicas_metric:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync
  # Filter internal Kafka topics as a safety net
  filter/internal_topics:
    metrics:
      datapoint:
        - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
  # Convert cumulative to delta metrics
  cumulativetodelta:
  groupbyattrs/cluster:
    keys: [kafka.cluster.name]
  metricstransform/cluster_max:
    transforms:
      - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels
            aggregation_type: max
            label_set: []
service:
  pipelines:
    # Application traces from instrumented Kafka clients and apps
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
    # Application metrics from instrumented Kafka clients and apps
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
    # Application logs from instrumented Kafka clients and apps
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
    # Broker-level metrics from Prometheus JMX scraping
    metrics/broker:
      receivers:
        - prometheus/kafka-jmx
      processors:
        - resource/cluster-name
        - filter/scrape-overhead
        - transform/metric-naming
        - transform/remove_attributes
        - filter/exclude_cluster_metrics
        - memory_limiter
        - cumulativetodelta
        - batch/export
      exporters:
        - otlp/backend
    # Cluster-level metrics from Prometheus JMX scraping
    metrics/cluster/prometheus:
      receivers:
        - prometheus/kafka-jmx
      processors:
        - resource/cluster-name
        - filter/scrape-overhead
        - transform/metric-naming
        - transform/remove_attributes
        - filter/include_cluster_metrics
        - transform/remove_broker_id
        - memory_limiter
        - cumulativetodelta
        - groupbyattrs/cluster
        - metricstransform/cluster_max
        - batch/export
      exporters:
        - otlp/backend
    # Cluster-level metrics from Kafka metrics receiver (consumer lag, topics, partitions)
    metrics/cluster/kafkametrics:
      receivers:
        - kafkametrics/cluster
      processors:
        - resource/cluster-name
        - filter/internal_topics
        - transform/remove_attributes
        - metricstransform/topic-aggregation
        - filter/exclude_partition_replicas_metric
        - memory_limiter
        - cumulativetodelta
        - batch/export
      exporters:
        - otlp/backend

Set environment variables

Set the required environment variables on the monitoring host before starting the collector:

bash
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
# EU region: https://otlp.eu01.nr-data.net:4317

Configuration parameters

The following table describes the key configuration parameters:

Variable | Description
NEW_RELIC_LICENSE_KEY | Your New Relic license key, for example YOUR_LICENSE_KEY
KAFKA_CLUSTER_NAME | A unique name for your Kafka cluster, for example my-kafka-cluster
KAFKA_BOOTSTRAP_BROKER_ADDRESSES | The addresses of your Kafka bootstrap brokers, for example broker1-host:9092,broker2-host:9092,broker3-host:9092
NEW_RELIC_OTLP_ENDPOINT | OTLP ingest endpoint. Use https://otlp.nr-data.net:4317 for the US region or https://otlp.eu01.nr-data.net:4317 for the EU region. For other configurations, see Configure your OTLP endpoint.
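
Because the collector configuration reads these values through ${env:...} references, a missing variable tends to surface only as a startup or export error. The following is a minimal sketch of a preflight check that fails fast instead; the function name require_env is a hypothetical helper, not part of any New Relic or OpenTelemetry tooling.

```shell
# Fail fast when a required variable is unset or empty.
# Hypothetical helper: pass the names of the variables to check as arguments.
require_env() {
  local name value missing=0
  for name in "$@"; do
    # Look up the variable whose name is stored in $name (POSIX-compatible indirection).
    eval "value=\${${name}:-}"
    if [ -z "$value" ]; then
      echo "missing: ${name}" >&2
      missing=1
    fi
  done
  # Non-zero exit status if any variable was missing.
  return "$missing"
}

# Usage:
# require_env NEW_RELIC_LICENSE_KEY KAFKA_CLUSTER_NAME \
#   KAFKA_BOOTSTRAP_BROKER_ADDRESSES NEW_RELIC_OTLP_ENDPOINT || exit 1
```

Running the check in the same shell session that starts the collector matters, since exported variables are not shared across separate sessions.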

Install and start the collector

Install and run the collector on the monitoring host. Choose between the NRDOT Collector (New Relic's distribution) or the OpenTelemetry Collector:

NRDOT Collector

Tip

The NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector, backed by New Relic support. For more information, see the NRDOT Collector repository on GitHub.

Step 1. Download and install the binary

bash
# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64" # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version

Important

For other operating systems and architectures, go to NRDOT Collector releases and download the appropriate binary for your system.

Step 2. Start the collector

bash
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

The collector starts collecting Kafka metrics, and they should reach New Relic within a few minutes.

OpenTelemetry Collector

Step 1. Download and install the binary

bash
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"
ARCH="amd64"

curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

tar -xzf otelcol-contrib.tar.gz
sudo mv otelcol-contrib /usr/local/bin/
otelcol-contrib --version

For other operating systems, go to the OpenTelemetry Collector releases page.

Step 2. Start the collector

bash
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

The collector starts collecting Kafka metrics, and they should reach New Relic within a few minutes.
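
With a configuration this long, an indentation slip or a misspelled processor name is easy to miss until startup. The following is a minimal sketch that checks the file before starting the collector. It relies on the validate subcommand that OpenTelemetry Collector binaries provide; whether your NRDOT Collector version exposes the same subcommand is an assumption to verify, and the function name start_collector is hypothetical.

```shell
# Validate the configuration, then start the collector only if validation passes.
# Hypothetical helper: $1 is the collector binary, $2 is the config file path.
start_collector() {
  local bin="$1" config="$2"
  # 'validate' loads and checks the config, exiting non-zero on errors.
  "$bin" validate --config "$config" || return 1
  # Runs in the foreground until interrupted.
  "$bin" --config "$config"
}

# Usage:
# start_collector otelcol-contrib ~/opentelemetry/collector-kafka-config.yaml
```

Set the environment variables before running this, because validation resolves the ${env:...} references as well.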

(Optional) Instrument producer or consumer applications

Important

Language support: currently, only Java applications are supported for Kafka client instrumentation using the OpenTelemetry Java agent.

To collect application-level telemetry from your Kafka producer and consumer applications, download the OpenTelemetry Java agent if you haven't already:

bash
mkdir -p ~/opentelemetry
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

Start your application with the agent:

bash
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"

java \
  -javaagent:$OTEL_AGENT \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -Dotel.instrumentation.runtime-telemetry.enabled="false" \
  -jar your-kafka-application.jar

Configuration parameters

The following table describes the key configuration parameters:

Parameter | Description
service.name | Replace with a unique name for your producer or consumer application, for example order-process-service
kafka.cluster.name | Replace with the same cluster name used in your collector configuration, for example my-kafka-cluster
otlp.endpoint | Replace with the hostname or IP of the host running your OpenTelemetry Collector, for example http://collector-host-ip:4317

Tip

The configuration above sends telemetry to an OpenTelemetry Collector running at collector-host-ip:4317. If you want a separate collector dedicated to application telemetry, create one with the following configuration:
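
A minimal sketch of such a dedicated configuration, assuming it only needs the OTLP receiver, the memory limiter and batch processors, the New Relic exporter, and the three application pipelines from the main collector configuration. The file name collector-app-config.yaml and the APP_CONFIG variable are assumptions for illustration, so adjust them to your environment.

```shell
# Write a minimal application-telemetry collector config (a sketch, not an official file).
# APP_CONFIG is a hypothetical path; set it before running to write elsewhere.
APP_CONFIG="${APP_CONFIG:-$HOME/opentelemetry/collector-app-config.yaml}"
mkdir -p "$(dirname "$APP_CONFIG")"

# The quoted 'EOF' keeps ${env:...} literal so the collector, not the shell, expands it.
cat > "$APP_CONFIG" <<'EOF'
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
exporters:
  otlp/backend:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
processors:
  memory_limiter:
    limit_percentage: 80
    spike_limit_percentage: 30
    check_interval: 1s
  batch/export:
    send_batch_size: 1024
    timeout: 30s
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]
EOF
```

If this collector runs on the same host as the Kafka monitoring collector, the two cannot both bind port 4317, so change one of the OTLP endpoints and point the Java agent at the matching port.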

The Java agent provides out-of-the-box Kafka instrumentation with zero code changes, capturing request latency, throughput metrics, error rates, and distributed traces. For advanced configuration, see the Kafka instrumentation documentation.

(Optional) Forward Kafka broker logs

To collect Kafka broker logs and send them to New Relic, configure the filelog receiver in your OpenTelemetry Collector.

Find your data

After a few minutes, your Kafka data should appear in New Relic. See Find your data for detailed instructions on exploring your Kafka data in different views in the New Relic UI.

The following table summarizes where each signal type is stored. Replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value in all of the queries below:

Signal | Event type | What's included
Metrics | Metric | Broker, topic, partition, consumer group, and JVM metrics
Logs | Log | Producer and consumer application logs (via the OTel Java agent) and broker logs collected via the Java agent
Traces | Span | Producer and consumer spans, including per-message publish and receive operations on topics

Metrics

Broker, topic, partition, consumer group, and JVM metrics are stored in the Metric event type. Replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Logs

Logs from producer and consumer applications instrumented with the OpenTelemetry Java agent, and broker logs when -Dotel.logs.exporter=otlp is set, are stored in the Log event type:

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Traces

Producer and consumer spans, including per-message publish and receive operations across topics, are stored in the Span event type:

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Example

A complete working example with the Docker Compose setup, OTel Collector configuration, OTel Java agent configuration, and sample producer/consumer applications is available in the New Relic OpenTelemetry examples repository.

Troubleshooting

Next steps

Copyright © 2026 New Relic Inc.
