Understanding Apache Spark: Fast, Interactive, Cluster Computing
Apache Spark, developed by Matei Zaharia and team at UC Berkeley, aims to enhance cluster computing by supporting iterative algorithms, interactive data mining, and programmability through integration with Scala. The motivation behind Spark's Resilient Distributed Datasets (RDDs) is to efficiently reuse working sets of data while retaining MapReduce properties like fault tolerance and scalability. The programming model revolves around RDDs as immutable, partitioned collections of objects that can be transformed in parallel. Spark provides a powerful solution for a wide range of applications by overcoming the limitations of current cluster programming models based on acyclic data flow.
Presentation Transcript
Spark: Fast, Interactive, Language-Integrated Cluster Computing
Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael Franklin, Scott Shenker, Ion Stoica
UC Berkeley, www.spark-project.org
Project Goals
Extend the MapReduce model to better support two common classes of analytics apps: iterative algorithms (machine learning, graphs) and interactive data mining. Enhance programmability: integrate into the Scala programming language and allow interactive use from the Scala interpreter.
Motivation
Most current cluster programming models are based on acyclic data flow from stable storage to stable storage, as in a MapReduce pipeline that reads input, runs map and reduce stages, and writes output. The benefit of this data-flow model is that the runtime can decide where to run tasks and can automatically recover from failures.
Motivation
Acyclic data flow is inefficient for applications that repeatedly reuse a working set of data: iterative algorithms (machine learning, graphs) and interactive data mining tools (R, Excel, Python). With current frameworks, apps reload data from stable storage on each query.
Solution: Resilient Distributed Datasets (RDDs)
Allow apps to keep working sets in memory for efficient reuse, while retaining the attractive properties of MapReduce: fault tolerance, data locality, and scalability. Support a wide range of applications.
Outline
Spark programming model, implementation, and user applications.
Programming Model
Resilient distributed datasets (RDDs): immutable, partitioned collections of objects, created through parallel transformations (map, filter, groupBy, join, etc.) on data in stable storage, and cacheable for efficient reuse.
Actions on RDDs: count, reduce, collect, save, etc.
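To make the distinction concrete, here is a hedged sketch (not from the deck) of transformations and actions on an RDD; the SparkContext setup, app name, and local[*] master are assumptions, and it uses the current org.apache.spark package names rather than the 2012-era ones shown on the slides.

  import org.apache.spark.{SparkConf, SparkContext}

  object RDDBasics {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("rdd-basics").setMaster("local[*]"))

      // Transformations are lazy: they only define new RDDs.
      val nums    = sc.parallelize(1 to 1000000)
      val squares = nums.map(x => x.toLong * x)
      val evens   = squares.filter(_ % 2 == 0).cache()  // keep the working set in memory

      // Actions trigger execution and return results to the driver.
      println(evens.count())
      println(evens.reduce(_ + _))

      sc.stop()
    }
  }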
Example: Log Mining
Load error messages from a log into memory, then interactively search for various patterns:

  lines = spark.textFile("hdfs://...")
  errors = lines.filter(_.startsWith("ERROR"))
  messages = errors.map(_.split("\t")(2))
  cachedMsgs = messages.cache()

  cachedMsgs.filter(_.contains("foo")).count
  cachedMsgs.filter(_.contains("bar")).count
  . . .

(Diagram: the driver builds the base RDD from HDFS blocks, sends tasks to the workers, and each worker caches its partition of the transformed messages RDD in memory.)
Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data); scaled to 1 TB of data in 5-7 sec (vs 170 sec for on-disk data).
RDD Fault Tolerance
RDDs maintain lineage information that can be used to reconstruct lost partitions. Ex:

  messages = textFile(...).filter(_.startsWith("ERROR"))
                          .map(_.split("\t")(2))

(Lineage diagram: HDFS File -> Filtered RDD via filter (func = _.contains(...)) -> Mapped RDD via map (func = _.split(...)).)
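As a hedged aside (not on the slide), the lineage of an RDD can be inspected with RDD.toDebugString; the sc handle and log path below are assumptions.

  // Assuming an existing SparkContext `sc` and a hypothetical log path.
  val messages = sc.textFile("hdfs://.../logs")
    .filter(_.startsWith("ERROR"))
    .map(_.split("\t")(2))

  // Prints the chain of parent RDDs Spark would replay to recompute a lost partition.
  println(messages.toDebugString)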
Example: Logistic Regression
Goal: find the best line separating two sets of points, starting from a random initial line and converging to the target.
Example: Logistic Regression

  val data = spark.textFile(...).map(readPoint).cache()
  var w = Vector.random(D)

  for (i <- 1 to ITERATIONS) {
    val gradient = data.map(p =>
      (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
    ).reduce(_ + _)
    w -= gradient
  }

  println("Final w: " + w)
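The snippet above relies on helpers defined elsewhere (readPoint, Vector, D, ITERATIONS). Below is a hedged, self-contained sketch of the same per-iteration update in plain Scala, with Array[Double] feature vectors and a ±1 label; the Point case class and dot helper are invented for illustration and are not part of the deck.

  // Hypothetical helpers: a labeled point and a dot product.
  case class Point(x: Array[Double], y: Double)

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (u, v) => u * v }.sum

  // One gradient-descent step of logistic regression over an in-memory dataset.
  def step(data: Seq[Point], w: Array[Double]): Array[Double] = {
    val gradient = data
      .map { p =>
        // ((1 / (1 + e^(-y (w . x))) - 1) * y) scales the feature vector x
        val scale = (1.0 / (1.0 + math.exp(-p.y * dot(w, p.x))) - 1.0) * p.y
        p.x.map(_ * scale)
      }
      .reduce((a, b) => a.zip(b).map { case (u, v) => u + v })
    w.zip(gradient).map { case (wi, gi) => wi - gi }  // w -= gradient
  }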
Logistic Regression Performance
(Chart: running time in seconds vs. number of iterations, 1 to 30.) Hadoop: 127 s per iteration. Spark: 174 s for the first iteration, 6 s for further iterations.
Spark Applications
In-memory data mining on Hive data (Conviva); predictive analytics (Quantifind); city traffic prediction (Mobile Millennium); Twitter spam classification (Monarch); collaborative filtering via matrix factorization.
Spark Operations
Transformations (define a new RDD): map, filter, sample, groupByKey, reduceByKey, sortByKey, flatMap, union, join, cogroup, cross, mapValues.
Actions (return a result to the driver program): collect, reduce, count, save, lookupKey.
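A hedged sketch of a few of these operations on key-value RDDs; the existing SparkContext sc and the sample data are assumptions made for illustration.

  // Assuming an existing SparkContext `sc`.
  val visits = sc.parallelize(Seq(("index.html", "1.2.3.4"), ("about.html", "3.4.5.6"),
                                  ("index.html", "1.3.3.1")))
  val pages  = sc.parallelize(Seq(("index.html", "Home"), ("about.html", "About")))

  // Transformations: lazily define new RDDs.
  val counts = visits.mapValues(_ => 1).reduceByKey(_ + _)  // hits per URL
  val joined = counts.join(pages)                           // (url, (hits, title))

  // Action: bring the (small) result back to the driver.
  joined.collect().foreach { case (url, (hits, title)) =>
    println(s"$title ($url): $hits hits")
  }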
Conviva GeoReport
(Chart: time in hours; Hive: 20, Spark: 0.5.) Aggregations on many keys with the same WHERE clause. The roughly 40x gain comes from not re-reading unused columns or filtered records, avoiding repeated decompression, and in-memory storage of deserialized objects.
Implementation
Runs on Apache Mesos to share cluster resources with Hadoop, MPI, and other apps (diagram: Spark, Hadoop, and MPI running side by side on Mesos across the cluster nodes). Can read from any Hadoop input source (e.g. HDFS). No changes to the Scala compiler.
Spark Scheduler
Builds Dryad-like DAGs; pipelines functions within a stage; cache-aware for work reuse and locality; partitioning-aware to avoid shuffles. (Diagram: an example job whose RDDs A-G are split into three stages at groupBy, map, join, and union boundaries, with cached data partitions marked.)
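As a hedged illustration of the partitioning-aware point (not code from the deck), pre-partitioning and caching one side of a repeated join lets the scheduler avoid re-shuffling it; HashPartitioner and partitionBy are standard Spark APIs, while the data and partition count here are assumptions.

  import org.apache.spark.HashPartitioner

  // Assuming an existing SparkContext `sc`.
  val userInfo = sc
    .parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))
    .partitionBy(new HashPartitioner(8))  // fix the partitioning once...
    .cache()                              // ...and keep the partitions in memory

  val events = sc.parallelize(Seq((1, "click"), (3, "view"), (1, "view")))

  // Because userInfo already has a known partitioner, the join can reuse it
  // instead of shuffling userInfo again on every query.
  val joined = userInfo.join(events)
  println(joined.collect().toSeq)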
Interactive Spark
Modified the Scala interpreter to allow Spark to be used interactively from the command line. This required two changes: modifying wrapper code generation so that each line typed has references to objects for its dependencies, and distributing the generated classes over the network.
What is Spark Streaming?
A framework for large-scale stream processing: scales to hundreds of nodes, can achieve second-scale latencies, integrates with Spark's batch and interactive processing, provides a simple batch-like API for implementing complex algorithms, and can absorb live data streams from Kafka, Flume, ZeroMQ, etc.
Motivation
Many important applications must process large streams of live data and provide results in near-real-time: social network trends, website statistics, intrusion detection systems, etc. These workloads require large clusters and latencies of a few seconds.
There is a need for a framework for building such complex stream processing applications. But what are the requirements for such a framework?
Requirements
Scalable to large clusters. Second-scale latencies. Simple programming model.
Case study: Conviva, Inc.
Real-time monitoring of online video metadata (HBO, ESPN, ABC, SyFy, ...). Two processing stacks: a custom-built distributed stream processing system computing 1000s of complex metrics on millions of video sessions, requiring many dozens of nodes; and a Hadoop backend for offline analysis, generating daily and monthly reports with computation similar to the streaming system.
Case study: XYZ, Inc.
Any company that wants to process live streaming data has this problem. Maintaining two processing stacks (a custom-built distributed stream processing system computing 1000s of complex metrics on millions of video sessions across many dozens of nodes, plus a Hadoop backend for offline analysis generating daily and monthly reports with similar computation) means twice the effort to implement any new function, twice the number of bugs to solve, and twice the headache.
Requirements
Scalable to large clusters. Second-scale latencies. Simple programming model. Integrated with batch & interactive processing.
Stateful Stream Processing
Traditional streaming systems have an event-driven, record-at-a-time processing model: each node holds mutable state, and for each input record it updates that state and sends new records downstream (diagram: input records flowing through nodes that each keep mutable state). State is lost if a node dies! Making stateful stream processing fault-tolerant is challenging.
Requirements
Scalable to large clusters. Second-scale latencies. Simple programming model. Integrated with batch & interactive processing. Efficient fault tolerance in stateful computations.
Spark Streaming
Tathagata Das (TD), UC Berkeley
Discretized Stream Processing
Run a streaming computation as a series of very small, deterministic batch jobs: chop up the live data stream into batches of X seconds, let Spark treat each batch of data as an RDD and process it using RDD operations, and return the processed results of those operations in batches. (Diagram: live data stream -> Spark Streaming -> batches of X seconds -> Spark -> processed results.)
Discretized Stream Processing
Run a streaming computation as a series of very small, deterministic batch jobs: batch sizes as low as half a second, latency of about 1 second. Potential for combining batch processing and streaming processing in the same system.
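A hedged sketch of what "batches of X seconds" looks like in code: the batch interval is passed when the streaming context is created. The modern StreamingContext constructor, app name, and local[2] master below are assumptions rather than the exact API of the era.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  // The Seconds(1) argument is the "X seconds" the live stream is chopped into;
  // every batch becomes an RDD processed by the normal Spark engine.
  val conf = new SparkConf().setAppName("dstream-sketch").setMaster("local[2]")
  val ssc  = new StreamingContext(conf, Seconds(1))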
Example 1: Get hashtags from Twitter

  val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)

A DStream is a sequence of RDDs representing a stream of data. (Diagram: the Twitter Streaming API feeds the tweets DStream, one batch per interval (batch @ t, t+1, t+2), each stored in memory as an immutable, distributed RDD.)
Example 1: Get hashtags from Twitter

  val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
  val hashTags = tweets.flatMap(status => getTags(status))

A transformation modifies the data in one DStream to create another DStream. (Diagram: flatMap turns each batch of the tweets DStream into a batch of the new hashTags DStream, e.g. [#cat, #dog, ...]; new RDDs are created for every batch.)
Example 1: Get hashtags from Twitter

  val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
  val hashTags = tweets.flatMap(status => getTags(status))
  hashTags.saveAsHadoopFiles("hdfs://...")

An output operation pushes data to external storage. (Diagram: every batch of the hashTags DStream is saved to HDFS.)
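For reference, a hedged end-to-end sketch of this pipeline that runs without Twitter credentials: it substitutes a socket text source for ssc.twitterStream, a simple regex-based getTags, and saveAsTextFiles for saveAsHadoopFiles, so the source, the tag extraction, and the sink are all assumptions rather than the deck's exact code.

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("hashtags-sketch").setMaster("local[2]")
  val ssc  = new StreamingContext(conf, Seconds(1))

  // Stand-in for the Twitter source: one tweet's text per line on a local socket.
  val tweets = ssc.socketTextStream("localhost", 9999)

  // Hypothetical getTags: pull #hashtags out of the text with a regex.
  def getTags(status: String): Seq[String] = "#\\w+".r.findAllIn(status).toSeq

  val hashTags = tweets.flatMap(status => getTags(status))

  // Stand-in for saveAsHadoopFiles: write each batch under the given prefix.
  hashTags.saveAsTextFiles("hdfs://.../hashtags")

  ssc.start()
  ssc.awaitTermination()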
Java Example

Scala:

  val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
  val hashTags = tweets.flatMap(status => getTags(status))
  hashTags.saveAsHadoopFiles("hdfs://...")

Java:

  JavaDStream<Status> tweets = ssc.twitterStream(<Twitter username>, <Twitter password>);
  JavaDStream<String> hashTags = tweets.flatMap(new Function<...> { });  // a function object defines the transformation
  hashTags.saveAsHadoopFiles("hdfs://...");
Fault-tolerance
RDDs remember the sequence of operations that created them from the original fault-tolerant input data. Batches of input data are replicated in the memory of multiple worker nodes and are therefore fault-tolerant. Data lost due to a worker failure can be recomputed from the replicated input data, with lost partitions recomputed on other workers. (Diagram: input data replicated in memory -> tweets RDD -> flatMap -> hashTags RDD.)
Key concepts
DStream: a sequence of RDDs representing a stream of data, from sources such as Twitter, HDFS, Kafka, Flume, ZeroMQ, Akka actors, or TCP sockets.
Transformations: modify data from one DStream to another. Standard RDD operations (map, countByValue, reduce, join, ...) and stateful operations (window, countByValueAndWindow, ...).
Output operations: send data to an external entity. saveAsHadoopFiles saves to HDFS; foreach does anything with each batch of results.
Example 2: Count the hashtags

  val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
  val hashTags = tweets.flatMap(status => getTags(status))
  val tagCounts = hashTags.countByValue()

(Diagram: for each batch (t, t+1, t+2), flatMap produces hashTags, then a map and reduceByKey produce tagCounts, e.g. [(#cat, 10), (#dog, 25), ...].)
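A hedged follow-up (not from the slides) showing how the per-batch counts could be consumed with a foreach-style output operation; foreachRDD, sortBy, and take are real Spark APIs, while the "top 5" logic is invented for illustration.

  // Assuming hashTags: DStream[String] as in the example above.
  val tagCounts = hashTags.countByValue()

  // For each batch, pull the five most frequent tags back to the driver and print them.
  tagCounts.foreachRDD { rdd =>
    val top = rdd.sortBy({ case (_, count) => count }, ascending = false).take(5)
    println(top.mkString(", "))
  }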
Example 3: Count the hashtags over the last 10 minutes

  val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
  val hashTags = tweets.flatMap(status => getTags(status))
  val tagCounts = hashTags.window(Minutes(10), Seconds(1)).countByValue()

window is a sliding window operation: the first argument is the window length, the second the sliding interval.
Example 3: Counting the hashtags over the last 10 minutes

  val tagCounts = hashTags.window(Minutes(10), Seconds(1)).countByValue()

(Diagram: at each sliding interval (t-1, t, t+1, ...), countByValue runs over all the hashTags data in the 10-minute window to produce tagCounts.)
Smart window-based countByValue

  val tagCounts = hashTags.countByValueAndWindow(Minutes(10), Seconds(1))

(Diagram: instead of recounting the whole window, the counts from the new batch entering the window are added and the counts from the batch leaving the window are subtracted.)
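The same add-and-subtract idea is available for general reductions through reduceByKeyAndWindow with an inverse function; below is a hedged sketch (not from the slides), where the (tag, 1) pairing and the checkpoint directory are assumptions. Incremental window operations like this require checkpointing to be enabled.

  import org.apache.spark.streaming.{Minutes, Seconds}

  // Assuming hashTags: DStream[String] and a StreamingContext `ssc` as above.
  ssc.checkpoint("hdfs://.../checkpoints")  // required for incremental window operations

  val tagCounts = hashTags
    .map(tag => (tag, 1))
    .reduceByKeyAndWindow(
      _ + _,        // add counts from the new batch entering the window
      _ - _,        // subtract counts from the batch leaving the window
      Minutes(10),  // window length
      Seconds(1))   // sliding interval

  tagCounts.print()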