Chapter 11: End-to-End Real-time Procurement Price Intelligence
1. Chapter Introduction
In this pivotal chapter, we will construct an end-to-end real-time procurement price intelligence pipeline. This pipeline is crucial for modern supply chains, enabling organizations to react swiftly to price fluctuations, optimize procurement costs, and mitigate risks associated with volatile markets. By leveraging the power of Apache Kafka for real-time event ingestion, Databricks Delta Live Tables (DLT) for robust stream processing, and Delta Lake with Unity Catalog for reliable data storage and governance, we will build a system that delivers actionable insights continuously.
The importance of this step cannot be overstated. In today’s dynamic global economy, procurement decisions made even hours too late can lead to significant cost increases or supply disruptions. This pipeline will ingest raw procurement events, enrich them with critical information like tariff rates and logistics costs (derived from previous chapters), compare them against historical trends, and flag potential anomalies—all in near real-time. This proactive intelligence empowers procurement teams to negotiate better deals, identify cost-saving opportunities, and ensure continuity of supply.
As prerequisites, you should have a working Databricks workspace configured with Unity Catalog. We will assume that the Kafka topic for raw procurement events is set up and that the Delta Lake tables for HS Code master data, tariff rates, logistics costs, and historical procurement prices (as discussed and potentially built in earlier chapters) are available within Unity Catalog.
The expected outcome of this chapter is a fully functional, production-ready DLT pipeline that continuously processes procurement data from Kafka, transforms and enriches it, applies price anomaly detection, and stores the results in a governed Delta Lake table, ready for downstream analytics and reporting.
2. Planning & Design
Component Architecture
The real-time procurement price intelligence pipeline will adhere to the Medallion Architecture pattern (Bronze, Silver, Gold layers) within Databricks, ensuring data quality, reliability, and reusability.
- Kafka (Source): Raw procurement transaction events (e.g., purchase order creations, price updates) are published to a dedicated Kafka topic.
- Databricks Delta Live Tables (DLT):
- Bronze Layer: A DLT streaming table will ingest raw JSON messages directly from Kafka, storing them as-is. This serves as a resilient landing zone.
- Silver Layer: Another DLT streaming table will consume data from the Bronze layer. Here, raw messages are parsed, validated, cleaned, and most importantly, enriched. Enrichment involves joining with static/slowly changing dimension tables from previous chapters (HS Code master, tariff rates, logistics costs) and historical procurement data for baseline price comparisons. A simple rule-based anomaly detection will also be applied.
- Gold Layer: A DLT streaming table will consume from the Silver layer. This layer focuses on aggregating the enriched data into business-ready metrics for price intelligence, such as average prices over different time windows, price volatility, and latest prices per item and supplier.
- Delta Lake with Unity Catalog: All Bronze, Silver, and Gold tables will be stored in Delta Lake format and governed by Databricks Unity Catalog, providing centralized metadata, fine-grained access control, and data lineage.
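Before writing any DLT code, the medallion flow can be made concrete with a plain-Python sketch. This is not the pipeline itself — no Spark, and the field names and lookup shapes are simplified stand-ins for the real tables — but it shows what each layer is responsible for:

```python
import json

def bronze(raw_kafka_value: bytes) -> dict:
    """Bronze: land the raw payload as-is (plus ingestion metadata in the real pipeline)."""
    return {"value": raw_kafka_value}

def silver(bronze_record: dict, tariff_lookup: dict) -> dict:
    """Silver: parse the payload and enrich it from a reference lookup."""
    event = json.loads(bronze_record["value"])
    event["effective_tariff_rate"] = tariff_lookup.get(event.get("hs_code"))
    return event

def gold(silver_records: list) -> dict:
    """Gold: aggregate enriched events into a business-ready metric."""
    prices = [r["unit_price"] for r in silver_records]
    return {"avg_price": sum(prices) / len(prices)}

raw = json.dumps({"hs_code": "8542.31", "unit_price": 5.0}).encode("utf-8")
enriched = silver(bronze(raw), {"8542.31": 0.25})
print(gold([enriched]))  # {'avg_price': 5.0}
```

The real pipeline replaces each function with a streaming Delta table, but the division of labor — land, enrich, aggregate — is exactly this.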
Data Model (Unity Catalog Tables)
1. Bronze Layer: supply_chain_catalog.procurement.raw_procurement_events
- Purpose: Raw, immutable landing zone for Kafka messages.
- Schema (Streaming Table):
  - key (BINARY): Kafka message key (if any).
  - value (BINARY): Raw Kafka message value (JSON payload).
  - topic (STRING): Kafka topic name.
  - partition (INTEGER): Kafka partition.
  - offset (LONG): Kafka offset.
  - timestamp (TIMESTAMP): Kafka message timestamp.
  - timestampType (INTEGER): Kafka timestamp type.
2. Silver Layer: supply_chain_catalog.procurement.procurement_enriched
- Purpose: Cleaned, validated, and enriched procurement data.
- Schema (Streaming Table):
  - event_id (STRING, NOT NULL): Unique identifier for the procurement event.
  - event_timestamp (TIMESTAMP, NOT NULL): Timestamp of the procurement event.
  - supplier_id (STRING): Identifier for the supplier.
  - item_id (STRING, NOT NULL): Unique identifier for the procured item.
  - item_description (STRING): Description of the item.
  - quantity (DOUBLE): Quantity of the item procured.
  - unit_price (DOUBLE, NOT NULL): Unit price of the item.
  - currency (STRING): Currency of the unit price.
  - hs_code (STRING): Harmonized System code for the item.
  - origin_country (STRING): Country of origin.
  - destination_country (STRING): Country of destination.
  - delivery_date (TIMESTAMP): Expected delivery date.
  - effective_tariff_rate (DOUBLE): Applied tariff rate (from the tariff_rates table).
  - estimated_logistics_cost (DOUBLE): Estimated logistics cost (from the logistics_costs table).
  - historical_avg_price_overall (DOUBLE): Long-term historical average price for the item (from historical_procurement_prices).
  - avg_price_7d_moving (DOUBLE): 7-day moving average price for the item (calculated dynamically).
  - price_deviation_percentage (DOUBLE): Percentage deviation from avg_price_7d_moving.
  - is_price_anomaly (BOOLEAN): Flag indicating a price anomaly.
  - data_quality_issues (ARRAY&lt;STRING&gt;): Data quality issues encountered during processing.
  - processed_timestamp (TIMESTAMP, NOT NULL): Timestamp when the record was processed by DLT.
3. Gold Layer: supply_chain_catalog.procurement.procurement_price_intelligence
- Purpose: Aggregated, business-ready metrics for price intelligence.
- Schema (Streaming Table, Type 2 SCD or Upsert):
  - item_id (STRING, NOT NULL)
  - supplier_id (STRING)
  - region (STRING): Derived from destination_country.
  - latest_unit_price (DOUBLE): Most recent unit price.
  - latest_price_timestamp (TIMESTAMP): Timestamp of the latest price.
  - avg_daily_price (DOUBLE): Average price for the current day.
  - avg_7day_price (DOUBLE): Average price over the last 7 days.
  - avg_30day_price (DOUBLE): Average price over the last 30 days.
  - min_price_30day (DOUBLE): Minimum price over the last 30 days.
  - max_price_30day (DOUBLE): Maximum price over the last 30 days.
  - price_volatility_index_30day (DOUBLE): Metric for price volatility.
  - total_quantity_30day (DOUBLE): Total quantity procured over the last 30 days.
  - last_update_timestamp (TIMESTAMP, NOT NULL): Timestamp of the last update to this aggregated record.
File Structure
We will implement the DLT pipeline within a single Python notebook, which is a common and recommended practice for DLT deployments.
.
├── src/
│ └── databricks/
│ └── dlt_pipelines/
│ └── procurement_price_intelligence_pipeline.py # Main DLT pipeline notebook
├── notebooks/
│ └── kafka_producer_simulator.py # Utility to simulate Kafka events for testing
└── conf/
└── dlt_pipeline_config.json # Example DLT pipeline configuration (for deployment)
3. Step-by-Step Implementation
We’ll build the DLT pipeline incrementally, starting with Kafka ingestion and moving through the Medallion layers.
3a) Setup/Configuration
First, let’s set up the environment and configurations required for our DLT pipeline.
1. Create the DLT Pipeline Notebook:
Create a new Python notebook in your Databricks workspace at src/databricks/dlt_pipelines/procurement_price_intelligence_pipeline.py.
2. Define Kafka Connection Details and Unity Catalog Paths:
Add the initial configuration and schema definitions to your procurement_price_intelligence_pipeline.py notebook. This includes fetching sensitive Kafka credentials from Databricks Secrets, which is a best practice for production environments.
# File: src/databricks/dlt_pipelines/procurement_price_intelligence_pipeline.py
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import (
StructType, StructField, StringType, DoubleType, LongType,
TimestampType, IntegerType, BooleanType, ArrayType
)
from pyspark.sql.window import Window
# --- Configuration ---
# Kafka connection details should be stored securely in Databricks Secrets
# and retrieved with dbutils.secrets.get(), or passed as pipeline parameters
# and read via spark.conf.get().
# For this tutorial, we assume a secret scope 'supply_chain_scope' exists.
KAFKA_BOOTSTRAP_SERVERS = dbutils.secrets.get("supply_chain_scope", "kafka_bootstrap_servers")
KAFKA_TOPIC_RAW_PROCUREMENT = dbutils.secrets.get("supply_chain_scope", "kafka_topic_raw_procurement")
KAFKA_SASL_USERNAME = dbutils.secrets.get("supply_chain_scope", "kafka_sasl_username")
KAFKA_SASL_PASSWORD = dbutils.secrets.get("supply_chain_scope", "kafka_sasl_password")
# Unity Catalog Schema and Table Names
# Ensure 'supply_chain_catalog' and 'procurement' schema exist in Unity Catalog
CATALOG_NAME = "supply_chain_catalog"
SCHEMA_NAME = "procurement" # New schema for procurement data
BRONZE_TABLE_FULL_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME}.raw_procurement_events"
SILVER_TABLE_FULL_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME}.procurement_enriched"
GOLD_TABLE_FULL_PATH = f"{CATALOG_NAME}.{SCHEMA_NAME}.procurement_price_intelligence"
# Reference tables from previous chapters (assumed to exist in Unity Catalog)
# Ensure these tables exist and are populated.
HS_CODE_MASTER_TABLE = f"{CATALOG_NAME}.gold.hs_code_master"
TARIFF_RATES_TABLE = f"{CATALOG_NAME}.gold.tariff_rates"
LOGISTICS_COSTS_TABLE = f"{CATALOG_NAME}.silver.logistics_costs"
HISTORICAL_PROCUREMENT_TABLE = f"{CATALOG_NAME}.gold.historical_procurement_prices" # For long-term historical context
# Anomaly detection threshold (e.g., 10% deviation from 7-day moving average)
PRICE_ANOMALY_THRESHOLD_PERCENTAGE = 0.10
# --- Schemas ---
# Schema for raw Kafka messages (value is JSON string)
# This is the expected schema of the JSON payload inside the Kafka 'value' field.
procurement_event_payload_schema = StructType([
StructField("event_id", StringType(), False),
StructField("timestamp", TimestampType(), False),
StructField("supplier_id", StringType(), True),
StructField("item_id", StringType(), False),
StructField("item_description", StringType(), True),
StructField("quantity", DoubleType(), True),
StructField("unit_price", DoubleType(), False),
StructField("currency", StringType(), True),
StructField("hs_code", StringType(), True),
StructField("origin_country", StringType(), True),
StructField("destination_country", StringType(), True),
StructField("delivery_date", TimestampType(), True)
])
# Define Schema for the enriched Silver layer
silver_procurement_schema = StructType([
StructField("event_id", StringType(), False),
StructField("event_timestamp", TimestampType(), False),
StructField("supplier_id", StringType(), True),
StructField("item_id", StringType(), False),
StructField("item_description", StringType(), True),
StructField("quantity", DoubleType(), True),
StructField("unit_price", DoubleType(), False),
StructField("currency", StringType(), True),
StructField("hs_code", StringType(), True),
StructField("origin_country", StringType(), True),
StructField("destination_country", StringType(), True),
StructField("delivery_date", TimestampType(), True),
StructField("effective_tariff_rate", DoubleType(), True),
StructField("estimated_logistics_cost", DoubleType(), True),
StructField("historical_avg_price_overall", DoubleType(), True),
StructField("avg_price_7d_moving", DoubleType(), True),
StructField("price_deviation_percentage", DoubleType(), True),
StructField("is_price_anomaly", BooleanType(), True),
StructField("data_quality_issues", ArrayType(StringType()), True),
StructField("processed_timestamp", TimestampType(), False)
])
# Define Schema for the Gold layer (aggregated price intelligence)
gold_price_intelligence_schema = StructType([
StructField("item_id", StringType(), False),
StructField("supplier_id", StringType(), True),
StructField("region", StringType(), True), # Derived from destination_country
StructField("latest_unit_price", DoubleType(), True),
StructField("latest_price_timestamp", TimestampType(), True),
StructField("avg_daily_price", DoubleType(), True),
StructField("avg_7day_price", DoubleType(), True),
StructField("avg_30day_price", DoubleType(), True),
StructField("min_price_30day", DoubleType(), True),
StructField("max_price_30day", DoubleType(), True),
StructField("price_volatility_index_30day", DoubleType(), True),
StructField("total_quantity_30day", DoubleType(), True),
StructField("last_update_timestamp", TimestampType(), False)
])
Explanation:
- dbutils.secrets.get: Securely retrieves credentials from Databricks Secrets, preventing hardcoded sensitive information. Replace "supply_chain_scope" with your actual secret scope name.
- Unity Catalog paths: We define the full paths for our Bronze, Silver, and Gold tables, including the catalog and schema names, so all tables are managed under Unity Catalog.
- Reference tables: Placeholder variables for existing tables from previous chapters.
- procurement_event_payload_schema: Defines the structure of the JSON payload expected within the Kafka message value field.
- silver_procurement_schema and gold_price_intelligence_schema: Define the target schemas for the Silver and Gold layer tables, respectively.
- PRICE_ANOMALY_THRESHOLD_PERCENTAGE: A configurable threshold for flagging price anomalies.
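For reference, here is a minimal sketch of the JSON payload the pipeline expects in the Kafka value field, matching procurement_event_payload_schema. All values are illustrative:

```python
import json
import uuid
from datetime import datetime, timezone

# Illustrative payload matching procurement_event_payload_schema;
# the concrete values are made up for this example.
sample_event = {
    "event_id": str(uuid.uuid4()),
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "supplier_id": "SUP001",
    "item_id": "ITEM001",
    "item_description": "Microcontroller Unit",
    "quantity": 500.0,
    "unit_price": 5.25,
    "currency": "USD",
    "hs_code": "8542.31",
    "origin_country": "US",
    "destination_country": "DE",
    "delivery_date": "2024-07-01T00:00:00",
}

# This JSON string is what lands in the Kafka message 'value' and is later
# parsed in the Silver layer with from_json().
payload = json.dumps(sample_event)
required = {"event_id", "timestamp", "item_id", "unit_price"}
assert required.issubset(json.loads(payload))
print("payload OK, bytes:", len(payload.encode("utf-8")))
```

The fields marked non-nullable in the schema (event_id, timestamp, item_id, unit_price) are exactly the ones the Silver-layer expectations later enforce.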
3. Ensure Unity Catalog Schema Exists:
Before running the DLT pipeline, ensure the procurement schema exists within your supply_chain_catalog in Unity Catalog. You can create it manually via the Databricks UI or with a SQL command in a Databricks notebook:
-- In a separate Databricks notebook or SQL editor
CREATE SCHEMA IF NOT EXISTS supply_chain_catalog.procurement;
3b) Core Implementation
Now, let’s build the DLT pipeline stages.
1. Bronze Layer: Ingest Raw Procurement Events from Kafka
This DLT table will read directly from the Kafka topic and store the raw messages. This ensures data fidelity and allows for reprocessing if downstream logic changes.
# File: src/databricks/dlt_pipelines/procurement_price_intelligence_pipeline.py (continue appending)
@dlt.table(
    name=BRONZE_TABLE_FULL_PATH,
    comment="Raw procurement events ingested from Kafka",
    table_properties={"quality": "bronze"}
)
@dlt.expect_or_drop("valid_kafka_payload", "value IS NOT NULL")
def raw_procurement_events():
    """
    Ingests raw procurement event messages from Kafka into the Bronze Delta table.
    """
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("subscribe", KAFKA_TOPIC_RAW_PROCUREMENT)
        .option("startingOffsets", "earliest")  # Use "latest" in production for new pipelines
        # Keep failOnDataLoss "false" in production so Kafka retention-based
        # data loss doesn't fail the pipeline; "true" helps surface
        # configuration issues during initial testing.
        .option("failOnDataLoss", "false")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.jaas.config",
                f"org.apache.kafka.common.security.plain.PlainLoginModule required "
                f"username='{KAFKA_SASL_USERNAME}' password='{KAFKA_SASL_PASSWORD}';")
        .option("maxOffsetsPerTrigger", 10000)  # Control micro-batch size for performance
        .load()
        .withColumn("processed_timestamp", F.current_timestamp())
    )
    return df
Explanation:
- @dlt.table: Declares a DLT table. name specifies the Unity Catalog path, and table_properties allow tagging for data quality and governance.
- failOnDataLoss: Set to false for robustness in production, preventing pipeline failures due to Kafka data retention policies. maxOffsetsPerTrigger controls the processing rate.
- @dlt.expect_or_drop: A DLT data quality expectation. If the value column (the Kafka message payload) is null, the record is dropped. DLT automatically tracks data quality metrics.
- spark.readStream.format("kafka"): Configures Spark Structured Streaming to read from Kafka.
- Kafka options: bootstrap.servers, subscribe, startingOffsets (use latest in production for new streams), and SASL authentication details (the PLAIN mechanism over SASL_SSL, typical for managed cloud Kafka services).
- withColumn("processed_timestamp", F.current_timestamp()): Adds a timestamp indicating when the record was processed by the DLT pipeline.
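Because maxOffsetsPerTrigger bounds how many records each micro-batch pulls, the resulting throughput ceiling is easy to estimate. A back-of-envelope sketch — the 5-second trigger interval below is an assumed figure for illustration, not something configured in the pipeline above:

```python
# Back-of-envelope sizing for maxOffsetsPerTrigger. The trigger interval is an
# assumption for this example; measure your pipeline's actual micro-batch cadence.
def max_throughput_msgs_per_sec(max_offsets_per_trigger: int,
                                trigger_interval_sec: float) -> float:
    """Upper bound on messages/sec a micro-batch stream can ingest."""
    return max_offsets_per_trigger / trigger_interval_sec

# With maxOffsetsPerTrigger=10_000 and a 5-second micro-batch cadence,
# the stream can absorb at most 2,000 events/sec.
print(max_throughput_msgs_per_sec(10_000, 5.0))  # 2000.0
```

If your producers exceed this ceiling, consumer lag grows; raise maxOffsetsPerTrigger or scale the cluster accordingly.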
2. Silver Layer: Transform, Cleanse, and Enrich Procurement Data
This stage processes the raw Kafka messages, extracts the JSON payload, performs data cleaning, and enriches the data by joining with master data, tariff rates, and logistics costs from other Delta tables. It also initiates price anomaly detection.
# File: src/databricks/dlt_pipelines/procurement_price_intelligence_pipeline.py (continue appending)
@dlt.table(
name=SILVER_TABLE_FULL_PATH,
comment="Cleaned, validated, and enriched procurement events",
table_properties={"quality": "silver"},
schema=silver_procurement_schema # Enforce schema for the silver table
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("valid_item_id", "item_id IS NOT NULL")
@dlt.expect_or_drop("valid_unit_price", "unit_price IS NOT NULL AND unit_price >= 0")
def procurement_enriched():
"""
Transforms raw procurement events, enriches them with tariff and logistics data,
and performs initial price anomaly detection.
"""
# Read from the Bronze layer
bronze_df = dlt.read_stream(BRONZE_TABLE_FULL_PATH)
# Parse JSON payload and select relevant columns
parsed_df = (
bronze_df.withColumn("kafka_payload", F.from_json(F.col("value").cast(StringType()), procurement_event_payload_schema))
.select(
F.col("kafka_payload.event_id").alias("event_id"),
F.col("kafka_payload.timestamp").alias("event_timestamp"),
F.col("kafka_payload.supplier_id").alias("supplier_id"),
F.col("kafka_payload.item_id").alias("item_id"),
F.col("kafka_payload.item_description").alias("item_description"),
F.col("kafka_payload.quantity").alias("quantity"),
F.col("kafka_payload.unit_price").alias("unit_price"),
F.col("kafka_payload.currency").alias("currency"),
F.col("kafka_payload.hs_code").alias("hs_code"),
F.col("kafka_payload.origin_country").alias("origin_country"),
F.col("kafka_payload.destination_country").alias("destination_country"),
F.col("kafka_payload.delivery_date").alias("delivery_date"),
F.col("processed_timestamp") # From Bronze layer
)
)
    # Initialize an empty data quality issues array (typed as ARRAY<STRING>)
    parsed_df = parsed_df.withColumn(
        "data_quality_issues", F.array().cast(ArrayType(StringType()))
    )
# --- Enrichment: HS Code Master Data (for validation/lookup) ---
hs_master_df = spark.read.table(HS_CODE_MASTER_TABLE)
enriched_df = parsed_df.join(
hs_master_df.select("hs_code", "hs_description_en"),
["hs_code"],
"left_outer"
)
# Add data quality issue if HS code not found
enriched_df = enriched_df.withColumn(
"data_quality_issues",
F.when(F.col("hs_description_en").isNull(), F.array_append(F.col("data_quality_issues"), F.lit("HS_CODE_NOT_FOUND")))
.otherwise(F.col("data_quality_issues"))
).drop("hs_description_en") # Drop join column after validation
# --- Enrichment: Tariff Rates ---
# Join with the latest tariff rates based on HS code, origin, destination, and delivery date
tariff_rates_df = spark.read.table(TARIFF_RATES_TABLE)
enriched_df = enriched_df.join(
tariff_rates_df.filter("is_current = true") # Assuming a flag for current tariffs
.select("hs_code", "origin_country", "destination_country", "tariff_rate"),
["hs_code", "origin_country", "destination_country"],
"left_outer"
).withColumnRenamed("tariff_rate", "effective_tariff_rate")
# --- Enrichment: Logistics Costs ---
# Join with estimated logistics costs
logistics_df = spark.read.table(LOGISTICS_COSTS_TABLE)
enriched_df = enriched_df.join(
logistics_df.select("origin_country", "destination_country", "estimated_cost"),
["origin_country", "destination_country"],
"left_outer"
).withColumnRenamed("estimated_cost", "estimated_logistics_cost")
# --- Enrichment: Historical Procurement Prices (for overall baseline) ---
historical_prices_df = spark.read.table(HISTORICAL_PROCUREMENT_TABLE)
enriched_df = enriched_df.join(
historical_prices_df.select("item_id", "supplier_id", "avg_unit_price_overall"),
["item_id", "supplier_id"],
"left_outer"
).withColumnRenamed("avg_unit_price_overall", "historical_avg_price_overall")
    # --- Anomaly Detection: 7-day Moving Average and Deviation ---
    # Window functions (Window.partitionBy(...).rangeBetween(...)) are not
    # supported on streaming DataFrames, so we compute the trailing 7-day
    # average with a batch read of the historical prices table and attach it
    # via a stream-static join. This assumes the historical table carries
    # per-event unit_price and price_date columns; adapt the column names to
    # your actual schema.
    recent_prices_df = (
        spark.read.table(HISTORICAL_PROCUREMENT_TABLE)
        .filter(F.col("price_date") >= F.date_sub(F.current_date(), 7))
        .groupBy("item_id", "supplier_id")
        .agg(F.avg("unit_price").alias("avg_price_7d_moving"))
    )
    enriched_df = enriched_df.join(
        recent_prices_df,
        ["item_id", "supplier_id"],
        "left_outer"
    )
# Calculate price deviation and anomaly flag
enriched_df = enriched_df.withColumn(
"price_deviation_percentage",
F.when(F.col("avg_price_7d_moving").isNotNull() & (F.col("avg_price_7d_moving") != 0),
(F.col("unit_price") - F.col("avg_price_7d_moving")) / F.col("avg_price_7d_moving"))
.otherwise(F.lit(0.0))
)
enriched_df = enriched_df.withColumn(
"is_price_anomaly",
F.when(F.abs(F.col("price_deviation_percentage")) > PRICE_ANOMALY_THRESHOLD_PERCENTAGE, True)
.otherwise(False)
)
# Final selection and type casting to match silver_procurement_schema
# Ensure all columns from the schema are present, adding nulls for missing ones if necessary
# Or rely on DLT's schema enforcement to handle this.
final_silver_df = enriched_df.select(
F.col("event_id"),
F.col("event_timestamp"),
F.col("supplier_id"),
F.col("item_id"),
F.col("item_description"),
F.col("quantity"),
F.col("unit_price"),
F.col("currency"),
F.col("hs_code"),
F.col("origin_country"),
F.col("destination_country"),
F.col("delivery_date"),
F.col("effective_tariff_rate"),
F.col("estimated_logistics_cost"),
F.col("historical_avg_price_overall"),
F.col("avg_price_7d_moving"),
F.col("price_deviation_percentage"),
F.col("is_price_anomaly"),
F.col("data_quality_issues"),
F.col("processed_timestamp")
)
return final_silver_df
Explanation:
- dlt.read_stream(BRONZE_TABLE_FULL_PATH): Reads the continuously arriving data from the Bronze DLT table.
- F.from_json(...): Parses the JSON string from the value column (the Kafka message) into a struct using the predefined procurement_event_payload_schema.
- @dlt.expect_or_drop: Additional data quality checks for critical fields like event_id, item_id, and unit_price.
- Data quality tracking: An array column data_quality_issues is initialized and populated when validation rules (like HS_CODE_NOT_FOUND) are violated. This allows auditing and, if needed, reprocessing of malformed records.
- Enrichment joins: The parsed stream is joined with static/slowly changing dimension tables (HS_CODE_MASTER_TABLE, TARIFF_RATES_TABLE, LOGISTICS_COSTS_TABLE, HISTORICAL_PROCUREMENT_TABLE) read with spark.read.table(). These batch reads against Delta tables make for efficient stream-static joins.
- 7-day moving average: avg_price_7d_moving is computed per item_id and supplier_id and serves as the trailing baseline each incoming price is compared against.
- Anomaly detection: A simple rule flags a price as an anomaly when its deviation from the 7-day moving average exceeds PRICE_ANOMALY_THRESHOLD_PERCENTAGE.
- Schema enforcement: The schema argument of @dlt.table ensures the output DataFrame conforms to silver_procurement_schema, providing robust data quality.
3. Gold Layer: Aggregate for Price Intelligence
This stage aggregates the enriched data to create business-level metrics for procurement price intelligence, such as average prices over different time windows and price volatility.
# File: src/databricks/dlt_pipelines/procurement_price_intelligence_pipeline.py (continue appending)
@dlt.table(
    name=GOLD_TABLE_FULL_PATH,
    comment="Aggregated procurement price intelligence metrics",
    table_properties={"quality": "gold"},
    schema=gold_price_intelligence_schema
)
def procurement_price_intelligence():
    """
    Aggregates enriched procurement data to provide real-time price intelligence.
    """
    # Window functions (.over(...)) are not supported on streaming DataFrames,
    # so we use a watermarked, time-windowed groupBy aggregation instead. The
    # watermark bounds the state Spark keeps for late-arriving data.
    silver_df = (
        dlt.read_stream(SILVER_TABLE_FULL_PATH)
        .withWatermark("event_timestamp", "1 day")
    )
    # Derive region from destination_country for broader intelligence.
    # A more robust mapping could be done with a lookup table.
    silver_df = silver_df.withColumn(
        "region",
        F.when(F.col("destination_country").isin(["US", "CA", "MX"]), "North America")
        .when(F.col("destination_country").isin(["GB", "DE", "FR", "IT"]), "Europe")
        .otherwise("Other")
    )
    # 30-day sliding window that advances daily; the shorter-horizon metrics
    # (daily and 7-day averages) are computed as conditional aggregates over
    # the same window.
    agg_df = (
        silver_df
        .groupBy(
            "item_id", "supplier_id", "region",
            F.window("event_timestamp", "30 days", "1 day").alias("window_30d")
        )
        .agg(
            F.max("event_timestamp").alias("latest_price_timestamp"),
            F.max_by("unit_price", "event_timestamp").alias("latest_unit_price"),
            F.avg(F.when(F.to_date("event_timestamp") == F.current_date(),
                         F.col("unit_price"))).alias("avg_daily_price"),
            F.avg(F.when(F.col("event_timestamp") >=
                         F.current_timestamp() - F.expr("INTERVAL 7 DAYS"),
                         F.col("unit_price"))).alias("avg_7day_price"),
            F.avg("unit_price").alias("avg_30day_price"),
            F.min("unit_price").alias("min_price_30day"),
            F.max("unit_price").alias("max_price_30day"),
            F.stddev("unit_price").alias("price_stddev_30day"),
            F.sum("quantity").alias("total_quantity_30day")
        )
    )
    # Volatility index: standard deviation relative to the 30-day average price.
    agg_df = (
        agg_df
        .withColumn(
            "price_volatility_index_30day",
            F.when(F.col("avg_30day_price") != 0,
                   F.col("price_stddev_30day") / F.col("avg_30day_price"))
            .otherwise(F.lit(0.0))
        )
        .withColumn("last_update_timestamp", F.current_timestamp())
        .drop("window_30d", "price_stddev_30day")
    )
    return agg_df
Explanation:
- dlt.read_stream(SILVER_TABLE_FULL_PATH): Reads the continuously arriving, enriched data from the Silver DLT table.
- Region derivation: A simple when().otherwise() expression categorizes destination_country into broader region values. In a real-world scenario, this would typically use a dedicated lookup table for geopolitical regions.
- Streaming aggregations: groupBy().agg() computes latest_price_timestamp, latest_unit_price, avg_daily_price, the 7-day and 30-day averages, min_price_30day, max_price_30day, and total_quantity_30day. Spark manages the aggregation state, and the event-time watermark bounds how long it waits for late-arriving data.
- Latest price: latest_unit_price is the unit price associated with the most recent event_timestamp in the group.
- Price volatility index: The standard deviation of price over the 30-day window divided by the average price, giving an indication of price stability.
- last_update_timestamp: Records when the aggregated metric was last updated.
- Schema enforcement: The output DataFrame is again checked against gold_price_intelligence_schema.
3c) Testing This Component
To test the DLT pipeline, you’ll need to simulate Kafka messages.
1. Kafka Producer Simulator:
Create a Python script notebooks/kafka_producer_simulator.py to send sample procurement events to your Kafka topic.
# File: notebooks/kafka_producer_simulator.py
from kafka import KafkaProducer
import json
import random
import time
import uuid
from datetime import datetime, timedelta
# --- Configuration ---
KAFKA_BOOTSTRAP_SERVERS = "your_kafka_bootstrap_servers:9092" # e.g., "pkc-xxxx.us-east-1.aws.confluent.cloud:9092"
KAFKA_TOPIC_RAW_PROCUREMENT = "raw_procurement_events_topic"
KAFKA_SASL_USERNAME = "your_kafka_username"
KAFKA_SASL_PASSWORD = "your_kafka_password"
# --- Sample Data ---
sample_items = [
{"item_id": "ITEM001", "description": "Microcontroller Unit", "hs_code": "8542.31", "currency": "USD", "base_price": 5.00},
{"item_id": "ITEM002", "description": "Resistor Pack (1000 pcs)", "hs_code": "8533.21", "currency": "USD", "base_price": 12.50},
{"item_id": "ITEM003", "description": "Lithium-Ion Battery", "hs_code": "8507.60", "currency": "EUR", "base_price": 25.00},
{"item_id": "ITEM004", "description": "Capacitor Assortment", "hs_code": "8532.25", "currency": "USD", "base_price": 8.75},
]
sample_suppliers = [
{"id": "SUP001", "name": "GlobalTech Inc.", "origin": "US"},
{"id": "SUP002", "name": "EuroParts Ltd.", "origin": "DE"},
{"id": "SUP003", "name": "AsiaSupply Co.", "origin": "CN"},
]
sample_destinations = ["US", "DE", "GB", "JP"]
def generate_procurement_event():
item = random.choice(sample_items)
supplier = random.choice(sample_suppliers)
destination = random.choice(sample_destinations)
# Simulate price fluctuation
price_factor = random.uniform(0.9, 1.1)
unit_price = round(item["base_price"] * price_factor, 2)
quantity = random.randint(100, 5000)
event_data = {
"event_id": str(uuid.uuid4()),
"timestamp": (datetime.now() - timedelta(minutes=random.randint(0, 60))).isoformat(),
"supplier_id": supplier["id"],
"item_id": item["item_id"],
"item_description": item["description"],
"quantity": quantity,
"unit_price": unit_price,
"currency": item["currency"],
"hs_code": item["hs_code"],
"origin_country": supplier["origin"],
"destination_country": destination,
"delivery_date": (datetime.now() + timedelta(days=random.randint(7, 30))).isoformat()
}
return event_data
if __name__ == "__main__":
import random
print("Initializing Kafka Producer...")
try:
producer = KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username=KAFKA_SASL_USERNAME,
sasl_plain_password=KAFKA_SASL_PASSWORD,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print("Kafka Producer initialized successfully.")
except Exception as e:
print(f"Error initializing Kafka Producer: {e}")
exit(1)
print(f"Sending messages to Kafka topic: {KAFKA_TOPIC_RAW_PROCUREMENT}")
try:
while True:
event = generate_procurement_event()
producer.send(KAFKA_TOPIC_RAW_PROCUREMENT, event)
print(f"Sent: {event['item_id']} @ {event['unit_price']} {event['currency']}")
time.sleep(random.uniform(0.5, 2.0)) # Send messages every 0.5 to 2 seconds
except KeyboardInterrupt:
print("\nStopping producer.")
finally:
producer.close()
print("Kafka Producer closed.")
Before Running the Simulator:
- Replace placeholders: Update KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC_RAW_PROCUREMENT, KAFKA_SASL_USERNAME, and KAFKA_SASL_PASSWORD with your actual Kafka cluster details and credentials. These should match the secrets configured in your DLT pipeline.
- Install kafka-python: pip install kafka-python
- Pre-populate reference tables: Ensure HS_CODE_MASTER_TABLE, TARIFF_RATES_TABLE, LOGISTICS_COSTS_TABLE, and HISTORICAL_PROCUREMENT_TABLE exist and contain some sample data for the joins to work. You can create dummy data using SQL in a Databricks notebook. Example SQL for HS_CODE_MASTER_TABLE:

CREATE TABLE IF NOT EXISTS supply_chain_catalog.gold.hs_code_master (
  hs_code STRING,
  hs_description_en STRING
) USING DELTA;

INSERT INTO supply_chain_catalog.gold.hs_code_master VALUES
  ('8542.31', 'Electronic integrated circuits: Processors, controllers'),
  ('8533.21', 'Fixed resistors: Of a power handling capacity not exceeding 20 W'),
  ('8507.60', 'Lithium-ion accumulators');

Similarly, create dummy data for tariff_rates, logistics_costs, and historical_procurement_prices.
2. Deploy and Run the DLT Pipeline:
   - In your Databricks workspace, navigate to “Workflows” -> “Delta Live Tables” and click “Create Pipeline”.
   - Pipeline Name: `Procurement_Price_Intelligence_Pipeline`
   - Product Edition: Select “Advanced” to enable all DLT features.
   - Pipeline Type: “Continuous” (for real-time processing).
   - Source Libraries: Select “Notebooks” and browse to `src/databricks/dlt_pipelines/procurement_price_intelligence_pipeline.py`.
   - Target: Enter `supply_chain_catalog.procurement` (this specifies the Unity Catalog schema where the tables will be created).
   - Cluster Policy: Choose a suitable cluster policy.
   - Advanced Options:
     - Spark Config: Add any specific Spark configurations if needed (e.g., `spark.databricks.delta.properties.defaults.enableChangeDataFeed` for CDC).
     - Secrets: Ensure your secret scope (`supply_chain_scope`) is accessible to the cluster.
   - Click “Create” and then “Start” the pipeline.
3. Verify Data Flow:
   - Start the Kafka Producer Simulator: Run `python notebooks/kafka_producer_simulator.py` from your local machine or a cloud VM.
   - Monitor DLT UI: Observe the DLT pipeline graph. You should see records flowing from Kafka into Bronze, then to Silver, and finally to Gold.
   - Query Delta Tables (Unity Catalog): In a Databricks SQL editor or another notebook, query the tables to verify data:

```sql
SELECT * FROM supply_chain_catalog.procurement.raw_procurement_events LIMIT 10;
SELECT * FROM supply_chain_catalog.procurement.procurement_enriched LIMIT 10;
SELECT * FROM supply_chain_catalog.procurement.procurement_price_intelligence LIMIT 10;
```

   Check that the `effective_tariff_rate`, `estimated_logistics_cost`, `historical_avg_price_overall`, `avg_price_7d_moving`, `price_deviation_percentage`, and `is_price_anomaly` columns are populated correctly in the Silver table. In the Gold table, verify aggregations such as `latest_unit_price`, `avg_daily_price`, `avg_7day_price`, `avg_30day_price`, and `price_volatility_index_30day`.
4. Production Considerations
Building a production-ready real-time pipeline requires careful attention to robustness, performance, and security.
- Error Handling:
  - DLT Expectations: Leverage DLT’s expectations (`@dlt.expect`, `@dlt.expect_or_drop`, `@dlt.expect_or_fail`) extensively. They automatically track data quality metrics and let you define policies for handling invalid records: dropping them, failing the pipeline, or routing them to a quarantine table for later review.
  - Schema Evolution: DLT tables inherently handle schema evolution (by default, `addNewColumns` is enabled). For more controlled evolution, consider `mergeSchema` or explicit schema updates. Malformed JSON payloads are handled by `F.from_json` returning nulls; these can be filtered or quarantined.
  - Kafka `failOnDataLoss`: Keep the Kafka source option `failOnDataLoss` set to `false` in the Bronze layer so the pipeline does not fail if Kafka purges old offsets before they are processed.
- Performance Optimization:
  - DLT Serverless: Utilize DLT’s serverless compute (if available in your region) to abstract cluster management and optimize resource allocation.
  - `maxOffsetsPerTrigger`: Tune this Kafka option in the Bronze layer to control micro-batch size. A larger value can increase throughput but also latency.
  - Triggers: While DLT `continuous` mode handles triggers automatically, for batch-like streaming, `trigger(availableNow=True)` or `trigger(processingTime='5 seconds')` can be used. DLT’s enhanced autoscaling for continuous pipelines automatically adjusts resources.
  - Delta Lake Optimizations: DLT automatically applies `OPTIMIZE` and `VACUUM` for managed tables. For large tables, consider `ZORDER` on frequently filtered columns (e.g., `item_id`, `event_timestamp`) to improve query performance.
  - Cluster Sizing: Monitor the Spark UI and DLT metrics to ensure the cluster is appropriately sized for your data volume and processing requirements.
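The `maxOffsetsPerTrigger` trade-off can be reasoned about with back-of-envelope arithmetic: a micro-batch of N records processed at R records per second takes roughly N/R seconds, which bounds how fresh the output can be. The helper below is a hypothetical tuning aid, and the rates in the example are placeholders rather than measurements:

```python
def estimated_batch_latency_seconds(max_offsets_per_trigger: int,
                                    records_per_second: float) -> float:
    """Rough lower bound on micro-batch processing time.

    Real latency also includes scheduling, shuffle, and write overhead,
    so treat this only as a starting point for tuning, not a guarantee.
    """
    return max_offsets_per_trigger / records_per_second

# Example: 50_000 offsets per trigger at ~10_000 rec/s is at least ~5 s per batch.
```

Halving `maxOffsetsPerTrigger` roughly halves this bound, at the cost of more frequent (and proportionally more overhead-heavy) micro-batches.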
- Security Considerations:
- Unity Catalog: All tables are governed by Unity Catalog, providing centralized access control (GRANT/REVOKE permissions), auditing, and data lineage. Ensure appropriate permissions are set for the DLT service principal and any users/groups querying the data.
- Databricks Secrets: Critical credentials like Kafka API keys and passwords MUST be stored in Databricks Secrets and never hardcoded.
- Network Isolation: For highly sensitive data, deploy Databricks workspaces with VNet injection (Azure) or PrivateLink (AWS/Azure) to ensure data traffic remains within your private network.
- Role-Based Access Control (RBAC): Implement strict RBAC for DLT pipelines and Databricks resources. Only authorized users should be able to deploy, manage, and monitor pipelines.
- Logging and Monitoring:
- DLT UI: The DLT UI provides comprehensive dashboards for pipeline status, data quality metrics, and performance.
- Spark UI: For deeper debugging, access the underlying Spark UI from the DLT pipeline details.
- Databricks Logs: Configure cluster logging to send logs to a centralized logging solution (e.g., Azure Log Analytics, AWS CloudWatch, Splunk) for long-term retention and analysis.
  - Custom Metrics: Integrate custom metrics into your DLT pipeline using Spark’s metrics API or by writing aggregated statistics to a separate monitoring table.
  - Alerting: Set up alerts based on DLT pipeline failures, data quality expectation breaches, or significant deviations in key Gold layer metrics (e.g., a sudden spike in `is_price_anomaly` flags).
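An alerting rule on `is_price_anomaly` can be prototyped in plain Python before wiring it to a real alerting channel. This is a minimal sketch; the 10% threshold and the idea of evaluating a fixed window of flags are illustrative assumptions, not the pipeline's actual configuration:

```python
from dataclasses import dataclass


@dataclass
class AlertDecision:
    anomaly_rate: float   # fraction of flagged events in the window
    fire_alert: bool


def evaluate_anomaly_alert(flags, threshold=0.10):
    """Decide whether to alert, given a window of is_price_anomaly flags.

    `flags` is an iterable of booleans, one per event in the window.
    The 10% default threshold is an illustrative assumption.
    """
    flags = list(flags)
    if not flags:
        return AlertDecision(anomaly_rate=0.0, fire_alert=False)
    rate = sum(bool(f) for f in flags) / len(flags)
    return AlertDecision(anomaly_rate=rate, fire_alert=rate >= threshold)
```

In practice the same logic would run as a scheduled query against the Gold table, with the decision forwarded to email, Slack, or a pager integration.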
5. Code Review Checkpoint
At this point, we have successfully implemented the core components of our real-time procurement price intelligence pipeline using Databricks Delta Live Tables.
Summary of what was built:
- A DLT pipeline (`procurement_price_intelligence_pipeline.py`) structured into Bronze, Silver, and Gold layers.
- Bronze Layer: Ingests raw procurement event JSON messages from a Kafka topic into `supply_chain_catalog.procurement.raw_procurement_events`.
- Silver Layer: Transforms raw messages, parses JSON, cleanses data, and enriches it with HS Code master data, tariff rates, logistics costs, and historical procurement prices into `supply_chain_catalog.procurement.procurement_enriched`. It also calculates a 7-day moving average and flags potential price anomalies.
- Gold Layer: Aggregates enriched data into business-ready price intelligence metrics (`latest_unit_price`, `avg_daily_price`, `avg_7day_price`, `avg_30day_price`, `price_volatility_index_30day`, etc.) in `supply_chain_catalog.procurement.procurement_price_intelligence`.
- A Kafka producer simulator (`kafka_producer_simulator.py`) to generate test data.

Files created/modified:

- `src/databricks/dlt_pipelines/procurement_price_intelligence_pipeline.py`
- `notebooks/kafka_producer_simulator.py`

How it integrates with existing code:

- The DLT pipeline leverages existing Delta Lake tables (e.g., `hs_code_master`, `tariff_rates`, `logistics_costs`, `historical_procurement_prices`) from previous chapters, demonstrating a cohesive Lakehouse architecture.
- It utilizes Databricks Unity Catalog for unified governance across all data layers.

This pipeline forms the backbone for real-time procurement analytics and decision-making, providing a continuously updated view of pricing dynamics.
6. Common Issues & Solutions
Developers might encounter several issues when building and deploying such a real-time DLT pipeline.
Kafka Connectivity Issues (`kafka-python` producer or DLT consumer)

- Error: `NoBrokersAvailable`, `AuthenticationFailedError`, `TimeoutError`.
- Debugging:
  - Verify `KAFKA_BOOTSTRAP_SERVERS`: Double-check the hostname and port. Ensure it’s accessible from where the producer/DLT cluster is running (network security groups, firewalls).
  - Check Credentials: Confirm `KAFKA_SASL_USERNAME` and `KAFKA_SASL_PASSWORD` are correct and have permissions for the topic. For DLT, ensure the secret scope is correctly configured and the DLT cluster has access.
  - Topic Name: Verify `KAFKA_TOPIC_RAW_PROCUREMENT` is correct and exists in Kafka.
  - Security Protocol: Ensure `SASL_SSL` and the `PLAIN` mechanism are correctly specified for both producer and consumer.
- Prevention: Use a small, separate test script to verify Kafka connectivity before integrating with the full DLT pipeline.
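Before digging into SASL settings, it helps to confirm that the broker endpoint is reachable at all. A minimal TCP-level check using only the standard library (the host and port in the example are placeholders for your bootstrap server):

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`.

    This only proves network reachability; SASL/SSL authentication problems
    must still be diagnosed with a real Kafka client.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Placeholder endpoint -- substitute your own bootstrap server.
    print(broker_reachable("my-kafka-broker.example.com", 9092))
```

If this returns `False`, the problem is network-level (DNS, firewall, security group) rather than credentials, which narrows the search considerably.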
Schema Mismatch/Evolution in Streaming Data

- Error: `Cannot resolve column ...`, `data type mismatch`, malformed records dropped.
- Debugging:
  - Bronze Layer: If `value` is null or `F.from_json` returns nulls for the parsed payload, the incoming Kafka message might not be valid JSON or doesn’t conform to `procurement_event_payload_schema`. Inspect the raw `value` column in the Bronze table.
  - Silver/Gold Layers: If columns are missing or types are incorrect, ensure the `select` statements and `schema` parameters in `@dlt.table` match the actual data structure.
  - DLT UI Data Quality: Check the DLT UI for data quality metrics and specific expectation failures (e.g., `valid_kafka_payload`).
- Prevention:
  - Schema Registry: Implement a Kafka Schema Registry (e.g., Confluent Schema Registry) to enforce and manage schema evolution for Kafka messages.
  - Defensive Parsing: Use `F.from_json` in permissive mode or with the `FAILFAST` parsing option, combined with robust error handling for malformed records (e.g., quarantining them).
  - Explicit Schemas: Always define and apply explicit schemas for DLT tables, as shown in this chapter.
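The quarantine pattern can be prototyped outside Spark, since the core logic is just "parse, validate required fields, route". A plain-Python sketch (the field names follow the event payload used in this chapter; the helper itself is hypothetical):

```python
import json

REQUIRED_FIELDS = ("event_id", "item_id", "unit_price", "event_timestamp")


def parse_or_quarantine(raw: bytes):
    """Return ("ok", parsed_dict) or ("quarantine", reason_string).

    Mirrors the Silver-layer rule: malformed JSON or missing required
    fields is routed to a quarantine target instead of failing the job.
    """
    try:
        payload = json.loads(raw)
    except (ValueError, UnicodeDecodeError):
        return ("quarantine", "malformed JSON")
    missing = [f for f in REQUIRED_FIELDS if payload.get(f) is None]
    if missing:
        return ("quarantine", f"missing fields: {', '.join(missing)}")
    return ("ok", payload)
```

In the DLT pipeline the equivalent split would be expressed declaratively with expectations, but testing the rule in isolation like this makes it easy to enumerate edge cases (empty payloads, nulls, wrong types) before they reach the stream.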
State Management Issues (e.g., Checkpoint Corruption)

- Error: `Checkpoint folder not found`, `IllegalStateException: Cannot process data after a restart...`.
- Debugging:
  - This typically happens if the DLT pipeline’s checkpoint location gets corrupted or is deleted.
  - Check Databricks cluster logs for specific errors related to checkpointing.
- Prevention:
  - DLT Handles Checkpointing: DLT automatically manages checkpoint locations for its tables. Avoid manually interfering with these locations.
  - Restart Strategies: If a checkpoint is truly corrupted, the only solution might be to reset the pipeline state in DLT (which clears checkpoints and restarts processing from `startingOffsets`). Use `startingOffsets: "latest"` for production restarts to avoid reprocessing old data, or `earliest` for a full historical reload if needed.
7. Testing & Verification
After implementing and deploying the DLT pipeline, a thorough testing and verification process is essential.
End-to-End Data Flow:
- Ensure the Kafka producer is actively sending messages.
- Monitor the DLT pipeline in the Databricks UI. Verify that data flows through Bronze -> Silver -> Gold stages without errors and that the tables are continuously updated.
- Check the number of records processed at each stage.
Data Quality Checks:

- Bronze Layer: Query `supply_chain_catalog.procurement.raw_procurement_events` to ensure raw Kafka messages are ingested correctly, including `key`, `value`, `topic`, and `timestamp`.
- Silver Layer:
  - Query `supply_chain_catalog.procurement.procurement_enriched`.
  - Verify that `event_id`, `item_id`, `unit_price`, and `event_timestamp` are not null.
  - Check `effective_tariff_rate`, `estimated_logistics_cost`, and `historical_avg_price_overall` for correct values based on your dummy reference data.
  - Examine `avg_price_7d_moving`, `price_deviation_percentage`, and `is_price_anomaly`. Manually send some events with unusually high or low prices to verify the anomaly detection logic.
  - Inspect the `data_quality_issues` array for any flags.
- Gold Layer:
  - Query `supply_chain_catalog.procurement.procurement_price_intelligence`.
  - Verify that `latest_unit_price` and `latest_price_timestamp` reflect the most recent data.
  - Check `avg_daily_price`, `avg_7day_price`, `avg_30day_price`, `min_price_30day`, `max_price_30day`, and `price_volatility_index_30day` for plausible aggregated values. These will become more stable as more data flows through.
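To sanity-check the Silver and Gold numbers by hand, the deviation and volatility figures can be reproduced in a few lines of plain Python. These are the straightforward definitions (percentage deviation from a reference average; volatility as the coefficient of variation over a 30-day window) and are an assumption about how the pipeline computes them, so treat this as an independent cross-check rather than the pipeline's exact logic:

```python
import statistics


def price_deviation_percentage(unit_price: float, avg_price: float) -> float:
    """Percentage deviation of unit_price from a reference average price."""
    return (unit_price - avg_price) / avg_price * 100.0


def is_price_anomaly(unit_price: float, avg_price: float,
                     threshold_pct: float = 20.0) -> bool:
    """Flag prices deviating more than threshold_pct from the average.

    The 20% default threshold is illustrative, not the pipeline's setting.
    """
    return abs(price_deviation_percentage(unit_price, avg_price)) > threshold_pct


def volatility_index(prices_30day: list) -> float:
    """Coefficient of variation (population stdev / mean) over a price window."""
    mean = statistics.fmean(prices_30day)
    return statistics.pstdev(prices_30day) / mean
```

Running these against a handful of rows pulled from the Silver and Gold tables should reproduce `price_deviation_percentage`, `is_price_anomaly`, and `price_volatility_index_30day` to within rounding, assuming the pipeline uses the same definitions.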
Latency and Throughput:
- Monitor the DLT UI for pipeline latency (time from source to target) and throughput (records/second).
- Adjust `maxOffsetsPerTrigger` and cluster size as needed to meet your real-time requirements.
Reference Data Integration:
- Test scenarios where reference data (tariffs, HS codes) might be updated. Ensure the DLT pipeline picks up these changes correctly (joins with batch tables will always get the latest state).
Security Verification:
- Attempt to query the tables from a user without sufficient Unity Catalog permissions. Ensure access is denied.
- Verify that no sensitive credentials are exposed in logs or code.
8. Summary & Next Steps
In this chapter, we successfully built an end-to-end real-time procurement price intelligence pipeline using Databricks Delta Live Tables, Kafka, and Delta Lake with Unity Catalog. We covered:
- Ingesting raw procurement events from Kafka into a Bronze Delta Live Table.
- Transforming, cleansing, and enriching this raw data with HS code master data, tariff rates, logistics costs, and historical procurement prices in a Silver Delta Live Table.
- Implementing a rule-based price anomaly detection mechanism and calculating a 7-day moving average on streaming data.
- Aggregating the enriched data into key price intelligence metrics (latest price, various averages, volatility) in a Gold Delta Live Table.
- Discussing crucial production considerations including error handling, performance optimization, security, and monitoring.
- Providing a Kafka producer simulator and verification steps to test the entire pipeline.
This pipeline now provides a continuously updated, reliable, and governed source of truth for procurement pricing, empowering real-time decision-making.
In the next chapter, we will focus on Chapter 12: Building a Real-time Dashboard and Alerting System for Price Intelligence. We will leverage the Gold layer procurement_price_intelligence table to create interactive dashboards using Databricks SQL Dashboards or external BI tools, and set up automated alerts for detected price anomalies or significant market shifts, closing the loop on our real-time intelligence system.