Producer-Consumer Problem: Python Thread Solution

Updated 2026-03-11

TL;DR

The Producer-Consumer problem is a classic concurrency pattern where one or more threads produce data and place it in a shared buffer, while other threads consume that data. Solving it correctly requires synchronization mechanisms like locks, semaphores, or blocking queues to prevent race conditions and ensure thread safety.

Prerequisites: Understanding of threads and basic concurrency concepts, familiarity with race conditions and critical sections, knowledge of locks/mutexes, and basic familiarity with Python’s threading module (or the equivalent Java/C++ threading libraries).

After this topic: Implement thread-safe producer-consumer solutions using multiple synchronization approaches (locks with condition variables, semaphores, blocking queues), identify when to use each approach, and debug common concurrency issues like deadlock and busy-waiting in producer-consumer scenarios.

Core Concept

What is the Producer-Consumer Problem?

The Producer-Consumer problem models a scenario where producer threads generate data items and place them into a shared buffer, while consumer threads remove and process those items. The challenge is coordinating access to the buffer so that:

  • Producers don’t add items when the buffer is full
  • Consumers don’t remove items when the buffer is empty
  • Multiple threads don’t corrupt the buffer through simultaneous access

This pattern appears everywhere in real systems: web servers handling requests, message queues, logging systems, video streaming, and data pipelines.

Why It Matters

Without proper synchronization, you get race conditions where threads interfere with each other, leading to lost data, crashes, or incorrect results. The producer-consumer problem teaches you fundamental concurrency skills:

  • Mutual exclusion: Only one thread modifies the buffer at a time
  • Condition synchronization: Threads wait for specific conditions (buffer not full/empty)
  • Avoiding deadlock: Ensuring threads don’t wait forever
  • Avoiding busy-waiting: Threads shouldn’t waste CPU checking conditions repeatedly
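Mutual exclusion is the most fundamental of these skills. A minimal sketch (a standalone counter demo, not part of the buffer code later in this article) shows why even a single `+=` on shared state needs a lock — the read-modify-write is not atomic, so concurrent increments can be lost without one:

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # counter += 1 is a read-modify-write; the lock makes it atomic
        # with respect to the other threads.
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000 — exact, because the lock prevents lost updates
```

Without the lock, the final count can fall short of 400000 (CPython’s GIL makes the race hard to trigger in practice, but the unlocked code is still incorrect by contract).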

Core Components

Bounded Buffer: A fixed-size queue or array that holds items. When full, producers must wait. When empty, consumers must wait.

Synchronization Primitives: Tools to coordinate threads:

  • Locks/Mutexes: Ensure only one thread accesses the buffer at a time
  • Condition Variables: Allow threads to wait for specific conditions and be notified when conditions change
  • Semaphores: Count-based synchronization (track empty/full slots)
  • Blocking Queues: High-level abstractions that handle synchronization internally

The Classic Solution Pattern

  1. Acquire lock before accessing buffer
  2. Check if operation is possible (buffer not full for producer, not empty for consumer)
  3. If not possible, wait on a condition variable
  4. Perform operation (add/remove item)
  5. Signal waiting threads that conditions may have changed
  6. Release lock

This pattern prevents race conditions while avoiding busy-waiting and deadlock.
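The six steps map directly onto Python’s threading primitives. Here is a minimal skeleton of the producer side (the buffer, capacity, and condition names are illustrative; the full worked example appears in the Examples section):

```python
import threading

buffer = []
CAPACITY = 3
lock = threading.Lock()
not_full = threading.Condition(lock)
not_empty = threading.Condition(lock)

def produce(item):
    with lock:                              # 1. acquire lock
        while len(buffer) >= CAPACITY:      # 2. check if operation is possible
            not_full.wait()                 # 3. wait (releases lock while blocked)
        buffer.append(item)                 # 4. perform operation
        not_empty.notify()                  # 5. signal waiting consumers
                                            # 6. lock released when `with` exits

produce("item-0")
print(buffer)  # ['item-0']
```

The consumer side mirrors this skeleton with the roles of `not_full` and `not_empty` swapped.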

Visual Guide

Producer-Consumer Architecture

graph LR
    P1[Producer 1] -->|put| B[Shared Buffer<br/>Fixed Size]
    P2[Producer 2] -->|put| B
    B -->|get| C1[Consumer 1]
    B -->|get| C2[Consumer 2]
    B -.->|full?| P1
    B -.->|full?| P2
    B -.->|empty?| C1
    B -.->|empty?| C2

Multiple producers add items to a shared bounded buffer while multiple consumers remove items. Synchronization prevents overflow, underflow, and corruption.

Thread State Transitions

stateDiagram-v2
    [*] --> Running
    Running --> Waiting: Buffer full (Producer)<br/>Buffer empty (Consumer)
    Waiting --> Running: Notified by other thread
    Running --> [*]: Task complete

Producers wait when buffer is full, consumers wait when empty. Notification wakes waiting threads when conditions change.

Lock and Condition Variable Flow

sequenceDiagram
    participant P as Producer
    participant L as Lock
    participant B as Buffer
    participant CV as Condition Variable
    participant C as Consumer
    
    P->>L: acquire()
    P->>B: check if full
    alt Buffer Full
        P->>CV: wait() [releases lock]
        Note over P: Blocked
        C->>L: acquire()
        C->>B: remove item
        C->>CV: notify()
        C->>L: release()
        CV->>P: wake up
        P->>L: re-acquire()
    end
    P->>B: add item
    P->>CV: notify()
    P->>L: release()

Condition variables allow threads to wait efficiently and be notified when conditions change. The lock is automatically released during wait and re-acquired after notification.

Examples

Example 1: Basic Producer-Consumer with Locks and Condition Variables

This is the classic solution using low-level synchronization primitives.

import threading
import time
import random

class BoundedBuffer:
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)
    
    def produce(self, item):
        with self.lock:  # Acquire lock
            # Wait while buffer is full
            while len(self.buffer) >= self.capacity:
                print("Buffer full. Producer waiting...")
                self.not_full.wait()  # Releases lock, waits, re-acquires
            
            # Add item to buffer
            self.buffer.append(item)
            print(f"Produced: {item}. Buffer size: {len(self.buffer)}")
            
            # Notify consumers that buffer is not empty
            self.not_empty.notify()
    
    def consume(self):
        with self.lock:  # Acquire lock
            # Wait while buffer is empty
            while len(self.buffer) == 0:
                print("Buffer empty. Consumer waiting...")
                self.not_empty.wait()  # Releases lock, waits, re-acquires
            
            # Remove item from buffer
            item = self.buffer.pop(0)
            print(f"Consumed: {item}. Buffer size: {len(self.buffer)}")
            
            # Notify producers that buffer is not full
            self.not_full.notify()
            return item

def producer(buffer, num_items):
    for i in range(num_items):
        item = f"item-{i}"
        buffer.produce(item)
        time.sleep(random.uniform(0.1, 0.3))  # Simulate work

def consumer(buffer, num_items):
    for i in range(num_items):
        item = buffer.consume()
        time.sleep(random.uniform(0.2, 0.5))  # Simulate processing

# Run simulation
buffer = BoundedBuffer(capacity=3)

producer_thread = threading.Thread(target=producer, args=(buffer, 10))
consumer_thread = threading.Thread(target=consumer, args=(buffer, 10))

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

print("All items produced and consumed.")

Expected Output (order may vary due to thread scheduling):

Produced: item-0. Buffer size: 1
Produced: item-1. Buffer size: 2
Consumed: item-0. Buffer size: 1
Produced: item-2. Buffer size: 2
Produced: item-3. Buffer size: 3
Buffer full. Producer waiting...
Consumed: item-1. Buffer size: 2
Produced: item-4. Buffer size: 3
Consumed: item-2. Buffer size: 2
...
All items produced and consumed.

Key Points:

  • Use while (not if) to check conditions — handles spurious wakeups
  • wait() automatically releases the lock and re-acquires it after notification
  • notify() wakes one waiting thread; notify_all() wakes all
  • The with statement ensures lock is released even if exceptions occur

Try it yourself: Modify the code to use multiple producers and consumers. What happens to the output?


Example 2: Using Python’s Queue (Blocking Queue)

Python’s queue.Queue is a thread-safe blocking queue that handles all synchronization internally.

import threading
import queue
import time
import random

def producer(q, num_items, producer_id):
    for i in range(num_items):
        item = f"P{producer_id}-item-{i}"
        q.put(item)  # Blocks if queue is full
        print(f"Producer {producer_id} produced: {item}")
        time.sleep(random.uniform(0.1, 0.3))
    print(f"Producer {producer_id} finished.")

def consumer(q, consumer_id):
    while True:
        try:
            # Block for up to 2 seconds waiting for item
            item = q.get(timeout=2)
            print(f"Consumer {consumer_id} consumed: {item}")
            time.sleep(random.uniform(0.2, 0.4))
            q.task_done()  # Signal that processing is complete
        except queue.Empty:
            print(f"Consumer {consumer_id} timed out, exiting.")
            break

# Create bounded queue with capacity 5
work_queue = queue.Queue(maxsize=5)

# Create 2 producers and 3 consumers
producers = [
    threading.Thread(target=producer, args=(work_queue, 5, i))
    for i in range(2)
]
consumers = [
    threading.Thread(target=consumer, args=(work_queue, i))
    for i in range(3)
]

# Start all threads
for p in producers:
    p.start()
for c in consumers:
    c.start()

# Wait for all producers to finish
for p in producers:
    p.join()

# Wait for queue to be empty
work_queue.join()

print("All work completed.")

# Consumers will timeout and exit
for c in consumers:
    c.join()

Expected Output (order varies):

Producer 0 produced: P0-item-0
Producer 1 produced: P1-item-0
Consumer 0 consumed: P0-item-0
Consumer 1 consumed: P1-item-0
Producer 0 produced: P0-item-1
Producer 1 produced: P1-item-1
Consumer 2 consumed: P0-item-1
...
Producer 0 finished.
Producer 1 finished.
All work completed.
Consumer 0 timed out, exiting.
Consumer 1 timed out, exiting.
Consumer 2 timed out, exiting.

Key Points:

  • Queue(maxsize=n) creates a bounded queue; put() blocks when full
  • get(timeout=n) blocks until item available or timeout expires
  • task_done() and join() allow waiting for all work to complete
  • Much simpler than manual lock/condition variable management
  • Java equivalent: BlockingQueue interface with implementations like ArrayBlockingQueue
  • C++ equivalent: No standard blocking queue, even in C++20; use a library (e.g., Boost or Intel TBB’s concurrent_bounded_queue) or implement one with std::mutex and std::condition_variable

Try it yourself: Add a poison pill pattern — producers send a special “STOP” message to signal consumers to exit gracefully.


Example 3: Semaphore-Based Solution

Semaphores track available resources (empty slots and full slots).

import threading
import time
import random

class SemaphoreBuffer:
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.mutex = threading.Lock()  # Protects buffer access
        self.empty_slots = threading.Semaphore(capacity)  # Counts empty slots
        self.full_slots = threading.Semaphore(0)  # Counts full slots
    
    def produce(self, item):
        self.empty_slots.acquire()  # Wait for empty slot (decrement)
        with self.mutex:
            self.buffer.append(item)
            print(f"Produced: {item}. Buffer: {self.buffer}")
        self.full_slots.release()  # Signal full slot available (increment)
    
    def consume(self):
        self.full_slots.acquire()  # Wait for full slot (decrement)
        with self.mutex:
            item = self.buffer.pop(0)
            print(f"Consumed: {item}. Buffer: {self.buffer}")
        self.empty_slots.release()  # Signal empty slot available (increment)
        return item

def producer(buffer, num_items):
    for i in range(num_items):
        buffer.produce(f"item-{i}")
        time.sleep(random.uniform(0.1, 0.2))

def consumer(buffer, num_items):
    for i in range(num_items):
        buffer.consume()
        time.sleep(random.uniform(0.15, 0.25))

buffer = SemaphoreBuffer(capacity=3)

p_thread = threading.Thread(target=producer, args=(buffer, 8))
c_thread = threading.Thread(target=consumer, args=(buffer, 8))

p_thread.start()
c_thread.start()

p_thread.join()
c_thread.join()

print("Done.")

Expected Output:

Produced: item-0. Buffer: ['item-0']
Produced: item-1. Buffer: ['item-0', 'item-1']
Consumed: item-0. Buffer: ['item-1']
Produced: item-2. Buffer: ['item-1', 'item-2']
Produced: item-3. Buffer: ['item-1', 'item-2', 'item-3']
Consumed: item-1. Buffer: ['item-2', 'item-3']
...
Done.

Key Points:

  • Semaphores count resources: empty_slots starts at capacity, full_slots starts at 0
  • acquire() decrements (waits if count is 0); release() increments
  • Still need mutex to protect actual buffer manipulation
  • Order matters: acquire semaphore first, then lock; release lock first, then semaphore
  • Java: Use java.util.concurrent.Semaphore
  • C++: Use std::counting_semaphore (C++20) or platform-specific APIs

Try it yourself: What happens if you swap the order of semaphore and mutex operations? Try it and observe deadlock.

Common Mistakes

1. Using if Instead of while for Condition Checks

Wrong:

if len(self.buffer) >= self.capacity:
    self.not_full.wait()

Why it’s wrong: Condition variables can have spurious wakeups — threads wake up even when not notified. Also, another thread might invalidate the condition (e.g., refill the buffer) between the notification and this thread re-acquiring the lock.

Correct:

while len(self.buffer) >= self.capacity:
    self.not_full.wait()

Always use while to re-check the condition after waking up.


2. Forgetting to Notify Waiting Threads

Wrong:

def produce(self, item):
    with self.lock:
        while len(self.buffer) >= self.capacity:
            self.not_full.wait()
        self.buffer.append(item)
        # Missing: self.not_empty.notify()

Why it’s wrong: Consumers waiting on not_empty will never wake up, causing deadlock.

Correct: Always call notify() or notify_all() after changing conditions that other threads are waiting for.


3. Holding Lock While Doing Expensive Operations

Wrong:

def consume(self):
    with self.lock:
        item = self.buffer.pop(0)
        process_item(item)  # Expensive operation inside lock!
        self.not_full.notify()

Why it’s wrong: Holding the lock during processing prevents other threads from accessing the buffer, reducing concurrency and performance.

Correct:

def consume(self):
    with self.lock:
        item = self.buffer.pop(0)
        self.not_full.notify()
    process_item(item)  # Process outside the lock

Only hold locks for the minimum time necessary to maintain consistency.


4. Incorrect Semaphore Initialization

Wrong:

self.empty_slots = threading.Semaphore(0)  # Should be capacity!
self.full_slots = threading.Semaphore(capacity)  # Should be 0!

Why it’s wrong: Semaphores represent available resources. Initially, all slots are empty (capacity empty slots, 0 full slots). Reversing this causes immediate deadlock.

Correct:

self.empty_slots = threading.Semaphore(capacity)
self.full_slots = threading.Semaphore(0)

5. Not Handling queue.Empty or queue.Full Exceptions

Wrong:

item = q.get(block=False)  # Raises queue.Empty if the queue is empty

Why it’s wrong: Without exception handling, the unhandled queue.Empty exception terminates the thread.

Correct:

try:
    item = q.get(timeout=1)
except queue.Empty:
    # Handle empty queue gracefully
    break

Or use blocking calls with appropriate timeouts and error handling.

Interview Tips

1. Start with the High-Level Approach

When asked about producer-consumer, begin with: “I’d use a blocking queue like Python’s queue.Queue or Java’s BlockingQueue because it handles synchronization internally.” This shows you know the practical solution. Then add: “If we need to implement it from scratch, I’d use locks with condition variables.”

Interviewers appreciate knowing you’d use existing tools first, but can implement from scratch if needed.


2. Clarify Requirements Before Coding

Ask these questions:

  • Bounded or unbounded buffer? (Affects whether producers ever wait)
  • Single or multiple producers/consumers? (Affects complexity)
  • What happens when buffer is full/empty? (Block, drop, throw exception?)
  • Any priority or ordering requirements? (FIFO, LIFO, priority queue?)

This demonstrates systems thinking and prevents solving the wrong problem.


3. Explain the Synchronization Invariants

State the conditions you’re maintaining:

  • “The buffer size never exceeds capacity”
  • “Consumers never read from an empty buffer”
  • “No two threads modify the buffer simultaneously”

Then explain how your code enforces each invariant. This shows deep understanding.


4. Discuss Trade-offs Between Approaches

Blocking Queue: Simplest, least error-prone, best for most cases. Downside: less control over behavior.

Lock + Condition Variables: More control, educational value. Downside: more complex, easier to make mistakes.

Semaphores: Good for counting resources. Downside: easy to get initialization wrong, still need mutex for buffer access.

Mention: “In production, I’d use a blocking queue. For learning or special requirements, I’d implement with condition variables.”


5. Know Common Variations

Be ready to discuss:

  • Multiple buffers: Separate queues for different priorities
  • Poison pill pattern: Special message to signal shutdown
  • Work stealing: Consumers can steal from each other’s queues
  • Backpressure: Slowing producers when consumers can’t keep up
  • Batch processing: Producing/consuming multiple items at once

Example: “If we need graceful shutdown, I’d use a poison pill — producers send a STOP message, and consumers exit when they receive it.”
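That shutdown strategy can be sketched with Python’s queue.Queue. The sentinel object and the re-queue trick below are one illustrative design among several (another option is one pill per consumer):

```python
import queue
import threading

SENTINEL = object()  # unique poison-pill marker; never a real work item
q = queue.Queue()
results = []

def consumer():
    while True:
        item = q.get()
        if item is SENTINEL:
            q.put(SENTINEL)  # re-queue the pill so sibling consumers also stop
            break
        results.append(item)  # list.append is thread-safe in CPython

workers = [threading.Thread(target=consumer) for _ in range(2)]
for w in workers:
    w.start()

for i in range(5):
    q.put(i)
q.put(SENTINEL)  # a single pill, passed along by whichever consumer sees it

for w in workers:
    w.join()
print(sorted(results))  # [0, 1, 2, 3, 4]
```

Because the queue is FIFO, the pill is only delivered after every real item, so no work is lost on shutdown.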


6. Mention Real-World Applications

Connect to systems you’ve used:

  • “This is like Kafka — producers publish messages, consumers read from partitions”
  • “Thread pools use this pattern — tasks are produced, worker threads consume them”
  • “Logging systems buffer log messages to avoid blocking application threads”

This shows you understand practical applications, not just theory.


7. Be Ready to Debug

If asked “What if this code deadlocks?”, check:

  • Are all waits inside while loops?
  • Is every state change followed by appropriate notify?
  • Are locks acquired in consistent order?
  • Are semaphores initialized correctly?

Walk through a scenario: “If producer fills buffer and consumer hasn’t started, producer waits on not_full. When consumer starts and removes item, it calls notify on not_empty — wait, that’s wrong! It should notify not_full.”


8. Code Incrementally

Don’t try to write the complete solution at once:

  1. Start with buffer structure and basic add/remove (no synchronization)
  2. Add lock for mutual exclusion
  3. Add condition variables and wait logic
  4. Add notify calls
  5. Test with example scenario

This shows structured thinking and makes it easier to catch errors early.

Key Takeaways

  • The producer-consumer problem requires synchronizing access to a shared bounded buffer where producers add items and consumers remove them, preventing overflow, underflow, and race conditions.

  • Use blocking queues (like Python’s queue.Queue or Java’s BlockingQueue) in production code — they handle all synchronization internally and are less error-prone than manual implementations.

  • When implementing from scratch, use locks with condition variables: acquire lock, check condition in a while loop (not if), wait if necessary, perform operation, notify waiting threads, release lock.

  • Common mistakes include using if instead of while for condition checks, forgetting to notify waiting threads, and holding locks during expensive operations — these lead to spurious wakeups, deadlock, and poor performance.

  • In interviews, clarify requirements first (bounded/unbounded, single/multiple threads), explain synchronization invariants, discuss trade-offs between approaches, and connect to real-world systems like message queues, thread pools, and logging frameworks.