• /
  • EnglishEspañolFrançais日本語한국어Português
  • ログイン今すぐ開始

この機械翻訳は、参考として提供されています。

英語版と翻訳版に矛盾がある場合は、英語版が優先されます。詳細については、このページを参照してください。

問題を作成する

OpenTelemetryを使用した自己ホスト型 Kafka の監視

Linuxホストに直接OpenTelemetry Collectorをインストールすることで、セルフホスト型Apache Kafkaクラスタをモニターします。KafkaブローカーからJMXメトリクスを収集するには、OpenTelemetry JavaエージェントまたはPrometheus JMX Exporterのいずれかのアプローチを選択します。

アーキテクチャー

New Relicは、セルフホスト型Kafkaを監視するための2つのアプローチをサポートしています:OpenTelemetry JavaエージェントまたはPrometheus JMX Exporter。次の図は、各アプローチのデータフローを示しています。

Self-hosted Kafka monitoring architecture

インストレーション手順

以下の手順に従って、ブローカーにOpenTelemetry Javaエージェントをインストールし、メトリクスとログを収集してNew Relicに送信するためのコレクターをデプロイして、包括的なKafka監視をセットアップします。

あなたが始める前に

以下のものを用意してください:

  • New Relicアカウント
  • コレクターから Kafka ブートストラップ サーバー ポート (通常は 9092) へのネットワーク アクセス

コレクター設定の作成

監視ホストの~/opentelemetry/collector-kafka-config.yamlにメインの OpenTelemetry Collector 設定を作成します。

receivers:
# OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
# Kafka metrics receiver for cluster-level metrics
kafkametrics:
brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
protocol_version: 2.0.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
# Exclude internal Kafka topics (prefixed with __) at the source
topic_match: "^[^_].*$"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
processors:
batch/aggregation:
send_batch_size: 1024
timeout: 30s
resourcedetection:
detectors: [env, ec2, system]
system:
resource_attributes:
host.name:
enabled: true
host.id:
enabled: true
resource:
attributes:
- action: insert
key: kafka.cluster.name
value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id:
metric_statements:
# Remove broker.id for cluster-level metrics — these represent the whole cluster,
# not a specific broker. broker.id is retained on broker-level metrics pipelines.
- context: resource
statements:
- delete_key(attributes, "broker.id")
transform/remove_extra_attributes:
metric_statements:
- context: resource
statements:
# Delete all attributes starting with "process."
- delete_matching_keys(attributes, "^process\\..*")
# Delete all attributes starting with "telemetry."
- delete_matching_keys(attributes, "^telemetry\\..*")
- delete_key(attributes, "host.arch")
- delete_key(attributes, "os.description")
- delete_key(attributes, "host.image.id")
- delete_key(attributes, "host.type")
- delete_matching_keys(attributes, "^cloud\\..*")
- delete_key(attributes, "service.instance.id") where IsMatch(attributes["service.name"], "^unknown_service:")
- delete_key(attributes, "service.name") where IsMatch(attributes["service.name"], "^unknown_service:")
# Filter internal Kafka topics as a safety net (kafkametrics topic_match handles the receiver side)
filter/internal_topics:
metrics:
datapoint:
- 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "kafka\\.partition\\.offline"
- "kafka\\.(leader|unclean)\\.election\\.rate"
- "kafka\\.partition\\.non_preferred_leader"
- "kafka\\.broker\\.fenced\\.count"
- "kafka\\.cluster\\.partition\\.count"
- "kafka\\.cluster\\.topic\\.count"
transform/des_units:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
- include: kafka.partition.replicas
action: insert
new_name: kafka.partition.replicas.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
filter/remove_partition_level_replicas:
metrics:
exclude:
match_type: strict
metric_names:
- kafka.partition.replicas_in_sync
groupbyattrs/cluster:
keys: [kafka.cluster.name]
metricstransform/cluster_max:
transforms:
- include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
match_type: regexp
action: update
operations:
- action: aggregate_labels
aggregation_type: max
label_set: []
exporters:
otlp/newrelic:
endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
compression: gzip
timeout: 30s
service:
pipelines:
# Broker metrics pipeline (excludes cluster-level metrics)
metrics/broker:
receivers: [otlp, kafkametrics]
processors:
- resourcedetection
- resource
- filter/exclude_cluster_metrics
- filter/internal_topics
- transform/remove_extra_attributes
- transform/des_units
- cumulativetodelta
- metricstransform/kafka_topic_sum_aggregation
- filter/remove_partition_level_replicas
- batch/aggregation
exporters: [otlp/newrelic]
# Cluster metrics pipeline (controller-emitted metrics like offline partitions, topic/partition counts — no broker.id)
metrics/cluster:
receivers: [otlp]
processors:
- resourcedetection
- resource
- filter/include_cluster_metrics
- transform/remove_broker_id
- transform/remove_extra_attributes
- transform/des_units
- cumulativetodelta
- groupbyattrs/cluster
- metricstransform/cluster_max
- batch/aggregation
exporters: [otlp/newrelic]
# APM traces pipeline (producer + consumer spans via OTel Java agent)
traces/apps:
receivers: [otlp]
processors: [resourcedetection, resource, batch/aggregation]
exporters: [otlp/newrelic]
# APM logs pipeline (producer + consumer logs via OTel Java agent)
logs/apps:
receivers: [otlp]
processors: [resourcedetection, resource, batch/aggregation]
exporters: [otlp/newrelic]

環境変数の設定

コレクターをインストールする前に、監視ホストで必要な環境変数を設定します:

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
$
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region

設定パラメーター

次の表では、キー設定について説明します。

変数説明
NEW_RELIC_LICENSE_KEYたとえば、New Relicのライセンスキー YOUR_LICENSE_KEY
KAFKA_CLUSTER_NAMEたとえば、Kafkaクラスタの一意の名前 my-kafka-cluster
KAFKA_BOOTSTRAP_BROKER_ADDRESSESたとえば、Kafkaブートストラップブローカーのアドレス broker1-host:9092,broker2-host:9092,broker3-host:9092
NEW_RELIC_OTLP_ENDPOINTOTLP取り込みエンドポイント。USリージョンにはhttps://otlp.nr-data.net:4317を、EUリージョンにはhttps://otlp.eu01.nr-data.net:4317を使用します。その他の設定については、OTLPエンドポイントの設定を参照してください。

コレクターをインストールして起動する

監視ホストにコレクターをインストールして実行します。NRDOT Collector(New Relicのディストリビューション)またはOpenTelemetry Collectorのいずれかを選択します:

ヒント

NRDOT Collector は、New Relic のサポートを受けた OpenTelemetry Collector の New Relic ディストリビューションです。

ステップ 1. バイナリのダウンロードおよびインストール

ホストOS用のNRDOT Collectorバイナリをダウンロードしてインストールします。以下の例はlinux_amd64アーキテクチャー向けです:

bash
$
# Set version and architecture
$
NRDOT_VERSION="1.9.0"
$
ARCH="amd64" # or arm64
$
$
# Download and extract
$
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
>
--location --output collector.tar.gz
$
tar -xzf collector.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv nrdot-collector /usr/local/bin/
$
$
# Verify installation
$
nrdot-collector --version

重要

他の OS およびアーキテクチャーについては、 NRDOT Collectorリリースにアクセスし、システムに適切なバイナリをダウンロードしてください。

ステップ2. コレクターを開始します

設定ファイルを使用してコレクターを実行して、監視を開始します。

bash
$
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

コレクターは現在実行中であり、データを受信する準備ができています。New Relicにメトリクスが表示される前に、KafkaブローカーにJavaエージェントをアタッチする残りの手順を完了してください。

ステップ 1. バイナリのダウンロードおよびインストール

ホストOS用のOpenTelemetry Collector Contribバイナリをダウンロードしてインストールします。以下の例はlinux_amd64アーキテクチャー用です:

bash
$
# Set version and architecture
$
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
$
OTEL_VERSION="<collector_version>"
$
ARCH="amd64"
$
$
# Download the collector
$
curl -L -o otelcol-contrib.tar.gz \
>
"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"
$
$
# Extract the binary
$
tar -xzf otelcol-contrib.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv otelcol-contrib /usr/local/bin/
$
$
# Verify installation
$
otelcol-contrib --version

その他の OS については、 OpenTelemetry Collectorリリースページをご覧ください。

ステップ2. コレクターを開始します

設定ファイルを使用してコレクターを実行して、監視を開始します。

bash
$
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

コレクターは現在実行中であり、データを受信する準備ができています。New Relicにメトリクスが表示される前に、KafkaブローカーにJavaエージェントをアタッチする残りの手順を完了してください。

OpenTelemetry Javaエージェントをダウンロードします

重要

JavaエージェントをアタッチしたKafkaブローカーを(再)起動する前に、OpenTelemetry Collectorが実行されていることを確認してください。エージェントはブローカーの起動直後にメトリクスの送信を開始するため、コレクターはそれらを受信できる状態である必要があります。

OpenTelemetry Javaエージェントは、KafkaブローカーにアタッチされたJavaエージェントとして実行され、KafkaおよびJMXメトリクスを収集し、OTLP経由でコレクターに送信します:

bash
$
# Create directory for OpenTelemetry components
$
mkdir -p ~/opentelemetry
$
$
# Download OpenTelemetry Java agent
$
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
>
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

JMXカスタム設定の作成

JMX MBeanからKafkaメトリクスを収集するために、OpenTelemetry JavaエージェントのJMX設定ファイルを作成します。

各ブローカーホストで、次の設定を使用してファイル~/opentelemetry/kafka-jmx-config.yamlを作成します:

---
rules:
# Per-topic custom metrics using custom MBean commands
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
metricAttribute:
topic: param(topic)
mapping:
Count:
metric: kafka.prod.msg.count
type: counter
desc: The number of messages per topic
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(in)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
metricAttribute:
topic: param(topic)
direction: const(out)
mapping:
Count:
metric: kafka.topic.io
type: counter
desc: The bytes received or sent per topic
unit: By
# Cluster-level metrics using controller-based MBeans
- bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
mapping:
Value:
metric: kafka.cluster.topic.count
type: gauge
desc: The total number of global topics in the cluster
unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
mapping:
Value:
metric: kafka.cluster.partition.count
type: gauge
desc: The total number of global partitions in the cluster
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
mapping:
Value:
metric: kafka.broker.fenced.count
type: gauge
desc: The number of fenced brokers in the cluster
unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
mapping:
Value:
metric: kafka.partition.non_preferred_leader
type: gauge
desc: The count of topic partitions for which the leader is not the preferred leader
unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans
- bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
mapping:
Value:
metric: kafka.partition.under_min_isr
type: gauge
desc: The number of partitions where the number of in-sync replicas is less than the minimum
unit: "{partition}"
# Broker uptime metric using JVM Runtime
- bean: java.lang:type=Runtime
mapping:
Uptime:
metric: kafka.broker.uptime
type: gauge
desc: Broker uptime in milliseconds
unit: ms
# Leader count per broker
- bean: kafka.server:type=ReplicaManager,name=LeaderCount
mapping:
Value:
metric: kafka.broker.leader.count
type: gauge
desc: Number of partitions for which this broker is the leader
unit: "{partition}"
# JVM metrics
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionCount:
metric: jvm.gc.collections.count
type: counter
unit: "{collection}"
desc: total number of collections that have occurred
metricAttribute:
name: param(name)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.max:
metric: heap.max
desc: current heap usage
type: gauge
HeapMemoryUsage.used:
metric: heap.used
desc: current heap usage
type: gauge
- bean: java.lang:type=Threading
mapping:
ThreadCount:
metric: jvm.thread.count
type: gauge
unit: "{thread}"
desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemCpuLoad:
metric: system.cpu.utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
mapping:
Count:
metric: kafka.message.count
type: counter
desc: The number of messages received by the broker
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.count
type: &type counter
desc: &desc The number of requests received by the broker
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.failed
type: &type counter
desc: &desc The number of requests to the broker resulting in a failure
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
99thPercentile:
metric: kafka.request.time.99p
type: gauge
desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize
mapping:
Value:
metric: kafka.request.queue
type: gauge
desc: Size of the request queue
unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
metricAttribute:
direction: const(in)
mapping:
Count:
metric: &metric kafka.network.io
type: &type counter
desc: &desc The bytes received or sent by the broker
unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
metricAttribute:
direction: const(out)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
metricAttribute:
type: param(delayedOperation)
mapping:
Value:
metric: kafka.purgatory.size
type: gauge
desc: The number of requests waiting in purgatory
unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount
mapping:
Value:
metric: kafka.partition.count
type: gauge
desc: The number of partitions on the broker
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
mapping:
Value:
metric: kafka.partition.offline
type: gauge
desc: The number of partitions offline
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
mapping:
Value:
metric: kafka.partition.under_replicated
type: gauge
desc: The number of under replicated partitions
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
metricAttribute:
operation: const(shrink)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
metricAttribute:
operation: const(expand)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
mapping:
Value:
metric: kafka.max.lag
type: gauge
desc: The max lag in messages between follower and leader replicas
unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
mapping:
Value:
metric: kafka.controller.active.count
type: gauge
desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
mapping:
Count:
metric: kafka.leader.election.rate
type: counter
desc: The leader election count
unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
mapping:
Count:
metric: kafka.unclean.election.rate
type: counter
desc: Unclean leader election count - increasing indicates broker failures
unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
# Request latency: total count, 50th percentile, and average (99p kept above)
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
Count:
metric: kafka.request.time.total
type: counter
desc: The total time the broker has taken to service requests
50thPercentile:
metric: kafka.request.time.50p
type: gauge
desc: The 50th percentile time the broker has taken to service requests
Mean:
metric: kafka.request.time.avg
type: gauge
desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
unit: ms
type: gauge
prefix: kafka.logs.flush.
mapping:
Count:
metric: count
unit: '{flush}'
type: counter
desc: Log flush count
50thPercentile:
metric: time.50p
desc: Log flush time - 50th percentile
99thPercentile:
metric: time.99p
desc: Log flush time - 99th percentile
# GC elapsed time (cumulative collection time in ms)
- bean: java.lang:type=GarbageCollector,name=*
mapping:
CollectionTime:
metric: jvm.gc.collections.elapsed
type: counter
unit: ms
desc: the approximate accumulated collection elapsed time in milliseconds
metricAttribute:
name: param(name)
# JVM class loading
- bean: java.lang:type=ClassLoading
mapping:
LoadedClassCount:
metric: jvm.class.count
type: gauge
unit: "{class}"
desc: Currently loaded class count
# JVM heap committed (in addition to heap.used and heap.max)
- bean: java.lang:type=Memory
unit: By
prefix: jvm.memory.
dropNegativeValues: true
mapping:
HeapMemoryUsage.committed:
metric: heap.committed
desc: Committed heap memory
type: gauge
# Additional JVM CPU and system metrics
- bean: java.lang:type=OperatingSystem
prefix: jvm.
dropNegativeValues: true
mapping:
SystemLoadAverage:
metric: system.cpu.load_1m
type: gauge
unit: "{run_queue_item}"
desc: System load average (1 minute) - alert if > CPU count
AvailableProcessors:
metric: cpu.count
type: gauge
unit: "{cpu}"
desc: Number of processors available
ProcessCpuLoad:
metric: cpu.recent_utilization
type: gauge
unit: '1'
desc: Recent CPU utilization for JVM process (0.0 to 1.0)
OpenFileDescriptorCount:
metric: file_descriptor.count
type: gauge
unit: "{file_descriptor}"
desc: Number of open file descriptors - alert if > 80% of ulimit
# JVM memory pool breakdown (by generation: G1 Old Gen, Eden, Survivor, etc.)
- bean: java.lang:type=MemoryPool,name=*
type: gauge
unit: By
metricAttribute:
name: param(name)
mapping:
Usage.used:
metric: jvm.memory.pool.used
desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
Usage.max:
metric: jvm.memory.pool.max
desc: Maximum memory pool size
CollectionUsage.used:
metric: jvm.memory.pool.used_after_last_gc
desc: Memory used after last GC (shows retained memory baseline)

Kafkaブローカーを構成する

Kafkaを起動する前にKAFKA_OPTS環境変数を設定して、OpenTelemetry JavaエージェントをKafkaブローカーにアタッチします。

単一ブローカーの例:

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"
$
$
nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
>
-Dotel.jmx.enabled=true \
>
-Dotel.jmx.config=$JMX_CONFIG \
>
-Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol=grpc \
>
-Dotel.metrics.exporter=otlp \
>
-Dotel.logs.exporter=otlp \
>
-Dotel.instrumentation.runtime-telemetry.enabled=false \
>
-Dotel.metric.export.interval=30000" \
>
bin/kafka-server-start.sh config/server.properties &

重要

マルチブローカーの集中: 複数のブローカーの場合は、各ブローカーの-Dotel.resource.attributes問題で一意のbroker.id値 (例: broker.id=1broker.id=2broker.id=3) を持つ同じ設定を使用します。

ヒント

ブローカーログは、上記の-Dotel.logs.exporter=otlpフラグにより自動的に有効になります。ブローカーのログ収集を無効にするには、代わりに-Dotel.logs.exporter=noneを設定します。

設定パラメーター

次の表では、キー設定について説明します。

パラメータ説明
otlp.endpointたとえば、OpenTelemetry Collectorを実行しているホストのIPまたはホスト名に置き換えます。 http://collector-host-ip:4317
broker.id1を各ブローカーの一意のブローカーID(たとえばbroker.id=1broker.id=2)に置き換えます、 broker.id=3
kafka.cluster.namemy-kafka-clusterをKafkaクラスタ名に置き換えます。コレクターの設定で設定された値と一致する必要があります。
logs.exporterotlpに設定すると、ブローカーのログ収集を有効にします。ブローカーのログ転送を無効にするには、noneに設定します。

完全な設定オプションについては、Javaエージェント設定ガイドを参照してください。

(オプション) 計装プロデューサーまたは消費者アプリケーション

重要

言語サポート: 現在、OpenTelemetry Javaエージェントを使用したKafkaクライアントの計装でサポートされているのはJavaアプリケーションのみです。

お使いのKafkaプロデューサーおよび消費者アプリケーションからアプリケーションレベルのテレメトリーを収集するには、上記のOpenTelemetry JavaエージェントのダウンロードステップからOpenTelemetry Javaエージェントをダウンロードしてください。

エージェントを使用してアプリケーションを開始します。

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
$
java \
>
-javaagent:$OTEL_AGENT \
>
-Dotel.service.name="order-process-service" \
>
-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol="grpc" \
>
-Dotel.metrics.exporter="otlp" \
>
-Dotel.traces.exporter="otlp" \
>
-Dotel.logs.exporter="otlp" \
>
-Dotel.instrumentation.kafka.experimental-span-attributes="true" \
>
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
>
-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
>
-Dotel.instrumentation.kafka.enabled="true" \
>
-Dotel.instrumentation.runtime-telemetry.enabled="false" \
>
-jar your-kafka-application.jar

設定パラメーター

次の表では、キー設定について説明します。

パラメータ説明
service.nameたとえば、プロデューサーまたは消費者アプリケーションの一意の名前に置き換えてください。 order-process-service
kafka.cluster.nameたとえば、コレクター設定で使用されているのと同じクラスタ名に置き換えます。 my-kafka-cluster
otlp.endpointたとえば、OpenTelemetry Collectorを実行しているホストのホスト名またはIPに置き換えます http://collector-host-ip:4317

ヒント

上記の設定は、collector-host-ip:4317で実行されているOpenTelemetry Collectorにテレメトリーを送信します。アプリケーションテレメトリー専用の別のコレクターが必要な場合は、以下の設定で作成してください:

Javaエージェントは、コード変更なしですぐに使えるKafkaの計装を提供し、リクエストレイテンシ、スループットメトリクス、エラー率、およびディストリビューティッド(分散)トレーシングをキャプチャします。

高度な設定については、 Kafka 計装ドキュメントを参照してください。

ブローカーにPrometheus JMX Exporterをインストールし、メトリクスを収集してNew Relicに送信するコレクターをデプロイすることで、包括的なKafka監視をセットアップするには、以下の手順に従ってください。

あなたが始める前に

以下のものを用意してください:

  • New Relicアカウント
  • コレクターホストから各ブローカーのポートへのネットワークアクセス 9404
  • コレクターからKafkaブートストラップポート(通常は9092)へのネットワークアクセス

Prometheus JMX Exporter をダウンロードします

各KafkaブローカーホストにPrometheus JMX Exporter JARをダウンロードします:

bash
$
# Create directory for Prometheus components
$
mkdir -p ~/opentelemetry
$
$
# Download the Prometheus JMX Exporter agent JAR
$
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
$
JMX_EXPORTER_VERSION="1.5.0"
$
curl -L -o ~/opentelemetry/jmx_prometheus_javaagent.jar \
>
"https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"

JMXメトリクス設定を作成

収集するKafkaメトリクスを定義するJMX Exporterの設定ファイルを作成します。各ブローカーホストで~/opentelemetry/kafka-jmx-config.yamlとして保存します:

startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Cluster-level controller metrics
- pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
name: kafka_cluster_topic_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
name: kafka_cluster_partition_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
name: kafka_broker_fenced_count
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
name: kafka_partition_non_preferred_leader
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
name: kafka_partition_offline
type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
name: kafka_controller_active_count
type: GAUGE
# Broker-level replica metrics
- pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
name: kafka_partition_under_min_isr
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
name: kafka_broker_leader_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
name: kafka_partition_count
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
name: kafka_partition_under_replicated
type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
name: kafka_isr_operation_count
type: COUNTER
labels:
operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
name: kafka_max_lag
type: GAUGE
# Broker topic metrics (totals)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
name: kafka_message_count
type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
name: kafka_request_count
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
name: kafka_request_failed
type: COUNTER
labels:
type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
name: kafka_network_io
type: COUNTER
labels:
direction: "out"
# Per-topic metrics (only appear after traffic flows)
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
name: kafka_prod_msg_count
type: COUNTER
labels:
topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
name: kafka_topic_io
type: COUNTER
labels:
topic: "$1"
direction: "out"
# Request metrics
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
name: kafka_request_time_99p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
name: kafka_request_queue
type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
name: kafka_purgatory_size
type: GAUGE
labels:
type: "$1"
# Controller stats
- pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
name: kafka_leader_election_rate
type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
name: kafka_unclean_election_rate
type: COUNTER
# JVM Garbage Collection
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
name: jvm_gc_collections_count
type: COUNTER
labels:
name: "$1"
# JVM Memory
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
name: jvm_memory_heap_max
type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
name: jvm_memory_heap_used
type: GAUGE
# JVM Threading and System
- pattern: 'java.lang<type=Threading><>ThreadCount'
name: jvm_thread_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
name: jvm_system_cpu_utilization
type: GAUGE
# Broker uptime
- pattern: 'java.lang<type=Runtime><>Uptime'
name: kafka_broker_uptime
type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above)
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
name: kafka_request_time_total
type: COUNTER
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
name: kafka_request_time_50p
type: GAUGE
labels:
type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
name: kafka_request_time_avg
type: GAUGE
labels:
type: "$1"
# Log flush metrics
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
name: kafka_logs_flush_count
type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
name: kafka_logs_flush_time_50p
type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
name: kafka_logs_flush_time_99p
type: GAUGE
# JVM GC elapsed time
- pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
name: jvm_gc_collections_elapsed
type: COUNTER
labels:
name: "$1"
# JVM Memory heap committed
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
name: jvm_memory_heap_committed
type: GAUGE
# JVM class loading
- pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
name: jvm_class_count
type: GAUGE
# Additional JVM OS metrics
- pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
name: jvm_system_cpu_load_1m
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
name: jvm_cpu_count
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
name: jvm_cpu_recent_utilization
type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
name: jvm_file_descriptor_count
type: GAUGE
# JVM Memory Pool
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
name: jvm_memory_pool_used
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
name: jvm_memory_pool_max
type: GAUGE
labels:
name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
name: jvm_memory_pool_used_after_last_gc
type: GAUGE
labels:
name: "$1"

ヒント

メトリクスのカスタマイズ: Prometheus JMX Exporterの例およびKafka MBeanドキュメントを参照して、パターンを追加または変更できます。追加の設定については、JMX Exporter ルールのドキュメントを参照してください。

JMX Exporter を使用するように Kafka ブローカーを設定する

Kafkaの起動オプションに追加することで、Prometheus JMX ExporterをJavaエージェントとして各Kafkaブローカーにアタッチします。

単一ブローカーの例:

bash
$
JMX_JAR="$HOME/opentelemetry/jmx_prometheus_javaagent.jar"
$
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"
$
$
nohup env KAFKA_OPTS="-javaagent:${JMX_JAR}=9404:${JMX_CONFIG}" \
>
bin/kafka-server-start.sh config/server.properties &

各ブローカーは、ポート9404でPrometheusメトリクスを公開するようになります。確認:

bash
$
curl http://localhost:9404/metrics | grep kafka_

重要

マルチブローカークラスタ: すべてのブローカーに同じKAFKA_OPTS設定を適用します。各ブローカーは、自身のホストIPからポート9404でメトリクスを公開します。

コレクター設定の作成

監視ホスト上の~/opentelemetry/collector-kafka-config.yamlにOpenTelemetry Collectorの設定を作成します。

Prometheusレシーバーは、すべてのブローカーエンドポイントをスクレイピングします。コレクターは、Prometheusエンドポイントのスクレイピングに加えて、OTLPデータ(アプリケーションのトレース、ログ)を0.0.0.0:4317でリッスンします。

receivers:
# OTLP receiver for application traces, metrics, and logs (listens on port 4317)
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
# Prometheus receiver scrapes JMX metrics from Kafka brokers
prometheus/kafka-jmx:
config:
scrape_configs:
- job_name: 'kafka-jmx-metrics'
metrics_path: /metrics
scrape_interval: 30s
static_configs:
# TODO: Replace each target with your broker hostname or IP, and set a unique broker.id per broker
- targets: ['broker1-host:9404']
labels:
broker.id: '0'
- targets: ['broker2-host:9404']
labels:
broker.id: '1'
- targets: ['broker3-host:9404']
labels:
broker.id: '2'
# Kafka metrics receiver for cluster-level consumer lag, topic, and partition metrics
kafkametrics/cluster:
brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
protocol_version: 2.0.0
scrapers:
- brokers
- topics
- consumers
collection_interval: 30s
# Exclude internal Kafka topics (prefixed with __) at the source
topic_match: "^[^_].*$"
metrics:
kafka.topic.min_insync_replicas:
enabled: true
kafka.topic.replication_factor:
enabled: true
kafka.partition.replicas:
enabled: false
kafka.partition.oldest_offset:
enabled: false
kafka.partition.current_offset:
enabled: false
exporters:
otlp/backend:
endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
headers:
api-key: ${env:NEW_RELIC_LICENSE_KEY}
tls:
insecure: false
sending_queue:
num_consumers: 12
queue_size: 5000
retry_on_failure:
enabled: true
processors:
# Batch processor for efficient export
batch/export:
send_batch_size: 1024
timeout: 30s
# Memory limiter to prevent OOM
memory_limiter:
limit_percentage: 80
spike_limit_percentage: 30
check_interval: 1s
# Transform metric naming conventions (underscore to dot, normalize special names)
transform/metric-naming:
metric_statements:
- context: metric
statements:
- replace_pattern(name, "_", ".")
- replace_pattern(name, "\\.load\\.1", ".load_1")
- replace_pattern(name, "\\.recent\\.util", ".recent_util")
- replace_pattern(name, "file\\.descriptor\\.count", "file_descriptor.count")
- replace_pattern(name, "\\.memory\\.pool\\.used\\.bytes$", ".memory.pool.used")
- replace_pattern(name, "\\.memory\\.pool\\.max\\.bytes$", ".memory.pool.max")
- replace_pattern(name, "\\.memory\\.pool\\.collection\\.used\\.bytes$", ".memory.pool.used_after_last_gc")
- replace_pattern(name, "\\.non\\.preferred\\.leader", ".non_preferred_leader")
- replace_pattern(name, "\\.under\\.min\\.isr", ".under_min_isr")
- replace_pattern(name, "\\.under\\.replicated", ".under_replicated")
- replace_pattern(name, "\\.total$", "") where name != "kafka.request.time.total"
- context: datapoint
statements:
- set(attributes["name"], attributes["gc"]) where attributes["gc"] != nil
- delete_key(attributes, "gc") where attributes["gc"] != nil
- set(attributes["name"], attributes["pool"]) where attributes["pool"] != nil
- delete_key(attributes, "pool") where attributes["pool"] != nil
# Add cluster name to all metrics
resource/cluster-name:
attributes:
- key: kafka.cluster.name
# TODO: Replace with your Kafka cluster name
value: ${env:KAFKA_CLUSTER_NAME}
action: upsert
# Remove broker.id for cluster-level metrics
transform/remove_broker_id:
metric_statements:
- context: datapoint
statements:
- delete_key(attributes, "broker.id")
# Filter out scrape overhead metrics
filter/scrape-overhead:
metrics:
exclude:
match_type: regexp
metric_names:
- "^jmx_.*"
- "^process_.*"
- "^jvm_buffer_pool_.*"
- "^jvm_threads_.*"
- "^jvm_classes_.*"
- "^jvm_memory_(heap|non_heap)_(committed|init|max|used)_bytes$"
- "^jvm_compilation_.*"
- "^jvm_(runtime|info).*"
- "^jvm_memory_pool_(allocated_bytes_total|committed_bytes|init_bytes|collection_(committed|init|max)_bytes)$"
# Include only cluster-level metrics for the cluster pipeline
filter/include_cluster_metrics:
metrics:
include:
match_type: regexp
metric_names:
- "^kafka\\.partition\\.offline$"
- "^kafka\\.(leader|unclean)\\.election\\.rate$"
- "^kafka\\.partition\\.non_preferred_leader$"
- "^kafka\\.broker\\.fenced\\.count$"
- "^kafka\\.cluster\\.partition\\.count$"
- "^kafka\\.cluster\\.topic\\.count$"
# Exclude cluster-level metrics from the broker pipeline
filter/exclude_cluster_metrics:
metrics:
exclude:
match_type: regexp
metric_names:
- "^kafka\\.partition\\.offline$"
- "^kafka\\.(leader|unclean)\\.election\\.rate$"
- "^kafka\\.partition\\.non_preferred_leader$"
- "^kafka\\.broker\\.fenced\\.count$"
- "^kafka\\.cluster\\.partition\\.count$"
- "^kafka\\.cluster\\.topic\\.count$"
# Remove unnecessary attributes
transform/remove_attributes:
metric_statements:
- context: metric
statements:
- set(description, "") where description != ""
- set(unit, "") where unit != ""
- context: resource
statements:
- delete_key(attributes, "server.address")
- delete_key(attributes, "server.port")
- delete_key(attributes, "service.instance.id")
- delete_key(attributes, "host.name")
- delete_key(attributes, "url.scheme")
# Aggregate partition metrics to topic level
metricstransform/topic-aggregation:
transforms:
- include: kafka.partition.replicas_in_sync
action: insert
new_name: kafka.partition.replicas_in_sync.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
- include: kafka.partition.replicas
action: insert
new_name: kafka.partition.replicas.total
operations:
- action: aggregate_labels
label_set: [topic]
aggregation_type: sum
# Filter out original partition replicas metric
filter/exclude_partition_replicas_metric:
metrics:
exclude:
match_type: strict
metric_names:
- kafka.partition.replicas_in_sync
# Filter internal Kafka topics as a safety net
filter/internal_topics:
metrics:
datapoint:
- 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
# Convert cumulative to delta metrics
cumulativetodelta:
groupbyattrs/cluster:
keys: [kafka.cluster.name]
metricstransform/cluster_max:
transforms:
- include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
match_type: regexp
action: update
operations:
- action: aggregate_labels
aggregation_type: max
label_set: []
service:
pipelines:
# Application traces from instrumented Kafka clients and apps
traces:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Application metrics from instrumented Kafka clients and apps
metrics:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Application logs from instrumented Kafka clients and apps
logs:
receivers: [otlp]
processors: [memory_limiter, batch/export]
exporters: [otlp/backend]
# Broker-level metrics from Prometheus JMX scraping
metrics/broker:
receivers:
- prometheus/kafka-jmx
processors:
- resource/cluster-name
- filter/scrape-overhead
- transform/metric-naming
- transform/remove_attributes
- filter/exclude_cluster_metrics
- memory_limiter
- cumulativetodelta
- batch/export
exporters:
- otlp/backend
# Cluster-level metrics from Prometheus JMX scraping
metrics/cluster/prometheus:
receivers:
- prometheus/kafka-jmx
processors:
- resource/cluster-name
- filter/scrape-overhead
- transform/metric-naming
- transform/remove_attributes
- filter/include_cluster_metrics
- transform/remove_broker_id
- memory_limiter
- cumulativetodelta
- groupbyattrs/cluster
- metricstransform/cluster_max
- batch/export
exporters:
- otlp/backend
# Cluster-level metrics from Kafka metrics receiver (consumer lag, topics, partitions)
metrics/cluster/kafkametrics:
receivers:
- kafkametrics/cluster
processors:
- resource/cluster-name
- filter/internal_topics
- transform/remove_attributes
- metricstransform/topic-aggregation
- filter/exclude_partition_replicas_metric
- memory_limiter
- cumulativetodelta
- batch/export
exporters:
- otlp/backend

環境変数の設定

コレクターを起動する前に、監視ホストで必要な環境変数を設定します:

bash
$
export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
$
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
$
# EU region: https://otlp.eu01.nr-data.net:4317

設定パラメーター

次の表では、キー設定について説明します。

変数説明
NEW_RELIC_LICENSE_KEYたとえば、New Relicのライセンスキー YOUR_LICENSE_KEY
KAFKA_CLUSTER_NAMEたとえば、Kafkaクラスタの一意の名前 my-kafka-cluster
KAFKA_BOOTSTRAP_BROKER_ADDRESSESたとえば、Kafkaブートストラップブローカーのアドレス broker1-host:9092,broker2-host:9092,broker3-host:9092
NEW_RELIC_OTLP_ENDPOINTOTLP取り込みエンドポイント。USリージョンにはhttps://otlp.nr-data.net:4317を、EUリージョンにはhttps://otlp.eu01.nr-data.net:4317を使用します。その他の設定については、OTLPエンドポイントの設定を参照してください。

コレクターをインストールして起動する

監視ホストにコレクターをインストールして実行します。NRDOT Collector(New Relicのディストリビューション)またはOpenTelemetry Collectorのいずれかを選択します:

ヒント

NRDOT Collectorは、New Relicのサポートによる支援が受けられる、New RelicのOpenTelemetry Collectorディストリビューションです。詳細については、NRDOT Collector GitHubリポジトリを参照してください。

ステップ 1. バイナリのダウンロードおよびインストール

bash
$
# Set version and architecture
$
NRDOT_VERSION="1.9.0"
$
ARCH="amd64" # or arm64
$
$
# Download and extract
$
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
>
--location --output collector.tar.gz
$
tar -xzf collector.tar.gz
$
$
# Move to a location in PATH (optional)
$
sudo mv nrdot-collector /usr/local/bin/
$
$
# Verify installation
$
nrdot-collector --version

重要

他の OS およびアーキテクチャーについては、 NRDOT Collectorリリースにアクセスし、システムに適切なバイナリをダウンロードしてください。

ステップ2. コレクターを開始します

bash
$
nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

数分以内に、コレクターはKafkaメトリクスのスクレイピングとNew Relicへの送信を開始します。

ステップ 1. バイナリのダウンロードおよびインストール

bash
$
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
$
OTEL_VERSION="<collector_version>"
$
ARCH="amd64"
$
$
curl -L -o otelcol-contrib.tar.gz \
>
"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"
$
$
tar -xzf otelcol-contrib.tar.gz
$
sudo mv otelcol-contrib /usr/local/bin/
$
otelcol-contrib --version

その他の OS については、 OpenTelemetry Collectorリリースページをご覧ください。

ステップ2. コレクターを開始します

bash
$
otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

数分以内に、コレクターはKafkaメトリクスのスクレイピングとNew Relicへの送信を開始します。

(オプション) 計装プロデューサーまたは消費者アプリケーション

重要

言語サポート: 現在、OpenTelemetry Javaエージェントを使用したKafkaクライアントの計装でサポートされているのはJavaアプリケーションのみです。

Kafkaプロデューサーおよび消費者アプリケーションからアプリケーションレベルのテレメトリーを収集するには、まだダウンロードしていない場合は、OpenTelemetry Javaエージェントをダウンロードしてください:

bash
$
mkdir -p ~/opentelemetry
$
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
>
https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

エージェントを使用してアプリケーションを開始します。

bash
$
OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
$
$
java \
>
-javaagent:$OTEL_AGENT \
>
-Dotel.service.name="order-process-service" \
>
-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
>
-Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
>
-Dotel.exporter.otlp.protocol="grpc" \
>
-Dotel.metrics.exporter="otlp" \
>
-Dotel.traces.exporter="otlp" \
>
-Dotel.logs.exporter="otlp" \
>
-Dotel.instrumentation.kafka.experimental-span-attributes="true" \
>
-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
>
-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
>
-Dotel.instrumentation.kafka.enabled="true" \
>
-Dotel.instrumentation.runtime-telemetry.enabled="false" \
>
-jar your-kafka-application.jar

設定パラメーター

次の表では、キー設定について説明します。

パラメータ説明
service.nameたとえば、プロデューサーまたは消費者アプリケーションの一意の名前に置き換えてください。 order-process-service
kafka.cluster.nameたとえば、コレクター設定で使用されているのと同じクラスタ名に置き換えます。 my-kafka-cluster
otlp.endpointたとえば、OpenTelemetry Collectorを実行しているホストのホスト名またはIPに置き換えます http://collector-host-ip:4317

ヒント

上記の設定は、collector-host-ip:4317で実行されているOpenTelemetry Collectorにテレメトリーを送信します。アプリケーションテレメトリー専用の別のコレクターが必要な場合は、以下の設定で作成してください:

Javaエージェントは、コードを変更することなくすぐに使えるKafkaの計装を提供し、リクエストレイテンシ、スループットメトリクス、エラー率、およびディストリビューティッド(分散)トレーシングをキャプチャします。高度な設定については、Kafka計装ドキュメントを参照してください。

(オプション)Kafkaブローカーログを転送する

Kafka ブローカー ログを収集して New Relic に送信するには、OpenTelemetry Collector でファイルログ レシーバーを構成します。

データを検索する

数分後、New Relic に Kafka データが表示されるはずです。New Relic UIのさまざまなビューでKafkaデータを探索するための詳細な手順については、データの検索を参照してください。

以下の表は、各シグナルタイプの保存先をまとめています。以下のすべてのクエリで、my-kafka-clusterKAFKA_CLUSTER_NAMEの値に置き換えます:

シグナルイベントタイプ含まれるもの
指標Metricブローカー、トピック、パーティション、消費者グループ、およびJVMメトリクス
ログLogプロデューサーおよび消費者アプリケーションからのログ(OTel Javaエージェント経由)と、Javaエージェント経由で収集されたブローカーログ
トレースSpanトピックをまたぐメッセージごとのpublishおよびreceive操作を含む、プロデューサーと消費者のスパン

指標

ブローカー、トピック、パーティション、消費者グループ、およびJVMメトリクスは、Metricイベントタイプに保存されます。my-kafka-clusterKAFKA_CLUSTER_NAME値に置き換えます:

FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

ログ

OpenTelemetry Javaエージェントでインストゥルメントされたプロデューサーおよび消費者アプリケーションからのログ、および-Dotel.logs.exporter=otlpが設定されている場合のブローカーログは、Logイベントタイプに保存されます:

FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

トレース

トピックをまたぐメッセージごとのpublishおよびreceive操作を含む、プロデューサーと消費者のスパンは、Spanイベントタイプに保存されます:

FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago

Docker Composeのセットアップ、OTel Collectorの設定、OTel Javaエージェントの設定、およびサンプルのプロデューサー/消費者アプリケーションを含む完全な動作例は、New Relic OpenTelemetry Examplesリポジトリで利用可能です。

トラブルシューティング

次のステップ

Copyright © 2026 New Relic株式会社。

This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.