Technology Blogs by SAP
Learn how to extend and personalize SAP applications. Follow the SAP technology blog for insights into SAP BTP, ABAP, SAP Analytics Cloud, SAP HANA, and more.

In the first part of this series, we developed a simple generic interface for posting device messages from a SQL Anywhere database to the HANA IoT Services. However, the actual device message was a static, hand-written JSON payload. In this post we will simulate a real collection environment and generate the device messages directly from the database.

For this demo, we will assume that the SQL Anywhere database is running on a device that is connected to a machine. The machine has three voltage sensors that have to be monitored. The voltage levels are recorded every 5 seconds. In general, the voltages do not change very quickly, so to reduce communications traffic, only a one-minute average for each sensor is sent to the cloud during normal operation.
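Before building the SQL version, it may help to see the reduction in miniature. The following Python sketch is purely illustrative (the sample data is made up): it groups 5-second samples into one-minute buckets and averages them per sensor, which is exactly the reduction the database will perform.

```python
from collections import defaultdict

def minute_averages(readings):
    """Group (sensor_id, voltage, ts_seconds) samples into one-minute
    buckets and return the per-sensor average for each bucket."""
    buckets = defaultdict(list)  # (sensor_id, minute) -> [voltages]
    for sensor_id, voltage, ts in readings:
        buckets[(sensor_id, ts // 60)].append(voltage)
    return {key: sum(v) / len(v) for key, v in buckets.items()}

# Three 5-second samples from sensor 1, all within the same minute
readings = [(1, 11.0, 0), (1, 13.0, 5), (1, 12.0, 10)]
print(minute_averages(readings))  # one bucket: {(1, 0): 12.0}
```

Twelve samples per sensor per minute collapse to a single value, so the upload volume drops by a factor of twelve.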

To model this, we have to setup our devices and messages in the IoT Cockpit.

1. Create a new device type, called MyDeviceType.

2. Create a new message, called VoltageReading.

Device Type: MyDeviceType

Direction: From Device

Fields:

sensorID - integer

voltageLevel - float

averagePeriod - string

ts - long


3. Create a new device instance of  MyDeviceType, called MyDeviceInstance.


When you create a new device instance, the cockpit will provide the OAuth access token. Copy and paste this somewhere safe as it will be used to communicate with the IoT Services.
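To make the target concrete, here is roughly what a VoltageReading message body will look like on the wire. The field names come from the message definition above, and the envelope (`messageType` plus a `messages` array) is the one we will build in SQL later in this post; the `messageType` id is assigned by the cockpit, and the values shown are illustrative.

```python
import json

# Illustrative payload only: the messageType id is assigned by the
# IoT Cockpit, and the field values are made up for this sketch.
payload = {
    "messageType": 1,
    "messages": [
        {"sensorID": 1, "voltageLevel": 12.34,
         "averagePeriod": "minute", "ts": 1434333900}
    ],
}
body = json.dumps(payload)
print(body)
```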

Continuing from the first part of this series, open your instance of Interactive SQL and connect to your SQL Anywhere database.

We will define a simple table to hold our sensor readings. Execute the following SQL statement.


CREATE TABLE voltageReadings (
  "id" INTEGER PRIMARY KEY DEFAULT AUTOINCREMENT,
  "sensorID" INTEGER NOT NULL,
  "voltageLevel" NUMERIC(4,2) NOT NULL,
  "ts" TIMESTAMP DEFAULT CURRENT TIMESTAMP  -- automatically populate with current time at insert
);




We need something to simulate the sensors being polled for their voltage levels. We could use an external scripting language such as Python to do this, but to keep it simple we will simulate it using SQL Anywhere events. Events are blocks of SQL that can be executed either in response to an event, or on a schedule. We will create a separate event for each sensor.


CREATE EVENT sensorOneSimulator
  SCHEDULE START TIME '12:00 AM' EVERY 5 SECONDS
HANDLER BEGIN
  INSERT INTO voltageReadings(sensorID, voltageLevel) VALUES (1, RAND() * 20);
END;
CREATE EVENT sensorTwoSimulator
  SCHEDULE START TIME '12:00 AM' EVERY 5 SECONDS
HANDLER BEGIN
  INSERT INTO voltageReadings(sensorID, voltageLevel) VALUES (2, RAND() * 20);
END;
CREATE EVENT sensorThreeSimulator
  SCHEDULE START TIME '12:00 AM' EVERY 5 SECONDS
HANDLER BEGIN
  INSERT INTO voltageReadings(sensorID, voltageLevel) VALUES (3, RAND() * 20);
END;


After a few minutes, execute the following query to verify that simulated sensor readings are being added to the database:


SELECT * FROM voltageReadings ORDER BY ts;


We want to keep the actual readings around in case they are needed, but for now we are only planning on sending up one-minute averages. There is no need to store these because they can be calculated when they are needed, so we can create a view.


CREATE OR REPLACE VIEW voltageReadingsByMinute AS
SELECT voltageReadings.sensorID sensorID,
       CAST (AVG(voltageLevel) AS NUMERIC(4,2)) averageVoltageLevel,
       MINUTES( '1970-01-01 12:00:00', voltageReadings.ts) minutesSinceEpoch
FROM voltageReadings
WHERE minutesSinceEpoch < MINUTES( '1970-01-01 12:00:00', NOW()) -- make sure we only get completed minutes
GROUP BY minutesSinceEpoch, sensorID;




How can we be sure that a minute average has already been sent? Suppose our database loses connectivity: when it reconnects, how do we know which minute summaries still need to be uploaded?

We need to create a table that will keep track of this information.


CREATE TABLE minuteAverageBookkeeping (
  "minutesSinceEpoch" INTEGER NOT NULL,
  "sensorID" INTEGER NOT NULL,
  "sentToCloudTS" TIMESTAMP NOT NULL DEFAULT NOW(),
  PRIMARY KEY (minutesSinceEpoch, sensorID)
);




If an entry exists in this table for a given sensorID and minutesSinceEpoch, then we can be sure the data has already been sent.
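The outstanding-minutes check is effectively an anti-join: take every computed minute average and keep only those with no matching bookkeeping row. A minimal Python sketch of the same idea, using hypothetical in-memory data in place of the two tables:

```python
def outstanding(averages, bookkeeping):
    """Return the minute averages that have no bookkeeping entry.

    averages:    dict mapping (minutes_since_epoch, sensor_id) -> avg voltage
    bookkeeping: set of (minutes_since_epoch, sensor_id) pairs already sent
    """
    return {key: avg for key, avg in averages.items() if key not in bookkeeping}

averages = {(100, 1): 12.1, (100, 2): 11.9, (101, 1): 12.3}
sent = {(100, 1), (100, 2)}
print(outstanding(averages, sent))  # {(101, 1): 12.3}
```

In SQL this becomes a LEFT OUTER JOIN with a `sentToCloudTS IS NULL` filter, which is exactly what the function below does.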

We will also need the push_iot_messages procedure we developed in the first post. We are going to use its result set in a slightly different way, so the procedure needs a new definition. To avoid confusion with the one we already created, we will give it a new name: PushIoTMessages.


CREATE OR REPLACE PROCEDURE PushIoTMessages (
  IN host LONG VARCHAR,
  IN device LONG VARCHAR,
  IN token LONG VARCHAR,
  IN body LONG VARCHAR)
RESULT (attribute LONG VARCHAR, value LONG VARCHAR)
URL 'https://!host/com.sap.iotservices.mms/v1/api/http/data/!device'
TYPE 'HTTP:POST:application/json'
HEADER 'Authorization: Bearer !token'
--PROXY 'http://my-proxy' -- Uncomment this if you need to go through a proxy
CERTIFICATE 'certificate_name=gte';




Now comes the function that does the heavy lifting. This function finds and calculates all of the minute averages that have not yet been sent, and POSTs them using the web procedure we just defined.


-- This function sends all outstanding minute average messages as a JSON document
-- to the SAP HANA IoT Services, and marks the averages as sent in the bookkeeping table.
-- Return Value:
--   < 0 denotes an error
--   >= 0 denotes success, and the value is the number of messages sent
CREATE OR REPLACE FUNCTION sendOutstandingMessagesAndUpdateBookkeeping()
RETURNS INTEGER
BEGIN
DECLARE msgBody LONG VARCHAR; -- the JSON body of our message
DECLARE status LONG VARCHAR; -- the HTTP return code of the web service request
DECLARE returnValue INTEGER = -1; -- the return value of the function
-- find and calculate all minute averages that have not been sent, and put
-- them in a local temporary table called outstandingVoltageReadingsByMinute
SELECT voltageReadingsByMinute.sensorID,
       voltageReadingsByMinute.averageVoltageLevel,
       voltageReadingsByMinute.minutesSinceEpoch
INTO LOCAL TEMPORARY TABLE outstandingVoltageReadingsByMinute
FROM voltageReadingsByMinute LEFT OUTER JOIN minuteAverageBookkeeping ON
  voltageReadingsByMinute.sensorID = minuteAverageBookkeeping.sensorID AND
  voltageReadingsByMinute.minutesSinceEpoch = minuteAverageBookkeeping.minutesSinceEpoch
WHERE sentToCloudTS IS NULL;
-- select from the local temporary table to create a JSON string that will form
-- the body of the message. This message body must conform to the message type
-- setup in the IoT Services Cockpit
SELECT sensorID,
       averageVoltageLevel voltageLevel,
       'minute' averagePeriod,
       (minutesSinceEpoch * 60) ts
INTO msgBody
FROM outstandingVoltageReadingsByMinute
FOR JSON RAW;
-- Upload the message and store the HTTP results in a local temporary table
SELECT *
INTO LOCAL TEMPORARY TABLE HTTPResults
FROM PushIoTMessages(
  host='iotmmsiXXXXXXtrial.hanatrial.ondemand.com',
  device='my-device-id',
  token='my-token-id',
  body='{  "messageType": 1,  "messages": ' || msgBody || '}'
);
-- select the HTTP Status out of the HTTPResults local temporary table
SET status = (SELECT TOP 1 value FROM HTTPResults WHERE attribute = 'Status');
-- search the HTTP status for a 202 Accepted response
IF LOCATE(status, '202') > 0 THEN
  -- if found, insert bookkeeping rows in the minuteAverageBookkeeping table
  INSERT INTO minuteAverageBookkeeping(minutesSinceEpoch, sensorID,sentToCloudTS)
    SELECT minutesSinceEpoch, sensorID, NOW() FROM outstandingVoltageReadingsByMinute;
  -- set the return value to the number of messages sent
  SET returnValue = (SELECT COUNT(*) FROM outstandingVoltageReadingsByMinute);
ENDIF;
RETURN returnValue;
END;




Currently we need to call this function manually every time we want to upload new messages. To automate this, we will create an event to call this function every minute.


CREATE EVENT pushMessagesEvent
  SCHEDULE START TIME '12:00 AM' EVERY 1 MINUTES
HANDLER BEGIN
  DECLARE messageSendStatus INTEGER = sendOutstandingMessagesAndUpdateBookkeeping();
  IF messageSendStatus >= 0 THEN
    MESSAGE messageSendStatus || ' messages sent successfully @ ' || NOW();
    COMMIT;
  ELSE
    MESSAGE 'Messages send failed @ ' || NOW() || ' Will try again in one minute';
  END IF;
END;




If messages were sent successfully, it prints a short debug message out to the console and commits the changes to the bookkeeping table. If the messages were not sent successfully, the event ends. When the event fires again in one minute, it will attempt to send any rows that have not yet been uploaded.
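This commit-on-success pattern is what makes the store and forward behaviour work: nothing is marked as sent until the upload is acknowledged. A hypothetical Python sketch of the same loop, where the injected `send` function stands in for the web service call:

```python
def drain(outstanding, send, mark_sent):
    """Try to upload outstanding messages; mark them sent only on success.

    send() returns True on a 202-style acknowledgement, False otherwise.
    Messages stay in `outstanding` until acknowledged, so a failed attempt
    is simply retried on the next scheduled run.
    Returns the number of messages sent, or -1 on failure.
    """
    if not outstanding:
        return 0
    if send(list(outstanding)):
        count = len(outstanding)
        for key in list(outstanding):
            mark_sent(key)        # bookkeeping row
            outstanding.discard(key)
        return count
    return -1  # network down: leave everything for the next attempt

pending = {(100, 1), (100, 2)}
sent = set()
print(drain(pending, lambda msgs: False, sent.add))  # offline: -1, nothing marked
print(drain(pending, lambda msgs: True, sent.add))   # reconnected: 2, all marked
```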

To verify it is working, select all of the rows from the voltageReadingsByMinute view.

Within a minute, you should see those rows appearing in the IoT Services Cockpit.

(Note that the averageVoltageLevels are slightly different because the SQL Anywhere database is using NUMERICs, and the IoT Services is using FLOATs. Also, on the SQL Anywhere side we are recording minutes since the epoch, whereas IoT Services is using seconds. [23,905,565 x 60 = 1,434,333,900])
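The unit conversion in that note is easy to check: the database records minutes since the epoch, and multiplying by 60 gives the seconds value that the IoT Services Cockpit displays.

```python
minutes_since_epoch = 23905565
seconds_since_epoch = minutes_since_epoch * 60
print(seconds_since_epoch)  # 1434333900
```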

Of course, because each sensor reading is committed to the database, this setup can handle disconnections in the network. It will send up all outstanding messages the next time it connects. To test this, I physically pulled the network cable on my laptop and went to get a coffee. Below is the output of the message window during this test.

At approximately 14:25 the pushMessagesEvent attempted to send the device messages and found it could not connect. When I came back with my coffee a few minutes later, I plugged the network cable back in. The next time the event ran, it uploaded all the outstanding messages.

This has been a relatively straightforward example of using SQL Anywhere to store and forward aggregated summaries of IoT data, even in the face of disconnections. Because a database is a natural place for data manipulation, it is very easy to extend this demo with more functionality. For example, suppose the system may be offline for many hours and will have thousands of messages to send when it reconnects. You may want to limit the number of messages sent in each web service request. It is trivial to add this by modifying the definition of sendOutstandingMessagesAndUpdateBookkeeping, changing the first SELECT (the one that populates outstandingVoltageReadingsByMinute) to:


SELECT TOP 100 voltageReadingsByMinute.sensorID,



This will limit each upload to only 100 messages. When the event fires again a minute later, the next one hundred messages will be sent.
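The capped draining behaviour can be sketched in Python as well (a hypothetical helper, with 100 mirroring the TOP 100 above): each run sends at most one batch, and repeated runs eventually drain the backlog.

```python
def next_batch(outstanding, limit=100):
    """Return at most `limit` outstanding messages per run, mirroring
    TOP 100 in the modified SELECT (sorted here for determinism)."""
    return sorted(outstanding)[:limit]

backlog = set(range(250))       # 250 unsent minute averages
runs = 0
while backlog:
    batch = next_batch(backlog)
    backlog -= set(batch)       # pretend the upload succeeded
    runs += 1
print(runs)  # 3 runs of up to 100 messages each
```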