Create Predicted Eventlog

From QPR ProcessAnalyzer Wiki
Jump to navigation Jump to search

This article has instructions how to install, configure and use eventlog predictions. The prediction creates a new model that contains the source model data and the predictions. It's able to predict case attributes for the generated new cases and event attributes for the predicted events. To distinguish the real (source data) and predicted events and cases, there are following attributes in the model:

  • Event attribute Predicted denotes whether the event is from the source data (false) or whether it's predicted (true).
  • Case attribute Generated denotes whether the case is in the source data (false) or whether the prediction generated it as a new case (true).


Prerequisites for prediction

Following prerequisites need to be fulfilled to run the eventlog prediction:

  • QPR ProcessAnalyzer 2024.2 or later in use
  • Snowflake connection is configured
  • Source models are stored to Snowflake

Install prediction to Snowflake

To install the eventlog prediction to Snowflake:

  1. Go to Snowflake, and create a Snowflake-managed stage with name PREDICTION to the same schema configured to QPR ProcessAnalyzer (in the Snowflake connection string). Use settings in the following image: Create Snowflake stage.png
  2. Open the created stage and upload the predict.pyz file into the stage (ask the file from your QPR representative).
  3. Create the following procedure to the same schema:
CREATE OR REPLACE PROCEDURE QPRPA_SP_PREDICTION("CONFIGURATION" OBJECT)
RETURNS OBJECT
LANGUAGE PYTHON
STRICT
RUNTIME_VERSION = '3.8'
PACKAGES = ('nltk','numpy','pandas==1.5.3','scikit-learn','snowflake-snowpark-python','tensorflow','dill','prophet','holidays==0.18','python-kubernetes','docker-py')
HANDLER = 'main'
EXECUTE AS OWNER
AS '
import sys
def main(session, parameters_in: dict) -> dict:
	session.file.get(''@decision_intelligence/predict.pyz'', ''/tmp'')
	sys.path.append(''/tmp/predict.pyz'')
	import predict
	return predict.main(session, parameters_in)
';

Create prediction script in QPR ProcessAnalyzer

1. Go to QPR ProcessAnalyzer and create the following expression script with name ML library:

let predictionProcedureName = "qprpa_sp_prediction";

function RunFunctionWithParallelLogging(logTable, callbackFunc) 
{
  let state = #{};

  logTable.Truncate();

  _system.Parallel.Run([
    () => {
      try {
        state.Set("Result", callbackFunc());
      }
      finally {
        state.Set("Finished", true);
      }
    },
    () => {
      let readRows = 0;
      while (true) {
        try {
          let finished = state.TryGetValue("Finished");
          if (finished == true) {
            WriteLog(`Work finished. Log written to table ${logTable.Name} (id: ${logTable.Id})`);
            break;
          }

          let rowsDf = logTable.Synchronize().SqlDataFrame.Skip(readRows).Collect();
          readRows = readRows + rowsDf.NRows;
          rowsDf.Rows.{
            let row = _;
            WriteLog(`${row[0]} \t${row[1]}`);
          }
        }
        catch (ex) {
          if (ex.InnerExceptions?[0]?.Type == "UnauthorizedAccess") {
            WriteLog(`Log data table has disappeared. The generated log will not be complete!`);
            break;
          }
          WriteLog(`Log has not yet been created.`);
        }
      }
      Sleep(5000);
    }
  ]);
  return state.TryGetValue("Result");
}

function Train(params, logTable)
{
  function TrainCallback()
  {
    WriteLog("Starting training...");

    let connection = logTable.Project.CreateSnowflakeConnection();
    let finalParams = params.Clone();
    finalParams.Set("_typed", params["_typed"].Extend(#{
      "log_table": logTable
    }));
    let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});

    WriteLog("Finished training...")
    return result;
  }
  return RunFunctionWithParallelLogging(logTable, TrainCallback);
}

function Generate(params, logTable) 
{
  function GenerateCallback()
  {
    WriteLog("Starting generation...")

    let connection = logTable.Project.CreateSnowflakeConnection();

    let typedParams = params["_typed"];
    let resultEventsTable = typedParams["target_event_data_table"];
    let resultCasesTable = typedParams.TryGetValue("target_case_data_table");
    
    if (params["overwrite"]) {
      resultEventsTable.Truncate();
      resultCasesTable?.Truncate();
    }

    let finalParams = params.Clone();
    finalParams.Set("_typed", typedParams.Extend(#{
      "log_table": logTable
    }));

    let result = connection.CallStoredProcedure(predictionProcedureName, #{"configuration": finalParams});

    resultEventsTable.Synchronize();
    resultCasesTable?.Synchronize();
    
    WriteLog("Finished generation...")
    return result;
  }

  return RunFunctionWithParallelLogging(logTable, GenerateCallback);
}

function GetSampledEvents(sourceModel, sampledCaseCount, filter)
{
  if (IsNull(sampledCaseCount) || sampledCaseCount < 1)
    return sourceModel.EventsDataTable.SqlDataFrame;
  let sampleFilterRule = sampledCaseCount == null 
    ? null
    : #{
        "Type":"IncludeCases",
        "Items":[#{
          "Type": "SqlExpressionValue",
          "Configuration": #{
            "Root": `Cases.WithColumn("_Random", Rand()).OrderByColumns(["_Random"], [true]).Head(${sampledCaseCount})`
          }
        }]
      };
  let filterRules = filter?["Items"];
  if (!IsNullTop(filterRules)) {
    if (!IsNullTop(sampleFilterRule))
      filterRules = Concat(filterRules, sampleFilterRule);
  }
  else {
    if (!IsNullTop(sampleFilterRule))
      filterRules = [sampleFilterRule];
    else
      filterRules = [];
  }

  let finalFilter = #{
    "Items":filterRules
  };
  sourceModel.CacheTableSqlDataFrame(finalFilter);
}

function TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize)
{
  let sanitizedModelName = `${sourceModel.Id}-${modelName}`; 

  let originalCaseCount = sourceModel.EventsDataTable.SqlDataFrame.SelectDistinct([sourceModel.EventsDataTable.ColumnMappings["CaseId"]]).NRows;
  let trainEventDataSdf = GetSampledEvents(sourceModel, trainingCaseSampleSize, trainingDataFilter);
  let actualTrainingCaseSampleSize = trainEventDataSdf.SelectDistinct([trainEventDataSdf.ColumnMappings["CaseId"]]).NRows;
  WriteLog(`Starting to train a prediction model using ${ToString(100*actualTrainingCaseSampleSize/originalCaseCount, "F")}% of the original cases (${actualTrainingCaseSampleSize}/${originalCaseCount}) found in source model ${sourceModel.Name}.`)

  let trainCaseDataSdf = sourceModel.CasesDataTable?.SqlDataFrame;

  let columnMappings = trainEventDataSdf.ColumnMappings;
  if (trainCaseDataSdf != null) {
    columnMappings = columnMappings.Extend(#{"Case_CaseId": trainCaseDataSdf.ColumnMappings["CaseId"]});
  }

  trainingConfiguration = trainingConfiguration.Extend(#{ 
    "_typed": #{
      "event_data": trainEventDataSdf,  
      "project": targetProject
    },
    "model_name": sanitizedModelName,
    "column_mappings": columnMappings,
    "overwrite_model": recreatePredictionModel,
    "case_start_time_probability_multiplier": originalCaseCount / actualTrainingCaseSampleSize
  });

  if (trainCaseDataSdf != null) {
    trainingConfiguration["_typed"].Set("case_data", trainCaseDataSdf);
  }

  Train(trainingConfiguration, logTable);
}

function GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable)
{
  let sanitizedModelName = `${sourceModel.Id}-${modelName}`; 
  let eventsTableName = `${modelName} - events`;
  let casesTableName = `${modelName} - cases`;
  let m = targetProject.ModelByName(modelName);
  let eventsTable = null;
  let casesTable = null;  
  if (m != null)
  {
    eventsTable = m.EventsDataTable;
    casesTable = m.CasesDataTable;
  }
  else {
    let eventsTableConfiguration = #{
      "Name": eventsTableName,
      "Type": "Snowflake"
    };
    eventsTable = targetProject.DataTableByName(eventsTableName) ?? targetProject.CreateDataTable(eventsTableConfiguration).Synchronize();
    if (sourceModel.CasesDataTable != null) {
      let casesTableConfiguration = #{
        "Name": casesTableName,
        "Type": "Snowflake"
      };
      casesTable = targetProject.DataTableByName(casesTableName) ?? targetProject.CreateDataTable(casesTableConfiguration).Synchronize();
    }
  }

  eventsTable.Truncate();
  casesTable?.Truncate();

  let generationParams, result;

  if (generationConfiguration.TryGetValue("cases_to_generate") != 0) {
    WriteLog(`Generating new cases with new events...`);

    generationParams = generationConfiguration.Extend(#{ 
      "_typed": #{
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": false
    });
    if (casesTable != null) {
      generationConfiguration["_typed"].Set("target_case_data_table", casesTable);
    }

    result = Generate(generationParams, logTable);

    WriteLog(`Generation results:\r\n${result}`);

    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"]
  }

  if (incompleteCasesFilter != null) {
    WriteLog(`Generating new events for running cases...`);
    let incompleteCasesSqlDataFrame = sourceModel.EventsDataTable.SqlDataFrame.ApplyFilter(incompleteCasesFilter, sourceModel.CasesDataTable?.SqlDataFrame);

    generationParams = generationConfiguration.Extend(#{ 
      "_typed": #{
        "event_data": incompleteCasesSqlDataFrame,  
        "target_event_data_table": eventsTable,
        "project": targetProject
      },
      "model_name": sanitizedModelName,
      "overwrite": false,
      "temperature": 0
    });
    if (casesTable != null) {
      generationConfiguration["_typed"].Set("case_data", sourceModel.CasesDataTable.SqlDataFrame);
    }

    result = Generate(generationParams, logTable);
    WriteLog(`Generation results:\r\n${result}`);

    result = ParseJson(result);
    if (result["result"] != "success")
      throw result["exception"]
  }

  if (!("Predicted".In(eventsTable.ColumnTypes.Name)))
    eventsTable
      .AddColumn("Predicted", "Boolean");

  eventsTable
    .DeleteRows(
      Column(result["column_mappings"]["events"]["EventType"]) == "__FINISH__"
    )
    .UpdateRows(
      1==1,
      "Predicted", true
    );

  if (casesTable != null) {
    if (!("Generated".In(casesTable.ColumnTypes.Name)))
      casesTable
        .AddColumn("Generated", "Boolean");

    casesTable
      .UpdateRows(
        1==1,
        "Generated", true
      );
  } 

  WriteLog(`Appending original model data...`);
  eventsTable.Import(sourceModel.EventsDataTable.SqlDataFrame, #{"Append": true});
  if (casesTable != null)
    casesTable.Import(sourceModel.CasesDataTable.SqlDataFrame, #{"Append": true});

  eventsTable
    .UpdateRows(
      Column("Predicted") == null,
      "Predicted", false
    );

  casesTable
    ?.UpdateRows(
      Column("Generated") == null,
      "Generated", false
    );

  if (m != null)
    return m;

  WriteLog(`Creating model...`);
  let eventColumnMappings = result["column_mappings"]["events"];
  let timestampMapping = eventColumnMappings["TimeStamp"];
  eventColumnMappings.Remove("TimeStamp");
  eventColumnMappings.Set("Timestamp", timestampMapping);

  let modelConfiguration = #{
    "DataSource": #{
      "Events":#{
        "DataSourceType": "datatable",
        "DataTableName": eventsTableName,
        "Columns": eventColumnMappings
      }
    }
  };

  if (casesTable != null) {
    modelConfiguration["DataSource"].Set("Cases", #{
      "DataSourceType": "datatable",
      "DataTableName": casesTableName,
      "Columns": result["column_mappings"]["cases"]
    });
  }

  targetProject.CreateModel(#{"Name": modelName, "Configuration": modelConfiguration});
}

function GeneratePredictionModel(configuration)
{
  let modelName = configuration["Name"];
  let sourceModel = configuration["SourceModel"];
  let targetProject = configuration["TargetProject"];
  let trainingConfiguration = configuration["TrainingConfiguration"];
  let generationConfiguration = configuration["GenerationConfiguration"];
  let trainingDataFilter = configuration["TrainingDataFilter"];
  let incompleteCasesFilter = configuration["IncompleteCasesFilter"];
  let recreatePredictionModel = configuration["RecreatePredictionModel"];
  let trainingCaseSampleSize = configuration["TrainingCaseSampleSize"];

  let result = null;
  let logTableName = `_MLLog:${Now}`;
  let logTable = targetProject.CreateDataTable(#{"Name": logTableName, "Type": "Snowflake"});
  try {
    WriteLog(`Training a prediction model based on model:\r\n${sourceModel.Name} (id=${sourceModel.Id})`);
    result = TrainMLModelForModel(modelName, sourceModel, targetProject, trainingConfiguration, recreatePredictionModel, trainingDataFilter, logTable, trainingCaseSampleSize);
    WriteLog(`Model training completed. Training result:\r\n${result}`);

    if (generationConfiguration != null) {
      WriteLog(`Generating a new model named ${modelName} based on trained model ${modelName}):\r\n${result}`);
      let newModel = GenerateNewModel(modelName, sourceModel, targetProject, generationConfiguration, incompleteCasesFilter, logTable);

      WriteLog(`Model ${newModel.Name} (id=${newModel.Id}) generated into project ${targetProject.Name}:\r\n${result}`);
    }
  }
  finally {
    targetProject.DataTableByName(logTableName)?.DeletePermanently();
  }
}

return #{
  "GeneratePredictionModel": GeneratePredictionModel,
  "RunFunctionWithParallelLogging": RunFunctionWithParallelLogging,
  "Train": Train,
  "Generate": Generate
};

2. Create the following example expression script (e.g., with name Create prediction model):

let lib = First(Project.Scripts.Where(Name == "ML library")).Run(#{});
let targetProject = Project;
lib.GeneratePredictionModel(#{
  "Name": "My prediction model",
  "SourceModel": ModelById(1),
  "TargetProject": targetProject,
  "RecreatePredictionModel": true,
  "TrainingConfiguration": #{
    num_epochs_to_train: 500
  },
  "GenerationConfiguration": #{
    "cases_to_generate": 1000
  },
  "TrainingDataFilter": #{
    "Items": [
      #{
        "Type": "IncludeCases",
        "Items": [
          #{
            "Type": "EventAttributeValue",
            "Attribute": "Event type",
            "StringifiedValues": [
              "0Invoice Payment"
            ]
          }
        ]
      }
    ]
  },
  "TrainingCaseSampleSize": 1000,
  "IncompleteCasesFilter": #{
    "Items": [
      #{
        "Type": "ExcludeCases",
        "Items": [
          #{
            "Type": "EventAttributeValue",
            "Attribute": "Event type",
            "StringifiedValues": [
              "0Invoice Payment"
            ]
          }
        ]
      }
    ]
  },
});

3. Configure prediction for the previously created script as instructed in the next chapter.

Configure prediction

Prediction script has the following settings in the GeneratePredictionModel call:

  • Name: Name of the QPR ProcessAnalyzer model that is created to the target project. The model will contain the source model content and the predictions.
  • SourceModel: Source model for which the prediction is made. Model can be selected for example based on id with ModelById function or by name with ModelByName function.
  • TargetProject: Target project to create the new model into.
  • RecreatePredictionModel: When true, a new ML model is trained when the script is run. When false, the prediction is run using possibly pre-existing ML model.
  • TrainingConfiguration: Training parameters.
    • num_epochs_to_train: How many times the whole training data is used in training. The more there are epochs, the better the model usually is, but the training will take more time. The best performing model out of all the iterations will be selected.
    • max_num_case_clusters: Maximum number of clusters to divide the case attribute values into. Default is 20.
  • GenerationConfiguration: Event generation parameters. When null, no generation is done. For example, following parameters are supported:
    • cases_to_generate: Maximum number cases to create. The number of created cases is further limited by the capabilities of the trained model and the case_generation_start_time and case_generation_end_time parameters.
    • case_generation_start_time: If defined, new cases will be generated starting from this timestamp. If not defined, the latest start event timestamp used in the training data. This parameter is given as ISO datetime format.
    • case_generation_end_time: If defined, the new cases generation will stop when reaching this timestamp, and no cases will be generated after it. This parameter is given as ISO datetime format.
    • generate_debug_event_attributes: If true, additional columns will be added containing, e.g., probabilities of the selected activity and other activities.
    • min_prediction_probability : Minimum probability of any activity name in next activity prediction. If the probability of an activity is lower than this, it will never be picked. Default value is 0.01.
    • temperature: If 0, the predicted event type will always be the most probable one. If 1, the next event type is randomly selected based on probabilities of each event type. This behavior is interpolated when using values between 0 and 1.
  • TrainingDataFilter: filter to select specific cases to training the prediction model. Usually this filter is required to train the model only using the completed cases. If uncompleted cases are given to the training, the model can incorrectly learn that cases should end like that.
  • TrainingCaseSampleSize: Maximum number of cases to take from the source model (cases are selected randomly). Use a lower setting to speed up the ML model training. The greater the value, the more subtle phenomena the prediction can learn from the data.
  • IncompleteCasesFilter: Optional filter to select which for cases the prediction is made. To improve performance of the prediction, it's recommended to include only the incomplete cases for which new events might appear, and skip the completed cases for which new events are not expected anymore.