Introduction: From Local Scripts to Production Pipelines

Welcome to Chapter 16! So far, you’ve mastered the core features of MetaDataHub, Meta AI’s powerful open-source library for managing datasets. You’ve learned how to version, track lineage, and ensure data quality in isolated examples. But what happens when your data needs to move beyond your local machine and into a reliable, scalable, and automated production environment? That’s exactly what we’ll tackle in this chapter!

Here, we’ll dive deep into building a production-ready data workflow. We’ll integrate MetaDataHub with industry-standard MLOps tools like Docker for environment consistency and Apache Airflow for robust workflow orchestration. By the end, you’ll have a clear understanding of how to automate your data pipelines, ensuring that your models are always trained on the right data, every single time. Get ready to put all your MetaDataHub knowledge into action and build something truly impactful!

To get the most out of this chapter, you should be comfortable with:

  • All MetaDataHub core concepts from previous chapters (versioning, schemas, artifacts).
  • Basic Python programming.
  • A foundational understanding of Docker containers.

Core Concepts: Building a Robust MLOps Data Foundation

Before we start coding, let’s align on the key concepts that underpin a successful production data workflow. Think of these as the architectural pillars that support your entire MLOps strategy.

The Pillars of MLOps for Data

In a production machine learning system, your data isn’t static. It’s constantly evolving: new data arrives, existing data gets updated, and transformations change. Without proper management, this can lead to “model rot,” where your models perform poorly due to stale or incorrect data. This is where MLOps principles for data come into play.

  • Reproducibility: Can you recreate the exact dataset used to train a specific model version at any point in time? MetaDataHub directly addresses this by versioning your datasets and their transformations.
  • Traceability (Lineage): Where did this data come from? What transformations were applied? MetaDataHub builds a graph of your data’s journey, providing crucial lineage information.
  • Automation: Manual data processing is slow, error-prone, and doesn’t scale. We need to automate the entire data lifecycle, from ingestion to transformation and preparation for model training.
  • Scalability: As data volumes grow, our pipelines must be able to handle the increased load efficiently.
  • Reliability: Our data pipelines must be robust, with error handling and monitoring, to ensure continuous operation.
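Reproducibility in particular hinges on being able to tell, deterministically, whether two datasets are identical. As a toy illustration of the idea underneath dataset versioning (not MetaDataHub's actual mechanism — the function below is a sketch), a content fingerprint can decide whether a new commit actually changes the data:

```python
import hashlib

def dataset_fingerprint(rows):
    """Return a deterministic hash of a dataset's rows.

    Versioning systems rely on fingerprints like this to decide
    whether new data warrants a new version.
    """
    h = hashlib.sha256()
    for row in rows:
        h.update(repr(row).encode("utf-8"))
    return h.hexdigest()

v1 = dataset_fingerprint([(1, "great!"), (2, "okay.")])
v2 = dataset_fingerprint([(1, "great!"), (2, "okay.")])
v3 = dataset_fingerprint([(1, "great!"), (2, "terrible.")])

assert v1 == v2  # identical data -> identical fingerprint
assert v1 != v3  # any change -> a new fingerprint
```

The same principle lets a training job verify, months later, that it is reading exactly the bytes that produced a given model.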

Workflow Orchestration with Apache Airflow

For automating complex data workflows, we need an orchestrator. Enter Apache Airflow, a powerful platform to programmatically author, schedule, and monitor workflows. Airflow workflows are defined as Directed Acyclic Graphs (DAGs), where each node is a “task” and edges define dependencies.

Imagine you have a series of steps:

  1. Fetch new raw data.
  2. Clean and preprocess the data.
  3. Validate the schema.
  4. Store the processed data.
  5. Trigger a model retraining job.

Airflow allows you to define these steps, their order, and how they should run (e.g., daily, hourly, or upon an event), providing a central dashboard to monitor their execution.
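Before reaching for Airflow's operators, the five steps above can be sketched as plain Python functions whose call order encodes the DAG's edges. The function names and mock records here are purely illustrative:

```python
# Each function stands in for one task; the nesting encodes the edges
# fetch -> clean -> validate -> store -> retrain.

def fetch_raw():
    return [{"feedback_id": 1, "text": " great! "}]

def clean(records):
    return [{**r, "text": r["text"].strip()} for r in records]

def validate(records):
    assert all("feedback_id" in r for r in records), "missing feedback_id"
    return records

def store(records):
    return f"stored {len(records)} records"

def trigger_retraining(receipt):
    return f"retraining triggered after: {receipt}"

result = trigger_retraining(store(validate(clean(fetch_raw()))))
```

Airflow's value-add over this sketch is everything around the calls: scheduling, retries, logging, and the dashboard that shows which step failed and when.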

Containerization with Docker

To ensure our data processing tasks run consistently across different environments (your laptop, a staging server, production), we use Docker. Docker containers package your application, its dependencies, and configuration into a single, isolated unit. This eliminates the dreaded “it works on my machine” problem and is fundamental for reproducible MLOps. Each Airflow task can run within its own Docker container, ensuring a clean and consistent execution environment.

MetaDataHub’s Role in a Production Workflow

MetaDataHub acts as the central data catalog and versioning system within our MLOps stack. It doesn’t run your data transformations, but it manages the outputs and inputs of those transformations.

Consider this workflow:

flowchart TD
    A[Raw Data Source] --> B{Data Ingestion Service}
    B -->|Register/Version| C[MetaDataHub Repository]
    C --> D{Data Transformation Service}
    D -->|Read/Version| C
    C --> E{Model Training Service}
    E -->|Uses Versioned Data| C
    E --> F[Trained Model Registry]
    F --> G[Model Deployment]

As you can see, MetaDataHub (represented by C[MetaDataHub Repository]) is the backbone, ensuring every step knows exactly which version of the data it’s working with and tracking the lineage of new versions.

CI/CD for Data Workflows

Just like software development, data pipelines benefit from Continuous Integration/Continuous Delivery (CI/CD). This means:

  • CI: Automatically testing changes to your data processing code and MetaDataHub schema definitions whenever new code is committed.
  • CD: Automatically deploying validated data pipelines to production environments.

While a full CI/CD setup is beyond this chapter, understanding its importance helps frame our workflow design.
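To make the CI idea concrete, here is the kind of lightweight check a CI job might run on every commit: assert that a schema still declares every field downstream code expects. The field names mirror the feedback schema built later in this chapter; the helper itself is a sketch, not a MetaDataHub API:

```python
# Fields that downstream consumers of the feedback dataset depend on.
EXPECTED_FIELDS = {"feedback_id", "customer_id", "text",
                   "sentiment_score", "timestamp"}

def check_schema_fields(schema_fields):
    """Fail loudly if the schema dropped a field someone relies on."""
    missing = EXPECTED_FIELDS - set(schema_fields)
    assert not missing, f"schema is missing fields: {missing}"

# Passes: the schema declares everything we expect.
check_schema_fields(["feedback_id", "customer_id", "text",
                     "sentiment_score", "timestamp"])
```

Wired into CI, a check like this turns an accidental schema change from a silent production incident into a failed build.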

Step-by-Step Implementation: Building Our Production Pipeline

Let’s get our hands dirty and build a simple, yet robust, data workflow. We’ll simulate a scenario where we periodically ingest new “customer feedback” data, clean it, and then version the cleaned dataset using MetaDataHub.

Step 1: Setting Up Your Environment

First, let’s ensure you have the necessary tools installed. As of January 2026, we’ll use recent stable versions.

  1. Python: We’ll use Python 3.11. If you don’t have it, consider using pyenv or conda.

    python3.11 --version
    # Expected output: Python 3.11.x
    
  2. Docker Desktop: Install Docker Desktop for your OS. We’ll use version 25.0.3 or newer.

    docker --version
    # Expected output: Docker version 25.0.3, build 4de4a7f
    
  3. Apache Airflow: We’ll run Airflow in Docker for simplicity. Create a directory for your Airflow setup.

    mkdir airflow_project && cd airflow_project
    curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'
    # This downloads the official docker-compose file for Airflow 2.9.0.
    # Make sure to adjust the Airflow version in the URL if a newer stable release is available.
    

    Now, initialize the Airflow environment:

    docker compose up airflow-init
    

    Once initialized, start Airflow services:

    docker compose up -d
    

    You should now be able to access the Airflow UI at http://localhost:8080 (default credentials: airflow/airflow).

  4. MetaDataHub Client: We need to ensure our Docker containers have the MetaDataHub client installed. For this project, we’ll assume MetaDataHub version 1.2.0 is the latest stable release.

Step 2: Define Our MetaDataHub Dataset Schema

Let’s define a simple schema for our “customer feedback” data. This will ensure consistency.

Create a file named schemas/feedback_schema.py in your airflow_project directory:

# airflow_project/schemas/feedback_schema.py
from metadahub import Schema, Field, DataType

# Define the schema for our customer feedback dataset
feedback_schema = Schema(
    name="customer_feedback",
    version="1.0.0", # Initial schema version
    fields=[
        Field("feedback_id", DataType.INT, description="Unique identifier for the feedback"),
        Field("customer_id", DataType.INT, description="ID of the customer providing feedback"),
        Field("text", DataType.STRING, description="The actual feedback text"),
        Field("sentiment_score", DataType.FLOAT, description="Sentiment score (e.g., -1.0 to 1.0)"),
        Field("timestamp", DataType.DATETIME, description="When the feedback was submitted"),
    ],
    description="Schema for raw customer feedback data."
)

if __name__ == "__main__":
    print("MetaDataHub Schema for Customer Feedback:")
    print(feedback_schema.to_json(indent=2))

Explanation:

  • We import Schema, Field, and DataType from the hypothetical metadahub library.
  • feedback_schema is an instance of Schema defining the structure of our feedback data.
  • Each Field specifies a column name, its data type, and a description for clarity.
  • The if __name__ == "__main__": block is just for testing the schema definition locally.
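Since metadahub is hypothetical, it helps to see a minimal stand-in for the validation that register_data_artifact is assumed to perform: checking each record's values against the declared field types. A real implementation would also handle nullability, type coercion, and nested types:

```python
from datetime import datetime

# A toy mirror of the schema above: field name -> expected Python type.
FIELD_TYPES = {
    "feedback_id": int,
    "customer_id": int,
    "text": str,
    "sentiment_score": float,
    "timestamp": datetime,
}

def validate_record(record):
    """Return a list of type-mismatch messages (empty means valid)."""
    errors = []
    for name, expected in FIELD_TYPES.items():
        if not isinstance(record.get(name), expected):
            errors.append(f"{name}: expected {expected.__name__}")
    return errors

ok = {"feedback_id": 1, "customer_id": 2, "text": "great!",
      "sentiment_score": 0.8, "timestamp": datetime.now()}
bad = dict(ok, feedback_id="1")  # string where an int is expected

assert validate_record(ok) == []
assert validate_record(bad) == ["feedback_id: expected int"]
```

This is exactly the kind of mismatch the mini-challenge at the end of this chapter asks you to provoke on purpose.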

Step 3: Create a Data Ingestion and Versioning Script

This script will simulate fetching new data, applying the schema, and registering it with MetaDataHub.

Create a file named scripts/ingest_feedback.py in your airflow_project directory:

# airflow_project/scripts/ingest_feedback.py
import pandas as pd
from datetime import datetime
import os
from metadahub import Client, Dataset, DataArtifact
from schemas.feedback_schema import feedback_schema # Our defined schema

# Initialize MetaDataHub client
# In a real setup, this would connect to a MetaDataHub server.
# For local testing, we might use a file-based client or mock it.
mdh_client = Client(server_url=os.getenv("METADAHUB_SERVER_URL", "http://localhost:8081"))

def generate_mock_feedback(num_records=10):
    """Generates mock customer feedback data."""
    data = {
        "feedback_id": range(1, num_records + 1),
        "customer_id": [i % 5 + 1 for i in range(num_records)],
        "text": [f"Feedback {i}: This product is {'great!' if i % 2 == 0 else 'okay.'}" for i in range(num_records)],
        "sentiment_score": [0.8 if i % 2 == 0 else 0.2 for i in range(num_records)],
        "timestamp": [datetime.now() for _ in range(num_records)],
    }
    return pd.DataFrame(data)

def ingest_and_version_feedback(num_records=50):
    """Ingests mock feedback, applies schema, and versions with MetaDataHub."""
    print("Step 1: Generating mock feedback data...")
    raw_data_df = generate_mock_feedback(num_records=num_records)
    print(f"Generated {len(raw_data_df)} records.")

    # Step 2: Define the dataset artifact
    # We'll save it locally first, then MetaDataHub will track it.
    output_filepath = "data/raw_feedback.csv"
    os.makedirs(os.path.dirname(output_filepath), exist_ok=True)
    raw_data_df.to_csv(output_filepath, index=False)
    print(f"Saved raw data to {output_filepath}")

    # Step 3: Create a MetaDataHub Dataset object
    # We use the defined schema for validation and metadata.
    dataset = Dataset(
        name="customer_feedback_raw",
        description="Raw customer feedback ingested from external source.",
        schema=feedback_schema, # Attach our schema
        version_strategy="semantic", # MetaDataHub automatically increments versions
        tags=["raw", "ingestion"]
    )

    # Step 4: Register and version the data artifact with MetaDataHub
    # MetaDataHub will validate against the schema.
    # It returns the newly created DataArtifact with its version.
    print("Step 4: Registering and versioning data with MetaDataHub...")
    try:
        data_artifact = mdh_client.register_data_artifact(
            dataset=dataset,
            filepath=output_filepath,
            commit_message="Initial ingestion of mock feedback data."
        )
        print(f"Successfully registered data artifact: {data_artifact.name} v{data_artifact.version}")
        print(f"Data location: {data_artifact.uri}")
        # In a real scenario, you'd store data_artifact.version for downstream tasks
    except Exception as e:
        print(f"Error registering data artifact: {e}")
        raise # Re-raise to fail the Airflow task

if __name__ == "__main__":
    ingest_and_version_feedback()

Explanation:

  • We import pandas for data manipulation, datetime for timestamps, and os for file operations.
  • Crucially, we import Client, Dataset, and DataArtifact from metadahub, and our feedback_schema.
  • mdh_client is initialized. In a real environment, METADAHUB_SERVER_URL would point to your deployed MetaDataHub instance.
  • generate_mock_feedback creates some fake data for demonstration.
  • ingest_and_version_feedback orchestrates the process:
    • Generates data.
    • Saves it to a local CSV file (which MetaDataHub will then track).
    • Creates a MetaDataHub Dataset object, linking it to our feedback_schema.
    • Calls mdh_client.register_data_artifact to commit the data. MetaDataHub handles versioning and schema validation automatically. If the data doesn’t match the schema, MetaDataHub will raise an error.

Step 4: Orchestrating with Apache Airflow

Now, let’s wrap our data ingestion script in an Airflow DAG.

Create a file named dags/data_ingestion_workflow.py in your airflow_project directory. You’ll need to create a dags folder first: mkdir dags.

# airflow_project/dags/data_ingestion_workflow.py
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator

# Import our ingestion script function.
# NOTE: the stock docker-compose file only mounts dags/, logs/, plugins/ and
# config/, so for this import to resolve on the worker you may need to mount
# scripts/ and schemas/ too, or add the project root to PYTHONPATH.
from scripts.ingest_feedback import ingest_and_version_feedback

# Define default arguments for the DAG
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
}

# Define the DAG
with DAG(
    dag_id="customer_feedback_data_pipeline",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    schedule=None, # Set to None for manual triggering, or "@daily", "0 0 * * *" for daily
    catchup=False,
    tags=["MetaDataHub", "data_ingestion", "production"],
    default_args=default_args,
) as dag:
    # In production, each task would run via DockerOperator inside a custom
    # image (e.g., my_org/metadahub_data_processor:1.0.0) built from a
    # Dockerfile along these lines:
    #
    #   FROM python:3.11-slim-bookworm
    #   WORKDIR /app
    #   COPY requirements.txt .
    #   RUN pip install -r requirements.txt
    #   COPY . .
    #
    # DockerOperator requires the Docker daemon to be reachable from the
    # Airflow worker (e.g., docker_url="unix://var/run/docker.sock" on a
    # local docker-compose setup).
    #
    # For this tutorial's docker-compose setup we use PythonOperator instead,
    # assuming metadahub and pandas are installed in the Airflow worker's
    # environment; a commented-out DockerOperator equivalent is shown below.
    ingest_feedback_task = PythonOperator(
        task_id="ingest_and_version_feedback_data",
        python_callable=ingest_and_version_feedback,
        # Ensure that METADAHUB_SERVER_URL is set as an Airflow Variable or
        # environment variable for the Airflow worker running this task.
        op_kwargs={"num_records": 100}, # Example of passing arguments
    )

    # In a production setup with DockerOperator, it would look more like this:
    # ingest_feedback_task_docker = DockerOperator(
    #     task_id="ingest_and_version_feedback_data_docker",
    #     image="my_org/metadahub_data_processor:1.0.0", # Your custom Docker image
    #     command="python scripts/ingest_feedback.py",
    #     environment={
    #         # requires `import os` at the top of this file
    #         "METADAHUB_SERVER_URL": os.getenv("METADAHUB_SERVER_URL", "http://host.docker.internal:8081")
    #     },
    #     # Mount the data and scripts directories if they are not baked into the image
    #     # mounts=[
    #     #     Mount(source="/path/to/airflow_project/scripts", target="/app/scripts", type="bind"),
    #     #     Mount(source="/path/to/airflow_project/schemas", target="/app/schemas", type="bind"),
    #     #     Mount(source="/path/to/airflow_project/data", target="/app/data", type="bind")
    #     # ]
    #     # network_mode="bridge", # Or a specific network if MetaDataHub server is on it
    # )

Explanation:

  • We import necessary modules from Airflow.
  • pendulum is used for defining start_date in a timezone-aware manner.
  • ingest_and_version_feedback is imported directly from our script.
  • The DAG object defines our workflow:
    • dag_id: A unique identifier for the DAG.
    • start_date: When the DAG should start running.
    • schedule: How often the DAG runs. None means manual trigger.
    • tags: Helps categorize DAGs in the Airflow UI.
  • PythonOperator is used to execute our ingest_and_version_feedback function.
    • Important Note: For a truly isolated and production-ready setup, you would use DockerOperator (as commented out) to run your script inside a custom Docker image that has MetaDataHub and all other Python dependencies pre-installed. For this tutorial’s simplicity with docker-compose, we’re using PythonOperator and assuming the metadahub and pandas libraries are accessible to the Airflow worker (you might need to add them to Airflow’s requirements.txt or build a custom Airflow image).

Step 5: Configure MetaDataHub Server (Simulated)

For our MetaDataHub client to work, it needs a server to connect to. In a full production setup, you’d deploy the MetaDataHub server (e.g., using Kubernetes). For this local demo, let’s simulate it by ensuring the METADAHUB_SERVER_URL environment variable (the one read in ingest_feedback.py) is set.

You can add this to your docker-compose.yaml for the Airflow worker service, or set it in your shell if running the script directly. For the PythonOperator in Airflow, you’d typically set this as an Airflow Variable.

Simulated MetaDataHub Server URL: For local Docker setups, host.docker.internal often resolves to the host machine’s IP address from within a container. So, if you were running a mock MetaDataHub server on your host at port 8081, the URL would be http://host.docker.internal:8081. For this example, we’ll assume a simple http://localhost:8081 for the PythonOperator (meaning the Airflow worker can reach it) or that MetaDataHub client is configured to use a local file-based repository for this demo.

Step 6: Test Your Workflow

  1. Place files: Ensure your airflow_project directory structure looks like this:
    airflow_project/
    ├── docker-compose.yaml
    ├── dags/
    │   └── data_ingestion_workflow.py
    ├── scripts/
    │   └── ingest_feedback.py
    ├── schemas/
    │   └── feedback_schema.py
    └── data/ (will be created by script)
    
  2. Airflow UI: Go to http://localhost:8080. You should see customer_feedback_data_pipeline listed.
  3. Enable and Trigger: Toggle the DAG on and click the “Play” button to trigger a manual run.
  4. Monitor: Check the “Graph View” or “Gantt Chart” for your DAG run. If successful, you’ll see green tasks.
  5. Verify:
    • Check the Airflow task logs for output from ingest_feedback.py. You should see messages about data generation and MetaDataHub registration.
    • Look for the data/raw_feedback.csv file created in your airflow_project directory.
    • (Hypothetically) If you had a MetaDataHub UI, you’d see a new version of customer_feedback_raw registered.

Mini-Challenge: Adding Data Validation

Now it’s your turn! A critical part of any production workflow is data validation. MetaDataHub supports schema validation out-of-the-box.

Challenge: Modify the ingest_feedback.py script to intentionally introduce a data type mismatch (e.g., try to pass a string where an integer is expected for feedback_id). Then, observe how MetaDataHub (and consequently your Airflow task) handles this.

Hint:

  • In generate_mock_feedback, change the "feedback_id" entry to [str(i) for i in range(1, num_records + 1)].
  • Run the Airflow DAG again. What happens to the task? What error message do you see in the logs?

What to observe/learn: You should see the Airflow task fail, and the logs should clearly indicate a MetaDataHubSchemaValidationError (or similar, depending on the hypothetical MetaDataHub error handling) detailing the mismatch. This demonstrates how MetaDataHub acts as a gatekeeper for data quality in your pipeline.

Common Pitfalls & Troubleshooting

Even with robust tools, production workflows can have hiccups. Here are some common issues and how to approach them:

  1. Environment Inconsistency (The Docker Operator Advantage):

    • Pitfall: Your script works locally, but fails in Airflow. This often points to missing dependencies or different Python versions on the Airflow worker compared to your development environment.
    • Troubleshooting:
      • Solution: This is precisely why DockerOperator is preferred for production. It packages all dependencies.
      • For PythonOperator: Ensure metadahub and pandas are installed in the Airflow worker’s Python environment. You can achieve this by adding them to Airflow’s requirements.txt or by building a custom Docker image for your Airflow workers.
      • Check the Airflow task logs very carefully for ModuleNotFoundError or similar messages.
  2. MetaDataHub Server Connection Issues:

    • Pitfall: mdh_client = Client(...) fails with connection refused errors.
    • Troubleshooting:
      • Verify the METADAHUB_SERVER_URL environment variable (or Airflow Variable) is correctly set and points to an active MetaDataHub server.
      • If running MetaDataHub in Docker: Ensure the server’s port is exposed and accessible from the Airflow worker container (e.g., using host.docker.internal or putting both on the same Docker network).
      • Check the MetaDataHub server logs if it’s running.
  3. Data Version Skew & Reproducibility:

    • Pitfall: Downstream tasks use an older or incorrect version of data, leading to inconsistent model behavior.
    • Troubleshooting:
      • Solution: Always retrieve the latest or a specific pinned version of a dataset from MetaDataHub using mdh_client.get_latest_data_artifact or mdh_client.get_data_artifact(version="X.Y.Z").
      • Pass the exact data_artifact.version as an XCom (cross-communication) variable between Airflow tasks to ensure explicit version usage.
      • Review MetaDataHub’s lineage graphs to understand how data versions are flowing through your pipeline.
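Version skew often reduces to a deceptively simple question: which of these version strings is newest? Comparing semantic versions as plain strings gets it wrong. A small helper (illustrative only — the MetaDataHub client is assumed to do this internally) shows the numeric comparison that pinning relies on:

```python
def parse_semver(v):
    """Split "MAJOR.MINOR.PATCH" into a tuple of ints for correct ordering."""
    return tuple(int(part) for part in v.split("."))

def latest_version(versions):
    """Pick the newest version by numeric (not lexicographic) comparison."""
    return max(versions, key=parse_semver)

artifact_versions = ["1.0.0", "1.2.0", "1.10.0", "1.9.3"]

assert max(artifact_versions) == "1.9.3"               # string compare: wrong
assert latest_version(artifact_versions) == "1.10.0"   # numeric compare: right
```

The same trap applies anywhere versions are sorted as text, which is one more reason to pass exact artifact versions between tasks via XCom rather than re-deriving "latest" downstream.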

Summary: Your First Production Data Pipeline!

Congratulations! You’ve successfully moved beyond theoretical concepts and built a foundational production-ready data workflow. You’ve learned:

  • The critical MLOps principles for managing data in production environments, emphasizing reproducibility and traceability.
  • How Apache Airflow orchestrates complex data pipelines, defining tasks and dependencies.
  • The role of Docker in ensuring consistent execution environments for your data processing tasks.
  • How MetaDataHub seamlessly integrates into this ecosystem, acting as the single source of truth for your datasets, handling versioning, schema validation, and lineage tracking.
  • Practical steps for setting up your environment, defining MetaDataHub schemas, writing ingestion scripts, and orchestrating them with Airflow.
  • Key troubleshooting strategies for common pitfalls in data pipelines.

This project is just the beginning. From here, you can extend your workflow to include more complex transformations, integrate with model training pipelines, and set up robust monitoring and alerting. The skills you’ve gained in this chapter are invaluable for any data professional looking to deploy machine learning systems reliably.
