Notre intégration dbt Cloud avec Airflow monitoree l'état de vos tâches et ressources dbt Cloud , vous aidant à identifier les problèmes tels que l'échec des exécutions, des modèles ou des tests.
Cette intégration s'exécute sur Apache Airflow et la requête Snowflake pour tous les tests ayant échoué si elle est configurée pour le faire.
Prérequis
- Compte dbt Cloud avec API activées et utilisant Snowflake comme base de données.
- Accès au compte Snowflake sur lequel le compte dbt Cloud s'exécute.
- Environnement Airflow existant version 2.8.1 ou supérieure, ou possibilité d'exécuter Docker Compose.
Installer l'intégration
Vous pouvez installer l'intégration New Relic dbt Cloud avec Airflow de l'une des manières suivantes :
- Installation dans votre environnement Airflow existant. Ceci est recommandé pour l’environnement de production.
- Installation avec Docker Compose. Ceci est adapté aux POC rapides.
Sélectionnez l'option la plus adaptée à vos besoins en cliquant sur son onglet :
Assurez-vous que vous disposez du fournisseur Snowflake, puis clonez le référentiel newrelic-dbt-cloud-integration
en exécutant ces commandes :
$pip install apache-airflow-providers-snowflake>=3.0.0
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
Copiez le contenu de airflow/dags
à la racine de votre dossier dags Airflow
Créez les cinq connexions Airflow nécessaires au DAG. Le tableau suivant fournit le nom de la connexion et les informations nécessaires à sa configuration. Notez que pour tous ceux-ci, le type est http
:
Nom de la connexion | Description | Type | Hôte et mot de passe | |
---|---|---|---|---|
| Vous permet de vous connecter à l'API d'administration de dbt Cloud avec |
| Hôte : https://cloud.getdbt.com/api/v2/accounts/ACCOUNT_ID/ (Remplacez Mot de passe : votre jeton d'API dbt Cloud (paramètres de profil) ou un jeton de compte de service | |
| Permet de se connecter à l'API de découverte dbt |
| Hôte : https://metadata.cloud.getdbt.com/graphql Mot de passe : Compte cloud des services DBt jeton | |
| Vous permet de télécharger un événement personnalisé sur New Relic |
| Hôte : https://insights-collector.newrelic.com/v1/accounts/ACCOUNT_ID/events (Remplacez Mot de passe : Vos informations NR détaillées insérer clé API | |
| Permet de requêter New Relic événement personnalisé |
| Hôte : https://insights-api.newrelic.com/v1/accounts/ACCOUNT_ID/query (Remplacez Mot de passe : Vos informations NR détaillées requête clé API |
Une fois que vous avez configuré les quatre éléments ci-dessus, vous devez configurer la connexion Snowflake. Snowflake vous permet d'interroger les lignes de test ayant échoué. Il existe de nombreuses façons de configurer une connexion Snowflake. Pour configurer à l'aide d'une paire de clés privées, renseignez l'attribut suivant :
Type
: Flocon de neigeLogin
: Votre nom d'utilisateur SnowflakeAccount
: Votre compte SnowflakeWarehouse
: Votre entrepôt SnowflakeRole
:Votre rôle de Flocon de Neige. Le rôle doit avoir accès à toutes les bases de données utilisées dans dbt Cloud pour obtenir toutes les lignes de test ayant échoué.Private Key Text
:La clé privée complète utilisée pour cette connexion.Password
:Phrase de passe pour la clé privée si elle est cryptée. Vide s'il n'est pas chiffré.
Terminez la configuration en activant le DAG new_relic_data_pipeline_observability_get_dbt_run_metadata2
.
Exécutez la commande suivante pour cloner le référentiel newrelic-dbt-cloud-integration
:
$git clone https://github.com/newrelic-experimental/newrelic-dbt-cloud-integration.git
Ensuite cd
dans le répertoire Airflow :
$cd newrelic-dbt-cloud-integration/airflow
Initialisez ensuite et exécutez Docker compose en exécutant les commandes suivantes :
$docker-compose up airflow-init
$docker-compose up
lancement de l'UI Airflow : http://localhost:8080
Créez les cinq connexions Airflow nécessaires au DAG. Le tableau suivant fournit le nom de la connexion et les informations nécessaires à sa configuration. Notez que pour tous ceux-ci, le type est http
:
Nom de la connexion | Description | Type | Hôte et mot de passe | |
---|---|---|---|---|
| Vous permet de vous connecter à l'API d'administration de dbt Cloud avec |
| Hôte : https://cloud.getdbt.com/api/v2/accounts/ACCOUNT_ID/ (Remplacez Mot de passe : votre jeton d'API dbt Cloud (paramètres de profil) ou un jeton de compte de service | |
| Permet de se connecter à l'API de découverte dbt |
| Hôte : https://metadata.cloud.getdbt.com/graphql Mot de passe : Compte cloud des services DBt jeton | |
| Vous permet de télécharger un événement personnalisé sur New Relic |
| Hôte : https://insights-collector.newrelic.com/v1/accounts/ACCOUNT_ID/events (Remplacez Mot de passe : Vos informations NR détaillées insérer clé API | |
| Permet de requêter New Relic événement personnalisé |
| Hôte : https://insights-api.newrelic.com/v1/accounts/ACCOUNT_ID/query (Remplacez Mot de passe : Vos informations NR détaillées requête clé API |
Une fois que vous avez configuré les quatre éléments ci-dessus, vous devez configurer la connexion Snowflake. Snowflake vous permet d'interroger les lignes de test ayant échoué. Il existe de nombreuses façons de configurer une connexion Snowflake. Pour configurer à l'aide d'une paire de clés privées, renseignez l'attribut suivant :
Type
: Flocon de neigeLogin
: Votre nom d'utilisateur SnowflakeAccount
: Votre compte SnowflakeWarehouse
: Votre entrepôt SnowflakeRole
:Votre rôle de Flocon de Neige. Le rôle doit avoir accès à toutes les bases de données utilisées dans dbt Cloud pour obtenir toutes les lignes de test ayant échoué.Private Key Text
:La clé privée complète utilisée pour cette connexion.Password
:Phrase de passe pour la clé privée si elle est cryptée. Vide s'il n'est pas chiffré.
Terminez la configuration en activant le DAG new_relic_data_pipeline_observability_get_dbt_run_metadata2
.
Trouvez vos données
Cette intégration crée et rapporte trois événements personnalisés à New Relic :
Configuration DAG
Relations:
Ce DAG est destiné à fonctionner tel quel sans aucune configuration. Dans le même temps, nous réalisons que votre entreprise peut avoir ses propres conventions de dénomination pour les connexions. En tant que tel, nous avons une configuration simple à l'intérieur dag_config.yml
où vous pouvez définir le nom des différentes connexions.
connections: dbt_cloud_admin_api: dbt_cloud_admin_api dbt_cloud_discovery_api: dbt_cloud_discovery_api nr_insights_query: nr_insights_query nr_insights_insert: nr_insights_insert snowflake_api: SNOWFLAKE
Équipe de course :
les tâches dbt peuvent appartenir à différentes équipes, mais il n'y a aucun endroit pour définir cela dans dbt Cloud. Nous pouvons utiliser le code Python pour définir l’équipe de manière dynamique. Pour écrire votre propre code, modifiez airflow/dags/nr_utils/nr_utils.py
et mettez toute logique nécessaire dans get_team_from_run()
. Les données d’exécution transmises à cette fonction ont accès à l’attribut suivant.
- project_name
- environment_name
- Tous les champs répertoriés dans l' API dbt Cloud v2 pour les exécutions. Tous les attributs sont précédés de « run_ »
Voici un exemple de fonction :
def get_team_from_run(run: dict) -> str: team = 'Data Engineering' if run['project_id'] == '11111' and run['environment_id'] in ['55555', '33333']: team = 'Platform' if re.match(r'Catch-all', run['job_name']): team = 'Project Catch All' return team
Configuration du projet DBt
Dans le cadre du projet Dbt, nous pouvons utiliser la méta-configuration pour définir une équipe supplémentaire et des paramètres spécifiques aux tests.
Team
:Bien querun_team determines
soit propriétaire des tâches, nous avons parfois besoin que les équipes en amont ou en aval reçoivent des notifications d'alerte sur les ressources ayant échoué, comme les tests et les modèles. La constitution d’une équipe nous aide à y parvenir.alert_failed_test_rows
: Le paramètreTrue
activera les lignes de test ayant échoué où nous exécutons la requête pour les tests ayant échoué et enverrons jusqu'aux 10 premières colonnes à New Relicfailed_test_rows_limit
:Nombre maximal de lignes de test ayant échoué à envoyer à New Relic. Nous avons une limite codée en dur de 100 lignes pour éviter les situations où nous envoyons des quantités déraisonnables à New Relic.slack_mentions
:Si vous activez les alertes Slack, ce champ vous permet de définir qui doit être mentionné dans le message.
La définition de cette valeur sur dbt_project.yml
définirait l'équipe sur « Ingénierie des données » et activerait les lignes de test ayant échoué.
models: dbt_fake_company: +meta: nr_config: team: 'Data Engineering' alert_failed_test_rows: False failed_test_rows_limit: 5 slack_mentions: '@channel, @business_users'
Nous pouvons ajouter un autre attribut appelé message aux ressources. Dans la configuration suivante, une équipe commerciale partenaire peut être alertée sur des tests spécifiques ayant échoué. De plus, nous pouvons définir des alertes sur les lignes de test ayant échoué elles-mêmes.
models: - name: important_business_model tests: - some_custom_test: config: meta: nr_config: team: 'Upstream Business Team' alert_failed_test_rows: true failed_test_rows_limit: 10 slack_mentions: '@channel, @business_user1, @engineer1' message: 'Important business process produced invalid data. Please check X tool'
Dépannage
Différentes versions d'Airflow combinées à différentes versions de fournisseurs peuvent induire des changements radicaux. Dans certains cas, vous devrez peut-être modifier le code pour qu'il corresponde aux versions spécifiques de votre environnement Airflow. Nous suivons les problèmes connus dans notre référentiel Github.