Chapter 14: CI/CD for Databricks Pipelines with Databricks Asset Bundles
Chapter Introduction
In previous chapters, we meticulously crafted robust data pipelines using Databricks Delta Live Tables (DLT) for real-time ingestion, Spark Structured Streaming for logistics cost monitoring, and various Spark jobs for tariff analysis and anomaly detection. We’ve built the individual components, but deploying and managing these complex pipelines across different environments (development, staging, production) can quickly become a significant challenge without proper automation. This is where Continuous Integration/Continuous Deployment (CI/CD) comes into play, ensuring that our code changes are consistently tested, validated, and deployed.
This chapter will guide you through establishing a production-ready CI/CD workflow for our Databricks assets using Databricks Asset Bundles (DABs). DABs provide an Infrastructure-as-Code (IaC) approach for managing Databricks workspaces, allowing us to define and deploy all our Databricks resources—like DLT pipelines, jobs, notebooks, and even associated infrastructure—declaratively. By the end of this chapter, you will have a fully automated process that takes our source code, validates it, and deploys it to the target Databricks environment, drastically improving reliability, consistency, and speed of delivery.
The primary outcome of this chapter will be a GitHub Actions workflow that automatically deploys our DLT pipelines and Spark Structured Streaming jobs to a specified Databricks workspace whenever changes are pushed to our version control system. We will leverage Databricks Asset Bundles to define our deployment targets and configurations, ensuring that environment-specific parameters are handled gracefully. This setup is crucial for maintaining the integrity and performance of our real-time supply chain analytics solution in a dynamic production environment.
Planning & Design
Implementing CI/CD for Databricks requires a structured approach. Our design will revolve around Databricks Asset Bundles as the central mechanism for defining and deploying our Databricks resources.
Component Architecture for CI/CD:
- Source Code Repository (GitHub): All our DLT pipeline code, Spark Structured Streaming jobs, and configuration files (databricks.yml) will reside here.
- Databricks Asset Bundle (DAB): A local project structure that defines all Databricks resources (DLT pipelines, jobs, notebooks, clusters) and their configurations for various environments. The databricks.yml file is the heart of the bundle.
- CI/CD Workflow (GitHub Actions): This automated process will:
  - Trigger on specific events (e.g., push to the main branch).
  - Install the Databricks CLI.
  - Authenticate with Databricks using a Personal Access Token (PAT) or Service Principal.
  - Validate the Databricks Asset Bundle.
  - Deploy the bundle to the target Databricks workspace.
- Databricks Workspace: The actual environment where DLT pipelines run, Spark jobs execute, and data is processed and stored in Delta Lake tables. We will aim for development, staging, and production workspaces (or distinct directories/schemas within a single workspace).
File Structure:
We will adopt a structure that separates our source code from the bundle definition, allowing for clear organization.
.
├── .github/
│ └── workflows/
│ └── deploy.yml # GitHub Actions workflow for CI/CD
├── src/
│   ├── dlt_pipelines/
│   │   └── supply_chain_ingestion.py     # Our DLT pipeline code
│   ├── jobs/
│   │   ├── logistics_cost_monitoring.py  # Our Spark Structured Streaming job
│   │   └── tariff_analysis_batch.py      # Our batch tariff analysis job
│   └── notebooks/
│       └── anomaly_detection.ipynb       # Notebook for anomaly detection (can be run as job)
├── databricks.yml # Main Databricks Asset Bundle definition
├── README.md
└── requirements.txt
This structure ensures that our Python and notebook code are logically grouped, and the databricks.yml at the root defines how these assets are deployed.
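Since every later step assumes this exact layout, a quick pre-flight check can catch a misplaced file before the bundle tooling does. This is an illustrative sketch only; the `find_missing_paths` helper is our own invention, not part of the Databricks tooling:

```python
from pathlib import Path

# Files the bundle expects, relative to the project root.
EXPECTED_PATHS = [
    "databricks.yml",
    ".github/workflows/deploy.yml",
    "src/dlt_pipelines/supply_chain_ingestion.py",
    "src/jobs/logistics_cost_monitoring.py",
    "src/jobs/tariff_analysis_batch.py",
    "src/notebooks/anomaly_detection.ipynb",
]

def find_missing_paths(root, expected=EXPECTED_PATHS):
    """Return the expected paths that do not exist under `root`."""
    root_dir = Path(root)
    return [p for p in expected if not (root_dir / p).exists()]
```

Running this from the repository root and asserting it returns an empty list makes a cheap addition to any local pre-commit routine.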
Step-by-Step Implementation
3.1. Initializing the Databricks Asset Bundle Project
First, let’s set up the basic structure for our Databricks Asset Bundle.
a) Setup/Configuration
Create a new directory for your project if you haven’t already, and navigate into it. We’ll start by initializing a Databricks Asset Bundle.
# Ensure you are in the root of your project, e.g., real-time-supply-chain-analytics/
mkdir -p src/dlt_pipelines src/jobs src/notebooks
# Initialize a Databricks Asset Bundle
databricks bundle init
This command will prompt you to choose a template. The default-python template is a good minimal starting point, as we'll customize the result extensively. It will create a databricks.yml file and a resources directory (which we'll later restructure to match our src directory).
b) Core Implementation
Now, let’s move our existing DLT pipeline and Spark Structured Streaming job code into the src directory we just created. For demonstration, let’s assume you have:
- A DLT pipeline definition in dlt_supply_chain_ingestion.py (from Chapter 2).
- A Spark Structured Streaming job for logistics cost monitoring in logistics_cost_monitoring.py (from Chapter 9).
- A batch job for tariff analysis in tariff_analysis_batch.py (from Chapter 6).
- An anomaly detection notebook in anomaly_detection.ipynb (from Chapter 12).
File: src/dlt_pipelines/supply_chain_ingestion.py
(This is a simplified example; your actual DLT code from Chapter 2 would go here.)
# src/dlt_pipelines/supply_chain_ingestion.py
import dlt
from pyspark.sql.functions import col, lit, current_timestamp, sha2, concat_ws, from_json
# Get environment-specific parameters (e.g., from DLT pipeline configuration)
# These will be passed via the databricks.yml bundle definition
@dlt.table(
comment="Raw Kafka events for supply chain, ingested into Bronze layer."
)
def bronze_supply_chain_events():
    # In production, this table reads directly from Kafka; the connection details
    # are supplied as DLT pipeline configuration (see databricks.yml). This file
    # only defines the transformation logic for the tables in the pipeline.
    kafka_source_topic = spark.conf.get("supply_chain.kafka_source_topic", "supply_chain_events_raw")
    # A real Kafka source would look like:
    # df = (spark.readStream.format("kafka")
    #       .option("kafka.bootstrap.servers", "<your_kafka_brokers>")
    #       .option("subscribe", kafka_source_topic)
    #       .load())
    # For bundle deployment testing, we use a file-based Auto Loader source instead,
    # so the pipeline code stays valid and runnable without a live Kafka cluster.
df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", f"dbfs:/dlt_checkpoints/bronze_supply_chain_events_schema")
.load(spark.conf.get("supply_chain.bronze_source_path", "/databricks-datasets/structured-streaming/events/"))
.select(
col("value").cast("string").alias("raw_json"),
current_timestamp().alias("processing_timestamp"),
lit(kafka_source_topic).alias("source_topic")
)
)
return df
@dlt.table(
comment="Cleaned and enriched supply chain events in Silver layer."
)
@dlt.expect_or_drop("has_valid_json", "raw_json IS NOT NULL")
def silver_supply_chain_events():
# Read from bronze table
bronze_df = dlt.read("bronze_supply_chain_events")
# Example: Parse JSON, add a unique ID, and select relevant fields
silver_df = (
bronze_df
.withColumn("event_data", from_json(col("raw_json"), "SCHEMA_OF_YOUR_JSON_DATA")) # Replace SCHEMA_OF_YOUR_JSON_DATA
.withColumn("event_id", sha2(concat_ws("-", col("raw_json"), col("processing_timestamp")), 256))
.select(
col("event_id"),
col("processing_timestamp"),
col("event_data.*") # Explode event_data fields
)
# Example: Add data quality checks
.filter(col("event_data.item_id").isNotNull()) # Ensure critical fields are present
)
return silver_df
# Additional DLT tables (e.g., gold layer, aggregated views) would follow here.
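The silver table above derives a deterministic event_id via sha2(concat_ws("-", raw_json, processing_timestamp), 256). The same idea can be sketched in plain Python with hashlib, which is handy for unit-testing the dedup behavior off-cluster (the helper name is ours, not part of the pipeline):

```python
import hashlib

def make_event_id(raw_json, processing_timestamp):
    """Deterministic SHA-256 id, mirroring sha2(concat_ws("-", raw_json, ts), 256)."""
    payload = f"{raw_json}-{processing_timestamp}"
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Because the id depends only on the event payload and its ingestion timestamp, re-processing the same record always yields the same id, which makes downstream deduplication straightforward.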
File: src/jobs/logistics_cost_monitoring.py
(This is a simplified example; your actual Structured Streaming code from Chapter 9 would go here.)
# src/jobs/logistics_cost_monitoring.py
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
def create_spark_session(app_name):
"""Creates and returns a SparkSession."""
return SparkSession.builder.appName(app_name).getOrCreate()
def define_schema():
"""Defines the schema for logistics cost events."""
return StructType([
StructField("shipment_id", StringType(), True),
StructField("cost_type", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("currency", StringType(), True),
StructField("timestamp", TimestampType(), True)
])
def process_logistics_costs(spark, kafka_bootstrap_servers, kafka_topic, checkpoint_location, output_table_name):
"""
Reads logistics cost data from Kafka, processes it, and writes to a Delta table.
"""
print(f"Starting logistics cost monitoring job for topic: {kafka_topic}")
print(f"Kafka brokers: {kafka_bootstrap_servers}")
print(f"Checkpoint location: {checkpoint_location}")
print(f"Output table: {output_table_name}")
schema = define_schema()
# Read from Kafka
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", kafka_topic) \
.option("startingOffsets", "latest") \
.load()
# Process streaming data
parsed_df = df.selectExpr("CAST(value AS STRING) as json_payload", "timestamp as kafka_timestamp") \
.withColumn("data", from_json(col("json_payload"), schema)) \
.select(col("data.*"), col("kafka_timestamp"), current_timestamp().alias("processing_time"))
# Add error handling for malformed JSON or missing fields
# Example: Filter out rows where 'amount' is null after parsing
validated_df = parsed_df.filter(col("amount").isNotNull())
    # Write to Delta table; parentheses keep the chain valid with inline comments
    query = (
        validated_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(processingTime="60 seconds")  # Process every 60 seconds
        .toTable(output_table_name)
    )
print(f"Streaming query started for {output_table_name}.")
# For production, we would typically let the job run indefinitely.
# For testing, you might add a .awaitTermination(timeout=...)
query.awaitTermination()
print(f"Streaming query for {output_table_name} terminated.")
if __name__ == "__main__":
# Expecting arguments from the job configuration in databricks.yml
if len(sys.argv) < 5:
print("Usage: logistics_cost_monitoring.py <kafka_bootstrap_servers> <kafka_topic> <checkpoint_location> <output_table_name>")
sys.exit(1)
kafka_bootstrap_servers = sys.argv[1]
kafka_topic = sys.argv[2]
checkpoint_location = sys.argv[3]
output_table_name = sys.argv[4]
spark = create_spark_session("LogisticsCostMonitoring")
# Configure logging for better visibility
spark.sparkContext.setLogLevel("INFO")
try:
process_logistics_costs(spark, kafka_bootstrap_servers, kafka_topic, checkpoint_location, output_table_name)
except Exception as e:
print(f"Error in logistics cost monitoring job: {e}")
# Log the full traceback for debugging
import traceback
traceback.print_exc()
sys.exit(1) # Exit with an error code
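The script above reads its four parameters positionally from sys.argv, which keeps the contract with the parameters list in databricks.yml implicit. An alternative sketch using argparse makes that contract explicit and self-documenting (parse_job_args is a hypothetical helper, not part of the job as written):

```python
import argparse

def parse_job_args(argv):
    """Parse the four positional parameters passed by the Databricks job task."""
    parser = argparse.ArgumentParser(description="Logistics cost monitoring")
    parser.add_argument("kafka_bootstrap_servers", help="Kafka broker host:port list")
    parser.add_argument("kafka_topic", help="Topic carrying logistics cost events")
    parser.add_argument("checkpoint_location", help="Streaming checkpoint path")
    parser.add_argument("output_table_name", help="Target Delta table (catalog.schema.table)")
    return parser.parse_args(argv)
```

With argparse, a missing or extra argument produces a readable usage message instead of an index error, and the parameter order is documented in one place.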
File: src/jobs/tariff_analysis_batch.py
(This is a simplified example; your actual batch tariff analysis code would go here.)
# src/jobs/tariff_analysis_batch.py
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_date, year, month, dayofmonth, avg, count
def create_spark_session(app_name):
"""Creates and returns a SparkSession."""
return SparkSession.builder.appName(app_name).getOrCreate()
def run_tariff_analysis(spark, input_table, output_table, analysis_date=None):
"""
Performs historical tariff trend analysis and writes results to a Delta table.
"""
print(f"Starting tariff analysis job for input: {input_table}, output: {output_table}")
    if analysis_date is None:
        # current_date() returns a Spark Column, so compute the default in Python
        from datetime import date
        analysis_date = date.today().isoformat()
try:
# Read historical trade data (e.g., from a Delta table populated by DLT)
trade_data_df = spark.read.table(input_table)
# Example: Perform some tariff analysis (e.g., average tariff by HS code per month)
tariff_trends_df = trade_data_df \
.withColumn("analysis_year", year(col("import_date"))) \
.withColumn("analysis_month", month(col("import_date"))) \
.groupBy("hs_code", "analysis_year", "analysis_month") \
.agg(
avg("tariff_rate").alias("average_tariff_rate"),
count("shipment_id").alias("total_shipments")
) \
.withColumn("analysis_run_date", lit(analysis_date))
# Write results to output Delta table
# Using 'overwrite' for simplicity, but 'merge' is better for incremental updates
tariff_trends_df.write \
.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.saveAsTable(output_table)
print(f"Successfully completed tariff analysis. Results written to {output_table}.")
except Exception as e:
print(f"Error in tariff analysis job: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: tariff_analysis_batch.py <input_table> <output_table> [analysis_date]")
sys.exit(1)
input_table = sys.argv[1]
output_table = sys.argv[2]
analysis_date = sys.argv[3] if len(sys.argv) > 3 else None
spark = create_spark_session("TariffAnalysisBatch")
spark.sparkContext.setLogLevel("INFO")
run_tariff_analysis(spark, input_table, output_table, analysis_date)
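One subtlety worth isolating: pyspark.sql.functions.current_date() returns a Column, not a Python date, so the default for analysis_date must be computed with plain Python. A small testable sketch of that defaulting logic (resolve_analysis_date is a hypothetical helper name):

```python
from datetime import date

def resolve_analysis_date(analysis_date=None):
    """Return the given ISO date string, or today's date when none is supplied."""
    if analysis_date is None:
        return date.today().isoformat()
    # Fail fast on malformed input rather than writing a bad value into the table.
    date.fromisoformat(analysis_date)
    return analysis_date
```

Validating the string up front means a typo in a manually supplied analysis_date fails the job immediately, instead of silently polluting the analysis_run_date column.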
File: src/notebooks/anomaly_detection.ipynb
(This would be your actual notebook content saved as .ipynb. For the bundle, we just need the file to exist.)
You would save your notebook content from Chapter 12 as src/notebooks/anomaly_detection.ipynb.
# Placeholder for src/notebooks/anomaly_detection.ipynb content
# This is typically a mix of markdown and code cells.
# For simplicity, imagine it contains PySpark code for anomaly detection.
# Example:
# %python
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("AnomalyDetection").getOrCreate()
#
# input_table = spark.conf.get("anomaly_detection.input_table")
# output_table = spark.conf.get("anomaly_detection.output_table")
#
# df = spark.read.table(input_table)
# # ... apply anomaly detection logic ...
# anomalies_df.write.format("delta").mode("overwrite").saveAsTable(output_table)
# print(f"Anomaly detection completed. Results in {output_table}")
c) Testing This Component
At this stage, we are just organizing files. The individual components (DLT, Structured Streaming, batch jobs) should have been tested in their respective chapters. The next step will be to test the bundle definition itself.
3.2. Defining Databricks Assets in databricks.yml
Now, let’s configure the databricks.yml file to define our Databricks resources and how they should be deployed to different environments.
a) Setup/Configuration
Open the databricks.yml file created by databricks bundle init. We will modify it to suit our project.
b) Core Implementation
The databricks.yml file uses YAML syntax to define resources, environments, and deployment settings. We’ll define:
- Bundle Name: A unique identifier for our project.
- Target Environments: development, staging, and production, each pointing to a different Databricks workspace (or configuration within one workspace).
- Resources:
  - DLT Pipelines: For supply_chain_ingestion.py.
  - Jobs: For logistics_cost_monitoring.py and tariff_analysis_batch.py.
  - Notebooks: For anomaly_detection.ipynb (to be run as a job).
File: databricks.yml
# databricks.yml
# This file defines the Databricks Asset Bundle for our Real-time Supply Chain project.
# It declares all Databricks resources (DLT pipelines, jobs, notebooks) and
# their configurations, allowing for consistent deployment across environments.
bundle:
name: supply-chain-analytics-bundle # A unique name for your bundle
environments:
# Development environment configuration
development:
# Host URL for your Databricks development workspace
# Replace with your actual Databricks workspace URL (e.g., https://adb-xxxxxxxxxxxxxxxx.xx.databricks.com)
    host: "{{ env.DATABRICKS_HOST_DEV }}" # Quoted so the braces parse as a string, not a YAML flow mapping
# Catalog and schema for development data. Using Unity Catalog best practices.
catalog: supply_chain_dev_catalog
schema: dev
# Base path for storing checkpoints and other artifacts in DBFS
base_path: /Users/{{ user.email }}/supply_chain_dev_bundle_artifacts
# Cluster configuration for development jobs (smaller, cost-optimized)
cluster_config: &dev_cluster_config
node_type_id: Standard_DS3_v2 # Example: Smaller instance type
num_workers: 2 # Example: Fewer workers
spark_version: 13.3.x-scala2.12 # Use a recent stable Spark version
autotermination_minutes: 30 # Auto-terminate after inactivity
data_security_mode: SINGLE_USER # For dev, single user is often sufficient
runtime_engine: STANDARD
custom_tags:
bundle_environment: development
bundle_name: supply-chain-analytics-bundle
# Staging environment configuration
staging:
    host: "{{ env.DATABRICKS_HOST_STAGING }}"
catalog: supply_chain_staging_catalog
schema: staging
base_path: /supply_chain_staging_bundle_artifacts
cluster_config: &staging_cluster_config
node_type_id: Standard_DS4_v2 # Slightly larger for staging
num_workers: 4
spark_version: 13.3.x-scala2.12
autoscale:
min_workers: 2
max_workers: 6
autotermination_minutes: 60
data_security_mode: SINGLE_USER # Or USER_ISOLATION if Unity Catalog is enabled and shared
runtime_engine: PHOTON # Use Photon for better performance
custom_tags:
bundle_environment: staging
bundle_name: supply-chain-analytics-bundle
# Production environment configuration
production:
    host: "{{ env.DATABRICKS_HOST_PROD }}"
catalog: supply_chain_prod_catalog
schema: prod
base_path: /supply_chain_prod_bundle_artifacts
cluster_config: &prod_cluster_config
node_type_id: Standard_DS5_v2 # Larger, more powerful for production
num_workers: 8
autoscale:
min_workers: 4
max_workers: 16
spark_version: 13.3.x-scala2.12
autotermination_minutes: 120 # Keep alive longer for critical jobs
data_security_mode: USER_ISOLATION # Recommended for production with Unity Catalog
runtime_engine: PHOTON
custom_tags:
bundle_environment: production
bundle_name: supply-chain-analytics-bundle
# Additional production-specific configurations, e.g., instance_pool_id
resources:
# All resources defined here will be deployed to the target environment.
# The 'target' property within a resource can override environment settings.
dlt_pipelines:
# DLT pipeline for real-time supply chain event ingestion
supply_chain_ingestion_pipeline:
name: "{{ bundle.name }}-{{ environment.schema }}-supply-chain-ingestion" # Unique name based on bundle and environment
target: "{{ environment.catalog }}.{{ environment.schema }}" # Unity Catalog target schema
configuration:
# Example DLT pipeline specific configurations
supply_chain.kafka_source_topic: "supply_chain_events_raw"
supply_chain.bronze_source_path: "/databricks-datasets/structured-streaming/events/" # Placeholder for demo
# Add actual Kafka broker configs here, often from secrets or environment variables
# spark.kafka.brokers: "{{ env.KAFKA_BROKERS }}"
clusters:
- label: default
num_workers: 3 # DLT manages cluster, but you can suggest size
node_type_id: Standard_DS4_v2
autoscale:
min_workers: 1
max_workers: 5
# Define libraries for the DLT pipeline
libraries:
- file:
path: ./src/dlt_pipelines/supply_chain_ingestion.py
continuous: false # Set to true for continuous processing
photon: true
channel: CURRENT # Use CURRENT for latest DLT features
development: # Overrides for development environment
continuous: false # Often run as triggered in dev for cost savings
photon: false # Disable photon in dev to save cost
channel: PREVIEW # Test new DLT features
production: # Overrides for production environment
continuous: true # Continuous processing for real-time
clusters:
- label: default
num_workers: 5
autoscale:
min_workers: 2
max_workers: 10
# Add notifications, alerts etc. for production
notifications:
- alert_type: ON_FAILURE
email_recipients:
- "[email protected]"
- "[email protected]"
jobs:
# Spark Structured Streaming job for logistics cost monitoring
logistics_cost_monitoring_job:
name: "{{ bundle.name }}-{{ environment.schema }}-logistics-cost-monitoring"
tasks:
- task_key: monitor_costs
          spark_python_task: # Plain .py file task; python_wheel_task is for packaged wheels
            python_file: ./src/jobs/logistics_cost_monitoring.py
parameters:
- "{{ env.KAFKA_BOOTSTRAP_SERVERS }}" # Parameter 1: Kafka brokers
- "logistics_cost_events" # Parameter 2: Kafka topic
- "{{ environment.base_path }}/checkpoints/logistics_cost_monitoring" # Parameter 3: Checkpoint
- "{{ environment.catalog }}.{{ environment.schema }}.logistics_costs_silver" # Parameter 4: Output table
new_cluster: *prod_cluster_config # Use production cluster config by default
# Override cluster for dev/staging if needed
development:
new_cluster: *dev_cluster_config
staging:
new_cluster: *staging_cluster_config
max_retries: 3 # Retry failed tasks
timeout_seconds: 3600 # 1 hour timeout
# Add schedule for production environment if it's not a continuous stream
# schedule:
# quartz_cron_expression: "0 0 0 * * ?"
# timezone_id: "UTC"
job_clusters:
- job_cluster_key: base_cluster
new_cluster: *prod_cluster_config # Define a base cluster for the job
development:
new_cluster: *dev_cluster_config
staging:
new_cluster: *staging_cluster_config
# Batch job for historical tariff impact analysis
tariff_analysis_batch_job:
name: "{{ bundle.name }}-{{ environment.schema }}-tariff-analysis-batch"
tasks:
- task_key: analyze_tariffs
          spark_python_task: # Plain .py file task; python_wheel_task is for packaged wheels
            python_file: ./src/jobs/tariff_analysis_batch.py
parameters:
- "{{ environment.catalog }}.{{ environment.schema }}.trade_data_gold" # Input table
- "{{ environment.catalog }}.{{ environment.schema }}.tariff_trends_gold" # Output table
# Optional: Pass analysis_date as a parameter, or let job default
new_cluster: *prod_cluster_config
development:
new_cluster: *dev_cluster_config
staging:
new_cluster: *staging_cluster_config
max_retries: 1
timeout_seconds: 1800
schedule: # Schedule this batch job
quartz_cron_expression: "0 0 2 * * ?" # Run daily at 2 AM UTC
timezone_id: "UTC"
email_notifications:
on_failure:
- "[email protected]"
# Notebook job for HS Code anomaly detection
anomaly_detection_notebook_job:
name: "{{ bundle.name }}-{{ environment.schema }}-anomaly-detection"
tasks:
- task_key: detect_anomalies
notebook_task:
notebook_path: ./src/notebooks/anomaly_detection.ipynb
source: WORKSPACE # Deploy notebook to workspace
base_parameters:
input_table: "{{ environment.catalog }}.{{ environment.schema }}.hs_code_classification_silver"
output_table: "{{ environment.catalog }}.{{ environment.schema }}.hs_code_anomalies_gold"
new_cluster: *prod_cluster_config
development:
new_cluster: *dev_cluster_config
staging:
new_cluster: *staging_cluster_config
max_retries: 1
timeout_seconds: 1800
schedule: # Schedule this notebook job
quartz_cron_expression: "0 0 4 * * ?" # Run daily at 4 AM UTC
timezone_id: "UTC"
Explanation of databricks.yml:
- bundle.name: A unique identifier for your project bundle.
- environments: Defines different deployment targets.
  - host: The Databricks workspace URL. We use {{ env.DATABRICKS_HOST_DEV }} etc. to pull this from environment variables, which is crucial for CI/CD.
  - catalog / schema: Specifies the Unity Catalog catalog and schema to use for tables created by this environment. This ensures data isolation.
  - base_path: A DBFS path for checkpoints, logs, and other artifacts.
  - cluster_config: Reusable cluster configurations for jobs. We use YAML anchors (&dev_cluster_config) and aliases (*dev_cluster_config) for cleaner syntax.
- resources: This section defines the actual Databricks objects to be deployed.
  - dlt_pipelines: Defines Delta Live Tables pipelines.
    - target: The Unity Catalog schema where DLT tables will be created.
    - configuration: Key-value pairs passed as pipeline settings.
    - libraries: Points to our DLT Python script.
    - continuous: true for real-time, false for triggered.
    - development / production overrides: Allows specific settings per environment (e.g., continuous can be false in development to save costs, but true in production).
  - jobs: Defines Databricks Jobs.
    - tasks: A list of tasks within a job.
    - spark_python_task: For deploying Python scripts. python_file points to our script; parameters are command-line arguments.
    - notebook_task: For deploying and running notebooks. notebook_path points to our notebook.
    - new_cluster: Specifies the cluster configuration for the task, referencing our environment-defined clusters.
    - schedule: Defines a cron schedule for batch jobs.
    - email_notifications: Configures alerts for job status.
- Variable Substitution ({{ ... }}): Databricks Asset Bundles support powerful variable substitution:
  - {{ bundle.name }}: The name of the bundle.
  - {{ environment.schema }}: The schema defined for the current deployment environment.
  - {{ user.email }}: The email of the user running the bundle (useful for dev paths).
  - {{ env.YOUR_ENV_VAR }}: Accesses environment variables set in your shell or CI/CD system. This is crucial for sensitive data like Kafka brokers or API keys, and for host URLs.
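To build intuition for how these placeholders resolve, here is a toy substitution pass in plain Python. This is purely illustrative of the idea; it is not how the Databricks CLI actually implements templating:

```python
import os
import re

def render_placeholders(template, context):
    """Resolve {{ dotted.name }} placeholders from a context dict,
    falling back to environment variables for {{ env.NAME }} keys."""
    def resolve(match):
        key = match.group(1).strip()
        if key.startswith("env."):
            return os.environ.get(key[4:], "")
        # Leave unknown placeholders untouched so mistakes are visible
        return str(context.get(key, match.group(0)))
    return re.sub(r"\{\{\s*([^}]+?)\s*\}\}", resolve, template)
```

For example, rendering "{{ bundle.name }}-{{ environment.schema }}-supply-chain-ingestion" with context {"bundle.name": "supply-chain-analytics-bundle", "environment.schema": "dev"} produces the same unique, environment-scoped resource name the bundle would generate.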
c) Testing This Component
Before setting up CI/CD, let’s validate and try a local deployment.
- Install Databricks CLI: If you haven't already, ensure you have Databricks CLI v0.200 or higher. Note that the legacy pip package (databricks-cli) tops out at v0.18; the newer CLI ships as a standalone binary:
  curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh
- Configure Databricks CLI: Authenticate your local CLI with your Databricks workspace.
  databricks configure --token
  # Databricks Host (e.g., https://adb-xxxxxxxxxxxxxxxx.xx.databricks.com): <YOUR_DEV_DATABRICKS_HOST>
  # Token: <YOUR_DATABRICKS_PAT>
  Important: Generate a Personal Access Token (PAT) from your Databricks workspace settings (User Settings -> Developer -> Access Tokens). Store it securely.
- Set Environment Variables: For a local test, export the environment variables referenced in databricks.yml.
  export DATABRICKS_HOST_DEV="<YOUR_DEV_DATABRICKS_HOST>"
  export DATABRICKS_HOST_STAGING="<YOUR_STAGING_DATABRICKS_HOST>"  # If you have one
  export DATABRICKS_HOST_PROD="<YOUR_PROD_DATABRICKS_HOST>"        # If you have one
  export KAFKA_BOOTSTRAP_SERVERS="<YOUR_KAFKA_BROKERS_HOST:PORT>"  # e.g., pkc-xxxx.us-east-1.aws.confluent.cloud:9092
- Validate the Bundle:
  databricks bundle validate
  This command checks your databricks.yml for syntax errors and ensures all referenced files exist. It's a critical step before deployment.
- Deploy to Development Environment:
  databricks bundle deploy --target development
  This command will deploy all resources defined under the development environment in your databricks.yml to your configured Databricks workspace.
  Expected Behavior: You should see output indicating resource creation/update (DLT pipelines, jobs). If successful, navigate to your Databricks workspace UI (Workflows -> Delta Live Tables and Workflows -> Jobs) and verify that the pipelines and jobs have been created. You can then manually start them to ensure they run correctly.
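Because a local deploy silently depends on those exported variables, a small pre-flight check can fail fast with a clear message. A sketch in plain Python; the variable-to-target mapping mirrors our databricks.yml and is an assumption of this example:

```python
import os

# Environment variables each bundle target expects to find (per our databricks.yml).
REQUIRED_ENV_VARS = {
    "development": ["DATABRICKS_HOST_DEV", "KAFKA_BOOTSTRAP_SERVERS"],
    "staging": ["DATABRICKS_HOST_STAGING", "KAFKA_BOOTSTRAP_SERVERS"],
    "production": ["DATABRICKS_HOST_PROD", "KAFKA_BOOTSTRAP_SERVERS"],
}

def missing_env_vars(target):
    """Return the environment variables a deploy target needs but that are unset."""
    return [v for v in REQUIRED_ENV_VARS.get(target, []) if not os.environ.get(v)]
```

Run missing_env_vars("development") before invoking databricks bundle deploy; an empty list means substitution in databricks.yml has everything it needs.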
3.3. Setting up CI/CD with GitHub Actions
Now that our bundle is defined and deployable locally, let’s automate the deployment using GitHub Actions.
a) Setup/Configuration
Create the GitHub Actions workflow file: .github/workflows/deploy.yml.
b) Core Implementation
This workflow will trigger on push events to specific branches (main for staging, release for production) and use the databricks bundle deploy command.
File: .github/workflows/deploy.yml
# .github/workflows/deploy.yml
name: Databricks Bundle CI/CD
on:
push:
branches:
- main # Triggers for staging deployment
- release # Triggers for production deployment
workflow_dispatch: # Allows manual triggering of the workflow
env:
# Common environment variables for all jobs
DATABRICKS_CLI_VERSION: "0.210.0" # Use a stable and recent version of Databricks CLI
jobs:
validate-bundle:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.10' # Ensure a compatible Python version
      - name: Install Databricks CLI
        # The newer CLI (v0.2xx) is a standalone binary, not the legacy databricks-cli pip package
        uses: databricks/setup-cli@main
      - name: Validate Databricks Bundle
        # Note: validate still substitutes {{ env.* }} variables, so any environment
        # variables referenced by the chosen target must be available in this job.
        run: databricks bundle validate -t development # Validate against a specific target's config
deploy-staging:
needs: validate-bundle # Only run if validation passes
if: github.ref == 'refs/heads/main' # Only deploy to staging on push to 'main'
runs-on: ubuntu-latest
environment: staging # Specify GitHub environment for secrets management
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
      - name: Install Databricks CLI
        uses: databricks/setup-cli@main
      - name: Configure Databricks authentication for Staging
        # The Databricks CLI picks up DATABRICKS_HOST / DATABRICKS_TOKEN from the
        # environment, avoiding the interactive `databricks configure` prompt.
        run: |
          echo "DATABRICKS_HOST=${{ secrets.DATABRICKS_HOST_STAGING }}" >> $GITHUB_ENV
          echo "DATABRICKS_TOKEN=${{ secrets.DATABRICKS_TOKEN_STAGING }}" >> $GITHUB_ENV
          # Also set the variables referenced inside databricks.yml
          echo "DATABRICKS_HOST_STAGING=${{ secrets.DATABRICKS_HOST_STAGING }}" >> $GITHUB_ENV
          echo "KAFKA_BOOTSTRAP_SERVERS=${{ secrets.KAFKA_BOOTSTRAP_SERVERS_STAGING }}" >> $GITHUB_ENV
- name: Deploy Databricks Bundle to Staging
run: databricks bundle deploy --target staging
  deploy-production:
    # Depend on validation only: on a push to 'release' the staging job is skipped
    # (its 'if' fails), and a job that needs a skipped job is itself skipped.
    needs: validate-bundle
    if: github.ref == 'refs/heads/release' # Only deploy to production on push to 'release'
runs-on: ubuntu-latest
environment: production # Specify GitHub environment for secrets management
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
      - name: Install Databricks CLI
        uses: databricks/setup-cli@main
      - name: Configure Databricks authentication for Production
        # The Databricks CLI picks up DATABRICKS_HOST / DATABRICKS_TOKEN from the
        # environment, avoiding the interactive `databricks configure` prompt.
        run: |
          echo "DATABRICKS_HOST=${{ secrets.DATABRICKS_HOST_PROD }}" >> $GITHUB_ENV
          echo "DATABRICKS_TOKEN=${{ secrets.DATABRICKS_TOKEN_PROD }}" >> $GITHUB_ENV
          # Also set the variables referenced inside databricks.yml
          echo "DATABRICKS_HOST_PROD=${{ secrets.DATABRICKS_HOST_PROD }}" >> $GITHUB_ENV
          echo "KAFKA_BOOTSTRAP_SERVERS=${{ secrets.KAFKA_BOOTSTRAP_SERVERS_PROD }}" >> $GITHUB_ENV
- name: Deploy Databricks Bundle to Production
run: databricks bundle deploy --target production
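The branch conditions in this workflow encode a simple ref-to-target mapping. Expressed as plain logic (illustrative only, not part of the workflow itself), it is easy to unit test before trusting it in CI:

```python
def deploy_target_for_ref(github_ref):
    """Map a Git ref to the bundle target the workflow deploys, or None."""
    mapping = {
        "refs/heads/main": "staging",
        "refs/heads/release": "production",
    }
    return mapping.get(github_ref)
```

Keeping the mapping in one place like this is also how you would extend the workflow later, e.g. adding a develop branch that deploys to the development target.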
c) Testing This Component
- GitHub Repository: Push all your code (`src/`, `databricks.yml`, `.github/workflows/deploy.yml`) to a GitHub repository.
- GitHub Secrets: In your GitHub repository settings, go to Settings -> Security -> Secrets and variables -> Actions. Create repository secrets for:
  - `DATABRICKS_HOST_STAGING`: URL of your staging Databricks workspace.
  - `DATABRICKS_TOKEN_STAGING`: PAT for the staging workspace.
  - `KAFKA_BOOTSTRAP_SERVERS_STAGING`: Kafka brokers for staging.
  - `DATABRICKS_HOST_PROD`: URL of your production Databricks workspace.
  - `DATABRICKS_TOKEN_PROD`: PAT for the production workspace.
  - `KAFKA_BOOTSTRAP_SERVERS_PROD`: Kafka brokers for production.
  - Security best practice: use separate PATs (or Service Principals) for each environment, with the least privilege necessary.
- Trigger Workflow:
  - Push a change to the `main` branch. This should trigger `validate-bundle` and then `deploy-staging`.
  - Once `main` is stable, create a `release` branch from `main` and push to `release`. This should trigger `validate-bundle` and then `deploy-production`.
  - Alternatively, use `workflow_dispatch` to manually trigger the workflow and select a branch.
- Monitor Workflow: Go to the “Actions” tab in your GitHub repository to monitor the workflow execution. Check logs for any errors.
- Verify Deployment: After successful deployment, check your Databricks Staging and Production workspaces. You should see the DLT pipelines and jobs created/updated under the “Workflows” section, corresponding to the names defined in `databricks.yml`.
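For orientation, the trigger section of such a workflow could look like the following sketch; the branch names match the flow described above, but your actual `deploy.yml` may differ:

```yaml
# Sketch of the assumed trigger block in .github/workflows/deploy.yml.
on:
  push:
    branches:
      - main     # triggers validate-bundle, then deploy-staging
      - release  # triggers validate-bundle, then deploy-production
  workflow_dispatch: {}  # allows manual runs from the Actions tab
```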
Production Considerations
Security:
- Databricks Authentication: For production CI/CD, always prefer Service Principals over Personal Access Tokens (PATs). Service Principals offer better security, auditability, and lifecycle management. Configure your CI/CD to use Service Principal credentials (client ID, client secret, tenant ID) stored as secrets.
- Least Privilege: Ensure the Service Principal or PAT used for deployment has only the necessary permissions (e.g., `CAN_MANAGE` on DLT pipelines and jobs, `CAN_MANAGE` on clusters if creating new ones, `CAN_USE` on Unity Catalog schemas).
- Unity Catalog: Leverage Unity Catalog for fine-grained access control to your data. The bundle configuration uses `catalog.schema` for table targets, ensuring data isolation and governance.
- Secrets Management: Never hardcode sensitive information (Kafka brokers, API keys, database credentials) in `databricks.yml` or your code. Use environment variables and CI/CD secrets (like GitHub Secrets) that are securely injected at runtime.
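Bundles can also declare who may view or manage the deployed resources through a top-level `permissions` block. A sketch, with placeholder principal and group names:

```yaml
# Hypothetical permissions block in databricks.yml; applies to all
# resources the bundle deploys in the given target.
permissions:
  - level: CAN_MANAGE
    service_principal_name: "ci-cd-deployer"  # placeholder service principal
  - level: CAN_VIEW
    group_name: "data-engineering"            # placeholder group
```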
Performance Optimization:
- Cluster Sizing: Define appropriate cluster sizes (`node_type_id`, `num_workers`, `autoscale`) in `databricks.yml` for each environment. Production clusters should be scaled for performance and resilience, while development clusters can be smaller to save costs.
- Photon Engine: Enable Photon in production environments for DLT and Spark jobs where applicable, as it significantly boosts query performance.
- DLT Continuous vs. Triggered: For real-time requirements, DLT pipelines should be `continuous: true` in production. For less stringent latency, `continuous: false` with a frequent trigger can be more cost-effective.
- Checkpoint Locations: Ensure checkpoint locations are on reliable, performant storage (e.g., cloud object storage, not local DBFS) and are unique per environment.
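An environment-specific cluster definition in `databricks.yml` could look roughly like this sketch; the node type, worker counts, and job name are illustrative, not the project's actual values:

```yaml
# Illustrative job-cluster override for the production target.
targets:
  production:
    resources:
      jobs:
        logistics_cost_monitoring:
          job_clusters:
            - job_cluster_key: main
              new_cluster:
                spark_version: "14.3.x-scala2.12"  # example runtime
                node_type_id: "i3.xlarge"          # example node type
                autoscale:
                  min_workers: 2
                  max_workers: 8
                runtime_engine: PHOTON             # enable Photon in prod
```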
Error Handling & Resilience:
- DLT Error Handling: DLT pipelines inherently offer robust error handling with expectations (`expect_or_drop`, `expect_or_fail`). Configure `notifications` in `databricks.yml` to alert on pipeline failures.
- Job Retries: Configure `max_retries` for Databricks Jobs in `databricks.yml` to handle transient failures gracefully.
- Timeouts: Set `timeout_seconds` for jobs to prevent them from running indefinitely due to issues.
- DLT Channel: Use `CURRENT` for production DLT pipelines for stability, and `PREVIEW` in development/staging to test new features.
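Retries and timeouts are configured per task in the job definition. A minimal sketch, with illustrative job and task names:

```yaml
# Illustrative retry/timeout settings for a batch job task.
resources:
  jobs:
    tariff_analysis_batch:
      tasks:
        - task_key: run_tariff_analysis
          max_retries: 2                    # retry transient failures twice
          min_retry_interval_millis: 60000  # wait one minute between retries
          timeout_seconds: 3600             # stop the task after one hour
```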
Logging and Monitoring:
- Databricks Logging: All job and DLT pipeline runs generate logs within Databricks. Ensure your Python scripts use standard logging practices (the `logging` module) to output relevant information.
- Notifications: Configure email notifications in `databricks.yml` for job failures or DLT pipeline updates.
- Monitoring Tools: Integrate Databricks logs and metrics with external monitoring solutions (e.g., Datadog, Prometheus, Splunk) for centralized observability. (This will be covered in the next chapter.)
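A minimal sketch of a logging setup our job scripts could share; the logger name is illustrative:

```python
import logging

def get_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Return a configured logger, attaching a handler only once per name."""
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s - %(message)s"))
        logger.addHandler(handler)
        logger.setLevel(level)
    return logger

logger = get_logger("jobs.logistics_cost_monitoring")
logger.info("Starting logistics cost monitoring job")
```

The idempotent handler check matters on Databricks, where a module may be re-imported within the same interpreter and duplicate handlers would double every log line.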
Deployment Strategies:
- Blue-Green/Canary Deployments: While `databricks bundle deploy` performs an in-place update, you can implement more advanced strategies by defining multiple DLT pipelines or jobs within your bundle (e.g., `supply-chain-ingestion-v1`, `supply-chain-ingestion-v2`) and gradually shifting traffic, or by deploying to a separate “blue” workspace and switching DNS. Bundles provide the IaC foundation for such strategies.
- Rollback: Version control (Git) provides the primary rollback mechanism. If a deployment fails or introduces issues, you can revert to a previous commit and re-deploy.
Code Review Checkpoint
At this point, we have significantly enhanced our project’s operational readiness.
Summary of what was built:
- Databricks Asset Bundle (`databricks.yml`): A declarative definition of our DLT pipelines, Spark Structured Streaming jobs, batch jobs, and notebook jobs, along with their environment-specific configurations (cluster sizes, target schemas, parameters).
- Structured Codebase: Our Python scripts and notebooks are organized within the `src/` directory, ready for bundle deployment.
- GitHub Actions Workflow: An automated CI/CD pipeline that validates our bundle and deploys it to staging (on `main` branch push) and production (on `release` branch push).
- Environment-Specific Configuration: Robust handling of different configurations for development, staging, and production using variable substitution and GitHub Secrets.
Files created/modified:
- `databricks.yml` (new or heavily modified)
- `src/dlt_pipelines/supply_chain_ingestion.py` (existing, potentially refined for bundle parameters)
- `src/jobs/logistics_cost_monitoring.py` (existing, potentially refined for bundle parameters)
- `src/jobs/tariff_analysis_batch.py` (existing, potentially refined for bundle parameters)
- `src/notebooks/anomaly_detection.ipynb` (existing, potentially refined for bundle parameters)
- `.github/workflows/deploy.yml` (new)
How it integrates with existing code:
The Databricks Asset Bundle acts as the orchestration layer for our existing DLT and Spark code. It picks up the Python scripts and notebooks from the `src/` directory, packages them, and deploys them as defined in `databricks.yml`. The parameters defined in the bundle (e.g., `kafka_bootstrap_servers`, `checkpoint_location`, `output_table_name`) are passed directly to our Python scripts, making them configurable per environment without code changes.
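As a sketch of the receiving side, a job script can accept those bundle parameters as command-line arguments; the parameter names below mirror the ones mentioned above, and the example values are placeholders:

```python
import argparse

def parse_job_args(argv=None) -> argparse.Namespace:
    """Parse the environment-specific parameters the bundle passes in."""
    parser = argparse.ArgumentParser(description="Logistics cost monitoring job")
    parser.add_argument("--kafka_bootstrap_servers", required=True)
    parser.add_argument("--checkpoint_location", required=True)
    parser.add_argument("--output_table_name", required=True)
    return parser.parse_args(argv)

# Example invocation: in a real run, the bundle supplies per-environment values.
args = parse_job_args([
    "--kafka_bootstrap_servers", "broker-1:9092",
    "--checkpoint_location", "s3://example-bucket/checkpoints/logistics",
    "--output_table_name", "logistics_costs_silver",
])
```

Because the script only sees parsed arguments, the same code runs unchanged in development, staging, and production.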
Common Issues & Solutions
Bundle Validation Errors (`databricks bundle validate`):
- Issue: `databricks bundle validate` fails with YAML syntax errors or file-not-found errors.
- Solution: Carefully check your `databricks.yml` for correct YAML indentation and syntax. Ensure all file paths (e.g., `./src/dlt_pipelines/supply_chain_ingestion.py`) are correct relative to the `databricks.yml` file.
- Prevention: Use a good YAML linter in your IDE. Always run `databricks bundle validate` locally before committing.
Permissions Issues During Deployment:
- Issue: The CI/CD pipeline fails with `PERMISSION_DENIED` errors when deploying to Databricks.
- Solution: The Databricks PAT or Service Principal used by your CI/CD system lacks the necessary permissions.
  - Verify the PAT/Service Principal has `CAN_MANAGE` permissions on the target workspace’s DLT pipelines and Jobs, and potentially `CAN_MANAGE` on the clusters or instance pools if the bundle creates new ones.
  - For Unity Catalog, ensure the identity has `USE CATALOG` and `CREATE SCHEMA` (or `USE SCHEMA`) permissions on the target catalog/schema.
- Debugging: Check the detailed error message in the CI/CD logs. It usually indicates which specific permission is missing.
Environment Variable/Secret Configuration Problems:
- Issue: `databricks bundle deploy` fails because an environment variable (e.g., `KAFKA_BOOTSTRAP_SERVERS`) is not found, or its value is incorrect.
- Solution:
  - Local: Ensure you have `export`ed all required environment variables in your shell before running `databricks bundle deploy`.
  - CI/CD: Double-check that all GitHub Secrets are correctly named, have the correct values, and are exposed to the workflow job via the `env:` block in the GitHub Actions YAML. Remember that secret names are case-sensitive.
- Prevention: Use a consistent naming convention for secrets. Test local deployment with dummy environment variables first to ensure the bundle picks them up correctly.
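A small fail-fast helper like the following sketch can surface missing variables with a clear message before any deployment work begins; the variable names are the ones used above:

```python
import os

def check_required_env(required, environ=None):
    """Raise early if any required environment variable is unset or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in required if not environ.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing))
    return True

# Example: check_required_env(["DATABRICKS_HOST", "KAFKA_BOOTSTRAP_SERVERS"])
```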
Testing & Verification
To thoroughly test and verify the CI/CD setup:
- Trigger a Full Deployment:
  - Make a minor, innocuous change to one of your DLT Python files (e.g., add a comment).
  - Commit the change and push it to your `main` branch.
  - Observe the GitHub Actions workflow. It should trigger, run `validate-bundle`, and then `deploy-staging`.
  - Once `deploy-staging` is successful, create a `release` branch from `main` and push it. This should trigger `deploy-production`.
- Verify Databricks Resources:
  - Log in to your Databricks Staging workspace. Navigate to “Workflows” -> “Delta Live Tables” and “Workflows” -> “Jobs”.
  - Confirm that the `supply-chain-analytics-bundle-staging-supply-chain-ingestion` DLT pipeline is present.
  - Confirm that `supply-chain-analytics-bundle-staging-logistics-cost-monitoring` and the other jobs are present.
  - Repeat this verification for your Databricks Production workspace after the production deployment.
- Run and Monitor Pipelines/Jobs:
  - DLT Pipeline: Manually start the DLT pipeline in the staging workspace. Check the pipeline graph and event log for successful execution and data processing. Verify that data lands in the `supply_chain_staging_catalog.staging` schema.
  - Streaming Job: Manually run the `logistics-cost-monitoring` job in staging. If you have a Kafka producer configured for staging, send some test messages and observe whether the job consumes them and writes to the `logistics_costs_silver` table.
  - Batch Jobs: Manually run the `tariff-analysis-batch` and `anomaly-detection` jobs. Verify their output tables.
  - Automated Runs: Ensure that scheduled batch jobs (tariff analysis, anomaly detection) are correctly scheduled in Databricks.
- Check Logs: Review the logs of the running pipelines/jobs in Databricks for any errors or warnings.
- Data Validation: Perform spot checks on the data generated in the target Delta tables in both staging and production to ensure correctness and completeness.
Summary & Next Steps
Congratulations! You have successfully implemented a robust CI/CD pipeline for your Databricks data assets using Databricks Asset Bundles and GitHub Actions. This chapter has transformed our individual data pipelines into a deployable, version-controlled, and automated system, a critical step towards production readiness. We covered:
- The importance of CI/CD for data engineering projects.
- How to structure your Databricks project for bundle deployment.
- Defining DLT pipelines, Spark jobs, and notebook jobs in `databricks.yml`.
- Parameterizing configurations for different environments using variable substitution.
- Automating deployment with GitHub Actions, including secret management.
- Key production considerations for security, performance, error handling, and deployment strategies.
With CI/CD in place, you can now confidently iterate on your data pipelines, knowing that every change will be validated and deployed consistently. This automation significantly reduces manual errors, accelerates development cycles, and ensures the reliability of our real-time supply chain analytics solution.
In the next and final chapter, we will focus on Monitoring, Alerting, and Observability for Real-time Data Pipelines. We’ll integrate Databricks with monitoring tools, set up custom alerts for pipeline health and data quality, and establish dashboards to provide real-time insights into our supply chain operations.