Skip to main content

Questions tagged [airflow-taskflow]

The tag has no usage guidance.

airflow-taskflow
0 votes
0 answers
23 views

How to solve Airflow reporting "Detected zombie job"? [closed]

I am constantly fetching data from an API using the Python request library in an iterative manner. When U am testing my code using notebooks, it is working fine, and once I finished my pipeline, I ...
Safwan Asghar's user avatar
0 votes
1 answer
45 views

how to create multiple task group in parallel using taskgroup decorator in Airflow

I would like to improve this DAG if possible. from ... import.... with DAG( ... ) as dag: @task_group(group_id=f"group", dag=dag) def group(dag_instance, dataset, table): ...
toyo123's user avatar
0 votes
0 answers
28 views

How to Handle Skipping Subsequent Tasks for Specific Indices in Dynamic Task Mapping in Airflow Task Groups

I’ve recently started working with Airflow and I’m using Airflow 2.9.1 with the Taskflow API. I’ve created a task group with dynamic task mapping and I’m using task decorators exclusively (no ...
Hemanth's user avatar
  • 161
0 votes
1 answer
44 views

Airflow decorated task type hinting

Take this simple dag, in which one task takes the output of another: import datetime from airflow.decorators import dag, task @task def task_that_returns_a_string() -> str: return "a ...
Izaak Cornelis's user avatar
0 votes
1 answer
47 views

Airflow XCom not retrieving values between tasks in DAG

I'm experiencing an issue with Apache Airflow where values pushed to XCom in one task are not retrievable in a subsequent task within the same DAG. Here is a minimal example of my code: from airflow ...
daniel guo's user avatar
0 votes
1 answer
25 views

How to set up airflow alerting so it alerts only on last retry attempt?

My current DAG looks like: def download_stage(dag_def: DailyDownloadDefinition, **additional_kwargs): ... retries = 5 download = ShortCircuitOperator( task_id=task_id, ...
user21641220's user avatar
0 votes
1 answer
53 views

Airflow - Dynamic mapped Task Group - Removing mapped task dependencies for all the sub task, and access mapped_input in task group directly

I am working with Airflow and have created a DAG that uses dynamic task mapping inside a task group. I have two questions: How to remove the line from get_files to process_file_step2? I want ...
Hemanth's user avatar
  • 161
0 votes
1 answer
49 views

Airflow XCom Push from SparkSubmitOperator not working

I have an airflow job which launches a Spark job in one task and the next task extracts the application logs to find the Spark job application ID. I use Xcom push in the spark-submit task and ...
Ajith Kannan's user avatar
0 votes
1 answer
32 views

In Airflow how to avoid a DAG to run a child DAG which is disabled in the Airflow UI?

I was told to disable the dbt DAG in Airflow, which I did, but the dbt DAG in my case is called by a parent "main" DAG, which calls an "extract" DAG before calling the dbt one. I ...
Jose Pla's user avatar
  • 110
0 votes
0 answers
49 views

How to use Airflow XCOM to push and pull data and create tasks dynamically

I am trying fetch files from sftp and push them to s3 bucket, fetching files from SFTP server and pushing them as df to xcom if no files found using branch operator to skip the task if files found ...
LIONEL JOSEPH's user avatar
0 votes
0 answers
51 views

Airflow confuse path_file with Jinja template: Exception rendering Jinja template for task 'start', field 'op_kwargs'

I am encuntering an unexpected behaviour from airflow when trying to pass path of file to tasks. My dag: @dag(dag_id="classifire_dag_01", schedule_interval=None, start_date="", ...
Dani Khalang's user avatar
0 votes
3 answers
43 views

Airflow PostgresOperator pass variable to sql parameter

In my PostgresOperator, I would like to pass SQL file path as variable to sql parameter. I am reading this file path from a configuration file: sql_execution = PostgresOperator( task_id='...
Hazhir's user avatar
  • 51
-1 votes
1 answer
25 views

the dag is getting triggered immediately when toggle is ON

This is my dag: dag_pr_api = DAG( dag_id='API-PERSONAL-COMPUTER', default_args=args, schedule_interval='30 07 * * *', start_date=pendulum.yesterday('Europe/Berlin'), catchup=False, ...
maneesh arava's user avatar
0 votes
0 answers
33 views

The TriggerDagRunOperator in airflow does not trigger the second dag

I recently read more about from airflow.operators.trigger_dagrun import TriggerDagRunOperator if one day is successful, another one should start. with DAG( 'delete_customer_retention', ...
Вениамин Шендоган's user avatar
0 votes
0 answers
8 views

if any one of the input sensors fail, it should stop at start - DAG should fail

I have a dag where I have a task called start which is a dummy operator . Before the start I have a few input sensors for checking the file . I need to run the DAG for test cases . all the input ...
Deeksha's user avatar

15 30 50 per page
1
2 3 4 5
9