Analyzing Big Graphs with the Pregelix Dataflow Engine

Pregelix: Big(ger) Graph Analytics on a Dataflow Engine
Yingyi Bu (UC Irvine)
Joint work with: Vinayak Borkar (UC Irvine), Michael J. Carey (UC Irvine), Tyson Condie (UCLA), Jianfeng Jia (UC Irvine)
Outline
Introduction
Pregel Semantics
The Pregel Logical Plan
The Pregelix System
Experimental Results
Related Work
Conclusions
Introduction
Big Graphs are becoming common
o web graph
o social network
o ......
Introduction
How Big are Big Graphs?
o Web: 8.53 Billion pages in 2012
o Facebook active users: 1.01 Billion
o de Bruijn graph: 3 Billion nodes
o ......
Weapons for mining Big Graphs
o Pregel (Google)
o Giraph (Facebook, LinkedIn, Twitter, etc.)
o Distributed GraphLab (CMU)
o GraphX (Berkeley)
Programming Model
Think like a vertex
o receive messages
o update states
o send messages
Programming Model
Vertex

public abstract class Vertex<I extends WritableComparable,
        V extends Writable, E extends Writable, M extends Writable>
        implements Writable {

    public abstract void compute(Iterator<M> incomingMessages);
    .......
}

Helper methods (used in the sketch below)
o sendMsg(I vertexId, M msg)
o voteToHalt()
o getSuperstep()
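
For concreteness, here is a minimal sketch (not Pregelix source) of a single-source shortest-paths compute(). Only sendMsg, voteToHalt, and getSuperstep come from the API above; isSource, getValue, setValue, getOutEdges, and Edge are hypothetical stand-ins for whatever accessors the real Vertex class provides.

public void compute(Iterator<DoubleWritable> incomingMessages) {
    // Seed the source with distance 0 in the first superstep
    // (the starting superstep number is an assumption here).
    double min = (getSuperstep() == 1 && isSource()) ? 0.0 : Double.MAX_VALUE;
    while (incomingMessages.hasNext()) {
        min = Math.min(min, incomingMessages.next().get());
    }
    if (min < getValue().get()) {              // found a shorter path
        setValue(new DoubleWritable(min));
        for (Edge e : getOutEdges()) {         // relax every out-edge
            sendMsg(e.dest(), new DoubleWritable(min + e.weight()));
        }
    }
    voteToHalt();  // halt; a future message re-activates this vertex
}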
More APIs
Message Combiner (see the sketch after this list)
o Combine messages
o Reduce network traffic
Global Aggregator
o Aggregate statistics over all live vertices
o Done for each iteration
Graph Mutations
o Add vertex
o Delete vertex
o A conflict resolution function
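As an illustration of the combiner hook, a min-combiner for shortest-paths messages; the class shape and method signature are assumptions, not the exact Pregelix interface.

import org.apache.hadoop.io.DoubleWritable;

// Hedged sketch: merge two messages bound for the same vertex so that
// only one crosses the network; for SSSP-style propagation the
// smaller distance wins.
public class MinCombiner {
    public DoubleWritable combine(DoubleWritable m1, DoubleWritable m2) {
        return m1.get() <= m2.get() ? m1 : m2;
    }
}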
 
Pregel Semantics
Bulk-synchronous
o A global barrier between iterations
Compute invocation
o Once per active vertex in each superstep
o A halted vertex is activated when receiving messages
Global halting
o Each vertex is halted
o No messages are in flight
Graph mutations
o Partial ordering of operations
o User-defined resolve function
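
The compute-invocation and activation rules reduce to a one-line test, which the logical plan later encodes as a filter (a paraphrase, not Pregelix source):

// A vertex runs compute() in a superstep iff it has not halted,
// or an incoming message re-activates it.
boolean active = !vertex.halt || incomingPayload != null;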
Process-centric runtime
[Figure: the master (superstep: 3, halt: false) exchanges control signals with worker-1 and worker-2. Worker-1 holds Vertex{id: 1, halt: false, value: 3.0, edges: (3,1.0),(4,1.0)} and Vertex{id: 3, halt: false, value: 3.0, edges: (2,1.0),(3,1.0)}; worker-2 holds Vertex{id: 2, halt: false, value: 2.0, edges: (3,1.0),(4,1.0)} and Vertex{id: 4, halt: false, value: 1.0, edges: (1,1.0)}. Messages <id, payload> in flight: <5, 1.0>, <4, 3.0>, <3, 1.0>, <2, 3.0>.]
Issues and Opportunities
Out-of-core support
“I’m trying to run the sample connected components algorithm on a large data set on a cluster, but I get a ‘java.lang.OutOfMemoryError: Java heap space’ error.”
-- 26 similar threads on the Giraph-users mailing list during the past year!
Issues and Opportunities
Physical flexibility
o PageRank, SSSP, CC, Triangle Counting
o Web graph, social network, RDF graph
o 8-machine school cluster, 200-machine Facebook data center
One-size-fits-all?
Issues and Opportunities
Software simplicity
[Figure: Pregel, GraphLab, Giraph, Hama, ...... each re-implement the same stack: vertex/map/msg data structures, task scheduling, memory management, message delivery, network management.]
The Pregelix Approach
[Figure: messages and vertices represented as relations and joined on vid.
Msg(vid, payload): (2, 3.0), (4, 3.0), (5, 1.0), (3, 1.0)
Vertex(vid, halt, value, edges): (1, false, 3.0, (3,1.0),(4,1.0)), (2, false, 2.0, (3,1.0),(4,1.0)), (3, false, 3.0, (2,1.0),(3,1.0)), (4, false, 1.0, (1,1.0))
Joining on Msg.vid = Vertex.vid yields (vid, halt, value, edges, msg):
(1, false, 3.0, (3,1.0),(4,1.0), NULL)
(2, false, 2.0, (3,1.0),(4,1.0), 3.0)
(3, false, 3.0, (2,1.0),(3,1.0), 1.0)
(4, false, 1.0, (1,1.0), 3.0)
(5, NULL, NULL, NULL, 1.0)]
 
Relation  Schema
Vertex    (vid, halt, value, edges)
Msg       (vid, payload)
GS        (halt, aggregate, superstep)
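
Read as plain data types, the three schemas might be sketched as the Java records below; the field types are assumptions drawn from the running example (Pregelix itself stores binary records, not Java objects).

import java.util.List;

record Edge(long dest, double weight) {}
record VertexTuple(long vid, boolean halt, double value, List<Edge> edges) {}
record MsgTuple(long vid, double payload) {}
record GlobalState(boolean halt, double aggregate, long superstep) {}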
Pregel UDFs
Four user-defined functions (sketched together after this list):
compute
o Executed at each active vertex in each superstep
combine
o Aggregation function for messages
aggregate
o Aggregate function for the global states
resolve
o Used to resolve graph mutations
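
Collected in one place, the four hooks could be sketched as the interface below; the real Pregelix API spreads them across separate classes, and the exact signatures may differ.

import java.util.Iterator;

// Hedged sketch; type parameters follow the Vertex class
// (I = id, V = value, E = edge, M = message), plus a hypothetical
// A for the global aggregate.
interface PregelUDFs<I, V, E, M, A> {
    void compute(Iterator<M> incomingMessages); // at each active vertex
    M combine(M m1, M m2);                      // merge messages per vid
    A aggregate(A partial, V vertexValue);      // fold global statistics
    V resolve(V existing, V mutation);          // reconcile graph mutations
}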
Logical Plan
[Figure: the per-superstep message/vertex dataflow. Msg_i(M) is joined with Vertex_i(V) on M.vid = V.vid; the filter (V.halt = false || M.payload != NULL) keeps vertices that are active or have mail; the result (D1) feeds UDF Call (compute), which emits Vertex tuples D2 (into Vertex_i+1) and Msg tuples D3 (alongside D4, D5, D6); D3 passes through a vid-grouped combine to become D7, the Msg tuples after combination, which form Msg_i+1.]
Logical Plan
[Figure: the global-state and mutation dataflows, both fed by UDF Call (compute) over D1. The halting-state contributions D4 feed Agg(bool-and), producing the global halt state D8; the aggregate values D5 feed Agg(aggregate), producing the global aggregate value D9; superstep = G.superstep + 1 over GS_i(G) produces the increased superstep D10; D8, D9, and D10 form GS_i+1 (D2, D3, D6 flow elsewhere). Separately, the Vertex tuples for deletions and insertions, D6, pass through a vid-grouped resolve into Vertex_i+1 (alongside D2, D3, D4, D5).]
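
The global-state branch reads as two folds plus an increment. A hypothetical sketch, reusing the GlobalState record from the schema section, with sum standing in for the user-defined aggregate:

import java.util.List;

// Sketch of computing GS_i+1: Agg(bool-and) over the halting
// contributions (D4), an aggregate over the values (D5), and the
// superstep increment (D10).
static GlobalState nextGlobalState(GlobalState gs,
                                   List<Boolean> haltContribs,
                                   List<Double> aggValues) {
    boolean halt = haltContribs.stream().allMatch(b -> b);
    double agg = aggValues.stream().mapToDouble(Double::doubleValue).sum();
    return new GlobalState(halt, agg, gs.superstep() + 1);
}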
The Pregelix System
[Figure: Pregel physical plans layered on a general-purpose parallel dataflow engine. Each Pregel concern maps to an engine facility: vertex/map/msg data structures -> operators, access methods, and record/index management; message delivery -> data exchanging; memory management -> buffer management; network management -> connection management; task scheduling -> task scheduling.]
The Runtime
Runtime choice? Hyracks vs. Hadoop.
The Hyracks data-parallel execution engine:
o Out-of-core operators
o Connectors
o Access methods
o User-configurable task scheduling
o Extensibility
Parallelism
[Figure: the same join plan runs on every worker over its partition. Worker-1 joins Msg-1 {(2, 3.0), (4, 3.0)} with Vertex-1 {(2, false, 2.0, (3,1.0),(4,1.0)), (4, false, 1.0, (1,1.0))} on vid = vid, yielding rows with msg 3.0 for vids 2 and 4. Worker-2 joins Msg-2 {(3, 1.0), (5, 1.0)} with Vertex-2 {(1, false, 3.0, (3,1.0),(4,1.0)), (3, false, 3.0, (2,1.0),(3,1.0))}, yielding (1, ..., NULL), (3, ..., 1.0), and (5, NULL, NULL, NULL, 1.0) for the missing vertex. The superstep then produces output-Msg-1 {(2, 3.0), (5, 1.0)} and output-Msg-2 {(3, 1.0), (4, 3.0)}, redistributed by vid for the next superstep.]
Physical Choices
Vertex storage
o B-Tree
o LSM B-Tree
Group-by
o Pre-clustered group-by
o Sort-based group-by
o HashSort group-by
Data redistribution
o m-to-n merging partitioning connector
o m-to-n partitioning connector
Join
o Index full outer join
o Index left outer join
Data Storage
Vertex
o Partitioned B-tree or LSM B-tree
Msg
o Partitioned local files, sorted
GS
o Stored on HDFS
o Cached in each worker
Physical Plan: Message Combination
[Figure: four strategies, each a local vid-grouped combine, a connector, and a receiving vid-grouped combine.
Sort-Groupby-M-to-N-Partitioning: (Sort-based) combine -> M-to-N Partitioning Connector -> (Sort-based) combine.
HashSort-Groupby-M-to-N-Partitioning: (HashSort) combine -> M-to-N Partitioning Connector -> (HashSort) combine.
Sort-Groupby-M-to-N-Merge-Partitioning: (Sort-based) combine -> M-To-N Partitioning Merging Connector -> (Preclustered) combine.
HashSort-Groupby-M-to-N-Merge-Partitioning: (HashSort) combine -> M-To-N Partitioning Merging Connector -> (Preclustered) combine.]
Physical Plan: Message Delivery
[Figure: two index-join delivery plans. Index full outer join: Msg_i(M) is merged (choose()) with Vid_i(I) on M.vid = I.vid, full-outer-joined with Vertex_i(V) on M.vid = V.vid, and fed to UDF Call (compute); a Function Call (NullMsg) over vertices with halt = false produces Vid_i+1 (flows D11, D12). Index left outer join: Msg_i(M) is left-outer-joined with Vertex_i(V) on M.vid = V.vid, filtered by (V.halt = false || M.payload != NULL), and fed to UDF Call (compute). Both plans emit D1 and D2 -- D6.]
Caching
Pregel, Giraph, GraphLab all have caches for this kind of iterative job. What does Pregelix do for caching?
Iteration-aware (sticky) scheduling?
o 1 Loc: location constraints
Caching of invariant data?
o B-tree buffer pool -- customized flushing policy: never flush dirty pages
o File system cache -- free
Experimental Results
Setup
o Machines: a UCI cluster of ~32 machines, each with 4 cores, 8GB memory, and 2 disk drives.
o Datasets: the Yahoo! webmap (1,413,511,393 vertices, adjacency list, ~70GB) and its samples; the Billions of Tuples Challenge dataset (172,655,479 vertices, adjacency list, ~17GB), its samples, and its scale-ups.
o Giraph: latest trunk (revision 770), 4 vertex computation threads, 8GB JVM heap.
Execution Time
[Figures: three slides of execution-time charts, each comparing in-memory and out-of-core cases.]
Parallel Speedup
Parallel Scale-up
Throughput
Plan Flexibility
[Figure: execution-time charts for different physical plans, in-memory and out-of-core cases; a 15x difference is highlighted.]
Software Simplicity
Lines-of-Code
o Giraph: 32,197
o Pregelix: 8,514
More Systems
[Figures: two chart-only slides.]
Related Work
Parallel Data Management
o Gamma, GRACE, Teradata
o Stratosphere (TU Berlin)
o REX (UPenn)
o AsterixDB (UCI)
Big Graph Processing Systems
o Pregel (Google)
o Giraph (Facebook, LinkedIn, Twitter, etc.)
o Distributed GraphLab (CMU)
o GraphX (Berkeley)
o Hama (Sogou, etc.) --- Too slow!
Conclusions
Pregelix offers:
o Transparent out-of-core support
o Physical flexibility
o Software simplicity
We intend Pregelix to be an open-source production system, rather than just a research prototype:
o http://pregelix.ics.uci.edu
Q & A