Sunday, November 29, 2015

Producer Consumer using BlockingQueue




drainTo(Collection<? super E> c, int maxElements)
Removes at most the given number of available elements from this queue and adds them to the given collection.

If you happen to use Google Guava, there's a nifty Queues.drain() method.
Drains the queue as BlockingQueue.drainTo(Collection, int), but if the requested numElements elements are not available, it will wait for them up to the specified timeout.

Producer Consumer using BlockingQueue - 我的博客 - ITeye技术网站
Usage example, based on a typical producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.
  1. class Producer implements Runnable {  
  2.   private final BlockingQueue queue;  
  3.   Producer(BlockingQueue q) { queue = q; }  
  4.   public void run() {  
  5.     try {  
  6.       while (true) { queue.put(produce()); }  
  7.     } catch (InterruptedException ex) { ... handle ...}  
  8.   }  
  9.   Object produce() { ... }  
  10. }  
  11.   
  12. class Consumer implements Runnable {  
  13.   private final BlockingQueue queue;  
  14.   Consumer(BlockingQueue q) { queue = q; }  
  15.   public void run() {  
  16.     try {  
  17.       while (true) { consume(queue.take()); }  
  18.     } catch (InterruptedException ex) { ... handle ...}  
  19.   }  
  20.   void consume(Object x) { ... }  
  21. }  

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from theBlockingQueue in another thread.

BlockingQueue是Java Collection框架的一个接口,下面有很多实现类,包括:
ArrayBlockingQueue, (这个要自己会实现)

不用BlockingQueue实现的话,可以这么做:
  1. public class Drop {  
  2.     // Message sent from producer  
  3.     // to consumer.  
  4.     private String message;  
  5.     // True if consumer should wait  
  6.     // for producer to send message,  
  7.     // false if producer should wait for  
  8.     // consumer to retrieve message.  
  9.     private boolean empty = true;  
  10.   
  11.     public synchronized String take() {  
  12.         // Wait until message is  
  13.         // available.  
  14.         while (empty) {  
  15.             try {  
  16.                 wait();  
  17.             } catch (InterruptedException e) {}  
  18.         }  
  19.         // Toggle status.  
  20.         empty = true;  
  21.         // Notify producer that  
  22.         // status has changed.  
  23.         notifyAll();  
  24.         return message;  
  25.     }  
  26.   
  27.     public synchronized void put(String message) {  
  28.         // Wait until message has  
  29.         // been retrieved.  
  30.         while (!empty) {  
  31.             try {   
  32.                 wait();  
  33.             } catch (InterruptedException e) {}  
  34.         }  
  35.         // Toggle status.  
  36.         empty = false;  
  37.         // Store message.  
  38.         this.message = message;  
  39.         // Notify consumer that status  
  40.         // has changed.  
  41.         notifyAll();  
  42.     }  
  43. }  
  44.   
  45.   
  46. public class Producer implements Runnable {  
  47.     private Drop drop;  
  48.   
  49.     public Producer(Drop drop) {  
  50.         this.drop = drop;  
  51.     }  
  52.   
  53.     public void run() {  
  54.         String importantInfo[] = {  
  55.             "Mares eat oats",  
  56.             "Does eat oats",  
  57.             "Little lambs eat ivy",  
  58.             "A kid will eat ivy too"  
  59.         };  
  60.         Random random = new Random();  
  61.   
  62.         for (int i = 0;  
  63.              i < importantInfo.length;  
  64.              i++) {  
  65.             drop.put(importantInfo[i]);  
  66.             try {  
  67.                 Thread.sleep(random.nextInt(5000));  
  68.             } catch (InterruptedException e) {}  
  69.         }  
  70.         drop.put("DONE");  
  71.     }  
  72. }  
  73.   
  74. public class Consumer implements Runnable {    
  75.     private Drop drop;    
  76.     
  77.     public Consumer(Drop drop) {    
  78.         this.drop = drop;    
  79.     }    
  80.     
  81.     public void run() {    
  82.         Random random = new Random();    
  83.         for (String message = drop.take();    
  84.              ! message.equals("DONE");    
  85.              message = drop.take()) {    
  86.             System.out.format("MESSAGE RECEIVED: %s%n", message);    
  87.             try {    
  88.                 Thread.sleep(random.nextInt(5000));    
  89.             } catch (InterruptedException e) {}    
  90.         }    
  91.     }    
  92. }
Read full article from Producer Consumer using BlockingQueue - 我的博客 - ITeye技术网站

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts