Airflow XCom Exclusive

If you want tighter Airflow integration, implement a custom XCom backend (a subclass of BaseXCom) that exposes a method such as claim_xcom(key, consumer_id), which performs an atomic claim in the chosen storage (the metadata DB or an external store) so that only one consumer can take a given value. Register the backend by setting xcom_backend in the [core] section of airflow.cfg to the class's import path.
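The atomic claim itself can be sketched without Airflow. The example below is only an illustration under stated assumptions: it uses an in-memory SQLite table named xcom_demo as a stand-in for the real storage, and make_store, push, and this claim_xcom signature (with an extra conn argument) are hypothetical helpers, not part of Airflow's API. The idea is that a single conditional UPDATE decides the winner, so two consumers cannot both claim the same key.

```python
import sqlite3


def make_store():
    # Hypothetical stand-in for the backend's storage (not Airflow's schema).
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE xcom_demo ("
        "key TEXT PRIMARY KEY, value TEXT NOT NULL, claimed_by TEXT)"
    )
    return conn


def push(conn, key, value):
    # Store an unclaimed value under the given key.
    with conn:
        conn.execute(
            "INSERT INTO xcom_demo (key, value) VALUES (?, ?)", (key, value)
        )


def claim_xcom(conn, key, consumer_id):
    # The conditional UPDATE only matches a row that nobody has claimed yet,
    # so at most one consumer can ever see rowcount == 1 for a given key.
    with conn:
        cur = conn.execute(
            "UPDATE xcom_demo SET claimed_by = ? "
            "WHERE key = ? AND claimed_by IS NULL",
            (consumer_id, key),
        )
        if cur.rowcount != 1:
            return None  # missing key, or already claimed by someone else
        row = conn.execute(
            "SELECT value FROM xcom_demo WHERE key = ?", (key,)
        ).fetchone()
        return row[0]


conn = make_store()
push(conn, "user_id", "42")
first = claim_xcom(conn, "user_id", "transform")  # first consumer wins
second = claim_xcom(conn, "user_id", "load")      # later consumer gets None
```

In a real backend the same pattern would presumably run against whatever store the backend uses; the point of the sketch is that the check and the write happen in one statement rather than as a separate read followed by a write.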

Pros:

Simple design:


def task_a(**context):
    # Include the task_id in the key so each task pushes under its own name.
    context['ti'].xcom_push(key=f"result_{context['ti'].task_id}", value=100)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract(**context):
    # Push an explicit key; the return value is stored under 'return_value'.
    context['ti'].xcom_push(key='user_id', value=42)
    return {"raw": "data"}

def transform(**context):
    # Pull the explicitly pushed key, then the upstream return value.
    user_id = context['ti'].xcom_pull(key='user_id', task_ids='extract')
    raw = context['ti'].xcom_pull(task_ids='extract')
    return {"transformed": raw["raw"] + f" for user {user_id}"}

def load(**context):
    final = context['ti'].xcom_pull(task_ids='transform')
    print(final)

with DAG('exclusive_xcom_demo', start_date=datetime(2023, 1, 1), schedule=None) as dag:
    t1 = PythonOperator(task_id='extract', python_callable=extract)
    t2 = PythonOperator(task_id='transform', python_callable=transform)
    t3 = PythonOperator(task_id='load', python_callable=load)

    t1 >> t2 >> t3