
In two recent blogs I demonstrated how easy it is to call HANA views from Apache Spark and to push more complex SQL logic down to HANA:

Calling HANA Views from Apache Spark | SCN

Optimising HANA Query push-down from Apache Spark


As data volumes and the complexity of the underlying HANA view increase, it may not be possible to execute a single query on HANA.  It may overload your HANA data source, causing the typical 'Out of Memory' (OOM) errors you see when working with highly complex calculation views. You could increase your user's OOM limit, revisit the design of the view, or run the query in smaller, more manageable chunks and join the results back together again.


NOTE: This isn't an issue just for Spark accessing complex HANA views; it may also occur with Vora accessing HANA.


If you take the simple test dataset from the first blog in this series, and assume that granular information is needed in Spark for subsequent joins or processing, then it may not be practical to bring all the results over in one go.



In this simple example you could break it into many sub queries such as:

select * from "_SYS_BIC"."test/CA_RDATA" WHERE RUP2 >= 'AA' and RUP2 <= 'AZ'

select * from "_SYS_BIC"."test/CA_RDATA" WHERE RUP2 >= 'BA' and RUP2 <= 'BZ'

Etc.

Then union the results in Spark.  


You may have noticed in the first blog that Spark does have some basic partitioning logic to handle this.   The downside is that it does not work on many column types, and it expects the data source to have a nice sequential row number as part of the key to split the load into multiple sub-queries. Unless you have designed your data or view in HANA this way, this is unlikely to work for you.
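For reference, here is a minimal sketch of that built-in approach, using the standard Spark 1.x JDBC partitioning options. The numeric ROWID column and its bounds below are hypothetical; you would need such a column in your own table or view for this to work:

val df = sqlContext.read.format("jdbc")
  .option("driver", "com.sap.db.jdbc.Driver")
  .option("url", "jdbc:sap://<HANA HOST>:<PORT>")
  .option("user", "<USERID>")
  .option("password", "<PASSWORD>")
  .option("dbtable", "SYSTEM.RDATA")
  .option("partitionColumn", "ROWID")   //must be numeric; hypothetical key column
  .option("lowerBound", "1")            //hypothetical range of ROWID values
  .option("upperBound", "1000000")
  .option("numPartitions", "5")         //number of parallel sub-queries Spark generates
  .load()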


In this blog I’ve created a small Scala script that you can try, to help you automate the split, build the SQL, run the queries in parallel and finally union the results.


It has a few  input parameters:

  • the Table, View or Query
  • the column to be used to split the query by
  • the number of Splits (separate calls to HANA)


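For example, a minimal sketch of a call to the splitHanaFilter function defined below, using the test table and split column from the earlier blogs (the values are illustrative):

val splitsDF = splitHanaFilter("SYSTEM.RDATA", "RUP2", 5)   //table/view, split column, number of splits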


Depending on these parameters it will auto-generate the splits for you, such as:
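For illustration, with the RUP2 column from the test dataset the generated sub-queries would look something like this (the actual boundary values depend on your data):

(select * from SYSTEM.RDATA where "RUP2" <= 'AZ') as tmp
(select * from SYSTEM.RDATA where "RUP2" > 'AZ' and "RUP2" <= 'BZ') as tmp
(select * from SYSTEM.RDATA where "RUP2" > 'BZ' and "RUP2" <= 'CZ') as tmp
...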


The SQL statements are unioned together into a single Spark Dataframe, which can then be queried:
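For example, once the script below has registered the final Dataframe as the temp table RDATA_PARA, it can be queried like any other Spark table (an illustrative aggregation):

sqlContext.sql("select RUP2, count(*) from RDATA_PARA group by RUP2").show()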


This Dataframe then pushes the split logic down to HANA when it is executed:
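One optional sanity check, once the script below has built unionResultsDF, is to look at the physical plan, which should show a separate JDBC scan for each generated sub-query:

unionResultsDF.explain()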



The basic logic of the code below is to:

  1. Find the distinct values for the specified column and assign a row number, using SQL similar to:

select to_int(ROW_NUMBER() over ()) as "ROWID", "RUP2" from (select distinct "RUP2" from SYSTEM.RDATA order by "RUP2")

  2. Based on the number of distinct column values and the number of splits requested, determine the 'interval', i.e. the number of column values in each split group (see the worked example after this list):

Math.round(Max Row Number / Number of Splits)

  3. Use the 'Mod' function to identify which column value is used as the end point of each split: Row Number % interval
  4. Identify the records where the Mod is 0, and use the LAG function to determine the start point of each range
  5. Build a dynamic WHERE and SELECT clause for each split
  6. Finally, create the individual SELECT clauses as individual Spark Dataframes, which are then unioned together into a single Spark Dataframe
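For illustration (with made-up numbers): if the split column had 26 distinct values and 5 splits were requested, the code's interval calculation Math.round(26/5 + 0.5) gives 6, so the split boundaries fall at row numbers 6, 12, 18 and 24, and the final split is closed off by the maximum row (26), producing 5 sub-queries in total.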



Here is the code I created; I hope you find it useful.


Scala Code

import org.apache.spark.sql.DataFrame

import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.functions._   //for udf, lit, concat, lag and max used below (not auto-imported in spark-shell)

/* Function returns a Dataframe for a Hana sourced table or View*/

def selectHana(hanaView: String) : org.apache.spark.sql.DataFrame = {

  val driver     ="com.sap.db.jdbc.Driver"

  val url        ="jdbc:sap://<HANA HOST>:<PORT>" //Different port for MDC

  val database   = "<DATABASE NAME>"                     //Needed for MDC

  val username   = "<USERID>"

  val password   = "<PASSWORD>"

  var resultsDF: DataFrame = null;

  resultsDF = sqlContext.read.format("jdbc").option("driver",driver)

                                           .option("url",url)

                                           .option("databaseName", database)

                                           .option("user", username)

                                           .option("password",password)

                                           .option("dbtable", hanaView)

                                           .load()

  return resultsDF

}
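// e.g. (illustrative) selectHana("SYSTEM.RDATA")
// or a sub-query:     selectHana("""(select * from SYSTEM.RDATA where "RUP2" <= 'AZ') as tmp""")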

/* Function which generates a 'WHERE' clause based on inputs */

def buildWhereClause(columnId: String, columnFr: String , columnTo: String ) : String = {

  var whereStr =  """where """"

  if (columnFr == null) { // First Row

     whereStr = whereStr + columnId + """" <= '""" + columnTo + "'"

  }

  else {

     whereStr = whereStr + columnId + """" > '""" + columnFr  + """' and  """" + columnId + """" <= '""" + columnTo + "'"

  }

 

  return whereStr

}
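// e.g. (illustrative values):
//   buildWhereClause("RUP2", null, "AZ")  returns:  where "RUP2" <= 'AZ'
//   buildWhereClause("RUP2", "AZ", "BZ")  returns:  where "RUP2" > 'AZ' and  "RUP2" <= 'BZ'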

//Define UDF function, used in a Dataframe, to build the where clause

val buildWhereClause_udf = udf(buildWhereClause _)

/* Function which splits a HANA view, based on an input column, into multiple sub-queries based on the number of splits requested */

def splitHanaFilter(hanaView: String, splitColumn: String, numSplits: Int) : org.apache.spark.sql.DataFrame = {

  

  val wSpec = Window.orderBy("ROWID")

  //Return a DataFrame with the distinct splitColumn values, each with an associated row id

  //e.g.

  // select  to_int(ROW_NUMBER() over ()) as "ROWID" , "RUP2" from (select distinct "RUP2" from SYSTEM.RDATA order by "RUP2" )

  val rowIdSql = """(select  to_int(ROW_NUMBER() over ()) as "ROWID" , """" + splitColumn +

                """" from (select distinct """" + splitColumn + """" from """ + hanaView + """ order by """" + splitColumn + """" )) as tmp"""

  val RowIdDF = selectHana(rowIdSql)

  //Get MaxRow and set interval

  val maxRowDF = RowIdDF.agg(max($"ROWID").alias("MAXROWID"))

  val maxRowId : java.lang.Integer = maxRowDF.head().getInt(0)

  val interval = Math.round(maxRowId/numSplits + 0.5)   //Rounds up to keep the number of Splits aligned, otherwise may get 1 extra split for few records.

  //Add a column representing the Mod of the 'interval', then keep the rows where MOD is zero (plus the max row)

  val filterRowIdDF = RowIdDF.join(maxRowDF)

                             .withColumn("MOD", RowIdDF("ROWID")%interval)

                             .filter("MOD = 0  OR ROWID = MAXROWID").select("ROWID" , splitColumn , "MAXROWID" )

                             .withColumn("PREVKEY", lag(RowIdDF(splitColumn), 1).over(wSpec) )

                             .withColumnRenamed(splitColumn, "KEY")

                             .withColumn("WHERE",buildWhereClause_udf(lit(splitColumn), $"PREVKEY", $"KEY" ))

                             .withColumn("SELECT", concat ( lit("(select * from " + hanaView),lit(" ") , $"WHERE" , lit(") as tmp") )   )

  return filterRowIdDF

}
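// The returned DataFrame has one row per split, with columns:
//   ROWID, KEY (the split's end value), MAXROWID, PREVKEY (the previous split's end value),
//   WHERE (the generated where clause) and SELECT (the full generated sub-query)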

/* Main Execution of the functions  */

val hanaTableView = "SYSTEM.RDATA"

//val table_view = """(select "$rowid$" as "rowid", "RUP2" from SYSTEM.RDATA group by "RUP2") as tmp"""

val splitColumn = "RUP2"

var numSplits : Integer = 5;

var unionResultsDF: DataFrame = null;

var selectArray = splitHanaFilter(hanaTableView, splitColumn, numSplits).select("SELECT").collect

/* Loop through the sub-queries, create individual Dataframes, then union results*/

var iteration = 1

selectArray.foreach( row => {

                         var rowSELECT = row.getString(0)

                         //println(x.getString(0))

                         if (unionResultsDF!= null) {

                             unionResultsDF = unionResultsDF.unionAll(selectHana(rowSELECT))

                             //println("Next: " + iteration)

                          } else {

                             //println("1st: " + iteration)

                             unionResultsDF = selectHana(rowSELECT)

                          }

                          println("Iteration " + iteration + ": " + rowSELECT)

                          iteration =  iteration +  1

                         } )

unionResultsDF.show()   //show() only fetches a few rows, so typically only the first sub-query's SQL is called

println("Count " + unionResultsDF.count())

unionResultsDF.registerTempTable("RDATA_PARA");



When you execute the Scala code (to create the final Spark Dataframe and register it as a Spark table), each iteration prints the generated sub-query it is running.



While this code was written with only Apache Spark in mind, with a little tweaking it can also be applied to Vora queries against HANA.


I hope you've found this useful.  If it helped you, or you needed to make other tweaks to optimise it for your environment, please add a comment.

