java - Single Producer Multiple Consumer - queue contains null -


can explain how consumer-2 below consuming 'null'? code should preventing this.

public class test {  public static void main(string args[]) throws interruptedexception {      boundedqueue<integer> sharedqueue = new boundedqueue<>(10);      callable<integer> producer1 = new producer(sharedqueue, "producer-1");     //callable<integer> producer2 = new producer(sharedqueue, "producer-2");     callable<integer> consumer1 = new consumer(sharedqueue, "consumer-1");     callable<integer> consumer2 = new consumer(sharedqueue, "consumer-2");      collection<callable<integer>> callables = new hashset<>();     callables.add(producer1);     //callables.add(producer2);     callables.add(consumer1);     callables.add(consumer2);      executorservice executorservice = executors.newfixedthreadpool(10);     executorservice.invokeall(callables); } }  public class boundedqueue<t> {  private int capacity; private int head; private int tail; private int currentsizeofbuffer; private t[] buffer;  private final reentrantlock lock = new reentrantlock(); private final condition notfull = lock.newcondition(); private final condition notempty = lock.newcondition();  public boundedqueue(int capacity) {     this.capacity = capacity;     this.buffer = (t[]) new object[capacity]; }  public void put(t element) throws interruptedexception {      final reentrantlock lock = this.lock;     lock.lock();      if(isbufferfull()) {         waitonavailableslot();     }      try {         buffer[tail] = element;         tail = getnextavailableslot(tail);         currentsizeofbuffer++;          informconsumerqueuehaselement();      } {         lock.unlock();     } }  private boolean isbufferfull() {     return capacity == currentsizeofbuffer; }  private void waitonavailableslot() throws interruptedexception {     notfull.await(); }  private void informconsumerqueuehaselement() {     notempty.signal(); }  public t take() throws interruptedexception {      final reentrantlock lock = this.lock;     lock.lock();      if(isbufferempty()) {         waitonavailableelement();     }      try {         t element = buffer[head];         head = getnextavailableslot(head);         currentsizeofbuffer--;          informproducerqueuehasspaceavailable();          return element;     } {         lock.unlock();     } }  private boolean isbufferempty() {     return 0 == currentsizeofbuffer; }  private void waitonavailableelement() throws interruptedexception {     notempty.await(); }  private void informproducerqueuehasspaceavailable() {     notfull.signal(); }  private final int getnextavailableslot(int currentslotposition) {     int nextavailableslot = ++currentslotposition;     return (nextavailableslot == capacity) ? 0 : nextavailableslot; } }   public class producer implements callable<integer> {  private final boundedqueue sharedqueue; private string name;  @override public integer call() throws exception {      for(int i=0; i<10; i++){         try {             sharedqueue.put(i);             system.out.println(name + " produced: " + i);         } catch (interruptedexception ex) {             logger.getlogger(producer.class.getname()).log(level.severe, null, ex);         }     }     return null; }  public producer(boundedqueue sharedqueue, string name) {     this.sharedqueue = sharedqueue;     this.name = name; } }  public class consumer implements callable<integer> {  private final boundedqueue sharedqueue; private string name;  @override public integer call() throws exception {      while(true){    //what happening here?         try {             integer element = (integer) sharedqueue.take();             system.out.println(name + " consumed: "+ element);         } catch (interruptedexception ex) {             logger.getlogger(consumer.class.getname()).log(level.severe, null, ex);         }     } }  public consumer(boundedqueue sharedqueue, string name) {     this.sharedqueue = sharedqueue;     this.name = name; } } 

output:

  • producer-2 produced: 0
  • consumer-2 consumed: null
  • consumer-1 consumed: 0
  • producer-2 produced: 1
  • producer-2 produced: 2
  • consumer-2 consumed: 2
  • consumer-1 consumed: 0
  • producer-1 produced: 0
  • consumer-2 consumed: 3
  • etc

another run:

  • producer-2 produced: 0
  • consumer-1 consumed: 0
  • consumer-2 consumed: null
  • producer-1 produced: 0
  • roducer-2 produced: 1
  • producer-1 produced: 1
  • consumer-2 consumed: 0
  • consumer-1 consumed: null
  • consumer-2 consumed: 2
  • etc

you need use while(isbufferempty()) instead of if (and same full). since consumers (and producers) signaled @ same time, have recheck make sure other ones haven't processed elements added in queue.


Comments

Popular posts from this blog

asynchronous - C# WinSCP .NET assembly: How to upload multiple files asynchronously -

aws api gateway - SerializationException in posting new Records via Dynamodb Proxy Service in API -

asp.net - Problems sending emails from forum -