core/threadEngine.js

//CHANGE!!!!!
// eventBus import removed

/**
 * @description Main class for managing threads. Results and execution time are saved here
 * @property engine - name of the engine
 * @property workerLocation - location of the worker running the engine
 * @property workerThreads - holder for all the worker threads
 * @property maxWorkerCount - maximum workers on the browser Leave it at least 1 less than all the available.
 * @property results - holder of the results once finished
 * @class threadManager
 * @param {string} name - The name of the thread manager.
 * @param {string} location - The location of the worker script file.
 */
export default class threadManager {
  constructor(name, location, externLib, eventBus = null) {
    if (!window.Worker) {
      console.error("Web workers API not supported!");
    }
    this.engine = name;
    this.workerLocation = location;
    this.externLib = externLib;

    // Internal events
    this._events = {};

    // External event bus
    this.eventBus = eventBus;

    // CRITICAL: Track all active worker instances for termination
    this.activeWorkers = new Map(); // Map<workerInstance, {index, args, resolve, reject}>
    this.isStopped = false; // Flag to prevent new executions after stop
    this.resetWorkers();
  }

  //CHANGE!!
  on(event, callback) {
    this._events[event] = this._events[event] || [];
    this._events[event].push(callback);
  }

  emit(event, data) {
    // Internal listeners
    if (this._events[event]) {
      this._events[event].forEach(callback => callback(data));
    }

    // Propagate to external event bus
    if (this.eventBus) {
      this.eventBus.emit(event, data);
    }
  }

  /**
   * @memberof threadManager
   * @description Holder for the workers created by the class. It creates an object that contains the workers defined
   * by the execution context holding the execution time of each thread and the worker itself.
   * @param {Number} number - number of thread to run
   */
  createWorkerThread(number) {
    this.workerThreads[number] = {
      worker: undefined,
      functionTime: 0,
      workerTime: 0,
    };
  }

  async executeWithDag(dag) {
    // CRITICAL: Reset stop flag at start of execution to allow rerunning after stop
    this.isStopped = false;

    const executing = new Set();
    const completed = new Set();
    const executionPromises = new Map(); // Track promises for executing functions

    // Helper function to check and start all available functions
    const checkAndStartAvailable = async () => {
      // CRITICAL: Check if execution was stopped
      if (this.isStopped) {
        return [];
      }

      const promises = [];

      // Check all functions to see which ones can run
      for (let i = 0; i < dag.totalCount; i++) {
        // Skip if already executing or completed
        if (executing.has(i) || completed.has(i)) {
          continue;
        }

        // CRITICAL: Check if execution was stopped before starting new workers
        if (this.isStopped) {
          break;
        }

        // Check if this function can execute
        const executeStatus = await dag.canExecute(i);
        const canRun = executeStatus.canRun || false;

        // Prevent duplicate execution of the same uniqueId (same item) at the same time
        // This can happen if multiple triggers fire for the same code block
        if (canRun) {
          const context = dag.getExecutionContext(i);
          const uid =
            context.uniqueId ||
            (context.funcName && context.funcName.id) ||
            (context.funcName && context.funcName.uniqueId) ||
            null;
          if (uid) {
            if (executing.has(uid)) {
              // Already executing this item; skip to avoid duplicate worker
              console.warn(`Skipping duplicate execution for item ${uid}`);
              continue;
            }
          }
        }

        if (canRun) {
          // Respect maxWorkerCount limit
          if (executing.size >= this.maxWorkerCount) {
            break; // Wait for some to complete before starting more
          }

          // Track execution by index and by uniqueId (if present)
          executing.add(i);
          const context = dag.getExecutionContext(i);
          const uid =
            context.uniqueId ||
            (context.funcName && context.funcName.id) ||
            (context.funcName && context.funcName.uniqueId) ||
            null;
          if (uid) {
            executing.add(uid);
          }

          // Get execution context and run in worker
          // (context already retrieved above)

          // Create a promise that handles completion
          const executionPromise = this.workerThreads[i].worker(context)
            .then((result) => {
              executing.delete(i);
              if (uid) {
                executing.delete(uid);
              }
              completed.add(i);
              executionPromises.delete(i);

              return result;
            })
            .catch((error) => {
              executing.delete(i);
              if (uid) {
                executing.delete(uid);
              }
              executionPromises.delete(i);
              console.error(`Error in worker ${i}:`, error);
              throw error;
            });

          executionPromises.set(i, executionPromise);
          promises.push(executionPromise);
        }
      }

      return promises;
    };

    // Main execution loop with deadlock detection
    // CRITICAL: Detect deadlock when no items can run AND dependencies are definitively blocked
    let lastCompletedCount = completed.size;
    let noProgressIterations = 0;
    const maxNoProgressIterations = 5; // Allow a few iterations with no progress (in case items are still running)
    const MAX_ITERATIONS = 10000; // Prevent infinite loops
    const TIMEOUT_MS = 600000; // 10 minutes total timeout
    let iterationCount = 0;
    const startTime = Date.now();

    while (completed.size < dag.totalCount && !this.isStopped) {
      iterationCount++;

      // CRITICAL: Prevent infinite loops
      if (iterationCount > MAX_ITERATIONS) {
        console.error(`[threadEngine] Maximum iterations (${MAX_ITERATIONS}) reached - terminating to prevent memory overflow`);
        // Terminate all remaining items
        for (let i = 0; i < dag.totalCount; i++) {
          if (!completed.has(i)) {
            try {
              const context = dag.getExecutionContext(i);
              if (context && context.uniqueId) {
                this.emit('itemStatus', {
                  itemId: context.uniqueId,
                  status: 'error',
                  error: 'Execution timeout: Maximum iterations reached'
                });
              }
            } catch (e) {
              // Ignore errors when terminating
            }
            completed.add(i);
          }
        }
        break;
      }

      // CRITICAL: Check timeout
      if (Date.now() - startTime > TIMEOUT_MS) {
        console.error(`[threadEngine] Execution timeout (${TIMEOUT_MS}ms) - terminating to prevent memory overflow`);
        // Terminate all remaining items
        for (let i = 0; i < dag.totalCount; i++) {
          if (!completed.has(i)) {
            try {
              const context = dag.getExecutionContext(i);
              if (context && context.uniqueId) {
                this.emit('itemStatus', {
                  itemId: context.uniqueId,
                  status: 'error',
                  error: 'Execution timeout: Time limit exceeded'
                });
              }
            } catch (e) {
              // Ignore errors when terminating
            }
            completed.add(i);
          }
        }
        break;
      }
      // Start all available functions
      await checkAndStartAvailable();

      // If stopped, break out of loop
      if (this.isStopped) {
        this.killAllWorkers();
        break;
      }

      // If we have functions executing, wait for at least one to complete
      if (executionPromises.size > 0) {
        await Promise.race(Array.from(executionPromises.values()));
        lastCompletedCount = completed.size;
        noProgressIterations = 0; // Reset counter on progress
        // After a function completes, immediately check for new available functions
        continue;
      }

      // If no functions are executing, check for deadlock
      if (executionPromises.size === 0 && completed.size < dag.totalCount) {
        // Check all remaining items to see why they can't run
        const blockedItems = [];
        const waitingItems = [];

        for (let i = 0; i < dag.totalCount; i++) {
          if (completed.has(i) || executing.has(i)) {
            continue;
          }

          const executeStatus = await dag.canExecute(i);

          if (executeStatus.blockedDeps && executeStatus.blockedDeps.length > 0) {
            // Item is definitively blocked - dependency has failed/terminated
            blockedItems.push({ index: i, blockedDeps: executeStatus.blockedDeps });
          } else {
            // Item is waiting (dependency not ready yet, but might resolve)
            waitingItems.push(i);
          }
        }

        // If we have blocked items, terminate them immediately
        if (blockedItems.length > 0) {
          console.error(`[threadEngine] Deadlock detected: ${blockedItems.length} items blocked by failed dependencies. Terminating.`);
          for (const { index, blockedDeps } of blockedItems) {
            try {
              const context = dag.getExecutionContext(index);
              const itemId = context.uniqueId;
              const depReasons = blockedDeps.map(d => `${d.depId} (${d.reason})`).join(', ');

              if (itemId) {
                this.emit('itemStatus', {
                  itemId: itemId,
                  status: 'error',
                  error: `Dependency(ies) failed or terminated: ${depReasons}`
                });
              }
              completed.add(index); // Mark as completed (terminated)
            } catch (e) {
              console.error(`Error getting context for blocked item ${index}:`, e);
              completed.add(index); // Still mark as completed to avoid infinite loop
            }
          }
          // Continue loop to check remaining items
          continue;
        }

        // Check for circular dependencies among waiting items
        if (waitingItems.length > 0 && waitingItems.length === dag.totalCount - completed.size) {
          // All remaining items are waiting - check for circular dependency
          const waitingContexts = await Promise.all(
            waitingItems.map(async (i) => {
              try {
                const context = dag.getExecutionContext(i);
                return { index: i, uniqueId: context.uniqueId, dependencies: context.dependencies || [] };
              } catch (e) {
                return { index: i, uniqueId: null, dependencies: [] };
              }
            })
          );

          // Build dependency graph and detect cycles
          const waitingItemIds = new Set(waitingContexts.map(c => c.uniqueId).filter(id => id));
          const hasCircularDependency = waitingContexts.some(({ uniqueId, dependencies }) => {
            if (!uniqueId) return false;
            // Check if any of this item's dependencies depends back on this item or another waiting item
            return dependencies.some(depId => {
              if (!waitingItemIds.has(depId)) return false;
              // Check if depId's dependencies include uniqueId (direct cycle)
              const depContext = waitingContexts.find(c => c.uniqueId === depId);
              return depContext && depContext.dependencies.includes(uniqueId);
            });
          });

          if (hasCircularDependency) {
            console.error(`[threadEngine] Circular dependency detected among ${waitingItems.length} waiting items. Terminating cycle.`);
            for (const { index, uniqueId } of waitingContexts) {
              if (uniqueId) {
                this.emit('itemStatus', {
                  itemId: uniqueId,
                  status: 'error',
                  error: 'Circular dependency detected - dependencies form a cycle'
                });
              }
              completed.add(index); // Mark as completed (terminated)
            }
            // Continue loop - all circular items are now terminated
            continue;
          }
        }

        // No progress for several iterations - but items might still be running externally
        if (completed.size === lastCompletedCount) {
          noProgressIterations++;
          // Only wait if we have items that are still potentially waiting (not blocked)
          if (noProgressIterations >= maxNoProgressIterations && waitingItems.length === 0) {
            // No items executing, no items waiting, but not all completed
            // This shouldn't happen, but break to avoid infinite loop
            console.error(`[threadEngine] Execution stalled: ${completed.size}/${dag.totalCount} completed, but no items are executable or waiting. Terminating remaining items.`);
            // Terminate all remaining items
            for (let i = 0; i < dag.totalCount; i++) {
              if (!completed.has(i)) {
                try {
                  const context = dag.getExecutionContext(i);
                  if (context && context.uniqueId) {
                    this.emit('itemStatus', {
                      itemId: context.uniqueId,
                      status: 'error',
                      error: 'Execution stalled: No progress detected'
                    });
                  }
                } catch (e) {
                  // Ignore errors when terminating
                }
                completed.add(i);
              }
            }
            break;
          }
        } else {
          lastCompletedCount = completed.size;
          noProgressIterations = 0;
        }

        // Wait a bit before checking again
        await new Promise(resolve => setTimeout(resolve, 100));
      }
    }

    // Wait for all remaining executions to complete (unless stopped)
    if (executionPromises.size > 0 && !this.isStopped) {
      await Promise.all(Array.from(executionPromises.values()));
    } else if (this.isStopped) {
      // If stopped, kill all workers and reject remaining promises
      this.killAllWorkers();
      throw new Error('Execution stopped by user');
    }
  }

  /**
   * @memberof threadManager
   * @description Method initializer of the threads found in the workerThread object. It attaches each of the properties into the object.
   * @param {Number} index - number of the thread.
   */
  initializeWorkerThread(index) {
    // CRITICAL: Don't reinitialize if worker function already exists
    // This prevents creating multiple workers for the same thread index
    if (this.workerThreads[index].worker && typeof this.workerThreads[index].worker === 'function') {

      return;
    }

    // CRITICAL: Create worker instance ONCE when thread is initialized, not on each call
    // Store worker instance in the thread object for reuse
    let workerInstance = null;
    let workerEngineType = null;

    this.workerThreads[index].worker = (args) => {
      return new Promise(async (resolve, reject) => {
        // CRITICAL: Check if execution was stopped before using worker
        if (this.isStopped) {
          reject(new Error('Execution stopped'));
          return;
        }

        // Determine engine type for this execution
        const engineType = args.type === 'hydrolang' ? 'javascript' :
          args.type === 'python' ? 'python' :
            args.type === 'javascript' ? 'javascript' :
              args.engine === 'python' ? 'python' :
                args.engine === 'javascript' ? 'javascript' :
                  args.type === 'wasm' ? 'wasm' :
                    this.engine || 'javascript';

        // CRITICAL: Reuse existing worker if engine type matches, otherwise create new one
        // If engine type changed, terminate old worker and create new one
        if (workerInstance && workerEngineType !== engineType) {
          try {
            workerInstance.terminate();
            this.activeWorkers.delete(workerInstance);
          } catch (e) {
            console.warn('Error terminating old worker:', e);
          }
          workerInstance = null;
        }

        // Create worker only if it doesn't exist or was terminated
        if (!workerInstance) {
          let w;
          //CRITICAL INFO: WORKER NOT EXECUTE IF THE PATH IS "TOO RELATIVE", KEEP LONG SOURCE
          // if (typeof importScripts === "function") {
          //   importScripts(this.workerLocation);
          //   w = self;
          // } else {
          if (engineType === "webgpu") {
            w = new Worker(new URL("../../src/webgpu/wgpu.worker.js", import.meta.url), {
              type: "module",
            });
          } else if (engineType === "wasm") {
            w = new Worker(new URL("../../src/wasm/wasm.worker.js", import.meta.url), {
              type: "module",
            });
          } else if (engineType === "python") {
            w = new Worker(new URL("../../src/python/python.worker.js", import.meta.url), {
              type: "module",
            });
          } else if (engineType === "webr") {
            w = new Worker(new URL("../../src/R/webr.worker.js", import.meta.url), {
              type: "module",
            });
          } else {
            w = new Worker(new URL("../../src/javascript/js.worker.js", import.meta.url), {
              type: "module",
            });
          }


          // Store worker instance and engine type for reuse
          workerInstance = w;
          workerEngineType = engineType;

          // Set up message handlers ONCE when worker is created
          // These handlers will be reused for all executions on this worker
          w.onmessage = ({ data }) => {
            let { status, results, funcExec, workerExec } = data;

            // CRITICAL: Use itemId from message first (worker sends it directly)
            // Fallback to execution context if not in message (for backwards compatibility)
            let itemUniqueId = data.itemId || null;

            // If itemId not in message, try to get from execution context
            if (!itemUniqueId) {
              const execContext = this.activeWorkers.get(w);
              if (execContext) {
                const { args: currentArgs } = execContext;
                itemUniqueId = currentArgs.uniqueId ||
                  (currentArgs.funcName && currentArgs.funcName.id) ||
                  (currentArgs.funcName && currentArgs.funcName.uniqueId) ||
                  null;
              }
            }

            if (!itemUniqueId) {
              // For status messages, we must have itemId - log warning but don't return
              // For completion messages, we need context to resolve the promise
              if (data.type === 'status') {
                console.warn('Status message received but no itemId found:', data);
                return; // Can't emit status without itemId
              }
            }

            // Handle status messages - worker sends itemId directly in message
            if (data.type === 'status' && itemUniqueId) {
              this.emit('itemStatus', {
                itemId: itemUniqueId,
                status: data.status,
                error: data.error || null
              });
              return; // Status messages don't need to resolve promises
            }

            // Also emit for completed status (even if not type='status')
            // Need execution context to resolve promise
            const execContext = this.activeWorkers.get(w);
            if (!execContext) {
              // Silently ignore - this is likely a message from a completed execution
              return;
            }

            const { args: currentArgs, resolve: currentResolve, reject: currentReject } = execContext;

            // For completion messages, ensure we have uniqueId
            if (!itemUniqueId) {
              itemUniqueId = currentArgs.uniqueId ||
                (currentArgs.funcName && currentArgs.funcName.id) ||
                (currentArgs.funcName && currentArgs.funcName.uniqueId) ||
                null;
            }

            if (data.status === 'completed') {
              // Emit status update if we have uniqueId
              if (itemUniqueId) {
                this.emit('itemStatus', {
                  itemId: itemUniqueId,
                  status: 'completed',
                  error: null
                });
              }

              // Update timing and resolve promise
              (this.workerThreads[index].functionTime += funcExec),
                (this.workerThreads[index].workerTime += workerExec);
              // CRITICAL: Don't terminate worker - reuse it for next execution
              // Only remove from active tracking, don't terminate
              this.activeWorkers.delete(w);
              currentResolve(data);
              // NOTE: Worker is NOT terminated here - it's reused for subsequent calls
            }
          };

          w.onerror = (error) => {
            // Get the current execution context
            const execContext = this.activeWorkers.get(w);
            if (execContext) {
              const { args: currentArgs, reject: currentReject } = execContext;

              // CRITICAL: Ensure uniqueId is properly extracted
              let itemUniqueId = currentArgs.uniqueId ||
                (currentArgs.funcName && currentArgs.funcName.id) ||
                (currentArgs.funcName && currentArgs.funcName.uniqueId) ||
                null;

              // CRITICAL: Remove from active workers tracking
              this.activeWorkers.delete(w);

              this.emit('itemComplete', {
                itemId: itemUniqueId,
                status: 'error',
                error: error.message
              });

              // Also emit itemStatus for consistency
              if (itemUniqueId) {
                this.emit('itemStatus', {
                  itemId: itemUniqueId,
                  status: 'error',
                  error: error.message
                });
              }

              console.error(`There was an error executing thread: ${index}`, error);
              // Don't terminate worker on error - let it be reused
              currentReject(error);
            } else {
              console.error(`Error in worker ${index} but no execution context found`, error);
            }
          };
        }

        // Use the existing or newly created worker instance
        const w = workerInstance;

        // CRITICAL: Track this worker for termination (update tracking for this execution)
        // Store resolve/reject in a way that the message handler can access them
        // Since handlers are set up once, we need to use a closure or store them per execution
        const executionId = `${index}_${Date.now()}_${Math.random()}`;
        const executionContext = { index, args, resolve, reject, executionId };
        this.activeWorkers.set(w, executionContext);

        try {
          // buffer.byteLength === 0
          //   ? w.postMessage(args)
          //   : w.postMessage(args, [buffer]);
          w.postMessage(args)
        } catch (error) {
          // CRITICAL: Remove from active workers tracking
          this.activeWorkers.delete(w);

          // CRITICAL: Ensure uniqueId is properly extracted from args
          let itemUniqueId = args.uniqueId ||
            (args.funcName && args.funcName.id) ||
            (args.funcName && args.funcName.uniqueId) ||
            null;

          if (itemUniqueId) {
            this.emit('itemStatus', {
              itemId: itemUniqueId,
              status: 'error',
              error: error.message
            });
          }

          console.error(
            `There was an error with the execution of function: ${funcName}, step: ${step}.`
          );
          reject(error);
          w.terminate();
        }
      });
    };
  }

  /**
   * @memberof threadManager
   * @description Kill all active workers and stop execution
   */
  killAllWorkers() {

    this.isStopped = true;

    // Terminate all active workers
    for (const [worker, { index, args, resolve, reject }] of this.activeWorkers.entries()) {
      try {
        // Send stop message if worker supports it
        try {
          worker.postMessage({ type: 'stop' });
        } catch (e) {
          // Worker may not support stop message, continue to terminate
        }

        // Terminate the worker
        worker.terminate();

        // Reject the promise
        reject(new Error('Worker terminated by user'));

        // Emit error event
        this.emit('itemStatus', {
          itemId: args.uniqueId,
          status: 'error',
          error: 'Worker terminated by user'
        });


      } catch (error) {
        console.error(`Error terminating worker ${index}:`, error);
      }
    }

    // Clear all active workers
    this.activeWorkers.clear();


  }

  /**
   * @memberof threadManager
   * @description Stop all execution and kill workers
   */
  stop() {
    this.isStopped = true;
    this.killAllWorkers();
    // CRITICAL: Don't reset isStopped here - it will be reset when executeWithDag is called again
  }

  /**
   * @memberof threadManager
   * @description Resets all the workers set to work in the compute engine.
   */
  resetWorkers() {
    // CRITICAL: Kill any remaining active workers before reset
    if (this.activeWorkers.size > 0) {

      this.killAllWorkers();
    }

    this.maxWorkerCount = navigator.hardwareConcurrency - 1;
    this.workerThreads = {};
    this.results = [];
    this.functionOrder = [];
    // CRITICAL: Always reset stop flag when resetting workers
    // This ensures workers can be restarted after being stopped
    this.isStopped = false;

  }

  /**
   * @memberof threadManager
   * @description Retrives all the execution times of a worker thread. It is triggered within the engine class.
   */
  get execTimes() {
    let funcTime = 0,
      workerTime = 0;
    for (let i = 0; i < Object.keys(this.workerThreads).length; i++) {
      funcTime += this.workerThreads[i].functionTime;
      workerTime += this.workerThreads[i].workerTime;
    }
    return [funcTime, workerTime];
  }
}