Building windows based on timestamps

former_member259476
Participant

Hi all,

Let's assume an ESP scenario with guaranteed delivery/zero data loss, where my ESP project has an input stream receiving messages containing live sensor data together with a timestamp (resolution: 1 second). The incoming data may not be in the right order when it is sent to ESP. The interval at which ESP receives the data may also vary.

Is it possible to create windows (or whatever is suitable) that always collect, e.g., 5 consecutive seconds of sensor data based on the timestamp provided in the incoming data? A window should only be sent to the output adapter when all 5 consecutive seconds of data have been received (in other words, when the 5-second window is complete).

Additionally: is it possible to make sure that the windows are sent to the output adapter in the correct order, again based on the timestamp?

Thanks a lot,

Patrick

Accepted Solutions (1)


JWootton
Advisor

Yes, you can do this, but it's a bit more complicated than simply defining a CCL window with a KEEP 5 SECONDS clause, because that will always be based on arrival time. It also doesn't give you the batching capability, i.e. only publishing once you have 5 values.

You will need to set this up as a Flex operator, and your "window" won't actually be a CCL window (as in CREATE OUTPUT WINDOW...). Instead, you'll want to use either a vector or an eventCache to collect the 5 most recent values based on timestamp, and it's easy enough to only output once you have 5 values. An eventCache is probably easier, since it can maintain a sorted bucket, allowing you to publish the 5 events in correct timestamp order once you have all 5.
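A minimal sketch of that collection logic, in Python rather than CCLScript (the real implementation would live inside the Flex operator, with an eventCache or vector playing the role of the sorted list; the class name and the "five distinct seconds" completeness test are assumptions used only to illustrate the idea):

```python
class TimestampBatcher:
    """Collects events by their embedded timestamp (not arrival time) and
    emits one batch, sorted by timestamp, once 5 distinct seconds are present."""

    def __init__(self, seconds_per_batch=5):
        self.seconds_per_batch = seconds_per_batch
        self.events = []  # list of (epoch_second, payload) tuples

    def on_event(self, epoch_second, payload):
        self.events.append((epoch_second, payload))
        # Publish only when the batch is "complete": here, 5 distinct seconds seen.
        if len({t for t, _ in self.events}) >= self.seconds_per_batch:
            batch = sorted(self.events, key=lambda e: e[0])  # timestamp order
            self.events.clear()  # jumping behaviour: start collecting a new group
            return batch
        return None  # not complete yet; keep collecting
```

Note that this naive version never evicts anything, which is exactly where the questions below come in.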

But there are some behaviors that you'll need to think about and define:

1. If you start to accumulate events, and then you get an event where the timestamp is more than 5 seconds "newer" than the oldest event in your "window", then presumably you add the new one to the window and remove the old one.

- but if you are waiting to produce output until you have 5 events in your window, and this most recent one was only, say, the 3rd event since the "old" one, then you will push out the old one and still not have collected 5 - is that what you want?

- what if you receive an event with a timestamp older than the oldest timestamp in your window? Do you just throw it away? What do you do with it?

- and what if, after pushing out the old one, you receive one with a timestamp earlier than the event that caused the old one to be pushed out? At this point the one you removed from the window is lost. Is that OK?

- is the window a jumping window or a rolling window? Once you have 5 events in the window, all with timestamps within a single 5-second interval, and you publish them, do you remove them from the window or leave them? I'm guessing you are removing them and then collecting a new group of 5 (one possible set of choices is sketched below)...
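To make those trade-offs concrete, here is one possible set of answers, again sketched in Python rather than CCLScript; the eviction rule, the drop-late-events policy, and the jumping behaviour are all assumptions, not the only valid choices:

```python
class FiveSecondJumpingWindow:
    """One *possible* answer set to the questions above (all assumptions):
    - events more than 5 seconds older than the newest one seen are evicted,
      even if no complete batch has been published yet;
    - late events older than the eviction point are dropped;
    - the window 'jumps': after a batch is published it starts empty."""

    def __init__(self, span=5):
        self.span = span
        self.watermark = None   # oldest timestamp still accepted
        self.events = []        # (epoch_second, payload)

    def on_event(self, epoch_second, payload):
        if self.watermark is not None and epoch_second < self.watermark:
            return None  # late arrival below the watermark: dropped (a policy choice)

        self.events.append((epoch_second, payload))
        newest = max(t for t, _ in self.events)
        # Evict anything that cannot belong to the same 5-second interval as `newest`.
        self.watermark = newest - self.span + 1
        self.events = [(t, p) for t, p in self.events if t >= self.watermark]

        # 5 distinct seconds inside a 5-second range are necessarily consecutive.
        if len({t for t, _ in self.events}) >= self.span:
            batch = sorted(self.events, key=lambda e: e[0])
            self.events = []  # jumping, not rolling: start a new group
            return batch
        return None
```

With this policy, events can be evicted before any batch is published, and whether that is acceptable is exactly the decision the questions above ask you to make.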

Answers (1)


former_member70923
Discoverer

Hi Jan,

What Jeff mentioned is accurate. I faced the same challenges (column-based data retention, data arriving unsorted, volume, etc.) in a recent PoC. I'm trying to pull my model out of the customer site and will share it if I manage to. I was planning to request this feature (retention based on a custom datetime column) anyway.

Based on my experience during the PoC:

- use a Flex operator for this retention window

- create a dictionary of vectors

- the dictionary key is the time, and the actual record is pushed into the vector. ESP natively converts seconddate to int, so you can use the number of seconds since the epoch (1 Jan 1970) as the dictionary key. This makes dictionary maintenance very easy.

- You can maintain data for 6 seconds and, when data for the 7th second arrives, process seconds 1-5 and delete those dictionary entries in a rolling or jumping manner (see the sketch after this list). This gives your system a 1-second grace period for the 5th second's data to arrive. But if 5th-second data comes in after 7th-second data has started, what do you want to do? (as Jeff asked)

- you need to maintain a vector of times with 5 entries, i.e. the 5 seconds that need to be processed when the 7th second's data arrives. Correlate this with the dictionary to send the data out in a particular order. There might be a better way, such as a coalesced eventCache or a vector of a custom record type; do explore.

- you will also need a log store and dictionary-rebuild logic (from the Flex window) for the restart scenario with GD.
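A rough sketch of that bucketing approach, in Python rather than CCLScript (the dictionary of vectors keyed by epoch second, the 1-second grace period, and the "flush seconds 1-5 when the 7th second arrives" rule follow the description above; the class name and the policy of dropping data older than the current batch are assumptions):

```python
from collections import defaultdict

class EpochSecondBuckets:
    """Dictionary of vectors keyed by epoch second, with a grace period:
    when data for the 7th second arrives, seconds 1-5 are flushed in
    timestamp order and their dictionary entries are deleted (a jumping flush)."""

    def __init__(self, batch_seconds=5, grace_seconds=1):
        self.batch_seconds = batch_seconds
        self.grace_seconds = grace_seconds
        self.buckets = defaultdict(list)  # epoch_second -> "vector" of records
        self.base = None                  # first second of the current batch

    def on_event(self, epoch_second, record):
        if self.base is None:
            self.base = epoch_second
        if epoch_second < self.base:
            return None  # older than the current batch; dropping it is an assumption
        self.buckets[epoch_second].append(record)

        # Flush once data arrives for the (batch + grace + 1)-th second,
        # e.g. flush seconds 1-5 when second 7 shows up.
        if epoch_second >= self.base + self.batch_seconds + self.grace_seconds:
            flush_keys = [self.base + i for i in range(self.batch_seconds)]
            batch = []
            for key in flush_keys:           # emit in ascending timestamp order
                batch.extend(self.buckets.pop(key, []))
            self.base += self.batch_seconds  # jump to the next 5-second group
            return batch
        return None
```

In the actual Flex operator, the flushed rows would also need to be deleted from the Flex window (the setOpcode point below), and the dictionary rebuilt from the log store on restart.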

A few points I observed in my PoC:

- using a dictionary and "removing" key entries will be faster than an eventCache, where you would need to loop through the records and use deleteCache. The expireCache function is not applicable, as the data is not being maintained by system time. You will have to delete from the Flex window in any case (using output setOpcode).

- Do note that the data processing, no matter how small, when the 7th-second record arrives (or after subsequent new-second data) will create a backlog (queue depth > 0). Don't worry about it; it will get cleared soon. Partition the Flex operator using a hash on the sensor ID or a similar column.

- This will work great if you just want to pass the data on downstream. If aggregation is required, I would recommend doing custom aggregation in this Flex rather than in a downstream aggregation window; this Flex will emit data faster than an aggregation window can process it.

What are your data volumes and velocity by the way?

Regds,

Ashok