
Creating Windows from Event Streams (updated for SP08)

*** This "how to" post has been updated to reflect new features in SP08 that make it easier to create Windows from Streams ***

 

There are many times when you want to create a window on a stream of events. In fact, in most ESP projects you will create Windows and not just Streams – some projects might contain only Windows and no streams at all.

 

Now as you know, the three main characteristics that distinguish a Stream from a Window are:

 

  1. Streams are stateless
  2. Streams do not have a primary key
  3. Streams only support inserts

 

(If you want more info on opcodes and how inserts differ from updates and deletes, see Understanding ESP OpCodes.)

 

But it’s often the case that you want to create a window on a set of events arriving in a stream.  You can do this in ESP, but it’s not as simple as just saying CREATE WINDOW…FROM STREAM.  There are some things you need to do and some things to watch out for.

 

There are actually several ways of doing this; choose the one that seems simplest for your use case.

 

Option 1: use an Input Window instead of an Input Stream

 

The simplest is not to do it – i.e. rather than create a stream in the first place, take the events directly into an input window. If the incoming events don’t have a unique field that can be used as a primary key, use the new Autogenerate feature in ESP 5.1 to create a “synthetic” key.  And don’t forget to put a KEEP clause on the window so that it doesn’t grow infinitely large.  Here’s an example of an input window using the Autogenerate feature:

 

CREATE INPUT WINDOW A
  SCHEMA (  ID long,
   Column1 integer ,
   Column2 integer ,
   Column3 integer )
  PRIMARY KEY (ID)
  KEEP 100 ROWS
  AUTOGENERATE(ID) ;

 

Option 2:  create a Window on a Stream

 

Now there are often very good reasons to take the data in as an input stream and feed that stream into a window – for example, the input stream may need to feed other streams as well as the window.  The only real complexity is that Windows have to be keyed, so every event from the Stream has to be added to the window with a unique key. There are three ways of ensuring that you end up with unique primary key values in the window – the compiler will insist on one of them:

  1. You can create a new column that will contain a "synthetic" key by using the nextval() function
  2. If the stream already has a column that can be used as a key, you can use a GROUP BY clause
  3. If the raw input stream doesn't have a column that can be used as a key, you can add one by adding an AUTOGEN clause to the definition of the input stream
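
For the third approach, here's a sketch of what that might look like. (Assumption: this sketches the AUTOGENERATE clause applied to the input stream's definition, mirroring the way Option 1 applies it to an input window; check your version's CCL reference for the exact keyword and placement.)

CREATE INPUT STREAM A
  SCHEMA (  ID long,
    Column1 integer ,
    Column2 integer ,
    Column3 integer )
  AUTOGENERATE(ID) ;

CREATE WINDOW B
  PRIMARY KEY (ID)
  KEEP 100 ROWS
  AS SELECT * FROM A ;

Here the generated ID column arrives with every event, so the window can simply use it as the primary key.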

 

 

Use nextval() to create a synthetic key

 

Here you create a new field (implicit type is long) to use as the primary key and set the value using the nextval() function.   Here’s what it looks like:

 

CREATE INPUT STREAM A
  SCHEMA ( Column1 integer ,
           Column2 integer ,
           Column3 integer ) ;

CREATE LOCAL WINDOW B
  PRIMARY KEY (ID)
  KEEP 100 ROWS
  AS SELECT nextval() ID, *
  FROM A;

 

 

Use a GROUP BY clause when the stream already has a key column

 

Here’s a valid example that feeds a window directly from a stream, where the window includes a GROUP BY clause and the primary key is deduced from the GROUP BY. Note that in order to save every event in the window, each event needs a unique value in Column1, which becomes the primary key.  Otherwise you really will be aggregating, creating one row in the window for each group of events with the same Column1 value.

 

CREATE INPUT STREAM A
SCHEMA (Column1 integer,
        Column2 integer,
        Column3 integer);

 

CREATE WINDOW B
PRIMARY KEY DEDUCED
KEEP 100 ROWS
AS SELECT 
  A.Column1,
  valueInserted(A.Column2),
  valueInserted(A.Column3)
FROM A GROUP BY A.Column1;

 

Now if you want to use this to do an actual aggregation, using aggregate functions other than valueInserted(), there’s an important caveat to this approach – and a pitfall if you aren’t careful.  The KEEP clause only manages the output, i.e. the contents of Window B: Window B will have one row for each group, up to a maximum of 100 rows. BUT, in order to compute the aggregate values, most aggregate functions will cause an “aggregation index” to be created that holds all incoming events. The problem is that if an aggregation index gets created, it can grow unbounded, consuming ever more memory. Not good.

 

Prior to SP08, to avoid accidentally creating an aggregation index with unbounded growth, you would need to limit yourself to the "safe" aggregation functions described below.  With SP08 this has become much simpler: you can now add a KEEP clause to the stream in the FROM clause, and this limits the size of the aggregation index.  So we can change the above example to this:

 

CREATE WINDOW B
PRIMARY KEY DEDUCED
AS SELECT 
  A.Column1,
  last(A.Column2),
  sum(A.Column3)
FROM A KEEP 100 ROWS
GROUP BY A.Column1;

 

In addition to valueInserted(), there are three other aggregation functions that are additive and can safely be used in cases where you aren't using a KEEP clause to limit the size of the underlying aggregation index – i.e. they are computed incrementally, without retaining all the underlying events. The safe functions are:


valueInserted()
count()
sum()
average()

 

So as long as you stick to these four and don’t use any other aggregate functions, you’ll be fine.
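
For instance, this aggregation stays safe even without a KEEP clause in the FROM clause, because it only uses additive functions (the window name and column aliases here are illustrative; the columns are carried over from the earlier examples):

CREATE WINDOW Totals
  PRIMARY KEY DEDUCED
  AS SELECT
    A.Column1,
    count(A.Column2) EventCount,
    sum(A.Column3) Total
  FROM A GROUP BY A.Column1;

No aggregation index needs to retain individual events here: the count and sum for each group are simply updated in place as events arrive.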

 

And if you have a case where, for some reason, you don't want to use a KEEP clause in the FROM clause (or you are on an older version, where the compiler won't let you), you can still compute other aggregate functions – just do it in two steps:

CREATE INPUT STREAM A
  SCHEMA ( Column1 integer ,
      Column2 integer ,
      Column3 integer ) ;

CREATE WINDOW B
  PRIMARY KEY DEDUCED
  KEEP 100 ROWS
  AS SELECT
   A.Column1,
   valueInserted(A.Column2) Column2,
   valueInserted(A.Column3) Column3
  FROM A GROUP BY A.Column1;
 
CREATE WINDOW C
  PRIMARY KEY DEDUCED
  AS SELECT
   B.Column2,
   max(B.Column3) Maxval
  FROM B GROUP BY B.Column2;
