Hadoop Application Architectures
Clickstream Analysis
/etl/BI/casualcyclist/clicks/rawlogs/year=2014/month=10/day=10
Sqoop is an excellent tool for moving data to and from external data stores such as a relational database management system.
Because our application only requires ingesting the log data into HDFS, we’ll select Flume, because it’s purpose-built for HDFS—the built-in components tailored for HDFS mean that we won’t have to do any custom development in order to build our Flume pipeline. Flume also supports interceptors—the ability to perform small transformations of the data.
If we’re working with a web application written in Java that uses Log4j for logging, we can use the Flume Log4j appender to send data to a Flume Avro source. This is one of the simplest ways to send events to Flume, and requires just the addition of a single file to the application classpath (flume-ng-sdk-*.jar) and a few modifications to Log4j properties.
Flume provides a syslog source, which can read events from a syslog stream and convert them into Flume events.
You can use the Avro or Thrift sources to send Avro or Thrift messages to Flume. Flume can also pull messages from a JMS queue. Another option is to send JSON messages to the HTTP source.
Flume’s spooling directory source. This source will read files from a directory and turn every line into a Flume event. Note that some examples found on the Web show how to use the exec source to tail a logfile. This is highly discouraged because it is not a reliable solution. Tail does not track the last place it read, and if the Flume agent crashes it will either read duplicate records or miss them, both of which are, of course, undesirable. By contrast, the spooling directory source will only read complete files, so in case of failure it will retry the entire file and mark files successfully read so they will not be ingested twice.
the memory channel is the best choice when performance is more important than reliability, and the file channel is recommended when reliability is more important.
we’re assuming the collector agents are running on cluster edge nodes—these are nodes within the cluster network that have a Hadoop client configuration so they can communicate with the Hadoop cluster in order to submit Hadoop commands, write data to HDFS, and so on.
client.sources=r1
client.sources.r1.channels=ch1
client.sources.r1.type=spooldir
client.sources.r1.spoolDir=/opt/weblogs
# Use the Timestamp interceptor to add a timestamp to all event headers:
client.sources.r1.interceptors.i1.type=timestamp
client.sources.r1.interceptors=i1
# Define a file channel:
client.channels=ch1
client.channels.ch1.type=FILE
# Define two Avro sinks:
client.sinks=k1 k2
client.sinks.k1.type=avro
client.sinks.k1.hostname=collector1.hadoopapplicationarch.com
# Compress data before sending across the wire:
client.sinks.k1.compression-type=deflate
client.sinks.k1.port=4141
client.sinks.k2.type=avro
client.sinks.k2.hostname=collector2.hadoopapplicationarch.com
client.sinks.k2.port=4141
client.sinks.k2.compression-type=deflate
client.sinks.k1.channel=ch1
client.sinks.k2.channel=ch1
# Define a load balancing sink group to spread load over multiple collectors:
client.sinkgroups=g1
client.sinkgroups.g1.sinks=k1 k2
client.sinkgroups.g1.processor.type=load_balance
client.sinkgroups.g1.processor.selector=round_robin
client.sinkgroups.g1.processor.backoff=true
# Define an Avro source:
collector.sources=r1
collector.sources.r1.type=avro
collector.sources.r1.bind=0.0.0.0
collector.sources.r1.port=4141
collector.sources.r1.channels=ch1
# Decompress the incoming data:
collector.sources.r1.compression-type=deflate
# Define a file channel using multiple disks for reliability:
collector.channels=ch1
collector.channels.ch1.type=FILE
collector.channels.ch1.checkpointDir=/opt/flume/ch1/cp1
collector.channels.ch1.dataDirs=/opt/flume/ch1/data1,/opt/flume/ch1/data2
# Define HDFS sinks to persist events to disk as text.
# Note the use of multiple sinks to spread the load:
collector.sinks=k1 k2
collector.sinks.k1.type=hdfs
collector.sinks.k1.channel=ch1
# Partition files by date:
collector.sinks.k1.hdfs.path=/weblogs/combined/%Y/%m/%d
collector.sinks.k1.hdfs.filePrefix=combined
collector.sinks.k1.hdfs.fileSuffix=.log
collector.sinks.k1.hdfs.fileType=DataStream
collector.sinks.k1.hdfs.writeFormat=Text
collector.sinks.k1.hdfs.batchSize=10000
# Roll HDFS files every 10000 events or 30 seconds:
collector.sinks.k1.hdfs.rollCount=10000
collector.sinks.k1.hdfs.rollSize=0
collector.sinks.k1.hdfs.rollInterval=30
collector.sinks.k2.type=hdfs
collector.sinks.k2.channel=ch1
# Partition files by date:
collector.sinks.k2.hdfs.path=/weblogs/combined/%Y/%m/%d
collector.sinks.k2.hdfs.filePrefix=combined
collector.sinks.k2.hdfs.fileSuffix=.log
collector.sinks.k2.hdfs.fileType=DataStream
collector.sinks.k2.hdfs.writeFormat=Text
collector.sinks.k2.hdfs.batchSize=10000
collector.sinks.k2.hdfs.rollCount=10000
collector.sinks.k2.hdfs.rollSize=0
collector.sinks.k2.hdfs.rollInterval=30
collector.sinkgroups=g1
collector.sinkgroups.g1.sinks=k1 k2
collector.sinkgroups.g1.processor.type=load_balance
collector.sinkgroups.g1.processor.selector=round_robin
collector.sinkgroups.g1.processor.backoff=true
In the rare event of a Flume agent crash, Flume guarantees that all the log records being ingested will end up in Hadoop. However, there is no guarantee that each of these records would be stored exactly once. In the case of a Flume agent crash, you may end up with some of the log records duplicated, hence the need for deduplicating them.
writing the deduplicated data into a temporary table and then moving it to a final table
DEDUPLICATION
INSERT INTO table $deduped_log
SELECT
ip,
ts,
url,
referrer,
user_agent,
YEAR(FROM_UNIXTIME(unix_ts)) year,
MONTH(FROM_UNIXTIME(unix_ts)) month
FROM (
SELECT
ip,
ts,
url,
referrer,
user_agent,
UNIX_TIMESTAMP(ts,'dd/MMM/yyyy:HH:mm:ss') unix_ts
FROM
$raw_log
WHERE
year=${YEAR} AND
month=${MONTH} AND
day=${DAY}
GROUP BY
ip,
ts,
url,
referrer,
user_agent,
UNIX_TIMESTAMP(ts,'dd/MMM/yyyy:HH:mm:ss')
) t;
rawlogs = LOAD '$raw_log_dir';
dedupedlogs = DISTINCT rawlogs;
STORE dedupedlogs INTO '$deduped_log_dir' USING PigStorage();
Sessionization
JavaPairRDD<String,SerializableLogLine> parsed =
dataSet.mapToPair(new PairFunction<String, String, SerializableLogLine>() {
@Override
public Tuple2<String, SerializableLogLine> call(String s)
throws Exception {
return new Tuple2<String,SerializableLogLine>(getIP(s),getFields(s));
}
});
// Sessionize function - take a sequence of events from one IP,
// sort by timestamp and mark all events less than 30 minutes apart as a session
public static List<SerializableLogLine> sessionize
(Iterable<SerializableLogLine> lines) {
List<SerializableLogLine> sessionizedLines = Lists.newArrayList(lines);
Collections.sort(sessionizedLines);
int sessionId = 0;
sessionizedLines.get(0).setSessionid(sessionId);
for (int i = 1; i < sessionizedLines.size(); i++) {
SerializableLogLine thisLine = sessionizedLines.get(i);
SerializableLogLine prevLine = sessionizedLines.get(i - 1);
if (thisLine.getTimestamp() - prevLine.getTimestamp() > 30 * 60 * 1000) {
sessionId++;
}
thisLine.setSessionid(sessionId);
}
return sessionizedLines;
}
// This groups clicks by IP address
JavaPairRDD<String, Iterable<SerializableLogLine>> grouped = parsed.groupByKey();
JavaPairRDD<String, Iterable<SerializableLogLine>> sessionized =
grouped.mapValues(new Function<Iterable<SerializableLogLine>,
Iterable<SerializableLogLine>>() {
@Override
public Iterable<SerializableLogLine> call
(Iterable<SerializableLogLine> logLines) throws
Exception {
return sessionize(logLines);
}
});
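To see the 30-minute rule from sessionize() in isolation, here is a minimal sketch that runs without Spark. It assumes each click has been reduced to a millisecond timestamp for a single IP; the names SessionizeSketch and assignSessionIds are hypothetical and do not come from the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, Spark-free illustration of the 30-minute session rule.
final class SessionizeSketch {
    // A gap larger than this (in milliseconds) starts a new session.
    static final long SESSION_GAP_MS = 30L * 60L * 1000L;

    // Returns one session ID per click, in ascending timestamp order.
    static List<Integer> assignSessionIds(List<Long> timestampsMs) {
        List<Long> sorted = new ArrayList<>(timestampsMs);
        Collections.sort(sorted);
        List<Integer> sessionIds = new ArrayList<>();
        int sessionId = 0;
        for (int i = 0; i < sorted.size(); i++) {
            if (i > 0 && sorted.get(i) - sorted.get(i - 1) > SESSION_GAP_MS) {
                sessionId++;
            }
            sessionIds.add(sessionId);
        }
        return sessionIds;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Clicks at 0, 10, 45, and 50 minutes: the 35-minute gap splits them.
        List<Long> clicks = Arrays.asList(0L, 10 * minute, 45 * minute, 50 * minute);
        System.out.println(assignSessionIds(clicks)); // prints [0, 0, 1, 1]
    }
}
```

Unlike sessionize(), this version also tolerates an empty input, which cannot occur after groupByKey() but is convenient when testing the rule on its own.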
answer how much time, on average, shoppers spend on your website
SELECT
AVG(session_length_in_sec)/60 avg_min
FROM (
SELECT
MAX(ts) - MIN(ts) session_length_in_sec
FROM
apache_log_parquet
GROUP BY
session_id
) t
find the bounce rate of your website—that is, the percentage of users who leave the website without visiting any other page except the one they landed on:
SELECT
(SUM(CASE WHEN count!=1 THEN 0 ELSE 1 END))*100/(COUNT(*)) bounce_rate
FROM (
SELECT
session_id,
COUNT(*) count
FROM
apache_log_parquet
GROUP BY
session_id)t;
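The same calculation can be checked on a small in-memory sample. The sketch below assumes the input is simply the session ID of each click; BounceRateSketch and bounceRate are hypothetical names used for illustration only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory version of the bounce-rate query.
final class BounceRateSketch {
    // Percentage of sessions that contain exactly one click.
    static double bounceRate(List<String> sessionIdPerClick) {
        // Equivalent of: SELECT session_id, COUNT(*) ... GROUP BY session_id
        Map<String, Integer> clicksPerSession = new HashMap<>();
        for (String sessionId : sessionIdPerClick) {
            clicksPerSession.merge(sessionId, 1, Integer::sum);
        }
        if (clicksPerSession.isEmpty()) {
            return 0.0;
        }
        long bounced = clicksPerSession.values().stream()
            .filter(count -> count == 1)
            .count();
        return bounced * 100.0 / clicksPerSession.size();
    }

    public static void main(String[] args) {
        // s2 and s4 have a single click each, so 2 of 4 sessions bounced.
        List<String> clicks = Arrays.asList("s1", "s1", "s2", "s3", "s3", "s3", "s4");
        System.out.println(bounceRate(clicks)); // prints 50.0
    }
}
```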
anomaly detection or outlier detection
The downside to combining all the values is that you can no longer change them atomically. This is why this strategy of combining values is mostly reserved for columns that don’t change much or always change together.
USING SHORT COLUMN NAMES
Spark Streaming can store counts in an RDD that can be incremented in a reduce process.
static HConnection hConnection;
hConnection = HConnectionManager.createConnection(hbaseConfig);
byte[] rowKey = HBaseUtils.convertKeyToRowKey("profileCacheTableName", userId);
Get get = new Get(rowKey);
final HTableInterface table = hConnection.getTable("profileCacheTableName");
Result result = table.get(get);
NavigableMap<byte[], byte[]> familyMap = result
.getFamilyMap(Bytes.toBytes("profile"));
UserProfile userProfile = UserProfileUtils.createUserProfile(familyMap);
public static UserProfile createUserProfile(NavigableMap<byte[], byte[]> familyMap) throws JSONException {
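// The timestamp column is written as a decimal string, so parse it the same way.
long timeStamp = Long.parseLong(Bytes.toString(familyMap.get(HBaseTableMetaModel.profileCacheTsColumn)));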
String jsonString = Bytes.toString(familyMap.get(HBaseTableMetaModel.profileCacheJsonColumn));
LOG.info("createUserProfile: " + jsonString);
UserProfile userProfile = new UserProfile(jsonString, timeStamp);
return userProfile;
}
We use the checkAndPut() method to avoid a race condition in which we accidentally overwrite a profile that another web server has already updated. If the timestamp in HBase does not match the timestamp in our app, we have an outdated copy: we need to get a new copy from HBase, apply our updates to it, and try writing it to HBase again. The call usually appears in a while() statement, so if checkAndPut() fails, we reload the new profile, update it, and try again.
byte[] rowKey = HBaseUtils.convertKeyToRowKey(HBaseTableMetaModel.profileCacheTableName, entry.getKey().userId);
Put put = new Put(rowKey);
put.add(HBaseTableMetaModel.profileCacheColumnFamily,
HBaseTableMetaModel.profileCacheJsonColumn,
Bytes.toBytes(entry.getKey().getJSONObject().toString()));
put.add(HBaseTableMetaModel.profileCacheColumnFamily,
HBaseTableMetaModel.profileCacheTsColumn,
Bytes.toBytes(Long.toString(System.currentTimeMillis())));
long timeStamp = entry.getKey().lastUpdatedTimeStamp;
while (!table.checkAndPut(rowKey,
HBaseTableMetaModel.profileCacheColumnFamily,
HBaseTableMetaModel.profileCacheTsColumn,
Bytes.toBytes(Long.toString(timeStamp)),
put)) {
//We reached here because someone else modified our userProfile
Get get = new Get(rowKey);
Result result = table.get(get);
NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(HBaseTableMetaModel.profileCacheColumnFamily);
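// The timestamp column holds a decimal string, matching what checkAndPut() compares against.
timeStamp = Long.parseLong(Bytes.toString(familyMap.get(HBaseTableMetaModel.profileCacheTsColumn)));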
UserProfile userProfile = new UserProfile(
Bytes.toString(familyMap.get(HBaseTableMetaModel.profileCacheJsonColumn)), timeStamp);
userProfile.updateWithUserEvent(entry.getValue());
put = new Put(rowKey);
put.add(HBaseTableMetaModel.profileCacheColumnFamily,
HBaseTableMetaModel.profileCacheJsonColumn,
Bytes.toBytes(userProfile.getJSONObject().toString()));
put.add(HBaseTableMetaModel.profileCacheColumnFamily,
HBaseTableMetaModel.profileCacheTsColumn,
Bytes.toBytes(Long.toString(System.currentTimeMillis())));
}
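The retry loop above is an instance of optimistic concurrency (compare-and-set). As an in-memory analogy only, and not HBase code, the sketch below shows the same read, modify, conditional write, and retry cycle using java.util.concurrent.atomic.AtomicReference. The CasRetrySketch and Profile classes are hypothetical; the version field plays the role of the timestamp column.

```java
import java.util.concurrent.atomic.AtomicReference;

// In-memory analogy of the checkAndPut() retry loop; not HBase code.
final class CasRetrySketch {
    // Hypothetical immutable profile; "version" stands in for the timestamp column.
    static final class Profile {
        final long version;
        final int eventCount;

        Profile(long version, int eventCount) {
            this.version = version;
            this.eventCount = eventCount;
        }
    }

    // Stands in for the stored row.
    private final AtomicReference<Profile> stored =
        new AtomicReference<>(new Profile(0L, 0));

    Profile current() {
        return stored.get();
    }

    // Read, modify, and write back only if nobody else wrote in between.
    Profile recordEvent() {
        while (true) {
            Profile seen = stored.get();
            Profile updated = new Profile(seen.version + 1, seen.eventCount + 1);
            if (stored.compareAndSet(seen, updated)) {
                return updated;
            }
            // Another writer won the race: loop to reload and reapply the update.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CasRetrySketch store = new CasRetrySketch();
        Thread[] writers = new Thread[4];
        for (int i = 0; i < writers.length; i++) {
            writers[i] = new Thread(() -> {
                for (int n = 0; n < 1000; n++) {
                    store.recordEvent();
                }
            });
            writers[i].start();
        }
        for (Thread writer : writers) {
            writer.join();
        }
        // No update is lost, even though the writers race each other.
        System.out.println(store.current().eventCount); // prints 4000
    }
}
```

As with checkAndPut(), a failed conditional write costs one extra read and retry, which is cheap as long as concurrent updates to the same profile are rare.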
Delivering Transaction Status
We could publish alerts to a topic on our Kafka or MQ system, allowing any downstream systems needing access to these alerts to subscribe to the appropriate topic.
Path Between the Client and Flume
client push, logfile pull, and a message queue between the client and the final sink.
FlumeFlusher
NettyAvroRpcClient client = null;
while (isRunning) {
if (client == null) {
client = getClient();
}
List<Event> eventActionList = new ArrayList<Event>();
List<Action> actionList = new ArrayList<Action>();
try {
for (int i = 0; i < MAX_BATCH_PUT_SIZE; i++) {
Action action = pendingFlumeSubmits.poll();
if (action == null) {
break;
}
Event event = new SimpleEvent();
event.setBody(Bytes.toBytes(action.getJSONObject().toString()));
eventActionList.add(event);
actionList.add(action);
}
if (eventActionList.size() > 0) {
client.appendBatch(eventActionList);
}
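} catch (Throwable t) {
// Don't drop the batch silently: requeue the actions for a later retry and
// discard the client so the next iteration reconnects via getClient().
// (Assumes pendingFlumeSubmits is a thread-safe queue and LOG is this class's logger.)
LOG.error("Flume append failed; requeuing " + actionList.size() + " actions", t);
pendingFlumeSubmits.addAll(actionList);
client = null;
}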
}
LOGFILE PULL
MESSAGE QUEUE OR KAFKA IN THE MIDDLE
the advantage of having Spark receiving events from Flume is that we could have filtering before we send to Spark. This would allow us to reduce the load on our microbatching system.
client → Kafka → Flume/Spark
there is the big advantage of replaying the data here, which can come in handy for development and testing.
Kafka to Storm or Spark Streaming
ETL/ELT
Data archiving
Removing Duplicate Records by Primary Key
{PrimaryKey},{timeStamp},{value}
val dedupOriginalDataRDD = sc.hadoopFile(inputPath,
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text],
1)
//Get the data in a key value format
val keyValueRDD = dedupOriginalDataRDD.map(t => {
val splits = t._2.toString.split(",")
(splits(0), (splits(1), splits(2)))})
//reduce by key so we will only get one record for every primary key
//Compare timestamps numerically; a string comparison would rank "9" above "10"
val reducedRDD = keyValueRDD.reduceByKey((a,b) => if (a._1.toLong > b._1.toLong) a else b)
//Format the data to a human readable format and write it back out to HDFS
reducedRDD
.map(r => r._1 + "," + r._2._1 + "," + r._2._2)
.saveAsTextFile(outputPath)
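To make the reduce step concrete on a few sample records, here is a small sketch that keeps the newest line per primary key without Spark. It assumes the {PrimaryKey},{timeStamp},{value} layout shown earlier with a numeric timestamp; LatestByKeySketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory version of "keep the newest record per primary key".
final class LatestByKeySketch {
    // Extracts the numeric timestamp from a "key,timestamp,value" line.
    static long timestampOf(String line) {
        return Long.parseLong(line.split(",", 3)[1]);
    }

    // Returns one line per primary key (the one with the largest timestamp),
    // ordered by key.
    static List<String> latestByKey(List<String> lines) {
        Map<String, String> latest = new TreeMap<>();
        for (String line : lines) {
            String key = line.split(",", 3)[0];
            latest.merge(key, line, (kept, candidate) ->
                timestampOf(candidate) > timestampOf(kept) ? candidate : kept);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("k1,9,a", "k1,10,b", "k2,5,c");
        // Timestamp 10 beats 9 because the comparison is numeric.
        System.out.println(latestByKey(lines)); // prints [k1,10,b, k2,5,c]
    }
}
```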
The TextInputFormat provides functionality that will allow Spark or MapReduce jobs to break up a directory into files, which are then broken up into blocks to be processed by different tasks.
CREATE EXTERNAL TABLE COMPACTION_TABLE (
PRIMARY_KEY STRING,
TIME_STAMP BIGINT,
EVENT_VALUE STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'compaction_data';
SELECT
A.PRIMARY_KEY,
A.TIME_STAMP,
MAX(A.EVENT_VALUE)
FROM COMPACTION_TABLE A JOIN (
SELECT
PRIMARY_KEY AS P_KEY,
MAX(TIME_STAMP) as TIME_SP
FROM COMPACTION_TABLE
GROUP BY PRIMARY_KEY
) B
ON A.PRIMARY_KEY = B.P_KEY AND A.TIME_STAMP = B.TIME_SP
GROUP BY A.PRIMARY_KEY, A.TIME_STAMP
Pattern: Windowing Analysis
//Part 1 : Reading in the data
val originalDataRDD = sc.hadoopFile(inputPath,
classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text],
1).map(r => {
val splits = r._2.toString.split(",")
(new DataKey(splits(0), splits(1).toLong), splits(2).toInt)
})
//Part 2 : Partitioner that partitions by primaryKey only
val partitioner = new Partitioner {
override def numPartitions: Int = numberOfPartitions
override def getPartition(key: Any): Int = {
Math.abs(key.asInstanceOf[DataKey].uniqueId.hashCode() % numPartitions)
}
}
//Part 3 : Partition and sort
val partedSortedRDD =
new ShuffledRDD[DataKey, Int, Int](
originalDataRDD,
partitioner).setKeyOrdering(implicitly[Ordering[DataKey]])
//Part 4 // MapPartition to do windowing
val pivotPointRDD = partedSortedRDD.mapPartitions(it => {
val results = new mutable.MutableList[PivotPoint]
//Part 5 // Keeping context
var lastUniqueId = "foobar"
var lastRecord: (DataKey, Int) = null
var lastLastRecord: (DataKey, Int) = null
var position = 0
it.foreach( r => {
position = position + 1
if (!lastUniqueId.equals(r._1.uniqueId)) {
lastRecord = null
lastLastRecord = null
}
//Part 6 : Finding those peaks and valleys
if (lastRecord != null && lastLastRecord != null) {
if (lastRecord._2 < r._2 && lastRecord._2 < lastLastRecord._2) {
results.+=(new PivotPoint(r._1.uniqueId,
position,
lastRecord._1.eventTime,
lastRecord._2,
false))
} else if (lastRecord._2 > r._2 && lastRecord._2 > lastLastRecord._2) {
results.+=(new PivotPoint(r._1.uniqueId,
position,
lastRecord._1.eventTime,
lastRecord._2,
true))
}
}
lastUniqueId = r._1.uniqueId
lastLastRecord = lastRecord
lastRecord = r
})
results.iterator
})
//Part 7 : pretty everything up
pivotPointRDD.map(r => {
val pivotType = if (r.isPeak) "peak" else "valley"
r.uniqueId + "," +
r.position + "," +
r.eventTime + "," +
r.eventValue + "," +
pivotType
} ).saveAsTextFile(outputPath)
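The peak and valley test at the heart of the windowing code can be tried on a plain array. The sketch below assumes the values for one key are already sorted by event time; PivotSketch and pivots are hypothetical names, and the output labels use the index of the pivot itself rather than a running position.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-key version of the peak/valley detection.
final class PivotSketch {
    // Labels every interior point that is strictly above (peak) or strictly
    // below (valley) both of its neighbours.
    static List<String> pivots(int[] values) {
        List<String> result = new ArrayList<>();
        for (int i = 1; i + 1 < values.length; i++) {
            int previous = values[i - 1];
            int current = values[i];
            int next = values[i + 1];
            if (current > previous && current > next) {
                result.add("peak@" + i);
            } else if (current < previous && current < next) {
                result.add("valley@" + i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] values = {1, 3, 2, 5, 4};
        System.out.println(pivots(values)); // prints [peak@1, valley@2, peak@3]
    }
}
```

The Spark version keeps only the last two records as context instead of indexing into an array, which is what lets it stream through a sorted partition of any size.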