Integrate your Airflow DAG pipeline runs with Atlan

🤓 Why?

This Atlan x Airflow integration helps business users find information about their pipelines without reaching out to the engineering team. Metadata from pipeline runs is a goldmine of information. It can tell you critical information like:

  • Freshness
  • Last run time
  • Run schedule

Data democratization, here we come!

🤝 Integrate your Airflow DAG pipeline runs with Atlan

All information available in Airflow can be pushed into Atlan's business metadata. Atlan has a Python library for Airflow, making it super easy to push any kind of information into Atlan.

☑️ Prerequisites

  • Python 3.5 or greater
  • Apache Airflow 1.10.x
  • Atlan API key

🚀 Steps to push information from Airflow

1️⃣ Install the Atlan Airflow Plugin from the Python package registry

You can run the following command to install the plugin from PyPI with pip:

python -m pip install atlan-airflow-plugin

If you're using a managed Airflow server or Amazon Managed Workflows for Apache Airflow (MWAA), include this in your requirements.txt file instead:

atlan-airflow-plugin==0.0.1
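
To confirm the plugin landed in the right environment, a quick sanity check is to import the operator from the same environment your scheduler and workers use:

python -c "from atlan_airflow_plugin.operators import AtlanBMOperator"

If the command exits silently (no ImportError), you're good to go.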

2️⃣ Add connection details in Airflow

Go to "Connections" in the Admin dropdown. Click "Create".


Then fill in the following details and hit "Save":

  • Connection Id: atlan_default
    • Note: this exact atlan_default value is required to make use of the atlan-airflow-plugin
  • Host: Your Atlan instance URL (e.g. https://development.atlan.com)
  • Password: Your Atlan API key, which you can generate from the Admin tab in Atlan
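If you'd rather script this step than click through the UI, here's a minimal sketch that creates the same connection programmatically (assuming Airflow 1.10.x; replace the placeholder host and API key with your own values):

from airflow import settings
from airflow.models import Connection

# Build the connection the plugin expects; host and password below are
# placeholders for your own instance URL and API key
conn = Connection(
    conn_id="atlan_default",  # must be exactly this ID
    conn_type="http",
    host="https://development.atlan.com",
    password="YOUR_ATLAN_API_KEY",
)

session = settings.Session()
# Add the connection only if it doesn't already exist
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()
session.close()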

3️⃣ Create the Business Metadata in Atlan

In the Admin panel in Atlan, you can define the custom metadata you want to bring in from Airflow. To learn more about creating business metadata in Atlan, read the documentation here.


4️⃣ Define the attributes in the DAG file

AtlanBMOperator can be used to push information from the DAG file to Atlan. The operator takes the following inputs:

  • task_id: The identifier of the task in Airflow.
  • asset_guid: The GUID of the asset to which the Airflow attributes should be pushed. This can be retrieved using the search API (see the hedged sketch after this list).
  • bm: The JSON body of the business metadata to push. It should be a JSON object keyed by the business metadata ID, mapping each attribute ID to the value to set on the asset.
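
As a rough illustration of the GUID lookup, here's a sketch using the requests library. The endpoint path, type name, and query below are assumptions modeled on the Atlas-style basic search API; check Atlan's API documentation for the exact path and parameters on your instance:

import requests

ATLAN_HOST = "https://development.atlan.com"  # your Atlan instance URL
API_KEY = "YOUR_ATLAN_API_KEY"                # placeholder

# Assumed Atlas-style basic search endpoint; verify the path in Atlan's docs
response = requests.post(
    ATLAN_HOST + "/api/meta/search/basic",
    headers={"Authorization": "Bearer " + API_KEY},
    json={"typeName": "Table", "query": "my_table_name"},  # hypothetical filters
)
response.raise_for_status()

# Each matching entity carries the GUID you pass to AtlanBMOperator
for entity in response.json().get("entities", []):
    print(entity["guid"], entity.get("attributes", {}).get("name"))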

The attribute ID can be obtained from this endpoint, which returns the list of all available business metadata along with their attributes. The "name" of each entry in businessMetadataDefs is the ID of the business metadata, and the "name" of each entry in its attributeDefs is the ID of the attribute you want to update.

{
    "businessMetadataDefs[1].name": {
        "businessMetadataDefs[1].attributeDefs[1].name": "value1",
        "businessMetadataDefs[1].attributeDefs[2].name": "value2"
    }
}
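
To list the IDs available on your instance, you can call that endpoint and walk the response. Here's a minimal sketch with the requests library; the path below is an assumption (modeled on the Atlas-style typedefs API), so substitute the endpoint linked above:

import requests

ATLAN_HOST = "https://development.atlan.com"  # your Atlan instance URL
API_KEY = "YOUR_ATLAN_API_KEY"                # placeholder

# Assumed typedefs path; substitute the endpoint linked above
response = requests.get(
    ATLAN_HOST + "/api/meta/types/typedefs",
    headers={"Authorization": "Bearer " + API_KEY},
)
response.raise_for_status()

# The "name" of each business metadata def is the outer ID; the "name" of
# each of its attributeDefs is the inner ID
for bm_def in response.json().get("businessMetadataDefs", []):
    print("business metadata ID:", bm_def["name"])
    for attr in bm_def.get("attributeDefs", []):
        print("  attribute ID:", attr["name"])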

Here's a sample DAG file:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

from atlan_airflow_plugin.operators import AtlanBMOperator

args = {"owner": "Atlan Technologies Pvt Ltd", "start_date": airflow.utils.dates.days_ago(1)}

dag = DAG(dag_id="atlan_airflow_bm_demo", default_args=args, schedule_interval=None)

# GUID of the Atlan asset to update (retrieve it via the search API)
guid = ""

# Business metadata payload: the outer key is the business metadata ID,
# the inner keys are attribute IDs (see the JSON structure above)
bm = {
    "33eb5d71-b0cc-4045-841a-9ca74132c266": {
        "0d8f7264-083c-4f85-3f77-54294d35c098": "1day",
        "0fb797a7-8755-4fd1-ab8b-15105104b13d": "Success",
        "61448f44-0e2c-4e5c-060e-470fafe2afa2": "1601317800000",
        "bd183b51-dead-4751-1be3-7d820faf7ddb": "1601317800000",
    }
}

with dag:

    # Stand-in for your actual pipeline work
    some_task = BashOperator(task_id="any_task", bash_command="echo Hello!")

    # Push the business metadata to Atlan after the pipeline work succeeds
    push_bm = AtlanBMOperator(
        task_id="send_bm_to_atlan", asset_guid=guid, bm=bm, overwrite=True
    )

    some_task >> push_bm
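
The timestamp values in the bm payload above appear to be epoch milliseconds passed as strings. If you build the payload dynamically, a small helper keeps the conversion in one place (a sketch; to_epoch_millis is a name introduced here for illustration):

from datetime import datetime, timezone

def to_epoch_millis(dt):
    # Convert a naive UTC datetime to an epoch-milliseconds string,
    # matching the format of the hard-coded values above
    return str(int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000))

print(to_epoch_millis(datetime(2020, 9, 28, 18, 30)))  # "1601317800000"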

🎉 That's it! When the Airflow DAG runs, you'll start seeing all the information in Atlan.


🔗 Related Reads