
Creating Windows from Event Streams

There are many times when you want to create a window on a stream of events. In fact, most ESP projects will use Windows and not just Streams, and some projects may have only Windows and no streams at all.

 

Now as you know, the 3 main characteristics that distinguish a Stream from a Window are:

 

  1. Streams are stateless
  2. Streams do not have a primary key
  3. Streams only support inserts

 

(If you want more info on opcodes and how inserts differ from updates and deletes, see Understanding ESP OpCodes.)

 

But it’s often the case that you want to create a window on a set of events arriving in a stream. You can do this in ESP, but it’s not as simple as just saying CREATE WINDOW ... FROM STREAM. There are some things you need to do and some things you need to watch out for.

 

There are actually several ways of doing this; choose the one that seems simplest for your use case.

 

Option 1: use an Input Window instead of an Input Stream

 

The simplest option is not to do it at all: rather than create a stream in the first place, take the events directly into an input window. If the incoming events don’t have a unique field that can be used as a primary key, use the new Autogenerate feature in ESP 5.1 to create a “synthetic” key. And don’t forget to put a KEEP clause on the window so that it doesn’t grow infinitely large. Here’s an example of an input window using the Autogenerate feature:

 

CREATE INPUT WINDOW A
  SCHEMA ( ID long,
           Column1 integer,
           Column2 integer,
           Column3 integer )
  PRIMARY KEY (ID)
  KEEP 100 ROWS
  AUTOGENERATE (ID) ;
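With this in place, each arriving event is inserted as a new row with the next generated value in the ID column, and the KEEP clause retires the oldest rows once the window holds 100 of them.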

 

Option 2: create a Window on a Stream

 

Now there may be a reason that you need to take the data in as an input stream and feed that stream into a window: for example, maybe the input stream needs to feed other streams as well as the window. The only real complexity is that Windows have to be keyed, so every event from the Stream has to be added to the window with a unique key. There are two ways of ensuring that you end up with unique primary key values in the window, and the compiler will insist on one or the other:

  1. The simplest is to create a new column that will contain a "synthetic" key by using the nextval() function.
  2. If the stream already has a column that can be used as a key, you can use a GROUP BY clause.

 

 

Use nextval() to create a synthetic key

 

Here you create a new field (the implicit type is long) to use as the primary key, and set the value using the nextval() function. Here’s what it looks like:

 

CREATE INPUT STREAM A
  SCHEMA ( Column1 integer,
           Column2 integer,
           Column3 integer ) ;

CREATE LOCAL WINDOW B
  PRIMARY KEY (ID)
  KEEP 100 ROWS
  AS SELECT nextval() ID, *
  FROM A;
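Since nextval() returns a new value on every call, each event from A lands in B as an insert with its own unique key, and the KEEP clause again caps the window at the 100 most recent rows.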

 

 

Use a GROUP BY clause if your stream already has a key column

 

Here’s a valid example that feeds a window directly from a stream, where the window includes a GROUP BY clause and the primary key is deduced from the GROUP BY. Note that in order to save every event in the window, each event needs a unique value in Column1, which will become the primary key. Otherwise you really will be aggregating, creating one row in the window for each group of events with the same Column1 value.

 

CREATE INPUT STREAM A
SCHEMA (Column1 integer,
        Column2 integer,
        Column3 integer);

 

CREATE WINDOW B
PRIMARY KEY DEDUCED
KEEP 100 ROWS
AS SELECT 
  A.Column1,
  valueInserted(A.Column2),
  valueInserted(A.Column3)
FROM A GROUP BY A.Column1;
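Note that valueInserted() simply returns the column value from the most recently arrived event in each group, which is what makes this window an event-by-event pass-through rather than a true aggregation.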

 

Now if you want to use this approach to do an actual aggregation, using aggregate functions other than valueInserted(), there’s an important caveat and a related pitfall if you aren’t careful. The KEEP clause only manages the output, i.e. the contents of Window B: Window B will have one row for each group, up to a maximum of 100 rows. BUT, in order to compute the aggregate values, most aggregate functions will cause an “aggregation index” to be created that holds all incoming events. The problem is that if an aggregation index gets created, it will grow unbounded, consuming ever more memory. Not good.

However, in addition to valueInserted(), there are three other aggregation functions that are additive and can safely be used in this context, i.e. they are computed incrementally without retaining all the underlying events. They are:


valueInserted()
count()
sum()
average()

 

So as long as you only use these and don’t use any other aggregate functions, you’ll be fine.
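
To see the pitfall concretely, here’s a sketch of the kind of statement to avoid (the window name B_unbounded and the alias MaxVal are just illustrative): because max() is not additive, the engine must retain the underlying events in an aggregation index in order to recompute the maximum, and that index is not limited by the KEEP clause.

// A sketch of the pattern to AVOID: max() is not additive, so an
// aggregation index holding all events arriving from A is created,
// and it grows without bound regardless of the KEEP clause.
CREATE WINDOW B_unbounded
  PRIMARY KEY DEDUCED
  KEEP 100 ROWS
  AS SELECT
    A.Column1,
    max(A.Column2) MaxVal
  FROM A GROUP BY A.Column1;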

 

But what if you need to calculate other aggregates? No problem: just create two windows instead of one, as follows. Of course you need a field to group on; here we’ll use Column2.

 

CREATE INPUT STREAM A
  SCHEMA ( Column1 integer,
           Column2 integer,
           Column3 integer ) ;

CREATE WINDOW B
  PRIMARY KEY DEDUCED
  KEEP 100 ROWS
  AS SELECT
   A.Column1,
   valueInserted(A.Column2) Column2,
   valueInserted(A.Column3) Column3
  FROM A GROUP BY A.Column1;
 
CREATE WINDOW C
  PRIMARY KEY DEDUCED
  AS SELECT
   B.Column2,
   max(B.Column3) Maxval
  FROM B GROUP BY B.Column2;
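The idea here is that Window B only uses the additive valueInserted() function, so no aggregation index builds up behind it, while Window C computes max() over B rather than over the raw stream. Since B is capped at 100 rows by its KEEP clause, the aggregation index behind C stays bounded as well.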
