Monitor your self-hosted Apache Kafka cluster by installing the OpenTelemetry Collector directly on Linux hosts. To collect JMX metrics from Kafka brokers, choose either the OpenTelemetry Java agent or the Prometheus JMX Exporter approach.
Architecture
New Relic supports two approaches for monitoring self-hosted Kafka: the OpenTelemetry Java agent or the Prometheus JMX Exporter. The following diagram shows the data flow for each approach.

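Installation steps
Follow these steps to set up comprehensive Kafka monitoring: install the OpenTelemetry Java agent on your brokers, then deploy a collector that gathers metrics and logs and sends them to New Relic.
Create the collector configuration
Create the main OpenTelemetry Collector configuration at ~/opentelemetry/collector-kafka-config.yaml on the monitoring host.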

receivers:
  # OTLP receiver for Kafka and JMX metrics from Java agents and application telemetry
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

  # Kafka metrics receiver for cluster-level metrics
  kafkametrics:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    # Exclude internal Kafka topics (prefixed with __) at the source
    topic_match: "^[^_].*$"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false

processors:
  batch/aggregation:
    send_batch_size: 1024
    timeout: 30s

  resourcedetection:
    detectors: [env, ec2, system]
    system:
      resource_attributes:
        host.name:
          enabled: true
        host.id:
          enabled: true

  resource:
    attributes:
      - action: insert
        key: kafka.cluster.name
        value: ${env:KAFKA_CLUSTER_NAME}

  transform/remove_broker_id:
    metric_statements:
      # Remove broker.id for cluster-level metrics: these represent the whole cluster,
      # not a specific broker. broker.id is retained on broker-level metrics pipelines.
      - context: resource
        statements:
          - delete_key(attributes, "broker.id")

  transform/remove_extra_attributes:
    metric_statements:
      - context: resource
        statements:
          # Delete all attributes starting with "process."
          - delete_matching_keys(attributes, "^process\\..*")
          # Delete all attributes starting with "telemetry."
          - delete_matching_keys(attributes, "^telemetry\\..*")
          - delete_key(attributes, "host.arch")
          - delete_key(attributes, "os.description")
          - delete_key(attributes, "host.image.id")
          - delete_key(attributes, "host.type")
          - delete_matching_keys(attributes, "^cloud\\..*")
          - delete_key(attributes, "service.instance.id") where IsMatch(attributes["service.name"], "^unknown_service:")
          - delete_key(attributes, "service.name") where IsMatch(attributes["service.name"], "^unknown_service:")

  # Filter internal Kafka topics as a safety net (kafkametrics topic_match handles the receiver side)
  filter/internal_topics:
    metrics:
      datapoint:
        - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'

  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"

  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "kafka\\.partition\\.offline"
          - "kafka\\.(leader|unclean)\\.election\\.rate"
          - "kafka\\.partition\\.non_preferred_leader"
          - "kafka\\.broker\\.fenced\\.count"
          - "kafka\\.cluster\\.partition\\.count"
          - "kafka\\.cluster\\.topic\\.count"

  transform/des_units:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""

  cumulativetodelta:

  metricstransform/kafka_topic_sum_aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum

      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum

  filter/remove_partition_level_replicas:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync

  groupbyattrs/cluster:
    keys: [kafka.cluster.name]

  metricstransform/cluster_max:
    transforms:
      - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels
            aggregation_type: max
            label_set: []

exporters:
  otlp/newrelic:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    compression: gzip
    timeout: 30s

service:
  pipelines:
    # Broker metrics pipeline (excludes cluster-level metrics)
    metrics/broker:
      receivers: [otlp, kafkametrics]
      processors:
        - resourcedetection
        - resource
        - filter/exclude_cluster_metrics
        - filter/internal_topics
        - transform/remove_extra_attributes
        - transform/des_units
        - cumulativetodelta
        - metricstransform/kafka_topic_sum_aggregation
        - filter/remove_partition_level_replicas
        - batch/aggregation
      exporters: [otlp/newrelic]

    # Cluster metrics pipeline (controller-emitted metrics like offline partitions
    # and topic/partition counts, with no broker.id)
    metrics/cluster:
      receivers: [otlp]
      processors:
        - resourcedetection
        - resource
        - filter/include_cluster_metrics
        - transform/remove_broker_id
        - transform/remove_extra_attributes
        - transform/des_units
        - cumulativetodelta
        - groupbyattrs/cluster
        - metricstransform/cluster_max
        - batch/aggregation
      exporters: [otlp/newrelic]

    # APM traces pipeline (producer + consumer spans via OTel Java agent)
    traces/apps:
      receivers: [otlp]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]

    # APM logs pipeline (producer + consumer logs via OTel Java agent)
    logs/apps:
      receivers: [otlp]
      processors: [resourcedetection, resource, batch/aggregation]
      exporters: [otlp/newrelic]

Set environment variables
Before installing the collector, set the required environment variables on the monitoring host:

export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
export KAFKA_CLUSTER_NAME="my-kafka-cluster"
export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region

Configuration parameters
The following table describes the key configuration parameters.
| Variable | Description |
|---|---|
| NEW_RELIC_LICENSE_KEY | Your New Relic license key, for example YOUR_LICENSE_KEY |
| KAFKA_CLUSTER_NAME | A unique name for your Kafka cluster, for example my-kafka-cluster |
| KAFKA_BOOTSTRAP_BROKER_ADDRESSES | Your Kafka bootstrap broker addresses, for example broker1-host:9092,broker2-host:9092,broker3-host:9092 |
| NEW_RELIC_OTLP_ENDPOINT | The OTLP ingest endpoint. Use https://otlp.nr-data.net:4317 for the US region and https://otlp.eu01.nr-data.net:4317 for the EU region. For other setups, see the OTLP endpoint configuration documentation. |
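
Because the collector configuration reads all four variables through ${env:...} references, it can help to confirm they are set before starting the collector. The following is a minimal sketch of such a check; check_collector_env is a hypothetical helper name, not part of any collector CLI.

```shell
# Sketch: report any required collector variables that are unset or empty.
# check_collector_env is a hypothetical helper, not a collector command.
check_collector_env() {
    missing=""
    for name in NEW_RELIC_LICENSE_KEY KAFKA_CLUSTER_NAME \
                KAFKA_BOOTSTRAP_BROKER_ADDRESSES NEW_RELIC_OTLP_ENDPOINT; do
        # Indirect lookup of the variable whose name is stored in $name.
        eval "value=\${$name:-}"
        if [ -z "$value" ]; then
            missing="$missing $name"
        fi
    done
    if [ -n "$missing" ]; then
        echo "Missing required variables:$missing" >&2
        return 1
    fi
    echo "All required variables are set"
}
```

One way to use it is to chain it in front of the collector start command, for example check_collector_env && nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml, so the collector only starts when nothing is missing.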
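Install and start the collector
Install and run the collector on the monitoring host. Choose between the NRDOT Collector (New Relic's distribution) or the OpenTelemetry Collector:
Tip
The NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector, and New Relic provides support for it.
Step 1. Download and install the binary (NRDOT Collector)
Download and install the NRDOT Collector binary for your host operating system. The example below is for the linux_amd64 architecture: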

# Set version and architecture
NRDOT_VERSION="1.9.0"
ARCH="amd64" # or arm64

# Download and extract
curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
  --location --output collector.tar.gz
tar -xzf collector.tar.gz

# Move to a location in PATH (optional)
sudo mv nrdot-collector /usr/local/bin/

# Verify installation
nrdot-collector --version

Important
For other operating systems and architectures, visit the NRDOT Collector releases page and download the binary that matches your system.
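
If you are unsure which ARCH value applies to a host, it can be derived from the machine name that uname -m reports. The sketch below assumes only the two architectures named in this guide (amd64 and arm64); release_arch is a hypothetical helper name.

```shell
# Sketch: map a `uname -m` machine name to the release-archive architecture suffix.
# release_arch is a hypothetical helper; anything other than amd64/arm64 is rejected.
release_arch() {
    case "$1" in
        x86_64|amd64)  echo "amd64" ;;
        aarch64|arm64) echo "arm64" ;;
        *)
            echo "unsupported architecture: $1" >&2
            return 1 ;;
    esac
}

# Example use on the current host (left commented so the sketch has no side effects):
# ARCH="$(release_arch "$(uname -m)")"
```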
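Step 2. Start the collector (NRDOT Collector)
Run the collector with your configuration file to start monitoring:

nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml

The collector is now running and ready to receive data. Complete the remaining steps to attach the Java agent to your Kafka brokers before metrics appear in New Relic.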
Step 1. Download and install the binary (OpenTelemetry Collector)
Download and install the OpenTelemetry Collector Contrib binary for your host operating system. The example below is for the linux_amd64 architecture:

# Set version and architecture
# Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
OTEL_VERSION="<collector_version>"
ARCH="amd64"

# Download the collector
curl -L -o otelcol-contrib.tar.gz \
  "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

# Extract the binary
tar -xzf otelcol-contrib.tar.gz

# Move to a location in PATH (optional)
sudo mv otelcol-contrib /usr/local/bin/

# Verify installation
otelcol-contrib --version

For other operating systems, visit the OpenTelemetry Collector releases page.
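
The download command above only works once the <collector_version> placeholder has been replaced with a real release number. As a rough guard, the sketch below builds the same URL from a version and architecture and refuses an empty or placeholder value. otelcol_contrib_url is a hypothetical helper; the URL layout is copied from the command above.

```shell
# Sketch: build the otelcol-contrib download URL and reject an unset placeholder.
# otelcol_contrib_url is a hypothetical helper, not part of any official tooling.
otelcol_contrib_url() {
    version="$1"
    arch="$2"
    case "$version" in
        ""|*"<"*|*">"*)
            echo "set OTEL_VERSION to a real release number first" >&2
            return 1 ;;
    esac
    # The release tag carries a leading "v"; the archive file name does not.
    version="${version#v}"
    echo "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${version}/otelcol-contrib_${version}_linux_${arch}.tar.gz"
}
```

With OTEL_VERSION and ARCH set as in the step above, the result can be passed straight to curl: curl -L -o otelcol-contrib.tar.gz "$(otelcol_contrib_url "$OTEL_VERSION" "$ARCH")".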
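Step 2. Start the collector (OpenTelemetry Collector)
Run the collector with your configuration file to start monitoring:

otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml

The collector is now running and ready to receive data. Complete the remaining steps to attach the Java agent to your Kafka brokers before metrics appear in New Relic.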
Download the OpenTelemetry Java agent
Important
Make sure the OpenTelemetry Collector is running before you start or restart Kafka brokers with the Java agent attached. The agent begins sending metrics as soon as the broker starts, so the collector must be ready to receive them.
The OpenTelemetry Java agent runs as a Java agent attached to the Kafka broker. It collects Kafka and JMX metrics and sends them to the collector over OTLP:

# Create directory for OpenTelemetry components
mkdir -p ~/opentelemetry

# Download OpenTelemetry Java agent
curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
  https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar

Create the JMX custom configuration
To collect Kafka metrics from JMX MBeans, create a JMX configuration file for the OpenTelemetry Java agent.
Create ~/opentelemetry/kafka-jmx-config.yaml on each broker host with the following configuration:

---
rules:
  # Per-topic custom metrics using custom MBean commands
  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
    mapping:
      Count:
        metric: kafka.prod.msg.count
        type: counter
        desc: The number of messages per topic
        unit: "{message}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(in)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=*
    metricAttribute:
      topic: param(topic)
      direction: const(out)
    mapping:
      Count:
        metric: kafka.topic.io
        type: counter
        desc: The bytes received or sent per topic
        unit: By

  # Cluster-level metrics using controller-based MBeans
  - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount
    mapping:
      Value:
        metric: kafka.cluster.topic.count
        type: gauge
        desc: The total number of global topics in the cluster
        unit: "{topic}"

  - bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount
    mapping:
      Value:
        metric: kafka.cluster.partition.count
        type: gauge
        desc: The total number of global partitions in the cluster
        unit: "{partition}"

  - bean: kafka.controller:type=KafkaController,name=FencedBrokerCount
    mapping:
      Value:
        metric: kafka.broker.fenced.count
        type: gauge
        desc: The number of fenced brokers in the cluster
        unit: "{broker}"

  - bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount
    mapping:
      Value:
        metric: kafka.partition.non_preferred_leader
        type: gauge
        desc: The count of topic partitions for which the leader is not the preferred leader
        unit: "{partition}"

  # Broker-level metrics using ReplicaManager MBeans
  - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount
    mapping:
      Value:
        metric: kafka.partition.under_min_isr
        type: gauge
        desc: The number of partitions where the number of in-sync replicas is less than the minimum
        unit: "{partition}"

  # Broker uptime metric using JVM Runtime
  - bean: java.lang:type=Runtime
    mapping:
      Uptime:
        metric: kafka.broker.uptime
        type: gauge
        desc: Broker uptime in milliseconds
        unit: ms

  # Leader count per broker
  - bean: kafka.server:type=ReplicaManager,name=LeaderCount
    mapping:
      Value:
        metric: kafka.broker.leader.count
        type: gauge
        desc: Number of partitions for which this broker is the leader
        unit: "{partition}"

  # JVM metrics
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionCount:
        metric: jvm.gc.collections.count
        type: counter
        unit: "{collection}"
        desc: total number of collections that have occurred
        metricAttribute:
          name: param(name)

  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.max:
        metric: heap.max
        desc: current heap usage
        type: gauge
      HeapMemoryUsage.used:
        metric: heap.used
        desc: current heap usage
        type: gauge

  - bean: java.lang:type=Threading
    mapping:
      ThreadCount:
        metric: jvm.thread.count
        type: gauge
        unit: "{thread}"
        desc: Total thread count (Kafka typical range 100-300 threads)

  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemCpuLoad:
        metric: system.cpu.utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for whole system (0.0 to 1.0)

  - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
    mapping:
      Count:
        metric: kafka.message.count
        type: counter
        desc: The number of messages received by the broker
        unit: "{message}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.count
        type: &type counter
        desc: &desc The number of requests received by the broker
        unit: &unit "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
    metricAttribute:
      type: const(fetch)
    mapping:
      Count:
        metric: &metric kafka.request.failed
        type: &type counter
        desc: &desc The number of requests to the broker resulting in a failure
        unit: &unit "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
    metricAttribute:
      type: const(produce)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      99thPercentile:
        metric: kafka.request.time.99p
        type: gauge
        desc: The 99th percentile time the broker has taken to service requests

  - bean: kafka.network:type=RequestChannel,name=RequestQueueSize
    mapping:
      Value:
        metric: kafka.request.queue
        type: gauge
        desc: Size of the request queue
        unit: "{request}"

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
    metricAttribute:
      direction: const(in)
    mapping:
      Count:
        metric: &metric kafka.network.io
        type: &type counter
        desc: &desc The bytes received or sent by the broker
        unit: &unit By

  - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
    metricAttribute:
      direction: const(out)
    mapping:
      Count:
        metric: *metric
        type: *type
        desc: *desc
        unit: *unit

  - beans:
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
      - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
    metricAttribute:
      type: param(delayedOperation)
    mapping:
      Value:
        metric: kafka.purgatory.size
        type: gauge
        desc: The number of requests waiting in purgatory
        unit: "{request}"

  - bean: kafka.server:type=ReplicaManager,name=PartitionCount
    mapping:
      Value:
        metric: kafka.partition.count
        type: gauge
        desc: The number of partitions on the broker
        unit: "{partition}"

  - bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
    mapping:
      Value:
        metric: kafka.partition.offline
        type: gauge
        desc: The number of partitions offline
        unit: "{partition}"

  - bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
    mapping:
      Value:
        metric: kafka.partition.under_replicated
        type: gauge
        desc: The number of under replicated partitions
        unit: "{partition}"

  - bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
    metricAttribute:
      operation: const(shrink)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"

  - bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
    metricAttribute:
      operation: const(expand)
    mapping:
      Count:
        metric: kafka.isr.operation.count
        type: counter
        desc: The number of in-sync replica shrink and expand operations
        unit: "{operation}"

  - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
    mapping:
      Value:
        metric: kafka.max.lag
        type: gauge
        desc: The max lag in messages between follower and leader replicas
        unit: "{message}"

  - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
    mapping:
      Value:
        metric: kafka.controller.active.count
        type: gauge
        desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
        unit: "{controller}"

  - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
    mapping:
      Count:
        metric: kafka.leader.election.rate
        type: counter
        desc: The leader election count
        unit: "{election}"

  - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
    mapping:
      Count:
        metric: kafka.unclean.election.rate
        type: counter
        desc: Unclean leader election count - increasing indicates broker failures
        unit: "{election}"

  # Additional metrics: remove this section to reduce data ingest

  # Request latency: total count, 50th percentile, and average (99p kept above)
  - beans:
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
      - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
    metricAttribute:
      type: param(request)
    unit: ms
    mapping:
      Count:
        metric: kafka.request.time.total
        type: counter
        desc: The total time the broker has taken to service requests
      50thPercentile:
        metric: kafka.request.time.50p
        type: gauge
        desc: The 50th percentile time the broker has taken to service requests
      Mean:
        metric: kafka.request.time.avg
        type: gauge
        desc: The average time the broker has taken to service requests

  - bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
    unit: ms
    type: gauge
    prefix: kafka.logs.flush.
    mapping:
      Count:
        metric: count
        unit: '{flush}'
        type: counter
        desc: Log flush count
      50thPercentile:
        metric: time.50p
        desc: Log flush time - 50th percentile
      99thPercentile:
        metric: time.99p
        desc: Log flush time - 99th percentile

  # GC elapsed time (cumulative collection time in ms)
  - bean: java.lang:type=GarbageCollector,name=*
    mapping:
      CollectionTime:
        metric: jvm.gc.collections.elapsed
        type: counter
        unit: ms
        desc: the approximate accumulated collection elapsed time in milliseconds
        metricAttribute:
          name: param(name)

  # JVM class loading
  - bean: java.lang:type=ClassLoading
    mapping:
      LoadedClassCount:
        metric: jvm.class.count
        type: gauge
        unit: "{class}"
        desc: Currently loaded class count

  # JVM heap committed (in addition to heap.used and heap.max)
  - bean: java.lang:type=Memory
    unit: By
    prefix: jvm.memory.
    dropNegativeValues: true
    mapping:
      HeapMemoryUsage.committed:
        metric: heap.committed
        desc: Committed heap memory
        type: gauge

  # Additional JVM CPU and system metrics
  - bean: java.lang:type=OperatingSystem
    prefix: jvm.
    dropNegativeValues: true
    mapping:
      SystemLoadAverage:
        metric: system.cpu.load_1m
        type: gauge
        unit: "{run_queue_item}"
        desc: System load average (1 minute) - alert if > CPU count
      AvailableProcessors:
        metric: cpu.count
        type: gauge
        unit: "{cpu}"
        desc: Number of processors available
      ProcessCpuLoad:
        metric: cpu.recent_utilization
        type: gauge
        unit: '1'
        desc: Recent CPU utilization for JVM process (0.0 to 1.0)
      OpenFileDescriptorCount:
        metric: file_descriptor.count
        type: gauge
        unit: "{file_descriptor}"
        desc: Number of open file descriptors - alert if > 80% of ulimit

  # JVM memory pool breakdown (by generation: G1 Old Gen, Eden, Survivor, etc.)
  - bean: java.lang:type=MemoryPool,name=*
    type: gauge
    unit: By
    metricAttribute:
      name: param(name)
    mapping:
      Usage.used:
        metric: jvm.memory.pool.used
        desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor)
      Usage.max:
        metric: jvm.memory.pool.max
        desc: Maximum memory pool size
      CollectionUsage.used:
        metric: jvm.memory.pool.used_after_last_gc
        desc: Memory used after last GC (shows retained memory baseline)

Configure the Kafka brokers
Before starting Kafka, set the KAFKA_OPTS environment variable to attach the OpenTelemetry Java agent to the Kafka broker.
Single-broker example:

OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"

nohup env KAFKA_OPTS="-javaagent:$OTEL_AGENT \
  -Dotel.jmx.enabled=true \
  -Dotel.jmx.config=$JMX_CONFIG \
  -Dotel.resource.attributes=broker.id=1,kafka.cluster.name=my-kafka-cluster \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol=grpc \
  -Dotel.metrics.exporter=otlp \
  -Dotel.logs.exporter=otlp \
  -Dotel.instrumentation.runtime-telemetry.enabled=false \
  -Dotel.metric.export.interval=30000" \
  bin/kafka-server-start.sh config/server.properties &

Important
Multi-broker clusters: Use the same configuration on every broker, but give each broker a broker.id value in -Dotel.resource.attributes that is unique within the cluster (for example, broker.id=1, broker.id=2, broker.id=3).
Tip
Broker log collection is enabled automatically by the -Dotel.logs.exporter=otlp flag above. To disable broker log collection, set -Dotel.logs.exporter=none instead.
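Configuration parameters
The following table describes the key configuration parameters.
| Parameter | Description |
|---|---|
| otlp.endpoint | Replace with the IP address or hostname of the host running the OpenTelemetry Collector, for example http://collector-host-ip:4317 |
| broker.id | Replace 1 with each broker's unique broker ID (for example, broker.id=1, broker.id=2, broker.id=3) |
| kafka.cluster.name | Replace my-kafka-cluster with your Kafka cluster name. It must match the value specified in the collector configuration. |
| logs.exporter | Set to otlp to enable broker log collection. Set to none to disable broker log forwarding. |
For the full set of configuration options, see the Java agent configuration guide.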
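
For a multi-broker cluster, the only parts of KAFKA_OPTS that change from broker to broker are the values in the table above. The sketch below assembles the option string from those values so that the same script can run on every broker. broker_kafka_opts is a hypothetical helper name; the flags and file paths are the ones used in the single-broker example.

```shell
# Sketch: assemble the KAFKA_OPTS value for one broker.
# broker_kafka_opts is a hypothetical helper; the flags mirror the single-broker example.
broker_kafka_opts() {
    broker_id="$1"
    cluster_name="$2"
    collector_host="$3"
    logs_exporter="${4:-otlp}"   # pass "none" to disable broker log forwarding
    agent="$HOME/opentelemetry/opentelemetry-javaagent.jar"
    jmx_config="$HOME/opentelemetry/kafka-jmx-config.yaml"
    # printf repeats the '%s ' format for every argument, giving one space-separated line.
    printf '%s ' \
        "-javaagent:$agent" \
        "-Dotel.jmx.enabled=true" \
        "-Dotel.jmx.config=$jmx_config" \
        "-Dotel.resource.attributes=broker.id=$broker_id,kafka.cluster.name=$cluster_name" \
        "-Dotel.exporter.otlp.endpoint=http://$collector_host:4317" \
        "-Dotel.exporter.otlp.protocol=grpc" \
        "-Dotel.metrics.exporter=otlp" \
        "-Dotel.logs.exporter=$logs_exporter" \
        "-Dotel.instrumentation.runtime-telemetry.enabled=false" \
        "-Dotel.metric.export.interval=30000"
    echo
}
```

On the second broker, for instance, the broker could then be started with KAFKA_OPTS="$(broker_kafka_opts 2 my-kafka-cluster collector-host-ip)" bin/kafka-server-start.sh config/server.properties.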
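(Optional) Instrument producers or consumers
Important
Language support: Kafka client instrumentation with the OpenTelemetry Java agent currently supports Java applications only.
To collect application-level telemetry from Kafka producer and consumer applications, download the OpenTelemetry Java agent as described in the Download the OpenTelemetry Java agent step above.
Start your application with the agent: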

OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"

java \
  -javaagent:$OTEL_AGENT \
  -Dotel.service.name="order-process-service" \
  -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
  -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
  -Dotel.exporter.otlp.protocol="grpc" \
  -Dotel.metrics.exporter="otlp" \
  -Dotel.traces.exporter="otlp" \
  -Dotel.logs.exporter="otlp" \
  -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
  -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
  -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
  -Dotel.instrumentation.kafka.enabled="true" \
  -Dotel.instrumentation.runtime-telemetry.enabled="false" \
  -jar your-kafka-application.jar

Configuration parameters
The following table describes the key configuration parameters.
| Parameter | Description |
|---|---|
| service.name | Replace with a unique name for your producer or consumer application, for example order-process-service |
| kafka.cluster.name | Replace with the same cluster name used in the collector configuration, for example my-kafka-cluster |
| otlp.endpoint | Replace with the hostname or IP address of the host running the OpenTelemetry Collector, for example http://collector-host-ip:4317 |
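Tip
The configuration above sends telemetry to the OpenTelemetry Collector running at collector-host-ip:4317. If you prefer a separate collector dedicated to application telemetry, create one with the following configuration:
The Java agent provides out-of-the-box Kafka instrumentation with no code changes, capturing request latency, throughput metrics, error rates, and distributed traces.
For advanced configuration, see the Kafka instrumentation documentation.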
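Follow these steps to set up comprehensive Kafka monitoring: install the Prometheus JMX Exporter on your brokers, then deploy a collector that gathers metrics and sends them to New Relic.
Before you begin
Make sure you have the following:
- A New Relic account
- Network access from the collector host to each broker on port 9404
- Network access from the collector to the Kafka bootstrap port (typically 9092)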
Download the Prometheus JMX Exporter
On each Kafka broker host, download the Prometheus JMX Exporter JAR:

# Create directory for Prometheus components
mkdir -p ~/opentelemetry

# Download the Prometheus JMX Exporter agent JAR
# Version 1.5.0 is the minimum required version. Check https://github.com/prometheus/jmx_exporter/releases/latest for newer releases.
JMX_EXPORTER_VERSION="1.5.0"
curl -L -o ~/opentelemetry/jmx_prometheus_javaagent.jar \
  "https://github.com/prometheus/jmx_exporter/releases/download/${JMX_EXPORTER_VERSION}/jmx_prometheus_javaagent-${JMX_EXPORTER_VERSION}.jar"

Create the JMX metrics configuration
Create a JMX Exporter configuration file that defines which Kafka metrics to collect. Save it as ~/opentelemetry/kafka-jmx-config.yaml on each broker host:

startDelaySeconds: 0
lowercaseOutputName: true
lowercaseOutputLabelNames: true

rules:
  # Cluster-level controller metrics
  - pattern: 'kafka.controller<type=KafkaController, name=GlobalTopicCount><>Value'
    name: kafka_cluster_topic_count
    type: GAUGE

  - pattern: 'kafka.controller<type=KafkaController, name=GlobalPartitionCount><>Value'
    name: kafka_cluster_partition_count
    type: GAUGE

  - pattern: 'kafka.controller<type=KafkaController, name=FencedBrokerCount><>Value'
    name: kafka_broker_fenced_count
    type: GAUGE

  - pattern: 'kafka.controller<type=KafkaController, name=PreferredReplicaImbalanceCount><>Value'
    name: kafka_partition_non_preferred_leader
    type: GAUGE

  - pattern: 'kafka.controller<type=KafkaController, name=OfflinePartitionsCount><>Value'
    name: kafka_partition_offline
    type: GAUGE

  - pattern: 'kafka.controller<type=KafkaController, name=ActiveControllerCount><>Value'
    name: kafka_controller_active_count
    type: GAUGE

  # Broker-level replica metrics
  - pattern: 'kafka.server<type=ReplicaManager, name=UnderMinIsrPartitionCount><>Value'
    name: kafka_partition_under_min_isr
    type: GAUGE

  - pattern: 'kafka.server<type=ReplicaManager, name=LeaderCount><>Value'
    name: kafka_broker_leader_count
    type: GAUGE

  - pattern: 'kafka.server<type=ReplicaManager, name=PartitionCount><>Value'
    name: kafka_partition_count
    type: GAUGE

  - pattern: 'kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value'
    name: kafka_partition_under_replicated
    type: GAUGE

  - pattern: 'kafka.server<type=ReplicaManager, name=IsrShrinksPerSec><>Count'
    name: kafka_isr_operation_count
    type: COUNTER
    labels:
      operation: "shrink"

  - pattern: 'kafka.server<type=ReplicaManager, name=IsrExpandsPerSec><>Count'
    name: kafka_isr_operation_count
    type: COUNTER
    labels:
      operation: "expand"

  - pattern: 'kafka.server<type=ReplicaFetcherManager, name=MaxLag, clientId=Replica><>Value'
    name: kafka_max_lag
    type: GAUGE

  # Broker topic metrics (totals)
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
    name: kafka_message_count
    type: COUNTER

  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalFetchRequestsPerSec><>Count'
    name: kafka_request_count
    type: COUNTER
    labels:
      type: "fetch"

  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=TotalProduceRequestsPerSec><>Count'
    name: kafka_request_count
    type: COUNTER
    labels:
      type: "produce"

  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedFetchRequestsPerSec><>Count'
    name: kafka_request_failed
    type: COUNTER
    labels:
      type: "fetch"

  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=FailedProduceRequestsPerSec><>Count'
    name: kafka_request_failed
    type: COUNTER
    labels:
      type: "produce"

  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>Count'
    name: kafka_network_io
    type: COUNTER
    labels:
      direction: "in"

  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec><>Count'
    name: kafka_network_io
    type: COUNTER
    labels:
      direction: "out"

  # Per-topic metrics (only appear after traffic flows)
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=(.+)><>Count'
    name: kafka_prod_msg_count
    type: COUNTER
    labels:
      topic: "$1"

  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec, topic=(.+)><>Count'
    name: kafka_topic_io
    type: COUNTER
    labels:
      topic: "$1"
      direction: "in"

  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=BytesOutPerSec, topic=(.+)><>Count'
    name: kafka_topic_io
    type: COUNTER
    labels:
      topic: "$1"
      direction: "out"

  # Request metrics
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>99thPercentile'
    name: kafka_request_time_99p
    type: GAUGE
    labels:
      type: "$1"

  - pattern: 'kafka.network<type=RequestChannel, name=RequestQueueSize><>Value'
    name: kafka_request_queue
    type: GAUGE

  - pattern: 'kafka.server<type=DelayedOperationPurgatory, name=PurgatorySize, delayedOperation=(.+)><>Value'
    name: kafka_purgatory_size
    type: GAUGE
    labels:
      type: "$1"

  # Controller stats
  - pattern: 'kafka.controller<type=ControllerStats, name=LeaderElectionRateAndTimeMs><>Count'
    name: kafka_leader_election_rate
    type: COUNTER

  - pattern: 'kafka.controller<type=ControllerStats, name=UncleanLeaderElectionsPerSec><>Count'
    name: kafka_unclean_election_rate
    type: COUNTER

  # JVM Garbage Collection
  - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionCount'
    name: jvm_gc_collections_count
    type: COUNTER
    labels:
      name: "$1"

  # JVM Memory
  - pattern: 'java.lang<type=Memory><HeapMemoryUsage>max'
    name: jvm_memory_heap_max
    type: GAUGE

  - pattern: 'java.lang<type=Memory><HeapMemoryUsage>used'
    name: jvm_memory_heap_used
    type: GAUGE

  # JVM Threading and System
  - pattern: 'java.lang<type=Threading><>ThreadCount'
    name: jvm_thread_count
    type: GAUGE

  - pattern: 'java.lang<type=OperatingSystem><>SystemCpuLoad'
    name: jvm_system_cpu_utilization
    type: GAUGE

  # Broker uptime
  - pattern: 'java.lang<type=Runtime><>Uptime'
    name: kafka_broker_uptime
    type: GAUGE

  # Additional metrics: remove this section to reduce data ingest

  # Request latency: total count, 50th percentile, and average (99p kept above)
  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Count'
    name: kafka_request_time_total
    type: COUNTER
    labels:
      type: "$1"

  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>50thPercentile'
    name: kafka_request_time_50p
    type: GAUGE
    labels:
      type: "$1"

  - pattern: 'kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer|FetchFollower)><>Mean'
    name: kafka_request_time_avg
    type: GAUGE
    labels:
      type: "$1"

  # Log flush metrics
  - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>Count'
    name: kafka_logs_flush_count
    type: COUNTER

  - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>50thPercentile'
    name: kafka_logs_flush_time_50p
    type: GAUGE

  - pattern: 'kafka.log<type=LogFlushStats, name=LogFlushRateAndTimeMs><>99thPercentile'
    name: kafka_logs_flush_time_99p
    type: GAUGE

  # JVM GC elapsed time
  - pattern: 'java.lang<name=(.+), type=GarbageCollector><>CollectionTime'
    name: jvm_gc_collections_elapsed
    type: COUNTER
    labels:
      name: "$1"

  # JVM Memory heap committed
  - pattern: 'java.lang<type=Memory><HeapMemoryUsage>committed'
    name: jvm_memory_heap_committed
    type: GAUGE

  # JVM class loading
  - pattern: 'java.lang<type=ClassLoading><>LoadedClassCount'
    name: jvm_class_count
    type: GAUGE

  # Additional JVM OS metrics
  - pattern: 'java.lang<type=OperatingSystem><>SystemLoadAverage'
    name: jvm_system_cpu_load_1m
    type: GAUGE

  - pattern: 'java.lang<type=OperatingSystem><>AvailableProcessors'
    name: jvm_cpu_count
    type: GAUGE

  - pattern: 'java.lang<type=OperatingSystem><>ProcessCpuLoad'
    name: jvm_cpu_recent_utilization
    type: GAUGE

  - pattern: 'java.lang<type=OperatingSystem><>OpenFileDescriptorCount'
    name: jvm_file_descriptor_count
    type: GAUGE

  # JVM Memory Pool
  - pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>used'
    name: jvm_memory_pool_used
    type: GAUGE
    labels:
      name: "$1"

  - pattern: 'java.lang<type=MemoryPool, name=(.+)><Usage>max'
    name: jvm_memory_pool_max
    type: GAUGE
    labels:
      name: "$1"

  - pattern: 'java.lang<type=MemoryPool, name=(.+)><CollectionUsage>used'
    name: jvm_memory_pool_used_after_last_gc
    type: GAUGE
    labels:
      name: "$1"

Tip
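Customizing metrics: You can add or modify patterns by referring to the Prometheus JMX Exporter examples and the Kafka MBean documentation. For additional configuration options, see the JMX Exporter rules documentation.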
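
When adding a rule, the pattern string follows the same layout as the rules above: the MBean domain, the key properties in angle brackets separated by a comma and a space, an empty key list, and the attribute name. The sketch below rewrites a JMX ObjectName into that layout. mbean_to_pattern is a hypothetical helper, and it assumes the exporter presents key properties in the order you typed them, which is not always the case (the garbage collector rules above list name before type), so check the result against the exporter's actual output.

```shell
# Sketch: rewrite a JMX ObjectName into the pattern layout used by the rules above.
# mbean_to_pattern is a hypothetical helper. Key-property order is kept as typed,
# which is an assumption; the exporter may present the properties in another order.
mbean_to_pattern() {
    object_name="$1"   # for example kafka.server:type=ReplicaManager,name=LeaderCount
    attribute="$2"     # for example Value or Count
    domain="${object_name%%:*}"       # text before the first colon
    properties="${object_name#*:}"    # text after the first colon
    # Separate key properties with ", " as the rules in this guide do.
    properties="$(printf '%s' "$properties" | sed 's/,/, /g')"
    echo "${domain}<${properties}><>${attribute}"
}
```

The output is a literal pattern; replace a property value with a capture group such as (.+) by hand when you want to turn it into a label.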
Configure Kafka brokers to use the JMX Exporter
Attach the Prometheus JMX Exporter to each Kafka broker as a Java agent by adding it to the Kafka startup options.
Single-broker example:

JMX_JAR="$HOME/opentelemetry/jmx_prometheus_javaagent.jar"
JMX_CONFIG="$HOME/opentelemetry/kafka-jmx-config.yaml"

nohup env KAFKA_OPTS="-javaagent:${JMX_JAR}=9404:${JMX_CONFIG}" \
  bin/kafka-server-start.sh config/server.properties &

Each broker now exposes Prometheus metrics on port 9404. To verify:

curl http://localhost:9404/metrics | grep kafka_

Important
Multi-broker clusters: Apply the same KAFKA_OPTS setting to every broker. Each broker exposes metrics on port 9404 of its own host IP.
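
Because every broker exposes metrics on port 9404, a list of scrape targets can be derived from the broker addresses you already use for Kafka. The sketch below converts a comma-separated host:port list into host:9404 entries, one per line. jmx_scrape_targets is a hypothetical helper; it assumes the list names every broker (a bootstrap list may name only some of them) and does not handle IPv6 literals.

```shell
# Sketch: turn a comma-separated broker list into JMX Exporter scrape targets.
# jmx_scrape_targets is a hypothetical helper; it assumes each listed broker
# runs the exporter on port 9404.
jmx_scrape_targets() {
    echo "$1" | tr ',' '\n' | while IFS= read -r address; do
        [ -n "$address" ] || continue
        host="${address%:*}"      # drop the Kafka port, keep the hostname
        echo "${host}:9404"
    done
}
```

Each printed entry is a host and port that the collector host must be able to reach, which makes the output handy for a quick connectivity check with curl before configuring the collector.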
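Create the collector configuration
Create the OpenTelemetry Collector configuration at ~/opentelemetry/collector-kafka-config.yaml on the monitoring host.
The Prometheus receiver scrapes every broker endpoint. In addition to scraping the Prometheus endpoints, the collector listens on 0.0.0.0:4317 for any OTLP data (application traces and logs).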
receivers:
  # OTLP receiver for application traces, metrics, and logs (listens on port 4317)
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

  # Prometheus receiver scrapes JMX metrics from Kafka brokers
  prometheus/kafka-jmx:
    config:
      scrape_configs:
        - job_name: 'kafka-jmx-metrics'
          metrics_path: /metrics
          scrape_interval: 30s
          static_configs:
            # TODO: Replace each target with your broker hostname or IP, and set a unique broker.id per broker
            - targets: ['broker1-host:9404']
              labels:
                broker.id: '0'
            - targets: ['broker2-host:9404']
              labels:
                broker.id: '1'
            - targets: ['broker3-host:9404']
              labels:
                broker.id: '2'

  # Kafka metrics receiver for cluster-level consumer lag, topic, and partition metrics
  kafkametrics/cluster:
    brokers: ${env:KAFKA_BOOTSTRAP_BROKER_ADDRESSES}
    protocol_version: 2.0.0
    scrapers:
      - brokers
      - topics
      - consumers
    collection_interval: 30s
    # Exclude internal Kafka topics (prefixed with __) at the source
    topic_match: "^[^_].*$"
    metrics:
      kafka.topic.min_insync_replicas:
        enabled: true
      kafka.topic.replication_factor:
        enabled: true
      kafka.partition.replicas:
        enabled: false
      kafka.partition.oldest_offset:
        enabled: false
      kafka.partition.current_offset:
        enabled: false
exporters:
  otlp/backend:
    endpoint: ${env:NEW_RELIC_OTLP_ENDPOINT}
    headers:
      api-key: ${env:NEW_RELIC_LICENSE_KEY}
    tls:
      insecure: false
    sending_queue:
      num_consumers: 12
      queue_size: 5000
    retry_on_failure:
      enabled: true
processors:
  # Batch processor for efficient export
  batch/export:
    send_batch_size: 1024
    timeout: 30s

  # Memory limiter to prevent OOM
  memory_limiter:
    limit_percentage: 80
    spike_limit_percentage: 30
    check_interval: 1s

  # Transform metric naming conventions (underscore to dot, normalize special names)
  transform/metric-naming:
    metric_statements:
      - context: metric
        statements:
          - replace_pattern(name, "_", ".")
          - replace_pattern(name, "\\.load\\.1", ".load_1")
          - replace_pattern(name, "\\.recent\\.util", ".recent_util")
          - replace_pattern(name, "file\\.descriptor\\.count", "file_descriptor.count")
          - replace_pattern(name, "\\.memory\\.pool\\.used\\.bytes$", ".memory.pool.used")
          - replace_pattern(name, "\\.memory\\.pool\\.max\\.bytes$", ".memory.pool.max")
          - replace_pattern(name, "\\.memory\\.pool\\.collection\\.used\\.bytes$", ".memory.pool.used_after_last_gc")
          - replace_pattern(name, "\\.non\\.preferred\\.leader", ".non_preferred_leader")
          - replace_pattern(name, "\\.under\\.min\\.isr", ".under_min_isr")
          - replace_pattern(name, "\\.under\\.replicated", ".under_replicated")
          - replace_pattern(name, "\\.total$", "") where name != "kafka.request.time.total"
      - context: datapoint
        statements:
          - set(attributes["name"], attributes["gc"]) where attributes["gc"] != nil
          - delete_key(attributes, "gc") where attributes["gc"] != nil
          - set(attributes["name"], attributes["pool"]) where attributes["pool"] != nil
          - delete_key(attributes, "pool") where attributes["pool"] != nil

  # Add cluster name to all metrics
  resource/cluster-name:
    attributes:
      - key: kafka.cluster.name
        # TODO: Replace with your Kafka cluster name
        value: ${env:KAFKA_CLUSTER_NAME}
        action: upsert

  # Remove broker.id for cluster-level metrics
  transform/remove_broker_id:
    metric_statements:
      - context: datapoint
        statements:
          - delete_key(attributes, "broker.id")

  # Filter out scrape overhead metrics
  filter/scrape-overhead:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "^jmx_.*"
          - "^process_.*"
          - "^jvm_buffer_pool_.*"
          - "^jvm_threads_.*"
          - "^jvm_classes_.*"
          - "^jvm_memory_(heap|non_heap)_(committed|init|max|used)_bytes$"
          - "^jvm_compilation_.*"
          - "^jvm_(runtime|info).*"
          - "^jvm_memory_pool_(allocated_bytes_total|committed_bytes|init_bytes|collection_(committed|init|max)_bytes)$"

  # Include only cluster-level metrics for the cluster pipeline
  filter/include_cluster_metrics:
    metrics:
      include:
        match_type: regexp
        metric_names:
          - "^kafka\\.partition\\.offline$"
          - "^kafka\\.(leader|unclean)\\.election\\.rate$"
          - "^kafka\\.partition\\.non_preferred_leader$"
          - "^kafka\\.broker\\.fenced\\.count$"
          - "^kafka\\.cluster\\.partition\\.count$"
          - "^kafka\\.cluster\\.topic\\.count$"

  # Exclude cluster-level metrics from the broker pipeline
  filter/exclude_cluster_metrics:
    metrics:
      exclude:
        match_type: regexp
        metric_names:
          - "^kafka\\.partition\\.offline$"
          - "^kafka\\.(leader|unclean)\\.election\\.rate$"
          - "^kafka\\.partition\\.non_preferred_leader$"
          - "^kafka\\.broker\\.fenced\\.count$"
          - "^kafka\\.cluster\\.partition\\.count$"
          - "^kafka\\.cluster\\.topic\\.count$"

  # Remove unnecessary attributes
  transform/remove_attributes:
    metric_statements:
      - context: metric
        statements:
          - set(description, "") where description != ""
          - set(unit, "") where unit != ""
      - context: resource
        statements:
          - delete_key(attributes, "server.address")
          - delete_key(attributes, "server.port")
          - delete_key(attributes, "service.instance.id")
          - delete_key(attributes, "host.name")
          - delete_key(attributes, "url.scheme")

  # Aggregate partition metrics to topic level
  metricstransform/topic-aggregation:
    transforms:
      - include: kafka.partition.replicas_in_sync
        action: insert
        new_name: kafka.partition.replicas_in_sync.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum
      - include: kafka.partition.replicas
        action: insert
        new_name: kafka.partition.replicas.total
        operations:
          - action: aggregate_labels
            label_set: [topic]
            aggregation_type: sum

  # Filter out original partition replicas metric
  filter/exclude_partition_replicas_metric:
    metrics:
      exclude:
        match_type: strict
        metric_names:
          - kafka.partition.replicas_in_sync

  # Filter internal Kafka topics as a safety net
  filter/internal_topics:
    metrics:
      datapoint:
        - 'attributes["topic"] != nil and IsMatch(attributes["topic"], "^__.*")'

  # Convert cumulative to delta metrics
  cumulativetodelta:

  groupbyattrs/cluster:
    keys: [kafka.cluster.name]

  metricstransform/cluster_max:
    transforms:
      - include: "kafka\\.partition\\.offline|kafka\\.leader\\.election\\.rate|kafka\\.unclean\\.election\\.rate|kafka\\.partition\\.non_preferred_leader|kafka\\.broker\\.fenced\\.count|kafka\\.cluster\\.partition\\.count|kafka\\.cluster\\.topic\\.count"
        match_type: regexp
        action: update
        operations:
          - action: aggregate_labels
            aggregation_type: max
            label_set: []
service:
  pipelines:
    # Application traces from instrumented Kafka clients and apps
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]

    # Application metrics from instrumented Kafka clients and apps
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]

    # Application logs from instrumented Kafka clients and apps
    logs:
      receivers: [otlp]
      processors: [memory_limiter, batch/export]
      exporters: [otlp/backend]

    # Broker-level metrics from Prometheus JMX scraping
    metrics/broker:
      receivers:
        - prometheus/kafka-jmx
      processors:
        - resource/cluster-name
        - filter/scrape-overhead
        - transform/metric-naming
        - transform/remove_attributes
        - filter/exclude_cluster_metrics
        - memory_limiter
        - cumulativetodelta
        - batch/export
      exporters:
        - otlp/backend

    # Cluster-level metrics from Prometheus JMX scraping
    metrics/cluster/prometheus:
      receivers:
        - prometheus/kafka-jmx
      processors:
        - resource/cluster-name
        - filter/scrape-overhead
        - transform/metric-naming
        - transform/remove_attributes
        - filter/include_cluster_metrics
        - transform/remove_broker_id
        - memory_limiter
        - cumulativetodelta
        - groupbyattrs/cluster
        - metricstransform/cluster_max
        - batch/export
      exporters:
        - otlp/backend

    # Cluster-level metrics from Kafka metrics receiver (consumer lag, topics, partitions)
    metrics/cluster/kafkametrics:
      receivers:
        - kafkametrics/cluster
      processors:
        - resource/cluster-name
        - filter/internal_topics
        - transform/remove_attributes
        - metricstransform/topic-aggregation
        - filter/exclude_partition_replicas_metric
        - memory_limiter
        - cumulativetodelta
        - batch/export
      exporters:
        - otlp/backend
Set environment variables
Before starting the collector, set the required environment variables on the monitoring host:
$ export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"
$ export KAFKA_CLUSTER_NAME="my-kafka-cluster"
$ export KAFKA_BOOTSTRAP_BROKER_ADDRESSES="broker1-host:9092,broker2-host:9092,broker3-host:9092"
$ export NEW_RELIC_OTLP_ENDPOINT="https://otlp.nr-data.net:4317" # US region
$ # EU region: https://otlp.eu01.nr-data.net:4317
Configuration parameters
The following table describes the key configuration settings.
| Variable | Description |
|---|---|
| NEW_RELIC_LICENSE_KEY | Your New Relic license key, for example YOUR_LICENSE_KEY |
| KAFKA_CLUSTER_NAME | A unique name for your Kafka cluster, for example my-kafka-cluster |
| KAFKA_BOOTSTRAP_BROKER_ADDRESSES | Your Kafka bootstrap broker addresses, for example broker1-host:9092,broker2-host:9092,broker3-host:9092 |
| NEW_RELIC_OTLP_ENDPOINT | The OTLP ingest endpoint. Use https://otlp.nr-data.net:4317 for the US region and https://otlp.eu01.nr-data.net:4317 for the EU region. For other setups, see the OTLP endpoint configuration documentation. |
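Because the collector configuration reads all four variables through ${env:...} references, a missing value is easier to catch before the collector starts than after. The following is a minimal sketch of such a preflight check. The function name check_collector_env is an assumption made for illustration and is not part of any collector distribution.

```shell
# Hypothetical preflight helper: verify that every environment variable
# referenced by the collector configuration is set and non-empty.
check_collector_env() {
  missing=0
  for name in NEW_RELIC_LICENSE_KEY KAFKA_CLUSTER_NAME \
              KAFKA_BOOTSTRAP_BROKER_ADDRESSES NEW_RELIC_OTLP_ENDPOINT; do
    # Read the value of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Calling check_collector_env in the same shell session that will launch the collector returns a non-zero status and lists each unset variable on standard error.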
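Install and start the collector
Install and run the collector on the monitoring host. Choose either the NRDOT Collector (the New Relic distribution) or the OpenTelemetry Collector:
Tip
The NRDOT Collector is New Relic's distribution of the OpenTelemetry Collector and comes with New Relic support. For details, see the NRDOT Collector GitHub repository.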
Step 1. Download and install the binary (NRDOT Collector)
$ # Set version and architecture
$ NRDOT_VERSION="1.9.0"
$ ARCH="amd64" # or arm64

$ # Download and extract
$ curl "https://github.com/newrelic/nrdot-collector-releases/releases/download/${NRDOT_VERSION}/nrdot-collector_${NRDOT_VERSION}_linux_${ARCH}.tar.gz" \
> --location --output collector.tar.gz
$ tar -xzf collector.tar.gz

$ # Move to a location in PATH (optional)
$ sudo mv nrdot-collector /usr/local/bin/

$ # Verify installation
$ nrdot-collector --version
Important
For other operating systems and architectures, visit the NRDOT Collector releases page and download the binary appropriate for your system.
Step 2. Start the collector (NRDOT Collector)
$ nrdot-collector --config ~/opentelemetry/collector-kafka-config.yaml
The collector starts scraping Kafka metrics and sending them to New Relic within a few minutes.
Step 1. Download and install the binary (OpenTelemetry Collector)
$ # Check https://github.com/open-telemetry/opentelemetry-collector-releases/releases/latest for the latest version
$ OTEL_VERSION="<collector_version>"
$ ARCH="amd64"

$ curl -L -o otelcol-contrib.tar.gz \
> "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v${OTEL_VERSION}/otelcol-contrib_${OTEL_VERSION}_linux_${ARCH}.tar.gz"

$ tar -xzf otelcol-contrib.tar.gz
$ sudo mv otelcol-contrib /usr/local/bin/
$ otelcol-contrib --version
For other operating systems, visit the OpenTelemetry Collector releases page.
Step 2. Start the collector (OpenTelemetry Collector)
$ otelcol-contrib --config ~/opentelemetry/collector-kafka-config.yaml
The collector starts scraping Kafka metrics and sending them to New Relic within a few minutes.
(Optional) Instrument producers or consumers
Important
Language support: Kafka client instrumentation with the OpenTelemetry Java agent currently supports Java applications only.
To collect application-level telemetry from Kafka producer and consumer applications, download the OpenTelemetry Java agent if you have not already done so:
$ mkdir -p ~/opentelemetry
$ curl -L -o ~/opentelemetry/opentelemetry-javaagent.jar \
> https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar
Start your application with the agent:
$ OTEL_AGENT="$HOME/opentelemetry/opentelemetry-javaagent.jar"

$ java \
> -javaagent:$OTEL_AGENT \
> -Dotel.service.name="order-process-service" \
> -Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \
> -Dotel.exporter.otlp.endpoint=http://collector-host-ip:4317 \
> -Dotel.exporter.otlp.protocol="grpc" \
> -Dotel.metrics.exporter="otlp" \
> -Dotel.traces.exporter="otlp" \
> -Dotel.logs.exporter="otlp" \
> -Dotel.instrumentation.kafka.experimental-span-attributes="true" \
> -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \
> -Dotel.instrumentation.kafka.producer-propagation.enabled="true" \
> -Dotel.instrumentation.kafka.enabled="true" \
> -Dotel.instrumentation.runtime-telemetry.enabled="false" \
> -jar your-kafka-application.jar
Configuration parameters
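The following table describes the key configuration settings.
| Parameter | Description |
|---|---|
| service.name | Replace with a unique name for your producer or consumer application, for example order-process-service |
| kafka.cluster.name | Replace with the same cluster name used in the collector configuration, for example my-kafka-cluster |
| otlp.endpoint | Replace with the hostname or IP of the host running the OpenTelemetry Collector, for example http://collector-host-ip:4317 |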
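The OpenTelemetry Java agent also accepts each otel.* system property as an environment variable, formed by upper-casing the property name and replacing dots and hyphens with underscores. This can be convenient when the java command line is managed by a service wrapper. The following is a minimal sketch of that naming rule. The helper otel_prop_to_env is hypothetical and is not shipped with the agent.

```shell
# Hypothetical helper (not part of the OpenTelemetry Java agent): convert an
# otel.* system property name into its environment variable form by
# upper-casing it and replacing "." and "-" with "_".
otel_prop_to_env() {
  printf '%s\n' "$1" | tr '[:lower:]' '[:upper:]' | tr '.-' '__'
}

# Print the variable names for a few of the properties used above.
for prop in otel.service.name otel.resource.attributes otel.exporter.otlp.endpoint; do
  otel_prop_to_env "$prop"
done
```

For example, exporting OTEL_SERVICE_NAME="order-process-service" before launching the application has the same effect as passing -Dotel.service.name="order-process-service".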
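Tip
The settings above send telemetry to the OpenTelemetry Collector running at collector-host-ip:4317. If you prefer a separate collector dedicated to application telemetry, create one with the following configuration:
The Java agent provides out-of-the-box Kafka instrumentation with no code changes, capturing request latency, throughput metrics, error rates, and distributed traces. For advanced configuration, see the Kafka instrumentation documentation.
(Optional) Forward Kafka broker logs
To collect Kafka broker logs and send them to New Relic, configure the filelog receiver in the OpenTelemetry Collector.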
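Find your data
After a few minutes, your Kafka data appears in New Relic. For detailed instructions on exploring Kafka data across the different views in the New Relic UI, see Find your data.
The following table summarizes where each signal type is stored. In all of the queries below, replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value: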
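| Signal | Event type | What it contains |
|---|---|---|
| Metrics | Metric | Broker, topic, partition, consumer group, and JVM metrics |
| Logs | Log | Logs from producer and consumer applications (via the OTel Java agent) and broker logs collected via the Java agent |
| Traces | Span | Producer and consumer spans, including per-message publish and receive operations across topics |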
Metrics
Broker, topic, partition, consumer group, and JVM metrics are stored in the Metric event type. Replace my-kafka-cluster with your KAFKA_CLUSTER_NAME value:
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago
Logs
Logs from producer and consumer applications instrumented with the OpenTelemetry Java agent, and broker logs when -Dotel.logs.exporter=otlp is set, are stored in the Log event type:
FROM Log SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago
Traces
Producer and consumer spans, including per-message publish and receive operations across topics, are stored in the Span event type:
FROM Span SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster' SINCE 30 minutes ago
Examples
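A complete working example, including a Docker Compose setup, OTel Collector configuration, OTel Java agent configuration, and sample producer/consumer applications, is available in the New Relic OpenTelemetry examples repository.
Troubleshooting
Next steps
- Explore Kafka metrics - See the full metrics reference
- Create custom dashboards - Build visualizations for your Kafka data
- Set up alerts - Monitor critical metrics such as consumer lag and under-replicated partitions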