import { DAG } from "./utils/globalUtils.js";
import threadManager from "./threadEngine.js";
import { splits } from "./utils/splits.js";
import { jsScripts } from "../javascript/jsScripts.js";
import { avScripts } from "../wasm/modules/modules.js";
import { gpuScripts } from "../webgpu/gpuScripts.js";
/**
* @class
* @name engine
* @description main engine driver for all the available modules in the hydrocompute library
* @property results - array with results
* @property execTime - execution time of the running tasks
* @property engineName - name of the engine running (javascript, wasm, webgpu)
* @property workerLocation - location of worker script per engine
* @param {String} engine - name of engine running the workers
* @param {String} workerLocation - location of the worker script running the data
*/
export default class engine {
constructor(engine, workerLocation) {
this.setEngine();
this.workerLocation = workerLocation;
this.initialize(engine);
}
/**
* @method initialize
* @memberof engine
* @description setter for the current engine name and worker location
* @param {String} engine - engine worker location to be fetched
*/
initialize(engineName) {
this.engineName = engineName;
this.threads = new threadManager(this.engineName, this.workerLocation);
}
/**
* @method run
* @memberof engine
* @description interface method from the compute layer. it resets the values for each of the
* @param {Object} args - containing the values for each step of isSplit, data, length, functions, funcArgs, dependencies, linked. See documentation for the HydroCompute class for more details
*/
async run(args) {
//Default behavior for when no data or no functions are passed.
if (args.data.length === 0 || args.functions.length === 0) {
console.error(
"Please pass the data required for analysis and/or the functions to run."
);
return false
}
let {
//Array of functions per step: [[fun1, fun2, fun3], [fun1,fun2,fun3]...]
functions = [],
//Array of arguments per function per step. Can be empty: [[addArg1, addArg2, addArg3], [addArg1, addArg2, addArg3]...]
funcArgs = [],
//Array of dependencies per step per function run: [[[], [0], [1]], [[], [], [0,1]]...]
dependencies = [],
//If true, that means that the results from step 0 are trailed down to step 1 and further. If false, then either use the same data or different
//data will be used at each step. If that is the case, it must be specified as an additional argument.
linked = false,
//The data array can be a set of array buffers. In case the steps are linked and the results are trailed down, then only one buffer is required.
data = [],
//Length of the data submitted for analysis per step.
length = [],
//Array of data splits to be performed per step: [true, false, false, true...]
isSplit = [],
//Name of the script used from the passed arguments.
scriptName = [],
} = args;
//The total number of steps will be infered from the number of functions per step.
let steps = functions.length;
let stepArgs = [];
//Separating
for (let i = 0; i < steps; i++) {
let thisFunctions = functions[i],
thisFunArgs = funcArgs[i],
thisDep = dependencies[i],
thisData = data[i],
thisSplits = isSplit[i],
thisThreadCount = thisFunctions.length,
thisDataLength = length[i],
thisScriptName = scriptName[i];
//defaults in case there are no inputs from the user
thisDep =
typeof thisDep === "undefined" || thisDep === null || thisDep[0] === ""
? []
: thisDep;
thisFunArgs =
typeof thisFunArgs === "undefined" || thisFunArgs === null
? []
: thisFunArgs;
stepArgs.push({
data: thisData,
id: i,
functions: thisFunctions,
funcArgs: thisFunArgs,
isSplit: thisSplits,
threadCount: thisThreadCount,
dependencies: thisDep,
length: thisDataLength,
scriptName: thisScriptName
});
}
try {
//Evluate the execution as a set of trailing down promises that resolve on after the other
if (linked) {
var stepResolve = [];
for (var i = 0; i < stepArgs.length; i++) {
//THIS NEEDS TO CHANGE
stepResolve.push((i, data) => {
return new Promise(async (resolve) => {
let _args = stepArgs[i];
//this could be changed
if (data !== undefined) {
_args.data = data;
}
let p = await this.stepRun(_args);
resolve(p);
});
});
}
//Define a trailing down execution
await DAG({ functions: stepResolve, args: stepArgs[0], type: "steps" });
} else {
//Define a step execution
for (let stepArg of stepArgs) {
await this.stepRun(stepArg);
}
}
return true
} catch (error) {
console.error(
"There was an error with the execution of the steps."
);
throw error;
}
}
/**
* @method stepRun
* @memberof engine
* @description main method for running a simulation. It sets up the way of running
* the computational engine for each module
* @param {*} args
* @returns {Promise} Resolves the type of run to take.
*/
async stepRun(args) {
let {
funcArgs = [],
functions = [],
dependencies = [],
step = 0,
data = [],
isSplit = false,
length = 1,
threadCount = 0,
scriptName = undefined
} = args;
for (var i = 0; i < threadCount; i++) {
this.threads.createWorkerThread(i);
}
let dataSplits = [];
//EXAMPLE CASE: If there are multiple functions that do not depend of each other
//assume that the work can be parallelized
switch (true) {
case functions.length === 1:
dataSplits = data;
break;
case functions.length > 0 && dependencies.length === 0 && isSplit:
dataSplits = splits.main("split1DArray", {
data: data,
n: functions.length,
});
break;
case functions.length > 0 && dependencies.length === 0 && !isSplit:
for (let i = 0; i < functions.length; i++) {
dataSplits.push(data.slice());
}
break;
case functions.length > 0 && dependencies.length > 0:
dataSplits = data;
// Handle the case where there are multiple functions with multiple dependencies
break;
default:
dataSplits = data;
// Handle any other case that was not anticipated
break;
}
let _args = {
data: dataSplits,
splitting: isSplit,
functions,
funcArgs,
threadCount,
length,
scriptName
};
try {
//Need to change the dependencies. They can be different for each
//of the steps linked, or the functions per step.
let r = await this.taskRunner(_args, step, dependencies);
return r;
} catch (error) {
console.error(
`There was an error executing step: ${step}.`,
);
throw error
}
}
/**
* Runs multiple tasks concurrently using worker threads and dependency graph.
* @param {object} args - The arguments for concurrent execution.
* @memberof engine
* @param {number} step - The step value.
* @param {Array} dependencies - The dependency graph.
* @returns {Promise} - A promise that resolves to the result of concurrent execution.
*/
async concurrentRun(args, step, dependencies) {
let batchTasks = []
for (var i = 0; i < args.threadCount; i++) {
let d = args.data.buffer !== undefined
? args.data.buffer
: args.data[i].buffer;
var _args = {
//data: Array.isArray(args.data[0]) ? args.data[i] : args.data,
data: d,
id: i,
funcName: args.functions[i],
length: args.length,
step: step,
funcArgs: args.funcArgs[i],
scriptName: args.scriptName[i]
};
this.threads.initializeWorkerThread(i);
batchTasks.push(_args)
}
try {
let res = await DAG({
functions: Object.keys(this.threads.workerThreads).map((key) => {
return this.threads.workerThreads[key].worker;
}),
dag: dependencies,
args: batchTasks,
type: "functions",
});
return res;
} catch (error) {
console.error(
`There was an error executing the DAG for step: ${step}.`,
);
throw error
}
}
/**
* Runs multiple tasks in parallel using worker threads.
* @param {object} args - The arguments for parallel execution.
* @memberof engine
* @param {number} args.threadCount - The total number of threads.
* @param {function[]} args.functions - The array of functions to execute.
* @param {Array} args.funcArgs - The array of arguments for each function.
* @param {number} step - The step value.
* @returns {Promise<Array>} - A promise that resolves to an array of results.
*/
async parallelRun(args, step) {
const batches = [];
let results = [];
let last = 0;
for (let i = 0; i < args.threadCount; i += this.threads.maxWorkerCount) {
const batch = {
functions: [],
funcArgs: [],
};
for (
let j = i;
j < i + this.threads.maxWorkerCount && j < args.threadCount;
j++
) {
batch.functions.push(args.functions[j]);
batch.funcArgs.push(args.funcArgs[j]);
}
batches.push(batch);
}
for (let batch of batches) {
let batchTasks = [];
for (var i = 0; i < batch.functions.length; i++) {
let j = last + i;
//item changed, check it out later
let d =
args.data.buffer !== undefined
? args.data.buffer
: args.data[j].buffer;
let workerArgs = {
data: d,
id: i,
funcName: args.functions[i],
funcArgs: args.funcArgs[i],
step,
length: args.length,
scriptName: args.scriptName[i]
};
this.threads.initializeWorkerThread(i);
batchTasks.push(this.threads.workerThreads[i].worker(workerArgs));
}
let batchResults = await Promise.all(batchTasks);
results = results.concat(batchResults);
last += batch.functions.length;
}
return results;
}
/**
* Executes tasks based on the provided dependencies and step counter.
* @param {object} args - The arguments for task execution.
* @memberof engine
* @param {number} stepCounter - The step counter.
* @param {Array} dependencies - The dependency graph.
* @returns {Promise} - A promise that resolves to the result of task execution.
*/
async taskRunner(args, stepCounter, dependencies) {
try {
let x;
if (dependencies.length > 0) {
// Sequential Execution
x = await this.concurrentRun(args, stepCounter, dependencies);
} else {
// Parallel Execution
x = await this.parallelRun(args, stepCounter);
}
if (args.threadCount === this.threads.results.length) {
[this.funcEx, this.scriptEx] = this.threads.execTimes;
this.results.push({
//step: stepCounter,
results: this.threads.results,
funcEx: this.funcEx,
scriptEx: this.scriptEx,
funcOrder: this.threads.functionOrder
});
console.log(
`Total function execution time: ${this.funcEx} ms\nTotal worker execution time: ${this.scriptEx} ms`
);
this.threads.resetWorkers();
}
return x;
} catch (error) {
this.threads.resetWorkers();
console.error("An error occurred during task execution.");
throw error;
}
}
/**
*@description resets all properties of the class
* @memberof engine
*@method setEngine
*/
setEngine() {
this.funcEx = 0;
this.scriptEx = 0;
this.engineName = null;
this.splitting = false;
this.instanceCounter = 0;
this.results = [];
this.dataSplits = [];
this.workerLocation = null;
}
/**
* @method availableScripts
* @memberof engine
* @description Returns all the available scripts for each of the engines
* @returns {Object} available functions for each script on an engine
*/
availableScripts() {
if (this.engineName === "javascript") return jsScripts();
if (this.engineName === "wasm") return avScripts();
if (this.engineName === "webgpu") return gpuScripts();
}
}