java multi-thread producer consumer example

Producer and consumer problem is a synchronization issue when multiple processes/threads are sharing a single resource. The producer and the consumer are two separate processes/threads, but they use the same resource/buffer for producing and consuming. Problems could happen when producer and consumer are both accessing the buffer at the same time or one of the process/thread is running faster than the other. For example, the producer runs faster than the consumer, it’s possible the producer might overwrite the previous thing in the buffer before it is consumed by the consumer.

To solve the synchronization issue, the producer should only put things to the buffer when the buffer is not filled, and the consumer should only take things from the buffer when there are things in the buffer. The buffer should only be accessed by one process/thread at a time.

Here is a Java program solves the producer and consumer problem by using synchronized method to produce and consume data. The message in the pipe class is the buffer.

public class ProducerConsumer {
	//The pipe holds a message and there is flag for determining if the pipe has a message in it or not.
	//The pipe has two methods, take() and put().
	//put() puts a message when there is no message. take() takes the message when there is a message.
	public static class Pipe {
	    private String message;
	    private boolean empty = true;	    
	    public synchronized void put(String message) {
	        while (!empty) {
	            try { 
	                wait();
	            } catch (InterruptedException e) {
	            	System.out.println(e);
	            }
	        }
	        empty = false;
	        this.message = message;
	        notifyAll();
	    }
	    public synchronized String take() {
	        while (empty) {
	            try {
	                wait();
	            } catch (InterruptedException e) {
	            	System.out.println(e);
	            }
	        }
	        empty = true;
	        notifyAll();
	        return this.message;
	    }
	}
	
	//Producer puts a message to the pipe, and puts the next one when the previous message is consumed.
	public static class Producer implements Runnable {
	    private Pipe pipe;
	    public Producer(Pipe pipe) {
	        this.pipe = pipe;
	    }
	    public void run() {
	        String importantInfo[] = {
	            "A baby girl was born.",
	            "The baby girl was named Mary.",
	            "Mary learned how to walk at age 1.",
	            "Mary learned how to speak at age 2.",
	            "Mary went to kindergarden at the age 3."
	        };
	        Random random = new Random();
	        for (int i = 0; i < importantInfo.length; i++) {
	            pipe.put(importantInfo[i]);
	            try {
	                Thread.sleep(random.nextInt(5000));
	            } catch (InterruptedException e) {
	            	System.out.println(e);
	            }
	        }
	        pipe.put("DONE");
	    }
	}
	
	//Consumer consumes the message when there is a message in the pipe.
	public static class Consumer implements Runnable {
	    private Pipe pipe;
	    public Consumer(Pipe pipe) {
	        this.pipe = pipe;
	    }
	    public void run() {
	        Random random = new Random();
	        for (String message = pipe.take(); !message.equals("DONE"); message = pipe.take()) {
	            System.out.println("MESSAGE: " + message);
	            try {
	                Thread.sleep(random.nextInt(5000));
	            } catch (InterruptedException e) {
	            	System.out.println(e);
	            }
	        }
	    }
	}
	
	//Start the producer and consumer demo
	 public static void main(String[] args) {
	        Pipe drop = new Pipe();
	        (new Thread(new Producer(drop))).start();
	        (new Thread(new Consumer(drop))).start();
	 }
}

Search within Codexpedia

Custom Search

Search the entire web

Custom Search