Create Predicted Eventlog

This article describes how to install, configure and use the eventlog prediction. The prediction creates a new model that contains the source model data and the predictions. It can predict case attributes for the generated new cases and event attributes for the predicted events. To distinguish the real (source data) and predicted events and cases, there are the following attributes in the model:
* Event attribute '''Predicted''' denotes whether the event is from the source data (''false'') or predicted (''true'').
* Case attribute '''Generated''' denotes whether the case is in the source data (''false'') or whether the prediction generated it as a new case (''true'').
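For example, analyses can be limited to the predicted data by filtering with these attributes. Below is a sketch of a filter that includes only the cases containing predicted events, using the same filter JSON structure as the examples later in this article (the stringified form of the boolean value is an assumption):
<pre>
#{
  "Items": [
    #{
      "Type": "IncludeCases",
      "Items": [
        #{
          "Type": "EventAttributeValue",
          "Attribute": "Predicted",
          "StringifiedValues": ["True"]
        }
      ]
    }
  ]
}
</pre>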


== Prerequisites for prediction ==
The following prerequisites need to be fulfilled to run the eventlog prediction:
* QPR ProcessAnalyzer 2024.2 or later in use
* Snowflake connection is configured
* Source models are stored to Snowflake

== Install prediction to Snowflake ==
To install the eventlog prediction to Snowflake:
# Go to Snowflake, and create a Snowflake-managed stage with name '''PREDICTION''' in the same schema that is configured for QPR ProcessAnalyzer (in the Snowflake connection string). Use the settings in the following image: [[File:Create_Snowflake_stage.png]]
# Open the created stage and upload the '''predict.pyz''' file into the stage (ask for the file from your QPR representative).
# Create the following procedure in the same schema:
<pre>
CREATE OR REPLACE PROCEDURE QPRPA_SP_PREDICTION("CONFIGURATION" OBJECT)
RETURNS OBJECT
LANGUAGE PYTHON
STRICT
RUNTIME_VERSION = '3.8'
PACKAGES = ('nltk','numpy','pandas==1.5.3','scikit-learn','snowflake-snowpark-python','tensorflow','dill','prophet','holidays==0.18','python-kubernetes','docker-py')
HANDLER = 'main'
EXECUTE AS OWNER
AS '
import sys
def main(session, parameters_in: dict) -> dict:
    session.file.get(''@decision_intelligence/predict.pyz'', ''/tmp'')
    sys.path.append(''/tmp/predict.pyz'')
    import predict
    return predict.main(session, parameters_in)
';
</pre>
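
Before continuing, you can make a quick sanity check in Snowflake to verify the installation (assuming the stage and the procedure were created with the names used above, in the schema used by QPR ProcessAnalyzer):
<pre>
-- Check that the uploaded prediction package is in the stage:
LIST @PREDICTION;

-- Check that the stored procedure exists:
SHOW PROCEDURES LIKE 'QPRPA_SP_PREDICTION';
</pre>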

== Create prediction script in QPR ProcessAnalyzer ==
1. Go to QPR ProcessAnalyzer and create the following expression script with name '''ML library''':
<pre>
/**
* Name of the stored procedure in Snowflake used for ML model training and related event generation.
*/
let predictionProcedureName = "qprpa_sp_prediction";

function ModifyColumnTypes(dataTable, columnTypesToRestore)
{
  let currentColumnTypes = dataTable.ColumnTypes;
  let currentColumnTypesDict = #{};
  currentColumnTypes.{
    let ct = _;
    currentColumnTypesDict.Set(ct.Name, ct.DataType);
  };
 
  let columnsToModify = [];
 
  columnTypesToRestore.{
    let ct = _;
    let dataTypeToRestore = ct.DataType;
    let currentDataType = currentColumnTypesDict.TryGetValue(ct.Name);
    if (!IsNullTop(currentDataType) && currentDataType != dataTypeToRestore) {
      columnsToModify = Concat(columnsToModify, [ct]);
    }
  };
 
  dataTable
    .AddColumns(columnsToModify.{ let ct = _; #{"Name": `__Restore_${ct.Name}`, "DataType": ct.DataType}});
  columnsToModify.{
    let ct = _;
    dataTable
      .UpdateRows(null, `__Restore_${ct.Name}`, Cast(Column(#expr{ct.Name}), #expr{ct.DataType}));
  };
 
  dataTable
    .RemoveColumns(columnsToModify.Name)
    .RenameColumns(columnsToModify.{ let ct = _; (ct.Name : `__Restore_${ct.Name}`)});
  dataTable.Synchronize();
}
/**
* @name RunFunctionWithParallelLogging
* @description
* Runs given function that generates logging information into given data table in a way that all the logging will be included into the
* generated script run log as well.
* @param logTable
* A DataTable having two columns:
* - Datetime-column for the log message's timestamp.
* - Log message column.
* @param callbackFunc
* Function that uses given data table for logging its current status.
* @returns
* The result of the callback function.
*/
function RunFunctionWithParallelLogging(logTable, callbackFunc)
{
  let state = #{};

  logTable.Truncate();

  _system.Parallel.Run([
    () => {
      try {
        state.Set("Result", callbackFunc());
      }
      finally {
        state.Set("Finished", true);
      }
    },
    () => {
      let readRows = 0;
      function UpdateLog()
      {
        try {
          let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect();
          readRows = readRows + rowsDf.NRows;
          rowsDf.Rows.{
            let row = _;
            WriteLog(`${row[0]} \t${row[1]}`);
          }
        }
        catch (ex) {
          if (ex.InnerExceptions?[0]?.Type == "UnauthorizedAccess") {
            WriteLog(`Log data table has disappeared. The generated log will not be complete!`);
            break;
          }
          WriteLog(`Log has not yet been created.`);
        }
      }

      while (true) {
        UpdateLog();
        let finished = state.TryGetValue("Finished");
        if (finished == true) {
          UpdateLog();
          WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`);
          break;
        }
        Sleep(5000);
      }
    }
  ]);
  return state.TryGetValue("Result");
}

/**
* @name Train
* @description
* Train a ML model using given parameters and log table.
* @param params
* String dictionary containing the training configuration.
* @param logTable
* DataTable to be used for logging.
* @returns
* Result returned by the ML model training.
*/
function Train(params, logTable)
{
  function TrainCallback()
  {
    WriteLog("Starting training...");

    let connection = logTable.DataSourceConnection;
    let finalParams = params.Clone();
    finalParams.Set("_typed", params["_typed"].Extend(#{
      "log_table": logTable
    }));
    let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});

    WriteLog("Finished training...")
    return result;
  }
  return RunFunctionWithParallelLogging(logTable, TrainCallback);
}

/**
* @name Generate
* @description
* Generate new events using a trained ML model and log table.
* @param params
* String dictionary containing the training configuration.
* @param logTable
* DataTable to be used for logging.
* @returns
* Result returned by the generation.
*/
function Generate(params, logTable)
{
  function GenerateCallback()
  {
    WriteLog("Starting generation...")

    let connection = logTable.DataSourceConnection;

    let typedParams = params["_typed"];
    let resultEventsTable = typedParams.TryGetValue("target_event_data_table");
    let resultCasesTable = typedParams.TryGetValue("target_case_data_table");

    if (params["overwrite"]) {
      resultEventsTable.Truncate();
      resultCasesTable?.Truncate();
    }

    let finalParams = params.Clone();
    finalParams.Set("_typed", typedParams.Extend(#{
      "log_table": logTable
    }));

    let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});

    resultEventsTable.Synchronize();
    resultCasesTable?.Synchronize();

    WriteLog("Finished generation...")
    return result;
  }

  return RunFunctionWithParallelLogging(logTable, GenerateCallback);
}

/**
* @name GetSampledEvents
* @description
* Returns a SqlDataFrame containing sampled events of given model where given filter is first applied.
* @param sourceModel
* ProcessAnalyzer model object of the model whose event data is to be filtered and sampled.
* @param sampledCaseCount
* The maximum number of cases to return (or null if all cases should be returned).
* @param filter
* JSON filter to be applied on the event data of the source model prior to performing the sampling.
* @returns
* SqlDataFrame containing sampled events of given model where given filter is first applied.
*/
function GetSampledEvents(sourceModel, sampledCaseCount, filter)
{
  if (IsNull(sampledCaseCount) || sampledCaseCount < 1)
    return sourceModel.EventsDataTable.SqlDataFrame;
  let sampleFilterRule = sampledCaseCount == null
    ? null
    : #{
        "Type":"IncludeCases",
        "Items":[#{
          "Type": "SqlExpressionValue",
          "Configuration": #{
            "Root": `Cases.WithColumn("_Random", Rand()).OrderByColumns(["_Random"], [true]).Head(${sampledCaseCount})`
          }
        }]
      };
  let filterRules = filter?["Items"];
  if (!IsNullTop(filterRules)) {
    if (!IsNullTop(sampleFilterRule))
      filterRules = Concat(filterRules, sampleFilterRule);
  }
  else {
    if (!IsNullTop(sampleFilterRule))
      filterRules = [sampleFilterRule];
    else
      filterRules = [];
  }

  let finalFilter = #{
    "Items":filterRules
  };
  sourceModel.CacheTableSqlDataFrame(finalFilter);
}

/**
* @name TrainMLModelForModel
* @description
* Generates a new ML model based on the data found in given source model.
* @param modelName
* Name of the ProcessAnalyzer model to create. The ML model will be named using the template: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel".
* @param sourceModel
* ProcessAnalyzer model object of the model which is to be trained to the ML model.
* @param targetProject
* Project in whose context the ML model is to be trained.
* @param trainingConfiguration
* String dictionary containing the training configuration.
* @param recreatePredictionModel
* Should a prediction model be overwritten if one already exists for this source model and target model name combination?
* @param trainingDataFilter
* Filter JSON applied on top of all the source model events to find all the events to be used for training.
* @param logTable
* DataTable to be used for logging.
* @param trainingCaseSampleSize
* Maximum number of cases to use from the source model (random sampled).
* @returns
* Result returned by the ML model training.
*/
function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize)
{
  let sanitizedModelName = `${sourceModel.Id}-${modelName}`;

  let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([sourceModel.EventsDataTable.ColumnMappings["CaseId"]]).NRows;
  let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter);
  let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([trainEventDataSdf.ColumnMappings["CaseId"]]).NRows;
  WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`)

  let trainCaseDataSdf = sourceModel.CasesDataTable?.SqlDataFrame;

  let columnMappings = trainEventDataSdf.ColumnMappings;
  if (trainCaseDataSdf != null) {
    columnMappings = columnMappings.Extend(#{"Case_CaseId": trainCaseDataSdf.ColumnMappings["CaseId"]});
  }

  trainingConfiguration = trainingConfiguration.Extend(#{
    "_typed": #{
      "event_data": trainEventDataSdf,
      "project": targetProject
    },
    "model_name": sanitizedModelName,
    "column_mappings": columnMappings,
    "overwrite_model": recreatePredictionModel,
    "case_start_time_probability_multiplier": originalCaseCount / actualTrainingCaseSampleSize
  });

  if (trainCaseDataSdf != null) {
    trainingConfiguration["_typed"].Set("case_data", trainCaseDataSdf);
  }

  Train(trainingConfiguration, logTable);
}

/**
* @name CreateTargetTables
* @description
* Creates or finds the target events and cases data tables for the generated model, truncates them, and returns them
* together with the possibly pre-existing target model and the sanitized ML model name.
*/
function CreateTargetTables(modelName, sourceModel, targetProject)
{
  function GetTable(conn, tableName)
  {
    let tableConfiguration = #{
      "Name": tableName,
      "Type": "Snowflake",
      "Connection": conn
    };
    let resultTable = targetProject.DataTableByName(tableName);
    if (resultTable == null)
    {
      resultTable = targetProject.CreateDataTable(tableConfiguration)
        .Modify(#{"NameInDataSource": null})
        .Synchronize();
    }
    return resultTable;
  }

  let sanitizedModelName = `${sourceModel.Id}-${modelName}`;
  let eventsTableName = `${modelName} - events`;
  let casesTableName = `${modelName} - cases`;
  let targetModel = targetProject.ModelByName(modelName);
  let eventsTable, casesTable = null;

  if (targetModel != null)
  {
    eventsTable = targetModel.EventsDataTable;
    casesTable = targetModel.CasesDataTable;
  }
  else {
    let conn = sourceModel.EventsDataTable.DataSourceConnection;
    eventsTable = GetTable(conn, eventsTableName);
    if (sourceModel.CasesDataTable != null) {
      casesTable = GetTable(conn, casesTableName);
    }
  }

  eventsTable.Truncate();
  casesTable?.Truncate();

  return #{
    "TargetModel": targetModel,
    "Events": eventsTable,
    "Cases": casesTable,
    "ModelName": sanitizedModelName
  };
}

/**
* @name CreateResultModel
* @description
* Creates the resulting ProcessAnalyzer model whose events (and optionally cases) are read from the generated data tables.
*/
function CreateResultModel(targetProject, modelName, eventColumnMappings, caseColumnMappings)
{
  WriteLog(`Creating model...`);
 
  let eventsTableName = `${modelName} - events`;
  let casesTableName = `${modelName} - cases`;
 
  let timestampMapping = eventColumnMappings["TimeStamp"];
  eventColumnMappings.Remove("TimeStamp");
  eventColumnMappings.Set("Timestamp", timestampMapping);
 
  let modelConfiguration = #{
    "DataSource": #{
      "Events":#{
        "DataSourceType": "datatable",
        "DataTableName": eventsTableName,
        "Columns": eventColumnMappings
      }
    }
  };
 
  if (caseColumnMappings != null) {
    modelConfiguration["DataSource"].Set("Cases", #{
      "DataSourceType": "datatable",
      "DataTableName": casesTableName,
      "Columns": caseColumnMappings
    });
  }
 
  targetProject.CreateModel(#{"Name": modelName, "Configuration": modelConfiguration});
}
 
/**
* @name GenerateNewModel
* @description
* Generates a new ProcessAnalyzer model using already trained ML model. Generation consists of the following phases:
* - Generate new cases with new events.
* - Predict new events for incomplete cases.
* - Append original source model data.
* - Create ProcessAnalyzer model if it does not already exist.
* @param modelName
* Name of the ProcessAnalyzer model to create. The used ML model must have name: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel".
* @param sourceModel
* ProcessAnalyzer model object of the model based on which the new model will be created.
* @param targetProject
* Project into which the new ProcessAnalyzer model is to be created.
* @param generationConfiguration
* String dictionary containing the generation configuration.
* @param incompleteCasesFilter
* Filter JSON applied on top of all the source model events to find all the events belonging to incomplete cases.
* @param logTable
* DataTable to be used for logging.
* @returns
* ProcessAnalyzer model object containing the results of the generation.
*/
function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];
  let sanitizedModelName = tablesDict["ModelName"];
 
  let generationParams, result, overwrite = true;

  if (generationConfiguration.TryGetValue("cases_to_generate") != 0) {
    WriteLog(`Generating new cases with new events...`);

    generationParams = generationConfiguration.Extend(#{
      "_typed": #{
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": overwrite
    });
    if (casesTable != null) {
      generationParams["_typed"].Set("target_case_data_table", casesTable);
    }

    result = Generate(generationParams, logTable);

    WriteLog(`Generation results:\r\n${result}`);

    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"]
    overwrite = false;
  }

  if (incompleteCasesFilter != null) {
    WriteLog(`Generating new events for running cases...`);
    let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);

    generationParams = generationConfiguration.Extend(#{
      "_typed": #{
        "event_data": incompleteCasesSqlDataFrame,
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": overwrite,
      "temperature": 0
    });
    if (casesTable != null) {
      generationParams["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame);
    }

    result = Generate(generationParams, logTable);
    overwrite = false;
    WriteLog(`Generation results:\r\n${result}`);

    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"]
  }

  if (!("Predicted".In(eventsTable.ColumnTypes.Name)))
    eventsTable
      .AddColumn("Predicted", "Boolean");

  eventsTable
    .DeleteRows(
      Column(result["column_mappings"]["events"]["EventType"]) == "__FINISH__"
    )
    .UpdateRows(
      1==1,
      "Predicted", true
    );

  if (casesTable != null) {
    if (!("Generated".In(casesTable.ColumnTypes.Name)))
      casesTable
        .AddColumn("Generated", "Boolean");

    casesTable
      .UpdateRows(
        1==1,
        "Generated", true
      );
  }

  WriteLog(`Appending original model data...`);
  eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true});
  if (casesTable != null)
    casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});

  eventsTable
    .UpdateRows(
      Column("Predicted") == null,
      "Predicted", false
    );

  casesTable
    ?.UpdateRows(
      Column("Generated") == null,
      "Generated", false
    );

  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
  if (casesTable != null)
    ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
 
  if (targetModel != null)
     return targetModel;
 
  CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null);
}
 
/**
* @name GeneratePredictionModel
* @description
* Generates a ProcessAnalyzer model by first training a machine learning model and then using that to generate new cases and events.
* @param configuration
* A string dictionary configuring the model prediction process. Supported keys are:
*  "Name": Name of the PA model to generate to the target project.
*  "SourceModel": Snowflake-based PA model used for training the prediction model.
*  "TargetProject": Target project to create the model into.
*  "TrainingConfiguration": Training parameters as string dictionary.
*  "GenerationConfiguration": Event generation parameters as string dictionary.
*  "TrainingDataFilter": Filter JSON for events to be used for training.
*  "IncompleteCasesFilter": Filter JSON for events belonging to unfinished cases for which new events are to be predicted.
*  "RecreatePredictionModel": Should a prediction model be overwritten if one already exists for this source model and target model name combination?
*  "TrainingCaseSampleSize": Maximum number of cases to use from the source model (random sampled).
*/
function GeneratePredictionModel(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let trainingConfiguration = configuration["TrainingConfiguration"];
  let generationConfiguration = configuration["GenerationConfiguration"];
  let trainingDataFilter = configuration["TrainingDataFilter"];
  let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
  let recreatePredictionModel = configuration["RecreatePredictionModel"];
  let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];
 
  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let conn = sourceModel.EventsDataTable.DataSourceConnection;
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn});
  try {
    WriteLog(`Training a prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id})`);
    result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
    WriteLog(`Model training completed. Training result:\r\n${result}`);
    result = ParseJson(result);


    if (generationConfiguration != null) {
      WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName}`);
      let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable);

      WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
      result.Set("Model", newModel);
    }
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
  return result;
}

/**
* @name TransformModel
* @description
* Creates a new ProcessAnalyzer model by applying the given transformations to the source model data.
*/
function TransformModel(modelName, sourceModel, targetProject, configurations, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];
  let eventColumnMappings = sourceModel.EventsDataTable.ColumnMappings;
  let caseColumnMappings = sourceModel.CasesDataTable?.ColumnMappings;
  let result;
  WriteLog(`Performing transformations...`);
  let configuration = #{
    "_typed": #{
      "event_data": sourceModel.EventsDataTable.SqlDataFrame, 
      "project": targetProject,
      "target_event_data_table": eventsTable
    },
    "column_mappings": eventColumnMappings,
    "overwrite": true,
    "transformations": configurations
  };
  if (casesTable != null) {
    configuration["_typed"].Set("target_case_data_table", casesTable);
  }
  result = Generate(configuration, logTable);
  WriteLog(`Transformation results:\r\n${result}`);
  result = ParseJson(result);
  if (result["result"] != "success")
    throw result["exception"]


  if (casesTable != null) {
    WriteLog(`Importing case data...`);
    casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});
    ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
  }

  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);

  if (targetModel != null)
    return targetModel;

  CreateResultModel(targetProject, modelName, eventColumnMappings, casesTable != null ? caseColumnMappings : null);
}

/**
* @name ApplyTransformations
* @description
* Applies the transformations defined in the given configuration ("Name", "SourceModel", "TargetProject", "Transformations")
* and returns the transformed model.
*/
function ApplyTransformations(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let transformationConfigurations = configuration["Transformations"];
  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake"});
 
  try {
    WriteLog(`Applying transformations on model:\r\n${sourceModel.Name} (id=${sourceModel.Id})`);
    let newModel = TransformModel(modelName, sourceModel, targetProject, transformationConfigurations, logTable);
    WriteLog(`Model transformations completed. New model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
     return newModel;
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
}
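
/**
* @name GenerateNewModelWithNewCaseAttributes
* @description
* Like GenerateNewModel, but generates predicted case attribute values for the incomplete cases (columns named Predicted_<attribute>)
* and marks the cases having predictions using the Predicted case attribute.
*/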
function GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];
  let sanitizedModelName = tablesDict["ModelName"];
  let generationParams, result, overwrite = true;
  WriteLog(`Generating case attribute values for running cases...`);
  let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame;
  if (incompleteCasesFilter != null)
    incompleteCasesSqlDataFrame = incompleteCasesSqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);
  generationParams = generationConfiguration.Extend(#{
    "_typed": #{
      "event_data": incompleteCasesSqlDataFrame,
      "case_data": sourceModel.CasesDataTable.SqlDataFrame,
      "target_case_data_table": casesTable,
      "project": targetProject
    },
    "model_name": sanitizedModelName,
    "overwrite": overwrite,
    "temperature": 0
  });
  result = Generate(generationParams, logTable);
  overwrite = false;
  WriteLog(`Generation results:\r\n${result}`);
  result = ParseJson(result);
  if (result["result"] != "success")
    throw result["exception"]
  if (!("Predicted".In(casesTable.ColumnTypes.Name)))
    casesTable
      .AddColumn("Predicted", "Boolean");
  casesTable
    .UpdateRows(
      null,
      "Predicted", true
    );
  WriteLog(`Appending original model data...`);
  eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true}).Synchronize();
  attributesToGenerate.{
    let col = _;
    let newCol = `Predicted_${col}`;
    casesTable.AddColumn(newCol, "String");
    casesTable.UpdateRows(null, newCol, Column(#expr{col}));
  };
  let createdColumns = casesTable.ColumnTypes.Name;
  sourceModel.CasesDataTable.ColumnTypes.Name
    .Where(!_.In(createdColumns)).{
      let col = _;
      casesTable.AddColumn(col, "String");
    };
  casesTable.Merge(sourceModel.CasesDataTable.SqlDataFrame, sourceModel.CasesDataTable.ColumnMappings["CaseId"]);
  casesTable
    .UpdateRows(
      Column("Predicted") == null,
      "Predicted", false
    );


  WriteLog(`Restoring original column types...`);
  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
  ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
 
  if (targetModel != null)
    return targetModel;
 
  CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null);
}

/**
* @name GenerateCaseAttributePredictionModel
* @description
* Like GeneratePredictionModel, but trains and uses a model that predicts case attribute values
* (configured using output_case_attribute_groups in the training configuration).
*/
function GenerateCaseAttributePredictionModel(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let trainingConfiguration = configuration["TrainingConfiguration"];
  let generationConfiguration = configuration["GenerationConfiguration"];
  let trainingDataFilter = configuration["TrainingDataFilter"];
  let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
  let recreatePredictionModel = configuration["RecreatePredictionModel"];
  let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];

  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let conn = sourceModel.EventsDataTable.DataSourceConnection;
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn});
  try {
    let attributesToGenerate = trainingConfiguration["output_case_attribute_groups"][0]["attributes"];

    WriteLog(`Training a case attribute prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id}). Generated attributes: ${ToJson(attributesToGenerate)}`);
    result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
    WriteLog(`Model training completed. Training result:\r\n${result}`);
    result = ParseJson(result);


    if (generationConfiguration != null) {
      WriteLog(`Generating a new model named ${modelName} based on trained case attribute prediction model ${modelName}`);
      let newModel = GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable);


      WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
      result.Set("Model", newModel);
    }
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
  return result;
}

return #{
  "GeneratePredictionModel": GeneratePredictionModel,
  "GenerateCaseAttributePredictionModel": GenerateCaseAttributePredictionModel,
  "ApplyTransformations": ApplyTransformations,
  "ModifyColumnTypes": ModifyColumnTypes
};
</pre>


2. Create the following example expression script (e.g., with name '''Create prediction model'''):
<pre>
let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{});
let targetProject = Project;
lib.GeneratePredictionModel(#{
  "Name": "Predicted model",
  "SourceModel": ModelById(1),
  "TargetProject": targetProject,
  "RecreatePredictionModel": true,
  "TrainingConfiguration": #{
    "num_epochs_to_train": 500
  },
  "GenerationConfiguration": #{
    "cases_to_generate": 1000
  },
  "TrainingDataFilter": #{
    "Items": [
      #{
        "Type": "IncludeCases",
        "Items": [
          #{
            "Type": "EventAttributeValue",
            "Attribute": "Event type",
            "StringifiedValues": [
              "0Invoice Payment"
            ]
          }
        ]
      }
    ]
  },
   "TrainingCaseSampleSize": 1000,
  "IncompleteCasesFilter": #{
    "Items": [
      #{
        "Type": "ExcludeCases",
        "Items": [
          #{
            "Type": "EventAttributeValue",
            "Attribute": "Event type",
            "StringifiedValues": [
              "0Invoice Payment"
            ]
          }
        ]
      }
    ]
  }
});
</pre>
3. Configure prediction for the previously created script as instructed in the next chapter.
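
The ML library also exports the '''ApplyTransformations''' and '''GenerateCaseAttributePredictionModel''' functions. As a sketch, ApplyTransformations is called with the configuration keys read by the library code above ('''Name''', '''SourceModel''', '''TargetProject''' and '''Transformations'''); the contents of the Transformations list depend on the transformations supported by the predict.pyz package, so the empty list below is only a placeholder:
<pre>
let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{});
lib.ApplyTransformations(#{
  "Name": "Transformed model",
  "SourceModel": ModelById(1),
  "TargetProject": Project,
  "Transformations": []
});
</pre>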


== Configure prediction ==
The prediction script has the following settings in the GeneratePredictionModel call:
* '''Name''': Name of the QPR ProcessAnalyzer model that is created to the target project. The model will contain the source model content and the predictions.
* '''SourceModel''': Source model for which the prediction is made. The model can be selected, for example, by id with the ModelById function or by name with the ModelByName function.
* '''TargetProject''': Target project to create the new model into.
* '''RecreatePredictionModel''': When ''true'', a new ML model is trained when the script is run. When ''false'', the prediction is run using a possibly pre-existing ML model.
* '''TrainingConfiguration''': Training parameters.
** '''num_epochs_to_train''': How many times the whole training data is used in training. The more epochs there are, the better the model usually is, but the longer the training takes. The best performing model out of all the iterations is selected.
** '''max_num_case_clusters''': Maximum number of clusters to divide the case attribute values into. Default is 20.
* '''GenerationConfiguration''': Event generation parameters. When null, no generation is done. For example, the following parameters are supported:
** '''cases_to_generate''': Maximum number of cases to create. The number of created cases is further limited by the capabilities of the trained model and the ''case_generation_start_time'' and ''case_generation_end_time'' parameters.
** '''case_generation_start_time''': If defined, new cases are generated starting from this timestamp. If not defined, the latest case start timestamp in the training data is used. This parameter is given in ISO datetime format.
** '''case_generation_end_time''': If defined, the case generation stops when reaching this timestamp, and no cases are generated after it. This parameter is given in ISO datetime format.
** '''generate_debug_event_attributes''': If true, additional columns are added containing, e.g., probabilities of the selected activity and other activities.
** '''min_prediction_probability''': Minimum probability of any activity name in next activity prediction. If the probability of an activity is lower than this, it will never be picked. Default value is 0.01.
** '''temperature''': If 0, the predicted event type is always the most probable one. If 1, the next event type is randomly selected based on the probabilities of each event type. Values between 0 and 1 interpolate between these behaviors.
* '''TrainingDataFilter''': [[Filtering_in_QPR_ProcessAnalyzer_Queries|Filter]] to select the cases that are used to train the prediction model. This filter is needed to train the model using only the completed cases: incomplete cases should not be used for training, so that the model doesn't incorrectly learn that cases should end where the incomplete cases currently end.
* '''TrainingCaseSampleSize''': Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training; the greater the value, the more subtle phenomena the prediction can learn from the data.
* '''IncompleteCasesFilter''': Optional [[Filtering_in_QPR_ProcessAnalyzer_Queries|filter]] to select for which cases the prediction is made. To improve the performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore.
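
For example, the following GenerationConfiguration sketch (example values only, using the parameters described above) would generate at most 500 new cases within the given time range, selecting the next event types deterministically:
<pre>
"GenerationConfiguration": #{
  "cases_to_generate": 500,
  "case_generation_start_time": "2024-01-01T00:00:00",
  "case_generation_end_time": "2024-06-30T23:59:59",
  "temperature": 0,
  "min_prediction_probability": 0.05,
  "generate_debug_event_attributes": true
}
</pre>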
</pre>

Latest revision as of 13:41, 17 May 2024

This article has instructions how to install, configure and use eventlog predictions. The prediction creates a new model that contains the source model data and the predictions. It's able to predict case attributes for the generated new cases and event attributes for the predicted events. To distinguish the real (source data) and predicted events and cases, there are following attributes in the model:

  • Event attribute Predicted denotes whether the event is from the source data (false) or whether it's predicted (true).
  • Case attribute Generated denotes whether the case is in the source data (false) or whether the prediction generated it as a new case (true).


Prerequisites for prediction

Following prerequisites need to be fulfilled to run the eventlog prediction:

  • QPR ProcessAnalyzer 2024.2 or later in use
  • Snowflake connection is configured
  • Source models are stored to Snowflake

Install prediction to Snowflake

To install the eventlog prediction to Snowflake:

  1. Go to Snowflake, and create a Snowflake-managed stage with name PREDICTION to the same schema configured to QPR ProcessAnalyzer (in the Snowflake connection string). Use settings in the following image: Create Snowflake stage.png
  2. Open the created stage and upload the predict.pyz file into the stage (ask the file from your QPR representative).
  3. Create the following procedure to the same schema:
CREATE OR REPLACE PROCEDURE QPRPA_SP_PREDICTION("CONFIGURATION" OBJECT)
RETURNS OBJECT
LANGUAGE PYTHON
STRICT
RUNTIME_VERSION = '3.8'
PACKAGES = ('nltk','numpy','pandas==1.5.3','scikit-learn','snowflake-snowpark-python','tensorflow','dill','prophet','holidays==0.18','python-kubernetes','docker-py')
HANDLER = 'main'
EXECUTE AS OWNER
AS '
import sys
def main(session, parameters_in: dict) -> dict:
	session.file.get(''@decision_intelligence/predict.pyz'', ''/tmp'')
	sys.path.append(''/tmp/predict.pyz'')
	import predict
	return predict.main(session, parameters_in)
';

Create prediction script in QPR ProcessAnalyzer

1. Go to QPR ProcessAnalyzer and create the following expression script with name ML library:

/**
 * Name of the stored procedure in Snowflake used for ML model training and related event generation.
 */
let predictionProcedureName = "qprpa_sp_prediction";

function ModifyColumnTypes(dataTable, columnTypesToRestore)
{
  let currentColumnTypes = dataTable.ColumnTypes;
  let currentColumnTypesDict = #{};
  currentColumnTypes.{
    let ct = _;
    currentColumnTypesDict.Set(ct.Name, ct.DataType);
  };
  
  let columnsToModify = [];
  
  columnTypesToRestore.{
    let ct = _;
    let dataTypeToRestore = ct.DataType;
    let currentDataType = currentColumnTypesDict.TryGetValue(ct.Name);
    if (!IsNullTop(currentDataType) && currentDataType != dataTypeToRestore) {
      columnsToModify = Concat(columnsToModify, [ct]);
    }
  };
  
  dataTable
    .AddColumns(columnsToModify.{ let ct = _; #{"Name": `__Restore_${ct.Name}`, "DataType": ct.DataType}});

  columnsToModify.{
    let ct = _;
    dataTable
      .UpdateRows(null, `__Restore_${ct.Name}`, Cast(Column(#expr{ct.Name}), #expr{ct.DataType}));
  };
  
  dataTable
    .RemoveColumns(columnsToModify.Name)
    .RenameColumns(columnsToModify.{ let ct = _; (ct.Name : `__Restore_${ct.Name}`)});

  dataTable.Synchronize();
}

/**
 * @name RunFunctionWithParallelLogging
 * @descripion
 * Runs given function that generates logging information into given data table in a way that all the logging will be included into the
 * generated script run log as well.
 * @param logTable
 * A DataTable having two columns: 
 * - Datetime-column for the log message's timestamp.
 * - Log message column.
 * @param callbackFunc
 * Function that uses given data table for logging it's current status.
 * @returns
 * The result of the callback function.
 */
function RunFunctionWithParallelLogging(logTable, callbackFunc) 
{
  let state = #{};

  logTable.Truncate();

  _system.Parallel.Run([
    () => {
      try {
        state.Set("Result", callbackFunc());
      }
      finally {
        state.Set("Finished", true);
      }
    },
    () => {
      let readRows = 0;
      function UpdateLog()
      {
        try {
          let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect();
          readRows = readRows + rowsDf.NRows;
          rowsDf.Rows.{
            let row = _;
            WriteLog(`${row[0]} \t${row[1]}`);
          }
        }
        catch (ex) {
          if (ex.InnerExceptions?[0]?.Type == "UnauthorizedAccess") {
            WriteLog(`Log data table has disappeared. The generated log will not be complete!`);
            break;
          }
          WriteLog(`Log has not yet been created.`);
        }
      }

      while (true) {
        UpdateLog();
        let finished = state.TryGetValue("Finished");
        if (finished == true) {
          UpdateLog();
          WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`);
          break;
        }
        Sleep(5000);
      }
    }
  ]);
  return state.TryGetValue("Result");
}

/**
 * @name Train
 * @descripion
 * Train a ML model using given parameters and log table. 
 * @param params
 * String dictionary containing the training configuration.
 * @param logTable
 * DataTable to be used for logging.
 * @returns
 * Result returned by the ML model training.
 */
function Train(params, logTable)
{
  function TrainCallback()
  {
    WriteLog("Starting training...");

    let connection = logTable.DataSourceConnection;
    let finalParams = params.Clone();
    finalParams.Set("_typed", params["_typed"].Extend(#{
      "log_table": logTable
    }));
    let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});

    WriteLog("Finished training...")
    return result;
  }
  return RunFunctionWithParallelLogging(logTable, TrainCallback);
}

/**
 * @name Generate
 * @descripion
 * Generate new events using a trained ML model and log table.
 * @param params
 * String dictionary containing the training configuration.
 * @param logTable
 * DataTable to be used for logging.
 * @returns
 * Result returned by the generation.
 */
function Generate(params, logTable) 
{
  function GenerateCallback()
  {
    WriteLog("Starting generation...")

    let connection = logTable.DataSourceConnection;

    let typedParams = params["_typed"];
    let resultEventsTable = typedParams.TryGetValue("target_event_data_table");
    let resultCasesTable = typedParams.TryGetValue("target_case_data_table");
    
    if (params["overwrite"]) {
      resultEventsTable.Truncate();
      resultCasesTable?.Truncate();
    }

    let finalParams = params.Clone();
    finalParams.Set("_typed", typedParams.Extend(#{
      "log_table": logTable
    }));

    let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});

    resultEventsTable.Synchronize();
    resultCasesTable?.Synchronize();
    
    WriteLog("Finished generation...")
    return result;
  }

  return RunFunctionWithParallelLogging(logTable, GenerateCallback);
}

/**
 * @name GetSampledEvents
 * @descripion
 * Returns a SqlDataFrame containing sampled events of given model where given filter is first applied. 
 * @param sourceModel
 * ProcessAnalyzer model object of the model whose event data is to be filtered and sampled.
 * @param sampledCaseCount
 * The maximum number of cases to return (or null if all cases should be returned).
 * @param filter
 * JSON filter to be applied on the event data of the source model prior to performing the sampling.
 * @returns
 * SqlDataFrame containing sampled events of given model where given filter is first applied.
 */
function GetSampledEvents(sourceModel, sampledCaseCount, filter)
{
  if (IsNull(sampledCaseCount) || sampledCaseCount < 1)
    return sourceModel.EventsDataTable.SqlDataFrame;
  let sampleFilterRule = sampledCaseCount == null 
    ? null
    : #{
        "Type":"IncludeCases",
        "Items":[#{
          "Type": "SqlExpressionValue",
          "Configuration": #{
            "Root": `Cases.WithColumn("_Random", Rand()).OrderByColumns(["_Random"], [true]).Head(${sampledCaseCount})`
          }
        }]
      };
  let filterRules = filter?["Items"];
  if (!IsNullTop(filterRules)) {
    if (!IsNullTop(sampleFilterRule))
      filterRules = Concat(filterRules, sampleFilterRule);
  }
  else {
    if (!IsNullTop(sampleFilterRule))
      filterRules = [sampleFilterRule];
    else
      filterRules = [];
  }

  let finalFilter = #{
    "Items":filterRules
  };
  sourceModel.CacheTableSqlDataFrame(finalFilter);
}

/**
 * @name TrainMLModelForModel
 * @descripion
 * Generates a new ML model based on the data found in given source model. 
 * @param modelName
 * Name of the ProcessAnalyzer model to create. The ML model will be named using the template: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel".
 * @param sourceModel
 * ProcessAnalyzer model object of the model which is to be trained to the ML model.
 * @param targetProject
 * Project ín whose context the ML model is to be trained.
 * @param trainingConfiguration
 * String dictionary containing the training configuration.
 * @param recreatePredictionModel
 * Should a prediction model be overwritten if one already exists for this source model and target model name combination?
 * @param trainingDataFilter
 * Filter JSON applied on top of all the source model events to find all the events to be used for training.
 * @param logTable
 * DataTable to be used for logging.
 * @param trainingCaseSampleSize
 * Maximum number of cases to use from the source model (random sampled). 
 * @returns
 * Result returned by the ML model training.
 */
function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize)
{
  let sanitizedModelName = `${sourceModel.Id}-${modelName}`; 

  let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([sourceModel.EventsDataTable.ColumnMappings["CaseId"]]).NRows;
  let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter);
  let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([trainEventDataSdf.ColumnMappings["CaseId"]]).NRows;
  WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`)

  let trainCaseDataSdf = sourceModel.CasesDataTable?.SqlDataFrame;

  let columnMappings = trainEventDataSdf.ColumnMappings;
  if (trainCaseDataSdf != null) {
    columnMappings = columnMappings.Extend(#{"Case_CaseId": trainCaseDataSdf.ColumnMappings["CaseId"]});
  }

  trainingConfiguration = trainingConfiguration.Extend(#{ 
    "_typed": #{
      "event_data": trainEventDataSdf,  
      "project": targetProject
    },
    "model_name": sanitizedModelName,
    "column_mappings": columnMappings,
    "overwrite_model": recreatePredictionModel,
    "case_start_time_probability_multiplier": originalCaseCount / actualTrainingCaseSampleSize
  });

  if (trainCaseDataSdf != null) {
    trainingConfiguration["_typed"].Set("case_data", trainCaseDataSdf);
  }

  Train(trainingConfiguration, logTable);
}

function CreateTargetTables(modelName, sourceModel, targetProject)
{
  function GetTable(conn, tableName) 
  {
    let tableConfiguration = #{
      "Name": tableName,
      "Type": "Snowflake",
      "Connection": conn
    };
    let resultTable = targetProject.DataTableByName(tableName);
    if (resultTable == null)
    {
      resultTable = targetProject.CreateDataTable(tableConfiguration)
        .Modify(#{"NameInDataSource": null})
        .Synchronize();
    }
    return resultTable;
  }

  let sanitizedModelName = `${sourceModel.Id}-${modelName}`; 
  let eventsTableName = `${modelName} - events`;
  let casesTableName = `${modelName} - cases`;
  let targetModel = targetProject.ModelByName(modelName);
  let eventsTable, casesTable = null;

  if (targetModel != null)
  {
    eventsTable = targetModel.EventsDataTable;
    casesTable = targetModel.CasesDataTable;
  }
  else {
    let conn = sourceModel.EventsDataTable.DataSourceConnection;
    eventsTable = GetTable(conn, eventsTableName);
    if (sourceModel.CasesDataTable != null) {
      casesTable = GetTable(conn, casesTableName);
    }
  }

  eventsTable.Truncate();
  casesTable?.Truncate();

  return #{
    "TargetModel": targetModel,
    "Events": eventsTable,
    "Cases": casesTable,
    "ModelName": sanitizedModelName
  };
}

function CreateResultModel(targetProject, modelName, eventColumnMappings, caseColumnMappings)
{
  WriteLog(`Creating model...`);

  let eventsTableName = `${modelName} - events`;
  let casesTableName = `${modelName} - cases`;

  let timestampMapping = eventColumnMappings["TimeStamp"];
  eventColumnMappings.Remove("TimeStamp");
  eventColumnMappings.Set("Timestamp", timestampMapping);

  let modelConfiguration = #{
    "DataSource": #{
      "Events":#{
        "DataSourceType": "datatable",
        "DataTableName": eventsTableName,
        "Columns": eventColumnMappings
      }
    }
  };

  if (caseColumnMappings != null) {
    modelConfiguration["DataSource"].Set("Cases", #{
      "DataSourceType": "datatable",
      "DataTableName": casesTableName,
      "Columns": caseColumnMappings
    });
  }

  targetProject.CreateModel(#{"Name": modelName, "Configuration": modelConfiguration});
}

/**
 * @name GenerateNewModel
 * @descripion
 * Generates a new ProcessAnalyzer model using already trained ML model. Generation consists of the following phases:
 * - Generate new cases with new events.
 * - Predict new events for incomplete cases.
 * - Append original source model data.
 * - Create ProcessAnalyzer model if it does not already exist.
 * @param modelName
 * Name of the ProcessAnalyzer model to create. The used ML model must have name: "decision_intelligence/<targetProject.Id>-<sourceModel.Id>-<modelName>.pamlmodel".
 * @param sourceModel
 * ProcessAnalyzer model object of the model based on which the new model will be created.
 * @param targetProject
 * Project ínto which the new ProcessAnalyzer model is to be created.
 * @param generationConfiguration
 * String dictionary containing the generation configuration.
 * @param incompleteCasesFilter
 * Filter JSON applied on top of all the source model events to find all the events belonging to incomplete cases.
 * @param logTable
 * DataTable to be used for logging.
 * @returns
 * ProcessAnalyzer model object of the containing the results of the generation.
 */
function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];
  let sanitizedModelName = tablesDict["ModelName"];

  let generationParams, result, overwrite = true;

  if (generationConfiguration.TryGetValue("cases_to_generate") != 0) {
    WriteLog(`Generating new cases with new events...`);

    generationParams = generationConfiguration.Extend(#{ 
      "_typed": #{
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": overwrite
    });
    if (casesTable != null) {
      generationParams["_typed"].Set("target_case_data_table", casesTable);
    }

    result = Generate(generationParams, logTable);

    WriteLog(`Generation results:\r\n${result}`);

    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"]

    overwrite = false;
  }

  if (incompleteCasesFilter != null) {
    WriteLog(`Generating new events for running cases...`);
    let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);

    generationParams = generationConfiguration.Extend(#{ 
      "_typed": #{
        "event_data": incompleteCasesSqlDataFrame,  
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": overwrite,
      "temperature": 0
    });
    if (casesTable != null) {
      generationParams["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame);
    }

    result = Generate(generationParams, logTable);
    overwrite = false;
    WriteLog(`Generation results:\r\n${result}`);

    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"]
  }

  if (!("Predicted".In(eventsTable.ColumnTypes.Name)))
    eventsTable
      .AddColumn("Predicted", "Boolean");

  eventsTable
    .DeleteRows(
      Column(result["column_mappings"]["events"]["EventType"]) == "__FINISH__"
    )
    .UpdateRows(
      1==1,
      "Predicted", true
    );

  if (casesTable != null) {
    if (!("Generated".In(casesTable.ColumnTypes.Name)))
      casesTable
        .AddColumn("Generated", "Boolean");

    casesTable
      .UpdateRows(
        1==1,
        "Generated", true
      );
  } 

  WriteLog(`Appending original model data...`);
  eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true});
  if (casesTable != null)
    casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});

  eventsTable
    .UpdateRows(
      Column("Predicted") == null,
      "Predicted", false
    );

  casesTable
    ?.UpdateRows(
      Column("Generated") == null,
      "Generated", false
    );

  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
  if (casesTable != null)
    ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);

  if (targetModel != null)
    return targetModel;

  CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null);
}

/**
 * @name GeneratePredictionModel
 * @descripion
 * Generates a ProcessAnalyzer model by first training a machine learning model and then using that to generate new cases and events.
 * @param configuration
 * A string dictionary configuring the model prediction process. Supported keys are:
 *   "Name": Name of the PA model to generate to the target project.
 *   "SourceModel": Snowflake-based PA model used for training the prediction model.
 *   "TargetProject": Target project to create the model into.
 *   "TrainingConfiguration": Training parameters as string dictionary.
 *   "GenerationConfiguration": Event generation parameters as string dictionary.
 *   "TrainingDataFilter": Filter JSON for events to be used for training.
 *   "IncompleteCasesFilter": Filter JSON for events belonging to unfinished cases for which new events are to be predicted.
 *   "RecreatePredictionModel": Should a prediction model be overwritten if one already exists for this source model and target model name combination? 
 *   "TrainingCaseSampleSize": Maximum number of cases to use from the source model (random sampled). 
 */
function GeneratePredictionModel(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let trainingConfiguration = configuration["TrainingConfiguration"];
  let generationConfiguration = configuration["GenerationConfiguration"];
  let trainingDataFilter = configuration["TrainingDataFilter"];
  let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
  let recreatePredictionModel = configuration["RecreatePredictionModel"];
  let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];

  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let conn = sourceModel.EventsDataTable.DataSourceConnection;
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn});
  try {
    WriteLog(`Training a prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id})`);
    result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
    WriteLog(`Model training completed. Training result:\r\n${result}`);
    result = ParseJson(result);

    if (generationConfiguration != null) {
      WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName})`);
      let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable);

      WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
      result.Set("Model", newModel);
    }
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
  return result;
}

function TransformModel(modelName, sourceModel, targetProject, configurations, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];

  let eventColumnMappings = sourceModel.EventsDataTable.ColumnMappings;
  let caseColumnMappings = sourceModel.CasesDataTable?.ColumnMappings;

  let result;

  WriteLog(`Performing transformations...`);

  let configuration = #{ 
    "_typed": #{
      "event_data": sourceModel.EventsDataTable.SqlDataFrame,  
      "project": targetProject,
      "target_event_data_table": eventsTable
    },
    "column_mappings": eventColumnMappings,
    "overwrite": true,
    "transformations": configurations
  };
  if (casesTable != null) {
    configuration["_typed"].Set("target_case_data_table", casesTable);
  }

  result = Generate(configuration, logTable);

  WriteLog(`Transformation results:\r\n${result}`);

  result = ParseJson(result);
  if (result["result"] != "success")
    throw result["exception"];

  if (casesTable != null) {
    WriteLog(`Importing case data...`);
    casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});
    ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);
  }

  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);

  if (targetModel != null)
    return targetModel;

  CreateResultModel(targetProject, modelName, eventColumnMappings, casesTable != null ? caseColumnMappings : null);
}

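/**
 * @name ApplyTransformations
 * @description
 * Creates a new model by applying the given transformations to a source model's data.
 * @param configuration
 * A string dictionary with the supported keys:
 *   "Name": Name of the PA model to generate to the target project.
 *   "SourceModel": Snowflake-based PA model whose data is transformed.
 *   "TargetProject": Target project to create the model into.
 *   "Transformations": Transformation configurations passed to the Generate procedure.
 */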
function ApplyTransformations(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let transformationConfigurations = configuration["Transformations"];
  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let conn = sourceModel.EventsDataTable.DataSourceConnection;
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn});

  try {
    WriteLog(`Applying transformations on model: ${sourceModel.Name} (id=${sourceModel.Id})`);
    let newModel = TransformModel(modelName, sourceModel, targetProject, transformationConfigurations, logTable);
    WriteLog(`Model transformations completed. New model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
    return newModel;
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
}

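/**
 * Predicts case attribute values for incomplete cases using the trained ML model, appends
 * the original model data, and marks the generated values with the Predicted case attribute.
 */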
function GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable)
{
  let tablesDict = CreateTargetTables(modelName, sourceModel, targetProject);
  let eventsTable = tablesDict["Events"];
  let casesTable = tablesDict["Cases"];
  let targetModel = tablesDict["TargetModel"];
  let sanitizedModelName = tablesDict["ModelName"];

  let generationParams, result, overwrite = true;

  WriteLog(`Generating case attribute values for running cases...`);
  let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame;
  if (incompleteCasesFilter != null)
    incompleteCasesSqlDataFrame = incompleteCasesSqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);

  generationParams = generationConfiguration.Extend(#{ 
    "_typed": #{
      "event_data": incompleteCasesSqlDataFrame,
      "case_data": sourceModel.CasesDataTable.SqlDataFrame,
      "target_case_data_table": casesTable,
      "project": targetProject
    },
    "model_name": sanitizedModelName,
    "overwrite": overwrite,
    "temperature": 0
  });

  result = Generate(generationParams, logTable);
  overwrite = false;
  WriteLog(`Generation results:\r\n${result}`);

  result = ParseJson(result);
  if (result["result"] != "success")
    throw result["exception"];

  if (!("Predicted".In(casesTable.ColumnTypes.Name)))
    casesTable
      .AddColumn("Predicted", "Boolean");

  casesTable
    .UpdateRows(
      null,
      "Predicted", true
    );

  WriteLog(`Appending original model data...`);
  eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true}).Synchronize();
  attributesToGenerate.{ 
    let col = _; 
    let newCol = `Predicted_${col}`;
    casesTable.AddColumn(newCol, "String");
    casesTable.UpdateRows(null, newCol, Column(#expr{col}));
  };
  let createdColumns = casesTable.ColumnTypes.Name;
  sourceModel.CasesDataTable.ColumnTypes.Name
    .Where(!_.In(createdColumns)).{
      let col = _; 
      casesTable.AddColumn(col, "String");
    };
  casesTable.Merge(sourceModel.CasesDataTable.SqlDataFrame, sourceModel.CasesDataTable.ColumnMappings["CaseId"]);

  casesTable
    .UpdateRows(
      Column("Predicted") == null,
      "Predicted", false
    );

  WriteLog(`Restoring original column types...`);
  ModifyColumnTypes(eventsTable, sourceModel.EventsDataTable.ColumnTypes);
  ModifyColumnTypes(casesTable, sourceModel.CasesDataTable.ColumnTypes);

  if (targetModel != null)
    return targetModel;

  CreateResultModel(targetProject, modelName, result["column_mappings"]["events"], casesTable != null ? result["column_mappings"]["cases"] : null);
}

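/**
 * @name GenerateCaseAttributePredictionModel
 * @description
 * Generates a ProcessAnalyzer model by first training a case attribute prediction model and
 * then using it to predict case attribute values for incomplete cases. Supports the same
 * configuration keys as GeneratePredictionModel; in addition, "TrainingConfiguration" must
 * contain "output_case_attribute_groups" defining the case attributes to predict.
 */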
function GenerateCaseAttributePredictionModel(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let trainingConfiguration = configuration["TrainingConfiguration"];
  let generationConfiguration = configuration["GenerationConfiguration"];
  let trainingDataFilter = configuration["TrainingDataFilter"];
  let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
  let recreatePredictionModel = configuration["RecreatePredictionModel"];
  let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];

  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let conn = sourceModel.EventsDataTable.DataSourceConnection;
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake", "Connection": conn});
  try {
    let attributesToGenerate = trainingConfiguration["output_case_attribute_groups"][0]["attributes"];

    WriteLog(`Training a case attribute prediction model based on model: ${sourceModel.Name} (id=${sourceModel.Id}). Generated attributes: ${ToJson(attributesToGenerate)}`);
    result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
    WriteLog(`Model training completed. Training result:\r\n${result}`);
    result = ParseJson(result);

    if (generationConfiguration != null) {
      WriteLog(`Generating a new model named ${modelName} based on the trained case attribute prediction model ${modelName}`);
      let newModel = GenerateNewModelWithNewCaseAttributes(modelName, sourceModel, targetProject, generationConfiguration, attributesToGenerate, incompleteCasesFilter, logTable);

      WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}`);
      result.Set("Model", newModel);
    }
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
  return result;
}

return #{
  "GeneratePredictionModel": GeneratePredictionModel,
  "GenerateCaseAttributePredictionModel": GenerateCaseAttributePredictionModel,
  "ApplyTransformations": ApplyTransformations,
  "ModifyColumnTypes": ModifyColumnTypes
};
</pre>

2. Create the following example expression script (e.g., with name '''Create prediction model'''):
<pre>
let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{});
let targetProject = Project;
lib.GeneratePredictionModel(#{
  "Name": "My prediction model",
  "SourceModel": ModelById(1),
  "TargetProject": targetProject,
  "RecreatePredictionModel": true,
  "TrainingConfiguration": #{
    "num_epochs_to_train": 500
  },
  "GenerationConfiguration": #{
    "cases_to_generate": 1000
  },
  "TrainingDataFilter": #{
    "Items": [
      #{
        "Type": "IncludeCases",
        "Items": [
          #{
            "Type": "EventAttributeValue",
            "Attribute": "Event type",
            "StringifiedValues": [
              "0Invoice Payment"
            ]
          }
        ]
      }
    ]
  },
  "TrainingCaseSampleSize": 1000,
  "IncompleteCasesFilter": #{
    "Items": [
      #{
        "Type": "ExcludeCases",
        "Items": [
          #{
            "Type": "EventAttributeValue",
            "Attribute": "Event type",
            "StringifiedValues": [
              "0Invoice Payment"
            ]
          }
        ]
      }
    ]
  },
});
</pre>
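
The ML library also exports the '''GenerateCaseAttributePredictionModel''' function for predicting case attribute values of incomplete cases. Below is a minimal calling sketch; the attribute name ''Region'' and all parameter values are illustrative placeholders, so replace them with values from your own environment:
<pre>
let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{});
lib.GenerateCaseAttributePredictionModel(#{
  "Name": "My case attribute prediction model",
  "SourceModel": ModelById(1),
  "TargetProject": Project,
  "RecreatePredictionModel": true,
  "TrainingConfiguration": #{
    "num_epochs_to_train": 500,
    "output_case_attribute_groups": [
      /* "Region" is a placeholder case attribute name */
      #{"attributes": ["Region"]}
    ]
  },
  "GenerationConfiguration": #{}
});
</pre>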

3. Configure prediction for the previously created script as instructed in the next chapter.
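
In addition to the prediction functions, the ML library exports an '''ApplyTransformations''' entry point that creates a transformed copy of a model without training an ML model. The following minimal sketch shows the call; the contents of the '''Transformations''' list are passed as-is to the Snowflake procedure and depend on the transformations available in your environment, so the empty list below is only a placeholder:
<pre>
let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{});
lib.ApplyTransformations(#{
  "Name": "My transformed model",
  "SourceModel": ModelById(1),
  "TargetProject": Project,
  "Transformations": [] /* placeholder: fill in transformation configurations */
});
</pre>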

== Configure prediction ==

The prediction script has the following settings in the '''GeneratePredictionModel''' call:

* '''Name''': Name of the QPR ProcessAnalyzer model that is created to the target project. The model will contain the source model content and the predictions.
* '''SourceModel''': Source model for which the prediction is made. The model can be selected, for example, by id with the ModelById function or by name with the ModelByName function.
* '''TargetProject''': Target project to create the new model into.
* '''RecreatePredictionModel''': When ''true'', a new ML model is trained when the script is run. When ''false'', the prediction is run using a previously trained ML model if one exists.
* '''TrainingConfiguration''': Training parameters.
** '''num_epochs_to_train''': How many times the whole training data is used in the training. The more epochs there are, the better the model usually becomes, but the training takes more time. The best performing model out of all the iterations is selected.
** '''max_num_case_clusters''': Maximum number of clusters to divide the case attribute values into. Default is 20.
* '''GenerationConfiguration''': Event generation parameters (see the example after this list). When ''null'', no generation is done. For example, the following parameters are supported:
** '''cases_to_generate''': Maximum number of cases to create. The number of created cases is further limited by the capabilities of the trained model and by the case_generation_start_time and case_generation_end_time parameters.
** '''case_generation_start_time''': If defined, new cases are generated starting from this timestamp. If not defined, the latest case start event timestamp in the training data is used. The parameter is given in ISO datetime format.
** '''case_generation_end_time''': If defined, the case generation stops when this timestamp is reached, and no cases are generated after it. The parameter is given in ISO datetime format.
** '''generate_debug_event_attributes''': If ''true'', additional columns are added containing, e.g., the probabilities of the selected activity and the other activities.
** '''min_prediction_probability''': Minimum probability of any activity name in the next activity prediction. If the probability of an activity is lower than this, it is never picked. Default value is 0.01.
** '''temperature''': If 0, the predicted event type is always the most probable one. If 1, the next event type is selected randomly based on the probabilities of each event type. Values between 0 and 1 interpolate between these two behaviors.
* '''TrainingDataFilter''': Filter to select the cases that are used to train the prediction model. This filter is needed for training the model using only the completed cases. Incomplete cases should not be used for the training, so that the model doesn't incorrectly learn that cases should end the way the incomplete cases do.
* '''TrainingCaseSampleSize''': Maximum number of cases to take from the source model (cases are selected randomly). Use a lower value to speed up the ML model training; the greater the value, the more subtle phenomena the prediction can learn from the data.
* '''IncompleteCasesFilter''': Optional filter to select for which cases the prediction is made. To improve prediction performance, it's recommended to include only the incomplete cases for which new events might still appear, and to skip the completed cases for which no new events are expected.
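
For example, the following '''GenerationConfiguration''' limits the generation to a fixed time window and makes the event type selection deterministic. The values below are illustrative placeholders, not recommended defaults:
<pre>
"GenerationConfiguration": #{
  "cases_to_generate": 500,
  "case_generation_start_time": "2024-01-01T00:00:00",
  "case_generation_end_time": "2024-06-30T23:59:59",
  "min_prediction_probability": 0.05,
  "temperature": 0
}
</pre>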