Tuesday, January 5, 2016

Storm Applied: Strategies for real-time event processing



Storm Applied: Strategies for real-time event processing
https://github.com/Storm-Applied
HeatMapBuilder

Other approaches to consider include locally caching geocoding results within your data center to avoid making unnecessary invocations to Google’s API.

Tick tuples
Sometimes you need to trigger an action periodically, such as aggregating a batch of data or flushing some writes to a database. Storm has a feature called tick tuples to handle this eventuality. Tick tuples can be configured to be received at a user-defined frequency and when configured, the execute method on the bolt will receive the tick tuple at the given frequency. You need to inspect the tuple to determine whether it’s one of these system-emitted tick tuples or whether it’s a normal tuple. Normal tuples within a topology will flow through the default stream, whereas tick tuples are flowing through a system tick stream, making them easily identifiable.

Tick tuples that are sent to a bolt are queued behind the other tuples currently waiting to be consumed by the execute() method on that bolt. A bolt may not necessarily process the tick tuples at the frequency that they’re emitted if the bolt is lagging behind due to high latency in processing its regular stream of tuples.

execute() is processing only one tuple at a time. within a given bolt instance, there will never be multiple threads running through it.

Global grouping will ensure that the entire stream of tuples will go to one specific instance of HeatMapBuilder. Specifically, the entire stream will go to the instance of HeatMapBuilder with the lowest task ID (an ID assigned internally by Storm).
public class Checkins extends BaseRichSpout {
  private List<String> checkins;
  private int nextEmitIndex;
  private SpoutOutputCollector outputCollector;

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("time", "address"));
  }

  @Override
  public void open(Map map,
                   TopologyContext topologyContext,
                   SpoutOutputCollector spoutOutputCollector) {
    this.outputCollector = spoutOutputCollector;
    this.nextEmitIndex = 0;

    try {
      checkins = IOUtils.readLines(ClassLoader.getSystemResourceAsStream("checkins.txt"),
                                   Charset.defaultCharset().name());
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void nextTuple() {
    String checkin = checkins.get(nextEmitIndex);
    String[] parts = checkin.split(",");
    Long time = Long.valueOf(parts[0]);
    String address = parts[1];
    outputCollector.emit(new Values(time, address));

    nextEmitIndex = (nextEmitIndex + 1) % checkins.size();
  }
}

public class GeocodeLookup extends BaseBasicBolt {
  private Geocoder geocoder;

  @Override
  public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {
    fieldsDeclarer.declare(new Fields("time", "geocode"));
  }

  @Override
  public void prepare(Map stormConf,
                      TopologyContext context) {
    geocoder = new Geocoder();
  }

  @Override
  public void execute(Tuple tuple,
                      BasicOutputCollector outputCollector) {
    String address = tuple.getStringByField("address");
    Long time = tuple.getLongByField("time");

    GeocoderRequest request = new GeocoderRequestBuilder()
        .setAddress(address)
        .setLanguage("en")
        .getGeocoderRequest();
    GeocodeResponse response = geocoder.geocode(request);
    GeocoderStatus status = response.getStatus();
    if (GeocoderStatus.OK.equals(status)) {
      GeocoderResult firstResult = response.getResults().get(0);
      LatLng latLng = firstResult.getGeometry().getLocation();
      outputCollector.emit(new Values(time, latLng));
    }
  }
}

public class HeatMapBuilder extends BaseBasicBolt {
  private Map<Long, List<LatLng>> heatmaps;

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("time-interval", "hotzones"));
  }

  @Override
  public void prepare(Map stormConf,
                      TopologyContext context) {
    heatmaps = new HashMap<Long, List<LatLng>>();
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
    return conf;
  }

  @Override
  public void execute(Tuple tuple,
                      BasicOutputCollector outputCollector) {
    if (isTickTuple(tuple)) {
      emitHeatmap(outputCollector);
    } else {
      Long time = tuple.getLongByField("time");
      LatLng geocode = (LatLng) tuple.getValueByField("geocode");

      Long timeInterval = selectTimeInterval(time);
      List<LatLng> checkins = getCheckinsForInterval(timeInterval);
      checkins.add(geocode);
    }
  }

  private boolean isTickTuple(Tuple tuple) {
    String sourceComponent = tuple.getSourceComponent();
    String sourceStreamId = tuple.getSourceStreamId();
    return sourceComponent.equals(Constants.SYSTEM_COMPONENT_ID)
        && sourceStreamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
  }

  private void emitHeatmap(BasicOutputCollector outputCollector) {
    Long now = System.currentTimeMillis();
    Long emitUpToTimeInterval = selectTimeInterval(now);
    Set<Long> timeIntervalsAvailable = heatmaps.keySet();
    for (Long timeInterval : timeIntervalsAvailable) {
      if (timeInterval <= emitUpToTimeInterval) {
        List<LatLng> hotzones = heatmaps.remove(timeInterval);
        outputCollector.emit(new Values(timeInterval, hotzones));
      }
    }
  }

  private Long selectTimeInterval(Long time) {
    return time / (15 * 1000);
  }

  private List<LatLng> getCheckinsForInterval(Long timeInterval) {
    List<LatLng> hotzones = heatmaps.get(timeInterval);
    if (hotzones == null) {
      hotzones = new ArrayList<LatLng>();
      heatmaps.put(timeInterval, hotzones);
    }
    return hotzones;
  }
}

public class Persistor extends BaseBasicBolt {
  private final Logger logger = LoggerFactory.getLogger(Persistor.class);

  private Jedis jedis;
  private ObjectMapper objectMapper;

  @Override
  public void prepare(Map stormConf,
                      TopologyContext context) {
    jedis = new Jedis("localhost");
    objectMapper = new ObjectMapper();
  }

  @Override
  public void execute(Tuple tuple,
                      BasicOutputCollector outputCollector) {
    Long timeInterval = tuple.getLongByField("time-interval");
    List<LatLng> hz = (List<LatLng>) tuple.getValueByField("hotzones");
    List<String> hotzones = asListOfStrings(hz);

    try {
      String key = "checkins-" + timeInterval;
      String value = objectMapper.writeValueAsString(hotzones);
      jedis.set(key, value);
    } catch (Exception e) {
      logger.error("Error persisting for time: " + timeInterval, e);
    }
  }

  private List<String> asListOfStrings(List<LatLng> hotzones) {
    List<String> hotzonesStandard = new ArrayList<String>(hotzones.size());
    for (LatLng geoCoordinate : hotzones) {
      hotzonesStandard.add(geoCoordinate.toUrlValue());
    }
    return hotzonesStandard;
  }

  @Override
  public void cleanup() {
    if (jedis.isConnected()) {
      jedis.quit();
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // No output fields to be declared
  }
}

public class StormTopologyBuilder {
  public static final String TOPOLOGY_NAME = "realtime-heatmap";
  public static final String CHECKINS_ID = "checkins";
  public static final String HEATMAP_BUILDER_ID = "heatmap-builder";
  public static final String GEOCODE_LOOKUP_ID = "geocode-lookup";
  public static final String PERSISTOR_ID = "persistor";

  public static StormTopology build() {
    return buildWithSpout(CHECKINS_ID, new Checkins());
  }

  public static StormTopology buildWithSpout(String spoutId, BaseRichSpout spout) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(spoutId, spout);
    builder.setBolt(GEOCODE_LOOKUP_ID, new GeocodeLookup()).shuffleGrouping(spoutId);
    builder.setBolt(HEATMAP_BUILDER_ID, new HeatMapBuilder())
        .addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 3).globalGrouping(GEOCODE_LOOKUP_ID);
    builder.setBolt(PERSISTOR_ID, new Persistor()).shuffleGrouping(HEATMAP_BUILDER_ID);
    return builder.createTopology();
  }
}

public class LocalTopologyRunner {
  private static final int TEN_MINUTES = 600000;

  public static void main(String[] args) {
    Config config = new Config();
    config.setDebug(true);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(StormTopologyBuilder.TOPOLOGY_NAME, config, StormTopologyBuilder.build());

    Utils.sleep(TEN_MINUTES);
    cluster.killTopology(StormTopologyBuilder.TOPOLOGY_NAME);
    cluster.shutdown();
  }
}

SCALING THE TOPOLOGY
this topology operates in a serial fashion, processing one check-in at a time.
One property that makes Storm so alluring is how easy it is to parallelize workflows.

Parallelism hints
builder.setSpout("checkins", new Checkins(), 4);
the parallelism hint tells Storm how many check-in spouts to create.

GeocodeLookup is going to take a lot longer than receiving a check-in and handing it off to our bolt.
builder.setBolt("geocode-lookup", new GeocodeLookup(), 8);
this is telling Storm to create eight executors (threads) and run eight tasks (instances) of GeocodeLookup.

Executors and tasks
Executors are a thread of execution on the JVM, and tasks are the instances of our spouts and bolts running within a thread of execution.
Storm provides additional ways to scale by changing the number of worker nodes and JVMs.

builder.setBolt("geocode-lookup", new GeocodeLookup(), 8).setNumTasks(8)
each of those instances will spend most of its time waiting on network I/O. We suspect that this means GeocodeLookup is going to be a source of contention in the future and will need to be scaled up.
builder.setBolt("geocode-lookup", new GeocodeLookup(), 8).setNumTasks(64)
we can keep increasing the number of executors up to a maximum of 64 without stopping our topology.

A fields grouping is a type of stream grouping where tuples with the same value for a particular field name are always emitted to the same instance of a bolt.
Fields groupings work by consistently hashing tuples across a set number of bolts. To keep keys with the same value going to the same bolt, the number of bolts can’t change. If it did, tuples would start going to different bolts.

global grouping dictates that every tuple goes to one specific instance of HeatMapBuilder, increasing parallelism on it doesn’t have any effect; only one instance will be actively working on the stream.
With global grouping, we’re trading our ability to scale and introducing an intentional bottleneck with being able to see the entire stream of tuples in one specific bolt instance.

TimeIntervalExtractor bolt, which emits the time interval and geocoordinate
HeatMapBuilder bolt, which emits the time interval as well as a list of grouped geocoordinates

We scaled this bolt by replacing global grouping with fields grouping after some minor design changes.
if we were to use global grouping postaggregation, it’d be dealing with a smaller stream of tuples and we wouldn’t have as great a need for scale as we would preaggregation.

Adjusting the topology to address bottlenecks inherent within a data stream
we can refine our higher-level solution so that we’re delivering heat maps by time interval by city. When we add an additional level of grouping by city, we’ll have multiple data flows for a given time interval and they may flow through different instances of the HeatmapBuilder.

Design by breakdown into functional components
Giving each bolt a single responsibility makes it easy to work with a given bolt in isolation. It also makes it easy to scale a single bolt without interference from the rest of the topology because parallelism is tuned at the bolt level. Whether it’s scaling or troubleshooting a problem, when you can zoom in and focus your attention on a single component

With this pattern of topology design, we strive to minimize the number of repartitions within a topology. Every time there’s a repartitioning, tuples will be sent from one bolt to another across the network.

Design by breakdown into components at points of repartition
The topology operates within a distributed cluster. When tuples are emitted, they may travel across the cluster and this may incur network overhead.
With every emit, a tuple will need to be serialized and deserialized at the receiving point.
The higher the number of partitions, the higher the number of resources needed. Each bolt will require a number of executors and tasks and a queue in front for all the incoming tuples.

But certain things related to adapting the initial tuples (such as parsing, extracting, and converting) do fit within the responsibilities of a spout.

we can merge TimeIntervalExtractor with GeocodeLookup or the Checkins spout.
You can always start with the simplest functional components and then advance toward combining different operations together to reduce the number of partitions.

https://github.com/Storm-Applied/C4-Credit-card-authorization
A reliable data source with a corresponding reliable spout
An anchored tuple stream
A topology that acknowledges each tuple as it’s processed or notifies us of the failure
A fault-tolerant Storm cluster infrastructure

GUARANTEED MESSAGE PROCESSING
A tuple that’s emitted from a spout can result in many additional tuples being emitted by the downstream bolts. This creates a tuple tree, with the tuple emitted by the spout acting as the root. Storm creates and tracks a tuple tree for every tuple emitted by the spout. Storm will consider a tuple emitted by a spout to be fully processed when all the leaves in the tree for that tuple have been marked as processed.

Make sure you anchor to input tuples when emitting new tuples from a bolt.
Make sure your bolts tell Storm when they’ve finished processing an input tuple.

Directed acyclic graph and tuple trees
config.setMessageTimeoutSecs(60);.
BaseBasicBolt automatically provides anchoring and acking for us.
It’s generally helpful only in use cases where a single tuple enters the bolt and a single corresponding tuple is emitted from that bolt immediately.

A spout’s role in guaranteed message processing
When emitting the tuple, the spout provides a messageId that’s used to identify that particular tuple.
spoutOutputCollector.emit(tuple, messageId);

At-most-once processing
At-least-once processing
- you need a reliable spout with a reliable data source and an anchored stream with acked or failed tuples.
Exactly-once processing

 If you’re computing a non-idempotent result within a topology and will then store “doneness,” verify at the time you begin your unit of work that you’ll be able record it.

Moving from local to remote topologies
A master node runs a daemon called Nimbus, and the worker nodes each run a daemon called a Supervisor.
master, worker, and Zookeeper nodes.

each worker node has a Supervisor daemon that’s tasked with administering the worker processes and keeping them in a running state. If a Supervisor notices that one of the worker processes is down, it will immediately restart it.

Labels

Review (572) System Design (334) System Design - Review (198) Java (189) Coding (75) Interview-System Design (65) Interview (63) Book Notes (59) Coding - Review (59) to-do (45) Linux (43) Knowledge (39) Interview-Java (35) Knowledge - Review (32) Database (31) Design Patterns (31) Big Data (29) Product Architecture (28) MultiThread (27) Soft Skills (27) Concurrency (26) Cracking Code Interview (26) Miscs (25) Distributed (24) OOD Design (24) Google (23) Career (22) Interview - Review (21) Java - Code (21) Operating System (21) Interview Q&A (20) System Design - Practice (20) Tips (19) Algorithm (17) Company - Facebook (17) Security (17) How to Ace Interview (16) Brain Teaser (14) Linux - Shell (14) Redis (14) Testing (14) Tools (14) Code Quality (13) Search (13) Spark (13) Spring (13) Company - LinkedIn (12) How to (12) Interview-Database (12) Interview-Operating System (12) Solr (12) Architecture Principles (11) Resource (10) Amazon (9) Cache (9) Git (9) Interview - MultiThread (9) Scalability (9) Trouble Shooting (9) Web Dev (9) Architecture Model (8) Better Programmer (8) Cassandra (8) Company - Uber (8) Java67 (8) Math (8) OO Design principles (8) SOLID (8) Design (7) Interview Corner (7) JVM (7) Java Basics (7) Kafka (7) Mac (7) Machine Learning (7) NoSQL (7) C++ (6) Chrome (6) File System (6) Highscalability (6) How to Better (6) Network (6) Restful (6) CareerCup (5) Code Review (5) Hash (5) How to Interview (5) JDK Source Code (5) JavaScript (5) Leetcode (5) Must Known (5) Python (5)

Popular Posts