https://www.packtpub.com/mapt/book/big_data_and_business_intelligence/9781787287945/3/ch03lvl1sec32/accumulators-and-implementing-bfs-in-spark
https://github.com/PacktPublishing/Frank-Kanes-Taming-Big-Data-with-Apache-Spark-and-Python
https://github.com/PacktPublishing/Frank-Kanes-Taming-Big-Data-with-Apache-Spark-and-Python/blob/8c2ad0a642556e398c38a2c7f15a8c7ad59e49d8/degrees-of-separation.py
http://www.theanalyticsuniverse.com/breadth-first-search-algorithm-bfs-in-apace-spark
https://medium.com/@KerrySheldon/breadth-first-search-in-apache-spark-d274403494ca
apache zeppelin
https://github.com/cloudera/livy/wiki
https://github.com/cloudera/livy#rest-api
https://github.com/tobilg/docker-livy
http://gethue.com/how-to-use-the-livy-spark-rest-job-server-for-interactive-spark-2-2/
https://github.com/PacktPublishing/Frank-Kanes-Taming-Big-Data-with-Apache-Spark-and-Python
https://github.com/PacktPublishing/Frank-Kanes-Taming-Big-Data-with-Apache-Spark-and-Python/blob/8c2ad0a642556e398c38a2c7f15a8c7ad59e49d8/degrees-of-separation.py
from pyspark import SparkConf, SparkContext

# Setup that the excerpt below depends on: a SparkContext and the two character IDs.
conf = SparkConf().setMaster("local").setAppName("DegreesOfSeparation")
sc = SparkContext(conf = conf)

# The characters we wish to find the degree of separation between
# (IDs as used in the linked degrees-of-separation.py script).
startCharacterID = 5306 #SpiderMan
targetCharacterID = 14  #ADAM 3,031

# Accumulator used to signal when we find the target character during the BFS traversal.
hitCounter = sc.accumulator(0)

def convertToBFS(line):
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    color = 'WHITE'
    distance = 9999

    if (heroID == startCharacterID):
        color = 'GRAY'
        distance = 0

    return (heroID, (connections, distance, color))

def createStartingRdd():
    inputFile = sc.textFile("file:///sparkcourse/marvel-graph.txt")
    return inputFile.map(convertToBFS)

def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []

    #If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetCharacterID == connection):
                hitCounter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        #We've processed this node, so color it black
        color = 'BLACK'

    #Emit the input node so we don't lose it.
    results.append( (characterID, (connections, distance, color)) )
    return results

def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    color = 'WHITE'
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1

    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2

    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2

    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1

    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    # Equal colors must be kept as they are. Without this case, two GRAY
    # entries for the same character would be reset to WHITE.
    if (color1 == color2):
        color = color1

    return (edges, distance, color)

#Main program here:
iterationRdd = createStartingRdd()

for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration+1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Hit the target character! From " + str(hitCounter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)
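To see how the map and reduce steps interact, here is a minimal pure-Python sketch that runs BFS iterations on a tiny made-up graph, with ordinary loops standing in for flatMap and reduceByKey. The three-node graph and the helper names (expand, merge, one_iteration) are assumptions for illustration only and are not part of the linked script.

```python
from functools import reduce

INF = 9999
RANK = {'WHITE': 0, 'GRAY': 1, 'BLACK': 2}  # darker colors rank higher

def expand(node):
    """Map step: a GRAY node emits a GRAY entry per neighbour, then turns BLACK."""
    node_id, (edges, distance, color) = node
    out = []
    if color == 'GRAY':
        for neighbour in edges:
            out.append((neighbour, ([], distance + 1, 'GRAY')))
        color = 'BLACK'
    # Always re-emit the node itself so its edge list is not lost.
    out.append((node_id, (edges, distance, color)))
    return out

def merge(a, b):
    """Reduce step: keep the edges, the shortest distance and the darkest color."""
    edges = a[0] + b[0]
    distance = min(a[1], b[1])
    color = max(a[2], b[2], key=RANK.get)
    return (edges, distance, color)

def one_iteration(nodes):
    """Local stand-in for rdd.flatMap(expand).reduceByKey(merge)."""
    grouped = {}
    for node in nodes:
        for key, value in expand(node):
            grouped.setdefault(key, []).append(value)
    return sorted((key, reduce(merge, values)) for key, values in grouped.items())

# Hypothetical graph 1 -- 2 -- 3, searching outward from node 1.
graph = [
    (1, ([2], 0, 'GRAY')),
    (2, ([1, 3], INF, 'WHITE')),
    (3, ([2], INF, 'WHITE')),
]

after_one = one_iteration(graph)
after_two = one_iteration(after_one)
for node in after_two:
    print(node)
```

After the first iteration node 2 is GRAY at distance 1; after the second, node 3 is GRAY at distance 2 and still carries its original edge list, which is what the reduce step is there to preserve.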
http://www.theanalyticsuniverse.com/breadth-first-search-algorithm-bfs-in-apace-spark
https://medium.com/@KerrySheldon/breadth-first-search-in-apache-spark-d274403494ca
Let’s pretend that we have access to a TON of cell phone records for people that are involved in a criminal operation. And let’s assume that we are trying to figure out if, and how, a new potential target is connected to known people in the criminal network. With these records, we can answer the following questions:
- Is this new potential target (target B) connected to a known target of the investigation (target A)?
- How close is the connection — 1st degree (they are in direct contact with each other), 2nd (they share a connection), 3rd, etc.?
- How strong is the connection? That is, how many people are connecting these two people at that level? 1, 10, or 50, etc.?
On the first pass, exactly one node is ready to search: target A’s node. When the map function encounters that node, it cycles through target A’s connections and does the following:
- marks each connection as ready to search on the next iterative pass
- sets its distance to 1 (since it is a first-degree connection of target A)
- increments the accumulator/counter if the connection is target B
- changes the search status of target A to ‘SEARCHED’
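The pass described above is the distributed form of breadth-first search. For comparison, when the records fit in memory, the same three questions can be answered with an ordinary queue-based BFS. The sketch below is not the Spark job from the article; the function name, the dictionary layout and the sample call records are all assumptions made up for illustration.

```python
from collections import deque

def degrees_of_separation(contacts, target_a, target_b):
    """Return (degree, connectors), or (None, 0) if the targets are not connected.

    degree     -- number of hops on a shortest path from target_a to target_b
    connectors -- how many people one hop closer to target_a are in direct
                  contact with target_b (the "strength" of the connection)
    """
    distance = {target_a: 0}
    queue = deque([target_a])
    while queue:
        person = queue.popleft()
        for contact in contacts.get(person, ()):
            if contact not in distance:  # first visit gives the shortest distance
                distance[contact] = distance[person] + 1
                queue.append(contact)

    if target_b not in distance:
        return None, 0

    degree = distance[target_b]
    connectors = sum(
        1
        for person, contact_list in contacts.items()
        if distance.get(person) == degree - 1 and target_b in contact_list
    )
    return degree, connectors

# Hypothetical call records: each person maps to the people they were in contact with.
contacts = {
    "A": ["C", "D"],
    "C": ["A", "B"],
    "D": ["A", "B"],
    "B": ["C", "D"],
    "E": [],
}

print(degrees_of_separation(contacts, "A", "B"))
print(degrees_of_separation(contacts, "A", "E"))
```

In this made-up data, A and B are second-degree connections linked through two people (C and D), while E is not connected to A at all.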
apache zeppelin
https://github.com/cloudera/livy/wiki
LivyClient client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build();

try {
  System.err.printf("Uploading %s to the Spark context...\n", piJar);
  client.uploadJar(new File(piJar)).get();

  System.err.printf("Running PiJob with %d samples...\n", samples);
  double pi = client.submit(new PiJob(samples)).get();

  System.out.println("Pi is roughly: " + pi);
} finally {
  client.stop(true);
}
http://livy.io/quickstart.html
https://github.com/cloudera/livy#rest-api
https://github.com/tobilg/docker-livy
docker run -p 8998:8998 -d tobilg/livy
By default Livy runs on port 8998 (which can be changed with the livy_server_port config option).
curl localhost:8998/sessions
curl localhost:8998/sessions/0 | python -m json.tool
curl -X POST --data '{"kind": "spark"}' -H "Content-Type: application/json" localhost:8998/sessions
curl localhost:8998/sessions/0 | python -m json.tool
When the session state is idle, it is ready to accept statements. Let’s compute 1 + 1:
curl localhost:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"1 + 1"}'
curl localhost:8998/sessions/0/statements/0
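The same calls can be made from Python with only the standard library. This is a rough sketch, assuming Livy is reachable at localhost:8998 as in the curl commands above; the helper names (livy_request, send, run_demo) are made up for illustration, and the "id" and "state" fields are read from the JSON that Livy returns for sessions and statements.

```python
import json
import time
import urllib.request

LIVY_URL = "http://localhost:8998"  # assumption: Livy on its default port

def livy_request(path, payload=None):
    """Build a request for the Livy REST API; POST when a JSON payload is given."""
    data = None if payload is None else json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        LIVY_URL + path,
        data=data,
        headers={"Content-Type": "application/json"},
        method="GET" if payload is None else "POST",
    )

def send(request):
    """Send a request and decode the JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))

def run_demo(max_polls=60):
    """Create a session, wait until it is idle, submit 1 + 1 and fetch the statement."""
    session = send(livy_request("/sessions", {"kind": "spark"}))
    session_path = "/sessions/%d" % session["id"]
    for _ in range(max_polls):
        if send(livy_request(session_path))["state"] == "idle":
            break
        time.sleep(1)
    else:
        raise RuntimeError("session did not become idle")
    statement = send(livy_request(session_path + "/statements", {"code": "1 + 1"}))
    # The statement may still be running; poll this URL again if so.
    return send(livy_request("%s/statements/%d" % (session_path, statement["id"])))

# The requests below mirror the curl commands; nothing is sent until send() is called.
create_session = livy_request("/sessions", {"kind": "spark"})
session_status = livy_request("/sessions/0")
run_statement = livy_request("/sessions/0/statements", {"code": "1 + 1"})
```

Calling run_demo() needs a running Livy server; building the request objects does not touch the network.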
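Statement IDs increment, and all statements in a session share the same context, so a sequence of code can span multiple statements. For example, a later statement can use a variable a defined by an earlier one (this example targets session 5; substitute your own session ID):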
curl localhost:8998/sessions/5/statements -X POST -H 'Content-Type: application/json' -d '{"code":"a + 1"}'
Livy can manage multiple Spark sessions, each with its own context, but at the moment it doesn’t support a single session having multiple contexts.
http://gethue.com/how-to-use-the-livy-spark-rest-job-server-api-for-submitting-batch-jar-python-and-streaming-spark-jobs/
curl -X POST --data '{"file": "/user/romain/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi"}' -H "Content-Type: application/json" localhost:8998/batches
curl -X POST --data '{"file": "/usr/lib/spark/lib/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "args": ["100"]}' -H "Content-Type: application/json" localhost:8998/batches
curl -X POST --data '{"file": "/user/romain/spark-solr-1.0-SNAPSHOT.jar", "className": "com.lucidworks.spark.SparkApp", "args": ["twitter-to-solr", "-zkHost", "localhost:9983", "-collection", "tweets"], "files": ["/user/romain/twitter4j.properties"]}' -H "Content-Type: application/json" localhost:8998/batches
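The batch submissions above differ only in their JSON body, so a small helper can build them. This is a sketch under the same assumption of a Livy server at localhost:8998; batch_request is a hypothetical helper name, and the payload keys ("file", "className", "args", "files") are the ones used in the curl commands.

```python
import json
import urllib.request

def batch_request(file, class_name, args=None, files=None,
                  livy_url="http://localhost:8998"):
    """Build a POST /batches request; optional keys are left out when not given."""
    payload = {"file": file, "className": class_name}
    if args:
        payload["args"] = list(args)
    if files:
        payload["files"] = list(files)
    return urllib.request.Request(
        livy_url + "/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Mirrors the second curl command above (SparkPi with one argument).
pi_batch = batch_request(
    "/usr/lib/spark/lib/spark-examples.jar",
    "org.apache.spark.examples.SparkPi",
    args=["100"],
)
print(pi_batch.full_url)
print(pi_batch.data.decode("utf-8"))
```

Passing the request to urllib.request.urlopen submits the job; that step is left out here because it needs a live server.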
https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apache-spark-livy-rest-interface
https://blogs.msdn.microsoft.com/pliu/2016/06/18/run-hue-spark-notebook-on-cloudera/
http://yular.github.io/2016/07/03/Apache-Livy-PySpark-Quickstart/
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/
reduceByKey works only on RDDs whose elements are key-value pairs (i.e. RDDs with a tuple or Map as the data element). It is a transformation, which means it is lazily evaluated. It takes one associative function as a parameter, applies it to the values of each key in the source RDD, and creates a new RDD of the resulting key-value pairs. It is a wide operation, as data may be shuffled across partitions.
The function (which accepts two arguments and returns a single element) must be commutative and associative in the mathematical sense. Intuitively, it must produce the same result when applied repeatedly to the same RDD data, regardless of the order of the elements or how they are split across partitions. reduceByKey first merges values locally within each partition using this function, and then sends the partial results across partitions to prepare the final results.
- reduceByKey is a transformation in Spark, hence it is lazily evaluated
- It is a wide operation: it shuffles data from multiple partitions and creates another RDD
- Before sending data across partitions, it merges the data locally using the same associative function, which reduces the amount of data shuffled
- It can only be used with RDDs whose elements are key-value pairs
- It accepts a commutative and associative function as an argument
- The function takes two arguments of the same data type
- The return type of the function must be the same as the argument type
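The points above can be illustrated without Spark. The sketch below simulates reduceByKey in plain Python: values are first combined inside each partition, and the partial results are then merged across partitions. The function reduce_by_key_local and the sample data are assumptions for illustration; Spark's real implementation is more involved.

```python
from operator import add, sub

def reduce_by_key_local(partitions, func):
    """Simulate reduceByKey: combine within each partition, then merge across them."""
    # Step 1: local combine, one dictionary of partial results per partition.
    combined = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            local[key] = func(local[key], value) if key in local else value
        combined.append(local)

    # Step 2: "shuffle" the partial results together and merge them per key.
    result = {}
    for local in combined:
        for key, value in local.items():
            result[key] = func(result[key], value) if key in result else value
    return result

pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)]

# The same data split across partitions in two different ways.
two_parts = [pairs[:2], pairs[2:]]
three_parts = [pairs[4:], pairs[:1], pairs[1:4]]

# Addition is commutative and associative: the partitioning does not matter.
print(reduce_by_key_local(two_parts, add))
print(reduce_by_key_local(three_parts, add))

# Subtraction is neither, so the result depends on how the data was split.
print(reduce_by_key_local(two_parts, sub))
print(reduce_by_key_local(three_parts, sub))
```

With add, both layouts give the same totals per key; with sub, the value for key "a" changes with the partitioning, which is why reduceByKey requires a commutative and associative function.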