drainTo(Collection<? super E> c, int maxElements)
Removes at most the given number of available elements from this queue and adds them to the given collection.
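As a small illustration of drainTo, the following sketch moves a bounded batch out of a queue without blocking. The class name DrainToExample and the helper drainUpTo are made up for this example; only drainTo itself comes from the BlockingQueue interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainToExample {
    // Hypothetical helper: removes at most `max` available elements from the queue.
    static List<Integer> drainUpTo(BlockingQueue<Integer> queue, int max) {
        List<Integer> batch = new ArrayList<>();
        // drainTo does not block; it transfers whatever is available, up to max.
        queue.drainTo(batch, max);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5; i++) {
            queue.add(i);
        }
        List<Integer> batch = drainUpTo(queue, 3);
        // Prints: [1, 2, 3] drained, 2 left
        System.out.println(batch + " drained, " + queue.size() + " left");
    }
}
```

If fewer than max elements are available, drainTo simply returns the smaller batch instead of waiting.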
If you happen to use Google Guava, there's a nifty Queues.drain() method. It drains the queue as BlockingQueue.drainTo(Collection, int) does, but if the requested numElements elements are not available, it will wait for them up to the specified timeout.
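To make the "wait up to a timeout" behaviour concrete, here is a minimal sketch of the same idea using only java.util.concurrent. It is not Guava's code; the class TimedDrain and its drain method are hypothetical names chosen for this example, and the approach (non-blocking drainTo followed by a timed poll) is just one plausible way to get the described behaviour.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedDrain {
    // Hypothetical helper (an assumption of this sketch, not a library API):
    // collects up to numElements, waiting at most `timeout` in total.
    static <E> int drain(BlockingQueue<E> queue, Collection<? super E> buffer,
                         int numElements, long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        int added = 0;
        while (added < numElements) {
            // First take whatever is available right now, without blocking.
            added += queue.drainTo(buffer, numElements - added);
            if (added < numElements) {
                // Then wait for one more element, but only until the deadline.
                E e = queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (e == null) {
                    break; // timed out: return what we have
                }
                buffer.add(e);
                added++;
            }
        }
        return added;
    }
}
```

Called on a queue holding two elements with numElements = 5, this returns 2 once the timeout has elapsed, whereas plain drainTo would return 2 immediately.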
Producer Consumer using BlockingQueue
Usage example, based on a typical producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.
    class Producer implements Runnable {
        private final BlockingQueue<Object> queue;
        Producer(BlockingQueue<Object> q) { queue = q; }
        public void run() {
            try {
                // put() blocks while the queue is full.
                while (true) { queue.put(produce()); }
            } catch (InterruptedException ex) { ... handle ...}
        }
        Object produce() { ... }
    }

    class Consumer implements Runnable {
        private final BlockingQueue<Object> queue;
        Consumer(BlockingQueue<Object> q) { queue = q; }
        public void run() {
            try {
                // take() blocks while the queue is empty.
                while (true) { consume(queue.take()); }
            } catch (InterruptedException ex) { ... handle ...}
        }
        void consume(Object x) { ... }
    }
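The skeleton above leaves produce(), consume(), and the wiring open. One way to fill it in, purely as an illustration, is the sketch below: a single producer puts the numbers 1..count into a small ArrayBlockingQueue and a single consumer sums them. The class PipelineDemo, the DONE sentinel, and the capacity of 4 are assumptions of this example, not part of the original.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PipelineDemo {
    // Sentinel value telling the consumer to stop (an assumption of this sketch).
    static final int DONE = Integer.MIN_VALUE;

    // Runs one producer and one consumer; returns the sum the consumer computed.
    static long run(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
        long[] sum = {0};

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i);       // blocks while the queue is full
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (int x = queue.take(); x != DONE; x = queue.take()) {
                    sum[0] += x;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();                // join makes sum[0] safely visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(100));   // prints 5050
    }
}
```

The sentinel is needed because the skeleton's while (true) loops never terminate on their own; with several consumers, each one would need its own sentinel.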
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
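In practice this guarantee means that plain (non-volatile) fields written before the element is enqueued can be read safely after it is dequeued. The sketch below illustrates that; HappensBeforeDemo and Box are hypothetical names invented for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HappensBeforeDemo {
    // Plain mutable holder: no volatile field, no synchronization of its own.
    static final class Box {
        int value;
    }

    // Hands a Box from a writer thread to the calling thread through the queue.
    static int handOff() throws InterruptedException {
        BlockingQueue<Box> queue = new LinkedBlockingQueue<>();
        Thread writer = new Thread(() -> {
            Box box = new Box();
            box.value = 42;   // this write happens-before the enqueue...
            queue.add(box);
        });
        writer.start();
        Box received = queue.take();  // ...so it is visible after the dequeue
        return received.value;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff()); // prints 42
    }
}
```

The guarantee covers only writes made before the put; mutating the Box after handing it over would need its own synchronization.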
BlockingQueue is an interface in the Java Collections Framework with many implementing classes, including:
ArrayBlockingQueue (you should be able to implement this one yourself)
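As a rough idea of what "implementing it yourself" could look like, here is a minimal bounded blocking queue built on a circular array, one ReentrantLock, and two Conditions. SimpleBoundedQueue is a hypothetical class written for this note; it supports only put, take, and size, and should not be read as the JDK's actual ArrayBlockingQueue source.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class SimpleBoundedQueue<E> {
    private final Object[] items;          // circular buffer
    private int head, tail, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    SimpleBoundedQueue(int capacity) {
        items = new Object[capacity];
    }

    // Blocks while the queue is full.
    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[tail] = e;
            tail = (tail + 1) % items.length;
            count++;
            notEmpty.signal();             // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    // Blocks while the queue is empty.
    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E e = (E) items[head];
            items[head] = null;            // drop the reference for GC
            head = (head + 1) % items.length;
            count--;
            notFull.signal();              // wake one waiting producer
            return e;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}
```

The await calls sit inside while loops rather than if statements so that a thread re-checks the condition after every wakeup, including spurious ones.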
Without BlockingQueue, the same hand-off can be implemented like this:
    public class Drop {
        // Message sent from producer to consumer.
        private String message;
        // True if consumer should wait for producer to send message,
        // false if producer should wait for consumer to retrieve message.
        private boolean empty = true;

        public synchronized String take() {
            // Wait until message is available.
            while (empty) {
                try {
                    wait();
                } catch (InterruptedException e) {}
            }
            // Toggle status.
            empty = true;
            // Notify producer that status has changed.
            notifyAll();
            return message;
        }

        public synchronized void put(String message) {
            // Wait until message has been retrieved.
            while (!empty) {
                try {
                    wait();
                } catch (InterruptedException e) {}
            }
            // Toggle status.
            empty = false;
            // Store message.
            this.message = message;
            // Notify consumer that status has changed.
            notifyAll();
        }
    }
    import java.util.Random;

    public class Producer implements Runnable {
        private Drop drop;

        public Producer(Drop drop) {
            this.drop = drop;
        }

        public void run() {
            String importantInfo[] = {
                "Mares eat oats",
                "Does eat oats",
                "Little lambs eat ivy",
                "A kid will eat ivy too"
            };
            Random random = new Random();

            for (int i = 0; i < importantInfo.length; i++) {
                drop.put(importantInfo[i]);
                // Pause for a random interval of up to 5 seconds.
                try {
                    Thread.sleep(random.nextInt(5000));
                } catch (InterruptedException e) {}
            }
            // Sentinel message that tells the consumer to stop.
            drop.put("DONE");
        }
    }
    import java.util.Random;

    public class Consumer implements Runnable {
        private Drop drop;

        public Consumer(Drop drop) {
            this.drop = drop;
        }

        public void run() {
            Random random = new Random();
            // Keep taking messages until the "DONE" sentinel arrives.
            for (String message = drop.take();
                 !message.equals("DONE");
                 message = drop.take()) {
                System.out.format("MESSAGE RECEIVED: %s%n", message);
                // Pause for a random interval of up to 5 seconds.
                try {
                    Thread.sleep(random.nextInt(5000));
                } catch (InterruptedException e) {}
            }
        }
    }
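The listing above has no entry point that starts the two threads. The following self-contained sketch shows one way to wire them together. DropDemo is a hypothetical name, its nested Drop is a condensed variant of the class above (it propagates InterruptedException instead of swallowing it), and the random sleeps are left out so the run finishes immediately.

```java
import java.util.ArrayList;
import java.util.List;

class DropDemo {
    // Condensed variant of Drop so this sketch compiles on its own.
    static final class Drop {
        private String message;
        private boolean empty = true;

        synchronized String take() throws InterruptedException {
            while (empty) wait();       // wait until a message is available
            empty = true;
            notifyAll();                // let the producer continue
            return message;
        }

        synchronized void put(String m) throws InterruptedException {
            while (!empty) wait();      // wait until the last message was taken
            empty = false;
            message = m;
            notifyAll();                // let the consumer continue
        }
    }

    // Sends the messages through a Drop and returns what the consumer received.
    static List<String> run(String... messages) throws InterruptedException {
        Drop drop = new Drop();
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (String m : messages) drop.put(m);
                drop.put("DONE");       // sentinel, as in the listing above
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (String m = drop.take(); !m.equals("DONE"); m = drop.take()) {
                    received.add(m);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints: [Mares eat oats, Does eat oats]
        System.out.println(run("Mares eat oats", "Does eat oats"));
    }
}
```

Because Drop holds a single message, it behaves like a blocking queue of capacity one: each put waits until the previous message has been taken.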