Friday, March 18, 2011

The Stanford Data Stream Management System

Arvind Arasu, Brian Babcock, Shivnath Babu, John Cieslewicz, Mayur
Datar, Keith Ito, Rajeev Motwani, Utkarsh Srivastava, and Jennifer Widom
Department of Computer Science, Stanford University
Contact author:
1 Introduction
Traditional database management systems are best equipped to run one-
time queries over nite stored data sets. However, many modern applications
such as network monitoring, nancial analysis, manufacturing, and sensor net-
works require long-running, or continuous, queries over continuous unbounded
streams of data. In the STREAM project at Stanford, we are investigating
data management and query processing for this class of applications. As part
of the project we are building a general-purpose prototype Data Stream Man-
agement System (DSMS), also called STREAM, that supports a large class of
declarative continuous queries over continuous streams and traditional stored
data sets. The STREAM prototype targets environments where streams may
be rapid, stream characteristics and query loads may vary over time, and
system resources may be limited.
Building a general-purpose DSMS poses many interesting challenges:
Although we consider streams of structured data records together with
conventional stored relations, we cannot directly apply standard relational
semantics to complex continuous queries over this data. In Sect. 2, we
describe the semantics and language we have developed for continuous
queries over streams and relations.
Declarative queries must be translated into physical query plans that are
flexible enough to support optimizations and ne-grained scheduling deci-
sions. Our query plans, composed of operators, queues, and synopses, are
described in Sect. 3.
Achieving high performance requires that the DSMS exploit possibilities
for sharing state and computation within and across query plans. In ad-
dition, constraints on stream data (e.g., ordering, clustering, referential
integrity) can be inferred and used to reduce resource usage. In Sect. 4,
we describe some of these techniques.
2 Arasu et al.
Since data, system characteristics, and query load may fluctuate over the
lifetime of a single continuous query, an adaptive approach to query exe-
cution is essential for good performance. Our continuous monitoring and
reoptimization subsystem is described in Sect. 5.
When incoming data rates exceed the DSMS's ability to provide exact
results for the active queries, the system should perform load-shedding by
introducing approximations that gracefully degrade accuracy. Strategies
for approximation are discussed in Sect. 6.
Due to the long-running nature of continuous queries, DSMS administra-
tors and users require tools to monitor and manipulate query plans as they
run. This functionality is supported by our graphical interface described
in Sect. 7.
Many additional problems, including exploiting parallelism and supporting
crash recovery, are still under investigation. Future directions are discussed in
Sect. 8.
2 The CQL Continuous Query Language
For simple continuous queries over streams, it can be su cient to use a re-
lational query language such as SQL, replacing references to relations with
references to streams, and streaming new tuples in the result. However, as
continuous queries grow more complex, e.g., with the addition of aggrega-
tion, subqueries, windowing constructs, and joins of streams and relations,
the semantics of a conventional relational language applied to these queries
quickly becomes unclear [3]. To address this problem, we have de ned a for-
mal abstract semantics for continuous queries, and we have designed CQL, a
concrete declarative query language that implements the abstract semantics.
2.1 Abstract Semantics
The abstract semantics is based on two data types, streams and relations,
which are de ned using a discrete, ordered time domain 􀀀:
A stream S is an unbounded bag (multiset) of pairs hs; i, where s is a
tuple and 2 􀀀 is the timestamp that denotes the logical arrival time of
tuple s on stream S.
A relation R is a time-varying bag of tuples. The bag of tuples at time
2 􀀀 is denoted R( ), and we call R( ) an instantaneous relation. Note
that our de nition of a relation di ers from the traditional one which has
no built-in notion of time.
The abstract semantics uses three classes of operators over streams and
STREAM: The Stanford Data Stream Management System 3
Fig. 1. Data types and operator classes in abstract semantics.
A relation-to-relation operator takes one or more relations as input and
produces a relation as output.
A stream-to-relation operator takes a stream as input and produces a
relation as output.
A relation-to-stream operator takes a relation as input and produces a
stream as output.
Stream-to-stream operators are absent|they are composed from operators of
the above three classes. These three classes are \black box" components of our
abstract semantics: the semantics does not depend on the exact operators in
these classes, but only on generic properties of each class. Figure 1 summarizes
our data types and operator classes.
A continuous query Q is a tree of operators belonging to the above classes.
The inputs of Q are the streams and relations that are input to the leaf
operators, and the output of Q is the output of the root operator. The output
is either a stream or a relation, depending on the class of the root operator.
At time , an operator of Q logically depends on its inputs up to : tuples of
Si with timestamps for each input stream Si, and instantaneous relations
Rj( 0), 0 , for each input relation Rj . The operator produces new outputs
corresponding to : tuples of S with timestamp if the output is a stream S,
or instantaneous relation R( ) if the output is a relation R. The behavior of
query Q is derived from the behavior of its operators in the usual inductive
2.2 Concrete Language
Our concrete declarative query language, CQL (for Continuous Query Lan-
guage), is de ned by instantiating the operators of our abstract semantics.
Syntactically, CQL is a relatively minor extension to SQL.
Relation-to-Relation Operators in CQL
CQL uses SQL constructs to express its relation-to-relation operators, and
much of the data manipulation in a typical CQL query is performed using
these constructs, exploiting the rich expressive power of SQL.
4 Arasu et al.
Stream-to-Relation Operators in CQL
The stream-to-relation operators in CQL are based on the concept of a sliding
window [5] over a stream, and are expressed using a window speci cation
language derived from SQL-99:
A tuple-based sliding window on a stream S takes an integer N > 0 as
a parameter and produces a relation R. At time , R( ) contains the N
tuples of S with the largest timestamps . It is speci ed by following
S with \[Rows N]." As a special case, \[Rows Unbounded]" denotes the
append-only window \[Rows 1]."
A time-based sliding window on a stream S takes a time interval ! as a
parameter and produces a relation R. At time , R( ) contains all tuples
of S with timestamps between − ! and . It is speci ed by following S
with \[Range !]." As a special case, \[Now]" denotes the window with
! = 0.
A partitioned sliding window on a stream S takes an integer N and a set of
attributes fA1; : : :;Akg of S as parameters, and is speci ed by following S
with \[Partition By A1,...,Ak Rows N]." It logically partitions S into
di erent substreams based on equality of attributes A1; : : : ; Ak, computes
a tuple-based sliding window of size N independently on each substream,
then takes the union of these windows to produce the output relation.
Relation-to-Stream Operators in CQL
CQL has three relation-to-stream operators: Istream, Dstream, and Rstream.
Istream (for \insert stream") applied to a relation R contains hs; i whenever
tuple s is in R( ) − R( − 1), i.e., whenever s is inserted into R at time .
Dstream (for \delete stream") applied to a relation R contains hs; i whenever
tuple s is in R( − 1) − R( ), i.e., whenever s is deleted from R at time .
Rstream (for \relation stream") applied to a relation R contains the hs; i
whenever tuple s is in R( ), i.e., every current tuple in R is streamed at every
time instant.
Example CQL Queries
Example 1. The following continuous query lters a stream S:
Select Istream(*) From S [Rows Unbounded] Where S.A > 10
Stream S is converted into a relation by applying an unbounded (append-only)
window. The relation-to-relation lter \S.A > 10" acts over this relation, and
the inserts to the ltered relation are streamed as the result. CQL includes a
number of syntactic shortcuts and defaults for convenience, which permit the
above query to be rewritten in the following more intuitive form:
STREAM: The Stanford Data Stream Management System 5
Select * From S Where S.A > 10
Example 2. The following continuous query is a windowed join of two streams
S1 and S2:
Select * From S1 [Rows 1000], S2 [Range 2 Minutes]
Where S1.A = S2.A And S1.A > 10
The answer to this query is a relation. At any given time, the answer relation
contains the join (on attribute A with A > 10) of the last 1000 tuples of S1
with the tuples of S2 that have arrived in previous 2 minutes. If we prefer
instead to produce a stream containing new A values as they appear in the
join, we can write \Istream(S1.A)" instead of \*" in the Select clause.
Example 3. The following continuous query probes a stored table R based on
each tuple in stream S and streams the result.
Select Rstream(S.A, R.B) From S [Now], R Where S.A = R.A
Complete details of CQL including syntax, semantic foundations, syntactic
shortcuts and defaults, equivalences, and a comparison against related con-
tinuous query languages are given in [3].
3 Query Plans and Execution
When a continuous query speci ed in CQL is registered with the STREAM
system, a query plan is compiled from it. Query plans are composed of op-
erators, which perform the actual processing, queues, which bu er tuples (or
references to tuples) as they move between operators, and synopses, which
store operator state.
3.1 Operators
Recall from Sect. 2 that there are two fundamental data types in our query
language: streams, de ned as bags of tuple-timestamp pairs, and relations,
de ned as time-varying bags of tuples. We unify these two types in our imple-
mentation as sequences of timestamped tuples, where each tuple additionally
is flagged as either an insertion (+) or deletion (−). We refer to the tuple-
timestamp-flag triples as elements.
Streams only include + elements, while relations may include both +
and − elements to capture the changing relation state over time. Queues log-
ically contain sequences of elements representing either streams or relations.
Each query plan operator reads from one or more input queues, processes
the input based on its semantics, and writes any output to an output queue.
Individual operators may materialize their relational inputs in synopses (see
Sect. 3.3) if such state is useful.
6 Arasu et al.
Name Operator Type Description
select relation-to-relation Filters elements based on predicate(s)
project relation-to-relation Duplicate-preserving projection
binary-join relation-to-relation Joins two input relations
mjoin relation-to-relation Multiway join from [22]
union relation-to-relation Bag union
except relation-to-relation Bag di erence
intersect relation-to-relation Bag intersection
antisemijoin relation-to-relation Antisemijoin of two input relations
aggregate relation-to-relation Performs grouping and aggregation
duplicate-eliminate relation-to-relation Performs duplicate elimination
seq-window stream-to-relation Implements time-based, tuple-based,
and partitioned windows
i-stream relation-to-stream Implements Istream semantics
d-stream relation-to-stream Implements Dstream semantics
r-stream relation-to-stream Implements Rstream semantics
Table 1. Operators used in STREAM query plans.
The operators in the STREAM system that implement the CQL language
are summarized in Table 1. In addition, there are several system operators
to handle \housekeeping" tasks such as marshaling input and output and
connecting query plans together. During execution, operators are scheduled
individually, allowing for ne-grained control over queue sizes and query la-
tencies. Scheduling algorithms are discussed later in Sect. 4.3.
3.2 Queues
A queue in a query plan connects its \producing" plan operator OP to its
\consuming" operator OC. At any time a queue contains a (possibly empty)
collection of elements representing a portion of a stream or relation. The
elements that OP produces are inserted into the queue and bu ered there
until they are processed by OC.
Many of the operators in our system require that elements on their input
queues be read in nondecreasing timestamp order. Consider, for example, a
window operator OW on a stream S as described in Sect. 2.2. If OW receives
an element hs; ;+i and its input queue is guaranteed to be in nondecreasing
timestamp order, then OW knows it has received all elements with timestamp
0 < , and it can construct the state of the window at time − 1. (If times-
tamps are known to be unique it can construct the state at time .) If, on
the other hand, OW does not have this guarantee, it can never be sure it has
enough information to construct any window correctly. Thus, we require all
queues to enforce nondecreasing timestamps.
Mechanisms for bu ering tuples and generating heartbeats to ensure non-
decreasing timestamps, without sacri cing correctness or completeness, are
discussed in detail in [17].
STREAM: The Stanford Data Stream Management System 7
3.3 Synopses
Logically, a synopsis belongs to a speci c plan operator, storing state that may
be required for future evaluation of that operator. (In our implementation,
synopses are shared among operators whenever possible, as described later
in Sect. 4.1.) For example, to perform a windowed join of two streams, the
join operator must be able to probe all tuples in the current window on each
input stream. Thus, the join operator maintains one synopsis (e.g., a hash
table) for each of its inputs. On the other hand, operators such as selection
and duplicate-preserving union do not require any synopses.
The most common use of a synopsis in our system is to materialize the
current state of a (derived) relation, such as the contents of a sliding win-
dow or the relation produced by a subquery. Synopses also may be used to
store a summary of the tuples in a stream or relation for approximate query
answering, as discussed later in Sect. 6.2.
Performance requirements often dictate that synopses (and queues) must
be kept in memory, and we tacitly make that assumption throughout this
chapter. Our system does support overflow of these structures to disk, al-
though currently it does not implement sophisticated algorithms for minimiz-
ing I/O when overflow occurs, e.g., [20].
3.4 Example Query Plan
When a CQL query is registered, STREAM constructs a query plan: a tree
of operators, connected by queues, with synopses attached to operators as
needed. As a simple example, a plan for the query from Example 2 is shown
in Figure 2. The original query is repeated here for convenience:
Select * From S1 [Rows 1000], S2 [Range 2 Minutes]
Where S1.A = S2.A And S1.A > 10
There are four operators in the example plan: a select, a binary-join,
and one instance of seq-window for each input stream. Queues q1 and q2
hold the input stream elements which could, for example, have been re-
ceived over the network and placed into queues by a system operator (not de-
picted). Queue q3, which is the output queue of the (stream-to-relation) opera-
tor seq-window, holds elements representing the relation \S1 [Rows 1000]."
Queue q4 holds elements for \S2 [Range 2 Minutes]." Queue q5 holds ele-
ments for the joined relation \S1 [Rows 1000] ./ S2 [Range 2 Minutes],"
and from these elements, Queue q6 holds the elements passing the select op-
erator. q6 may lead to an output operator sending elements to the application,
or to another query plan operator within the system.
The select operator can be pushed down into one or both branches be-
low the binary-join operator, and also below the seq-window operator on
S2. However, tuple-based windows do not commute with lter conditions,
8 Arasu et al.
Fig. 2. A simple query plan illustrating operators, queues, and synopses.
and therefore the select operator cannot be pushed below the seq-window
operator on S1.
The plan has four synopses, synopsis1{synopsis4. Each seq-window op-
erator maintains a synopsis so that it can generate \−" elements when tuples
expire from the sliding window. The binary-join operator maintains a syn-
opsis materializing each of its relational inputs for use in performing joins with
tuples on the opposite input, as described earlier. Since the select operator
does not need to maintain any state, it does not have a synopsis.
Note that the contents of synopsis1 and synopsis3 are similar (as are the
contents of synopsis2 and synopsis4), since both maintain a materialization
of the same window, but at slightly di erent positions of stream S1. Sect. 4.1
discusses how we eliminate such redundancy.
3.5 Query Plan Execution
When a query plan is executed, a scheduler selects operators in the plan to
execute in turn. The semantics of each operator depends only on the times-
tamps of the elements it processes, not on system or \wall-clock" time. Thus,
the order of execution has no e ect on the data in the query result, although it
can a ect other properties such as latency and resource utilization. Scheduling
is discussed further in Sect. 4.3.
Continuing with our example from the previous section, the seq-window
operator on S1, on being scheduled, reads stream elements from q1. Suppose
it reads element hs; ;+i. It inserts tuple s into synopsis1, and if the window
STREAM: The Stanford Data Stream Management System 9
is full (i.e., the synopsis already contains 1000 tuples), it removes the earliest
tuple s0 in the synopsis. It then writes output elements into q3: the element
hs; ;+i to reflect the addition of s to the window, and the element hs0; ; −i
to reflect the deletion of s0 as it exits the window. Both of these events oc-
cur logically at the same time instant . The other seq-window operator is
When scheduled, the binary-join operator reads the earliest element
across its two input queues. If it reads an element hs; ;+i from q3, then it
inserts s into synopsis3 and joins s with the contents of synopsis4, generating
output elements hs t; ;+i for each matching tuple t in synopsis4. Similarly,
if the binary-join operator reads an element hs; ; −i from q3, it generates
hs t; ; −i for each matching tuple t in synopsis4. A symmetric process occurs
for elements read from q4. In order to ensure that the timestamps of its output
elements are nondecreasing, the binary-join operator must process its input
elements in nondecreasing timestamp order across both inputs.
Since the select operator is stateless, it simply dequeues elements from
q5, tests the tuple against its selection predicate, and enqueues the identical
element into q6 if the test passes, discarding it otherwise.
4 Performance Issues
In the previous section we introduced the basic architecture of our query pro-
cessing engine. However, simply generating the straightforward query plans
and executing them as described can be very ine cient. In this section, we dis-
cuss ways in which we improve the performance of our system by eliminating
data redundancy (Sect. 4.1), selectively discarding data that will not be used
(Sect. 4.2), and scheduling operators to most e ciently reduce intermediate
state (Sect. 4.3).
4.1 Synopsis Sharing
In Sect. 3.4, we observed that multiple synopses within a single query plan may
materialize nearly identical relations. In Figure 2, synopsis1 and synopsis3
are an example of such a pair.
We eliminate this redundancy by replacing the two synopses with light-
weight stubs, and a single store to hold the actual tuples. These stubs imple-
ment the same interfaces as non-shared synopses, so operators can be oblivi-
ous to the details of sharing. As a result, synopsis sharing can be enabled or
disabled on the fly.
Since operators are scheduled independently, it is likely that operators
sharing a single synopsis store will require slightly di erent views of the data.
For example, if queue q3 in Figure 2 contains 10 elements, then synopsis3
will not reflect these changes (since the binary-join operator has not yet
processed them), although synopsis1 will. When synopses are shared, logic in
10 Arasu et al.
Fig. 3. A query plan illustrating synopsis sharing.
the store tracks the progress of each stub, and presents the appropriate view
(subset of tuples) to each of the stubs. Clearly the store must contain the
union of its corresponding stubs: A tuple is inserted into the store as soon as
it is inserted by any one of the stubs, and it is removed only when it has been
removed from all of the stubs.
To further decrease state redundancy, multiple query plans involving sim-
ilar intermediate relations can share synopses as well. For example, suppose
the following query is registered in addition to the query in Sect. 3.4:
Select A, Max(B) From S1 [Rows 200] Group By A
Since sliding windows are contiguous in our system, the window on S1 in this
query is a subset of the window on S1 in the other query. Thus, the same
data store can be used to materialize both windows. The combination of the
two query plans with both types of sharing is illustrated in Figure 3.
4.2 Exploiting Constraints
Streams may exhibit certain data or arrival patterns that can be exploited
to reduce run-time synopsis sizes. Such constraints can either be speci ed
explicitly at stream-registration time, or inferred by gathering statistics over
time [6]. (An alternate and more dynamic technique is for the streams to
contain punctuations, which specify run-time constraints that also can be
used to reduce resource requirements [21].)
STREAM: The Stanford Data Stream Management System 11
As a simple example, consider a continuous query that joins a stream
Orders with a stream Ful llments based on attributes orderID and itemID,
perhaps to monitor average ful llment delays. In the general case, answering
this query precisely requires synopses of unbounded size [2]. However, if we
know that all elements for a given orderID and itemID arrive on Orders before
the corresponding elements arrive on Ful llments, then we need not maintain
a join synopsis for the Ful llments operand at all. Furthermore, if Ful llments
elements arrive clustered by orderID, then we need only save Orders tuples
for a given orderID until the next orderID is seen.
We have identi ed several types of useful constraints over data streams.
E ective optimizations can be made even when the constraints are not strictly
met by de ning an adherence parameter, k, that captures how closely a given
stream or pair of streams adheres to a constraint of that type. We refer to
these as k-constraints:
A referential integrity k-constraint on a many-one join between streams
de nes a bound k on the delay between the arrival of a tuple on the \many"
stream and the arrival of its joining \one" tuple on the other stream.
An ordered-arrival k-constraint on a stream attribute S:A de nes a bound
k on the amount of reordering in values of S:A. Speci cally, given any
tuple s in stream S, for all tuples s0 that arrive at least k + 1 elements
after s, it must be true that s0:A s:A.
A clustered-arrival k-constraint on a stream attribute S:A de nes a bound
k on the distance between any two elements that have the same value of
We have developed query plan construction and execution algorithms that
take stream constraints into account in order to reduce synopsis sizes at query
operators by discarding unnecessary state [9]. The smaller the value of k for
each constraint, the more state that can be discarded. Furthermore, if an
assumed k-constraint is not satis ed by the data, our algorithm produces an
approximate answer whose error is proportional to the degree of deviation of
the data from the constraint.
4.3 Operator Scheduling
An operator consumes elements from its input queues and produces elements
on its output queue. Thus, the global operator scheduling policy can have a
large e ect on memory utilization, particularly with bursty input streams.
Consider the following simple example. Suppose we have a query plan
with two operators, O1 followed by O2. Assume that O1 takes one time unit
to process a batch of n elements, and it produces 0:2n output elements per
input batch (i.e., its selectivity is 0.2). Further, assume that O2 takes one time
unit to operate on 0:2n elements, and it sends its output out of the system.
(As far as the system is concerned, O2 produces no elements, and therefore
12 Arasu et al.
its selectivity is 0.) Consider the following bursty arrival pattern: n elements
arrive at every time instant from t = 0 to t = 6, then no elements arrive from
time t = 7 through t = 13.
Under this scenario, consider the following scheduling strategies:
FIFO scheduling: When batches of n elements have been accumulated,
they are passed through both operators in two consecutive time units,
during which no other element is processed.
Greedy scheduling: At any time instant, if there is a batch of n elements
bu ered before O1, it is processed in one time unit. Otherwise, if there
are more than 0:2n elements bu ered before O2, then 0:2n elements are
processed using one time unit. This strategy is \greedy" since it gives
preference to the operator that has the greatest rate of reduction in total
queue size per unit time.
The following table shows the expected total queue size for each strategy,
where each table entry is a multiplier for n.
Time 0 1 2 3 4 5 6 Avg
FIFO scheduling 1.0 1.2 2.0 2.2 3.0 3.2 4.0 2.4
Greedy scheduling 1.0 1.2 1.4 1.6 1.8 2.0 2.2 1.6
After time t = 6, input queue sizes for both strategies decline until they reach
0 after time t = 13. The greedy strategy performs better because it runs O1
whenever it has input, reducing queue queue size by 0:8n elements each time
step, while the FIFO strategy alternates between executing O1 and O2.
However, the greedy algorithm has its shortcomings. Consider a plan with
operators O1, O2, and O3. O1 produces 0:9n elements per n input elements in
one time unit, O2 processes 0:9n elements in one time unit without changing
the input size (i.e., it has selectivity 1), and O3 processes 0:9n elements in
one time unit and sends its output out of the system (i.e., it has selectivity
0). Clearly, the greedy algorithm will prioritize O3 rst, followed by O1, and
then O2. If we consider the arrival pattern in the previous example then our
total queue size is as follows (again as multipliers for n):
Time 0 1 2 3 4 5 6 Avg
FIFO scheduling 1.0 1.9 2.9 3.0 3.9 4.9 5.0 3.2
Greedy scheduling 1.0 1.9 2.8 3.7 4.6 5.5 6.4 3.7
In this case, the FIFO algorithm is better. Under the greedy strategy,
although O3 has highest priority, sometimes it is \blocked" from running
because it is preceded by O2, the operator with the lowest priority. If O1, O2
and O3 are viewed as a single block, then together they reduce n elements
to zero elements over three units of time, for an average reduction of 0:33n
elements per unit time|better than the reduction rate of 0:1n elements O1
provides. Since the greedy algorithm considers individual operators only, it
does not take advantage of this fact.
STREAM: The Stanford Data Stream Management System 13
Fig. 4. Adaptive query processing.
This observation forms the basis of our chain scheduling algorithm [4]. Our
algorithm forms blocks (\chains") of operators as follows: Start by marking
the rst operator in the plan as the \current" operator. Next, nd the block
of consecutive operators starting at the \current" operator that maximizes
the reduction in total queue size per unit time. Mark the rst operator fol-
lowing this block as the \current" operator and repeat the previous step until
all operators have been assigned to chains. Chains are scheduled according
to the greedy algorithm, but within a chain, execution proceeds in FIFO or-
der. In terms of overall memory usage, this strategy is provably close to the
optimal \clairvoyant" scheduling strategy, i.e., the optimal strategy based on
knowledge of future input [4].
5 Adaptivity
In long-running stream applications, data and arrival characteristics of streams
may vary signi cantly over time [13]. Query loads and system conditions
may change as well. Without an adaptive approach to query processing, per-
formance may drop drastically over time as the environment changes. The
STREAM system includes a monitoring and adaptive query processing in-
frastructure called StreaMon [10].
StreaMon has three components as shown in Figure 4(a): an Executor,
which runs query plans to produce results, a Pro ler, which collects and
maintains statistics about stream and plan characteristics, and a Reoptimizer,
which ensures that the plans and memory structures are the most e cient for
current characteristics. In many cases, we combine the pro ler and executor
to reduce the monitoring overhead.
The Pro ler and Reoptimizer are essential for adaptivity, but they compete
for resources with the Executor. We have identi ed a clear three-way tradeo
among run-time overhead, speed of adaptivity, and provable convergence to
14 Arasu et al.
good strategies if conditions stabilize. StreaMon supports multiple adaptive
algorithms that lie at di erent points along this tradeo spectrum.
StreaMon can detect useful k-constraints (recall Sect. 4.2) in streams and
exploit them to reduce memory requirements for many continuous queries.
In addition, it can adaptively adjust the adherence parameter k based on the
actual data in the streams. Figure 4(b) shows the portions of StreaMon's Pro-
ler and Reoptimizer that handle k-constraints, referred to as k-Mon. When
a query is registered, the optimizer noti es the pro ler of potentially useful
constraints. As the executor runs the query, the pro ler monitors the input
streams continuously and informs the reoptimizer whenever it detects a change
in a k value for any of these constraints. The reoptimizer component adapts
to these changes by adding or dropping constraints used by the executor and
adjusting k values used for memory allocation.
StreaMon also implements an algorithm called Adaptive Greedy (or A-
Greedy) [7], which maintains join orders adaptively for pipelined multiway
stream joins, also known as MJoins [22]. Figure 4(c) shows the portions
of StreaMon's Pro ler and Reoptimizer that comprise the A-Greedy algo-
rithm. Using A-Greedy, StreaMon monitors conditional selectivities and or-
ders stream joins to minimize overall work in current conditions. In addition,
StreaMon detects when changes in conditions may have rendered current or-
derings suboptimal, and reorders in those cases. In stable conditions, the or-
derings converged on by the A-Greedy algorithm are equivalent to those se-
lected by a static Greedy algorithm that is provably within a cost factor < 4
of optimal. In practice, the Greedy algorithm, and therefore A-Greedy, nearly
always nds the optimal orderings.
In addition to adaptive join ordering, we use StreaMon to adaptively add
and remove subresult caches in stream join plans, to avoid recomputation of
intermediate results [8]. StreaMon monitors costs and bene ts of candidate
caches, selects caches to use, allocates memory to caches, and adapts over the
entire spectrum between stateless MJoins and cache-rich join trees, as stream
and system conditions change.
Currently we are in the process of applying the StreaMon approach to
make even more aspects of the STREAM system adaptive, including sharing
of synopses and subplans, and operator scheduling.
6 Approximation
In many applications data streams can be bursty, with unpredictable peaks
during which the load may exceed available system resources, especially if nu-
merous complex queries have been registered. Fortunately, for many stream
applications (e.g., in many monitoring tasks), it is acceptable to degrade ac-
curacy gracefully by providing approximate answers during load spikes [18].
There are two primary ways in which a DSMS may be resource-limited:
STREAM: The Stanford Data Stream Management System 15
CPU-limited (Sect. 6.1) { The data arrival rate may be so high that there
is insu cient CPU time to process each stream element. In this case, the
system may approximate by dropping elements before they are processed.
Memory-limited (Sect. 6.2) { The total state required for all registered
queries may exceed available memory. In this case, the system may selec-
tively retain some state, discarding the rest.
6.1 CPU-Limited Approximation
CPU usage can be reduced by load-shedding|dropping elements from query
plans and saving the CPU time that would be required to process them to
completion. We implement load-shedding by introducing sampling operators
that probabilistically drop stream elements as they are input to the query
The time-accuracy tradeo s for sampling are more understandable for
some query plans than others. For example, if we know a few basic statistics
on the distribution of values in our streams, probabilistic guarantees on the ac-
curacy of sliding-window aggregation queries for a given sampling rate can be
derived mathematically, as we will show in below. However, in more complex
queries|ones involving joins, for example|the error introduced by sampling
is less clear and the choice of error metric may be application-dependent.
Suppose we have a set of sliding-window aggregation queries over the input
streams. A simple example is:
Select Avg(Temp) From SensorReadings [Range 5 Minutes]
If we have many such queries in a CPU-limited setting, our goal is to sample
the inputs so as to minimize the maximum relative error across all queries.
(As an extension, we can weight the relative errors to provide \quality-of-
service" distinctions.) It follows that we should select sampling rates such
that the relative error is the same for all queries. Assume that for a given
query Qi we know the mean i and standard deviation i of the values we are
aggregating, as well as the window size Ni. These statistics can be collected
by the pro ler component in the StreaMon architecture (recall Sect. 5). We
can use the Hoe ding inequality [16] to derive a bound on the probability
that our relative error exceeds a given threshold max for a given sampling
rate. We then x at a low value (e.g., 0.01) and algebraically manipulate
this equation to derive the required sampling rate Pi [6]:
Pi =
i + 2
2Ni 2

Our load-shedding policy solves for the best achievable max given the
constraint that the system, after inserting load-shedders, can keep up with
the arrival of elements. It then adds sampling operators at various points in
the query plan such that e ective sampling rate for a query Qi is Pi.
16 Arasu et al.
6.2 Memory-Limited Approximation
Even using our scheduling algorithm that minimizes memory devoted to
queues (Sect. 4.3), and our constraint-aware execution strategy that mini-
mizes synopsis sizes (Sect. 4.2), if we have many complex queries with large
windows (e.g., large tuple-based windows, or any size time-based windows
over rapid data streams), memory may become a constraint. Spilling to disk
may not be a feasible option due to online performance requirements.
In this scenario, memory usage can be reduced at the cost of accuracy
by reducing the size of synopses at one or more operators. Incorporating a
window into a synopsis where no window is being used, or shrinking the
existing window, will shrink the synopsis. Note that if sharing is in place
(Sect. 4.1), then modifying a single synopsis may a ect multiple queries.
Reducing the size of a synopsis generally tends to also reduce the sizes
of synopses above it in the query plan, but there are exceptions. Consider a
query plan where a sliding-window synopsis is used by a duplicate-elimination
operator. Shrinking the window size can increase the operator's output rate,
leading to an increase in the size of \later" synopses. Fortunately, most of
these cases can be detected statically when the query plan is generated, and
the system can avoid reducing synopsis sizes in such cases.
There are other methods for reducing synopsis size, including maintaining
a sample of the intended synopsis content (which is not always equivalent
to inserting a sample operator into the query plan), using histograms [19] or
wavelets [12] when the synopsis is used for aggregation or even for a join,
and using Bloom lters [11] for duplicate elimination, set di erence, or set
intersection. In addition, synopsis sizes can be reduced by lowering the k
values for known k-constraints (Sect. 4.2). Lower k values cause more state to
be discarded, but result in loss of accuracy if the constraint does not hold for
the assumed k. All of these techniques share the property that memory use is
flexible, and it can be traded against precision statically or dynamically.
See Sect. 8.3 for discussion on future directions related to approximation.
7 The STREAM System Interface
In a system for continuous queries, it is important for users, system admin-
istrators, and system developers to have the ability to inspect the system
while it is running and to experiment with adjustments. To meet these needs,
we have developed a graphical query and system visualizer for the STREAM
system. The visualizer allows the user to:
View the structure of query plans and their component entities (operators,
queues, and synopses). Users can view the path of data flow through each
query plan as well as the sharing of computation and state within the plan.
STREAM: The Stanford Data Stream Management System 17
Fig. 5. Screenshot of the STREAM visualizer.
View the detailed properties of each entity. For example, the user can
inspect the amount of memory being used (for queue and synopsis entities),
the current throughput (for queue and operator entities), selectivity of
predicates (for operator entities), and other properties.
Dynamically adjust entity properties. These changes are reflected in the
system in real time. For example, an administrator may choose to increase
the size of a queue to better handle bursty arrival patterns.
View monitoring graphs that display time-varying entity properties such
as queue sizes, throughput, overall memory usage, and join selectivity,
plotted dynamically against time.
A screenshot of our visualizer is shown in Figure 5. The large pane at
the left displays a graphical representation of a currently selected query plan.
The particular query shown is a windowed join over two streams, R and S.
Each entity in the plan is represented by an icon: the ladder-shaped icons
are queues, the boxes with magnifying glasses over them are synopses, the
window panes are windowing operators, and so on. In this example, the user
has added three monitoring graphs: the rate of element flow through queues
above and below the join operator, and the selectivity of the join.
The upper-right pane displays the property-value table for a currently
selected entity. The user can inspect this list and can alter the values of some
of the properties interactively. Finally, the lower-right pane displays a legend
of entity icons and descriptions for reference.
Our technique for implementing the monitoring graphs shown in Fig-
ure 5 is based on introspection queries on a special system stream called
18 Arasu et al.
SysStream. Every entity can publish any of its property values at any time
onto SysStream. When a speci c dynamic monitoring task is desired, e.g.,
monitoring recent join selectivity, the relevant entity writes its statistics pe-
riodically on SysStream. Then a standard CQL query, typically a windowed
aggregation query, is registered over SysStream to compute the desired con-
tinuous result, which is fed to the monitoring graph in the visualizer. Users
and applications can also register arbitrary CQL queries over SysStream for
customized monitoring tasks.
8 Future Directions
At the time of writing we plan to pursue the following general directions of
future work.
8.1 Distributed Stream Processing
So far we have considered a centralized DSMS model where all processing
takes place at a single system. In many applications the stream data is actu-
ally produced at distributed sources. Moving some processing to the sources
instead of moving all data to a central system may lead to more e cient use
of processing and network resources. Many new challenges arise if we wish to
build a fully distributed data stream system with capabilities equivalent to
our centralized system.
8.2 Crash Recovery
The ability to recover to a consistent state following a system crash is a key
feature of conventional database systems, but has yet to be investigated for
data stream systems. There are some fundamental di erences between DBMSs
and DSMSs that play important role in crash recovery:
The notion of consistent state in a DBMS is de ned based on transac-
tions, which are closely tied to the conventional one-time query model.
ACID transactional properties do not map directly to the continuous query
In a DBMS, the data in the database cannot change during down-time. In
contrast, many stream applications deliver data to the DSMS from outside
sources that do not stop generating data while the system is down, possibly
requiring the DSMS to \catch up" following a crash.
In a DBMS, queries underway at the time of a crash may be forgotten|it is
the responsibility of the application to restart them. In contrast, registered
continuous queries are part of the persistent state of a DSMS.
STREAM: The Stanford Data Stream Management System 19
These di erences lead us to believe that new mechanisms are needed for crash
recovery in data stream systems. While logging of some type and perhaps
even some notion of transactions may form a component of the solution, new
techniques will be required as well.
8.3 Improved Approximation
Although some aspects of the approximation problem have already been ad-
dressed (see Sect. 6), more work is needed to address the problem in its full
generality. In the memory-limited case, work is needed on the problem of sam-
pling over arbitrary subqueries, computing \maximum-subset" as opposed to
sampling approximations, and maximizing accuracy over multiple weighted
queries. In the CPU-limited case, we need to address a broader range of
queries, especially considering joins. Finally, we need to handle situations
when the DSMS may be both CPU and memory-limited.
A signi cant challenge related to approximation is developing mechanisms
whereby the system can indicate to users or applications that approximation is
occurring, and to what degree. The converse is also important: mechanisms for
users to indicate acceptable degrees of approximation. As one step in the latter
direction, we are developing extensions to CQL that enable the speci cation of
\approximation guidelines" so that the user can indicate acceptable tolerances
and priorities.
8.4 Relationship to Publish-Subscribe Systems
In a publish-subscribe (pub-sub) system, e.g., [1, 14, 15], events may be pub-
lished continuously, and they are forwarded by the system to users who have
registered matching subscriptions. Clearly we can map a pub-sub system to
a DSMS by considering publications as streams and subscriptions as continu-
ous queries. However, the techniques we have developed so far for processing
continuous queries in a DSMS have been geared primarily toward a relatively
small number of independent, complex queries, while a pub-sub system has
potentially millions of simple, similar queries. We are exploring techniques to
bridge the capabilities of the two: From the pub-sub perspective, provide a
system that supports a more general model of subscriptions. From the DSMS
perspective, extend our approach to scale to an extremely larger number of
