### Chapter Introduction
In this chapter, we embark on the crucial first step of our real-time supply chain analytics journey: ingesting raw supply chain events into our data lakehouse. We will leverage Databricks Delta Live Tables (DLT) to build a robust, fault-tolerant, and scalable pipeline that continuously reads event data from Apache Kafka and lands it into a “Bronze” Delta table. The Bronze layer serves as the raw, immutable historical record of all ingested data, preserving the original state of events as they arrive.
This step is foundational for several reasons. Firstly, it establishes a reliable ingestion mechanism, ensuring no event is lost and providing a single source of truth for all subsequent transformations. Secondly, by using DLT, we automatically inherit best practices for data quality, schema enforcement, and operational simplicity, allowing us to focus on the data rather than infrastructure. Finally, the Bronze layer forms the bedrock of our Medallion architecture, enabling full data lineage and replayability, which is vital for auditing, debugging, and backfilling historical data.
Before proceeding, ensure you have completed the initial Databricks workspace setup, Unity Catalog configuration, and Kafka cluster setup as outlined in Chapters 1 and 2. By the end of this chapter, you will have a fully functional DLT pipeline that continuously ingests raw supply chain events into a governed Delta table, ready for further processing.
### Planning & Design
#### Component Architecture for this Feature
Our architecture for the Bronze layer ingestion will be straightforward yet powerful:
- Apache Kafka: Acts as the primary event bus, receiving real-time supply chain events from various upstream systems (e.g., IoT sensors, logistics platforms, ERP systems).
- Databricks Delta Live Tables (DLT) Pipeline: A declarative pipeline defined in Python that connects to Kafka.
- Spark Structured Streaming: Underpins DLT, providing the continuous processing engine to read from Kafka.
- Unity Catalog: Provides centralized governance for our Delta tables, managing access control, schema, and metadata.
- Bronze Delta Table: The target table in our data lakehouse, storing raw, untransformed Kafka messages with added ingestion metadata. This table will reside within a Unity Catalog managed schema.
#### Database Schema (Bronze Layer)
The Bronze layer's primary goal is to store raw data as-is. We will capture the entire Kafka message, including its key, value, timestamp, and metadata. The `value` field, containing the actual supply chain event payload, will be stored as a `STRING` (assuming JSON payload for simplicity).
**Target Table:** `supply_chain_bronze.raw_events`
| Field Name | Data Type | Description |
| :------------------ | :-------- | :----------------------------------------------------------------------- |
| `key` | `BINARY` | The raw Kafka message key (often used for partitioning/ordering). |
| `value` | `STRING` | The raw Kafka message value, typically a JSON string representing the event. |
| `topic` | `STRING` | The Kafka topic from which the record was read. |
| `partition` | `INT` | The Kafka partition from which the record was read. |
| `offset` | `LONG` | The offset of the record within the Kafka partition. |
| `timestamp` | `TIMESTAMP` | The timestamp of the record in Kafka (event time). |
| `ingestion_timestamp` | `TIMESTAMP` | The timestamp when the record was ingested into the Bronze table (processing time). |
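To make the schema concrete, here is a hypothetical example of a single row as it might land in `raw_events`. All field values are illustrative, not produced by the pipeline; the payload inside `value` is a sample supply chain event serialized as a JSON string.

```python
import json
from datetime import datetime, timezone

# Illustrative supply chain event; in the real pipeline this arrives
# from an upstream system via Kafka.
sample_event = {
    "event_id": "e7b1c2d3-0000-4000-8000-000000000000",
    "event_type": "SHIPMENT_UPDATE",
    "timestamp": "2024-01-15T10:30:00Z",
}

# One hypothetical Bronze row, matching the schema table above.
sample_bronze_row = {
    "key": sample_event["event_id"].encode("utf-8"),     # BINARY: raw Kafka key
    "value": json.dumps(sample_event),                   # STRING: raw JSON payload
    "topic": "supply_chain_events_raw",                  # STRING
    "partition": 3,                                      # INT
    "offset": 42917,                                     # LONG
    "timestamp": datetime(2024, 1, 15, 10, 30, 1, tzinfo=timezone.utc),  # Kafka event time
    "ingestion_timestamp": datetime.now(timezone.utc),   # processing time
}

# The payload stays opaque in Bronze; downstream (Silver) layers parse it.
parsed = json.loads(sample_bronze_row["value"])
print(parsed["event_type"])  # SHIPMENT_UPDATE
```

Note that Bronze stores `value` untouched: parsing, validation, and schema enforcement are deliberately deferred to the Silver layer.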
#### File Structure
For this chapter, we will create a single DLT Python notebook. In a real-world scenario, you might split DLT definitions across multiple notebooks for larger pipelines, but for the Bronze layer, a single file is usually sufficient.
```
databricks/
├── dlt_pipelines/
│   └── supply_chain_bronze_ingestion.py
├── notebooks/
│   └── kafka_event_producer.py   (for testing/simulation)
└── config/
    └── pipeline_config.py        (shared configurations; optional for this chapter)
```
### Step-by-Step Implementation
#### a) Setup/Configuration
Before we write our DLT code, let's ensure our Databricks environment is properly set up and we have a secure way to store Kafka credentials.
1. **Create a Unity Catalog Schema (if not already done):**
We will store our Bronze tables in a dedicated Unity Catalog schema. Replace `<your-catalog-name>` with the name of your Unity Catalog.
Open a new Databricks notebook (any language; if it is not a SQL notebook, run the statements via `spark.sql`) or use the SQL editor.
```sql
-- SQL Notebook/Editor
CREATE SCHEMA IF NOT EXISTS <your-catalog-name>.supply_chain_bronze
COMMENT 'Schema for raw, untransformed supply chain event data';
-- Grant necessary permissions to your user or service principal
GRANT CREATE TABLE ON SCHEMA <your-catalog-name>.supply_chain_bronze TO `users`;
GRANT USE SCHEMA ON SCHEMA <your-catalog-name>.supply_chain_bronze TO `users`;
GRANT USE CATALOG ON CATALOG <your-catalog-name> TO `users`;
```
**Explanation:**
- `CREATE SCHEMA IF NOT EXISTS`: Ensures our `supply_chain_bronze` schema exists under your specified Unity Catalog.
- `GRANT ... TO users`: Assigns the necessary permissions. In a production environment, you would grant these to specific service principals or groups, not to the `users` group directly.
2. **Set up Kafka Credentials in Databricks Secrets:**
Storing sensitive information like Kafka connection strings and authentication details directly in code is a major security vulnerability. Databricks Secrets provide a secure way to manage these.
First, create a secret scope (if you don't have one):
```bash
# In a terminal with the Databricks CLI configured
# (legacy CLI syntax; newer CLI versions use: databricks secrets create-scope supply_chain_scope)
databricks secrets create-scope --scope supply_chain_scope
```
Next, add your Kafka connection details as secrets within this scope. Replace `<your-kafka-bootstrap-servers>` and `<your-kafka-sasl-username/password>` with your actual Kafka cluster details. For a managed Kafka service (like Confluent Cloud, Azure Event Hubs, AWS MSK), these will be provided by your cloud provider. We'll assume SASL/SCRAM-SHA-512 authentication over SSL, which is common.
```bash
# Add Kafka bootstrap servers
# (legacy CLI syntax; newer CLI versions use: databricks secrets put-secret supply_chain_scope <key>)
databricks secrets put --scope supply_chain_scope --key kafka_bootstrap_servers
# Enter your Kafka bootstrap servers (e.g., "broker1:9092,broker2:9092") when prompted
# Add Kafka SASL username
databricks secrets put --scope supply_chain_scope --key kafka_sasl_username
# Enter your Kafka SASL username when prompted
# Add Kafka SASL password
databricks secrets put --scope supply_chain_scope --key kafka_sasl_password
# Enter your Kafka SASL password when prompted
# Add Kafka topic name for raw events
databricks secrets put --scope supply_chain_scope --key kafka_raw_events_topic
# Enter your Kafka topic name (e.g., "supply_chain_events_raw") when prompted
```
**Explanation:**
- `databricks secrets create-scope`: Creates a logical grouping for secrets.
- `databricks secrets put`: Stores a secret value under a given key within the scope. The CLI will prompt you to enter the value, preventing it from being stored in your shell history.
3. **Create DLT Python Notebook:**
Navigate to your Databricks workspace, create a new notebook, name it `supply_chain_bronze_ingestion`, set the default language to Python, and save it under `databricks/dlt_pipelines/`.
#### b) Core Implementation
Now, let's implement the DLT pipeline code in the `supply_chain_bronze_ingestion.py` notebook.
**File:** `databricks/dlt_pipelines/supply_chain_bronze_ingestion.py`
```python
# databricks/dlt_pipelines/supply_chain_bronze_ingestion.py
import dlt
from pyspark.sql.functions import col, current_timestamp, from_json, schema_of_json
from pyspark.sql.types import StructType, StringType, TimestampType, BinaryType
# --- Configuration Parameters ---
# These parameters will be used to configure our DLT pipeline.
# For production, consider using Databricks Secrets for sensitive values
# and DLT pipeline settings for general parameters.
# Define Kafka connection details using Databricks Secrets for security
KAFKA_BOOTSTRAP_SERVERS = dbutils.secrets.get(scope="supply_chain_scope", key="kafka_bootstrap_servers")
KAFKA_RAW_EVENTS_TOPIC = dbutils.secrets.get(scope="supply_chain_scope", key="kafka_raw_events_topic")
KAFKA_SASL_USERNAME = dbutils.secrets.get(scope="supply_chain_scope", key="kafka_sasl_username")
KAFKA_SASL_PASSWORD = dbutils.secrets.get(scope="supply_chain_scope", key="kafka_sasl_password")
# Unity Catalog configuration
UNITY_CATALOG_NAME = "<your-catalog-name>" # Replace with your Unity Catalog name
BRONZE_SCHEMA_NAME = "supply_chain_bronze"
BRONZE_TABLE_NAME = f"{UNITY_CATALOG_NAME}.{BRONZE_SCHEMA_NAME}.raw_events"
# --- DLT Table Definition: Raw Supply Chain Events (Bronze Layer) ---
@dlt.table(
name="raw_events",
comment="Raw, immutable supply chain events ingested from Kafka.",
table_properties={
"quality": "bronze",
"pipeline.type": "streaming",
"delta.logRetentionDuration": "30 days", # Keep history for 30 days
"delta.deletedFileRetentionDuration": "7 days" # Keep deleted files for 7 days
},
    # Note: no `path` argument. For Unity Catalog managed tables, DLT and
    # Unity Catalog manage the storage location automatically; specifying an
    # explicit path is not supported for UC managed tables.
)
@dlt.expect_or_drop("valid_kafka_value", "value IS NOT NULL")
@dlt.expect_or_drop("valid_kafka_timestamp", "timestamp IS NOT NULL")
def raw_supply_chain_events():
"""
Ingests raw supply chain events from Kafka into the Bronze Delta table.
This function reads directly from the Kafka topic, adds an ingestion timestamp,
and stores the raw event payload along with Kafka metadata.
"""
# Kafka source options for Structured Streaming
kafka_options = {
"kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
"subscribe": KAFKA_RAW_EVENTS_TOPIC,
"startingOffsets": "earliest", # Start reading from the earliest available offset
        "maxOffsetsPerTrigger": "100000", # Limit records per micro-batch for better control (option values are passed as strings)
"failOnDataLoss": "false", # Do not fail if data loss occurs (e.g., topic deletion)
# SSL/SASL authentication configuration for Kafka
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "SCRAM-SHA-512",
"kafka.sasl.jaas.config": (
f'org.apache.kafka.common.security.scram.ScramLoginModule required '
f'username="{KAFKA_SASL_USERNAME}" '
f'password="{KAFKA_SASL_PASSWORD}";'
),
"kafka.ssl.endpoint.identification.algorithm": "https" # Recommended for cloud Kafka services
}
# Read the stream from Kafka
raw_kafka_stream = (
spark.readStream
.format("kafka")
.options(**kafka_options)
.load()
)
# Transform the raw Kafka data:
# - Cast key and value to String for easier inspection (value is usually JSON)
# - Add an ingestion timestamp for tracking processing time
processed_stream = raw_kafka_stream.select(
col("key").cast(BinaryType()).alias("key"), # Keep as Binary for exact raw representation
col("value").cast(StringType()).alias("value"), # Assuming event payload is UTF-8 encoded JSON
col("topic").alias("topic"),
col("partition").alias("partition"),
col("offset").alias("offset"),
col("timestamp").alias("timestamp"), # Kafka event timestamp
current_timestamp().alias("ingestion_timestamp") # DLT ingestion timestamp
)
    # Note: in DLT, print() output goes to the driver logs. The DLT event log
    # and UI provide more robust, structured pipeline metrics.
    print(f"Ingesting from Kafka topic '{KAFKA_RAW_EVENTS_TOPIC}' into table '{BRONZE_TABLE_NAME}'.")

    return processed_stream
```
**Explanation of Code Blocks:**

- **Imports:**
  - `dlt`: the core Delta Live Tables library.
  - `pyspark.sql.functions`: Spark SQL functions such as `col`, `current_timestamp`, and `from_json`.
  - `pyspark.sql.types`: schema type definitions.
- **Configuration parameters:**
  - `KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_RAW_EVENTS_TOPIC`, `KAFKA_SASL_USERNAME`, `KAFKA_SASL_PASSWORD`: retrieved securely from Databricks Secrets via `dbutils.secrets.get()`; a critical best practice for production environments.
  - `UNITY_CATALOG_NAME`, `BRONZE_SCHEMA_NAME`: the target Unity Catalog and schema. Remember to replace `<your-catalog-name>` with your actual Unity Catalog name.
  - `BRONZE_TABLE_NAME`: the fully qualified table name.
- **`@dlt.table` decorator:**
  - `name="raw_events"`: the logical name of the DLT table, which maps to `supply_chain_bronze.raw_events` in Unity Catalog.
  - `comment`: a table description, useful for documentation and discoverability in Unity Catalog.
  - `table_properties`: Delta table properties. `"quality": "bronze"` and `"pipeline.type": "streaming"` are custom properties denoting the data quality layer and pipeline type; `delta.logRetentionDuration` and `delta.deletedFileRetentionDuration` are important for managing Delta Lake history and meeting data retention policies.
  - Storage location: for Unity Catalog managed tables, DLT derives the storage location from the `target` schema in the pipeline settings; an explicit `path` should not be set. DLT manages the underlying files either way.
- **`@dlt.expect_or_drop` data quality constraints:**
  - `"valid_kafka_value", "value IS NOT NULL"`: ensures the raw Kafka message value is not null; if it is, the record is dropped. A basic but essential check.
  - `"valid_kafka_timestamp", "timestamp IS NOT NULL"`: ensures Kafka's event timestamp is present.
  - The `_or_drop` strategy drops failing records from the target table but logs them in DLT's metrics. Alternatives are `expect_or_fail` (stops the pipeline) and plain `expect` (flags the record but keeps it). For Bronze, dropping malformed records is often acceptable.
- **`raw_supply_chain_events()` function:** contains the Spark Structured Streaming logic.
  - `kafka_options`: all necessary Kafka connection parameters, including `bootstrap.servers`, the `subscribe` topic, `startingOffsets` (`earliest` for initial loads, `latest` in production after the initial sync), `maxOffsetsPerTrigger` (controls micro-batch size), and the crucial SASL/SSL authentication details.
  - `spark.readStream.format("kafka").options(**kafka_options).load()`: the core Spark Structured Streaming command that connects to Kafka and starts reading data.
  - `processed_stream = raw_kafka_stream.select(...)`: selects and lightly transforms the incoming Kafka fields. `col("key").cast(BinaryType())` keeps the often-binary key in its raw form; `col("value").cast(StringType())` decodes the payload bytes as UTF-8 text, typically JSON; `current_timestamp().alias("ingestion_timestamp")` adds a processing-time timestamp, crucial for understanding when data entered our lakehouse.
  - `print(...)`: simple logging of pipeline activity. In a production DLT pipeline, DLT's built-in logging and metrics are more comprehensive.
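To build intuition for what the `_or_drop` strategy does, its filtering behavior can be sketched in plain Python. This is an illustration of the semantics only, not DLT's implementation; the records and metric names are made up.

```python
# Plain-Python sketch of expect_or_drop semantics: records failing the
# predicate are dropped from the output but counted in metrics.
def expect_or_drop(records, name, predicate):
    kept, dropped = [], 0
    for rec in records:
        if predicate(rec):
            kept.append(rec)
        else:
            dropped += 1
    metrics = {"expectation": name, "passed": len(kept), "failed": dropped}
    return kept, metrics

records = [
    {"value": '{"event_type": "ORDER_CREATED"}', "timestamp": "2024-01-15T10:30:00Z"},
    {"value": None, "timestamp": "2024-01-15T10:30:01Z"},  # fails valid_kafka_value
    {"value": '{"event_type": "SHIPMENT_UPDATE"}', "timestamp": "2024-01-15T10:30:02Z"},
]

kept, metrics = expect_or_drop(records, "valid_kafka_value",
                               lambda r: r["value"] is not None)
print(len(kept), metrics["failed"])  # 2 1
```

In real DLT, the pass/fail counts appear in the pipeline's event log and UI rather than as a returned dictionary.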
#### c) Testing This Component (Local Simulation)
To test the DLT pipeline, we need to simulate real-time supply chain events being published to our Kafka topic.
1. **Create a Kafka Event Producer Notebook:** Create a new Python notebook at `databricks/notebooks/kafka_event_producer.py`. This notebook will simulate upstream systems publishing events.

   **File:** `databricks/notebooks/kafka_event_producer.py`

   ```python
   # databricks/notebooks/kafka_event_producer.py
   from kafka import KafkaProducer
   import json
   import time
   import uuid
   import random
   from datetime import datetime

   # --- Configuration Parameters ---
   KAFKA_BOOTSTRAP_SERVERS = dbutils.secrets.get(scope="supply_chain_scope", key="kafka_bootstrap_servers")
   KAFKA_RAW_EVENTS_TOPIC = dbutils.secrets.get(scope="supply_chain_scope", key="kafka_raw_events_topic")
   KAFKA_SASL_USERNAME = dbutils.secrets.get(scope="supply_chain_scope", key="kafka_sasl_username")
   KAFKA_SASL_PASSWORD = dbutils.secrets.get(scope="supply_chain_scope", key="kafka_sasl_password")

   # --- Kafka Producer Setup ---
   producer = KafkaProducer(
       bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
       security_protocol="SASL_SSL",
       sasl_mechanism="SCRAM-SHA-512",
       sasl_plain_username=KAFKA_SASL_USERNAME,
       sasl_plain_password=KAFKA_SASL_PASSWORD,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       key_serializer=lambda k: str(k).encode('utf-8')
   )
   print(f"Kafka Producer initialized for topic: {KAFKA_RAW_EVENTS_TOPIC}")

   # --- Simulate Supply Chain Events ---
   def generate_supply_chain_event():
       event_types = ["ORDER_CREATED", "SHIPMENT_UPDATE", "INVENTORY_CHANGE", "QUALITY_CHECK", "DELIVERY_CONFIRMED"]
       item_ids = ["ITEM-001", "ITEM-002", "ITEM-003", "ITEM-004", "ITEM-005"]
       locations = ["WAREHOUSE_A", "DISTRIBUTION_CENTER_B", "RETAIL_STORE_C", "PORT_D", "FACTORY_E"]
       statuses = ["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED", "IN_TRANSIT"]

       event_type = random.choice(event_types)
       event_id = str(uuid.uuid4())
       timestamp = datetime.utcnow().isoformat() + "Z"  # ISO 8601 format

       payload = {
           "event_id": event_id,
           "event_type": event_type,
           "timestamp": timestamp,
           "correlation_id": str(uuid.uuid4()),
           "details": {
               "item_id": random.choice(item_ids),
               "quantity": random.randint(1, 100),
               "location": random.choice(locations),
               "status": random.choice(statuses),
               "temperature_celsius": round(random.uniform(-5, 30), 2) if event_type == "SHIPMENT_UPDATE" else None,
               "notes": f"Event {event_type} for {random.choice(item_ids)}"
           }
       }
       return event_id, payload

   # Send events continuously
   num_events_to_send = 100  # Send 100 events for the initial test
   for i in range(num_events_to_send):
       key, event = generate_supply_chain_event()
       try:
           future = producer.send(KAFKA_RAW_EVENTS_TOPIC, key=key, value=event)
           record_metadata = future.get(timeout=10)  # Block until the send completes or times out
           print(f"Sent event {key} to topic {record_metadata.topic} "
                 f"partition {record_metadata.partition} offset {record_metadata.offset}")
       except Exception as e:
           print(f"Error sending event: {e}")
       time.sleep(random.uniform(0.1, 0.5))  # Simulate real-time delay

   producer.flush()  # Ensure all messages are sent
   print(f"Finished sending {num_events_to_send} events.")
   ```

   **Explanation:**
   - This notebook uses the `kafka-python` library, which you may need to install on your Databricks cluster (cluster configuration -> Libraries -> Install New -> PyPI -> `kafka-python`).
   - It uses the same secret scope to retrieve Kafka credentials.
   - `KafkaProducer` is configured to serialize values as JSON and keys as strings.
   - `generate_supply_chain_event()` creates a sample JSON payload for various supply chain event types.
   - The loop sends a specified number of events to the Kafka topic with a small delay between sends.
2. **Run the Kafka Event Producer:** Attach the `kafka_event_producer.py` notebook to a Databricks cluster (an interactive cluster is fine for this) and run it. You should see messages indicating events being sent to Kafka.

3. **Create and Run the DLT Pipeline:**
   a. Navigate to "Workflows" -> "Delta Live Tables" in your Databricks workspace.
   b. Click "Create Pipeline".
   c. Pipeline Name: `supply-chain-ingestion-pipeline`
   d. Product Edition: `Advanced` (to enable advanced features like expectations)
   e. Pipeline Libraries: select "Notebooks" and browse to `databricks/dlt_pipelines/supply_chain_bronze_ingestion.py`.
   f. Target Schema: enter your Unity Catalog name and Bronze schema: `<your-catalog-name>.supply_chain_bronze`
   g. Storage Location: for Unity Catalog, leave this blank; DLT manages the underlying storage.
   h. Cluster Mode: `Enhanced Autoscaling` (recommended for production)
   i. Photon: `Enabled` (for performance)
   j. Enable Serverless: check this box (highly recommended for simplified operations and cost efficiency). If Serverless is not available in your region, proceed without it.
   k. Scheduling: `Triggered` for initial testing; for continuous ingestion, set it to `Continuous`.
   l. Permissions: ensure the service principal or user running the pipeline has `USE CATALOG` and `CREATE TABLE` permissions on the target Unity Catalog and schema.
   m. Click "Create".
   n. After creation, click "Start" to run the pipeline.

   **Expected Behavior:**
   - The DLT pipeline starts, deploys its resources, and begins reading messages from the `supply_chain_events_raw` Kafka topic.
   - The pipeline graph in the DLT UI shows the `raw_events` table being updated.
   - Check the pipeline logs for any errors or warnings.
   - After a few minutes (depending on the Kafka event rate), query the `supply_chain_bronze.raw_events` table in a SQL notebook or editor; you should see the ingested events.

   ```sql
   -- SQL Notebook/Editor
   SELECT * FROM <your-catalog-name>.supply_chain_bronze.raw_events LIMIT 10;
   SELECT COUNT(*) FROM <your-catalog-name>.supply_chain_bronze.raw_events;
   ```

   Verify that the counts increase as you send more events from your producer notebook.
### Production Considerations
Building a production-ready Bronze layer requires careful thought beyond just the code.
**Error Handling & Data Quality:**

- **DLT Expectations:** We used `expect_or_drop` for basic null checks. For production, consider more sophisticated strategies:
  - `expect_or_fail`: for critical data quality issues that should halt the pipeline (e.g., malformed JSON that cannot be parsed).
  - Quarantine pattern: DLT has no built-in `expect_or_quarantine` decorator, but records that fail expectations and should not halt the pipeline can be routed to a separate "quarantine" or "dead letter" Delta table for manual inspection and reprocessing.
- **Schema Evolution:** Delta Lake inherently supports schema evolution. For the Bronze layer, where data is raw, we typically allow schema inference or define a very loose schema (e.g., `value` as `STRING`). Stricter schema enforcement is applied in the Silver layer.
- **Kafka Data Loss:** `failOnDataLoss: "false"` is set, but understand its implications. For critical data, ensure Kafka retention and replication are robust.
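The quarantine pattern can be prototyped outside DLT with a tolerant parser that routes unparseable payloads to a dead-letter collection instead of failing. This is a plain-Python sketch of the idea; in a real pipeline the quarantined records would land in a separate Delta table.

```python
import json

def parse_or_quarantine(raw_values):
    """Split raw Kafka payload strings into parsed events and quarantined records."""
    parsed, quarantined = [], []
    for raw in raw_values:
        try:
            if raw is None:
                raise ValueError("null payload")
            parsed.append(json.loads(raw))
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            # In a real pipeline, this record would be written to a
            # dead-letter Delta table along with the error reason.
            quarantined.append({"raw": raw, "error": str(exc)})
    return parsed, quarantined

good, bad = parse_or_quarantine(['{"event_type": "ORDER_CREATED"}', "not-json", None])
print(len(good), len(bad))  # 1 2
```

Keeping the error message alongside the raw payload makes later reprocessing much easier than silently dropping the record.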
**Performance Optimization:**

- **DLT Serverless:** Enabling Serverless DLT is the primary optimization; Databricks automatically manages compute resources, scaling up and down as needed.
- **`maxOffsetsPerTrigger`:** This Kafka option caps the number of records processed in each micro-batch. Tune it based on your event volume and desired latency: a larger value can improve throughput but increases latency for individual records.
- **DLT Auto-Optimization:** DLT automatically applies `OPTIMIZE` and `VACUUM` operations to Delta tables, improving query performance and storage efficiency.
- **Photon Engine:** Ensure Photon is enabled on your DLT pipeline for accelerated query performance.
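As a rough sizing aid, you can relate `maxOffsetsPerTrigger` to the topic's event rate to estimate how much event time each micro-batch covers. The numbers below are purely illustrative.

```python
def seconds_of_data_per_batch(max_offsets_per_trigger, events_per_second):
    """Approximate span of event time consumed by one micro-batch."""
    return max_offsets_per_trigger / events_per_second

# With 100,000 offsets per trigger and 5,000 events/s across the topic,
# each micro-batch covers roughly 20 seconds of incoming data.
print(seconds_of_data_per_batch(100_000, 5_000))  # 20.0
```

If that span is much larger than your latency target, lower `maxOffsetsPerTrigger`; if the pipeline is falling behind the topic, raise it (or scale the cluster).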
**Security Considerations:**

- **Unity Catalog (UC):** All tables are governed by UC. Enforce least-privilege access:
  - The DLT pipeline's service principal/user requires `USE CATALOG`, `USE SCHEMA`, `CREATE TABLE`, and `SELECT` on the Bronze schema.
  - Downstream Silver pipelines require `SELECT` on the Bronze table.
  - Analysts require `SELECT` on appropriate views/tables, not necessarily raw Bronze.
- **Databricks Secrets:** Critical for Kafka credentials; never hardcode them.
- **Network Isolation:** For highly sensitive data, configure the Databricks workspace with VNet injection or Private Link so that all traffic (including to Kafka) stays within your private network.
- **Kafka Security:** Ensure your Kafka cluster is secured with SSL/TLS encryption and SASL authentication (as implemented here).
**Logging and Monitoring:**

- **DLT UI:** Provides detailed logs, event metrics, and the health status of your pipeline.
- **Databricks Monitoring:** Integrate DLT logs with Databricks monitoring tools (e.g., cluster metrics and the Spark UI for job details).
- **External Monitoring:** Push DLT metrics and logs to your organization's centralized monitoring solutions (e.g., Prometheus, Grafana, Splunk, Datadog) for comprehensive observability and alerting.
- **DLT Event Log:** DLT writes a detailed event log to the pipeline's storage location, which can be queried for advanced auditing and custom monitoring.
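As a sketch of custom monitoring on top of the event log, the snippet below scans event-log entries (modeled here as plain dicts loosely based on DLT's `flow_progress` events; in practice you would query the event log table with Spark SQL, and the exact JSON layout may differ) and totals expectation failures:

```python
import json

# Hypothetical event-log entries; the nested structure mirrors the kind of
# data-quality detail DLT records, but is an assumption for illustration.
events = [
    {"event_type": "flow_progress",
     "details": json.dumps({"flow_progress": {"data_quality": {
         "expectations": [{"name": "valid_kafka_value",
                           "passed_records": 98, "failed_records": 2}]}}})},
    {"event_type": "create_update", "details": "{}"},
]

def expectation_failures(event_log):
    """Total failed records across all expectation results in the event log."""
    failed = 0
    for e in event_log:
        if e["event_type"] != "flow_progress":
            continue
        details = json.loads(e["details"])
        dq = details.get("flow_progress", {}).get("data_quality", {})
        for exp in dq.get("expectations", []):
            failed += exp.get("failed_records", 0)
    return failed

print(expectation_failures(events))  # 2
```

A check like this can feed an alert when the drop rate of `expect_or_drop` expectations exceeds a threshold.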
### Code Review Checkpoint
At this stage, you have successfully set up the foundational layer of our real-time supply chain analytics platform.
**Summary of what was built:**

- A secure method to store and retrieve Kafka connection details using Databricks Secrets.
- A DLT Python notebook (`supply_chain_bronze_ingestion.py`) that defines a streaming pipeline.
- The pipeline reads raw supply chain events from a Kafka topic.
- It performs minimal transformations (casting types, adding an ingestion timestamp).
- It applies basic data quality checks using DLT expectations (`expect_or_drop`).
- It writes the raw events to a Unity Catalog managed Delta table (`<your-catalog-name>.supply_chain_bronze.raw_events`).
- A helper notebook (`kafka_event_producer.py`) to simulate incoming Kafka events for testing.

**Files created/modified:**

- `databricks/dlt_pipelines/supply_chain_bronze_ingestion.py`
- `databricks/notebooks/kafka_event_producer.py`
- Databricks Secrets for Kafka credentials.
- A DLT pipeline configured in the Databricks UI.
How it integrates with existing code: This chapter builds upon the foundational Databricks and Unity Catalog setup from previous chapters. It establishes the first layer of our Medallion architecture, providing the raw data source for all subsequent processing.
### Common Issues & Solutions
**Issue: Kafka Connectivity Errors (`connection refused`, `authentication failed`)**

- **Symptom:** The DLT pipeline fails to start; logs show Kafka connection errors.
- **Debugging:**
  - **Verify bootstrap servers:** Double-check `KAFKA_BOOTSTRAP_SERVERS` in your secrets and ensure the brokers are reachable from your Databricks cluster's network (public IP, VNet peering, firewall rules).
  - **Authentication:** Confirm the `KAFKA_SASL_USERNAME` and `KAFKA_SASL_PASSWORD` secrets are correct and match your Kafka cluster's SASL/SCRAM configuration.
  - **Security protocol:** Ensure `kafka.security.protocol` and `kafka.sasl.mechanism` match your Kafka broker's configuration.
  - **Network access:** If using a private Kafka cluster, ensure your Databricks workspace (especially with VNet injection) has network routes to Kafka.
- **Prevention:** Test Kafka connectivity independently with a simple Spark Structured Streaming notebook before deploying DLT.
**Issue: DLT Pipeline Fails Due to Unity Catalog Permissions**

- **Symptom:** The pipeline fails with `PERMISSION_DENIED` errors when trying to create or write to the target table.
- **Debugging:**
  - Check the permissions granted to the service principal or user running the DLT pipeline on the target catalog and the `supply_chain_bronze` schema.
  - Ensure `USE CATALOG`, `USE SCHEMA`, and `CREATE TABLE` permissions are assigned.
- **Prevention:** Always follow the principle of least privilege and verify permissions in a staging environment before production deployment.
**Issue: No Data Appearing in the Bronze Table**

- **Symptom:** The DLT pipeline runs successfully, but `SELECT COUNT(*)` on the Bronze table returns 0, even after sending events to Kafka.
- **Debugging:**
  - **Kafka producer:** Verify `kafka_event_producer.py` is successfully sending messages to the correct Kafka topic; check its logs.
  - **Kafka topic:** Ensure the `kafka_raw_events_topic` secret matches the topic your producer writes to and your DLT pipeline subscribes to.
  - **`startingOffsets`:** If you have been testing, `startingOffsets: "latest"` may cause you to miss old messages. Use `earliest` for initial testing or if you suspect you missed data.
  - **DLT logs:** Check the pipeline logs carefully for warnings or errors related to reading from Kafka or applying expectations. Records dropped by `expect_or_drop` will not appear in the table.
- **Prevention:** Use end-to-end testing with a controlled set of test data, and monitor Kafka consumer lag to confirm DLT is actively processing messages.
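Consumer lag is the gap between the latest offset in each partition and the last offset the pipeline has processed; it can be computed per partition as in this generic sketch (the offset values are illustrative, and in practice the numbers come from your Kafka admin tooling):

```python
def consumer_lag(latest_offsets, committed_offsets):
    """Per-partition lag: how many records the consumer is behind the broker."""
    return {
        partition: latest_offsets[partition] - committed_offsets.get(partition, 0)
        for partition in latest_offsets
    }

latest = {0: 1500, 1: 1200, 2: 900}      # newest offsets on the broker
committed = {0: 1480, 1: 1200, 2: 850}   # offsets processed by the pipeline
lag = consumer_lag(latest, committed)
print(lag)                # {0: 20, 1: 0, 2: 50}
print(sum(lag.values()))  # 70
```

A total lag that grows steadily while the producer runs indicates the pipeline is not keeping up (or not consuming at all).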
### Testing & Verification
To ensure everything is working correctly for Chapter 3:

1. **Start the Kafka Event Producer:** Run the `databricks/notebooks/kafka_event_producer.py` notebook on an interactive cluster. Let it run for at least 5-10 minutes to generate a steady stream of events.
2. **Start the DLT Pipeline:** In the Databricks DLT UI, ensure your `supply-chain-ingestion-pipeline` is running in `Continuous` mode (or in `Triggered` mode, manually triggered multiple times).
3. **Monitor DLT Progress:** Observe the DLT UI. The `raw_events` table should show increasing record counts and healthy processing. Check the "Event log" for any warnings or errors.
4. **Query the Bronze Table:** Open a new SQL notebook or editor and execute the following queries, replacing `<your-catalog-name>`:

   ```sql
   SELECT * FROM <your-catalog-name>.supply_chain_bronze.raw_events
   ORDER BY ingestion_timestamp DESC LIMIT 10;

   SELECT COUNT(*) AS total_raw_events
   FROM <your-catalog-name>.supply_chain_bronze.raw_events;

   SELECT topic,
          COUNT(*) AS events_per_topic,
          MIN(timestamp) AS min_event_time,
          MAX(timestamp) AS max_event_time
   FROM <your-catalog-name>.supply_chain_bronze.raw_events
   GROUP BY topic;
   ```

   - The first query shows recent raw events, including their Kafka metadata and the `ingestion_timestamp`.
   - The second query's count should continuously increase as the producer sends more events and DLT processes them.
   - The third query confirms events are arriving from the expected Kafka topic and shows the time range of ingested events.
5. **Stop Producer and Pipeline:** Once verified, stop the Kafka producer notebook and the DLT pipeline (if running in `Continuous` mode) to conserve resources; restart them as needed for further development.
### Summary & Next Steps
Congratulations! You have successfully implemented the raw data ingestion pipeline using Databricks Delta Live Tables, establishing a robust Bronze layer for our real-time supply chain analytics platform. This layer is now continuously fed with raw events from Kafka, providing an immutable and auditable source of truth. We’ve adhered to production best practices by using Databricks Secrets for credentials, Unity Catalog for governance, and DLT expectations for basic data quality.
In the next chapter, we will move to the Silver layer. We will build another DLT pipeline to consume the raw events from our Bronze table, parse the JSON payload, apply schema enforcement, cleanse the data, and enrich it with initial business context. This will transform our raw events into structured, clean, and queryable data suitable for further analysis.