Chapter 9: Building the Customs Trade Data Lakehouse & HS Code Validation
Welcome to Chapter 9 of our real-time supply chain project! In this chapter, we will lay the foundation for intelligent customs trade data analysis by building a robust Data Lakehouse. Specifically, we’ll focus on ingesting and preparing customs declaration data, establishing a master data repository for HS (Harmonized System) codes, and setting up initial data quality validation using Databricks Delta Live Tables (DLT).
This step is critical because accurate HS code classification is paramount for correct tariff application, trade compliance, and avoiding costly penalties. By centralizing and validating this data within a structured Lakehouse environment, we create a reliable source for our tariff impact analysis and anomaly detection systems. We’ll leverage DLT’s declarative nature to build resilient, self-managing data pipelines that transform raw, semi-structured customs data into clean, conformed datasets.
From previous chapters, you should have your Databricks environment configured and a landing zone for raw data (e.g., from Kafka topics written to cloud storage). We’ll assume raw customs declarations are landing in a designated cloud storage path. By the end of this chapter, you will have a fully functional Bronze and Silver layer for your customs trade data, ready for advanced HS code validation and anomaly detection in subsequent chapters.
Planning & Design
Our Data Lakehouse will adhere to the Medallion Architecture, providing a structured approach to data processing:
- Bronze Layer: This layer will store raw, immutable data exactly as it’s ingested from external sources. It acts as a historical archive and a source of truth for all raw data. For customs declarations, this means capturing the raw JSON or CSV payloads.
- Silver Layer: In this layer, data from the Bronze layer is cleaned, parsed, standardized, and enriched. We’ll apply data quality rules, transform data types, and select relevant fields. This layer is designed to be a reliable source for analytics and downstream applications.
- Gold Layer (Future): This layer will house highly aggregated and business-specific data, optimized for reporting, dashboards, and machine learning models. We will build this in later chapters.
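The Bronze-to-Silver flow described above can be sketched as a toy pure-Python model (illustrative only; the real implementation later in this chapter uses PySpark and DLT, and the field names follow the schemas defined below):

```python
import json
from datetime import datetime, timezone

def to_bronze(raw_json: str) -> dict:
    # Bronze: keep the payload exactly as received, plus ingestion metadata.
    return {
        "_raw_data": raw_json,
        "_ingestion_timestamp": datetime.now(timezone.utc).isoformat(),
    }

def to_silver(bronze_record: dict) -> dict:
    # Silver: parse the raw payload, standardize types, keep a link to Bronze.
    payload = json.loads(bronze_record["_raw_data"])
    return {
        "declaration_id": payload["declaration_id"],
        "declared_hs_code": payload["declared_hs_code"],
        "value_usd": round(float(payload["value"]), 2),
        "_bronze_ingestion_timestamp": bronze_record["_ingestion_timestamp"],
    }

raw = '{"declaration_id": "DCL001", "declared_hs_code": "847130", "value": "125000.00"}'
silver = to_silver(to_bronze(raw))
print(silver["declaration_id"])  # DCL001
```

The key property to notice: Silver never re-reads the source system, only Bronze, so any Silver logic change can be replayed from the raw archive.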
Component Architecture for Customs Trade Data
Database Schema (Conceptual)
bronze_customs_declarations (Streaming Live Table)
- _raw_data: STRING (raw JSON payload)
- _ingestion_timestamp: TIMESTAMP (when the record was ingested into Bronze)
- _source_file: STRING (path to the source file, if applicable)
silver_customs_declarations (Streaming Live Table)
- declaration_id: STRING (unique identifier for the declaration)
- product_description: STRING (description of the product)
- declared_hs_code: STRING (HS code provided in the declaration)
- importer_exporter_name: STRING (name of the entity, potentially masked)
- declaration_date: DATE (date of the customs declaration)
- value_usd: DECIMAL(18,2) (value of goods in USD)
- currency: STRING (original currency)
- country_of_origin: STRING
- country_of_destination: STRING
- source_system: STRING (e.g., "Kafka_Customs_Feed")
- processing_timestamp: TIMESTAMP (when the record was processed into Silver)
- _bronze_ingestion_timestamp: TIMESTAMP (link to the Bronze record)
silver_hs_code_master (Live Table - Batch)
- hs_code: STRING (the official HS code, e.g., 6-digit or 10-digit)
- description: STRING (official description of the HS code)
- tariff_rate_import: DECIMAL(5,2) (current import tariff rate)
- tariff_rate_export: DECIMAL(5,2) (current export tariff rate)
- effective_date: DATE (when the tariff rate became effective)
- end_date: DATE (when the tariff rate expired, or NULL if current)
- source_agency: STRING (e.g., "WCO", "USITC")
- last_updated: TIMESTAMP
File Structure
We’ll organize our DLT pipeline code within a Databricks Repo.
├── src/
│ ├── data_lakehouse/
│ │ ├── customs/
│ │ │ ├── dlt_pipelines/
│ │ │ │ └── customs_dlt_pipeline.py # Main DLT pipeline definition
│ │ │ └── schemas/
│ │ │ └── customs_schemas.py # Pydantic/StructType schemas
│ ├── common/
│ │ └── utils.py # Common utilities like logging
├── notebooks/
│ └── dlt_pipeline_runner.ipynb # Notebook to trigger DLT pipeline
├── conf/
│ └── dlt_config.json # DLT pipeline configuration
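The `conf/dlt_config.json` file can hold the pipeline settings you would otherwise enter by hand in the DLT UI. A minimal sketch of generating it (the catalog/schema names and paths are placeholders, and the exact JSON keys accepted by the Databricks Pipelines API should be verified against your workspace version):

```python
import json

# Illustrative settings only: the top-level keys (name, continuous, target,
# libraries, configuration) follow the Databricks Pipelines API shape, but
# verify the accepted fields against your workspace before relying on this.
dlt_config = {
    "name": "Customs_Trade_Lakehouse_Pipeline",
    "continuous": True,
    "target": "customs_lakehouse",
    "libraries": [
        {"file": {"path": "src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py"}}
    ],
    "configuration": {
        "RAW_CUSTOMS_LANDING_PATH": "/Volumes/main/default/raw_customs_data",
        "HS_CODE_MASTER_PATH": "/Volumes/main/default/hs_code_master_data/hs_codes.csv",
        "CURRENCY_EXCHANGE_RATES_PATH": "/Volumes/main/default/currency_exchange_rates/latest.csv",
    },
}

config_json = json.dumps(dlt_config, indent=2)
print(config_json)
```

Keeping these settings in the repo makes the pipeline definition reviewable and reproducible across environments.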
Step-by-Step Implementation
a) Setup/Configuration
First, ensure you have a Databricks workspace and a Databricks Repo set up. Clone your repo and create the file structure as outlined above.
We’ll start by defining the schemas for our customs data. This helps enforce data quality and provides clarity.
File: src/data_lakehouse/customs/schemas/customs_schemas.py
# src/data_lakehouse/customs/schemas/customs_schemas.py
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DecimalType, DateType
# Schema for raw customs declaration data (Bronze layer)
# Assuming raw data is JSON strings, we'll parse it in DLT
raw_customs_declaration_schema = StructType([
    StructField("declaration_id", StringType(), True),
    StructField("product_description", StringType(), True),
    StructField("declared_hs_code", StringType(), True),
    StructField("importer_exporter_name", StringType(), True),
    StructField("declaration_date", StringType(), True),  # Raw string, will be parsed to DateType
    StructField("value", StringType(), True),  # Raw string, will be parsed to DecimalType
    StructField("currency", StringType(), True),
    StructField("country_of_origin", StringType(), True),
    StructField("country_of_destination", StringType(), True),
    StructField("source_system", StringType(), True)
])

# Schema for the Silver layer customs declarations
silver_customs_declaration_schema = StructType([
    StructField("declaration_id", StringType(), False),  # Not nullable
    StructField("product_description", StringType(), True),
    StructField("declared_hs_code", StringType(), True),
    StructField("importer_exporter_name", StringType(), True),
    StructField("declaration_date", DateType(), True),
    StructField("value_usd", DecimalType(18, 2), True),
    StructField("currency", StringType(), True),
    StructField("country_of_origin", StringType(), True),
    StructField("country_of_destination", StringType(), True),
    StructField("source_system", StringType(), True),
    StructField("processing_timestamp", TimestampType(), False),
    StructField("_bronze_ingestion_timestamp", TimestampType(), False)
])

# Schema for HS Code Master Data (Silver layer)
hs_code_master_schema = StructType([
    StructField("hs_code", StringType(), False),
    StructField("description", StringType(), True),
    StructField("tariff_rate_import", DecimalType(5, 2), True),
    StructField("tariff_rate_export", DecimalType(5, 2), True),
    StructField("effective_date", DateType(), True),
    StructField("end_date", DateType(), True),
    StructField("source_agency", StringType(), True),
    StructField("last_updated", TimestampType(), True)
])
Next, create the main DLT pipeline file.
File: src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py
# src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py
import dlt
from pyspark.sql.functions import col, current_timestamp, from_json, to_date, regexp_replace, lit, coalesce, round, expr
from pyspark.sql.types import StringType, DecimalType
# Import custom schemas
from data_lakehouse.customs.schemas.customs_schemas import (
raw_customs_declaration_schema,
silver_customs_declaration_schema,
hs_code_master_schema
)
# Input paths: read from the DLT pipeline configuration (set in the pipeline
# settings UI or conf/dlt_config.json), with defaults for local development.
# In production, DLT pipeline parameters are preferred over hardcoded paths.
RAW_CUSTOMS_LANDING_PATH = spark.conf.get("RAW_CUSTOMS_LANDING_PATH", "/Volumes/main/default/raw_customs_data")  # Example Unity Catalog volume path
HS_CODE_MASTER_PATH = spark.conf.get("HS_CODE_MASTER_PATH", "/Volumes/main/default/hs_code_master_data/hs_codes.csv")  # Example Unity Catalog volume path
CURRENCY_EXCHANGE_RATES_PATH = spark.conf.get("CURRENCY_EXCHANGE_RATES_PATH", "/Volumes/main/default/currency_exchange_rates/latest.csv")  # Example Unity Catalog volume path
# --- Bronze Layer: Raw Ingestion ---
@dlt.table(
    name="bronze_customs_declarations",
    comment="Raw customs declaration data ingested from landing zone.",
    table_properties={"quality": "bronze"}
)
@dlt.expect_all_or_drop({"valid_declaration_id": "declaration_id IS NOT NULL"})
def bronze_customs_declarations():
    """
    Ingests raw JSON customs declaration data from a cloud storage landing zone
    into the Bronze layer.
    """
    # For production, RAW_CUSTOMS_LANDING_PATH should be configured as a DLT pipeline parameter.
    # We assume raw JSON files are landing here, e.g., from a Kafka Connect S3 sink.
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaLocation", f"{RAW_CUSTOMS_LANDING_PATH}/_schemas/bronze_customs_declarations")
        .load(RAW_CUSTOMS_LANDING_PATH)
        .withColumn("_ingestion_timestamp", current_timestamp())
        # input_file_name() is not supported with Unity Catalog; the _metadata
        # column is the recommended way to capture the source file path.
        .withColumn("_source_file", col("_metadata.file_path"))
    )

    # Parse the raw JSON string into its structured fields.
    # We use a temporary column 'parsed_data' to extract the fields.
    parsed_df = (
        df.withColumn("parsed_data", from_json(col("_raw_data"), raw_customs_declaration_schema))
        .select(
            col("parsed_data.declaration_id").alias("declaration_id"),
            col("parsed_data.product_description").alias("product_description"),
            col("parsed_data.declared_hs_code").alias("declared_hs_code"),
            col("parsed_data.importer_exporter_name").alias("importer_exporter_name"),
            col("parsed_data.declaration_date").alias("declaration_date"),
            col("parsed_data.value").alias("value"),
            col("parsed_data.currency").alias("currency"),
            col("parsed_data.country_of_origin").alias("country_of_origin"),
            col("parsed_data.country_of_destination").alias("country_of_destination"),
            col("parsed_data.source_system").alias("source_system"),
            col("_ingestion_timestamp"),
            col("_source_file")
        )
    )
    return parsed_df
# --- Silver Layer: Cleaned and Conformed Data ---
@dlt.table(
    name="silver_customs_declarations",
    comment="Cleaned and conformed customs declaration data.",
    table_properties={"quality": "silver"}
)
@dlt.expect("valid_hs_code_format", "declared_hs_code IS NOT NULL AND LENGTH(declared_hs_code) >= 6")
@dlt.expect("positive_value", "value_usd >= 0")
@dlt.expect_or_drop("valid_declaration_date", "declaration_date IS NOT NULL")
def silver_customs_declarations():
    """
    Transforms raw customs declaration data from the Bronze layer into a cleaned,
    conformed Silver layer table.
    """
    # Read incrementally from the Bronze layer.
    bronze_df = dlt.read_stream("bronze_customs_declarations")

    # Read the latest currency exchange rates (assumed to be a small, frequently updated CSV/Delta table).
    # For a real-time system, this might come from a streaming source or a dedicated DLT table.
    # For simplicity, we treat it as a batch load in this chapter.
    exchange_rates_df = (
        spark.read
        .format("csv")
        .option("header", "true")
        .load(CURRENCY_EXCHANGE_RATES_PATH)
        .select(
            col("currency_code").alias("from_currency"),
            col("usd_rate").cast(DecimalType(10, 4)).alias("usd_rate")
        )
    )

    # Perform cleaning and transformations.
    transformed_df = (
        bronze_df.withColumn("processing_timestamp", current_timestamp())
        .withColumn("declaration_date", to_date(col("declaration_date"), "yyyy-MM-dd"))
        # Strip stray characters but keep digits, the decimal point, and the minus
        # sign, so negative values survive to be caught by the positive_value expectation.
        .withColumn("cleaned_value",
                    regexp_replace(col("value"), "[^0-9.-]", "").cast(DecimalType(18, 2)))
        .join(exchange_rates_df, bronze_df.currency == exchange_rates_df.from_currency, "left")
        # Convert to USD; fall back to the cleaned value if no exchange rate was found.
        .withColumn("value_usd", coalesce(round(col("cleaned_value") * col("usd_rate"), 2), col("cleaned_value")))
        .select(
            col("declaration_id"),
            col("product_description"),
            col("declared_hs_code"),
            col("importer_exporter_name"),
            col("declaration_date"),
            col("value_usd"),
            col("currency"),
            col("country_of_origin"),
            col("country_of_destination"),
            col("source_system"),
            col("processing_timestamp"),
            col("_ingestion_timestamp").alias("_bronze_ingestion_timestamp")
        )
    )
    return transformed_df
# --- Silver Layer: HS Code Master Data ---
@dlt.table(
    name="silver_hs_code_master",
    comment="Master data for HS codes, descriptions, and tariff rates.",
    table_properties={"quality": "silver"}
)
@dlt.expect_all_or_fail({"unique_hs_code": "hs_code IS NOT NULL",
                         "valid_tariff_rates": "tariff_rate_import >= 0 AND tariff_rate_export >= 0"})
def silver_hs_code_master():
    """
    Loads HS code master data from a static CSV/Parquet file into the Silver layer.
    This table serves as a reference for validation and enrichment.
    """
    # For production, HS_CODE_MASTER_PATH should be configured as a DLT pipeline parameter.
    # This is typically a batch load, refreshed periodically.
    df = (
        spark.read
        .format("csv")  # Or "delta"/"parquet", depending on the source
        .option("header", "true")
        .option("inferSchema", "false")  # Explicit schema for production
        .schema(hs_code_master_schema)
        .load(HS_CODE_MASTER_PATH)
        .withColumn("last_updated", current_timestamp())
    )
    return df
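The heart of the Silver transformation, stripping non-numeric characters and converting to USD with a fallback when no rate exists, can be checked outside Spark with a small pure-Python equivalent. This is a sketch of the logic only, not the DLT code; note that the character class keeps the minus sign, which the Spark `regexp_replace` pattern must also do if negative declared values are to reach the `positive_value` expectation:

```python
import re
from decimal import Decimal, ROUND_HALF_UP

# Sample rates matching the latest.csv example used in this chapter.
RATES = {"USD": Decimal("1.0000"), "EUR": Decimal("1.0850"), "GBP": Decimal("1.2600")}

def to_usd(raw_value: str, currency: str) -> Decimal:
    # Mirrors regexp_replace + cast to DECIMAL(18,2): keep digits, the
    # decimal point, and the minus sign; drop everything else.
    cleaned = Decimal(re.sub(r"[^0-9.-]", "", raw_value))
    rate = RATES.get(currency)
    if rate is None:
        # coalesce() fallback: no exchange rate, keep the cleaned original value.
        return cleaned.quantize(Decimal("0.01"))
    return (cleaned * rate).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

print(to_usd("125,000.00", "USD"))  # 125000.00
print(to_usd("50000.00", "XXX"))    # 50000.00 (fallback, no rate)
print(to_usd("-500.00", "USD"))     # -500.00 (negative preserved)
```

Unit-testing the cleaning rules like this, before they run inside a streaming pipeline, makes regressions much cheaper to catch.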
b) Explanation of the DLT Pipeline
- Imports: We import `dlt` for the DLT decorators, `pyspark.sql.functions` for data transformations, and our custom schemas.
- Configuration: `RAW_CUSTOMS_LANDING_PATH`, `HS_CODE_MASTER_PATH`, and `CURRENCY_EXCHANGE_RATES_PATH` are placeholders. In a real production setup, these are passed as parameters in your DLT pipeline configuration, making the pipeline flexible across environments (dev, staging, prod).
- `bronze_customs_declarations` (Bronze layer):
  - `@dlt.table`: Defines this function as a DLT table. `name` is the name of the Delta table in Unity Catalog (e.g., `main.default.bronze_customs_declarations`); `comment` and `table_properties` provide metadata for discoverability and governance.
  - `@dlt.expect_all_or_drop`: A crucial data quality constraint. If `declaration_id` is null, the entire row is dropped, and DLT logs the dropped records. This ensures only records with a basic identifier proceed.
  - `spark.readStream.format("cloudFiles")`: Databricks Auto Loader, highly optimized for incremental processing of files landing in cloud storage. It automatically tracks processed files, handles schema evolution, and is fault-tolerant.
  - `option("cloudFiles.schemaLocation")`: Specifies where Auto Loader stores its schema inference and evolution files.
  - `.withColumn("_ingestion_timestamp", current_timestamp())`: Adds a timestamp for when the record entered the Bronze layer.
  - `.withColumn("_source_file", ...)`: Captures the original file path for auditing.
  - `from_json`: Parses the raw JSON string (the `_raw_data` column) into a structured format using `raw_customs_declaration_schema`.
- `silver_customs_declarations` (Silver layer):
  - `@dlt.expect`, `@dlt.expect_or_drop`: More specific data quality checks. `valid_hs_code_format` ensures `declared_hs_code` is not null and has a minimum length (e.g., 6 digits for the international standard); if violated, the row is marked invalid but still processed (not dropped, since this uses the default policy). `positive_value` ensures `value_usd` is not negative. `valid_declaration_date` drops the row if `declaration_date` is null after conversion.
  - `dlt.read_stream("bronze_customs_declarations")`: Reads incrementally from our Bronze table, ensuring continuous processing.
  - Currency conversion: We join with a static (for now) `exchange_rates_df` to convert `value` to `value_usd`. In a real-world scenario, `CURRENCY_EXCHANGE_RATES_PATH` might point to another DLT table or a real-time feed.
  - `to_date`, `regexp_replace`, `cast`: Standard PySpark functions for data cleaning and type conversion. `regexp_replace` strips stray non-numeric characters from the `value` field before casting to Decimal.
  - `coalesce`: Used for `value_usd` to handle cases where currency conversion fails (e.g., a missing exchange rate), falling back to the original cleaned value.
- `silver_hs_code_master` (Silver layer, master data):
  - A regular batch read (no `dlt.read_stream`) because this is typically a batch load of static or periodically updated master data.
  - `@dlt.expect_all_or_fail`: If any of these expectations (non-null `hs_code`, valid tariff rates) is violated, the entire pipeline fails, signalling a critical issue with the master data itself. This is stricter than `_or_drop`.
  - `spark.read.format("csv")`: Reads the master data from a CSV file. For production, set `inferSchema` to `"false"` and provide the schema explicitly to prevent surprises.
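The three expectation policies differ only in what happens on a violation. A toy pure-Python model of the semantics (not the `dlt` API itself):

```python
def apply_expectation(rows, predicate, policy):
    """Toy model of DLT expectation policies:
    'warn' ~ @dlt.expect         -> keep all rows, record violations
    'drop' ~ @dlt.expect_or_drop -> drop violating rows
    'fail' ~ @dlt.expect_or_fail -> abort on any violation
    """
    violations = [r for r in rows if not predicate(r)]
    if policy == "fail" and violations:
        raise ValueError(f"{len(violations)} row(s) violated the expectation")
    kept = rows if policy == "warn" else [r for r in rows if predicate(r)]
    return kept, len(violations)

rows = [{"declared_hs_code": "847130"}, {"declared_hs_code": "123"}]

def valid_hs(r):
    return r["declared_hs_code"] is not None and len(r["declared_hs_code"]) >= 6

print(apply_expectation(rows, valid_hs, "warn")[0] == rows)  # True: all rows kept, violation logged
print(len(apply_expectation(rows, valid_hs, "drop")[0]))     # 1: violating row dropped
```

In the real pipeline, the "violations" counter corresponds to the data quality metrics surfaced in the DLT UI.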
c) Testing This Component
To test this DLT pipeline, you’ll need to:
Upload Sample Data:
- Create a sample `raw_customs_data.json` file.
- Create a sample `hs_codes.csv` file.
- Create a sample `latest.csv` for currency exchange rates.
- Upload these files to the Unity Catalog volumes or cloud storage paths specified in your DLT pipeline (`RAW_CUSTOMS_LANDING_PATH`, `HS_CODE_MASTER_PATH`, `CURRENCY_EXCHANGE_RATES_PATH`).

Example `raw_customs_data.json` (upload this as a file to `RAW_CUSTOMS_LANDING_PATH`; one JSON record per line, each carrying its payload again as an escaped string in `_raw_data`, which is the column the Bronze table parses):

{"declaration_id": "DCL001", "product_description": "Laptop Computers", "declared_hs_code": "847130", "importer_exporter_name": "Tech Corp", "declaration_date": "2025-10-01", "value": "125000.00", "currency": "USD", "country_of_origin": "CN", "country_of_destination": "US", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL001\", \"product_description\": \"Laptop Computers\", \"declared_hs_code\": \"847130\", \"importer_exporter_name\": \"Tech Corp\", \"declaration_date\": \"2025-10-01\", \"value\": \"125000.00\", \"currency\": \"USD\", \"country_of_origin\": \"CN\", \"country_of_destination\": \"US\", \"source_system\": \"Kafka_Customs_Feed\"}"}
{"declaration_id": "DCL002", "product_description": "Smartphones", "declared_hs_code": "851712", "importer_exporter_name": "Mobile Inc", "declaration_date": "2025-10-02", "value": "200000.00", "currency": "EUR", "country_of_origin": "VN", "country_of_destination": "DE", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL002\", \"product_description\": \"Smartphones\", \"declared_hs_code\": \"851712\", \"importer_exporter_name\": \"Mobile Inc\", \"declaration_date\": \"2025-10-02\", \"value\": \"200000.00\", \"currency\": \"EUR\", \"country_of_origin\": \"VN\", \"country_of_destination\": \"DE\", \"source_system\": \"Kafka_Customs_Feed\"}"}
{"declaration_id": "DCL003", "product_description": "Raw Materials", "declared_hs_code": "280300", "importer_exporter_name": "Chem Co", "declaration_date": "2025-10-03", "value": "50000.00", "currency": "GBP", "country_of_origin": "IN", "country_of_destination": "UK", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL003\", \"product_description\": \"Raw Materials\", \"declared_hs_code\": \"280300\", \"importer_exporter_name\": \"Chem Co\", \"declaration_date\": \"2025-10-03\", \"value\": \"50000.00\", \"currency\": \"GBP\", \"country_of_origin\": \"IN\", \"country_of_destination\": \"UK\", \"source_system\": \"Kafka_Customs_Feed\"}"}
{"declaration_id": "DCL004", "product_description": "Invalid HS Code", "declared_hs_code": "123", "importer_exporter_name": "Bad Co", "declaration_date": "2025-10-04", "value": "1000.00", "currency": "USD", "country_of_origin": "US", "country_of_destination": "CA", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL004\", \"product_description\": \"Invalid HS Code\", \"declared_hs_code\": \"123\", \"importer_exporter_name\": \"Bad Co\", \"declaration_date\": \"2025-10-04\", \"value\": \"1000.00\", \"currency\": \"USD\", \"country_of_origin\": \"US\", \"country_of_destination\": \"CA\", \"source_system\": \"Kafka_Customs_Feed\"}"}
{"declaration_id": "DCL005", "product_description": "Negative Value", "declared_hs_code": "900000", "importer_exporter_name": "Neg Co", "declaration_date": "2025-10-05", "value": "-500.00", "currency": "USD", "country_of_origin": "US", "country_of_destination": "CA", "source_system": "Kafka_Customs_Feed", "_raw_data": "{\"declaration_id\": \"DCL005\", \"product_description\": \"Negative Value\", \"declared_hs_code\": \"900000\", \"importer_exporter_name\": \"Neg Co\", \"declaration_date\": \"2025-10-05\", \"value\": \"-500.00\", \"currency\": \"USD\", \"country_of_origin\": \"US\", \"country_of_destination\": \"CA\", \"source_system\": \"Kafka_Customs_Feed\"}"}

Example `hs_codes.csv` (upload this to `HS_CODE_MASTER_PATH`):

hs_code,description,tariff_rate_import,tariff_rate_export,effective_date,end_date,source_agency
847130,Automatic data processing machines,0.00,0.00,2024-01-01,,WCO
851712,Telephones for cellular networks,0.00,0.00,2024-01-01,,WCO
280300,Carbon (carbon blacks and other forms of carbon),0.00,0.00,2024-01-01,,WCO
900000,General goods,5.00,0.00,2024-01-01,,WCO

Example `latest.csv` (for currency exchange rates; upload this to `CURRENCY_EXCHANGE_RATES_PATH`):

currency_code,usd_rate,last_updated
USD,1.0000,2025-12-20
EUR,1.0850,2025-12-20
GBP,1.2600,2025-12-20
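Because each sample line must embed its own payload again under `_raw_data`, hand-writing test records is error-prone. A small helper (hypothetical, only for generating test files) produces that shape:

```python
import json

def make_declaration_line(record: dict) -> str:
    # Embed the payload again as an escaped JSON string under _raw_data,
    # since the Bronze table parses that column with from_json.
    line = dict(record)
    line["_raw_data"] = json.dumps(record)
    return json.dumps(line)

# Hypothetical extra record for testing; field values are illustrative.
rec = {"declaration_id": "DCL006", "declared_hs_code": "847130",
       "declaration_date": "2025-10-06", "value": "9999.00", "currency": "USD"}
line = make_declaration_line(rec)
inner = json.loads(json.loads(line)["_raw_data"])
print(inner["declaration_id"])  # DCL006
```

Writing a batch of such lines to a new file in the landing path is an easy way to exercise the running pipeline.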
Create a DLT Pipeline in Databricks:
- Navigate to “Workflows” -> “Delta Live Tables” in your Databricks workspace.
- Click “Create Pipeline”.
- Pipeline Name: `Customs_Trade_Lakehouse_Pipeline`
- Pipeline Mode: `Continuous` (for real-time ingestion)
- Source Libraries: Click "Add pipeline library", select "Workspace file", and navigate to `src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py`.
- Target: Specify a target catalog and schema (e.g., catalog `main`, schema `customs_lakehouse`). Ensure Unity Catalog is enabled and you have write permissions to this schema.
- Storage Location: (Optional but recommended for production) Specify a cloud storage path where DLT can store checkpointing and schema information.
- Advanced Options:
  - Configuration: Add the following key-value pairs to pass the paths to your DLT pipeline. Replace `catalog_name.schema_name` with your actual Unity Catalog path.
    - `RAW_CUSTOMS_LANDING_PATH`: `/Volumes/catalog_name/schema_name/raw_customs_data`
    - `HS_CODE_MASTER_PATH`: `/Volumes/catalog_name/schema_name/hs_code_master_data/hs_codes.csv`
    - `CURRENCY_EXCHANGE_RATES_PATH`: `/Volumes/catalog_name/schema_name/currency_exchange_rates/latest.csv`
- Cluster Policy: (Optional) Select a cluster policy for cost control and standardization.
- Photon: Enable Photon for improved performance.
- Enhanced Autoscaling: Enable for optimal resource utilization.
- Click “Create”.
Start the DLT Pipeline:
- Once created, select your pipeline and click “Start”.
- Monitor the pipeline in the DLT UI. It will show the graph of tables and their status (Initializing, Running, Updating).
Verify Data:
- After the pipeline runs successfully, go to the SQL Editor or browse your Unity Catalog.
- Query the created tables:
  SELECT * FROM main.customs_lakehouse.bronze_customs_declarations;
  SELECT * FROM main.customs_lakehouse.silver_customs_declarations;
  SELECT * FROM main.customs_lakehouse.silver_hs_code_master;
- Check for the expected number of rows, data types, and transformations.
- Observe how the `DCL004` (invalid HS code format) and `DCL005` (negative value) records are handled by the `EXPECT` clauses in `silver_customs_declarations`. Since these two expectations use the default policy (not `_or_drop`), the rows remain in the table, while the DLT quality metrics record the violations.
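The `valid_hs_code_format` expectation is just a predicate, so the expected handling of `DCL004` can be predicted in plain Python (a sketch mirroring the SQL expression, not the DLT runtime):

```python
def valid_hs_code_format(code):
    # Mirrors the SQL expression:
    # declared_hs_code IS NOT NULL AND LENGTH(declared_hs_code) >= 6
    return code is not None and len(code) >= 6

print(valid_hs_code_format("847130"))  # True
print(valid_hs_code_format("123"))     # False -> DCL004 is flagged but kept
print(valid_hs_code_format(None))      # False
```

A length check is deliberately loose; stricter validation against the HS code master table follows in the next chapter.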
Production Considerations
- Error Handling:
  - DLT Expectations: We've implemented `EXPECT` and `EXPECT_ALL_OR_DROP` for `bronze_customs_declarations` and `silver_customs_declarations`. For `silver_hs_code_master`, we used `EXPECT_ALL_OR_FAIL` to ensure the integrity of critical master data.
  - Dead Letter Queues (DLQs): For records dropped by `_or_drop` expectations, DLT provides visibility in the UI. For more robust error handling, consider a custom DLQ strategy in which malformed records are written to a separate Delta table for manual review and reprocessing, i.e., an additional DLT table definition that captures the records failing the main table's expectations.
  - Try-Except Blocks: For complex transformations outside DLT's declarative expectations, Python's try-except blocks can handle specific data parsing errors.
- Performance Optimization:
- DLT Serverless: Utilize Databricks Serverless DLT for hands-off infrastructure management and optimized cost-performance.
- Enhanced Autoscaling: DLT pipelines automatically scale resources up or down based on workload, but ensure Enhanced Autoscaling is enabled for optimal performance.
- Photon: Enable Photon runtime for all DLT pipelines to accelerate Spark workloads.
  - Small File Compaction: Delta Lake handles small-file compaction via `OPTIMIZE`; DLT pipelines often run this automatically.
  - Z-Ordering: For large tables with frequent filtering on specific columns (e.g., `declaration_date`, `declared_hs_code`), consider applying `ZORDER` to improve query performance. While DLT can automate `OPTIMIZE`, `ZORDER` may need explicit configuration or a separate maintenance job.
- Security Considerations:
  - Unity Catalog: All tables are managed by Unity Catalog, providing centralized governance. Ensure appropriate grants are set for users and service principals accessing or modifying these tables (e.g., `SELECT` on Silver, `MODIFY` on Bronze/Silver for the DLT service principal).
  - Data Masking/Tokenization: `importer_exporter_name` and other sensitive fields in `silver_customs_declarations` might require masking or tokenization before being exposed to broader analytical teams. This can be implemented as an additional transformation step in the Silver layer or through Unity Catalog's row/column-level security.
  - Least Privilege: Grant DLT service principals and users only the permissions they need to read source data and write to target tables.
- Logging and Monitoring:
- DLT UI: The DLT UI provides comprehensive logs, data quality metrics, and lineage information.
- Databricks Monitoring: Integrate DLT logs with Databricks monitoring tools (e.g., Ganglia, Spark UI) and external monitoring solutions (e.g., Azure Monitor, AWS CloudWatch, Splunk) for real-time alerts on pipeline failures, performance degradation, or data quality violations.
  - Custom Logging: For debugging complex transformations, add standard Python `logging` statements within your DLT functions.
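For the masking option mentioned under Security Considerations, one simple approach is deterministic tokenization of `importer_exporter_name` in the Silver layer. A sketch using a salted SHA-256 hash (the salt handling, token format, and function name are illustrative assumptions; Unity Catalog column masks are the managed alternative):

```python
import hashlib

def tokenize_name(name: str, salt: str = "rotate-me") -> str:
    # Deterministic token: the same input always maps to the same token, so
    # joins and group-bys still work, but the raw name is not exposed.
    # In practice the salt would live in a secret scope, not in code.
    digest = hashlib.sha256((salt + name).encode("utf-8")).hexdigest()
    return f"ENT_{digest[:12]}"

print(tokenize_name("Tech Corp") == tokenize_name("Tech Corp"))  # True
print(tokenize_name("Tech Corp").startswith("ENT_"))             # True
```

In PySpark this could be applied as a UDF or, better, with the built-in `sha2` function plus `concat`, avoiding the UDF overhead.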
Code Review Checkpoint
At this checkpoint, we have successfully implemented the initial layers of our Customs Trade Data Lakehouse.
Summary of what was built:
- Bronze Layer (`bronze_customs_declarations`): A streaming DLT table ingesting raw customs declaration data from a cloud storage landing zone, ensuring basic data quality for `declaration_id`.
- Silver Layer (`silver_customs_declarations`): A streaming DLT table transforming the Bronze data: cleaning, type conversion, currency conversion to USD, and more rigorous data quality checks for HS code format, positive values, and valid declaration dates.
- HS Code Master Data (`silver_hs_code_master`): A batch DLT table loading a reference dataset of HS codes and their associated tariffs, with strict quality checks to ensure master data integrity.
Files created/modified:
- src/data_lakehouse/customs/schemas/customs_schemas.py
- src/data_lakehouse/customs/dlt_pipelines/customs_dlt_pipeline.py
How it integrates with existing code:
This chapter leverages the raw data ingestion mechanisms we established in earlier chapters (e.g., Kafka writing to cloud storage). The DLT pipelines now pick up this raw data, process it, and store it in a structured, governed format within Delta Lake tables managed by Unity Catalog. These Silver layer tables will serve as the foundation for the tariff impact analysis and anomaly detection components we build next.
Common Issues & Solutions
Issue: `cloudFiles.schemaLocation` permissions error.
- Symptom: DLT pipeline fails to start or process files, with errors related to accessing the schema location.
- Reason: The DLT service principal (or the user running the pipeline) does not have sufficient permissions to create or write to the specified `cloudFiles.schemaLocation` path in your cloud storage or Unity Catalog volume.
- Solution: Ensure the identity used by your DLT pipeline can read, write, and create files at the `schemaLocation` path. For Unity Catalog volumes, this means `READ VOLUME` and `WRITE VOLUME` privileges on the volume.
Issue: Data quality expectations causing pipeline failures or unexpected drops.
- Symptom: The DLT pipeline fails, or many rows are dropped even though the raw data looks "mostly" okay.
- Reason: Your `EXPECT` clauses might be too strict for the incoming data quality, or the transformations preceding the `EXPECT` are not robust enough. For example, `to_date` returns NULL if the date format isn't consistently `yyyy-MM-dd`, which then trips `valid_declaration_date`.
- Solution:
  - Review `EXPECT` policies: Understand the difference between `_or_drop`, `_or_fail`, and the default (metrics only). Adjust as needed.
  - Refine transformations: Add more robust cleaning steps before applying expectations (e.g., `try_cast` or `coalesce` with default values).
  - Inspect dropped records: Use the DLT UI to view data quality metrics and sample dropped records to understand the exact violations.
  - Staged expectations: Start with plain `EXPECT` (metrics only) to gather data quality insights without blocking the pipeline, then gradually introduce `_or_drop` or `_or_fail` as data quality improves.
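The "refine transformations" advice, e.g., tolerating several date formats before `valid_declaration_date` fires, can be prototyped in plain Python before porting the winning format list to the pipeline (a sketch; in Spark the analogous tools are `try_cast` and `to_date`'s null-on-failure behaviour):

```python
from datetime import date, datetime

def try_parse_date(raw, formats=("%Y-%m-%d", "%d/%m/%Y", "%Y%m%d")):
    # Return a date if any candidate format matches, else None (like a
    # try_cast yielding NULL), so a downstream expect_or_drop can act on it.
    if raw is None:
        return None
    for fmt in formats:
        try:
            return datetime.strptime(raw, fmt).date()
        except ValueError:
            continue
    return None

print(try_parse_date("2025-10-01"))  # 2025-10-01
print(try_parse_date("not-a-date"))  # None
```

Running such a parser over a sample of real declarations quickly reveals which formats actually occur in the feed.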
Issue: Schema evolution not handled correctly.
- Symptom: New fields in incoming raw data are not appearing in the Bronze or Silver tables, or the pipeline fails due to a schema mismatch.
- Reason: While Auto Loader (`cloudFiles`) has robust schema evolution capabilities, sometimes explicit handling is needed, especially if schema changes are complex or involve nested structures.
- Solution:
  - Auto Loader defaults: `cloudFiles` performs schema inference and evolution by default; ensure `cloudFiles.schemaLocation` is correctly configured.
  - `cloudFiles.schemaHints`: For specific fields, you can provide type hints to Auto Loader.
  - Downstream resilience: If you expect schema changes to flow from Bronze into Silver (or you later adopt CDC-style `APPLY CHANGES INTO`), re-evaluate how explicit schema definitions affect downstream tables, and ensure your DLT `SELECT` statements are resilient to upstream schema changes (e.g., wrap a possibly-absent field in `coalesce` so it yields null instead of failing).
Testing & Verification
To thoroughly test and verify the work in this chapter:
- Run the DLT Pipeline in `Continuous` Mode: This ensures it's configured for real-time processing and can handle new incoming data.
- Ingest More Sample Data: Add new JSON files to your `RAW_CUSTOMS_LANDING_PATH` (e.g., another set of 5-10 declarations, including some with known data quality issues) while the pipeline is running. Observe the DLT UI for new updates and successful processing.
- Monitor Data Quality Metrics: In the DLT UI, click each table (`bronze_customs_declarations`, `silver_customs_declarations`, `silver_hs_code_master`) and review the "Data Quality" tab.
  - Verify that `bronze_customs_declarations` shows 100% adherence for `valid_declaration_id` (since we used `_or_drop`).
  - For `silver_customs_declarations`, check the `valid_hs_code_format` and `positive_value` expectations. The records with the "123" HS code and "-500" value should show violations, but the pipeline should not fail (unless configured `_or_fail`). `valid_declaration_date` should show 100% if all dates were valid strings.
  - For `silver_hs_code_master`, all expectations should show 100% adherence; otherwise, the pipeline would have failed.
- Query Data for Integrity:
  -- Verify Bronze layer
  SELECT COUNT(*) FROM main.customs_lakehouse.bronze_customs_declarations;
  SELECT * FROM main.customs_lakehouse.bronze_customs_declarations WHERE declaration_id IS NULL; -- Should return 0 rows if _or_drop worked
  -- Verify Silver customs layer
  SELECT COUNT(*) FROM main.customs_lakehouse.silver_customs_declarations;
  SELECT * FROM main.customs_lakehouse.silver_customs_declarations WHERE declared_hs_code LIKE '123%'; -- Present, but the DLT UI shows a quality violation
  SELECT * FROM main.customs_lakehouse.silver_customs_declarations WHERE value_usd < 0; -- Present, but the DLT UI shows a quality violation
  SELECT * FROM main.customs_lakehouse.silver_customs_declarations WHERE currency = 'EUR'; -- Check that USD conversion is correct
  SELECT declaration_id, declaration_date, value_usd, _bronze_ingestion_timestamp, processing_timestamp FROM main.customs_lakehouse.silver_customs_declarations;
  -- Verify HS code master layer
  SELECT COUNT(*) FROM main.customs_lakehouse.silver_hs_code_master;
  SELECT * FROM main.customs_lakehouse.silver_hs_code_master WHERE hs_code = '847130';
- Check for Schema Evolution: If you introduce a new field in a subsequent raw JSON file, verify that `bronze_customs_declarations` automatically updates its schema.
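The EUR spot-check above can be pre-computed by hand: with the sample rate of 1.0850 from `latest.csv`, DCL002's 200000.00 EUR should land as 217000.00 USD (assuming the sample exchange-rate file shown earlier in this chapter):

```python
from decimal import Decimal

# DCL002: 200000.00 EUR at the sample rate of 1.0850 USD per EUR.
value_eur = Decimal("200000.00")
usd_rate = Decimal("1.0850")
expected_usd = (value_eur * usd_rate).quantize(Decimal("0.01"))
print(expected_usd)  # 217000.00
```

If the queried `value_usd` for DCL002 differs from this figure, the join key or the rate file is the first place to look.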
If all checks pass, your Customs Trade Data Lakehouse is successfully established with core ingestion and data quality mechanisms!
Summary & Next Steps
In this comprehensive chapter, we successfully built the foundational layers of our Customs Trade Data Lakehouse using Databricks Delta Live Tables. We implemented:
- A Bronze layer for raw, immutable customs declarations, leveraging Auto Loader for efficient streaming ingestion.
- A Silver layer for cleaned, transformed, and currency-converted customs declarations, applying robust data quality expectations.
- A Silver layer for HS Code Master Data, crucial for future validation and enrichment.
This robust Lakehouse structure, governed by Unity Catalog and powered by DLT, provides a single source of truth for our customs trade data. It’s now clean, standardized, and ready for advanced analytics.
In the next chapter, we will leverage this prepared data to implement HS Code Classification Validation and Anomaly Detection. This will involve comparing declared HS codes against historical patterns and master data to identify potential misclassifications or suspicious activities, further enhancing our supply chain intelligence.