java - Single Producer Multiple Consumer - queue contains null -
can explain how consumer-2 below consuming 'null'? code should preventing this.
public class test { public static void main(string args[]) throws interruptedexception { boundedqueue<integer> sharedqueue = new boundedqueue<>(10); callable<integer> producer1 = new producer(sharedqueue, "producer-1"); //callable<integer> producer2 = new producer(sharedqueue, "producer-2"); callable<integer> consumer1 = new consumer(sharedqueue, "consumer-1"); callable<integer> consumer2 = new consumer(sharedqueue, "consumer-2"); collection<callable<integer>> callables = new hashset<>(); callables.add(producer1); //callables.add(producer2); callables.add(consumer1); callables.add(consumer2); executorservice executorservice = executors.newfixedthreadpool(10); executorservice.invokeall(callables); } } public class boundedqueue<t> { private int capacity; private int head; private int tail; private int currentsizeofbuffer; private t[] buffer; private final reentrantlock lock = new reentrantlock(); private final condition notfull = lock.newcondition(); private final condition notempty = lock.newcondition(); public boundedqueue(int capacity) { this.capacity = capacity; this.buffer = (t[]) new object[capacity]; } public void put(t element) throws interruptedexception { final reentrantlock lock = this.lock; lock.lock(); if(isbufferfull()) { waitonavailableslot(); } try { buffer[tail] = element; tail = getnextavailableslot(tail); currentsizeofbuffer++; informconsumerqueuehaselement(); } { lock.unlock(); } } private boolean isbufferfull() { return capacity == currentsizeofbuffer; } private void waitonavailableslot() throws interruptedexception { notfull.await(); } private void informconsumerqueuehaselement() { notempty.signal(); } public t take() throws interruptedexception { final reentrantlock lock = this.lock; lock.lock(); if(isbufferempty()) { waitonavailableelement(); } try { t element = buffer[head]; head = getnextavailableslot(head); currentsizeofbuffer--; informproducerqueuehasspaceavailable(); return element; } { lock.unlock(); } } private boolean isbufferempty() { return 0 == currentsizeofbuffer; } private void waitonavailableelement() throws interruptedexception { notempty.await(); } private void informproducerqueuehasspaceavailable() { notfull.signal(); } private final int getnextavailableslot(int currentslotposition) { int nextavailableslot = ++currentslotposition; return (nextavailableslot == capacity) ? 0 : nextavailableslot; } } public class producer implements callable<integer> { private final boundedqueue sharedqueue; private string name; @override public integer call() throws exception { for(int i=0; i<10; i++){ try { sharedqueue.put(i); system.out.println(name + " produced: " + i); } catch (interruptedexception ex) { logger.getlogger(producer.class.getname()).log(level.severe, null, ex); } } return null; } public producer(boundedqueue sharedqueue, string name) { this.sharedqueue = sharedqueue; this.name = name; } } public class consumer implements callable<integer> { private final boundedqueue sharedqueue; private string name; @override public integer call() throws exception { while(true){ //what happening here? try { integer element = (integer) sharedqueue.take(); system.out.println(name + " consumed: "+ element); } catch (interruptedexception ex) { logger.getlogger(consumer.class.getname()).log(level.severe, null, ex); } } } public consumer(boundedqueue sharedqueue, string name) { this.sharedqueue = sharedqueue; this.name = name; } }
output:
- producer-2 produced: 0
- consumer-2 consumed: null
- consumer-1 consumed: 0
- producer-2 produced: 1
- producer-2 produced: 2
- consumer-2 consumed: 2
- consumer-1 consumed: 0
- producer-1 produced: 0
- consumer-2 consumed: 3
- etc
another run:
- producer-2 produced: 0
- consumer-1 consumed: 0
- consumer-2 consumed: null
- producer-1 produced: 0
- roducer-2 produced: 1
- producer-1 produced: 1
- consumer-2 consumed: 0
- consumer-1 consumed: null
- consumer-2 consumed: 2
- etc
you need use while(isbufferempty())
instead of if
(and same full). since consumers (and producers) signaled @ same time, have recheck make sure other ones haven't processed elements added in queue.
Comments
Post a Comment