/*
Calculation Script name - WhizCalArithmetic
Description - WhizCalArithmetic is a custom computation used to carry out arithmetic operations (division, multiplication, addition, subtraction) between two data sources.
Product Version - v66
Script Version - 1.0
***Disclaimer - This is proprietary code of Whiz.AI. To build on top of this computation script, make a copy/clone of it and alter the copy. A version upgrade may modify this script, so any customization made directly to it by a customer/partner developer may be overwritten.
Key Functions - 1. Function name : extractFromMetadataQuery
purpose : extracts data from the metadataQuery passed to the call
input parameters : metadataQuery
output parameters : dictionary of key-value pairs, keyed by variable name with the data for that variable as the value
2. Function name : getAggrOperation
purpose : creates an aggregation based on the arithmetic operation defined in the entity configuration
input parameters : numerator aggregation, denominator aggregation, dictionary returned by the extractFromMetadataQuery function
output parameters : Aggregation
3. Function name : sameDSPostAggrDF
purpose : builds a post-aggregation and returns a DataFrame when the numerator and denominator share the same data source
input parameters : metadataQuery, dictionary
output parameters : DataFrame
Key variables - query : implicit variable holding the incoming metadata query
WhizCalLib : the name of the object exposed by WhizCalLib.js (library script);
execution scripts can call WhizCalLib functions through this object.
dataAccessManager : implicit variable through which the framework exposes this class to scripts;
it acts as the gateway to data exposed by the framework. A query can be fired through this instance to get the data.
*/
//**Start of Function code
(() => {
/****************** Initialize Required Variables ************************* */
const metadataQuery = query;
//name for metadata query, this helps logging and identification of query
metadataQuery.name("OriginalMetadataQuery")
function cumulativeCalculation(df, dimensions, metric) {
// sortAscending(List<String> columnNames, Boolean placeNullsAtEnd) : sort the DataFrame in ascending order of given columns and whether to place nulls at end
df.sortAscending(["timestamp"], true);
//group(@NonNull String rowGroupResponseColName, @NonNull List<String> groupingColumns) : rowGroupResponseColName - name of the column that will contain the matching rows as a list,
//groupingColumns - The columns to be used for grouping, these would remain as separate columns in the DataFrame returned
const groupDF = df.group("group", ["timestamp"]);
const groupValueMap = new Map();
const finalDF = new DataFrame();
//getColumnNames returns name of the columns present in the DataFrame
for (const col of df.getColumnNames()) {
//getColumnMetaData(String columnName) :returns metadata info map about the column
//addColumn(String columnName, Map<String,Object> columnMetadata) : adds a column with the given name and the given metadata information associated with the column.
finalDF.addColumn(col, df.getColumnMetaData(col));
}
//asListOfRows() returns response as list of rows, new rows cannot be added or deleted into it
for (const outerRow of groupDF.asListOfRows()) {
const rows = outerRow["group"];
for (const row of rows) {
let group = JSON.parse(JSON.stringify(row));
for (const col of df.getColumnNames()) {
if (!dimensions.has(col)) {
delete group[col];
}
}
group = JSON.stringify(group);
let currentValue = row[metric];
let prevValue = groupValueMap.get(group);
currentValue = !currentValue ? 0 : currentValue;
prevValue = !prevValue ? 0 : prevValue;
const sum = prevValue + currentValue;
row[metric] = sum;
finalDF.addRow(row);
groupValueMap.set(group, sum);
}
}
return finalDF;
}
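/* A minimal illustration of cumulativeCalculation (hypothetical rows, assuming "brand"
is the only grouping dimension and "TRx" the metric): rows are sorted by timestamp,
then the metric accumulates per distinct dimension combination across timestamps.
input:  {timestamp: "2023-01", brand: "A", TRx: 10}
        {timestamp: "2023-02", brand: "A", TRx: 5}
        {timestamp: "2023-02", brand: "B", TRx: 7}
output: {timestamp: "2023-01", brand: "A", TRx: 10}
        {timestamp: "2023-02", brand: "A", TRx: 15}   // 10 + 5
        {timestamp: "2023-02", brand: "B", TRx: 7}    // first value seen for brand B
*/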
function extractFromMetadataQuery(metadataQueryBase) { // extract data from the metadataQuery passed to the call
var baseParametersDict = {}
var divByZeroResponse = 0
//Returns the column set for this aggregation
//it will give primary aggregation column like TRx,NRx
const baseMetric = primaryAggr.column()
//returns data source configured
var baseDataSource = metadataQueryBase.dataSource()
// get the "dynamicArgs" map from entity-configuration
var dynamicArgs = addonProps?.["defaultFilter"]?.["dynamicArgs"];
var numDataSource = dynamicArgs['numerator']['datasource'] ? dynamicArgs['numerator']['datasource'] : baseDataSource
var numExceptDimList = dynamicArgs['numerator']['except_dimension'] ? dynamicArgs['numerator']['except_dimension'] : []
var denDataSource = dynamicArgs['denominator']['datasource'] ? dynamicArgs['denominator']['datasource'] : baseDataSource
var denExceptDimList = dynamicArgs['denominator']['except_dimension'] ? dynamicArgs['denominator']['except_dimension'] : []
var numMetricName = dynamicArgs['numerator']['metricName']
var denMetricName = dynamicArgs['denominator']['metricName']
var metric = outputMetricName // outputMetricName : inbuilt variable
var denTimestamp = dynamicArgs['denominator']['timestamp']
var numTimestamp = dynamicArgs['numerator']['timestamp']
var cumulativeFlag = dynamicArgs['cumulative_flag']
//returned list is copy of intervals set for the query and any modifications to the list will not reflect in the query
var metaInterval = metadataQueryBase.intervals()
var arithmeticOperation = dynamicArgs['arithmeticOperation'] != undefined ? dynamicArgs['arithmeticOperation'] : 'percent'
baseParametersDict['baseDataSource'] = baseDataSource
baseParametersDict['dynamicArgs'] = dynamicArgs
baseParametersDict['metric'] = metric
baseParametersDict['metaInterval'] = metaInterval
baseParametersDict['arithmeticOperation'] = arithmeticOperation
baseParametersDict['baseMetric'] = baseMetric
baseParametersDict['numDataSource'] = numDataSource
baseParametersDict['denDataSource'] = denDataSource
baseParametersDict['numMetricName'] = numMetricName
baseParametersDict['denMetricName'] = denMetricName
baseParametersDict['divByZeroResponse'] = divByZeroResponse
baseParametersDict['numExceptDimList'] = numExceptDimList
baseParametersDict['denExceptDimList'] = denExceptDimList
baseParametersDict['denTimestamp'] = denTimestamp
baseParametersDict['numTimestamp'] = numTimestamp
baseParametersDict['cumulativeFlag'] = cumulativeFlag
//log.info("------- baseParametersDict {}", baseParametersDict)
return baseParametersDict
}
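/* For illustration, with the sample configuration shown at the end of this script
(and assuming the base query runs against fct_calls with primary metric "TRx"),
extractFromMetadataQuery would return roughly:
{
  baseDataSource: "fct_calls", numDataSource: "fct_calls", denDataSource: "call_plan",
  numMetricName: "Y_calls_temp", denMetricName: "total_customer",
  numExceptDimList: ["market"], denExceptDimList: ["Product Name"],
  numTimestamp: undefined, denTimestamp: ["2017-01-01T00:00:00/2045-12-15T23:59:59"],
  arithmeticOperation: "percent", divByZeroResponse: 0,
  metric: <outputMetricName>, baseMetric: "TRx", ...
}
*/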
function createJoinQuery(metadataQuery,
QRNum,
QRDen,
outputMetricName,
baseParamDict
) {
/**
* metadataQuery: the query passed to the call
* QRNum: numerator (first) query
* QRDen: denominator (second) query
* outputMetricName: name of the output metric
* baseParamDict: parameters dictionary
*/
//returns offset set in metadataQuery
var offset = metadataQuery.offset();
//returns limit set in metadataQuery
var limit = metadataQuery.limit();
//shallow Copy of sort orders configured
var order = metadataQuery.orderBy();
//name(String name)- name for metadata query, this helps logging and identification of query
//limit(Integer limit)- sets limit on number of records that would be sent back by query
//offset(Integer offset)- the number of first records with specified offset would be skipped
//clearOrderBy()- clears the order by clauses set on this query
QRNum.name("WhizCalArithmetic_NumeratorQuery").limit(null).offset(null).clearOrderBy();
QRDen.name("WhizCalArithmetic_DenominatorQuery").limit(null).offset(null).clearOrderBy();
var p1DS = DataSource.innerQuery("d1", QRNum);
var p2DS = DataSource.innerQuery("d2", QRDen);
//returns shallow Copy of dimensions configured
var joinColumns = metadataQuery.dimNames();
var p1CL = p1DS.column(baseParamDict.numMetricName);
var p2CL = p2DS.column(baseParamDict.denMetricName);
let aggrAction = getAggrOperation(p1CL, p2CL, baseParamDict);
if (Granularity.TYPE_ALL != metadataQuery.granularity().type() && baseParamDict.denTimestamp && baseParamDict.numTimestamp) {
throw new Error(
"granularity query not supported when numerator and denominator contains timestamp"
);
}
if (Granularity.TYPE_ALL != metadataQuery.granularity().type() && !baseParamDict.denTimestamp && !baseParamDict.numTimestamp) {//timestamp doesn't matter here, but Druid computes it in unexpected ways when joining, so include it explicitly in the join columns
joinColumns.push(TIMESTAMP_RESPONSE_COLUMN);
}
//creates a query with specified details
var joinQuery = MetadataQuery.create()
//Sets the model on which this query is to be run, model + dataSource combination decides the DB Table or Druid data source to be used to run the query
.model(metadataQuery.model())
//name for metadata query, this helps logging and identification of query
.name("JoinQuery")
//columns to be fetched from the database
.select(metadataQuery.dimNames())
if (baseParamDict.cumulativeFlag && Granularity.TYPE_ALL != metadataQuery.granularity().type()) {
//columns to be fetched from the database
joinQuery.select([p1CL, p2CL]);
}
else {
//columns to be fetched from the database
joinQuery.select([p1CL, p2CL, aggrAction]);
}
if (baseParamDict.numTimestamp && !baseParamDict.denTimestamp) {
//from(DBDataSource ds) - sets the primary data source of this query
joinQuery.from(p2DS)
//leftJoinOn(DBDataSource dataSource, List<String> joinColumns)- create left join on datasource with given list of column
joinQuery.leftJoinOn(p1DS, joinColumns)
}
else {
joinQuery.from(p1DS)
joinQuery.leftJoinOn(p2DS, joinColumns)
}
//offset(Integer offset)- the number of first records with specified offset would be skipped
//limit(Integer limit)- sets limit on number of records that would be sent back by query
joinQuery.offset(offset).limit(limit)
//orderBy(List sortData)- convenience method calling orderBy(String) or orderBy(SortData) depending on type of each element
joinQuery.orderBy(order);
return joinQuery.druidCompatibilityMode(true);
}
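/* Conceptually (a sketch, not the literal query the framework generates), the join
query built above is equivalent to:
  SELECT <dimensions>, d1.<numMetric>, d2.<denMetric>, <arithmetic post-aggregation>
  FROM (numerator query) d1
  LEFT JOIN (denominator query) d2 ON <dimensions [+ timestamp]>
  ORDER BY ... LIMIT ... OFFSET ...
with the sides flipped (d2 LEFT JOIN d1) when only the numerator carries a fixed
timestamp interval. */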
function getAggrOperation(numeratorAggr, denominatorAggr, paramDict) { // create aggregation based on the arithmeticOperation defined in entityConfig
//calling the function from library script
return WhizCalLib.identifyOperation(paramDict.arithmeticOperation, paramDict.metric, numeratorAggr, denominatorAggr, paramDict.divByZeroResponse);
}
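/* For illustration only: identifyOperation is assumed to build the post-aggregation
for the configured operation, e.g. for 'percent' something along the lines of
100 * numeratorAggr / denominatorAggr, emitting divByZeroResponse (0 here) when the
denominator is 0. See WhizCalLib.js for the actual implementations. */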
function sameDSPostAggrDF(metadataQuery, paramDict) { // builds a post-aggregation and returns a DataFrame when the data source is the same
var sameDSMetadataQuery = metadataQuery
var num_object = paramDict['dynamicArgs']['numerator']
var den_object = paramDict['dynamicArgs']['denominator']
//clears the aggregations set on this query
sameDSMetadataQuery.clearAggrs()
//adds given aggregation/post-aggregation to the query
.aggrs(Aggregation.parseList([num_object.aggregation, den_object.aggregation]))
/* Replaces the given column in all applicable places; currently Dimensions, Aggregations, Filters, SortOrders;
might be extended to more known places. Specifically useful for replacing marker column patterns with the actual column.
The first parameter must match the column name supplied in the configuration's numerator aggregation.
*/
.replaceColumn('__baseMetric', primaryAggr.column());
var numAggr = sameDSMetadataQuery.aggrs()[0]
var denAggr = sameDSMetadataQuery.aggrs()[1]
if (paramDict.cumulativeFlag && Granularity.TYPE_ALL != metadataQuery.granularity().type()) {
//name for metadata query, this helps logging and identification of query
sameDSMetadataQuery.name("sameDSMetadataQuery")
}
else {
var postAggrAction = getAggrOperation(numAggr, denAggr, paramDict)
sameDSMetadataQuery.postAggr(postAggrAction)
//replaceOrderingColumn(String currentOrderingColum,String replacementColum)-Replaces any ordering clauses in the query with given column with ordering on new column
sameDSMetadataQuery.replaceOrderingColumn(paramDict.baseMetric, paramDict.metric).name("sameDSMetadataQuery")
}
//removes all contents referring to each column present in the list from all places applicable
sameDSMetadataQuery.removeColumns(paramDict.numExceptDimList).removeColumns(paramDict.denExceptDimList)
//name for metadata query, this helps logging and identification of query
sameDSMetadataQuery.name("WhizCalArithmetic_Query");
//Fires the MetadataQuery passed, returns DataFrame representing the response of the query
var sd_response_DF = dataAccessManager.fireQuery(sameDSMetadataQuery);
return sd_response_DF
}
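/* Sketch of the same-data-source path (hypothetical aggregation names): the original
query is reused with its aggregations replaced by the two configured ones and, unless
the cumulative path is taken, an arithmetic post-aggregation on top, e.g.
  aggrs    : [ num_value, den_value ]            // parsed from the entity configuration
  postAggr : percent(num_value, den_value) AS <outputMetricName>
so a single query returns the final metric without any join. */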
function buildQuery(metaCopy, aggrconf, params) {
var timestampconf = aggrconf.timestamp;
//sets the data source for the query
metaCopy.dataSource(aggrconf.datasource ? aggrconf.datasource : params.baseDataSource)
//clears the aggregations set on this query
.clearAggrs()
/* Replaces the given column in all applicable places; currently Dimensions, Aggregations, Filters, SortOrders;
might be extended to more known places. Specifically useful for replacing marker column patterns with the actual column.
The first parameter must match the column name supplied in the configuration's aggregation.
*/
.replaceColumn('__baseMetric', primaryAggr.column())
//name for metadata query, this helps logging and identification of query
.name("FinalinnerMetadatQuery")
//adds given aggregation/post-aggregation to the query
.aggr(Aggregation.parse(aggrconf.aggregation))
//removes all contents referring to each column present in the list from all places applicable
.removeColumns(aggrconf.except_dimension)
if (timestampconf) {
//clearIntervals()- clears the intervals set on this query
//intervals(List<String> isoIntervals)- sets given intervals in ISO format as primary time filtering criteria. It's mandatory in druid to provide at least one ISO interval for aggregation queries
metaCopy.clearIntervals().intervals(timestampconf);
//granularity(Granularity granularity)- sets given granularity into the query
metaCopy.granularity(Granularity.all())
metaCopy.addonProperties(null);
}
//log.info("updated metadataQuery : {} ", metaCopy);
return metaCopy;
}
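/* For example, with the sample denominator configuration at the end of this script,
buildQuery would point the copied query at "call_plan", replace its aggregations with
countDistinct("Customer ID") as total_customer, drop the "Product Name" dimension,
and, because a timestamp is configured, override the intervals with
2017-01-01/2045-12-15 and force Granularity.all(). */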
function executeComputation(metadataQuery) {
/******************extract required common info************************* */
var baseParamDict = extractFromMetadataQuery(metadataQuery)
/******************Operation on Numerator and Denominator************************* */
//creates deep copy from existing query
var metaCopy_num = metadataQuery.copy();
var metaCopy_den = metadataQuery.copy();
var innerJoinDF;
//names of dimensions configured.
var joinColumns = metadataQuery.dimNames();
if (Granularity.TYPE_ALL != metadataQuery.granularity().type() && !baseParamDict.denTimestamp && !baseParamDict.numTimestamp) {//timestamp doesn't matter here, but Druid computes it in unexpected ways when joining, so include it explicitly in the join columns
joinColumns.push(TIMESTAMP_RESPONSE_COLUMN);
}
if (baseParamDict.numDataSource != baseParamDict.denDataSource) {
log.info("-------Datasources are different {}", baseParamDict.numDataSource + " <> " + baseParamDict.denDataSource);
/******************evaluation of Numerator part************************* */
var num_object = baseParamDict['dynamicArgs']['numerator']
//log.info("num_object--" + metaCopy_num)
var QRNum = buildQuery(metaCopy_num, num_object, baseParamDict)
/******************evaluation of Denominator part************************* */
var den_object = baseParamDict['dynamicArgs']['denominator']
var QRDen = buildQuery(metaCopy_den, den_object, baseParamDict)
/******************Operation on DataFrames************************* */
var innerJoinQuery = createJoinQuery(metadataQuery,
QRNum,
QRDen,
outputMetricName,
baseParamDict
)
//replaceOrderingColumn(String currentOrderingColum,String replacementColum)-Replaces any ordering clauses in the query with given column with ordering on new column
innerJoinQuery.replaceOrderingColumn(baseParamDict.baseMetric, outputMetricName)
//log.info("innerJoinQuery-- : {} ", innerJoinQuery);
//name for metadata query, this helps logging and identification of query
innerJoinQuery.name("WhizCalArithmetic_JoinQuery");
//fires the MetadataQuery passed, returns DataFrame representing the response of the query
innerJoinDF = dataAccessManager.fireQuery(innerJoinQuery);
} else {
log.info("-------DataSources are same {}", baseParamDict.numDataSource + " == " + baseParamDict.denDataSource);
innerJoinDF = sameDSPostAggrDF(metadataQuery, baseParamDict);
//var joinColumns = [];
}
if (baseParamDict.cumulativeFlag && Granularity.TYPE_ALL != metadataQuery.granularity().type()) {
let aggrAction = getAggrOperation(baseParamDict.numMetricName, baseParamDict.denMetricName, baseParamDict);
const aggregateColumns = new Set(joinColumns)
aggregateColumns.delete(TIMESTAMP_RESPONSE_COLUMN)
innerJoinDF = cumulativeCalculation(innerJoinDF, aggregateColumns, baseParamDict.numMetricName);
//adds column with given name and empty metadata
innerJoinDF.addColumn(aggrAction);
}
return innerJoinDF;
}
return executeComputation(metadataQuery);
})();
/*
** Configuration to be set in the calculation metric **
default_filter : in this section's dynamicArgs sub-section,
1. numerator - used to obtain the first value
datasource: name of the data source
metricName: name of the first metric value
except_dimension: list of dimensions to be removed from the metadataQuery
timestamp (optional): list of ISO intervals that overrides the query intervals for this value (forces granularity to all)
aggregation: the aggregation used to calculate the first value
i) name - name of the aggregation
ii) type - the type of calculation to perform
iii) column - the column on which the given type of operation is performed
2. denominator - used to obtain the second value
datasource: name of the data source
metricName: name of the second metric value
except_dimension: list of dimensions to be removed from the metadataQuery
timestamp (optional): list of ISO intervals that overrides the query intervals for this value (forces granularity to all)
aggregation: the aggregation used to calculate the second value
i) name - name of the aggregation
ii) type - the type of calculation to perform
iii) column - the column on which the given type of operation is performed
3. arithmeticOperation - the operation to be performed between the obtained values,
e.g. division, addition, subtraction, multiplication, percent, percentChange
**Copy and paste the configuration below to perform the given arithmetic operation.**
*/
"default_filter": {
"dynamicArgs": {
"numerator": {
"datasource": "fct_calls",
"metricName": "Y_calls_temp",
"aggregation": {
"name": "Y_calls_temp",
"type": "countDistinct",
"column": "Customer ID",
"druidThetaSketchSize": 1024
},
"except_dimension": [
"market"
]
},
"denominator": {
"timestamp": [
"2017-01-01T00:00:00/2045-12-15T23:59:59"
],
"datasource": "call_plan",
"metricName": "total_customer",
"aggregation": {
"name": "total_customer",
"type": "countDistinct",
"column": "Customer ID",
"druidThetaSketchSize": 1024
},
"except_dimension": [
"Product Name"
]
},
"arithmeticOperation": "percent"
}
}
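The script also honors a cumulative_flag and the __baseMetric marker column (replaced at runtime with the primary aggregation column via replaceColumn). A hypothetical same-data-source variant using both, with illustrative metric names, might look like:
"default_filter": {
  "dynamicArgs": {
    "cumulative_flag": true,
    "numerator": {
      "metricName": "num_value",
      "aggregation": {
        "name": "num_value",
        "type": "sum",
        "column": "__baseMetric"
      }
    },
    "denominator": {
      "metricName": "den_value",
      "aggregation": {
        "name": "den_value",
        "type": "sum",
        "column": "__baseMetric"
      }
    },
    "arithmeticOperation": "division"
  }
}
Omitting the datasource key on either side falls back to the base query's data source, and omitting except_dimension defaults to an empty list.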