Storm Real-Time Processing Cookbook
Log Stream Processing
Creating a log agent - Logstash
input {
file {
type => "syslog"
# Wildcards work here :)
path => [ "/var/log/messages", "/var/log/system.*", "/var/log/*.log" ]
}
}
output {
stdout { }
redis {
host => "localhost"
data_type => "list"
key => "rawLogs"
}
}
The output plugin is the Redis plugin that will output the log to the Redis instance on localhost to a list called rawLogs.
Creating the log spout
The log topology will read all logs through the Redis channel that is fed by logstash;
Tuples in the log topology will carry a log domain object that encapsulates the data and parsing logic for a single log record or an entry in a logfile.
logstash submits log lines as separate JSON values into the Redis channel.
public class LogSpout extends BaseRichSpout {
public static final String LOG_CHANNEL = "log";
private Jedis jedis;
private String host;
private int port;
private SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields(FieldNames.LOG_ENTRY));
}
@Override
public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
host = conf.get(Conf.REDIS_HOST_KEY).toString();
port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
this.collector = spoutOutputCollector;
connectToRedis();
}
private void connectToRedis() {
jedis = new Jedis(host, port);
}
@Override
public void nextTuple() {
String content = jedis.rpop(LOG_CHANNEL);
if(content==null || "nil".equals(content)) {
try { Thread.sleep(300); } catch (InterruptedException e) {}
} else {
JSONObject obj=(JSONObject) JSONValue.parse(content);
LogEntry entry = new LogEntry(obj);
collector.emit(new Values(entry));
}
}
}
Rule-based analysis of the log stream
Filter logs that aren't important, and therefore should not be counted or stored. These often include log entries at the INFO or DEBUG levels (yes, these exist in production systems).
Analyze the log entry further and extract as much meaning and new fields as possible.
Enhance/update the log entry prior to storage.
Send notifications on when certain logs are received.
Correlate log events to derive new meaning.
Deal with changes in the log's structure and formatting.
public class LogRulesBolt extends BaseRichBolt {
private StatelessKnowledgeSession ksession;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
//TODO: load the rule definitions from an external agent instead of the classpath.
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add( ResourceFactory.newClassPathResource( "/Syslog.drl",
getClass() ), ResourceType.DRL );
if ( kbuilder.hasErrors() ) {
LOG.error( kbuilder.getErrors().toString() );
}
KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );
ksession = kbase.newStatelessKnowledgeSession();
}
@Override
public void execute(Tuple input) {
LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);
if(entry == null){
LOG.fatal( "Received null or incorrect value from tuple" );
return;
}
ksession.execute( entry );
if(!entry.isFilter()){
LOG.debug("Emitting from Rules Bolt");
collector.emit(new Values(entry));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FieldNames.LOG_ENTRY));
}
}
Drools is an open source implementation of a forward-chaining rules engine that is able to infer new values and execute the logic based on matching logic.
Drools:
rule "Host Correction"
when
l: LogEntry(sourceHost == "localhost")
then
l.setSourceHost("localhost.example.com");
end
rule "Filter By Type"
when
l: LogEntry(type != "syslog")
then
l.setFilter(true);
end
rule "Extract Fields"
salience 100//run later
when
l: LogEntry(filter != true)
then
String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
Matcher matcher = Pattern.compile(logEntryPattern).matcher(l.getMessage());
if(matcher.find()){
l.addField("_pid",matcher.group(1));
l.addField("_src",matcher.group(2));
}
end
salience ensures it is evaluated last. This ensures that it never extracts fields from filtered logs.
Indexing and persisting the log data
public class IndexerBolt extends BaseRichBolt {
private Client client;
private OutputCollector collector;
public static final String INDEX_NAME = "logstorm";
public static final String INDEX_TYPE = "logentry";
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
Node node;
if((Boolean)stormConf.get(backtype.storm.Config.TOPOLOGY_DEBUG) == true){
node = NodeBuilder.nodeBuilder().local(true).node();
} else {
String clusterName = (String) stormConf.get(Conf.ELASTIC_CLUSTER_NAME);
if(clusterName == null)
clusterName = Conf.DEFAULT_ELASTIC_CLUSTER;
node = NodeBuilder.nodeBuilder().clusterName(clusterName).node();
}
client = node.client();
}
@Override
public void execute(Tuple input) {
LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);
if(entry == null){
LOG.fatal( "Received null or incorrect value from tuple" );
return;
}
String toBeIndexed = entry.toJSON().toJSONString();
IndexResponse response = client.prepareIndex(INDEX_NAME,INDEX_TYPE)
.setSource(toBeIndexed)
.execute().actionGet();
if(response == null)
LOG.error("Failed to index Tuple: " + input.toString());
else{
if(response.getId() == null)
LOG.error("Failed to index Tuple: " + input.toString());
else{
LOG.debug("Indexing success on Tuple: " + input.toString());
collector.emit(new Values(entry,response.getId()));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FieldNames.LOG_ENTRY, FieldNames.LOG_INDEX_ID));
}
}
public class VolumeCountingBolt extends BaseRichBolt {
public static Logger LOG = Logger.getLogger(VolumeCountingBolt.class);
private OutputCollector collector;
public static final String FIELD_ROW_KEY = "RowKey";
public static final String FIELD_COLUMN = "Column";
public static final String FIELD_INCREMENT = "IncrementAmount"
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
public static Long getMinuteForTime(Date time) {
Calendar c = Calendar.getInstance();
c.setTime(time);
c.set(Calendar.SECOND,0);
c.set(Calendar.MILLISECOND, 0);
return c.getTimeInMillis();
}
@Override
public void execute(Tuple input) {
LogEntry entry = (LogEntry) input.getValueByField(FieldNames.LOG_ENTRY);
collector.emit(new Values(getMinuteForTime(entry.getTimestamp()), entry.getSource(),1L));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FIELD_ROW_KEY, FIELD_COLUMN, FIELD_INCREMENT));
}
}
public class LogTopology {
private TopologyBuilder builder = new TopologyBuilder();
private Config conf = new Config();
private LocalCluster cluster;
public LogTopology() {
builder.setSpout("logSpout", new LogSpout(), 10);
builder.setBolt("logRules", new LogRulesBolt(), 10).shuffleGrouping(
"logSpout");
builder.setBolt("indexer", new IndexerBolt(), 10).shuffleGrouping(
"logRules");
builder.setBolt("counter", new VolumeCountingBolt(), 10).shuffleGrouping("logRules");
CassandraCounterBatchingBolt logPersistenceBolt = new CassandraCounterBatchingBolt(
Conf.COUNT_CF_NAME, VolumeCountingBolt.FIELD_ROW_KEY, VolumeCountingBolt.FIELD_INCREMENT );
logPersistenceBolt.setAckStrategy(AckStrategy.ACK_ON_RECEIVE);
builder.setBolt("countPersistor", logPersistenceBolt, 10)
.shuffleGrouping("counter");
// Maybe add:
// Stem and stop word counting per file
// The persister for the stem analysis (need to check the counting
// capability first on storm-cassandra)
conf.put(Conf.REDIS_PORT_KEY, Conf.DEFAULT_JEDIS_PORT);
conf.put(CassandraBolt.CASSANDRA_KEYSPACE, Conf.LOGGING_KEYSPACE);
}
public TopologyBuilder getBuilder() {
return builder;
}
public LocalCluster getLocalCluster() {
return cluster;
}
public Config getConf() {
return conf;
}
public void runLocal(int runTime) {
conf.setDebug(true);
conf.put(Conf.REDIS_HOST_KEY, "localhost");
conf.put(CassandraBolt.CASSANDRA_HOST, "localhost:9171");
cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
if (runTime > 0) {
Utils.sleep(runTime);
shutDownLocal();
}
}
public void shutDownLocal() {
if (cluster != null) {
cluster.killTopology("test");
cluster.shutdown();
}
}
public void runCluster(String name, String redisHost, String cassandraHost)
throws AlreadyAliveException, InvalidTopologyException {
conf.setNumWorkers(20);
conf.put(Conf.REDIS_HOST_KEY, redisHost);
conf.put(CassandraBolt.CASSANDRA_HOST,cassandraHost);
StormSubmitter.submitTopology(name, conf, builder.createTopology());
}
public static void main(String[] args) throws Exception {
LogTopology topology = new LogTopology();
if (args != null && args.length > 1) {
topology.runCluster(args[0], args[1], args[2]);
} else {
if (args != null && args.length == 1)
System.out
.println("Running in local mode, redis ip missing for cluster run");
topology.runLocal(10000);
}
}
}
Log Stream Processing
Creating a log agent - Logstash
input {
file {
type => "syslog"
# Wildcards work here :)
path => [ "/var/log/messages", "/var/log/system.*", "/var/log/*.log" ]
}
}
output {
stdout { }
redis {
host => "localhost"
data_type => "list"
key => "rawLogs"
}
}
The output plugin is the Redis plugin that will output the log to the Redis instance on localhost to a list called rawLogs.
Creating the log spout
The log topology will read all logs through the Redis channel that is fed by logstash;
Tuples in the log topology will carry a log domain object that encapsulates the data and parsing logic for a single log record or an entry in a logfile.
logstash submits log lines as separate JSON values into the Redis channel.
public class LogSpout extends BaseRichSpout {
public static final String LOG_CHANNEL = "log";
private Jedis jedis;
private String host;
private int port;
private SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields(FieldNames.LOG_ENTRY));
}
@Override
public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
host = conf.get(Conf.REDIS_HOST_KEY).toString();
port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
this.collector = spoutOutputCollector;
connectToRedis();
}
private void connectToRedis() {
jedis = new Jedis(host, port);
}
@Override
public void nextTuple() {
String content = jedis.rpop(LOG_CHANNEL);
if(content==null || "nil".equals(content)) {
try { Thread.sleep(300); } catch (InterruptedException e) {}
} else {
JSONObject obj=(JSONObject) JSONValue.parse(content);
LogEntry entry = new LogEntry(obj);
collector.emit(new Values(entry));
}
}
}
Rule-based analysis of the log stream
Filter logs that aren't important, and therefore should not be counted or stored. These often include log entries at the INFO or DEBUG levels (yes, these exist in production systems).
Analyze the log entry further and extract as much meaning and new fields as possible.
Enhance/update the log entry prior to storage.
Send notifications on when certain logs are received.
Correlate log events to derive new meaning.
Deal with changes in the log's structure and formatting.
public class LogRulesBolt extends BaseRichBolt {
private StatelessKnowledgeSession ksession;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
//TODO: load the rule definitions from an external agent instead of the classpath.
KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add( ResourceFactory.newClassPathResource( "/Syslog.drl",
getClass() ), ResourceType.DRL );
if ( kbuilder.hasErrors() ) {
LOG.error( kbuilder.getErrors().toString() );
}
KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );
ksession = kbase.newStatelessKnowledgeSession();
}
@Override
public void execute(Tuple input) {
LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);
if(entry == null){
LOG.fatal( "Received null or incorrect value from tuple" );
return;
}
ksession.execute( entry );
if(!entry.isFilter()){
LOG.debug("Emitting from Rules Bolt");
collector.emit(new Values(entry));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FieldNames.LOG_ENTRY));
}
}
Drools is an open source implementation of a forward-chaining rules engine that is able to infer new values and execute the logic based on matching logic.
Drools:
rule "Host Correction"
when
l: LogEntry(sourceHost == "localhost")
then
l.setSourceHost("localhost.example.com");
end
rule "Filter By Type"
when
l: LogEntry(type != "syslog")
then
l.setFilter(true);
end
rule "Extract Fields"
salience 100//run later
when
l: LogEntry(filter != true)
then
String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
Matcher matcher = Pattern.compile(logEntryPattern).matcher(l.getMessage());
if(matcher.find()){
l.addField("_pid",matcher.group(1));
l.addField("_src",matcher.group(2));
}
end
salience ensures it is evaluated last. This ensures that it never extracts fields from filtered logs.
Indexing and persisting the log data
public class IndexerBolt extends BaseRichBolt {
private Client client;
private OutputCollector collector;
public static final String INDEX_NAME = "logstorm";
public static final String INDEX_TYPE = "logentry";
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
Node node;
if((Boolean)stormConf.get(backtype.storm.Config.TOPOLOGY_DEBUG) == true){
node = NodeBuilder.nodeBuilder().local(true).node();
} else {
String clusterName = (String) stormConf.get(Conf.ELASTIC_CLUSTER_NAME);
if(clusterName == null)
clusterName = Conf.DEFAULT_ELASTIC_CLUSTER;
node = NodeBuilder.nodeBuilder().clusterName(clusterName).node();
}
client = node.client();
}
@Override
public void execute(Tuple input) {
LogEntry entry = (LogEntry)input.getValueByField(FieldNames.LOG_ENTRY);
if(entry == null){
LOG.fatal( "Received null or incorrect value from tuple" );
return;
}
String toBeIndexed = entry.toJSON().toJSONString();
IndexResponse response = client.prepareIndex(INDEX_NAME,INDEX_TYPE)
.setSource(toBeIndexed)
.execute().actionGet();
if(response == null)
LOG.error("Failed to index Tuple: " + input.toString());
else{
if(response.getId() == null)
LOG.error("Failed to index Tuple: " + input.toString());
else{
LOG.debug("Indexing success on Tuple: " + input.toString());
collector.emit(new Values(entry,response.getId()));
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FieldNames.LOG_ENTRY, FieldNames.LOG_INDEX_ID));
}
}
public class VolumeCountingBolt extends BaseRichBolt {
public static Logger LOG = Logger.getLogger(VolumeCountingBolt.class);
private OutputCollector collector;
public static final String FIELD_ROW_KEY = "RowKey";
public static final String FIELD_COLUMN = "Column";
public static final String FIELD_INCREMENT = "IncrementAmount"
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
public static Long getMinuteForTime(Date time) {
Calendar c = Calendar.getInstance();
c.setTime(time);
c.set(Calendar.SECOND,0);
c.set(Calendar.MILLISECOND, 0);
return c.getTimeInMillis();
}
@Override
public void execute(Tuple input) {
LogEntry entry = (LogEntry) input.getValueByField(FieldNames.LOG_ENTRY);
collector.emit(new Values(getMinuteForTime(entry.getTimestamp()), entry.getSource(),1L));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FIELD_ROW_KEY, FIELD_COLUMN, FIELD_INCREMENT));
}
}
public class LogTopology {
private TopologyBuilder builder = new TopologyBuilder();
private Config conf = new Config();
private LocalCluster cluster;
public LogTopology() {
builder.setSpout("logSpout", new LogSpout(), 10);
builder.setBolt("logRules", new LogRulesBolt(), 10).shuffleGrouping(
"logSpout");
builder.setBolt("indexer", new IndexerBolt(), 10).shuffleGrouping(
"logRules");
builder.setBolt("counter", new VolumeCountingBolt(), 10).shuffleGrouping("logRules");
CassandraCounterBatchingBolt logPersistenceBolt = new CassandraCounterBatchingBolt(
Conf.COUNT_CF_NAME, VolumeCountingBolt.FIELD_ROW_KEY, VolumeCountingBolt.FIELD_INCREMENT );
logPersistenceBolt.setAckStrategy(AckStrategy.ACK_ON_RECEIVE);
builder.setBolt("countPersistor", logPersistenceBolt, 10)
.shuffleGrouping("counter");
// Maybe add:
// Stem and stop word counting per file
// The persister for the stem analysis (need to check the counting
// capability first on storm-cassandra)
conf.put(Conf.REDIS_PORT_KEY, Conf.DEFAULT_JEDIS_PORT);
conf.put(CassandraBolt.CASSANDRA_KEYSPACE, Conf.LOGGING_KEYSPACE);
}
public TopologyBuilder getBuilder() {
return builder;
}
public LocalCluster getLocalCluster() {
return cluster;
}
public Config getConf() {
return conf;
}
public void runLocal(int runTime) {
conf.setDebug(true);
conf.put(Conf.REDIS_HOST_KEY, "localhost");
conf.put(CassandraBolt.CASSANDRA_HOST, "localhost:9171");
cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
if (runTime > 0) {
Utils.sleep(runTime);
shutDownLocal();
}
}
public void shutDownLocal() {
if (cluster != null) {
cluster.killTopology("test");
cluster.shutdown();
}
}
public void runCluster(String name, String redisHost, String cassandraHost)
throws AlreadyAliveException, InvalidTopologyException {
conf.setNumWorkers(20);
conf.put(Conf.REDIS_HOST_KEY, redisHost);
conf.put(CassandraBolt.CASSANDRA_HOST,cassandraHost);
StormSubmitter.submitTopology(name, conf, builder.createTopology());
}
public static void main(String[] args) throws Exception {
LogTopology topology = new LogTopology();
if (args != null && args.length > 1) {
topology.runCluster(args[0], args[1], args[2]);
} else {
if (args != null && args.length == 1)
System.out
.println("Running in local mode, redis ip missing for cluster run");
topology.runLocal(10000);
}
}
}