/*
 Calculation Script name - WhizCalArithmetic
 Description - WhizCalArithmetic is a custom computation used to carry out arithmetic operations
 (division, multiplication, addition, subtraction) between two data sources.
 Product Version - v66
 Script Version - 1.0

 ***Disclaimer - This is proprietary code of Whiz.AI. If you want to build on top of this
 computation script, make a copy/clone of it and alter the copy. When a version upgrade happens,
 this script may be modified, so any customization done by a customer/partner developer may be
 overwritten.

 Key Functions -
 1. Function name : extractFromMetadataQuery
    purpose : extracts data from the metadataQuery of the call
    input parameters : metadataQuery
    output parameters : dictionary of key-value pairs; key - variable name, value - data for that variable
 2. Function name : getAggrOperation
    purpose : creates an aggregation based on the arithmetic operation defined in the entity config
    input parameters : numerator aggregation, denominator aggregation, dictionary returned by extractFromMetadataQuery
    output parameters : Aggregation
 3. Function name : sameDSPostAggrDF
    purpose : builds the post-aggregation and returns a DataFrame when both sides use the same data source
    input parameters : metadataQuery, dictionary
    output parameters : DataFrame

 Key variables -
 query : implicit variable
 WhizCalLib : the object exposed by WhizCalLib.js (library script); execution scripts can call
 WhizCalLib functions through this object.
 dataAccessManager : exposed to scripts by the framework through the implicit variable
 dataAccessManager; it acts as the gateway to data exposed by the framework. A query can be
 fired through this instance to get the data.
*/
//**Start of Function code
(() => {
  /****************** Initialize Required Variables ************************* */
  const metadataQuery = query;
  // name for metadata query; this helps logging and identification of the query
  metadataQuery.name("OriginalMetadataQuery");

  function cumulativeCalculation(df, dimensions, metric) {
    // sortAscending(List<String> columnNames, Boolean placeNullsAtEnd) : sorts the DataFrame in
    // ascending order of the given columns; the flag controls whether nulls are placed at the end
    df.sortAscending(["timestamp"], true);
    // group(@NonNull String rowGroupResponseColName, @NonNull List<String> groupingColumns) :
    // rowGroupResponseColName - name of the column which would contain matching rows as a list,
    // groupingColumns - the columns to be used for grouping; these remain as separate columns in
    // the DataFrame returned
    const groupDF = df.group("group", ["timestamp"]);
    const groupValueMap = new Map();
    const finalDF = new DataFrame();
    // getColumnNames returns the names of the columns present in the DataFrame
    for (const col of df.getColumnNames()) {
      // getColumnMetaData(String columnName) : returns a metadata info map about the column
      // addColumn(String columnName, Map<String,Object> columnMetadata) : adds a column with the
      // given name and the metadata information associated with the column
      finalDF.addColumn(col, df.getColumnMetaData(col));
    }
    // asListOfRows() returns the response as a list of rows; new rows cannot be added or deleted
    for (const outerRow of groupDF.asListOfRows()) {
      const rows = outerRow["group"];
      for (const row of rows) {
        let group = JSON.parse(JSON.stringify(row));
        for (const col of df.getColumnNames()) {
          if (!dimensions.has(col)) {
            delete group[col];
          }
        }
        group = JSON.stringify(group);
        let currentValue = row[metric];
        let prevValue = groupValueMap.get(group);
        currentValue = !currentValue ? 0 : currentValue;
        prevValue = !prevValue ? 0 : prevValue;
        const sum = prevValue + currentValue;
        row[metric] = sum;
        finalDF.addRow(row);
        groupValueMap.set(group, sum);
      }
    }
    return finalDF;
  }
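  /* Illustration (not executed) - a hedged sketch of what cumulativeCalculation produces,
     assuming hypothetical rows with one grouping dimension "region" and metric "TRx".
     Input, sorted by timestamp:
       { timestamp: "2023-01", region: "East", TRx: 10 }
       { timestamp: "2023-02", region: "East", TRx: 5 }
       { timestamp: "2023-02", region: "West", TRx: 7 }
     Output - a running sum per group, keyed by the serialized dimension values:
       { timestamp: "2023-01", region: "East", TRx: 10 }
       { timestamp: "2023-02", region: "East", TRx: 15 }   // 10 + 5
       { timestamp: "2023-02", region: "West", TRx: 7 }
  */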
  function extractFromMetadataQuery(metadataQueryBase) {
    // extract data from the metadataQuery of the call
    var baseParametersDict = {};
    var divByZeroResponse = 0;
    // column() returns the column set for this aggregation;
    // it gives the primary aggregation column, e.g. TRx, NRx
    const baseMetric = primaryAggr.column();
    // returns the data source configured
    var baseDataSource = metadataQueryBase.dataSource();
    // get the "dynamicArgs" map from the entity configuration
    var dynamicArgs = addonProps?.["defaultFilter"]["dynamicArgs"];
    var numDataSource = dynamicArgs['numerator']['datasource'] ? dynamicArgs['numerator']['datasource'] : baseDataSource;
    var numExceptDimList = dynamicArgs['numerator']['except_dimension'] ? dynamicArgs['numerator']['except_dimension'] : [];
    var denDataSource = dynamicArgs['denominator']['datasource'] ? dynamicArgs['denominator']['datasource'] : baseDataSource;
    var denExceptDimList = dynamicArgs['denominator']['except_dimension'] ? dynamicArgs['denominator']['except_dimension'] : [];
    var numMetricName = dynamicArgs['numerator']['metricName'];
    var denMetricName = dynamicArgs['denominator']['metricName'];
    var metric = outputMetricName; // outputMetricName : inbuilt variable
    var denTimestamp = dynamicArgs['denominator']['timestamp'];
    var numTimestamp = dynamicArgs['numerator']['timestamp'];
    var cumulativeFlag = dynamicArgs['cumulative_flag'];
    // the returned list is a copy of the intervals set for the query; any modifications to the
    // list will not reflect in the query
    var metaInterval = metadataQueryBase.intervals();
    var arithmeticOperation = dynamicArgs['arithmeticOperation'] != undefined ?
      dynamicArgs['arithmeticOperation'] : 'percent';
    baseParametersDict['baseDataSource'] = baseDataSource;
    baseParametersDict['dynamicArgs'] = dynamicArgs;
    baseParametersDict['metric'] = metric;
    baseParametersDict['metaInterval'] = metaInterval;
    baseParametersDict['arithmeticOperation'] = arithmeticOperation;
    baseParametersDict['baseMetric'] = baseMetric;
    baseParametersDict['numDataSource'] = numDataSource;
    baseParametersDict['denDataSource'] = denDataSource;
    baseParametersDict['numMetricName'] = numMetricName;
    baseParametersDict['denMetricName'] = denMetricName;
    baseParametersDict['divByZeroResponse'] = divByZeroResponse;
    baseParametersDict['numExceptDimList'] = numExceptDimList;
    baseParametersDict['denExceptDimList'] = denExceptDimList;
    baseParametersDict['denTimestamp'] = denTimestamp;
    baseParametersDict['numTimestamp'] = numTimestamp;
    baseParametersDict['cumulativeFlag'] = cumulativeFlag;
    //log.info("------- baseParametersDict {}", baseParametersDict)
    return baseParametersDict;
  }
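  /* Illustration (not executed) - with the sample configuration shown at the end of this
     article, extractFromMetadataQuery would return roughly the following (framework-derived
     values shown as placeholders):
       {
         baseDataSource: "<data source of the incoming query>",
         numDataSource: "fct_calls",
         denDataSource: "call_plan",
         numMetricName: "Y_calls_temp",
         denMetricName: "total_customer",
         numExceptDimList: ["market"],
         denExceptDimList: ["Product Name"],
         numTimestamp: undefined,
         denTimestamp: ["2017-01-01T00:00:00/2045-12-15T23:59:59"],
         cumulativeFlag: undefined,
         arithmeticOperation: "percent",
         divByZeroResponse: 0,
         ...
       }
  */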
  function createJoinQuery(metadataQuery, QRNum, QRDen, outputMetricName, baseParamDict) {
    /**
     * metadataQuery: query passed from the call
     * QRNum: first (numerator) query
     * QRDen: second (denominator) query
     * outputMetricName: outputMetricName
     * baseParamDict: parameters dictionary
     */
    // returns the offset set in metadataQuery
    var offset = metadataQuery.offset();
    // returns the limit set in metadataQuery
    var limit = metadataQuery.limit();
    // shallow copy of the sort orders configured
    var order = metadataQuery.orderBy();
    // name(String name) - name for metadata query; this helps logging and identification of the query
    // limit(Integer limit) - sets a limit on the number of records that would be sent back by the query
    // offset(Integer offset) - the first records up to the specified offset would be skipped
    // clearOrderBy() - clears the order-by clauses set on this query
    QRNum.name("WhizCalArithmetic_NumeratorQuery").limit(null).offset(null).clearOrderBy();
    QRDen.name("WhizCalArithmetic_DenominatorQuery").limit(null).offset(null).clearOrderBy();
    var p1DS = DataSource.innerQuery("d1", QRNum);
    var p2DS = DataSource.innerQuery("d2", QRDen);
    // returns a shallow copy of the dimensions configured
    var joinColumns = metadataQuery.dimNames();
    var p1CL = p1DS.column(baseParamDict.numMetricName);
    var p2CL = p2DS.column(baseParamDict.denMetricName);
    let aggrAction = getAggrOperation(p1CL, p2CL, baseParamDict);
    if (Granularity.TYPE_ALL != metadataQuery.granularity().type()
        && baseParamDict.denTimestamp && baseParamDict.numTimestamp) {
      throw new Error(
        "granularity query not supported when numerator and denominator contain timestamps"
      );
    }
    if (Granularity.TYPE_ALL != metadataQuery.granularity().type()
        && !baseParamDict.denTimestamp && !baseParamDict.numTimestamp) {
      // the timestamp doesn't matter here and is calculated oddly by Druid, so join on it as well
      joinColumns.push(TIMESTAMP_RESPONSE_COLUMN);
    }
    // creates a query with the specified details
    var joinQuery = MetadataQuery.create()
      // sets the model on which this query is to be run; the model + dataSource combination
      // decides the DB table or Druid data source used to run the query
      .model(metadataQuery.model())
      // name for metadata query; this helps logging and identification of the query
      .name("JoinQuery")
      // columns to be fetched from the database
      .select(metadataQuery.dimNames());
    if (baseParamDict.cumulativeFlag && Granularity.TYPE_ALL != metadataQuery.granularity().type()) {
      // columns to be fetched from the database
      joinQuery.select([p1CL, p2CL]);
    } else {
      // columns to be fetched from the database
      joinQuery.select([p1CL, p2CL, aggrAction]);
    }
    if (baseParamDict.numTimestamp && !baseParamDict.denTimestamp) {
      // from(DBDataSource ds) - sets the primary data source associated with this query
      joinQuery.from(p2DS);
      // leftJoinOn(DBDataSource dataSource, List<String> joinColumns) - creates a left join on
      // the data source with the given list of columns
      joinQuery.leftJoinOn(p1DS, joinColumns);
    } else {
      joinQuery.from(p1DS);
      joinQuery.leftJoinOn(p2DS, joinColumns);
    }
    // offset(Integer offset) - the first records up to the specified offset would be skipped
    // limit(Integer limit) - sets a limit on the number of records that would be sent back by the query
    joinQuery.offset(offset).limit(limit);
    // orderBy(List sortData) - convenience method calling orderBy(String) or orderBy(SortData)
    // depending on the type of each element
    joinQuery.orderBy(order);
    return joinQuery.druidCompatibilityMode(true);
  }

  function getAggrOperation(numeratorAggr, denominatorAggr, paramDict) {
    // create an aggregation based on the arithmeticOperation defined in the entity config,
    // calling the function from the library script
    return WhizCalLib.identifyOperation(paramDict.arithmeticOperation, paramDict.metric,
      numeratorAggr, denominatorAggr, paramDict.divByZeroResponse);
  }

  function sameDSPostAggrDF(metadataQuery, paramDict) {
    // builds the post-aggregation and returns a DataFrame when the data source is the same
    var sameDSMetadataQuery = metadataQuery;
    var num_object = paramDict['dynamicArgs']['numerator'];
    var den_object = paramDict['dynamicArgs']['denominator'];
    // clears the aggregations set on this query
    sameDSMetadataQuery.clearAggrs()
      // adds the given aggregations/post-aggregations to the query
      .aggrs(Aggregation.parseList([num_object.aggregation, den_object.aggregation]))
      /* Replaces the given column in all applicable places; currently Dimensions, Aggregations,
         Filters and SortOrders, which might be extended to more known places. Specifically
         useful for replacing marker column patterns with the actual column.
         The first parameter must match the column name supplied in the configuration's
         numerator aggregation. */
      .replaceColumn('__baseMetric', primaryAggr.column());
    var numAggr = sameDSMetadataQuery.aggrs()[0];
    var denAggr = sameDSMetadataQuery.aggrs()[1];
    if (paramDict.cumulativeFlag && Granularity.TYPE_ALL != metadataQuery.granularity().type()) {
      // name for metadata query; this helps logging and identification of the query
      sameDSMetadataQuery.name("sameDSMetadataQuery");
    } else {
      var postAggrAction = getAggrOperation(numAggr, denAggr, paramDict);
      sameDSMetadataQuery.postAggr(postAggrAction);
      // replaceOrderingColumn(String currentOrderingColumn, String replacementColumn) - replaces
      // any ordering clauses in the query on the given column with ordering on the new column
      sameDSMetadataQuery.replaceOrderingColumn(paramDict.baseMetric, paramDict.metric).name("sameDSMetadataQuery");
    }
    // removes all content referring to each column present in the list from all applicable places
    sameDSMetadataQuery.removeColumns(paramDict.numExceptDimList).removeColumns(paramDict.denExceptDimList);
    // name for metadata query; this helps logging and identification of the query
    sameDSMetadataQuery.name("WhizCalArithmetic_Query");
    // fires the MetadataQuery passed; returns a DataFrame representing the response of the query
    var sd_response_DF = dataAccessManager.fireQuery(sameDSMetadataQuery);
    return sd_response_DF;
  }
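  /* Illustration (not executed) - conceptually, the cross-data-source path assembled by
     createJoinQuery resembles the following (SQL shown only as a hedged sketch; the framework
     generates the actual query):
       SELECT <dimensions>, d1.Y_calls_temp, d2.total_customer, <arithmetic aggregation>
       FROM (numerator query) d1
       LEFT JOIN (denominator query) d2 ON <dimensions>
     When only the numerator carries a timestamp filter, the sides are swapped so the
     denominator query drives the join.
  */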
  function buildQuery(metaCopy, aggrconf, params) {
    var timestampconf = aggrconf.timestamp;
    // dataSource(...) sets the data source to run against
    metaCopy.dataSource(aggrconf.datasource ? aggrconf.datasource : params.baseDataSource)
      // clears the aggregations set on this query
      .clearAggrs()
      /* Replaces the given column in all applicable places; currently Dimensions, Aggregations,
         Filters and SortOrders, which might be extended to more known places. Specifically
         useful for replacing marker column patterns with the actual column.
         The first parameter must match the column name supplied in the configuration's
         numerator aggregation. */
      .replaceColumn('__baseMetric', primaryAggr.column())
      // name for metadata query; this helps logging and identification of the query
      .name("FinalInnerMetadataQuery")
      // adds the given aggregation/post-aggregation to the query
      .aggr(Aggregation.parse(aggrconf.aggregation))
      // removes all content referring to each column present in the list from all applicable places
      .removeColumns(aggrconf.except_dimension);
    if (timestampconf) {
      // clearIntervals() - clears the intervals set on this query
      // intervals(List<String> isoIntervals) - sets the given intervals in ISO format as the
      // primary time-filtering criteria; Druid requires at least one ISO interval for
      // aggregation queries
      metaCopy.clearIntervals().intervals(timestampconf);
      // granularity(Granularity granularity) - sets the given granularity on the query
      metaCopy.granularity(Granularity.all());
      metaCopy.addonProperties(null);
    }
    //log.info("updated metadataQuery : {} ", metaCopy);
    return metaCopy;
  }
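  /* Illustration (not executed) - with the sample numerator configuration at the end of this
     article, buildQuery would point the copied query at "fct_calls", replace the '__baseMetric'
     marker with the primary aggregation column, set the "Y_calls_temp" countDistinct
     aggregation, and drop the "market" dimension. Because the numerator has no "timestamp"
     entry, the original intervals and granularity are left untouched.
  */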
  function executeComputation(metadataQuery) {
    /****************** extract required common info ************************* */
    var baseParamDict = extractFromMetadataQuery(metadataQuery);
    /****************** operate on numerator and denominator ************************* */
    // creates deep copies of the existing query
    var metaCopy_num = metadataQuery.copy();
    var metaCopy_den = metadataQuery.copy();
    var innerJoinDF;
    // names of the dimensions configured
    var joinColumns = metadataQuery.dimNames();
    if (Granularity.TYPE_ALL != metadataQuery.granularity().type()
        && !baseParamDict.denTimestamp && !baseParamDict.numTimestamp) {
      // the timestamp doesn't matter here and is calculated oddly by Druid, so join on it as well
      joinColumns.push(TIMESTAMP_RESPONSE_COLUMN);
    }
    if (baseParamDict.numDataSource != baseParamDict.denDataSource) {
      log.info("-------Datasources are different {}", baseParamDict.numDataSource + " <> " + baseParamDict.denDataSource);
      /****************** evaluate the numerator part ************************* */
      var num_object = baseParamDict['dynamicArgs']['numerator'];
      //log.info("num_object--" + metaCopy_num)
      var QRNum = buildQuery(metaCopy_num, num_object, baseParamDict);
      /****************** evaluate the denominator part ************************* */
      var den_object = baseParamDict['dynamicArgs']['denominator'];
      var QRDen = buildQuery(metaCopy_den, den_object, baseParamDict);
      /****************** operate on the DataFrames ************************* */
      var innerJoinQuery = createJoinQuery(metadataQuery, QRNum, QRDen, outputMetricName, baseParamDict);
      // replaceOrderingColumn(String currentOrderingColumn, String replacementColumn) - replaces
      // any ordering clauses in the query on the given column with ordering on the new column
      innerJoinQuery.replaceOrderingColumn(baseParamDict.baseMetric, outputMetricName);
      //log.info("innerJoinQuery-- : {} ", innerJoinQuery);
      // name for metadata query; this helps logging and identification of the query
      innerJoinQuery.name("WhizCalArithmetic_JoinQuery");
      // fires the MetadataQuery passed; returns a DataFrame representing the response of the query
      innerJoinDF = dataAccessManager.fireQuery(innerJoinQuery);
    } else {
      log.info("-------DataSources are same {}", baseParamDict.numDataSource + " == " + baseParamDict.denDataSource);
      innerJoinDF = sameDSPostAggrDF(metadataQuery, baseParamDict);
      //var joinColumns = [];
    }
    if (baseParamDict.cumulativeFlag && Granularity.TYPE_ALL != metadataQuery.granularity().type()) {
      let aggrAction = getAggrOperation(baseParamDict.numMetricName, baseParamDict.denMetricName, baseParamDict);
      const aggregateColumns = new Set(joinColumns);
      aggregateColumns.delete(TIMESTAMP_RESPONSE_COLUMN);
      innerJoinDF = cumulativeCalculation(innerJoinDF, aggregateColumns, baseParamDict.numMetricName);
      // adds a column with the given name and empty metadata
      innerJoinDF.addColumn(aggrAction);
    }
    return innerJoinDF;
  }

  return executeComputation(metadataQuery);
})();
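For reference, the arithmetic that the script delegates to WhizCalLib.identifyOperation can be pictured as plain row-level math. The sketch below is illustrative only: it assumes the operation names listed in the configuration notes (division, addition, subtraction, multiplication, percent, percentChange) and the divByZeroResponse fallback, and applyOperation is a hypothetical helper, not part of WhizCalLib.

// Hedged sketch of the assumed row-level semantics of the supported operations.
// applyOperation is hypothetical; the library builds aggregations, not scalar math.
function applyOperation(operation, num, den, divByZeroResponse) {
  switch (operation) {
    case 'addition':       return num + den;
    case 'subtraction':    return num - den;
    case 'multiplication': return num * den;
    case 'division':       return den === 0 ? divByZeroResponse : num / den;
    case 'percent':        return den === 0 ? divByZeroResponse : (num / den) * 100;
    case 'percentChange':  return den === 0 ? divByZeroResponse : ((num - den) / den) * 100;
    default: throw new Error("unsupported arithmeticOperation: " + operation);
  }
}

// Example: 120 calls against 400 planned customers with the default 'percent' operation.
console.log(applyOperation('percent', 120, 400, 0)); // 30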
/*
 ** Configuration to be set in the calculation metric **
 default_filter: in this section's dynamicArgs sub-section,
 1. numerator - used to obtain the first value
    datasource: name of the data source
    metricName: name of the first metric value
    except_dimension: list of dimensions to be removed from the metadataQuery
    aggregation: the aggregation to be used to calculate the first value
      i)   name - name of the aggregation
      ii)  type - the type of calculation we want
      iii) column - the column on which the given type of operation needs to be performed
 2. denominator - used to obtain the second value
    datasource: name of the data source
    metricName: name of the second metric value
    except_dimension: list of dimensions to be removed from the metadataQuery
    aggregation: the aggregation to be used to calculate the second value
      i)   name - name of the aggregation
      ii)  type - the type of calculation we want
      iii) column - the column on which the given type of operation needs to be performed
 3. arithmeticOperation - the operation to be performed between the obtained values,
    e.g. division, addition, subtraction, multiplication, percent, percentChange

 ** The configuration below should be copied and adapted to perform the given arithmetic operation. **
*/
"default_filter": {
  "dynamicArgs": {
    "numerator": {
      "datasource": "fct_calls",
      "metricName": "Y_calls_temp",
      "aggregation": {
        "name": "Y_calls_temp",
        "type": "countDistinct",
        "column": "Customer ID",
        "druidThetaSketchSize": 1024
      },
      "except_dimension": ["market"]
    },
    "denominator": {
      "timestamp": ["2017-01-01T00:00:00/2045-12-15T23:59:59"],
      "datasource": "call_plan",
      "metricName": "total_customer",
      "aggregation": {
        "name": "total_customer",
        "type": "countDistinct",
        "column": "Customer ID",
        "druidThetaSketchSize": 1024
      },
      "except_dimension": ["Product Name"]
    },
    "arithmeticOperation": "percent"
  }
}
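As a further illustration, a same-data-source variant might omit the datasource and timestamp keys so both sides run against the incoming query's data source, and use the '__baseMetric' marker so the script substitutes the primary aggregation column. The field names follow the notes above; the metric names and the "sum" aggregation type are hypothetical values chosen for this sketch:

"default_filter": {
  "dynamicArgs": {
    "numerator": {
      "metricName": "current_period",
      "aggregation": { "name": "current_period", "type": "sum", "column": "__baseMetric" },
      "except_dimension": []
    },
    "denominator": {
      "metricName": "baseline",
      "aggregation": { "name": "baseline", "type": "sum", "column": "__baseMetric" },
      "except_dimension": []
    },
    "arithmeticOperation": "subtraction"
  }
}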