Arithmetic Calculations

Created by Vijendra Sawant, Modified on Thu, 9 Nov, 2023 at 2:38 PM by Vijendra Sawant


/*
Calculation Script name - WhizCalArithmetic
Description - WhizCalArithmetic is a custom computation used to carry out arithmetic operations (division, multiplication, addition, subtraction) between two data sources.

Product Version - v66
Script Version - 1.0

***Disclaimer - This is proprietary code of Whiz.AI. To build on top of this computation script, make a copy/clone of it and alter only the copy. The original script may be modified during a version upgrade, so any customization made directly to it by a customer/partner developer may be overwritten.

Key Functions - 1.Function name : extractFromMetadataQuery
                 purpose : extract data from metaDataQuery from call
                 input parameters : metaDataQuery
                 output parameters : Dictionary with key-value pair, Key- variable and value as data for that Variable
                
                2.Function name : getAggrOperation
                 purpose : create aggregation based on the arithmetic operation defined in entity config
                 input parameters : Numerator Aggregation, Denominator Aggregation, Dictionary returned from extractFromMetadataQuery function
                 output parameters : Aggregation
              
                3.Function name : sameDSPostAggrDF
                 purpose : Builds postAggr and return dataframe when data source is same 
                 input parameters : metadataQuery, Dictionary
                 output parameters : DataFrame
                           
Key variables - query : implicit variable holding the incoming metadata query
                WhizCalLib : The name of the object defined in WhizCalLib.js (library script).
                                         Execution scripts call the library script's functions through this object.
                dataAccessManager: An instance exposed to scripts by the framework through the implicit variable dataAccessManager.
                                   It acts as the gateway to data exposed by the framework; a query can be fired through it to get the data.

*/
//**Start of Function code

(() => {
    /****************** Initialize Required Variables ************************* */

    const metadataQuery = query;

    //name for metadata query, this helps logging and identification of query
    metadataQuery.name("OriginalMetadataQuery")

    /**
     * Turns a per-timestamp metric into a running (cumulative) total.
     *
     * Rows are visited in ascending "timestamp" order. A separate running total is kept for
     * every distinct combination of dimension values, and each row's metric is overwritten
     * with the running total of its combination. Falsy metric values (null, undefined, 0, NaN)
     * contribute 0.
     *
     * @param {DataFrame} df - source data; `sortAscending` is invoked on it and the row objects
     *     obtained from its grouped view are updated with the cumulative value.
     * @param {Set<string>} dimensions - names of the columns that identify one series
     *     (only `has` is called on it).
     * @param {string} metric - name of the column to accumulate.
     * @returns {DataFrame} a new DataFrame with the same columns and column metadata as `df`,
     *     holding the rows in visiting order with the cumulative metric.
     */
    function cumulativeCalculation(df, dimensions, metric) {
        // sortAscending(List<String> columnNames, Boolean placeNullsAtEnd)
        df.sortAscending(["timestamp"], true);

        // group(rowGroupResponseColName, groupingColumns): every returned row carries the list
        // of matching source rows under the "group" column.
        const groupDF = df.group("group", ["timestamp"]);

        // Column names do not change while iterating, so read them once and remember which of
        // them are NOT dimensions; those are stripped when building the per-series key.
        const columnNames = df.getColumnNames();
        const finalDF = new DataFrame();
        const nonDimensionColumns = [];
        for (const columnName of columnNames) {
            // addColumn(columnName, columnMetadata): copy the column together with its metadata
            finalDF.addColumn(columnName, df.getColumnMetaData(columnName));
            if (!dimensions.has(columnName)) {
                nonDimensionColumns.push(columnName);
            }
        }

        const runningTotals = new Map();

        // asListOfRows() returns the response as a list of rows
        for (const timestampBucket of groupDF.asListOfRows()) {
            for (const row of timestampBucket["group"]) {
                // The series key is the JSON form of the row reduced to its dimension values.
                const keyFields = JSON.parse(JSON.stringify(row));
                for (const columnName of nonDimensionColumns) {
                    delete keyFields[columnName];
                }
                const seriesKey = JSON.stringify(keyFields);

                const runningTotal = (runningTotals.get(seriesKey) || 0) + (row[metric] || 0);
                row[metric] = runningTotal;
                finalDF.addRow(row);
                runningTotals.set(seriesKey, runningTotal);
            }
        }
        return finalDF;
    }

    /**
     * Collects everything the computation needs into one plain dictionary.
     *
     * Values come from the metadata query passed in, from the entity configuration
     * (`addonProps.defaultFilter.dynamicArgs`) and from the framework-provided implicit
     * variables `primaryAggr` and `outputMetricName`.
     *
     * @param {object} metadataQueryBase - metadata query of the call; only `dataSource()` and
     *     `intervals()` are read.
     * @returns {object} dictionary with the keys baseDataSource, dynamicArgs, metric,
     *     metaInterval, arithmeticOperation, baseMetric, numDataSource, denDataSource,
     *     numMetricName, denMetricName, divByZeroResponse, numExceptDimList, denExceptDimList,
     *     denTimestamp, numTimestamp and cumulativeFlag.
     * @throws {Error} when the configuration has no dynamicArgs, numerator or denominator section.
     */
    function extractFromMetadataQuery(metadataQueryBase) {
        // "dynamicArgs" map from the entity configuration. Every level is optional-chained so a
        // missing section surfaces as the explicit error below instead of an opaque TypeError.
        const dynamicArgs = addonProps?.["defaultFilter"]?.["dynamicArgs"];
        if (!dynamicArgs) {
            throw new Error("WhizCalArithmetic: defaultFilter.dynamicArgs is missing from the configuration");
        }
        const numeratorConf = dynamicArgs['numerator'];
        const denominatorConf = dynamicArgs['denominator'];
        if (!numeratorConf || !denominatorConf) {
            throw new Error("WhizCalArithmetic: dynamicArgs must define both 'numerator' and 'denominator'");
        }

        // Primary aggregation column, e.g. TRx or NRx
        const baseMetric = primaryAggr.column();

        // Data source configured on the incoming query; used whenever a part names none
        const baseDataSource = metadataQueryBase.dataSource();

        return {
            baseDataSource,
            dynamicArgs,
            metric: outputMetricName, // outputMetricName : inbuilt variable
            // intervals() returns a copy; changing the list does not change the query
            metaInterval: metadataQueryBase.intervals(),
            // 'percent' is the default operation when none is configured
            arithmeticOperation: dynamicArgs['arithmeticOperation'] ?? 'percent',
            baseMetric,
            numDataSource: numeratorConf['datasource'] || baseDataSource,
            denDataSource: denominatorConf['datasource'] || baseDataSource,
            numMetricName: numeratorConf['metricName'],
            denMetricName: denominatorConf['metricName'],
            // Handed to WhizCalLib.identifyOperation by getAggrOperation; presumably the value
            // to use when dividing by zero — confirm against the library script.
            divByZeroResponse: 0,
            numExceptDimList: numeratorConf['except_dimension'] || [],
            denExceptDimList: denominatorConf['except_dimension'] || [],
            denTimestamp: denominatorConf['timestamp'],
            numTimestamp: numeratorConf['timestamp'],
            cumulativeFlag: dynamicArgs['cumulative_flag'],
        };
    }

    /**
     * Builds the outer query that left-joins the numerator and denominator inner queries and,
     * unless a cumulative time series is requested, selects the arithmetic result as well.
     *
     * @param metadataQuery - query of the call; supplies offset, limit, ordering, dimensions,
     *     granularity and model.
     * @param QRNum - numerator query (its name is set and its limit/offset/ordering are cleared).
     * @param QRDen - denominator query (treated the same way as QRNum).
     * @param outputMetricName - output metric name (not read inside this function).
     * @param baseParamDict - parameter dictionary (numMetricName, denMetricName, numTimestamp,
     *     denTimestamp, cumulativeFlag and whatever getAggrOperation reads).
     * @returns the join query with druid compatibility mode enabled.
     * @throws {Error} when the query has a granularity other than ALL and both parts
     *     configure a timestamp.
     */
    function createJoinQuery(metadataQuery,
        QRNum,
        QRDen,
        outputMetricName,
        baseParamDict
    ) {
        // Paging and ordering are applied on the outer join query only, so remember them here.
        const requestedOffset = metadataQuery.offset();
        const requestedLimit = metadataQuery.limit();
        const requestedOrder = metadataQuery.orderBy(); // shallow copy of the sort orders

        // Inner queries must return every row: reset limit, offset and ordering.
        QRNum.name("WhizCalArithmetic_NumeratorQuery").limit(null).offset(null).clearOrderBy();
        QRDen.name("WhizCalArithmetic_DenominatorQuery").limit(null).offset(null).clearOrderBy();

        const numeratorSource = DataSource.innerQuery("d1", QRNum);
        const denominatorSource = DataSource.innerQuery("d2", QRDen);

        // dimNames() returns a shallow copy of the configured dimensions
        const joinColumns = metadataQuery.dimNames();

        const numeratorColumn = numeratorSource.column(baseParamDict.numMetricName);
        const denominatorColumn = denominatorSource.column(baseParamDict.denMetricName);
        const aggrAction = getAggrOperation(numeratorColumn, denominatorColumn, baseParamDict);

        const { numTimestamp, denTimestamp } = baseParamDict;
        const hasTimeGranularity = () => Granularity.TYPE_ALL != metadataQuery.granularity().type();

        if (hasTimeGranularity() && denTimestamp && numTimestamp) {
            throw new Error(
                "granularity query not supported when numerator and denominator contains timestamp"
            );
        }

        // Join on the time bucket too, but only when neither part overrides its own interval.
        if (hasTimeGranularity() && !denTimestamp && !numTimestamp) {
            joinColumns.push(TIMESTAMP_RESPONSE_COLUMN);
        }

        // model + dataSource decide the DB table / Druid data source the query runs against
        const joinQuery = MetadataQuery.create()
            .model(metadataQuery.model())
            .name("JoinQuery")
            .select(metadataQuery.dimNames());

        // For a cumulative time series only the raw metrics are selected here.
        const metricColumns = (baseParamDict.cumulativeFlag && hasTimeGranularity())
            ? [numeratorColumn, denominatorColumn]
            : [numeratorColumn, denominatorColumn, aggrAction];
        joinQuery.select(metricColumns);

        // The denominator becomes the primary (left) side only when the numerator alone
        // carries a timestamp override; otherwise the numerator is the primary side.
        const denominatorIsPrimary = numTimestamp && !denTimestamp;
        const primarySource = denominatorIsPrimary ? denominatorSource : numeratorSource;
        const joinedSource = denominatorIsPrimary ? numeratorSource : denominatorSource;
        joinQuery.from(primarySource);
        joinQuery.leftJoinOn(joinedSource, joinColumns);

        joinQuery.offset(requestedOffset).limit(requestedLimit);
        joinQuery.orderBy(requestedOrder);

        return joinQuery.druidCompatibilityMode(true);
    }

    /**
     * Creates the aggregation for the arithmetic operation defined in the entity configuration
     * by delegating to the library script (WhizCalLib.identifyOperation).
     *
     * @param numeratorAggr - numerator aggregation/column handed to the library as-is.
     * @param denominatorAggr - denominator aggregation/column handed to the library as-is.
     * @param paramDict - dictionary supplying arithmeticOperation, metric and divByZeroResponse.
     * @returns whatever WhizCalLib.identifyOperation returns.
     */
    function getAggrOperation(numeratorAggr, denominatorAggr, paramDict) {
        const { arithmeticOperation, metric, divByZeroResponse } = paramDict;
        return WhizCalLib.identifyOperation(
            arithmeticOperation,
            metric,
            numeratorAggr,
            denominatorAggr,
            divByZeroResponse
        );
    }

    /**
     * Handles the case where numerator and denominator share one data source: puts both
     * aggregations on a single query, adds the arithmetic post-aggregation (unless a cumulative
     * time series is requested) and fires the query.
     *
     * NOTE: the query passed in is modified directly, not copied — its aggregations, ordering,
     * columns and name are changed by this function.
     *
     * @param metadataQuery - query of the call.
     * @param paramDict - dictionary returned from extractFromMetadataQuery.
     * @returns DataFrame returned by dataAccessManager.fireQuery for the prepared query.
     */
    function sameDSPostAggrDF(metadataQuery, paramDict) {
        const sameDSMetadataQuery = metadataQuery;
        const numeratorConf = paramDict['dynamicArgs']['numerator'];
        const denominatorConf = paramDict['dynamicArgs']['denominator'];

        // Replace the query's aggregations with the configured numerator and denominator ones.
        sameDSMetadataQuery.clearAggrs()
            .aggrs(Aggregation.parseList([numeratorConf.aggregation, denominatorConf.aggregation]))

            /* replaceColumn swaps the marker column everywhere it applies (dimensions,
               aggregations, filters, sort orders). The first parameter must match the column
               name supplied in the configuration's aggregation. */
            .replaceColumn('__baseMetric', primaryAggr.column());

        const isCumulativeOverTime = paramDict.cumulativeFlag && Granularity.TYPE_ALL != metadataQuery.granularity().type();

        // For a cumulative time series the query returns only the two raw aggregations, so no
        // post-aggregation is added here.
        if (!isCumulativeOverTime) {
            const configuredAggrs = sameDSMetadataQuery.aggrs();
            const postAggrAction = getAggrOperation(configuredAggrs[0], configuredAggrs[1], paramDict);

            sameDSMetadataQuery.postAggr(postAggrAction);

            // Any ordering on the base metric must now order by the computed metric instead.
            sameDSMetadataQuery.replaceOrderingColumn(paramDict.baseMetric, paramDict.metric);
        }

        // Remove everything referring to the excluded dimensions of either part.
        sameDSMetadataQuery.removeColumns(paramDict.numExceptDimList).removeColumns(paramDict.denExceptDimList);

        // Name used for logging and identification of the query.
        sameDSMetadataQuery.name("WhizCalArithmetic_Query");

        return dataAccessManager.fireQuery(sameDSMetadataQuery);
    }

    /**
     * Prepares the inner query for one part (numerator or denominator) of the calculation.
     *
     * @param metaCopy - copy of the metadata query; it is modified and returned.
     * @param aggrconf - configuration of the part: `aggregation` (required), plus optional
     *     `datasource`, `except_dimension` and `timestamp`.
     * @param params - parameter dictionary; only `baseDataSource` is read, as the fallback
     *     when the part names no data source.
     * @returns the same `metaCopy` object after modification.
     */
    function buildQuery(metaCopy, aggrconf, params) {
        const timestampconf = aggrconf.timestamp;

        // Point the query at the part's data source (or the base one when none is configured)
        metaCopy.dataSource(aggrconf.datasource ? aggrconf.datasource : params.baseDataSource)

            // clears the aggregations set on this query
            .clearAggrs()

            /* replaceColumn swaps the marker column everywhere it applies (dimensions,
               aggregations, filters, sort orders). The first parameter must match the column
               name supplied in the configuration's aggregation. */
            .replaceColumn('__baseMetric', primaryAggr.column())

            // name for metadata query, this helps logging and identification of query
            .name("FinalinnerMetadatQuery")

            // adds the part's aggregation to the query
            .aggr(Aggregation.parse(aggrconf.aggregation))

            // Removes everything referring to the excluded dimensions. An omitted
            // except_dimension is treated as "nothing to remove", matching the default applied
            // in extractFromMetadataQuery.
            .removeColumns(aggrconf.except_dimension || []);

        if (timestampconf) {
            // The part carries its own interval: replace the query's intervals with it ...
            metaCopy.clearIntervals().intervals(timestampconf);
            // ... and aggregate over that whole interval instead of per time bucket.
            metaCopy.granularity(Granularity.all());
            // NOTE(review): add-on properties are cleared only in this branch; the reason is
            // not visible in this script — confirm before changing.
            metaCopy.addonProperties(null);
        }

        return metaCopy;
    }


    /**
     * Entry point of the computation: evaluates numerator and denominator, combines them with
     * the configured arithmetic operation and returns the resulting DataFrame.
     *
     * When the two parts use different data sources, one inner query is built per part and
     * both are combined through a join query. When they share a data source, a single query
     * with a post-aggregation is used. If the cumulative flag is set and the query has a
     * granularity other than ALL, the numerator metric is additionally accumulated over time
     * before the arithmetic column is added.
     *
     * @param metadataQuery - metadata query of the call.
     * @returns DataFrame with the calculated metric.
     */
    function executeComputation(metadataQuery) {
        /******************extract required common info************************* */
        const baseParamDict = extractFromMetadataQuery(metadataQuery);

        // Dimension names are read before any branch runs because the same-data-source path
        // modifies metadataQuery itself.
        const joinColumns = metadataQuery.dimNames();

        // The time bucket is part of the series identity only when neither part overrides
        // its own interval.
        if (Granularity.TYPE_ALL != metadataQuery.granularity().type() && !baseParamDict.denTimestamp && !baseParamDict.numTimestamp) {
            joinColumns.push(TIMESTAMP_RESPONSE_COLUMN);
        }

        let innerJoinDF;

        if (baseParamDict.numDataSource != baseParamDict.denDataSource) {
            log.info("-------Datasources are different {}", `${baseParamDict.numDataSource} <> ${baseParamDict.denDataSource}`);

            // Each part works on its own deep copy of the query; copies are only needed on
            // this path, so they are created here.
            /******************evaluation of Numerator part************************* */
            const QRNum = buildQuery(metadataQuery.copy(), baseParamDict['dynamicArgs']['numerator'], baseParamDict);

            /******************evaluation of Denominator part************************* */
            const QRDen = buildQuery(metadataQuery.copy(), baseParamDict['dynamicArgs']['denominator'], baseParamDict);

            /******************Operation on DataFrames************************* */
            const innerJoinQuery = createJoinQuery(metadataQuery,
                QRNum,
                QRDen,
                outputMetricName,
                baseParamDict
            );

            // Any ordering on the base metric must order by the computed metric instead.
            innerJoinQuery.replaceOrderingColumn(baseParamDict.baseMetric, outputMetricName);

            // name for metadata query, this helps logging and identification of query
            innerJoinQuery.name("WhizCalArithmetic_JoinQuery");

            // fires the query and returns a DataFrame representing its response
            innerJoinDF = dataAccessManager.fireQuery(innerJoinQuery);
        } else {
            log.info("-------DataSources are same {}", `${baseParamDict.numDataSource} == ${baseParamDict.denDataSource}`);
            innerJoinDF = sameDSPostAggrDF(metadataQuery, baseParamDict);
        }

        if (baseParamDict.cumulativeFlag && Granularity.TYPE_ALL != metadataQuery.granularity().type()) {
            // Here the operation is built from the metric NAMES, not from aggregation objects.
            const aggrAction = getAggrOperation(baseParamDict.numMetricName, baseParamDict.denMetricName, baseParamDict);

            // A series is identified by the dimensions without the time bucket.
            const seriesDimensions = new Set(joinColumns);
            seriesDimensions.delete(TIMESTAMP_RESPONSE_COLUMN);

            // Only the numerator metric is accumulated.
            innerJoinDF = cumulativeCalculation(innerJoinDF, seriesDimensions, baseParamDict.numMetricName);

            innerJoinDF.addColumn(aggrAction);
        }

        return innerJoinDF;
    }

    return executeComputation(metadataQuery);
})();


/*
 ** configuration to be set in calculation metric **

default_filter: set the following keys inside this section's dynamicArgs sub-section:
        1.numerator - used to obtain first value
          datasource: name of datasource
          metricName: name of the first metric value
          except_dimension: list of dimensions to be removed from metaDataQuery
          aggregation:specified aggregation to be used to calculate first value 
            i)name- name of aggregation
            ii)type- here we specify which type of calculation we want
            iii)column : It is used to specify on which column given type of operation needs to perform.
            
          
        2.denominator - used to obtain second value
            datasource: name of datasource
          metricName: name of the second metric value
          except_dimension: list of dimensions to be removed from metaDataQuery
          aggregation:specified aggregation to be used to calculate second value 
            i)name- name of aggregation
            ii)type- here we specify which type of calculation we want
            iii)column : It is used to specify on which column given type of operation needs to perform.
            
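        3.arithmeticOperation- operation to be performed between obtained values; defaults to percent when omitted.
             eg.division, addition, subtraction, multiplication, percent, percentChange

        4.timestamp (optional, inside numerator or denominator)- list of ISO intervals that replaces the query's own intervals for that part.

        5.cumulative_flag (optional)- when set and the query has a time granularity, the numerator metric is accumulated over time.

**Copy and paste the configuration below, then adapt it, to perform the given arithmetic operation.**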
*/


  "default_filter": {
    "dynamicArgs": {
      "numerator": {
        "datasource": "fct_calls",
        "metricName": "Y_calls_temp",
        "aggregation": {
          "name": "Y_calls_temp",
          "type": "countDistinct",
          "column": "Customer ID",
          "druidThetaSketchSize": 1024
        },
        "except_dimension": [
          "market"
        ]
      },
      "denominator": {
        "timestamp": [
          "2017-01-01T00:00:00/2045-12-15T23:59:59"
        ],
        "datasource": "call_plan",
        "metricName": "total_customer",
        "aggregation": {
          "name": "total_customer",
          "type": "countDistinct",
          "column": "Customer ID",
          "druidThetaSketchSize": 1024
        },
        "except_dimension": [
          "Product Name"
        ]
      },
      "arithmeticOperation": "percent"
    }
  }

Was this article helpful?

That’s Great!

Thank you for your feedback

Sorry! We couldn't be helpful

Thank you for your feedback

Let us know how can we improve this article!

Select at least one of the reasons
CAPTCHA verification is required.

Feedback sent

We appreciate your effort and will try to fix the article