Creating Windows from Event Streams

*** Note for ESP users: this "how to" post has been updated to reflect new features in SP08 that make it easier to create Windows from Streams ***

 

There are many times when you want to create a window on a stream of events. In most streaming projects you will create Windows and not just Streams, and some projects may contain only Windows and no Streams at all.

 

As you know, the three main characteristics that distinguish a Stream from a Window are:

 

  1. Streams are stateless
  2. Streams do not have a primary key
  3. Streams only support inserts

 

(If you want more information on opcodes and how inserts differ from updates and deletes, see Understanding CCL OpCodes.)

 

But it’s often the case that you want to create a window on a set of events arriving in a stream.  You can do this in CCL, but it’s not as simple as just saying CREATE WINDOW….FROM STREAM.  There are some things you need to do and some things you need to watch out for.

 

There are actually several ways of doing this; choose the one that seems simplest for your use case.

 

Option 1: aggregate over a stream using an unnamed window

 

The most common reason to create a window on a stream is to aggregate over the most recent events on the stream. Creating an aggregation window on a stream is easy: create a window, add a GROUP BY clause, and be sure to include a KEEP element in the FROM clause, like this:

 

CREATE WINDOW B
PRIMARY KEY DEDUCED
AS SELECT
  A.Column1,
  last(A.Column2),
  sum(A.Column3)
FROM A KEEP 100 ROWS
GROUP BY A.Column1;

 

 

Option 2: use an Input Window instead of an Input Stream

 

The simplest approach is to avoid the problem: rather than create a stream in the first place, take the events directly into an input window. If the incoming events don’t have a unique field that can be used as a primary key, use the new Autogenerate feature in CCL to create a “synthetic” key. And don’t forget to put a KEEP clause on the window so that it doesn’t grow infinitely large. Here’s an example of an input window using the Autogenerate feature:

 

CREATE INPUT WINDOW A
  SCHEMA (  ID long,
   Column1 integer ,
   Column2 integer ,
   Column3 integer )
  PRIMARY KEY (ID)
  KEEP 100 ROWS
  AUTOGENERATE(ID) ;

 

Option 3:  other ways to create a Window on a Stream

 

 

The thing to keep in mind is that Windows have to be keyed, so every event from the Stream has to be added to the window with a unique key. There are three ways to ensure that you end up with unique primary key values in the window, and the compiler will insist on one of them:

  1. You can create a new column that will contain a "synthetic" key by using the nextval() function
  2. If the stream already has a column that can be used as a key, you can use a GROUP BY clause
  3. If the raw input stream doesn't have a column that can be used as a key, you can add one by adding an AUTOGENERATE clause to the definition of the input stream
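
As a rough sketch of how the third approach might look: the stream below gets a synthetic ID column, and the window keys on it through a GROUP BY, so it combines the third approach with the second. This assumes AUTOGENERATE is accepted on an input stream with the same syntax as on an input window, and the column aliases are only illustrative; check the CCL reference for your version before relying on it.

```
// Assumption: AUTOGENERATE on a stream uses the same syntax as on a window.
CREATE INPUT STREAM A
  SCHEMA ( ID long,
           Column1 integer ,
           Column2 integer ,
           Column3 integer )
  AUTOGENERATE(ID) ;

// Every event has a distinct ID, so each group holds exactly one event
// and the window keeps the individual events, up to 100 rows.
CREATE WINDOW B
  PRIMARY KEY DEDUCED
  KEEP 100 ROWS
  AS SELECT
   A.ID,
   valueInserted(A.Column1) Column1,
   valueInserted(A.Column2) Column2,
   valueInserted(A.Column3) Column3
  FROM A GROUP BY A.ID;
```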

 

 

 

Use nextval() to create a synthetic key

 

Here you create a new field (implicit type is long) to use as the primary key and set the value using the nextval() function.   Here’s what it looks like:

 

CREATE INPUT STREAM A
  SCHEMA ( Column1 integer ,
           Column2 integer ,
           Column3 integer ) ;

CREATE LOCAL WINDOW B
  PRIMARY KEY (ID)
  KEEP 100 ROWS
  AS SELECT nextval ( ) ID, *
  FROM A;

 

 

Use GROUP BY if your stream already has a column you want to use as the key

 

Here’s a valid example that feeds a window directly from a stream, where the window includes a GROUP BY clause and the primary key is deduced from the GROUP BY. Note that in order to save every event in the window, each event needs a unique value in Column1, which will become the primary key. Otherwise you really will be aggregating, creating one row in the window for each group of events with the same Column1 value.

 

CREATE INPUT STREAM A
SCHEMA (Column1 integer,
        Column2 integer,
        Column3 integer);

 

CREATE WINDOW B
PRIMARY KEY DEDUCED
KEEP 100 ROWS
AS SELECT 
  A.Column1,
  valueInserted(A.Column2),
  valueInserted(A.Column3)
FROM A GROUP BY A.Column1;

 

One word of caution with this example: if you use it to do an actual aggregation, with aggregate functions other than valueInserted(), there’s an important pitfall. The KEEP clause on the window only manages the output, i.e. the contents of Window B. Window B will have one row for each group, up to a maximum of 100 rows. BUT, in order to compute the aggregate values, most aggregate functions cause an “aggregation index” to be created that holds all incoming events. Nothing in this example limits that index, so it can grow without bound, consuming ever more memory. Not good.
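
To make the pitfall concrete, here is a sketch of the two shapes side by side, reading from the same stream A. The window names and the MaxVal alias are made up for illustration, and max() stands in for any aggregate function that needs the underlying events. Whether the compiler accepts the first form may depend on your version.

```
// KEEP on the window only: limits the output rows of B_Risky,
// but nothing limits the events retained to compute max().
CREATE WINDOW B_Risky
  PRIMARY KEY DEDUCED
  KEEP 100 ROWS
  AS SELECT
   A.Column1,
   max(A.Column3) MaxVal
  FROM A GROUP BY A.Column1;

// KEEP in the FROM clause: max() is computed over at most
// the 100 most recent events from A.
CREATE WINDOW B_Bounded
  PRIMARY KEY DEDUCED
  AS SELECT
   A.Column1,
   max(A.Column3) MaxVal
  FROM A KEEP 100 ROWS
  GROUP BY A.Column1;
```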

 

valueInserted() and three other aggregation functions are additive and can safely be used in cases where you aren't using a KEEP clause to limit the size of the underlying aggregation index, i.e. they are computed incrementally without retaining all the underlying events. The four are:


valueInserted()
count()
sum()
average()

 

So as long as you only use these and don’t use any other aggregate functions, you’ll be fine.
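
For instance, a window like the following uses only additive functions, so by the reasoning above it should not need a KEEP in the FROM clause. This is a sketch against the same stream A; the EventCount and Total aliases are illustrative.

```
// Only additive aggregates: each result is updated incrementally
// as events arrive, without retaining the events themselves.
CREATE WINDOW B
  PRIMARY KEY DEDUCED
  AS SELECT
   A.Column1,
   count(A.Column2) EventCount,
   sum(A.Column3) Total
  FROM A GROUP BY A.Column1;
```

Note that B still holds one row per distinct Column1 value, so its size follows the number of groups rather than the number of events.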

 

And if you have a case where, for some reason, you don't want to use a KEEP in the FROM clause (or you are using an older version, where the compiler won't let you), you can still compute other aggregate functions. Just do it in two steps: first store the events in a window with a KEEP clause, using only valueInserted(), then aggregate over that window:

CREATE INPUT STREAM A
  SCHEMA ( Column1 integer ,
      Column2 integer ,
      Column3 integer ) ;

CREATE WINDOW B
  PRIMARY KEY DEDUCED
  KEEP 100 ROWS
  AS SELECT
   A.Column1,
   valueInserted(A.Column2) Column2,
   valueInserted(A.Column3) Column3
  FROM A GROUP BY A.Column1;
 
CREATE WINDOW C
  PRIMARY KEY DEDUCED
  AS SELECT
   B.Column2,
   max(B.Column3) Maxval
  FROM B GROUP BY B.Column2;
