Sunday, January 3, 2016

Market Basket Analysis - Data Algorithms



https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap07/mapreduce/MBAMapper.java
public class MBAMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

   public static final Logger THE_LOGGER = Logger.getLogger(MBAMapper.class);

   public static final int DEFAULT_NUMBER_OF_PAIRS = 2;

   //output key2: list of items paired; can be 2 or 3 ...
   private static final Text reducerKey = new Text();
 
   //output value2: number of the paired items in the item list
   private static final IntWritable NUMBER_ONE = new IntWritable(1);

   int numberOfPairs; // will be read by setup(), set by driver
 
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
      this.numberOfPairs = context.getConfiguration().getInt("number.of.pairs", DEFAULT_NUMBER_OF_PAIRS);
      THE_LOGGER.info("setup() numberOfPairs = " + numberOfPairs);
    }

   @Override
   public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

      // input line
      String line = value.toString();
      List<String> items = convertItemsToList(line);
      if ((items == null) || (items.isEmpty())) {
         // no mapper output will be generated
         return;
      }
      generateMapperOutput(numberOfPairs, items, context);
   }
 
   private static List<String> convertItemsToList(String line) {
      if ((line == null) || (line.length() == 0)) {
         // no mapper output will be generated
         return null;
      }    
      String[] tokens = StringUtils.split(line, ",");  
      if ( (tokens == null) || (tokens.length == 0) ) {
         return null;
      }
      List<String> items = new ArrayList<String>();        
      for (String token : tokens) {
         if (token != null) {
             items.add(token.trim());
         }        
      }        
      return items;
   }
 
   /**
    *
    * build <key, value> by sorting the input list
    * If not sort the input, it may have duplicated list but not considered as a same list.
    * ex: (a, b, c) and (a, c, b) might become different items to be counted if not sorted
    * @param numberOfPairs   number of pairs associated
    * @param items      list of items (from input line)
    * @param context   Hadoop Job context
    * @throws IOException
    * @throws InterruptedException
    */
   private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)
      throws IOException, InterruptedException {
      List<List<String>> sortedCombinations = Combination.findSortedCombinations(items, numberOfPairs);
      for (List<String> itemList: sortedCombinations) {
         System.out.println("itemlist="+itemList.toString());
         reducerKey.set(itemList.toString());
         context.write(reducerKey, NUMBER_ONE);
      }  
   }
 
}
https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap07/mapreduce/MBAReducer.java
public class MBAReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
       
   @Override
   public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
      int sum = 0; // total items paired
      for (IntWritable value : values) {
         sum += value.get();
      }
      context.write(key, new IntWritable(sum));
   }
}

https://github.com/mahmoudparsian/data-algorithms-book/blob/master/src/main/java/org/dataalgorithms/chap07/spark/FindAssociationRules.java
public class FindAssociationRules {
 
   static List<String> toList(String transaction) {
      String[] items = transaction.trim().split(",");
      List<String> list = new ArrayList<String>();
      for (String item : items) {
        list.add(item);
      }
      return list;
   }
 
   static List<String>  removeOneItem(List<String> list, int i) {
      if ( (list == null) || (list.isEmpty()) ) {
        return list;
      }
     
      if ( (i < 0) || (i > (list.size()-1)) ) {
         return list;
      }
     
      List<String> cloned = new ArrayList<String>(list);
      cloned.remove(i);
      return cloned;
   }

 
   public static void main(String[] args) throws Exception {
      // STEP-1: handle input parameters
      if (args.length < 1) {
         System.err.println("Usage: FindAssociationRules <transactions>");
         System.exit(1);
      }
      String transactionsFileName =  args[0];

      // STEP-2: create a Spark context object
      JavaSparkContext ctx = new JavaSparkContext();
     
      // STEP-3: read all transactions from HDFS and create the first RDD
      JavaRDD<String> transactions = ctx.textFile(transactionsFileName, 1);
      transactions.saveAsTextFile("/rules/output/1");

      // STEP-4: generate frequent patterns
      // PairFlatMapFunction<T, K, V>    
      // T => Iterable<Tuple2<K, V>>
      JavaPairRDD<List<String>,Integer> patterns =
         transactions.flatMapToPair(new PairFlatMapFunction<
                                                             String,        // T
                                                             List<String>,  // K
                                                             Integer        // V
                                                           >() {
         @Override
         public Iterable<Tuple2<List<String>,Integer>> call(String transaction) {
            List<String> list = toList(transaction);
            List<List<String>> combinations = Combination.findSortedCombinations(list);
            List<Tuple2<List<String>,Integer>> result = new ArrayList<Tuple2<List<String>,Integer>>();
            for (List<String> combList : combinations) {
                 if (combList.size() > 0) {
                   result.add(new Tuple2<List<String>,Integer>(combList, 1));
                 }
            }
            return result;
         }
      });      
      // STEP-5: combine/reduce frequent patterns
      JavaPairRDD<List<String>, Integer> combined = patterns.reduceByKey(new Function2<Integer, Integer, Integer>() {
         @Override
         public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
         }
      });      
      // now, we have: patterns(K,V)
      //      K = pattern as List<String>
      //      V = frequency of pattern
      // now given (K,V) as (List<a,b,c>, 2) we will
      // generate the following (K2,V2) pairs:
      //
      //   (List<a,b,c>, T2(null, 2))
      //   (List<a,b>,   T2(List<a,b,c>, 2))
      //   (List<a,c>,   T2(List<a,b,c>, 2))
      //   (List<b,c>,   T2(List<a,b,c>, 2))


      // STEP-6: generate all sub-patterns
      // PairFlatMapFunction<T, K, V>    
      // T => Iterable<Tuple2<K, V>>
      JavaPairRDD<List<String>,Tuple2<List<String>,Integer>> subpatterns =
         combined.flatMapToPair(new PairFlatMapFunction<
          Tuple2<List<String>, Integer>,   // T
          List<String>,                    // K
          Tuple2<List<String>,Integer>     // V
        >() {
       @Override
       public Iterable<Tuple2<List<String>,Tuple2<List<String>,Integer>>>
          call(Tuple2<List<String>, Integer> pattern) {
            List<Tuple2<List<String>,Tuple2<List<String>,Integer>>> result =
               new ArrayList<Tuple2<List<String>,Tuple2<List<String>,Integer>>>();
            List<String> list = pattern._1;
            Integer frequency = pattern._2;
            result.add(new Tuple2(list, new Tuple2(null,frequency)));
            if (list.size() == 1) {
               return result;
            }
           
            // pattern has more than one items
            // result.add(new Tuple2(list, new Tuple2(null,size)));
            for (int i=0; i < list.size(); i++) {
               List<String> sublist = removeOneItem(list, i);
               result.add(new Tuple2(sublist, new Tuple2(list, frequency)));
            }
            return result;
        }
      });
       
      // STEP-6: combine sub-patterns
      JavaPairRDD<List<String>,Iterable<Tuple2<List<String>,Integer>>> rules = subpatterns.groupByKey();      

      // STEP-7: generate association rules    
      // Now, use (K=List<String>, V=Iterable<Tuple2<List<String>,Integer>>)
      // to generate association rules
      // JavaRDD<R> map(Function<T,R> f)
      // Return a new RDD by applying a function to all elements of this RDD.
      JavaRDD<List<Tuple3<List<String>,List<String>,Double>>> assocRules = rules.map(new Function<
          Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>>,     // T: input
          List<Tuple3<List<String>,List<String>,Double>>                   // R: ( ac => b, 1/3): T3(List(a,c), List(b),  0.33)
                                                                           //    ( ad => c, 1/3): T3(List(a,d), List(c),  0.33)
         >() {
        @Override
        public List<Tuple3<List<String>,List<String>,Double>> call(Tuple2<List<String>,Iterable<Tuple2<List<String>,Integer>>> in) {
            List<Tuple3<List<String>,List<String>,Double>> result =
               new ArrayList<Tuple3<List<String>,List<String>,Double>>();
            List<String> fromList = in._1;
            Iterable<Tuple2<List<String>,Integer>> to = in._2;
            List<Tuple2<List<String>,Integer>> toList = new ArrayList<Tuple2<List<String>,Integer>>();
            Tuple2<List<String>,Integer> fromCount = null;
            for (Tuple2<List<String>,Integer> t2 : to) {
               // find the "count" object
               if (t2._1 == null) {
                    fromCount = t2;
               }
               else {
                  toList.add(t2);
               }
            }
           
            // Now, we have the required objects for generating association rules:
            //  "fromList", "fromCount", and "toList"
            if (toList.isEmpty()) {
               // no output generated, but since Spark does not like null objects, we will fake a null object
               return result; // an empty list
            }
           
            // now using 3 objects: "from", "fromCount", and "toList",
            // create association rules:
            for (Tuple2<List<String>,Integer>  t2 : toList) {
               double confidence = (double) t2._2 / (double) fromCount._2;
               List<String> t2List = new ArrayList<String>(t2._1);
               t2List.removeAll(fromList);
               result.add(new Tuple3(fromList, t2List, confidence));
            }
          return result;
        }
      });  
      assocRules.saveAsTextFile("/rules/output/6");

      // done
      ctx.close();    
      System.exit(0);
   }
}



Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts