FAQ (Technical): CCL

Note: questions here apply to both SAP HANA smart data streaming (SDS) and SAP Event Stream Processor (ESP) unless otherwise noted.

 

CCL

What is the difference between a Stream and a Window?

 

Streams are stateless. They process each incoming event, one at a time, and publish an output event as defined by any continuous query logic attached to the stream. While a stream is processing the current event it has no knowledge of past events.

 

Windows, however, are like tables and normally hold some number of rows (events). Incoming events add, update or delete rows in the window according to the opcode of the event. The number of rows held in the window is typically determined by a KEEP policy. For example, KEEP 5 minutes will add incoming events to the window and then delete them after 5 minutes, while KEEP 10 rows will keep the 10 rows that have been added or updated most recently, deleting the older ones. The size and contents of a window can also be affected by incoming delete events (i.e. events with a delete opcode), so a window without a KEEP clause might hold only a few rows if most rows added are soon deleted by incoming delete events.

 

It’s also important to know that Streams and Windows process opcodes differently. Because streams are stateless, they can’t apply update and delete events to previous events. How a stream handles opcodes other than insert therefore depends on whether the stream has a primary key (note that versions of ESP prior to SP08 did not support keyed streams, so all streams in those versions behave as described here for un-keyed streams). An un-keyed stream turns updates and upserts into inserts and processes them as inserts, and ignores (discards) all delete events. A keyed stream processes events with any opcode, without altering the opcode. Bear in mind, however, that a keyed stream is still stateless, so it can't "enforce" correct opcodes: for example, if a keyed stream receives an update event for a key value that was never previously received, it will still process the event.
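
The opcode rules above can be sketched as a small Python model. This is not ESP/SDS code: the function names and the tuple layout of an event are assumptions made purely for illustration, and only the four opcodes discussed in this answer are modelled.

```python
# Illustrative model only; names and event layout are hypothetical.
INSERT, UPDATE, UPSERT, DELETE = "insert", "update", "upsert", "delete"


def unkeyed_stream(events):
    """Un-keyed stream: updates/upserts become inserts, deletes are dropped."""
    out = []
    for opcode, row in events:
        if opcode == DELETE:
            continue  # delete events are ignored (discarded)
        out.append((INSERT, row))  # everything else is published as an insert
    return out


def keyed_stream(events):
    """Keyed stream: every event passes through with its opcode unchanged.

    The stream keeps no state, so it cannot check whether the key was
    ever seen before.
    """
    return list(events)


events = [
    (INSERT, {"ID": 1, "Val": 10}),
    (UPDATE, {"ID": 1, "Val": 11}),
    (UPSERT, {"ID": 2, "Val": 20}),
    (DELETE, {"ID": 1, "Val": 11}),
    (UPDATE, {"ID": 3, "Val": 30}),  # key 3 was never inserted
]

unkeyed_out = unkeyed_stream(events)
keyed_out = keyed_stream(events)
```

In this model the un-keyed stream publishes four inserts and drops the delete, while the keyed stream publishes all five events unchanged, including the update for a key it has never seen.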

 

How do I create a Window from a Stream?

 

There are several ways to do this – see this doc for details.

 

My aggregate window has a KEEP 5 minutes clause, but the averages it produces are based on data older than that. Why?

 

In an aggregate window (a window with a GROUP BY clause), a KEEP policy applied to the aggregate window itself applies to the output. So it will cause any aggregate row to be discarded if it hasn’t been updated in the last 5 minutes (or whatever the KEEP policy is). The KEEP policy is NOT applied to the input, so ALL the rows contributing to an aggregate row are kept as long as that row exists in the window. To do something like compute a moving average over the last 5 minutes, apply the 5 minute KEEP policy in the FROM clause (or upstream of the aggregation query).
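
The difference between the two placements can be shown with a rough Python model. It is a sketch under assumptions, not engine behavior: timestamps are plain seconds, and the exact boundary rule (strictly younger than 300 seconds) is chosen only for the example.

```python
# Illustrative model only; the 300-second cutoff rule is an assumption.
def average_all(events):
    """KEEP on the aggregate window only: every input row still contributes."""
    prices = [price for _, price in events]
    return sum(prices) / len(prices)


def average_recent(events, now, keep_seconds=300):
    """KEEP applied to the input: only rows younger than keep_seconds count."""
    prices = [price for ts, price in events if now - ts < keep_seconds]
    return sum(prices) / len(prices) if prices else None


# (timestamp in seconds, price)
events = [(0, 10.0), (100, 20.0), (400, 30.0), (450, 40.0)]
now = 450

avg_over_everything = average_all(events)            # uses all four rows
avg_over_last_5_min = average_recent(events, now)    # uses the last two rows
```

Here the first average is 25.0 because the two old rows still contribute, while the second is 35.0 because only the rows from the last 5 minutes are aggregated.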

 

What is the difference between CCL and CCL Script?

(Note: in versions of ESP prior to SP09, CCL Script was called SPLASH)

All streaming projects are written in CCL. The CCL file gets compiled into a ccx file, which is an executable that runs on the streaming server. CCL is derived from SQL and as such is declarative. This has the benefits of being familiar, easy to understand and efficient to use. However, there are times when the event processing logic you need to implement is not easily expressed in a SELECT statement. This is where CCL Script comes in. CCL Script is a simple scripting language that can be used to write custom operators (Flex operators) and custom functions directly inline within a CCL project. Since CCL Script is procedural and has data structures, it gives you a great deal of control over how an event is processed.

 

Some people mistakenly think that CCL and CCL Script are two different alternatives and they need to choose one. That’s not the case.  Every streaming project is written in CCL; you then have the option of adding CCL Script where you need to implement complex logic that cannot be expressed in SQL-style operators.

 

What is an EventCache?

 

An EventCache can be described as a small window on the input stream, declared within the query, on which aggregation and other computations can be performed. It is a very powerful CCL feature that can be used wherever you need to compare incoming data with previous input on the same stream. Below is an example:

 

CREATE INPUT STREAM S1 SCHEMA (ID integer, Val integer);

 

CREATE OUTPUT WINDOW Change1 SCHEMA (ID integer, lastVal integer, prevVal integer, chg integer)
PRIMARY KEY DEDUCED
DECLARE
eventCache(S1[ID], 5 events) ec1;
END
AS SELECT
S1.ID as ID,
S1.Val as lastVal,
nth(1,ec1.Val) as prevVal,
(S1.Val - nth(1,ec1.Val)) as chg
FROM S1
GROUP BY (S1.ID);

 

Can we use dictionaries in a regular CCL Query or just in a Flex operator?

 

Dictionaries can be used in regular CCL queries as well; the dictionary data type is not limited to Flex operators or CCL Script functions. For example:

 

CREATE INPUT STREAM S1 SCHEMA (ID integer, Val integer);

 

CREATE OUTPUT WINDOW Change1 SCHEMA (ID integer, lastVal integer, chg integer)
PRIMARY KEY DEDUCED
DECLARE
dictionary(integer, integer) prev;
integer temp;
END
AS SELECT
S1.ID as ID,
last(S1.Val) as lastVal,
(temp:=prev[S1.ID];prev[S1.ID]:= S1.Val; S1.Val-temp) as chg
FROM S1
GROUP BY (S1.ID);

 

Why does a window with a KEEP ALL clause contain only one row?

 

CREATE INPUT WINDOW  W1 SCHEMA (symbol string,volume integer,price float)
PRIMARY KEY(symbol)
KEEP 1 ROW
;

CREATE OUTPUT WINDOW W2 SCHEMA(symbol string, volume integer,price float)
PRIMARY KEY(symbol)
KEEP ALL
As
SELECT * FROM W1;

 

The reason is that every time a new row is inserted into window W1, the old row is deleted, because W1 has a KEEP 1 ROW policy. The deletion propagates downstream, so the row is deleted from W2 as well.
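
The propagation can be traced with a toy Python model of the two windows. It is only a sketch: the function names are hypothetical, the input rows are assumed to have distinct symbols, and the order in which W1 publishes the delete and the insert is an assumption.

```python
# Illustrative model only; not ESP/SDS code.
def w1_keep_one_row(inserts):
    """Yield the events a KEEP 1 ROW window would publish for new rows."""
    current = None
    for row in inserts:
        if current is not None:
            yield ("delete", current)  # the previous row is pushed out
        yield ("insert", row)
        current = row


def apply_to_w2(events):
    """W2 (KEEP ALL) applies every upstream event, including the deletes."""
    w2 = {}
    for opcode, row in events:
        if opcode == "insert":
            w2[row["symbol"]] = row
        elif opcode == "delete":
            w2.pop(row["symbol"], None)
    return w2


inserts = [
    {"symbol": "SAP", "volume": 100, "price": 50.0},
    {"symbol": "IBM", "volume": 200, "price": 60.0},
    {"symbol": "MSFT", "volume": 300, "price": 70.0},
]

w1_events = list(w1_keep_one_row(inserts))
w2_rows = apply_to_w2(w1_events)
```

After three inserts, W2 holds only the MSFT row: KEEP ALL never removes anything on its own, but it still honors the deletes that W1 publishes.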

 

Understanding Opcodes

What is an opcode?

 

In CCL, every event has an opcode, which is one of: insert, update, upsert (update if present, otherwise insert), delete, or safe delete (delete the row if it exists, but don’t log an error if it doesn’t). Un-keyed streams only handle inserts: they treat any update or upsert received as an insert, transmit only inserts, and ignore any deletes received. Windows, on the other hand, apply events to the data in the window according to the opcode. See this doc for more information on opcodes in CCL.

 

Why am I receiving deletes from a CCL Window, even though there have been no deletes in the input?

 

The KEEP policy on a window will produce deletes when the item either ages out of a time-based window or is “pushed” out from a count-based window.
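
As a rough illustration, here is a Python model of a count-based window. The class name `CountWindow` and its `upsert` method are hypothetical, and the order in which the insert and the resulting delete are published is an assumption of this sketch rather than documented engine behavior.

```python
# Illustrative model only; not ESP/SDS code.
from collections import OrderedDict


class CountWindow:
    """Keeps the keep_rows most recently added or updated rows."""

    def __init__(self, keep_rows):
        self.keep_rows = keep_rows
        self.rows = OrderedDict()  # oldest row first

    def upsert(self, key, value):
        """Apply one incoming row and return the events the window publishes."""
        published = []
        if key in self.rows:
            self.rows[key] = value
            self.rows.move_to_end(key)  # an updated row counts as recent
            published.append(("update", key, value))
        else:
            self.rows[key] = value
            published.append(("insert", key, value))
            if len(self.rows) > self.keep_rows:
                old_key, old_value = self.rows.popitem(last=False)
                published.append(("delete", old_key, old_value))
        return published


window = CountWindow(keep_rows=2)
published = []
for key, value in [("a", 1), ("b", 2), ("c", 3)]:
    published.extend(window.upsert(key, value))
```

The input contains no deletes, yet the third row pushes row "a" out of the window, so the published events include a delete for "a".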

 

Capturing CCL output in SAP HANA or SAP Sybase IQ

How do I prevent the KEEP policy on my CCL output window from deleting rows in the database?

 

The data from a CCL output window can be captured in HANA or IQ (or various other databases), but by default the table in the database will have the same contents as the CCL window. The CCL window will produce inserts, updates and deletes, and those will be applied to the database table. This includes deletes that result from the KEEP policy on the window. This surprises some people, who really want the database to hold all rows that were ever in the window. This can be done, but it requires the database output adapter to be configured in “data warehouse” mode, where deletes from the window are not applied to the database table. See this doc for more information.
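
The effect of ignoring deletes can be sketched in Python. This models only the behavior described above; `apply_events` and its `ignore_deletes` flag are hypothetical names and do not correspond to actual adapter properties.

```python
# Illustrative model only; not an adapter configuration.
def apply_events(events, ignore_deletes=False):
    """Apply window output events to a table modelled as a dict."""
    table = {}
    for opcode, key, value in events:
        if opcode in ("insert", "update"):
            table[key] = value
        elif opcode == "delete" and not ignore_deletes:
            table.pop(key, None)
    return table


# Output of a window whose KEEP policy has pushed out rows "a" and "b".
window_output = [
    ("insert", "a", 1),
    ("insert", "b", 2),
    ("delete", "a", 1),
    ("insert", "c", 3),
    ("delete", "b", 2),
]

mirrored_table = apply_events(window_output)
warehouse_table = apply_events(window_output, ignore_deletes=True)
```

The mirrored table ends up with only row "c", matching the window, while the table that ignores deletes retains every row that was ever in the window.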

 

Project Optimization

My windows seem to grow unbounded, what’s wrong?

 

This is most often caused by one of the following:

 

1. A window without a KEEP policy. In general, windows should have a KEEP clause that defines the retention policy for the window. The default KEEP policy for a window is KEEP ALL, so without a KEEP clause the window will grow unbounded unless events are naturally removed by incoming delete events. The only time you should have a window without a KEEP clause is when you are confident that incoming delete events (i.e. events with a delete opcode) will maintain the size of the window.

 

2. A window created from a stream. You need to be careful when you create a Window from a Stream. Let’s say your project has StreamA, StreamA is an input to WindowB, and WindowB has a GROUP BY clause. Assuming that WindowB also has a KEEP clause, the KEEP will apply to the output of WindowB, i.e. the aggregate rows. But unless you are careful to avoid using any non-additive functions in your column expressions, an aggregation index will be created behind the scenes and it will grow unbounded. See this doc for more detail on how to “safely” create Windows from Streams.

 

A less frequent issue is the use of dictionaries and vectors in Flex Operators. Care should be taken when using these data structures so that they don’t grow unbounded.  Be sure the logic that adds data to them also removes data from them.  Also note that garbage collection in a Flex Operator only occurs when the Flex has completed all processing triggered by the current event and moves on to the next incoming event.

 

How can I structure my CCL to reduce memory usage?

 

To minimize memory usage:
1. Use streams wherever possible
2. Use windows with tight KEEP policies
3. Use additive aggregation functions
4. Watch out for aggregation indices
5. Be sure to remove unneeded data structures

 

Streams don’t have any state and thus use very little memory.  Use streams whenever possible.

 

Windows with a KEEP policy store rows according to that policy. For example, if a window has a KEEP 1 ROW policy, then when a new row comes in, the engine deletes the previous row and inserts the new one. The deletion propagates downstream, and the corresponding row is deleted from all downstream windows. Ensure you aren't keeping more data in windows than you need.

 

For more on additive aggregation functions and aggregation indices, see the next question.

 

For data structures (dictionaries, vectors, event caches) be sure your CCL Script removes unneeded values/items.

 

Why do I see memory growth when my aggregate window is set to keep only 1 row?

 

Take a look at this CCL:

Create input stream S1

schema(symbol string,volume integer,price float) ;

 

Create output window w1

schema(symbol string,avg_price float)
Primary key deduced
Keep 1 row
As
Select

  S1.symbol as symbol,

  avg(S1.price) as avg_price

from S1
Group by S1.symbol;

 

Although only a stream and a window with a KEEP 1 row policy are involved here, the engine has to store all the rows needed to compute the aggregation for the GROUP BY clause, and this causes memory to grow. The KEEP 1 row clause applies only to the output of the aggregation window. Behind the scenes, an aggregation index is created that saves all the incoming events from S1.

 

There are a couple of ways to avoid this.

 

The easiest, and probably what you actually wanted, is to move the KEEP clause:

 

Create output window w1

schema(symbol string,avg_price float)
Primary key deduced
As
Select

  S1.symbol as symbol,

  avg(S1.price) as avg_price

from S1 KEEP 5 rows
Group by S1.symbol;

 

Now the aggregation index only holds the last 5 rows, and you are aggregating over that set. You can also leave the KEEP clause where it was, but that has a different effect: the window only holds the aggregate rows that were most recently updated (or updated within the interval used in the KEEP).

 

The other is to use one of the "special" additive aggregate functions to compute each column in the aggregation window.  These functions are computed incrementally and avoid the creation of an aggregation index behind the scenes. The additive functions that are "safe" to use in this context are:

 

                valueInserted()

                count()

                sum()

                average()

 

So the CCL above, changed to avoid the creation of an ever-growing aggregation index, would be:

 

Create input stream S1 schema(symbol string,volume integer,price float) ;

 

Create output window w1 schema(symbol string,volume integer,price float)
Primary key deduced
As
Select

  S1.symbol symbol,

  valueInserted(S1.volume) volume,

  valueInserted(S1.price) price

from S1
Group by S1.symbol;
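
To see why incrementally computed functions avoid the memory growth, compare two Python models of a per-group average. Both classes are hypothetical illustrations, not a description of how the engine is implemented: one keeps every input row (like an aggregation index), the other keeps only a running sum and count per group.

```python
# Illustrative model only; not ESP/SDS internals.
class IndexedAverage:
    """Stores every input value per group, like an aggregation index."""

    def __init__(self):
        self.index = {}  # group key -> list of all values seen

    def add(self, key, value):
        values = self.index.setdefault(key, [])
        values.append(value)
        return sum(values) / len(values)


class AdditiveAverage:
    """Keeps only a running sum and count per group."""

    def __init__(self):
        self.state = {}  # group key -> [running sum, row count]

    def add(self, key, value):
        totals = self.state.setdefault(key, [0.0, 0])
        totals[0] += value
        totals[1] += 1
        return totals[0] / totals[1]


indexed = IndexedAverage()
additive = AdditiveAverage()
for i in range(1, 1001):
    indexed_avg = indexed.add("SAP", float(i))
    additive_avg = additive.add("SAP", float(i))
```

Both models report the same average after 1000 events, but the first holds 1000 values for the group while the second holds just two numbers, however many events arrive.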
