Friday, January 8, 2016

Apache Oozie



Apache Oozie: The Workflow Scheduler for Hadoop
Design requirements for a scheduler of multistage Hadoop jobs:
It should use an adequate and well-understood programming model to facilitate its adoption and to reduce developer ramp-up time.
It should be easy to troubleshoot and recover jobs when something goes wrong.
It should be extensible to support new types of jobs.
It should scale to support several thousand concurrent jobs.
Jobs should run in a server to increase reliability.
It should be a multitenant service to reduce the cost of operation.

Oozie is designed to run multistage Hadoop jobs as a single job: an Oozie job. Oozie jobs can be configured to run on demand or periodically. Oozie jobs that run on demand are called workflow jobs; Oozie jobs that run periodically are called coordinator jobs. There is also a third type, the bundle job: a collection of coordinator jobs managed as a single job.

Example code for the book: https://github.com/oozie-book/examples


<workflow-app xmlns="uri:oozie:workflow:0.4" name="identity-WF">

  <parameters>
    <property>
      <name>jobTracker</name>
    </property>
    <property>
      <name>nameNode</name>
    </property>
    <property>
      <name>exampleDir</name>
    </property>
  </parameters>

  <start to="identity-MR"/>

  <action name="identity-MR">
    <map-reduce>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <prepare>
        <delete path="${exampleDir}/data/output"/>
      </prepare>
      <configuration>
        <property>
          <name>mapred.mapper.class</name>
          <value>org.apache.hadoop.mapred.lib.IdentityMapper</value>
        </property>
        <property>
          <name>mapred.reducer.class</name>
          <value>org.apache.hadoop.mapred.lib.IdentityReducer</value>
        </property>
        <property>
          <name>mapred.input.dir</name>
          <value>${exampleDir}/data/input</value>
        </property>
        <property>
          <name>mapred.output.dir</name>
          <value>${exampleDir}/data/output</value>
        </property>
      </configuration>
    </map-reduce>
    <ok to="success"/>
    <error to="fail"/>
  </action>

  <kill name="fail">
    <message>The Identity Map-Reduce job failed!</message>
  </kill>
  <end name="success"/>
</workflow-app>

job.properties
nameNode=hdfs://localhost:8020
jobTracker=localhost:8032
exampleDir=${nameNode}/user/${user.name}/ch01-identity
oozie.wf.application.path=${exampleDir}/app

$ export OOZIE_URL=http://localhost:11000/oozie
$ oozie job -run -config target/example/job.properties
$ oozie job -info <job-id>

The fork and join control nodes allow executing actions in parallel. The decision control node is like a switch/case statement that can select a particular execution path within the workflow using information from the job itself.
Because workflows are directed acyclic graphs, they don’t support loops in the flow.


DistCp Version 2 (distributed copy) is a tool used for large inter/intra-cluster copying. It uses MapReduce to effect its distribution, error handling and recovery, and reporting. It expands a list of files and directories into input to map tasks, each of which will copy a partition of the files specified in the source list.

bash$ hadoop distcp2 hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo

Oozie Coordinators
An Oozie coordinator schedules workflow executions based on a start-time and a frequency parameter, and it starts the workflow when all the necessary input data becomes available. If the input data is not available, the workflow execution is delayed until the input data becomes available.

If the daily rawlogs are not available for a few days, the coordinator job keeps track of all the missed days. When the rawlogs for a missed day show up, the workflow to process the logs for that day is started.
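
As a sketch of how that data dependency is declared (the coordinator name, paths, and times below are illustrative assumptions, not from the book): a daily rawlogs dataset plus a data-in event makes the coordinator hold each day's workflow until that day's directory and its done flag exist, and catch up on missed days once the data appears.

<coordinator-app name="daily-logproc-coord" frequency="${coord:days(1)}"
    start="2016-01-01T08:00Z" end="2016-12-31T08:00Z" timezone="UTC"
    xmlns="uri:oozie:coordinator:0.4">
  <datasets>
    <!-- One instance of this dataset is expected per day. -->
    <dataset name="rawlogs" frequency="${coord:days(1)}"
        initial-instance="2016-01-01T08:00Z" timezone="UTC">
      <uri-template>${nameNode}/data/rawlogs/${YEAR}/${MONTH}/${DAY}</uri-template>
      <done-flag>_SUCCESS</done-flag>
    </dataset>
  </datasets>
  <input-events>
    <!-- The workflow for a given day is not started until this instance exists. -->
    <data-in name="input" dataset="rawlogs">
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>
  <action>
    <workflow>
      <app-path>${nameNode}/user/${user.name}/logproc/app</app-path>
    </workflow>
  </action>
</coordinator-app>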

Oozie Bundles
An Oozie bundle is a collection of coordinator jobs that can be started, stopped, suspended, and modified as a single job. Typically, the coordinator jobs in a bundle depend on each other: the output data produced by one coordinator job becomes the input data for another. These kinds of interdependent coordinator jobs are also called data pipelines.

A logs-processing-bundle bundle job groups these three coordinator jobs. When the bundle job runs, the three coordinator jobs run at their corresponding frequencies, and all of the resulting workflow and coordinator jobs are accessible and managed from the single bundle job.
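
A minimal bundle.xml for such a pipeline might look like the sketch below; the coordinator names and HDFS paths are illustrative assumptions, not the book's.

<bundle-app name="logs-processing-bundle" xmlns="uri:oozie:bundle:0.2">
  <!-- Each entry points at a deployed coordinator application. -->
  <coordinator name="ingest-coord">
    <app-path>${nameNode}/user/${user.name}/pipeline/ingest-coord</app-path>
  </coordinator>
  <coordinator name="aggregate-coord">
    <app-path>${nameNode}/user/${user.name}/pipeline/aggregate-coord</app-path>
  </coordinator>
  <coordinator name="report-coord">
    <app-path>${nameNode}/user/${user.name}/pipeline/report-coord</app-path>
  </coordinator>
</bundle-app>

The bundle is submitted like any other Oozie job, with oozie.bundle.application.path in the job properties pointing at the directory that holds this file.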

Parameters, Variables, and Functions
Coordinator applications consist of a coordinator.xml file. Bundle applications consist of a bundle.xml file.

Before running a job, you must copy the application files to HDFS. Deploying an Oozie application simply involves copying the directory with all the files required to run the application to HDFS.

The oozie command-line tool and the Oozie Java API ultimately use the Oozie HTTP REST API to communicate with the Oozie server.

All of the job state is stored in the SQL database, and the transactional nature of the database ensures that Oozie jobs behave reliably even if the Oozie server crashes or is shut down.

Oozie has built-in purging logic that deletes completed jobs from the database after a period of time.
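
The retention window is controlled in oozie-site.xml; below is a sketch using the PurgeService properties (the values are arbitrary, and exact defaults should be checked against your version's oozie-default.xml; there are analogous properties for coordinator and bundle jobs).

<configuration>
  <property>
    <!-- Completed workflow jobs older than this many days are purged. -->
    <name>oozie.service.PurgeService.older.than</name>
    <value>30</value>
  </property>
  <property>
    <!-- How often the purge runs, in seconds. -->
    <name>oozie.service.PurgeService.purge.interval</name>
    <value>3600</value>
  </property>
</configuration>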

If the queue overflows, commands are dropped silently from the queue. To handle this scenario, Oozie has a background thread that re-creates all dropped commands after a certain amount of time using the job state stored in the SQL database.

Hadoop Application Architectures
Fan-Out Workflow
The fan-out workflow pattern is most commonly used when multiple actions in the workflow could run in parallel, but a later action requires all previous actions to be completed before it can be run. This is also called a fork-and-join pattern.

<workflow-app name="build_reports" xmlns="uri:oozie:workflow:0.4">

   <global>
         <job-tracker>${jobTracker}</job-tracker>
         <name-node>${nameNode}</name-node>
         <job-xml>${hiveSiteXML}</job-xml>
   </global>

   <start to="preliminary_statistics" />

   <action name="preliminary_statistics">
      <hive xmlns="uri:oozie:hive-action:0.5">
         <script>${scripts}/stats.hql</script>
      </hive>
      <ok to="fork_aggregates" />
      <error to="kill" />
   </action>

   <fork name="fork_aggregates">
      <path start="prescriptions_and_refills" />
      <path start="office_visits" />
      <path start="lab_results" />
   </fork>

   <action name="prescriptions_and_refills">
      <hive xmlns="uri:oozie:hive-action:0.5">
         <script>${scripts}/refills.hql</script>
      </hive>
      <ok to="join_reports" />
      <error to="kill" />
   </action>

   <action name="office_visits">
      <hive xmlns="uri:oozie:hive-action:0.5">
         <script>${scripts}/visits.hql</script>
      </hive>
      <ok to="join_reports" />
      <error to="kill" />
   </action>

   <action name="lab_results">
      <hive xmlns="uri:oozie:hive-action:0.5">
         <script>${scripts}/labs.hql</script>
      </hive>
      <ok to="join_reports" />
      <error to="kill" />
   </action>

   <join name="join_reports" to="summary_report" />

   <action name="summary_report">
      <hive xmlns="uri:oozie:hive-action:0.5">
         <script>${scripts}/summary_report.hql</script>
      </hive>
      <ok to="end" />
      <error to="kill" />
   </action>

   <kill name="kill">
      <message> Workflow failed. Error message
                [${wf:errorMessage(wf:lastErrorNode())}]</message>
   </kill>
   <end name="end" />
</workflow-app>

The capture-and-decide workflow is commonly used when the next action needs to be chosen based on the result of a previous action.
    <action name='validate'>
        <java>
            <main-class>com.hadooparchitecturebook.DataValidationRunner
            </main-class>
            <arg>-Dinput.base.dir=${wf:conf('input.base.dir')}</arg>
            <arg>-Dvalidation.output.dir=${wf:conf('input.base.dir')}/dataset
            </arg>
            <capture-output />
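            <!-- With <capture-output/>, the Java main class is expected to write
                 a Java properties file to the path given by the
                 oozie.action.output.properties system property; those key/value
                 pairs (here, "errors") are what wf:actionData("validate") reads
                 in the decision node below. -->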
        </java>
        <ok to="check_for_validation_errors" />
        <error to="fail" />
    </action>

    <decision name='check_for_validation_errors'>
        <switch>
            <case to="validation_failure">
                ${(wf:actionData("validate")["errors"] == "true")}
            </case>
            <default to="process_data" />
        </switch>
    </decision>

    <action name='process_data'>
        <java>
            <main-class>com.hadooparchitecturebook.ProcessDataRunner</main-class>
            <arg>-Dinput.dir=${wf:conf('input.base.dir')}/dataset</arg>
        </java>
        <ok to="end" />
        <error to="fail" />
    </action>

    <action name="validation_failure">
        <java>
            <main-class>com.hadooparchitecturebook.MoveOutputToErrorsAction
            </main-class>
            <arg>${wf:conf('input.base.dir')}</arg>
            <arg>${wf:conf('errors.base.dir')}</arg>
            <capture-output />
        </java>
        <ok to="validation_fail" />
        <error to="fail" />
    </action>

    <kill name="validation_fail">
        <message>Input validation failed. Please see error text in:
                 ${wf:actionData("validation_failure")["errorDir"]}
        </message>
    </kill>

Frequency Scheduling
Time and Data Triggers
<coordinator-app name="hourly-aggregation" frequency="${coord:days(1)}"
start="2014-01-19T09:00Z" end="2015-01-19T10:00Z" timezone="America/Los_Angeles"
xmlns="uri:oozie:coordinator:0.1">
   <dataset name="logs" frequency="${coord:days(1)}"
           initial-instance="2014-01-15T06:15Z" timezone="America/Los_Angeles">
    <uri-template>
      hdfs://nameservice1/app/logs/${YEAR}${MONTH}/${DAY}
    </uri-template>
    <done-flag>_DONE</done-flag>
  </dataset>
    <input-events>
    <data-in name="input" dataset="logs">
      <instance>${coord:current(-1)}</instance>
    </data-in>
  </input-events>
   <action>
      <workflow>
         <app-path>hdfs://nameservice1/app/workflows/hourly-aggregation
         </app-path>
      </workflow>
   </action>
</coordinator-app>
  <controls>
    <timeout>1440</timeout>
    <execution>FIFO</execution>
    <concurrency>5</concurrency>
    <throttle>5</throttle>
  </controls>

  
