Chapter 4: Refining Supply Chain Events for Delay Analytics (Silver Layer)

Chapter Introduction

Welcome to Chapter 4! In this chapter, we will elevate the raw supply chain event data ingested into our Bronze layer to a refined, clean, and structured Silver layer using Databricks Delta Live Tables (DLT). The Bronze layer, which we established in the previous chapter, serves as our landing zone for immutable raw data. Now, our focus shifts to transforming this raw data into a format suitable for downstream analytics, particularly for identifying and analyzing supply chain delays.

The Silver layer is crucial because it acts as a bridge between the messy, inconsistent raw data and the highly curated, business-ready data in the Gold layer. Here, we will apply data quality checks, standardize formats, enrich events with derived attributes, and filter out irrelevant or erroneous records. This transformation ensures that any analytics built upon this data, such as our delay analytics, are based on reliable and consistent information, preventing “garbage in, garbage out” scenarios.

From the previous chapter, we have a DLT pipeline ingesting real-time supply chain events from Kafka into our bronze_supply_chain_events table within Unity Catalog. By the end of this chapter, you will have extended this DLT pipeline to process data from the Bronze layer, apply robust data quality rules, and persist the refined data into a new silver_supply_chain_events table, ready for sophisticated analysis.

Planning & Design

Component Architecture

Our DLT pipeline will now consist of two main stages:

  1. Bronze Layer (Ingestion): Continuously ingesting raw JSON messages from Kafka into bronze_supply_chain_events. (Already completed in Chapter 3).
  2. Silver Layer (Refinement): Reading from bronze_supply_chain_events, performing data quality checks, standardization, and enrichment, then writing to silver_supply_chain_events.

This layered approach, often referred to as the Medallion Architecture (Bronze -> Silver -> Gold), provides clear separation of concerns, improves data governance, and facilitates easier troubleshooting and data reprocessing.

graph TD A["Kafka Topic: supply_chain_events"] --> B["Distributed Ledger Technology (DLT) Pipeline"] B --> C["Bronze Layer: bronze_supply_chain_events"] C --> D["Distributed Ledger Technology (DLT) Pipeline - Silver Stage"] D -- "Data Quality & Enrichment" --> E["Silver Layer: silver_supply_chain_events"] E -- "Ready for Delay Analytics & Gold Layer" --> F["Downstream Analytics / Gold Layer"]

Silver Table Schema Design

The silver_supply_chain_events table will contain cleaned and enriched versions of our supply chain events. We’ll aim for a consistent schema that captures key information for delay analytics.

Here’s the proposed schema for silver_supply_chain_events:

Field NameData TypeDescriptionQuality Expectation
event_idSTRINGUnique identifier for the event.NOT NULL, UNIQUE
shipment_idSTRINGIdentifier for the overall shipment.NOT NULL
tracking_numberSTRINGTracking number for the package/consignment.NOT NULL
event_typeSTRINGStandardized type of event (e.g., ORDER_PLACED, IN_TRANSIT, DELIVERED).NOT NULL, VALID_ENUM
event_timestampTIMESTAMPTimestamp when the event occurred (UTC).NOT NULL, VALID_DATE
location_citySTRINGCity where the event took place.NULLABLE
location_countrySTRINGCountry where the event took place.NULLABLE
carrierSTRINGShipping carrier responsible for the event.NULLABLE
status_codeSTRINGCarrier-specific status code.NULLABLE
status_descriptionSTRINGHuman-readable description of the status.NULLABLE
estimated_delivery_dateTIMESTAMPEstimated delivery date (if available).NULLABLE, VALID_DATE
actual_delivery_dateTIMESTAMPActual delivery date (if event type is DELIVERED).NULLABLE, VALID_DATE
processing_timestampTIMESTAMPTimestamp when the event was processed into the Silver layer (DLT).NOT NULL
source_fileSTRINGSource file/path in Bronze (for traceability).NOT NULL
_bronze_record_hashSTRINGHash of the original Bronze record for deduplication/auditing.NOT NULL

File Structure

We will continue to use the same DLT pipeline definition notebook for simplicity, adding the Silver layer transformations to it.

databricks_workspace/
└── supply_chain_project/
    └── dlt_pipelines/
        └── supply_chain_events_pipeline.py  <-- We will modify this file

Step-by-Step Implementation

We will extend our existing DLT Python notebook to include the Silver layer processing.

a) Setup/Configuration

Open your supply_chain_events_pipeline.py notebook in your Databricks workspace.

File Path: databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py

Ensure you have the following imports at the top (if not already present):

# databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py

import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType
import hashlib

# Define Unity Catalog schema and volume paths from DLT pipeline configuration
# These should be passed as DLT pipeline configurations
# Example: {"spark.dlt.pipeline.applyChanges.cdcTargetSchema": "supply_chain_uc_schema"}
# For this tutorial, we'll assume the schema is configured in the DLT pipeline settings.
# For example, in the DLT UI, under "Target Schema", you might set "supply_chain_uc_schema".
# The actual table names will be `supply_chain_uc_schema.bronze_supply_chain_events`
# and `supply_chain_uc_schema.silver_supply_chain_events`.

b) Core Implementation

We will add a new DLT table definition for our Silver layer. This table will read from the bronze_supply_chain_events table, apply transformations, and enforce data quality.

Step 1: Define the Silver Layer Table with Initial Transformations

We’ll start by reading from the Bronze table and performing some basic parsing and selection of fields. We’ll also add a processing_timestamp and a hash of the raw content for traceability.

# databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py

# ... (existing Bronze layer code from Chapter 3) ...

@dlt.table(
    name="silver_supply_chain_events",
    comment="Cleaned and enriched supply chain events, ready for delay analytics.",
    table_properties={"quality": "silver", "pipeline.type": "streaming"},
    path="/mnt/supply_chain_project/silver/supply_chain_events" # Optional: specify storage path
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("valid_shipment_id", "shipment_id IS NOT NULL")
@dlt.expect_or_drop("valid_tracking_number", "tracking_number IS NOT NULL")
@dlt.expect_or_drop("valid_event_type", "event_type IS NOT NULL AND event_type != ''")
@dlt.expect_or_drop("valid_event_timestamp", "event_timestamp IS NOT NULL AND event_timestamp_str IS NOT NULL")
def silver_supply_chain_events():
    """
    Reads raw supply chain events from the Bronze layer, applies data quality checks,
    standardizes event types, and enriches the data for the Silver layer.
    """
    bronze_df = dlt.read_stream("bronze_supply_chain_events")

    # Parse the 'value' column (which contains the raw JSON string)
    # We need to explicitly cast to a struct to access fields
    event_schema = StructType([
        StructField("event_id", StringType(), True),
        StructField("shipment_id", StringType(), True),
        StructField("tracking_number", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("event_timestamp", StringType(), True), # Keep as string for initial parsing
        StructField("location", StructType([
            StructField("city", StringType(), True),
            StructField("country", StringType(), True)
        ]), True),
        StructField("carrier", StringType(), True),
        StructField("status_code", StringType(), True),
        StructField("status_description", StringType(), True),
        StructField("estimated_delivery_date", StringType(), True),
        StructField("actual_delivery_date", StringType(), True)
    ])

    parsed_df = bronze_df.withColumn("parsed_value", F.from_json(F.col("value"), event_schema))

    # Select and rename fields, handling potential nulls from parsing
    # Use F.expr for SQL-like expressions in PySpark for cleaner code
    silver_df = parsed_df.select(
        F.col("parsed_value.event_id").alias("event_id"),
        F.col("parsed_value.shipment_id").alias("shipment_id"),
        F.col("parsed_value.tracking_number").alias("tracking_number"),
        F.upper(F.trim(F.col("parsed_value.event_type"))).alias("event_type_raw"), # Keep raw for standardization
        F.col("parsed_value.event_timestamp").alias("event_timestamp_str"), # Keep as string for validation
        F.col("parsed_value.location.city").alias("location_city"),
        F.col("parsed_value.location.country").alias("location_country"),
        F.col("parsed_value.carrier").alias("carrier"),
        F.col("parsed_value.status_code").alias("status_code"),
        F.col("parsed_value.status_description").alias("status_description"),
        F.col("parsed_value.estimated_delivery_date").alias("estimated_delivery_date_str"),
        F.col("parsed_value.actual_delivery_date").alias("actual_delivery_date_str"),
        F.current_timestamp().alias("processing_timestamp"),
        F.input_file_name().alias("source_file"),
        F.sha2(F.col("value"), 256).alias("_bronze_record_hash") # Hash of the raw JSON for audit
    )

    # Return this initial DataFrame. We'll add more transformations in the next step.
    return silver_df

Explanation:

  • @dlt.table(name="silver_supply_chain_events", ...): This decorator defines a new DLT table. We assign a descriptive name, comment, and properties. The path argument is optional but good practice for specifying where Delta files are stored (in our Unity Catalog managed external location).
  • @dlt.expect_or_drop(...): These are DLT’s built-in data quality expectations. If a record fails an expectation, it will be dropped from the Silver table. This is a critical production best practice for ensuring data quality. We define expectations for event_id, shipment_id, tracking_number, event_type, and event_timestamp.
  • dlt.read_stream("bronze_supply_chain_events"): Reads data continuously from our Bronze table. DLT automatically handles state management and exactly-once processing.
  • F.from_json(F.col("value"), event_schema): Parses the raw JSON string from the value column into a structured format based on our defined event_schema.
  • select(...): Extracts fields from the parsed JSON. We use alias to rename them to our desired Silver schema names.
  • F.upper(F.trim(F.col("parsed_value.event_type"))): Basic cleansing for event_type by trimming whitespace and converting to uppercase for standardization.
  • F.current_timestamp().alias("processing_timestamp"): Adds a timestamp indicating when the record was processed into the Silver layer.
  • F.input_file_name().alias("source_file"): Captures the source file path from the Bronze layer for auditability.
  • F.sha2(F.col("value"), 256).alias("_bronze_record_hash"): Creates a SHA256 hash of the original raw JSON string. This is useful for detecting duplicate records, auditing, or comparing changes to the raw data over time.

Step 2: Standardize Event Types and Convert Timestamps

Raw data often has inconsistent event type strings. We need to map them to a canonical set of types for consistent analytics. We also need to convert our timestamp strings to actual TIMESTAMP types.

# databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py

# ... (previous Silver layer code) ...

@dlt.table(
    name="silver_supply_chain_events",
    comment="Cleaned and enriched supply chain events, ready for delay analytics.",
    table_properties={"quality": "silver", "pipeline.type": "streaming"},
    path="/mnt/supply_chain_project/silver/supply_chain_events"
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("valid_shipment_id", "shipment_id IS NOT NULL")
@dlt.expect_or_drop("valid_tracking_number", "tracking_number IS NOT NULL")
@dlt.expect_or_drop("valid_event_type", "event_type IS NOT NULL AND event_type != ''")
@dlt.expect_or_drop("valid_event_timestamp", "event_timestamp IS NOT NULL")
@dlt.expect_or_drop("future_event_timestamp_check", "event_timestamp <= processing_timestamp") # Event should not be in the future
def silver_supply_chain_events():
    """
    Reads raw supply chain events from the Bronze layer, applies data quality checks,
    standardizes event types, and enriches the data for the Silver layer.
    """
    bronze_df = dlt.read_stream("bronze_supply_chain_events")

    event_schema = StructType([
        StructField("event_id", StringType(), True),
        StructField("shipment_id", StringType(), True),
        StructField("tracking_number", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("event_timestamp", StringType(), True),
        StructField("location", StructType([
            StructField("city", StringType(), True),
            StructField("country", StringType(), True)
        ]), True),
        StructField("carrier", StringType(), True),
        StructField("status_code", StringType(), True),
        StructField("status_description", StringType(), True),
        StructField("estimated_delivery_date", StringType(), True),
        StructField("actual_delivery_date", StringType(), True)
    ])

    parsed_df = bronze_df.withColumn("parsed_value", F.from_json(F.col("value"), event_schema))

    silver_df = parsed_df.select(
        F.col("parsed_value.event_id").alias("event_id"),
        F.col("parsed_value.shipment_id").alias("shipment_id"),
        F.col("parsed_value.tracking_number").alias("tracking_number"),
        F.upper(F.trim(F.col("parsed_value.event_type"))).alias("event_type_raw"),
        F.col("parsed_value.event_timestamp").alias("event_timestamp_str"),
        F.col("parsed_value.location.city").alias("location_city"),
        F.col("parsed_value.location.country").alias("location_country"),
        F.col("parsed_value.carrier").alias("carrier"),
        F.col("parsed_value.status_code").alias("status_code"),
        F.col("parsed_value.status_description").alias("status_description"),
        F.col("parsed_value.estimated_delivery_date").alias("estimated_delivery_date_str"),
        F.col("parsed_value.actual_delivery_date").alias("actual_delivery_date_str"),
        F.current_timestamp().alias("processing_timestamp"),
        F.input_file_name().alias("source_file"),
        F.sha2(F.col("value"), 256).alias("_bronze_record_hash")
    ).withColumn(
        "event_timestamp",
        F.to_timestamp(F.col("event_timestamp_str"), "yyyy-MM-dd HH:mm:ss")
    ).withColumn(
        "estimated_delivery_date",
        F.to_timestamp(F.col("estimated_delivery_date_str"), "yyyy-MM-dd")
    ).withColumn(
        "actual_delivery_date",
        F.to_timestamp(F.col("actual_delivery_date_str"), "yyyy-MM-dd")
    ).withColumn(
        "event_type",
        F.when(F.col("event_type_raw").isin(["ORDER_PLACED", "ORDERED"]), "ORDER_PLACED")
        .when(F.col("event_type_raw").isin(["SHIPPED", "IN_TRANSIT", "DEPARTED", "ARRIVED_AT_HUB"]), "IN_TRANSIT")
        .when(F.col("event_type_raw").isin(["DELIVERED", "COMPLETED"]), "DELIVERED")
        .when(F.col("event_type_raw").isin(["CANCELED", "VOIDED"]), "CANCELED")
        .otherwise("OTHER")
    ).drop("event_timestamp_str", "event_type_raw", "estimated_delivery_date_str", "actual_delivery_date_str") # Drop intermediate columns

    return silver_df

Explanation:

  • Timestamp Conversion: We use F.to_timestamp to convert the string representations of event_timestamp, estimated_delivery_date, and actual_delivery_date into proper TIMESTAMP data types. This is crucial for time-series analysis and calculations.
  • Event Type Standardization: F.when().otherwise() is used to map various raw event_type_raw values to a standardized set of event_type values (e.g., ORDER_PLACED, IN_TRANSIT, DELIVERED, CANCELED, OTHER). This ensures consistency for downstream analytics.
  • New Expectation: Added future_event_timestamp_check to ensure event_timestamp is not in the future relative to processing_timestamp, which can indicate data quality issues or system clock discrepancies.
  • Dropping Intermediate Columns: drop("event_timestamp_str", "event_type_raw", ...) removes the temporary columns used for parsing and standardization, keeping the Silver schema clean.

This completes the DLT notebook for the Silver layer.

c) Testing This Component

To test the Silver layer, you need to run your DLT pipeline in Databricks.

  1. Navigate to your DLT pipeline: In your Databricks workspace, go to “Workflows” -> “Delta Live Tables”.

  2. Select your pipeline: Choose the DLT pipeline you configured in Chapter 3 (e.g., supply_chain_events_pipeline).

  3. Start/Update the pipeline: Click the “Start” or “Update” button.

    • If this is the first time running after adding the Silver table, DLT will detect the new table.
    • If the pipeline is already running, an “Update” will deploy the new logic.
  4. Monitor the pipeline: Observe the DLT UI. You should see the silver_supply_chain_events table being created and populated after bronze_supply_chain_events is processed.

  5. Verify Data in Silver Table: Once the pipeline runs successfully and data flows, you can query the silver_supply_chain_events table using a SQL notebook or Databricks SQL endpoint.

    Example SQL Query (in a new Databricks SQL/Notebook):

    SELECT * FROM supply_chain_uc_schema.silver_supply_chain_events LIMIT 100;
    

    (Replace supply_chain_uc_schema with your actual Unity Catalog schema name).

    You should see records with cleaned event types, proper timestamps, and the new processing_timestamp and _bronze_record_hash columns. Check if any records were dropped due to the dlt.expect_or_drop rules by inspecting the DLT pipeline’s event log or the DLT UI’s data quality tab.

Production Considerations

Error Handling for this Feature

  • DLT Expectations (dlt.expect_or_drop/expect_or_fail): These are the primary mechanisms for data quality enforcement.
    • expect_or_drop: Records failing the expectation are dropped and logged in the DLT event log. This is suitable for non-critical errors where dropping the record is acceptable.
    • expect_or_fail: If records fail, the entire pipeline update fails. Use this for critical data quality issues that must be resolved before data can proceed.
  • Error Tables: DLT automatically creates an _errors view for each table where expectations are defined, allowing you to inspect records that failed quality checks. You can query these to understand data quality issues.
    • Example: SELECT * FROM supply_chain_uc_schema.silver_supply_chain_events_errors
  • Schema Evolution: DLT handles schema evolution for Delta tables. If new fields appear in the Bronze layer, they won’t automatically break the Silver layer, but you’ll need to update your event_schema and select statements to include them.

Performance Optimization

  • Cluster Sizing: Ensure your DLT cluster is appropriately sized for the expected data volume and velocity. DLT’s Enhanced Autoscaling helps, but monitor cluster utilization.
  • APPLY CHANGES INTO (Future consideration for SCD): While not used in this specific Silver layer (which is append-only), for more complex Silver transformations involving Slowly Changing Dimensions (SCD) or upserts, APPLY CHANGES INTO can optimize performance by efficiently merging changes.
  • OPTIMIZE and VACUUM: DLT automatically manages OPTIMIZE for small files and VACUUM for data retention. Ensure appropriate retention policies are set for your Delta tables.
  • Predicate Pushdown: DLT leverages Spark’s optimizations, including predicate pushdown, when reading from Delta tables. Filtering data early can significantly improve performance.

Security Considerations

  • Unity Catalog Permissions: The most critical security aspect.
    • Ensure the service principal or user running the DLT pipeline has SELECT privileges on the Bronze table and CREATE TABLE, SELECT, MODIFY privileges on the target Unity Catalog schema and the Silver table.
    • Downstream users/applications consuming silver_supply_chain_events should only have SELECT privileges.
  • Data Masking/Row-Level Security: If your supply chain events contain sensitive information (e.g., specific customer details, precise locations), consider implementing data masking or row-level security (RLS) policies in Unity Catalog at the Silver or Gold layer. This would typically involve Unity Catalog functions or external tools. For this chapter, we assume no PII requiring masking, but it’s a critical production consideration.
  • Network Security: Ensure your Databricks workspace and DLT pipeline have network access only to necessary resources (e.g., Kafka, object storage).

Logging and Monitoring

  • DLT Event Log: DLT provides a comprehensive event log (accessible via the DLT UI or programmatic API) that captures pipeline events, data quality metrics, and errors. This is invaluable for monitoring and debugging.
  • Databricks Monitoring: Integrate DLT pipeline logs and metrics with your organization’s monitoring solutions (e.g., Azure Monitor, AWS CloudWatch, Splunk).
  • Custom Logging: For complex transformations or UDFs, you can add standard PySpark logging (spark.sparkContext.setLogLevel("INFO") and then spark.sparkContext.logInfo(...)) to gain deeper insights, though DLT’s event log covers most needs.

Code Review Checkpoint

At this point, you have successfully:

  • Extended your DLT pipeline (supply_chain_events_pipeline.py) to include a Silver layer.
  • Defined the silver_supply_chain_events table, reading from bronze_supply_chain_events.
  • Implemented robust data quality checks using dlt.expect_or_drop for critical fields.
  • Standardized event_type values for consistency.
  • Converted string timestamps to proper TIMESTAMP types.
  • Enriched records with processing_timestamp and _bronze_record_hash for auditability.
  • Configured the DLT pipeline to run and produce data into the Silver layer.

The modified file is:

  • databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py

This Silver layer now provides a clean, reliable foundation for building our delay analytics.

Common Issues & Solutions

  1. Issue: DLT Pipeline Fails or silver_supply_chain_events table is not created.

    • Reason: Syntax errors in the Python notebook, incorrect Unity Catalog permissions, or upstream Bronze table issues.
    • Debugging:
      • Check the DLT pipeline UI for detailed error messages. Click on the failed table or pipeline for logs.
      • Verify your Python syntax in the notebook.
      • Ensure the DLT cluster has SELECT permission on bronze_supply_chain_events and CREATE TABLE/MODIFY on the target schema for silver_supply_chain_events.
      • Confirm the bronze_supply_chain_events table is populating correctly from Kafka.
    • Prevention: Use Databricks Repos for version control and integrate with CI/CD for linting and unit testing DLT code before deployment.
  2. Issue: Records are being dropped from the Silver table unexpectedly.

    • Reason: Data quality expectations (dlt.expect_or_drop) are too strict or the incoming Bronze data has more quality issues than anticipated.
    • Debugging:
      • Query the DLT-generated error table: SELECT * FROM supply_chain_uc_schema.silver_supply_chain_events_errors to see which records failed and why.
      • Inspect the DLT pipeline’s event log for data quality metrics and dropped record counts.
      • Analyze the raw data in the Bronze layer to understand the nature of the data quality problems.
    • Prevention: Start with expect_or_drop for initial data quality to avoid pipeline failures. Gradually introduce expect_or_fail for truly critical data points once data quality is understood. Regularly monitor data quality metrics in DLT.
  3. Issue: Timestamps or event_type are incorrect after processing.

    • Reason: Incorrect date/time format string in F.to_timestamp or flawed logic in the F.when().otherwise() standardization.
    • Debugging:
      • Add temporary F.withColumn statements in your DLT code to output intermediate columns (e.g., event_timestamp_str, event_type_raw) and inspect their values before final conversion/standardization.
      • Verify the exact format of the incoming timestamp strings in the Bronze layer.
    • Prevention: Thoroughly test transformation logic with representative sample data. Use clear and descriptive column names during intermediate steps.

Testing & Verification

After successfully running the DLT pipeline, perform the following verification steps:

  1. Verify Silver Table Existence and Schema:

    • In a Databricks SQL editor or notebook, run:
      DESCRIBE TABLE supply_chain_uc_schema.silver_supply_chain_events;
      
    • Confirm that the schema matches our design, including event_id, shipment_id, event_type (standardized), event_timestamp (TIMESTAMP type), processing_timestamp, and _bronze_record_hash.
  2. Verify Data Quality and Transformations:

    • Query the silver_supply_chain_events table:
      SELECT
          event_id,
          shipment_id,
          tracking_number,
          event_type,
          event_timestamp,
          location_city,
          location_country,
          carrier,
          status_code,
          status_description,
          estimated_delivery_date,
          actual_delivery_date,
          processing_timestamp,
          _bronze_record_hash
      FROM supply_chain_uc_schema.silver_supply_chain_events
      WHERE event_type = 'IN_TRANSIT' -- Example filter
      LIMIT 50;
      
    • Check for:
      • Non-null values in event_id, shipment_id, tracking_number, event_type, event_timestamp.
      • Standardized event_type values (e.g., no “SHIPPED”, only “IN_TRANSIT”).
      • Correct TIMESTAMP format for all date/time fields.
      • processing_timestamp being a recent timestamp.
    • Query the error table to understand any dropped records:
      SELECT * FROM supply_chain_uc_schema.silver_supply_chain_events_errors LIMIT 50;
      
      This will show you any records that failed the DLT expectations.
  3. Verify Continuous Ingestion:

    • If your Kafka producer is still sending messages, observe the silver_supply_chain_events table growing over time. Run SELECT COUNT(*) FROM supply_chain_uc_schema.silver_supply_chain_events; multiple times over a few minutes to see the count increase.

By performing these checks, you can confidently verify that your Silver layer is correctly processing and refining the supply chain event data.

Summary & Next Steps

In this chapter, we successfully built the Silver layer for our real-time supply chain event data using Databricks Delta Live Tables. We’ve taken raw, potentially inconsistent data from the Bronze layer, applied essential data quality checks, standardized key fields like event_type, and enriched the dataset with crucial metadata like processing_timestamp and a record hash. This silver_supply_chain_events table now provides a clean, reliable, and analytics-ready foundation for all subsequent analyses, particularly for identifying and understanding supply chain delays.

This step is fundamental to building a robust data platform, ensuring that business decisions are based on high-quality, trustworthy data. The DLT framework, with its declarative approach and built-in data quality features, proved invaluable in achieving this.

In the next chapter, we will move to the Gold layer, where we will aggregate and transform this Silver data into a highly optimized, business-specific format for our core delay analytics. This will involve calculating key metrics, potentially joining with other reference data, and creating materialized views that are purpose-built for reporting and advanced analysis. Get ready to turn refined data into actionable insights!