# Chapter 6: Ingesting & Harmonizing HS Code and Tariff Data
### Chapter Introduction
In the intricate world of global supply chains, accurate and timely information on Harmonized System (HS) codes and associated tariffs is paramount. These codes classify traded goods, determining duties, taxes, and trade policies. In this chapter, we will build a robust data pipeline using Databricks Delta Live Tables (DLT) to ingest, cleanse, and harmonize raw HS Code and tariff data into our Customs Trade Data Lakehouse.
The primary objective of this step is to establish a reliable source of truth for all trade-related classifications and duties. By automating the ingestion and standardization process, we ensure that downstream analytics for tariff impact analysis, logistics cost monitoring, and anomaly detection are based on high-quality, consistent data. We will move data from raw landing zones into a structured bronze layer and then into a cleaned, validated silver layer, ready for analytical consumption.
**Prerequisites:** Before proceeding, ensure you have:
- A configured Databricks workspace with Unity Catalog enabled.
- Basic understanding of Delta Live Tables concepts.
- Access to create DLT pipelines and managed tables within Unity Catalog.
- (Optional but recommended) A storage location (e.g., S3 bucket, ADLS Gen2 container) mounted or accessible to your Databricks workspace, where raw HS Code and tariff data files can be placed. For this tutorial, we will use Unity Catalog volume paths (under `/Volumes/`) as a stand-in for external cloud storage.
**Expected Outcome:** By the end of this chapter, you will have a fully functional DLT pipeline that continuously processes raw HS Code and tariff data, applies data quality checks and transformations, and stores the harmonized data in Unity Catalog managed Delta tables. These tables will serve as the foundation for our real-time tariff impact analysis.
### Planning & Design
Building a production-ready data pipeline requires careful planning. Here, we outline the architecture, data schemas, and file organization for our HS Code and Tariff data ingestion.
#### Component Architecture
We will implement a Medallion Architecture pattern using Delta Live Tables:
- Raw Data Landing Zone: This is where raw, untransformed data files (e.g., CSVs, JSONs) for HS codes and tariffs will first land. We’ll simulate this with DBFS paths.
- Bronze Layer (Raw Ingest): A DLT pipeline will read data from the landing zone and land it into bronze Delta tables. These tables capture the raw data as-is, along with metadata like processing timestamps and source file names. This layer provides a complete, immutable history of ingested data.
- Silver Layer (Cleaned & Harmonized): Another DLT pipeline will read from the bronze tables, apply data cleaning, standardization, and validation rules (using DLT Expectations). The resulting clean, harmonized data will be stored in silver Delta tables, ready for analytical use.
#### Database Schema Design
We’ll define schemas for our bronze and silver tables. For simplicity and flexibility, we’ll start with schema inference for bronze and then explicitly define and refine schemas for silver tables. All tables will be managed under Unity Catalog.
**Unity Catalog Schema Name:** `supply_chain_analytics` (or a name you prefer from previous chapters)

**1. Bronze Layer Tables:** These tables capture the raw data with minimal transformation, preserving the original structure and adding metadata.

`bronze_hs_codes`
* `raw_data`: `STRING` (or `STRUCT` if using `cloudFiles.format("json")`), captures the entire raw record.
* `processing_timestamp`: `TIMESTAMP`, when the record was processed.
* `source_file`: `STRING`, the path to the original source file.
* `hs_code`: `STRING`, raw HS code from source.
* `description`: `STRING`, raw description.
* `effective_date`: `STRING`, raw effective date.
* `end_date`: `STRING`, raw end date.
* `country_code`: `STRING`, raw country code.
* `source_system`: `STRING`, raw source system identifier.
* `_rescued_data`: `STRING` (for schema evolution/corruption handling).

`bronze_tariffs`
* `raw_data`: `STRING` (or `STRUCT`), captures the entire raw record.
* `processing_timestamp`: `TIMESTAMP`, when the record was processed.
* `source_file`: `STRING`, the path to the original source file.
* `tariff_id`: `STRING`, raw tariff ID.
* `hs_code`: `STRING`, raw HS code.
* `country_of_origin`: `STRING`, raw country of origin.
* `country_of_import`: `STRING`, raw country of import.
* `tariff_rate`: `STRING`, raw tariff rate.
* `effective_date`: `STRING`, raw effective date.
* `end_date`: `STRING`, raw end date.
* `tariff_type`: `STRING`, raw tariff type.
* `currency`: `STRING`, raw currency.
* `unit_of_measure`: `STRING`, raw unit of measure.
* `source_system`: `STRING`, raw source system identifier.
* `_rescued_data`: `STRING` (for schema evolution/corruption handling).
**2. Silver Layer Tables:** These tables contain cleaned, standardized, and validated data, ready for direct consumption by analysts and downstream applications.

`silver_hs_codes`
* `hs_code`: `STRING` (NOT NULL), standardized HS code.
* `description`: `STRING` (NOT NULL), cleaned description.
* `effective_date`: `DATE` (NOT NULL), parsed and validated effective date.
* `end_date`: `DATE` (NOT NULL), parsed and validated end date.
* `country_code`: `STRING` (NOT NULL), standardized country code (e.g., ISO 2-letter).
* `source_system`: `STRING`, source system identifier.
* `is_active`: `BOOLEAN`, flag indicating if the record is currently active based on dates.
* `last_updated`: `TIMESTAMP`, when this record was last updated in the silver table.

`silver_tariffs`
* `tariff_id`: `STRING` (NOT NULL), unique tariff identifier.
* `hs_code`: `STRING` (NOT NULL), standardized HS code (foreign key to `silver_hs_codes`).
* `country_of_origin`: `STRING` (NOT NULL), standardized country of origin.
* `country_of_import`: `STRING` (NOT NULL), standardized country of import.
* `tariff_rate`: `DECIMAL(10,4)` (NOT NULL), parsed tariff rate.
* `effective_date`: `DATE` (NOT NULL), parsed effective date.
* `end_date`: `DATE` (NOT NULL), parsed end date.
* `tariff_type`: `STRING` (NOT NULL), standardized tariff type (e.g., MFN, FTA).
* `currency`: `STRING` (NOT NULL), standardized currency code (e.g., USD, EUR).
* `unit_of_measure`: `STRING`, unit of measure for the tariff.
* `source_system`: `STRING`, source system identifier.
* `is_active`: `BOOLEAN`, flag indicating if the record is currently active.
* `last_updated`: `TIMESTAMP`, when this record was last updated.
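To make the silver contract concrete before writing any DLT code, the target shape of a `silver_hs_codes` row and its `is_active` derivation can be sketched in plain Python. This is an illustrative model only, not part of the pipeline; the class name `SilverHsCode` is ours, but the fields mirror the schema above:

```python
from dataclasses import dataclass
from datetime import date

@dataclass
class SilverHsCode:
    """Illustrative model of a silver_hs_codes row (mirrors the schema above)."""
    hs_code: str
    description: str
    effective_date: date
    end_date: date
    country_code: str
    source_system: str

    @property
    def is_active(self) -> bool:
        # A record is active while today's date is on or before its end_date,
        # matching the silver-layer derivation: end_date >= current_date()
        return self.end_date >= date.today()

row = SilverHsCode(
    hs_code="010110",
    description="Live horses, asses, mules and hinnies; pure-bred breeding animals",
    effective_date=date(2024, 1, 1),
    end_date=date(2099, 12, 31),
    country_code="US",
    source_system="WCO",
)
print(row.is_active)  # end_date far in the future -> True
```

Writing the contract down this way makes it easy to spot that `is_active` depends only on `end_date`, not `effective_date` — a design choice worth revisiting if future-dated records should be treated as inactive.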
#### File Structure
We will organize our DLT pipeline code within a single Python notebook or a set of Python files if the complexity grows. For this chapter, we’ll use a single Python file to define our DLT functions.
```
/
├── dlt_pipelines/
│   └── hs_tariff_pipeline.py
├── raw_data/
│   ├── hs_codes/
│   │   └── hs_codes_20251201.csv
│   └── tariffs/
│       └── tariffs_20251201.csv
└── notebooks/
    └── 06_chapter_guide.ipynb   (for running ad-hoc commands/testing)
```
### Step-by-Step Implementation
Let’s begin by setting up our Databricks environment and creating the DLT pipeline components.
#### a) Setup/Configuration
First, we need to create the raw data files and place them in DBFS (or your cloud storage). Then, we’ll define our DLT pipeline script.
**1. Create Sample Raw Data Files:**
We’ll simulate incoming raw data by creating two CSV files. In a real-world scenario, these would be dropped into a cloud storage path by an external system.
Create `hs_codes_20251201.csv`:

```
hs_code,description,effective_date,end_date,country_code,source_system
010110,"Live horses, asses, mules and hinnies; pure-bred breeding animals",2024-01-01,2099-12-31,US,WCO
010120,"Live horses, asses, mules and hinnies; other than pure-bred breeding animals",2024-01-01,2099-12-31,US,WCO
010210,"Live bovine animals; pure-bred breeding animals",2024-01-01,2099-12-31,CA,WCO
010220,"Live bovine animals; other than pure-bred breeding animals",2024-01-01,2099-12-31,MX,WCO
INVALID_HS,"Invalid description",2024-01-01,2099-12-31,ZZ,WCO
```

Create `tariffs_20251201.csv`:

```
tariff_id,hs_code,country_of_origin,country_of_import,tariff_rate,effective_date,end_date,tariff_type,currency,unit_of_measure,source_system
T001,010110,CA,US,0.02,2024-01-01,2099-12-31,MFN,USD,UNIT,USTR
T002,010120,MX,US,0.00,2024-01-01,2099-12-31,FTA,USD,UNIT,USTR
T003,010210,US,CA,0.05,2024-01-01,2099-12-31,MFN,CAD,KG,CBP
T004,010220,CA,MX,0.01,2024-01-01,2099-12-31,MFN,MXN,UNIT,CBP
T005,010110,DE,US,0.03,2024-01-01,2099-12-31,MFN,EUR,UNIT,USTR
T006,INVALID_HS,XX,US,0.00,2024-01-01,2099-12-31,FTA,USD,UNIT,USTR
```
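Before wiring up Auto Loader, it can help to sanity-check the sample data with plain Python. The sketch below (standard library only; the inline string stands in for a few rows of the uploaded file) parses the HS code sample and flags any row whose `hs_code` is not 6–10 digits — the digit-pattern regex is our own addition, stricter than a length check:

```python
import csv
import io
import re

# A 6-10 digit pattern: stricter than length alone, since "INVALID_HS" is 10 characters
HS_CODE_PATTERN = re.compile(r"^[0-9]{6,10}$")

sample = """hs_code,description,effective_date,end_date,country_code,source_system
010110,"Live horses, asses, mules and hinnies; pure-bred breeding animals",2024-01-01,2099-12-31,US,WCO
INVALID_HS,"Invalid description",2024-01-01,2099-12-31,ZZ,WCO
"""

rows = list(csv.DictReader(io.StringIO(sample)))
bad = [r["hs_code"] for r in rows if not HS_CODE_PATTERN.match(r["hs_code"])]
print(bad)  # ['INVALID_HS']
```

Note that a pure length-based rule (`length(hs_code) >= 6`) would let `INVALID_HS` through, since it is 10 characters long — worth keeping in mind when defining expectations later.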
**Upload these files:** You can do this using the Databricks UI (Workspace -> Create -> File -> Upload File) or by running commands in a Databricks notebook. Although we use the `dbutils.fs` utilities, the target paths here are Unity Catalog volume paths under `/Volumes/`, not classic DBFS paths.
In a Databricks notebook, run the following to create the directories and upload the files:
```python
# File Path: notebooks/06_chapter_guide.ipynb
dbutils.fs.mkdirs("/Volumes/supply_chain_analytics/raw_data/hs_codes/")
dbutils.fs.mkdirs("/Volumes/supply_chain_analytics/raw_data/tariffs/")

# Create hs_codes_20251201.csv
hs_codes_data = """hs_code,description,effective_date,end_date,country_code,source_system
010110,"Live horses, asses, mules and hinnies; pure-bred breeding animals",2024-01-01,2099-12-31,US,WCO
010120,"Live horses, asses, mules and hinnies; other than pure-bred breeding animals",2024-01-01,2099-12-31,US,WCO
010210,"Live bovine animals; pure-bred breeding animals",2024-01-01,2099-12-31,CA,WCO
010220,"Live bovine animals; other than pure-bred breeding animals",2024-01-01,2099-12-31,MX,WCO
INVALID_HS,"Invalid description",2024-01-01,2099-12-31,ZZ,WCO
"""
dbutils.fs.put("/Volumes/supply_chain_analytics/raw_data/hs_codes/hs_codes_20251201.csv", hs_codes_data, overwrite=True)

# Create tariffs_20251201.csv
tariffs_data = """tariff_id,hs_code,country_of_origin,country_of_import,tariff_rate,effective_date,end_date,tariff_type,currency,unit_of_measure,source_system
T001,010110,CA,US,0.02,2024-01-01,2099-12-31,MFN,USD,UNIT,USTR
T002,010120,MX,US,0.00,2024-01-01,2099-12-31,FTA,USD,UNIT,USTR
T003,010210,US,CA,0.05,2024-01-01,2099-12-31,MFN,CAD,KG,CBP
T004,010220,CA,MX,0.01,2024-01-01,2099-12-31,MFN,MXN,UNIT,CBP
T005,010110,DE,US,0.03,2024-01-01,2099-12-31,MFN,EUR,UNIT,USTR
T006,INVALID_HS,XX,US,0.00,2024-01-01,2099-12-31,FTA,USD,UNIT,USTR
"""
dbutils.fs.put("/Volumes/supply_chain_analytics/raw_data/tariffs/tariffs_20251201.csv", tariffs_data, overwrite=True)

print("Sample raw data files created in /Volumes/supply_chain_analytics/raw_data/")
```
**Explanation:**
* `dbutils.fs.mkdirs` creates the necessary directories. In Unity Catalog, paths under `/Volumes/<catalog>/<schema>/<volume>/` are the recommended location for file-based data; here the path implies catalog `supply_chain_analytics`, schema `raw_data`, and volumes `hs_codes` and `tariffs`. Note that the volumes themselves must already exist in Unity Catalog (e.g., via `CREATE VOLUME`); `mkdirs` only creates subdirectories within an existing volume.
* `dbutils.fs.put` writes the string content directly to files. `overwrite=True` ensures that if you re-run this, the files are updated.
**2. Create DLT Pipeline Script:**
Now, let’s create our DLT Python script. This script will contain the definitions for our bronze and silver tables.
Create `dlt_pipelines/hs_tariff_pipeline.py`:

```python
# File Path: dlt_pipelines/hs_tariff_pipeline.py
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Define Unity Catalog schema (database) and storage locations
CATALOG_NAME = "supply_chain_analytics"  # Ensure this matches your Unity Catalog setup
SCHEMA_NAME = "default"                  # Or a specific schema like 'raw' or 'curated'
TARGET_SCHEMA = f"{CATALOG_NAME}.{SCHEMA_NAME}"

# Raw data input paths (assuming data is placed in these UC volume paths)
RAW_HS_CODES_PATH = "/Volumes/supply_chain_analytics/raw_data/hs_codes"
RAW_TARIFFS_PATH = "/Volumes/supply_chain_analytics/raw_data/tariffs"

# --- Bronze Layer: Raw Ingestion ---
@dlt.table(
    name="bronze_hs_codes",
    comment="Raw HS Code data ingested from source files, stored in Unity Catalog.",
    table_properties={"quality": "bronze"},
    path=f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/bronze_hs_codes"  # Storage path in a UC Volume
)
@dlt.expect_or_drop("valid_hs_code_format", "hs_code IS NOT NULL AND length(hs_code) >= 6")
def bronze_hs_codes():
    """
    Reads raw HS Code data from the landing zone into a bronze Delta table.
    Uses Auto Loader for incremental data ingestion.
    """
    logging.info(f"Ingesting raw HS Code data from: {RAW_HS_CODES_PATH}")
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", f"{RAW_HS_CODES_PATH}/_checkpoint/hs_codes_bronze")  # Schema tracking location for Auto Loader
        .option("header", "true")
        .option("inferSchema", "true")  # Fine for initial ingestion; production should use an explicit schema
        .load(RAW_HS_CODES_PATH)
        .withColumn("processing_timestamp", current_timestamp())
        .withColumn("source_file", input_file_name())
        .select(
            "hs_code", "description", "effective_date", "end_date",
            "country_code", "source_system",
            "processing_timestamp", "source_file", "_rescued_data"
        )
    )

@dlt.table(
    name="bronze_tariffs",
    comment="Raw Tariff data ingested from source files, stored in Unity Catalog.",
    table_properties={"quality": "bronze"},
    path=f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/bronze_tariffs"  # Storage path in a UC Volume
)
@dlt.expect_or_drop("valid_tariff_id", "tariff_id IS NOT NULL AND length(tariff_id) > 0")
@dlt.expect_or_drop("valid_hs_code_link", "hs_code IS NOT NULL AND length(hs_code) >= 6")
def bronze_tariffs():
    """
    Reads raw Tariff data from the landing zone into a bronze Delta table.
    Uses Auto Loader for incremental data ingestion.
    """
    logging.info(f"Ingesting raw Tariff data from: {RAW_TARIFFS_PATH}")
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", f"{RAW_TARIFFS_PATH}/_checkpoint/tariffs_bronze")  # Schema tracking location for Auto Loader
        .option("header", "true")
        .option("inferSchema", "true")
        .load(RAW_TARIFFS_PATH)
        .withColumn("processing_timestamp", current_timestamp())
        .withColumn("source_file", input_file_name())
        .select(
            "tariff_id", "hs_code", "country_of_origin", "country_of_import",
            "tariff_rate", "effective_date", "end_date", "tariff_type",
            "currency", "unit_of_measure", "source_system",
            "processing_timestamp", "source_file", "_rescued_data"
        )
    )
```
**Explanation of the DLT Script:**
* `import dlt`: Imports the Delta Live Tables library.
* `from pyspark.sql.functions import *`: Imports common Spark SQL functions for transformations.
* `logging`: Basic Python logging for visibility into pipeline execution.
* `CATALOG_NAME`, `SCHEMA_NAME`, `TARGET_SCHEMA`: These variables define our Unity Catalog target for the tables. Crucially, you must set them to your actual Unity Catalog catalog and schema names. We use `/Volumes` paths to store the actual data files, which is a best practice with Unity Catalog.
* `RAW_HS_CODES_PATH`, `RAW_TARIFFS_PATH`: These define where Auto Loader will look for new files. We point them to the directories where we uploaded our sample CSVs.
* `@dlt.table(...)` decorator: Marks a Python function as a DLT table definition.
    * `name`: The name of the table in Unity Catalog.
    * `comment`: A descriptive comment for the table.
    * `table_properties`: Custom properties, here `quality: bronze` to indicate the layer.
    * `path`: Specifies the storage location for the table's data. It should be within a Unity Catalog Volume path, e.g., `/Volumes/<catalog>/<schema>/<table_name>`.
* `@dlt.expect_or_drop(...)`: A DLT expectation. If a record fails this condition, it is dropped from the output table and the event is logged. This ensures basic data quality even at the bronze layer.
* `spark.readStream.format("cloudFiles")...`: This is Auto Loader, Databricks' optimized incremental file ingestion tool.
    * `cloudFiles.format("csv")`: Specifies the input file format.
    * `cloudFiles.schemaLocation`: The location where Auto Loader persists the inferred schema and tracks schema evolution. It's crucial for reliable incremental processing.
    * `header`, `inferSchema`: Standard CSV read options. `inferSchema` is convenient for initial setup, but for production, explicit schemas are preferred for robustness.
    * `.load(RAW_HS_CODES_PATH)`: Points Auto Loader at the directory to monitor.
    * `.withColumn("processing_timestamp", current_timestamp())`: Adds a column with the time the record was processed.
    * `.withColumn("source_file", input_file_name())`: Adds a column with the path of the source file.
    * `.select(...)`: Explicitly selects and reorders columns. `_rescued_data` is a special column created by Auto Loader to capture data that couldn't be parsed according to the inferred schema, preventing pipeline failures.
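DLT expectations are row-level boolean predicates evaluated against each record. To build intuition for how `expect_or_drop` behaves, here is a plain-Python analogue (not the DLT API) of the bronze `valid_hs_code_format` rule applied to a few sample rows:

```python
def valid_hs_code_format(row: dict) -> bool:
    """Python analogue of the SQL predicate:
    hs_code IS NOT NULL AND length(hs_code) >= 6"""
    hs = row.get("hs_code")
    return hs is not None and len(hs) >= 6

rows = [
    {"hs_code": "010110"},      # valid -> kept
    {"hs_code": "0101"},        # too short -> dropped
    {"hs_code": None},          # null -> dropped
    {"hs_code": "INVALID_HS"},  # 10 characters -> passes a length-only check!
]

# expect_or_drop keeps only rows where the predicate is true
kept = [r for r in rows if valid_hs_code_format(r)]
print([r["hs_code"] for r in kept])  # ['010110', 'INVALID_HS']
```

As the last row shows, a length-only predicate does not reject `INVALID_HS`; a pattern check such as `hs_code RLIKE '^[0-9]{6,10}$'` would be needed for that, which is worth considering when tightening the rules in silver.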
#### b) Core Implementation - Silver Layer Transformations
Now let’s add the silver layer logic to our dlt_pipelines/hs_tariff_pipeline.py script. This layer will consume from the bronze tables, clean, transform, and validate the data.
```python
# File Path: dlt_pipelines/hs_tariff_pipeline.py (append to existing file)

# --- Silver Layer: Cleaned & Harmonized ---
@dlt.table(
    name="silver_hs_codes",
    comment="Cleaned and harmonized HS Code data, stored in Unity Catalog.",
    table_properties={"quality": "silver"},
    path=f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/silver_hs_codes"
)
@dlt.expect_or_fail("valid_hs_code_length", "length(hs_code) >= 6 AND length(hs_code) <= 10")  # Stricter expectation
@dlt.expect_or_drop("valid_country_code", "length(country_code) = 2")  # Ensure ISO 2-letter codes
@dlt.expect_or_drop("valid_dates", "effective_date IS NOT NULL AND end_date IS NOT NULL AND effective_date <= end_date")
def silver_hs_codes():
    """
    Cleans and harmonizes HS Code data from the bronze layer.
    Applies data quality rules and standardizes formats.
    """
    logging.info("Processing bronze_hs_codes into silver_hs_codes.")
    return (
        dlt.read_stream("bronze_hs_codes")
        .filter(col("_rescued_data").isNull())  # Filter out malformed records from bronze
        .withColumn("hs_code", upper(trim(col("hs_code"))))
        .withColumn("description", trim(col("description")))
        .withColumn("effective_date", to_date(col("effective_date"), "yyyy-MM-dd"))
        .withColumn("end_date", to_date(col("end_date"), "yyyy-MM-dd"))
        .withColumn("country_code", upper(trim(col("country_code"))))
        .withColumn("is_active", when(col("end_date") >= current_date(), True).otherwise(False))
        .withColumn("last_updated", current_timestamp())
        .select(
            "hs_code",
            "description",
            "effective_date",
            "end_date",
            "country_code",
            "source_system",
            "is_active",
            "last_updated"
        )
    )

@dlt.table(
    name="silver_tariffs",
    comment="Cleaned and harmonized Tariff data, stored in Unity Catalog.",
    table_properties={"quality": "silver"},
    path=f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/silver_tariffs"
)
@dlt.expect_or_fail("valid_tariff_rate", "tariff_rate IS NOT NULL AND tariff_rate >= 0")
@dlt.expect_or_drop("valid_import_country", "length(country_of_import) = 2")
@dlt.expect_or_drop("valid_origin_country", "length(country_of_origin) = 2")
@dlt.expect_or_drop("valid_tariff_dates", "effective_date IS NOT NULL AND end_date IS NOT NULL AND effective_date <= end_date")
def silver_tariffs():
    """
    Cleans and harmonizes Tariff data from the bronze layer.
    Applies data quality rules and standardizes formats.
    """
    logging.info("Processing bronze_tariffs into silver_tariffs.")
    return (
        dlt.read_stream("bronze_tariffs")
        .filter(col("_rescued_data").isNull())  # Filter out malformed records from bronze
        .withColumn("tariff_id", upper(trim(col("tariff_id"))))
        .withColumn("hs_code", upper(trim(col("hs_code"))))
        .withColumn("country_of_origin", upper(trim(col("country_of_origin"))))
        .withColumn("country_of_import", upper(trim(col("country_of_import"))))
        .withColumn("tariff_rate", col("tariff_rate").cast(DecimalType(10, 4)))
        .withColumn("effective_date", to_date(col("effective_date"), "yyyy-MM-dd"))
        .withColumn("end_date", to_date(col("end_date"), "yyyy-MM-dd"))
        .withColumn("tariff_type", upper(trim(col("tariff_type"))))
        .withColumn("currency", upper(trim(col("currency"))))
        .withColumn("unit_of_measure", trim(col("unit_of_measure")))
        .withColumn("is_active", when(col("end_date") >= current_date(), True).otherwise(False))
        .withColumn("last_updated", current_timestamp())
        .select(
            "tariff_id",
            "hs_code",
            "country_of_origin",
            "country_of_import",
            "tariff_rate",
            "effective_date",
            "end_date",
            "tariff_type",
            "currency",
            "unit_of_measure",
            "source_system",
            "is_active",
            "last_updated"
        )
    )
```
**Explanation of Silver Layer Logic:**
* **`@dlt.table(...)`**: Similar to bronze, defining silver tables.
* **`@dlt.expect_or_fail(...)`**: A stricter expectation. If a record fails this, the *entire pipeline* will fail. This is used for critical data quality rules where downstream processes cannot proceed without valid data.
* **`@dlt.expect_or_drop(...)`**: If a record fails, it's dropped, but the pipeline continues. Used for less critical issues that can be tolerated.
* **`dlt.read_stream("bronze_hs_codes")`**: Reads incrementally from the previously defined `bronze_hs_codes` DLT table. This creates a dependency, ensuring bronze data is processed first.
* **`.filter(col("_rescued_data").isNull())`**: Filters out records that Auto Loader couldn't parse in the bronze layer. This is a crucial step for data quality.
* **`.withColumn(...)`**: Applies transformations:
* `upper(trim(col(...)))`: Standardizes string fields by trimming whitespace and converting to uppercase.
* `to_date(col(...), "yyyy-MM-dd")`: Converts string dates to `DATE` type.
* `cast(DecimalType(10, 4))`: Converts `tariff_rate` to a precise decimal type.
* `when(col("end_date") >= current_date(), True).otherwise(False)`: Derives an `is_active` flag.
* `current_timestamp()`: Adds a `last_updated` timestamp.
* **`.select(...)`**: Selects and reorders the final columns for the silver table, ensuring the schema matches our design.
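The column-level transformations above are ordinary string, date, and decimal normalizations. As a reference, here is the same cleaning logic for a single tariff record in plain Python (illustrative only; the pipeline itself runs these as Spark expressions, and `clean_tariff` is our own helper name):

```python
from datetime import date, datetime
from decimal import Decimal

def clean_tariff(raw: dict, today: date) -> dict:
    """Plain-Python analogue of the silver_tariffs withColumn chain."""
    end_date = datetime.strptime(raw["end_date"], "%Y-%m-%d").date()
    return {
        "tariff_id": raw["tariff_id"].strip().upper(),            # upper(trim(...))
        "hs_code": raw["hs_code"].strip().upper(),
        "country_of_origin": raw["country_of_origin"].strip().upper(),
        "country_of_import": raw["country_of_import"].strip().upper(),
        "tariff_rate": Decimal(raw["tariff_rate"]).quantize(Decimal("0.0001")),  # DECIMAL(10,4)
        "effective_date": datetime.strptime(raw["effective_date"], "%Y-%m-%d").date(),
        "end_date": end_date,
        "is_active": end_date >= today,                           # is_active derivation
    }

raw = {"tariff_id": " t001 ", "hs_code": "010110", "country_of_origin": "ca",
       "country_of_import": "us", "tariff_rate": "0.02",
       "effective_date": "2024-01-01", "end_date": "2099-12-31"}
cleaned = clean_tariff(raw, today=date(2025, 1, 1))
print(cleaned["tariff_id"], cleaned["tariff_rate"], cleaned["is_active"])
# T001 0.0200 True
```

Passing `today` explicitly (rather than calling `date.today()` inside) keeps the logic deterministic and easy to test, mirroring how `current_date()` is evaluated once per Spark batch.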
#### c) Testing This Component
With the DLT script ready, it's time to create and run the DLT pipeline in Databricks.
**1. Create a DLT Pipeline in Databricks:**
1. Navigate to the **Workflows** persona in your Databricks workspace.
2. Go to the **Delta Live Tables** tab.
3. Click **Create Pipeline**.
4. Configure the pipeline with the following details:
* **Pipeline name:** `hs-tariff-ingestion-pipeline` (or a descriptive name)
* **Product Edition:** `Advanced` (to leverage all DLT features including expectations)
* **Pipeline Library:** Select `Python source code`
* **Path:** Browse to `dlt_pipelines/hs_tariff_pipeline.py`
* **Target Schema:** Enter `supply_chain_analytics.default` (or your chosen catalog and schema). This tells DLT where to create the tables in Unity Catalog.
* **Storage Location:** `/Volumes/<your_catalog>/<your_schema>/dlt_checkpoints/hs_tariff_pipeline` (This is the DLT pipeline's checkpoint location, distinct from Auto Loader's. Replace with your actual catalog/schema).
* **Cluster Mode:** `Enhanced Autoscaling` (recommended for production)
* **Channel:** `Current`
* **Photon:** Checked (for performance)
* **Enable Serverless DLT:** Checked (if available in your region and desired for simplified ops)
* **Advanced Options:**
* **Configuration:**
* `spark.databricks.delta.properties.defaults.enableChangeDataFeed`: `true` (Enable Change Data Feed for all tables by default, useful for downstream SCD operations)
* `spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite`: `true` (Enable auto-optimization for writes)
* `spark.databricks.delta.properties.defaults.autoOptimize.autoCompact`: `true` (Enable auto-compaction)
* **Permissions:** Ensure the service principal or user running the DLT pipeline has `CAN USE` permission on the cluster policy, `CREATE TABLE`, `USE SCHEMA`, `USE CATALOG` permissions on the Unity Catalog schema, and `READ` access to the raw data paths and `WRITE` access to the storage location.
5. Click **Create**.
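If you prefer automation over the UI, the same settings can be captured as a JSON spec and submitted through the Databricks Pipelines REST API or Terraform. The fragment below is a sketch, not a complete spec — the paths, catalog, and schema are the placeholders from this chapter and must match your workspace, and available fields vary by API version:

```json
{
  "name": "hs-tariff-ingestion-pipeline",
  "edition": "ADVANCED",
  "catalog": "supply_chain_analytics",
  "target": "default",
  "continuous": false,
  "photon": true,
  "libraries": [
    {"file": {"path": "/dlt_pipelines/hs_tariff_pipeline.py"}}
  ],
  "configuration": {
    "spark.databricks.delta.properties.defaults.enableChangeDataFeed": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true"
  }
}
```

Keeping the pipeline definition in source control alongside `hs_tariff_pipeline.py` makes environment promotion (dev to prod) reproducible.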
**2. Run the DLT Pipeline:**
1. Once the pipeline is created, click the **Start** button on the pipeline page.
2. Monitor the pipeline's progress in the DLT UI. You'll see the graph visualizing the bronze and silver tables and their dependencies.
3. Observe the logs for `INFO` messages from our Python `logging` statements.
**Expected Behavior:**
* The pipeline should execute successfully.
* You will see the `bronze_hs_codes`, `bronze_tariffs`, `silver_hs_codes`, and `silver_tariffs` tables being created in your specified Unity Catalog schema (`supply_chain_analytics.default`).
* The DLT UI will show metrics for records processed, records dropped (due to `expect_or_drop`), and any failures (due to `expect_or_fail`).
* For `bronze_hs_codes`, note that the `INVALID_HS` record is *not* dropped by the `valid_hs_code_format` expectation as written: `INVALID_HS` is 10 characters long, so `length(hs_code) >= 6` passes. Length-only checks are deliberately loose at the bronze layer.
* For `silver_hs_codes`, the length-based expectations (`length(hs_code)` between 6 and 10, `length(country_code) = 2`) also pass for this record, since `ZZ` is a valid-length 2-letter code, so it flows through to silver. To actually reject it, tighten the rule to a digit pattern, e.g. `hs_code RLIKE '^[0-9]{6,10}$'`, and validate `country_code` against an ISO reference list.
* For `bronze_tariffs`, record `T006` (with `hs_code = INVALID_HS`) is likewise retained, since it satisfies the length-based `valid_tariff_id` and `valid_hs_code_link` expectations.
* For `silver_tariffs`, `T006` also passes the current expectations. Keep in mind that DLT expectations are row-level predicates: they do not enforce referential integrity against `silver_hs_codes`, so excluding tariffs whose HS code is absent from the reference table requires an explicit join or a pattern-based expectation.
**Debugging Tips:**
* **Check Pipeline Logs:** The DLT UI provides detailed logs. Look for `ERROR` or `WARN` messages.
* **Examine Data Quality Metrics:** The DLT UI clearly shows how many records passed/failed expectations. This helps pinpoint data issues.
* **Query Bronze Tables:** If silver tables are empty or unexpected, query the bronze tables directly in a Databricks SQL editor or notebook (`SELECT * FROM supply_chain_analytics.default.bronze_hs_codes;`) to see what raw data was ingested.
* **Inspect `_rescued_data`:** If `_rescued_data` column in bronze tables contains values, it means your raw data files have parsing issues or schema mismatches that Auto Loader couldn't handle gracefully.
### Production Considerations
Moving from development to production requires addressing robustness, performance, security, and observability.
#### Error Handling
* **DLT Expectations:** We've already implemented `expect_or_drop` and `expect_or_fail`.
* `expect_or_drop`: Ideal for minor data quality issues that shouldn't halt the pipeline but need to be cleaned. The dropped records are logged, providing an audit trail.
* `expect_or_fail`: Critical for business-logic violations where processing incorrect data is worse than stopping the pipeline. This immediately alerts operators.
* **Quarantine Tables:** For more complex error handling, you can direct failing records into a "quarantine" Delta table for manual review and reprocessing. A common pattern is to define a second DLT table that reads the same source and keeps only the rows that *fail* the rules (i.e., the inverted expectation conditions), since DLT does not expose dropped records as a queryable table on its own.
* **Schema Evolution:** Auto Loader handles schema evolution by default. For production, consider enabling `cloudFiles.schemaEvolutionMode` (e.g., `addNewColumns`) and `cloudFiles.schemaHints` for explicit control and robustness.
* **Dead Letter Queue (DLQ):** While DLT handles basic error rows, for integration with external systems like Kafka, a DLQ pattern might be used to send unprocessable messages for further analysis.
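The quarantine pattern amounts to splitting each batch by the same predicates the expectations use, keeping failures in a side table instead of discarding them. A minimal plain-Python sketch of the split (the function name `split_quarantine` and the `_failed_rules` column are our own; in DLT this would be two tables filtering on the rule conditions and their negation):

```python
from typing import Callable, Dict, List, Tuple

def split_quarantine(rows: List[dict],
                     rules: Dict[str, Callable[[dict], bool]]) -> Tuple[list, list]:
    """Partition rows into (good, quarantined). Quarantined rows are
    annotated with the names of the rules they violated."""
    good, quarantined = [], []
    for row in rows:
        failed = [name for name, rule in rules.items() if not rule(row)]
        if failed:
            quarantined.append({**row, "_failed_rules": failed})
        else:
            good.append(row)
    return good, quarantined

# Python analogues of two of the silver_tariffs expectations
rules = {
    "valid_tariff_rate": lambda r: r["tariff_rate"] is not None and r["tariff_rate"] >= 0,
    "valid_import_country": lambda r: len(r["country_of_import"]) == 2,
}
rows = [
    {"tariff_id": "T001", "tariff_rate": 0.02, "country_of_import": "US"},
    {"tariff_id": "T009", "tariff_rate": -1.0, "country_of_import": "USA"},
]
good, bad = split_quarantine(rows, rules)
print(len(good), bad[0]["_failed_rules"])  # 1 ['valid_tariff_rate', 'valid_import_country']
```

Recording *which* rules failed alongside each quarantined row makes triage and targeted reprocessing much easier than a bare reject pile.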
#### Performance Optimization
* **DLT Serverless:** Utilize DLT Serverless (if available) to offload cluster management and benefit from optimized resource allocation.
* **Enhanced Autoscaling:** Always use Enhanced Autoscaling for DLT pipelines to dynamically adjust resources based on workload.
* **Photon Engine:** Enable Photon for all DLT pipelines to achieve significant performance gains on Spark workloads.
* **`OPTIMIZE` and `VACUUM`:** DLT automatically manages `OPTIMIZE` and `VACUUM` operations on your Delta tables, ensuring optimal file sizes and performance. Understand the `VACUUM` retention policy to prevent accidental data loss.
* **Checkpointing:** DLT manages checkpointing for streaming queries. Ensure the checkpoint location is in durable, high-performance storage.
* **Batch vs. Streaming:** For very large historical loads, consider a one-time batch DLT run or a `TRIGGERED` pipeline. For continuous ingestion, `CONTINUOUS` mode is suitable, but `TRIGGERED` with `availableNow=True` can be more cost-effective for less frequent updates.
#### Security Considerations
* **Unity Catalog (UC) Integration:** This project leverages Unity Catalog from the start.
* **Fine-grained Access Control:** Grant `SELECT` permissions on `silver` tables to consumers (analysts, other pipelines) and restricted `MODIFY` or `ALL PRIVILEGES` only to the DLT service principal/user.
* **Data Masking/Row Filtering:** If sensitive information were present (e.g., specific tariff rates for proprietary deals), UC allows implementing row filters and column masks.
* **Least Privilege:** Ensure the service principal or user running the DLT pipeline has only the necessary permissions:
* `READ` access to the raw data landing zone.
* `WRITE` access to the DLT checkpoint location.
* `CREATE TABLE`, `USE SCHEMA`, `USE CATALOG` permissions on the target Unity Catalog schema.
* **Data Encryption:** Data in Delta Lake and cloud storage is encrypted at rest (e.g., S3 SSE-KMS, ADLS Gen2 encryption) and in transit (TLS/SSL). Ensure your cloud provider's encryption settings are robust.
* **Network Isolation:** Use private endpoints/Private Link for Databricks workspace and storage accounts to ensure data traffic stays within your private network.
#### Logging and Monitoring
* **DLT UI:** Provides real-time monitoring of pipeline health, progress, and data quality metrics.
* **Databricks Event Logs:** DLT emits detailed events to the Databricks event log, which can be queried for auditing, debugging, and custom monitoring.
* **Cloud Monitoring (e.g., Azure Monitor, CloudWatch):** Integrate Databricks logs and metrics with your cloud provider's monitoring solutions for centralized alerting and dashboarding.
* **Custom Logging:** As demonstrated, use Python's `logging` module within your DLT code for specific application-level insights. This helps trace custom logic.
### Code Review Checkpoint
At this point, you have successfully:
* Created sample raw HS Code and Tariff data files.
* Developed a DLT Python script (`dlt_pipelines/hs_tariff_pipeline.py`) that defines:
* **Bronze layer tables (`bronze_hs_codes`, `bronze_tariffs`)**: Ingesting raw CSV data using Auto Loader, adding processing metadata, and implementing initial data quality expectations (`expect_or_drop`).
* **Silver layer tables (`silver_hs_codes`, `silver_tariffs`)**: Reading from bronze, applying cleaning, standardization (trimming, case conversion, type casting), deriving `is_active` flags, and enforcing stricter data quality rules (`expect_or_fail`, `expect_or_drop`).
* Configured and executed a DLT pipeline in your Databricks workspace, targeting Unity Catalog tables.
The core data ingestion and harmonization for HS Codes and Tariffs are now functional, providing a clean, reliable dataset for subsequent analytical tasks.
### Common Issues & Solutions
Developers often encounter specific issues when working with DLT and data ingestion. Here are a few common ones:
1. **Issue: Schema Evolution Errors with Auto Loader**
* **Scenario:** New columns appear in source files, or data types change, causing Auto Loader to fail or drop records.
* **Debugging:** Check DLT pipeline logs for `Schema evolution` messages. Look at the `_rescued_data` column in your bronze table to see malformed records.
* **Solution:**
* **For new columns:** Auto Loader handles `addNewColumns` by default. Ensure your DLT pipeline's checkpoint location is correctly configured and not corrupted.
* **For data type changes:** Auto Loader generally infers schema once. If a type changes (e.g., `INT` to `STRING`), it might fail. You can use `cloudFiles.schemaHints` to provide an explicit schema or `cloudFiles.schemaEvolutionMode` options like `rescue` or `failOnDataLoss` for specific behaviors. For production, defining explicit schemas for bronze tables is highly recommended.
* **Prevention:** Monitor source schema changes. Communicate with data producers. Implement schema validation pre-ingestion if possible.
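To build intuition for what `_rescued_data` holds, here is a plain-Python *simulation* of the rescue behavior: values that cannot be cast to the expected type are moved into a rescued bucket instead of failing the row. This mimics, but is not, Auto Loader's actual implementation; `EXPECTED_TYPES` and `rescue` are illustrative names:

```python
import json

# Expected column types for the simulated schema
EXPECTED_TYPES = {"hs_code": str, "tariff_rate": float}

def rescue(row: dict) -> dict:
    """Cast values to expected types; stash anything that fails in _rescued_data."""
    out, rescued = {}, {}
    for col, value in row.items():
        expected = EXPECTED_TYPES.get(col, str)
        try:
            out[col] = expected(value)
        except (TypeError, ValueError):
            out[col] = None            # column becomes null...
            rescued[col] = value       # ...and the raw value is preserved
    out["_rescued_data"] = json.dumps(rescued) if rescued else None
    return out

clean = rescue({"hs_code": "010110", "tariff_rate": "0.02"})
broken = rescue({"hs_code": "010110", "tariff_rate": "two percent"})
print(clean["_rescued_data"])   # None -- everything cast cleanly
print(broken["_rescued_data"])  # {"tariff_rate": "two percent"}
```

This is why the silver layer's `filter(col("_rescued_data").isNull())` is a meaningful quality gate: any non-null value there means at least one column of the row failed to parse.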
2. **Issue: DLT Pipeline Fails Due to `expect_or_fail`**
* **Scenario:** A critical data quality expectation fails, halting the pipeline.
* **Debugging:** The DLT UI clearly highlights which expectation failed and provides links to the error logs. Query the upstream table to identify the specific records that violated the expectation.
* **Solution:**
* **Analyze the data:** Understand why the records failed. Is it a systemic data quality issue from the source, or an incorrect expectation definition?
* **Data cleansing:** Implement additional transformations in your silver layer to correct or filter out problematic data *before* the `expect_or_fail` check.
* **Adjust expectation:** If the business rule has changed or the expectation is too strict for current data quality, adjust or loosen the expectation (e.g., change to `expect_or_drop` if appropriate, or refine the condition).
* **Quarantine:** Direct failing records to a quarantine table for manual review/correction instead of failing the entire pipeline.
* **Prevention:** Start with `expect_or_drop` in development, and only upgrade to `expect_or_fail` for truly critical data quality requirements. Thoroughly test expectations against various data scenarios.
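The quarantine option above can be sketched in plain Python: records are partitioned by the same predicate an expectation would evaluate, so failing rows are set aside for review instead of halting the load. The predicate (a six-digit HS code) and record shape are illustrative assumptions, not the pipeline's actual expectation definitions.

```python
import re

# Illustrative expectation: HS codes must be exactly 6 digits
HS_CODE_PATTERN = re.compile(r"^\d{6}$")

def split_valid_and_quarantine(records):
    """Partition records by the expectation predicate instead of failing the batch."""
    valid, quarantined = [], []
    for rec in records:
        if HS_CODE_PATTERN.match(rec.get("hs_code", "")):
            valid.append(rec)
        else:
            # Tag quarantined rows with a reason to aid manual review
            quarantined.append({**rec, "quarantine_reason": "invalid hs_code format"})
    return valid, quarantined

batch = [
    {"hs_code": "010310", "country_code": "US"},
    {"hs_code": "INVALID_HS", "country_code": "US"},
]
valid, quarantined = split_valid_and_quarantine(batch)
print(len(valid), len(quarantined))  # → 1 1
```

In DLT, the same idea is typically expressed as two tables reading the same source with complementary filters: one keeps rows passing the predicate, the other keeps (and annotates) rows that fail it.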
3. **Issue: Unity Catalog Permissions Errors**
* **Scenario:** DLT pipeline fails with `PERMISSION_DENIED` errors when trying to create tables or write to locations.
* **Debugging:** The error message will usually indicate which resource (catalog, schema, table, external location) the permission was denied on.
* **Solution:**
* Ensure the service principal or user configured to run the DLT pipeline has the necessary Unity Catalog permissions:
* `USE CATALOG` on the parent catalog.
* `USE SCHEMA` on the target schema.
* `CREATE TABLE` on the target schema.
* `READ VOLUME` and `WRITE VOLUME` on any volumes holding raw source files, and `CREATE VOLUME` on the schema if the pipeline needs to create volumes.
* `SELECT` on any source tables (e.g., bronze tables for silver layer).
* Verify that the external locations or volumes used by the pipeline are correctly registered in Unity Catalog and the DLT pipeline has access.
* **Prevention:** Follow the principle of least privilege. Document required permissions clearly. Use Databricks Access Control Lists (ACLs) or SCIM provisioning for consistent permission management.
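A minimal grant sequence might look like the following, assuming the pipeline runs as a service principal named `dlt_pipeline_sp` (a hypothetical name) and targets the `supply_chain_analytics.default` schema and `raw_data` volumes used throughout this chapter; adapt the principal and object names to your workspace.

```sql
-- Allow the pipeline principal to resolve and write its target objects
GRANT USE CATALOG ON CATALOG supply_chain_analytics TO `dlt_pipeline_sp`;
GRANT USE SCHEMA, CREATE TABLE ON SCHEMA supply_chain_analytics.default TO `dlt_pipeline_sp`;

-- Allow reading the raw landing volumes (path /Volumes/supply_chain_analytics/raw_data/...)
GRANT USE SCHEMA ON SCHEMA supply_chain_analytics.raw_data TO `dlt_pipeline_sp`;
GRANT READ VOLUME ON VOLUME supply_chain_analytics.raw_data.hs_codes TO `dlt_pipeline_sp`;
GRANT READ VOLUME ON VOLUME supply_chain_analytics.raw_data.tariffs TO `dlt_pipeline_sp`;
```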
### Testing & Verification
To confirm that our HS Code and Tariff data ingestion and harmonization pipeline is fully functional and production-ready, perform the following verification steps:
1. **Monitor DLT Pipeline Run:**
* Go to the DLT UI and ensure the `hs-tariff-ingestion-pipeline` completed successfully without errors.
* Review the "Events" tab for any warnings or dropped records.
* Check the data quality metrics reported for each table (expectation pass, drop, and fail counts).
2. **Query Bronze Tables:**
* Open a Databricks SQL editor or a new notebook.
* Run queries to inspect the raw data and `_rescued_data` column.
```sql
-- File Path: notebooks/06_chapter_guide.ipynb (or Databricks SQL editor)
SELECT * FROM supply_chain_analytics.default.bronze_hs_codes;
SELECT * FROM supply_chain_analytics.default.bronze_tariffs;
-- Check for rescued data
SELECT COUNT(*) FROM supply_chain_analytics.default.bronze_hs_codes WHERE _rescued_data IS NOT NULL;
SELECT COUNT(*) FROM supply_chain_analytics.default.bronze_tariffs WHERE _rescued_data IS NOT NULL;
```
*Expected:* `_rescued_data` should be NULL for all records in the initial load, since the schema was inferred from well-formed CSVs. The `INVALID_HS` record should still be present at this stage.
3. **Query Silver Tables:**
* Verify that the data in the silver tables is clean, transformed, and adheres to the defined schema.
```sql
-- File Path: notebooks/06_chapter_guide.ipynb (or Databricks SQL editor)
SELECT * FROM supply_chain_analytics.default.silver_hs_codes;
SELECT * FROM supply_chain_analytics.default.silver_tariffs;
-- Check data types and transformations
DESCRIBE supply_chain_analytics.default.silver_hs_codes;
DESCRIBE supply_chain_analytics.default.silver_tariffs;
-- Verify active records
SELECT COUNT(*) FROM supply_chain_analytics.default.silver_hs_codes WHERE is_active = TRUE;
SELECT COUNT(*) FROM supply_chain_analytics.default.silver_tariffs WHERE is_active = TRUE;
-- Check for dropped records due to expectations (e.g., 'ZZ' country_code)
SELECT * FROM supply_chain_analytics.default.silver_hs_codes WHERE country_code = 'ZZ'; -- Should return 0 rows
```
*Expected:*
* `silver_hs_codes` should contain 4 records (the `INVALID_HS` record and the `ZZ` country-code record should have been dropped).
* `hs_code`, `description`, `country_code` should be clean (trimmed, uppercase).
* `effective_date` and `end_date` should be of `DATE` type.
* `silver_tariffs` should contain 4 records (the `T006` record should have been dropped because it links to `INVALID_HS` and fails the `valid_hs_code_link` expectation, along with any records carrying the invalid `XX` country code).
* `tariff_rate` should be of `DECIMAL` type.
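As an additional cross-table check, an anti-join can confirm that every tariff in the silver layer links to a known HS code. This query is a suggested verification step, not part of the pipeline itself.

```sql
-- Tariffs whose hs_code has no match in silver_hs_codes; expected: 0 rows
SELECT t.tariff_id, t.hs_code
FROM supply_chain_analytics.default.silver_tariffs AS t
LEFT ANTI JOIN supply_chain_analytics.default.silver_hs_codes AS h
  ON t.hs_code = h.hs_code;
```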
4. **Test Incremental Load:**
* Create a new raw HS Code file (e.g., `hs_codes_20251202.csv`) with some new records and place it in `/Volumes/supply_chain_analytics/raw_data/hs_codes/`.
* Create a new raw Tariff file (e.g., `tariffs_20251202.csv`) with new records and place it in `/Volumes/supply_chain_analytics/raw_data/tariffs/`.
* Run the DLT pipeline again.
* Verify that only the new records are processed and appended to the bronze and silver tables.
```python
# File Path: notebooks/06_chapter_guide.ipynb
# Create new HS Code data
new_hs_codes_data = """hs_code,description,effective_date,end_date,country_code,source_system
010310,"Live swine, pure-bred breeding animals",2024-01-01,2099-12-31,US,WCO
010320,"Live swine, other than pure-bred breeding animals",2024-01-01,2099-12-31,CA,WCO
"""
dbutils.fs.put("/Volumes/supply_chain_analytics/raw_data/hs_codes/hs_codes_20251202.csv", new_hs_codes_data, overwrite=True)
# Create new Tariff data
new_tariffs_data = """tariff_id,hs_code,country_of_origin,country_of_import,tariff_rate,effective_date,end_date,tariff_type,currency,unit_of_measure,source_system
T007,010310,US,CA,0.08,2024-01-01,2099-12-31,MFN,CAD,UNIT,CBP
T008,010320,CA,US,0.00,2024-01-01,2099-12-31,FTA,USD,UNIT,USTR
"""
dbutils.fs.put("/Volumes/supply_chain_analytics/raw_data/tariffs/tariffs_20251202.csv", new_tariffs_data, overwrite=True)
print("New sample raw data files created for incremental load test.")
```
After placing the new files and re-running the pipeline, re-query the silver tables to confirm the increased row counts.
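One way to confirm that the incremental run appended only the new rows is to group bronze records by their source file. This assumes the processing metadata added in the bronze layer includes a source-file column; the column name `source_file` below is an assumption, so adjust it to whatever your bronze definition actually captures.

```sql
-- Row counts per source file; `source_file` is an assumed metadata column name
SELECT source_file, COUNT(*) AS row_count
FROM supply_chain_analytics.default.bronze_hs_codes
GROUP BY source_file
ORDER BY source_file;
```

The second run should add one new row group for `hs_codes_20251202.csv` while the counts for previously ingested files stay unchanged.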
### Summary & Next Steps
In this chapter, we successfully established the foundational data ingestion and harmonization pipeline for HS Code and Tariff data using Databricks Delta Live Tables. We implemented a robust Medallion Architecture, moving data from raw files into `bronze` and then to `silver` Delta tables within Unity Catalog. Crucially, we incorporated DLT's powerful expectations for data quality enforcement, ensuring that only clean and valid data progresses to our analytical layers.
This harmonized HS Code and Tariff data is now a critical asset in our Customs Trade Data Lakehouse. It forms the basis for accurate calculations of duties and taxes, compliance checks, and detailed cost analysis across the supply chain.
**What's Next:**
In the next chapter, we will leverage this newly ingested and harmonized data. We will focus on **Building Real-time Supply Chain Event Ingestion and Delay Analytics using Databricks Delta Live Tables**. This will involve consuming real-time event streams (e.g., from Kafka) related to supply chain movements, joining them with our HS Code and Tariff data, and performing real-time analytics to detect and analyze delays, integrating our tariff data to understand potential cost impacts.