Sunday, December 18, 2016

Spark Misc Part 3



https://www.packtpub.com/mapt/book/big_data_and_business_intelligence/9781787287945/3/ch03lvl1sec32/accumulators-and-implementing-bfs-in-spark
https://github.com/PacktPublishing/Frank-Kanes-Taming-Big-Data-with-Apache-Spark-and-Python
https://github.com/PacktPublishing/Frank-Kanes-Taming-Big-Data-with-Apache-Spark-and-Python/blob/8c2ad0a642556e398c38a2c7f15a8c7ad59e49d8/degrees-of-separation.py

# Boilerplate: set up the Spark context (local master and app name as in the course example).
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("DegreesOfSeparation")
sc = SparkContext(conf=conf)

# The characters we wish to find the degree of separation between:
startCharacterID = 5306  # SpiderMan
targetCharacterID = 14   # ADAM 3,031 (who?)

# Accumulator used to signal when we reach the target character during the BFS traversal.
hitCounter = sc.accumulator(0)

def convertToBFS(line):
    # Parse one line of the input graph into BFS node form:
    # (heroID, (list of connections, distance, color))
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    color = 'WHITE'
    distance = 9999

    if (heroID == startCharacterID):
        color = 'GRAY'
        distance = 0

    return (heroID, (connections, distance, color))

def createStartingRdd():
    inputFile = sc.textFile("file:///sparkcourse/marvel-graph.txt")
    return inputFile.map(convertToBFS)

def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []

    # If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetCharacterID == connection):
                hitCounter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        # We've processed this node, so color it black
        color = 'BLACK'

    # Emit the input node so we don't lose it.
    results.append((characterID, (connections, distance, color)))
    return results

def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    # Start from one of the incoming colors so that two entries with the same
    # color (e.g. both GRAY) keep that color instead of falling back to WHITE.
    color = color1
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1
    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2
    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2
    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1
    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    return (edges, distance, color)

# Main program here:
iterationRdd = createStartingRdd()

for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration + 1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that the mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Hit the target character! From " + str(hitCounter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)

http://www.theanalyticsuniverse.com/breadth-first-search-algorithm-bfs-in-apace-spark

https://medium.com/@KerrySheldon/breadth-first-search-in-apache-spark-d274403494ca
Let’s pretend that we have access to a TON of cell phone records for people who are involved in a criminal operation, and let’s assume that we are trying to figure out if, and how, a new potential target is connected to known people in the criminal network. With these records, we can answer the following questions:
  • Is this new potential target (target B) connected to a known target of the investigation (target A)?
  • How close is the connection: 1st degree (they are in direct contact with each other), 2nd (they share a connection), 3rd, etc.?
  • How strong is the connection? That is, how many people connect these two people at that level: 1, 10, 50, etc.?
On the first pass, there will be only one node that is ready to search: targetA's node. When the map function encounters that node, it will cycle through targetA's connections and do the following (a small sketch of the resulting node states follows this list):
  • mark the connection as ready to search on the next iterative pass
  • set its distance equal to 1 (since they are a first connection to targetA)
  • increment the accumulator/counter if we find targetB
  • change the search status of targetA to ‘SEARCHED’
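A minimal sketch of what these node states might look like, mirroring the (connections, distance, color) structure of the Marvel code above; the person IDs, contact lists, and status labels here are illustrative, not taken from the article:

# Each record: (person_id, (contacts, degrees_from_targetA, search_status))
# where search_status plays the role of WHITE / GRAY / BLACK above:
# 'UNSEARCHED' = not reached yet, 'READY' = reached, expand on the next pass, 'SEARCHED' = done.

target_a = (1, ([2, 3, 4], 0, 'READY'))        # targetA starts ready to search at distance 0
person_2 = (2, ([1, 7], 9999, 'UNSEARCHED'))   # everyone else starts "infinitely" far away

# After the first map pass, each of targetA's contacts is emitted as READY at
# distance 1 (the accumulator would be bumped if any of them were targetB),
# and targetA itself is re-emitted as SEARCHED:
first_pass_output = [
    (2, ([], 1, 'READY')),
    (3, ([], 1, 'READY')),
    (4, ([], 1, 'READY')),
    (1, ([2, 3, 4], 0, 'SEARCHED')),
]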


Apache Zeppelin

https://github.com/cloudera/livy/wiki
// Imports for the Livy programmatic API (the package is com.cloudera.livy in this
// pre-Apache version of Livy; later Apache releases use org.apache.livy).
import java.io.File;
import java.net.URI;

import com.cloudera.livy.LivyClient;
import com.cloudera.livy.LivyClientBuilder;

// livyUrl, piJar and samples are assumed to be defined elsewhere (e.g. parsed from args).
LivyClient client = new LivyClientBuilder()
  .setURI(new URI(livyUrl))
  .build();

try {
  System.err.printf("Uploading %s to the Spark context...\n", piJar);
  client.uploadJar(new File(piJar)).get();

  System.err.printf("Running PiJob with %d samples...\n", samples);
  double pi = client.submit(new PiJob(samples)).get();

  System.out.println("Pi is roughly: " + pi);
} finally {
  client.stop(true);
}
http://livy.io/quickstart.html
https://github.com/cloudera/livy#rest-api

https://github.com/tobilg/docker-livy
docker run -p 8998:8998 -d tobilg/livy

By default, Livy runs on port 8998 (which can be changed with the livy_server_port config option).

http://gethue.com/how-to-use-the-livy-spark-rest-job-server-for-interactive-spark-2-2/
curl localhost:8998/sessions
curl localhost:8998/sessions/0 | python -m json.tool
curl -X POST --data '{"kind": "spark"}' -H "Content-Type: application/json" localhost:8998/sessions
curl localhost:8998/sessions/0 | python -m json.tool
When the session state is idle, it means it is ready to accept statements! Let's compute 1 + 1:

curl localhost:8998/sessions/0/statements -X POST -H 'Content-Type: application/json' -d '{"code":"1 + 1"}'
curl localhost:8998/sessions/0/statements/0
Statement ids increment, and all statements in a session share the same context, so we can run a sequence of statements that build on each other.

Spanning multiple statements
curl localhost:8998/sessions/5/statements -X POST -H 'Content-Type: application/json' -d '{"code":"a + 1"}'
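The a + 1 statement only works if a was already defined by an earlier statement in the same session. A minimal sketch of that flow using Python's requests library, assuming a Livy server on localhost:8998 and a Scala ("spark") session with id 0 like the one created earlier; the exact statement ids and response shape may vary:

import json
import requests

host = 'http://localhost:8998'
headers = {'Content-Type': 'application/json'}

# Statement 0: define a value in the session's Spark context...
requests.post(host + '/sessions/0/statements', headers=headers,
              data=json.dumps({'code': 'val a = 10'}))

# Statement 1: ...so a later statement in the same session can reference it.
requests.post(host + '/sessions/0/statements', headers=headers,
              data=json.dumps({'code': 'a + 1'}))

# Poll the second statement until its output shows up.
print(requests.get(host + '/sessions/0/statements/1', headers=headers).json())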
Livy can manage multiple Spark sessions, each of which has its own context, but at the moment it doesn't support a single session having multiple contexts.

http://gethue.com/how-to-use-the-livy-spark-rest-job-server-api-for-submitting-batch-jar-python-and-streaming-spark-jobs/
curl -X POST --data '{"file": "/user/romain/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi"}' -H "Content-Type: application/json" localhost:8998/batches
curl -X POST --data '{"file": "/usr/lib/spark/lib/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "args": ["100"]}' -H "Content-Type: application/json" localhost:8998/batches

curl -X POST --data '{"file": "/user/romain/spark-solr-1.0-SNAPSHOT.jar", "className": "com.lucidworks.spark.SparkApp", "args": ["twitter-to-solr", "-zkHost", "localhost:9983", "-collection", "tweets"], "files": ["/user/romain/twitter4j.properties"]}' -H "Content-Type: application/json" localhost:8998/batches

https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apache-spark-livy-rest-interface
https://blogs.msdn.microsoft.com/pliu/2016/06/18/run-hue-spark-notebook-on-cloudera/
http://yular.github.io/2016/07/03/Apache-Livy-PySpark-Quickstart/
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/
The reduceByKey function works only on RDDs whose elements are key/value pairs (i.e., RDDs of tuples). It is a transformation, which means it is lazily evaluated. We pass it a function that is applied to the source RDD to produce a new RDD of key/value pairs holding the reduced values. It is a wide operation, since data shuffling may happen across partitions.
The function (which accepts two arguments and returns a single value) should be commutative and associative in the mathematical sense: it must produce the same result no matter the order in which elements from the RDD's partitions are combined. This allows Spark to merge values locally within each partition using the same function, and then send only the partial results across partitions to prepare the final values. (A short PySpark example follows the list below.)
  • reduceByKey is a transformation in Spark, and hence lazily evaluated
  • It is a wide operation, since it shuffles data across multiple partitions and creates another RDD
  • Before sending data across partitions, it merges the data locally using the same associative function, which reduces the amount of data shuffled
  • It can only be used with RDDs whose elements are key/value pairs
  • It accepts a commutative and associative function as an argument
    • The function should take two arguments of the same data type
    • The return type of the function must also be the same as the argument type
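To make this concrete, here is a minimal word-count-style sketch of reduceByKey in PySpark (the input data, app name, and local master setting are made up for illustration):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ReduceByKeyExample")
sc = SparkContext(conf=conf)

# Build an RDD of (key, value) pairs.
pairs = sc.parallelize(["apple", "banana", "apple", "cherry", "banana", "apple"]) \
          .map(lambda word: (word, 1))

# Addition is commutative and associative, so Spark can sum the values for each
# key locally within every partition and then shuffle only the partial sums.
counts = pairs.reduceByKey(lambda x, y: x + y)

# reduceByKey is lazy; collect() is the action that actually triggers evaluation.
print(counts.collect())  # e.g. [('apple', 3), ('banana', 2), ('cherry', 1)] (order may vary)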
