Airflow Xcom Exclusive Site
If you want tighter Airflow integration, implement a custom XCom backend (subclass XCom) that exposes a method claim_xcom(key, consumer_id) which performs atomic claim semantics in the chosen storage (DB or external store). Register the custom backend in airflow.cfg (xcom backend class path).
Pros:
Simple design:
def task_a(**context):
context['ti'].xcom_push(key=f"result_context['ti'].task_id", value=100)
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetimedef extract(**context): context['ti'].xcom_push(key='user_id', value=42) return "raw": "data"
def transform(**context): user_id = context['ti'].xcom_pull(key='user_id', task_ids='extract') raw = context['ti'].xcom_pull(task_ids='extract') return "transformed": raw["raw"] + f" for user user_id" airflow xcom exclusive
def load(**context): final = context['ti'].xcom_pull(task_ids='transform') print(final)
with DAG('exclusive_xcom_demo', start_date=datetime(2023,1,1), schedule=None) as dag: t1 = PythonOperator(task_id='extract', python_callable=extract) t2 = PythonOperator(task_id='transform', python_callable=transform) t3 = PythonOperator(task_id='load', python_callable=load) If you want tighter Airflow integration, implement a
t1 >> t2 >> t3