Chapter 4: Refining Supply Chain Events for Delay Analytics (Silver Layer)
Chapter Introduction
Welcome to Chapter 4! In this chapter, we will elevate the raw supply chain event data ingested into our Bronze layer to a refined, clean, and structured Silver layer using Databricks Delta Live Tables (DLT). The Bronze layer, which we established in the previous chapter, serves as our landing zone for immutable raw data. Now, our focus shifts to transforming this raw data into a format suitable for downstream analytics, particularly for identifying and analyzing supply chain delays.
The Silver layer is crucial because it acts as a bridge between the messy, inconsistent raw data and the highly curated, business-ready data in the Gold layer. Here, we will apply data quality checks, standardize formats, enrich events with derived attributes, and filter out irrelevant or erroneous records. This transformation ensures that any analytics built upon this data, such as our delay analytics, are based on reliable and consistent information, preventing “garbage in, garbage out” scenarios.
From the previous chapter, we have a DLT pipeline ingesting real-time supply chain events from Kafka into our bronze_supply_chain_events table within Unity Catalog. By the end of this chapter, you will have extended this DLT pipeline to process data from the Bronze layer, apply robust data quality rules, and persist the refined data into a new silver_supply_chain_events table, ready for sophisticated analysis.
Planning & Design
Component Architecture
Our DLT pipeline will now consist of two main stages:
- Bronze Layer (Ingestion): Continuously ingests raw JSON messages from Kafka into `bronze_supply_chain_events` (completed in Chapter 3).
- Silver Layer (Refinement): Reads from `bronze_supply_chain_events`, performs data quality checks, standardization, and enrichment, then writes to `silver_supply_chain_events`.
This layered approach, often referred to as the Medallion Architecture (Bronze -> Silver -> Gold), provides clear separation of concerns, improves data governance, and facilitates easier troubleshooting and data reprocessing.
Silver Table Schema Design
The silver_supply_chain_events table will contain cleaned and enriched versions of our supply chain events. We’ll aim for a consistent schema that captures key information for delay analytics.
Here’s the proposed schema for silver_supply_chain_events:
| Field Name | Data Type | Description | Quality Expectation |
|---|---|---|---|
| event_id | STRING | Unique identifier for the event. | NOT NULL, UNIQUE |
| shipment_id | STRING | Identifier for the overall shipment. | NOT NULL |
| tracking_number | STRING | Tracking number for the package/consignment. | NOT NULL |
| event_type | STRING | Standardized type of event (e.g., ORDER_PLACED, IN_TRANSIT, DELIVERED). | NOT NULL, VALID_ENUM |
| event_timestamp | TIMESTAMP | Timestamp when the event occurred (UTC). | NOT NULL, VALID_DATE |
| location_city | STRING | City where the event took place. | NULLABLE |
| location_country | STRING | Country where the event took place. | NULLABLE |
| carrier | STRING | Shipping carrier responsible for the event. | NULLABLE |
| status_code | STRING | Carrier-specific status code. | NULLABLE |
| status_description | STRING | Human-readable description of the status. | NULLABLE |
| estimated_delivery_date | TIMESTAMP | Estimated delivery date (if available). | NULLABLE, VALID_DATE |
| actual_delivery_date | TIMESTAMP | Actual delivery date (if event type is DELIVERED). | NULLABLE, VALID_DATE |
| processing_timestamp | TIMESTAMP | Timestamp when the event was processed into the Silver layer (DLT). | NOT NULL |
| source_file | STRING | Source file/path in Bronze (for traceability). | NOT NULL |
| _bronze_record_hash | STRING | Hash of the original Bronze record for deduplication/auditing. | NOT NULL |
File Structure
We will continue to use the same DLT pipeline definition notebook for simplicity, adding the Silver layer transformations to it.
```
databricks_workspace/
└── supply_chain_project/
    └── dlt_pipelines/
        └── supply_chain_events_pipeline.py   <-- We will modify this file
```
Step-by-Step Implementation
We will extend our existing DLT Python notebook to include the Silver layer processing.
a) Setup/Configuration
Open your supply_chain_events_pipeline.py notebook in your Databricks workspace.
File Path: databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py
Ensure you have the following imports at the top (if not already present):
```python
# databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# The target catalog and schema come from the DLT pipeline configuration.
# For this tutorial, we assume the target schema is set in the DLT pipeline
# settings (for example, "supply_chain_uc_schema" under "Target Schema" in the
# DLT UI). The table names then resolve to
# `supply_chain_uc_schema.bronze_supply_chain_events` and
# `supply_chain_uc_schema.silver_supply_chain_events`.
```
b) Core Implementation
We will add a new DLT table definition for our Silver layer. This table will read from the bronze_supply_chain_events table, apply transformations, and enforce data quality.
Step 1: Define the Silver Layer Table with Initial Transformations
We’ll start by reading from the Bronze table and performing some basic parsing and selection of fields. We’ll also add a processing_timestamp and a hash of the raw content for traceability.
```python
# databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py

# ... (existing Bronze layer code from Chapter 3) ...

@dlt.table(
    name="silver_supply_chain_events",
    comment="Cleaned and enriched supply chain events, ready for delay analytics.",
    table_properties={"quality": "silver", "pipeline.type": "streaming"},
    path="/mnt/supply_chain_project/silver/supply_chain_events"  # Optional storage path; omit for UC managed tables
)
# NOTE: at this stage the output still exposes event_type_raw and
# event_timestamp_str; the expectations are tightened against the final
# standardized columns in Step 2.
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("valid_shipment_id", "shipment_id IS NOT NULL")
@dlt.expect_or_drop("valid_tracking_number", "tracking_number IS NOT NULL")
@dlt.expect_or_drop("valid_event_type", "event_type_raw IS NOT NULL AND event_type_raw != ''")
@dlt.expect_or_drop("valid_event_timestamp", "event_timestamp_str IS NOT NULL")
def silver_supply_chain_events():
    """
    Reads raw supply chain events from the Bronze layer, applies data quality checks,
    standardizes event types, and enriches the data for the Silver layer.
    """
    bronze_df = dlt.read_stream("bronze_supply_chain_events")

    # Parse the 'value' column (the raw JSON string) into a struct so we can access fields.
    event_schema = StructType([
        StructField("event_id", StringType(), True),
        StructField("shipment_id", StringType(), True),
        StructField("tracking_number", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("event_timestamp", StringType(), True),  # Keep as string for initial parsing
        StructField("location", StructType([
            StructField("city", StringType(), True),
            StructField("country", StringType(), True)
        ]), True),
        StructField("carrier", StringType(), True),
        StructField("status_code", StringType(), True),
        StructField("status_description", StringType(), True),
        StructField("estimated_delivery_date", StringType(), True),
        StructField("actual_delivery_date", StringType(), True)
    ])

    parsed_df = bronze_df.withColumn("parsed_value", F.from_json(F.col("value"), event_schema))

    # Select and rename fields, handling potential nulls from parsing
    silver_df = parsed_df.select(
        F.col("parsed_value.event_id").alias("event_id"),
        F.col("parsed_value.shipment_id").alias("shipment_id"),
        F.col("parsed_value.tracking_number").alias("tracking_number"),
        F.upper(F.trim(F.col("parsed_value.event_type"))).alias("event_type_raw"),  # Raw value kept for standardization
        F.col("parsed_value.event_timestamp").alias("event_timestamp_str"),  # Kept as string for validation
        F.col("parsed_value.location.city").alias("location_city"),
        F.col("parsed_value.location.country").alias("location_country"),
        F.col("parsed_value.carrier").alias("carrier"),
        F.col("parsed_value.status_code").alias("status_code"),
        F.col("parsed_value.status_description").alias("status_description"),
        F.col("parsed_value.estimated_delivery_date").alias("estimated_delivery_date_str"),
        F.col("parsed_value.actual_delivery_date").alias("actual_delivery_date_str"),
        F.current_timestamp().alias("processing_timestamp"),
        F.input_file_name().alias("source_file"),
        F.sha2(F.col("value"), 256).alias("_bronze_record_hash")  # Hash of the raw JSON for audit
    )

    # Return this initial DataFrame; we'll add more transformations in the next step.
    return silver_df
```
Explanation:
- `@dlt.table(name="silver_supply_chain_events", ...)`: This decorator defines a new DLT table with a descriptive name, comment, and properties. The `path` argument is optional but good practice for specifying where the Delta files are stored (in our Unity Catalog managed external location).
- `@dlt.expect_or_drop(...)`: DLT's built-in data quality expectations. If a record fails an expectation, it is dropped from the Silver table — a critical production practice for ensuring data quality. We define expectations for `event_id`, `shipment_id`, `tracking_number`, the event type, and the event timestamp.
- `dlt.read_stream("bronze_supply_chain_events")`: Reads data continuously from our Bronze table. DLT automatically handles state management and exactly-once processing.
- `F.from_json(F.col("value"), event_schema)`: Parses the raw JSON string from the `value` column into a structured format based on our defined `event_schema`.
- `select(...)`: Extracts fields from the parsed JSON, using `alias` to rename them to our desired Silver schema names.
- `F.upper(F.trim(F.col("parsed_value.event_type")))`: Basic cleansing of `event_type` — trims whitespace and converts to uppercase for standardization.
- `F.current_timestamp().alias("processing_timestamp")`: Adds a timestamp indicating when the record was processed into the Silver layer.
- `F.input_file_name().alias("source_file")`: Captures the source file path from the Bronze layer for auditability.
- `F.sha2(F.col("value"), 256).alias("_bronze_record_hash")`: Creates a SHA-256 hash of the original raw JSON string, useful for detecting duplicate records, auditing, or comparing changes to the raw data over time.
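The record-hash idea is easy to see outside Spark. The snippet below is a minimal pure-Python analogue of `F.sha2(F.col("value"), 256)`; the sample payloads are hypothetical, not taken from the pipeline:

```python
import hashlib

def bronze_record_hash(raw_json: str) -> str:
    """SHA-256 hex digest of the raw payload (analogue of F.sha2(value, 256))."""
    return hashlib.sha256(raw_json.encode("utf-8")).hexdigest()

# Hypothetical raw Bronze payloads:
a = '{"event_id": "e1", "event_type": "SHIPPED"}'
b = '{"event_id": "e1", "event_type": "SHIPPED"}'    # exact duplicate of a
c = '{"event_id": "e1", "event_type": "DELIVERED"}'  # same event_id, different content

print(bronze_record_hash(a) == bronze_record_hash(b))  # True: identical bytes -> identical hash
print(bronze_record_hash(a) == bronze_record_hash(c))  # False: any byte change -> different hash
```

Note that the hash is sensitive to byte-level differences (key order, whitespace), so it detects exact duplicates of the raw payload, not semantic duplicates.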
Step 2: Standardize Event Types and Convert Timestamps
Raw data often has inconsistent event type strings. We need to map them to a canonical set of types for consistent analytics. We also need to convert our timestamp strings to actual TIMESTAMP types.
```python
# databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py

# This version REPLACES the Step 1 definition of silver_supply_chain_events
# (a DLT pipeline may define each table only once).

@dlt.table(
    name="silver_supply_chain_events",
    comment="Cleaned and enriched supply chain events, ready for delay analytics.",
    table_properties={"quality": "silver", "pipeline.type": "streaming"},
    path="/mnt/supply_chain_project/silver/supply_chain_events"
)
@dlt.expect_or_drop("valid_event_id", "event_id IS NOT NULL")
@dlt.expect_or_drop("valid_shipment_id", "shipment_id IS NOT NULL")
@dlt.expect_or_drop("valid_tracking_number", "tracking_number IS NOT NULL")
@dlt.expect_or_drop("valid_event_type", "event_type IS NOT NULL AND event_type != ''")
@dlt.expect_or_drop("valid_event_timestamp", "event_timestamp IS NOT NULL")
@dlt.expect_or_drop("future_event_timestamp_check", "event_timestamp <= processing_timestamp")  # Events should not be in the future
def silver_supply_chain_events():
    """
    Reads raw supply chain events from the Bronze layer, applies data quality checks,
    standardizes event types, and enriches the data for the Silver layer.
    """
    bronze_df = dlt.read_stream("bronze_supply_chain_events")

    event_schema = StructType([
        StructField("event_id", StringType(), True),
        StructField("shipment_id", StringType(), True),
        StructField("tracking_number", StringType(), True),
        StructField("event_type", StringType(), True),
        StructField("event_timestamp", StringType(), True),
        StructField("location", StructType([
            StructField("city", StringType(), True),
            StructField("country", StringType(), True)
        ]), True),
        StructField("carrier", StringType(), True),
        StructField("status_code", StringType(), True),
        StructField("status_description", StringType(), True),
        StructField("estimated_delivery_date", StringType(), True),
        StructField("actual_delivery_date", StringType(), True)
    ])

    parsed_df = bronze_df.withColumn("parsed_value", F.from_json(F.col("value"), event_schema))

    silver_df = parsed_df.select(
        F.col("parsed_value.event_id").alias("event_id"),
        F.col("parsed_value.shipment_id").alias("shipment_id"),
        F.col("parsed_value.tracking_number").alias("tracking_number"),
        F.upper(F.trim(F.col("parsed_value.event_type"))).alias("event_type_raw"),
        F.col("parsed_value.event_timestamp").alias("event_timestamp_str"),
        F.col("parsed_value.location.city").alias("location_city"),
        F.col("parsed_value.location.country").alias("location_country"),
        F.col("parsed_value.carrier").alias("carrier"),
        F.col("parsed_value.status_code").alias("status_code"),
        F.col("parsed_value.status_description").alias("status_description"),
        F.col("parsed_value.estimated_delivery_date").alias("estimated_delivery_date_str"),
        F.col("parsed_value.actual_delivery_date").alias("actual_delivery_date_str"),
        F.current_timestamp().alias("processing_timestamp"),
        F.input_file_name().alias("source_file"),
        F.sha2(F.col("value"), 256).alias("_bronze_record_hash")
    ).withColumn(
        "event_timestamp",
        # Adjust the format string if your producer emits a different pattern (e.g., ISO-8601 with 'T')
        F.to_timestamp(F.col("event_timestamp_str"), "yyyy-MM-dd HH:mm:ss")
    ).withColumn(
        "estimated_delivery_date",
        F.to_timestamp(F.col("estimated_delivery_date_str"), "yyyy-MM-dd")
    ).withColumn(
        "actual_delivery_date",
        F.to_timestamp(F.col("actual_delivery_date_str"), "yyyy-MM-dd")
    ).withColumn(
        "event_type",
        F.when(F.col("event_type_raw").isin(["ORDER_PLACED", "ORDERED"]), "ORDER_PLACED")
         .when(F.col("event_type_raw").isin(["SHIPPED", "IN_TRANSIT", "DEPARTED", "ARRIVED_AT_HUB"]), "IN_TRANSIT")
         .when(F.col("event_type_raw").isin(["DELIVERED", "COMPLETED"]), "DELIVERED")
         .when(F.col("event_type_raw").isin(["CANCELED", "VOIDED"]), "CANCELED")
         .otherwise("OTHER")
    ).drop(  # Drop intermediate columns
        "event_timestamp_str", "event_type_raw",
        "estimated_delivery_date_str", "actual_delivery_date_str"
    )

    return silver_df
```
Explanation:
- Timestamp Conversion: We use `F.to_timestamp` to convert the string representations of `event_timestamp`, `estimated_delivery_date`, and `actual_delivery_date` into proper `TIMESTAMP` data types. This is crucial for time-series analysis and calculations.
- Event Type Standardization: `F.when().otherwise()` maps the raw `event_type_raw` values to a standardized set of `event_type` values (`ORDER_PLACED`, `IN_TRANSIT`, `DELIVERED`, `CANCELED`, `OTHER`). This ensures consistency for downstream analytics.
- New Expectation: `future_event_timestamp_check` ensures `event_timestamp` is not later than `processing_timestamp`; future-dated events usually indicate data quality issues or system clock discrepancies.
- Dropping Intermediate Columns: `drop("event_timestamp_str", "event_type_raw", ...)` removes the temporary columns used for parsing and standardization, keeping the Silver schema clean.
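To reason about the standardization and timestamp-parsing rules in isolation, here is a pure-Python sketch of the same logic. The function names and sample values are illustrative only — they are not part of the pipeline code:

```python
from datetime import datetime

# Same mapping as the F.when().otherwise() chain: raw value -> canonical event_type.
EVENT_TYPE_MAP = {
    "ORDER_PLACED": "ORDER_PLACED", "ORDERED": "ORDER_PLACED",
    "SHIPPED": "IN_TRANSIT", "IN_TRANSIT": "IN_TRANSIT",
    "DEPARTED": "IN_TRANSIT", "ARRIVED_AT_HUB": "IN_TRANSIT",
    "DELIVERED": "DELIVERED", "COMPLETED": "DELIVERED",
    "CANCELED": "CANCELED", "VOIDED": "CANCELED",
}

def standardize_event_type(raw: str) -> str:
    """Trim + uppercase, then map to the canonical set; unknown values become OTHER."""
    return EVENT_TYPE_MAP.get(raw.strip().upper(), "OTHER")

def parse_event_timestamp(ts: str) -> datetime:
    """Python's %Y-%m-%d %H:%M:%S corresponds to Spark's 'yyyy-MM-dd HH:mm:ss' pattern."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")

print(standardize_event_type("  shipped "))         # IN_TRANSIT
print(standardize_event_type("LOST_IN_WAREHOUSE"))  # OTHER
print(parse_event_timestamp("2024-05-01 13:45:00").isoformat())  # 2024-05-01T13:45:00
```

A dictionary-based mapping like this is also a convenient unit-test target before translating the rules into `F.when()` chains.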
This completes the DLT notebook for the Silver layer.
c) Testing This Component
To test the Silver layer, you need to run your DLT pipeline in Databricks.
1. Navigate to your DLT pipeline: In your Databricks workspace, go to "Workflows" -> "Delta Live Tables".
2. Select your pipeline: Choose the DLT pipeline you configured in Chapter 3 (e.g., `supply_chain_events_pipeline`).
3. Start/Update the pipeline: Click the "Start" or "Update" button.
   - If this is the first run after adding the Silver table, DLT will detect the new table.
   - If the pipeline is already running, an "Update" will deploy the new logic.
4. Monitor the pipeline: Observe the DLT UI. You should see the `silver_supply_chain_events` table being created and populated after `bronze_supply_chain_events` is processed.
5. Verify data in the Silver table: Once the pipeline runs successfully and data flows, query the `silver_supply_chain_events` table from a SQL notebook or Databricks SQL endpoint:

```sql
SELECT * FROM supply_chain_uc_schema.silver_supply_chain_events LIMIT 100;
```

(Replace `supply_chain_uc_schema` with your actual Unity Catalog schema name.) You should see records with cleaned event types, proper timestamps, and the new `processing_timestamp` and `_bronze_record_hash` columns. Check whether any records were dropped by the `dlt.expect_or_drop` rules by inspecting the DLT pipeline's event log or the DLT UI's data quality tab.
Production Considerations
Error Handling for this Feature
- DLT Expectations (`dlt.expect_or_drop` / `dlt.expect_or_fail`): These are the primary mechanisms for data quality enforcement.
  - `expect_or_drop`: Records failing the expectation are dropped, and the failure counts are recorded in the DLT event log. Suitable for non-critical errors where dropping the record is acceptable.
  - `expect_or_fail`: If records fail, the entire pipeline update fails. Use this for critical data quality issues that must be resolved before data can proceed.
- Expectation Metrics and Quarantine: DLT records pass/fail counts for each expectation in the pipeline event log, surfaced in the DLT UI's data quality tab. The dropped records themselves are not persisted automatically; a common pattern is to define an additional "quarantine" table that selects the rows violating your expectations so you can inspect them.
- Schema Evolution: DLT handles schema evolution for Delta tables. If new fields appear in the Bronze layer, they won't automatically break the Silver layer, but you'll need to update your `event_schema` and `select` statements to include them.
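The drop-versus-fail semantics can be modeled in a few lines of plain Python. This is a toy model for intuition only — it is not the DLT API, and the expectation names and sample records are invented:

```python
def apply_expectations(records, expectations, mode="drop"):
    """Toy model of DLT expectations: each expectation is a (name, predicate) pair."""
    passed, quarantined = [], []
    for rec in records:
        violations = [name for name, pred in expectations if not pred(rec)]
        if violations:
            quarantined.append({"record": rec, "violations": violations})
        else:
            passed.append(rec)
    # expect_or_fail semantics: any violation aborts the whole update.
    if mode == "fail" and quarantined:
        raise ValueError(f"{len(quarantined)} record(s) violated expectations")
    return passed, quarantined

expectations = [
    ("valid_event_id", lambda r: r.get("event_id") is not None),
    ("valid_event_type", lambda r: bool(r.get("event_type"))),
]
records = [
    {"event_id": "e1", "event_type": "DELIVERED"},
    {"event_id": None, "event_type": "SHIPPED"},   # fails valid_event_id
]

passed, quarantined = apply_expectations(records, expectations, mode="drop")
print(len(passed), len(quarantined))  # 1 1
```

Calling the same function with `mode="fail"` raises instead of returning, mirroring how `expect_or_fail` stops the pipeline update.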
Performance Optimization
- Cluster Sizing: Ensure your DLT cluster is appropriately sized for the expected data volume and velocity. DLT’s Enhanced Autoscaling helps, but monitor cluster utilization.
- `APPLY CHANGES INTO` (future consideration for SCD): While not used in this append-only Silver layer, for more complex transformations involving Slowly Changing Dimensions (SCD) or upserts, `APPLY CHANGES INTO` can optimize performance by efficiently merging changes.
- `OPTIMIZE` and `VACUUM`: DLT automatically manages `OPTIMIZE` for small files and `VACUUM` for data retention. Ensure appropriate retention policies are set for your Delta tables.
- Predicate Pushdown: DLT leverages Spark's optimizations, including predicate pushdown, when reading from Delta tables. Filtering data early can significantly improve performance.
Security Considerations
- Unity Catalog Permissions: The most critical security aspect.
  - Ensure the service principal or user running the DLT pipeline has `SELECT` privileges on the Bronze table and `CREATE TABLE`, `SELECT`, and `MODIFY` privileges on the target Unity Catalog schema and the Silver table.
  - Downstream users/applications consuming `silver_supply_chain_events` should only have `SELECT` privileges.
- Data Masking/Row-Level Security: If your supply chain events contain sensitive information (e.g., specific customer details, precise locations), consider implementing data masking or row-level security (RLS) policies in Unity Catalog at the Silver or Gold layer. This would typically involve Unity Catalog functions or external tools. For this chapter, we assume no PII requiring masking, but it’s a critical production consideration.
- Network Security: Ensure your Databricks workspace and DLT pipeline have network access only to necessary resources (e.g., Kafka, object storage).
Logging and Monitoring
- DLT Event Log: DLT provides a comprehensive event log (accessible via the DLT UI or programmatic API) that captures pipeline events, data quality metrics, and errors. This is invaluable for monitoring and debugging.
- Databricks Monitoring: Integrate DLT pipeline logs and metrics with your organization’s monitoring solutions (e.g., Azure Monitor, AWS CloudWatch, Splunk).
- Custom Logging: For complex transformations or UDFs, you can use Python's standard `logging` module in your pipeline code to gain deeper insights (messages appear in the driver logs), though DLT's event log covers most needs.
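As a sketch of that custom-logging approach — standard Python `logging`, not a DLT-specific API, with an illustrative helper name — you might flag unexpected event types during standardization:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("silver_transformations")

CANONICAL_TYPES = {"ORDER_PLACED", "IN_TRANSIT", "DELIVERED", "CANCELED"}

def standardize_with_logging(raw_type: str) -> str:
    """Trim/uppercase the raw type; log a warning whenever it falls through to OTHER."""
    cleaned = raw_type.strip().upper()
    if cleaned in CANONICAL_TYPES:
        return cleaned
    logger.warning("Unrecognized event type %r mapped to OTHER", raw_type)
    return "OTHER"

print(standardize_with_logging(" delivered "))  # DELIVERED
print(standardize_with_logging("lost"))         # OTHER (plus a warning in the log)
```

Keep in mind that logging inside a UDF runs on the executors, so those messages land in executor logs rather than the driver log.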
Code Review Checkpoint
At this point, you have successfully:
- Extended your DLT pipeline (`supply_chain_events_pipeline.py`) to include a Silver layer.
- Defined the `silver_supply_chain_events` table, reading from `bronze_supply_chain_events`.
- Implemented robust data quality checks using `dlt.expect_or_drop` for critical fields.
- Standardized `event_type` values for consistency.
- Converted string timestamps to proper `TIMESTAMP` types.
- Enriched records with `processing_timestamp` and `_bronze_record_hash` for auditability.
- Configured the DLT pipeline to run and produce data into the Silver layer.
The modified file is:
databricks_workspace/supply_chain_project/dlt_pipelines/supply_chain_events_pipeline.py
This Silver layer now provides a clean, reliable foundation for building our delay analytics.
Common Issues & Solutions
Issue: DLT pipeline fails or the `silver_supply_chain_events` table is not created.
- Reason: Syntax errors in the Python notebook, incorrect Unity Catalog permissions, or upstream Bronze table issues.
- Debugging:
  - Check the DLT pipeline UI for detailed error messages; click the failed table or pipeline for logs.
  - Verify your Python syntax in the notebook.
  - Ensure the DLT cluster has `SELECT` permission on `bronze_supply_chain_events` and `CREATE TABLE`/`MODIFY` on the target schema for `silver_supply_chain_events`.
  - Confirm the `bronze_supply_chain_events` table is populating correctly from Kafka.
- Prevention: Use Databricks Repos for version control and integrate with CI/CD for linting and unit testing DLT code before deployment.
Issue: Records are being dropped from the Silver table unexpectedly.
- Reason: Data quality expectations (`dlt.expect_or_drop`) are too strict, or the incoming Bronze data has more quality issues than anticipated.
- Debugging:
  - Inspect the DLT pipeline's event log and the data quality tab for per-expectation pass/fail metrics and dropped record counts.
  - Analyze the raw data in the Bronze layer to understand the nature of the data quality problems.
- Prevention: Start with `expect_or_drop` for initial data quality to avoid pipeline failures. Gradually introduce `expect_or_fail` for truly critical data points once data quality is understood. Regularly monitor data quality metrics in DLT.
Issue: Timestamps or `event_type` are incorrect after processing.
- Reason: Incorrect date/time format string in `F.to_timestamp`, or flawed logic in the `F.when().otherwise()` standardization.
- Debugging:
  - Temporarily keep the intermediate columns (e.g., `event_timestamp_str`, `event_type_raw`) in the output and inspect their values before final conversion/standardization.
  - Verify the exact format of the incoming timestamp strings in the Bronze layer.
- Prevention: Thoroughly test transformation logic with representative sample data. Use clear and descriptive column names during intermediate steps.
Testing & Verification
After successfully running the DLT pipeline, perform the following verification steps:
1. Verify Silver table existence and schema:
   - In a Databricks SQL editor or notebook, run:

     ```sql
     DESCRIBE TABLE supply_chain_uc_schema.silver_supply_chain_events;
     ```

   - Confirm that the schema matches our design, including `event_id`, `shipment_id`, `event_type` (standardized), `event_timestamp` (TIMESTAMP type), `processing_timestamp`, and `_bronze_record_hash`.
2. Verify data quality and transformations:
   - Query the `silver_supply_chain_events` table:

     ```sql
     SELECT event_id, shipment_id, tracking_number, event_type, event_timestamp,
            location_city, location_country, carrier, status_code, status_description,
            estimated_delivery_date, actual_delivery_date, processing_timestamp, _bronze_record_hash
     FROM supply_chain_uc_schema.silver_supply_chain_events
     WHERE event_type = 'IN_TRANSIT' -- Example filter
     LIMIT 50;
     ```

   - Check for:
     - Non-null values in `event_id`, `shipment_id`, `tracking_number`, `event_type`, and `event_timestamp`.
     - Standardized `event_type` values (e.g., no "SHIPPED", only "IN_TRANSIT").
     - Correct `TIMESTAMP` values for all date/time fields.
     - A recent `processing_timestamp`.
   - Review the DLT UI's data quality tab (or the pipeline event log) to see how many records failed each expectation and were dropped.
3. Verify continuous ingestion:
   - If your Kafka producer is still sending messages, observe the `silver_supply_chain_events` table growing over time. Run `SELECT COUNT(*) FROM supply_chain_uc_schema.silver_supply_chain_events;` several times over a few minutes to see the count increase.
By performing these checks, you can confidently verify that your Silver layer is correctly processing and refining the supply chain event data.
Summary & Next Steps
In this chapter, we successfully built the Silver layer for our real-time supply chain event data using Databricks Delta Live Tables. We’ve taken raw, potentially inconsistent data from the Bronze layer, applied essential data quality checks, standardized key fields like event_type, and enriched the dataset with crucial metadata like processing_timestamp and a record hash. This silver_supply_chain_events table now provides a clean, reliable, and analytics-ready foundation for all subsequent analyses, particularly for identifying and understanding supply chain delays.
This step is fundamental to building a robust data platform, ensuring that business decisions are based on high-quality, trustworthy data. The DLT framework, with its declarative approach and built-in data quality features, proved invaluable in achieving this.
In the next chapter, we will move to the Gold layer, where we will aggregate and transform this Silver data into a highly optimized, business-specific format for our core delay analytics. This will involve calculating key metrics, potentially joining with other reference data, and creating materialized views that are purpose-built for reporting and advanced analysis. Get ready to turn refined data into actionable insights!