• /
  • EnglishEspañolFrançais日本語한국어Português
  • 로그인지금 시작하기

사용자의 편의를 위해 제공되는 기계 번역입니다.

영문본과 번역본이 일치하지 않는 경우 영문본이 우선합니다. 보다 자세한 내용은 이 페이지를 방문하시기 바랍니다.

문제 신고

OpenTelemetry를 사용하여 자체 호스팅 Kafka를 모니터링하세요.

Linux 호스트에 직접 OpenTelemetry Collector를 설치하여 자체 호스팅 아파치 Kafka 클러스터를 모니터하십시오. Kafka 브로커에서 JMX 메트릭을 수집하려면 OpenTelemetry 자바 에이전트 또는 Prometheus JMX Exporter 방식 중 하나를 선택하십시오.

아키텍처

뉴렐릭은 자체 호스팅 Kafka를 모니터링하기 위한 두 가지 접근 방식을 지원합니다: OpenTelemetry 자바 에이전트 또는 Prometheus JMX Exporter. 다음 다이어그램은 각 접근 방식에 대한 데이터 흐름을 보여줍니다.

Self-hosted Kafka monitoring architecture

설치 단계

다음 단계에 따라 브로커에 OpenTelemetry 자바 에이전트를 설치하고 수집기를 배포하여 메트릭과 로그를 수집하고 뉴렐릭으로 전송하여 포괄적인 Kafka 모니터링을 설정하십시오.

시작하기 전에

다음 사항을 확인하십시오:

  • 뉴렐릭 계정
  • 수집기에서 Kafka 부팅스트랩 서버 포트(일반적으로 9092)로의 네트워크 액세스

수집기 설정 생성

모니터링 호스트의 ~/opentelemetry/collector-kafka-config.yaml 에 메인 OpenTelemetry Collector 설정을 생성합니다.

receivers:
# OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
# Kafka metrics receiver for cluster-level metrics
kafkametrics:
brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
protocol_version: 2.0.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
# Exclude internal Kafka topics (prefixed with __) at the source
topic_match: "^[^_].*$"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
processors:
batch/aggregation:
send_batch_size: 1024
timeout: 30s
resourcedetection:
detectors: [env, ec2, system]
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
resource:
attributes:
- action: insert
key: kafka.cluster.name
value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id:
metric_statements:
# Remove broker.id for cluster-level metrics — these represent the whole cluster,
# not a specific broker. broker.id is retained on broker-level metrics pipelines.
- context: resource
statements:
- delete_key(attributes, "broker.id")
transform/remove_extra_attributes:
metric_statements:
- context: resource
statements:
# Delete all attributes starting with "process."
- delete_matching_keys(attributes, "^process\\..*")
# Delete all attributes starting with "telemetry."
- delete_matching_keys(attributes, "^telemetry\\..*")
- delete_key(attributes, "host.arch")
- delete_key(attributes, "os.description")
- delete_key(attributes, "host.image.id")
- delete_key(attributes, "host.type")
- delete_matching_keys(attributes, "^cloud\\..*")
- delete_key(attributes, "service.instance.id") where IsMatch(attributes["service.name"], "^unknown_service:")
- delete_key(attributes, "service.name") where IsMatch(attributes["service.name"], "^unknown_service:")
# Filter internal Kafka topics as a safety net (kafkametrics topic_match handles the receiver side)
filter/internal_topics:
metrics:
datapoint:
- 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
transform/des_units:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
- include: kafka.partition.replicas
action: insert
new_name: kafka.partition.replicas.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
filter/remove_partition_level_replicas:
metrics:
exclude:
match_type: strict
metric_names:
- kafka.partition.replicas_in_sync
groupbyattrs/cluster:
keys: [kafka.cluster.name]
metricstransform/cluster_max:
transforms:
- include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
match_type: regexp
action: update
operations:
- action: aggregate_labels
aggregation_type: max
label_set: []
exporters:
otlp/newrelic:
endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
compression: gzip
timeout: 30s
service:
pipelines:
# Broker metrics pipeline (excludes cluster-level metrics)
metrics/broker:
receivers: [otlp, kafkametrics]
processors:
- resourcedetection
- resource
- filter/exclude_cluster_metrics
- filter/internal_topics
- transform/remove_extra_attributes
- transform/des_units
- cumulativetodelta
- metricstransform/kafka_topic_sum_aggregation
- filter/remove_partition_level_replicas
- batch/aggregation
exporters: [otlp/newrelic]
# Cluster metrics pipeline (controller-emitted metrics like offline partitions, topic/partition counts — no broker.id)
metrics/cluster:
receivers: [otlp]
processors:
- resourcedetection
- resource
- filter/include_cluster_metrics
- transform/remove_broker_id
- transform/remove_extra_attributes
- transform/des_units
- cumulativetodelta
- groupbyattrs/cluster
- metricstransform/cluster_max
- batch/aggregation
exporters: [otlp/newrelic]
# APM traces pipeline (producer + consumer spans via OTel Java agent)
traces/apps:
receivers: [otlp]
processors: [resourcedetection, resource, batch/aggregation]
exporters: [otlp/newrelic]
# APM logs pipeline (producer + consumer logs via OTel Java agent)
logs/apps:
receivers: [otlp]
processors: [resourcedetection, resource, batch/aggregation]
exporters: [otlp/newrelic]

환경 변수 설정

수집기를 설치하기 전에 모니터링 호스트에 필수 환경 변수를 설정하십시오:

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
$
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region

구성 매개변수

다음 표에서는 주요 설정 시위에 대해 설명합니다.

변하기 쉬운설명
NEW_RELIC_LICENSE_KEY예를 들어, 귀하의 뉴렐릭 라이선스 키 YOUR_LICENSE_KEY
KAFKA_CLUSTER_NAMEKafka 클러스터의 고유한 이름, 예를 들어 my-kafka-cluster
KAFKA_BOOTSTRAP_BROKER_ADDRESSES예를 들어, 사용자의 Kafka 부트스트랩 브로커 주소 broker1-host:9092,broker2-host:9092,broker3-host:9092
NEW_RELIC_OTLP_ENDPOINTOTLP 인제스트 엔드포인트. US 리전에는 https://otlp.nr-data.net:4317 을(를) 사용하고 EU 리전에는 https://otlp.eu01.nr-data.net:4317 을(를) 사용하십시오. 다른 설정의 경우, OTLP 엔드포인트 구성을 참조하세요.

수집기를 설치하고 시작하세요.

모니터링 호스트에 수집기를 설치하고 실행하십시오. NRDOT Collector(뉴렐릭 배포판) 또는 OpenTelemetry Collector 중에서 선택하세요:

NRDOT Collector 는 뉴렐릭이 OpenTelemetry Collector 배포한 버전이며, 뉴렐릭이 지원을 제공합니다.

1단계. 바이너리 다운로드 및 설치

호스트 운영 시스템용 NRDOT Collector 바이너리를 다운로드하여 설치하십시오. 아래 예시는 linux_amd64 아키텍처용입니다:

bash
$
# Set version and architecture
$
NRDOT_VERSION="1.9.0"
$
ARCH="amd64" # or arm64
$
$
# Download and extract
$
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
>
--location --output collector.tar.gz
$
tar -xzf collector.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv nrdot-collector /usr/local/bin/
$
$
# Verify installation
$
nrdot-collector --version

중요

다른 운영 시스템 및 복제의 경우 NRDOT Collector 릴리스를 방문하여 시스템에 적합한 바이너리를 다운로드하세요.

2단계. 수집기 시작

시뮬레이션을 시작하려면 설정 파일로 수집기를 실행하세요.

bash
$
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

수집기가 이제 실행 중이며 데이터를 수신할 준비가 되었습니다. 뉴렐릭에 메트릭이 표시되기 전에 Kafka 브로커에 자바 에이전트를 연결하는 남은 단계를 완료하십시오.

1단계. 바이너리 다운로드 및 설치

호스트 운영 시스템용 OpenTelemetry Collector Contrib 바이너리를 다운로드하여 설치하십시오. 아래 예시는 linux_amd64 아키텍처용입니다:

bash
$
# Set version and architecture
$
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
$
OTEL_VERSION="<collector_version>"
$
ARCH="amd64"
$
$
# Download the collector
$
curl -L -o otelcol-contrib.tar.gz \
>
"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"
$
$
# Extract the binary
$
tar -xzf otelcol-contrib.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv otelcol-contrib /usr/local/bin/
$
$
# Verify installation
$
otelcol-contrib --version

다른 운영 시스템에 대해서는 OpenTelemetry Collector 릴리스 페이지를 방문하세요.

2단계. 수집기 시작

시뮬레이션을 시작하려면 설정 파일로 수집기를 실행하세요.

bash
$
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

수집기가 이제 실행 중이며 데이터를 수신할 준비가 되었습니다. 뉴렐릭에 메트릭이 표시되기 전에 Kafka 브로커에 자바 에이전트를 연결하는 남은 단계를 완료하십시오.

OpenTelemetry 자바 에이전트 다운로드

중요

자바 에이전트가 연결된 Kafka 브로커를 (재)시작하기 전에 OpenTelemetry Collector가 실행 중인지 확인하십시오. 에이전트는 브로커 시작 시 즉시 메트릭을 전송하기 시작하므로, 수집기는 이를 수신할 수 있어야 합니다.

OpenTelemetry 자바 에이전트 는 Kafka 브로커에 연결된 자바 에이전트로 실행되며, Kafka 및 JMX 메트릭을 수집하고 OTLP를 통해 수집기로 전송합니다:

bash
$
# Create directory for OpenTelemetry components
$
mkdir -p ~/opentelemetry
$
$
# Download OpenTelemetry Java agent
$
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
>
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

JMX 사용자 정의 설정 만들기

JMX MBean에서 Kafka 메트릭을 수집하려면 OpenTelemetry 자바 에이전트 JMX 설정 파일을 생성하세요.

다음 설정으로 각 브로커 호스트에 ~/opentelemetry/kafka-jmx-config.yaml 파일을 생성합니다:

---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
mapping:
Count:
metric: kafka.message.count
type: counter
desc: The number of messages received by the broker
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.count
type: &type counter
desc: &desc The number of requests received by the broker
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.failed
type: &type counter
desc: &desc The number of requests to the broker resulting in a failure
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
99thPercentile:
metric: kafka.request.time.99p
type: gauge
desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize
mapping:
Value:
metric: kafka.request.queue
type: gauge
desc: Size of the request queue
unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
metricAttribute:
direction: const(in)
mapping:
Count:
metric: &metric kafka.network.io
type: &type counter
desc: &desc The bytes received or sent by the broker
unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
metricAttribute:
direction: const(out)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
metricAttribute:
type: param(delayedOperation)
mapping:
Value:
metric: kafka.purgatory.size
type: gauge
desc: The number of requests waiting in purgatory
unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount
mapping:
Value:
metric: kafka.partition.count
type: gauge
desc: The number of partitions on the broker
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
mapping:
Value:
metric: kafka.partition.offline
type: gauge
desc: The number of partitions offline
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
mapping:
Value:
metric: kafka.partition.under_replicated
type: gauge
desc: The number of under replicated partitions
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
metricAttribute:
operation: const(shrink)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
metricAttribute:
operation: const(expand)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
mapping:
Value:
metric: kafka.max.lag
type: gauge
desc: The max lag in messages between follower and leader replicas
unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
mapping:
Value:
metric: kafka.controller.active.count
type: gauge
desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
mapping:
Count:
metric: kafka.leader.election.rate
type: counter
desc: The leader election count
unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
mapping:
Count:
metric: kafka.unclean.election.rate
type: counter
desc: Unclean leader election count - increasing indicates broker failures
unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
# Request latency: total count, 50th percentile, and average (99p kept above)
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
Count:
metric: kafka.request.time.total
type: counter
desc: The total time the broker has taken to service requests
50thPercentile:
metric: kafka.request.time.50p
type: gauge
desc: The 50th percentile time the broker has taken to service requests
Mean:
metric: kafka.request.time.avg
type: gauge
desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
unit: ms
type: gauge
prefix: kafka.logs.flush.
mapping:
Count:
metric: count
unit: '{flush}'
type: counter
desc: Log flush count
50thPercentile:
metric: time.50p
desc: Log flush time - 50th percentile
99thPercentile:
metric: time.99p
desc: Log flush time - 99th percentile
# GC elapsed time (cumulative collection time in ms)
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
# JVM class loading
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
# JVM heap committed (in addition to heap.used and heap.max)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: Committed heap memory
type: gauge
# Additional JVM CPU and system metrics
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
# JVM memory pool breakdown (by generation: G1 Old Gen, Eden, Survivor, etc.)
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

Kafka 브로커 구성

Kafka를 시작하기 전에 KAFKA_OPTS 환경 변수를 설정하여 OpenTelemetry 자바 에이전트를 Kafka 브로커에 연결하십시오.

단일 브로커 예시:

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"
$
$
nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
>
-Dotel.jmx.enabled=true \
>
-Dotel.jmx.config=$JMX_CONFIG \
>
-Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol=grpc \
>
-Dotel.metrics.exporter=otlp \
>
-Dotel.logs.exporter=otlp \
>
-Dotel.instrumentation.runtime-telemetry.enabled=false \
>
-Dotel.metric.export.interval=30000" \
>
bin/kafka-server-start.sh config/server.properties &

중요

다중 브로커 클러스터: 여러 브로커의 경우 각 브로커에 대해 -Dotel.resource.attributes 클러스터에서 고유한 broker.id 값(예: broker.id=1, broker.id=2, broker.id=3)과 동일한 설정을 사용합니다.

브로커 로그는 위의 -Dotel.logs.exporter=otlp 플래그로 자동으로 활성화됩니다. 브로커 로그 수집을 비활성화하려면 대신 -Dotel.logs.exporter=none 을(를) 설정하십시오.

구성 매개변수

다음 표에서는 주요 설정 시위에 대해 설명합니다.

매개변수설명
otlp.endpoint예를 들어, OpenTelemetry Collector를 실행하는 호스트의 IP 또는 호스트명으로 바꾸세요. http://collector-host-ip:4317
broker.id1 을(를) 각 브로커의 고유 브로커 ID로 바꿉니다(예: broker.id=1, broker.id=2,). broker.id=3
kafka.cluster.namemy-kafka-cluster 을 Kafka 클러스터 이름으로 교체합니다. 수집기 설정에 지정된 값과 일치해야 합니다.
logs.exporterotlp(으)로 설정하면 브로커 로그 수집을 활성화합니다. 브로커 로그 포워딩을 비활성화하려면 none (으)로 설정하십시오.

전체 설정 옵션은 자바 에이전트 설정 가이드를 참조하세요.

(선택사항) 제작자 또는 소비자를 위해

중요

언어 지원: 현재 OpenTelemetry 자바 에이전트를 사용한 Kafka 클라이언트 계측에는 자바 애플리케이션만 지원됩니다.

Kafka 프로듀서 및 소비자 애플리케이션에서 애플리케이션 수준의 텔레메트리를 수집하려면 위의 OpenTelemetry 자바 에이전트 다운로드 단계에서 OpenTelemetry 자바 에이전트를 다운로드하세요.

에이전트로 시작하세요:

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
$
java \
>
-javaagent:$OTEL_AGENT \
>
-Dotel.service.name="order-process-service" \
>
-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol="grpc" \
>
-Dotel.metrics.exporter="otlp" \
>
-Dotel.traces.exporter="otlp" \
>
-Dotel.logs.exporter="otlp" \
>
-Dotel.instrumentation.kafka.experimental-span-attributes="true" \
>
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
>
-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
>
-Dotel.instrumentation.kafka.enabled="true" \
>
-Dotel.instrumentation.runtime-telemetry.enabled="false" \
>
-jar your-kafka-application.jar

구성 매개변수

다음 표에서는 주요 설정 시위에 대해 설명합니다.

매개변수설명
service.name예를 들어, 생산자 또는 소비자 애플리케이션의 고유한 이름으로 대체하십시오. order-process-service
kafka.cluster.name예를 들어, 수집기 설정에 사용된 것과 동일한 클러스터 이름으로 교체하십시오. my-kafka-cluster
otlp.endpoint예를 들어, OpenTelemetry Collector를 실행 중인 호스트의 호스트명 또는 IP로 대체하세요. http://collector-host-ip:4317

위의 설정은 collector-host-ip:4317에서 실행 중인 OpenTelemetry Collector로 텔레메트리를 전송합니다. 애플리케이션 텔레메트리 전용 별도 수집기를 원하시는 경우, 다음 설정으로 생성하세요:

자바 에이전트는 코드 변경 없이 즉시 사용 가능한 Kafka 계측 을 제공하여 요청 지연시간, 처리량 메트릭, 오류율 및 분산 트레이스를 캡처합니다.

고급 설정에 대해서는 Kafka 측정, 로그 문서를 참조하세요.

브로커에 Prometheus JMX Exporter를 설치하고 메트릭을 수집하여 뉴렐릭으로 전송할 수집기를 배포함으로써 포괄적인 Kafka 모니터링을 설정하려면 다음 단계를 따르십시오.

시작하기 전에

다음 사항을 확인하십시오:

  • 뉴렐릭 계정
  • 수집기 호스트에서 포트를 통한 각 브로커로의 네트워크 액세스 9404
  • 수집기에서 Kafka 부트스트랩 포트(일반적으로 9092)로의 네트워크 액세스

Prometheus JMX Exporter를 다운로드하세요.

각 Kafka 브로커 호스트에서 Prometheus JMX Exporter JAR를 다운로드합니다:

bash
$
# Create directory for Prometheus components
$
mkdir -p ~/opentelemetry
$
$
# Download the Prometheus JMX Exporter agent JAR
$
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
$
JMX_EXPORTER_VERSION="1.5.0"
$
curl -L -o ~/opentelemetry/jmx_prometheus_javaagent.jar \
>
"https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"

JMX 메트릭 설정 생성

수집할 Kafka 메트릭을 정의하는 JMX Exporter 설정 파일을 생성합니다. 각 브로커 호스트에 ~/opentelemetry/kafka-jmx-config.yaml (으)로 저장:

startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Cluster-level controller metrics
- pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
name: kafka_cluster_topic_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
name: kafka_cluster_partition_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
name: kafka_broker_fenced_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
name: kafka_partition_non_preferred_leader
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
name: kafka_partition_offline
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
name: kafka_controller_active_count
type: GAUGE
# Broker-level replica metrics
- pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
name: kafka_partition_under_min_isr
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
name: kafka_broker_leader_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
name: kafka_partition_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
name: kafka_partition_under_replicated
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
name: kafka_max_lag
type: GAUGE
# Broker topic metrics (totals)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
name: kafka_message_count
type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "out"
# Per-topic metrics (only appear after traffic flows)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
name: kafka_prod_msg_count
type: COUNTER
labels:
topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "out"
# Request metrics
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
name: kafka_request_time_99p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
name: kafka_request_queue
type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
name: kafka_purgatory_size
type: GAUGE
labels:
type: "$1"
# Controller stats
- pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
name: kafka_leader_election_rate
type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
name: kafka_unclean_election_rate
type: COUNTER
# JVM Garbage Collection
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
name: jvm_gc_collections_count
type: COUNTER
labels:
name: "$1"
# JVM Memory
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
name: jvm_memory_heap_max
type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
name: jvm_memory_heap_used
type: GAUGE
# JVM Threading and System
- pattern: 'java.lang<type=Threading><>ThreadCount'
name: jvm_thread_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
name: jvm_system_cpu_utilization
type: GAUGE
# Broker uptime
- pattern: 'java.lang<type=Runtime><>Uptime'
name: kafka_broker_uptime
type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above)
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
name: kafka_request_time_total
type: COUNTER
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
name: kafka_request_time_50p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
name: kafka_request_time_avg
type: GAUGE
labels:
type: "$1"
# Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
name: kafka_logs_flush_count
type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
name: kafka_logs_flush_time_50p
type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
name: kafka_logs_flush_time_99p
type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

메트릭 사용자 지정: Prometheus JMX Exporter 예제Kafka MBean 문서를 참조하여 패턴을 추가하거나 수정할 수 있습니다. 추가 설정은 JMX Exporter 규칙 문서 를 참조하십시오.

JMX Exporter를 사용하도록 Kafka 브로커 구성

Kafka 시작 옵션에 추가하여 Prometheus JMX Exporter를 각 Kafka 브로커에 자바 에이전트로 연결합니다.

단일 브로커 예시:

bash
$
JMX_JAR="$HOME/opentelemetry/jmx_prometheus_javaagent.jar"
$
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"
$
$
nohup env KAFKA_OPTS="-javaagent:${JMX_JAR}=9404:${JMX_CONFIG}" \
>
bin/kafka-server-start.sh config/server.properties &

이제 각 브로커는 포트 9404에서 Prometheus 메트릭을 노출합니다. 확인:

bash
$
curl http://localhost:9404/metrics | grep kafka_

중요

다중 브로커 클러스터: 모든 브로커에 동일한 KAFKA_OPTS 설정을 적용합니다. 각 브로커는 자체 호스트 IP의 포트 9404 에서 메트릭을 노출합니다.

수집기 설정 생성

모니터링 호스트의 ~/opentelemetry/collector-kafka-config.yaml 에 OpenTelemetry Collector 설정을 생성합니다.

Prometheus 수신기는 모든 브로커 엔드포인트를 스크랩합니다. 수집기는 Prometheus 엔드포인트를 스크래핑하는 것 외에도 0.0.0.0:4317 에서 모든 OTLP 데이터(애플리케이션 트레이스, 로그)를 수신 대기합니다.

receivers:
# OTLP receiver for application traces, metrics, and logs (listens on port 4317)
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
# Prometheus receiver scrapes JMX metrics from Kafka brokers
prometheus/kafka-jmx:
config:
scrape_configs:
- job_name: 'kafka-jmx-metrics'
metrics_path: /metrics
scrape_interval: 30s
static_configs:
# TODO: Replace each target with your broker hostname or IP, and set a unique broker.id per broker
- targets: ['broker1-host:9404']
labels:
broker.id: '0'
- targets: ['broker2-host:9404']
labels:
broker.id: '1'
- targets: ['broker3-host:9404']
labels:
broker.id: '2'
# Kafka metrics receiver for cluster-level consumer lag, topic, and partition metrics
kafkametrics/cluster:
brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
protocol_version: 2.0.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
# Exclude internal Kafka topics (prefixed with __) at the source
topic_match: "^[^_].*$"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
exporters:
otlp/backend:
endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
tls:
insecure: false
sending_queue:
num_consumers: 12
queue_size: 5000
retry_on_failure:
enabled: true
processors:
# Batch processor for efficient export
batch/export:
send_batch_size: 1024
timeout: 30s
# Memory limiter to prevent OOM
memory_limiter:
limit_percentage: 80
spike_limit_percentage: 30
check_interval: 1s
# Transform metric naming conventions (underscore to dot, normalize special names)
transform/metric-naming:
metric_statements:
- context: metric
statements:
- replace_pattern(name, "_", ".")
- replace_pattern(name, "\\.load\\.1", ".load_1")
- replace_pattern(name, "\\.recent\\.util", ".recent_util")
- replace_pattern(name, "file\\.descriptor\\.count", "file_descriptor.count")
- replace_pattern(name, "\\.memory\\.pool\\.used\\.bytes$", ".memory.pool.used")
- replace_pattern(name, "\\.memory\\.pool\\.max\\.bytes$", ".memory.pool.max")
- replace_pattern(name, "\\.memory\\.pool\\.collection\\.used\\.bytes$", ".memory.pool.used_after_last_gc")
- replace_pattern(name, "\\.non\\.preferred\\.leader", ".non_preferred_leader")
- replace_pattern(name, "\\.under\\.min\\.isr", ".under_min_isr")
- replace_pattern(name, "\\.under\\.replicated", ".under_replicated")
- replace_pattern(name, "\\.total$", "") where name != "kafka.request.time.total"
- context: datapoint
statements:
- set(attributes["name"], attributes["gc"]) where attributes["gc"] != nil
- delete_key(attributes, "gc") where attributes["gc"] != nil
- set(attributes["name"], attributes["pool"]) where attributes["pool"] != nil
- delete_key(attributes, "pool") where attributes["pool"] != nil
# Add cluster name to all metrics
resource/cluster-name:
attributes:
- key: kafka.cluster.name
# TODO: Replace with your Kafka cluster name
value: ${env:KAFKA_CLUSTER_NAME}
action: upsert
# Remove broker.id for cluster-level metrics
transform/remove_broker_id:
metric_statements:
- context: datapoint
statements:
- delete_key(attributes, "broker.id")
# Filter out scrape overhead metrics
filter/scrape-overhead:
metrics:
exclude:
match_type: regexp
metric_names:
- "^jmx_.*"
- "^process_.*"
- "^jvm_buffer_pool_.*"
- "^jvm_threads_.*"
- "^jvm_classes_.*"
- "^jvm_memory_(heap|non_heap)_(committed|init|max|used)_bytes$"
- "^jvm_compilation_.*"
- "^jvm_(runtime|info).*"
- "^jvm_memory_pool_(allocated_bytes_total|committed_bytes|init_bytes|collection_(committed|init|max)_bytes)$"
# Include only cluster-level metrics for the cluster pipeline
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "^kafka\\.partition\\.offline$"
- "^kafka\\.(leader|unclean)\\.election\\.rate$"
- "^kafka\\.partition\\.non_preferred_leader$"
- "^kafka\\.broker\\.fenced\\.count$"
- "^kafka\\.cluster\\.partition\\.count$"
- "^kafka\\.cluster\\.topic\\.count$"
# Exclude cluster-level metrics from the broker pipeline
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "^kafka\\.partition\\.offline$"
- "^kafka\\.(leader|unclean)\\.election\\.rate$"
- "^kafka\\.partition\\.non_preferred_leader$"
- "^kafka\\.broker\\.fenced\\.count$"
- "^kafka\\.cluster\\.partition\\.count$"
- "^kafka\\.cluster\\.topic\\.count$"
# Remove unnecessary attributes
transform/remove_attributes:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
- context: resource
statements:
- delete_key(attributes, "server.address")
- delete_key(attributes, "server.port")
- delete_key(attributes, "service.instance.id")
- delete_key(attributes, "host.name")
- delete_key(attributes, "url.scheme")
# Aggregate partition metrics to topic level
metricstransform/topic-aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
- include: kafka.partition.replicas
action: insert
new_name: kafka.partition.replicas.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
# Filter out original partition replicas metric
filter/exclude_partition_replicas_metric:
metrics:
exclude:
match_type: strict
metric_names:
- kafka.partition.replicas_in_sync
# Filter internal Kafka topics as a safety net
filter/internal_topics:
metrics:
datapoint:
- 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
# Convert cumulative to delta metrics
cumulativetodelta:
groupbyattrs/cluster:
keys: [kafka.cluster.name]
metricstransform/cluster_max:
transforms:
- include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
match_type: regexp
action: update
operations:
- action: aggregate_labels
aggregation_type: max
label_set: []
service:
pipelines:
# Application traces from instrumented Kafka clients and apps
traces:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Application metrics from instrumented Kafka clients and apps
metrics:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Application logs from instrumented Kafka clients and apps
logs:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Broker-level metrics from Prometheus JMX scraping
metrics/broker:
receivers:
- prometheus/kafka-jmx
processors:
- resource/cluster-name
- filter/scrape-overhead
- transform/metric-naming
- transform/remove_attributes
- filter/exclude_cluster_metrics
- memory_limiter
- cumulativetodelta
- batch/export
exporters:
- otlp/backend
# Cluster-level metrics from Prometheus JMX scraping
metrics/cluster/prometheus:
receivers:
- prometheus/kafka-jmx
processors:
- resource/cluster-name
- filter/scrape-overhead
- transform/metric-naming
- transform/remove_attributes
- filter/include_cluster_metrics
- transform/remove_broker_id
- memory_limiter
- cumulativetodelta
- groupbyattrs/cluster
- metricstransform/cluster_max
- batch/export
exporters:
- otlp/backend
# Cluster-level metrics from Kafka metrics receiver (consumer lag, topics, partitions)
metrics/cluster/kafkametrics:
receivers:
- kafkametrics/cluster
processors:
- resource/cluster-name
- filter/internal_topics
- transform/remove_attributes
- metricstransform/topic-aggregation
- filter/exclude_partition_replicas_metric
- memory_limiter
- cumulativetodelta
- batch/export
exporters:
- otlp/backend

환경 변수 설정

수집기를 시작하기 전에 모니터링 호스트에 필수 환경 변수를 설정하십시오:

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
$
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
$
# EU region: https://otlp.eu01.nr-data.net:4317

구성 매개변수

다음 표에서는 주요 설정 시위에 대해 설명합니다.

변하기 쉬운설명
NEW_RELIC_LICENSE_KEY예를 들어, 귀하의 뉴렐릭 라이선스 키 YOUR_LICENSE_KEY
KAFKA_CLUSTER_NAMEKafka 클러스터의 고유한 이름, 예를 들어 my-kafka-cluster
KAFKA_BOOTSTRAP_BROKER_ADDRESSES예를 들어, 사용자의 Kafka 부트스트랩 브로커 주소 broker1-host:9092,broker2-host:9092,broker3-host:9092
NEW_RELIC_OTLP_ENDPOINTOTLP 인제스트 엔드포인트. US 리전에는 https://otlp.nr-data.net:4317 을(를) 사용하고 EU 리전에는 https://otlp.eu01.nr-data.net:4317 을(를) 사용하십시오. 다른 설정의 경우, OTLP 엔드포인트 구성을 참조하세요.

수집기를 설치하고 시작하세요.

모니터링 호스트에 수집기를 설치하고 실행하십시오. NRDOT Collector(뉴렐릭 배포판) 또는 OpenTelemetry Collector 중에서 선택하세요:

NRDOT Collector 는 뉴렐릭의 지원이 제공되는 OpenTelemetry Collector의 뉴렐릭 배포판입니다. 자세한 내용은 NRDOT Collector GitHub 저장소를 참조하십시오.

1단계. 바이너리 다운로드 및 설치

bash
$
# Set version and architecture
$
NRDOT_VERSION="1.9.0"
$
ARCH="amd64" # or arm64
$
$
# Download and extract
$
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
>
--location --output collector.tar.gz
$
tar -xzf collector.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv nrdot-collector /usr/local/bin/
$
$
# Verify installation
$
nrdot-collector --version

중요

다른 운영 시스템 및 복제의 경우 NRDOT Collector 릴리스를 방문하여 시스템에 적합한 바이너리를 다운로드하세요.

2단계. 수집기 시작

bash
$
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

수집기는 몇 분 내에 Kafka 메트릭을 스크래핑하여 뉴렐릭으로 전송하기 시작합니다.

1단계. 바이너리 다운로드 및 설치

bash
$
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
$
OTEL_VERSION="<collector_version>"
$
ARCH="amd64"
$
$
curl -L -o otelcol-contrib.tar.gz \
>
"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"
$
$
tar -xzf otelcol-contrib.tar.gz
$
sudo mv otelcol-contrib /usr/local/bin/
$
otelcol-contrib --version

다른 운영 시스템에 대해서는 OpenTelemetry Collector 릴리스 페이지를 방문하세요.

2단계. 수집기 시작

bash
$
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

수집기는 몇 분 내에 Kafka 메트릭을 스크래핑하여 뉴렐릭으로 전송하기 시작합니다.

(선택사항) 제작자 또는 소비자를 위해

중요

언어 지원: 현재 OpenTelemetry 자바 에이전트를 사용한 Kafka 클라이언트 계측에는 자바 애플리케이션만 지원됩니다.

Kafka 프로듀서 및 소비자 애플리케이션에서 애플리케이션 수준의 텔레메트리를 수집하려면, 아직 다운로드하지 않은 경우 OpenTelemetry 자바 에이전트를 다운로드하세요:

bash
$
mkdir -p ~/opentelemetry
$
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
>
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

에이전트로 시작하세요:

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
$
java \
>
-javaagent:$OTEL_AGENT \
>
-Dotel.service.name="order-process-service" \
>
-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol="grpc" \
>
-Dotel.metrics.exporter="otlp" \
>
-Dotel.traces.exporter="otlp" \
>
-Dotel.logs.exporter="otlp" \
>
-Dotel.instrumentation.kafka.experimental-span-attributes="true" \
>
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
>
-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
>
-Dotel.instrumentation.kafka.enabled="true" \
>
-Dotel.instrumentation.runtime-telemetry.enabled="false" \
>
-jar your-kafka-application.jar

구성 매개변수

다음 표에서는 주요 설정 시위에 대해 설명합니다.

매개변수설명
service.name예를 들어, 생산자 또는 소비자 애플리케이션의 고유한 이름으로 대체하십시오. order-process-service
kafka.cluster.name예를 들어, 수집기 설정에 사용된 것과 동일한 클러스터 이름으로 교체하십시오. my-kafka-cluster
otlp.endpoint예를 들어, OpenTelemetry Collector를 실행 중인 호스트의 호스트명 또는 IP로 대체하세요. http://collector-host-ip:4317

위의 설정은 collector-host-ip:4317에서 실행 중인 OpenTelemetry Collector로 텔레메트리를 전송합니다. 애플리케이션 텔레메트리 전용 별도 수집기를 원하시는 경우, 다음 설정으로 생성하세요:

자바 에이전트는 코드 변경 없이 즉시 사용 가능한 Kafka 계측 을 제공하여 요청 지연시간, 처리량 메트릭, 오류율 및 분산 트레이스를 캡처합니다. 고급 설정에 대해서는 Kafka 계측 문서를 참조하세요.

(선택 사항) Kafka 브로커 로그 전달

Kafka 브로커 로그를 수집하여 뉴렐릭으로 전송하려면 OpenTelemetry Collector 에서 filelog 수신기를 구성하십시오.

데이터 찾기

몇 분 후 Kafka 데이터가 뉴렐릭에 나타납니다. 뉴렐릭 UI의 여러 뷰에서 Kafka 데이터를 탐색하는 방법에 대한 자세한 지침은 데이터 찾기 를 참조하세요.

다음 표는 각 신호 유형이 저장되는 위치를 요약합니다. 아래의 모든 쿼리에서 my-kafka-cluster 을(를) KAFKA_CLUSTER_NAME 값으로 바꾸십시오:

시그널이벤트 유형포함 사항
메트릭Metric브로커, 토픽, 파티션, 소비자 그룹 및 JVM 메트릭
로그Log생산자 및 소비자 애플리케이션의 로그(OTel 자바 에이전트를 통해) 및 자바 에이전트를 통해 수집된 브로커 로그
트레이스Span토픽 전반에 걸친 메시지당 publishreceive 작업을 포함하는 생산자 및 소비자 스팬

메트릭

브로커, 토픽, 파티션, 소비자 그룹 및 JVM 메트릭은 Metric 이벤트 유형에 저장됩니다. my-kafka-cluster 을(를) KAFKA_CLUSTER_NAME 값으로 교체합니다:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

로그

OpenTelemetry 자바 에이전트로 계측된 생산자 및 소비자 애플리케이션의 로그와 -Dotel.logs.exporter=otlp 이(가) 설정된 경우의 브로커 로그는 Log 이벤트 유형에 저장됩니다:

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

트레이스

토픽 전반의 메시지당 publishreceive 작업을 포함한 생산자 및 소비자 스팬은 Span 이벤트 유형에 저장됩니다:

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

예시

도커 Compose 설정, OTel Collector 설정, OTel 자바 에이전트 설정 및 샘플 생산자/소비자 애플리케이션이 포함된 완전한 작동 예제는 뉴렐릭 OpenTelemetry 예제 저장소에서 확인할 수 있습니다.

문제점 해결

다음 단계

Copyright © 2026 New Relic Inc.

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.