Dynamic Data Management Systems in Agile Views

Agile Views in a Dynamic Data Management System
Oliver Kennedy (1,3), Yanif Ahmad (2), Christoph Koch (1)
(1) EPFL, (2) Johns Hopkins University, (3) Cornell University
{oliver.kennedy, christoph.koch}@epfl.ch, yanif@jhu.edu
Large, Dynamic Data
User and enterprise-generated data and analytics are gaining popularity
Generating data has never been easier
Applications are often long-running, and supported by evolving datasets
Application examples
Algorithmic trading on order books
Log analysis (web servers, search engines, clickstreams, etc.)
Status feeds (FB, Twitter, etc.)
Enterprise monitoring: workflow and infrastructure auditing and analysis
Challenge: databases are poor at handling many modifications on big data
We have many techniques to improve query processing; what about update processing?
Today’s Approaches to Large, Dynamic Data
Scale out on dataset size:
Map-reduce, key-value stores
Communities build lightweight engines, trading off consistency for scalability
Benefit from low-latency processing!
Scale up on dataset evolution:
Stream, complex event engines
Non-standard constructs (windows, formal language)
These apps have large state!
[Figure: architectures and applications plotted by dataset size or rate (Giga to Exa) against data ingestion frequency (<< 1 s to monthly).]
Architectures: Standard DBMS; Main-memory DBMS; Parallel DBMS, MPP; Stream engines
Applications:
1. Algorithmic trading
2. Algo simulation and backtesting
3. LHC (monitoring)
4. LHC (analysis)
5. Google Caffeine
6. Google crawler and indexer
[Figure build: the same chart with an added region labeled "Ideal".]
The DBToaster Project
DBToaster: a simple, lightweight system to handle large, dynamic datasets (a Dynamic Data Management System)
Incremental data management
Discover deep properties of queries, exploit them for scalability
Concerted focus on being incremental throughout the system design
Compilation and database synthesis
Automate the construction of minimal, lightweight data management tools
Related work: Starburst, Exodus, Genesis, compiling holistic plans [Arumugam et al. ’10, Krikellas et al. ’10], language embeddings [Meijer et al. ’06, Grust et al. ’10]
High-Frequency Orderbook Trading
High-frequency algorithmic trading on order books
Q1/2009: 73% of trading volume of US equities
$13.4B spent worldwide on trading infrastructure, 41% in US [TABB Group, Sep ’10]
Order book trading
Actions:
Algos: insertions, deletions of bid and ask orders
Exchange: matches bids and asks
Queries:
Algo strategies
Exchange simulation for backtesting
[Figure: an electronic exchange (e.g., NYSE, NASDAQ) with a bid order book (buyers) and an ask order book (sellers); columns: t = timestamp, oid = order id, bid = broker id, p = price, v = volume.]
Stream processing engines (SPEs) are unsuitable for general, dynamic updates [Ahmad & Koch ’09, Ghanem et al. ’10]
Windows couple scoping and manipulation [Botan et al. ’10]
No window can enclose all updates
No “snapshot” queries
Lagged OLAP and nested aggregates
SPEs rely on main-memory DBMS techniques
Traits of DDMS apps:
Access to the data is primarily by monitoring views and simple computations on them.
The data store is rarely directly accessed.
Materialized view maintenance dominates ad-hoc querying.
There is usually no feedback loop.
Related Work: Incremental View Maintenance
A view can be thought of as:
a continuous query over an arbitrary database,
independent of how that database is manipulated
Well-studied from 1980s-today:
[Roussopoulos, 1991; Yan and Larson, 1995;
 Colby et al, 1996; Kotidis,Roussopoulos, 2001;
 Zhou et al, 2007]
Incremental view maintenance
Mechanism: delta queries
Delta queries are derived by a rewrite
Delta queries, just like any other query, are processed from scratch
[Figure: a delta query evaluated over delta tables and base tables.]
This Talk: Agile Views
Observation: delta queries are still queries, and amenable to further (delta) rewrites, leading to higher-level deltas
Proposal: agile views, for as-incremental-as-possible evaluation
Materialize higher-level deltas, maintain and reuse them
Provide them as lightweight data structures to apps, for in-process analytics
Our contributions: a recursive delta compilation algorithm yielding a fully incremental query processor
A k-level delta is simpler (i.e. has lower degree) than a (k-1)-level delta
Each higher-level delta is materialized as an in-memory map; the k-level map is used to maintain the (k-1)-level map
Delta Queries
We consider sum aggregate queries
Delta queries can be represented as parameterized SQL
Includes parameters, or input variables
Produces group-bys, or output variables, and a value
Similar to bound-free annotations in Datalog, and binding patterns for data integration
Delta transform: standard IVM method, with nesting
Schema: R(a,b,c), S(d,e)
q[][c] =
  select c, sum(a*e) from R, S
  where b < d group by c
on insert S(@d,@e):
qS[][c] = ∆S(q)
  = select c, sum(a*@e)
    from R, values(@d,@e)
    where b < @d group by c
  = select c, @e*v from
    (select c, sum(a) as v from R
     where b < @d group by c)
The nested aggregate is the map mS[d][c], with input variable d and output variable c
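The factored delta on this slide can be checked on a toy instance. The Python sketch below assumes R has columns (a, b, c) and S has columns (d, e), the reading under which the query and the values(@d,@e) clause line up. The sample rows and the function names q, m_S and delta_S are illustrative and not part of the talk.

```python
from collections import defaultdict

def q(R, S):
    """select c, sum(a*e) from R, S where b < d group by c (from scratch)."""
    out = defaultdict(int)
    for (a, b, c) in R:
        for (d, e) in S:
            if b < d:
                out[c] += a * e
    return dict(out)

def m_S(R, d):
    """Inner aggregate: select c, sum(a) as v from R where b < @d group by c."""
    out = defaultdict(int)
    for (a, b, c) in R:
        if b < d:
            out[c] += a
    return dict(out)

def delta_S(R, d, e):
    """Delta of q for an insert S(@d, @e): @e * v for every group c."""
    return {c: e * v for c, v in m_S(R, d).items()}

# Hypothetical sample data
R = [(2, 1, "x"), (3, 5, "x"), (4, 2, "y")]
S = [(3, 10)]

before = q(R, S)
delta = delta_S(R, 4, 7)          # insert S(4, 7)
after = defaultdict(int, before)
for c, v in delta.items():
    after[c] += v

# The incrementally maintained view agrees with recomputation.
assert dict(after) == q(R, S + [(4, 7)])
print(dict(after))
```

Note that delta_S never reads S: the delta depends only on R and the inserted tuple, which is what makes the inner aggregate a candidate for materialization.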
Incremental Trigger Programs
Our query language can compose parameterized SQL, propagate
parameters, and define triggers
Triggers are a sequence of statements
Each statement is a map update by a (materialized) delta query
Statements may include loops, which are embarrassingly parallelizable
select l.ordkey, o.sprior, sum(l.extprice)
from Orders o, Lineitem l
where l.ordkey = o.ordkey
group by l.ordkey, o.sprior
on_insert_lineitem(ok,ep) { … }
on_insert_order(ck,ok,sp) {
  m[][ok, sp] += mco[][ok] * mo[][ck];
  ml[][ok, sp] += mo[][ck];
  mc[][ck, ok, sp] += mco[][ok];
  mcl[][ck, ok, sp] += 1;
}
Schema: Customer(custkey, name, nationkey, acctbal), Lineitem(ordkey, extprice), Orders(custkey, ordkey, sprior)
Compiling to Incremental Programs
q = select l.ordkey, o.sprior, sum(l.extprice)
    from Lineitem l, Orders o
    where l.ordkey = o.ordkey
    group by l.ordkey, o.sprior;
Map signature for q: m[][ordkey,sprior]
Level 1:
+L(@ok,@ep)       : m[][ordkey,sprior] += ∆L(q);
+O(@ck2,@ok2,@sp) : m[][ordkey,sprior] += ∆O(q);
Compiling to Incremental Programs
Level 1:
+L(@ok,@ep):
m[][ordkey,sprior] +=
  select l.ordkey, o.sprior, sum(l.extprice)
  from values(@ok,@ep) as l(ordkey,extprice), Orders o
  where l.ordkey = o.ordkey
  group by l.ordkey, o.sprior
simplify =>
  select @ok, sprior, @ep*v
  from (select o.sprior, sum(1) as v
        from Orders o
        where @ok = o.ordkey
        group by o.sprior)
The nested aggregate is the map ml[][@ok,sprior]
Compiling to Incremental Programs
Level 1:
+L(@ok,@ep):
foreach sprior in ml:
   m[][@ok,sprior] += @ep * ml[][@ok,sprior];
In this talk, we’ll use our syntax with implicit for loops everywhere:
+L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok,sprior];
We must recursively maintain the map ml!
Compiling to Incremental Programs
ql = select o.sprior, sum(1) as v
     from Orders o
     where @ordkey = o.ordkey
     group by o.sprior
Level 1:
+L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok,sprior];
Level 2:
+O(@ck2,@ok2,@sp): ml[][ordkey,sprior] += ∆O(ql);
Expanding ∆O(ql):
  select o.sprior, sum(1)
  from values(@ck2,@ok2,@sp) as o(custkey,ordkey,sprior)
  where @ordkey = o.ordkey
  group by o.sprior
=>
  select @sp, 1
  where @ordkey = @ok2
  group by @sp
which simplifies the trigger to:
+O(@ck2,@ok2,@sp): ml[][@ok2,@sp] += 1;
Independent of database!
Compilation terminates
Recursive Delta Compilation
Compilation terminates:
For a query q: deg(∆(q)) = max(0, deg(q)-1)
(think of the degree of a polynomial when taking derivatives)
Deltas are eventually independent of the database, and depend only on the update, i.e. for some k:
deg(∆^k(q)) = 0
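As a worked instance of the degree argument, take the two-way join q over Lineitem and Orders used earlier in the talk. The block below reads deg as the number of base relations a query still references, which is an assumption made here in line with the polynomial analogy rather than a definition given on the slide.

```latex
\begin{align*}
\deg(q) &= 2 && q \text{ joins Lineitem and Orders} \\
\deg(\Delta_L q) &= 1 && \Delta_L q = \text{@ep} \cdot q_l, \text{ and } q_l \text{ references only Orders} \\
\deg(\Delta_O q_l) &= 0 && \Delta_O q_l = 1 \text{ at key } (\text{@ok2}, \text{@sp}), \text{ no base relation remains}
\end{align*}
```

Two delta steps therefore suffice for this query, matching deg(∆(q)) = max(0, deg(q)-1) applied twice.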
on_insert_lineitem(ordkey, extprice)
{
  q[][ordkey, sprior] +=
    extprice*ql[][ordkey, sprior];
  qo[][ordkey] += extprice;
}
on_insert_order(
  custkey, ordkey, sprior)
{
  q[][ordkey, sprior] +=
    qo[][ordkey];
  ql[][ordkey, sprior] += 1;
}
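The two triggers above can be run directly against in-memory maps. The following Python sketch mirrors them with dictionaries and compares the maintained view with a from-scratch evaluation of the query. The class name OrderTotals, the recompute helper and the sample events are hypothetical, introduced only for illustration.

```python
from collections import defaultdict

class OrderTotals:
    """Maintains q[ordkey, sprior] = sum(extprice) over Lineitem join Orders."""

    def __init__(self):
        self.q = defaultdict(int)    # (ordkey, sprior) -> sum(extprice)
        self.ql = defaultdict(int)   # (ordkey, sprior) -> number of matching orders
        self.qo = defaultdict(int)   # ordkey -> sum(extprice) of its lineitems

    def on_insert_lineitem(self, ordkey, extprice):
        # Implicit loop: every sprior stored in ql for this ordkey.
        for (ok, sprior), count in list(self.ql.items()):
            if ok == ordkey:
                self.q[(ok, sprior)] += extprice * count
        self.qo[ordkey] += extprice

    def on_insert_order(self, custkey, ordkey, sprior):
        self.q[(ordkey, sprior)] += self.qo[ordkey]
        self.ql[(ordkey, sprior)] += 1

def recompute(lineitems, orders):
    """Evaluates the query from scratch, for comparison only."""
    out = defaultdict(int)
    for (l_ok, ep) in lineitems:
        for (_ck, o_ok, sp) in orders:
            if l_ok == o_ok:
                out[(l_ok, sp)] += ep
    return dict(out)

# Hypothetical interleaved stream of inserts
events = [("O", (1, 100, "HIGH")), ("L", (100, 20)), ("L", (101, 7)),
          ("O", (2, 101, "LOW")), ("L", (100, 5)), ("L", (999, 3))]

view = OrderTotals()
for kind, args in events:
    if kind == "O":
        view.on_insert_order(*args)
    else:
        view.on_insert_lineitem(*args)

maintained = {k: v for k, v in view.q.items() if v != 0}
lineitems = [a for k, a in events if k == "L"]
orders = [a for k, a in events if k == "O"]
assert maintained == recompute(lineitems, orders)
print(maintained)
```

Neither trigger scans a base table; each one touches only the maps, which is the point of materializing the higher-level deltas.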
Recursive Delta Compilation
on_insert_customer(ck,nm,nk,bal) :
  m[][ordkey, sprior] += mc[][ck, ordkey, sprior];
  ml[][ordkey, sprior] += mcl[][ck, ordkey, sprior];
  mo[][ck] += 1;
on_insert_lineitem(ok,ep) :
  m[][ok, sprior] += ep * ml[][ok, sprior];
  mc[][custkey, ok, sprior] += ep * mcl[][custkey, ok, sprior];
  mco[][ok] += ep;
on_insert_order(ck,ok,sp) :
  m[][ok, sp] += mco[][ok] * mo[][ck];
  ml[][ok, sp] += mo[][ck];
  mc[][ck, ok, sp] += mco[][ok];
  mcl[][ck, ok, sp] += 1;
select l.ordkey, o.sprior, sum(l.extprice)
from Customer c, Orders o, Lineitem l
where c.custkey = o.custkey
and   l.ordkey = o.ordkey
group by l.ordkey, o.sprior;
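The three-way trigger program can be exercised the same way. The sketch below is a Python transcription of the three triggers over six dictionary-backed maps (m, ml, mc, mcl, mo, mco), checked against a from-scratch evaluation for several insertion orders. The helper names (new_state, run, recompute) and the sample rows are assumptions made for illustration.

```python
import random
from collections import defaultdict

def new_state():
    return {n: defaultdict(int) for n in ("m", "ml", "mc", "mcl", "mo", "mco")}

def on_insert_customer(s, ck, nm, nk, bal):
    for (c, ok, sp), v in list(s["mc"].items()):
        if c == ck:
            s["m"][(ok, sp)] += v
    for (c, ok, sp), v in list(s["mcl"].items()):
        if c == ck:
            s["ml"][(ok, sp)] += v
    s["mo"][ck] += 1

def on_insert_lineitem(s, ok, ep):
    for (o, sp), v in list(s["ml"].items()):
        if o == ok:
            s["m"][(ok, sp)] += ep * v
    for (c, o, sp), v in list(s["mcl"].items()):
        if o == ok:
            s["mc"][(c, ok, sp)] += ep * v
    s["mco"][ok] += ep

def on_insert_order(s, ck, ok, sp):
    s["m"][(ok, sp)] += s["mco"][ok] * s["mo"][ck]
    s["ml"][(ok, sp)] += s["mo"][ck]
    s["mc"][(ck, ok, sp)] += s["mco"][ok]
    s["mcl"][(ck, ok, sp)] += 1

def recompute(customers, orders, lineitems):
    """Evaluates the three-way join from scratch, for comparison only."""
    out = defaultdict(int)
    for (c_ck, _nm, _nk, _bal) in customers:
        for (o_ck, o_ok, sp) in orders:
            for (l_ok, ep) in lineitems:
                if c_ck == o_ck and l_ok == o_ok:
                    out[(l_ok, sp)] += ep
    return dict(out)

# Hypothetical sample rows; order 102 belongs to a customer that never arrives.
customers = [(1, "ann", 10, 0), (2, "bob", 20, 0)]
orders = [(1, 100, "HIGH"), (2, 101, "LOW"), (3, 102, "LOW")]
lineitems = [(100, 20), (100, 5), (101, 7), (102, 9)]

handlers = {"C": on_insert_customer, "O": on_insert_order, "L": on_insert_lineitem}
events = ([("C", c) for c in customers] + [("O", o) for o in orders]
          + [("L", l) for l in lineitems])

def run(stream):
    s = new_state()
    for kind, args in stream:
        handlers[kind](s, *args)
    return {k: v for k, v in s["m"].items() if v != 0}

expected = recompute(customers, orders, lineitems)
rng = random.Random(0)
for _ in range(20):
    stream = events[:]
    rng.shuffle(stream)
    assert run(stream) == expected   # final view is the same for every order
print(expected)
```

The loops inside each trigger update distinct map entries, so they could be executed in parallel without changing the result.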
Compilation Framework (v.4)
Databases as rings: incremental program construction
[Figure: compilation pipeline. Stages: Parser, Preaggregation, Delta rewrite, Simplification, Materialization, Trigger construction, Low-level engine compilation (v.3). Stage annotations: handles nested query deltas; minimizes propagation; unifies, eliminates variables; map declaration; initial value computation.]
Assume Q1, Q2, Q3 are SPJ-queries:
1. Recursively polynomial form:
(select x, sum(t) from Q1 g.b. x) * (select y, sum(t) from Q2 g.b. y) + (select x, y, sum(t) from Q3 g.b. x,y)
2. Nested aggregate normal form:
(select x, sum(v) from (select x,y,sum(t) as v from Q1) g.b. x) * ((select sum(t) from Q2 where f(x,…) ≤ 0) < 0)
Ongoing Work: Low-Level Query Evaluation
Challenge:  create a customized, lightweight engine to evaluate delta queries
How do we incorporate PL and compiler research to specialize?
Our approach: use a simple functional language for lower-level plans
A whole-query representation facilitating powerful transformations of physical
aspects of query processing (pipelining vs. blocking, memoization, etc.)
Our functional language: K3, supports structural recursion [Buneman et al. ’95]
Functional primitives on nested collections, forming a monad
Shown to be as expressive as nested relational algebra and comprehensions (e.g. LINQ, Ferry [Meijer et al. ’06, Grust et al. ’10])
Core primitives: map, flatten, aggregate (w/ group-bys), persistent collections
“Structural” Plans with Nested Collections
Schema: R(a,b), S(c,d,e), T(f,g)
Query: SELECT sum(a*g) FROM R,S,T;
K3 unoptimized:
aggregate(fun < <x,y>, z>. (x*y)+z, 0,
 flatten(
  map(fun <x>. map(fun <f,g>.<x,g>, T[][fg]),
      flatten(
       map(fun <a,b>. map(fun <c,d,e>.<a>, S[][cde]),
           R[][ab])))))
K3 optimized:
aggregate(fun < <x,y>, z>. (x*y)+z, 0,
 flatten(flatten(
   map(fun <a,b>. map(fun <c,d,e>. map(fun <f,g>. <a,g>,
       T[][fg]), S[][cde]), R[][ab]))))
K3 provides a clean framework for mixing tuple- and set-at-a-time processing: it eliminates temporaries (intermediates) via function composition and inlining, and abstracts:
i) blocking vs. pipelining via the map primitive, ii) 1NF construction with flatten
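The two K3 plans above can be imitated with ordinary list functions. The Python sketch below is not K3: k3_map, flatten and aggregate are stand-ins written for this example, and the sample relations are made up. It shows that the unoptimized plan builds an intermediate collection for R x S, while the optimized plan nests the three maps and flattens once at the end, with both producing the same sum(a*g).

```python
from functools import reduce
from itertools import chain

def k3_map(f, coll):
    return [f(x) for x in coll]

def flatten(coll):
    return list(chain.from_iterable(coll))

def aggregate(f, init, coll):
    # f takes (element, accumulator), as in fun <<x,y>, z>. (x*y)+z
    return reduce(lambda acc, x: f(x, acc), coll, init)

# Hypothetical sample relations R(a,b), S(c,d,e), T(f,g)
R = [(1, 10), (2, 20)]
S = [(0, 0, 0), (1, 1, 1), (2, 2, 2)]
T = [(5, 3), (6, 4)]

def unoptimized():
    # Temporary: one <a> tuple per (R, S) pair, materialized before joining T.
    rs = flatten(k3_map(lambda r: k3_map(lambda s: (r[0],), S), R))
    rst = flatten(k3_map(lambda x: k3_map(lambda t: (x[0], t[1]), T), rs))
    return aggregate(lambda xy, z: xy[0] * xy[1] + z, 0, rst)

def optimized():
    # Maps are composed and inlined; no R x S temporary is built.
    nested = k3_map(
        lambda r: k3_map(lambda s: k3_map(lambda t: (r[0], t[1]), T), S), R)
    return aggregate(lambda xy, z: xy[0] * xy[1] + z, 0, flatten(flatten(nested)))

assert unoptimized() == optimized()
print(optimized())
```

Python lists are eager, so this only illustrates the shape of the rewrite; whether a map is pipelined or blocking is a physical choice that the sketch does not model.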
Storage
Challenge: design external map layouts and data structures based on incremental program access workload
K3 provides a unified representation of query temporary and storage layouts
Abstracts implementation of nesting, e.g. via pointers in main-memory or clustering/linearization
Many benefits: simpler DB internals, common optimizer framework for queries and layouts
Other issues:
Language support for index representations (via variant datatypes) and denormalization
Plan over in-memory maps:
aggregate(fun < <x,y>, z>. (x*y)+z, 0,
 flatten(flatten(
  map(fun <a,b>. map(fun <c,d,e>.
   map(fun <f,g>. <a,g>,
   T[][fg]), S[][cde]), R[][ab]))))
Same plan with S[][cde] replaced by a storage section annotation:
aggregate(fun < <x,y>, z>. (x*y)+z, 0,
 flatten(flatten(
  map(fun <a,b>. map(fun <c,d,e>.
   map(fun <f,g>. <a,g>,
   T[][fg]),
   disk{
    map(fun <e>. map(fun <c,d>. <c,d,e>,
       S.CD[][cd]), S.E[][e])
   }),
   R[][ab]))))
Ongoing Work: Incremental Processing in the Cloud
DBToaster Cumulus: a distributed main-memory DDMS
Cumulus: facilitates large, shared-nothing main-memory trigger processing
Exploit embarrassingly parallelizable trigger programs
Map entry updates in a trigger are independent
Subclass of our query language (no nesting, only equijoins) yields programs with NC0 parallel complexity [Koch ’10]
Consistency requirement: prior state must be consistent before processing an update
Naïve approach: serial, atomic trigger execution
Our approach: bounded out-of-order processing
Computation performed on out-of-order events is simple, just evaluate delta queries
Easy to realize eventual consistency via loosely coordinated, distributed incremental processing
Incremental Processing in the Cloud
Trigger:
on_insert_customer(ck,nm,nk,bal) {
  m[][ordkey, sprior] += m_c[][ck, ordkey, sprior];
  ...
}
Trace:
on_insert_customer(1,2,3,4):
  m[][1,2] += m_c...
  m[][3,3] += m_c...
  ...
on_insert_customer(5,6,7,8):
  m[][1,2] += m_c...
  m[][3,3] += m_c...
  ...
Figure callouts: “parallel execution within trigger”; “must consistently use prior state”
Summary
Many applications work with large, evolving datasets made up of
both short- and long-lived items. This necessitates agile dynamic data
management techniques.
Three takeaways:
Agile views, lightweight data structures for apps, maintained as incrementally as possible
QP engine based on a functional language, to mix tuple-at-a-time and set-at-a-time processing, and specialize physical representations
With an incremental core, we can develop loosely coordinated distributed systems mechanisms more easily
We’re building DBToaster, a prototype DDMS, at EPFL and JHU
Language embedding, multicore, one-pass batch processing
Introducing the JHU
Data Management Systems Lab
http://damsel.cs.jhu.edu
Current projects: DBToaster,  declarative query optimization
Collaborators: Randal Burns, Jason Eisner, Alex Szalay, Andreas Terzis
Interests: dynamic data, declarative languages, scientific data management, data intensive computing
JHU research highlights:
Dyna: weighted logic programming for NLP, and generally AI algorithms
Data-intensive architectures: DataScope (5 PB NSF MRI grant), GrayWulf (SC 08 Storage winner, 70GB/s), Amdahl blades
Scientific datasets: PanSTARRS & SDSS, Turbulence (27TB), Life Under Your Feet, DC Genome
NEW, Sigmod Record ’11 article: “Scientific Data Management at the Johns Hopkins Institute for Data Intensive Engineering and Science”
Questions?

User- and enterprise-generated data is large, dynamic, and increasingly common, which calls for better data management systems. The talk surveys today's approaches to evolving datasets, with applications such as algorithmic trading and log analysis, and presents the DBToaster project, which focuses on lightweight systems for managing large, dynamic datasets efficiently.

  • Data Management
  • Dynamic Data
  • Agile Views
  • Large Datasets
  • DBToaster

Uploaded on Sep 17, 2024 | 0 Views



Presentation Transcript


  1. Agile Views in a Dynamic Data Management System Oliver Kennedy1+3, Yanif Ahmad2, Christoph Koch1 yanif@jhu.edu, {oliver.kennedy,christoph.koch}@epfl.ch EPFL1, Johns Hopkins University2, Cornell University3

  2. Large, Dynamic Data User and enterprise-generated data and analytics are gaining popularity Generating data has never been easier Applications are often long-running, and supported by evolving datasets Application examples Algorithmic trading on order books Log analysis (web servers, search engines, clickstreams, etc.) Status feeds (FB, Twitter, etc.) Enterprise monitoring: workflow and infrastructure auditing and analysis Challenge: databases are poor at handling many modifications on big data We have many techniques to improve query processing, what about update processing?

  3. Todays Approaches to Large, Dynamic Data Applications: Dataset size or rate Architectures: Exa 4 1. Algorithmic trading 2. Algo simulation and backtesting Stream engines Main-memory DBMS Peta 6 2 Parallel DBMS, MPP 5 Tera 3. LHC (monitoring) 4. LHC (analysis) Standard DBMS 1 3 Giga 5. Google Caffeine 6. Google crawler and indexer Data ingestion frequency << 1 s Minutes Nightly Monthly Seconds Hourly Weekly Scale out on dataset size: Scale up on dataset evolution: Map-reduce, key-value stores Stream, complex event engines Communities build lightweight engines, trading off consistency for scalability Non-standard constructs (windows, formal language) Benefit from low-latency processing! These apps have large state!

  4. Todays Approaches to Large, Dynamic Data Applications: Dataset size or rate Architectures: Exa 4 1. Algorithmic trading 2. Algo simulation and backtesting Stream engines Main-memory DBMS Peta 6 2 Parallel DBMS, MPP 5 Tera 3. LHC (monitoring) 4. LHC (analysis) Standard DBMS Ideal 13 Giga 5. Google Caffeine 6. Google crawler and indexer Data ingestion frequency << 1 s Minutes Nightly Monthly Seconds Hourly Weekly Scale out on dataset size: Scale up on dataset evolution: Map-reduce, key-value stores Stream, complex event engines Communities build lightweight engines, trading off consistency for scalability Non-standard constructs (windows, formal language) Benefit from low-latency processing! These apps have large state!

  5. The DBToaster Project Dataset size or rate DBToaster: a simple, lightweight system to handle large, dynamic datasets (a Dynamic Data Management System) Exa Peta Tera Giga DBToaster Data ingestion frequency << 1 s Minutes Nightly Monthly Seconds Hourly Weekly Incremental data management Compilation and database synthesis Discover deep properties of queries, exploit them for scalability Automate the construction of minimal, lightweight data management tools Concerted focus on being incremental throughout the system design Related work: Starburst, Exodus, Genesis, compiling holistic plans [Arumugam et al. 10, Krikellas et al. 10], language embeddings [Meijer et al. 06, Grust et al. 10]

  6. The DBToaster Project Dataset size or rate DBToaster: a simple, lightweight system to handle large, dynamic datasets (a Dynamic Data Management System) Exa Peta Tera Giga DBToaster Data ingestion frequency << 1 s Minutes Nightly Monthly Seconds Hourly Weekly Incremental data management Compilation and database synthesis Discover deep properties of queries, exploit them for scalability Automate the construction of minimal, lightweight data management tools Concerted focus on being incremental throughout the system design Related work: Starburst, Exodus, Genesis, compiling holistic plans [Arumugam et al. 10, Krikellas et al. 10], language embeddings [Meijer et al. 06, Grust et al. 10]

  7. High-Frequency Orderbook Trading High-frequency algorithmic trading on order books Q1/2009: 73% of trading volume of US equities $13.4B spent worldwide on trading infrastructure, 41% in US [TABB Group, Sep 10] t = timestamp oid = order id bid = broker id p = price v = volume Order book trading Actions: Algos: insertions, deletions of bid and ask orders Exchange: matches bids and asks Bid order book (buyers) Ask order book (sellers) t oi d bi d p v t oi d bi d p v 252603 5 36721 NITE 18400 1500 252634 5 36750 GSCO 18400 1000 0 0 Electronic exchange (e.g., NYSE, NASDAQ) 252669 0 36909 MASH 18320 200 252738 9 37002 MSCO 18520 200 Queries: Algo strategies Exchange simulation for backtesting 0 0 252754 3 37001 MSCO 18270 3000 252792 8 37006 GSCO 18610 500 0 0 252832 1 37008 GSCO 18250 500 252889 4 37020 NITE 18680 500 0 0 252903 2 37011 FBCO 18190 600 252975 8 37032 MASH 18790 700 0 0 7

  8. High-Frequency Orderbook Trading SPEs are unsuitable for general, dynamic, updates [Ahmad&Koch 09, Ghanem et al. 10] Windows couple scoping and manipulation [Botan et al. 10] t = timestamp oid = order id bid = broker id p = price v = volume Order book trading Actions: Algos: insertions, deletions of bid and ask orders Exchange: matches bids and asks Bid order book (buyers) Ask order book (sellers) t oi d bi d p v t oi d bi d p v 252603 5 36721 NITE 18400 1500 252634 5 36750 GSCO 18400 1000 0 0 Electronic exchange (e.g., NYSE, NASDAQ) 252669 0 36909 MASH 18320 200 252738 9 37002 MSCO 18520 200 Queries: Algo strategies Exchange simulation for backtesting 0 0 252792 8 37006 GSCO 18610 500 253057 4 37055 NITE 18300 300 0 0 252754 3 37001 MSCO 18270 3000 253123 0 37075 FBCO 18640 1000 0 0 252832 1 37008 GSCO 18250 500 252889 4 37020 NITE 18680 500 0 0 8 252903 2 37011 FBCO 18190 600 252975 8 37032 MASH 18790 700 0 0

  9. High-Frequency Orderbook Trading SPEs are unsuitable for general, dynamic, updates [Ahmad&Koch 09, Ghanem et al. 10] Windows couple scoping and manipulation [Botan et al. 10] t = timestamp oid = order id bid = broker id p = price v = volume No window can enclose all updates Bid order book (buyers) Ask order book (sellers) t oi d bi d p v t oi d bi d p v No snapshot queries Lagged OLAP and nested aggregates 252603 5 36721 NITE 18400 252634 5 36750 GSCO 18400 1000 1000 0 0 Electronic exchange (e.g., NYSE, NASDAQ) SPEs rely on main- memory DBMS techniques 252669 0 36909 MASH 18320 200 252738 9 37002 MSCO 18520 200 0 0 252792 8 37006 GSCO 18610 500 253057 4 37055 NITE 18300 300 0 0 252754 3 37001 MSCO 18270 3000 253123 0 37075 FBCO 18640 1000 0 0 252832 1 37008 GSCO 18250 500 252889 4 37020 NITE 18680 500 0 0 9 252903 2 37011 FBCO 18190 600 252975 8 37032 MASH 18790 700 0 0

  10. High-Frequency Orderbook Trading SPEs are unsuitable for general, dynamic, updates [Ahmad&Koch 09, Ghanem et al. 10] Windows couple scoping and manipulation [Botan et al. 10] Traits of DDMS apps: Access to the data is primarily by monitoring views and simple computations on them. The data store is rarely directly accessed. t = timestamp oid = order id bid = broker id p = price v = volume Bid order book (buyers) Ask order book (sellers) t oi d bi d p v t oi d bi d p v 252603 5 36721 NITE 18400 252634 5 36750 GSCO 18400 1000 1000 0 0 Electronic exchange (e.g., NYSE, NASDAQ) 252669 0 36909 MASH 18320 200 252738 9 37002 MSCO 18520 200 Materialized view maintenance dominates ad-hoc querying. 0 0 252792 8 37006 GSCO 18610 500 253057 4 37055 NITE 18300 300 0 0 252754 3 37001 MSCO 18270 3000 253123 0 37075 FBCO 18640 1000 There is usually no feedback loop. 0 0 252832 1 37008 GSCO 18250 500 252889 4 37020 NITE 18680 500 0 0 10 700 252903 2 37011 FBCO 18190 600 252975 8 37032 MASH 18790 0 0

  11. Related Work: Incremental View Maintenance holds -14500 A view can be thought of as: a continuous query over an arbitrary database, independent of how that database is manipulated Incremental view maintenance Mechanism: delta queries Delta queries are derived by a rewrite Deltas queries, just like any other query, are processed from scratch Delta query 8 4 2 10 100 0 Base tables t oid bid p v t oi d bi d p v Well-studied from 1980s-today: 4 3 1 100 50 8 4 2 100 10 0 [Roussopoulos, 1991; Yan and Larson, 1995; Colby et al, 1996; Kotidis,Roussopoulos, 2001; Zhou et al, 2007] 2 1 1 120 50 6 5 2 150 30 3 2 1 80 10 0 11 Delta tables 7 6 1 80 50

  12. This Talk: Agile Views Observation: delta queries are still queries, and amenable to further (delta) rewrites, leading to higher-level deltas Proposal: agile views, for as-incremental-as-possible evaluation Materialize higher-level deltas, maintain and reuse them Provide as lightweight data structures to apps, for in-process analytics Our contributions: a recursive delta compilation algorithm yielding a fully incremental query processor k-level delta is simpler (i.e. has lower degree) than (k-1)-level delta Each higher-level delta is materialized as a in-memory map, k-level map is used to maintain (k-1)-level map

  13. Delta Queries We consider sum aggregate queries Schema: R(a,b), S(c,d) Delta queries can be represented as parameterized SQL Includes parameters or input variables Produces group-bys or output variables, and a value Similar to bound-free annotations in Datalog, and binding patterns for data integration q[][c] = select c, sum(a*e) from R, S where b < d group by c on insert S(@c,@d): qS[][c] = S(q) = select c, sum(a*@e) from R, values(@d,@e) where b < @d = select c, @e*v from (select c, sum(a) as v from R where b < @d group by c) Delta transform: standard IVM method, with nesting mS[d][c]

  14. Incremental Trigger Programs Our query language can compose parameterized SQL, propagate parameters, and define triggers Triggers are a sequence of statements Each statement is a map update by a (materialized) delta query Statements may include loops, which are embarrassingly parallelizable Schema: Customer(custkey, name, nationkey, acctbal), Lineitem(ordkey,extprice), Orders(custkey,ordkey,sprior) on_insert_lineitem(ok,ep) { } on_insert_order(ck,ok,sp) { m[][ok, sp] += mco[][ok] * mo[][ck]; ml[][ok, sp] += mo[][ck]; mc[][ck, ok, sp] += mco[][ok]; mcl[][ck, ok, sp] += 1; } select l.ordkey, o.sprior, sum(l.extprice) from Orders o, Lineitem l where l.ordkey = o.ordkey group by l.ordkey, o.sprior

  15. Compiling to Incremental Programs q = select l.ordkey, o.sprior, sum(l.extprice) from Lineitem l, Orders o where l.ordkey = o.ordkey group by l.ordkey, o.sprior; Query Map signature q m[][ordkey,sprior]

  16. Compiling to Incremental Programs q = select l.ordkey, o.sprior, sum(l.extprice) from Lineitem l, Orders o where l.ordkey = o.ordkey group by l.ordkey, o.sprior; Query Map signature q m[][ordkey,sprior] Level 1: +L(@ok,@ep) : m[][ordkey,sprior] += L(q); +O(@ck2,@ok2,@sp) : m[][ordkey,sprior] += O(q);

  17. Compiling to Incremental Programs q = select l.ordkey, o.sprior, sum(l.extprice) from Lineitem l, Orders o where l.ordkey = o.ordkey group by l.ordkey, o.sprior; Query Map signature q m[][ordkey,sprior] Level 1: +L(@ok,@ep): m[][ordkey,sprior] += select l.ordkey, o.sprior, sum(l.extprice) from values(@ok,@ep) as l(ordkey,extprice), Orders o where l.ordkey = o.ordkey group by l.ordkey, o.sprior

  18. Compiling to Incremental Programs q = select l.ordkey, o.sprior, sum(l.extprice) from Lineitem l, Orders o where l.ordkey = o.ordkey group by l.ordkey, o.sprior; Query Map signature q m[][ordkey,sprior] ql = L(q) ml[][ordkey,sprior] Level 1: +L(@ok,@ep): m[][ordkey,sprior] += select l.ordkey, o.sprior, sum(l.extprice) from values(@ok,@ep) as l(ordkey,extprice), Orders o where l.ordkey = o.ordkey group by l.ordkey, o.sprior simplify => select @ok, sprior, @ep*v from (select o.sprior, sum(1) as v from Orders o where @ok = o.ordkey group by o.sprior) ml[][@ok,sprior]

  19. Compiling to Incremental Programs
      q = select l.ordkey, o.sprior, sum(l.extprice)
          from Lineitem l, Orders o
          where l.ordkey = o.ordkey
          group by l.ordkey, o.sprior;
  Query          Map signature
  q              m[][ordkey,sprior]
  ql = ΔL(q)     ml[][ordkey,sprior]
  Level 1:
      +L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok, sprior];

  20. Compiling to Incremental Programs
      q = select l.ordkey, o.sprior, sum(l.extprice)
          from Lineitem l, Orders o
          where l.ordkey = o.ordkey
          group by l.ordkey, o.sprior;
  Query          Map signature
  q              m[][ordkey,sprior]
  ql = ΔL(q)     ml[][ordkey,sprior]
  Level 1:
      +L(@ok,@ep):
        foreach sprior in ml:
          m[][@ok,sprior] += @ep * ml[][@ok, sprior];
  In this talk, we'll use our syntax with implicit for loops everywhere:
      +L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok, sprior];
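
The implicit loop can be made explicit in a few lines of Python. This is only a sketch: plain dictionaries stand in for the maps m and ml, the sample values are made up, and the linear scan over ml is an assumption for readability (a real engine would presumably index ml by ordkey).

```python
from collections import defaultdict

# Maps keyed by (ordkey, sprior); the names m and ml follow the slide.
m = defaultdict(float)
ml = defaultdict(int)

def on_insert_lineitem(ok, ep):
    """+L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok,sprior], loop made explicit."""
    # The free variable sprior ranges over the entries of ml whose ordkey is @ok.
    for (ordkey, sprior), count in list(ml.items()):
        if ordkey == ok:
            m[(ok, sprior)] += ep * count

# Hypothetical sample state: order 1 appears under two priorities, order 2 under one.
ml[(1, "HIGH")] = 2
ml[(1, "LOW")] = 1
ml[(2, "HIGH")] = 5

on_insert_lineitem(1, 10.0)
```

Only the entries of m with ordkey 1 are touched, one per sprior value present in ml for that order.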

  21. Compiling to Incremental Programs
      q = select l.ordkey, o.sprior, sum(l.extprice)
          from Lineitem l, Orders o
          where l.ordkey = o.ordkey
          group by l.ordkey, o.sprior;
  Query          Map signature
  q              m[][ordkey,sprior]
  ql = ΔL(q)     ml[][ordkey,sprior]
  Level 1:
      +L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok, sprior];
  We must recursively maintain the map ml!

  22. Compiling to Incremental Programs
      ql = select o.sprior, sum(1) as v
           from Orders o
           where @ordkey = o.ordkey
           group by o.sprior
  Query          Map signature
  q              m[][ordkey,sprior]
  ql = ΔL(q)     ml[][ordkey,sprior]
  Level 1:
      +L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok, sprior];
  Level 2:
      +O(@ck2,@ok2,@sp): ml[][ordkey,sprior] += ΔO(ql);

  23. Compiling to Incremental Programs
      ql = select o.sprior, sum(1) as v
           from Orders o
           where @ordkey = o.ordkey
           group by o.sprior
  Query          Map signature
  q              m[][ordkey,sprior]
  ql = ΔL(q)     ml[][ordkey,sprior]
  Level 1:
      +L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok, sprior];
  Level 2:
      +O(@ck2,@ok2,@sp): ml[][ordkey,sprior] +=
          select o.sprior, sum(1)
          from values(@ck2,@ok2,@sp) as o(custkey,ordkey,sprior)
          where @ordkey = o.ordkey
          group by o.sprior
  =>
          select @sp, 1 where @ordkey = @ok2 group by @sp

  24. Compiling to Incremental Programs
      ql = select o.sprior, sum(1) as v
           from Orders o
           where @ordkey = o.ordkey
           group by o.sprior
  Query          Map signature
  q              m[][ordkey,sprior]
  ql = ΔL(q)     ml[][ordkey,sprior]
  Level 1:
      +L(@ok,@ep): m[][@ok,sprior] += @ep * ml[][@ok, sprior];
  Level 2:
      +O(@ck2,@ok2,@sp): ml[][@ok2, @sp] += 1;
  Independent of database! Compilation terminates.

  25. Recursive Delta Compilation
  Compilation terminates:
  For a query q: deg(Δ(q)) = max(0, deg(q)-1) (think degree of polynomial when taking derivatives).
  Deltas are eventually independent of the database, and depend only on the update, i.e. for some k: deg(Δ^k(q)) = 0.
      on_insert_lineitem(ordkey, extprice) {
        q[][ordkey, sprior] += extprice*ql[][ordkey, sprior];
        qo[][ordkey] += extprice;
      }
      on_insert_order(custkey, ordkey, sprior) {
        q[][ordkey, sprior] += qo[][ordkey];
        ql[][ordkey, sprior] += 1;
      }
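
The two triggers above can be checked against a from-scratch evaluation of the join. The following is a minimal sketch, not DBToaster output: Python dictionaries stand in for the maps q, ql and qo, the event stream is invented, and recompute and nonzero are hypothetical helpers added only for the comparison.

```python
from collections import defaultdict

q = defaultdict(float)   # q[ordkey, sprior]:  sum(extprice) over Lineitem join Orders
ql = defaultdict(int)    # ql[ordkey, sprior]: number of Orders rows (delta of q w.r.t. Lineitem)
qo = defaultdict(float)  # qo[ordkey]:         sum(extprice) of Lineitem rows (delta w.r.t. Orders)

def on_insert_lineitem(ordkey, extprice):
    # Implicit loop over sprior made explicit; reads ql, never the base tables.
    for (ok, sprior), cnt in list(ql.items()):
        if ok == ordkey:
            q[(ordkey, sprior)] += extprice * cnt
    qo[ordkey] += extprice

def on_insert_order(custkey, ordkey, sprior):
    q[(ordkey, sprior)] += qo[ordkey]
    ql[(ordkey, sprior)] += 1  # degree 0: depends only on the update

def recompute(lineitems, orders):
    """Hypothetical reference: evaluate the join query from scratch."""
    out = defaultdict(float)
    for l_ok, ep in lineitems:
        for _ck, o_ok, sp in orders:
            if l_ok == o_ok:
                out[(l_ok, sp)] += ep
    return dict(out)

def nonzero(d):
    """Drop zero entries that defaultdict lookups may have created."""
    return {k: v for k, v in d.items() if v != 0}

# Invented interleaving of Lineitem ("L") and Orders ("O") inserts.
stream = [("L", 1, 10.0), ("O", 7, 1, "HIGH"), ("L", 1, 5.0),
          ("O", 8, 2, "LOW"), ("L", 2, 2.5), ("O", 9, 1, "HIGH")]
for kind, *args in stream:
    (on_insert_lineitem if kind == "L" else on_insert_order)(*args)

lineitems = [e[1:] for e in stream if e[0] == "L"]
orders = [e[1:] for e in stream if e[0] == "O"]
incremental = nonzero(q)
reference = recompute(lineitems, orders)
```

Neither trigger scans Lineitem or Orders; each update costs work proportional to the map entries it touches.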

  26. Recursive Delta Compilation
      select l.ordkey, o.sprior, sum(l.extprice)
      from Customer c, Orders o, Lineitem l
      where c.custkey = o.custkey and l.ordkey = o.ordkey
      group by l.ordkey, o.sprior;

      on_insert_customer(ck,nm,nk,bal) :
        m[][ordkey, sprior] += mc[][ck, ordkey, sprior];
        ml[][ordkey, sprior] += mcl[][ck, ordkey, sprior];
        mo[][ck] += 1;
      on_insert_lineitem(ok,ep) :
        m[][ok, sprior] += ep * ml[][ok, sprior];
        mc[][custkey, ok, sprior] += ep * mcl[][custkey, ok, sprior];
        mco[][ok] += ep;
      on_insert_order(ck,ok,sp) :
        m[][ok, sp] += mco[][ok] * mo[][ck];
        ml[][ok, sp] += mo[][ck];
        mc[][ck, ok, sp] += mco[][ok];
        mcl[][ck, ok, sp] += 1;
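
The three-way trigger program can be exercised the same way. The sketch below is an illustration under stated assumptions: the Views class, the run and recompute helpers and the sample events are all hypothetical, and dictionary scans replace whatever indexing a real engine would use. It also replays the events in every possible order to show that, in this sketch, the final contents of m do not depend on insertion order.

```python
from collections import defaultdict
from itertools import permutations

class Views:
    """Hypothetical container for the six maps named on the slide."""
    def __init__(self):
        self.m, self.ml, self.mc = defaultdict(int), defaultdict(int), defaultdict(int)
        self.mcl, self.mo, self.mco = defaultdict(int), defaultdict(int), defaultdict(int)

    def on_insert_customer(self, ck, nm, nk, bal):
        for (c, ok, sp), v in list(self.mc.items()):
            if c == ck:
                self.m[(ok, sp)] += v
        for (c, ok, sp), v in list(self.mcl.items()):
            if c == ck:
                self.ml[(ok, sp)] += v
        self.mo[ck] += 1

    def on_insert_lineitem(self, ok, ep):
        for (o, sp), v in list(self.ml.items()):
            if o == ok:
                self.m[(ok, sp)] += ep * v
        for (c, o, sp), v in list(self.mcl.items()):
            if o == ok:
                self.mc[(c, ok, sp)] += ep * v
        self.mco[ok] += ep

    def on_insert_order(self, ck, ok, sp):
        self.m[(ok, sp)] += self.mco[ok] * self.mo[ck]
        self.ml[(ok, sp)] += self.mo[ck]
        self.mc[(ck, ok, sp)] += self.mco[ok]
        self.mcl[(ck, ok, sp)] += 1

def run(events):
    """Apply the triggers in the given order; return the nonzero entries of m."""
    v = Views()
    dispatch = {"C": v.on_insert_customer, "L": v.on_insert_lineitem, "O": v.on_insert_order}
    for kind, *args in events:
        dispatch[kind](*args)
    return {k: x for k, x in v.m.items() if x != 0}

def recompute(events):
    """Reference: evaluate the three-way join from scratch."""
    cs = [e[1:] for e in events if e[0] == "C"]
    os_ = [e[1:] for e in events if e[0] == "O"]
    ls = [e[1:] for e in events if e[0] == "L"]
    out = defaultdict(int)
    for ck, _nm, _nk, _bal in cs:
        for o_ck, o_ok, sp in os_:
            for l_ok, ep in ls:
                if ck == o_ck and l_ok == o_ok:
                    out[(l_ok, sp)] += ep
    return dict(out)

# Invented sample data; integer prices keep the comparison exact.
events = [("C", 1, "alice", 10, 0), ("O", 1, 100, "HIGH"), ("L", 100, 7),
          ("L", 100, 3), ("O", 2, 101, "LOW"), ("C", 2, "bob", 11, 0), ("L", 101, 5)]

expected = recompute(events)
order_independent = all(run(list(p)) == expected for p in permutations(events))
```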

  27. Compilation Framework (v.4)
  [Pipeline diagram. Stages: Parser, Map declaration, Trigger construction, Preaggregation, Delta rewrite, Simplification, Materialization. Annotations: unifies and eliminates variables; minimizes propagation; handles nested query deltas; initial value computation.]
  Databases as rings: incremental program construction
  Low-level engine compilation (v.3)
  Assume Q1, Q2, Q3 are SPJ-queries:
  1. Recursively polynomial form:
      (select x, sum(t) from Q1 g.b. x) * (select y, sum(t) from Q2 g.b. y) + (select x, y, sum(t) from Q3 g.b. x,y)
  2. Nested aggregate normal form:
      (select x, sum(v) from (select x,y,sum(t) as v from Q1) g.b. x) * (((select sum(t) from Q2 where f(x, ) 0) < 0)

  28. Ongoing Work: Low-Level Query Evaluation
  Challenge: create a customized, lightweight engine to evaluate delta queries.
  How do we incorporate PL and compiler research to specialize?
  Our approach: use a simple functional language for lower-level plans.
  A whole-query representation facilitating powerful transformations of physical aspects of query processing (pipelining vs. blocking, memoization, etc.).
  Our functional language: K3, supports structural recursion [Buneman et al. 95].
  Functional primitives on nested collections, forming a monad.
  Shown to be as expressive as nested relational algebra and comprehensions (e.g. LINQ, Ferry [Meijer et al. 06, Grust et al. 10]).
  Core primitives: map, flatten, aggregate (w/ group-bys), persistent collections.

  29. Structural Plans with Nested Collections
  Schema: R(a,b), S(c,d,e), T(f,g)
  Query: SELECT sum(a*g) FROM R,S,T;
  K3 unoptimized:
      aggregate(fun <<x,y>, z>. (x*y)+z, 0,
        flatten(
          map(fun <x>. map(fun <f,g>. <x,g>, T[][fg]),
            flatten(
              map(fun <a,b>. map(fun <c,d,e>. <a>, S[][cde]),
                R[][ab])))))
  [Plan diagram over T, R, S]

  30. Structural Plans with Nested Collections
  Schema: R(a,b), S(c,d,e), T(f,g)
  Query: SELECT sum(a*g) FROM R,S,T;
  K3 unoptimized:
      aggregate(fun <<x,y>, z>. (x*y)+z, 0,
        flatten(
          map(fun <x>. map(fun <f,g>. <x,g>, T[][fg]),
            flatten(
              map(fun <a,b>. map(fun <c,d,e>. <a>, S[][cde]),
                R[][ab])))))
  [Plan diagram over T, R, S]
  K3 optimized:
      aggregate(fun <<x,y>, z>. (x*y)+z, 0,
        flatten(flatten(
          map(fun <a,b>.
            map(fun <c,d,e>.
              map(fun <f,g>. <a,g>, T[][fg]),
              S[][cde]),
            R[][ab]))))
  [Plan diagram over R, S, T]
  K3 provides a clean framework for mixing tuple- and set-at-a-time processing: it eliminates temporaries (intermediates) via function composition and inlining, and abstracts:
  i) blocking vs. pipelining via the map primitive,
  ii) 1NF construction with flatten.
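
The two plans can be transliterated into Python to confirm that they compute the same aggregate. This is a rough sketch, not K3: k3_map, flatten and aggregate are hypothetical stand-ins for the K3 primitives, the relation contents are invented, and Python lists are eager, so the sketch shows the plan structure rather than pipelining behaviour.

```python
from functools import reduce

def k3_map(f, coll):
    """Stand-in for K3 map: apply f to every element of a collection."""
    return [f(x) for x in coll]

def flatten(nested):
    """Stand-in for K3 flatten: remove one level of nesting."""
    return [x for inner in nested for x in inner]

def aggregate(f, init, coll):
    """Stand-in for K3 aggregate: fold f(element, accumulator) over coll."""
    return reduce(lambda acc, x: f(x, acc), coll, init)

def step(xy, z):
    # fun <<x,y>, z>. (x*y)+z
    return xy[0] * xy[1] + z

# Invented sample relations for R(a,b), S(c,d,e), T(f,g).
R = [(1, 10), (2, 20)]
S = [(0, 0, 0), (1, 1, 1), (2, 2, 2)]
T = [(5, 3), (6, 4)]

# Unoptimized: build the flattened R x S temporary of <a> tuples, then combine with T.
rs = flatten(k3_map(lambda ab: k3_map(lambda cde: (ab[0],), S), R))
unoptimized = aggregate(step, 0, flatten(
    k3_map(lambda x: k3_map(lambda fg: (x[0], fg[1]), T), rs)))

# Optimized: one nested map over R, S and T, flattened twice; no rs temporary.
optimized = aggregate(step, 0, flatten(flatten(
    k3_map(lambda ab: k3_map(lambda cde: k3_map(lambda fg: (ab[0], fg[1]), T), S), R))))

# Direct evaluation of SELECT sum(a*g) FROM R,S,T for comparison.
direct = sum(a * g for (a, _b) in R for _s in S for (_f, g) in T)
```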

  31. Storage
  Challenge: design external map layouts, and data structures based on incremental program access workload.
      aggregate(fun <<x,y>, z>. (x*y)+z, 0,
        flatten(flatten(
          map(fun <a,b>.
            map(fun <c,d,e>.
              map(fun <f,g>. <a,g>, T[][fg]),
              S[][cde]),
            R[][ab]))))
  K3 provides a unified representation of query temporary and storage layouts.
  Abstracts implementation of nesting, e.g. via pointers in main-memory or clustering/linearization.
  Many benefits: simpler DB internals, common optimizer framework for queries and layouts.
  Other issues: language support for index representations (via variant datatypes) and denormalization.

  32. Storage
  Challenge: design external map layouts, and data structures based on incremental program access workload.
      aggregate(fun <<x,y>, z>. (x*y)+z, 0,
        flatten(flatten(
          map(fun <a,b>.
            map(fun <c,d,e>.
              map(fun <f,g>. <a,g>, T[][fg]),
              S[][cde]),
            R[][ab]))))
  K3 provides a unified representation of query temporary and storage layouts.
  Abstracts implementation of nesting, e.g. via pointers in main-memory or clustering/linearization.
  Many benefits: simpler DB internals, common optimizer framework for queries and layouts.
  Storage section annotation:
      aggregate(fun <<x,y>, z>. (x*y)+z, 0,
        flatten(flatten(
          map(fun <a,b>.
            map(fun <c,d,e>.
              map(fun <f,g>. <a,g>, T[][fg]),
              disk{ map(fun <e>. map(fun <c,d>. <c,d,e>, S.CD[][cd]), S.E[][e]) }),
            R[][ab]))))
  Other issues: language support for index representations (via variant datatypes) and denormalization.

  33. Ongoing Work: Incremental Processing in the Cloud
  DBToaster Cumulus: a distributed main-memory DDMS
  Cumulus: facilitates large, shared-nothing main-memory trigger processing.
  Exploit embarrassingly parallelizable trigger programs:
  Map entry updates in a trigger are independent.
  Subclass of our query language (no nesting, only equijoins) yields programs with NC0 parallel complexity [Koch 10].
  Consistency requirement: prior state must be consistent before processing an update.
  Naive approach: serial, atomic trigger execution.
  Our approach: bounded out-of-order processing.
  Computation performed on out-of-order events is simple, just evaluate delta queries.
  Easy to realize eventual consistency via loosely coordinated, distributed incremental processing.

  34. Incremental Processing in the Cloud
  Cumulus: facilitates large, shared-nothing main-memory trigger processing.
  Trigger:
      on_insert_customer(ck,nm,nk,bal) {
        m[][ordkey, sprior] += m_c[][ck, ordkey, sprior];
        ...
      }
  Exploit embarrassingly parallelizable trigger programs:
  Map entry updates in a trigger are independent.
  Subclass of our query language (no nesting, only equijoins) yields programs with NC0 parallel complexity [Koch 10].
  Trace (parallel execution within trigger):
      on_insert_customer(1,2,3,4):
        m[][1,2] += m_c...
        m[][3,3] += m_c...
        ...
      on_insert_customer(5,6,7,8):
        m[][1,2] += m_c...
        m[][3,3] += m_c...
        ...
  Consistency requirement: prior state must be consistent before processing an update.
  Naive approach: serial, atomic trigger execution (each trigger must consistently use prior state).
  Our approach: bounded out-of-order processing.
  Computation performed on out-of-order events is simple, just evaluate delta queries.
  Easy to realize eventual consistency via loosely coordinated, distributed incremental processing.
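
The independence of map entry updates within one trigger can be illustrated with a small Python sketch. It is not Cumulus: the partitioning by hash of the target key, the thread pool, the helper names and the contents of m_c are all assumptions made for the example. Each worker handles a disjoint set of keys and returns a private partial map, which is then merged into m.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

m = defaultdict(int)
# Invented contents for m_c[ck, ordkey, sprior].
m_c = {(1, 100, 2): 7, (1, 101, 3): 4, (2, 102, 1): 9}

def customer_deltas(ck):
    """One (key, value) pair per statement m[][ordkey,sprior] += m_c[][ck,ordkey,sprior]."""
    return [((ok, sp), v) for (c, ok, sp), v in m_c.items() if c == ck]

def apply_partition(updates):
    """Worker: accumulate its share of the updates into a private map."""
    local = defaultdict(int)
    for key, v in updates:
        local[key] += v
    return local

def on_insert_customer(ck, workers=2):
    # Partition by target key so that no two workers produce the same map entry.
    parts = [[] for _ in range(workers)]
    for key, v in customer_deltas(ck):
        parts[hash(key) % workers].append((key, v))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for local in pool.map(apply_partition, parts):
            for key, v in local.items():
                m[key] += v

on_insert_customer(1)
```

Because the partitions are key-disjoint, the merge order does not affect the result; the sketch says nothing about ordering across different triggers, which is the part the slides address with bounded out-of-order processing.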

  35. Incremental Processing in the Cloud
  Cumulus: facilitates large, shared-nothing main-memory trigger processing.
  Exploit embarrassingly parallelizable trigger programs:
  Map entry updates in a trigger are independent.
  Subclass of our query language (no nesting, only equijoins) yields programs with NC0 parallel complexity [Koch 10].
  Consistency requirement: prior state must be consistent before processing an update.
  Naive approach: serial, atomic trigger execution.
  Our approach: bounded out-of-order processing.
  Computation performed on out-of-order events is simple, just evaluate delta queries.
  Easy to realize eventual consistency via loosely coordinated, distributed incremental processing.

  36. Summary
  Many applications work with large, evolving datasets made up of both short- and long-lived items. This necessitates agile dynamic data management techniques.
  Three takeaways:
  Agile views, lightweight data structures for apps, maintained as incrementally as possible.
  QP engine based on a functional language, to mix tuple-at-a-time and set-at-a-time processing, and specialize physical representations.
  With an incremental core, we can develop loosely coordinated distributed systems mechanisms more easily.
  We're building DBToaster, a prototype DDMS, at EPFL and JHU.
  Language embedding, multicore, one-pass batch processing.

  37. Questions?
  Introducing the JHU Data Management Systems Lab
  http://damsel.cs.jhu.edu
  Current projects: DBToaster, declarative query optimization
  Collaborators: Randal Burns, Jason Eisner, Alex Szalay, Andreas Terzis
  Interests: dynamic data, declarative languages, scientific data management, data intensive computing
  JHU research highlights:
  Dyna: weighted logic programming for NLP, and generally AI algorithms
  Data-intensive architectures: DataScope (5 PB NSF MRI grant), GrayWulf (SC'08 Storage winner, 70GB/s), Amdahl blades
  Scientific datasets: PanSTARRS & SDSS, Turbulence (27TB), Life Under Your Feet, DC Genome
  New, SIGMOD Record '11 article: Scientific Data Management at the Johns Hopkins Institute for Data Intensive Engineering and Science
