Monitorez votre cluster Apache Kafka auto-hébergé en installant le collecteur OpenTelemetry directement sur les hôtes Linux.
Avant de commencer
Assurez-vous d'avoir :
Un compte New Relic avec un
OpenJDK installé sur l'hôte de monitoring
JMX activé sur les brokers Kafka (généralement sur le port 9999)
Accès réseau du collecteur aux brokers Kafka :
- Port du serveur Bootstrap (généralement 9092)
- Port JMX (généralement 9999)
Étape 1 : Installer OpenTelemetry Collector
Téléchargez et installez le binaire OpenTelemetry Collector Contrib pour le système d'exploitation de votre hôte à partir des versions d'OpenTelemetry Collector.
Étape 2 : Télécharger le scraper JMX
Le collecteur JMX collecte des métriques détaillées à partir des MBeans du broker Kafka :
$# Create directory in user home (no sudo needed)$mkdir -p ~/opentelemetry$curl -L -o ~/opentelemetry/opentelemetry-jmx-scraper.jar \> https://github.com/open-telemetry/opentelemetry-java-contrib/releases/download/v1.52.0/opentelemetry-jmx-scraper.jarImportant
Compatibilité des versions: Ce guide utilise JMX Scraper 1.52.0. Les anciennes versions d'OpenTelemetry Collector peuvent ne pas inclure le hachage de ce scraper dans leur liste de compatibilité. Pour de meilleurs résultats, utilisez la dernière version d'OpenTelemetry Collector, qui inclut la prise en charge de cette version de JMX Scraper.
Étape 3 : Créer la configuration des métriques personnalisées JMX
Créez un fichier de configuration JMX personnalisé pour collecter des métriques Kafka supplémentaires non incluses dans le système cible par défaut.
Créez le fichier ~/opentelemetry/kafka-jmx-config.yaml avec la configuration suivante :
---rules: # Per-topic custom metrics using custom MBean commands - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=* metricAttribute: topic: param(topic) mapping: Count: metric: kafka.prod.msg.count type: counter desc: The number of messages in per topic unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=* metricAttribute: topic: param(topic) direction: const(in) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=* metricAttribute: topic: param(topic) direction: const(out) mapping: Count: metric: kafka.topic.io type: counter desc: The bytes received or sent per topic unit: By
# Cluster-level metrics using controller-based MBeans - bean: kafka.controller:type=KafkaController,name=GlobalTopicCount mapping: Value: metric: kafka.cluster.topic.count type: gauge desc: The total number of global topics in the cluster unit: "{topic}"
- bean: kafka.controller:type=KafkaController,name=GlobalPartitionCount mapping: Value: metric: kafka.cluster.partition.count type: gauge desc: The total number of global partitions in the cluster unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=FencedBrokerCount mapping: Value: metric: kafka.broker.fenced.count type: gauge desc: The number of fenced brokers in the cluster unit: "{broker}"
- bean: kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount mapping: Value: metric: kafka.partition.non_preferred_leader type: gauge desc: The count of topic partitions for which the leader is not the preferred leader unit: "{partition}"
# Broker-level metrics using ReplicaManager MBeans - bean: kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount mapping: Value: metric: kafka.partition.under_min_isr type: gauge desc: The number of partitions where the number of in-sync replicas is less than the minimum unit: "{partition}"
# Broker uptime metric using JVM Runtime - bean: java.lang:type=Runtime mapping: Uptime: metric: kafka.broker.uptime type: gauge desc: Broker uptime in milliseconds unit: ms
# Leader count per broker - bean: kafka.server:type=ReplicaManager,name=LeaderCount mapping: Value: metric: kafka.broker.leader.count type: gauge desc: Number of partitions for which this broker is the leader unit: "{partition}"
# JVM metrics - bean: java.lang:type=GarbageCollector,name=* mapping: CollectionCount: metric: jvm.gc.collections.count type: counter unit: "{collection}" desc: total number of collections that have occurred metricAttribute: name: param(name) CollectionTime: metric: jvm.gc.collections.elapsed type: counter unit: ms desc: the approximate accumulated collection elapsed time in milliseconds metricAttribute: name: param(name)
- bean: java.lang:type=Memory unit: By prefix: jvm.memory. dropNegativeValues: true mapping: HeapMemoryUsage.committed: metric: heap.committed desc: current heap usage type: gauge HeapMemoryUsage.max: metric: heap.max desc: current heap usage type: gauge HeapMemoryUsage.used: metric: heap.used desc: current heap usage type: gauge
- bean: java.lang:type=Threading mapping: ThreadCount: metric: jvm.thread.count type: gauge unit: "{thread}" desc: Total thread count (Kafka typical range 100-300 threads)
- bean: java.lang:type=OperatingSystem prefix: jvm. dropNegativeValues: true mapping: SystemLoadAverage: metric: system.cpu.load_1m type: gauge unit: "{run_queue_item}" desc: System load average (1 minute) - alert if > CPU count AvailableProcessors: metric: cpu.count type: gauge unit: "{cpu}" desc: Number of processors available ProcessCpuLoad: metric: cpu.recent_utilization type: gauge unit: '1' desc: Recent CPU utilization for JVM process (0.0 to 1.0) SystemCpuLoad: metric: system.cpu.utilization type: gauge unit: '1' desc: Recent CPU utilization for whole system (0.0 to 1.0) OpenFileDescriptorCount: metric: file_descriptor.count type: gauge unit: "{file_descriptor}" desc: Number of open file descriptors - alert if > 80% of ulimit
- bean: java.lang:type=ClassLoading mapping: LoadedClassCount: metric: jvm.class.count type: gauge unit: "{class}" desc: Currently loaded class count
- bean: java.lang:type=MemoryPool,name=* type: gauge unit: By metricAttribute: name: param(name) mapping: Usage.used: metric: jvm.memory.pool.used desc: Memory pool usage by generation (G1 Old Gen, Eden, Survivor) Usage.max: metric: jvm.memory.pool.max desc: Maximum memory pool size CollectionUsage.used: metric: jvm.memory.pool.used_after_last_gc desc: Memory used after last GC (shows retained memory baseline)Conseil
Personnaliser la collecte de métriques: Vous pouvez scraper des métriques Kafka supplémentaires en ajoutant des règles MBean personnalisées au fichier kafka-jmx-config.yaml :
Apprenez la syntaxe de base des règles de métriques JMX
Trouvez les noms MBean disponibles dans la documentation de monitoring Kafka
Cela vous permet de collecter n'importe quelle métrique JMX exposée par les brokers Kafka en fonction de vos besoins de monitoring spécifiques.
Étape 4 : Créer la configuration du collecteur
Créez la configuration principale du collecteur OpenTelemetry à ~/opentelemetry/config.yaml.
receivers: # Kafka metrics receiver for cluster-level metrics kafkametrics: brokers: - ${env:KAFKA_BROKER_ADDRESS} protocol_version: 2.8.0 scrapers: - brokers - topics - consumers collection_interval: 30s topic_match: ".*" metrics: kafka.topic.min_insync_replicas: enabled: true kafka.topic.replication_factor: enabled: true kafka.partition.replicas: enabled: false kafka.partition.oldest_offset: enabled: false kafka.partition.current_offset: enabled: false
# JMX receiver for broker-specific metrics jmx/kafka_broker-1: jar_path: ${env:HOME}/opentelemetry/opentelemetry-jmx-scraper.jar endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS} target_system: kafka collection_interval: 30s jmx_configs: ${env:HOME}/opentelemetry/kafka-jmx-config.yaml resource_attributes: broker.id: "1" broker.endpoint: ${env:KAFKA_BROKER_JMX_ADDRESS}
processors: batch/aggregation: send_batch_size: 1024 timeout: 30s
resourcedetection: detectors: [env, ec2, system] system: resource_attributes: host.name: enabled: true host.id: enabled: true
resource: attributes: - action: insert key: kafka.cluster.name value: ${env:KAFKA_CLUSTER_NAME}
transform/remove_broker_id: metric_statements: - context: resource statements: - delete_key(attributes, "broker.id")
filter/include_cluster_metrics: metrics: include: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
filter/exclude_cluster_metrics: metrics: exclude: match_type: regexp metric_names: - "kafka\\.partition\\.offline" - "kafka\\.(leader|unclean)\\.election\\.rate" - "kafka\\.partition\\.non_preferred_leader" - "kafka\\.broker\\.fenced\\.count" - "kafka\\.cluster\\.partition\\.count" - "kafka\\.cluster\\.topic\\.count"
transform/des_units: metric_statements: - context: metric statements: - set(description, "") where description != "" - set(unit, "") where unit != ""
cumulativetodelta:
metricstransform/kafka_topic_sum_aggregation: transforms: - include: kafka.partition.replicas_in_sync action: insert new_name: kafka.partition.replicas_in_sync.total operations: - action: aggregate_labels label_set: [ topic ] aggregation_type: sum
exporters: otlp/newrelic: endpoint: https://otlp.nr-data.net:4317 headers: api-key: ${env:NEW_RELIC_LICENSE_KEY} compression: gzip timeout: 30s
service: pipelines: metrics/brokers-cluster-topics: receivers: [jmx/kafka_broker-1, kafkametrics] processors: [resourcedetection, resource, filter/exclude_cluster_metrics, transform/des_units, cumulativetodelta, metricstransform/kafka_topic_sum_aggregation, batch/aggregation] exporters: [otlp/newrelic]
metrics/jmx-cluster: receivers: [jmx/kafka_broker-1] processors: [resourcedetection, resource, filter/include_cluster_metrics, transform/remove_broker_id, transform/des_units, cumulativetodelta, batch/aggregation] exporters: [otlp/newrelic]Notes de configuration :
- Point de terminaison OTLP: utilise
https://otlp.nr-data.net:4317(région US) ouhttps://otlp.eu01.nr-data.net:4317(région UE). Consultez Configurez votre point de terminaison OTLP pour d'autres régions
Important
Pour plusieurs brokers, ajoutez des récepteurs JMX supplémentaires avec différents points de terminaison et ID de broker pour monitorer chaque broker de votre cluster.
Étape 5 : Définir les variables d'environnement
Définissez les variables d'environnement requises :
$export NEW_RELIC_LICENSE_KEY="YOUR_LICENSE_KEY"$export KAFKA_CLUSTER_NAME="my-kafka-cluster"$export KAFKA_BROKER_ADDRESS="localhost:9092"$export KAFKA_BROKER_JMX_ADDRESS="localhost:9999"Remplacer :
YOUR_LICENSE_KEYavec votre clé de licence New Relicmy-kafka-clusteravec un nom unique pour votre cluster Kafkalocalhost:9092avec l'adresse de votre serveur d'amorçage Kafkalocalhost:9999avec votre point de terminaison JMX du broker Kafka
Étape 6 : Démarrer le collecteur
Exécutez le collecteur directement (pas besoin de sudo) :
$# Start the collector with your config$otelcol-contrib --config ~/opentelemetry/config.yamlLe collecteur commencera à envoyer des métriques Kafka à New Relic en quelques minutes.
Créez un service systemd pour une exécution persistante (nécessite sudo pour une configuration unique) :
$# Create systemd service file$sudo tee /etc/systemd/system/otelcol-contrib.service > /dev/null <<EOF$[Unit]$Description=OpenTelemetry Collector for Kafka$After=network.target$
$[Service]$Type=simple$User=$USER$WorkingDirectory=$HOME/opentelemetry$ExecStart=/usr/local/bin/otelcol-contrib --config $HOME/opentelemetry/config.yaml$Restart=on-failure$Environment="NEW_RELIC_LICENSE_KEY=YOUR_LICENSE_KEY"$Environment="KAFKA_CLUSTER_NAME=my-kafka-cluster"$Environment="KAFKA_BROKER_ADDRESS=localhost:9092"$Environment="KAFKA_BROKER_JMX_ADDRESS=localhost:9999"$
$[Install]$WantedBy=multi-user.target$EOFRemplacez YOUR_LICENSE_KEY et les autres valeurs, puis activez et démarrez le service :
$sudo systemctl daemon-reload$sudo systemctl enable otelcol-contrib$sudo systemctl start otelcol-contrib$sudo systemctl status otelcol-contribÉtape 7 : (Facultatif) Instrumenter les applications producteur ou consommateur
Pour collecter la télémétrie au niveau de l'application à partir de vos applications producteur et consommateur Kafka, utilisez l'Agent Java OpenTelemetry:
Téléchargez l'agent Java :
bash$mkdir -p ~/otel-java$curl -L -o ~/otel-java/opentelemetry-javaagent.jar \>https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jarDémarrez votre application avec l'agent :
bash$java \>-javaagent:~/otel-java/opentelemetry-javaagent.jar \>-Dotel.service.name="kafka-producer-1" \>-Dotel.resource.attributes="kafka.cluster.name=my-kafka-cluster" \>-Dotel.exporter.otlp.endpoint=https://otlp.nr-data.net:4317 \>-Dotel.exporter.otlp.protocol="grpc" \>-Dotel.metrics.exporter="otlp" \>-Dotel.traces.exporter="otlp" \>-Dotel.logs.exporter="otlp" \>-Dotel.instrumentation.kafka.experimental-span-attributes="true" \>-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled="true" \>-Dotel.instrumentation.kafka.producer-propagation.enabled="true" \>-Dotel.instrumentation.kafka.enabled="true" \>-jar your-kafka-application.jar
Remplacer :
kafka-producer-1avec un nom unique pour votre application producteur ou consommateurmy-kafka-clusteravec le même nom de cluster utilisé dans votre configuration de collecteurhttps://otlp.nr-data.net:4317avec votre point de terminaison OTLP New Relic (utilisezhttps://otlp.eu01.nr-data.net:4317pour la région UE). Pour les autres points de terminaison et options de configuration, consultez Configurez votre point de terminaison OTLP.
L'agent Java fournit l'instrumentation Kafka prête à l'emploi sans aucune modification de code, capturant :
- Latences des requêtes
- Métriques de débit
- Taux d'erreur
- traces distribuées
Pour une configuration avancée, consultez la documentation d'instrumentation Kafka.
Étape 6 : (Facultatif) Transférer les logs du broker Kafka
Pour collecter les logs du broker Kafka à partir de vos hôtes et les envoyer à New Relic, configurez le récepteur de logs de fichiers dans votre OpenTelemetry Collector.
Trouvez vos données
Après quelques minutes, vos métriques Kafka devraient apparaître dans New Relic. Consultez Trouver vos données pour obtenir des instructions détaillées sur l'exploration de vos métriques Kafka dans différentes vues de l'interface utilisateur New Relic.
Vous pouvez également interroger vos données avec NRQL :
FROM Metric SELECT * WHERE kafka.cluster.name = 'my-kafka-cluster'Dépannage
Prochaines étapes
- Explorer les métriques Kafka - Afficher la référence complète des métriques
- Créer des dashboards personnalisés - Créez des visualisations pour vos données Kafka
- Configurer les alertes - Monitorer les métriques critiques telles que le retard du consommateur et les partitions sous-répliquées