#include "generation_queue.h" #include "model_manager.h" #include "stable_diffusion_wrapper.h" #include "utils.h" #include "logger.h" #include #include #include #include #include #include #include #include #include #define STB_IMAGE_WRITE_IMPLEMENTATION #include "../stable-diffusion.cpp-src/thirdparty/stb_image_write.h" #define STB_IMAGE_IMPLEMENTATION #include "../stable-diffusion.cpp-src/thirdparty/stb_image.h" class GenerationQueue::Impl { public: // Model manager reference ModelManager* modelManager = nullptr; // Thread management std::thread workerThread; std::atomic running{false}; std::atomic stopRequested{false}; // Queue management mutable std::mutex queueMutex; std::condition_variable queueCondition; std::queue requestQueue; // Job tracking mutable std::mutex jobsMutex; std::unordered_map activeJobs; std::unordered_map> jobPromises; // Hash job tracking std::map>> hashPromises; std::map hashRequests; // Conversion job tracking std::map>> conversionPromises; std::map conversionRequests; // Configuration int maxConcurrentGenerations = 1; std::string queueDir = "./queue"; std::string outputDir = "./output"; // Statistics std::atomic queueSize{0}; std::atomic activeGenerations{0}; std::atomic totalJobsProcessed{0}; // Worker thread function void workerThreadFunction() { LOG_INFO("GenerationQueue worker thread started"); while (running.load() && !stopRequested.load()) { std::unique_lock lock(queueMutex); // Wait for a request or stop signal queueCondition.wait(lock, [this] { return !requestQueue.empty() || stopRequested.load(); }); if (stopRequested.load()) { break; } if (requestQueue.empty()) { continue; } // Get the next request GenerationRequest request = requestQueue.front(); requestQueue.pop(); queueSize.store(requestQueue.size()); lock.unlock(); // Process the request processRequest(request); } LOG_INFO("GenerationQueue worker thread stopped"); } void processRequest(const GenerationRequest& request) { // Check if this is a hash job if (request.prompt == "HASH_JOB") { auto hashIt = hashRequests.find(request.id); if (hashIt != hashRequests.end()) { HashResult result = performHashJob(hashIt->second); auto promiseIt = hashPromises.find(request.id); if (promiseIt != hashPromises.end()) { promiseIt->second->set_value(result); hashPromises.erase(promiseIt); } hashRequests.erase(hashIt); } return; } // Check if this is a conversion job if (request.prompt == "CONVERSION_JOB") { auto convIt = conversionRequests.find(request.id); if (convIt != conversionRequests.end()) { ConversionResult result = performConversionJob(convIt->second); auto promiseIt = conversionPromises.find(request.id); if (promiseIt != conversionPromises.end()) { promiseIt->second->set_value(result); conversionPromises.erase(promiseIt); } conversionRequests.erase(convIt); } return; } auto startTime = std::chrono::system_clock::now(); // Update job status to PROCESSING (only if not already in PROCESSING from model loading) { std::lock_guard lock(jobsMutex); if (activeJobs.find(request.id) != activeJobs.end()) { // Only change status if it was QUEUED (not MODEL_LOADING) if (activeJobs[request.id].status == GenerationStatus::QUEUED) { activeJobs[request.id].status = GenerationStatus::PROCESSING; activeJobs[request.id].progress = 0.0f; activeJobs[request.id].modelLoadProgress = 0.0f; activeJobs[request.id].generationProgress = 0.0f; activeJobs[request.id].firstGenerationCallback = true; // Initialize first callback flag } else if (activeJobs[request.id].status == GenerationStatus::MODEL_LOADING) { // Transition from MODEL_LOADING to PROCESSING activeJobs[request.id].status = GenerationStatus::PROCESSING; // Model loading should already be complete at this point activeJobs[request.id].modelLoadProgress = 1.0f; activeJobs[request.id].firstGenerationCallback = true; // Initialize first callback flag } activeJobs[request.id].startTime = startTime; activeJobs[request.id].currentStep = 0; activeJobs[request.id].totalSteps = 0; if (activeJobs[request.id].timeElapsed == 0) { activeJobs[request.id].timeElapsed = 0; } if (activeJobs[request.id].timeRemaining == 0) { activeJobs[request.id].timeRemaining = 0; } if (activeJobs[request.id].speed == 0.0f) { activeJobs[request.id].speed = 0.0f; } saveJobToFile(activeJobs[request.id]); } } activeGenerations.store(1); // Only one generation at a time LOG_INFO("Processing generation request: " + request.id + " (prompt: " + request.prompt.substr(0, 50) + (request.prompt.length() > 50 ? "..." : "") + ")"); // Real generation logic using stable-diffusion.cpp with progress tracking GenerationResult result = performActualGeneration(request, request.id); auto endTime = std::chrono::system_clock::now(); auto duration = std::chrono::duration_cast(endTime - startTime); result.generationTime = duration.count(); // Update job status to COMPLETED/FAILED { std::lock_guard lock(jobsMutex); if (activeJobs.find(request.id) != activeJobs.end()) { float finalProgress = activeJobs[request.id].progress; auto completionTime = std::chrono::system_clock::now(); auto totalGenerationTime = std::chrono::duration_cast(completionTime - startTime).count(); LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Total generation time: " + std::to_string(totalGenerationTime) + "ms"); LOG_DEBUG("[JOB_COMPLETION] Job " + request.id + " - Status changing to '" + (result.success ? "completed" : "failed") + "'" + " - Progress at completion: " + std::to_string(static_cast(finalProgress * 100.0f)) + "%"); activeJobs[request.id].status = result.success ? GenerationStatus::COMPLETED : GenerationStatus::FAILED; activeJobs[request.id].endTime = endTime; // Set final progress to 100% if successful if (result.success) { activeJobs[request.id].progress = 1.0f; if (activeJobs[request.id].totalSteps > 0) { activeJobs[request.id].currentStep = activeJobs[request.id].totalSteps; } } // Store output files, error message, and generation results activeJobs[request.id].outputFiles = result.imagePaths; activeJobs[request.id].errorMessage = result.errorMessage; activeJobs[request.id].actualSeed = result.actualSeed; activeJobs[request.id].generationTime = result.generationTime; { std::ostringstream oss; oss << "Job " << request.id << " completed. Success: " << (result.success ? "true" : "false") << ", Image paths: " << result.imagePaths.size(); LOG_DEBUG(oss.str()); } for (size_t i = 0; i < result.imagePaths.size(); ++i) { { std::ostringstream oss2; oss2 << "Image " << i << ": " << result.imagePaths[i]; LOG_DEBUG(oss2.str()); } } if (!result.errorMessage.empty()) { std::ostringstream err_oss; err_oss << "Error message: " << result.errorMessage; LOG_DEBUG(err_oss.str()); } // Persist to disk saveJobToFile(activeJobs[request.id]); } // Set the promise value auto it = jobPromises.find(request.id); if (it != jobPromises.end()) { it->second.set_value(result); jobPromises.erase(it); } } activeGenerations.store(0); totalJobsProcessed.fetch_add(1); std::string completionMsg = "Completed generation request: " + request.id + " (success: " + (result.success ? "true" : "false") + ", time: " + std::to_string(result.generationTime) + "ms)"; if (!result.success && !result.errorMessage.empty()) { completionMsg += " - Error: " + result.errorMessage; } LOG_INFO(completionMsg); } // Progress callback that updates model loading progress void updateModelLoadProgress(const std::string& jobId, float modelProgress, uint64_t timeElapsed) { std::lock_guard lock(jobsMutex); auto it = activeJobs.find(jobId); if (it != activeJobs.end()) { // Clamp model loading progress to valid range [0.0, 1.0] modelProgress = std::max(0.0f, std::min(1.0f, modelProgress)); it->second.modelLoadProgress = modelProgress; it->second.generationProgress = 0.0f; // Overall progress during model loading is just the model loading progress it->second.progress = modelProgress; it->second.timeElapsed = static_cast(timeElapsed); // Update status message to reflect model loading if (!it->second.prompt.empty()) { size_t bracketPos = it->second.prompt.find(" [Loading model:"); if (bracketPos == std::string::npos) { it->second.prompt = it->second.prompt + " [Loading model: " + std::to_string(static_cast(modelProgress * 100)) + "%]"; } else { // Update existing loading message it->second.prompt = it->second.prompt.substr(0, bracketPos) + " [Loading model: " + std::to_string(static_cast(modelProgress * 100)) + "%]"; } } saveJobToFile(it->second); } } // Progress callback that updates generation progress void updateGenerationProgress(const std::string& jobId, int step, int totalSteps, float genProgress, uint64_t timeElapsed) { std::lock_guard lock(jobsMutex); auto it = activeJobs.find(jobId); if (it != activeJobs.end()) { // Clamp generation progress to valid range [0.0, 1.0] genProgress = std::max(0.0f, std::min(1.0f, genProgress)); it->second.generationProgress = genProgress; it->second.currentStep = step; it->second.totalSteps = totalSteps; it->second.timeElapsed = static_cast(timeElapsed); // Handle first generation callback specially if (it->second.firstGenerationCallback) { // This is the first callback, ignore the incoming genProgress value // and set overall progress to exactly 50% (model loading complete) it->second.generationProgress = 0.0f; it->second.progress = 0.5f; it->second.firstGenerationCallback = false; // Mark that we've handled the first callback LOG_DEBUG("First generation callback for job " + jobId + " - Ignored genProgress (" + std::to_string(genProgress) + "), reset generation progress to 0.0 and overall progress to 50%"); } else { // For subsequent callbacks, calculate overall progress normally: 50% for model loading + 50% for generation it->second.progress = 0.5f + (genProgress * 0.5f); } // Clamp overall progress to valid range [0.0, 1.0] it->second.progress = std::max(0.0f, std::min(1.0f, it->second.progress)); // Calculate time remaining and speed if (step > 0 && timeElapsed > 0) { double avgStepTime = static_cast(timeElapsed) / step; int remainingSteps = totalSteps - step; it->second.timeRemaining = static_cast(avgStepTime * remainingSteps); it->second.speed = 1000.0 / avgStepTime; // steps per second } // Clean up loading message from prompt if (!it->second.prompt.empty()) { size_t bracketPos = it->second.prompt.find(" [Loading model:"); if (bracketPos != std::string::npos) { it->second.prompt = it->second.prompt.substr(0, bracketPos); } } // Save progress to file periodically (every 10 steps or on significant progress changes) if (step % 10 == 0 || genProgress >= 0.99f) { saveJobToFile(it->second); } } } GenerationResult performActualGeneration(const GenerationRequest& request, const std::string& jobId) { GenerationResult result; result.requestId = request.id; result.success = false; // Check if model manager is available if (!modelManager) { result.errorMessage = "Model manager not available"; return result; } if (!modelManager->isModelLoaded(request.modelName)) { // Update status to show model is being loaded { std::lock_guard lock(jobsMutex); if (activeJobs.find(request.id) != activeJobs.end()) { activeJobs[request.id].status = GenerationStatus::MODEL_LOADING; activeJobs[request.id].progress = 0.0f; // Start at 0% for model loading saveJobToFile(activeJobs[request.id]); } } // Auto-load the model with progress tracking auto modelLoadStartTime = std::chrono::system_clock::now(); // Create a progress callback for model loading auto modelLoadProgressCallback = [this, jobId = request.id, modelLoadStartTime](float loadProgress) { auto currentTime = std::chrono::system_clock::now(); uint64_t timeElapsed = std::chrono::duration_cast(currentTime - modelLoadStartTime).count(); updateModelLoadProgress(jobId, loadProgress, timeElapsed); }; bool loadSuccess = modelManager->loadModel(request.modelName, modelLoadProgressCallback); if (!loadSuccess || !modelManager->isModelLoaded(request.modelName)) { result.errorMessage = "Failed to load model: " + request.modelName; // Update job status to failed { std::lock_guard lock(jobsMutex); if (activeJobs.find(request.id) != activeJobs.end()) { activeJobs[request.id].status = GenerationStatus::FAILED; activeJobs[request.id].errorMessage = result.errorMessage; saveJobToFile(activeJobs[request.id]); } } return result; } // Reset status after successful model loading { std::lock_guard lock(jobsMutex); if (activeJobs.find(request.id) != activeJobs.end()) { // Remove the loading progress suffix from prompt size_t bracketPos = activeJobs[request.id].prompt.find(" [Loading model:"); if (bracketPos != std::string::npos) { activeJobs[request.id].prompt = activeJobs[request.id].prompt.substr(0, bracketPos); } // Set model loading to complete, overall progress to exactly 50% (halfway through) activeJobs[request.id].modelLoadProgress = 1.0f; activeJobs[request.id].generationProgress = 0.0f; activeJobs[request.id].progress = 0.5f; activeJobs[request.id].firstGenerationCallback = true; // Initialize first callback flag saveJobToFile(activeJobs[request.id]); } } } // Get the model wrapper from the shared model manager auto* modelWrapper = modelManager->getModel(request.modelName); if (!modelWrapper) { result.errorMessage = "Model not found or not loaded: " + request.modelName; return result; } // Prepare generation parameters StableDiffusionWrapper::GenerationParams params; params.prompt = request.prompt; params.negativePrompt = request.negativePrompt; params.width = request.width; params.height = request.height; params.batchCount = request.batchCount; params.steps = request.steps; params.cfgScale = request.cfgScale; params.samplingMethod = samplingMethodToString(request.samplingMethod); params.scheduler = schedulerToString(request.scheduler); params.clipSkip = request.clipSkip; params.strength = request.strength; params.controlStrength = request.controlStrength; params.nThreads = request.nThreads; params.offloadParamsToCpu = request.offloadParamsToCpu; params.clipOnCpu = request.clipOnCpu; params.vaeOnCpu = request.vaeOnCpu; params.diffusionFlashAttn = request.diffusionFlashAttn; params.diffusionConvDirect = request.diffusionConvDirect; params.vaeConvDirect = request.vaeConvDirect; // Set model paths if provided params.modelPath = modelManager->getModelInfo(request.modelName).path; params.clipLPath = request.clipLPath; params.clipGPath = request.clipGPath; params.vaePath = request.vaePath; params.taesdPath = request.taesdPath; params.controlNetPath = request.controlNetPath; params.embeddingDir = request.embeddingDir; params.loraModelDir = request.loraModelDir; // Parse seed if (request.seed == "random") { std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution dis; params.seed = dis(gen); } else { try { params.seed = std::stoll(request.seed); } catch (...) { params.seed = 42; // Default seed } } result.actualSeed = params.seed; // Generate images based on request type with progress tracking try { std::vector generatedImages; // Create progress callback that updates job info auto progressCallback = [this, jobId](int step, int totalSteps, float progress, void* userData) { // Calculate time elapsed from start time (stored in userData) auto startTime = userData ? *static_cast(userData) : std::chrono::system_clock::now(); auto currentTime = std::chrono::system_clock::now(); uint64_t timeElapsed = std::chrono::duration_cast(currentTime - startTime).count(); // Use the new generation progress update function updateGenerationProgress(jobId, step, totalSteps, progress, timeElapsed); }; // Store start time to pass as user data auto generationStartTime = std::chrono::system_clock::now(); switch (request.requestType) { case GenerationRequest::RequestType::TEXT2IMG: generatedImages = modelWrapper->generateImage(params, progressCallback, &generationStartTime); break; case GenerationRequest::RequestType::IMG2IMG: if (request.initImageData.empty()) { result.errorMessage = "No init image data provided for img2img"; return result; } generatedImages = modelWrapper->generateImageImg2Img( params, request.initImageData, request.initImageWidth, request.initImageHeight, progressCallback, &generationStartTime ); break; case GenerationRequest::RequestType::CONTROLNET: if (request.controlImageData.empty()) { result.errorMessage = "No control image data provided for ControlNet"; return result; } generatedImages = modelWrapper->generateImageControlNet( params, request.controlImageData, request.controlImageWidth, request.controlImageHeight, progressCallback, &generationStartTime ); break; case GenerationRequest::RequestType::UPSCALER: if (request.initImageData.empty()) { result.errorMessage = "No input image data provided for upscaling"; return result; } if (request.esrganPath.empty()) { result.errorMessage = "No ESRGAN model path provided for upscaling"; return result; } { auto upscaledImage = modelWrapper->upscaleImage( request.esrganPath, request.initImageData, request.initImageWidth, request.initImageHeight, request.initImageChannels, request.upscaleFactor, request.nThreads, request.offloadParamsToCpu, request.diffusionConvDirect ); generatedImages.push_back(upscaledImage); } break; case GenerationRequest::RequestType::INPAINTING: if (request.initImageData.empty()) { result.errorMessage = "No source image data provided for inpainting"; return result; } if (request.maskImageData.empty()) { result.errorMessage = "No mask image data provided for inpainting"; return result; } generatedImages = modelWrapper->generateImageInpainting( params, request.initImageData, request.initImageWidth, request.initImageHeight, request.maskImageData, request.maskImageWidth, request.maskImageHeight, progressCallback, &generationStartTime ); break; default: result.errorMessage = "Unknown request type"; return result; } if (generatedImages.empty()) { result.errorMessage = "Failed to generate images: " + modelWrapper->getLastError(); return result; } // Save generated images to files LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Starting post-processing (image saving) phase"); auto postProcessingStart = std::chrono::system_clock::now(); for (size_t i = 0; i < generatedImages.size(); i++) { const auto& image = generatedImages[i]; LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Saving image " + std::to_string(i+1) + "/" + std::to_string(generatedImages.size())); auto imageSaveStart = std::chrono::system_clock::now(); std::string imagePath = saveImageToFile(image, request.id, i); auto imageSaveEnd = std::chrono::system_clock::now(); auto imageSaveTime = std::chrono::duration_cast(imageSaveEnd - imageSaveStart).count(); LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Image " + std::to_string(i+1) + " save took " + std::to_string(imageSaveTime) + "ms"); if (!imagePath.empty()) { result.imagePaths.push_back(imagePath); } else { result.errorMessage = "Failed to save generated image " + std::to_string(i); return result; } } auto postProcessingEnd = std::chrono::system_clock::now(); auto postProcessingTime = std::chrono::duration_cast(postProcessingEnd - postProcessingStart).count(); LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Post-processing completed in " + std::to_string(postProcessingTime) + "ms"); result.success = true; result.generationTime = generatedImages.empty() ? 0 : generatedImages[0].generationTime; result.errorMessage = ""; } catch (const std::exception& e) { result.errorMessage = "Exception during generation: " + std::string(e.what()); } return result; } std::string saveImageToFile(const StableDiffusionWrapper::GeneratedImage& image, const std::string& requestId, size_t index) { // Create job-specific output directory std::string jobOutputDir = outputDir + "/" + requestId; std::error_code ec; std::filesystem::create_directories(jobOutputDir, ec); if (ec) { LOG_ERROR("Failed to create output directory " + jobOutputDir + ": " + ec.message()); return ""; } // Generate filename std::stringstream ss; ss << jobOutputDir << "/" << requestId << "_" << index << ".png"; std::string filename = ss.str(); LOG_DEBUG("Attempting to save image to: " + filename); // Check if image data is valid if (image.data.empty() || image.width <= 0 || image.height <= 0) { LOG_ERROR("Invalid image data for " + requestId + "_" + std::to_string(index) + ": width=" + std::to_string(image.width) + ", height=" + std::to_string(image.height) + ", channels=" + std::to_string(image.channels) + ", data_size=" + std::to_string(image.data.size())); return ""; } // Validate image data integrity const size_t expectedDataSize = static_cast(image.width) * image.height * image.channels; if (image.data.size() != expectedDataSize) { LOG_WARNING("Image data size mismatch for " + requestId + "_" + std::to_string(index) + ": expected=" + std::to_string(expectedDataSize) + ", actual=" + std::to_string(image.data.size())); // Continue anyway, but log the warning } // Check if we can write to the directory std::ofstream testFile(filename + ".test"); if (!testFile.is_open()) { LOG_ERROR("Cannot write to directory " + jobOutputDir + ": permission denied or disk full"); return ""; } testFile.close(); std::filesystem::remove(filename + ".test"); // Write PNG file using stb_image_write with detailed error logging LOG_DEBUG("Writing PNG file: " + filename + " (size: " + std::to_string(image.width) + "x" + std::to_string(image.height) + "x" + std::to_string(image.channels) + ")"); int result = stbi_write_png( filename.c_str(), image.width, image.height, image.channels, image.data.data(), image.width * image.channels // stride in bytes ); if (result == 0) { LOG_ERROR("stbi_write_png failed for " + filename); // Try to get more detailed error information LOG_ERROR("Image details:"); LOG_ERROR(" Dimensions: " + std::to_string(image.width) + "x" + std::to_string(image.height)); LOG_ERROR(" Channels: " + std::to_string(image.channels)); LOG_ERROR(" Data size: " + std::to_string(image.data.size()) + " bytes"); LOG_ERROR(" Expected size: " + std::to_string(expectedDataSize) + " bytes"); LOG_ERROR(" Stride: " + std::to_string(image.width * image.channels) + " bytes"); // Check if file was created but is empty if (std::filesystem::exists(filename)) { auto fileSize = std::filesystem::file_size(filename); LOG_ERROR(" File exists but size is: " + std::to_string(fileSize) + " bytes"); if (fileSize == 0) { LOG_ERROR(" ERROR: Zero-byte file created - stbi_write_png returned false but file exists"); } } else { LOG_ERROR(" File was not created"); } // Check disk space try { auto space = std::filesystem::space(jobOutputDir); LOG_ERROR(" Available disk space: " + std::to_string(space.available / (1024 * 1024)) + " MB"); } catch (const std::exception& e) { LOG_ERROR(" Could not check disk space: " + std::string(e.what())); } return ""; } // Verify the file was created successfully and has content if (!std::filesystem::exists(filename)) { LOG_ERROR("ERROR: stbi_write_png returned success but file does not exist: " + filename); return ""; } auto fileSize = std::filesystem::file_size(filename); if (fileSize == 0) { LOG_ERROR("ERROR: stbi_write_png returned success but created zero-byte file: " + filename); return ""; } LOG_DEBUG("Successfully saved generated image to: " + filename + " (" + std::to_string(image.width) + "x" + std::to_string(image.height) + ", " + std::to_string(image.channels) + " channels, " + std::to_string(image.data.size()) + " data bytes, " + std::to_string(fileSize) + " file bytes)"); return filename; } std::string samplingMethodToString(SamplingMethod method) { switch (method) { case SamplingMethod::EULER: return "euler"; case SamplingMethod::EULER_A: return "euler_a"; case SamplingMethod::HEUN: return "heun"; case SamplingMethod::DPM2: return "dpm2"; case SamplingMethod::DPMPP2S_A: return "dpmpp2s_a"; case SamplingMethod::DPMPP2M: return "dpmpp2m"; case SamplingMethod::DPMPP2MV2: return "dpmpp2mv2"; case SamplingMethod::IPNDM: return "ipndm"; case SamplingMethod::IPNDM_V: return "ipndm_v"; case SamplingMethod::LCM: return "lcm"; case SamplingMethod::DDIM_TRAILING: return "ddim_trailing"; case SamplingMethod::TCD: return "tcd"; default: return "euler"; } } std::string schedulerToString(Scheduler scheduler) { switch (scheduler) { case Scheduler::DISCRETE: return "discrete"; case Scheduler::KARRAS: return "karras"; case Scheduler::EXPONENTIAL: return "exponential"; case Scheduler::AYS: return "ays"; case Scheduler::GITS: return "gits"; case Scheduler::SMOOTHSTEP: return "smoothstep"; case Scheduler::SGM_UNIFORM: return "sgm_uniform"; case Scheduler::SIMPLE: return "simple"; default: return "default"; } } std::string jobStatusToString(GenerationStatus status) { switch (status) { case GenerationStatus::QUEUED: return "queued"; case GenerationStatus::MODEL_LOADING: return "loading"; case GenerationStatus::PROCESSING: return "processing"; case GenerationStatus::COMPLETED: return "completed"; case GenerationStatus::FAILED: return "failed"; default: return "unknown"; } } GenerationStatus stringToJobStatus(const std::string& status) { if (status == "queued") return GenerationStatus::QUEUED; if (status == "loading") return GenerationStatus::MODEL_LOADING; if (status == "processing") return GenerationStatus::PROCESSING; if (status == "completed") return GenerationStatus::COMPLETED; if (status == "failed") return GenerationStatus::FAILED; return GenerationStatus::QUEUED; } std::string jobTypeToString(JobType type) { switch (type) { case JobType::GENERATION: return "generation"; case JobType::HASHING: return "hashing"; default: return "unknown"; } } JobType stringToJobType(const std::string& type) { if (type == "generation") return JobType::GENERATION; if (type == "hashing") return JobType::HASHING; return JobType::GENERATION; } SamplingMethod stringToSamplingMethod(const std::string& method) { if (method == "euler") return SamplingMethod::EULER; if (method == "euler_a") return SamplingMethod::EULER_A; if (method == "heun") return SamplingMethod::HEUN; if (method == "dpm2") return SamplingMethod::DPM2; if (method == "dpmpp2s_a") return SamplingMethod::DPMPP2S_A; if (method == "dpmpp2m") return SamplingMethod::DPMPP2M; if (method == "dpmpp2mv2") return SamplingMethod::DPMPP2MV2; if (method == "ipndm") return SamplingMethod::IPNDM; if (method == "ipndm_v") return SamplingMethod::IPNDM_V; if (method == "lcm") return SamplingMethod::LCM; if (method == "ddim_trailing") return SamplingMethod::DDIM_TRAILING; if (method == "tcd") return SamplingMethod::TCD; return SamplingMethod::DEFAULT; } Scheduler stringToScheduler(const std::string& scheduler) { if (scheduler == "discrete") return Scheduler::DISCRETE; if (scheduler == "karras") return Scheduler::KARRAS; if (scheduler == "exponential") return Scheduler::EXPONENTIAL; if (scheduler == "ays") return Scheduler::AYS; if (scheduler == "gits") return Scheduler::GITS; if (scheduler == "smoothstep") return Scheduler::SMOOTHSTEP; if (scheduler == "sgm_uniform") return Scheduler::SGM_UNIFORM; if (scheduler == "simple") return Scheduler::SIMPLE; return Scheduler::DEFAULT; } void saveJobToFile(const JobInfo& job) { try { // Create queue directory if it doesn't exist std::filesystem::create_directories(queueDir); // Create JSON object with enhanced fields nlohmann::json jobJson; // Basic fields jobJson["id"] = job.id; jobJson["type"] = jobTypeToString(job.type); jobJson["status"] = jobStatusToString(job.status); jobJson["prompt"] = job.prompt; jobJson["position"] = job.position; // Convert time points to milliseconds since epoch auto queuedMs = std::chrono::duration_cast( job.queuedTime.time_since_epoch()).count(); jobJson["queued_time"] = queuedMs; if (job.status != GenerationStatus::QUEUED) { auto startMs = std::chrono::duration_cast( job.startTime.time_since_epoch()).count(); jobJson["start_time"] = startMs; } if (job.status == GenerationStatus::COMPLETED || job.status == GenerationStatus::FAILED) { auto endMs = std::chrono::duration_cast( job.endTime.time_since_epoch()).count(); jobJson["end_time"] = endMs; } // Enhanced fields for repeatable generation if (!job.modelName.empty()) { jobJson["model_name"] = job.modelName; jobJson["model_hash"] = job.modelHash; jobJson["model_path"] = job.modelPath; } if (!job.negativePrompt.empty()) { jobJson["negative_prompt"] = job.negativePrompt; } jobJson["width"] = job.width; jobJson["height"] = job.height; jobJson["batch_count"] = job.batchCount; jobJson["steps"] = job.steps; jobJson["cfg_scale"] = job.cfgScale; jobJson["sampling_method"] = samplingMethodToString(job.samplingMethod); jobJson["scheduler"] = schedulerToString(job.scheduler); jobJson["seed"] = job.seed; jobJson["actual_seed"] = job.actualSeed; jobJson["request_type"] = job.requestType; jobJson["strength"] = job.strength; jobJson["control_strength"] = job.controlStrength; jobJson["clip_skip"] = job.clipSkip; jobJson["n_threads"] = job.nThreads; jobJson["offload_params_to_cpu"] = job.offloadParamsToCpu; jobJson["clip_on_cpu"] = job.clipOnCpu; jobJson["vae_on_cpu"] = job.vaeOnCpu; jobJson["diffusion_flash_attn"] = job.diffusionFlashAttn; jobJson["diffusion_conv_direct"] = job.diffusionConvDirect; jobJson["vae_conv_direct"] = job.vaeConvDirect; jobJson["generation_time"] = job.generationTime; // Image paths for complex operations if (!job.initImageData.empty()) { jobJson["init_image_path"] = job.initImageData; } if (!job.controlImageData.empty()) { jobJson["control_image_path"] = job.controlImageData; } if (!job.maskImageData.empty()) { jobJson["mask_image_path"] = job.maskImageData; } // Model paths for advanced usage if (!job.clipLPath.empty()) { jobJson["clip_l_path"] = job.clipLPath; } if (!job.clipGPath.empty()) { jobJson["clip_g_path"] = job.clipGPath; } if (!job.vaePath.empty()) { jobJson["vae_path"] = job.vaePath; } if (!job.taesdPath.empty()) { jobJson["taesd_path"] = job.taesdPath; } if (!job.controlNetPath.empty()) { jobJson["controlnet_path"] = job.controlNetPath; } if (!job.embeddingDir.empty()) { jobJson["embedding_dir"] = job.embeddingDir; } if (!job.loraModelDir.empty()) { jobJson["lora_model_dir"] = job.loraModelDir; } if (!job.esrganPath.empty()) { jobJson["esrgan_path"] = job.esrganPath; } jobJson["upscale_factor"] = job.upscaleFactor; // Progress and timing information jobJson["progress"] = job.progress; jobJson["model_load_progress"] = job.modelLoadProgress; jobJson["generation_progress"] = job.generationProgress; jobJson["current_step"] = job.currentStep; jobJson["total_steps"] = job.totalSteps; jobJson["time_elapsed"] = job.timeElapsed; jobJson["time_remaining"] = job.timeRemaining; jobJson["speed"] = job.speed; jobJson["first_generation_callback"] = job.firstGenerationCallback; // Output information jobJson["output_files"] = job.outputFiles; jobJson["error_message"] = job.errorMessage; // Write to file std::string filename = queueDir + "/" + job.id + ".json"; std::ofstream file(filename); if (file.is_open()) { file << jobJson.dump(2); file.close(); } } catch (const std::exception& e) { LOG_ERROR("Error saving job to file: " + std::string(e.what())); } } void loadJobsFromDisk() { try { if (!std::filesystem::exists(queueDir)) { return; } LOG_INFO("Loading persisted jobs from: " + queueDir); int loadedCount = 0; for (const auto& entry : std::filesystem::directory_iterator(queueDir)) { if (entry.path().extension() != ".json") { continue; } try { std::ifstream file(entry.path()); if (!file.is_open()) { continue; } nlohmann::json jobJson = nlohmann::json::parse(file); file.close(); // Reconstruct JobInfo JobInfo job; job.id = jobJson["id"]; job.type = stringToJobType(jobJson["type"]); job.status = stringToJobStatus(jobJson["status"]); job.prompt = jobJson["prompt"]; job.position = jobJson["position"]; // Reconstruct time points auto queuedMs = jobJson["queued_time"].get(); job.queuedTime = std::chrono::system_clock::time_point( std::chrono::milliseconds(queuedMs)); if (jobJson.contains("start_time")) { auto startMs = jobJson["start_time"].get(); job.startTime = std::chrono::system_clock::time_point( std::chrono::milliseconds(startMs)); } if (jobJson.contains("end_time")) { auto endMs = jobJson["end_time"].get(); job.endTime = std::chrono::system_clock::time_point( std::chrono::milliseconds(endMs)); } if (jobJson.contains("output_files")) { job.outputFiles = jobJson["output_files"].get>(); } if (jobJson.contains("error_message")) { job.errorMessage = jobJson["error_message"]; } // Load enhanced fields (with backward compatibility) if (jobJson.contains("model_name")) { job.modelName = jobJson["model_name"]; job.modelHash = jobJson.value("model_hash", ""); job.modelPath = jobJson.value("model_path", ""); } if (jobJson.contains("negative_prompt")) { job.negativePrompt = jobJson["negative_prompt"]; } if (jobJson.contains("width")) job.width = jobJson["width"]; if (jobJson.contains("height")) job.height = jobJson["height"]; if (jobJson.contains("batch_count")) job.batchCount = jobJson["batch_count"]; if (jobJson.contains("steps")) job.steps = jobJson["steps"]; if (jobJson.contains("cfg_scale")) job.cfgScale = jobJson["cfg_scale"]; if (jobJson.contains("sampling_method")) { std::string samplingMethodStr = jobJson["sampling_method"]; job.samplingMethod = stringToSamplingMethod(samplingMethodStr); } if (jobJson.contains("scheduler")) { std::string schedulerStr = jobJson["scheduler"]; job.scheduler = stringToScheduler(schedulerStr); } if (jobJson.contains("seed")) job.seed = jobJson["seed"]; if (jobJson.contains("actual_seed")) job.actualSeed = jobJson["actual_seed"]; if (jobJson.contains("request_type")) job.requestType = jobJson["request_type"]; if (jobJson.contains("strength")) job.strength = jobJson["strength"]; if (jobJson.contains("control_strength")) job.controlStrength = jobJson["control_strength"]; if (jobJson.contains("clip_skip")) job.clipSkip = jobJson["clip_skip"]; if (jobJson.contains("n_threads")) job.nThreads = jobJson["n_threads"]; if (jobJson.contains("offload_params_to_cpu")) job.offloadParamsToCpu = jobJson["offload_params_to_cpu"]; if (jobJson.contains("clip_on_cpu")) job.clipOnCpu = jobJson["clip_on_cpu"]; if (jobJson.contains("vae_on_cpu")) job.vaeOnCpu = jobJson["vae_on_cpu"]; if (jobJson.contains("diffusion_flash_attn")) job.diffusionFlashAttn = jobJson["diffusion_flash_attn"]; if (jobJson.contains("diffusion_conv_direct")) job.diffusionConvDirect = jobJson["diffusion_conv_direct"]; if (jobJson.contains("vae_conv_direct")) job.vaeConvDirect = jobJson["vae_conv_direct"]; if (jobJson.contains("generation_time")) job.generationTime = jobJson["generation_time"]; // Image paths for complex operations if (jobJson.contains("init_image_path")) job.initImageData = jobJson["init_image_path"]; if (jobJson.contains("control_image_path")) job.controlImageData = jobJson["control_image_path"]; if (jobJson.contains("mask_image_path")) job.maskImageData = jobJson["mask_image_path"]; // Model paths for advanced usage if (jobJson.contains("clip_l_path")) job.clipLPath = jobJson["clip_l_path"]; if (jobJson.contains("clip_g_path")) job.clipGPath = jobJson["clip_g_path"]; if (jobJson.contains("vae_path")) job.vaePath = jobJson["vae_path"]; if (jobJson.contains("taesd_path")) job.taesdPath = jobJson["taesd_path"]; if (jobJson.contains("controlnet_path")) job.controlNetPath = jobJson["controlnet_path"]; if (jobJson.contains("embedding_dir")) job.embeddingDir = jobJson["embedding_dir"]; if (jobJson.contains("lora_model_dir")) job.loraModelDir = jobJson["lora_model_dir"]; if (jobJson.contains("esrgan_path")) job.esrganPath = jobJson["esrgan_path"]; if (jobJson.contains("upscale_factor")) job.upscaleFactor = jobJson["upscale_factor"]; // Progress and timing information if (jobJson.contains("progress")) job.progress = jobJson["progress"]; if (jobJson.contains("model_load_progress")) job.modelLoadProgress = jobJson["model_load_progress"]; if (jobJson.contains("generation_progress")) job.generationProgress = jobJson["generation_progress"]; if (jobJson.contains("current_step")) job.currentStep = jobJson["current_step"]; if (jobJson.contains("total_steps")) job.totalSteps = jobJson["total_steps"]; if (jobJson.contains("time_elapsed")) job.timeElapsed = jobJson["time_elapsed"]; if (jobJson.contains("time_remaining")) job.timeRemaining = jobJson["time_remaining"]; if (jobJson.contains("speed")) job.speed = jobJson["speed"]; if (jobJson.contains("first_generation_callback")) { job.firstGenerationCallback = jobJson["first_generation_callback"]; } else { // For backward compatibility, default to true for loaded jobs job.firstGenerationCallback = true; } // Clean up stale processing jobs from server restart if (job.status == GenerationStatus::PROCESSING) { job.status = GenerationStatus::FAILED; job.errorMessage = "Server restarted while job was processing"; job.endTime = std::chrono::system_clock::now(); LOG_DEBUG("Marked stale job as failed: " + job.id); // Persist updated status to disk saveJobToFile(job); } // Add to active jobs std::lock_guard lock(jobsMutex); activeJobs[job.id] = job; loadedCount++; } catch (const std::exception& e) { LOG_ERROR("Error loading job from " + entry.path().string() + ": " + std::string(e.what())); } } if (loadedCount > 0) { LOG_INFO("Loaded " + std::to_string(loadedCount) + " persisted job(s)"); } } catch (const std::exception& e) { LOG_ERROR("Error loading jobs from disk: " + std::string(e.what())); } } HashResult performHashJob(const HashRequest& request) { HashResult result; result.requestId = request.id; result.success = false; result.modelsHashed = 0; auto startTime = std::chrono::system_clock::now(); if (!modelManager) { result.errorMessage = "Model manager not available"; result.status = GenerationStatus::FAILED; return result; } // Get list of models to hash std::vector modelsToHash; if (request.modelNames.empty()) { // Hash all models without hashes auto allModels = modelManager->getAllModels(); for (const auto& [name, info] : allModels) { if (info.sha256.empty() || request.forceRehash) { modelsToHash.push_back(name); } } } else { modelsToHash = request.modelNames; } LOG_DEBUG("Hashing " + std::to_string(modelsToHash.size()) + " model(s)..."); // Hash each model for (const auto& modelName : modelsToHash) { std::string hash = modelManager->ensureModelHash(modelName, request.forceRehash); if (!hash.empty()) { result.modelHashes[modelName] = hash; result.modelsHashed++; } else { LOG_ERROR("Failed to hash model: " + modelName); } } auto endTime = std::chrono::system_clock::now(); result.hashingTime = std::chrono::duration_cast( endTime - startTime).count(); result.success = result.modelsHashed > 0; result.status = result.success ? GenerationStatus::COMPLETED : GenerationStatus::FAILED; if (!result.success) { result.errorMessage = "Failed to hash any models"; } return result; } ConversionResult performConversionJob(const ConversionRequest& request) { ConversionResult result; result.requestId = request.id; result.success = false; auto startTime = std::chrono::system_clock::now(); // Conversion start output removed from stdout // Check if input file exists namespace fs = std::filesystem; if (!fs::exists(request.modelPath)) { result.errorMessage = "Input model file not found: " + request.modelPath; result.status = GenerationStatus::FAILED; return result; } // Get original file size try { auto originalSize = fs::file_size(request.modelPath); result.originalSize = formatFileSize(originalSize); } catch (const std::exception& e) { result.originalSize = "Unknown"; } // Build conversion command // Get the sd binary path from the CMake installation directory std::string sdBinaryPath = "../build/stable-diffusion.cpp-install/bin/sd"; std::stringstream cmd; cmd << sdBinaryPath << " --mode convert"; cmd << " -m \"" << request.modelPath << "\""; cmd << " -o \"" << request.outputPath << "\""; cmd << " --type " << request.quantizationType; cmd << " 2>&1"; // Capture stderr // Command execution output removed from stdout // Execute conversion FILE* pipe = popen(cmd.str().c_str(), "r"); if (!pipe) { result.errorMessage = "Failed to execute conversion command"; result.status = GenerationStatus::FAILED; return result; } // Read command output char buffer[256]; std::string output; while (fgets(buffer, sizeof(buffer), pipe) != nullptr) { output += buffer; // Progress output removed from stdout } int exitCode = pclose(pipe); auto endTime = std::chrono::system_clock::now(); result.conversionTime = std::chrono::duration_cast( endTime - startTime).count(); if (exitCode != 0) { result.errorMessage = "Conversion failed with exit code " + std::to_string(exitCode); if (!output.empty()) { result.errorMessage += "\nOutput: " + output; } result.status = GenerationStatus::FAILED; return result; } // Check if output file was created if (!fs::exists(request.outputPath)) { result.errorMessage = "Output file was not created: " + request.outputPath; result.status = GenerationStatus::FAILED; return result; } // Get converted file size try { auto convertedSize = fs::file_size(request.outputPath); result.convertedSize = formatFileSize(convertedSize); } catch (const std::exception& e) { result.convertedSize = "Unknown"; } result.success = true; result.status = GenerationStatus::COMPLETED; result.outputPath = request.outputPath; LOG_DEBUG("Conversion completed successfully!"); LOG_DEBUG(" Original size: " + result.originalSize); LOG_DEBUG(" Converted size: " + result.convertedSize); LOG_DEBUG(" Time: " + std::to_string(result.conversionTime) + "ms"); // Trigger model rescan after successful conversion if (modelManager) { LOG_DEBUG("Triggering model rescan..."); modelManager->scanModelsDirectory(); } return result; } std::string formatFileSize(size_t bytes) { const char* units[] = {"B", "KB", "MB", "GB", "TB"}; int unitIndex = 0; double size = static_cast(bytes); while (size >= 1024.0 && unitIndex < 4) { size /= 1024.0; unitIndex++; } std::stringstream ss; ss << std::fixed << std::setprecision(2) << size << " " << units[unitIndex]; return ss.str(); } // Helper function to populate JobInfo from GenerationRequest void populateJobInfoFromRequest(JobInfo& jobInfo, const GenerationRequest& request) { // Model information jobInfo.modelName = request.modelName; jobInfo.modelHash = modelManager ? modelManager->getModelInfo(request.modelName).sha256 : ""; jobInfo.modelPath = modelManager ? modelManager->getModelInfo(request.modelName).path : ""; // Generation parameters jobInfo.negativePrompt = request.negativePrompt; jobInfo.width = request.width; jobInfo.height = request.height; jobInfo.batchCount = request.batchCount; jobInfo.steps = request.steps; jobInfo.cfgScale = request.cfgScale; jobInfo.samplingMethod = request.samplingMethod; jobInfo.scheduler = request.scheduler; jobInfo.seed = request.seed; jobInfo.strength = request.strength; jobInfo.controlStrength = request.controlStrength; jobInfo.clipSkip = request.clipSkip; jobInfo.nThreads = request.nThreads; jobInfo.offloadParamsToCpu = request.offloadParamsToCpu; jobInfo.clipOnCpu = request.clipOnCpu; jobInfo.vaeOnCpu = request.vaeOnCpu; jobInfo.diffusionFlashAttn = request.diffusionFlashAttn; jobInfo.diffusionConvDirect = request.diffusionConvDirect; jobInfo.vaeConvDirect = request.vaeConvDirect; // Request type - store image paths instead of image data switch (request.requestType) { case GenerationRequest::RequestType::TEXT2IMG: jobInfo.requestType = "text2img"; break; case GenerationRequest::RequestType::IMG2IMG: jobInfo.requestType = "img2img"; // Store path to init image instead of the image data jobInfo.initImageData = request.initImagePath; // Store path, not base64 break; case GenerationRequest::RequestType::CONTROLNET: jobInfo.requestType = "controlnet"; // Store path to control image instead of the image data jobInfo.controlImageData = request.controlImagePath; // Store path, not base64 break; case GenerationRequest::RequestType::UPSCALER: jobInfo.requestType = "upscaler"; // Store path to input image instead of the image data jobInfo.initImageData = request.initImagePath; // Store path, not base64 break; case GenerationRequest::RequestType::INPAINTING: jobInfo.requestType = "inpainting"; // Store paths to images instead of the image data jobInfo.initImageData = request.initImagePath; // Store path, not base64 jobInfo.maskImageData = request.maskImagePath; // Store path, not base64 break; } // Model paths jobInfo.clipLPath = request.clipLPath; jobInfo.clipGPath = request.clipGPath; jobInfo.vaePath = request.vaePath; jobInfo.taesdPath = request.taesdPath; jobInfo.controlNetPath = request.controlNetPath; jobInfo.embeddingDir = request.embeddingDir; jobInfo.loraModelDir = request.loraModelDir; jobInfo.esrganPath = request.esrganPath; jobInfo.upscaleFactor = request.upscaleFactor; } }; GenerationQueue::GenerationQueue(ModelManager* modelManager, int maxConcurrentGenerations, const std::string& queueDir, const std::string& outputDir) : pImpl(std::make_unique()) { pImpl->modelManager = modelManager; pImpl->maxConcurrentGenerations = maxConcurrentGenerations; pImpl->queueDir = queueDir; pImpl->outputDir = outputDir; LOG_INFO("GenerationQueue initialized"); LOG_INFO(" Max concurrent generations: " + std::to_string(maxConcurrentGenerations)); LOG_INFO(" Queue directory: " + queueDir); LOG_INFO(" Output directory: " + outputDir); // Load any existing jobs from disk pImpl->loadJobsFromDisk(); } GenerationQueue::~GenerationQueue() { stop(); } std::future GenerationQueue::enqueueRequest(const GenerationRequest& request) { LOG_DEBUG("Enqueuing generation request: " + request.id); LOG_DEBUG(" Prompt: " + request.prompt.substr(0, 100) + (request.prompt.length() > 100 ? "..." : "")); LOG_DEBUG(" Model: " + request.modelName); LOG_DEBUG(" Size: " + std::to_string(request.width) + "x" + std::to_string(request.height)); LOG_DEBUG(" Steps: " + std::to_string(request.steps) + ", CFG: " + std::to_string(request.cfgScale)); // Create promise and future auto promise = std::make_shared>(); auto future = promise->get_future(); // Store the promise { std::lock_guard lock(pImpl->jobsMutex); pImpl->jobPromises[request.id] = std::move(*promise); } // Add to queue { std::lock_guard lock(pImpl->queueMutex); // Create job info with enhanced data JobInfo jobInfo; jobInfo.id = request.id; jobInfo.type = JobType::GENERATION; jobInfo.status = GenerationStatus::QUEUED; jobInfo.prompt = request.prompt; // Store full prompt jobInfo.queuedTime = std::chrono::system_clock::now(); jobInfo.position = pImpl->requestQueue.size() + 1; // Populate all enhanced fields from the request pImpl->populateJobInfoFromRequest(jobInfo, request); // Store job info { std::lock_guard jobsLock(pImpl->jobsMutex); pImpl->activeJobs[request.id] = jobInfo; } // Persist to disk pImpl->saveJobToFile(jobInfo); pImpl->requestQueue.push(request); pImpl->queueSize.store(pImpl->requestQueue.size()); } // Notify worker thread pImpl->queueCondition.notify_one(); return future; } std::future GenerationQueue::enqueueHashRequest(const HashRequest& request) { auto promise = std::make_shared>(); auto future = promise->get_future(); std::unique_lock lock(pImpl->queueMutex); // Create a generation request that acts as a placeholder for hash job GenerationRequest hashJobPlaceholder; hashJobPlaceholder.id = request.id; hashJobPlaceholder.prompt = "HASH_JOB"; // Special marker hashJobPlaceholder.modelName = request.modelNames.empty() ? "ALL_MODELS" : request.modelNames[0]; // Store promise for retrieval later pImpl->hashPromises[request.id] = promise; pImpl->hashRequests[request.id] = request; pImpl->requestQueue.push(hashJobPlaceholder); pImpl->queueCondition.notify_one(); LOG_DEBUG("Enqueued hash request: " + request.id); return future; } std::future GenerationQueue::enqueueConversionRequest(const ConversionRequest& request) { auto promise = std::make_shared>(); auto future = promise->get_future(); std::unique_lock lock(pImpl->queueMutex); // Create a generation request that acts as a placeholder for conversion job GenerationRequest conversionJobPlaceholder; conversionJobPlaceholder.id = request.id; conversionJobPlaceholder.prompt = "CONVERSION_JOB"; // Special marker conversionJobPlaceholder.modelName = request.modelName; // Store promise for retrieval later pImpl->conversionPromises[request.id] = promise; pImpl->conversionRequests[request.id] = request; pImpl->requestQueue.push(conversionJobPlaceholder); pImpl->queueCondition.notify_one(); LOG_DEBUG("Enqueued conversion request: " + request.id + " (model: " + request.modelName + ", type: " + request.quantizationType + ")"); return future; } size_t GenerationQueue::getQueueSize() const { return pImpl->queueSize.load(); } size_t GenerationQueue::getActiveGenerations() const { return pImpl->activeGenerations.load(); } std::vector GenerationQueue::getQueueStatus() const { std::vector jobs; std::lock_guard lock(pImpl->jobsMutex); jobs.reserve(pImpl->activeJobs.size()); for (const auto& pair : pImpl->activeJobs) { jobs.push_back(pair.second); } // Sort by queued time, then by status std::sort(jobs.begin(), jobs.end(), [](const JobInfo& a, const JobInfo& b) { if (a.status != b.status) { return static_cast(a.status) < static_cast(b.status); } return a.queuedTime < b.queuedTime; }); return jobs; } JobInfo GenerationQueue::getJobInfo(const std::string& jobId) const { std::lock_guard lock(pImpl->jobsMutex); auto it = pImpl->activeJobs.find(jobId); if (it != pImpl->activeJobs.end()) { return it->second; } return JobInfo{}; // Return empty job info if not found } bool GenerationQueue::cancelJob(const std::string& jobId) { std::lock_guard queueLock(pImpl->queueMutex); std::lock_guard jobsLock(pImpl->jobsMutex); // Check if job is still queued std::queue newQueue; bool found = false; while (!pImpl->requestQueue.empty()) { GenerationRequest request = pImpl->requestQueue.front(); pImpl->requestQueue.pop(); if (request.id == jobId) { found = true; // Update job status auto it = pImpl->activeJobs.find(jobId); if (it != pImpl->activeJobs.end()) { it->second.status = GenerationStatus::FAILED; it->second.endTime = std::chrono::system_clock::now(); } // Set promise with cancellation error auto promiseIt = pImpl->jobPromises.find(jobId); if (promiseIt != pImpl->jobPromises.end()) { GenerationResult result; result.requestId = jobId; result.success = false; result.errorMessage = "Job cancelled by user"; result.generationTime = 0; promiseIt->second.set_value(result); pImpl->jobPromises.erase(promiseIt); } } else { newQueue.push(request); } } pImpl->requestQueue = newQueue; pImpl->queueSize.store(pImpl->requestQueue.size()); return found; } void GenerationQueue::clearQueue() { LOG_DEBUG("Clearing generation queue"); std::lock_guard queueLock(pImpl->queueMutex); std::lock_guard jobsLock(pImpl->jobsMutex); // Cancel all queued jobs while (!pImpl->requestQueue.empty()) { GenerationRequest request = pImpl->requestQueue.front(); pImpl->requestQueue.pop(); // Update job status auto it = pImpl->activeJobs.find(request.id); if (it != pImpl->activeJobs.end()) { it->second.status = GenerationStatus::FAILED; it->second.endTime = std::chrono::system_clock::now(); } // Set promise with cancellation error auto promiseIt = pImpl->jobPromises.find(request.id); if (promiseIt != pImpl->jobPromises.end()) { GenerationResult result; result.requestId = request.id; result.success = false; result.errorMessage = "Queue cleared"; result.generationTime = 0; promiseIt->second.set_value(result); pImpl->jobPromises.erase(promiseIt); } } pImpl->queueSize.store(0); } void GenerationQueue::start() { if (pImpl->running.load()) { LOG_DEBUG("GenerationQueue is already running"); return; } pImpl->running.store(true); pImpl->stopRequested.store(false); pImpl->workerThread = std::thread(&Impl::workerThreadFunction, pImpl.get()); LOG_DEBUG("GenerationQueue started"); } void GenerationQueue::stop() { if (!pImpl->running.load()) { return; } LOG_DEBUG("Stopping GenerationQueue..."); pImpl->stopRequested.store(true); pImpl->queueCondition.notify_all(); if (pImpl->workerThread.joinable()) { pImpl->workerThread.join(); } pImpl->running.store(false); // Clear any remaining promises std::lock_guard lock(pImpl->jobsMutex); for (auto& pair : pImpl->jobPromises) { GenerationResult result; result.requestId = pair.first; result.success = false; result.errorMessage = "Queue stopped"; result.generationTime = 0; pair.second.set_value(result); } pImpl->jobPromises.clear(); LOG_DEBUG("GenerationQueue stopped"); } bool GenerationQueue::isRunning() const { return pImpl->running.load(); } void GenerationQueue::setMaxConcurrentGenerations(int maxConcurrent) { pImpl->maxConcurrentGenerations = maxConcurrent; LOG_DEBUG("GenerationQueue max concurrent generations set to: " + std::to_string(maxConcurrent)); }