Producer-Consumer Problem: Python Thread Solution
TL;DR
The Producer-Consumer problem is a classic concurrency pattern where one or more threads produce data and place it in a shared buffer, while other threads consume that data. Solving it correctly requires synchronization mechanisms like locks, semaphores, or blocking queues to prevent race conditions and ensure thread safety.
Core Concept
What is the Producer-Consumer Problem?
The Producer-Consumer problem models a scenario where producer threads generate data items and place them into a shared buffer, while consumer threads remove and process those items. The challenge is coordinating access to the buffer so that:
- Producers don’t add items when the buffer is full
- Consumers don’t remove items when the buffer is empty
- Multiple threads don’t corrupt the buffer through simultaneous access
This pattern appears everywhere in real systems: web servers handling requests, message queues, logging systems, video streaming, and data pipelines.
Why It Matters
Without proper synchronization, you get race conditions where threads interfere with each other, leading to lost data, crashes, or incorrect results. The producer-consumer problem teaches you fundamental concurrency skills:
- Mutual exclusion: Only one thread modifies the buffer at a time
- Condition synchronization: Threads wait for specific conditions (buffer not full/empty)
- Avoiding deadlock: Ensuring threads don’t wait forever
- Avoiding busy-waiting: Threads shouldn’t waste CPU checking conditions repeatedly
Core Components
Bounded Buffer: A fixed-size queue or array that holds items. When full, producers must wait. When empty, consumers must wait.
Synchronization Primitives: Tools to coordinate threads:
- Locks/Mutexes: Ensure only one thread accesses the buffer at a time
- Condition Variables: Allow threads to wait for specific conditions and be notified when conditions change
- Semaphores: Count-based synchronization (track empty/full slots)
- Blocking Queues: High-level abstractions that handle synchronization internally
The Classic Solution Pattern
- Acquire lock before accessing buffer
- Check if operation is possible (buffer not full for producer, not empty for consumer)
- If not possible, wait on a condition variable
- Perform operation (add/remove item)
- Signal waiting threads that conditions may have changed
- Release lock
This pattern prevents race conditions while avoiding busy-waiting and deadlock.
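The six steps above map one-to-one onto Python's `threading` primitives. Here is a minimal sketch of that mapping (Example 1 below expands it into a full program):

```python
import threading

buffer, CAPACITY = [], 3
lock = threading.Lock()
not_full = threading.Condition(lock)   # producers wait here
not_empty = threading.Condition(lock)  # consumers wait here

def produce(item):
    with lock:                              # 1. acquire lock
        while len(buffer) >= CAPACITY:      # 2. check if operation is possible
            not_full.wait()                 # 3. wait (releases lock while blocked)
        buffer.append(item)                 # 4. perform operation
        not_empty.notify()                  # 5. signal waiting consumers
    # 6. lock released automatically by the `with` block

def consume():
    with lock:
        while not buffer:
            not_empty.wait()
        item = buffer.pop(0)
        not_full.notify()
        return item
```

The `with` block handles steps 1 and 6 automatically, so the lock is released even if an exception is raised mid-operation.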
Visual Guide
Producer-Consumer Architecture
```mermaid
graph LR
    P1[Producer 1] -->|put| B[Shared Buffer<br/>Fixed Size]
    P2[Producer 2] -->|put| B
    B -->|get| C1[Consumer 1]
    B -->|get| C2[Consumer 2]
    B -.->|full?| P1
    B -.->|full?| P2
    B -.->|empty?| C1
    B -.->|empty?| C2
```
Multiple producers add items to a shared bounded buffer while multiple consumers remove items. Synchronization prevents overflow, underflow, and corruption.
Thread State Transitions
```mermaid
stateDiagram-v2
    [*] --> Running
    Running --> Waiting: Buffer full (Producer)<br/>Buffer empty (Consumer)
    Waiting --> Running: Notified by other thread
    Running --> [*]: Task complete
```
Producers wait when buffer is full, consumers wait when empty. Notification wakes waiting threads when conditions change.
Lock and Condition Variable Flow
```mermaid
sequenceDiagram
    participant P as Producer
    participant L as Lock
    participant B as Buffer
    participant CV as Condition Variable
    participant C as Consumer
    P->>L: acquire()
    P->>B: check if full
    alt Buffer Full
        P->>CV: wait() [releases lock]
        Note over P: Blocked
        C->>L: acquire()
        C->>B: remove item
        C->>CV: notify()
        C->>L: release()
        CV->>P: wake up
        P->>L: re-acquire()
    end
    P->>B: add item
    P->>CV: notify()
    P->>L: release()
```
Condition variables allow threads to wait efficiently and be notified when conditions change. The lock is automatically released during wait and re-acquired after notification.
Examples
Example 1: Basic Producer-Consumer with Locks and Condition Variables
This is the classic solution using low-level synchronization primitives.
```python
import threading
import time
import random

class BoundedBuffer:
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def produce(self, item):
        with self.lock:  # Acquire lock
            # Wait while buffer is full
            while len(self.buffer) >= self.capacity:
                print("Buffer full. Producer waiting...")
                self.not_full.wait()  # Releases lock, waits, re-acquires
            # Add item to buffer
            self.buffer.append(item)
            print(f"Produced: {item}. Buffer size: {len(self.buffer)}")
            # Notify consumers that buffer is not empty
            self.not_empty.notify()

    def consume(self):
        with self.lock:  # Acquire lock
            # Wait while buffer is empty
            while len(self.buffer) == 0:
                print("Buffer empty. Consumer waiting...")
                self.not_empty.wait()  # Releases lock, waits, re-acquires
            # Remove item from buffer
            item = self.buffer.pop(0)
            print(f"Consumed: {item}. Buffer size: {len(self.buffer)}")
            # Notify producers that buffer is not full
            self.not_full.notify()
            return item

def producer(buffer, num_items):
    for i in range(num_items):
        item = f"item-{i}"
        buffer.produce(item)
        time.sleep(random.uniform(0.1, 0.3))  # Simulate work

def consumer(buffer, num_items):
    for i in range(num_items):
        item = buffer.consume()
        time.sleep(random.uniform(0.2, 0.5))  # Simulate processing

# Run simulation
buffer = BoundedBuffer(capacity=3)
producer_thread = threading.Thread(target=producer, args=(buffer, 10))
consumer_thread = threading.Thread(target=consumer, args=(buffer, 10))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
print("All items produced and consumed.")
```
Expected Output (order may vary due to thread scheduling):
```
Produced: item-0. Buffer size: 1
Produced: item-1. Buffer size: 2
Consumed: item-0. Buffer size: 1
Produced: item-2. Buffer size: 2
Produced: item-3. Buffer size: 3
Buffer full. Producer waiting...
Consumed: item-1. Buffer size: 2
Produced: item-4. Buffer size: 3
Consumed: item-2. Buffer size: 2
...
All items produced and consumed.
```
Key Points:
- Use `while` (not `if`) to check conditions — this handles spurious wakeups
- `wait()` automatically releases the lock and re-acquires it after notification
- `notify()` wakes one waiting thread; `notify_all()` wakes all waiting threads
- The `with` statement ensures the lock is released even if an exception occurs
Try it yourself: Modify the code to use multiple producers and consumers. What happens to the output?
Example 2: Using Python’s Queue (Blocking Queue)
Python’s queue.Queue is a thread-safe blocking queue that handles all synchronization internally.
```python
import threading
import queue
import time
import random

def producer(q, num_items, producer_id):
    for i in range(num_items):
        item = f"P{producer_id}-item-{i}"
        q.put(item)  # Blocks if queue is full
        print(f"Producer {producer_id} produced: {item}")
        time.sleep(random.uniform(0.1, 0.3))
    print(f"Producer {producer_id} finished.")

def consumer(q, consumer_id):
    while True:
        try:
            # Block for up to 2 seconds waiting for an item
            item = q.get(timeout=2)
            print(f"Consumer {consumer_id} consumed: {item}")
            time.sleep(random.uniform(0.2, 0.4))
            q.task_done()  # Signal that processing is complete
        except queue.Empty:
            print(f"Consumer {consumer_id} timed out, exiting.")
            break

# Create bounded queue with capacity 5
work_queue = queue.Queue(maxsize=5)

# Create 2 producers and 3 consumers
producers = [
    threading.Thread(target=producer, args=(work_queue, 5, i))
    for i in range(2)
]
consumers = [
    threading.Thread(target=consumer, args=(work_queue, i))
    for i in range(3)
]

# Start all threads
for p in producers:
    p.start()
for c in consumers:
    c.start()

# Wait for all producers to finish
for p in producers:
    p.join()

# Wait for the queue to be drained
work_queue.join()
print("All work completed.")

# Consumers will time out and exit
for c in consumers:
    c.join()
```
Expected Output (order varies):
```
Producer 0 produced: P0-item-0
Producer 1 produced: P1-item-0
Consumer 0 consumed: P0-item-0
Consumer 1 consumed: P1-item-0
Producer 0 produced: P0-item-1
Producer 1 produced: P1-item-1
Consumer 2 consumed: P0-item-1
...
Producer 0 finished.
Producer 1 finished.
All work completed.
Consumer 0 timed out, exiting.
Consumer 1 timed out, exiting.
Consumer 2 timed out, exiting.
```
Key Points:
- `Queue(maxsize=n)` creates a bounded queue; `put()` blocks when full
- `get(timeout=n)` blocks until an item is available or the timeout expires
- `task_done()` and `join()` allow waiting for all work to complete
- Much simpler than manual lock/condition variable management
- Java equivalent: the `BlockingQueue` interface with implementations like `ArrayBlockingQueue`
- C++ equivalent: the standard library has no blocking queue; build one with `std::mutex` and `std::condition_variable` (or C++20's `std::counting_semaphore`), or use a library such as Boost
Try it yourself: Add a poison pill pattern — producers send a special “STOP” message to signal consumers to exit gracefully.
Example 3: Semaphore-Based Solution
Semaphores track available resources (empty slots and full slots).
```python
import threading
import time
import random

class SemaphoreBuffer:
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity
        self.mutex = threading.Lock()  # Protects buffer access
        self.empty_slots = threading.Semaphore(capacity)  # Counts empty slots
        self.full_slots = threading.Semaphore(0)  # Counts full slots

    def produce(self, item):
        self.empty_slots.acquire()  # Wait for an empty slot (decrement)
        with self.mutex:
            self.buffer.append(item)
            print(f"Produced: {item}. Buffer: {self.buffer}")
        self.full_slots.release()  # Signal a full slot is available (increment)

    def consume(self):
        self.full_slots.acquire()  # Wait for a full slot (decrement)
        with self.mutex:
            item = self.buffer.pop(0)
            print(f"Consumed: {item}. Buffer: {self.buffer}")
        self.empty_slots.release()  # Signal an empty slot is available (increment)
        return item

def producer(buffer, num_items):
    for i in range(num_items):
        buffer.produce(f"item-{i}")
        time.sleep(random.uniform(0.1, 0.2))

def consumer(buffer, num_items):
    for i in range(num_items):
        buffer.consume()
        time.sleep(random.uniform(0.15, 0.25))

buffer = SemaphoreBuffer(capacity=3)
p_thread = threading.Thread(target=producer, args=(buffer, 8))
c_thread = threading.Thread(target=consumer, args=(buffer, 8))
p_thread.start()
c_thread.start()
p_thread.join()
c_thread.join()
print("Done.")
```
Expected Output:
```
Produced: item-0. Buffer: ['item-0']
Produced: item-1. Buffer: ['item-0', 'item-1']
Consumed: item-0. Buffer: ['item-1']
Produced: item-2. Buffer: ['item-1', 'item-2']
Produced: item-3. Buffer: ['item-1', 'item-2', 'item-3']
Consumed: item-1. Buffer: ['item-2', 'item-3']
...
Done.
```
Key Points:
- Semaphores count resources: `empty_slots` starts at capacity, `full_slots` starts at 0
- `acquire()` decrements (waits if the count is 0); `release()` increments
- You still need the mutex to protect the actual buffer manipulation
- Order matters: acquire the semaphore first, then the lock; release the lock first, then the semaphore
- Java: use `java.util.concurrent.Semaphore`
- C++: use `std::counting_semaphore` (C++20) or platform-specific APIs
Try it yourself: What happens if you swap the order of semaphore and mutex operations? Try it and observe deadlock.
Common Mistakes
1. Using if Instead of while for Condition Checks
Wrong:
```python
if len(self.buffer) >= self.capacity:
    self.not_full.wait()
```
Why it’s wrong: Condition variables can have spurious wakeups — threads wake up even when not notified. Also, another thread might consume the condition between notification and re-acquiring the lock.
Correct:
```python
while len(self.buffer) >= self.capacity:
    self.not_full.wait()
```
Always use while to re-check the condition after waking up.
2. Forgetting to Notify Waiting Threads
Wrong:
```python
def produce(self, item):
    with self.lock:
        while len(self.buffer) >= self.capacity:
            self.not_full.wait()
        self.buffer.append(item)
        # Missing: self.not_empty.notify()
```
Why it’s wrong: Consumers waiting on not_empty will never wake up, causing deadlock.
Correct: Always call notify() or notify_all() after changing conditions that other threads are waiting for.
3. Holding Lock While Doing Expensive Operations
Wrong:
```python
def consume(self):
    with self.lock:
        item = self.buffer.pop(0)
        process_item(item)  # Expensive operation inside lock!
        self.not_full.notify()
```
Why it’s wrong: Holding the lock during processing prevents other threads from accessing the buffer, reducing concurrency and performance.
Correct:
```python
def consume(self):
    with self.lock:
        item = self.buffer.pop(0)
        self.not_full.notify()
    process_item(item)  # Process outside the lock
```
Only hold locks for the minimum time necessary to maintain consistency.
4. Incorrect Semaphore Initialization
Wrong:
```python
self.empty_slots = threading.Semaphore(0)        # Should be capacity!
self.full_slots = threading.Semaphore(capacity)  # Should be 0!
```
Why it’s wrong: Semaphores represent available resources. Initially, all slots are empty (capacity empty slots, 0 full slots). Reversing this causes immediate deadlock.
Correct:
```python
self.empty_slots = threading.Semaphore(capacity)
self.full_slots = threading.Semaphore(0)
```
5. Not Handling queue.Empty or queue.Full Exceptions
Wrong:
```python
item = q.get(block=False)  # Raises queue.Empty if the queue is empty
```
Why it’s wrong: Without exception handling, the thread crashes when the queue is empty.
Correct:
```python
try:
    item = q.get(timeout=1)
except queue.Empty:
    # Handle empty queue gracefully
    break
```
Or use blocking calls with appropriate timeouts and error handling.
Interview Tips
1. Start with the High-Level Approach
When asked about producer-consumer, begin with: “I’d use a blocking queue like Python’s queue.Queue or Java’s BlockingQueue because it handles synchronization internally.” This shows you know the practical solution. Then add: “If we need to implement it from scratch, I’d use locks with condition variables.”
Interviewers appreciate knowing you’d use existing tools first, but can implement from scratch if needed.
2. Clarify Requirements Before Coding
Ask these questions:
- Bounded or unbounded buffer? (Affects whether producers ever wait)
- Single or multiple producers/consumers? (Affects complexity)
- What happens when buffer is full/empty? (Block, drop, throw exception?)
- Any priority or ordering requirements? (FIFO, LIFO, priority queue?)
This demonstrates systems thinking and prevents solving the wrong problem.
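As a quick illustration of the ordering question, Python's `queue` module ships thread-safe variants of all three orderings, so the answer changes only one line of code:

```python
import queue

fifo = queue.Queue()          # first in, first out
lifo = queue.LifoQueue()      # last in, first out (a stack)
prio = queue.PriorityQueue()  # lowest value retrieved first

for q in (fifo, lifo, prio):
    for x in (3, 1, 2):
        q.put(x)

print([fifo.get() for _ in range(3)])  # -> [3, 1, 2]
print([lifo.get() for _ in range(3)])  # -> [2, 1, 3]
print([prio.get() for _ in range(3)])  # -> [1, 2, 3]
```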
3. Explain the Synchronization Invariants
State the conditions you’re maintaining:
- “The buffer size never exceeds capacity”
- “Consumers never read from an empty buffer”
- “No two threads modify the buffer simultaneously”
Then explain how your code enforces each invariant. This shows deep understanding.
4. Discuss Trade-offs Between Approaches
Blocking Queue: Simplest, least error-prone, best for most cases. Downside: less control over behavior.
Lock + Condition Variables: More control, educational value. Downside: more complex, easier to make mistakes.
Semaphores: Good for counting resources. Downside: easy to get initialization wrong, still need mutex for buffer access.
Mention: “In production, I’d use a blocking queue. For learning or special requirements, I’d implement with condition variables.”
5. Know Common Variations
Be ready to discuss:
- Multiple buffers: Separate queues for different priorities
- Poison pill pattern: Special message to signal shutdown
- Work stealing: Consumers can steal from each other’s queues
- Backpressure: Slowing producers when consumers can’t keep up
- Batch processing: Producing/consuming multiple items at once
Example: “If we need graceful shutdown, I’d use a poison pill — producers send a STOP message, and consumers exit when they receive it.”
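A minimal sketch of that poison-pill shutdown, assuming a unique `STOP` sentinel object and one pill per consumer (names here are illustrative):

```python
import queue
import threading

STOP = object()  # sentinel "poison pill"; any unique object works

def consumer(q, results):
    while True:
        item = q.get()
        if item is STOP:      # pill received: exit gracefully
            break
        results.append(item)  # list.append is thread-safe in CPython

q = queue.Queue()
results = []
workers = [threading.Thread(target=consumer, args=(q, results)) for _ in range(2)]
for w in workers:
    w.start()

for i in range(6):    # producer sends real work first...
    q.put(i)
for _ in workers:     # ...then one pill per consumer
    q.put(STOP)
for w in workers:
    w.join()

print(sorted(results))  # -> [0, 1, 2, 3, 4, 5]
```

Sending one pill per consumer works because a consumer that swallows a pill exits and never dequeues the pills meant for its peers.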
6. Mention Real-World Applications
Connect to systems you’ve used:
- “This is like Kafka — producers publish messages, consumers read from partitions”
- “Thread pools use this pattern — tasks are produced, worker threads consume them”
- “Logging systems buffer log messages to avoid blocking application threads”
This shows you understand practical applications, not just theory.
7. Be Ready to Debug
If asked “What if this code deadlocks?”, check:
- Are all waits inside while loops?
- Is every state change followed by appropriate notify?
- Are locks acquired in consistent order?
- Are semaphores initialized correctly?
Walk through a scenario: “If producer fills buffer and consumer hasn’t started, producer waits on not_full. When consumer starts and removes item, it calls notify on not_empty — wait, that’s wrong! It should notify not_full.”
8. Code Incrementally
Don’t try to write the complete solution at once:
- Start with buffer structure and basic add/remove (no synchronization)
- Add lock for mutual exclusion
- Add condition variables and wait logic
- Add notify calls
- Test with example scenario
This shows structured thinking and makes it easier to catch errors early.
Key Takeaways
- The producer-consumer problem requires synchronizing access to a shared bounded buffer where producers add items and consumers remove them, preventing overflow, underflow, and race conditions.
- Use blocking queues (like Python's `queue.Queue` or Java's `BlockingQueue`) in production code — they handle all synchronization internally and are less error-prone than manual implementations.
- When implementing from scratch, use locks with condition variables: acquire lock, check condition in a `while` loop (not `if`), wait if necessary, perform operation, notify waiting threads, release lock.
- Common mistakes include using `if` instead of `while` for condition checks, forgetting to notify waiting threads, and holding locks during expensive operations — these lead to spurious wakeups, deadlock, and poor performance.
- In interviews, clarify requirements first (bounded/unbounded, single/multiple threads), explain synchronization invariants, discuss trade-offs between approaches, and connect to real-world systems like message queues, thread pools, and logging frameworks.