
FAQ (Technical) for SAP Sybase ESP Users

 

CCL

What is the difference between a Stream and a Window?

 

Streams are stateless. They process each incoming event, one at a time, and publish an output event as defined by any continuous query logic attached to the stream. While a stream is processing the current event, it has no knowledge of past events.

 

Windows, however, are like a table: they normally hold some number of rows (events). Incoming events add, update or delete rows in the window according to the opcode of the event. The number of rows held in the window is typically determined by a KEEP policy. For example, KEEP 5 minutes will add incoming events to the window but delete them after 5 minutes, while KEEP 10 rows will keep the 10 rows that have been added or updated most recently, deleting the older ones. The size/contents of a window can also be affected by incoming "delete" events (i.e. events with a Delete opcode), so a window without a KEEP clause might hold only a few rows if most rows added are soon deleted by incoming delete events.

 

It’s also important to know that Streams and Windows process opcodes differently. Because streams are stateless, they can’t process update and delete events: a stream will turn updates and upserts into inserts and process them as inserts, and will ignore all delete events.
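
As a minimal sketch of the two kinds of declaration (schema and names are illustrative):

CREATE INPUT STREAM PricesIn SCHEMA (Symbol string, Price float);

// A window keyed on Symbol that retains the 10 most recently
// inserted or updated rows, deleting older ones as new events arrive
CREATE INPUT WINDOW PricesWin SCHEMA (Symbol string, Price float)
PRIMARY KEY (Symbol)
KEEP 10 ROWS;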

   

How do I create a Window from a Stream?

 

There are several ways to do this – see this doc for details.
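
One common approach, sketched below, reuses the valueInserted() technique shown at the end of this FAQ, so that no unbounded aggregation index is created (names are illustrative):

CREATE INPUT STREAM S1 SCHEMA (Symbol string, Price float);

// GROUP BY turns the insert-only stream into a window keyed by Symbol;
// valueInserted() just carries the latest inserted value through
CREATE OUTPUT WINDOW LatestPrice SCHEMA (Symbol string, Price float)
PRIMARY KEY DEDUCED
AS SELECT
  S1.Symbol AS Symbol,
  valueInserted(S1.Price) AS Price
FROM S1
GROUP BY S1.Symbol;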

   

My aggregate window has a KEEP 5 minutes clause, but the averages it produces are based on data older than that. Why?

 

In an aggregate window (a window with a GROUP BY clause), the KEEP policy is applied to the output: it will cause an aggregate row to be discarded if it hasn’t been updated in the last 5 minutes (or whatever the KEEP policy specifies). The KEEP policy is NOT applied to the input, so ALL the rows contributing to an aggregate record are kept as long as that record exists in the window. To do something like compute a moving average over the last 5 minutes, apply the 5 minute KEEP policy upstream of the aggregation query.
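
A minimal sketch of that pattern (names are illustrative, and it assumes each event carries a unique TradeId to key the retention window):

CREATE INPUT STREAM Trades SCHEMA (TradeId long, Symbol string, Price float);

// Apply the retention policy BEFORE aggregating...
CREATE LOCAL WINDOW TradesLast5Min SCHEMA (TradeId long, Symbol string, Price float)
PRIMARY KEY (TradeId)
KEEP 5 MINUTES
AS SELECT * FROM Trades;

// ...so the average only ever sees rows from the last 5 minutes;
// rows aging out of the KEEP window drop out of the aggregate
CREATE OUTPUT WINDOW AvgPrice SCHEMA (Symbol string, avgPrice float)
PRIMARY KEY DEDUCED
AS SELECT
  TradesLast5Min.Symbol AS Symbol,
  avg(TradesLast5Min.Price) AS avgPrice
FROM TradesLast5Min
GROUP BY TradesLast5Min.Symbol;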

   

What is the difference between CCL and SPLASH?

 

All ESP projects are written in CCL.  The CCL file gets compiled into a .ccx file, which is the executable form that runs on the ESP server.  CCL is derived from SQL and as such is declarative.  This has the benefits of being familiar, easy to understand and efficient to use. However, there are times when the event processing logic you need to implement is not easily expressed in a SELECT statement.  This is where SPLASH comes in. SPLASH is a simple scripting language that can be used to write custom operators (Flex operators) and custom functions directly inline within a CCL project. Since SPLASH is procedural and has data structures, it gives you tremendous control over how an event is processed.
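
For instance, here is a minimal sketch of a Flex operator with inline SPLASH (the names and the doubling logic are illustrative, not from any particular project):

CREATE INPUT STREAM S1 SCHEMA (ID integer, Val integer);

// The ON block below is SPLASH, executed once per incoming event
CREATE FLEX DoubleVal
IN S1
OUT OUTPUT WINDOW Doubled SCHEMA (ID integer, Val integer)
PRIMARY KEY (ID)
BEGIN
  ON S1 {
    // build an output record (key columns before the '|') and emit it
    output setOpcode([ID = S1.ID; | Val = S1.Val * 2;], upsert);
  };
END;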

 

Some people mistakenly think that CCL and SPLASH are two different alternatives and they need to choose one. That’s not the case.  Every ESP project is written in CCL; you then have the option of adding SPLASH where you need to extend CCL.

   

What is an EventCache?

 

An eventCache can be described as a small window on the input stream of a query, on which aggregation and other computations can be performed. It is a very powerful feature in ESP and can be used in many places where one needs to compare the current event with previous input.  Below is an example:

 

CREATE INPUT STREAM S1 SCHEMA (ID integer, Val integer);

 

CREATE OUTPUT WINDOW Change1 SCHEMA (ID integer, lastVal integer, prevVal integer, chg integer)
PRIMARY KEY DEDUCED
DECLARE
  // keep a cache of the last 5 events for each distinct value of ID
  eventCache(S1[ID], 5 events) ec1;
END
AS SELECT
  S1.ID as ID,
  S1.Val as lastVal,
  // the previous value for this ID, read from the cache
  nth(1, ec1.Val) as prevVal,
  // change relative to the previous event for this ID
  (S1.Val - nth(1, ec1.Val)) as chg
FROM S1
GROUP BY (S1.ID);

 

Can we use dictionaries in a regular CCL Query or just in a Flex operator?

 

Yes, the dictionary data type can also be used in CCL queries; it is not limited to Flex operators or SPLASH functions. For example:

 

CREATE INPUT STREAM S1 SCHEMA (ID integer, Val integer);

 

CREATE OUTPUT WINDOW Change1 SCHEMA (ID integer, lastVal integer, chg integer)
PRIMARY KEY DEDUCED
DECLARE
  // maps each ID to the previously seen value
  dictionary(integer, integer) prev;
  integer temp;
END
AS SELECT
  S1.ID as ID,
  last(S1.Val) as lastVal,
  // stash the prior value, record the current one, and emit the difference
  (temp := prev[S1.ID]; prev[S1.ID] := S1.Val; S1.Val - temp) as chg
FROM S1
GROUP BY (S1.ID);

 

Why does a window with a KEEP ALL clause contain only one row?

 

CREATE INPUT WINDOW  W1 SCHEMA (symbol string,volume integer,price float)
PRIMARY KEY(symbol)
KEEP 1 ROW
;

CREATE OUTPUT WINDOW W2 SCHEMA(symbol string, volume integer,price float)
PRIMARY KEY(symbol)
KEEP ALL
AS
SELECT * FROM W1;

 

The reason is that every time a new row is inserted into window W1, the old row is deleted, because the first query has a KEEP 1 ROW policy. The deletion propagates downstream and the row gets deleted in W2 as well.
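
For example, if W1 is fed the input:

i,SAP,100,25.0
i,IBM,200,30.0

then the KEEP 1 ROW policy deletes the SAP row as soon as the IBM row arrives; that delete flows through to W2, which therefore also ends up holding only the IBM row despite its KEEP ALL clause.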

 

Understanding Opcodes

What is an opcode?

 

In ESP, every event has an opcode, which is one of: insert, update, upsert (update if present, otherwise insert), delete, or safe delete (delete the row if it exists, but don’t log an error if it doesn’t).  Streams only handle inserts: they treat any updates or upserts received as inserts, transmit only inserts, and ignore any deletes received.  Windows, on the other hand, apply the events to the data in the window according to the opcode. See this doc for more information on opcodes in ESP.
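
For illustration, using the opcode-prefixed record format that appears in the delta stream example later in this FAQ (assuming i, u and d denote the insert, update and delete opcodes), a window keyed on the first field would apply:

i,SAP,100
u,SAP,200
d,SAP,200

as an insert of a new SAP row, then an in-place update of that row, then its deletion. A stream receiving the same sequence would emit the first two records as inserts and ignore the delete.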

 

Why am I receiving deletes from an ESP Window, even though there have been no deletes in the input?

 

The KEEP policy on a window will produce deletes when a row either ages out of a time-based window or is “pushed” out of a count-based window.
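
A minimal sketch (names are illustrative):

CREATE INPUT WINDOW W1 SCHEMA (ID integer, Val integer)
PRIMARY KEY (ID)
KEEP 2 ROWS;

// Feeding i,1,10 then i,2,20 then i,3,30 causes W1 to emit a delete
// for the ID=1 row: the third insert pushes the oldest row out of the
// count-based window, even though no delete was ever sent in.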

 

Capturing ESP output in SAP HANA or SAP Sybase IQ

How do I prevent the KEEP policy on my ESP window from deleting rows in HANA or IQ?

 

The data from an ESP output window can be captured in HANA or IQ (or various other databases), but by default the table in the database will have the same contents as the ESP window: the ESP window produces inserts, updates and deletes, and those are applied to the database table.  This includes deletes that result from the KEEP policy on the window. This surprises some people, when what they really want is for the database to hold all rows that were ever in the window.  This can be done, but it requires the database output adapter to be configured in “data warehouse” mode, where deletes from the window are not applied to the database table. See this doc for more information.

 

Project Deployment

How do I use command line commands to start a project in ESP?

 

1) Start a cluster manager node:
$ esp_server --cluster-node=node1.xml
The node1.xml file can be found under $ESP_HOME/cluster/nodes/node1

 

2) Log into the cluster manager, create a workspace, and start projects under the cluster manager:
a. Log in to cluster manager
$ esp_cluster_admin --uri esp://<hostname>:<port>
b. Add a workspace
> add workspace <workspace_name>
c. Add a project
> add project <workspace_name>/<project_name> <project>.ccx
d. Start the project
> start project <workspace_name>/<project_name>
e. Get details of the project
> get project <workspace_name>/<project_name>
f. (Optional) Make the command port and SQL port static via the project configuration file, instead of having them dynamically assigned by the cluster manager
> add project <workspace_name>/<project_name> <project>.ccx <project>.ccr

 

3) (Optional) Interactive admin commands can be issued via the command line as independent commands
$ esp_cluster_admin --uri esp://<hostname>:<port> --get_projectdetail --workspace-name <workspace_name> --project-name <project_name>
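
For example, a one-off status check (localhost, port 19011, ws1 and myproject are hypothetical values):

$ esp_cluster_admin --uri esp://localhost:19011 --get_projectdetail --workspace-name ws1 --project-name myproject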

 

Project Optimization

My windows seem to grow unbounded – what’s wrong?

 

This is most often caused by one of the following:

 

1. A window without a KEEP policy.  In general, windows should have a KEEP clause that defines the retention policy for the window; the default is KEEP ALL. The only time you should have a window without a KEEP clause is when you are confident that incoming DELETE events (i.e. events with a DELETE opcode) will maintain the size of the window.

 

2. You need to be careful when you create a Window from a Stream. Let’s say your project has StreamA, StreamA is an input to WindowB, and WindowB has a GROUP BY clause.  Assuming WindowB also has a KEEP clause, the KEEP applies to the output of WindowB – i.e. the aggregate rows. But unless you are careful to avoid using any non-additive functions in your column expressions, an aggregation index will be created behind the scenes and it will grow unbounded. See this doc for more detail on how to “safely” create Windows from Streams.

 

3. A less frequent issue is the use of dictionaries and vectors in Flex operators. Care should be taken when using these data structures so that they don’t grow unbounded: be sure the logic that adds data to them also removes data from them.  Also note that garbage collection in a Flex operator only occurs when the Flex has completed all processing triggered by the current event and moves on to the next incoming event.

 

What are the ways in ESP to reduce memory usage in the project?

 

To minimize memory usage, users can use:
1. Streams
2. Windows with a KEEP policy
3. Delta streams

 

Streams don’t have any storage and understand only the insert opcode. When a row with an update opcode arrives, it is converted to an insert and processed as such; a row with a delete opcode is ignored by streams.

 

Windows with a KEEP policy store rows according to that policy. With a KEEP 1 ROW policy, when a new row comes in the engine deletes the previous row and inserts the new one. The deletion propagates downstream, and the corresponding row gets deleted in all downstream entities.

 

A delta stream is a feature in ESP that was not present in Aleri.  A delta stream does not store any rows, yet it understands all opcodes: it uses the storage of its input to decide what to do with each row that arrives. A delta stream also has the ability to change opcodes when it is used as a filter.

 

For example:

 

CREATE INPUT WINDOW W1 SCHEMA (Symbol string,Volume integer,Price Float)
PRIMARY KEY(Symbol)
;

 

CREATE OUTPUT DELTA STREAM S1 SCHEMA(Symbol string, Volume integer,Price Float)
PRIMARY KEY(Symbol)
AS
SELECT * from W1
where W1.Volume > 100;

 

CREATE OUTPUT WINDOW W2 SCHEMA(Symbol string, Volume integer,Price Float)
PRIMARY KEY(Symbol)
AS
SELECT * from S1;

 

If the data sent in as input is:

i,SAP,90,3
u,SAP,120,3
u,SAP,200,3

then the first row is rejected because it does not satisfy the filter. The second row is converted to an insert and propagates downstream as an insert (no SAP row has passed through the filter yet), and the third row propagates downstream as an update.

 

A delta stream can change opcodes by referencing the data from its input. It determines what opcode to output for a particular incoming event by checking, against the input window’s data, whether a matching row has already passed through it, even though it stores no data of its own. There are some restrictions on its usage, which are documented under the DELTA stream section of the CCL guide.

 

Why do I see a lot of memory used when my aggregate window is set to keep only 1 row?

 

CREATE INPUT STREAM S1 SCHEMA (symbol string, volume integer, price float);

CREATE OUTPUT WINDOW W1 SCHEMA (symbol string, volume integer, price float)
PRIMARY KEY DEDUCED
KEEP 1 ROW
AS
SELECT * FROM S1
GROUP BY S1.symbol;

 

Although there are only a stream and a window with a KEEP 1 ROW policy involved here, the engine has to store all the rows needed to compute the GROUP BY aggregation, and this causes memory to grow.  The KEEP 1 ROW clause only applies to the output of the aggregation window. The way to avoid this is to use one of the "special" additive aggregate functions to compute each column in the aggregation window.  These functions are computed incrementally and avoid the creation of an aggregation index behind the scenes. The additive functions that are "safe" to use in this context are:

 

valueInserted()
count()
sum()
average()

 

So the CCL above, changed to avoid the creation of an ever-growing aggregation index, would be:

 

CREATE INPUT STREAM S1 SCHEMA (symbol string, volume integer, price float);

CREATE OUTPUT WINDOW W1 SCHEMA (symbol string, volume integer, price float)
PRIMARY KEY DEDUCED
AS
SELECT
  S1.symbol AS symbol,
  valueInserted(S1.volume) AS volume,
  valueInserted(S1.price) AS price
FROM S1
GROUP BY S1.symbol;
