Introduction

Welcome back, intrepid tester! In the previous chapters, you mastered the art of using Testcontainers to bring real databases into your tests. That was a huge step up from in-memory fakes, but what about the broader landscape of modern applications? Many microservices don’t just talk to databases; they communicate through message brokers, call other APIs, and integrate with external services.

This chapter is your passport to confidently testing those complex interactions. We’re going to tackle two crucial areas:

  1. Message Brokers: Specifically, we’ll dive into Kafka, a cornerstone of many asynchronous microservice architectures. You’ll learn how to spin up a fully functional Kafka cluster for your tests, allowing your applications to produce and consume messages against a real instance.
  2. Web Service Interactions: We’ll explore how to test scenarios where your application needs to communicate with other HTTP-based services. This means bringing those dependent services to life within containers for your tests.

By the end of this chapter, you’ll be equipped to test your microservices’ interactions with external systems with unparalleled realism, ensuring robust and reliable applications. Remember our guiding principles: baby steps, practical application, and true understanding. Let’s get started!

Core Concepts: Why Real Message Brokers and Web Services?

Before we jump into code, let’s understand why Testcontainers shines brightest when testing these types of dependencies.

The Challenge of Distributed System Testing

Modern applications, especially those built with microservices, are inherently distributed. They rely on networks, message queues, API calls, and shared state. Testing such systems presents unique challenges:

  • Mocks and Fakes Fall Short: While mocks are great for unit testing isolated components, they often fail to replicate the subtle nuances, latency, error conditions, and protocol quirks of real systems. A mock Kafka might work fine, but what if your application misinterprets a specific message header, or struggles with the broker’s acknowledgment semantics?
  • Integration Gaps: Even if individual services work, their integration points (how they talk to each other) are often the source of bugs. These bugs are incredibly hard to catch without testing against the actual integration mechanisms.
  • Environment Drift: Using a separate “integration testing environment” can lead to differences between your test setup and production, making tests less reliable. Testcontainers aims to bridge this gap.

The Testcontainers Solution: Realism and Isolation

Testcontainers addresses these challenges head-on:

  1. Real Dependencies: Instead of fakes, Testcontainers spins up actual instances of Kafka, Redis, PostgreSQL, or even your custom microservices in Docker containers. This ensures your application interacts with the real thing, catching integration bugs early.
  2. Isolated Environments: Each test run gets its own fresh, clean set of containers. No shared state, no pollution from previous tests, no “it worked on my machine” excuses. This is crucial for reliable and repeatable tests.
  3. Disposable and Fast: Containers are started quickly and torn down automatically after tests, leaving no trace. This keeps your development and CI/CD environments clean.

Testing message brokers and web services with Testcontainers allows you to simulate your production environment’s critical communication pathways, giving you high confidence that your distributed application truly works as intended.

Kafka in Microservices: A Quick Refresher

Kafka is a distributed streaming platform that acts as a central nervous system for many microservice architectures. Services produce events to Kafka topics, and other services consume those events. This asynchronous communication pattern is powerful but also adds complexity to testing. We need a real Kafka broker to ensure our producers correctly serialize messages and our consumers correctly deserialize and process them, including error handling, retries, and exactly-once processing semantics.

Kafka with Testcontainers: A Practical Guide

Let’s get our hands dirty by spinning up a Kafka broker using Testcontainers across different languages. We’ll simulate a simple producer and consumer scenario.

Kafka Version Note: As of 2026-02-14, confluentinc/cp-kafka images are widely used and stable. We’ll use a recent 7.6.0 version in our examples.

1. Java (JUnit 5 + Testcontainers-Kafka)

First, make sure you have your build tool (Maven or Gradle) configured with the Testcontainers dependencies.

pom.xml (Maven) / build.gradle (Gradle) Additions:

<!-- Maven pom.xml -->
<dependencies>
    <!-- Testcontainers core -->
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>testcontainers</artifactId>
        <version>1.19.4</version> <!-- Latest stable as of Feb 2026 -->
        <scope>test</scope>
    </dependency>
    <!-- JUnit Jupiter for testing -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.10.1</version> <!-- Latest stable as of Feb 2026 -->
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-engine</artifactId>
        <version>5.10.1</version>
        <scope>test</scope>
    </dependency>
    <!-- Testcontainers Kafka Module -->
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.19.4</version> <!-- Match core Testcontainers version -->
        <scope>test</scope>
    </dependency>
    <!-- Kafka client for interaction -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.1</version> <!-- Compatible with Kafka 7.x -->
        <scope>test</scope>
    </dependency>
    <!-- SLF4J for logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>2.0.11</version> <!-- Latest stable as of Feb 2026 -->
        <scope>test</scope>
    </dependency>
</dependencies>
// Gradle build.gradle
dependencies {
    // Testcontainers core
    testImplementation 'org.testcontainers:testcontainers:1.19.4' // Latest stable as of Feb 2026
    // JUnit Jupiter for testing
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.10.1' // Latest stable as of Feb 2026
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.10.1'
    // Testcontainers Kafka Module
    testImplementation 'org.testcontainers:kafka:1.19.4' // Match core Testcontainers version
    // Kafka client for interaction
    testImplementation 'org.apache.kafka:kafka-clients:3.6.1' // Compatible with Kafka 7.x
    // SLF4J for logging
    testImplementation 'org.slf4j:slf4j-simple:2.0.11' // Latest stable as of Feb 2026
}

test {
    useJUnitPlatform()
}

Now, let’s write a simple Java test that uses a Kafka container:

// src/test/java/com/example/KafkaIntegrationTest.java
package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.*;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

public class KafkaIntegrationTest {

    // Step 1: Declare the KafkaContainer
    // We'll use the Confluent Platform Kafka image for robustness.
    // As of Feb 2026, version 7.6.0 is a stable choice.
    @Container
    private static KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

    // Step 2: Start the container once for all tests in this class
    // This is a common optimization for faster test suites.
    @BeforeAll
    static void startKafka() {
        kafka.start();
    }

    // Step 3: Stop the container after all tests
    @AfterAll
    static void stopKafka() {
        kafka.stop();
    }

    @Test
    void shouldProduceAndConsumeMessage() throws ExecutionException, InterruptedException {
        String topic = "my-test-topic-" + UUID.randomUUID(); // Unique topic for isolation
        String message = "Hello, Testcontainers Kafka!";

        // Configure Producer
        Properties producerProps = new Properties();
        // Get Kafka's bootstrap servers dynamically from the container
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Configure Consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + UUID.randomUUID()); // Unique consumer group
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start reading from the beginning

        // Step 4: Create a Kafka Producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            // Step 5: Send a message
            producer.send(new ProducerRecord<>(topic, "test-key", message)).get(); // .get() makes it synchronous
            producer.flush(); // Ensure message is sent
        }

        // Step 6: Create a Kafka Consumer
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(topic)); // Subscribe to our test topic

            // Step 7: Poll for messages
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10)); // Poll for 10 seconds

            // Assertions
            assertFalse(records.isEmpty(), "Should have received at least one message");
            assertEquals(1, records.count(), "Should have received exactly one message");
            assertEquals(message, records.iterator().next().value(), "Received message should match sent message");
        }
    }
}

Explanation:

  • @Container private static KafkaContainer kafka = ...: This line declares our Kafka container. We specify the Docker image name. Testcontainers automatically handles downloading, starting, and stopping Kafka, including its Zookeeper dependency.
  • @BeforeAll static void startKafka() { kafka.start(); }: The start() method launches the container before any tests run. static ensures it’s done once for the entire test class.
  • @AfterAll static void stopKafka() { kafka.stop(); }: The stop() method gracefully shuts down the container after all tests complete.
  • kafka.getBootstrapServers(): This is the magic! Testcontainers dynamically provides the correct, dynamically mapped port and host for your application to connect to the Kafka broker inside the container. This removes hardcoding and ensures portability.
  • The rest of the code sets up standard Kafka producer and consumer clients using the provided bootstrap servers, then sends and receives a message, asserting its content. We use UUID.randomUUID() for topic and consumer group IDs to ensure test isolation even if multiple tests run concurrently.

2. Python (Pytest + Testcontainers-Python)

For Python, we’ll leverage pytest and the testcontainers-python library.

Installation:

pip install pytest testcontainers-core testcontainers-kafka-enterprise kafka-python

Note: testcontainers-kafka-enterprise is the module that provides the KafkaContainer for Testcontainers-Python. kafka-python is a client library to interact with Kafka. As of 2026-02-14, testcontainers-python 4.14.1 is a recent stable version.

# tests/test_kafka_integration.py
import pytest
from testcontainers.kafka import KafkaContainer # This module is usually from testcontainers-kafka-enterprise
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import NoBrokersAvailable
import json
import time
import uuid

# Define a fixture for the Kafka container
# This fixture will start the Kafka container before tests that use it
# and stop it afterwards. 'scope="session"' means it runs once for all tests.
@pytest.fixture(scope="session")
def kafka_container():
    # Using a stable Confluent Platform Kafka image
    # As of Feb 2026, version 7.6.0 is a stable choice.
    with KafkaContainer("confluentinc/cp-kafka:7.6.0") as kafka:
        yield kafka.get_bootstrap_server()

# Fixture to provide a Kafka producer
@pytest.fixture
def kafka_producer(kafka_container):
    # The 'retry_backoff_ms' is important here for Python clients
    # to give Kafka time to start up and become ready.
    retries = 5
    for i in range(retries):
        try:
            producer = KafkaProducer(
                bootstrap_servers=kafka_container,
                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                acks='all',
                retries=3,
                retry_backoff_ms=200, # Small delay between retries
                api_version=(0,11,5) # Specify API version for broader compatibility
            )
            yield producer
            return
        except NoBrokersAvailable as e:
            if i < retries - 1:
                print(f"Kafka producer connection failed, retrying... ({i+1}/{retries})")
                time.sleep(2) # Wait a bit longer before next retry
            else:
                raise e # Re-raise if all retries fail
    

# Fixture to provide a Kafka consumer
@pytest.fixture
def kafka_consumer(kafka_container):
    group_id = f"test-group-{uuid.uuid4()}" # Unique group ID for isolation
    consumer = KafkaConsumer(
        bootstrap_servers=kafka_container,
        group_id=group_id,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        consumer_timeout_ms=5000, # Wait up to 5 seconds for messages
        api_version=(0,11,5)
    )
    yield consumer
    consumer.close() # Ensure consumer is closed after test

# Now, let's write our test!
def test_should_produce_and_consume_message_python(kafka_producer, kafka_consumer):
    topic = f"my-python-test-topic-{uuid.uuid4()}"
    message_payload = {"id": 123, "data": "Hello from Python Testcontainers!"}

    # Step 1: Subscribe the consumer to the topic before producing
    kafka_consumer.subscribe([topic])

    # Step 2: Produce a message
    future = kafka_producer.send(topic, message_payload)
    record_metadata = future.get(timeout=10) # Wait for the message to be sent
    assert record_metadata.topic == topic
    print(f"Message sent to topic {record_metadata.topic}, partition {record_metadata.partition}, offset {record_metadata.offset}")

    # Step 3: Consume messages
    received_message = None
    for msg in kafka_consumer:
        if msg.topic == topic and msg.value == message_payload:
            received_message = msg
            break # Found our message
    
    # Assertions
    assert received_message is not None, "Did not receive the expected message"
    assert received_message.value == message_payload, "Received message content mismatch"
    assert received_message.topic == topic, "Received message topic mismatch"

Explanation:

  • @pytest.fixture(scope="session") def kafka_container():: Pytest fixtures are powerful for managing resources. scope="session" ensures the Kafka container is started once and shared across all tests in the session, improving performance. The yield kafka.get_bootstrap_server() passes the dynamically determined Kafka connection string to tests.
  • KafkaContainer("confluentinc/cp-kafka:7.6.0"): Similar to Java, this instantiates our Kafka container.
  • kafka_producer and kafka_consumer fixtures: These create Kafka client instances, configured with the bootstrap_servers from our kafka_container fixture. The retry_backoff_ms is important for the producer to gracefully handle the brief startup time of Kafka.
  • consumer.subscribe([topic]): The consumer needs to subscribe to a topic before it can receive messages.
  • producer.send(topic, message_payload): Sends the message. .get(timeout=10) makes it block until the message is acknowledged by Kafka, which is useful in tests.
  • for msg in kafka_consumer:: The consumer polls for messages. We loop until we find our specific message or the consumer_timeout_ms is reached.

3. JavaScript/TypeScript (Node.js + Testcontainers-Node)

For Node.js, we’ll use testcontainers with kafkajs (a popular Kafka client for Node.js).

Installation:

npm install --save-dev testcontainers @types/jest jest kafkajs
# Or yarn:
# yarn add -D testcontainers @types/jest jest kafkajs

Note: As of 2026-02-14, testcontainers (Node.js) is in a stable release, e.g., 10.9.0 or higher.

// tests/kafka.test.ts
import { KafkaContainer } from 'testcontainers';
import { Kafka, Producer, Consumer } from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';

describe('Kafka Integration with Testcontainers', () => {
    let kafkaContainer: KafkaContainer;
    let kafka: Kafka;
    let producer: Producer;
    let consumer: Consumer;
    let topic: string;
    let messages: any[] = [];

    // Before all tests, start the Kafka container and set up clients
    beforeAll(async () => {
        // Step 1: Start the Kafka Container
        // As of Feb 2026, version 7.6.0 is a stable choice for Confluent Platform Kafka.
        kafkaContainer = await new KafkaContainer('confluentinc/cp-kafka:7.6.0').start();

        // Step 2: Initialize KafkaJS client
        kafka = new Kafka({
            brokers: [kafkaContainer.getBootstrapServers()],
            clientId: `my-app-${uuidv4()}`
        });

        // Step 3: Create a producer
        producer = kafka.producer();
        await producer.connect();

        // Step 4: Create a consumer
        consumer = kafka.consumer({ groupId: `test-group-${uuidv4()}` });
        await consumer.connect();

        // Create a unique topic for this test run
        topic = `node-test-topic-${uuidv4()}`;

        // Step 5: Subscribe the consumer to the topic
        await consumer.subscribe({ topic, fromBeginning: true });

        // Set up message collection for the consumer
        consumer.run({
            eachMessage: async ({ message }) => {
                if (message.value) {
                    messages.push(JSON.parse(message.value.toString()));
                }
            },
        });

        // Give the consumer a moment to subscribe and be ready
        await new Promise(resolve => setTimeout(resolve, 2000)); 
    }, 60000); // Increased timeout for container startup

    // After all tests, stop the producer, consumer, and container
    afterAll(async () => {
        await producer.disconnect();
        await consumer.disconnect();
        await kafkaContainer.stop();
    });

    // Before each test, clear previous messages
    beforeEach(() => {
        messages = [];
    });

    test('should produce and consume a message from Kafka', async () => {
        const testMessage = { id: 1, text: 'Hello from Node.js Testcontainers!' };

        // Step 6: Produce a message
        await producer.send({
            topic: topic,
            messages: [{ value: JSON.stringify(testMessage) }],
        });

        // Step 7: Wait for the consumer to process the message
        // In a real app, this would be an assertion on the side effect
        // or a mock of the next service in the chain. For testing, we poll.
        let retries = 0;
        const maxRetries = 10;
        while (messages.length === 0 && retries < maxRetries) {
            await new Promise(resolve => setTimeout(resolve, 500)); // Wait for 500ms
            retries++;
        }

        // Assertions
        expect(messages.length).toBe(1);
        expect(messages[0]).toEqual(testMessage);
    });
});

Explanation:

  • import { KafkaContainer } from 'testcontainers';: Imports the KafkaContainer class.
  • kafkaContainer = await new KafkaContainer('confluentinc/cp-kafka:7.6.0').start();: Instantiates and starts the Kafka container. await is used because starting a container is an asynchronous operation.
  • kafka = new Kafka({ brokers: [kafkaContainer.getBootstrapServers()], ... });: Initializes the kafkajs client, passing the dynamically retrieved bootstrap servers.
  • producer.connect(), consumer.connect(), consumer.subscribe(): Standard kafkajs client setup.
  • consumer.run({ eachMessage: async ({ message }) => { ... } });: This sets up a listener for messages. Messages are pushed into a messages array for later assertion.
  • producer.send(): Sends the message to the topic.
  • Polling Loop: The while loop simulates waiting for the message to be processed by our consumer. In a real integration test, you’d likely assert on the side effects of message processing (e.g., data updated in a database, another service receiving an API call), rather than directly consuming the message in the test itself. This approach shows direct consumption for demonstration.

Testing Web Service Interactions: A Client-Server Scenario

Many microservices act as clients to other HTTP APIs. Testcontainers can spin up those dependent APIs in containers, allowing you to test your client service against a real, running instance of its dependency.

Let’s imagine you have a UserService that needs to fetch user details from an AuthService. We’ll spin up a mock AuthService in a container and test our UserService against it.

For this, we’ll use GenericContainer, which allows us to run any Docker image. We’ll simulate a simple HTTP server within a container.

1. Java: Testing a Client Service against a Containerized API

First, let’s create a very simple “AuthService” that runs on a specific port and returns a JSON response. We’ll use a plain Java HTTP server for simplicity, but in a real scenario, this would be your actual Spring Boot/Micronaut/Quarkus application.

AuthService.java (This would be your actual dependent service code, placed somewhere accessible to Docker, or a pre-built image):

// src/main/java/com/example/AuthService.java
package com.example;

import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpExchange;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;

// This is a minimal HTTP server for demonstration.
// In a real-world scenario, this would be your dependent microservice.
public class AuthService {

    public static void main(String[] args) throws IOException {
        int port = Integer.parseInt(System.getProperty("PORT", "8080"));
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/user/123", new UserHandler());
        server.setExecutor(null); // creates a default executor
        server.start();
        System.out.println("AuthService started on port " + port);
    }

    static class UserHandler implements HttpHandler {
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            String response = "{\"id\": 123, \"username\": \"testuser\", \"email\": \"[email protected]\"}";
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, response.length());
            OutputStream os = exchange.getResponseBody();
            os.write(response.getBytes());
            os.close();
        }
    }
}

To run this in a Testcontainers GenericContainer, we need a Dockerfile that builds and runs it.

Dockerfile (in src/test/resources/auth-service/Dockerfile):

# src/test/resources/auth-service/Dockerfile
FROM eclipse-temurin:21-jre-jammy
WORKDIR /app
COPY target/classes/com/example/AuthService.class /app/com/example/
COPY target/classes/com/example/AuthService$UserHandler.class /app/com/example/
CMD ["java", "-cp", "/app", "com.example.AuthService"]
EXPOSE 8080

Now, let’s write our UserService client and its test:

UserService.java (Our client service that depends on AuthService):

// src/main/java/com/example/UserService.java
package com.example;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// A simple client service that fetches user data from an external API
public class UserService {
    private final String authServiceBaseUrl;
    private final HttpClient httpClient;

    public UserService(String authServiceBaseUrl) {
        this.authServiceBaseUrl = authServiceBaseUrl;
        this.httpClient = HttpClient.newHttpClient();
    }

    public String getUserDetails(String userId) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(authServiceBaseUrl + "/user/" + userId))
                .GET()
                .build();

        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

        if (response.statusCode() == 200) {
            return response.body();
        } else {
            throw new RuntimeException("Failed to get user details: " + response.statusCode());
        }
    }
}

UserServiceIntegrationTest.java (The Testcontainers test):

// src/test/java/com/example/UserServiceIntegrationTest.java
package com.example;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.images.builder.ImageFromDockerfile;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Executors;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class UserServiceIntegrationTest {

    // Step 1: Declare the GenericContainer for our AuthService
    // We'll build an image from a Dockerfile located in resources.
    @Container
    private static GenericContainer<?> authService = new GenericContainer<>(
            new ImageFromDockerfile()
                    .withDockerfile(new File("src/test/resources/auth-service/Dockerfile").toPath())
                    .withFileFromClasspath("com/example/AuthService.class", "com/example/AuthService.class")
                    .withFileFromClasspath("com/example/AuthService$UserHandler.class", "com/example/AuthService$UserHandler.class")
    )
            .withExposedPorts(8080) // The port our AuthService listens on
            .waitingFor(Wait.forHttp("/user/123").forPort(8080).forStatusCode(200)); // Wait until /user/123 returns 200 OK

    private static UserService userService;

    // Step 2: Start the container and initialize our client service
    @BeforeAll
    static void startAuthServiceAndInitClient() {
        authService.start();
        // Dynamically get the mapped host and port to connect our client
        String authServiceBaseUrl = String.format("http://%s:%d",
                authService.getHost(),
                authService.getMappedPort(8080));
        userService = new UserService(authServiceBaseUrl);
        System.out.println("AuthService running at: " + authServiceBaseUrl);
    }

    // Step 3: Stop the container
    @AfterAll
    static void stopAuthService() {
        authService.stop();
    }

    @Test
    void shouldGetUserDetailsFromAuthService() throws IOException, InterruptedException {
        // Step 4: Call our client service, which in turn calls the containerized AuthService
        String userDetailsJson = userService.getUserDetails("123");

        // Assertions
        assertNotNull(userDetailsJson);
        assertTrue(userDetailsJson.contains("testuser"));
        assertTrue(userDetailsJson.contains("[email protected]"));
        System.out.println("Received user details: " + userDetailsJson);
    }
}

Explanation:

  • ImageFromDockerfile: This is a powerful feature! Testcontainers can build a Docker image on-the-fly from a Dockerfile and local files. Here, we’re copying our compiled AuthService.class files into the container.
  • withExposedPorts(8080): Tells Testcontainers that port 8080 inside the container should be accessible from the host. Testcontainers will dynamically map this to an available port on your host machine.
  • waitingFor(Wait.forHttp("/user/123")...): This is a crucial wait strategy. It tells Testcontainers not to consider the container “started” until an HTTP GET request to /user/123 on port 8080 returns a 200 OK status. This ensures our AuthService is fully ready to accept requests before our test tries to connect.
  • authService.getHost() and authService.getMappedPort(8080): We use these methods to get the actual host and port that Testcontainers has exposed for the AuthService. This dynamically configures our UserService client.
  • The test then simply calls our UserService, which internally makes an HTTP call to the containerized AuthService, and we assert the response.

2. Python: Testing a Client Service against a Containerized API

We’ll use a similar approach in Python, creating a simple Flask application for our mock AuthService and then testing a Python client against it.

auth_service_app.py (our dependent Flask service, needs to be in a directory with a Dockerfile):

# auth-service-py/auth_service_app.py
from flask import Flask, jsonify

app = Flask(__name__)

@app.route("/user/<user_id>", methods=["GET"])
def get_user(user_id):
    if user_id == "123":
        return jsonify({"id": 123, "username": "pythonuser", "email": "[email protected]"})
    return jsonify({"error": "User not found"}), 404

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

Dockerfile (in auth-service-py/Dockerfile):

# auth-service-py/Dockerfile
FROM python:3.10-slim-buster
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY auth_service_app.py .
CMD ["python", "auth_service_app.py"]
EXPOSE 8080

requirements.txt (in auth-service-py/requirements.txt):

Flask==2.3.3

user_service_client.py (Our Python client service):

# user_service_client.py
import requests

class UserService:
    def __init__(self, auth_service_base_url: str):
        self.auth_service_base_url = auth_service_base_url

    def get_user_details(self, user_id: str) -> dict:
        url = f"{self.auth_service_base_url}/user/{user_id}"
        response = requests.get(url, timeout=5) # Add a timeout for robustness
        response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)
        return response.json()

tests/test_web_service_integration.py (The Pytest Testcontainers test):

# tests/test_web_service_integration.py
import pytest
from testcontainers.core.container import GenericContainer
from testcontainers.core.waiting_utils import wait_for_http_status_code
from user_service_client import UserService
import os

# Define a fixture for the AuthService container
@pytest.fixture(scope="session")
def auth_service_container():
    # Construct the path to the Dockerfile relative to the test file
    current_dir = os.path.dirname(os.path.abspath(__file__))
    dockerfile_dir = os.path.join(current_dir, "../auth-service-py")

    with GenericContainer(image="test-auth-service-py", dockerfile=dockerfile_dir) \
            .with_exposed_ports(8080) \
            .with_env("PORT", "8080") \
            .waiting_for(wait_for_http_status_code("/", 8080, 200, timeout=10)) as container:
        
        # Testcontainers-Python's 'wait_for_http_status_code' needs an initial path.
        # Once it confirms the server is up, we can use the specific endpoint for the test.
        # A more robust wait strategy could also be used here if the root is not immediately available.
        # Alternatively, for wait_for_http_status_code, we can use `/user/123` if that endpoint is the first to become ready.
        # Let's refine the wait strategy to target our specific endpoint:
        # Note: wait_for_http_status_code is a bit simplistic. Testcontainers-Python often
        # recommends using custom wait strategies or a combination.
        # For a truly robust wait, a custom predicate or retries on connection might be better.
        # Given the Flask app serves `/user/123`, we'll make sure it's ready.
        wait_for_http_status_code(container.get_container_host_ip(), container.get_exposed_port(8080), "/user/123", 200, timeout=10)

        yield f"http://{container.get_container_host_ip()}:{container.get_exposed_port(8080)}"

# Fixture for our UserService client
@pytest.fixture
def user_service_client(auth_service_container):
    return UserService(auth_service_container)

# The integration test
def test_should_get_user_details_from_auth_service_py(user_service_client):
    user_id = "123"
    user_details = user_service_client.get_user_details(user_id)

    assert user_details is not None
    assert user_details["id"] == 123
    assert user_details["username"] == "pythonuser"
    assert user_details["email"] == "[email protected]"
    print(f"Received user details: {user_details}")

    # Test for a non-existent user
    with pytest.raises(requests.exceptions.HTTPError) as excinfo:
        user_service_client.get_user_details("999")
    assert excinfo.value.response.status_code == 404

Explanation:

  • GenericContainer(image="test-auth-service-py", dockerfile=dockerfile_dir): In Python, you can specify dockerfile to build an image from a local Dockerfile and context. The image name here is a temporary name assigned during the build.
  • .with_exposed_ports(8080): Exposes the internal port 8080.
  • .with_env("PORT", "8080"): Passes an environment variable to the container, which our Flask app uses.
  • .waiting_for(wait_for_http_status_code(...)): This is the wait strategy. It waits until an HTTP GET request to auth_service_container.get_container_host_ip():auth_service_container.get_exposed_port(8080)/user/123 returns a 200 status code. The wait_for_http_status_code utility is a convenient way to confirm the web service is ready.
  • yield f"http://{container.get_container_host_ip()}:{container.get_exposed_port(8080)}": Provides the dynamically generated base URL for the AuthService to our user_service_client fixture.
  • The test then calls the UserService and asserts the JSON response.

3. JavaScript/TypeScript: Testing a Client Service against a Containerized API

We’ll follow the pattern again, creating a simple Express.js application for our mock AuthService.

auth-service-node/app.ts (our dependent Express service):

// auth-service-node/app.ts
import express from 'express';

const app = express();
const port = process.env.PORT || 8080;

app.get('/user/:userId', (req, res) => {
    const userId = req.params.userId;
    if (userId === '123') {
        res.status(200).json({ id: 123, username: 'nodeuser', email: '[email protected]' });
    } else {
        res.status(404).json({ error: 'User not found' });
    }
});

app.listen(port, () => {
    console.log(`AuthService Node.js running on http://localhost:${port}`);
});

auth-service-node/package.json:

{
  "name": "auth-service-node",
  "version": "1.0.0",
  "description": "",
  "main": "dist/app.js",
  "scripts": {
    "start": "tsc && node dist/app.js",
    "build": "tsc"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "express": "^4.19.2"
  },
  "devDependencies": {
    "@types/express": "^4.17.21",
    "@types/node": "^20.11.17",
    "typescript": "^5.3.3"
  }
}

auth-service-node/tsconfig.json:

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "outDir": "./dist",
    "strict": true,
    "esModuleInterop": true,
    "forceConsistentCasingInFileNames": true
  },
  "include": ["app.ts"]
}

user-service-client.ts (Our Node.js client service):

// user-service-client.ts
import fetch from 'node-fetch'; // For Node.js, fetch needs to be imported or use native HTTP

interface UserDetails {
    id: number;
    username: string;
    email: string;
}

export class UserService {
    private authServiceBaseUrl: string;

    constructor(authServiceBaseUrl: string) {
        this.authServiceBaseUrl = authServiceBaseUrl;
    }

    public async getUserDetails(userId: string): Promise<UserDetails> {
        const response = await fetch(`${this.authServiceBaseUrl}/user/${userId}`);
        if (!response.ok) {
            throw new Error(`Failed to fetch user details: ${response.status} ${response.statusText}`);
        }
        return response.json() as Promise<UserDetails>;
    }
}

tests/web-service.test.ts (The Jest Testcontainers test):

// tests/web-service.test.ts
import { GenericContainer, StartedTestContainer } from 'testcontainers';
import { UserService } from '../user-service-client';
import path from 'path';

// Note: Testcontainers-Node uses `Node.js` fetch which is built-in in recent Node versions.
// For older versions, you might need `npm install node-fetch` and import it.

describe('Web Service Integration with Testcontainers', () => {
    let authServiceContainer: StartedTestContainer;
    let userService: UserService;
    let authServiceBaseUrl: string;

    beforeAll(async () => {
        // Step 1: Build and start the AuthService container from a local Dockerfile
        // We assume the Dockerfile is in the 'auth-service-node' directory relative to this test.
        const dockerfileContext = path.resolve(__dirname, '../auth-service-node');

        authServiceContainer = await new GenericContainer(dockerfileContext)
            .withExposedPorts(8080) // Port the Express app listens on
            .withWaitStrategy(Wait.forHttp('/user/123').forPort(8080).forStatusCode(200)) // Wait until ready
            .start();

        // Step 2: Get the dynamically mapped URL for our client
        authServiceBaseUrl = `http://${authServiceContainer.getHost()}:${authServiceContainer.getMappedPort(8080)}`;
        userService = new UserService(authServiceBaseUrl);

        console.log(`AuthService Node.js running at: ${authServiceBaseUrl}`);
    }, 60000); // Increased timeout for container build and startup

    afterAll(async () => {
        await authServiceContainer.stop();
    });

    test('should get user details from containerized AuthService', async () => {
        const userId = '123';
        const userDetails = await userService.getUserDetails(userId);

        expect(userDetails).toBeDefined();
        expect(userDetails.id).toBe(123);
        expect(userDetails.username).toBe('nodeuser');
        expect(userDetails.email).toBe('[email protected]');
        console.log(`Received user details: ${JSON.stringify(userDetails)}`);
    });

    test('should handle non-existent user correctly', async () => {
        const userId = '999';
        await expect(userService.getUserDetails(userId)).rejects.toThrow('Failed to fetch user details: 404 Not Found');
    });
});

Explanation:

  • new GenericContainer(dockerfileContext): Testcontainers-Node can take a path to a directory containing a Dockerfile as its first argument. It will then build the image locally and use it.
  • .withExposedPorts(8080): Exposes the internal port 8080.
  • .withWaitStrategy(Wait.forHttp('/user/123')...): Uses a built-in HTTP wait strategy to ensure the Express service is up and responsive before the test attempts to connect.
  • authServiceContainer.getHost() and authServiceContainer.getMappedPort(8080): Retrieves the dynamic host and port for our UserService client.
  • The tests then interact with the UserService and assert the responses, including error handling.

Mini-Challenge: Extend the Kafka Producer/Consumer

You’ve seen how to produce and consume a single message. Now, it’s your turn to add some complexity!

Challenge: Modify one of the Kafka examples (Java, Python, or Node.js) to:

  1. Produce multiple messages (e.g., 5-10 messages with unique IDs or data).
  2. Consume all produced messages and verify that you received exactly the number of messages sent and that their contents are correct.

Hints:

  • For producers, a simple loop around the producer.send() call will work. Remember producer.flush() if sending multiple messages quickly and expecting them immediately.
  • For consumers, you’ll need to poll multiple times or increase the poll duration (Java/Python) or refine your message collection logic (Node.js) to ensure all messages are captured. A while loop with a timeout is a robust way to collect all expected messages.
  • Consider using a List or Array to store received messages and then check its size and contents.

What to Observe/Learn:

  • How to handle batching messages and ensure they are all processed.
  • The importance of consumer polling strategies and timeouts when dealing with asynchronous message flows in tests.
  • Confirming message order (if your Kafka setup guarantees it and your test validates it).

Common Pitfalls & Troubleshooting

Working with message brokers and web services in containers can introduce some unique challenges. Here’s how to navigate them:

  1. Kafka Startup Timeouts:

    • Symptom: NoBrokersAvailable error, TimeoutException, or tests failing because Kafka isn’t ready. Kafka (and its Zookeeper dependency) can take a moment to initialize.
    • Fix:
      • Increase Testcontainers wait strategy timeout: If using KafkaContainer, Testcontainers has built-in wait strategies, but you can explicitly increase the startup timeout: .withStartupTimeout(Duration.ofSeconds(120)) (Java).
      • Implement client-side retries: As shown in the Python Kafka producer example, clients should ideally have retry logic with backoff. This makes your application more resilient and your tests more stable.
      • Check Docker logs: Inspect the container logs (kafka.getLogs() or docker logs <container_id>) for underlying issues.
  2. Web Service Reachability Issues (Port Mapping/Host):

    • Symptom: ConnectionRefused, ConnectException, or UnknownHostException when your client tries to connect to the containerized service.
    • Fix:
      • Verify withExposedPorts(): Ensure you’ve correctly exposed the port your service listens on within the container.
      • Use getMappedPort() and getHost(): Always retrieve the connection details dynamically from Testcontainers. Never hardcode localhost and a fixed port for container communication, as the port is dynamically assigned on the host.
      • Check container logs: Make sure your web service inside the container actually started successfully and is listening on the expected port.
  3. Asynchronous Assertions (Waiting for Events):

    • Symptom: Tests failing because they assert too quickly before an asynchronous event (like a Kafka message being processed or an API call completing) has occurred.
    • Fix:
      • Polling with a timeout: As demonstrated in the Node.js Kafka consumer, a while loop that periodically checks a condition (e.g., messages.length > 0) with a setTimeout or Thread.sleep and an overall timeout is a robust pattern.
      • Awaitability libraries: For more complex asynchronous scenarios, consider libraries that help with awaiting conditions, like Awaitility for Java or specific async/await patterns in JavaScript/Python.
      • Don’t over-rely on Thread.sleep(): While useful for simple demonstrations, excessive use of Thread.sleep() leads to slow and flaky tests. Prefer explicit wait strategies or polling loops with sensible timeouts.

Summary

Fantastic work! You’ve successfully navigated the complexities of integration testing with message brokers and web services using Testcontainers. Here’s a quick recap of what you’ve achieved:

  • Understanding the “Why”: You now grasp why testing against real instances of Kafka and dependent web services is crucial for distributed systems, moving beyond the limitations of mocks.
  • Kafka Mastery: You’ve learned how to spin up disposable Kafka containers, configure producers and consumers, and verify message flow across Java, Python, and Node.js.
  • Web Service Integration: You’ve mastered using GenericContainer and ImageFromDockerfile to bring custom or dependent microservices into your test environment, allowing your client services to interact with them realistically.
  • Dynamic Connectivity: You’re adept at using Testcontainers’ dynamic port mapping and host retrieval to ensure your applications connect correctly to these ephemeral services.
  • Robust Testing: You’ve started implementing wait strategies and handling asynchronous assertions, making your integration tests reliable.

You’re now ready to tackle even more sophisticated integration scenarios in your microservice landscape.

In the next chapter, we’ll shift our focus to integrating Testcontainers into your CI/CD pipelines, ensuring your robust tests run automatically and efficiently with every code change. Get ready to automate!

References


This page is AI-assisted and reviewed. It references official documentation and recognized resources where relevant.