Linuxホストに直接OpenTelemetry Collectorをインストールすることで、セルフホスト型Apache Kafkaクラスタをモニターします。KafkaブローカーからJMXメトリクスを収集するには、OpenTelemetry JavaエージェントまたはPrometheus JMX Exporterのいずれかのアプローチを選択します。
アーキテクチャー
New Relicは、セルフホスト型Kafkaを監視するための2つのアプローチをサポートしています:OpenTelemetry JavaエージェントまたはPrometheus JMX Exporter。次の図は、各アプローチのデータフローを示しています。

インストレーション手順
以下の手順に従って、ブローカーにOpenTelemetry Javaエージェントをインストールし、メトリクスとログを収集してNew Relicに送信するためのコレクターをデプロイして、包括的なKafka監視をセットアップします。
コレクター設定の作成
監視ホストの~/opentelemetry/collector-kafka-config.yamlにメインの OpenTelemetry Collector 設定を作成します。
receivers: # OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry otlp: protocols: grpc: endpoint: "0.0.0.0:4317"
# Kafka metrics receiver for cluster-level metrics kafkametrics: brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES} protocol_version: 2.0.0 scrapers: - brokers - topics - consumers collection_interval: 30s # Exclude internal Kafka topics (prefixed with __) at the source topic_match: "^[^_].*$" metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
processors: batch/aggregation: send_batch_size: 1024 timeout: 30s
resourcedetection: detectors: [env, ec2, system] system: resource_attributes: host.name: enabled: true host.id: enabled: true
resource: attributes: - action: insert key: kafka.cluster.name value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id: metric_statements: # Remove broker.id for cluster-level metrics — these represent the whole cluster, # not a specific broker. broker.id is retained on broker-level metrics pipelines. - context: resource statements: - delete_key(attributes, "broker.id")
transform/remove_extra_attributes: metric_statements: - context: resource statements: # Delete all attributes starting with "process." - delete_matching_keys(attributes, "^process\\..*") # Delete all attributes starting with "telemetry." - delete_matching_keys(attributes, "^telemetry\\..*") - delete_key(attributes, "host.arch") - delete_key(attributes, "os.description") - delete_key(attributes, "host.image.id") - delete_key(attributes, "host.type") - delete_matching_keys(attributes, "^cloud\\..*") - delete_key(attributes, "service.instance.id") where IsMatch(attributes["service.name"], "^unknown_service:") - delete_key(attributes, "service.name") where IsMatch(attributes["service.name"], "^unknown_service:")
# Filter internal Kafka topics as a safety net (kafkametrics topic_match handles the receiver side) filter/internal_topics: metrics: datapoint: - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
transform/des_units: metric_statements: - context: metric statements: - set(description, "") where description != "" - set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [topic] aggregation_type: sum
- include: kafka.partition.replicas action: insert new_name: kafka.partition.replicas.total operations: - action: aggregate_labels label_set: [topic] aggregation_type: sum
filter/remove_partition_level_replicas: metrics: exclude: match_type: strict metric_names: - kafka.partition.replicas_in_sync
groupbyattrs/cluster: keys: [kafka.cluster.name]
metricstransform/cluster_max: transforms: - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count" match_type: regexp action: update operations: - action: aggregate_labels aggregation_type: max label_set: []
exporters: otlp/newrelic: endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT} headers: api-key: ${env:NEW_RELIC_LICENSE_KEY} compression: gzip timeout: 30s
service: pipelines: # Broker metrics pipeline (excludes cluster-level metrics) metrics/broker: receivers: [otlp, kafkametrics] processors: - resourcedetection - resource - filter/exclude_cluster_metrics - filter/internal_topics - transform/remove_extra_attributes - transform/des_units - cumulativetodelta - metricstransform/kafka_topic_sum_aggregation - filter/remove_partition_level_replicas - batch/aggregation exporters: [otlp/newrelic]
# Cluster metrics pipeline (controller-emitted metrics like offline partitions, topic/partition counts — no broker.id) metrics/cluster: receivers: [otlp] processors: - resourcedetection - resource - filter/include_cluster_metrics - transform/remove_broker_id - transform/remove_extra_attributes - transform/des_units - cumulativetodelta - groupbyattrs/cluster - metricstransform/cluster_max - batch/aggregation exporters: [otlp/newrelic]
# APM traces pipeline (producer + consumer spans via OTel Java agent) traces/apps: receivers: [otlp] processors: [resourcedetection, resource, batch/aggregation] exporters: [otlp/newrelic]
# APM logs pipeline (producer + consumer logs via OTel Java agent) logs/apps: receivers: [otlp] processors: [resourcedetection, resource, batch/aggregation] exporters: [otlp/newrelic]環境変数の設定
コレクターをインストールする前に、監視ホストで必要な環境変数を設定します:
$export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"$export KAFKA_CLUSTER_NAME="my-kafka-cluster"$export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"$export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region設定パラメーター
次の表では、キー設定について説明します。
| 変数 | 説明 |
|---|---|
NEW_RELIC_LICENSE_KEY | たとえば、New Relicのライセンスキー YOUR_LICENSE_KEY |
KAFKA_CLUSTER_NAME | たとえば、Kafkaクラスタの一意の名前 my-kafka-cluster |
KAFKA_BOOTSTRAP_BROKER_ADDRESSES | たとえば、Kafkaブートストラップブローカーのアドレス broker1-host:9092,broker2-host:9092,broker3-host:9092 |
NEW_RELIC_OTLP_ENDPOINT | OTLP取り込みエンドポイント。USリージョンにはhttps://otlp.nr-data.net:4317を、EUリージョンにはhttps://otlp.eu01.nr-data.net:4317を使用します。その他の設定については、OTLPエンドポイントの設定を参照してください。 |
コレクターをインストールして起動する
監視ホストにコレクターをインストールして実行します。NRDOT Collector(New Relicのディストリビューション)またはOpenTelemetry Collectorのいずれかを選択します:
ヒント
NRDOT Collector は、New Relic のサポートを受けた OpenTelemetry Collector の New Relic ディストリビューションです。
ステップ 1. バイナリのダウンロードおよびインストール
ホストOS用のNRDOT Collectorバイナリをダウンロードしてインストールします。以下の例はlinux_amd64アーキテクチャー向けです:
$# Set version and architecture$NRDOT_VERSION="1.9.0"$ARCH="amd64" # or arm64$
$# Download and extract$curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \> --location --output collector.tar.gz$tar -xzf collector.tar.gz$
$# Move to a location in PATH (optional)$sudo mv nrdot-collector /usr/local/bin/$
$# Verify installation$nrdot-collector --version重要
他の OS およびアーキテクチャーについては、 NRDOT Collectorリリースにアクセスし、システムに適切なバイナリをダウンロードしてください。
ステップ2. コレクターを開始します
設定ファイルを使用してコレクターを実行して、監視を開始します。
$nrdot-collector --config ~/opentelemetry/collector-kafka-config.yamlコレクターは現在実行中であり、データを受信する準備ができています。New Relicにメトリクスが表示される前に、KafkaブローカーにJavaエージェントをアタッチする残りの手順を完了してください。
ステップ 1. バイナリのダウンロードおよびインストール
ホストOS用のOpenTelemetry Collector Contribバイナリをダウンロードしてインストールします。以下の例はlinux_amd64アーキテクチャー用です:
$# Set version and architecture$# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version$OTEL_VERSION="<collector_version>"$ARCH="amd64"$
$# Download the collector$curl -L -o otelcol-contrib.tar.gz \> "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"$
$# Extract the binary$tar -xzf otelcol-contrib.tar.gz$
$# Move to a location in PATH (optional)$sudo mv otelcol-contrib /usr/local/bin/$
$# Verify installation$otelcol-contrib --versionその他の OS については、 OpenTelemetry Collectorリリースページをご覧ください。
ステップ2. コレクターを開始します
設定ファイルを使用してコレクターを実行して、監視を開始します。
$otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yamlコレクターは現在実行中であり、データを受信する準備ができています。New Relicにメトリクスが表示される前に、KafkaブローカーにJavaエージェントをアタッチする残りの手順を完了してください。
OpenTelemetry Javaエージェントをダウンロードします
重要
JavaエージェントをアタッチしたKafkaブローカーを(再)起動する前に、OpenTelemetry Collectorが実行されていることを確認してください。エージェントはブローカーの起動直後にメトリクスの送信を開始するため、コレクターはそれらを受信できる状態である必要があります。
OpenTelemetry Javaエージェントは、KafkaブローカーにアタッチされたJavaエージェントとして実行され、KafkaおよびJMXメトリクスを収集し、OTLP経由でコレクターに送信します:
$# Create directory for OpenTelemetry components$mkdir -p ~/opentelemetry$
$# Download OpenTelemetry Java agent$curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \> https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarJMXカスタム設定の作成
JMX MBeanからKafkaメトリクスを収集するために、OpenTelemetry JavaエージェントのJMX設定ファイルを作成します。
各ブローカーホストで、次の設定を使用してファイル~/opentelemetry/kafka-jmx-config.yamlを作成します:
---rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0)
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec mapping: Count: metric: kafka.message.count type: counter desc: The number of messages received by the broker unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.count type: &type counter desc: &desc The number of requests received by the broker unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec metricAttribute: type: const(fetch) mapping: Count: metric: &metric kafka.request.failed type: &type counter desc: &desc The number of requests to the broker resulting in a failure unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec metricAttribute: type: const(produce) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: 99thPercentile: metric: kafka.request.time.99p type: gauge desc: The 99th percentile time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize mapping: Value: metric: kafka.request.queue type: gauge desc: Size of the request queue unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec metricAttribute: direction: const(in) mapping: Count: metric: &metric kafka.network.io type: &type counter desc: &desc The bytes received or sent by the broker unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec metricAttribute: direction: const(out) mapping: Count: metric: *metric type: *type desc: *desc unit: *unit
- beans: - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch metricAttribute: type: param(delayedOperation) mapping: Value: metric: kafka.purgatory.size type: gauge desc: The number of requests waiting in purgatory unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount mapping: Value: metric: kafka.partition.count type: gauge desc: The number of partitions on the broker unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount mapping: Value: metric: kafka.partition.offline type: gauge desc: The number of partitions offline unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions mapping: Value: metric: kafka.partition.under_replicated type: gauge desc: The number of under replicated partitions unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec metricAttribute: operation: const(shrink) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec metricAttribute: operation: const(expand) mapping: Count: metric: kafka.isr.operation.count type: counter desc: The number of in-sync replica shrink and expand operations unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica mapping: Value: metric: kafka.max.lag type: gauge desc: The max lag in messages between follower and leader replicas unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount mapping: Value: metric: kafka.controller.active.count type: gauge desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker. unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs mapping: Count: metric: kafka.leader.election.rate type: counter desc: The leader election count unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec mapping: Count: metric: kafka.unclean.election.rate type: counter desc: Unclean leader election count - increasing indicates broker failures unit: "{election}"
# ── Additional metrics — remove this section to reduce data ingest ───────────
# Request latency: total count, 50th percentile, and average (99p kept above) - beans: - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower metricAttribute: type: param(request) unit: ms mapping: Count: metric: kafka.request.time.total type: counter desc: The total time the broker has taken to service requests 50thPercentile: metric: kafka.request.time.50p type: gauge desc: The 50th percentile time the broker has taken to service requests Mean: metric: kafka.request.time.avg type: gauge desc: The average time the broker has taken to service requests
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs unit: ms type: gauge prefix: kafka.logs.flush. mapping: Count: metric: count unit: '{flush}' type: counter desc: Log flush count 50thPercentile: metric: time.50p desc: Log flush time - 50th percentile 99thPercentile: metric: time.99p desc: Log flush time - 99th percentile
# GC elapsed time (cumulative collection time in ms) - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
# JVM class loading - bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
# JVM heap committed (in addition to heap.used and heap.max) - bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: Committed heap memory type: gauge
# Additional JVM CPU and system metrics - bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
# JVM memory pool breakdown (by generation: G1 Old Gen, Eden, Survivor, etc.) - bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline)Kafkaブローカーを構成する
Kafkaを起動する前にKAFKA_OPTS環境変数を設定して、OpenTelemetry JavaエージェントをKafkaブローカーにアタッチします。
単一ブローカーの例:
$OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"$JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"$
$nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \> -Dotel.jmx.enabled=true \> -Dotel.jmx.config=$JMX_CONFIG \> -Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \> -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \> -Dotel.exporter.otlp.protocol=grpc \> -Dotel.metrics.exporter=otlp \> -Dotel.logs.exporter=otlp \> -Dotel.instrumentation.runtime-telemetry.enabled=false \> -Dotel.metric.export.interval=30000" \> bin/kafka-server-start.sh config/server.properties &重要
マルチブローカーの集中: 複数のブローカーの場合は、各ブローカーの-Dotel.resource.attributes問題で一意のbroker.id値 (例: broker.id=1 、 broker.id=2 、 broker.id=3) を持つ同じ設定を使用します。
ヒント
ブローカーログは、上記の-Dotel.logs.exporter=otlpフラグにより自動的に有効になります。ブローカーのログ収集を無効にするには、代わりに-Dotel.logs.exporter=noneを設定します。
設定パラメーター
次の表では、キー設定について説明します。
| パラメータ | 説明 |
|---|---|
otlp.endpoint | たとえば、OpenTelemetry Collectorを実行しているホストのIPまたはホスト名に置き換えます。 http://collector-host-ip:4317 |
broker.id | 1を各ブローカーの一意のブローカーID(たとえばbroker.id=1、broker.id=2)に置き換えます、 broker.id=3 |
kafka.cluster.name | my-kafka-clusterをKafkaクラスタ名に置き換えます。コレクターの設定で設定された値と一致する必要があります。 |
logs.exporter | otlpに設定すると、ブローカーのログ収集を有効にします。ブローカーのログ転送を無効にするには、noneに設定します。 |
完全な設定オプションについては、Javaエージェント設定ガイドを参照してください。
(オプション) 計装プロデューサーまたは消費者アプリケーション
重要
言語サポート: 現在、OpenTelemetry Javaエージェントを使用したKafkaクライアントの計装でサポートされているのはJavaアプリケーションのみです。
お使いのKafkaプロデューサーおよび消費者アプリケーションからアプリケーションレベルのテレメトリーを収集するには、上記のOpenTelemetry JavaエージェントのダウンロードステップからOpenTelemetry Javaエージェントをダウンロードしてください。
エージェントを使用してアプリケーションを開始します。
$OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"$
$java \> -javaagent:$OTEL_AGENT \> -Dotel.service.name="order-process-service" \> -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \> -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \> -Dotel.exporter.otlp.protocol="grpc" \> -Dotel.metrics.exporter="otlp" \> -Dotel.traces.exporter="otlp" \> -Dotel.logs.exporter="otlp" \> -Dotel.instrumentation.kafka.experimental-span-attributes="true" \> -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \> -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \> -Dotel.instrumentation.kafka.enabled="true" \> -Dotel.instrumentation.runtime-telemetry.enabled="false" \> -jar your-kafka-application.jar設定パラメーター
次の表では、キー設定について説明します。
| パラメータ | 説明 |
|---|---|
service.name | たとえば、プロデューサーまたは消費者アプリケーションの一意の名前に置き換えてください。 order-process-service |
kafka.cluster.name | たとえば、コレクター設定で使用されているのと同じクラスタ名に置き換えます。 my-kafka-cluster |
otlp.endpoint | たとえば、OpenTelemetry Collectorを実行しているホストのホスト名またはIPに置き換えます http://collector-host-ip:4317 |
ヒント
上記の設定は、collector-host-ip:4317で実行されているOpenTelemetry Collectorにテレメトリーを送信します。アプリケーションテレメトリー専用の別のコレクターが必要な場合は、以下の設定で作成してください:
Javaエージェントは、コード変更なしですぐに使えるKafkaの計装を提供し、リクエストレイテンシ、スループットメトリクス、エラー率、およびディストリビューティッド(分散)トレーシングをキャプチャします。
高度な設定については、 Kafka 計装ドキュメントを参照してください。
ブローカーにPrometheus JMX Exporterをインストールし、メトリクスを収集してNew Relicに送信するコレクターをデプロイすることで、包括的なKafka監視をセットアップするには、以下の手順に従ってください。
あなたが始める前に
以下のものを用意してください:
- New Relicアカウント
- コレクターホストから各ブローカーのポートへのネットワークアクセス
9404 - コレクターからKafkaブートストラップポート(通常は
9092)へのネットワークアクセス
Prometheus JMX Exporter をダウンロードします
各KafkaブローカーホストにPrometheus JMX Exporter JARをダウンロードします:
$# Create directory for Prometheus components$mkdir -p ~/opentelemetry$
$# Download the Prometheus JMX Exporter agent JAR$# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.$JMX_EXPORTER_VERSION="1.5.0"$curl -L -o ~/opentelemetry/jmx_prometheus_javaagent.jar \> "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"JMXメトリクス設定を作成
収集するKafkaメトリクスを定義するJMX Exporterの設定ファイルを作成します。各ブローカーホストで~/opentelemetry/kafka-jmx-config.yamlとして保存します:
startDelaySeconds: 0lowercaseOutputName: truelowercaseOutputLabelNames: true
rules: # Cluster-level controller metrics - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value' name: kafka_cluster_topic_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value' name: kafka_cluster_partition_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value' name: kafka_broker_fenced_count type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value' name: kafka_partition_non_preferred_leader type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value' name: kafka_partition_offline type: GAUGE
- pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value' name: kafka_controller_active_count type: GAUGE
# Broker-level replica metrics - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value' name: kafka_partition_under_min_isr type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value' name: kafka_broker_leader_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value' name: kafka_partition_count type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value' name: kafka_partition_under_replicated type: GAUGE
- pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "shrink"
- pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count' name: kafka_isr_operation_count type: COUNTER labels: operation: "expand"
- pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value' name: kafka_max_lag type: GAUGE
# Broker topic metrics (totals) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count' name: kafka_message_count type: COUNTER
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count' name: kafka_request_count type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "fetch"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count' name: kafka_request_failed type: COUNTER labels: type: "produce"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count' name: kafka_network_io type: COUNTER labels: direction: "out"
# Per-topic metrics (only appear after traffic flows) - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count' name: kafka_prod_msg_count type: COUNTER labels: topic: "$1"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "in"
- pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count' name: kafka_topic_io type: COUNTER labels: topic: "$1" direction: "out"
# Request metrics - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile' name: kafka_request_time_99p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value' name: kafka_request_queue type: GAUGE
- pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value' name: kafka_purgatory_size type: GAUGE labels: type: "$1"
# Controller stats - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count' name: kafka_leader_election_rate type: COUNTER
- pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count' name: kafka_unclean_election_rate type: COUNTER
# JVM Garbage Collection - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount' name: jvm_gc_collections_count type: COUNTER labels: name: "$1"
# JVM Memory - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max' name: jvm_memory_heap_max type: GAUGE
- pattern: 'java.lang<type=Memory><HeapMemoryUsage>used' name: jvm_memory_heap_used type: GAUGE
# JVM Threading and System - pattern: 'java.lang<type=Threading><>ThreadCount' name: jvm_thread_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad' name: jvm_system_cpu_utilization type: GAUGE
# Broker uptime - pattern: 'java.lang<type=Runtime><>Uptime' name: kafka_broker_uptime type: GAUGE
# Additional metrics — remove this section to reduce data ingest
# Request latency: total count, 50th percentile, and average (99p kept above) - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count' name: kafka_request_time_total type: COUNTER labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile' name: kafka_request_time_50p type: GAUGE labels: type: "$1"
- pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean' name: kafka_request_time_avg type: GAUGE labels: type: "$1"
# Log flush metrics - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count' name: kafka_logs_flush_count type: COUNTER
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile' name: kafka_logs_flush_time_50p type: GAUGE
- pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile' name: kafka_logs_flush_time_99p type: GAUGE
# JVM GC elapsed time - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime' name: jvm_gc_collections_elapsed type: COUNTER labels: name: "$1"
# JVM Memory heap committed - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed' name: jvm_memory_heap_committed type: GAUGE
# JVM class loading - pattern: 'java.lang<type=ClassLoading><>LoadedClassCount' name: jvm_class_count type: GAUGE
# Additional JVM OS metrics - pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage' name: jvm_system_cpu_load_1m type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors' name: jvm_cpu_count type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad' name: jvm_cpu_recent_utilization type: GAUGE
- pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount' name: jvm_file_descriptor_count type: GAUGE
# JVM Memory Pool - pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used' name: jvm_memory_pool_used type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max' name: jvm_memory_pool_max type: GAUGE labels: name: "$1"
- pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used' name: jvm_memory_pool_used_after_last_gc type: GAUGE labels: name: "$1"ヒント
メトリクスのカスタマイズ: Prometheus JMX Exporterの例およびKafka MBeanドキュメントを参照して、パターンを追加または変更できます。追加の設定については、JMX Exporter ルールのドキュメントを参照してください。
JMX Exporter を使用するように Kafka ブローカーを設定する
Kafkaの起動オプションに追加することで、Prometheus JMX ExporterをJavaエージェントとして各Kafkaブローカーにアタッチします。
単一ブローカーの例:
$JMX_JAR="$HOME/opentelemetry/jmx_prometheus_javaagent.jar"$JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"$
$nohup env KAFKA_OPTS="-javaagent:${JMX_JAR}=9404:${JMX_CONFIG}" \> bin/kafka-server-start.sh config/server.properties &各ブローカーは、ポート9404でPrometheusメトリクスを公開するようになります。確認:
$curl http://localhost:9404/metrics | grep kafka_重要
マルチブローカークラスタ: すべてのブローカーに同じKAFKA_OPTS設定を適用します。各ブローカーは、自身のホストIPからポート9404でメトリクスを公開します。
コレクター設定の作成
監視ホスト上の~/opentelemetry/collector-kafka-config.yamlにOpenTelemetry Collectorの設定を作成します。
Prometheusレシーバーは、すべてのブローカーエンドポイントをスクレイピングします。コレクターは、Prometheusエンドポイントのスクレイピングに加えて、OTLPデータ(アプリケーションのトレース、ログ)を0.0.0.0:4317でリッスンします。
receivers: # OTLP receiver for application traces, metrics, and logs (listens on port 4317) otlp: protocols: grpc: endpoint: "0.0.0.0:4317"
# Prometheus receiver scrapes JMX metrics from Kafka brokers prometheus/kafka-jmx: config: scrape_configs: - job_name: 'kafka-jmx-metrics' metrics_path: /metrics scrape_interval: 30s static_configs: # TODO: Replace each target with your broker hostname or IP, and set a unique broker.id per broker - targets: ['broker1-host:9404'] labels: broker.id: '0' - targets: ['broker2-host:9404'] labels: broker.id: '1' - targets: ['broker3-host:9404'] labels: broker.id: '2'
# Kafka metrics receiver for cluster-level consumer lag, topic, and partition metrics kafkametrics/cluster: brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES} protocol_version: 2.0.0 scrapers: - brokers - topics - consumers collection_interval: 30s # Exclude internal Kafka topics (prefixed with __) at the source topic_match: "^[^_].*$" metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
exporters: otlp/backend: endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT} headers: api-key: ${env:NEW_RELIC_LICENSE_KEY} tls: insecure: false sending_queue: num_consumers: 12 queue_size: 5000 retry_on_failure: enabled: true
processors: # Batch processor for efficient export batch/export: send_batch_size: 1024 timeout: 30s
# Memory limiter to prevent OOM memory_limiter: limit_percentage: 80 spike_limit_percentage: 30 check_interval: 1s
# Transform metric naming conventions (underscore to dot, normalize special names) transform/metric-naming: metric_statements: - context: metric statements: - replace_pattern(name, "_", ".") - replace_pattern(name, "\\.load\\.1", ".load_1") - replace_pattern(name, "\\.recent\\.util", ".recent_util") - replace_pattern(name, "file\\.descriptor\\.count", "file_descriptor.count") - replace_pattern(name, "\\.memory\\.pool\\.used\\.bytes$", ".memory.pool.used") - replace_pattern(name, "\\.memory\\.pool\\.max\\.bytes$", ".memory.pool.max") - replace_pattern(name, "\\.memory\\.pool\\.collection\\.used\\.bytes$", ".memory.pool.used_after_last_gc") - replace_pattern(name, "\\.non\\.preferred\\.leader", ".non_preferred_leader") - replace_pattern(name, "\\.under\\.min\\.isr", ".under_min_isr") - replace_pattern(name, "\\.under\\.replicated", ".under_replicated") - replace_pattern(name, "\\.total$", "") where name != "kafka.request.time.total" - context: datapoint statements: - set(attributes["name"], attributes["gc"]) where attributes["gc"] != nil - delete_key(attributes, "gc") where attributes["gc"] != nil - set(attributes["name"], attributes["pool"]) where attributes["pool"] != nil - delete_key(attributes, "pool") where attributes["pool"] != nil
# Add cluster name to all metrics resource/cluster-name: attributes: - key: kafka.cluster.name # TODO: Replace with your Kafka cluster name value: ${env:KAFKA_CLUSTER_NAME} action: upsert
# Remove broker.id for cluster-level metrics transform/remove_broker_id: metric_statements: - context: datapoint statements: - delete_key(attributes, "broker.id")
# Filter out scrape overhead metrics filter/scrape-overhead: metrics: exclude: match_type: regexp metric_names: - "^jmx_.*" - "^process_.*" - "^jvm_buffer_pool_.*" - "^jvm_threads_.*" - "^jvm_classes_.*" - "^jvm_memory_(heap|non_heap)_(committed|init|max|used)_bytes$" - "^jvm_compilation_.*" - "^jvm_(runtime|info).*" - "^jvm_memory_pool_(allocated_bytes_total|committed_bytes|init_bytes|collection_(committed|init|max)_bytes)$"
# Include only cluster-level metrics for the cluster pipeline filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "^kafka\\.partition\\.offline$" - "^kafka\\.(leader|unclean)\\.election\\.rate$" - "^kafka\\.partition\\.non_preferred_leader$" - "^kafka\\.broker\\.fenced\\.count$" - "^kafka\\.cluster\\.partition\\.count$" - "^kafka\\.cluster\\.topic\\.count$"
# Exclude cluster-level metrics from the broker pipeline filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "^kafka\\.partition\\.offline$" - "^kafka\\.(leader|unclean)\\.election\\.rate$" - "^kafka\\.partition\\.non_preferred_leader$" - "^kafka\\.broker\\.fenced\\.count$" - "^kafka\\.cluster\\.partition\\.count$" - "^kafka\\.cluster\\.topic\\.count$"
# Remove unnecessary attributes transform/remove_attributes: metric_statements: - context: metric statements: - set(description, "") where description != "" - set(unit, "") where unit != "" - context: resource statements: - delete_key(attributes, "server.address") - delete_key(attributes, "server.port") - delete_key(attributes, "service.instance.id") - delete_key(attributes, "host.name") - delete_key(attributes, "url.scheme")
# Aggregate partition metrics to topic level metricstransform/topic-aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [topic] aggregation_type: sum - include: kafka.partition.replicas action: insert new_name: kafka.partition.replicas.total operations: - action: aggregate_labels label_set: [topic] aggregation_type: sum
# Filter out original partition replicas metric filter/exclude_partition_replicas_metric: metrics: exclude: match_type: strict metric_names: - kafka.partition.replicas_in_sync
# Filter internal Kafka topics as a safety net filter/internal_topics: metrics: datapoint: - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'
# Convert cumulative to delta metrics cumulativetodelta:
groupbyattrs/cluster: keys: [kafka.cluster.name]
metricstransform/cluster_max: transforms: - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count" match_type: regexp action: update operations: - action: aggregate_labels aggregation_type: max label_set: []
service: pipelines: # Application traces from instrumented Kafka clients and apps traces: receivers: [otlp] processors: [memory_limiter, batch/export] exporters: [otlp/backend]
# Application metrics from instrumented Kafka clients and apps metrics: receivers: [otlp] processors: [memory_limiter, batch/export] exporters: [otlp/backend]
# Application logs from instrumented Kafka clients and apps logs: receivers: [otlp] processors: [memory_limiter, batch/export] exporters: [otlp/backend]
# Broker-level metrics from Prometheus JMX scraping metrics/broker: receivers: - prometheus/kafka-jmx processors: - resource/cluster-name - filter/scrape-overhead - transform/metric-naming - transform/remove_attributes - filter/exclude_cluster_metrics - memory_limiter - cumulativetodelta - batch/export exporters: - otlp/backend
# Cluster-level metrics from Prometheus JMX scraping metrics/cluster/prometheus: receivers: - prometheus/kafka-jmx processors: - resource/cluster-name - filter/scrape-overhead - transform/metric-naming - transform/remove_attributes - filter/include_cluster_metrics - transform/remove_broker_id - memory_limiter - cumulativetodelta - groupbyattrs/cluster - metricstransform/cluster_max - batch/export exporters: - otlp/backend
# Cluster-level metrics from Kafka metrics receiver (consumer lag, topics, partitions) metrics/cluster/kafkametrics: receivers: - kafkametrics/cluster processors: - resource/cluster-name - filter/internal_topics - transform/remove_attributes - metricstransform/topic-aggregation - filter/exclude_partition_replicas_metric - memory_limiter - cumulativetodelta - batch/export exporters: - otlp/backend環境変数の設定
コレクターを起動する前に、監視ホストで必要な環境変数を設定します:
$export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"$export KAFKA_CLUSTER_NAME="my-kafka-cluster"$export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"$export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region$# EU region: https://otlp.eu01.nr-data.net:4317設定パラメーター
次の表では、キー設定について説明します。
| 変数 | 説明 |
|---|---|
NEW_RELIC_LICENSE_KEY | たとえば、New Relicのライセンスキー YOUR_LICENSE_KEY |
KAFKA_CLUSTER_NAME | たとえば、Kafkaクラスタの一意の名前 my-kafka-cluster |
KAFKA_BOOTSTRAP_BROKER_ADDRESSES | たとえば、Kafkaブートストラップブローカーのアドレス broker1-host:9092,broker2-host:9092,broker3-host:9092 |
NEW_RELIC_OTLP_ENDPOINT | OTLP取り込みエンドポイント。USリージョンにはhttps://otlp.nr-data.net:4317を、EUリージョンにはhttps://otlp.eu01.nr-data.net:4317を使用します。その他の設定については、OTLPエンドポイントの設定を参照してください。 |
コレクターをインストールして起動する
監視ホストにコレクターをインストールして実行します。NRDOT Collector(New Relicのディストリビューション)またはOpenTelemetry Collectorのいずれかを選択します:
ヒント
NRDOT Collectorは、New Relicのサポートによる支援が受けられる、New RelicのOpenTelemetry Collectorディストリビューションです。詳細については、NRDOT Collector GitHubリポジトリを参照してください。
ステップ 1. バイナリのダウンロードおよびインストール
$# Set version and architecture$NRDOT_VERSION="1.9.0"$ARCH="amd64" # or arm64$
$# Download and extract$curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \> --location --output collector.tar.gz$tar -xzf collector.tar.gz$
$# Move to a location in PATH (optional)$sudo mv nrdot-collector /usr/local/bin/$
$# Verify installation$nrdot-collector --version重要
他の OS およびアーキテクチャーについては、 NRDOT Collectorリリースにアクセスし、システムに適切なバイナリをダウンロードしてください。
ステップ2. コレクターを開始します
$nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml数分以内に、コレクターはKafkaメトリクスのスクレイピングとNew Relicへの送信を開始します。
ステップ 1. バイナリのダウンロードおよびインストール
$# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version$OTEL_VERSION="<collector_version>"$ARCH="amd64"$
$curl -L -o otelcol-contrib.tar.gz \> "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"$
$tar -xzf otelcol-contrib.tar.gz$sudo mv otelcol-contrib /usr/local/bin/$otelcol-contrib --versionその他の OS については、 OpenTelemetry Collectorリリースページをご覧ください。
ステップ2. コレクターを開始します
$otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml数分以内に、コレクターはKafkaメトリクスのスクレイピングとNew Relicへの送信を開始します。
(オプション) 計装プロデューサーまたは消費者アプリケーション
重要
言語サポート: 現在、OpenTelemetry Javaエージェントを使用したKafkaクライアントの計装でサポートされているのはJavaアプリケーションのみです。
Kafkaプロデューサーおよび消費者アプリケーションからアプリケーションレベルのテレメトリーを収集するには、まだダウンロードしていない場合は、OpenTelemetry Javaエージェントをダウンロードしてください:
$mkdir -p ~/opentelemetry$curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \> https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarエージェントを使用してアプリケーションを開始します。
$OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"$
$java \> -javaagent:$OTEL_AGENT \> -Dotel.service.name="order-process-service" \> -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \> -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \> -Dotel.exporter.otlp.protocol="grpc" \> -Dotel.metrics.exporter="otlp" \> -Dotel.traces.exporter="otlp" \> -Dotel.logs.exporter="otlp" \> -Dotel.instrumentation.kafka.experimental-span-attributes="true" \> -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \> -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \> -Dotel.instrumentation.kafka.enabled="true" \> -Dotel.instrumentation.runtime-telemetry.enabled="false" \> -jar your-kafka-application.jar設定パラメーター
次の表では、キー設定について説明します。
| パラメータ | 説明 |
|---|---|
service.name | たとえば、プロデューサーまたは消費者アプリケーションの一意の名前に置き換えてください。 order-process-service |
kafka.cluster.name | たとえば、コレクター設定で使用されているのと同じクラスタ名に置き換えます。 my-kafka-cluster |
otlp.endpoint | たとえば、OpenTelemetry Collectorを実行しているホストのホスト名またはIPに置き換えます http://collector-host-ip:4317 |
ヒント
上記の設定は、collector-host-ip:4317で実行されているOpenTelemetry Collectorにテレメトリーを送信します。アプリケーションテレメトリー専用の別のコレクターが必要な場合は、以下の設定で作成してください:
Javaエージェントは、コードを変更することなくすぐに使えるKafkaの計装を提供し、リクエストレイテンシ、スループットメトリクス、エラー率、およびディストリビューティッド(分散)トレーシングをキャプチャします。高度な設定については、Kafka計装ドキュメントを参照してください。
(オプション)Kafkaブローカーログを転送する
Kafka ブローカー ログを収集して New Relic に送信するには、OpenTelemetry Collector でファイルログ レシーバーを構成します。
データを検索する
数分後、New Relic に Kafka データが表示されるはずです。New Relic UIのさまざまなビューでKafkaデータを探索するための詳細な手順については、データの検索を参照してください。
以下の表は、各シグナルタイプの保存先をまとめています。以下のすべてのクエリで、my-kafka-clusterをKAFKA_CLUSTER_NAMEの値に置き換えます:
| シグナル | イベントタイプ | 含まれるもの |
|---|---|---|
| 指標 | Metric | ブローカー、トピック、パーティション、消費者グループ、およびJVMメトリクス |
| ログ | Log | プロデューサーおよび消費者アプリケーションからのログ(OTel Javaエージェント経由)と、Javaエージェント経由で収集されたブローカーログ |
| トレース | Span | トピックをまたぐメッセージごとのpublishおよびreceive操作を含む、プロデューサーと消費者のスパン |
指標
ブローカー、トピック、パーティション、消費者グループ、およびJVMメトリクスは、Metricイベントタイプに保存されます。my-kafka-clusterをKAFKA_CLUSTER_NAME値に置き換えます:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoログ
OpenTelemetry Javaエージェントでインストゥルメントされたプロデューサーおよび消費者アプリケーションからのログ、および-Dotel.logs.exporter=otlpが設定されている場合のブローカーログは、Logイベントタイプに保存されます:
FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes agoトレース
トピックをまたぐメッセージごとのpublishおよびreceive操作を含む、プロデューサーと消費者のスパンは、Spanイベントタイプに保存されます:
FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago例
Docker Composeのセットアップ、OTel Collectorの設定、OTel Javaエージェントの設定、およびサンプルのプロデューサー/消費者アプリケーションを含む完全な動作例は、New Relic OpenTelemetry Examplesリポジトリで利用可能です。
トラブルシューティング
次のステップ
- Kafka メトリクスを調べる- 完全なメトリクスリファレンスを見る
- カスタムダッシュボードの作成- Kafka データの視覚化を構築します
- アラートの設定 — 消費者ラグやレプリカ不足のパーティションなどの重要なメトリクスをモニターします