Commit 1ee78f3

Add examples with Airflow patterns (#137)
DABs examples in Python: patterns for migrating common Airflow practices to Databricks Lakeflow Jobs:
- task values
- file arrival trigger
- for-each tasks
- programmatic generation & mutators

Co-authored-by: Zanita Rahimi <zanita.rahimi@databricks.com>
1 parent d7d25d5 commit 1ee78f3


48 files changed: +1336 −0 lines
Lines changed: 73 additions & 0 deletions
# job_backfill_data

This example demonstrates a Databricks Asset Bundle (DABs) job that runs a SQL task with a date parameter for backfilling data.

The job consists of one task:

1. **run_daily_sql**: a SQL task that runs `src/my_query.sql` with a `run_date` job parameter. The query inserts data from a source table into a target table filtered by `event_date = run_date`, so you can backfill or reprocess specific dates.

Project layout:

* `src/`: SQL and notebook source code for this project.
  * `src/my_query.sql`: daily insert query that uses the `:run_date` parameter to filter by event date.
* `resources/`: resource configurations (jobs, pipelines, etc.).
  * `resources/backfill_data.py`: job definition with a parameterized SQL task.

## Job parameters

| Parameter  | Default      | Description                                   |
|------------|--------------|-----------------------------------------------|
| `run_date` | `2024-01-01` | Date used to filter data (e.g. `event_date`). |

Before deploying, set `warehouse_id` in `resources/backfill_data.py` to your SQL warehouse ID, and adjust the catalog/schema/table names in `src/my_query.sql` to match your environment.

## Documentation

For more information about job backfills and parameters, see:
- [Create and run jobs](https://docs.databricks.com/en/jobs/index.html)
- [Backfill jobs](https://docs.databricks.com/aws/en/jobs/backfill-jobs)

## Getting started

Choose how you want to work on this project:

(a) Directly in your Databricks workspace: see https://docs.databricks.com/dev-tools/bundles/workspace.

(b) Locally with an IDE like Cursor or VS Code: see https://docs.databricks.com/vscode-ext.

(c) With command-line tools: see https://docs.databricks.com/dev-tools/cli/databricks-cli.html.

If you're developing with an IDE, install this project's dependencies with uv:

* Make sure you have the uv package manager installed. It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/.
* Run `uv sync --dev` to install the project's dependencies.

## Using this project with the CLI

The Databricks workspace and IDE extensions provide a graphical interface for working with this project. You can also use the CLI:

1. Authenticate to your Databricks workspace, if you have not done so already:
   ```
   $ databricks configure
   ```

2. To deploy a development copy of this project, run:
   ```
   $ databricks bundle deploy --target dev
   ```
   (Note: "dev" is the default target, so `--target` is optional.)

   This deploys everything defined for this project, including the job `[dev yourname] sql_backfill_example`. You can find it under **Workflows** (or **Jobs & Pipelines**) in your workspace.

3. To run the job with the default `run_date`:
   ```
   $ databricks bundle run sql_backfill_example
   ```

4. To run the job for a specific date (e.g. a backfill):
   ```
   $ databricks bundle run sql_backfill_example --parameters run_date=2024-02-01
   ```
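Backfills often cover a range of dates rather than a single one. A minimal sketch, not part of this example, that generates one run command per date (it assumes the job name and `--parameters` flag shown above):

```python
from datetime import date, timedelta


def backfill_commands(start: date, end: date, job: str = "sql_backfill_example"):
    """Yield one `databricks bundle run` command per date in [start, end]."""
    d = start
    while d <= end:
        yield f"databricks bundle run {job} --parameters run_date={d.isoformat()}"
        d += timedelta(days=1)


# Print the commands; pipe them to a shell or invoke each with subprocess.run.
for cmd in backfill_commands(date(2024, 2, 1), date(2024, 2, 3)):
    print(cmd)
```

Each command is an independent job run, so a failed date can be retried without re-running the whole range.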
Lines changed: 19 additions & 0 deletions
# This is a Databricks asset bundle definition for job_backfill_data.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: job_backfill_data

python:
  venv_path: .venv
  # Functions called to load resources defined in Python. See resources/__init__.py.
  resources:
    - "resources:load_resources"

include:
  - resources/*.yml
  - resources/*/*.yml

targets:
  dev:
    mode: development
    default: true
Lines changed: 25 additions & 0 deletions
[project]
name = "job_backfill_data"
version = "0.0.1"
authors = [{ name = "Databricks Field Engineering" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
  # Any dependencies for jobs and pipelines in this project can be added here.
  # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies
  #
  # LIMITATION: for pipelines, dependencies are cached during development;
  # add dependencies to the 'environment' section of the pipeline.yml file instead.
]

[dependency-groups]
dev = [
  "pytest",
  "databricks-bundles==0.275.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.black]
line-length = 125
Lines changed: 16 additions & 0 deletions
from databricks.bundles.core import (
    Bundle,
    Resources,
    load_resources_from_current_package_module,
)


def load_resources(bundle: Bundle) -> Resources:
    """
    The 'load_resources' function is referenced in databricks.yml and is responsible
    for loading bundle resources defined in Python code. It is called by the
    Databricks CLI during bundle deployment. After deployment, this function is not used.
    """

    # The default implementation loads all Python files in the 'resources' directory.
    return load_resources_from_current_package_module()
Lines changed: 24 additions & 0 deletions
from databricks.bundles.jobs import (
    Job,
    Task,
    SqlTask,
    SqlTaskFile,
    JobParameterDefinition,
)

run_daily_sql = Task(
    task_key="run_daily_sql",
    sql_task=SqlTask(
        warehouse_id="<your_warehouse_id>",  # set to your SQL warehouse ID before deploying
        file=SqlTaskFile(path="src/my_query.sql"),
        # Forward the job-level parameter to the SQL task's :run_date parameter.
        parameters={"run_date": "{{job.parameters.run_date}}"},
    ),
)

job = Job(
    name="sql_backfill_example",
    tasks=[run_daily_sql],
    parameters=[
        JobParameterDefinition(name="run_date", default="2024-01-01"),
    ],
)
Lines changed: 5 additions & 0 deletions
-- Referenced by the sql_task in resources/backfill_data.py.
-- :run_date is supplied as a task parameter; date() casts the string to a DATE.
INSERT INTO catalog.schema.target_table
SELECT *
FROM catalog.schema.source_table
WHERE event_date = date(:run_date);
Lines changed: 68 additions & 0 deletions
# job_conditional_execution

This example demonstrates a Lakeflow Job that uses conditional task execution based on data quality checks.

The job consists of the following tasks:

1. Check data quality and count bad records.
2. Evaluate whether the bad-record count exceeds a threshold (100 records).
3. Route to a different processing path based on the condition:
   - If bad records > 100: run the `fix_path` task.
   - If bad records ≤ 100: run the `skip_path` task.
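The routing above can be sketched in plain Python (illustrative only; in the deployed job this comparison is expressed as an If/else condition task):

```python
def route(bad_records: int, threshold: int = 100) -> str:
    """Mirror the job's branching: which downstream task should run?"""
    return "fix_path" if bad_records > threshold else "skip_path"


print(route(250))  # high bad-record count -> fix_path
print(route(7))    # within threshold      -> skip_path
```

Note that the condition is strictly greater-than, so a count of exactly 100 still takes the skip path.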

Project layout:

* `src/`: notebook source code for this project.
  * `src/check_quality.py`: checks data quality and outputs the bad-record count.
  * `src/fix_path.py`: handles cases with a high bad-record count.
  * `src/skip_path.py`: the skip path.
* `resources/`: resource configurations (jobs, pipelines, etc.).
  * `resources/conditional_execution.py`: job definition with conditional tasks.
19+
## Documentation
20+
21+
For more information about conditional task execution, see:
22+
- [Add branching logic to a job with the If/else task](https://docs.databricks.com/aws/en/jobs/if-else)
23+
24+
## Getting started
25+
26+
Choose how you want to work on this project:
27+
28+
(a) Directly in your Databricks workspace, see
29+
https://docs.databricks.com/dev-tools/bundles/workspace.
30+
31+
(b) Locally with an IDE like Cursor or VS Code, see
32+
https://docs.databricks.com/vscode-ext.
33+
34+
(c) With command line tools, see https://docs.databricks.com/dev-tools/cli/databricks-cli.html
35+
36+
If you're developing with an IDE, dependencies for this project should be installed using uv:
37+
38+
* Make sure you have the UV package manager installed.
39+
It's an alternative to tools like pip: https://docs.astral.sh/uv/getting-started/installation/.
40+
* Run `uv sync --dev` to install the project's dependencies.
41+
42+
43+
# Using this project using the CLI
44+
45+
The Databricks workspace and IDE extensions provide a graphical interface for working
46+
with this project. It's also possible to interact with it directly using the CLI:
47+
48+
1. Authenticate to your Databricks workspace, if you have not done so already:
49+
```
50+
$ databricks configure
51+
```
52+
53+
2. To deploy a development copy of this project, type:
54+
```
55+
$ databricks bundle deploy --target dev
56+
```
57+
(Note that "dev" is the default target, so the `--target` parameter
58+
is optional here.)
59+
60+
This deploys everything that's defined for this project.
61+
For example, this project will deploy a job called
62+
`[dev yourname] conditional_execution_example` to your workspace.
63+
You can find that resource by opening your workspace and clicking on **Jobs & Pipelines**.
64+
65+
3. To run the job, use the "run" command:
66+
```
67+
$ databricks bundle run conditional_execution_example
68+
```
Lines changed: 19 additions & 0 deletions
# This is a Databricks asset bundle definition for job_conditional_execution.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: job_conditional_execution

python:
  venv_path: .venv
  # Functions called to load resources defined in Python. See resources/__init__.py.
  resources:
    - "resources:load_resources"

include:
  - resources/*.yml
  - resources/*/*.yml

targets:
  dev:
    mode: development
    default: true
Lines changed: 25 additions & 0 deletions
[project]
name = "job_conditional_execution"
version = "0.0.1"
authors = [{ name = "Databricks Field Engineering" }]
requires-python = ">=3.10,<=3.13"
dependencies = [
  # Any dependencies for jobs and pipelines in this project can be added here.
  # See also https://docs.databricks.com/dev-tools/bundles/library-dependencies
  #
  # LIMITATION: for pipelines, dependencies are cached during development;
  # add dependencies to the 'environment' section of the pipeline.yml file instead.
]

[dependency-groups]
dev = [
  "pytest",
  "databricks-bundles==0.275.0",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.black]
line-length = 125
Lines changed: 16 additions & 0 deletions
from databricks.bundles.core import (
    Bundle,
    Resources,
    load_resources_from_current_package_module,
)


def load_resources(bundle: Bundle) -> Resources:
    """
    The 'load_resources' function is referenced in databricks.yml and is responsible
    for loading bundle resources defined in Python code. It is called by the
    Databricks CLI during bundle deployment. After deployment, this function is not used.
    """

    # The default implementation loads all Python files in the 'resources' directory.
    return load_resources_from_current_package_module()
