Questions tagged [airflow-taskflow]
The airflow-taskflow tag has no usage guidance.
134 questions
0 votes, 0 answers, 23 views
How to solve Airflow reporting "Detected zombie job"? [closed]
I am constantly fetching data from an API using the Python requests library in an iterative manner. When I am testing my code using notebooks, it is working fine, and once I finished my pipeline, I ...
0 votes, 1 answer, 45 views
How to create multiple task groups in parallel using the task_group decorator in Airflow
I would like to improve this DAG if possible.
from ... import....
with DAG(
    ...
) as dag:
    @task_group(group_id=f"group", dag=dag)
    def group(dag_instance, dataset, table):
        ...
0 votes, 0 answers, 28 views
How to Handle Skipping Subsequent Tasks for Specific Indices in Dynamic Task Mapping in Airflow Task Groups
I’ve recently started working with Airflow and I’m using Airflow 2.9.1 with the Taskflow API. I’ve created a task group with dynamic task mapping and I’m using task decorators exclusively (no ...
0 votes, 1 answer, 44 views
Airflow decorated task type hinting
Take this simple DAG, in which one task takes the output of another:
import datetime
from airflow.decorators import dag, task
@task
def task_that_returns_a_string() -> str:
    return "a ...
0 votes, 1 answer, 47 views
Airflow XCom not retrieving values between tasks in DAG
I'm experiencing an issue with Apache Airflow where values pushed to XCom in one task are not retrievable in a subsequent task within the same DAG. Here is a minimal example of my code:
from airflow ...
0 votes, 1 answer, 25 views
How to set up Airflow alerting so it alerts only on the last retry attempt?
My current DAG looks like:
def download_stage(dag_def: DailyDownloadDefinition, **additional_kwargs):
    ...
    retries = 5
    download = ShortCircuitOperator(
        task_id=task_id,
        ...
0 votes, 1 answer, 53 views
Airflow: dynamically mapped task group: removing mapped task dependencies for all sub-tasks and accessing mapped_input in the task group directly
I am working with Airflow and have created a DAG that uses dynamic task mapping inside a task group.
I have two questions:
How to remove the line from get_files to process_file_step2?
I want ...
0 votes, 1 answer, 49 views
Airflow XCom Push from SparkSubmitOperator not working
I have an Airflow job which launches a Spark job in one task, and the next task extracts the application logs to find the Spark job application ID. I use XCom push in the spark-submit task and ...
0 votes, 1 answer, 32 views
In Airflow, how to prevent a DAG from running a child DAG that is disabled in the Airflow UI?
I was told to disable the dbt DAG in Airflow, which I did, but the dbt DAG in my case is called by a parent "main" DAG, which calls an "extract" DAG before calling the dbt one.
I ...
0 votes, 0 answers, 49 views
How to use Airflow XCom to push and pull data and create tasks dynamically
I am trying to fetch files from SFTP and push them to an S3 bucket:
fetching files from the SFTP server and pushing them as a df to XCom
if no files are found, using a branch operator to skip the task
if files found ...
0 votes, 0 answers, 51 views
Airflow confuses path_file with Jinja template: Exception rendering Jinja template for task 'start', field 'op_kwargs'
I am encountering unexpected behaviour from Airflow when trying to pass the path of a file to tasks.
My DAG:
@dag(dag_id="classifire_dag_01", schedule_interval=None, start_date="", ...
0 votes, 3 answers, 43 views
Airflow PostgresOperator pass variable to sql parameter
In my PostgresOperator, I would like to pass the SQL file path as a variable to the sql parameter. I am reading this file path from a configuration file:
sql_execution = PostgresOperator(
    task_id='...
-1 votes, 1 answer, 25 views
The DAG is triggered immediately when the toggle is switched ON
This is my DAG:
dag_pr_api = DAG(
    dag_id='API-PERSONAL-COMPUTER',
    default_args=args,
    schedule_interval='30 07 * * *',
    start_date=pendulum.yesterday('Europe/Berlin'),
    catchup=False,
    ...
0 votes, 0 answers, 33 views
The TriggerDagRunOperator in Airflow does not trigger the second DAG
I recently read more about
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
If one DAG is successful, another one should start.
with DAG(
    'delete_customer_retention',
    ...
0 votes, 0 answers, 8 views
If any one of the input sensors fails, the DAG should stop at start and fail
I have a DAG where I have a task called start, which is a dummy operator. Before start I have a few input sensors for checking the file. I need to run the DAG for test cases.
all the input ...