Source URL: https://cloud.google.com/blog/products/data-analytics/apache-airflow-hierarchy-and-alerting-options-with-cloud-composer/
Source: Cloud Blog
Title: Scalable alerting for Apache Airflow to improve data orchestration reliability and performance
Feedly Summary:
Apache Airflow is a popular tool for orchestrating data workflows. Google Cloud offers Cloud Composer, a fully managed workflow orchestration service built on Apache Airflow that enables you to author, schedule, and monitor pipelines. When running Cloud Composer, it's important to have a robust logging and alerting setup to monitor your DAGs (Directed Acyclic Graphs) and minimize downtime in your data pipelines.
In this guide, we will review the hierarchy of alerting on Cloud Composer and the various alerting options available to Google Cloud engineers using Cloud Composer and Apache Airflow.
Getting started
Hierarchy of alerting on Cloud Composer
Composer environment
Cloud Composer environments are self-contained Airflow deployments based on Google Kubernetes Engine. They work with other Google Cloud services using connectors built into Airflow.
Cloud Composer provisions the Google Cloud services that run your workflows and all of the Airflow components. The main components of an environment are the GKE cluster, the Airflow web server, the Airflow database, and a Cloud Storage bucket. For more information, check out Cloud Composer environment architecture.
Alerts at this level primarily concern the performance and health of the cluster and the Airflow components.
Airflow DAG Runs
A DAG Run is an object representing an instantiation of a DAG at a point in time. Any time the DAG is executed, a DAG Run is created and all of the tasks inside it are executed. The status of the DAG Run depends on the states of its tasks. Each DAG Run runs independently of the others, meaning that you can have many runs of a DAG at the same time.
Alerts at this level primarily consist of DAG Run state changes such as Success and Failure, as well as SLA Misses. Airflow’s Callback functionality can trigger code to send these alerts.
Airflow Task instances
A Task is the basic unit of execution in Airflow. Tasks are arranged into DAGs, with upstream and downstream dependencies set between them to express the order in which they should run. Airflow tasks include Operators and Sensors.
Like Airflow DAG Runs, Airflow Tasks can utilize Airflow Callbacks to trigger code to send alerts.
Summary
To summarize Airflow’s alerting hierarchy: Google Cloud → Cloud Composer Service → Cloud Composer Environment → Airflow Components (Worker) → Airflow DAG Run → Airflow Task Instance.
Any production-level implementation of Cloud Composer should have alerting and monitoring capabilities at each level in the hierarchy. Our Cloud Composer engineering team has extensive documentation around monitoring and alerting at the service/environment level.
Airflow Alerting on Google Cloud
Now, let’s consider three options for alerting at the Airflow DAG Run and Airflow Task level.
Option 1: Log-based alerting policies
Google Cloud offers native tools for logging and alerting within your Airflow environment. Cloud Logging centralizes logs from various sources, including Airflow, while Cloud Monitoring lets you set up alerting policies based on specific log entries or metrics thresholds.
You can configure an alerting policy to notify you whenever a specific message appears in your included logs. For example, if you want to know when an audit log records a particular data-access message, you can get notified when the message appears. These types of alerting policies are called log-based alerting policies. Check out Configure log-based alerting policies | Cloud Logging to learn more.
These services combine nicely with Airflow's callback feature mentioned above. To accomplish this:
Define a callback function and set it at the DAG or task level.
Use Python's native logging library to write a specific log message to Cloud Logging.
Define a log-based alerting policy that is triggered by the specific log message and sends alerts to a notification channel (a sketch of creating such a policy follows this list).
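To illustrate the last step, here is a minimal sketch of creating a log-based alerting policy with the Cloud Monitoring Python client (google-cloud-monitoring). The project ID, notification channel ID, and log filter are assumptions; adjust the filter to match how your Composer environment writes Airflow logs.

```python
# A minimal sketch, not from the original post: create a log-based alerting
# policy that fires when the callback's marker string appears in Cloud Logging.
# PROJECT_ID, CHANNEL_ID, and the log filter are assumptions to adapt.
from google.cloud import monitoring_v3
from google.protobuf import duration_pb2

PROJECT_ID = "your-project-id"   # placeholder
CHANNEL_ID = "1234567890"        # placeholder: an existing notification channel ID

client = monitoring_v3.AlertPolicyServiceClient()

policy = monitoring_v3.AlertPolicy(
    display_name="Airflow DAG failure (log match)",
    combiner=monitoring_v3.AlertPolicy.ConditionCombinerType.OR,
    conditions=[
        monitoring_v3.AlertPolicy.Condition(
            display_name="Log contains 'Airflow DAG Failure:'",
            condition_matched_log=monitoring_v3.AlertPolicy.Condition.LogMatch(
                # Assumed filter: adjust the resource type and payload field to
                # match how your Composer environment writes Airflow logs.
                filter=(
                    'resource.type="cloud_composer_environment" '
                    'AND textPayload:"Airflow DAG Failure:"'
                ),
            ),
        )
    ],
    notification_channels=[
        f"projects/{PROJECT_ID}/notificationChannels/{CHANNEL_ID}"
    ],
    # Log-match conditions require a notification rate limit.
    alert_strategy=monitoring_v3.AlertPolicy.AlertStrategy(
        notification_rate_limit=monitoring_v3.AlertPolicy.AlertStrategy.NotificationRateLimit(
            period=duration_pb2.Duration(seconds=300)
        )
    ),
)

created = client.create_alert_policy(
    name=f"projects/{PROJECT_ID}", alert_policy=policy
)
print(f"Created alert policy: {created.name}")
```

You can also create an equivalent policy from the Cloud console by running the log query in Logs Explorer and creating an alert from it.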
Pros and cons
Pros:
Lightweight, minimal setup: no third-party tools, no email server setup, no additional Airflow providers required
Integration with Logs Explorer and Log-based metrics for deeper insights and historical analysis
Multiple notification channel options
Cons:
Email alerts contain minimal info
Learning curve and overhead for setting up log sinks and alerting policies
Costs associated with Cloud Logging and Cloud Monitoring usage
Sample code
Airflow DAG Callback:
This Airflow DAG uses a Python operator that can miss a defined SLA and/or raise an AirflowException. If the DAG Run enters a failed state, it triggers the log_on_dag_failure callback function; if it misses an SLA, it triggers the log_on_sla_miss callback function. Each callback logs a specific message string, "Airflow DAG Failure:" or "Airflow SLA Miss:" respectively. These are the messages that the log-based alerting policy matches and uses to send an alert to the defined notification channel.
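A minimal sketch of this DAG-level callback pattern is shown below; the DAG id, SLA value, and task logic are illustrative assumptions, not code from the original post.

```python
import logging
import time
from datetime import datetime, timedelta

from airflow import models
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator

ON_DAG_FAILURE_ALERT = "Airflow DAG Failure:"
ON_SLA_MISS_ALERT = "Airflow SLA Miss:"


def log_on_dag_failure(context):
    """DAG-level failure callback: logs the marker string the alerting policy matches."""
    dag_run = context.get("dag_run")
    logging.info("%s DAG: %s, Run: %s", ON_DAG_FAILURE_ALERT, dag_run.dag_id, dag_run.run_id)


def log_on_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """SLA-miss callback: note that it uses a different signature from other callbacks."""
    logging.info("%s DAG: %s, Missed SLAs: %s", ON_SLA_MISS_ALERT, dag.dag_id, slas)


with models.DAG(
    "log_alert_demo_dag_level",        # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule="*/5 * * * *",
    default_args={
        "owner": "Google",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "sla": timedelta(seconds=30),  # deliberately tight so the task can miss it
    },
    is_paused_upon_creation=True,
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=60),
    on_failure_callback=log_on_dag_failure,
    sla_miss_callback=log_on_sla_miss,
) as dag:

    def slow_and_flaky_run():
        time.sleep(60)                                # long enough to miss the 30s SLA
        raise AirflowException("Simulated failure")   # forces a failed DAG Run

    PythonOperator(
        task_id="slow_and_flaky_task",
        python_callable=slow_and_flaky_run,
    )
```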
Airflow Task callback:
```python
import logging
import random
import time
from datetime import datetime, timedelta

from airflow import models
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator

ON_TASK_FAILURE_ALERT = "Airflow Task Failure:"


def log_on_task_failure(context):
    """Task-level failure callback: logs the marker string the alerting policy matches."""
    ti = context.get("task_instance")
    log_msg = f"""
    {ON_TASK_FAILURE_ALERT}
    *DAG*: {ti.dag_id}
    *DAG Run*: {ti.run_id}
    *Task*: {ti.task_id}
    *state*: {ti.state}
    *operator*: {ti.operator}
    """
    logging.info(log_msg)


with models.DAG(
    "log_alert_demo",
    start_date=datetime(2024, 1, 1),
    schedule="*/5 * * * *",  # every 5 minutes
    default_args={
        "owner": "Google",
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    is_paused_upon_creation=True,
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=60),
) as dag:

    def one_minute_run():
        """Run for one minute, then fail roughly half the time."""
        time.sleep(60)

        # ~50% chance to raise an exception
        if random.randint(0, 1) == 0:
            raise AirflowException("Error msg")

    one_minute_task = PythonOperator(
        task_id="one_minute_task",
        python_callable=one_minute_run,
        on_failure_callback=log_on_task_failure,
    )
```
In this example, the task instance itself calls back to log_on_task_failure. Since you can set specific callback functions at the task level, you have great flexibility in when and how you send alerts for a given task. You can also share one callback across every task in a DAG, as shown in the sketch below.
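Here is a minimal sketch of sharing one failure callback across all tasks via default_args; the DAG id and callback body are illustrative assumptions.

```python
import logging
from datetime import datetime, timedelta

from airflow import models
from airflow.operators.python import PythonOperator


def log_on_task_failure(context):
    # Log the marker string that a log-based alerting policy can match.
    ti = context.get("task_instance")
    logging.info("Airflow Task Failure: %s.%s", ti.dag_id, ti.task_id)


with models.DAG(
    "log_alert_demo_default_args",      # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule="*/5 * * * *",
    default_args={
        "owner": "Google",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        # Applied to every task in the DAG unless a task overrides it.
        "on_failure_callback": log_on_task_failure,
    },
    is_paused_upon_creation=True,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="always_fails",
        python_callable=lambda: 1 / 0,  # fails, so the shared callback fires
    )
```

Passing on_failure_callback to an individual operator still overrides the default for just that task.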
Option 2: Email alerts via SendGrid
SendGrid is an SMTP service provider and Cloud Composer’s email notification service of choice. For more information, check out how to Configure email notifications on Cloud Composer.
Pros and cons
Pros:
Widely supported and reliable notification method
Detailed emails with formatted log snippets for analysis
Uses native Airflow EmailOperator
Flexible recipient lists on a per-task basis
Cons:
Can be overwhelming with a high volume of alerts
Requires configuring an external email provider (SendGrid) and managing email templates
Might get lost in inboxes if not prioritized or filtered correctly
Costs associated with SendGrid
Sample code
EmailOperator
```python
from datetime import datetime, timedelta

import airflow
from airflow.operators.email import EmailOperator

with airflow.DAG(
    "demo_sendgrid_email",
    start_date=datetime(2024, 1, 1),
    default_args={
        "owner": "Google",
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "sla": timedelta(minutes=55),
    },
    is_paused_upon_creation=True,
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    task_email = EmailOperator(
        task_id="send-email",
        conn_id="sendgrid_default",
        # You can specify more than one recipient with a list.
        to="user@example.com",
        subject="EmailOperator test for SendGrid",
        html_content="This is a test message sent through SendGrid.",
    )
```
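Besides the EmailOperator, Airflow can also send its built-in failure emails through the same configured email backend. A minimal sketch, assuming email notifications are already configured for the environment and using a placeholder recipient:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    "demo_email_on_failure",            # hypothetical DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    default_args={
        "owner": "Google",
        "email": ["user@example.com"],  # one or more recipients
        "email_on_failure": True,       # send an email when a task fails
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    is_paused_upon_creation=True,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="failing_task",
        python_callable=lambda: 1 / 0,  # fails, triggering the failure email
    )
```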
Option 3: Third-party tools such as Slack and PagerDuty
Since Airflow is open source, there are other providers to choose from that can handle alerting and notifications for you, such as Slack or PagerDuty.
Pros and cons
Pros:
Real-time notifications in a familiar communication channel
Customizable formatting and the ability to send messages to specific channels or users
Third-party options integrate with your team’s existing communication workflow. Alerts can be discussed directly, keeping the context and resolution steps together. This promotes faster troubleshooting and knowledge sharing compared to isolated emails or logging entries.
Cons:
Requires third-party workspace, webhook, and API token setup
Requires management of additional Airflow connections
Might lead to notification fatigue if not used judiciously
Potential security concerns if the webhook or API token is compromised
Potentially limited long-term log storage within third-party message history
Costs associated with third-party tools
Sample code
Slack:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# Airflow connection configured with your Slack incoming-webhook token
SLACK_CONN_ID = "your_slack_connection"


def task_fail_slack_alert(context):
    slack_msg = f"""
    :red_circle: Airflow Task Failed :red_circle:
    *Task*: {context.get('task_instance').task_id}
    *DAG*: {context.get('task_instance').dag_id}
    *Execution Time*: {context.get('execution_date')}
    *Log URL*: {context.get('task_instance').log_url}
    """
    return SlackWebhookOperator(
        task_id="slack_alert",
        http_conn_id=SLACK_CONN_ID,
        message=slack_msg,
    ).execute(context=context)  # send the message immediately


with DAG(
    "demo_slack_alerts",
    start_date=datetime(2024, 1, 1),
    default_args={
        "owner": "Google",
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "sla": timedelta(minutes=55),
    },
    is_paused_upon_creation=True,
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=60),
) as dag:

    task_that_might_fail = PythonOperator(
        task_id="failing_task",
        python_callable=lambda: 1 / 0,  # raises ZeroDivisionError to trigger the callback
        on_failure_callback=task_fail_slack_alert,
    )
```
PagerDuty:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.pagerduty.notifications.pagerduty import send_pagerduty_notification

with DAG(
    "demo_pagerduty_alerts",
    start_date=datetime(2024, 1, 1),
    default_args={
        "owner": "Google",
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
        "sla": timedelta(minutes=55),
    },
    is_paused_upon_creation=True,
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=60),
    # Notify PagerDuty when the DAG Run fails.
    on_failure_callback=[
        send_pagerduty_notification(
            summary="The dag failed",
            severity="critical",
            source="airflow dag_id: ",
            dedup_key="-",
            group="",
            component="airflow",
            class_type="Prod Data Pipeline",
        )
    ],
):
    # Notify PagerDuty when this task fails.
    BashOperator(
        task_id="mytask",
        bash_command="fail",
        on_failure_callback=[
            send_pagerduty_notification(
                summary="The task failed",
                severity="critical",
                source="airflow dag_id: ",
                dedup_key="-",
                group="",
                component="airflow",
                class_type="Prod Data Pipeline",
            )
        ],
    )
```
Opinionated guidance and next steps
Considering the pros and cons, we recommend log-based alerting policies (Option 1) for Airflow alerting in production environments. This approach offers scalable log collection, simple threshold-based alerting, diverse notification channels, metric exploration, and integration with other Google Cloud services. Logging is intuitive and integrated with Cloud Composer, eliminating the need for extra provider packages.
By incorporating logging and alerting into your Airflow DAGs, you proactively monitor your data pipelines and leverage the full potential of Google Cloud.
To learn more about Cloud Composer, Apache Airflow, and the alerting mechanisms discussed in this guide, consider exploring the following resources:
Cloud Composer Documentation
Apache Airflow Documentation
Cloud Logging Documentation
Cloud Monitoring Documentation
Also check out some of our other Cloud Composer-related Google Cloud blogs:
Optimize Cloud Composer via Better Airflow DAGs
maximize the benefits of Cloud Composer and reduce parse times
A Cloud Composer tenancy case study
AI Summary and Description: Yes
**Summary:**
The text provides a comprehensive guide on how to set up alerting mechanisms in Google Cloud’s managed workflow orchestration service, Cloud Composer, which is built on Apache Airflow. The guide focuses on the hierarchy of alerting levels and various options available for monitoring the health of data workflows. The insights are particularly valuable for professionals in cloud computing security and infrastructure management, emphasizing the importance of robust logging and alerting systems in ensuring operational resilience.
**Detailed Description:**
The guide discusses the importance of having a robust logging and alerting setup while using Cloud Composer to monitor Directed Acyclic Graphs (DAGs) and minimize pipeline downtime. It outlines methods to establish effective monitoring through a hierarchical alerting structure and various alert options available for Google Cloud engineers.
Key points covered in the guide include:
– **Introduction to Cloud Composer**:
– A managed service for orchestrating data workflows using Apache Airflow.
– Built on Google Kubernetes Engine, offering seamless integration with other Google Cloud services.
– **Hierarchy of Alerting**:
– Describes the levels at which alerts can be configured, from Google Cloud services down to individual Airflow task instances.
– Each level provides different alerting options that correspond to the health and performance of the workflow components.
– **Alerting Options**:
1. **Log-based Alerting Policies**:
– Use of Google Cloud’s logging services to create alerting policies based on specific log entries or metrics.
– Pros: Lightweight setup, deeper insights, multiple notification options.
– Cons: Learning curve for configuration, email alerts containing minimal info.
2. **Email Alerts via SendGrid**:
– Offers the ability to send detailed alerts via email when certain conditions are met.
– Pros: Detailed notifications, native integration with Airflow.
– Cons: Configuration overhead, potential for email saturation.
3. **Third-party Tools (e.g., Slack, PagerDuty)**:
– Real-time notifications through established communication tools.
– Pros: Familiar interfaces, effective for team troubleshooting.
– Cons: Requires additional setup, potential security risks with integrations.
– **Recommendation**:
– The guide ultimately recommends log-based alerting policies as the most effective option for production environments, due to their scalability and integration capabilities with other Google Cloud services.
– **Call to Action**:
– Encourages further exploration of Cloud Composer and related documentation for continuous improvement of monitoring and alerting practices.
This analysis highlights the critical need for security and compliance professionals to implement and maintain effective alerting mechanisms as part of their infrastructure security posture, especially in cloud environments where workflows and data processing tasks are essential and can have significant business implications if they fail.