Friday, July 31, 2015

Hadoop Map Reduce Miscs



Word Count:
Spark:
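// spark is a SparkContext here (named sc in spark-shell)
val textFile = spark.textFile("hdfs://...")
// split lines into words, pair each word with 1, then sum the 1s per word
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")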
Hadoop:
http://wiki.apache.org/hadoop/WordCount
Each mapper takes a line as input and breaks it into words. It then emits a key/value pair of the word and 1. Each reducer sums the counts for each word and emits a single key/value pair with the word and its sum.
As an optimization, the reducer is also used as a combiner on the map outputs. This reduces the amount of data sent across the network by combining each word into a single record.
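import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    // Mapper: for each input line, emit (word, 1) for every whitespace-separated token
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer (also used as the combiner): sum all counts for a word
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "wordcount");
        job.setJarByClass(WordCount.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class); // the combiner optimization described above
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}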
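To see what the combiner buys, here is a minimal, Hadoop-free sketch that runs the same map, combine, and reduce logic in plain Java over in-memory "splits", each standing in for one mapper's input. The class and method names (LocalWordCount, mapSplit, sumByKey) are hypothetical, and the shuffle is simulated by concatenating the per-split outputs; it illustrates the data flow only, not Hadoop's API or its sorting and partitioning.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical, Hadoop-free simulation of WordCount's map -> combine -> reduce flow.
class LocalWordCount {

    // Map phase: emit (word, 1) for every token, like the Map class above.
    static List<Map.Entry<String, Integer>> mapSplit(List<String> lines) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String line : lines) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                out.add(new SimpleEntry<>(tokenizer.nextToken(), 1));
            }
        }
        return out;
    }

    // Combine / reduce: sum the values per key (same logic as the Reduce class), sorted by key.
    static List<Map.Entry<String, Integer>> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            sums.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return new ArrayList<>(sums.entrySet());
    }

    public static void main(String[] args) {
        // Two in-memory "splits", one per simulated mapper.
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("the cat saw the dog", "the end"),
                Arrays.asList("the dog ran"));

        List<Map.Entry<String, Integer>> withoutCombiner = new ArrayList<>();
        List<Map.Entry<String, Integer>> withCombiner = new ArrayList<>();
        for (List<String> split : splits) {
            List<Map.Entry<String, Integer>> mapped = mapSplit(split);
            withoutCombiner.addAll(mapped);          // raw map output sent to the reducers
            withCombiner.addAll(sumByKey(mapped));   // map output after the local combine
        }

        System.out.println("records shuffled without combiner: " + withoutCombiner.size());
        System.out.println("records shuffled with combiner:    " + withCombiner.size());
        // The final reduce gives the same counts either way.
        System.out.println(sumByKey(withCombiner));
    }
}
```

For the sample splits in main, the combiner cuts the shuffled records from 10 to 8 while the final counts stay the same; the saving grows with how often words repeat inside one split.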

Line Count:
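import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LineCount {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("Line Count");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        // Read the source file
        JavaRDD<String> input = sparkContext.textFile(args[0]);
        // Gets the number of entries (lines) in the RDD
        long count = input.count();
        System.out.println("Line count: " + count);
        sparkContext.stop();
    }
}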

Mediator pattern



Mediator pattern - Wikipedia, the free encyclopedia
In software engineering, the mediator pattern defines an object that encapsulates how a set of objects interact.

Usually a program is made up of a large number of classes, so logic and computation are distributed among these classes. However, as more classes are developed in a program, especially during maintenance and/or refactoring, the problem of communication between these classes may become more complex. This makes the program harder to read and maintain. Furthermore, it can become difficult to change the program, since any change may affect code in several other classes.
With the mediator pattern, communication between objects is encapsulated with a mediator object. Objects no longer communicate directly with each other, but instead communicate through the mediator. This reduces the dependencies between communicating objects, thereby lowering the coupling.
http://www.journaldev.com/1730/mediator-design-pattern-in-java-example-tutorial
Allows loose coupling by encapsulating the way disparate sets of objects interact and communicate with each other. Allows for the actions of each object set to vary independently of one another.

The system objects that communicate with each other are called Colleagues. Usually we have an interface or abstract class that provides the contract for communication, and then we have concrete implementations of mediators.

Mediator Design Pattern UML
http://www.codeproject.com/Articles/186187/Mediator-Design-Pattern

In the diagram, the left side is the mediator, the object that distributes the messages, and the right side shows the participants. The official UML of the mediator pattern calls the participants Colleagues; it is just different terminology.
  • The IMediator interface defines the properties and the methods that all mediators must support:
    • ColleagueList -- This is the list of the registered participants
    • DistributeMessage(IColleague) -- Sends the messages from the sender to all the participants
    • Register(IColleague) -- Registers a participant to receive messages from the mediator
  • The IColleague interface defines the methods that all participants must support:
    • SendMessage(IMediator) -- Sends the message to the mediator
    • ReceiveMessage() -- Gets the message from the mediator
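The two interfaces above might look like this in Java. This is a hedged translation of the description, not the CodeProject article's code: the Java method names, the BroadcastMediator and Participant classes, and the choice to skip the sender when distributing are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java rendering of the IMediator / IColleague contracts described above.
interface IMediator {
    List<IColleague> getColleagueList();                        // ColleagueList
    void register(IColleague colleague);                        // Register(IColleague)
    void distributeMessage(IColleague sender, String message);  // DistributeMessage(IColleague)
}

interface IColleague {
    void sendMessage(IMediator mediator, String message);       // SendMessage(IMediator)
    void receiveMessage(String message);                        // ReceiveMessage()
}

class BroadcastMediator implements IMediator {
    private final List<IColleague> colleagues = new ArrayList<>();

    public List<IColleague> getColleagueList() { return colleagues; }

    public void register(IColleague colleague) { colleagues.add(colleague); }

    // Forward the message to every registered participant except the sender (an assumption).
    public void distributeMessage(IColleague sender, String message) {
        for (IColleague c : colleagues) {
            if (c != sender) {
                c.receiveMessage(message);
            }
        }
    }
}

class Participant implements IColleague {
    private final String name;
    final List<String> inbox = new ArrayList<>();   // messages received so far

    Participant(String name) { this.name = name; }

    // A participant never calls another participant directly; it goes through the mediator.
    public void sendMessage(IMediator mediator, String message) {
        mediator.distributeMessage(this, name + ": " + message);
    }

    public void receiveMessage(String message) {
        inbox.add(message);
        System.out.println(name + " received \"" + message + "\"");
    }
}

class MediatorDemo {
    public static void main(String[] args) {
        IMediator hub = new BroadcastMediator();
        Participant alice = new Participant("Alice");
        Participant bob = new Participant("Bob");
        Participant carol = new Participant("Carol");
        hub.register(alice);
        hub.register(bob);
        hub.register(carol);
        alice.sendMessage(hub, "hello");   // Bob and Carol receive it; Alice does not
    }
}
```

Adding a new participant only requires a register call; no existing participant needs to know about it.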
A comparison between the mediator pattern and the observer pattern shows some similarities and some clear differences. Both patterns facilitate communication between objects, and both decouple the sender from the receiver. The main difference is that in the mediator pattern the participants communicate with each other using the mediator as a central hub, whereas in the observer pattern there is a clear distinction between the sender and the receiver, and the receiver merely listens to changes in the sender.
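For contrast, a minimal observer sketch in Java, under the assumption of a single subject with integer state: the subject pushes its own changes and the listeners only listen, never sending anything back through it. TemperatureSensor, addListener, and setTemperature are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical observer-pattern example: one-way notification from a subject to its listeners.
class TemperatureSensor {
    private final List<IntConsumer> listeners = new ArrayList<>();
    private int temperature;

    void addListener(IntConsumer listener) { listeners.add(listener); }

    // The subject is the only sender; every state change is pushed to all listeners.
    void setTemperature(int value) {
        temperature = value;
        for (IntConsumer l : listeners) {
            l.accept(temperature);
        }
    }

    public static void main(String[] args) {
        TemperatureSensor sensor = new TemperatureSensor();
        sensor.addListener(t -> System.out.println("display shows " + t));
        sensor.addListener(t -> { if (t > 30) System.out.println("alarm: too hot"); });
        sensor.setTemperature(25);
        sensor.setTemperature(35);
    }
}
```

Unlike a mediator, nothing here routes messages between the listeners themselves; they are pure receivers of the subject's changes.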

Communication between mediators and colleagues
In more complex implementations, asynchronous messages can be added to a message queue, from where they can be picked up by the mediator object.
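A minimal sketch of that asynchronous variant, assuming one mediator thread: colleagues only enqueue messages into a java.util.concurrent BlockingQueue and return immediately, while the mediator's worker thread takes them off the queue and delivers them. QueuedMediator, post, and the poison-pill shutdown are illustrative choices, and colleagues are modeled as plain Consumer<String> callbacks for brevity.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical asynchronous mediator: senders enqueue, one worker thread delivers.
class QueuedMediator {
    // A queued message; the sender is kept so it can be skipped on delivery.
    static final class Message {
        final Consumer<String> sender;
        final String text;
        Message(Consumer<String> sender, String text) { this.sender = sender; this.text = text; }
    }

    private static final Message STOP = new Message(null, null); // poison pill
    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
    private final List<Consumer<String>> colleagues = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::deliverLoop, "mediator");

    void register(Consumer<String> colleague) { colleagues.add(colleague); }

    void start() { worker.start(); }

    // Called by colleagues; returns immediately instead of calling the receivers directly.
    void post(Consumer<String> sender, String text) { queue.add(new Message(sender, text)); }

    // Stop after every message queued so far has been delivered.
    void shutdown() throws InterruptedException {
        queue.add(STOP);
        worker.join();
    }

    private void deliverLoop() {
        try {
            while (true) {
                Message m = queue.take();          // blocks until a message arrives
                if (m == STOP) return;
                for (Consumer<String> c : colleagues) {
                    if (c != m.sender) c.accept(m.text);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueuedMediator mediator = new QueuedMediator();
        Consumer<String> a = msg -> System.out.println("A got " + msg);
        Consumer<String> b = msg -> System.out.println("B got " + msg);
        mediator.register(a);
        mediator.register(b);
        mediator.start();
        mediator.post(a, "ping");   // delivered to B on the mediator thread
        mediator.post(b, "pong");   // delivered to A
        mediator.shutdown();
    }
}
```

Because delivery happens on the mediator thread, post returns before any receiver runs; a single worker also keeps messages in the order they were queued.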

    Important Points
  • The mediator pattern is useful when the communication logic between objects is complex: it gives us a central point of communication that takes care of that logic.
  • Java Message Service (JMS) uses the Mediator pattern along with the Observer pattern to allow applications to subscribe and publish data to other applications.
  • We should not use the mediator pattern just to achieve loose coupling, because if the number of mediators grows, they become hard to maintain.
Example:
(I)ChatRoom, (I)User
In the following example, the colleagues are a presenter, who is giving a demonstration, and a variable number of attendees, who are watching the presentation. The mediator holds references to a single presenter and multiple attendee objects, with the latter held in a list. The attendees may ask questions. The presenter may answer questions and send new images to the attendees.
using System;
using System.Collections.Generic;

public abstract class PresentationMember
{
    protected Mediator _mediator;
    public PresentationMember(Mediator mediator)
    {
        _mediator = mediator;
    }
    public string Name { get; set; }
    public void ReceiveAnswer(string answer)
    {
        Console.ForegroundColor = ConsoleColor.Cyan;
        Console.WriteLine("{0} received answer.\n'{1}'.", Name, answer);
    }
}
public class Presenter : PresentationMember
{
    public Presenter(Mediator mediator) : base(mediator) { }
    public void SendNewImageUrl(string url)
    {
        Console.ForegroundColor = ConsoleColor.White;
        Console.WriteLine("Presenter changed image URL to '{0}'.", url);
        _mediator.UpdateImage(url);
    }
    public void ReceiveQuestion(string question, Attendee attendee)
    {
        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine("Presenter received question from {0}.\n'{1}'"
            , attendee.Name, question);
    }
    public void AnswerQuestion(string answer, Attendee attendee)
    {
        Console.ForegroundColor = ConsoleColor.Blue;
        Console.WriteLine("Presenter answered question for {0}.\n'{1}'"
            , attendee.Name, answer);
        _mediator.SendAnswer(answer, attendee);
    }
}
public class Attendee : PresentationMember
{
    public Attendee(Mediator mediator) : base(mediator) { }
    public void AskQuestion(string question)
    {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine("{0} asked question.\n'{1}'", Name, question);
        _mediator.SendQuestion(question, this);
    }
    public void ReceiveImage(string url)
    {
        Console.ForegroundColor = ConsoleColor.Yellow;
        Console.WriteLine("Image for {0} updated to '{1}'.", Name, url);
    }
}
public class Mediator
{
    public Presenter Presenter { get; set; }
    public List<Attendee> Attendees { get; set; }
    public void UpdateImage(string url)
    {
        foreach (Attendee attendee in Attendees)
        {
            attendee.ReceiveImage(url);
        }
    }
    public void SendAnswer(string answer, Attendee attendee)
    {
        attendee.ReceiveAnswer(answer);
    }
    public void SendQuestion(string question, Attendee attendee)
    {
        Presenter.ReceiveQuestion(question, attendee);
    }
}
http://www.oodesign.com/mediator-pattern.html
The Java Message Service (JMS) API is a Java Message Oriented Middleware (MOM) API for sending messages between two or more clients. The JMS API supports two models, point-to-point and publish-subscribe; the publish-subscribe model is an implementation of the mediator pattern. Messages are published to a particular topic, and zero or more subscribers may subscribe to receive the messages on that topic. The publisher and the subscribers don't know about each other, and a subscriber can even be inactive: with a durable subscription, it receives the messages once it becomes active again.
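The publish-subscribe idea can be sketched without the JMS API itself. The in-memory TopicBroker below is hypothetical (it is not javax.jms): publishers and subscribers know only the broker and a topic name, and each subscription keeps a backlog so that an inactive subscriber, loosely like a JMS durable subscriber, receives its messages when it becomes active.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical in-memory topic broker acting as the mediator between publishers and subscribers.
class TopicBroker {
    // One subscription: a pending-message backlog plus an optional active listener.
    static final class Subscription {
        private final Queue<String> backlog = new ArrayDeque<>();
        private Consumer<String> listener;          // null while the subscriber is inactive

        void deliver(String message) {
            if (listener != null) listener.accept(message);
            else backlog.add(message);              // keep it until the subscriber is active
        }

        // Subscriber becomes active: drain the backlog first, then receive live messages.
        void activate(Consumer<String> l) {
            listener = l;
            while (!backlog.isEmpty()) l.accept(backlog.poll());
        }

        void deactivate() { listener = null; }
    }

    private final Map<String, List<Subscription>> topics = new HashMap<>();

    Subscription subscribe(String topic) {
        Subscription s = new Subscription();
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(s);
        return s;
    }

    // The publisher knows only the topic name, not who (if anyone) is subscribed.
    void publish(String topic, String message) {
        for (Subscription s : topics.getOrDefault(topic, Collections.emptyList())) {
            s.deliver(message);
        }
    }

    public static void main(String[] args) {
        TopicBroker broker = new TopicBroker();
        Subscription online = broker.subscribe("news");
        Subscription offline = broker.subscribe("news");
        online.activate(m -> System.out.println("online got: " + m));

        broker.publish("news", "release 1.0");                          // offline is inactive
        offline.activate(m -> System.out.println("offline got: " + m)); // gets the backlog now
        broker.publish("sports", "no subscribers");                     // silently dropped
    }
}
```

In real JMS the provider stores messages for durable subscriptions; this sketch only mimics that observable behavior in memory.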

http://www.cnblogs.com/java-my-life/archive/2012/06/20/2554024.html
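  The mediator pattern is an object behavioral pattern. It encapsulates the way a set of objects interact, so that these objects do not need to refer to each other explicitly, which lets them be coupled more loosely. When the interaction between some of these objects changes, the interactions among the other objects are not immediately affected, so these interactions can vary independently of each other.

Why a mediator is needed

  Consider a system with a large number of objects (figure not included). These objects both affect other objects and are affected by them, so they are usually called Colleague objects, and the system's behavior emerges from their interactions. Almost every object needs to interact with the others, and each such interaction is a direct coupling between one object and another. This is an over-coupled system.
  Introducing a Mediator object turns the system's mesh structure into a star structure centered on the mediator (figure not included). In this star structure, a colleague object no longer interacts with another object through a direct connection; instead, it interacts with it through the mediator. The mediator keeps the object structure stable: introducing new objects does not force extensive changes to the system's structure.
  A good object-oriented design increases collaboration between objects and reduces coupling. A well-thought-out design decomposes a system into a group of collaborating colleague objects, gives each colleague a distinct responsibility, and configures the collaboration among them appropriately so that they can work together.

If there were no motherboard

  As we all know, the components inside a computer interact mainly through the motherboard. Without a motherboard, the components would have to interact with each other directly to pass data around, and because their interfaces differ, they would also have to convert the data formats to match each other whenever they interacted.
  Fortunately there is a motherboard, and all interaction between components goes through it: each component only needs to deal with the motherboard, and the motherboard knows how to deal with every component, which makes things much simpler.

Structure of the mediator pattern

  (The schematic class diagram of the mediator pattern is not included.)
  
  The mediator pattern involves the following roles:
  ●  Abstract mediator (Mediator) role: defines the interface from the colleague objects to the mediator object; its main method is one (or more) event method(s).
  ●  Concrete mediator (ConcreteMediator) role: implements the event methods declared by the abstract mediator. The concrete mediator knows all the concrete colleague classes and is responsible for coordinating the interactions among the colleague objects.
  ●  Abstract colleague (Colleague) role: defines the interface from the mediator to the colleague objects. A colleague object knows only the mediator, not the other colleague objects.
  ●  Concrete colleague (ConcreteColleague) role: every concrete colleague class inherits from the abstract colleague class. It implements its own business logic, and when it needs to communicate with other colleagues, it talks to the mediator it holds, which takes care of interacting with the other colleagues.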
public interface Mediator {
    /**
     * Called by a colleague when it changes, to notify the mediator,
     * which then handles the resulting interaction with the other colleagues
     */
    public void changed(Colleague c);
}
public class ConcreteMediator implements Mediator {
    // holds and maintains colleague A
    private ConcreteColleagueA colleagueA;
    // holds and maintains colleague B
    private ConcreteColleagueB colleagueB;
    
    public void setColleagueA(ConcreteColleagueA colleagueA) {
        this.colleagueA = colleagueA;
    }

    public void setColleagueB(ConcreteColleagueB colleagueB) {
        this.colleagueB = colleagueB;
    }

    @Override
    public void changed(Colleague c) {
        /**
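         * A colleague has changed and usually needs to interact with other colleagues;
         * coordinate the relevant colleague objects here to carry out the collaboration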
         */
    }

}
public abstract class Colleague {
    // holds a mediator object
    private Mediator mediator;
    /**
     * Constructor
     */
    public Colleague(Mediator mediator){
        this.mediator = mediator;
    }
    /**
     * Returns the mediator this colleague belongs to
     */
    public Mediator getMediator() {
        return mediator;
    }
    
}
public class ConcreteColleagueA extends Colleague {

    public ConcreteColleagueA(Mediator mediator) {
        super(mediator);
    }
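    /**
     * Illustrative method that performs some operation
     */
    public void operation(){
        // when it needs to communicate with other colleagues, notify the mediator
        getMediator().changed(this);
    }
}
// ConcreteColleagueB is referenced by ConcreteMediator but not shown in the original;
// it is assumed here to mirror ConcreteColleagueA.
public class ConcreteColleagueB extends Colleague {

    public ConcreteColleagueB(Mediator mediator) {
        super(mediator);
    }

    public void operation(){
        getMediator().changed(this);
    }
}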

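Watching a movie on a computer

  In daily life we often watch movies on a computer. Describing this process in simplified form, assume the following interactions:
  (1) First, the CD drive reads the data on the disc and then tells the motherboard that its state has changed.
  (2) The motherboard gets the data from the CD drive and hands it to the CPU for analysis and processing.
  (3) When the CPU is done, it splits the data into video data and audio data and notifies the motherboard that it has finished.
  (4) The motherboard gets the data processed by the CPU and passes it to the video card and the sound card, which display the video and play the sound.
  To implement this example with the mediator pattern, we need to distinguish the colleague objects from the mediator. Clearly the motherboard is the mediator, while the CD drive, sound card, CPU, video card, and other components are the colleague objects.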
public class CDDriver extends Colleague{
    // data read by the CD drive
    private String data = "";
    /**
     * Constructor
     */
    public CDDriver(Mediator mediator) {
        super(mediator);
    }
    /**
     * Returns the data read from the disc
     */
    public String getData() {
        return data;
    }
    /**
     * Reads the disc
     */
    public void readCD(){
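        // the part before the comma is video data, the part after it is sound
        this.data = "One Piece,I'm going to be King of the Pirates";
        // tell the motherboard that this component's state has changed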
        getMediator().changed(this);
    }
}
public class CPU extends Colleague {
    // video data split out of the input
    private String videoData = "";
    // sound data split out of the input
    private String soundData = "";
    /**
     * Constructor
     */
    public CPU(Mediator mediator) {
        super(mediator);
    }
    /**
     * Returns the extracted video data
     */
    public String getVideoData() {
        return videoData;
    }
    /**
     * Returns the extracted sound data
     */
    public String getSoundData() {
        return soundData;
    }
    /**
     * Processes the data, splitting it into audio and video data
     */
    public void executeData(String data){
        // split the data: the first part is video, the second part is audio
        String[] array = data.split(",");
        this.videoData = array[0];
        this.soundData = array[1];
        // tell the motherboard that the CPU has finished its work
        getMediator().changed(this);
    }   
}
public class MainBoard implements Mediator {
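    // colleague it needs to know in order to interact with it: the CD drive
    private CDDriver cdDriver = null;
    // colleague it needs to know in order to interact with it: the CPU
    private CPU cpu = null;
    // colleague it needs to know in order to interact with it: the video card
    private VideoCard videoCard = null;
    // colleague it needs to know in order to interact with it: the sound card
    private SoundCard soundCard = null;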
    
    public void setCdDriver(CDDriver cdDriver) {
        this.cdDriver = cdDriver;
    }

    public void setCpu(CPU cpu) {
        this.cpu = cpu;
    }

    public void setVideoCard(VideoCard videoCard) {
        this.videoCard = videoCard;
    }

    public void setSoundCard(SoundCard soundCard) {
        this.soundCard = soundCard;
    }

    @Override
    public void changed(Colleague c) {
        if(c instanceof CDDriver){
            // the CD drive has read data
            this.opeCDDriverReadData((CDDriver)c);
        }else if(c instanceof CPU){
            this.opeCPU((CPU)c);
        }
    }
    /**
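     * Handles the interaction with other objects after the CD drive has read data
     */
    private void opeCDDriverReadData(CDDriver cd){
        // first get the data read by the CD drive
        String data = cd.getData();
        // pass the data to the CPU for processing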
        cpu.executeData(data);
    }
    /**
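     * Handles the interaction with other objects after the CPU has processed the data
     */
    private void opeCPU(CPU cpu){
        // first get the data processed by the CPU
        String videoData = cpu.getVideoData();
        String soundData = cpu.getSoundData();
        // pass the data to the video card and sound card for output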
        videoCard.showData(videoData);
        soundCard.soundData(soundData);
    }
}
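// VideoCard and SoundCard are used by MainBoard but not shown in the original;
// minimal versions are assumed here so that the example runs.
public class VideoCard extends Colleague {
    public VideoCard(Mediator mediator) { super(mediator); }
    // displays the video data
    public void showData(String data) { System.out.println("Now showing: " + data); }
}
public class SoundCard extends Colleague {
    public SoundCard(Mediator mediator) { super(mediator); }
    // plays the sound data
    public void soundData(String data) { System.out.println("Sound: " + data); }
}
  Client class
public class Client {
    public static void main(String[] args) {
        // create the mediator: the motherboard
        MainBoard mediator = new MainBoard();
        // create the colleagues
        CDDriver cd = new CDDriver(mediator);
        CPU cpu = new CPU(mediator);
        VideoCard vc = new VideoCard(mediator);
        SoundCard sc = new SoundCard(mediator);
        // let the mediator know all of the colleagues
        mediator.setCdDriver(cd);
        mediator.setCpu(cpu);
        mediator.setVideoCard(vc);
        mediator.setSoundCard(sc);
        // start the movie: put the disc into the CD drive, which starts reading it
        cd.readCD();
    }
}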

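Advantages of the mediator pattern

  ●  Loose coupling
  By encapsulating the interactions among multiple colleague objects in the mediator, the mediator pattern loosely couples the colleague objects so that they become essentially independent of each other. Colleague objects can then vary and be reused independently, instead of every change rippling through the whole system as before.
  ●  Centralized control of interactions
  The interactions among multiple colleague objects are encapsulated and managed centrally in the mediator, so when those interactions change, only the mediator needs to be modified. For a system that is already finished, you extend the mediator instead, and the colleague classes do not need to change.
  ●  Many-to-many becomes one-to-many
  Without the mediator pattern, the relationships among colleague objects are usually many-to-many. After a mediator is introduced, the relationship between the mediator and the colleagues usually becomes a bidirectional one-to-many, which makes the object relationships easier to understand and implement.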
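A rough count makes the third point concrete, assuming the worst case where every colleague may need to talk to every other one: with n colleagues, direct coupling can need a link for every pair, while a mediator needs one link per colleague. For the four parts in the movie example (CD drive, CPU, video card, sound card), that is at most 6 pairwise links versus 4 links to the motherboard.

```latex
L_{\text{direct}} \le \binom{n}{2} = \frac{n(n-1)}{2}, \qquad L_{\text{mediator}} = n, \qquad n = 4 \;\Rightarrow\; L_{\text{direct}} \le 6,\; L_{\text{mediator}} = 4
```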

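Disadvantages of the mediator pattern

  A potential drawback of the mediator pattern is over-centralization. If the colleague objects interact a lot and in complex ways, concentrating all that complexity in the mediator makes the mediator object very complex and hard to manage and maintain.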
Read full article from Mediator pattern - Wikipedia, the free encyclopedia
