Application Development Blog Posts
Learn and share on deeper, cross technology development topics such as integration and connectivity, automation, cloud extensibility, developing at scale, and security.
cancel
Showing results for 
Search instead for 
Did you mean: 
Former Member

In this post I will document how to schedule batch jobs using python and how to handle the output. You may want to check out my first blog post as an introduction on how to access SAP through Python: Python for Basis (Part 1) .

Description of the Scenario

In the following section we want to perform a security assessment by scheduling report RSUSR002 to find out the following:

  • Which users have SAP_ALL?
  • Which users are able to execute any function module thanks to S_DEVELOP?

That means, we need to accomplish the following tasks:

  • Logon to the SAP client we want to evaluate
  • Create a variant for report RSUSR002
  • Schedule report RSUSR002 in the background with immediate start
  • Wait for completion of the job
  • Download the generated spool
  • Extract the users from the downloaded spool
  • Generate an Excel file with the extracted information

Extending the PyRFC Connection Class

The easiest way to accomplish our task is to create a subclass of the PyRFC Connection class and add our own functions to it.

import pyrfc

class sap_abapclient(pyrfc.Connection):

    def __init__(self, *args, **kwargs 😞

        pyrfc.Connection.__init__(self, *args, **kwargs )

        self.condetails=self.get_connection_attributes()

Instead of using the standard pyrfc.Connection class to connect to an SAP system we will use our own class. The following sections will document the different functions required to complete the task.

Check whether RSUSR002 variant already exists

Before we schedule a RSUSR002 job we need to create or modify a corresponding variant. Since we need to use different function modules for maintenance and creation, we need to check first, whether a variant exists. The following function will take care of this:

    def check_if_variant_exists(self, program, variant):

        result = self.call('RFC_READ_TABLE',

                         QUERY_TABLE='VARID', \

                         DELIMITER='|', \

                         FIELDS=[{'FIELDNAME':'VARIANT'}], \

                         OPTIONS = [{'TEXT':"REPORT EQ '" + program.upper()+"'"}])

        tabledump=result['DATA']

        variant_exists=False

        for record in tabledump:

            if record['WA']==variant.upper():

                variant_exists=True

        return(variant_exists)

The function performs the following:

  • Execute function module RFC_READ_TABLE to download the existing variant names of report RSUSR002
  • Initialize the boolean variable variant_exists
  • loop over all records and find out if the variant name we picked already exists.
  • If the variant exists, variant_exists will be set to True
  • At the end the function returns the content of the variant_exists.

Create/Maintain RSUSR002 variant

To create a variant we use RS_CREATE_VARIANT_RFC, to modify an existing variant we use RS_CHANGE_CREATED_VARIANT_RFC. The corresponding python function starts with an underscore ('_') which by convention determines that a function should not be called directly. It would be a private method but python doesn't prevent calling these functions directly.

If you look at the definition of the function modules then you see several different import parameters. We need the following:

  • VARI_DESC: Defines the variant name and will contain the change date and timestamp
  • VARI_CONTENTS: contains the actual variant values
  • VARI_TEXT: Contains the description in the variant attributes

VARI_CONTENTS is defined as a table with the following fields that are part of the variant values table when looking at the variant through transaction SE38:

  • SELNAME: Selection Screen
  • KIND: Type
  • OPTION: Option
  • LOW: frm
  • HIGH: to
  • SIGN: I/E

The easiest way to get an idea of the variants is to define it in the report itself and then look at the actual values using the variant editor in transaction SE38. Like with all tables, the variant has to be defined as a list of dictionaries. This is the python function to create a create or maintain a variant for RSUSR002:

    def _create_suim_variant(self, variant):

        """

        Creates a temporary variant to schedule the report

        :param variant: list of dictionaries that comprise the variant content.

        """

        import datetime

        report = u'RSUSR002'

        variantname = u'TEMPVARI'

        vari_text = list()

        vari_list = list()

        vari_contents = list()

        vscreens = list()

        temp = dict(MANDT = self.condetails['client'],

                     LANGU=u'EN',

                     REPORT=report,

                     VARIANT=variantname,

                     VTEXT=u'temporary')

        vari_text.append(temp)

        vari_desc = dict(REPORT = report,

                         VARIANT=  variantname,

                         AEDAT = datetime.date(2013,12,12),

                         AETIME = datetime.time(00,00))

        vari_contents.extend(variant)

        if self.check_if_variant_exists(report, variantname)==True:

            test = self.call('RS_CHANGE_CREATED_VARIANT_RFC', CURR_REPORT= report,

                                                              CURR_VARIANT= variantname,

                                                              VARI_DESC = vari_desc,

                                                              VARI_CONTENTS = vari_contents,

                                                              VARI_TEXT = vari_text)

        else:

            test = self.call('RS_CREATE_VARIANT_RFC', CURR_REPORT= report,

                                                      CURR_VARIANT= variantname,

                                                      VARI_DESC = vari_desc,

                                                      VARI_CONTENTS = vari_contents,

                                                      VARI_TEXT = vari_text)

Schedule report RSUSR002 as Batch Job

To schedule the report in background we will to use the XBP interface (Background Processing, Job Scheduling (BC-XBP). we are going to run the following function modules:

  • BAPI_XMI_LOGON: Before we schedule a job we need to register with the instance
  • BAPI_XBP_JOB_OPEN: Initiate the creation of a new job definition
  • BAPI_XBP_JOB_ADD_ABAP_STEP: Define the job step to execute
  • BAPI_XBP_JOB_CLOSE: complete the job creation
  • BAPI_XBP_JOB_START_IMMEDIATELY: Change the job definition for immediate start.
  • BAPI_XMI_LOGOFF: log off from the instance

Here is the code:

    def schedule_job_immediately(self, jobname, program, variant='none', wait='yes'):

        import datetime

        if variant<>'none':

            if self.check_if_variant_exists(program, variant)==False:

                print('Variant Does not Exist')

                exit(3)

        result = self.call ('BAPI_XMI_LOGON', EXTCOMPANY='LARS',

                                              EXTPRODUCT='assessment',

                                              INTERFACE='XBP',

                                              VERSION='2.0')

        result = self.call('BAPI_XBP_JOB_OPEN', JOBNAME=jobname,

                                                EXTERNAL_USER_NAME='AUDIT')


        jobcount=result['JOBCOUNT']

        if variant<>'none':

            result = self.call('BAPI_XBP_JOB_ADD_ABAP_STEP', JOBNAME=jobname,

                                                             JOBCOUNT=jobcount,

                                                             EXTERNAL_USER_NAME='AUDIT',

                                                             ABAP_PROGRAM_NAME=program,

                                                             ABAP_VARIANT_NAME=variant)

        else:

            result = self.call('BAPI_XBP_JOB_ADD_ABAP_STEP', JOBNAME=jobname,

                                                             JOBCOUNT=jobcount,

                                                             EXTERNAL_USER_NAME='AUDIT',

                                                             ABAP_PROGRAM_NAME=program)

        result = self.call('BAPI_XBP_JOB_CLOSE', JOBNAME=jobname,

                                                 JOBCOUNT=jobcount,

                                                 EXTERNAL_USER_NAME='AUDIT')

        result = self.call('BAPI_XBP_JOB_START_IMMEDIATELY', JOBNAME=jobname,

                                                             JOBCOUNT=jobcount,

                                                             EXTERNAL_USER_NAME='AUDIT')

        if wait=='yes':

            jobstatus=self._wait_until_job_completed(jobname,jobcount)

        result=dict(JOBCOUNT  = jobcount,

                    JOBSTATUS = jobstatus)

        return(result)

Wait until Job Completes

The job was started immediately, but before we can download the spool, we need to make sure that the job is really complete. We will use function module SUBST_CHECK_BATCHJOB for this:

    def _wait_until_job_completed(self, jobname, jobcount):

        """

        Checks whether a job is still running and waits until it completes.

        :param jobname:

        :param jobcount:

        """

        jobstatus = 'X'

        while jobstatus not in ['F','A']:

            status=self.call('SUBST_CHECK_BATCHJOB',JOBNAME=jobname, JOBCOUNT=jobcount)

            jobstatus=status['JOBSTATUS']

            print ('job running')

            time.sleep(3)

        return(jobstatus)

Determine the Spool-ID

The batch job will generate a spool request that is identified by the spool ID. To determine the spool ID we need to check table TBTCP:

    def determine_spool_id(self, jobname, jobcount):

        """

        find the spool ID based on the job name and job count.

        :param jobname: name of the SM37 job

        :param jobcount: Job Count

        :return:

        """


        where_clause = "JOBNAME EQ '" +jobname+"' AND JOBCOUNT EQ '" + jobcount + "' AND STEPCOUNT EQ '1'"

        result = self.call('RFC_READ_TABLE', \

                           QUERY_TABLE='TBTCP', \

                           DELIMITER='|', \

                           FIELDS=[{'FIELDNAME':'LISTIDENT'}], \

                           OPTIONS = [{'TEXT':where_clause}])

        tabledump=result['DATA']

        spoolid = tabledump[0]['WA']

        return spoolid

Download Spool

After finding out the spool ID we can download the spool into a variable for further analysis:

    def download_spool(self, spoolid):

        """

        Download Spool File

        :param spoolid: spool ID of the job

        """

        result = self.call ('BAPI_XMI_LOGON', EXTCOMPANY='linkies',

                                              EXTPRODUCT='assessment',

                                              INTERFACE='XBP',

                                              VERSION='2.0')

        result = self.call('BAPI_XBP_JOB_READ_SINGLE_SPOOL',SPOOL_REQUEST=spoolid,

                                                            EXTERNAL_USER_NAME='AUDIT'

                                                            )

        return(result['SPOOL_LIST_PLAIN'])

Extract Users From Spool

The spool file will be a large text stored in a variable. We need to parse the spool file to extract the user information:

    def _extract_users_from_spool(self, spoolfile):

        """

        Extracts the users from the spool file generated by SUIM

        :param spoolfile:

        :return: returns a list of dictionaries with the user records.

        """

        import re

        users = list()

        issues = list()

        userrecord = dict()

        for line in spoolfile:

            flatline = line['LINE']

            if flatline<>'':

                if flatline[0]=='|' and flatline[0:7] == '|  User':  

                    linearray=self.locations_of_substring(flatline,'|')

                if flatline[0]=='|' and flatline[0:9] <>'|  User  ' \

                                    and flatline[0:9] <>'|--------' \

                                    and flatline[0:9] <>'|  User n':

                    userrecord=dict(username     = flatline[1:linearray[1]].strip(),

                                    completename = flatline[linearray[1]+1:linearray[2]].strip(),

                                    usergroup    = flatline[linearray[2]+1:linearray[3]].strip(),

                                    accountnumber= flatline[linearray[3]+1:linearray[4]].strip(),

                                    locked       = flatline[linearray[4]+1:linearray[5]].strip(),

                                    reason       = flatline[linearray[5]+1:linearray[6]].strip(),

                                    validfrom    = flatline[linearray[6]+1:linearray[7]].strip(),

                                    validto      = flatline[linearray[7]+1:linearray[8]].strip(),

                                    usertype     = flatline[linearray[8]+1:linearray[9]].strip(),

                                    refuser      = flatline[linearray[9]+1:linearray[10]].strip())

                    if len(linearray)==12:

                        # there is a new policy column with 7.31. That needs to be included

                        userrecord['policy'] = flatline[linearray[10]+1:linearray[11]].strip()

                    if userrecord['reason'] <>'':

                        userrecord['locked']='X'

                    users.append(userrecord)

        return(users)

The function uses another function to determine the number of '|' in the first line that contains the headers. We need to add that function to our class, too:

    def locations_of_substring(self, string, substring):

        """Return a list of locations of a substring.

        :param substring:

        """

        substring_length = len(substring)

        def recurse(locations_found, start):

            location = string.find(substring, start)

            if location != -1:

                return recurse(locations_found + [location], location+substring_length)

            else:

                return locations_found

        return recurse([], 0)

Generate the Excel file with the users

To generate the excel file I will use a different python module that needs to be installed called xlsxwriter (XlsxWriter 0.5.7 : Python Package Index).

    def generate_xlsx_file(self, data, filename):

        """

        Generate a XLSX file using returned data

        :param data: data

        :razz: aram filename: xlsx filename

        """

        import xlsxwriter

        workbook = xlsxwriter.Workbook(filename)

        worksheet = workbook.add_worksheet()

        worksheet.freeze_panes(1,0)

        xls_row=0

        xls_column=0

        bold = workbook.add_format({'bold': True})

        for record in data:

            if len(record.keys())==10:

                if xls_row==0:

                    for i in range(0,9):

                        worksheet.write(xls_row,i,self.columns_10[i],bold)

                        worksheet.set_column(0,0,20)

                        worksheet.set_column(1,1,50)

                    xls_row=1

                else:

                    for i in range(0,9):

                        if i == 0: worksheet.write(xls_row,i,  data['username'])

                        elif i == 1: worksheet.write(xls_row,i,data['completename'])

                        elif i == 2: worksheet.write(xls_row,i,data['usergroup'])

                        elif i == 3: worksheet.write(xls_row,i,data['accountnumber'])

                        elif i == 4: worksheet.write(xls_row,i,data['locked'])

                        elif i == 5: worksheet.write(xls_row,i,data['reason'])

                        elif i == 6: worksheet.write(xls_row,i,data['validfrom'])

                        elif i == 7: worksheet.write(xls_row,i,data['validto'])

                        elif i == 8: worksheet.write(xls_row,i,data['usertype'])

                        elif i == 9: worksheet.write(xls_row,i,data['refuser'])

                    xls_row=xls_row+1

            elif len(record.keys())==11:

                if xls_row==0:

                    for i in range(0,10):

                        worksheet.write(0,i,self.columns_11[i],bold)

                        worksheet.set_column(0,0,20)

                        worksheet.set_column(1,1,50)

                    xls_row=1

                else:

                    for i in range(0,10):

                        if   i == 0: worksheet.write(xls_row, i,record['username'])

                        elif i == 1: worksheet.write(xls_row, i,record['completename'])

                        elif i == 2: worksheet.write(xls_row, i,record['usergroup'])

                        elif i == 3: worksheet.write(xls_row, i,record['accountnumber'])

                        elif i == 4: worksheet.write(xls_row, i,record['locked'])

                        elif i == 5: worksheet.write(xls_row, i,record['reason'])

                        elif i == 6: worksheet.write(xls_row, i,record['validfrom'])

                        elif i == 7: worksheet.write(xls_row, i,record['validto'])

                        elif i == 8: worksheet.write(xls_row, i,record['usertype'])

                        elif i == 9: worksheet.write(xls_row, i,record['refuser'])

                        elif i == 10: worksheet.write(xls_row,i,record['policy'])

                    xls_row=xls_row+1

        workbook.close()

Combining all functions

The last function will perform all of the previous functions to make the scheduling easier.

    def run_suim_job(self,variant, filename):

        """

        run a SUIM report using a single variant

        :param variant: dictionary that contains the variant

        :return:

        """

        self._create_suim_variant(variant)

        result=self.schedule_job_immediately('RSUSR002','RSUSR002','TEMPVARI')

        spoolid=self.determine_spool_id('RSUSR002',result['JOBCOUNT'])

        spoolfile = self.download_spool(int(spoolid))

        users = self._extract_users_from_spool(spoolfile)

        self.generate_xlsx_file(self, users, filename)

Putting it all together

The entire class would look like this:

import pyrfc, xlsxwriter

class sap_abapclient(pyrfc.Connection):

    def __init__(self, *args, **kwargs 😞

        pyrfc.Connection.__init__(self, *args, **kwargs )

        self.condetails=self.get_connection_attributes()



    def run_suim_job(self,variant, filename):

         ...


    def def generate_xlsx_file(self, data, filename):

         ...


    def locations_of_substring(self, string, substring):

         ...

    def _extract_users_from_spool(self, spoolfile):

         ...


    def download_spool(self, spoolid):

         ...


    def schedule_job_immediately(self, jobname, program, variant='none', wait='yes'):

          ...

    def check_if_variant_exists(self, program, variant):

          ...


    def _create_suim_variant(self, variant):

         ...


    def _wait_until_job_completed(self, jobname, jobcount):

         ...


    def determine_spool_id(self, jobname, jobcount):

         ...


    def download_spool(self, spoolid):

         ...


Executing the Script

To test the script we need to add some lines to the end of the file to make it actually do something. Otherwise the file would only be a text file that contains a definition of a python class but nothing else. The following construct will execute the script.

  • the IF condition will execute the corresponding code underneath
  • several variables will be initialized
  • a connection to the SAP system is initiated using our own subclass that inherited everything from the pyrfc.Connection class.
  • our own functions to schedule RSUSR002 will be executed.

If everything works well, we will have two Excel files in our current work directory. The example does not contain any error handling though.

if __name__ == '__main__':

     sap_all_variant=[]

        # the next lines contain the data for the variant as stored in parameters.

        tempdict=dict(SELNAME = "PROF1", KIND = "S", OPTION = "EQ", LOW = "SAP_ALL", HIGH = "", SIGN = "I")

        self.variantdict.append(tempdict)


    s_develop_variant=[]

        tempdict=dict(SELNAME = "OBJ1",   KIND = "P",  OPTION = "", LOW = "S_DEVELOP",  HIGH = "", SIGN = "")

        self.variantdict.append(tempdict)

        tempdict=dict(SELNAME = "VAL111", KIND = "P",  OPTION = "", LOW = 'PROG',    HIGH = "", SIGN = "")

        self.variantdict.append(tempdict)

        tempdict=dict(SELNAME = "VAL141", KIND = "P",  OPTION = "", LOW = '03',      HIGH = "", SIGN = "")

        self.variantdict.append(tempdict)


    logon_details = dict( user = <username>,

                          passwd = <secret>,

                         ashost = <hostname>,

                          client = <client>,

                          sysnr = <systemnumber>,

                         sid = <sid> )


    connection=sap_abapclient(**params)

    connection.run_suim_job(sap_all_variant,'sap_all_users.xlsx')

    connection.run_suim_job(s_develop_variant,'s_develop_users.xlsx')


This script can be easily extended. You don't need to write things into a excel spreadsheet. You can store things in a database, refresh the data regularly and send an email with changes for example... or generate a nice HTML page using a template engine like jinja2 (Welcome | Jinja2 (The Python Template Engine)), Or you can use the example and write a script that schedules all kinds of basis jobs consistently across an SAP landscape....

2 Comments