Apache Spark: Fast, Interactive, Cluster Computing

Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael Franklin, Scott Shenker, Ion Stoica
UC Berkeley

Spark: Fast, Interactive, Language-Integrated Cluster Computing

www.spark-project.org

Project Goals

Extend the MapReduce model to better support two common classes of analytics apps:
» Iterative algorithms (machine learning, graphs)
» Interactive data mining

Enhance programmability:
» Integrate into Scala programming language
» Allow interactive use from Scala interpreter

Motivation

Most current cluster programming models are based on acyclic data flow from stable storage to stable storage.

Benefits of data flow: the runtime can decide where to run tasks and can automatically recover from failures.

Motivation

Acyclic data flow is inefficient for applications that repeatedly reuse a working set of data:
» Iterative algorithms (machine learning, graphs)
» Interactive data mining tools (R, Excel, Python)

With current frameworks, apps reload data from stable storage on each query.

Solution: Resilient Distributed Datasets (RDDs)

Allow apps to keep working sets in memory for efficient reuse
Retain the attractive properties of MapReduce:
» Fault tolerance, data locality, scalability
Support a wide range of applications

Outline
Spark programming model
Implementation
User applications
Programming Model

Resilient distributed datasets (RDDs):
» Immutable, partitioned collections of objects
» Created through parallel transformations (map, filter, groupBy, join, …) on data in stable storage
» Can be cached for efficient reuse

Actions on RDDs:
» Count, reduce, collect, save, …

Example: Log Mining

Load error messages from a log into memory, then interactively search for various patterns:

lines = spark.textFile("hdfs://...")             // Base RDD
errors = lines.filter(_.startsWith("ERROR"))     // Transformed RDD
messages = errors.map(_.split('\t')(2))
cachedMsgs = messages.cache()

cachedMsgs.filter(_.contains("foo")).count       // Action
cachedMsgs.filter(_.contains("bar")).count
. . .

[Diagram: the driver sends tasks to workers; each worker reads its HDFS block (Block 1-3), keeps the filtered messages in memory (Cache 1-3), and returns results to the driver.]

Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)
Result: scaled to 1 TB data in 5-7 sec (vs 170 sec for on-disk data)
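
For readers who want to run the same pattern end to end, here is a minimal, self-contained sketch. The SparkContext setup, master URL, log path, and field index are assumptions added for illustration; they are not from the slides.

import org.apache.spark.{SparkConf, SparkContext}

object LogMining {
  def main(args: Array[String]): Unit = {
    // Assumed setup: local master and a hypothetical log path.
    val conf = new SparkConf().setAppName("LogMining").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val lines = sc.textFile("hdfs:///logs/app.log")       // base RDD from stable storage
    val errors = lines.filter(_.startsWith("ERROR"))      // lazy transformation
    val messages = errors.map(_.split('\t')(2))           // assumes >= 3 tab-separated fields
    val cachedMsgs = messages.cache()                     // keep the working set in memory

    // Actions trigger evaluation; repeated queries reuse the in-memory cache.
    println(cachedMsgs.filter(_.contains("foo")).count())
    println(cachedMsgs.filter(_.contains("bar")).count())

    sc.stop()
  }
}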
 
RDD Fault Tolerance

RDDs maintain lineage information that can be used to reconstruct lost partitions.

Ex:

messages = textFile(...).filter(_.startsWith("ERROR"))
                        .map(_.split('\t')(2))

[Lineage diagram: HDFS File -> Filtered RDD (filter, func = _.contains(...)) -> Mapped RDD (map, func = _.split(...))]
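
As an aside (not on the slides), the lineage Spark records for an RDD can be inspected directly. A minimal sketch, assuming a SparkContext sc already exists and using a hypothetical log path:

// Assumes an existing SparkContext `sc`; the path is hypothetical.
val messages = sc.textFile("hdfs:///logs/app.log")
  .filter(_.startsWith("ERROR"))
  .map(_.split('\t')(2))

// Prints the chain of parent RDDs (the lineage) that Spark would replay
// to rebuild lost partitions after a failure.
println(messages.toDebugString)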

Example: Logistic Regression

Goal: find best line separating two sets of points

[Figure: a scatter of points with a random initial line that iteratively converges to the target separating line.]

Example: Logistic Regression

val data = spark.textFile(...).map(readPoint).cache()

var w = Vector.random(D)

for (i <- 1 to ITERATIONS) {
  val gradient = data.map(p =>
    (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  w -= gradient
}

println("Final w: " + w)
 
Logistic Regression Performance

[Chart: running time vs. number of iterations (1, 5, 10, 20, 30) for Hadoop and Spark. Hadoop: 127 s per iteration. Spark: 174 s for the first iteration, then 6 s per further iteration.]

Spark Applications
 
In-memory data mining on Hive data (Conviva)
Predictive analytics (Quantifind)
City traffic prediction (Mobile Millennium)
Twitter spam classification (Monarch)
Collaborative filtering via matrix factorization
 
Spark Operations

Transformations (define a new RDD): map, filter, sample, groupByKey, reduceByKey, sortByKey, flatMap, union, join, cogroup, cross, mapValues

Actions (return a result to driver program): collect, reduce, count, save, lookupKey

Conviva GeoReport

[Chart: report generation time (hours): Hive 20, Spark 0.5]

Aggregations on many keys w/ same WHERE clause
The 40× gain comes from:
» Not re-reading unused columns or filtered records
» Avoiding repeated decompression
» In-memory storage of deserialized objects

Implementation

Runs on Apache Mesos to share resources with Hadoop & other apps
Can read from any Hadoop input source (e.g. HDFS)
No changes to Scala compiler

[Diagram: Spark, Hadoop, and MPI running side by side on cluster nodes managed by Mesos]
 
Spark Scheduler

Dryad-like DAGs
Pipelines functions within a stage
Cache-aware work reuse & locality
Partitioning-aware to avoid shuffles

[Diagram: an example job DAG cut into stages at shuffle boundaries (groupBy, join, union), with cached data partitions marked]
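
To make the pipelining and stage ideas concrete, here is a small illustrative job (not from the slides; the input path and logic are made up). The narrow operations (flatMap, filter, map) are pipelined into a single stage, while reduceByKey requires a shuffle and therefore starts a new stage:

// Assumes an existing SparkContext `sc`; the input path is hypothetical.
val words = sc.textFile("hdfs:///data/words.txt")
  .flatMap(_.split("\\s+"))
  .filter(_.nonEmpty)                 // pipelined with flatMap in the same stage
  .map(w => (w.toLowerCase, 1))       // still no shuffle, so still the same stage

val counts = words.reduceByKey(_ + _) // shuffle: a new stage in the DAG

counts.take(10).foreach(println)      // action that triggers the whole job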
 
Interactive Spark

Modified Scala interpreter to allow Spark to be used interactively from the command line

Required two changes:
» Modified wrapper code generation so that each line typed has references to objects for its dependencies
» Distribute generated classes over the network
 
What is Spark Streaming?

Framework for large-scale stream processing:
» Scales to 100s of nodes
» Can achieve second-scale latencies
» Integrates with Spark's batch and interactive processing
» Provides a simple batch-like API for implementing complex algorithms
» Can absorb live data streams from Kafka, Flume, ZeroMQ, etc.
 
 
Motivation

Many important applications must process large streams of live data and provide results in near-real-time:
» Social network trends
» Website statistics
» Intrusion detection systems
» etc.

These applications require large clusters to handle the workloads and latencies of a few seconds.

Need for a framework …

… for building such complex stream processing applications

But what are the requirements from such a framework?
 
Requirements

Scalable to large clusters
Second-scale latencies
Simple programming model

Case study: Conviva, Inc.

Real-time monitoring of online video metadata
» HBO, ESPN, ABC, SyFy, …

Two processing stacks:
» Custom-built distributed stream processing system: 1000s of complex metrics on millions of video sessions; requires many dozens of nodes for processing
» Hadoop backend for offline analysis: generating daily and monthly reports; similar computation as the streaming system

Case study: XYZ, Inc.

Any company that wants to process live streaming data has this problem.

Two processing stacks mean:
» Twice the effort to implement any new function
» Twice the number of bugs to solve
» Twice the headache

Requirements

Scalable to large clusters
Second-scale latencies
Simple programming model
Integrated with batch & interactive processing

Stateful Stream Processing

Traditional streaming systems have an event-driven, record-at-a-time processing model:
» Each node has mutable state
» For each record, update state & send new records

[Diagram: nodes with mutable state consuming input records and emitting new records]

State is lost if a node dies!

Making stateful stream processing fault-tolerant is challenging.
 
Requirements

Scalable to large clusters
Second-scale latencies
Simple programming model
Integrated with batch & interactive processing
Efficient fault-tolerance in stateful computations
 
Spark Streaming

Tathagata Das (TD)
UC Berkeley

Discretized Stream Processing

Run a streaming computation as a series of very small, deterministic batch jobs:
» Chop up the live stream into batches of X seconds
» Spark treats each batch of data as RDDs and processes them using RDD operations
» Finally, the processed results of the RDD operations are returned in batches

[Diagram: live data stream → Spark Streaming → batches of X seconds → Spark → processed results]
 
Discretized Stream Processing

Run a streaming computation as a series of very small, deterministic batch jobs:
» Batch sizes as low as ½ second, latency ~ 1 second
» Potential for combining batch processing and streaming processing in the same system
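
A minimal sketch of what choosing the batch interval looks like in code; the application name, master URL, and the one-second interval are illustrative assumptions, not values from the slides:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The batch interval ("X seconds" above) is fixed when the StreamingContext
// is created; every DStream is then processed as one RDD per interval.
val conf = new SparkConf().setAppName("DiscretizedExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
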
Example 1 – Get hashtags from Twitter

val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)

DStream: a sequence of RDDs representing a stream of data

[Diagram: Twitter Streaming API → tweets DStream, one RDD per batch (t, t+1, t+2), stored in memory as immutable, distributed RDDs]

Example 1 – Get hashtags from Twitter

val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap(status => getTags(status))

transformation: modify data in one DStream to create another DStream

[Diagram: flatMap applied to each batch of the tweets DStream creates new RDDs for every batch of the hashTags DStream, e.g. [#cat, #dog, …]]

Example 1 – Get hashtags from Twitter

val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap(status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://...")

output operation: to push data to external storage

[Diagram: for every batch (t, t+1, t+2), flatMap turns the tweets DStream into the hashTags DStream, and every batch is saved to HDFS]
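
Putting the three snippets together into a program that actually runs needs a StreamingContext plus start/await calls. The sketch below makes several assumptions: getTags is a hypothetical helper (the slides never define it), the input is read from a TCP socket with socketTextStream instead of the slide-era ssc.twitterStream call (Twitter input later moved to the separate spark-streaming-twitter connector), and saveAsTextFiles stands in for the slides' saveAsHadoopFiles.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HashtagJob {
  // Hypothetical helper: extract "#..." tokens from a status line.
  def getTags(status: String): Seq[String] =
    status.split("\\s+").filter(_.startsWith("#")).toSeq

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HashtagJob").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))     // 1-second batches (assumed)

    val tweets = ssc.socketTextStream("localhost", 9999) // one status per line (assumed source)
    val hashTags = tweets.flatMap(status => getTags(status))
    hashTags.saveAsTextFiles("hdfs:///streams/hashtags", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}
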
Java Example

Scala:

val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap(status => getTags(status))
hashTags.saveAsHadoopFiles("hdfs://...")

Java:

JavaDStream<Status> tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
JavaDStream<String> hashTags = tweets.flatMap(new Function<...> { })  // Function object to define the transformation
hashTags.saveAsHadoopFiles("hdfs://...")

Fault-tolerance

RDDs remember the sequence of operations that created them from the original fault-tolerant input data

Batches of input data are replicated in memory across multiple worker nodes and are therefore fault-tolerant

Data lost due to a worker failure can be recomputed from the replicated input data

[Diagram: tweets RDD (input data replicated in memory) → flatMap → hashTags RDD; lost partitions are recomputed on other workers]

Key concepts

DStream – sequence of RDDs representing a stream of data
» Twitter, HDFS, Kafka, Flume, ZeroMQ, Akka Actor, TCP sockets

Transformations – modify data from one DStream to another
» Standard RDD operations – map, countByValue, reduce, join, …
» Stateful operations – window, countByValueAndWindow, …

Output operations – send data to an external entity
» saveAsHadoopFiles – saves to HDFS
» foreach – do anything with each batch of results
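
As a small illustration of that last output operation (a sketch, not from the slides): in current Spark Streaming the per-batch hook is called foreachRDD, and it hands you each batch's RDD so you can do arbitrary work with it. The tagCounts DStream is assumed to exist as built in Example 2 below.

// Assumes tagCounts: DStream[(String, Long)] as produced by countByValue().
// Every batch interval, print that batch's ten most frequent hashtags.
tagCounts.foreachRDD { rdd =>
  val top = rdd.top(10)(Ordering.by[(String, Long), Long](_._2))
  top.foreach { case (tag, count) => println(s"$tag: $count") }
}
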
Example 2 – Count the hashtags

val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap(status => getTags(status))
val tagCounts = hashTags.countByValue()

[Diagram: per batch (t, t+1, t+2), tweets → hashTags → tagCounts, e.g. [(#cat, 10), (#dog, 25), …]]

Example 3 – Count the hashtags over last 10 mins

val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap(status => getTags(status))
val tagCounts = hashTags.window(Minutes(10), Seconds(1)).countByValue()

window is a sliding window operation: Minutes(10) is the window length, Seconds(1) is the sliding interval

Example 3 – Counting the hashtags over last 10 mins

val tagCounts = hashTags.window(Minutes(10), Seconds(1)).countByValue()

[Diagram: a window sliding over the hashTags batches (t-1 … t+3); each time it slides, countByValue counts over all the data in the window]

Smart window-based countByValue

val tagCounts = hashTags.countByValueAndWindow(Minutes(10), Seconds(1))

[Diagram: instead of recounting the whole window, tagCounts is updated incrementally: add the counts from the new batch entering the window and subtract the counts from the batch leaving the window]
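
The add/subtract trick is the general pattern behind Spark Streaming's invertible windowed aggregations. Below is a sketch of the same computation written with reduceByKeyAndWindow and an inverse ("subtract") function; it assumes the ssc and hashTags values from the earlier examples, and the checkpoint path is hypothetical.

import org.apache.spark.streaming.{Minutes, Seconds}

// Incremental windowed counts: as the window slides, Spark adds the new
// batch's counts and subtracts the departing batch's counts instead of
// recounting the entire 10-minute window.
val windowedCounts = hashTags
  .map(tag => (tag, 1L))
  .reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,   // add counts entering the window
    (a: Long, b: Long) => a - b,   // subtract counts leaving the window
    Minutes(10),                   // window length
    Seconds(1))                    // sliding interval

// Invertible window operations keep intermediate state, so a checkpoint
// directory must be set on the StreamingContext.
ssc.checkpoint("hdfs:///checkpoints/hashtags")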
