Agent Skill · Databricks

databricks-pipelines

Develop Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables) on Databricks. Use when building batch or streaming data pipelines with Python or SQL. Invoke BEFORE starting implementation.

Provider: Databricks
Path in repo: skills/databricks-pipelines/SKILL.md

Skill body

Lakeflow Spark Declarative Pipelines Development

FIRST: Use the parent databricks-core skill for CLI basics, authentication, profile selection, and data discovery commands.

Decision Tree

Use this tree to determine which dataset type and features to use. Multiple features can apply to the same dataset — e.g., a Streaming Table can use Auto Loader for ingestion, Append Flows for fan-in, and Expectations for data quality. Choose the dataset type first, then layer on applicable features.

User request → What kind of output?
├── Intermediate/reusable logic (not persisted) → Temporary View
│   ├── Preprocessing/filtering before Auto CDC → Temporary View feeding CDC flow
│   ├── Shared intermediate streaming logic reused by multiple downstream tables
│   ├── Pipeline-private helper logic (not published to catalog)
│   └── Published to UC for external queries → Persistent View (SQL only)
├── Persisted dataset
│   ├── Source is streaming/incremental/continuously growing → Streaming Table
│   │   ├── File ingestion (cloud storage, Volumes) → Auto Loader
│   │   ├── Message bus (Kafka, Kinesis, Pub/Sub, Pulsar, Event Hubs) → streaming source read
│   │   ├── Existing streaming/Delta table → streaming read from table
│   │   ├── CDC / upserts / track changes / keep latest per key / SCD Type 1 or 2 → Auto CDC
│   │   ├── Multiple sources into one table → Append Flows (NOT union)
│   │   ├── Historical backfill + live stream → one-time Append Flow + regular flow
│   │   └── Windowed aggregation with watermark → stateful streaming
│   └── Source is batch/historical/full scan → Materialized View
│       ├── Aggregation/join across full dataset (GROUP BY, SUM, COUNT, etc.)
│       ├── Gold layer aggregation from streaming table → MV with batch read (spark.read / no STREAM)
│       ├── JDBC/Federation/external batch sources
│       └── Small static file load (reference data, no streaming read)
├── Output to external system (Python only) → Sink
│   ├── Existing external table not managed by this pipeline → Sink with format="delta"
│   │   (prefer fully-qualified dataset names if the pipeline should own the table — see Publishing Modes)
│   ├── Kafka / Event Hubs → Sink with format="kafka" + @dp.append_flow(target="sink_name")
│   ├── Custom destination not natively supported → Sink with custom format
│   ├── Custom merge/upsert logic per batch → ForEachBatch Sink (Public Preview)
│   └── Multiple destinations per batch → ForEachBatch Sink (Public Preview)
└── Data quality constraints → Expectations (on any dataset type)
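
The top-level branches of the tree can be read as a short chain of checks. The following is a minimal sketch of that reading in plain Python, not part of any Databricks API: the `Request` class, its field names, and `choose_dataset_type` are hypothetical names introduced only for illustration. It covers only the first-level choice; features such as Auto Loader, Append Flows, and Expectations are layered on afterwards.

```python
from dataclasses import dataclass


@dataclass
class Request:
    """Hypothetical summary of what the user asked for (illustration only)."""
    persisted: bool = True          # should the result be stored as a dataset?
    external_output: bool = False   # does data leave the pipeline (Kafka, external Delta, ...)?
    streaming_source: bool = False  # is the source incremental or continuously growing?
    published_view: bool = False    # non-persisted logic that must be queryable from UC


def choose_dataset_type(req: Request) -> str:
    """Return the top-level dataset type suggested by the decision tree."""
    if req.external_output:
        return "Sink (Python only)"
    if not req.persisted:
        # Views hold intermediate logic; only the SQL-only persistent view is published.
        return "Persistent View (SQL only)" if req.published_view else "Temporary View"
    if req.streaming_source:
        return "Streaming Table"
    return "Materialized View"


print(choose_dataset_type(Request(streaming_source=True)))
```

Expectations are deliberately absent from the function because the tree applies them to any dataset type.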

Common Traps

Publishing Modes

Pipelines use a default catalog and schema configured in the pipeline settings. All datasets are published there unless overridden.

Comprehensive API Reference

MANDATORY: Before implementing, editing, or suggesting any code for a feature, you MUST read the linked reference file for that feature. NO exceptions — always look up the reference before writing code.

Some features require reading multiple skills together:

Dataset Definition APIs

| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
| --- | --- | --- | --- | --- | --- | --- |
| Streaming Table | @dp.table() returning streaming DF | @dlt.table() returning streaming DF | CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING LIVE TABLE | streaming-table-python | streaming-table-sql |
| Materialized View | @dp.materialized_view() | @dlt.table() returning batch DF | CREATE OR REFRESH MATERIALIZED VIEW | CREATE LIVE TABLE (batch) | materialized-view-python | materialized-view-sql |
| Temporary View | @dp.temporary_view() | @dlt.view(), @dp.view() | CREATE TEMPORARY VIEW | CREATE TEMPORARY LIVE VIEW | temporary-view-python | temporary-view-sql |
| Persistent View (UC) | N/A (SQL only) | | CREATE VIEW | | | view-sql |
| Streaming Table (explicit) | dp.create_streaming_table() | dlt.create_streaming_table() | CREATE OR REFRESH STREAMING TABLE (no AS) | | streaming-table-python | streaming-table-sql |

Flow and Sink APIs

| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
| --- | --- | --- | --- | --- | --- | --- |
| Append Flow | @dp.append_flow() | @dlt.append_flow() | CREATE FLOW ... INSERT INTO | | streaming-table-python | streaming-table-sql |
| Backfill Flow | @dp.append_flow(once=True) | @dlt.append_flow(once=True) | CREATE FLOW ... INSERT INTO ... ONCE | | streaming-table-python | streaming-table-sql |
| Sink (Delta/Kafka/EH/custom) | dp.create_sink() | dlt.create_sink() | N/A (Python only) | | sink-python | |
| ForEachBatch Sink | @dp.foreach_batch_sink() | | N/A (Python only) | | foreach-batch-sink-python | |

CDC APIs

| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
| --- | --- | --- | --- | --- | --- | --- |
| Auto CDC (streaming source) | dp.create_auto_cdc_flow() | dlt.apply_changes(), dp.apply_changes() | AUTO CDC INTO ... FROM STREAM | APPLY CHANGES INTO ... FROM STREAM | auto-cdc-python | auto-cdc-sql |
| Auto CDC (periodic snapshot) | dp.create_auto_cdc_from_snapshot_flow() | dlt.apply_changes_from_snapshot() | N/A (Python only) | | auto-cdc-python | |

Data Quality APIs

| Feature | Python (current) | Python (deprecated) | SQL (current) | Skill (Py) | Skill (SQL) |
| --- | --- | --- | --- | --- | --- |
| Expect (warn) | @dp.expect() | @dlt.expect() | CONSTRAINT ... EXPECT (...) | expectations-python | expectations-sql |
| Expect or drop | @dp.expect_or_drop() | @dlt.expect_or_drop() | CONSTRAINT ... EXPECT (...) ON VIOLATION DROP ROW | expectations-python | expectations-sql |
| Expect or fail | @dp.expect_or_fail() | @dlt.expect_or_fail() | CONSTRAINT ... EXPECT (...) ON VIOLATION FAIL UPDATE | expectations-python | expectations-sql |
| Expect all (warn) | @dp.expect_all({}) | @dlt.expect_all({}) | Multiple CONSTRAINT clauses | expectations-python | expectations-sql |
| Expect all or drop | @dp.expect_all_or_drop({}) | @dlt.expect_all_or_drop({}) | Multiple constraints with DROP ROW | expectations-python | expectations-sql |
| Expect all or fail | @dp.expect_all_or_fail({}) | @dlt.expect_all_or_fail({}) | Multiple constraints with FAIL UPDATE | expectations-python | expectations-sql |
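
The three violation policies (warn, drop, fail) differ only in what happens to a record that breaks the constraint. The following is a plain-Python teaching model of that difference. It is not the pipelines API and does not run inside a pipeline: `apply_expectation`, its arguments, and the policy strings are hypothetical names chosen for illustration.

```python
def apply_expectation(rows, predicate, on_violation="warn"):
    """Model the three violation policies on a list of dict rows.

    Returns (kept_rows, violation_count). Illustration only.
    """
    rows = list(rows)
    violations = [r for r in rows if not predicate(r)]
    if on_violation == "warn":
        # Warn: invalid records are kept and only counted.
        return rows, len(violations)
    if on_violation == "drop":
        # DROP ROW: invalid records are removed before the write.
        return [r for r in rows if predicate(r)], len(violations)
    if on_violation == "fail":
        # FAIL UPDATE: any invalid record stops the whole update.
        if violations:
            raise ValueError(f"expectation failed for {len(violations)} row(s)")
        return rows, 0
    raise ValueError(f"unknown policy: {on_violation}")


orders = [{"id": 1}, {"id": None}]
kept, bad = apply_expectation(orders, lambda r: r["id"] is not None, "drop")
print(len(kept), bad)
```

In a real pipeline the equivalent behaviour comes from the decorators or CONSTRAINT clauses in the table above; consult the expectations reference before writing that code.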

Reading Data APIs

| Feature | Python (current) | Python (deprecated) | SQL (current) | SQL (deprecated) | Skill (Py) | Skill (SQL) |
| --- | --- | --- | --- | --- | --- | --- |
| Batch read (pipeline dataset) | spark.read.table("name") | dp.read("name"), dlt.read("name") | SELECT ... FROM name | SELECT ... FROM LIVE.name | | |
| Streaming read (pipeline dataset) | spark.readStream.table("name") | dp.read_stream("name"), dlt.read_stream("name") | SELECT ... FROM STREAM name | SELECT ... FROM STREAM LIVE.name | | |
| Auto Loader (cloud files) | spark.readStream.format("cloudFiles") | | STREAM read_files(...) | | auto-loader-python | auto-loader-sql |
| Kafka source | spark.readStream.format("kafka") | | STREAM read_kafka(...) | | | |
| Kinesis source | spark.readStream.format("kinesis") | | STREAM read_kinesis(...) | | | |
| Pub/Sub source | spark.readStream.format("pubsub") | | STREAM read_pubsub(...) | | | |
| Pulsar source | spark.readStream.format("pulsar") | | STREAM read_pulsar(...) | | | |
| Event Hubs source | spark.readStream.format("kafka") + EH config | | STREAM read_kafka(...) + EH config | | | |
| JDBC / Lakehouse Federation | spark.read.format("postgresql") etc. | | Direct table ref via federation catalog | | | |
| Custom data source | spark.read[Stream].format("custom") | | N/A (Python only) | | | |
| Static file read (batch) | spark.read.format("json"\|"csv"\|...).load() | | read_files(...) (no STREAM) | | | |
| Skip upstream change commits | .option("skipChangeCommits", "true") | | read_stream("name", skipChangeCommits => true) | | streaming-table-python | streaming-table-sql |

Table/Schema Feature APIs

| Feature | Python (current) | SQL (current) | Skill (Py) | Skill (SQL) |
| --- | --- | --- | --- | --- |
| Liquid clustering | cluster_by=[...] | CLUSTER BY (col1, col2) | materialized-view-python | materialized-view-sql |
| Auto liquid clustering | cluster_by_auto=True | CLUSTER BY AUTO | materialized-view-python | materialized-view-sql |
| Partition columns | partition_cols=[...] | PARTITIONED BY (col1, col2) | materialized-view-python | materialized-view-sql |
| Table properties | table_properties={...} | TBLPROPERTIES (...) | materialized-view-python | materialized-view-sql |
| Explicit schema | schema="col1 TYPE, ..." | (col1 TYPE, ...) AS | materialized-view-python | materialized-view-sql |
| Generated columns | schema="..., col TYPE GENERATED ALWAYS AS (expr)" | col TYPE GENERATED ALWAYS AS (expr) | materialized-view-python | materialized-view-sql |
| Row filter (Public Preview) | row_filter="ROW FILTER fn ON (col)" | WITH ROW FILTER fn ON (col) | materialized-view-python | materialized-view-sql |
| Column mask (Public Preview) | schema="..., col TYPE MASK fn USING COLUMNS (col2)" | col TYPE MASK fn USING COLUMNS (col2) | materialized-view-python | materialized-view-sql |
| Private dataset | private=True | CREATE PRIVATE ... | materialized-view-python | materialized-view-sql |

Import / Module APIs

| Current | Deprecated | Notes |
| --- | --- | --- |
| from pyspark import pipelines as dp | import dlt | Both work. Prefer dp. Do NOT change existing dlt imports. |
| spark.read.table() / spark.readStream.table() | dp.read() / dp.read_stream() / dlt.read() / dlt.read_stream() | Deprecated reads still work. Prefer spark.*. |
| | LIVE. prefix | Fully deprecated. NEVER use. Causes errors in newer pipelines. |
| | CREATE LIVE TABLE / CREATE LIVE VIEW | Fully deprecated. Use CREATE STREAMING TABLE / CREATE MATERIALIZED VIEW / CREATE TEMPORARY VIEW. |
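
One way to apply the deprecation notes above during review is to scan pipeline source for the deprecated spellings. The following is a rough sketch under stated assumptions: `DEPRECATED_PATTERNS` and `find_deprecated` are hypothetical helpers, the regular expressions are case-sensitive and match only uppercase SQL keywords, and `import dlt` is intentionally not flagged because existing dlt imports should not be changed.

```python
import re

# Patterns derived from the Deprecated column, mapped to the suggested replacement.
DEPRECATED_PATTERNS = {
    r"\bdlt\.read_stream\(|\bdp\.read_stream\(": "spark.readStream.table()",
    r"\bdlt\.read\(|\bdp\.read\(": "spark.read.table()",
    r"\bLIVE\.": "drop the LIVE. prefix",
    r"\bCREATE\s+(?:OR\s+REFRESH\s+)?(?:STREAMING\s+|TEMPORARY\s+)?LIVE\s+(?:TABLE|VIEW)\b":
        "CREATE STREAMING TABLE / MATERIALIZED VIEW / TEMPORARY VIEW",
}


def find_deprecated(source: str):
    """Return (line_number, suggestion) pairs for deprecated usages in source text."""
    findings = []
    for line_no, line in enumerate(source.splitlines(), start=1):
        for pattern, suggestion in DEPRECATED_PATTERNS.items():
            if re.search(pattern, line):
                findings.append((line_no, suggestion))
    return findings


sample = 'df = dlt.read_stream("orders")\nSELECT * FROM LIVE.orders'
for line_no, suggestion in find_deprecated(sample):
    print(f"line {line_no}: prefer {suggestion}")
```

A text scan like this produces false positives (for example, matches inside comments or string literals), so treat its output as a review aid rather than an automatic rewrite.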

Language-specific guides

Lakeflow Spark Declarative Pipelines (formerly Delta Live Tables / DLT) is a framework for building batch and streaming data pipelines.

Migrating from DLT

If you have an existing DLT pipeline (import dlt, @dlt.table, dlt.read(...), dlt.apply_changes(...)) and want to move to SDP, see references/dlt-migration.md. It covers both migration paths — DLT Python → SDP Python (from pyspark import pipelines as dp) and DLT Python → SDP SQL — with side-by-side conversions for the table decorators, reads, expectations, CDC/SCD, and partitioning → liquid clustering.

Choose Your Workflow

Three project shapes exist — pick before scaffolding:

| Situation | Workflow |
| --- | --- |
| New standalone pipeline project with its own bundle | A. Standalone bundle |
| Pipeline added to an existing DAB project | B. Existing bundle |
| Quick prototyping, no bundle (yet) | C. Rapid CLI iteration |

Default to A for production-bound work and C for exploration. Full details, generated structures, polling patterns, and the edit/re-upload flow are in references/workflows.md.

Language Selection (Python vs SQL)

Decide before scaffolding: the choice determines the template files (.py vs .sql) and which reference docs apply. Python and SQL files can coexist in one pipeline, but pick a primary language.

| User signal | Pick |
| --- | --- |
| “Python pipeline”, UDF, pandas, ML inference, pyspark | Python |
| “SQL pipeline”, “SQL files” | SQL |
| “Simple pipeline”, “create a table”, “an aggregation” | SQL (simpler) |
| Complex parameterized logic, custom UDFs, ML | Python |

If ambiguous, ask. Stick with the chosen language unless the user explicitly switches.
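
The signal table can be approximated as a keyword lookup that returns no answer when the request is ambiguous. The following is a small sketch of that idea: `pick_language` and the two keyword tuples are hypothetical, and simple substring matching is an assumption that will miss paraphrases.

```python
# Keyword lists adapted from the signal table; matching is a naive substring test.
PYTHON_SIGNALS = ("python", "udf", "pandas", "ml inference", "pyspark")
SQL_SIGNALS = ("sql pipeline", "sql files", "simple pipeline", "create a table", "an aggregation")


def pick_language(request: str):
    """Return "python", "sql", or None when the user should be asked."""
    text = request.lower()
    wants_python = any(signal in text for signal in PYTHON_SIGNALS)
    wants_sql = any(signal in text for signal in SQL_SIGNALS)
    if wants_python == wants_sql:
        return None  # no signal, or conflicting signals: ask the user
    return "python" if wants_python else "sql"


print(pick_language("Build a SQL pipeline for orders"))
```

Returning None rather than a default keeps the "if ambiguous, ask" rule explicit in the calling code.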

Scaffolding a New Pipeline Project

The newer databricks pipelines init command is focused on pipeline projects:

databricks pipelines init --output-dir . --config-file init-config.json

init-config.json:

{
  "project_name": "my_pipeline",
  "initial_catalog": "prod_catalog",
  "use_personal_schema": "no",
  "initial_language": "sql"
}
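
When the config file is generated by a script rather than written by hand, the same four keys can be assembled in Python. The following is a minimal sketch: `build_init_config` is a hypothetical helper, and the value "yes" for use_personal_schema is an assumption, since only "no" appears in the example above.

```python
import json


def build_init_config(project_name, catalog, language="sql", personal_schema=False):
    """Build the dict written to init-config.json, using the four keys shown above."""
    return {
        "project_name": project_name,
        "initial_catalog": catalog,
        # Assumption: "yes" is the counterpart of the "no" value in the example.
        "use_personal_schema": "yes" if personal_schema else "no",
        "initial_language": language,
    }


config = build_init_config("my_pipeline", "prod_catalog")
print(json.dumps(config, indent=2))
```

Writing the result with `json.dump(config, open("init-config.json", "w"), indent=2)` would produce a file usable with the --config-file flag.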

The template-based databricks bundle init lakeflow-pipelines command also works:

databricks bundle init lakeflow-pipelines --config-file <(echo '{"project_name": "my_pipeline", "language": "python", "serverless": "yes"}') --profile <PROFILE> < /dev/null

Field constraints:

See references/workflows.md for the full generated structure, databricks.yml essentials, and per-target catalog/schema patterns.

After scaffolding, create CLAUDE.md and AGENTS.md in the project directory. These files give agents the guidance they need to work with the project. Use this content:

# Declarative Automation Bundles Project

This project uses Declarative Automation Bundles (formerly Databricks Asset Bundles) for deployment.

## Prerequisites

Install the Databricks CLI (>= v0.288.0) if not already installed:
- macOS: `brew tap databricks/tap && brew install databricks`
- Linux: `curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh`
- Windows: `winget install Databricks.DatabricksCLI`

Verify: `databricks -v`

## For AI Agents

Read the `databricks-core` skill for CLI basics, authentication, and deployment workflow.
Read the `databricks-pipelines` skill for pipeline-specific guidance.

If skills are not available, install them: `databricks aitools install`

Pipeline Structure

my-pipeline-project/
├── databricks.yml                        # Bundle configuration
├── resources/
│   ├── my_pipeline.pipeline.yml          # Pipeline definition
│   └── my_pipeline_job.job.yml           # Scheduling job (optional)
└── src/
    ├── my_table.py (or .sql)             # One dataset per file
    ├── another_table.py (or .sql)
    └── ...

Scheduling Pipelines

To schedule a pipeline, add a job that triggers it in resources/<name>.job.yml:

resources:
  jobs:
    my_pipeline_job:
      trigger:
        periodic:
          interval: 1
          unit: DAYS
      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.my_pipeline.id}

Running Pipelines

You must deploy before running. In local development, code changes only take effect after databricks bundle deploy. Always deploy before any run, dry run, or selective refresh.

Development Workflow

  1. Validate: databricks bundle validate --profile <profile>
  2. Deploy: databricks bundle deploy -t dev --profile <profile>
  3. Run pipeline: databricks bundle run <pipeline_name> -t dev --profile <profile>
  4. Check status: databricks pipelines get --pipeline-id <id> --profile <profile>
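
Because a run only picks up code that has been deployed, it can help to generate the first three steps as one ordered sequence so that deploy is never skipped. The following is a minimal sketch: `deploy_and_run_commands` is a hypothetical helper that only builds argument lists from the commands listed above and does not execute anything. The status check in step 4 is left out because it needs a pipeline ID.

```python
import shlex


def deploy_and_run_commands(pipeline_name, profile, target="dev"):
    """Return validate, deploy, and run invocations as argv lists, in that order."""
    return [
        ["databricks", "bundle", "validate", "--profile", profile],
        ["databricks", "bundle", "deploy", "-t", target, "--profile", profile],
        ["databricks", "bundle", "run", pipeline_name, "-t", target, "--profile", profile],
    ]


for argv in deploy_and_run_commands("my_pipeline", "DEFAULT"):
    # Print the shell form of each step for inspection.
    print(shlex.join(argv))
```

To execute the steps, each list could be passed to `subprocess.run(argv, check=True)`, which stops the sequence at the first failing command.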

Pipeline API Reference

Detailed reference guides for each pipeline API. Read the relevant guide before writing pipeline code.

Project & Lifecycle

Datasets, Flows & Quality

Skill frontmatter

compatibility: Requires databricks CLI (>= v1.0.0)
metadata:
  version: "0.3.0"
parent: databricks-core