Chapter 5: Real-time Supply Chain Delay Analytics (Gold Layer)

Chapter Introduction

Welcome to Chapter 5, where we elevate our supply chain data from the Silver layer to the Gold layer. In this crucial phase, we will build Databricks Delta Live Tables (DLT) pipelines to perform real-time aggregations and derive actionable insights for supply chain delay analytics. This involves taking the cleaned and enriched data from our Silver tables and transforming it into easily consumable metrics, such as average delay times, on-time delivery rates, and identifying critical delay incidents.

The Gold layer is paramount because it serves as the direct source for business intelligence dashboards, reporting tools, and downstream analytical applications. By providing pre-aggregated and highly curated data, we empower business users to make swift, data-driven decisions without needing to understand the complexities of raw or semi-processed data. This abstraction not only improves data accessibility but also significantly boosts query performance for analytical workloads.

Our journey in this chapter relies on the robust Silver layer DLT pipelines established in previous chapters, particularly the supply_chain_silver.enriched_events table. We will leverage this foundation to build our Gold tables incrementally. By the end of this chapter, you will have production-ready Gold Delta tables, such as supply_chain_gold.delay_summary_hourly and supply_chain_gold.critical_delays, providing real-time visibility into supply chain performance.

Planning & Design

Building the Gold layer requires careful planning to ensure the derived metrics are accurate, consistent, and meet the analytical needs of the business.

Component Architecture

Our Gold layer architecture will extend our existing Medallion Lakehouse pattern:

  1. Source: supply_chain_silver.enriched_events (and potentially other Silver dimension tables like products or suppliers).
  2. Transformation Logic: Databricks Delta Live Tables (DLT) pipeline processing the Silver data. This pipeline will perform:
    • Aggregations (e.g., COUNT, AVG, SUM).
    • Calculations (e.g., on-time delivery rate).
    • Filtering (e.g., for critical delays).
    • Potentially joins with other Silver dimension tables for richer context.
  3. Destination: Dedicated Gold Delta tables (supply_chain_gold.delay_summary_hourly, supply_chain_gold.critical_delays) optimized for read performance and analytical queries.
graph TD
    A["Kafka Raw Events"] --> B{DLT Bronze Layer}
    B -- "Raw Events" --> C["Bronze.raw_events"]
    C -- "Clean & Enrich" --> D{DLT Silver Layer}
    D -- "Cleaned & Enriched Events" --> E["Silver.enriched_events"]
    E -- "Aggregate & Analyze" --> F{DLT Gold Layer}
    F -- "Delay Summary" --> G["Gold.delay_summary_hourly"]
    F -- "Critical Delays" --> H["Gold.critical_delays"]
    G --> I["BI Dashboards"]
    H --> I
    E -- "Dimension Data" --> F

Database Schema Design

We will define two primary Gold tables:

  1. supply_chain_gold.delay_summary_hourly: This table will provide an hourly summary of supply chain performance, enabling trend and aggregate analysis.

    | Column Name | Data Type | Description |
    | --- | --- | --- |
    | event_date_hour | TIMESTAMP | Hourly timestamp for aggregation. |
    | product_id | STRING | Identifier for the product. |
    | supplier_id | STRING | Identifier for the supplier. |
    | route_id | STRING | Identifier for the shipping route. |
    | total_shipments | LONG | Total number of shipments in the hour for the given dimensions. |
    | total_delayed_shipments | LONG | Total shipments that experienced a delay. |
    | avg_delay_minutes | DOUBLE | Average delay in minutes for shipments. |
    | min_delay_minutes | DOUBLE | Minimum delay in minutes for shipments. |
    | max_delay_minutes | DOUBLE | Maximum delay in minutes for shipments. |
    | on_time_delivery_rate | DOUBLE | Percentage of shipments delivered on time. |
    | last_updated | TIMESTAMP | Timestamp when the record was last updated. |
  2. supply_chain_gold.critical_delays: This table will log specific incidents that are deemed “critical” based on a predefined delay threshold, allowing for focused investigation.

    | Column Name | Data Type | Description |
    | --- | --- | --- |
    | event_id | STRING | Unique identifier for the original supply chain event. |
    | event_timestamp | TIMESTAMP | Timestamp of the event. |
    | product_id | STRING | Identifier for the product. |
    | supplier_id | STRING | Identifier for the supplier. |
    | route_id | STRING | Identifier for the shipping route. |
    | expected_delivery_time | TIMESTAMP | Original expected delivery timestamp. |
    | actual_delivery_time | TIMESTAMP | Actual delivery timestamp. |
    | delay_minutes | DOUBLE | Calculated delay in minutes. |
    | delay_reason | STRING | Categorized reason for the delay (e.g., weather, customs, transport). |
    | severity_level | STRING | Severity of the critical delay (e.g., "High", "Critical"). |
    | last_updated | TIMESTAMP | Timestamp when the record was last updated. |
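Before writing any Spark, the metric columns above can be mirrored in plain Python to pin down their semantics (a sketch with invented sample events; the actual pipeline computes the same quantities with PySpark aggregations):

```python
# Invented sample events for one (product_id, supplier_id, route_id) group
# in a single hour; delay_minutes follows the pipeline's rule of 0 when
# the shipment has not been delivered yet.
events = [
    {"event_id": "E1", "delay_minutes": 0.0},   # on time
    {"event_id": "E2", "delay_minutes": 95.5},  # delayed
    {"event_id": "E3", "delay_minutes": 12.5},  # delayed
    {"event_id": "E4", "delay_minutes": 0.0},   # on time
]

def summarize(rows):
    """Plain-Python mirror of the hourly Gold aggregation for one group."""
    delays = [r["delay_minutes"] for r in rows]
    total = len(rows)
    delayed = sum(1 for d in delays if d > 0)
    return {
        "total_shipments": total,
        "total_delayed_shipments": delayed,
        "avg_delay_minutes": round(sum(delays) / total, 2),
        "min_delay_minutes": min(delays),
        "max_delay_minutes": max(delays),
        "on_time_delivery_rate": round((total - delayed) / total * 100, 2),
    }

summary = summarize(events)
print(summary)
```

Keeping a tiny reference implementation like this next to the pipeline makes the aggregation rules easy to unit-test, for example that a 0-minute delay counts as on time.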

File Structure

We will continue to organize our DLT pipelines within the dlt_pipelines directory. For the Gold layer, we’ll create a new Python file that defines our Gold tables.

.
├── dlt_pipelines/
│   ├── bronze_pipeline.py
│   ├── silver_pipeline.py
│   └── gold_pipeline.py  # New file for this chapter
├── notebooks/
│   ├── 01_setup_dlt_env.ipynb
│   └── 02_test_dlt_pipelines.ipynb
├── config/
│   └── config.py
└── README.md

Step-by-Step Implementation

We will now implement the Gold layer DLT pipeline in a new Python file.

a) Setup/Configuration

First, create the new DLT pipeline file dlt_pipelines/gold_pipeline.py.

File: dlt_pipelines/gold_pipeline.py

# Databricks notebook source
# MAGIC %md
# MAGIC # Gold Layer DLT Pipeline for Real-time Supply Chain Delay Analytics
# MAGIC
# MAGIC This pipeline aggregates and enriches data from the Silver layer to create Gold tables
# MAGIC optimized for analytical consumption, providing key metrics and insights into supply chain delays.

import dlt
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define the target schema (database) for Gold tables
# This assumes 'supply_chain_gold' is created and managed by Unity Catalog
# or as a standard database. For DLT, we define it in pipeline settings.
# For local testing, ensure the database exists.

# Configuration for delay thresholds (can be parameterized or read from external config)
CRITICAL_DELAY_THRESHOLD_MINUTES = 60 # e.g., 1 hour delay is critical

@dlt.table(
    name="delay_summary_hourly",
    comment="Hourly aggregated summary of supply chain shipment delays and on-time performance.",
    table_properties={
        "quality": "gold",
        "delta.logRetentionDuration": "30 days", # Retain logs for 30 days
        "delta.targetFileSize": "128mb" # Optimize file size for analytical queries
    }
)
@dlt.expect_or_drop("valid_event_date_hour", "event_date_hour IS NOT NULL")
@dlt.expect_or_drop("valid_product_id", "product_id IS NOT NULL")
def delay_summary_hourly():
    """
    Aggregates enriched supply chain events hourly to provide delay summaries.
    Reads from the 'supply_chain_silver.enriched_events' table.
    """
    return (
        dlt.read_stream("supply_chain_silver.enriched_events")
        # Define an event-time watermark so the streaming aggregation below can
        # finalize hourly groups and bound its state; without one, Spark may
        # reject a streaming groupBy on an append-only target.
        .withWatermark("event_timestamp", "2 hours")
        .withColumn("event_date_hour", F.date_trunc("hour", F.col("event_timestamp")))
        .withColumn("delay_minutes",
                    F.when(F.col("actual_delivery_time").isNotNull(),
                           F.round((F.col("actual_delivery_time").cast("long") - F.col("expected_delivery_time").cast("long")) / 60, 2))
                     .otherwise(0)) # Calculate delay in minutes, default to 0 if not delivered yet
        .groupBy("event_date_hour", "product_id", "supplier_id", "route_id")
        .agg(
            F.count("event_id").alias("total_shipments"),
            F.sum(F.when(F.col("delay_minutes") > 0, 1).otherwise(0)).alias("total_delayed_shipments"),
            F.avg(F.col("delay_minutes")).alias("avg_delay_minutes"),
            F.min(F.col("delay_minutes")).alias("min_delay_minutes"),
            F.max(F.col("delay_minutes")).alias("max_delay_minutes"),
            F.current_timestamp().alias("last_updated")
        )
        .withColumn("on_time_delivery_rate",
                    F.round((F.col("total_shipments") - F.col("total_delayed_shipments")) / F.col("total_shipments") * 100, 2))
    )

@dlt.table(
    name="critical_delays",
    comment="Table containing details of supply chain events identified as critical delays.",
    table_properties={
        "quality": "gold",
        "delta.logRetentionDuration": "30 days",
        "delta.targetFileSize": "128mb"
    }
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("has_delay_minutes", "delay_minutes IS NOT NULL AND delay_minutes > 0")
def critical_delays():
    """
    Identifies and logs individual supply chain events that exceed a predefined critical delay threshold.
    Reads from the 'supply_chain_silver.enriched_events' table.
    """
    return (
        dlt.read_stream("supply_chain_silver.enriched_events")
        .withColumn("delay_minutes",
                    F.when(F.col("actual_delivery_time").isNotNull(),
                           F.round((F.col("actual_delivery_time").cast("long") - F.col("expected_delivery_time").cast("long")) / 60, 2))
                     .otherwise(0))
        .filter(F.col("delay_minutes") >= CRITICAL_DELAY_THRESHOLD_MINUTES)
        .select(
            F.col("event_id"),
            F.col("event_timestamp"),
            F.col("product_id"),
            F.col("supplier_id"),
            F.col("route_id"),
            F.col("expected_delivery_time"),
            F.col("actual_delivery_time"),
            F.col("delay_minutes"),
            F.col("delay_reason"), # Assuming delay_reason is available from Silver layer
            F.lit("Critical").alias("severity_level"), # Assign a severity level
            F.current_timestamp().alias("last_updated")
        )
    )

# Optional: Add a materialized view for faster BI queries if needed.
# In DLT, a @dlt.table defined with a batch read (dlt.read) is maintained
# as a materialized view; there is no `as_view` argument. Use @dlt.view
# only for temporary views that are not persisted.
# @dlt.table(
#     name="daily_delay_trends_mv",
#     comment="Materialized view for daily delay trends, optimized for BI dashboards."
# )
# def daily_delay_trends_mv():
#     return (
#         dlt.read("delay_summary_hourly") # Batch read from the DLT table defined above
#         .withColumn("event_date", F.to_date(F.col("event_date_hour")))
#         .groupBy("event_date", "product_id")
#         .agg(
#             F.sum("total_shipments").alias("daily_total_shipments"),
#             F.avg("avg_delay_minutes").alias("daily_avg_delay_minutes"),
#             F.current_timestamp().alias("last_updated")
#         )
#     )

Explanation of the Code Block:

  • import dlt: Imports the Delta Live Tables library.
  • from pyspark.sql import functions as F: Imports PySpark SQL functions, aliased as F for convenience.
  • CRITICAL_DELAY_THRESHOLD_MINUTES: A constant defining what constitutes a “critical” delay. In a real-world scenario, this would likely be configurable, perhaps read from a Databricks secret or an external configuration table.
  • @dlt.table(...) decorator: This decorator marks a Python function as a DLT table.
    • name: Specifies the name of the output Delta table within the target schema (e.g., delay_summary_hourly).
    • comment: Provides a descriptive comment for the table, useful for data cataloging.
    • table_properties: Allows setting Delta Lake table properties directly.
      • "quality": "gold": A custom property to denote the data quality level.
      • "delta.logRetentionDuration": "30 days": Configures how long Delta Lake transaction logs are retained, impacting VACUUM operations.
      • "delta.targetFileSize": "128mb": Optimizes the target file size for data written to the Delta table, crucial for query performance in analytical workloads (prevents many small files or excessively large ones).
  • @dlt.expect_or_drop(...): These are DLT “expectations” for data quality.
    • They define rules that data must satisfy. If a record violates an expectation, expect_or_drop will remove it from the output table, preventing bad data from polluting the Gold layer. Other options like expect_or_fail (pipeline fails) or expect_or_warn (logs a warning) are also available depending on the desired strictness.
  • delay_summary_hourly() function:
    • dlt.read_stream("supply_chain_silver.enriched_events"): Reads data incrementally from our Silver layer enriched_events table. DLT automatically manages the streaming offsets.
    • withColumn("event_date_hour", F.date_trunc("hour", F.col("event_timestamp"))): Extracts the hour from the event_timestamp to group data hourly.
    • withColumn("delay_minutes", ...): Calculates the delay in minutes. It handles cases where actual_delivery_time might be null (i.e., shipment not yet delivered) by assigning a 0-minute delay, which can be adjusted based on business logic. The round function ensures clean numerical output.
    • groupBy(...): Groups the data by the specified dimensions for aggregation.
    • agg(...): Performs the actual aggregations:
      • count("event_id"): Counts total shipments.
      • sum(F.when(F.col("delay_minutes") > 0, 1).otherwise(0)): Counts delayed shipments.
      • avg, min, max for delay_minutes: Provides comprehensive delay statistics.
      • current_timestamp(): Records when the aggregation was last updated.
    • withColumn("on_time_delivery_rate", ...): Calculates the on-time delivery rate as a percentage.
  • critical_delays() function:
    • dlt.read_stream("supply_chain_silver.enriched_events"): Again, reads incrementally from the Silver layer.
    • withColumn("delay_minutes", ...): Recalculates delay_minutes (could be refactored into a shared function if more complex).
    • filter(F.col("delay_minutes") >= CRITICAL_DELAY_THRESHOLD_MINUTES): Filters for only those events that meet our definition of a critical delay.
    • select(...): Selects and renames columns to match our critical_delays schema.
    • F.lit("Critical").alias("severity_level"): Assigns a static severity level for these filtered events.

b) Core Implementation

The core implementation is provided in the dlt_pipelines/gold_pipeline.py file above. This file contains the complete definitions for both delay_summary_hourly and critical_delays DLT tables.

c) Testing This Component

To test the Gold layer DLT pipeline, you need to first ensure your Bronze and Silver pipelines are running and generating data. Then, deploy and run the Gold pipeline.

Steps to Test:

  1. Ensure Silver Data Exists: Verify that supply_chain_silver.enriched_events has data. If not, run your Bronze and Silver DLT pipelines first.
  2. Create/Update DLT Pipeline:
    • Navigate to the Databricks Workspace.
    • Go to “Workflows” -> “Delta Live Tables”.
    • Click “Create Pipeline” or select your existing pipeline and click “Edit Pipeline”.
    • Add dlt_pipelines/gold_pipeline.py as a new notebook source (if creating a new DLT pipeline specifically for Gold) or add it to your existing multi-tier pipeline definition.
    • Crucially, ensure the target database for this pipeline is set to supply_chain_gold.
    • Configure Cluster mode to Enhanced Autoscaling for production-readiness.
    • Set Channel to Current for the stable runtime (use Preview only if you need early access to upcoming features).
    • Set Pipeline mode to Continuous for real-time processing.
    • Click “Create” or “Save”.
  3. Start the DLT Pipeline:
    • Select the newly created or updated pipeline.
    • Click “Start”.
    • Monitor the DLT UI to observe the pipeline’s execution, data flow from Silver to Gold, and table creation.
  4. Query Gold Tables: Once the pipeline is running and data is flowing, you can query the Gold tables using Databricks SQL or a separate notebook.

Example Queries (in a Databricks Notebook or SQL Editor):

-- Query 1: Check the hourly delay summary
SELECT *
FROM supply_chain_gold.delay_summary_hourly
ORDER BY event_date_hour DESC
LIMIT 100;
-- Query 2: Check critical delays
SELECT *
FROM supply_chain_gold.critical_delays
ORDER BY event_timestamp DESC
LIMIT 100;
-- Query 3: Inspect data quality (expectation) metrics
-- DLT records expectation results in the pipeline event log rather than in
-- separate violation tables or hidden columns. One way to inspect them is
-- the event_log() table-valued function:
SELECT
  timestamp,
  details:flow_progress.data_quality.expectations
FROM event_log(TABLE(supply_chain_gold.delay_summary_hourly))
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

Expected Behavior:

  • The delay_summary_hourly table should populate with aggregated metrics, updating as new data arrives in the Silver layer.
  • The critical_delays table should contain individual records for shipments exceeding the CRITICAL_DELAY_THRESHOLD_MINUTES.
  • The DLT UI should show the successful creation and updates of the delay_summary_hourly and critical_delays tables, with green checks indicating successful expectation checks.
  • No records should be dropped if the Silver data adheres to the expectations. If any Silver data violates a Gold layer expect_or_drop rule, those records are removed from the output and the violation counts are recorded in the pipeline event log.

Debugging Tips:

  • DLT UI Logs: The DLT UI provides detailed logs for each step of the pipeline. Check for errors, warnings, and expectation violation reports.
  • Source Data Integrity: If Gold tables are empty or incorrect, first verify the data in your Silver layer (supply_chain_silver.enriched_events).
  • Schema Mismatches: Ensure that column names and data types expected by the Gold pipeline (e.g., event_timestamp, actual_delivery_time) match those in the Silver table.
  • Filter/Aggregation Logic: Double-check your filter conditions and groupBy/agg functions for correctness.
  • Spark UI: For performance issues, use the Spark UI accessible via the DLT pipeline details page to inspect job execution, stages, and tasks.

Production Considerations

Transitioning Gold layer DLT pipelines to production requires careful attention to performance, reliability, security, and maintainability.

Error Handling

Databricks DLT provides robust error handling capabilities:

  • DLT Expectations: As demonstrated, dlt.expect_or_drop (or expect_or_fail, expect_or_warn) are critical for data quality. For production, define a comprehensive set of expectations based on business rules for each Gold table. Consider using expect_or_fail for critical data integrity rules (e.g., event_id IS NOT NULL) and expect_or_drop for less critical issues that shouldn’t halt the pipeline but should be logged.
  • DLT Metrics: DLT automatically captures metrics on data quality, including successful records, dropped records, and violated expectations. Monitor these metrics via the DLT UI or by querying the pipeline event log (e.g., with the event_log() table-valued function) for automated alerts.
  • Dead Letter Queue (DLQ): While expect_or_drop discards failing records, for complex scenarios or sensitive data you may want to persist them for manual review and reprocessing. A common pattern is to define a second "quarantine" table from the same source whose filter inverts the expectation predicates, so every failing record lands somewhere inspectable.
  • Retry Mechanisms: DLT pipelines inherently have retry mechanisms for transient failures. Ensure your source systems (e.g., Kafka) retain data for a sufficient period to allow for retries.
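The quarantine idea can be sketched in plain Python: apply the same predicates the expectations encode and route failures, tagged with the rules they broke, to a side output. The predicate names here are hypothetical mirrors of the pipeline's expectations:

```python
# Hypothetical expectation predicates, mirroring the pipeline's
# expect_or_drop rules as plain Python functions.
EXPECTATIONS = {
    "valid_event_id": lambda r: r.get("event_id") is not None,
    "has_positive_delay": lambda r: (r.get("delay_minutes") or 0) > 0,
}

def split_for_quarantine(rows, expectations=EXPECTATIONS):
    """Route each record to the clean output or to a quarantine list,
    tagging quarantined records with the names of the rules they violated."""
    valid, quarantine = [], []
    for row in rows:
        failed = [name for name, check in expectations.items() if not check(row)]
        if failed:
            quarantine.append({**row, "_violated": failed})
        else:
            valid.append(row)
    return valid, quarantine

rows = [
    {"event_id": "E1", "delay_minutes": 75.0},
    {"event_id": None, "delay_minutes": 90.0},   # fails valid_event_id
    {"event_id": "E3", "delay_minutes": 0.0},    # fails has_positive_delay
]
valid, quarantine = split_for_quarantine(rows)
print(len(valid), len(quarantine))  # 1 2
```

In the real pipeline the same split would be expressed as two DLT tables reading the same Silver source with complementary filters.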

Performance Optimization

Optimizing Gold layer performance is crucial for real-time analytics:

  • Photon Engine: Ensure your DLT pipeline is configured to use the Photon engine. Photon significantly accelerates Spark workloads, especially for SQL and Delta Lake operations, which are heavily used in aggregation layers.
  • Enhanced Autoscaling: Utilize DLT’s Enhanced Autoscaling. It intelligently scales clusters based on workload demand, ensuring optimal resource utilization and cost efficiency while maintaining performance.
  • Delta Lake Optimizations:
    • delta.targetFileSize: As set in table_properties, this helps prevent the “small file problem” and ensures optimal file sizes for reads.
    • OPTIMIZE and VACUUM: DLT runs maintenance tasks, including OPTIMIZE and VACUUM, automatically on the tables it manages (typically within about 24 hours of an update). If you need tighter control, you can additionally schedule OPTIMIZE and VACUUM as separate Databricks Jobs on your Gold tables (e.g., daily) to compact files and remove stale data.
    • Z-ordering: If your Gold tables are frequently filtered by specific high-cardinality columns (e.g., product_id, supplier_id), consider Z-ordering these columns to co-locate related data, drastically improving query performance. Z-ordering is applied with OPTIMIZE <table> ZORDER BY (<columns>), typically as part of a scheduled OPTIMIZE job.
  • Partitioning: For very large Gold tables, consider partitioning by a low-cardinality, frequently queried column (e.g., event_date). However, DLT’s internal optimizations often reduce the need for manual partitioning.
  • Materialized Views (Optional): For highly complex queries or those requiring very low latency, define additional DLT tables using batch reads (dlt.read) over your Gold tables; DLT maintains these as materialized views and can refresh them incrementally.
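As a sketch of what a scheduled maintenance job might execute, the helper below only composes the OPTIMIZE/ZORDER and VACUUM statements; running them would be a spark.sql call inside a Databricks Job. The 168-hour VACUUM retention mirrors Delta's 7-day default and is an explicit assumption here:

```python
def maintenance_statements(table, zorder_cols=(), vacuum_retain_hours=168):
    """Compose Delta maintenance SQL for a scheduled job to execute;
    composition only, no execution."""
    optimize = f"OPTIMIZE {table}"
    if zorder_cols:
        optimize += f" ZORDER BY ({', '.join(zorder_cols)})"
    vacuum = f"VACUUM {table} RETAIN {vacuum_retain_hours} HOURS"
    return [optimize, vacuum]

for stmt in maintenance_statements(
    "supply_chain_gold.delay_summary_hourly",
    zorder_cols=("product_id", "supplier_id"),
):
    print(stmt)
```

Inside a Databricks Job notebook, each returned statement would be passed to spark.sql(stmt).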

Security Considerations

Security at the Gold layer is paramount as it often exposes business-critical data:

  • Unity Catalog Integration: Leverage Databricks Unity Catalog for fine-grained access control.
    • Schema/Table Permissions: Grant SELECT permissions to BI tools and analytical users on supply_chain_gold schema and its tables. Restrict MODIFY, CREATE, DELETE to data engineers responsible for the DLT pipeline.
    • Column-Level Security: For sensitive columns (e.g., if any PII were to make it to Gold, though ideally filtered earlier), use Unity Catalog’s column-level access control.
    • Row-Level Security: Implement row-level security using SQL functions or views in Unity Catalog if different user groups should only see specific subsets of data (e.g., a regional manager only sees data for their region).
  • Service Principals/Managed Identities: DLT pipelines should run under a service principal or managed identity with the least necessary privileges to access source Silver tables and write to target Gold tables. Avoid using personal access tokens for production pipelines.
  • Data Encryption: Delta Lake tables are stored in cloud object storage (e.g., S3, ADLS Gen2, GCS). Ensure this storage is encrypted at rest (SSE-S3, SSE-KMS, Azure Storage Encryption, Google Cloud Storage Encryption) and in transit (TLS/SSL). Databricks handles this by default for managed tables.
  • Audit Logging: All access to Gold tables via Unity Catalog is logged, providing a comprehensive audit trail for compliance and security monitoring.

Logging and Monitoring

Effective logging and monitoring are crucial for operational excellence:

  • DLT UI: The DLT UI provides a real-time view of pipeline status, data flow, and health metrics.
  • Databricks Monitoring Tools: Utilize Databricks’ built-in monitoring capabilities, including event logs and metrics, to track pipeline health, data volumes, and processing latency.
  • Custom Logging: For specific debugging or operational insights within your Python DLT code, use Python’s logging module. DLT integrates these logs into the pipeline’s execution logs.
  • Alerting: Set up alerts based on DLT pipeline failures, high numbers of dropped records (expectation violations), or significant deviations in processing latency. Integrate with tools like PagerDuty, Slack, or email.
  • Databricks SQL Dashboards: Build Databricks SQL Dashboards on top of Gold tables and the DLT pipeline event log (queryable via the event_log() table-valued function) to visualize data quality, pipeline performance, and business metrics.
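A minimal sketch of the custom-logging bullet above: in a DLT notebook the logger output is surfaced in the pipeline's execution logs, so the in-memory StringIO handler below exists only to make the example runnable and checkable outside Databricks:

```python
import io
import logging

# Capture log output in memory purely for this standalone sketch; inside a
# DLT notebook, logging.getLogger(__name__) with no extra handler suffices.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))

logger = logging.getLogger("gold_pipeline")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

def log_batch_stats(table, total, dropped):
    """Emit an operational log line; warn when records were dropped."""
    if dropped > 0:
        logger.warning("%s: dropped %d of %d records", table, dropped, total)
    else:
        logger.info("%s: processed %d records cleanly", table, total)

log_batch_stats("critical_delays", total=1200, dropped=7)
print(buffer.getvalue().strip())
```

Structured, consistent messages like these make it straightforward to build alerts on dropped-record counts.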

Code Review Checkpoint

At this point, we have successfully implemented the Gold layer DLT pipeline for real-time supply chain delay analytics.

Summary of what was built:

  • delay_summary_hourly table: An hourly aggregated view of supply chain performance, including total shipments, delayed shipments, average/min/max delay times, and on-time delivery rates.
  • critical_delays table: A granular log of individual supply chain events that exceed a predefined critical delay threshold, providing details for immediate investigation.
  • DLT Pipeline: A robust Databricks Delta Live Tables pipeline that incrementally processes data from the Silver layer, applies aggregations and filters, and writes to the Gold layer.
  • Data Quality: Implemented DLT Expectations (expect_or_drop) to ensure data quality at the Gold layer.
  • Production Best Practices: Incorporated table_properties for Delta Lake optimization and noted considerations for security, performance, and monitoring.

Files created/modified:

  • dlt_pipelines/gold_pipeline.py: New file containing the DLT definitions for delay_summary_hourly and critical_delays.

How it integrates with existing code:

The gold_pipeline.py seamlessly integrates by reading directly from the supply_chain_silver.enriched_events table, which is the output of our Silver layer DLT pipeline (defined in silver_pipeline.py). This establishes a clear dependency and data flow in our Medallion Lakehouse architecture. The Gold tables are now ready for consumption by BI tools or further analytical models.

Common Issues & Solutions

Here are a few common issues you might encounter when working with DLT Gold layer pipelines and their solutions:

  1. Issue: supply_chain_gold database not found or permissions error.

    • Description: When running the DLT pipeline, you might see errors indicating that the target database supply_chain_gold does not exist or the pipeline’s service principal lacks permissions to create/write to it.
    • Debugging: Check the DLT pipeline settings. Ensure the “Target Schema” field is correctly set to supply_chain_gold. If using Unity Catalog, verify that the service principal or user running the DLT pipeline has CREATE SCHEMA, USE SCHEMA, and CREATE TABLE permissions on the Unity Catalog metastore and the target schema.
    • Solution:
      • For Unity Catalog: Grant the necessary permissions (USE CATALOG is also required to reach anything inside the catalog):
        GRANT USE CATALOG ON CATALOG <your_catalog_name> TO `<service_principal_id>`;
        GRANT CREATE SCHEMA ON CATALOG <your_catalog_name> TO `<service_principal_id>`;
        GRANT USE SCHEMA ON SCHEMA <your_catalog_name>.supply_chain_gold TO `<service_principal_id>`;
        GRANT CREATE TABLE ON SCHEMA <your_catalog_name>.supply_chain_gold TO `<service_principal_id>`;
        
      • For Hive Metastore (legacy): Ensure the database exists or is created automatically by DLT (which it usually does if permissions are sufficient).
  2. Issue: Data quality expectation failures leading to dropped records.

    • Description: You notice that records are being dropped from your Gold tables, and the DLT UI reports expectation violations (e.g., valid_product_id failed).
    • Debugging:
      • Examine the DLT UI’s “Data Quality” tab for the affected table. It will show the number of records violating each expectation.
      • Query the DLT event log (for example, via the event_log() table-valued function) to inspect expectation metrics and see how many records failed each rule. Note that expect_or_drop records violation counts, not the dropped rows themselves; persist failing rows via an explicit quarantine table if you need to inspect them.
      • Inspect the source supply_chain_silver.enriched_events table for the problematic data.
    • Solution:
      • Root Cause Analysis: Determine if the issue is with the upstream Silver data (e.g., a bug in the Silver pipeline introduced nulls) or if the Gold layer’s expectation is too strict or incorrect.
      • Fix Upstream: If the Silver data is truly bad, fix the Silver pipeline to prevent bad data from reaching Gold.
      • Adjust Expectation: If the expectation in Gold is too strict for the incoming data, you might need to adjust it (e.g., expect_or_warn instead of expect_or_drop if the issue is minor and shouldn’t block data flow).
      • Reprocess: After fixing, re-run the DLT pipeline (either a full refresh or allow continuous mode to catch up).
  3. Issue: Gold layer queries are slow, despite DLT optimizations.

    • Description: Even with Photon and targetFileSize set, analytical queries on the Gold tables (e.g., from a BI tool) are performing poorly.
    • Debugging:
      • Spark UI: Analyze the query plan in the Spark UI for the slow queries. Look for data skew, full table scans, or inefficient joins.
      • Data Volume: Check the size of your Gold tables. Are they extremely large?
      • Query Patterns: Understand how users are querying the data. What columns are used in WHERE clauses or JOIN conditions?
    • Solution:
      • Z-ordering: If queries frequently filter on high-cardinality columns (e.g., product_id, supplier_id), apply Z-ordering to these columns. This must be done via an OPTIMIZE command, which you can schedule as a separate Databricks Job.
        OPTIMIZE supply_chain_gold.delay_summary_hourly ZORDER BY (product_id, supplier_id);
        
      • Further Partitioning: For extremely large tables and specific query patterns, consider adding a partition column (e.g., event_date) to the Gold tables if not already present. This requires recreating the table or altering the DLT pipeline.
      • Materialized Views: If specific complex aggregations are performed repeatedly, define DLT-managed materialized views (a @dlt.table over a batch dlt.read) that pre-compute these aggregations.
      • Cluster Sizing: Ensure the cluster running the BI queries (if using Databricks SQL Warehouses) is appropriately sized.
      • Data Skew: If Spark UI shows data skew, investigate the keys used in aggregations and joins. You might need to salt keys or use broadcast joins if appropriate.
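The key-salting remedy mentioned in the last bullet can be sketched in plain Python: a hot key is spread across several salted buckets for partial aggregation, then the salt is stripped and the partials are merged. The bucket count and key format below are arbitrary illustrative choices:

```python
import random

def salted_key(key, n_salts=8, rng=random):
    """Stage 1: append a random salt so one hot key spreads over up to
    n_salts partial-aggregation buckets."""
    return f"{key}#{rng.randrange(n_salts)}"

def unsalt(key):
    """Stage 2: strip the salt so partial aggregates can be recombined."""
    return key.rsplit("#", 1)[0]

# Partial counts per salted key (stage 1), then a final merge (stage 2).
shipments = ["P001"] * 1000 + ["P002"] * 10   # P001 is the skewed key
partial = {}
for s in shipments:
    k = salted_key(s)
    partial[k] = partial.get(k, 0) + 1

final = {}
for k, count in partial.items():
    final[unsalt(k)] = final.get(unsalt(k), 0) + count

print(final)  # totals unchanged: {'P001': 1000, 'P002': 10}
```

In Spark the same two-stage idea applies: group by the salted key first, then group the partial aggregates by the original key, trading one skewed shuffle for two balanced ones.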

Testing & Verification

After implementing and deploying the Gold layer DLT pipeline, thorough testing and verification are essential to ensure data accuracy and reliability.

How to test the chapter’s work:

  1. Run the entire DLT pipeline: Start your DLT pipeline in continuous mode (or trigger a full refresh if preferred). Monitor the DLT UI to ensure all Bronze, Silver, and Gold tables are processing data correctly and without errors.

  2. Generate Sample Data (if needed): If your Kafka producer isn’t actively sending data, use your simulated data generator from previous chapters to push new events into Kafka. Observe these events flowing through Bronze, Silver, and finally populating the Gold tables.

  3. Data Validation Queries: Execute a series of SQL queries against the Gold tables to validate the aggregated results.

    • Verify delay_summary_hourly aggregations:

      -- Check total shipments and on-time rate for a specific product/supplier
      SELECT
          event_date_hour,
          product_id,
          supplier_id,
          total_shipments,
          total_delayed_shipments,
          on_time_delivery_rate,
          avg_delay_minutes
      FROM supply_chain_gold.delay_summary_hourly
      WHERE product_id = 'P001' AND supplier_id = 'S001'
      ORDER BY event_date_hour DESC
      LIMIT 10;
      
      -- Verify counts against Silver layer for a specific hour (manual check)
      -- This requires knowing the exact events that flowed into Silver for a given hour.
      -- For example, if you know 100 events for P001/S001 happened in '2025-12-20 10:00:00'
      -- in Silver, then total_shipments in Gold for that hour should be 100.
      
    • Verify critical_delays entries:

      -- Check if critical delays are being captured based on the threshold
      SELECT *
      FROM supply_chain_gold.critical_delays
      WHERE delay_minutes >= 60 -- Should match CRITICAL_DELAY_THRESHOLD_MINUTES
      ORDER BY event_timestamp DESC
      LIMIT 10;
      
      -- Cross-reference a critical delay event with the Silver layer
      SELECT *
      FROM supply_chain_silver.enriched_events
      WHERE event_id IN (SELECT event_id FROM supply_chain_gold.critical_delays LIMIT 1);
      -- Verify that the delay_minutes calculated in Silver matches the one in Gold
      
    • Check Data Quality Expectations:

      -- Expectation results are recorded in the pipeline event log,
      -- not in per-table "violations" tables. Inspect them, for example:
      SELECT timestamp, details:flow_progress.data_quality.expectations
      FROM event_log(TABLE(supply_chain_gold.delay_summary_hourly))
      WHERE event_type = 'flow_progress'
      ORDER BY timestamp DESC;
      

What should work now:

  • Real-time aggregation of supply chain events into hourly summaries of delay metrics.
  • Identification and logging of individual critical delay incidents.
  • Data quality checks enforcing integrity for Gold layer tables.
  • The DLT pipeline should be running continuously, processing new Silver data and updating Gold tables with minimal latency.
  • The Gold tables are ready for direct consumption by BI tools (e.g., Databricks SQL Dashboards, Power BI, Tableau) to visualize key performance indicators (KPIs) and alert on critical events.

How to verify everything is correct:

  • DLT UI Health: Ensure the DLT pipeline graph shows green checkmarks for all tables and updates are occurring regularly. Check the “Data Quality” tab for any unexpected dropped records.
  • Query Results: The SQL queries above should return meaningful, accurate data that aligns with your understanding of the Silver layer input.
  • Business Logic Validation: Share sample Gold layer data with a business analyst or domain expert to confirm that the derived metrics (e.g., avg_delay_minutes, on_time_delivery_rate) make sense and are calculated according to business rules.

Summary & Next Steps

Congratulations! In this chapter, we successfully built the Gold layer of our real-time supply chain analytics solution using Databricks Delta Live Tables. We developed robust DLT pipelines to aggregate cleaned and enriched data from the Silver layer into actionable, production-ready Gold tables: delay_summary_hourly for performance trends and critical_delays for incident management. We emphasized production considerations such as DLT expectations for data quality, performance optimizations using Photon and Delta Lake features, and comprehensive security and monitoring practices with Unity Catalog.

This Gold layer is a significant milestone, as it transforms raw event data into valuable business intelligence, enabling real-time decision-making and proactive management of supply chain operations. The data is now optimized for consumption by dashboards, reports, and downstream analytical applications.

In the next chapter, we will shift our focus to another critical aspect of supply chain management: HS Code-based Import-Export Tariff Impact Analysis with Historical Trend Processing in Databricks. This will involve integrating external tariff data, processing it to understand its impact on procurement costs, and building historical trend analysis capabilities, further enriching our supply chain intelligence.