share knowledge of OS

Friday, March 18, 2011

Query Processing, Resource Management, and Approximation in a Data Stream Management System

Rajeev Motwani, Jennifer Widom, Arvind Arasu, Brian Babcock, Shivnath Babu,
Mayur Datar, Gurmeet Manku, Chris Olston, Justin Rosenstein, Rohit Varma
Stanford University
http://www-db.stanford.edu/stream
Abstract
This paper describes our ongoing work developing the
Stanford Stream Data Manager (STREAM), a system for
executing continuous queries over multiple continuous
data streams. The STREAM system supports a declarative
query language, and it copes with high data rates
and query workloads by providing approximate answers
when resources are limited. This paper describes specific
contributions made so far and enumerates our next steps
in developing a general-purpose Data Stream Management
System.
1 Introduction
At Stanford we are building a Data Stream Management
System (DSMS) that we call STREAM. The new challenges
in building a DSMS instead of a traditional DBMS
arise from two fundamental differences:
1. In addition to managing traditional stored data such
as relations, a DSMS must handle multiple continuous,
unbounded, possibly rapid and time-varying
data streams.
2. Due to the continuous nature of the data, a DSMS
typically supports long-running continuous queries,
which are expected to produce answers in a continuous
and timely fashion.
Our goal is to build and evaluate a general-purpose
DSMS that supports a declarative query language and
can cope with high data rates and thousands of continuous
queries. In addition to the obvious need for multi-query
optimization, judicious resource allocation, and
sophisticated scheduling to achieve high performance,
we are targeting environments where data rates and query
load may exceed available resources. In these cases our
system is designed to provide approximate answers to
continuous queries. Managing the interaction between
resource availability and approximation is an important
focus of our project. We are developing both static techniques
and techniques for adapting as run-time conditions
change.

This work was supported by NSF Grants IIS-0118173
and IIS-9817799, by Stanford Graduate Fellowships from
Chambers, 3Com and Rambus, by a Microsoft graduate fellowship,
and by grants from Microsoft, Veritas, and the Okawa
Foundation.
Permission to copy without fee all or part of this material is
granted provided that the copies are not made or distributed
for direct commercial advantage, the VLDB copyright notice
and the title of the publication and its date appear, and notice
is given that copying is by permission of the Very Large Data
Base Endowment. To copy otherwise, or to republish, requires
a fee and/or special permission from the Endowment.
Proceedings of the 2003 CIDR Conference

This paper presents a snapshot of our language design,
algorithms, system design, and system implementation
efforts as of autumn 2002. Clearly we are not presenting
a finished prototype in any sense, e.g., our query language
is designed but only a subset is implemented, and
our approximation techniques have been identified but
are not exploited fully by our resource allocation algorithms.
However, there are a number of concrete contributions
to report on at this point:
• An extension of SQL suitable for a general-purpose
  DSMS with a precisely-defined semantics (Section 2)
• Structure of query plans, accounting for plan sharing
  and approximation techniques (Section 3)
• An algorithm for exploiting constraints on data
  streams to reduce memory overhead during query
  processing (Section 4.1)
• A near-optimal scheduling algorithm for reducing
  inter-operator queue sizes (Section 4.2)
• A set of techniques for static and dynamic approximation
  to cope with limited resources (Section 5)
• An algorithm for allocating resources to queries (in
  a limited environment) that maximizes query result
  precision (Section 5.3)
• A software architecture designed for extensibility
  and for easy experimentation with DSMS query processing
  techniques (Section 6)
Some current limitations are:
• Our DSMS is centralized and based on the relational
  model. We believe that distributed query processing
  will be essential for many data stream applications,
  and we are designing our query processor with a migration
  to distributed processing in mind. We may
  eventually extend our system to handle XML data
  streams, but extending to distributed processing has
  higher priority.
• We have done no significant work so far in query plan
  generation. Our system supports a subset of our extended
  query language with naive translation to a single
  plan. It also supports direct input of plans, including
  plan component sharing across multiple queries.
Due to space limitations this paper does not include a
section dedicated to related work. We refer the reader
to our recent survey paper [3], which provides extensive
coverage of related work. We do make some comparisons
to other work throughout this paper, particularly
the Aurora project [5], which appears to be the closest in
overall spirit to STREAM. However even these comparisons
are narrow in scope and again we refer the reader
to [3].
2 Query Language
The STREAM system allows direct input of query plans,
similar to the Aurora approach [5] and described briefly
in Section 6. However, the system also supports a declarative
query language using an extended version of SQL.
All queries are continuous, as opposed to the one-time
queries supported by a standard DBMS, so we call our
language CQL (pronounced “sequel”), for Continuous
Query Language. In this section we focus on the syntax
and semantics of continuous queries in CQL.
Queries in a DSMS should handle data from both continuous
data streams and conventional relations. For now
assume a global, discrete, ordered time domain. Time-related
issues are discussed briefly in Section 2.3.
• Streams have the notion of an arrival order, they
  are unbounded, and they are append-only. (Updates
  can be modeled in a stream using keys, but from
  the query-processor perspective we treat streams as
  append-only.) A stream can be thought of as a set
  (multiset to be precise) of pairs ⟨τ, s⟩, indicating that
  a tuple s arrives on the stream at time τ. In addition
  to the continuous source data streams that arrive at
  the DSMS, streams may result from queries or subqueries
  as specified in Section 2.2.
• Relations are unordered, and they support updates
  and deletions as well as insertions (all of which are
  timestamped). In addition to relations stored by the
  DSMS, relations may result from queries or subqueries
  as specified in Section 2.2.
Syntactically, CQL extends SQL by allowing the
From clause of any query or subquery to contain relations,
streams, or both. A stream in the From clause may
be followed by an optional sliding window specification,
enclosed in brackets, and an optional sampling clause.
CQL also contains two new operators—Istream and
Dstream—whose function is discussed in Section 2.2.
As introduced in [3], in CQL a window specification
consists of an optional partitioning clause, a mandatory
window size, and an optional filtering predicate. The partitioning
clause partitions the data into several groups,
computes a separate window for each group, and then
merges the windows into a single result. It is syntactically
analogous to a grouping clause, using the keywords
Partition By in place of Group By. As in SQL-
99 [19], windows are specified using either Rows (e.g.,
“Rows 50 Preceding”) or Range (e.g., “Range
15 Minutes Preceding”). The filtering predicate
is specified using a standard SQL Where clause.
A sampling clause specifies that a random sample of
the data elements from the stream should be used for
query processing in place of the entire stream. The syntax
of the sampling clause is the keyword Sample parameterized
by percentage sampling rate. For example,
“Sample(2)” indicates that, independently, each data
element in the stream should be retained with probability
0.02 and discarded with probability 0.98.
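The sampling clause can be illustrated with a short sketch. The Python below is not taken from STREAM; the function name `sample` and the optional `rng` argument are assumptions made for illustration. It keeps each element independently with probability percent/100, which is the behavior described for "Sample(2)".

```python
import random

def sample(stream, percent, rng=None):
    """Keep each element independently with probability percent / 100."""
    rng = rng or random.Random()
    keep_probability = percent / 100.0
    for element in stream:
        # rng.random() is uniform on [0, 1), so this test succeeds
        # with probability keep_probability for every element.
        if rng.random() < keep_probability:
            yield element

# Usage: roughly 2% of 100,000 elements survive Sample(2).
kept = list(sample(range(100_000), 2, random.Random(42)))
print(len(kept))
```

Because each element is decided independently, the sample size is itself random; only its expected value is fixed by the sampling rate.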
2.1 Examples
Our example queries reference a stream Requests of
requests to a web proxy server. Each request tuple has
three attributes: client_id, domain, and URL.
The following query counts the number of requests for
pages from the domain stanford.edu in the last day.
Select Count(*)
From Requests S [Range 1 Day Preceding]
Where S.domain = 'stanford.edu'
The semantics of providing continuous answers to this
query (and the next two examples) are covered in Section
2.2.
The following query counts how many page requests
were for pages served by Stanford’s CS department web
server, considering only each client’s 10 most recent
page requests from the domain stanford.edu. This
query makes use of a partitioning clause and also brings
out the distinction between predicates applied before determining
the sliding window cutoffs and predicates applied
after windowing.
Select Count(*)
From Requests S
[Partition By S.client_id
Rows 10 Preceding
Where S.domain = 'stanford.edu']
Where S.URL Like 'http://cs.stanford.edu/%'
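To make the order of the two predicates concrete, the following Python sketch emulates this query over a finite prefix of the stream. It is an illustration only, not the STREAM implementation: the helper names, the dictionary-based tuples, and the sample data are all assumptions. The domain filter runs before the per-client window is cut off, and the URL predicate runs on the merged window contents.

```python
from collections import defaultdict, deque

def partitioned_rows_window(stream, key, rows, predicate):
    """Filter first, then keep the last `rows` tuples per partition and merge."""
    windows = defaultdict(lambda: deque(maxlen=rows))
    for tup in stream:
        if predicate(tup):                  # window-level Where clause
            windows[key(tup)].append(tup)   # deque drops the oldest tuple itself
    return [tup for window in windows.values() for tup in window]

def count_cs_requests(requests):
    window = partitioned_rows_window(
        requests,
        key=lambda r: r["client_id"],
        rows=10,
        predicate=lambda r: r["domain"] == "stanford.edu",
    )
    # Outer Where clause, applied after windowing.
    return sum(1 for r in window if r["URL"].startswith("http://cs.stanford.edu/"))

# Hypothetical stream prefix: client 1 issues 5 CS requests, then 7 other
# stanford.edu requests, then one unrelated request; client 2 issues one CS request.
requests = (
    [{"client_id": 1, "domain": "stanford.edu", "URL": f"http://cs.stanford.edu/p{i}"} for i in range(5)]
    + [{"client_id": 1, "domain": "stanford.edu", "URL": f"http://www.stanford.edu/p{i}"} for i in range(7)]
    + [{"client_id": 1, "domain": "example.com", "URL": "http://example.com/"}]
    + [{"client_id": 2, "domain": "stanford.edu", "URL": "http://cs.stanford.edu/index"}]
)
print(count_cs_requests(requests))  # 4
```

Client 1's window holds its 10 most recent stanford.edu requests (3 CS pages and 7 others), so it contributes 3; the example.com request never enters the window because it fails the inner predicate.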

[Figure 1: Mappings among streams and relations. Streams
map to relations through the window specification language,
relations map to streams through the Istream and Dstream
operators, and relations map to relations through the
relational query language.]

Our final example references a stored relation Domains
that classifies domains by the primary type of
web content they serve. This query extracts a 10% sample
of requests to sites that are primarily of type “commerce,”
and from those it streams the URLs of requests
where the client_id is in the range [1..1000]. Notice that
a subquery is used to produce an intermediate stream T
from which the 10% sample is taken.
Select T.URL
From
(Select client_id, URL
From Requests S, Domains R
Where S.domain = R.domain
And R.type = 'commerce') T Sample(10)
Where T.client_id Between 1 And 1000
2.2 Formal Semantics
One of our contributions is to provide a precise semantics
for continuous queries over streams and relations. In
specifying our semantics we had several goals in mind:
1. We should exploit well-understood relational semantics
to the extent possible.
2. Since transformations are crucial for query optimization,
we should not inhibit standard relational transformations
and we should enable new transformations
relevant to streams.
3. Easy queries should be easy to write, and simple
queries should do what one expects.
Our approach is based on mappings between streams
and relations, as illustrated in Figure 1. We explain each
arc in Figure 1, then specify our query semantics. Recall
again that we assume a global discrete time domain,
further discussed in Section 2.3.
A stream is mapped to a relation by applying a window
specification. In general any windowing language may
be used, but CQL supports the Rows, Range, Partition
By, and Where constructs described earlier. A
window specification is applied to a stream up to a specific
time τ, and the result is a finite set of tuples which
is treated as a relation. A window specification may be
applied to a source stream, or to a stream produced by a
subquery. The semantics of the Sample operator, which
is applied before windowing, are straightforward and not
discussed again in this section.
Relations are mapped to streams by applying special
operators:
• Istream (for “insert stream”) applied to relation R
  contains a stream element ⟨τ, s⟩ whenever tuple s is
  in R at time τ but not in R at time τ − 1.
• Analogously, Dstream (for “delete stream”) applied
  to relation R contains a stream element ⟨τ, s⟩
  whenever tuple s is in R at time τ − 1 but not in R at
  time τ.
Although these operators can be specified explicitly in
queries in order to turn relations into streams for windowing,
or to produce streamed rather than relational
query results, the most common case is an implicit
Istream operator as part of CQL’s default behavior,
discussed below.
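The two operators amount to differences between consecutive states of the relation. The sketch below is a minimal Python illustration under stated assumptions: relation states are modeled as Python sets, the relation is taken to be empty at time 0, and the function names are ours rather than STREAM's.

```python
def istream(previous, current, tau):
    """Elements <tau, s> for tuples in R at time tau but not at tau - 1."""
    return [(tau, s) for s in sorted(current - previous)]

def dstream(previous, current, tau):
    """Elements <tau, s> for tuples in R at time tau - 1 but not at tau."""
    return [(tau, s) for s in sorted(previous - current)]

# States of a hypothetical relation R at times 0, 1, 2, 3.
states = [set(), {"a"}, {"a", "b"}, {"b"}]
inserted, deleted = [], []
for tau in range(1, len(states)):
    inserted += istream(states[tau - 1], states[tau], tau)
    deleted += dstream(states[tau - 1], states[tau], tau)

print(inserted)  # [(1, 'a'), (2, 'b')]
print(deleted)   # [(3, 'a')]
```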
The last mapping in Figure 1 is from relations to relations
via a relational query language. In general any
relational query language may be used, but CQL relies
on SQL as its relational basis.
Using the mappings in Figure 1, CQL queries can
freely mix relations and streams. However, whenever a
stream is used it is mapped immediately to a relation by
an explicit or implicit window specification. We can now
state the semantics of continuous queries over streams
and relations:
The result of a query Q at time τ is obtained by taking
all relations at time τ, all streams up to time τ
converted to relations by their window specifications,
and applying conventional relational semantics. If
the outermost operator is Istream or Dstream
then the query result is converted to a stream, otherwise
it remains as a relation.
Let us briefly consider the three example queries in
Section 2.1. The first two queries are similar and relatively
straightforward: At any time τ, a window is evaluated
on the Requests stream up to time τ, a predicate
is applied, and the result contains the number of remaining
tuples. By default, the result is a singleton relation;
however, if we add an outermost Istream operator the
result instead streams a new value each time the count
changes. The third query is somewhat more complex,
relying on two CQL defaults: First, when no window
specification is provided the default window is “[Range
Unbounded Preceding].” Second, whenever the
outermost From list of a non-aggregation query contains
one or more streams, the query result has an Istream
operator applied by default. We leave it to the reader to
verify that the transformations between streams and relations
in this query do indeed produce the desired result.
Space limitations prohibit detailed elaboration of our
semantics, but we briefly discuss the three goals set out
at the beginning of this subsection. First, clearly we rely
heavily on existing relational semantics. Second, relational
transformations continue to hold and our language
supports a number of useful stream-based transformations.
For instance, in our third example query, if relation
Domains is static then we can transform the (default)
Unbounded window on the Requests stream into a
Now window. Finally, although the mapping-based semantics
may appear complex, it is our belief that the defaults
in our language do make easy queries easy to write,
and queries do behave as they appear.
As the most basic of test cases for our semantic goals,
query “Select * From S” should produce stream
S, and it does. The default window for S is Unbounded
(but can be transformed to Now if S contains a key). Each
time a stream element ⟨τ, s⟩ arrives on S, s is logically
added to the relational result of the query, and thus s
is generated in the default Istream query result with
timestamp τ.
2.3 Stream Ordering and Timestamps
Our language semantics—or more accurately the ability
to implement our semantics—makes a number of implicit
assumptions about time:
• So that we can evaluate row-based (Rows) and time-based
  (Range) sliding windows, all stream elements
  arrive in order, timestamped according to a global
  clock.
• So that we can coordinate streams with relation
  states, all relation updates are timestamped according
  to the same global clock as streams.
• So that we can generate query results, the global
  clock provides periodic “heartbeats” that tell us when
  no further stream elements or relation updates will
  occur with a timestamp lower than the heartbeat
  value.
If we use a centralized system clock and the system
timestamps stream elements and relation updates as they
arrive, then these assumptions are satisfied. We can also
handle less strict notions of time, including application-defined
time. Full details are beyond the scope of this paper,
but out-of-order streams can be ordered by buffering
and waiting for heartbeats, and the absence of a heartbeat
can be compensated for with timeout mechanisms, in the
worst case resulting in some query result imprecision.
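Buffering until a heartbeat arrives can be sketched as follows. This Python is an illustration under our own assumptions, not the STREAM mechanism: the class name `ReorderBuffer` and its methods are hypothetical, and timeout handling for missing heartbeats is omitted. A heartbeat with value h promises that no later element has a timestamp lower than h, so everything buffered with a timestamp below h can be released in timestamp order.

```python
import heapq

class ReorderBuffer:
    """Hold out-of-order elements until a heartbeat makes them safe to emit."""

    def __init__(self):
        self._heap = []
        self._arrivals = 0  # tie-breaker: equal timestamps keep arrival order

    def insert(self, timestamp, element):
        heapq.heappush(self._heap, (timestamp, self._arrivals, element))
        self._arrivals += 1

    def heartbeat(self, value):
        """Release, in timestamp order, all elements with timestamp < value."""
        released = []
        while self._heap and self._heap[0][0] < value:
            timestamp, _, element = heapq.heappop(self._heap)
            released.append((timestamp, element))
        return released

buffer = ReorderBuffer()
buffer.insert(3, "c")
buffer.insert(1, "a")
buffer.insert(2, "b")
first = buffer.heartbeat(3)   # elements with timestamp 3 may still arrive
buffer.insert(3, "d")
second = buffer.heartbeat(4)
print(first)   # [(1, 'a'), (2, 'b')]
print(second)  # [(3, 'c'), (3, 'd')]
```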
Note that the related areas of temporal and sequence
query languages [15, 16] can capture most aspects of the
timestamps and window specifications in our language.
Those languages are considerably more expressive than
our language, and we feel they are “overkill” in typical
data stream environments.
2.4 Inactive and Weighted Queries
Two dynamic properties of queries are controlled
through our administrative interface discussed in Section
6. One property is whether the query is active or
inactive, and the other is the weight assigned to the query.
When a query is inactive, the system may not maintain
the answer to the query as new data arrives. However,
because an inactive query may be activated at any time,
its presence serves as a hint to the system that may influence
decisions about query plans and resource allocation
(Sections 3–5).
Queries may be assigned weights indicating their relative
importance. These weights are taken into account
by the system when it is forced to provide approximate
answers due to resource limitations. Given a choice between
introducing error into the answers of two queries,
the system will attempt to provide more precision for the
query with higher weight. Weights might also influence
scheduling decisions, although we have not yet explored
weighted scheduling. Note that inactive queries may be
thought of as queries with negligible weight.
3 Query Plans
This section describes the basic query processing architecture
of the STREAM system. Queries are registered
with the system and execute continuously as new data arrives.
For now let us assume that a separate query plan is
used for each continuous query, although sharing of plan
components is very important and will be discussed in
Section 3.2. We also assume that queries are registered
before their input streams begin producing data, although
clearly we must address the issue of adding queries over
existing (perhaps partially discarded or archived) data
streams.
It is worth a short digression to highlight a basic difference
between our approach and that of Aurora [5].
Aurora uses one “mega” query plan performing all computation
of interest to all users. Adding a query consists
of directly augmenting portions of the current megaplan,
and conversely for deleting a query. In STREAM,
queries are independent units that logically generate separate
plans, although plans may be combined by the system
and ultimately could result in an Aurora-like megaplan.
To date we have an initial implementation in place
for a fairly substantial subset of the language presented
in Section 2, omitting primarily certain subqueries and
many esoteric features of standard SQL. In this section
we highlight the features of our basic query processing
architecture but do not go into detail about individual
query operators. Most of our operators are stream- or
window-based analogs to operators found in a traditional
DBMS.
A query plan in our system runs continuously and is
composed of three different types of components:
• Query operators, similar to a traditional DBMS.
  Each operator reads a stream of tuples from a set of
  input queues, processes the tuples based on its semantics,
  and writes its output tuples into a single output
  queue.
• Inter-operator queues, also similar to the approach
  taken by some traditional DBMSs. Queues connect
  different operators and define the paths along which
  tuples flow as they are being processed.
• Synopses, used to maintain state associated with operators
  and discussed in more detail next.
A synopsis summarizes the tuples seen so far at some
intermediate operator in a running query plan, as needed
for future evaluation of that operator. For example, for
full precision a join operator must remember all the tuples
it has seen so far on each of its input streams, so it
maintains one synopsis for each (similar to a symmetric
hash join [21]). On the other hand, simple filter operators,
such as selection and duplicate-preserving projection,
do not require a synopsis since they need not maintain
state.
For many queries, synopsis sizes grow without bound
if full precision is expected in the query result [1]. Thus,
an important feature to support is synopses that use some
kind of summarization technique to limit their size [10],
e.g., fixed-size hash tables, sliding windows, reservoir
samples, quantile estimates, and histograms. Of course
limited-size synopses may produce approximate operator
results, further discussed in Section 5.
Although operators and synopses are closely coupled
in query plans, we have carefully separated their implementation
and provide generic interfaces for both. This
approach allows us to couple any operator type with any
synopsis type, and it also paves the way for operator and
synopsis sharing. The generic methods of the Operator
class are:
• create, with parameters specifying the input
  queues, output queue, and initial memory allocation.
• changeMem, with a parameter indicating a dynamic
  decrease or increase in allocated memory.
• run, with a parameter indicating how much work the
  operator should perform before returning control to
  the scheduler (see Section 4.2).
The generic methods of the Synopsis class are:
• create, with a parameter specifying an initial
  memory allocation.
• changeMem, with a parameter indicating a dynamic
  decrease or increase in allocated memory.
• insert and delete, with a parameter indicating
  the data element to be inserted into or deleted from
  the synopsis.
• query, whose parameters and behavior depend on
  the synopsis type. For example, in a hash-table
  synopsis this method might look for matching tuples
  with a particular key value, while for a sliding-window
  synopsis this method might support a full
  window scan.
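The two generic interfaces could be rendered roughly as below. This is a hedged Python sketch, not the STREAM code: the constructor plays the role of create, the snake_case method names, the `tuple_size` parameter, and the two concrete classes (`SlidingWindowSynopsis`, `Select`) are assumptions added to make the example runnable.

```python
from abc import ABC, abstractmethod
from collections import deque

class Synopsis(ABC):
    def __init__(self, memory):            # "create": initial memory allocation
        self.memory = memory

    def change_mem(self, memory):           # "changeMem"
        self.memory = memory

    @abstractmethod
    def insert(self, element): ...
    @abstractmethod
    def delete(self, element): ...
    @abstractmethod
    def query(self, *args): ...

class SlidingWindowSynopsis(Synopsis):
    """Keeps the most recent tuples that fit in the allocated memory."""

    def __init__(self, memory, tuple_size=1):
        super().__init__(memory)
        self.tuple_size = tuple_size
        self._window = deque()

    def _trim(self):
        capacity = self.memory // self.tuple_size
        while len(self._window) > capacity:
            self._window.popleft()          # evict the oldest tuple

    def change_mem(self, memory):
        super().change_mem(memory)
        self._trim()

    def insert(self, element):
        self._window.append(element)
        self._trim()

    def delete(self, element):
        self._window.remove(element)

    def query(self):
        return list(self._window)           # full window scan

class Operator(ABC):
    def __init__(self, inputs, output, memory):   # "create"
        self.inputs, self.output, self.memory = inputs, output, memory

    def change_mem(self, memory):
        self.memory = memory

    @abstractmethod
    def run(self, max_tuples): ...

class Select(Operator):
    """Stateless filter: needs no synopsis."""

    def __init__(self, inputs, output, predicate):
        super().__init__(inputs, output, memory=0)
        self.predicate = predicate

    def run(self, max_tuples):
        queue, processed = self.inputs[0], 0
        while queue and processed < max_tuples:
            tup = queue.popleft()
            processed += 1
            if self.predicate(tup):
                self.output.append(tup)
        return processed                    # control returns to the scheduler

synopsis = SlidingWindowSynopsis(memory=3)
for value in range(1, 6):
    synopsis.insert(value)
q_in, q_out = deque(range(10)), deque()
select = Select([q_in], q_out, lambda x: x % 2 == 0)
select.run(4)
print(synopsis.query(), list(q_out))  # [3, 4, 5] [0, 2]
```

Keeping the memory parameter on the base classes reflects the separation described above: any operator can be paired with any synopsis, and both can be resized at run time.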
So far in our system we have focused on sliding-window
synopses, which keep a summary of the last
w (0 ≤ w ≤ ∞) tuples of some intermediate stream.
One use of sliding-window synopses is to implement the
window specifications in our query language (Section 2).
For Range window specifications we cannot bound the
synopsis size, but for Rows window specifications the
memory requirement M is determined by the tuple size
and number of rows w. Sliding-window synopses also
are used for approximation (Section 5), in which case
typically w is determined by the tuple size and memory
allocation M.

[Figure 2: Plans for queries Q1, Q2 over streams R, S, T. The
figure shows operators O1–O3, synopses s1–s4, and queues
q1–q4 under a common scheduler.]

3.1 Example
Figure 2 illustrates plans for two queries, Q1 and Q2.
Together the plans contain three operators O1–O3, four
synopses s1–s4 (two per join operator), and four queues
q1–q4. Query Q1 is a selection over a join of two streams
R and S. Query Q2 is a join of three streams, R, S, and
T. The two plans share a subplan joining streams R and
S by sharing its output queue q3. Plan and queue sharing
is discussed in Section 3.2. Execution of query operators
is controlled by a global scheduler. When an operator O
is scheduled, control passes to O for a period currently
determined by number of tuples processed, although we
may later incorporate timeslice-based scheduling. Section
4.2 considers different scheduling algorithms and
their impact on resource utilization.
3.2 Resource Sharing in Query Plans
As illustrated in Figure 2, when continuous queries
contain common subexpressions we can share resources
and computation within their query plans, similar to
multi-query optimization and processing in a traditional
DBMS [14]. We have not yet focused on resource sharing
in our work—we have established a query plan architecture
that enables sharing, and we can combine plans
that have exact matching subexpressions. However, several
important topics are yet to be addressed:
For now we are considering resource sharing and
approximation separately. That is, we do not introduce
sharing that intrinsically introduces approximate
query results, such as merging subexpressions
with different window sizes, sampling rates, or filters.
Doing so may be a very effective technique
when resources are limited, but we have not yet explored
it in sufficient depth to report here.
Our techniques so far are based on exact common
subexpressions. Detecting and exploiting subexpression
containment is a topic of future work that poses
some novel challenges due to window specifications,
timestamps and ordering, and sampling in our query
language.
The implementation of a shared queue (e.g., q3 in
Figure 2) maintains a pointer to the first unread tuple for
each operator that reads from the queue, and it discards
tuples once they have been read by all parent operators.
Currently multiple queries accessing the same incoming
base data stream S “share” S as a common subexpression,
although we may decide ultimately that input
data streams should be treated separately from common
subexpressions.
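A shared queue with one read pointer per parent operator can be sketched as follows. The Python below is a minimal illustration, assuming a fixed set of readers known at construction time; the class name `SharedQueue` and its methods are hypothetical and do not come from the STREAM code.

```python
class SharedQueue:
    """Queue read by several operators; a tuple is dropped once all have read it."""

    def __init__(self, readers):
        self._tuples = []                       # tuples not yet read by everyone
        self._base = 0                          # absolute position of _tuples[0]
        self._next = {r: 0 for r in readers}    # first unread position per reader

    def append(self, tup):
        self._tuples.append(tup)

    def read(self, reader):
        """Return the reader's next unread tuple, or None if it has caught up."""
        position = self._next[reader]
        if position - self._base >= len(self._tuples):
            return None
        tup = self._tuples[position - self._base]
        self._next[reader] = position + 1
        self._discard_consumed()
        return tup

    def _discard_consumed(self):
        slowest = min(self._next.values())
        del self._tuples[: slowest - self._base]
        self._base = slowest

    def __len__(self):
        return len(self._tuples)

queue = SharedQueue(["P1", "P2"])
for tup in ("a", "b", "c"):
    queue.append(tup)
queue.read("P1")
queue.read("P1")
print(len(queue))  # 3: P2 has read nothing, so no tuple can be discarded
queue.read("P2")
print(len(queue))  # 2: "a" has now been read by both parents
```

The queue length tracks the slowest reader, which is why parents with very different consumption rates can make sharing unattractive.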
The number of tuples in a shared queue at any time
depends on the rate at which tuples are added to the
queue, and the rate at which the slowest parent operator
consumes the tuples. If two queries with a common
subexpression produce parent operators with very different
consumption rates, then it may be preferable not to
use a shared subplan. As an example, consider a queue
q output from a join operator J, and suppose J is very
unselective so it produces nearly the cross-product of its
inputs. If J’s parent P1 in one query is a “heavy consumer,”
then our scheduling algorithm (Section 4.2) is
likely to schedule J frequently in order to produce tuples
for P1 to consume. If J’s parent P2 in another query is a
“light consumer,” then the scheduler will schedule J less
frequently so tuples don’t proliferate in q. In this situation
it may not be beneficial for P1 and P2 to share a
common subplan rooted in J.
We have shown formally that although subplan sharing
may be suboptimal in the case of common subexpressions
with joins, for common subexpressions without
joins sharing always is preferable. Details are beyond
the scope of this paper.
When several operators read from the same queue, and
when more than one of those operators builds some kind
of synopsis, then it may be beneficial to introduce synopsis
sharing in addition to subplan sharing. A number
of interesting issues arise, most of which we have not yet
addressed:
• Which operator is responsible for managing the
  shared synopsis (e.g., allocating memory, inserting
  tuples)?
• If the synopses required by the different operators are
  not of identical types or sizes, is there a theory of
  “synopsis subsumption” (and synopsis overlap) that
  we can rely on?
• If the synopses are identical, how do we cope with
  the different rates at which operators may “consume”
  data in the synopses?
Clearly we have much work to do in the area of resource
sharing. Note again that the issue of automatic
resource sharing is less crucial in a system like Aurora,
where resource sharing is primarily programmed by
users when they augment the current mega-plan.
4 Resource Management
Effective resource management is a key component of a
data stream management system, and it is a specific focus
of our project. There are a number of relevant resources
in a DSMS: memory, computation, I/O if disk is used,
and network bandwidth in a distributed DSMS. We are
focusing initially on memory consumed by query plan
synopses and queues (see footnote 1), although some of our techniques
can be applied readily to other resources. Furthermore,
in many cases reducing memory overhead has a natural
side-effect of reducing other resource requirements
as well.
In this section we discuss two techniques we have
developed that can reduce memory overhead dramatically
during query execution. Neither of these techniques
compromises the precision of query results.
1. An algorithm for incorporating known constraints on
input data streams to reduce synopsis sizes. This
work is described in Section 4.1.
2. An algorithm for operator scheduling that minimizes
queue sizes. This work is described in Section 4.2.
In Section 5 we discuss approximation techniques, and
the important interaction between resource allocation
and approximation.
4.1 Exploiting Constraints Over Data Streams
So far we have not discussed exploiting data or arrival
characteristics of input streams during query processing.
Certainly we must be able to handle arbitrary streams,
but when we have additional information about streams,
either by gathering statistics over time or through constraint
specifications at stream-registration time, we can
use this information to reduce resource requirements
without sacrificing query result precision. (An alternate
and more dynamic technique is for the streams to
contain punctuations, which specify run-time constraints
that also can be used to reduce resource requirements;
see [18].)

Footnote 1: Disk also could be used for synopses and queues, although
in that case we might want to treat I/O as a separate resource
given its different performance characteristics, as in Aurora [5].

We have identified several types of constraints over
data streams, and for each constraint type we specify an
“adherence parameter” that captures how closely a given
stream or pair of streams adheres to a constraint of that
type. We have developed query plan construction and execution
algorithms that take stream constraints into account
in order to reduce synopsis sizes at query operators,
while still producing precise output streams. Using
our algorithm, the closer the streams adhere to the specified
constraints at run-time, the smaller the required synopses.
We have implemented our algorithm in a standalone
query processor in order to run experiments, and
our next step is to incorporate it into the STREAM prototype.
As a simple example, consider a continuous query
that joins a stream Orders (hereafter O) with a stream
Fulfillments (hereafter F) based on orderID and
itemID (orders may be fulfilled in multiple pieces), perhaps
to monitor average fulfillment delays. In the general
case, answering this query precisely requires synopses of
unbounded size [1]. However, if we know that all tuples
for a given orderID and itemID arrive on O before
the corresponding tuples arrive on F, then we need not
maintain a join synopsis for the F operand at all. Furthermore,
if F tuples arrive clustered by orderID, then
we need only save O tuples for a given orderID until the
next orderID is seen.
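The strict form of this example can be sketched in a few lines. The Python below is only an illustration of the idea, not the algorithm developed in this work (whose details are outside this paper): it assumes both constraints hold exactly, models the interleaved arrivals as a single list of events, and uses names such as `constrained_join` that are ours.

```python
from collections import defaultdict

def constrained_join(events):
    """Join O and F on (orderID, itemID), keeping a synopsis for O only.

    events: iterable of (stream, order_id, item_id, payload) in arrival order,
    where stream is "O" or "F". Returns the join results and the peak number
    of O tuples held at any moment.
    """
    orders = defaultdict(list)      # order_id -> [(item_id, payload)]
    size = peak = 0
    current_f_order = None
    results = []
    for stream, order_id, item_id, payload in events:
        if stream == "O":
            orders[order_id].append((item_id, payload))
            size += 1
            peak = max(peak, size)
            continue
        # F tuple: clustered arrival means the previous orderID is finished,
        # so its saved O tuples can be discarded.
        if current_f_order is not None and order_id != current_f_order:
            size -= len(orders.pop(current_f_order, []))
        current_f_order = order_id
        # O-before-F arrival means every matching O tuple is already saved,
        # so no synopsis is needed for F.
        for o_item, o_payload in orders.get(order_id, []):
            if o_item == item_id:
                results.append((order_id, item_id, o_payload, payload))
    return results, peak

events = [
    ("O", 1, "a", "o1a"), ("O", 1, "b", "o1b"), ("O", 2, "a", "o2a"),
    ("F", 1, "a", "f1a"), ("F", 1, "b", "f1b"), ("F", 2, "a", "f2a"),
    ("O", 3, "a", "o3a"), ("F", 3, "a", "f3a"),
]
results, peak = constrained_join(events)
print(len(results), peak)  # 4 3
```

Without the constraints, a precise join would have to retain every tuple from both streams; here the O synopsis never holds more than three tuples.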
In practice, constraints may not be adhered to by data
streams strictly, even if they “usually” hold. For example,
we may expect tuples on stream F to be clustered
by orderID within a tolerance parameter k: no more than
k tuples with a different orderID appear between two tuples
with the same orderID. Similarly, due to network delays
a tuple for a given orderID and itemID may arrive on F
before the corresponding tuple arrives on O, but we may
be able to bound the time delay with a constant k. These
constants are the “adherence parameters” discussed earlier,
and it should be clear that the smaller the value of k,
the smaller the necessary synopses.
The constraints considered in our work are many-one
join and referential integrity constraints between two
streams, and clustered-arrival and ordered-arrival constraints
on individual streams. Our algorithm accepts
select-project-join queries over streams with arbitrary
constraints, and it produces a query plan that exploits
constraints to reduce synopsis sizes without compromising
precision. Details are beyond the scope of this paper.
4.2 Scheduling
Query plans are executed via a global scheduler, which
calls the run methods of query plan operators (Section
3) in order to make progress in moving tuples
through query plans and producing query results. Our
initial scheduler uses a simple round-robin scheme and
a single granularity for the run operator expressed as
the maximum number of tuples to be consumed from
the operator’s input queue before relinquishing control.
This simple scheduler gives us a functioning system but
clearly is far from optimal for most sets of query plans.
There are many possible objectives for the scheduler,
including stream-based variations of response time,
throughput, and (weighted) fairness among queries. For
our first cut at a more “intelligent” scheduler, we focused
on minimizing peak total queue size during query processing,
in keeping with our general project goal of coping
with limited resources.
Consider the following very simple example. Suppose
we have a query plan with two unary operators: O1 operates
on input queue q1, writing its results to queue q2
which is the input to operator O2. Suppose O1 takes one
time unit to operate on a batch of n tuples from q1, and
it has 20% selectivity, i.e., it introduces n/5 tuples into
q2 when consuming n tuples from q1. (Time units and
batches of n input tuples simplify exposition; their actual
values are not relevant to the overall reasoning in
our example.) Operator O2 takes one time unit to operate
on n/5 tuples, and let us assume that its output is
not queued by the system since it is the final result of
the query. Suppose that over the long term the average
arrival rate of tuples at q1 is no more than n/2 tuples
per time unit, so all tuples can be processed and queues
will not grow without bound. (If queues do grow without
bound, eventually some form of load shedding must
occur, as discussed in Section 5.2.2.) However, tuple arrivals
may be bursty.
Here are two possible scheduling strategies:
1. Tuples are processed to completion in the order they
arrive at q1. Each batch of n tuples in q1 is processed
by O1 and then O2 based on arrival time, requiring
two time units overall.
2. If there is a batch of n tuples in q1, then O1 operates
on them using one time unit, producing n/5 new tuples
in q2. Otherwise, if there are any tuples in q2,
then up to n/5 of these tuples are operated on by O2,
requiring one time unit.
Suppose we have the following arrival pattern: n tuples
arrive at every time instant from time = 1 to time = 7, then no
tuples arrive from time = 8 through time = 14. On average,
n/2 tuples arrive per unit of time, but with an initial
burst. The following table shows the total size of queues
q1 and q2 under the two scheduling strategies during the
burst, where each table entry is a multiplier for n.

Time        1    2    3    4    5    6    7
Strategy 1  1.0  1.2  2.0  2.2  3.0  3.2  4.0
Strategy 2  1.0  1.2  1.4  1.6  1.8  2.0  2.2

After time = 7, queue sizes for both strategies decline
until they reach 0 when time = 15. In this example,
both strategies finish at time = 15, and Strategy 2 is clearly
preferable in terms of run-time memory overhead during
the burst.
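The queue sizes in this example can be reproduced with a short simulation. The sketch below assumes that queue size is observed at each integer time instant just after that instant's arrivals, and that exactly one operator runs during each time unit; the function name `simulate` and its parameters are illustrative.

```python
from fractions import Fraction

def simulate(strategy, n=5, burst_end=7, horizon=15):
    """Return {time: total queue size / n} for the two-operator example.

    n must be a multiple of 5 so that n/5 is a whole number of tuples.
    """
    q1 = q2 = 0
    sizes = {}
    for t in range(1, horizon + 1):
        if t <= burst_end:
            q1 += n                      # a batch of n tuples arrives at time t
        sizes[t] = Fraction(q1 + q2, n)  # total queue size observed at time t
        # Strategy 1 finishes the batch in progress (q2 non-empty) before
        # starting a new one; Strategy 2 runs O1 whenever a batch is waiting.
        run_o1 = q1 >= n and (strategy == 2 or q2 == 0)
        if run_o1:                       # O1: consume n tuples, emit n/5
            q1 -= n
            q2 += n // 5
        elif q2 > 0:                     # O2: consume up to n/5 tuples
            q2 -= min(q2, n // 5)
    return sizes
```

Calling `simulate(1)` and `simulate(2)` yields the two rows of the table for times 1 through 7, with both queues empty at time 15; the peak is 4n under Strategy 1 and 2.2n under Strategy 2.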
We have designed a scheduling policy that provably
has near-optimal maximum total queue size and is based
roughly on the general property observed in our example:
Greedily schedule the operator that “consumes” the
largest number of tuples per time unit and is the most selective
(i.e., “produces” the fewest tuples). However, this
per-operator greedy approach may underutilize a high-priority
operator if the operators feeding it are low priority.
Therefore, we consider chains of operators within a
plan when making scheduling decisions. Details of our
scheduling algorithm and the proof of its near-optimality
are fairly involved and not presented due to space limitations.
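The per-operator greedy rule can be sketched as below. The priority used here, consumption rate times (1 - selectivity), is an assumption chosen to capture the property stated above (tuples removed from queues per time unit); it is not the chain-based algorithm itself, and the dictionary keys are hypothetical.

```python
def greedy_pick(operators):
    """Pick the ready operator that removes the most queued tuples per time unit.

    Each operator is a dict with hypothetical keys:
      rate        - tuples consumed per time unit
      selectivity - queued output tuples produced per input tuple
      ready       - whether its input queue currently holds tuples
    """
    ready = [op for op in operators if op["ready"]]
    if not ready:
        return None
    return max(ready, key=lambda op: op["rate"] * (1 - op["selectivity"]))
```

Applied to the earlier example with n = 100, O1 removes 100 x 0.8 = 80 tuples per time unit while O2 removes 20 (its output is not queued), so O1 is preferred whenever it has input, which is the behavior of Strategy 2.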
Scheduling chains of operators also is being considered
in Aurora’s train scheduling algorithm [5], although
for entirely different reasons. Aurora’s objective is to improve
throughput by reducing context-switching between
operators, batching the processing of tuples through operators,
and reducing I/O overhead since their inter-operator
queues may be written to disk. So far we have
considered minimizing memory-based peak queue sizes
as our only scheduling objective. Also, we have been
assuming a single thread of control shared among operators,
while Aurora considers multiple threads for different
operators. Eddies [2, 13] uses a scheduling criterion
similar to our per-operator greedy approach, but for routing
individual tuples through operator queues rather than
scheduling the execution of operators. Furthermore, like
Aurora the goal of Eddies is to maximize throughput, unlike
our goal of minimizing total queue size.
While our algorithm achieves queue-size minimization,
it may incur increased time to initial results. In our
example above, although both strategies finish processing
tuples at the same time, Strategy 1 generally has the
potential to produce initial results earlier than Strategy 2.
An important next step is to incorporate response time
and (weighted) fairness across queries into our scheduling
algorithm.
5 Approximations
The previous section described two complementary techniques
for reducing the memory overhead (synopses and
queue sizes respectively) required during query processing.
Even exploiting those techniques, it is our supposition
that the combination of:
• multiple unbounded and possibly rapid incoming
  data streams,
• multiple complex continuous queries with timeliness
  requirements, and
• finite computation and memory resources
yields an environment where eventually the system will
not be able to provide continuous and timely exact answers
to all registered queries. Our goal is to build a
system that, under these circumstances, degrades gracefully
to approximate query answers. In this section we
present a number of approximation techniques, and we
discuss the close relationship between resource management
and approximation.
When conditions such as data rates and query load
change, the availability and best use of resources change
also. Our overall goal is to maximize query precision
by making the best use of available resources, and ultimately
to have the capability of doing so dynamically
and adaptively. Solving the overall problem (which further
includes inactive and weighted queries as discussed
in Section 2.4) involves a huge number of variables, and
certainly is intractable in the general case.
In the remainder of this section we propose some static
approximation (compile-time) techniques in Section 5.1
and some dynamic approximation (run-time, adaptive)
techniques in Section 5.2. In Section 5.3 we present
our first algorithm—largely theoretical at this point but a
good first step—for allocating memory in order to maximize
result precision.
In comparison with other systems for processing
queries over data streams, both the Telegraph [12] and
Niagara [9] projects do consider resource management
(largely dynamic in the case of Telegraph and static in
the case of Niagara), but not in the context of providing
approximate query answers when available resources are
insufficient. An important contribution was made in Aurora
[5] with the introduction of “QoS graphs” that capture
tradeoffs among precision, response time, resource
usage, and usefulness to the application. However, in
Aurora approximation currently appears to occur solely
through drop-boxes that perform load shedding as described
in Section 5.2.2.
5.1 Static Approximation
In static approximation, queries are modified when they
are submitted to the system so that they use fewer resources
at execution time. The advantages of static approximation
over dynamic approximation (discussed in
Section 5.2) are:
1. Assuming the statically optimized query is executed
precisely by the system, the user is guaranteed certain
query behavior. A user might even participate
in the process of static approximation, guiding or approving
the system’s query modifications.
2. Adaptive approximation techniques and continuous
monitoring of system activity are not required—the
query is modified once, before it begins execution.
The two static approximation techniques we consider are
window reduction and sampling rate reduction.
5.1.1 Window Reduction
Our query language includes a windowing clause for
specifying sliding windows on streams or on subqueries
producing streams (Section 2). By decreasing the size
of a window, or introducing a window where none was
specified originally, both memory and computation requirements
can be reduced. In fact, several proposals
for stream query languages automatically introduce
windows in all joins, sometimes referred to as band
joins, in order to bound the resource requirement, e.g.,
[5, 7, 12, 13, 20].
Suppose W is an operator that incorporates a window
specification, most commonly a windowed join. Reducing
W’s window size not only affects the resources used
by W, but it can have a ripple effect that propagates up
the operator tree—in general a smaller window results
in fewer tuples to be processed by the remainder of the
query plan. However, there are at least two cases where
we need to be careful:
• If W is a duplicate-elimination operator, then shrinking
  W’s window can actually increase its output rate.
• If W is part of the right-hand subtree of a negation
  construct (e.g., NOT EXISTS or EXCEPT), then reducing
  the size of W’s output may have the effect of
  increasing output further up the query plan.
Fortunately, these “bad” cases can be detected statically
at query modification time, so the system can avoid introducing
or shrinking windows in these situations.
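The duplicate-elimination case can be illustrated with a small sketch. It assumes a count-based sliding window over the most recent tuples and an operator that emits a tuple only if no equal tuple is currently in the window; the function name `windowed_distinct` and these semantics are assumptions for illustration.

```python
from collections import deque

def windowed_distinct(stream, window):
    """Emit each tuple that does not already appear among the last `window` tuples."""
    recent = deque(maxlen=window)   # oldest entries fall out automatically
    out = []
    for item in stream:
        if item not in recent:
            out.append(item)
        recent.append(item)
    return out
```

On the alternating stream a, b, a, b, ... a window of two tuples suppresses every repeat after the first a and b, whereas a window of one tuple suppresses nothing, so the smaller window produces more output.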
5.1.2 Sampling Rate Reduction
Analogous to shrinking window sizes, we can reduce
the sampling rate when a Sample clause (Section 2)
is applied to a stream or to a subquery producing a
stream. We can also introduce Sample clauses where
not present in the original query. Although changing the
sampling rate at an operator O will not reduce the resource
requirements of O, it will reduce the output rate.
We can also take an existing sample operator and push
it down the query plan. However, we must be careful
to ensure that we don’t introduce biased sampling when
we do so, especially in the presence of joins as discussed
in [8].
5.2 Dynamic Approximation
In our second and more challenging approach, dynamic
approximation, queries are unchanged, but the system
may not always provide precise query answers. Dynamic
approximation has some important advantages over static
approximation:
• The level of approximation can vary with fluctuations
  in data rates and distributions, query workload, and
  resource availability. In “times of plenty,” when loads
  are low and resources are high, queries can be answered
  precisely, with approximation occurring only
  when absolutely necessary.
• Approximation can occur at the plan operator level,
  and decisions can be made based on the global set of
  (possibly shared) query plans running in the system.
Of course a significant challenge from the usability
perspective is conveying to users or applications at any
given time what kind of approximation is being performed
on their queries, and some applications simply
may not want to cope with variable and unpredictable
accuracy. We are considering augmenting our query language
so users can specify tolerable imprecision (e.g.,
ranges of acceptable window sizes, or ranges of sampling
rates), which offers a middle ground between static and
dynamic approximation.
The three dynamic approximation techniques we consider
are synopsis compression, which is roughly analogous
to window reduction in Section 5.1.1, sampling,
which is analogous to sampling rate reduction in Section
5.1.2, and load shedding.
5.2.1 Synopsis Compression
One technique for reducing the memory overhead of
a query plan is to reduce synopsis sizes at one or more
operators. Incorporating a sliding window into a synopsis
where no window is being used, or shrinking the
existing window, typically shrinks the synopsis. Doing
so is analogous to introducing windows or statically reducing
window sizes through query modification (Section
5.1.1). Note that if plan sharing is in place then modifying
a single window dynamically may affect multiple
queries, and if sophisticated synopsis-sharing algorithms
are being used then different queries may be affected in
different ways.
There are other methods for reducing synopsis size,
including maintaining a sample of the intended synopsis
content (which is not always equivalent to inserting
a sample operator into the query plan), using histograms
[17] or compressed wavelets [11] when the synopsis
is used for aggregation or even for a join [6], and
using Bloom filters [4] for duplicate elimination, set difference,
or set intersection.
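As one illustration of trading memory against precision, the sketch below uses a Bloom filter as the synopsis for duplicate elimination. The class layout, the use of SHA-256 to derive bit positions, and the default sizes are assumptions made for this example.

```python
import hashlib

class BloomFilter:
    """Fixed-size bit array; membership tests may give false positives only."""
    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item):
        # Derive num_hashes bit positions by salting the item with an index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, item):
        for p in self._positions(item):
            self.bits[p // 8] |= 1 << (p % 8)

    def __contains__(self, item):
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(item))

def approx_distinct(stream, num_bits=1024, num_hashes=3):
    """Duplicate elimination whose memory use is fixed by num_bits."""
    seen = BloomFilter(num_bits, num_hashes)
    out = []
    for item in stream:
        if item not in seen:
            out.append(item)
            seen.add(item)
    return out
```

The output never contains a duplicate, but a false positive in the filter can suppress the first occurrence of a value; shrinking `num_bits` saves memory at the cost of more such misses.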
All of these techniques share the property that memory
use is flexible, and it can be traded against precision
statically or on-the-fly. Some of the techniques provide
error guarantees, e.g., [11]; however, we have not solved
the general problem of conveying accuracy to users dynamically.
5.2.2 Sampling and Load Shedding
The two primary consumers of memory in our query
plans are synopses and queues (recall Section 3). In the
previous subsection we discussed approximation techniques
that reduce synopsis sizes (which may as a side-effect
reduce queue sizes). In this section we mention
approximation techniques that reduce queue sizes (which
may as a side-effect reduce synopsis sizes).
One technique is to introduce one or more sample operators
into the query plan, or to reduce the sampling
rate at existing operators. This approach is the dynamic
analogue of introducing sampling or statically reducing
a sampling rate through query modification (Section
5.1.2), although again we note that when plan sharing
is in place one sampling rate may affect multiple
queries.
We can also simply drop tuples from queues when the
queues grow too large, a technique sometimes referred
to as load shedding [5]. Load shedding at queues differs
from sampling at operators since load shedding may
drop chunks of tuples at a time, instead of eliminating
tuples probabilistically. Both are effective techniques for
reducing queue sizes. While sampling may be more “unbiased,”
dropping chunks of tuples may be easier to implement
and to make decisions about dynamically.
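The difference between the two techniques can be sketched as follows. Both definitions are illustrative assumptions: `sample` keeps each tuple independently with a fixed probability, and `SheddingQueue` drops a chunk of its oldest tuples when it exceeds a capacity (the text does not say which tuples are dropped).

```python
import random
from collections import deque

def sample(tuples, rate, rng):
    """Sample operator: keep each tuple independently with probability rate."""
    return [t for t in tuples if rng.random() < rate]

class SheddingQueue:
    """Queue that drops a chunk of its oldest tuples when it grows too large."""
    def __init__(self, capacity, chunk):
        self.capacity = capacity
        self.chunk = chunk
        self.items = deque()
        self.dropped = 0

    def push(self, t):
        self.items.append(t)
        if len(self.items) > self.capacity:
            # Shed a whole chunk at once rather than deciding per tuple.
            for _ in range(min(self.chunk, len(self.items))):
                self.items.popleft()
                self.dropped += 1
```

Sampling spreads the loss evenly over the stream, while the shedding queue loses contiguous runs of tuples but needs only a length check to decide when to act.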
5.3 Resource Allocation to Maximize Precision
In this subsection we present our preliminary work on
allocating resources to query plans with the objective
of maximizing result precision. The work is applicable
whenever resource limitations are expected to force approximate
query results. We address a restricted scenario
for now, but we believe our approach provides a solid basis
for more general algorithms. Consider one query, and
assume the query plan is provided or the system has already
selected a “best” query plan. Plans are expressed
using the operators of relational algebra (including set
difference, which as usual introduces some challenges).
We use a simple model of precision that measures the
accuracy of a query result as its average rate of false positives
and false negatives.
We give a brief overview of our approach and algorithm.
Let us assume that each operator in a query plan
has a known function from resources to precision, typically
based on one or more of the approximation methods
that reduce synopsis sizes discussed earlier in this section.
(We have encoded a number of realistic functions
from memory to precision for several relational operators,
but details are beyond the scope of this paper.) Further
suppose that we know how to compute precision for
a plan from precision for its constituent operators—we
will discuss this computation shortly. Finally, assume we
have fixed total resources. (Resources can be of any type
as long as they can be expressed and allocated numerically.)
Then our goal of allocating resources to operators
in order to maximize overall query precision can be expressed
as a nonlinear optimization problem for which
we use a packaged solver, although we are optimistic
about finding a more efficient formulation.
In the language handled by our resource allocation algorithm,
all operators and plans produce a stream of output
tuples, although ordering is not relevant for the operators
we consider. The precision of a stream—either a
result stream or a stream within a query plan—is defined
by (FP,FN), where FP ∈ [0, 1] and FN ∈ [0, 1]. FP captures
the false positive rate: the probability that an output
stream tuple is incorrect. FN captures the false negative
rate: the probability, for each correct output stream tuple,
that there is another correct tuple that was missed.
(FP,FN) also can denote the precision of an operator,
with the interpretation that the operator produces a result
stream with (FP,FN) precision when given input(s) with
(0,0) (exact) precision. In all cases, FP and FN denote
expected (mean) precision values over time.
We assume that all plan operators map allocated resources
to precision specifications (FP,FN). Currently
we do not depend on monotonicity—i.e., we do not assume
that more resources result in lower values for FP
and FN—although we can expect monotonicity to hold
and are investigating whether it may help us in our numerical
solver. We have devised (and shown to be correct,
both mathematically and empirically) fairly complex
formulas that, for each operator type, compute output
stream precision (FP,FN) values from the precision
of the input streams and the precision of the operator itself.
We assume the base input streams to a query have exact
precision, i.e., (0,0). We apply our formulas bottom-up
to the query plan, feeding the result to the numerical
solver which produces the optimal resource allocation.
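The shape of the allocation problem can be shown with a toy sketch. Everything specific here is an assumption: the per-operator error functions are hypothetical, plan accuracy is taken to be the product of per-operator accuracies (not the formulas referred to above), and exhaustive search over integer allocations stands in for the packaged nonlinear solver.

```python
from itertools import product

def best_allocation(error_fns, total):
    """Exhaustively split `total` integer resource units among operators.

    error_fns[i](m) is a hypothetical error rate in [0, 1] for operator i
    when given m units. Plan accuracy is ASSUMED to be the product of the
    per-operator accuracies (1 - error).
    """
    best, best_acc = None, -1.0
    for alloc in product(range(total + 1), repeat=len(error_fns)):
        if sum(alloc) != total:
            continue                    # must use exactly the fixed budget
        acc = 1.0
        for fn, m in zip(error_fns, alloc):
            acc *= 1.0 - fn(m)
        if acc > best_acc:
            best, best_acc = alloc, acc
    return best, best_acc
```

With two operators that share the same error function 1/(1 + m) and a budget of 10 units, the search returns the even split (5, 5); operators with different functions shift the split toward whichever benefits more from extra resources.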
The next steps in this work are to incorporate variance
into our precision model, to extend the model to include
value-based precision so we can handle operators such
as aggregation, and eventually to couple plan generation
with resource allocation.
5.4 Resource Management and Approximation:
Discussion
Recall that our overall goal is to manage resources carefully,
and to perform approximation in the face of resource
limitations in a flexible, usable, and principled
manner. We want solutions that perform static approximation
based on predictable resource availability (Sections
5.1 and 5.3), and we want alternate solutions that
perform dynamic approximation and resource allocation
to maximize the use of available resources and adapt
to changes in data rates and query loads (Section 5.2).
Although we have solved some pieces of the problem
in limited environments, many important challenges lie
ahead; for example:
• We need a means of monitoring synopsis and queue
  sizes and determining when dynamic reduction measures
  (e.g., window size reduction, load shedding)
  should kick in.
• Even if we have a good algorithm for initial allocation
  of memory to synopses and queues, we
  need a reallocation algorithm to handle the inevitable
  changes in data rates and distributions.
• The ability to add, delete, activate, and deactivate
  queries at any time forces all resource allocation
  schemes, including static ones, to provide a means
  of making incremental changes.
It is clear to us that no system will provide a completely
general and optimal solution to the problems
posed here, particularly in the dynamic case. However,
we will continue to chip away at important pieces of the
problem, with (we hope) the end result being a cohesive
system that achieves good performance and usable, understandable
functionality.
6 Implementation and Interfaces
Since we are developing the STREAM prototype from
scratch we have the opportunity to create an extensible
and flexible software architecture, and to provide useful
interfaces for system developers and “power users” to visualize
and influence system behavior. Here we cover
three features of our design: our generic entities, our encoding
of query plans, and the system interface. Collectively,
these features form the start of a comprehensive
“workbench” we envision for programming and interacting
with the DSMS.
6.1 Entities and Control Tables
In the implementation of our system, operators, queues,
and synopses all are subclasses of a generic Entity
class. Each entity has a table of attribute-value pairs
called its Control Table (CT for short), and each entity
exports an interface to query and update its CT. The CT
serves two purposes in our system so far. First, some CT
attributes are used to dynamically control the behavior of
an entity. For example, the amount of memory used by
a synopsis S can be controlled by updating the value of
attribute Memory in S’s control table. Second, some CT
attributes are used to collect statistics about entity behavior.
For example, the number of tuples that have passed
through a queue q is stored in attribute Count of q’s
control table. These statistics are available for resource
management and for user-level system monitoring. It is
a simple matter to add new attributes to a CT as needs
arise, offering convenient extensibility.
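A minimal sketch of this design follows. Only the Entity class, the Control Table, and the attribute names Memory and Count come from the text; the method names `get`, `set`, `control_table`, and `enqueue` are assumptions, as is the choice to update Count when a tuple is enqueued.

```python
class Entity:
    """Generic plan element with a Control Table (CT) of attribute-value pairs."""
    def __init__(self, **attrs):
        self._ct = dict(attrs)

    def get(self, attr):
        return self._ct[attr]

    def set(self, attr, value):
        self._ct[attr] = value

    def control_table(self):
        return dict(self._ct)   # copy, so callers cannot mutate the CT directly

class Queue(Entity):
    def __init__(self):
        super().__init__(Count=0)        # statistic: tuples that have passed through
        self._items = []

    def enqueue(self, t):
        self._items.append(t)
        self.set("Count", self.get("Count") + 1)

class Synopsis(Entity):
    def __init__(self, memory):
        super().__init__(Memory=memory)  # control knob: memory budget
```

Because every element exposes the same `get`/`set` interface, a monitoring tool or resource manager can read statistics and adjust knobs without knowing the element's concrete type.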
6.2 Query Plans
We want to be able to create, view, understand, and manually
edit query plans in order to explore various aspects
of query optimization. Our query plans are implemented
as networks of entities as described in the previous section,
stored in main memory. A graphical interface is
provided for creating and viewing plans, and for adjusting
attributes of operators, queues, and synopses. The interface
was very easy to implement based on our generic
CT structure, since the same code could be used for most
query plan elements.
Query plans may be viewed and edited even as queries
are running. Currently we do not support viewing of data
moving through query plans, although we certainly are
planning this feature for the future. Since continuous
queries in a DSMS should be persistent, main-memory
plan structures are mirrored in XML files, which were
easy to design again based on CT attribute-value pairs.
Plans are loaded at system startup, and any modifications
to plans during system execution are reflected in the corresponding
XML. Of course users are free to create and
edit XML plans offline.
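Mirroring Control Tables in XML might look like the sketch below. The element and attribute names (`plan`, `attr`, `name`, `value`) are hypothetical; the actual STREAM plan format is not given here.

```python
import xml.etree.ElementTree as ET

def plan_to_xml(entities):
    """entities: list of (kind, name, control_table_dict) triples."""
    root = ET.Element("plan")
    for kind, name, ct in entities:
        node = ET.SubElement(root, kind, name=name)
        for attr, value in ct.items():
            ET.SubElement(node, "attr", name=attr, value=str(value))
    return ET.tostring(root, encoding="unicode")

def plan_from_xml(text):
    """Inverse of plan_to_xml; attribute values come back as strings."""
    root = ET.fromstring(text)
    return [
        (node.tag, node.get("name"),
         {a.get("name"): a.get("value") for a in node.findall("attr")})
        for node in root
    ]
```

Since each element is reduced to its attribute-value pairs, the same two functions serve operators, queues, and synopses alike.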
6.3 Programmatic and Human Interfaces
Rather than creating a traditional application programming
interface (API), we provide a web interface to the
DSMS through direct HTTP (and we are planning to expose
the system as a web service through SOAP [22]).
Remote applications can be written in any language and
on any platform. They can register queries, they can request
and update CT attribute values, and they can receive
the results of a query as a streaming HTTP response
in XML. For human users, we have developed a web-based
GUI exposing the same functionality.
7 Conclusion and Acknowledgments
A system realizing the techniques described in this
paper is being developed at Stanford. Please
visit http://www.db.stanford.edu/stream,
which also contains links to papers expanding on several
of the topics covered in this paper, including the
query language, exploiting constraints, operator scheduling,
and resource allocation.
We are grateful to Aris Gionis, Jon McAlister, Liadan
O’Callaghan, Qi Sun, and Jeff Ullman for their participation
in the STREAM project, and to Bruce Lindsay for
his excellent suggestion about heartbeats.
References
[1] A. Arasu, B. Babcock, S. Babu, J. McAlister, and
J. Widom. Characterizing memory requirements
for queries over continuous data streams. In Proc.
21st ACM SIGACT-SIGMOD-SIGART Symp. on
Principles of Database Systems, pages 221–232,
Madison, Wisconsin, May 2002.
[2] R. Avnur and J.M. Hellerstein. Eddies: Continuously
adaptive query processing. In Proc.
ACM SIGMOD Intl. Conf. on Management of Data,
pages 261–272, Dallas, Texas, May 2000.
[3] B. Babcock, S. Babu, M. Datar, R. Motwani, and
J. Widom. Models and issues in data stream
systems. In Proc. 21st ACM SIGACT-SIGMOD-SIGART
Symp. on Principles of Database Systems,
pages 1–16, Madison, Wisconsin, May 2002.
[4] B. Bloom. Space/time trade-offs in hash coding
with allowable errors. Communications of the
ACM, 13(7):422–426, July 1970.
[5] D. Carney, U. Cetintemel, M. Cherniack, C. Convey,
S. Lee, G. Seidman, M. Stonebraker, N. Tatbul,
and S. Zdonik. Monitoring streams–a new class
of data management applications. In Proc. 28th
Intl. Conf. on Very Large Data Bases, Hong Kong,
China, August 2002.
[6] K. Chakrabarti, M.N. Garofalakis, R. Rastogi, and
K. Shim. Approximate query processing using
wavelets. In Proc. 26th Intl. Conf. on Very Large
Data Bases, pages 111–122, Cairo, Egypt, September
2000.
[7] S. Chandrasekaran and M. Franklin. Streaming
queries over streaming data. In Proc. 28th Intl.
Conf. on Very Large Data Bases, Hong Kong,
China, August 2002.
[8] S. Chaudhuri, R. Motwani, and V.R. Narasayya. On
random sampling over joins. In Proc. ACM SIGMOD
Intl. Conf. on Management of Data, pages
263–274, Philadelphia, Pennsylvania, June 1999.
[9] J. Chen, D.J. DeWitt, F. Tian, and Y. Wang. NiagaraCQ:
A scalable continuous query system for
internet databases. In Proc. ACM SIGMOD Intl.
Conf. on Management of Data, pages 379–390,
Dallas, Texas, May 2000.
[10] M.N. Garofalakis, J. Gehrke, and R. Rastogi.
Querying and mining data streams: You only get
one look (tutorial). In Proc. ACM SIGMOD Intl.
Conf. on Management of Data, page 635, Madison,
Wisconsin, May 2002.
[11] M.N. Garofalakis and P.B. Gibbons. Wavelet synopses
with error guarantees. In Proc. ACM SIGMOD
Intl. Conf. on Management of Data, pages
476–487, Madison, Wisconsin, May 2002.
[12] J.M. Hellerstein, M.J. Franklin, et al. Adaptive
query processing: Technology in evolution. IEEE
Data Engineering Bulletin, 23(2):7–18, June 2000.
[13] S. Madden, M. Shah, J. Hellerstein, and V. Raman.
Continuously adaptive continuous queries
over streams. In Proc. ACM SIGMOD Intl. Conf.
on Management of Data, pages 49–60, Madison,
Wisconsin, May 2002.
[14] T.K. Sellis. Multiple-query optimization. ACM
Trans. on Database Systems, 13(1):23–52, March
1988.
[15] P. Seshadri, M. Livny, and R. Ramakrishnan.
The design and implementation of a sequence
database system. In Proc. 22nd Intl. Conf. on Very
Large Data Bases, pages 99–110, Bombay, India,
September 1996.
[16] M.D. Soo. Bibliography on temporal databases.
SIGMOD Record, 20(1):14–24, March 1991.
[17] N. Thaper, S. Guha, P. Indyk, and N. Koudas.
Dynamic multidimensional histograms. In Proc.
ACM SIGMOD Intl. Conf. on Management of Data,
pages 428–439, Madison, Wisconsin, May 2002.
[18] P. Tucker, D. Maier, T. Sheard, and L. Fegaras.
Punctuated data streams. http://www.cse.ogi.edu/
~ptucker/PStream.
[19] J.D. Ullman and J. Widom. A First Course in
Database Systems (Second Edition). Prentice Hall,
Upper Saddle River, New Jersey, 2002.
[20] S.D. Viglas and J.F. Naughton. Rate-based query
optimization for streaming information sources. In
Proc. ACM SIGMOD Intl. Conf. on Management
of Data, pages 37–48, Madison, Wisconsin, May
2002.
[21] A.N. Wilschut and P.M.G. Apers. Dataflow query
execution in a parallel main-memory environment.
In Proc. Intl. Conf. on Parallel and Distributed
Information Systems, pages 68–77, Miami Beach,
Florida, December 1991.
[22] Web Services Description Language (WSDL) 1.1,
March 2001. http://www.w3.org/TR/wsdl.

STREAM: The Stanford Stream Data Manager
The STREAM Group
Stanford University

Abstract
The STREAM project at Stanford is developing a general-purpose system for processing continuous
queries over multiple continuous data streams and stored relations. It is designed to handle high-volume
and bursty data streams with large numbers of complex continuous queries. We describe the status of
the system as of early 2003 and outline our ongoing research directions.
1 Introduction
The STanford stREam datA Manager (STREAM) project at Stanford is developing a general-purpose Data Stream
Management System (DSMS) for processing continuous queries over multiple continuous data streams and stored
relations. The following two fundamental differences between a DSMS and a traditional DBMS have motivated
us to design and build a DSMS from scratch:
1. A DSMS must handle multiple continuous, high-volume, and possibly time-varying data streams in addition
to managing traditional stored relations.
2. Due to the continuous nature of data streams, a DSMS needs to support long-running continuous queries,
producing answers in a continuous and timely fashion.
A high-level view of STREAM is shown in Figure 1. On the left are the incoming Input Streams, which
produce data indefinitely and drive query processing. Processing of continuous queries typically requires intermediate
state, which we denote as Scratch Store in the figure. This state could be stored and accessed in
memory or on disk. Although we are concerned primarily with the online processing of continuous queries, in
many applications stream data also may be copied to an Archive, for preservation and possible offline processing
of expensive analysis or mining queries. Across the top of the figure we see that users or applications register
Continuous Queries, which remain active in the system until they are explicitly deregistered. Results of continuous
queries are generally transmitted as output data streams, but they could also be relational results that are
updated over time (similar to materialized views).
Copyright 0000 IEEE. Personal use of this material is permitted. However, permission to
reprint/republish this material for advertising or promotional purposes or for creating new collective
works for resale or redistribution to servers or lists, or to reuse any copyrighted component of this
work in other works must be obtained from the IEEE.
Bulletin of the IEEE Computer Society Technical Committee on Data Engineering
Active members as of March 2003: A. Arasu, B. Babcock, S. Babu, M. Datar, K. Ito, R. Motwani, I. Nishizawa, U. Srivastava,
D. Thomas, R. Varma, J. Widom. This work was supported by NSF Grants IIS-0118173 and IIS-9817799, by Stanford Graduate
Fellowships from 3Com, Rambus, and Sequoia Capital, by a Siebel Scholarship, and by support from the Okawa Foundation, Microsoft,
Stanford Networking Research Center, and Veritas. Nishizawa is visiting Stanford from Hitachi, Ltd.
Figure 1: Overview of STREAM
Currently STREAM offers a Web system interface through direct HTTP, and we are planning to expose the
system as a Web service through SOAP. Thus, remote applications can be written in any language and on any
platform. Applications can register queries and receive the results of a query as a streaming HTTP response
in XML. To allow interactive use of the system, we have developed a Web-based GUI as an alternative way to
register queries and view results, and we provide an interactive interface for visualizing and modifying system
behavior (see Section 4).
In Sections 2 (Query Language and Processing), 3 (Operator Scheduling), and 4 (User Interface) we describe
the most important components of STREAM. In Section 5 we outline our current research directions. Due to
space limitations this paper does not include a section dedicated to related work. We refer the reader to our
recent survey paper [BBD+02], which provides extensive coverage of related work.
2 Query Language and Processing
We first describe the query language and semantics for continuous queries supported by STREAM. The latter
half of this section describes STREAM’s query processing architecture.
2.1 Query Language and Semantics
We have designed an abstract semantics and a concrete declarative query language for continuous queries over
data streams and relations. We model a stream as an unbounded, append-only bag of (tuple, timestamp) pairs,
and a relation as a time-varying bag of tuples supporting updates and deletions as well as insertions. Our semantics
for continuous queries over streams and relations leverages well-understood relational semantics. Streams
are converted into relations using special windowing operators; transformations on relations are performed using
standard relational operators; then the transformed relational data is (optionally) converted back into a streamed
answer. This semantics relies on three abstract building blocks:
1. A relational query language, which we can view abstractly as a set of relation-to-relation operators.
2. A window specification language used to extract tuples from streams, which we can view as a set of
stream-to-relation operators. In theory these operators need not have anything to do with “windows,” but
in practice windowing is the most common way of producing bounded sets of tuples from unbounded
streams [BBD+02].
3. A set of relation-to-stream operators.
The interaction among these three building blocks is depicted in Figure 2.
Figure 2: Mappings used in abstract semantics (the window specification language maps streams to relations, the relational query language maps relations to relations, and relation-to-stream operators map relations to streams)
We have developed a concrete declarative query language, CQL (for Continuous Query Language), which
instantiates our abstract semantics. Our language uses SQL as its relational query language, its window specification
language is derived from SQL-99, and it includes three relation-to-stream operators. The CQL language
also supports syntactic shortcuts and defaults for convenient and intuitive query formulation. The complete
specification of our query semantics and CQL is provided in an earlier paper [ABW02]. The interested reader
is referred to our Stream Query Repository [SQR], which contains queries from many realistic stream applications,
including a large number and variety of queries expressed in CQL. A significant fraction of CQL has been
implemented to date, as described in the next section.
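The three mappings of the abstract semantics can be illustrated with a small Python sketch. It is not STREAM code:
the function names time_window, select, and istream are hypothetical, time is assumed to advance in integer steps,
and the relation-to-stream step shown (emit the tuples newly inserted into the relation) is only one possible
relation-to-stream operator.

```python
from collections import Counter

def time_window(stream, now, width):
    """Stream-to-relation: bag of tuples whose timestamp lies in (now - width, now]."""
    return Counter(t for t, ts in stream if now - width < ts <= now)

def select(relation, predicate):
    """Relation-to-relation: ordinary selection over a bag of tuples."""
    return Counter({t: n for t, n in relation.items() if predicate(t)})

def istream(previous, current, now):
    """Relation-to-stream: emit tuples inserted since the previous instant."""
    inserted = current - previous  # bag difference keeps positive counts only
    return [(t, now) for t in sorted(inserted.elements())]

# A stream is modeled as a list of (tuple, timestamp) pairs.
stream = [(5, 1), (12, 2), (7, 3), (20, 4)]

output, previous = [], Counter()
for now in range(1, 5):
    current = select(time_window(stream, now, width=2), lambda v: v > 6)
    output.extend(istream(previous, current, now))
    previous = current

print(output)
```

Each loop iteration re-evaluates the query at one time instant, which is the simplest reading of the semantics; a
real system would maintain the result incrementally instead.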
2.2 Query Processing
When a continuous query specified in CQL is registered with STREAM, it is compiled into a query plan. The
query plan is merged with existing query plans whenever possible, in order to share computation and memory.
Alternatively, the structure of query plans can be specified explicitly using XML. A query plan in our system
runs continuously and is composed of three different types of components:
1. Query operators correspond to the three types of operators in our abstract semantics (Section 2.1). Each
operator reads tuples from a set of input queues, processes the tuples based on its semantics, and writes
its output tuples into an output queue.
2. Inter-operator queues are used to buffer the output of one operator that is passed as input to one or more
other operators. Incoming stream tuples and relation updates are placed in input queues feeding leaf
operators.
3. Synopses maintain run-time state associated with operators.
STREAM supports the standard relational operators (including aggregation and duplicate elimination), window
operators that compute time-based, tuple-based, and partitioned windows over streams [ABW02], three operators
that convert relations into streams, and sampling operators for approximate query answering. Note that the
queues and synopses for the active query plans in the system comprise the Scratch Store depicted in Figure 1.
[Figure 3: STREAM query plans]
A synopsis stores intermediate state at some operator in a running query plan, as needed for future evaluation
of that operator. For example, a sliding-window join operator [KNV03] must have access to all the tuples that are
part of the current window on each of its input streams, so we maintain one sliding-window synopsis (typically a
hash table) for each of these streams. On the other hand, simple filter operators, such as selection and
duplicate-preserving projection, do not require a synopsis since they do not need to maintain state. The most common use
of a synopsis in our system is to materialize a relation or a view (e.g., a sliding window). Synopses can also be
used to store a summary of the tuples in a stream or a relation for approximate query answering. For this reason
we have implemented reservoir samples [Vit85] over streams, and we will soon add Bloom filters [MW+03].
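As an illustration of a summary synopsis, the following Python sketch maintains a fixed-size uniform sample of a
stream using the basic reservoir technique (often called Algorithm R). It is a generic textbook version, not the
STREAM implementation; the function name reservoir_sample and the optional rng parameter are assumptions made for
the example.

```python
import random

def reservoir_sample(stream, k, rng=None):
    """Keep a uniform random sample of k items from a stream of unknown length."""
    rng = rng or random.Random()
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)      # fill the reservoir first
        else:
            j = rng.randint(0, i)       # uniform over 0..i, both ends inclusive
            if j < k:
                reservoir[j] = item     # replace a slot with probability k / (i + 1)
    return reservoir

sample = reservoir_sample(range(1000), 10, random.Random(0))
print(sample)
```

The sample uses memory proportional to k regardless of how many tuples have arrived, which is what makes it
usable as a synopsis over an unbounded stream.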
Figure 3 illustrates plans for two queries, Q1 and Q2, over input streams R and S. Query Q1 is a windowed-aggregate
query: it maintains the maximum value of attribute R.A over a sliding window on stream R. Query Q2 is a
sliding-window join query over streams R and S. Together the plans contain four operators SW1, SW2, Max, and
Join, five synopses S1–S5, and six queues q1–q6. SW1 is a sliding-window operator that reads stream R’s tuples
from queue q1, updates the sliding-window synopsis S1, and outputs the inserts and deletes to this sliding window
into queue q3. Thus, queue q1 represents stream R, while queue q3 represents the relation that is the
sliding-window on stream R. Similarly, SW2 processes stream S’s tuples from q2, updating synopsis S2 and
queue q4. The Max operator maintains the maximum value of R.A incrementally over the window on R, using
the inserts and deletes from the window maintained by SW1. Whenever the current maximum value expires
from the window, Max will potentially need to access the entire window to compute the new maximum value.
Thus, Max must materialize this window in its synopsis S3. However, since S3 is simply a time-shifted version
of S1, we can share the data store between S1 and S3, as indicated by the dotted arrow from S1 to S3. Similarly,
the sliding-window synopsis S4 maintained by the join operator Join can be shared with S1 and S3, and S5
can be shared with S2. Also, queue q3 is shared by Max and Join, effectively sharing the window-computation
subplan between queries Q1 and Q2.
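The interplay between the sliding-window operator and the Max operator can be sketched in Python as follows. This
is a simplified illustration, not STREAM code: the class names are hypothetical, the window is tuple-based, the
two operators run in lockstep rather than through a queue, and the synopsis is shared by passing the same deque to
both operators.

```python
from collections import deque

class SlidingWindow:
    """Tuple-based sliding window that keeps the last `size` values in its synopsis."""
    def __init__(self, size):
        self.size = size
        self.synopsis = deque()

    def process(self, value):
        """Return the inserts and deletes applied to the window relation."""
        changes = [("insert", value)]
        self.synopsis.append(value)
        if len(self.synopsis) > self.size:
            changes.append(("delete", self.synopsis.popleft()))
        return changes

class Max:
    """Maintains the window maximum incrementally from inserts and deletes."""
    def __init__(self, window_synopsis):
        self.window = window_synopsis   # shared data store, not a private copy
        self.current = None

    def process(self, changes):
        for op, value in changes:
            if op == "insert" and (self.current is None or value > self.current):
                self.current = value
            elif op == "delete" and value == self.current:
                # The maximum expired, so rescan the materialized window.
                self.current = max(self.window) if self.window else None
        return self.current

sw = SlidingWindow(size=3)
mx = Max(sw.synopsis)
results = [mx.process(sw.process(v)) for v in [4, 9, 2, 1, 3]]
print(results)
```

Only the expiry of the current maximum triggers a scan of the window; every other insert or delete is handled in
constant time.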
The Aurora system [CCC+02] supports shared queues, used to share storage for sliding windows on streams.
Our system goes a step further in synopsis-sharing, including the ability to share the storage and maintenance
overhead for indexes over the synopses as well. For example, if Max in Figure 3 computed Group By R.B,
Max(R.A), and Join used the join predicate R.B = S.B, then it would be useful to maintain a hash-index over R.B
in synopsis S1 which both Max and Join could use. We currently support shared windows over streams
where all the window specifications need not be the same, and shared materialized views, which are effectively
common subexpressions in our query plans [CDTW00]. We use novel techniques to eliminate from synopses
tuples that will not be accessed in the future, for example using reasoning based on constraints on the input
streams [BW02].
Execution of query operators is controlled by a global scheduler, discussed next in Section 3. The operators
have been implemented in such a way that they make no assumptions about the scheduling policy, giving the
scheduler complete flexibility to adapt its scheduling strategy to the query workload and input stream characteristics.
3 Operator Scheduling
The execution of query plans is controlled by a global scheduler running in the same thread as all the operators
in the system. (The I/O operations are handled by a separate thread.) Each time the scheduler is invoked, it
selects an operator to execute and calls a specific procedure defined for that operator, passing as a parameter the
maximum amount of time that the operator should run before returning control to the scheduler. The operator
may return earlier if its input queues become empty.
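The control loop described above can be sketched in Python as follows. The class and function names are
hypothetical, each operator is given a single input queue for brevity, and the scheduling policy is passed in as
a function so that the operators make no assumptions about it.

```python
import time
from collections import deque

class Operator:
    def __init__(self, name, inputs, output, fn):
        self.name, self.inputs, self.output, self.fn = name, inputs, output, fn

    def ready(self):
        return bool(self.inputs)

    def run(self, max_seconds):
        """Process input tuples until the time quantum expires or the queue empties."""
        deadline = time.monotonic() + max_seconds
        while self.inputs:
            self.output.extend(self.fn(self.inputs.popleft()))
            if time.monotonic() >= deadline:
                break

def run_scheduler(operators, pick, quantum=0.001):
    """Repeatedly let the policy `pick` choose one ready operator to run."""
    while True:
        ready = [op for op in operators if op.ready()]
        if not ready:
            break
        pick(ready).run(quantum)

q1, q2, q3 = deque(range(10)), deque(), deque()
evens = Operator("filter", q1, q2, lambda v: [v] if v % 2 == 0 else [])
double = Operator("map", q2, q3, lambda v: [v * 2])
run_scheduler([evens, double], pick=lambda ready: ready[0])
print(list(q3))
```

Here the policy simply picks the first ready operator; a different policy only needs to replace the pick
function.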
The goals of a scheduler in a continuous query system are somewhat different than in a traditional DBMS.
Some traditional scheduling objectives, such as minimizing run-time resource consumption and maximizing
throughput, are applicable in the context of continuous queries, whereas other objectives, such as minimizing
query response time, are not directly relevant in a continuous query setting, though they may have relevant
counterparts (e.g., minimizing average latency of results). One objective that takes on unusual importance when
processing data streams is careful management of run-time resources such as memory. Memory management
is a particular challenge when processing streams because many real data streams are irregular in their rate of
arrival, exhibiting burstiness and variation of data arrival rate over time. This phenomenon has been observed in
networking [FP95], web-page access patterns, e-mail messages [Kle02], and elsewhere. When processing high-volume
and bursty data streams, temporary bursts of data arrival are usually buffered, and this backlog of tuples
is processed during periods of light load. However, it is important for the stream system to minimize the memory
required for backlog buffering. Otherwise, total memory usage can exceed the available physical memory during
periods of heavy load, causing the system to page to disk and limiting system throughput. To address this
problem, we have developed an operator scheduling strategy that minimizes the memory requirement for backlog
buffering [BBDM03]. This strategy, called Chain scheduling, is near-optimal in minimizing run-time memory
usage for single-stream queries involving selections, projections, and foreign-key joins with stored relations.
Chain scheduling also performs well for queries with sliding-window joins over multiple streams, and multiple
queries of the above types.
The basic idea in Chain scheduling is to break up query plans into disjoint chains of consecutive operators
based on their effectiveness in reducing run-time memory usage, favoring operators that “consume” a large
number of tuples per time unit and “produce” few output tuples. This metric also determines the scheduling
priority of each operator chain. Chain scheduling decisions are made by picking the operator chain with highest
priority among those that have operators that are ready to execute and scheduling the first ready operator in
that chain. Complete details of Chain, proofs of its near-optimality, and experimental results demonstrating the
benefits of Chain with respect to other scheduling strategies, are provided in an earlier paper [BBDM03].
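The following Python sketch shows one way to split a single operator path into chains and assign priorities. It
is a simplified reading of the idea above, not the algorithm as specified in [BBDM03]: we assume each operator
has a known per-tuple processing cost and selectivity, plot the cumulative cost against the remaining tuple size,
and repeatedly take the steepest downward segment from the current point. The function name chain_segments and
the tie-breaking rule are assumptions.

```python
def chain_segments(costs, selectivities):
    """Split an operator path into chains; return (first_op, last_op, priority) triples."""
    # Progress chart: point i is (cumulative time, remaining size) after i operators.
    times, sizes = [0.0], [1.0]
    for cost, sel in zip(costs, selectivities):
        times.append(times[-1] + cost)
        sizes.append(sizes[-1] * sel)

    def slope(i, j):
        # Memory released per unit of processing time between points i and j.
        return (sizes[i] - sizes[j]) / (times[j] - times[i])

    chains, i, n = [], 0, len(costs)
    while i < n:
        # Choose the end point that gives the steepest descent from point i.
        best = max(range(i + 1, n + 1), key=lambda j: slope(i, j))
        chains.append((i, best - 1, slope(i, best)))
        i = best
    return chains

# Three operators with unit cost and selectivities 0.9, 0.1, and 0.5.
for first, last, priority in chain_segments([1, 1, 1], [0.9, 0.1, 0.5]):
    print(first, last, round(priority, 3))
```

In this example the first two operators form one chain because running them together removes far more data per
unit time than the first operator alone, and that chain receives the higher priority.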
While Chain achieves run-time memory minimization, it may suffer from starvation and poor response
times during bursts. As ongoing work, we are considering how to adapt our strategy to take into account these
additional objectives.
4 User Interface
We are developing a comprehensive interactive interface for STREAM users, system administrators, and system
developers to visualize and modify query plans as well as query-specific and system-wide resource allocation
while the system is in operation.
4.1 Query Plan Execution
STREAM will provide a graphical interface to visualize the execution of any registered continuous query. Query
plans are implemented as networks of entities, each of which is an operator, a queue, or a synopsis. The query
plan execution visualizer will provide the following features.
1. View the structure of the plan and its component entities.
2. View specific attributes of an entity, e.g., the amount of memory being used by a synopsis in the plan.
3. View data moving through the plan, e.g., tuples entering and leaving inter-operator queues, and synopsis
contents growing and shrinking as operators execute. Depending on the scope of activity, individual tuples
or tuple counts can be visualized.
4.2 Global System Behavior
The query execution visualizer described in the previous section is useful for visualizing the execution and
resource utilization of a single query, or a small number of queries that may share plan components. However, a
system administrator or developer might want to obtain a more global picture of DSMS behavior. The STREAM
system will provide an interface to visualize system-wide query execution and resource utilization information.
The supported features include:
1. View the entire set of query plans in the system, with the level of detail dependent on the number and size
of plans.
2. View the fraction of memory used by each query in the system, or in more detail by each queue and each
synopsis.
3. View the fraction of processor time consumed by each query in the system.
4.3 Controlling System Behavior
Visualizing query-specific and system-wide execution and resource allocation information is important for system
administrators and developers to understand and tune the performance of a DSMS running long-lived continuous
queries. A sophisticated DSMS should adapt automatically to changing stream characteristics and changing
query load, but it is still useful for “power users” and certainly useful for system developers to have the capability
to control certain aspects of system behavior. STREAM does or will support the following features:
1. Run-time modification of memory allocation, e.g., increasing the memory allocated to one synopsis while
decreasing memory for another.
2. Run-time modification of plan structure, e.g., changing the order of synopsis joins in a query over multiple
streams, or changing the type of synopsis used by a join operator.
3. Run-time modification of the scheduling policy, choosing among several alternative policies.
5 Directions of Ongoing Research
This section outlines the problems that we are addressing currently in the STREAM project, in addition to
implementing the basic prototype as described above. These problems fall broadly into the areas of efficient
query processing algorithms, cost-based optimization and resource allocation, operator scheduling, graceful
degradation under overload, and distributed stream processing.
Efficient query processing: Our system needs efficient query processing algorithms to handle high-volume
data streams and large numbers of complex continuous queries. Some of the issues we are addressing in this
area include techniques for sharing computation and memory aggressively among query plans, algorithms for
multi-way sliding-window joins over streams, tradeoffs between incremental computation and recomputation for
different types of continuous queries, and strategies for processing continuous queries efficiently while ensuring
correctness in the absence of time-synchronization among stream sources and the DSMS.
Cost-based optimization and resource allocation: Although we have implemented support for a significant
fraction of CQL in STREAM to date, our query plan generator is fairly naive and uses hard-coded heuristics to
generate query plans. We are now moving towards one-time and dynamic cost-based query optimization of CQL
queries. Since CQL uses SQL as its relational query language, we can leverage most of the one-time optimization
techniques used in traditional relational DBMSs. Our unique optimization techniques include relocating
window operators in query plans, exploiting stream constraints to reduce window sizes without affecting result
correctness, and identifying opportunities for sharing computation (e.g., common subexpression computation,
index maintenance) and memory (synopses and queues). Apart from choosing plan shapes and operators, a
query optimizer must allocate resources such as memory within and across queries. One of the problems we are
addressing in this area is how to allocate resources to query plans so as to maximize result precision whenever
resource limitations force approximate query results. We are also exploring dynamic and adaptive approaches
to query processing and resource allocation. Our adaptive query processing is less fine-grained than Eddies (as
used in the Telegraph project [CC+03]). Our approach relies on two interacting components: a monitor that captures
properties of streams and system behavior, and an optimizer that can reconfigure query plans and resource
allocation as properties change over time.
Scheduling: As described in Section 3, the Chain scheduling strategy achieves run-time memory minimization,
but it may suffer from poor response times during bursts. As ongoing work, we are adapting Chain to
minimize total run-time memory usage for queries under the constraint that the latency of any query-result tuple
must not exceed a given threshold. Another planned extension needed for a complete scheduling strategy for a
DSMS is the intelligent handling of query workloads where synopses and queues do not all fit into the physical
memory available in the DSMS.
Graceful degradation under overload: There could be large intervals of time when input stream arrival rates
exceed the maximum rate at which the DSMS can process its query workload over these streams. As shown
by the Aurora system [CCC+02], a general approach to handle such overload situations is load shedding. The
system load is reduced to manageable levels by dropping input tuples selectively so that the overall quality-of-service
given by the system degrades as little as possible [CCC+02]. Ongoing work in our project adopts a
similar approach, using sampling-based techniques to drop input tuples with the goal of minimizing the overall
weighted error in query results incurred during overload situations.
Distributed stream processing: A final important aspect of our long-term research agenda is to incorporate
distributed data stream processing techniques into the STREAM system. Data stream sources are frequently
geographically dispersed, and our experiments and simulations show that processing strategies that take this fact
into account can result in significant savings in computation and communication costs [OJW03, BO03]. We
plan to modify STREAM to function in a distributed environment, incorporating specialized distributed data
processing strategies.
References
[ABW02] A. Arasu, S. Babu, and J. Widom. An abstract semantics and concrete language for continuous
queries over streams and relations. Technical report, Stanford University Database Group, November
2002. Available at http://dbpubs.stanford.edu/pub/2002-57.
[BBD+02] B. Babcock, S. Babu, M. Datar, R. Motwani, and J. Widom. Models and issues in data stream
systems. In Proc. of the 2002 ACM Symp. on Principles of Database Systems, pages 1–16, June
2002.
[BBDM03] B. Babcock, S. Babu, M. Datar, and R. Motwani. Chain: Operator scheduling for memory minimization
in data stream systems. In Proc. of the 2003 ACM SIGMOD Intl. Conf. on Management
of Data, June 2003. (To appear).
[BO03] B. Babcock and C. Olston. Distributed top-k monitoring. In Proc. of the 2003 ACM SIGMOD Intl.
Conf. on Management of Data, June 2003. (To appear).
[BW02] S. Babu and J. Widom. Exploiting k-constraints to reduce memory overhead in continuous queries
over data streams. Technical report, Stanford University Database Group, November 2002. Available
at http://dbpubs.stanford.edu/pub/2002-52.
[CC+03] S. Chandrasekaran, O. Cooper, et al. TelegraphCQ: Continuous dataflow processing for an uncertain
world. In Proc. First Biennial Conf. on Innovative Data Systems Research (CIDR), January 2003.
[CCC+02] D. Carney, U. Cetintemel, M. Cherniack, C. Convey, S. Lee, G. Seidman, M. Stonebraker, N. Tatbul,
and S. Zdonik. Monitoring streams–a new class of data management applications. In Proc. 28th
Intl. Conf. on Very Large Data Bases, August 2002.
[CDTW00] J. Chen, D. J. DeWitt, F. Tian, and Y. Wang. NiagaraCQ: A scalable continuous query system for
internet databases. In Proc. of the 2000 ACM SIGMOD Intl. Conf. on Management of Data, pages
379–390, May 2000.
[FP95] S. Floyd and V. Paxson. Wide-area traffic: The failure of Poisson modeling. IEEE/ACM Transactions
on Networking, 3(3):226–244, June 1995.
[Kle02] J. Kleinberg. Bursty and hierarchical structure in streams. In Proc. of the 2002 ACM SIGKDD Intl.
Conf. on Knowledge Discovery and Data Mining, August 2002.
[KNV03] J. Kang, J. F. Naughton, and S. Viglas. Evaluating window joins over unbounded streams. In Proc.
of the 2003 Intl. Conf. on Data Engineering, March 2003.
[MW+03] R. Motwani, J. Widom, et al. Query processing, approximation, and resource management in a data
stream management system. In Proc. First Biennial Conf. on Innovative Data Systems Research
(CIDR), January 2003.
[OJW03] C. Olston, J. Jiang, and J. Widom. Adaptive filters for continuous queries over distributed data
streams. In Proc. of the 2003 ACM SIGMOD Intl. Conf. on Management of Data, June 2003. (To
appear).
[SQR] SQR – A Stream Query Repository. http://www-db.stanford.edu/stream/sqr.
[Vit85] J. S. Vitter. Random sampling with a reservoir. ACM Trans. on Mathematical Software, 11(1):37–57,
1985.

Scalability via Summaries: Stream Query Processing using Promising Tuples

M. H. Ali Walid G. Aref M. Y. ElTabakh
Department of Computer Science, Purdue University, West Lafayette, IN
{mhali, aref, meltabak}@cs.purdue.edu
Abstract
In many data streaming applications, streams may contain data tuples that are either redundant, repetitive, or
not “interesting” to any of the standing continuous queries. Processing such tuples may waste system resources
without producing useful answers. In contrast, some other tuples can be categorized as promising. This paper
proposes that stream query engines can have the option to execute on promising tuples only and not on all
tuples. We propose to maintain intermediate stream summaries and indices that can direct the stream query
engine to detect and operate on promising tuples. As an illustration, the proposed intermediate stream
summaries are tuned towards capturing promising tuples that (1) maximize the number of output tuples,
(2) contribute to producing a faithful representative sample of the output tuples (compared to the output
produced when assuming infinite resources), or (3) produce the outlier or deviant results. Experiments are
conducted in the context of Nile [24], a prototype stream query processing engine developed at Purdue University.
1 Introduction
Recently, many applications, e.g., monitoring network traffic, sensor network applications and retail store
online transaction processing, have moved from the traditional database paradigm to a more challenging
paradigm in which data evolves infinitely and unpredictably over time to form streams of data. Although
traditional DBMSs have reached some level of maturity in processing various types of queries over their
persistent data layer, data stream systems are still evolving to cope with the challenging nature of the
streaming environment. Scalability in terms of the number of streams, stream rates, and number of standing
continuous queries that the system can handle is still a major challenge for data stream systems. In
this paper, we propose a framework for stream query processing engines that achieves scalability via the
notion of promising tuples. Promising tuples limit the attention of the query processor to a smaller subset of
the input tuples that preserve the output features with respect to a specific query preference.
1.1 Motivation
By observing the distribution of tuples in the answer of various queries, we notice that tuples contribute
with different frequencies to the output. The behavior of the output is usually dominated and shaped by
a smaller subset of the tuples. Focusing on these tuples, the promising tuples, utilizes the available time
budget effectively and produces output with desirable features.
Consider a join operation between the two streams in Figure 1 over the depicted time-window. Assume
that, due to scarcity of resources, we are limited to processing only three tuples from Stream 1. What
would be a smart choice of those three tuples? We need to assume a specific query preference before we
decide which tuples to choose. For example, consider the following cases: (1) If we take all three tuples
to be the “5”s, we maximize the number of output tuples (a total of 6 output tuples). (2) If we take two
tuples to be “5” and one tuple to be “9”, we get a total output of 5 tuples. However, in the latter case, the
output is more representative of the exact output of the join operation, i.e., it comes close to a random
sample over the exact output. (3) If we take the value “3”, which rarely appears in the streams, as one of the
tuples, the output is useful to applications that keep track of irregular behavior to detect outliers.
[Figure 1: A two stream example]
To map the above example to real life, imagine that the two streams are coming from the online transactions
of two retail stores, where the tuple values represent identifiers of the sold items. Maximizing the
number of output tuples gives as many points of similarity as possible between the two stores. A faithful
representative output avoids excessive duplication of high-frequency elements and produces an output that
spans the exact output set to give a near-random sample of the output. Low-frequency tuples may express
outliers between the two stores, e.g., selling a very rare or a very expensive item in the two stores over a short
period of time.
1.2 Contributions
The main contributions of this paper can be summarized as follows:
1. We propose the notion of promising tuples to utilize the available budget of CPU time effectively
in processing tuples that contribute heavily to the output.
2. We propose an intermediate stream representation that is capable of indexing the stream’s
promising tuples.
3. We tune various stream operations to adopt the notion of promising tuples.
4. We conduct an experimental study to evaluate the effectiveness of promising tuples inside the Nile
stream query processing engine [24].
The rest of this paper is organized as follows. Section 2 introduces the concept of promising tuples.
Section 3 presents the proposed intermediate stream representation. Section 4 explains how various types of
operations are performed on top of the intermediate stream representation. An experimental study that is
based on a real implementation of the promising tuple approach inside Nile is given in Section 5. Section 6
overviews related work. Finally, the paper is concluded in Section 7.
2 Promising Tuples
In this section, we introduce the concept of promising tuples and we present a framework for query processing
using the notion of promising tuples. Promising tuples can be defined as follows:
Definition 1 Let Q be a query over stream X = {x1, x2, x3, · · ·} and let O = {o1, o2, o3, · · ·} be the answer
to the query Q (i.e., Q(X) = O). Let Pq be a preference function of query Q that is defined over its answer
O such that Pq(O) = Op where Op ⊆ O. A promising tuple is a tuple xp ∈ X such that Q(xp) = op where
op ∈ Op.
[Figure 2: Promising-tuple based processing]
The definition above indicates that a query Q may have a preference function that is defined over its exact
answer O to yield a preferred answer set Op. A promising tuple is a tuple that contributes to producing an
output tuple that is part of the query’s preferred answer set. A promising region is a region in the stream that
contains many promising tuples. These promising regions are the regions that are worth processing. We
aim at investing the system’s resources in processing promising regions to satisfy the query preferences.
Figure 2 illustrates the promising-tuple approach. In the promising tuple approach, we maintain an
intermediate stream representation for the largest portion of the stream that is of interest to any query.
Once a new tuple arrives, it is inserted in the intermediate representation where summaries are built on
top of the stream tuples to capture the behavior of the stream. Sets of promising tuples are extracted
from the stream summaries based on the preferences of standing queries. An entry in the promising tuple
set has the form
(promising tuple, promising region, priority)
Each promising tuple points to the stream regions where it can be found, with a specific priority that
indicates how rich a region is in this promising tuple.
The framework of query processing using the notion of promising tuples is a three-step process and can be
summarized as follows:
1. Identify and extract from summaries the query’s promising regions.
2. Prioritize the query’s promising regions.
3. Execute the query on promising regions in the order of their priorities.
Notice that the promising tuple approach results in an out-of-order query answer because stream regions are
processed based on their promise regardless of their arrival order.
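The three steps can be sketched in Python as follows. This is an illustration under stated assumptions, not the
Nile implementation: a region summary is taken to be a simple frequency count, the priority of an entry is the
count of the promising value in the region, and the budget is expressed as a number of output tuples. The
function names and the sample data are hypothetical.

```python
import heapq
from collections import Counter

def promising_entries(region_summaries, promising_values):
    """Step 1: build (promising tuple, promising region, priority) entries."""
    return [(value, region, summary[value])
            for region, summary in region_summaries.items()
            for value in promising_values
            if summary[value] > 0]

def process_by_priority(entries, regions, budget):
    """Steps 2 and 3: visit regions in descending priority within the budget."""
    heap = [(-priority, region, value) for value, region, priority in entries]
    heapq.heapify(heap)
    output = []
    while heap and budget > 0:
        _, region, value = heapq.heappop(heap)
        hits = [t for t in regions[region] if t == value][:budget]
        output.extend((region, t) for t in hits)
        budget -= len(hits)
    return output

regions = {"r1": [5, 5, 3, 9], "r2": [5, 9, 9, 9], "r3": [3, 7, 7, 5, 5, 5]}
summaries = {name: Counter(values) for name, values in regions.items()}
entries = promising_entries(summaries, promising_values=[5, 9])
output = process_by_priority(entries, regions, budget=7)
print(output)
```

The result lists tuples from region r2 before tuples from r1, which shows the out-of-order answer noted above.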
The stream intermediate summaries are capable of indexing a stream’s promising regions and are built in
response to a create-summary statement as shown in Figure 3a. The create-summary statement instructs
the summary manager to divide the stream into time granularities and to summarize each granularity.
High-resolution summaries are built over the most recent granularity, which is of length T1 time units. The
collapse clause specifies two parameters: the rate and the depth. The quality of the summaries decreases, or
collapses, as we go to older granularities according to the rate parameter. The depth parameter avoids the
indefinite growth of the summaries and specifies how far into the past the summaries are maintained.
CREATE SUMMARY summary-name ON stream (attr)
WITH GRANULARITY T1 AND COLLAPSE(rate, depth)
(a)
SELECT attr1, attr2, · · ·
FROM stream1, stream2, · · ·
WHERE conditions
WITH PREFERENCE [asc|desc|weighted] Pq
FADE FACTOR
WINDOW w
(b)
Figure 3: Extended SQL statements
Sets of promising tuples are extracted from summaries in response to an SQL continuous query over
a sliding window of size w with a preference clause as shown in Figure 3b. The preference clause specifies
how sets of promising tuples are extracted and prioritized from the stream summaries. The priority of a
promising tuple is reduced, or fades, by the factor given in the fade-factor clause as we move into the past.
Notice that the stream summaries are stream-dependent structures and are constructed regardless of the
standing queries, while the promising tuple sets are query-dependent and are constructed based on a query
preference (Pq).
Promising tuples focus on the CPU processing cycles as a primary resource. The interarrival time between
two consecutive tuples is our processing budget where useful computations can take place. To quantify
how many output tuples can be generated during one interarrival time period, we introduce the following
measure of performance:
No/p = (interarrival time − tins) / thit    (1)
where No/p is the number of output tuples per input tuple, the interarrival time is the average time between
two consecutive tuples, tins is the time required to insert the incoming tuple into the intermediate stream
representation, and thit is the average time required by the intermediate representation to guide the search to
one output tuple. Basically, we invest a small portion of the interarrival time budget in data summarization
(tins) to reduce the output tuple hit time (thit). Smart guidance to the stream’s promising regions effectively
reduces the output hit time, which in turn increases the number of generated output tuples (No/p).
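A small worked example of Equation (1) in Python may help. The numbers are invented for illustration and are not
measurements; the function name outputs_per_input is hypothetical, and all times are assumed to be in the same
unit (here milliseconds).

```python
def outputs_per_input(interarrival, t_ins, t_hit):
    """Equation (1): output tuples that fit in one interarrival period."""
    if t_hit <= 0:
        raise ValueError("t_hit must be positive")
    # No budget is left when insertion alone uses up the interarrival time.
    return max(0.0, (interarrival - t_ins) / t_hit)

# 10 ms between tuples, 2 ms to insert into the intermediate representation.
slow_search = outputs_per_input(10, 2, t_hit=4)     # unguided search
fast_search = outputs_per_input(10, 2, t_hit=0.5)   # search guided by summaries
print(slow_search, fast_search)
```

With these assumed numbers, reducing the hit time from 4 ms to 0.5 ms raises the number of output tuples per
input tuple from 2 to 16 while the insertion cost stays the same.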
3 Stream Intermediate Representation
Given the create-summary statement of Figure 3a, it is required to build an intermediate stream
representation that indexes the incoming stream tuples. The intermediate stream representation keeps track of
incoming tuples plus some summaries to direct stream operations to their promising regions. We propose an
intermediate stream representation that has the following two properties:
1. The support of multiple time granularities, G1, G2, ..., Gm (Figure 4a) such that:
(a) The number of granularities m corresponds to the depth of the collapse clause in the
create-summary statement (i.e., m = depth).
(b) Granularities span regions in the stream’s timeline with sizes that are multiples of each
other (i.e., |Gi+1| = rate × |Gi|, |G1| = T1), where rate and T1 are specified in the
create-summary statement.
2. The capability to index each time granularity using a multi-resolution index structure as
illustrated in Figure 4b (Section 3.4).
Multiple time granularities are desirable to make up for the bounded memory requirements. Notice that
the summary size is fixed in each granularity despite the increase in the size of the region that is covered
by that granularity. As data gets older, it moves from one granularity to the next and a coarser
summarization is applied to the data due to the increase in the granularity size. This behavior results in a
gradual degradation in the resolution of summaries as we go farther in the past. The collapse clause in the
create-summary statement specifies the rate at which summaries collapse and how deep summaries can go in the
past. Summaries in each granularity are indexed using a multi-resolution index structure in order to allow
coarser resolutions to guide the search down the hierarchy to finer resolutions.
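The growth of granularity sizes under the collapse clause can be made concrete with a short Python sketch. It
assumes that granularities are contiguous and non-overlapping, and that ages are measured in time units before
the current time; the function name granularity_spans is hypothetical.

```python
def granularity_spans(t1, rate, depth):
    """Return (index, start_age, end_age) for granularities G1..Gm with m = depth."""
    spans, start, size = [], 0, t1
    for i in range(1, depth + 1):
        spans.append((i, start, start + size))
        start += size
        size *= rate          # |G(i+1)| = rate * |G(i)|
    return spans

# T1 = 10 time units, rate = 2, depth = 4.
for index, start, end in granularity_spans(10, 2, 4):
    print(f"G{index}: ages {start} to {end} (size {end - start})")
```

With these parameters the four granularities cover 10, 20, 40, and 80 time units, so 150 time units of history
are summarized with a fixed amount of summary space per granularity.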
3.1 Two-dimensional Representation of Data
Streams
Each granularity in the stream is divided into smaller
partitions in order to build more accurate summaries
over these partitions. There are two ways to partition
a stream:
1. Partition the stream granularity into value-based
buckets by hashing on the values of incoming tuples.
2. Partition the stream granularity into time-based
zones. Each zone corresponds to a contiguous
time interval of the stream.
Figure 4: Stream intermediate representation (panels (a) to (d); axes: time and item value)
In the first approach, buckets that are based on the
tuple values span the timeline of the stream. Conse-
quently, summaries may suffer inaccuracies due to the
change in the stream’s behavior over time. In the sec-
ond approach, a time zone contains all tuple values
in this portion of the stream. Summaries may suffer
inaccuracies due to mixing the summaries of a wide
range of values in the same zone.
Instead of using a one-dimensional space, we pro-
pose to hash the stream tuples over two dimensions:
(a) the value dimension and (b) the time dimension
(Figure 4c). The two-dimensional representation of
one granularity can be summarized as follows:
1. The value dimension is divided into buckets based
on a suitable hash function over the tuple values.
2. The time dimension is divided into time zones
based on the stream behavior. A change in the
stream’s behavior implies a new time zone (Sec-
tion 3.3).
3. A granularity cell is the intersection of a value
bucket with a time zone (Figure 4d).
4. For each granularity cell:
(a) Tuple values are materialized and are linked
in the order of their arrival time.
(b) Summaries are built to provide a rough but
quick estimation of the tuples in that cell
(Section 3.2).
(c) A multi-resolution index is built to speed up
the access to summaries (Section 3.4).
(d) Sets of promising tuples are extracted based
on query preferences (Section 4.1).
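The mapping from a tuple to its granularity cell can be sketched as follows. This is an illustration only: the class name, the modulo hash, and the integer zone counter are assumptions chosen for brevity, and summaries and indices are left out.

```python
from collections import defaultdict


class CellGrid:
    """One granularity, split into (value bucket, time zone) cells."""

    def __init__(self, num_buckets):
        self.num_buckets = num_buckets
        self.current_zone = 0
        # (bucket, zone) -> tuples kept in arrival order
        self.cells = defaultdict(list)

    def start_new_zone(self):
        # Called when a change in the stream's behavior is detected.
        self.current_zone += 1

    def insert(self, value, ts):
        # Hypothetical hash function: integer value modulo bucket count.
        key = (value % self.num_buckets, self.current_zone)
        self.cells[key].append((value, ts))
        return key


grid = CellGrid(num_buckets=4)
grid.insert(10, ts=1.0)   # lands in cell (2, 0)
grid.start_new_zone()
grid.insert(10, ts=2.0)   # same bucket, next zone: cell (2, 1)
```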
3.2 Summary Information
In each granularity cell, we maintain summaries to
capture the behavior of the stream tuples in this cell.
A summary abstract data type (summary-ADT) is
added to the system to encapsulate the functionali-
ties of summarization techniques. Various types of
summaries have been studied in the literature, e.g.,
histograms [21], wavelets [10], samples [7], and aggre-
gates [34]. In our work, the summarization technique
is considered to be a black box as long as it provides
the following set of interface functions:
1. Summarize(x), to insert a tuple x and to add the
effect of its value to the summaries.
2. Estimate(x), to estimate various properties of a
stream tuple x from the summaries. These esti-
mations are used in the context of the query pref-
erence function to specify how promising tuples
are extracted.
3. Confidence(), to report how confident we
are in the stream summaries. This function is
used as a measure of accuracy.
4. Add(r1,r2), to merge the summaries of regions r1
and r2 into one summary structure (r1). This
function is used to build coarser summaries upon
merging the two regions.
5. Subtract(r1,r2), to subtract the summaries of re-
gion r2 from the summaries of region r1. This
function is used to remove the effect of the sum-
maries of region r2 once it expires, i.e., goes out-
side the time-window of interest.
For implementation purposes, we adopt the counts-
ketch technique as presented in [12]. Countsketches
provide an estimation of both the absolute and the rel-
ative frequencies of the stream tuples. Countsketches
are efficient with respect to the update and the search
operations. The Confidence function takes the vari-
ance of the items in the countsketch as a measure of ac-
curacy. Another interesting feature of countsketches is
that they are additive. Two countsketches can be com-
bined together in linear time simply by adding the two
sketches. Similarly, the effect of one countsketch can
be subtracted from another countsketch in linear time.
Other forms of summarization, e.g., histograms, can
be adapted to provide the same interface functions.
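A small count sketch shows how the Summarize, Estimate, Add, and Subtract functions fit together. This is a simplified sketch and not the implementation used in the paper: the class name, the table dimensions, and the use of a BLAKE2 digest to derive the bucket and the sign are assumptions, and the Confidence function is omitted.

```python
import hashlib
from statistics import median


class CountSketch:
    def __init__(self, depth=5, width=64):
        self.depth, self.width = depth, width
        self.table = [[0] * width for _ in range(depth)]

    def _cell(self, row, x):
        # Derive a column and a +1/-1 sign for item x in the given row.
        digest = hashlib.blake2b(f"{row}:{x}".encode(), digest_size=8).digest()
        v = int.from_bytes(digest, "big")
        sign = 1 if v & 1 else -1
        return (v >> 1) % self.width, sign

    def summarize(self, x):
        for r in range(self.depth):
            col, sign = self._cell(r, x)
            self.table[r][col] += sign

    def estimate(self, x):
        # Median over rows of the signed counter is the frequency estimate.
        votes = []
        for r in range(self.depth):
            col, sign = self._cell(r, x)
            votes.append(sign * self.table[r][col])
        return median(votes)

    def _combine(self, other, factor):
        # Sketches built with the same dimensions are additive cell by cell.
        for r in range(self.depth):
            for c in range(self.width):
                self.table[r][c] += factor * other.table[r][c]

    def add(self, other):
        self._combine(other, +1)

    def subtract(self, other):
        self._combine(other, -1)
```

Both add and subtract touch every counter once, which matches the linear-time merging described above. With other items present, estimates are approximate because several items can share a counter.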
3.3 Self-adjusting Time Zones
As mentioned in Section 3.1, the stream is partitioned
over the infinite time dimension into time zones. Each
zone can have a fixed predefined size. However, fixing
the size of each zone may result in zones containing
various behaviors of the stream which in turn degrades
the accuracy of the summaries.
To improve the accuracy of the summaries, we par-
tition the timeline into zones based on the change
in the stream’s behavior. We sense a change in the
stream’s behavior through the Confidence function of
the summarization technique. Once a tuple arrives,
it is inserted into the summaries of the current zone.
Then, the Confidence function is evaluated. If the
confidence falls below a certain threshold or the length
of a time zone exceeds MAXLENGTH, a new time
zone is started and its summaries are initialized. For a
more efficient implementation of detecting changes in
the stream’s behavior, the reader is referred to [26].
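The zone-splitting rule can be sketched as below. The confidence function here is a hypothetical stand-in (it shrinks as the values in the zone spread out), and the zone length is counted in tuples rather than time units; both are assumptions made for illustration.

```python
from statistics import pvariance


def toy_confidence(values):
    """Hypothetical stand-in for Confidence(): lower when values spread out."""
    return 1.0 / (1.0 + pvariance(values))


class ZoneTracker:
    def __init__(self, threshold, max_length):
        self.threshold = threshold
        self.max_length = max_length
        self.closed_zones = []
        self.current = []

    def insert(self, value):
        """Insert a tuple; return True if a new time zone was started."""
        self.current.append(value)            # insert into the current zone first
        low_conf = toy_confidence(self.current) < self.threshold
        too_long = len(self.current) > self.max_length
        if low_conf or too_long:
            self.closed_zones.append(self.current)
            self.current = []                 # summaries of the new zone start empty
            return True
        return False


tracker = ZoneTracker(threshold=0.5, max_length=3)
changes = [tracker.insert(v) for v in (5, 5, 5, 100)]
print(changes)  # [False, False, False, True]
```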
3.4 Multi-resolution Index
Our intermediate stream representation makes use of
a multi-resolution index to provide an efficient access
to stream tuples. Coarser resolutions of summaries are
formed by the aggregation of the fine-resolution sum-
maries. The top-level summarization maintains the
coarsest summaries that provide a rough estimation
of how much a tuple is promising. Then, we follow
the hierarchy of the multi-resolution summaries till we
reach the exact regions where the tuple resides.
For each stream granularity, we maintain a
pyramid-like structure [31] to index this granularity
(Figure 4b). The lowest-level cells of the pyramid
contain the stream tuples plus their summaries. The
lowest-level cells (LLC) are of the form:
LLC (tuple-list[ · · · ], Summary-info).
As we go up in the pyramid, the summary information
is aggregated using the Add function and pointers to
the original cells are maintained. Upper-level cells
(ULC) of the pyramid are of the form:
ULC (Aggregate-Summary-Info, cell-ptrs[ · · · ])
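The two cell forms and the top-down search can be sketched in Python. This is a minimal sketch under stated assumptions: exact counters from collections.Counter stand in for the summary information, and the helper names make_leaf, make_parent, and find are hypothetical.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class LLC:
    tuple_list: list      # materialized tuples
    summary: Counter      # stand-in for Summary-info


@dataclass
class ULC:
    summary: Counter      # stand-in for Aggregate-Summary-Info
    cell_ptrs: list       # pointers to the cells one level down


def make_leaf(tuples):
    return LLC(list(tuples), Counter(tuples))


def make_parent(children):
    total = Counter()
    for child in children:
        total.update(child.summary)   # plays the role of Add
    return ULC(total, list(children))


def find(cell, value):
    """Descend only into cells whose summary says the value may be present."""
    if cell.summary[value] == 0:
        return []
    if isinstance(cell, LLC):
        return [t for t in cell.tuple_list if t == value]
    hits = []
    for child in cell.cell_ptrs:
        hits.extend(find(child, value))
    return hits


root = make_parent([
    make_parent([make_leaf([1, 2, 2]), make_leaf([3])]),
    make_parent([make_leaf([2, 4]), make_leaf([5])]),
])
print(find(root, 2))  # [2, 2, 2]
```

The coarse summary at the root rejects absent values without touching any leaf, which is the pruning effect the pyramid is meant to provide.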
3.5 Multiple Granularities
As mentioned at the beginning of this section, we have
m time granularities: G1, ..., Gm with G1 being the
most recent granularity and Gm being the oldest gran-
ularity (Figure 4a). Each granularity (except G1) is
divided over the time dimension into n time zones. G1
is divided into n′ time zones based on the stream’s be-
havior as explained in Section 3.3. G1 spans the most
recent portion of the stream that is of length T1. T1
is a parameter that is specified by the create-summary
statement (Figure 2a). This parameter defines the ba-
sic granularity of the stream on which other granular-
ities will be based. Granularity Gi spans a portion of
the stream that is of length Ti such that:
Ti = n × Ti−1, i = 2, ...,m.
Notice that old granularities span larger portions
of the stream than recent granularities. As time pro-
ceeds, all the cells of Gi−1 are combined and their sum-
maries are compressed into one cell of Gi. Figure 5
illustrates the movement of data between two granularities
of the stream. Let Gi be granularity number i
of length Ti and let CZi be the current zone in
granularity Gi that receives the incoming tuples. All CZi's
are initially set to 1 and are advanced in a round-robin
fashion (CZi = (CZi + 1) mod n, where n is the number
of time zones in a granularity). For i = 2, ..., m, CZi
advances to the next time zone once the current time
zone elapses, i.e., every Ti/n units of time. CZ1
advances to the next time zone once a change in the
stream's behavior is detected.
Procedure COLLAPSE(CZi+1, CZi)
1. CZi = (CZi + 1) mod n
2. Add(CZi+1.summary-info, CZi.summary-info)
3. Concatenate(CZi+1.tuple-list, CZi.tuple-list)
4. CZi.summary-info = NULL
5. CZi.tuple-list = NULL
6. update the multi-resolution indices over Gi and Gi+1
END.
Figure 5: Data movement between granularities (panel (a): diagram; panel (b): the COLLAPSE procedure)
Figure 5b lists the steps required to move CZi to
the next time zone (CZi + 1 mod n). In Step 1, we
advance CZi (where i ≥ 2) to the next time zone.
Notice that CZ1 advances based on a change in the
stream behavior and returns to the first time zone in
the granularity if the granularity size becomes T1. This
is not mentioned in the figure for simplicity. In Steps
2 and 3, the summaries and data of CZi are pushed
to the current time zone of granularity Gi+1 (CZi+1).
We free CZi from its summaries and data in Steps
4 and 5. Finally, the multi-resolution index of Gi+1
is updated to reflect the added summaries, which are
subtracted from higher level resolutions of granularity
Gi (Step 6).
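Steps 1 to 5 of the COLLAPSE procedure in Figure 5b translate almost line by line into Python. The sketch below makes several assumptions: zones are indexed from 0, collections.Counter stands in for the summary information, the class names are hypothetical, and Step 6 (index maintenance) is left out.

```python
from collections import Counter


class Zone:
    def __init__(self):
        self.summary = Counter()   # stand-in for summary-info
        self.tuples = []           # tuple-list


class ZoneRing:
    """The n time zones of one granularity plus its current-zone index."""

    def __init__(self, n):
        self.zones = [Zone() for _ in range(n)]
        self.cz = 0


def collapse(g_next, g_cur):
    n = len(g_cur.zones)
    g_cur.cz = (g_cur.cz + 1) % n            # Step 1: advance round robin
    src = g_cur.zones[g_cur.cz]
    dst = g_next.zones[g_next.cz]
    dst.summary.update(src.summary)          # Step 2: Add
    dst.tuples.extend(src.tuples)            # Step 3: Concatenate
    src.summary = Counter()                  # Step 4: free the summaries
    src.tuples = []                          # Step 5: free the tuples
    # Step 6 (updating the multi-resolution indices) is omitted here.


g1, g2 = ZoneRing(2), ZoneRing(2)
g1.zones[0].tuples = ["c"]
g1.zones[1].tuples = ["a", "b"]
g1.zones[1].summary.update(["a", "b"])
collapse(g2, g1)
print(g2.zones[0].tuples)  # ['a', 'b']
```

After the call, the zone that CZi now points to is empty and ready to receive new data, while its old contents live on in the current zone of the next granularity.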
4 Data Stream Operations
Promising tuples can be applied widely to various
stream operations. In this section, we explain how
stream operations can benefit from the promising tu-
ple approach. We explore the join, selection, and ag-
gregation operations as example operations.
SELECT S1.ItemNo, S1.TimeStamp, S2.TimeStamp
FROM Store1 S1, Store2 S2
WHERE S1.ItemNo= S2.ItemNo
WITH PREFERENCE [desc|asc|weighted]
SUMMARY.Estimate.Count(S1.ItemNo),
SUMMARY.Estimate.Count(S2.ItemNo)
FADE FACTOR
WINDOW w
Figure 6: An example window join query
4.1 Window Join
Given two streams and a window specification, Fig-
ure 6 gives the SQL query that performs a window join
over the two streams. The two streams are generated
from the online transactions of two retail stores. Each
stream tuple consists of an item number and its asso-
ciated timestamp. The preference function is based on
the summary-estimated counts of the item number in
both streams. One out of three preference specifiers
(i.e., desc, asc, or weighted) is used to tune the preference
function (Section 4.1.1). The fade factor reduces
the priority of the time granularities by a constant
factor per granularity as we move into the past. The
window clause specifies
the size of the sliding window of interest. If no window
clause is specified, the window is assumed to span the
whole stream that is seen so far. The details of the
query preferences and the join algorithm are given in
the next sections.
4.1.1 Query Preferences
The preference clause in the query syntax bridges the
gap between the general-purpose stream summaries
and the query-specific promising tuples. Consider the
following three preference specifiers and how they con-
trol the way the priorities are handled in the SQL
query of Figure 6:
1. Desc. The promising tuples are extracted and
processed based on the descending order of the tu-
ples’ Count estimates to give frequent tuples more
priority. Focusing on frequent tuples maximizes
the number of output tuples.
2. Weighted. The CPU time is divided among the
promising tuples based on the ratio of their Count
estimates. This approach avoids the case where
the CPU gets monopolized by high frequency tu-
ples and produces a near-optimal random sample.
3. Asc. The promising tuples are extracted and pro-
cessed based on the ascending order of the tuples’
Count estimates to give the least frequent tuples
more priority, which leads to producing outliers
and deviant tuples. Our notion of outliers consid-
ers a tuple to be an outlier if it occurs with a low
frequency, e.g., a sensor triggering a fire alarm
or the purchase of a rare item. Similarly, other
notions of outliers can be extracted from various
preference functions.
The query processor may consider time in determining
the promise of a tuple, i.e., by favoring output
tuples with more recent timestamps. The fade factor
clause in the query syntax instructs the query
processor to decrease the priority of a promising tuple
based on its time granularity. If a promising tuple is
extracted from granularity i, its priority is multiplied
by the fade factor raised to the power i−1.
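The effect of the fade factor on the desc and weighted specifiers can be illustrated with a short sketch. The function names and the sample counts are hypothetical, and the asc specifier is left out because the text does not spell out how it combines with fading.

```python
def priority(count_estimate, granularity, fade):
    """Count estimate scaled by fade^(i-1) for a tuple from granularity i."""
    return count_estimate * fade ** (granularity - 1)


def desc_order(candidates, fade):
    """Items sorted by decreasing faded priority.

    candidates: list of (item, count_estimate, granularity).
    """
    scored = sorted(candidates, key=lambda c: priority(c[1], c[2], fade), reverse=True)
    return [item for item, _, _ in scored]


def weighted_shares(candidates, fade):
    """Fraction of CPU time per item, proportional to its faded priority."""
    prios = {item: priority(c, g, fade) for item, c, g in candidates}
    total = sum(prios.values())
    return {item: p / total for item, p in prios.items()}


candidates = [("A", 10, 1), ("B", 12, 2), ("C", 4, 1)]
print(desc_order(candidates, fade=1.0))  # ['B', 'A', 'C']
print(desc_order(candidates, fade=0.8))  # ['A', 'B', 'C']
```

With fading switched off (fade = 1.0) the more frequent item B comes first; with fade = 0.8 its older granularity pushes it behind A.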
4.1.2 Window Join Algorithm
Our window join algorithm is a variation of the sym-
metric hash join algorithm [33] that is tuned to bene-
fit from the promising tuple approach. When a tuple
arrives at the system, (1) it is inserted into the sum-
maries of its stream, and (2) it probes the summaries
of the other stream looking for join matches. This
process is symmetric with respect to each stream.
Figure 7 summarizes our join algorithm considering
the join probes of only one stream (i.e., tuples of
stream S1 probe stream S2). In Step 1, the tuple
is inserted into its stream summaries. In Step 2, a
pointer is placed at the first granularity of the other
stream. If the granularity intersects with the join
window (Step 3), we investigate all its time zones (Step
3.a). We estimate the priorities of x1 in these zones
and we insert (x1, the time zone, the priority) into a
priority queue. We weight the priority by how old the
granularity is (i.e., by the fade factor raised to the
power i−1). Notice that
the multi-resolution index on top of each granularity
prunes many time zones where the tuple is less likely
to occur. This step is not shown in the figure for sim-
plicity. We advance the pointer to the next granularity
(Step 3.b). In step 4, the priority queue (PQ) is tra-
versed and the actual join probes take place until a
time out occurs. The query processor times out the
joining process once another tuple arrives at the sys-
tem. The query processor revisits the priority queue
again once it becomes idle. The query processor it-
erates over standing queries in a round robin fashion.
The priority queue is traversed in ascending, descend-
ing, or weighted order based on the preference specifier
of the query (i.e., asc, desc, or weighted). As a draw-
back of this traversal, the output tuples are generated
in no specific order of their timestamps.
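The probing side of the algorithm can be sketched with a heap-based priority queue. This is a simplified illustration for the desc specifier only. It assumes that every granularity intersects the join window, that exact counts stand in for the summary estimates, and that a probe budget (max_probes) replaces the time-out; all names are hypothetical.

```python
import heapq
from collections import Counter


def join_probe(x1, granularities, fade, max_probes):
    """Probe the zones of the other stream in order of decreasing priority.

    granularities: list of granularities (most recent first); each one is a
    list of time zones, and each zone is a list of tuple values.
    """
    pq = []
    for i, zones in enumerate(granularities, start=1):
        for j, zone in enumerate(zones):
            estimate = Counter(zone)[x1]          # stand-in for Estimate(x1)
            prio = fade ** (i - 1) * estimate
            if prio > 0:
                heapq.heappush(pq, (-prio, i, j))  # max-heap via negation
    matches, probes = [], 0
    while pq and probes < max_probes:             # budget replaces TimeOut()
        _, i, j = heapq.heappop(pq)
        matches.extend((x1, v) for v in granularities[i - 1][j] if v == x1)
        probes += 1
    return matches


other_stream = [
    [[7, 3], [7, 7, 1]],      # G1: two time zones
    [[7, 7, 7, 7], [2]],      # G2: two time zones
]
print(len(join_probe(7, other_stream, fade=0.8, max_probes=1)))  # 4
```

With a budget of a single probe, the algorithm visits the zone with the highest faded estimate first, so most of the join output is produced before the budget runs out.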
4.2 Selection
Consider the simplest case of restricting the selection
problem to one selection predicate that is applied over
one stream. If this is all we have in the system,
then the optimal solution is to apply the selection
predicate over each individual tuple once it arrives at
the system. However, if we have hundreds of queries
such that each query has a different selection predicate,
each query will lose some of the input tuples due to the
limited CPU time that is allocated to each query. The
major problem with the above approach is the random
dropping of tuples. There is no control over which
tuples a query will lose. Queries may be losing the most
interesting tuples that satisfy their predicates.
Procedure JOIN-PROBE
Input. Two streams S1 and S2, a time-window w, a
tuple x1 ∈ S1.
Output. The join pairs of the form (x1, x2) such that x2 ∈ S2
and |x1.ts − x2.ts| ≤ w
1. S1.Summarize(x1)
2. i = 1
3. while (S2.Gi ∩ (now − w + 1, now) ≠ ∅), where now is the current timestamp
(a) for j = 1 to n
i. Prio = (fade factor)^(i−1) × S2.Gi[j].Estimate(x1)
ii. PQ.Insert(x1, S2.Gi[j], Prio)
(b) i = i + 1
4. while (NOT TimeOut()) PQ.Probe()
END.
Figure 7: Window Join Algorithm.
Procedure SELECTION
1. tstart = −∞
2. while (TRUE)
(a) tend = now
(b) Process the selection predicate on top of summaries
over the region (tstart, tend).
(c) Prioritize regions based on their promise.
(d) Process the selection predicate over the identified
promising regions.
(e) tstart = tend
END.
Figure 8: The selection operation
The proposed summary-guided selection allocates
some portion of its time budget to summarizing the
incoming stream tuples. Then, each query is guided by
the summaries to its promising regions that are rich
in terms of tuples that satisfy its search predicate.
Following the same framework of the promising tuple
approach, the system iterates over standing selection
queries as described in Figure 8. To avoid reporting
duplicate tuples in the answer, the system remembers
the last tuple that is processed by each query. The
next time a query is dispatched to the query proces-
sor, the query processor resumes the selection process
from where it stopped last time. The variables tstart and tend
mark the selection boundaries of the current iteration.
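One iteration of the selection loop of Figure 8 can be sketched as follows. This is an illustration under several assumptions: zones are plain lists of (timestamp, value) pairs, the summary is rebuilt on the fly with exact counts instead of being maintained incrementally, and a zone budget (max_zones) stands in for the limited CPU time. The function name is hypothetical.

```python
from collections import Counter


def selection_round(zones, predicate, t_start, t_end, max_zones):
    """Return (hits, new t_start) for tuples with t_start < ts <= t_end."""
    scored = []
    for zone in zones:
        fresh = [(ts, v) for ts, v in zone if t_start < ts <= t_end]
        # Step (b): evaluate the predicate on a summary of the region.
        summary = Counter(v for _, v in fresh)
        promise = sum(count for v, count in summary.items() if predicate(v))
        if promise:
            scored.append((promise, fresh))
    # Step (c): prioritize regions by their promise.
    scored.sort(key=lambda s: s[0], reverse=True)
    # Step (d): run the predicate over the most promising regions only.
    hits = []
    for _, fresh in scored[:max_zones]:
        hits.extend(t for t in fresh if predicate(t[1]))
    return hits, t_end                      # Step (e): tstart = tend


zones = [[(1, 5), (2, 50)], [(3, 60), (4, 70), (5, 2)], [(6, 1)]]
hits, t_start = selection_round(zones, lambda v: v >= 50, 0, 6, max_zones=1)
print(hits)  # [(3, 60), (4, 70)]
```

Because the returned t_start equals the previous t_end, the next round never revisits tuples that were already considered, which is how duplicates are avoided.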
Notice that the selection operation need not have
a window clause. In this case, the selection operation
is a filter over the stream tuples. However, if a win-
dow clause with a window of size w is specified, the
selected tuples are buffered for w time units. A tuple
is reported to be expired and is invalidated from the
query answer if its timestamp ts gets older than now − w,
where now is the current timestamp.
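The buffering and expiration rule for windowed selection can be sketched with a double-ended queue. The class and method names are hypothetical, and tuples are assumed to arrive in timestamp order.

```python
from collections import deque


class WindowedSelection:
    def __init__(self, predicate, w):
        self.predicate = predicate
        self.w = w
        self.buffer = deque()      # (ts, value) pairs in arrival order

    def on_tuple(self, value, ts):
        if self.predicate(value):
            self.buffer.append((ts, value))

    def expire(self, now):
        """Drop and return the tuples whose timestamp is older than now - w."""
        expired = []
        while self.buffer and self.buffer[0][0] < now - self.w:
            expired.append(self.buffer.popleft())
        return expired

    def answer(self):
        return list(self.buffer)


sel = WindowedSelection(lambda v: v > 10, w=60)
sel.on_tuple(5, ts=0)     # fails the predicate, never buffered
sel.on_tuple(20, ts=10)
sel.on_tuple(30, ts=50)
print(sel.expire(now=80))  # [(10, 20)]
print(sel.answer())        # [(50, 30)]
```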
4.3 Aggregates
The behavior of an aggregate is usually dominated
by a smaller subset of tuples, i.e., the promising
tuples in our notion. The challenge is to specify
which set of tuples dominates the behavior of the
aggregate. For example, the behavior of the average
and the median aggregates seems to be preserved
under a faithful representative subset of tuples.
The preference of an average or a median query
may look like: (WITH PREFERENCE weighted
SUMMARY.Estimate.Count(S1.ItemNo)).
The summation aggregate introduces a new
notion for promising tuples. It assumes that
the behavior of the sum is dominated by the
tuples that have a high value × frequency
product. The preference of a summation query
may look like: (WITH PREFERENCE desc
SUMMARY.Estimate.Count(S1.ItemNo)×S1.ItemNo).
Similarly, other aggregates can define their own no-
tions of promising tuples. In response to the
aggregate queries, the query processor will extract
sets of promising tuples. Then, the regions that are
associated with these sets of promising tuples are
visited and aggregated.
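The value × frequency notion for the summation aggregate can be illustrated with a short sketch. Exact counts stand in for SUMMARY.Estimate.Count, values are assumed to be non-negative, and the budget parameter (how many distinct values can be visited) is a hypothetical stand-in for the limited processing time.

```python
from collections import Counter


def approximate_sum(values, budget):
    """Sum only the `budget` distinct values with the largest value * count."""
    counts = Counter(values)       # stand-in for SUMMARY.Estimate.Count
    ranked = sorted(counts, key=lambda v: v * counts[v], reverse=True)
    return sum(v * counts[v] for v in ranked[:budget])


values = [100, 100, 1, 1, 1, 50, 2]
print(approximate_sum(values, budget=2))  # 250
print(sum(values))                        # 255
```

In this example, visiting two of the four distinct values already recovers 250 of the exact total of 255, since the products 200 and 50 dominate the sum.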
5 Experiments
In this section, we provide experimental evidence
that the promising tuple approach enhances the
performance of stream query processing engines. Four
sets of experiments are conducted. In Section 5.1, we
compare the performance of various forms of intermediate
stream summaries. Then, we devote a section
to each stream operation. The performance of the
join, selection, and aggregate operations is addressed
in Sections 5.2, 5.3, and 5.4, respectively.
Unless mentioned otherwise, data streams are generated
using logs of online retail transactions of the
form (StoreID, TransactionID, TimeStamp, ItemID,
Price). Retail transactions are extracted from three-month
logs of WalMart retail stores. We play back the
data stream logs such that the interarrival time between
two consecutive tuples follows the exponential
distribution with an average of 0.1 second. Stream
operations are interested in a one-minute sliding window
over the most recent portion of the stream. All the
experiments in this section are based on a real implementation
of the promising tuple approach inside the Nile
stream query processing engine [24]. The Nile engine
executes on a machine with a 2.4 GHz Intel Pentium IV
CPU and 512 MB of RAM running Windows XP.
5.1 Performance of intermediate stream summaries
In this section, we compare the performance of four
intermediate stream summaries: (1) T1, the naive
approach where no summaries are maintained. (2)
T2, where a one-dimensional summary structure is
built over the time dimension. (3) T3, where a
two-dimensional summary structure is built over
both the time and the value dimensions. (4) T4,
where a two-dimensional summary structure with
self-adjusting time zones (as proposed in Section 3.3) is
utilized.
Figure 9: Cost parameters for various structures (T1 to T4): (a) tuple insertion time (usec), (b) tuple hit time (usec), (c) number of o/p tuples per i/p tuple
Our major measure of performance is the number of
generated output tuples per input tuple as mentioned
in Section 2. Equation 1 gives the number of output
tuples per input tuple in terms of the tuple insertion
time (tins) and the tuple hit time (thit). In Figure 9,
we perform a join operation between two streams of
retail transactions and we monitor both the tins
(Figure 9a) and the thit (Figure 9b) parameters along with
the number of output tuples per input tuple
(Figure 9c). Notice that as techniques get more sophisticated,
the insertion cost increases. However, the output
tuple hit time improves and dominates the overall
performance by increasing the total number of output
tuples.
In some cases, it is cheaper and more efficient to
operate on the raw stream and to avoid the summa-
rization overhead. For very high rate streams, the sys-
tem may end up losing all its processing cycles in data
summarization. In Figure 10a, we conduct an exper-
iment to measure the performance of the join opera-
tion between two streams with small average interar-
rival times (< 50msec). Less sophisticated techniques
with small insertion cost become more efficient than
the techniques with high insertion cost. Similarly, if
we are interested in small window sizes, it may be
cheaper to traverse the window tuple by tuple. Figure
10b evaluates the performance of the join operation
under a time window that varies from 10 seconds to
2 minutes in 20-second increments. Notice that the
performance gains from summary-based structures
grow as the window gets larger.
Figure 10: Effect of small interarrival time and small window size: number of output tuples per input tuple for T1 to T4 versus (a) avg interarrival time (msec), (b) window size (sec)
5.2 Performance of Window Join
In this section, we study the performance of the join
operation under various notions of promising tuples.
The average join selectivity between the two data
streams (that are drawn from real data sets) is found
to be 6.7%. We compare the performance of the
promising tuple approach to the symmetric hash join
(SIMPLE-SHJ) that is presented in [33]. Consider the
following notions of promising tuples:
1. Maximizing the number of output tuples
Figure 11 evaluates the number of output tuples that
are generated by both the simple symmetric hash
join (SIMPLE-SHJ) and the promising-tuple join (PT-
Join) over a one-minute sliding window. Our measure
of performance is the percentage of the exact output
that is obtained at run time. The figure shows that the
promising tuple approach utilizes its processing time
effectively and focuses on the tuples that maximize the
number of output tuples.
Figure 12 evaluates the performance of the join operation
under a whole-stream window. In addition to
the number of output tuples (Figure 12a), we monitor
the freshness of the data as another measure of goodness.
Figure 12b assesses the average difference in
timestamps between the two join components as our
measure of freshness (|ts1 − ts2|). This measure
indicates how far a tuple goes in the past to look for a
join match. From the figure, notice that the promising
tuple approach favors recent and fresh tuples over old
ones. This behavior is achieved by reducing the priority
of old granularities by the fade factor (set to
80% in this experiment). SIMPLE-SHJ has no way to
know in advance whether an incoming tuple will join
with a recent tuple or with an old one.
Figure 11: Sliding window join: % of output tuples versus avg interarrival time (msec), for SIMPLE-SHJ and PT-Join
2. Providing a faithful representation of the output
In Figure 13, we tune the promising tuples to pro-
vide a faithful representative set of the output. We
take the Hist-MSE or the mean square error between
the histograms of both the exact and the obtained an-
swers as our measure of faithfulness. Let H1 be the
histogram of the exact answer and let H2 be the his-
togram of the obtained answer. Each histogram is an
equi-width histogram of k intervals (k is set to 100).
H1 is divided into H11,H12, · · · ,H1k and H2 is divided
into H21,H22, · · · ,H2k. Let N1 be the size of the exact
answer and let N2 be the size of the obtained answer
(N1 ≥ N2). The Hist-MSE is defined as follows:
\[ \text{Hist-MSE} = \sum_{i=1}^{k} \mathrm{MSE}\left(\frac{h_{1i}}{N_1}, \frac{h_{2i}}{N_2}\right) \tag{2} \]
Figure 13a shows that PT-Join still provides more
output than SIMPLE-SHJ while Figure 13b illustrates
the superiority of PT-Join with respect to the Hist-MSE
measure of performance.
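A direct computation of the Hist-MSE measure might look like the sketch below. It assumes that the MSE term for a pair of normalized bucket counts reduces to their squared difference, that all values lie in a known range [lo, hi], and that the helper names are hypothetical.

```python
def equi_width_histogram(values, lo, hi, k):
    """Counts of values in k equal-width intervals covering [lo, hi]."""
    counts = [0] * k
    width = (hi - lo) / k
    for v in values:
        idx = min(int((v - lo) / width), k - 1)   # v == hi goes in the last bucket
        counts[idx] += 1
    return counts


def hist_mse(exact, obtained, lo, hi, k=100):
    h1 = equi_width_histogram(exact, lo, hi, k)
    h2 = equi_width_histogram(obtained, lo, hi, k)
    n1, n2 = len(exact), len(obtained)
    # Assumption: MSE of one pair is the squared difference of the fractions.
    return sum((a / n1 - b / n2) ** 2 for a, b in zip(h1, h2))


exact = [0, 0, 1, 1]
print(hist_mse(exact, exact, lo=0, hi=2, k=2))  # 0.0
print(hist_mse(exact, [0], lo=0, hi=2, k=2))    # 0.5
```

An obtained answer with the same value distribution as the exact one scores 0 regardless of its size, which is what makes the measure suitable for judging faithfulness rather than output volume.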
3. Detecting low-frequency tuples and out-
liers
In Figure 14, we tune the promising tuples to detect
low-frequency tuples. This notion of promising tuples
has two desired features: (1) Fewer tuples
are produced (Figure 14a). (2) Most of the produced
tuples are true outliers (Figure 14b). In other words,
the algorithm detects outliers and only outliers without
being distracted by other tuples. In this experiment,
a tuple is an outlier if its frequency is in the
lowest 5% of tuple frequencies.
Figure 12: Whole-stream window join: (a) % of output tuples and (b) |ts1-ts2| (sec) versus avg interarrival time (msec), for SIMPLE-SHJ and PT-Join
5.3 Performance of the Selection Operation
In this section, we show the applicability of the promising
tuple approach to the selection operation. Figure 15a
shows the results for one hundred queries, each of which
applies a different selection predicate over the same stream. The
selectivity of each predicate ranges uniformly from 5%
to 10%. The experiment is conducted using both
the naive selection approach (SIMPLE-Selection) that
feeds the stream to each query separately, and the
promising tuple approach which makes use of summaries.
The promising tuple approach aims at maximizing
the number of output tuples. The percentage
of retrieved tuples relative to the size of the exact result
is calculated for each query and the average over
all queries is taken as our measure of performance.
The figure shows that the promising tuple approach
produces up to 33% more output tuples than
SIMPLE-Selection (at 100 msec average interarrival).
In Figure 15b, the number of concurrent queries
is varied from 20 to 200 to explore the performance
of the selection operation under various query loads.
The SIMPLE-Selection is efficient at low system loads.
However, the promising tuple approach outperforms
the SIMPLE-Selection with the increase in the query
load (more than 80 concurrent queries).
5.4 Performance of Aggregates
The performance of aggregates is tested on top of
the hundred selection queries that are used
in Section 5.3. Aggregates are computed using both
the traditional approach that executes each query sep-
arately over each stream and the promising-tuple ap-
proach. The mean square error (MSE) between the
computed aggregate and the exact aggregate is used
as our measure of performance.
For the sake of illustration, we show the performance
of two example aggregates under various stream
rates. The performance of the average and the sum
operations is illustrated in Figures 16a and 16b, respectively.
The figure shows that the promising tuple approach
exhibits a lower mean square error (MSE) between its
answer and the exact answer for both the average and
the sum operations. Notice that as we increase the
average interarrival time between consecutive tuples,
the MSE decreases because the system gets less loaded
and more input tuples are processed by the system.
Figure 13: Output faithful representation: (a) % of output tuples and (b) histogram MSE versus avg interarrival time (msec), for SIMPLE-SHJ and PT-Join
6 Related Work
In this section, we overview related work in the context
of data stream processing through three major lines.
First, we list major data stream systems that have
been developed by various research groups. Second,
we survey summarization techniques that are deployed
widely to enhance query performance. Third, we give
examples of stream operations that are explored in lit-
erature.
Prototype data stream systems have recently gained
a lot of research interest. Stanford STREAM [8, 28]
addresses the problem of resource management in the
context of data stream processing. Quality of ser-
vice (QoS) has been investigated in the AURORA
project [1]. The Niagara project [14] adopts the notion
of continuous queries and studies how group optimizations
can be performed over these queries. Telegraph [11] has
an adaptive query engine where the execution order
of query operators can change during execution. A
framework for query execution plans over data streams
is suggested by the Fjord project [27]. COUGAR [9]
introduces a new data type for sensors. Gigascope [15]
is another data stream system developed at AT&T to
process streams of network traffic.
Figure 14: Outlier detection: (a) % of output tuples and (b) % of outliers in output versus avg interarrival time (msec), for SIMPLE-SHJ and PT-Join
With the limited CPU time and bounded memory
constraints [5], approximate answers that are obtained
via summaries are accepted in place of the exact ones.
Sampling [7], histograms [21], and wavelets [10, 32] can
represent a stream using lower memory requirements.
Streams are also summarized using sketches [4]. Some
of the sketching techniques have the capability to
maintain a stream’s most frequent items [12]. Sketch-
based processing and sketch sharing among multiple
queries are presented in [19]. The concept of multi-
ple granularities over data streams is proposed in [35].
The work in [29] suggests a gradual degradation of
the summary resolution as data tuples move from one
granularity to the next. This gradual degradation is
referred to by the term amnesic stream approximation.
Statistical models are utilized in [18] to query sensors
interactively. Based on outstanding queries, statistical
models provide an estimate for a sensor reading and
indicate how accurate this estimate is. Consequently,
one may decide to query the sensor for a fresh reading.
Figure 15: Performance of the selection operation: % of output tuples versus (a) avg interarrival time (msec) and (b) number of concurrent queries, for SIMPLE Selection and PT-Selection
From the query side, various stream operators have
been studied in the literature. For example, the win-
dow join operation is addressed in [16, 20, 22, 23, 25,
30]. Maintaining statistics over sliding windows is in-
vestigated in [17]. Incremental maintenance of tempo-
ral aggregates over windows is presented in [34]. There
has been some ongoing research on random sampling
for some special operations, e.g., the join operation
[3, 13] and the group-by operation [2]. The work in [6]
presents some optimizations for conjunctive queries
over sliding windows.
7 Conclusions
In this paper, we introduced the notion of promising
tuples. Promising tuples are those tuples that con-
tribute heavily to satisfying a specific preference in
the query answer. Example query preferences include:
(1) the maximization of the number of output tuples,
(2) producing a faithful representative sample of the
output tuples, or (3) producing the outlier or deviant
tuples in the result. Other notions of promising tuples
can be flexibly defined to satisfy the query require-
ments. The promising tuples may also consider the
freshness of the stream tuples. We proposed stream
intermediate summaries that are capable of identifying
the promising tuples of a stream. The intermediate
summaries are organized in a multi-resolution index
that slides in steps over the stream. We demonstrated the
applicability of the promising tuple approach in the
context of various stream operations, namely join, selection,
and aggregate operations. Experimental results
show that the notion of promising tuples increases the
effectiveness and resource utilization of a stream query
processing engine. Experiments are based on a real
implementation of the proposed summaries as part of
the summary manager inside the Nile stream query
processing engine.
[Figure 16: Performance of aggregates. Panels: (a) the AVG operation (SIMPLE Average vs. PT-Average); (b) the SUM operation (SIMPLE Sum vs. PT-Sum). Axes: MSE vs. avg interarrival time (msec).]
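As a rough illustration of preference (1) above, maximizing the number
of output tuples, the following Python sketch keeps only a fixed budget
of tuples from one stream window and prefers those whose join key occurs
most often in the other window. It is a simplified stand-in, not the
summary structure or multi-resolution index proposed in this paper; the
names promising_tuples, join_size, and budget are hypothetical, and the
exact per-key counts used here assume both windows fit in memory.

```python
from collections import Counter
import heapq


def promising_tuples(window_r, window_s, budget, key=lambda t: t[0]):
    """Keep at most `budget` tuples of window_r, preferring tuples whose
    join key occurs most often in window_s (hypothetical scoring rule)."""
    s_counts = Counter(key(t) for t in window_s)
    # Score = number of matching S tuples, i.e. the number of join
    # output tuples this R tuple would produce.
    return heapq.nlargest(budget, window_r, key=lambda t: s_counts[key(t)])


def join_size(kept_r, window_s, key=lambda t: t[0]):
    """Number of equi-join output tuples between kept_r and window_s."""
    s_counts = Counter(key(t) for t in window_s)
    return sum(s_counts[key(t)] for t in kept_r)


# Toy windows: each tuple is (join_key, payload).
window_r = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
window_s = [("a", 10), ("a", 11), ("c", 12), ("a", 13), ("c", 14)]

kept = promising_tuples(window_r, window_s, budget=2)
print(kept, join_size(kept, window_s))
# Baseline: keep the first two tuples regardless of their usefulness.
print(window_r[:2], join_size(window_r[:2], window_s))
```

With the same budget of two tuples, the scored selection retains the
tuples with keys "a" and "c" and yields five join results, whereas
keeping the first two arrivals yields three. A different preference,
such as a representative sample or outlier tuples, would swap in a
different scoring function.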
References
[1] D. Abadi, D. Carney, U. Cetintemel, M. Cherniack,
C. Convey, S. Lee, M. Stonebraker, N. Tatbul, and
S. Zdonik. Aurora: A new model and architecture for
data stream management. VLDB Journal, 12(2):120–139,
August 2003.
[2] S. Acharya, P. B. Gibbons, and V. Poosala. Congres-
sional samples for approximate answering of group-by
queries. In Proceedings of ACM SIGMOD, pages 487–
498, May 2000.
[3] S. Acharya, P. B. Gibbons, V. Poosala, and S. Ra-
maswamy. Join synopses for approximate query an-
swering. In Proceedings of ACM SIGMOD, pages 275–
286, June 1999.
[4] N. Alon, Y. Matias, and M. Szegedy. The space com-
plexity of approximating the frequency moments. In
Proceedings of the annual ACM symp. on Theory of
computing, pages 20–29, May 1996.
[5] A. Arasu, B. Babcock, S. Babu, J. McAlister, and
J. Widom. Characterizing memory requirements
for queries over continuous data streams. TODS,
29(1):162–194, March 2004.
[6] A. Ayad and J. F. Naughton. Static optimization of
conjunctive queries with sliding windows over infinite
streams. In Proceedings of ACM SIGMOD, pages 419–
430, June 2004.
[7] B. Babcock, M. Datar, and R. Motwani. Sampling from
a moving window over streaming data. In Proceedings
of the Annual ACM-SIAM Symp. on Discrete Algorithms,
pages 633–634, Jan. 2002.
[8] B. Babcock, S. Babu, M. Datar, R. Motwani, and
J. Widom. Models and issues in data stream systems.
In Proceedings of PODS, pages 1–16, June 2002.
[9] P. Bonnet, J. E. Gehrke, and P. Seshadri. Towards
sensor database systems. In Proceedings of the Intl.
Conf. on Mobile Data Management, pages 3–14, Jan.
2001.
[10] K. Chakrabarti, M. N. Garofalakis, R. Rastogi,
and K. Shim. Approximate query processing using
wavelets. In Proceedings of VLDB, pages 111–122,
Sept. 2000.
[11] S. Chandrasekaran, O. Cooper, A. Deshpande,
et al. TelegraphCQ: Continuous dataflow processing
for an uncertain world. In Proceedings of CIDR, Jan.
2003.
[12] M. Charikar, K. Chen, and M. Farach-Colton. Find-
ing frequent items in data streams. In Proceedings
of the Intl. Colloquium on Automata, Languages and
Programming, pages 693–703, July 2002.
[13] S. Chaudhuri, R. Motwani, and V. Narasayya. On
random sampling over joins. In Proceedings of ACM
SIGMOD, pages 263–274, June 1999.
[14] J. Chen, D. J. DeWitt, F. Tian, and Y. Wang.
NiagaraCQ: A scalable continuous query system for
internet databases. In Proceedings of ACM SIGMOD,
pages 379–390, May 2000.
[15] C. D. Cranor, T. Johnson, O. Spatscheck, et al.
Gigascope: A stream database for network applica-
tions. In Proceedings of ACM SIGMOD, pages 647–
651, June 2003.
[16] A. Das, J. Gehrke, and M. Riedewald. Semantic ap-
proximation of data stream joins. IEEE Trans. Knowl.
Data Eng., 17(1):44–59, 2005.
[17] M. Datar, A. Gionis, P. Indyk, et al. Maintaining
stream statistics over sliding windows. In Proceedings of the
Thirteenth Annual ACM-SIAM Symp. on Discrete Algorithms,
pages 635–644, Jan. 2002.
[18] A. Deshpande, C. Guestrin, S. Madden, J. M. Heller-
stein, and W. Hong. Model-driven data acquisition
in sensor networks. In Proceedings of VLDB, pages
588–599, August 2004.
[19] A. Dobra, M. N. Garofalakis, J. Gehrke, and R. Ras-
togi. Sketch-based multi-query processing over data
streams. In Proceedings of EDBT, pages 551–568,
March 2004.
[20] L. Golab and M. T. Ozsu. Processing sliding window
multi-joins in continuous queries over data streams.
In Proceedings of VLDB, pages 500–511, Sept. 2003.
[21] S. Guha, N. Koudas, and K. Shim. Data-streams and
histograms. In Proceedings of the annual ACM symp.
on Theory of computing, pages 471–475, July 2001.
[22] M. A. Hammad, W. G. Aref, and A. K. Elmagarmid.
Stream window join: Tracking moving objects in
sensor-network databases. In Proceedings of SSDBM,
pages 75–84, July 2003.
[23] M. A. Hammad, M. J. Franklin, W. G. Aref, and A. K.
Elmagarmid. Scheduling for shared window joins over
data streams. In Proceedings of VLDB, pages 297–308,
Sept. 2003.
[24] M. A. Hammad, M. F. Mokbel, M. H. Ali, W. G.
Aref, A. C. Catlin, A. K. Elmagarmid, M. Eltabakh,
M. G. Elfeky, T. Ghanem, R. Gwadera, I. F. Ilyas,
M. Marzouk, and X. Xiong. Nile: A query processing
engine for data streams. In Proceedings of ICDE, page
851, April 2004.
[25] J. Kang, J. F. Naughton, and S. D. Viglas. Evaluating
window joins over unbounded streams. In Proceedings
of ICDE, pages 341–352, March 2003.
[26] D. Kifer, S. Ben-David, and J. Gehrke. Detecting
change in data streams. In Proceedings of VLDB,
pages 180–191, 2004.
[27] S. Madden and M. Franklin. Fjording the stream: An
architecture for queries over streaming sensor data. In
Proceedings of ICDE, pages 555–566, Feb. 2002.
[28] R. Motwani, J. Widom, A. Arasu, B. Babcock,
S. Babu, M. Datar, G. Manku, C. Olston, J. Rosen-
stein, and R. Varma. Query processing, resource man-
agement, and approximation in a data stream man-
agement system. In Proceedings of CIDR, Jan. 2003.
[29] T. Palpanas, M. Vlachos, E. J. Keogh, D. Gunopulos,
and W. Truppel. Online amnesic approximation of
streaming time series. In Proceedings of ICDE, pages
338–349, April 2004.
[30] U. Srivastava and J. Widom. Memory-limited exe-
cution of windowed stream joins. In Proceedings of
VLDB, pages 324–335, 2004.
[31] S. Tanimoto and T. Pavlidis. A hierarchical data
structure for picture processing. Computer Graphics
and Image Processing, 4(2):104–119, June 1975.
[32] J. S. Vitter and M. Wang. Approximate computa-
tion of multidimensional aggregates of sparse data us-
ing wavelets. In Proceedings of ACM SIGMOD, pages
193–204, June 1999.
[33] A. N. Wilschut and E. M. G. Apers. Pipelining in
query execution. In Proceedings of the International
Conference on Databases, Parallel Architectures and
their Applications, 1991.
[34] J. Yang and J. Widom. Incremental computation and
maintenance of temporal aggregates. In Proceedings
of ICDE, pages 51–60, April 2001.
[35] D. Zhang, D. Gunopulos, V. J. Tsotras, and B. Seeger.
Temporal aggregation over data streams using multiple
granularities. In Proceedings of EDBT, pages
646–663, March 2002.