Scalability via Summaries: Stream Query Processing using Promising Tuples

M. H. Ali Walid G. Aref M. Y. ElTabakh
Department of Computer Science, Purdue University, West Lafayette, IN
{mhali, aref, meltabak}@cs.purdue.edu
Abstract
In many data streaming applications, streams
may contain data tuples that are either re-
dundant, repetitive, or that are not “interest-
ing” to any of the standing continuous queries.
Processing such tuples may waste system re-
sources without producing useful answers. In
contrast, some other tuples can be catego-
rized as promising. This paper proposes that
stream query engines can have the option to
execute on promising tuples only and not on
all tuples. We propose to maintain interme-
diate stream summaries and indices that can
direct the stream query engine to detect and
operate on promising tuples. As an illustra-
tion, the proposed intermediate stream sum-
maries are tuned towards capturing promising
tuples that (1) maximize the number of out-
put tuples, (2) contribute to producing a faith-
ful representative sample of the output tuples
(compared to the output produced when as-
suming infinite resources), or (3) produce the
outlier or deviant results. Experiments are
conducted in the context of Nile [24], a pro-
totype stream query processing engine devel-
oped at Purdue University.
1 Introduction
Recently, many applications, e.g., monitoring network
traffic, sensor network applications and retail store
online transaction processing, have moved from the
traditional database paradigm to a more challenging
paradigm in which data evolves infinitely and unpre-
dictably over time to form streams of data. Although
traditional DBMSs have reached some level of matu-
rity in processing various types of queries over their
persistent data layer, data stream systems are still
evolving to cope with the challenging nature of the
streaming environment. Scalability in terms of the
number of streams, stream rates, and number of stand-
ing continuous queries that the system can handle is
still a major challenge for data stream systems. In
this paper, we propose a framework for stream query
processing engines that achieves scalability via the no-
tion of promising tuples. Promising tuples limit the
attention of the query processor to a smaller subset of
the input tuples that preserve the output features with
respect to a specific query preference.
1.1 Motivation
By observing the distribution of tuples in the answer
of various queries, we notice that tuples contribute
with different frequencies to the output. The behav-
ior of the output is usually dominated and shaped by
a smaller subset of the tuples. Focusing on these tu-
ples, the promising tuples, utilizes the available time
budget effectively and produces output with desirable
features.
Consider a join operation between the two streams
in Figure 1 over the depicted time-window. Assume
that, due to scarcity of resources, we are limited to
processing only three tuples from Stream 1. What
would be a smart choice of those three tuples? We
need to assume a specific query preference before we
decide which tuples to choose. For example, consider
the following cases: (1) If we take all the three tuples
to be the “5”’s, we maximize the number of output
tuples (a total of 6 output tuples). (2) If we take two
tuples to be “5” and one tuple to be “9”, we get a total
output of 5 tuples. However, in the latter case, the
output looks more representative of the exact output
of the join operation, i.e., it comes close to a random
sample over the exact output. (3) If we take the value
“3” that rarely appears in the streams as one of the
tuples, the output is useful to applications
that track irregular behavior to detect outliers.
Figure 1: A two-stream example
To map the above example to real life, imagine that
the two streams are coming from the online transac-
tions of two retail stores, where the tuple values rep-
resent identifiers of the sold items. Maximizing the
number of output tuples gives as many points of sim-
ilarity as possible between the two stores. A faithful
representative output avoids excessive duplication of
high frequency elements and produces an output that
spans the exact output set to give a near-random sam-
ple of the output. Low-frequency tuples may express
outliers among the two stores, e.g., selling a very rare
or a very expensive item in the two stores over a short
period of time.
1.2 Contributions
The main contributions of this paper can be summa-
rized as follows:
1. We propose the notion of promising tuples to uti-
lize the available budget of CPU time effectively
in processing tuples that contribute heavily to the
output.
2. We propose an intermediate stream representa-
tion that is capable of indexing the stream’s
promising tuples.
3. We tune various stream operations to adopt the
notion of promising tuples.
4. We conduct an experimental study to evaluate the
effectiveness of promising tuples inside the Nile
stream query processing engine [24].
The rest of this paper is organized as follows. Sec-
tion 2 introduces the concept of promising tuples. Sec-
tion 3 presents the proposed intermediate stream rep-
resentation. Section 4 explains how various types of
operations are performed on top of the intermediate
stream representation. An experimental study that is
based on a real implementation of the promising tuple
approach inside Nile is given in Section 5. Section 6
overviews related work. Finally, the paper is concluded
in Section 7.
2 Promising Tuples
In this section, we introduce the concept of promising
tuples and we present a framework for query processing
using the notion of promising tuples. Promising tuples
can be defined as follows:
Definition 1 Let Q be a query over stream X={x1,
x2, x3, · · ·} and let O={o1, o2, o3, · · ·} be the answer
to the query Q (i.e., Q(X) = O). Let Pq be a prefer-
ence function of query Q that is defined over its answer
O such that Pq(O) = Op, where Op ⊆ O. A promising
tuple is a tuple xp ∈ X such that Q(xp) = op, where
op ∈ Op.
Figure 2: Promising-tuple based processing
The definition above indicates that a query Q may
have a preference function that is defined over its exact
answer O to yield a preferred answer set Op. A promis-
ing tuple is the tuple that contributes to producing an
output tuple that is part of the query preferred answer
set. A promising region is a region in the stream that
contains plenty of promising tuples. These promising
regions are the regions that are worth processing. We
aim at investing the system’s resources in processing
promising regions to satisfy the query preferences.
Figure 2 illustrates the promising-tuple approach.
In the promising tuple approach, we maintain an in-
termediate stream representation for the largest por-
tion of the stream that is of interest to any query.
Once a new tuple arrives, it is inserted in the inter-
mediate representation where summaries are built on
top of the stream tuples to capture the behavior of
the stream. Sets of promising tuples are extracted
from the stream summaries based on the preferences
of standing queries. An entry in the promising tuple
set is of the form:
(promising tuple, promising region, priority)
Each promising tuple points to the stream regions
where it can be found, with a priority that indicates
how rich a region is in this promising tuple.
The framework of query processing using the notion
of promising tuples is a three-step process and can be
summarized as follows:
1. Identify and extract from summaries the query’s
promising regions.
2. Prioritize the query's promising regions.
3. Execute query on promising regions in the order
of their priorities.
Notice that the promising tuple approach results in an
out-of-order query answer because stream regions are
processed based on their promise regardless of their
arrival order.
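As a rough illustration of this loop, the following Python sketch shows one way the three steps could be wired together; the objects and helper calls (summaries.regions, summaries.estimate, query.preference, query.execute, time_budget) are illustrative placeholders, not Nile APIs.

import heapq

def process_on_promising_regions(summaries, query, time_budget):
    # Step 1: identify the query's promising regions from the summaries.
    candidates = []
    for idx, region in enumerate(summaries.regions()):
        priority = query.preference(summaries.estimate(query.key, region))
        # Step 2: prioritize regions (max-heap via negated priority;
        # idx breaks ties so regions never need to be comparable).
        heapq.heappush(candidates, (-priority, idx, region))
    # Step 3: execute the query on regions in priority order until the
    # per-tuple time budget runs out.
    results = []
    while candidates and not time_budget.expired():
        _, _, region = heapq.heappop(candidates)
        results.extend(query.execute(region))
    return results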
(a) CREATE SUMMARY summary-name ON stream (attr)
    WITH GRANULARITY T1 AND COLLAPSE(rate, depth)
(b) SELECT attr1, attr2, · · ·
    FROM stream1, stream2, · · ·
    WHERE conditions
    WITH PREFERENCE [asc|desc|weighted] Pq
    FADE FACTOR
    WINDOW w
Figure 3: Extended SQL statements
The stream intermediate summaries are capable of
indexing a stream's promising regions and are built in
response to a create-summary statement as shown in
Figure 3a. The create-summary statement instructs
the summary manager to divide the stream into time
granularities and to summarize each granularity. High-
resolution summaries are built over the most recent
granularity that is of length T1 time units. The col-
lapse clause specifies two parameters: the rate and the
depth. The quality of the summaries decreases, or col-
lapses, as we move to older granularities according to
the rate parameter. The depth parameter avoids the in-
definite growth of the summaries and specifies how far
into the past the summaries are maintained.
Sets of promising tuples are extracted from sum-
maries in response to an SQL continuous query over
a sliding window of size w with a preference clause as
shown in Figure 3b. The preference clause specifies
how sets of promising tuples are extracted and prior-
itized from the stream summaries. The priority of a
promising tuple reduces, or fades, by the fade factor
as we move into the past. Notice that the stream sum-
maries are stream-dependent structures and are con-
structed regardless of the standing queries while the
promising tuple sets are query dependent and are con-
structed based on a query preference (Pq).
Promising tuples focus on the CPU processing cy-
cles as a primary resource. The interarrival time be-
tween two consecutive tuples is our processing budget
where useful computations can take place. To quan-
tify how many output tuples can be generated during
one interarrival time period, we introduce the follow-
ing measure of performance:
No/p = (λ − tins) / thit    (1)
where No/p is the number of output tuples per input
tuple, λ is the average interarrival time between two
consecutive tuples, tins is the time required to insert
the incoming tuple into the intermediate stream repre-
sentation, and thit is the average time required by the
intermediate representation to guide the search to one
output tuple. Basically, we invest a small portion of
the budgeted interarrival time (λ) in data summarization
(tins) to reduce the output tuple hit time (thit). Smart
guidance to the stream promising regions reduces ef-
fectively the output hit time, which in turn increases
the number of generated output tuples (No/p).
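As a purely hypothetical illustration: with λ = 1 msec (1,000 μsec), tins = 5 μsec, and thit = 50 μsec, Equation 1 gives (1000 − 5)/50 ≈ 20 output tuples per input tuple as an upper bound; the number actually produced is further limited by how many matches exist within the window.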
3 Stream Intermediate Representation
Given the create-summary statement of Figure 3a, it
is required to build an intermediate stream represen-
tation that indexes the incoming stream tuples. The
intermediate stream representation keeps track of in-
coming tuples plus some summaries to direct stream
operations to their promising regions. We propose an
intermediate stream representation that has the fol-
lowing two properties:
1. The support of multiple time granularities, G1,
G2, ...., Gm (Figure 4a) such that:
(a) The number of granularities m corresponds
to the depth of the collapse clause in the
create-summary statement (i.e., m = depth).
(b) Granularities span regions in the stream’s
timeline with sizes that are multiples of each
other (i.e., |Gi+1| = rate × |Gi|, |G1| = T1),
where rate and T1 are specified in the create-
summary statement.
2. The capability to index each time granularity us-
ing a multi-resolution index structure as illus-
trated in Figure 4b (Section 3.4).
Multiple time granularities are desirable to make up
for the bounded memory requirements. Notice that
the summary size is fixed in each granularity despite
the increase in the size of the region that is covered
by that granularity. As data gets older, it moves from
one granularity to the next and a coarser summariza-
tion is applied to the data due to the increase in the
granularity size. This behavior results in a gradual
degradation in the resolution of summaries as we go
farther in the past. The collapse clause in the create-
summary statement specifies the rate at which sum-
maries collapse and how deep summaries can go in the
past. Summaries in each granularity are indexed us-
ing a multi-resolution index structure in order to allow
coarser resolutions to guide the search down the hier-
archy to finer resolutions.
3.1 Two-dimensional Representation of Data
Streams
Each granularity in the stream is divided into smaller
partitions in order to build more accurate summaries
over these partitions. There are two ways to partition
a stream:
1. Partition the stream granularity into value-based
buckets by hashing on the values of incoming tu-
ples.
2. Partition the stream granularity into time-based
zones. Each zone corresponds to a contiguous
time interval of the stream.
Figure 4: Stream intermediate representation. (a) The
time granularities G1, ..., Gm. (b) The multi-resolution
index over one granularity. (c) The two-dimensional
(value, time) space. (d) Granularity cells.
In the first approach, buckets that are based on the
tuple values span the timeline of the stream. Conse-
quently, summaries may suffer inaccuracies due to the
change in the stream’s behavior over time. In the sec-
ond approach, a time zone contains all tuple values
in this portion of the stream. Summaries may suffer
inaccuracies due to mixing the summaries of a wide
range of values in the same zone.
Instead of using a one-dimensional space, we pro-
pose to hash the stream tuples over two dimensions:
(a) the value dimension and (b) the time dimension
(Figure 4c). The two-dimensional representation of
one granularity can be summarized as follows:
1. The value dimension is divided into buckets based
on a suitable hash function over the tuple values.
2. The time dimension is divided into time zones
based on the stream behavior. A change in the
stream’s behavior implies a new time zone (Sec-
tion 3.3).
3. A granularity cell is the intersection of a value
bucket with a time zone (Figure 4d).
4. For each granularity cell:
(a) Tuple values are materialized and are linked
in the order of their arrival time.
(b) Summaries are built to provide a rough but
quick estimation of the tuples in that cell
(Section 3.2).
(c) A multi-resolution index is built to speed up
the access to summaries (Section 3.4).
(d) Sets of promising tuples are extracted based
on query preferences (Section 4.1).
3.2 Summary Information
In each granularity cell, we maintain summaries to
capture the behavior of the stream tuples in this cell.
A summary abstract data type (summary-ADT) is
added to the system to encapsulate the functionali-
ties of summarization techniques. Various types of
summaries have been studied in the literature, e.g.,
histograms [21], wavelets [10], samples [7], and aggre-
gates [34]. In our work, the summarization technique
is considered to be a black box as long as it provides
the following set of interface functions:
1. Summarize(x), to insert a tuple x and to add the
effect of its value to the summaries.
2. Estimate(x), to estimate various properties of a
stream tuple x from the summaries. These esti-
mations are used in the context of the query pref-
erence function to specify how promising tuples
are extracted.
3. Confidence(), to report how confident we are
about the stream summaries. This function is
used as a measure of accuracy.
4. Add(r1,r2), to merge the summaries of regions r1
and r2 into one summary structure (r1). This
function is used to build coarser summaries upon
merging the two regions.
5. Subtract(r1,r2), to subtract the summaries of re-
gion r2 from the summaries of region r1. This
function is used to remove the effect of the sum-
maries of region r2 once it expires, i.e., goes out-
side the time-window of interest.
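A minimal Python sketch of how such an interface could be captured; the method names mirror the functions above, but this is an illustration rather than the actual Nile summary-ADT:

from abc import ABC, abstractmethod

class SummaryADT(ABC):
    # Sketch of the summary abstract data type described above.

    @abstractmethod
    def summarize(self, x):
        """Insert tuple x and fold its value into the summaries."""

    @abstractmethod
    def estimate(self, x):
        """Estimate a property (e.g., frequency) of tuple x."""

    @abstractmethod
    def confidence(self):
        """Report a measure of accuracy of the current summaries."""

    @abstractmethod
    def add(self, other):
        """Merge another region's summary into this one."""

    @abstractmethod
    def subtract(self, other):
        """Remove the effect of an expired region's summary."""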
For implementation purposes, we adopt the count-
sketch technique as presented in [12]. Countsketches
provide an estimation of both the absolute and the rel-
ative frequencies of the stream tuples. Countsketches
are efficient with respect to the update and the search
operations. The Confidence function takes the vari-
ance of the items in the countsketch as a measure of ac-
curacy. Another interesting feature of countsketches is
that they are additive. Two countsketches can be com-
bined together in linear time simply by adding the two
sketches. Similarly, the effect of one countsketch can
be subtracted from another countsketch in linear time.
Other forms of summarization, e.g., histograms, can
be adapted to implement the same interface functions.
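The following is a compact, illustrative Python sketch of a countsketch exposing the interface above; the hash functions and parameters are simplified placeholders, not the implementation used inside Nile.

import statistics

class CountSketch:
    # Minimal count sketch in the spirit of [12]; for illustration only.

    def __init__(self, depth=5, width=256):
        self.depth = depth
        self.width = width
        self.table = [[0] * width for _ in range(depth)]

    def _bucket(self, row, x):
        return hash((row, "bucket", x)) % self.width

    def _sign(self, row, x):
        return 1 if hash((row, "sign", x)) % 2 == 0 else -1

    # Summarize(x): add the tuple's value to the sketch.
    def summarize(self, x):
        for i in range(self.depth):
            self.table[i][self._bucket(i, x)] += self._sign(i, x)

    # Estimate(x): median of the per-row frequency estimates.
    def estimate(self, x):
        return statistics.median(
            self._sign(i, x) * self.table[i][self._bucket(i, x)]
            for i in range(self.depth))

    # Confidence(): variance of the counters as a rough accuracy proxy.
    def confidence(self):
        flat = [c for row in self.table for c in row]
        return statistics.pvariance(flat)

    # Add(r1, r2): sketches are additive, so merging is element-wise.
    def add(self, other):
        for i in range(self.depth):
            for j in range(self.width):
                self.table[i][j] += other.table[i][j]

    # Subtract(r1, r2): remove an expired region's contribution.
    def subtract(self, other):
        for i in range(self.depth):
            for j in range(self.width):
                self.table[i][j] -= other.table[i][j]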
3.3 Self-adjusting Time Zones
As mentioned in Section 3.1, the stream is partitioned
over the infinite time dimension into time zones. Each
zone can have a fixed predefined size. However, fixing
the size of each zone may result in zones containing
various behaviors of the stream which in turn degrades
the accuracy of the summaries.
To improve the accuracy of the summaries, we par-
tition the timeline into zones based on the change
in the stream’s behavior. We sense a change in the
stream’s behavior through the Confidence function of
the summarization technique. Once a tuple arrives,
it is inserted into the summaries of the current zone.
Then, the Confidence function is evaluated. If the con-
fidence falls below a certain threshold or the length
of the time zone exceeds MAXLENGTH, a new time
zone is started and its summaries are initialized. For a
more efficient implementation of detecting changes in
the stream’s behavior, the reader is referred to [26].
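A condensed sketch of this insertion logic, assuming a zone object that exposes the summary-ADT; the threshold and MAXLENGTH values below are placeholders.

CONFIDENCE_THRESHOLD = 0.9   # hypothetical threshold on Confidence()
MAXLENGTH = 60.0             # hypothetical maximum zone length (time units)

def insert_tuple(granularity, tup, now):
    # Insert into the current zone's summaries first.
    zone = granularity.current_zone
    zone.summary.summarize(tup)
    # Then evaluate Confidence(); a drop below the threshold signals a
    # change in the stream's behavior, and an overly long zone is also cut.
    behavior_changed = zone.summary.confidence() < CONFIDENCE_THRESHOLD
    too_long = (now - zone.start_time) > MAXLENGTH
    if behavior_changed or too_long:
        granularity.start_new_zone(now)   # fresh, empty summaries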
3.4 Multi-resolution Index
Our intermediate stream representation makes use of
a multi-resolution index to provide an efficient access
to stream tuples. Coarser resolutions of summaries are
formed by the aggregation of the fine-resolution sum-
maries. The top-level summarization maintains the
coarsest summaries that provide a rough estimation
of how much a tuple is promising. Then, we follow
the hierarchy of the multi-resolution summaries till we
reach the exact regions where the tuple resides.
For each stream granularity, we maintain a
pyramid-like structure [31] to index this granularity
(Figure 4b). The lowest-level cells of the pyramid
contain the stream tuples plus their summaries. The
lowest-level cells (LLC) are of the form:
LLC (tuple-list[ · · · ], Summary-info).
As we go up in the pyramid, the summary information
is aggregated using the add function and pointers to
the original cells are maintained. Upper level cells
(ULC) of the pyramid are of the form:
ULC (Aggregate-Summary-Info, cell-ptrs[ · · · ])
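A small Python sketch of such a pyramid, built bottom-up by merging child summaries (the merge callback stands in for repeated Add calls); the class and parameter names are illustrative.

class LLC:
    # Lowest-level cell: materialized tuple list plus its summary.
    def __init__(self, tuples, summary):
        self.tuple_list = list(tuples)
        self.summary = summary

class ULC:
    # Upper-level cell: aggregated summary plus pointers to child cells.
    def __init__(self, children, merge):
        self.cell_ptrs = list(children)
        self.summary = merge([c.summary for c in children])

def build_pyramid(leaf_cells, fanout, merge):
    # Build the pyramid bottom-up; coarse levels can then guide the
    # search down to the finer ones.
    levels = [list(leaf_cells)]
    while len(levels[-1]) > 1:
        below, level = levels[-1], []
        for i in range(0, len(below), fanout):
            level.append(ULC(below[i:i + fanout], merge))
        levels.append(level)
    return levels   # levels[0] is the finest, levels[-1] the coarsest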
3.5 Multiple Granularities
As mentioned at the beginning of this section, we have
m time granularities: G1, ..., Gm with G1 being the
most recent granularity and Gm being the oldest gran-
ularity (Figure 4a). Each granularity (except G1) is
divided over the time dimension into n time zones. G1
is divided into n′ time zones based on the stream’s be-
havior as explained in Section 3.3. G1 spans the most
recent portion of the stream that is of length T1. T1
is a parameter that is specified by the create-summary
statement (Figure 3a). This parameter defines the ba-
sic granularity of the stream on which other granular-
ities will be based. Granularity Gi spans a portion of
the stream that is of length Ti such that:
Ti = n × Ti−1, i = 2, ...,m.
Notice that old granularities span larger portions
of the stream than recent granularities. As time pro-
ceeds, all the cells of Gi−1 are combined and their sum-
maries are compressed into one cell of Gi. Figure 5
illustrates the movement of data between two granu-
larities of the stream. Let Gi be granularity number i
(a) [diagram: data moving from granularity Gi to Gi+1]
Procedure COLLAPSE(CZi+1, CZi)
1. CZi = (CZi + 1) mod n
2. Add(CZi+1.summary-info, CZi.summary-info)
3. Concatenate(CZi+1.tuple-list, CZi.tuple-list)
4. CZi.summary-info = NULL
5. CZi.tuple-list = NULL
6. Update the multi-resolution indices over Gi and Gi+1
END.
(b)
Figure 5: Data movement between granularities
of length Ti and let CZi be the current zone in granu-
larity Gi that receives the incoming tuples. All CZi’s
are initially set to 1 and are advanced in a round robin
fashion (CZi = (CZi + 1) mod n, where n is the number
of time zones in a granularity). CZi, ∀i = 2, ..., m,
advances to the next time zone once the current time
zone elapses, i.e., every Ti/n units of time. CZ1 ad-
vances to the next time zone once a change in the
stream’s behavior is detected.
Figure 5b lists the steps required to move CZi to
the next time zone ((CZi + 1) mod n). In Step 1, we
advance CZi (where i ≥ 2) to the next time zone.
Notice that CZ1 advances based on a change in the
stream behavior and returns to the first time zone in
the granularity if the granularity size becomes T1. This
is not mentioned in the figure for simplicity. In Steps
2 and 3, the summaries and data of CZi are pushed
to the current time zone of granularity Gi+1 (CZi+1).
We free CZi from its summaries and data in Steps
4 and 5. Finally, the multi-resolution index of Gi+1
is updated to reflect the added summaries, which are
subtracted from higher level resolutions of granularity
Gi (Step 6).
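A Python rendering of this collapse step, mirroring the procedure in Figure 5b; the granularity and zone objects are illustrative placeholders.

def collapse(g_next, g_cur, n):
    # Step 1: advance the current-zone pointer of Gi in round-robin
    # order; the zone it now points at holds data from a full cycle ago.
    g_cur.cz = (g_cur.cz + 1) % n
    src = g_cur.zones[g_cur.cz]
    dst = g_next.zones[g_next.cz]
    # Steps 2-3: push the stale zone's summaries and tuples into the
    # current zone of Gi+1.
    dst.summary.add(src.summary)
    dst.tuple_list.extend(src.tuple_list)
    # Steps 4-5: free the zone so it can receive fresh tuples.
    src.summary = None
    src.tuple_list = []
    # Step 6: refresh the multi-resolution indices of both granularities.
    g_cur.rebuild_index()
    g_next.rebuild_index()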
4 Data Stream Operations
Promising tuples can be applied widely to various
stream operations. In this section, we explain how
stream operations can benefit from the promising tu-
ple approach. We explore the join, selection, and ag-
gregation operations as example operations.
SELECT S1.ItemNo, S1.TimeStamp, S2.TimeStamp
FROM Store1 S1, Store2 S2
WHERE S1.ItemNo= S2.ItemNo
WITH PREFERENCE [desc|asc|weighted]
SUMMARY.Estimate.Count(S1.ItemNo),
SUMMARY.Estimate.Count(S2.ItemNo)
FADE FACTOR
WINDOW w
Figure 6: An example window join query
4.1 Window Join
Given two streams and a window specification, Fig-
ure 6 gives the SQL query that performs a window join
over the two streams. The two streams are generated
from the online transactions of two retail stores. Each
stream tuple consists of an item number and its asso-
ciated timestamp. The preference function is based on
the summary-estimated counts of the item number in
both streams. One out of three preference specifiers
(i.e., desc, asc, or weighted) is used to tune the prefer-
ence function (Section 4.1.1). The fade factor reduces
the priority of the time granularities as we move
into the past. The window clause specifies
the size of the sliding window of interest. If no window
clause is specified, the window is assumed to span the
whole stream that is seen so far. The details of the
query preferences and the join algorithm are given in
the next sections.
4.1.1 Query Preferences
The preference clause in the query syntax bridges the
gap between the general-purpose stream summaries
and the query-specific promising tuples. Consider the
following three preference specifiers and how they con-
trol the way the priorities are handled in the SQL
query of Figure 6:
1. Desc. The promising tuples are extracted and
processed based on the descending order of the tu-
ples’ Count estimates to give frequent tuples more
priority. Focusing on frequent tuples maximizes
the number of output tuples.
2. Weighted. The CPU time is divided among the
promising tuples based on the ratio of their Count
estimates. This approach avoids the case where
the CPU gets monopolized by high frequency tu-
ples and produces a near-optimal random sample.
3. Asc. The promising tuples are extracted and pro-
cessed based on the ascending order of the tuples’
Count estimates to give the least frequent tuples
more priority, which leads to producing outliers
and deviant tuples. Our notion of outliers consid-
ers a tuple to be an outlier if it occurs with a low
frequency, e.g., a sensor triggering a fire alarm
or the purchase of a rare item. Similarly, other
notions of outliers can be extracted from various
preference functions.
The query processor may consider time in deter-
mining the promise of a tuple, i.e., by favoring output
tuples with more recent timestamps. The fade fac-
tor clause in the query syntax instructs the query
processor to decrease the priority of a promising tuple
based on its time granularity. If a promising tuple is
extracted from granularity i, its priority is scaled
down by the fade factor raised to the power i − 1.
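The three specifiers can be viewed as different orderings or weightings over the candidate promising tuples; a hedged Python sketch follows (the candidate counts, the fade factor, and granularity_of are assumptions for illustration).

def prioritize(candidates, specifier, fade, granularity_of):
    # `candidates` maps a candidate tuple value to its summary-estimated
    # count; `granularity_of(t)` gives the granularity index i in which
    # the tuple was found, so its score decays by fade**(i - 1).
    scored = {t: c * (fade ** (granularity_of(t) - 1))
              for t, c in candidates.items()}
    if specifier == "desc":       # maximize output: most frequent first
        return sorted(scored, key=scored.get, reverse=True)
    if specifier == "asc":        # outliers: least frequent first
        return sorted(scored, key=scored.get)
    if specifier == "weighted":   # representative output: CPU shares
        total = sum(scored.values()) or 1.0
        return [(t, s / total) for t, s in scored.items()]
    raise ValueError("unknown preference specifier: %s" % specifier)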
4.1.2 Window Join Algorithm
Our window join algorithm is a variation of the sym-
metric hash join algorithm [33] that is tuned to bene-
fit from the promising tuple approach. When a tuple
arrives at the system, (1) it is inserted into the sum-
maries of its stream, and (2) it probes the summaries
of the other stream looking for join matches. This
process is symmetric with respect to each stream.
Figure 7 summarizes our join algorithm consider-
ing the join probes of only one stream (i.e., tuples of
stream S1 probe stream S2). In Step 1, the tuple
is inserted into its stream summaries. In step 2, a
pointer is placed at the first granularity of the other
stream. If the granularity intersects with the join win-
dow (Step 3), we investigate all its time zones (Step
3.a). We estimate the priorities of x1 in these zones
and we insert (x1, the time zone, the priority) into a
priority queue. We weight the priority by how old the
granularity is (i.e., by the fade factor raised to the power i − 1). Notice that
the multi-resolution index on top of each granularity
prunes many time zones where the tuple is less likely
to occur. This step is not shown in the figure for sim-
plicity. We advance the pointer to the next granularity
(Step 3.b). In step 4, the priority queue (PQ) is tra-
versed and the actual join probes take place until a
time out occurs. The query processor times out the
joining process once another tuple arrives at the sys-
tem. The query processor revisits the priority queue
again once it becomes idle. The query processor it-
erates over standing queries in a round robin fashion.
The priority queue is traversed in ascending, descend-
ing, or weighted order based on the preference specifier
of the query (i.e., asc, desc, or weighted). As a draw-
back of this traversal, the output tuples are generated
in no specific order of their timestamps.
4.2 Selection
Consider the simplest case of restricting the selection
problem to one selection predicate that is applied over
one stream. If this is all what we have in the sys-
tem, then the optimal solution is to apply the selection
predicate over each individual tuple once it arrives at
the system. However, if we have hundreds of queries
such that each query has a different selection predicate,
each query will lose some of the input tuples due to the
limited CPU time that is allocated to each query. The
major problem with the above approach is the random
Procedure JOIN-PROBE
Input. given two streams S1 and S2, a time-window w, a
tuple x1 ∈ S1.
Output. The join pairs of the form (x1, x2) s.t. x2 ∈ S2
and |x1.ts-x2.ts| ≤ w
1. S1.Summarize(x1)
2. i=1
3. while (S2.Gi ∩ (now − w + 1, now) ≠ ∅)
(a) for j=1 to n
i. Prio = (fade factor)^(i−1) × S2.Gi[j].Estimate(x1)
ii. PQ.Insert(x1, S2.Gi[j], Prio)
(b) i = i + 1
4. while (NOT TimeOut()) PQ.Probe()
END.
Figure 7: Window Join Algorithm.
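A compact Python sketch of this probe loop; the summary, estimate, intersects, matches, and timed_out hooks are placeholders standing in for Nile internals.

import heapq

def join_probe(x1, s1, s2, window, fade, timed_out):
    # One direction of the symmetric join: tuple x1 of S1 probes S2.
    s1.summarize(x1)                                      # Step 1
    pq, seq = [], 0
    for i, gran in enumerate(s2.granularities, start=1):  # Steps 2-3
        if not gran.intersects(window):
            break
        for zone in gran.zones:                           # Step 3.a
            prio = (fade ** (i - 1)) * zone.estimate(x1)
            heapq.heappush(pq, (-prio, seq, zone))
            seq += 1
        # Step 3.b: continue with the next (older) granularity.
    while pq and not timed_out():                         # Step 4
        _, _, zone = heapq.heappop(pq)
        for x2 in zone.matches(x1):
            yield (x1, x2)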
Procedure SELECTION
1. tstart=−∞
2. while (TRUE)
(a) tend=now
(b) Process the selection predicate on top of sum-
maries over the region ( tstart, tend).
(c) Prioritize regions based on their promise.
(d) Process the selection predicate over the identi-
fied promising regions.
(e) tstart=tend
END.
Figure 8: The selection operation
dropping of tuples. There is no control over which tu-
ples a query will lose. Queries may be losing the most
interesting tuples that satisfy their predicates.
The proposed summary-guided selection allocates
some portion of its budget time to summarize the in-
coming stream tuples. Then, each query is guided by
the summaries to its promising regions that are rich
in terms of tuples that satisfy its search predicate.
Following the same framework of the promising tuple
approach, the system iterates over standing selection
queries as described in Figure 8. To avoid reporting
duplicate tuples in the answer, the system remembers
the last tuple that is processed by each query. The
next time a query is dispatched to the query proces-
sor, the query processor resumes the selection process
from where it stopped last time. Tstart and tend mark
the selection boundaries of the current iteration.
Notice that the selection operation need not have
a window clause. In this case, the selection operation
is a filter over the stream tuples. However, if a win-
dow clause with a window of size w is specified, the
selected tuples are buffered for w time units. A tuple
is reported to be expired and is invalidated from the
query answer once its timestamp ts becomes older than
the current timestamp minus w.
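A sketch of one dispatch of a standing selection query under this scheme; regions_between, estimate_matches, and the query object are illustrative assumptions.

def selection_round(query, summaries, now):
    # One dispatch of a standing selection query (cf. Figure 8):
    # query.t_start remembers where the previous round stopped, so no
    # tuple is reported twice.
    t_end = now
    regions = summaries.regions_between(query.t_start, t_end)
    # Rank regions by how many matching tuples they promise, using the
    # summary estimates, and scan them in that order.
    regions.sort(key=lambda r: r.estimate_matches(query.predicate),
                 reverse=True)
    selected = [t for region in regions
                  for t in region.tuples if query.predicate(t)]
    query.t_start = t_end
    return selected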
4.3 Aggregates
The behavior of an aggregate is usually dominated
by a smaller subset of tuples, i.e., the promising
tuples in our notion. The challenge is to specify
which set of tuples dominate the behavior of the
aggregate. For example, the behavior of the av-
erage and the median aggregates seems to be pre-
served under a faithful representative subset of tu-
ples. The preference of an average or a median query
may look like: (WITH PREFERENCE weighted SUM-
MARY.Estimate.Count(S1.ItemNo)).
The summation aggregate introduces a new
notion for promising tuples. It assumes that
the behavior of the sum is dominated by the
tuples that have a high value × frequency
product. The preference of a summation query
may look like: (WITH PREFERENCE desc
SUMMARY.Estimate.Count(S1.ItemNo)×S1.ItemNo).
Similarly, other aggregates can define their own no-
tions of promising tuples. In response to the
aggregate queries, the query processor will extract
sets of promising tuples. Then, the regions that are
associated with these sets of promising tuples are
visited and aggregated.
5 Experiments
In this section, we provide an experimental evidence
that the promising tuple approach enhances the per-
formance of stream query processing engines. Four
sets of experiments are conducted. In Section 5.1, we
compare the performance of various forms of interme-
diate stream summaries. Then, we devote a section
for each stream operation. The performance of the
join, selection and aggregate operations is addressed
in Sections 5.2, 5.3, and 5.4, respectively.
Unless mentioned otherwise, data streams are gen-
erated using logs of online retail transactions of the
form (StoreID, TransactionID, TimeStamp, ItemID,
Price). Retail transactions are extracted from three-
month logs of WalMart retail stores. We playback the
data stream logs such that the interarrival time be-
tween two consecutive tuples follows the exponential
distribution with an average of 0.1 second. Stream op-
erations are interested in a one-minute sliding window
over the most recent portion of the stream. All the ex-
periments in this section are based on a real implemen-
tation of the promising tuple approach inside the Nile
stream query processing engine [24]. The Nile engine
executes on an Intel Pentium IV machine with a 2.4 GHz
CPU and 512 MB of RAM, running Windows XP.
5.1 Performance of intermediate stream sum-
maries
In this section, we compare the performance of four
intermediate stream summaries: (1) T1, the naive
approach where no summaries are maintained. (2)
T2, where a one-dimensional summary structure is
built over the time dimension. (3) T3, where a
two-dimensional summary structure is built over
both the time and the value dimensions. (4) T4,
where a two-dimensional summary structure with self-
adjusting time zones (as proposed in Section 3.3) is
utilized.
Figure 9: Cost parameters for various structures. (a)
Tuple insertion time (usec), (b) tuple hit time (usec),
and (c) number of output tuples per input tuple, for
T1 through T4.
Our major measure of performance is the number of
generated output tuples per input tuple as mentioned
in Section 2. Equation 1 gives the number of output
tuples per input tuple in terms of the tuple insertion
time (tins) and the tuple hit time (thit). In Figure 9,
we perform a join operation between two streams of
retail transactions and we monitor both the tins (Fig-
ure 9a) and the thit (Figure 9b) parameters along with
the number of output tuples per input tuple (Fig-
ure 9c). Notice that as techniques get more sophisti-
cated, the insertion cost increases. However, the out-
put tuple hit time improves and dominates the overall
performance by increasing the total number of output
tuples.
In some cases, it is cheaper and more efficient to
operate on the raw stream and to avoid the summa-
rization overhead. For very high rate streams, the sys-
tem may end up losing all its processing cycles in data
summarization. In Figure 10a, we conduct an exper-
iment to measure the performance of the join opera-
tion between two streams with small average interar-
rival times (< 50msec). Less sophisticated techniques
with small insertion cost become more efficient than
the techniques with high insertion cost. Similarly, if
we are interested in small window sizes, it may be
cheaper to traverse the window tuple by tuple. Figure
10b evaluates the performance of the join operation
under a time window that varies from 10 seconds to
2 minutes with 20-second increments. Notice that the
performance gains are obtained from summary-based
structures as the window gets larger.
5.2 Performance of Window Join
In this section, we study the performance of the join
operation under various notions of promising tuples.
The average join selectivity between the two data
streams (that are drawn from real data sets) is found
to be 6.7%. We compare the performance of the
promising tuple approach to the symmetric hash join
(SIMPLE-SHJ) that is presented in [33].
Figure 10: Effect of small interarrival time and small
window size. (a) Number of output tuples per input
tuple vs. average interarrival time (msec), and (b) vs.
window size (sec), for T1 through T4.
Consider the
following notions of promising tuples:
1. Maximizing the number of output tuples
Figure 11 evaluates the number of output tuples that
are generated by both the simple symmetric hash
join (SIMPLE-SHJ) and the promising-tuple join (PT-
Join) over a one-minute sliding window. Our measure
of performance is the percentage of the exact output
that is obtained at run time. The figure shows that the
promising tuple approach utilizes its processing time
effectively and focuses on the tuples that maximize the
number of output tuples.
Figure 12 evaluates the performance of the join op-
eration under a whole-stream window. In addition to
the number of output tuples (Figure 12a), we monitor
the freshness of the data as another measure of good-
ness. Figure 12b assesses the average difference in
timestamps between the two join components as our
measure of freshness (|ts1 − ts2|). This measure in-
dicates how far a tuple goes in the past to look for a
join match. From the figure, notice that the promising
tuple approach favors recent and fresh tuples over old
ones. This behavior is achieved by reducing the prior-
ity of old granularities by the fade factor (set to
80% in this experiment). SIMPLE-SHJ has no way to
know in advance whether an incoming tuple will join
with a recent tuple or with an old one.
Figure 11: Sliding window join: percentage of output
tuples vs. average interarrival time (msec), for
SIMPLE-SHJ and PT-Join.
2. Providing a faithful representation of the output
In Figure 13, we tune the promising tuples to pro-
vide a faithful representative set of the output. We
take the Hist-MSE or the mean square error between
the histograms of both the exact and the obtained an-
swers as our measure of faithfulness. Let H1 be the
histogram of the exact answer and let H2 be the his-
togram of the obtained answer. Each histogram is an
equi-width histogram of k intervals (k is set to 100).
H1 is divided into H11,H12, · · · ,H1k and H2 is divided
into H21,H22, · · · ,H2k. Let N1 be the size of the exact
answer and let N2 be the size of the obtained answer
(N1 ≥ N2). The Hist-MSE is defined as follows:
Hist-MSE = Σ_{i=1}^{k} MSE(h1i/N1, h2i/N2)    (2)
Figure 13a shows that PT-Join still provides more
output than SIMPLE-SHJ while Figure 13b illustrates
the superiority of PT-Join with respect to the Hist-
MSE measure of performance.
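For concreteness, a small Python sketch of this measure under the stated assumptions (equi-width histograms over the range of the exact answer; the function and parameter names are ours):

def hist_mse(exact, obtained, k=100):
    # Equi-width histograms with k buckets over the range of the exact
    # answer; Equation 2 sums the per-bucket squared differences of the
    # normalized frequencies.
    lo, hi = min(exact), max(exact)
    width = (hi - lo) / k or 1.0

    def hist(values):
        h = [0] * k
        for v in values:
            idx = min(max(int((v - lo) / width), 0), k - 1)
            h[idx] += 1
        return h

    n1, n2 = float(len(exact)), float(len(obtained))
    h1, h2 = hist(exact), hist(obtained)
    return sum((h1[i] / n1 - h2[i] / n2) ** 2 for i in range(k))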
3. Detecting low-frequency tuples and out-
liers
In Figure 14, we tune the promising tuples to detect
low-frequency tuples. This notion of promising tuples
has two desired features: (1) Fewer tuples
are produced (Figure 14a). (2) Most of the produced
tuples are true outliers (Figure 14b). In other words,
the algorithm detects outliers, and only outliers, with-
out being distracted by other tuples. In this exper-
iment, a tuple is an outlier if its frequency is in the
lowest 5th percentile of tuple frequencies.
5.3 Performance of the Selection Operation
In this section, we show the applicability of the promis-
ing tuple approach to the selection operation. Fig-
ure 15a shows one hundred queries, each performing a
different selection predicate over the same stream. The
selectivity of each predicate ranges uniformly from 5%
to 10%.
Figure 12: Whole-stream window join. (a) Percentage of
output tuples and (b) average timestamp difference
|ts1 − ts2| (sec), each vs. average interarrival time
(msec), for SIMPLE-SHJ and PT-Join.
The experiment is conducted using both
the naive selection approach (SIMPLE-Selection) that
feeds the stream to each query separately, and the
promising tuple approach which makes use of sum-
maries. The promising tuple approach aims at maxi-
mizing the number of output tuples. The percentage
of retrieved tuples relative to the size of the exact re-
sult is calculated for each query and the average over
all queries is taken as our measure of performance.
The figure shows that the promising tuple approach
produces up to 33% more output tuples than the SIMPLE
selection (at a 100 msec average interarrival time).
In Figure 15b, the number of concurrent queries
is varied from 20 to 200 to explore the performance
of the selection operation under various query loads.
The SIMPLE-Selection is efficient at low system loads.
However, the promising tuple approach outperforms
the SIMPLE-Selection with the increase in the query
load (more than 80 concurrent queries).
5.4 Performance of Aggregates
The performance of aggregates is tested on top of
the hundred selection queries that are experimented
in Section 5.3. Aggregates are computed using both
the traditional approach that executes each query sep-
arately over each stream and the promising-tuple ap-
proach. The mean square error (MSE) between the
computed aggregate and the exact aggregate is used
as our measure of performance.
Figure 13: Output faithful representation. (a) Percentage
of output tuples and (b) histogram MSE, each vs. average
interarrival time (msec), for SIMPLE-SHJ and PT-Join.
For the sake of illustration, we show the perfor-
mance of two example aggregates under various stream
rates. The performance of the average and the sum op-
erations is illustrated in Figures 16a and 16b, respectively.
The figure shows that the promising tuple approach
exhibits a lower mean square error (MSE) between its
answer and the exact answer for both the average and
the sum operations. Notice that as we increase the
average interarrival time between consecutive tuples,
the MSE decreases because the system gets less loaded
and more input tuples are processed by the system.
6 Related Work
In this section, we overview related work in the context
of data stream processing through three major lines.
First, we list major data stream systems that have
been developed by various research groups. Second,
we survey summarization techniques that are deployed
widely to enhance query performance. Third, we give
examples of stream operations that are explored in lit-
erature.
Prototype data stream systems have recently gained
a lot of research interest. Stanford STREAM [8, 28]
addresses the problem of resource management in the
context of data stream processing. Quality of ser-
vice (QoS) has been investigated in the AURORA
project [1]. The Niagara project [14] adopts the notion
Figure 14: Outlier detection. (a) Percentage of output
tuples and (b) percentage of outliers in the output, each
vs. average interarrival time (msec), for SIMPLE-SHJ
and PT-Join.
of continuous queries and how group optimizations can
be performed over these queries. Telegraph [11] has
an adaptive query engine where the execution order
of query operators can change during execution. A
framework for query execution plans over data streams
is suggested by the Fjord project [27]. COUGAR [9]
introduces a new data type for sensors. Gigascope [15]
is another data stream system developed at AT&T to
process streams of network traffic.
With the limited CPU time and bounded memory
constraints [5], approximate answers that are obtained
via summaries are accepted in place of the exact ones.
Sampling [7], histograms [21], and wavelets [10, 32] can
represent a stream using lower memory requirements.
Streams are also summarized using sketches [4]. Some
of the sketching techniques have the capability to
maintain a stream’s most frequent items [12]. Sketch-
based processing and sketch sharing among multiple
queries are presented in [19]. The concept of multi-
ple granularities over data streams is proposed in [35].
The work in [29] suggests a gradual degradation of
the summary resolution as data tuples move from one
granularity to the next. This gradual degradation is
referred to by the term amnesic stream approximation.
Statistical models are utilized in [18] to query sensors
interactively.
Figure 15: Performance of the selection operation. (a)
Percentage of output tuples vs. average interarrival time
(msec) and (b) percentage of output tuples vs. the number
of concurrent queries, for SIMPLE Selection and
PT-Selection.
Based on outstanding queries, statistical
models provide an estimate for a sensor reading and
indicate how accurate this estimate is. Consequently,
one may decide to query the sensor for a fresh reading.
From the query side, various stream operators have
been studied in the literature. For example, the win-
dow join operation is addressed in [16, 20, 22, 23, 25,
30]. Maintaining statistics over sliding windows is in-
vestigated in [17]. Incremental maintenance of tempo-
ral aggregates over windows is presented in [34]. There
has been some ongoing research on random sampling
for some special operations, e.g., the join operation
[3, 13] and the group-by operation [2]. The work in [6]
presents some optimizations for conjunctive queries
over sliding windows.
7 Conclusions
In this paper, we introduced the notion of promising
tuples. Promising tuples are those tuples that con-
tribute heavily to satisfying a specific preference in
the query answer. Example query preferences include:
(1) the maximization of the number of output tuples,
(2) producing a faithful representative sample of the
output tuples, or (3) producing the outlier or deviant
tuples in the result. Other notions of promising tuples
can be flexibly defined to satisfy the query require-
ments. The promising tuples may also consider the
freshness of the stream tuples. We proposed stream
intermediate summaries that are capable of identify-
ing the promising tuples of a stream.
Figure 16: Performance of aggregates. (a) The AVG
operation and (b) the SUM operation: MSE vs. average
interarrival time (msec), for the SIMPLE and PT
approaches.
The intermediate
summaries are organized in a multi-resolution index
that slides in steps over the stream. We proved the
applicability of the promising tuple approach in the
context of various stream operations, i.e., join, selec-
tion, and aggregate operations. Experimental results
show that the notion of promising tuples increases the
effectiveness and resource utilization of a stream query
processing engine. Experiments are based on a real
implementation of the proposed summaries as part of
the summary manager inside the Nile stream query
processing engine.
References
[1] D. Abadi, D. Carney, U. Cetintemel, M. Cherniack,
C. Convey, S. Lee, M. Stonebraker, N. Tatbul, and
S. Zdonik. Aurora: A new model and architecture for
data stream management. VLDB Journal, 12(2):120–139,
August 2003.
[2] S. Acharya, P. B. Gibbons, and V. Poosala. Congres-
sional samples for approximate answering of group-by
queries. In Proceedings of ACM SIGMOD, pages 487–
498, May 2000.
[3] S. Acharya, P. B. Gibbons, V. Poosala, and S. Ra-
maswamy. Join synopses for approximate query an-
swering. In Proceedings of ACM SIGMOD, pages 275–
286, June 1999.
[4] N. Alon, Y. Matias, and M. Szegedy. The space com-
plexity of approximating the frequency moments. In
Proceedings of the annual ACM symp. on Theory of
computing, pages 20–29, May 1996.
[5] A. Arasu, B. Babcock, S. Babu, J. McAlister, and
J. Widom. Characterizing memory requirements
for queries over continuous data streams. TODS,
29(1):162–194, March 2004.
[6] A. Ayad and J. F. Naughton. Static optimization of
conjunctive queries with sliding windows over infinite
streams. In Proceedings of ACM SIGMOD, pages 419–
430, June 2004.
[7] B. Babcock, M. Datar, and R. Motwani. Sampling from
a moving window over streaming data. In Proceedings
of the Annual ACM-SIAM Symp. on Discrete Algorithms
, pages 633–634, Jan. 2002.
[8] B. Babcock, S. Babu, M. Datar, R. Motwani, and
J. Widom. Models and issues in data stream systems.
In Proceedings of PODS, pages 1–16, June 2002.
[9] P. Bonnet, J. E. Gehrke, and P. Seshadri. Towards
sensor database systems. In Proceedings of the Intl.
Conf. on Mobile Data Management, pages 3–14, Jan.
2001.
[10] K. Chakrabarti, M. N. Garofalakis, R. Rastogi,
and K. Shim. Approximate query processing using
wavelets. In Proceedings of VLDB, pages 111–122,
Sept. 2000.
[11] S. Chandrasekaran, O. Cooper, A. Deshpande, and
et al. Telegraphcq: Continuous dataflow processing
for an uncertain world. In Proceedings of CIDR, Jan.
2003.
[12] M. Charikar, K. Chen, and M. Farach-Colton. Find-
ing frequent items in data streams. In Proceedings
of the Intl. Colloquium on Automata, Languages and
Programming, pages 693–703, July 2002.
[13] S. Chaudhuri, R. Motwani, and V. Narasayya. On
random sampling over joins. In Proceedings of ACM
SIGMOD, pages 263–274, June 1999.
[14] J. Chen, D. J. DeWitt, F. Tian, and Y. Wang. Ni-
agracq: A scalable continuous query system for in-
ternet databases. In Proceedings of ACM SIGMOD,
pages 379–390, May 2000.
[15] C. D. Cranor, T. Johnson, O. Spatscheck, and et al.
Gigascope: A stream database for network applica-
tions. In Proceedings of ACM SIGMOD, pages 647–
651, June 2003.
[16] A. Das, J. Gehrke, and M. Riedewald. Semantic ap-
proximation of data stream joins. IEEE Trans. Knowl.
Data Eng., 17(1):44–59, 2005.
[17] M. Datar, A. Gionis, P. Indyk, and et al. Maintain-
ing stream statistics over sliding windows. In Proceedings
of the Thirteenth Annual ACM-SIAM Symp. on Discrete
Algorithms, pages 635–644, Jan. 2002.
[18] A. Deshpande, C. Guestrin, S. Madden, J. M. Heller-
stein, and W. Hong. Model-driven data acquisition
in sensor networks. In Proceedings of VLDB, pages
588–599, August 2004.
[19] A. Dobra, M. N. Garofalakis, J. Gehrke, and R. Ras-
togi. Sketch-based multi-query processing over data
streams. In Proceedings of EDBT, pages 551–568,
March 2004.
[20] L. Golab and M. T. Ozsu. Processing sliding window
multi-joins in continuous queries over data streams.
In Proceedings of VLDB, pages 500–511, Sept. 2003.
[21] S. Guha, N. Koudas, and K. Shim. Data-streams and
histograms. In Proceedings of the annual ACM symp.
on Theory of computing, pages 471–475, July 2001.
[22] M. A. Hammad, W. G. Aref, and A. K. Elmagarmid.
Stream window join: Tracking moving objects in
sensor-network databases. In Proceedings of SSDBM,
pages 75–84, July 2003.
[23] M. A. Hammad, M. J. Franklin,W. G. Aref, and A. K.
Elmagarmid. Scheduling for shared window joins over
data streams. In Proceedings of VLDB, pages 297–308,
Sept. 2003.
[24] M. A. Hammad, M. F. Mokbel, M. H. Ali, W. G.
Aref, A. C. Catlin, A. K. Elmagarmid, M. Eltabakh,
M. G. Elfeky, T. Ghanem, R. Gwadera, I. F. Ilyas,
M. Marzouk, and X. Xiong. Nile: A query processing
engine for data streams. In Proceedings of ICDE, page
851, April 2004.
[25] J. Kang, J. F. Naughton, and S. D. Viglas. Evaluating
window joins over unbounded streams. In Proceedings
of ICDE, pages 341–352, March 2003.
[26] D. Kifer, S. Ben-David, and J. Gehrke. Detecting
change in data streams. In Proceedings of VLDB,
pages 180–191, 2004.
[27] S. Madden and M. Franklin. Fjording the stream: An
architecture for queries over streaming sensor data. In
Proceedings of ICDE, pages 555–566, Feb. 2002.
[28] R. Motwani, J. Widom, A. Arasu, B. Babcock,
S. Babu, M. Datar, G. Manku, C. Olston, J. Rosen-
stein, and R. Varma. Query processing, resource man-
agement, and approximation in a data stream man-
agement system. In Proceedings of CIDR, Jan. 2003.
[29] T. Palpanas, M. Vlachos, E. J. Keogh, D. Gunopulos,
and W. Truppel. Online amnesic approximation of
streaming time series. In Proceedings of ICDE, pages
338–349, April 2004.
[30] U. Srivastava and J. Widom. Memory-limited exe-
cution of windowed stream joins. In Proceedings of
VLDB, pages 324–335, 2004.
[31] S. Tanimoto and T. Pavlidis. A hierarchical data
structure for picture processing. Computer Graphics
and Image Processing, 4(2):104–119, June 1975.
[32] J. S. Vitter and M. Wang. Approximate computa-
tion of multidimensional aggregates of sparse data us-
ing wavelets. In Proceedings of ACM SIGMOD, pages
193–204, June 1999.
[33] A. N. Wilschut and P. M. G. Apers. Pipelining in
query execution. In Proceedings of the International
Conference on Databases, Parallel Architectures and
their Applications, 1991.
[34] J. Yang and J. Widom. Incremental computation and
maintenance of temporal aggregates. In Proceedings
of ICDE, pages 51–60, April 2001.
[35] D. Zhang, D. Gunopulos, V. J. Tsotras, and B. Seeger.
Temporal aggregation over data streams using multi-
ple granularities. In Proceedings of EDBT, pages
646–663, March 2002.
