| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633 |
#include "generation_queue.h"

#include "logger.h"
#include "model_manager.h"
#include "stable_diffusion_wrapper.h"
#include "utils.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <future>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <random>
#include <sstream>
#include <string>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <vector>

#include <nlohmann/json.hpp>

// stb single-header libraries: the *_IMPLEMENTATION macro must be defined in
// exactly one translation unit, immediately before including the header.
#define STB_IMAGE_WRITE_IMPLEMENTATION
#include "../stable-diffusion.cpp-src/thirdparty/stb_image_write.h"
#define STB_IMAGE_IMPLEMENTATION
#include "../stable-diffusion.cpp-src/thirdparty/stb_image.h"
// Private implementation (pimpl) of GenerationQueue: owns the worker thread,
// the pending-request queue, and all per-job bookkeeping/statistics.
class GenerationQueue::Impl {
public:
    // Non-owning pointer to the shared model manager; set by the owner,
    // must outlive this Impl.
    ModelManager* modelManager = nullptr;

    // Worker thread lifecycle.
    std::thread workerThread;
    std::atomic<bool> running{false};        // worker loop runs while true
    std::atomic<bool> stopRequested{false};  // cooperative shutdown flag

    // Pending requests; guarded by queueMutex, signalled via queueCondition.
    mutable std::mutex queueMutex;
    std::condition_variable queueCondition;
    std::queue<GenerationRequest> requestQueue;

    // Per-job state and result promises; guarded by jobsMutex.
    mutable std::mutex jobsMutex;
    std::unordered_map<std::string, JobInfo> activeJobs;
    std::unordered_map<std::string, std::promise<GenerationResult>> jobPromises;

    // Hash-job bookkeeping, keyed by job id.
    // NOTE(review): these maps are read/erased in processRequest() without a
    // lock — confirm all mutation happens on the worker thread only.
    std::map<std::string, std::shared_ptr<std::promise<HashResult>>> hashPromises;
    std::map<std::string, HashRequest> hashRequests;

    // Conversion-job bookkeeping, keyed by job id (same locking caveat).
    std::map<std::string, std::shared_ptr<std::promise<ConversionResult>>> conversionPromises;
    std::map<std::string, ConversionRequest> conversionRequests;

    // Configuration.
    int maxConcurrentGenerations = 1;  // queue currently processes one job at a time
    std::string queueDir = "./queue";
    std::string outputDir = "./output";

    // Statistics; atomics so status endpoints can read without locking.
    std::atomic<size_t> queueSize{0};
    std::atomic<size_t> activeGenerations{0};
    std::atomic<uint64_t> totalJobsProcessed{0};
- // Worker thread function
- void workerThreadFunction() {
- LOG_INFO("GenerationQueue worker thread started");
- while (running.load() && !stopRequested.load()) {
- std::unique_lock<std::mutex> lock(queueMutex);
- // Wait for a request or stop signal
- queueCondition.wait(lock, [this] {
- return !requestQueue.empty() || stopRequested.load();
- });
- if (stopRequested.load()) {
- break;
- }
- if (requestQueue.empty()) {
- continue;
- }
- // Get the next request
- GenerationRequest request = requestQueue.front();
- requestQueue.pop();
- queueSize.store(requestQueue.size());
- lock.unlock();
- // Process the request
- processRequest(request);
- }
- LOG_INFO("GenerationQueue worker thread stopped");
- }
- void processRequest(const GenerationRequest& request) {
- // Check if this is a hash job
- if (request.prompt == "HASH_JOB") {
- auto hashIt = hashRequests.find(request.id);
- if (hashIt != hashRequests.end()) {
- HashResult result = performHashJob(hashIt->second);
- auto promiseIt = hashPromises.find(request.id);
- if (promiseIt != hashPromises.end()) {
- promiseIt->second->set_value(result);
- hashPromises.erase(promiseIt);
- }
- hashRequests.erase(hashIt);
- }
- return;
- }
- // Check if this is a conversion job
- if (request.prompt == "CONVERSION_JOB") {
- auto convIt = conversionRequests.find(request.id);
- if (convIt != conversionRequests.end()) {
- ConversionResult result = performConversionJob(convIt->second);
- auto promiseIt = conversionPromises.find(request.id);
- if (promiseIt != conversionPromises.end()) {
- promiseIt->second->set_value(result);
- conversionPromises.erase(promiseIt);
- }
- conversionRequests.erase(convIt);
- }
- return;
- }
- auto startTime = std::chrono::system_clock::now();
- // Update job status to PROCESSING (only if not already in PROCESSING from model loading)
- {
- std::lock_guard<std::mutex> lock(jobsMutex);
- if (activeJobs.find(request.id) != activeJobs.end()) {
- // Only change status if it was QUEUED (not MODEL_LOADING)
- if (activeJobs[request.id].status == GenerationStatus::QUEUED) {
- activeJobs[request.id].status = GenerationStatus::PROCESSING;
- activeJobs[request.id].progress = 0.0f;
- activeJobs[request.id].modelLoadProgress = 0.0f;
- activeJobs[request.id].generationProgress = 0.0f;
- activeJobs[request.id].firstGenerationCallback = true; // Initialize first callback flag
- } else if (activeJobs[request.id].status == GenerationStatus::MODEL_LOADING) {
- // Transition from MODEL_LOADING to PROCESSING
- activeJobs[request.id].status = GenerationStatus::PROCESSING;
- // Model loading should already be complete at this point
- activeJobs[request.id].modelLoadProgress = 1.0f;
- activeJobs[request.id].firstGenerationCallback = true; // Initialize first callback flag
- }
- activeJobs[request.id].startTime = startTime;
- activeJobs[request.id].currentStep = 0;
- activeJobs[request.id].totalSteps = 0;
- if (activeJobs[request.id].timeElapsed == 0) {
- activeJobs[request.id].timeElapsed = 0;
- }
- if (activeJobs[request.id].timeRemaining == 0) {
- activeJobs[request.id].timeRemaining = 0;
- }
- if (activeJobs[request.id].speed == 0.0f) {
- activeJobs[request.id].speed = 0.0f;
- }
- saveJobToFile(activeJobs[request.id]);
- }
- }
- activeGenerations.store(1); // Only one generation at a time
- LOG_INFO("Processing generation request: " + request.id +
- " (prompt: " + request.prompt.substr(0, 50) +
- (request.prompt.length() > 50 ? "..." : "") + ")");
- // Real generation logic using stable-diffusion.cpp with progress tracking
- GenerationResult result = performActualGeneration(request, request.id);
- auto endTime = std::chrono::system_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime);
- result.generationTime = duration.count();
- // Update job status to COMPLETED/FAILED
- {
- std::lock_guard<std::mutex> lock(jobsMutex);
- if (activeJobs.find(request.id) != activeJobs.end()) {
- float finalProgress = activeJobs[request.id].progress;
- auto completionTime = std::chrono::system_clock::now();
- auto totalGenerationTime = std::chrono::duration_cast<std::chrono::milliseconds>(completionTime - startTime).count();
- LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id +
- " - Total generation time: " + std::to_string(totalGenerationTime) + "ms");
- LOG_DEBUG("[JOB_COMPLETION] Job " + request.id +
- " - Status changing to '" + (result.success ? "completed" : "failed") + "'" +
- " - Progress at completion: " + std::to_string(static_cast<int>(finalProgress * 100.0f)) + "%");
- activeJobs[request.id].status = result.success ? GenerationStatus::COMPLETED : GenerationStatus::FAILED;
- activeJobs[request.id].endTime = endTime;
- // Set final progress to 100% if successful
- if (result.success) {
- activeJobs[request.id].progress = 1.0f;
- if (activeJobs[request.id].totalSteps > 0) {
- activeJobs[request.id].currentStep = activeJobs[request.id].totalSteps;
- }
- }
- // Store output files, error message, and generation results
- activeJobs[request.id].outputFiles = result.imagePaths;
- activeJobs[request.id].errorMessage = result.errorMessage;
- activeJobs[request.id].actualSeed = result.actualSeed;
- activeJobs[request.id].generationTime = result.generationTime;
- {
- std::ostringstream oss;
- oss << "Job " << request.id << " completed. Success: " << (result.success ? "true" : "false")
- << ", Image paths: " << result.imagePaths.size();
- LOG_DEBUG(oss.str());
- }
- for (size_t i = 0; i < result.imagePaths.size(); ++i) {
- {
- std::ostringstream oss2;
- oss2 << "Image " << i << ": " << result.imagePaths[i];
- LOG_DEBUG(oss2.str());
- }
- }
- if (!result.errorMessage.empty()) {
- std::ostringstream err_oss;
- err_oss << "Error message: " << result.errorMessage;
- LOG_DEBUG(err_oss.str());
- }
- // Persist to disk
- saveJobToFile(activeJobs[request.id]);
- }
- // Set the promise value
- auto it = jobPromises.find(request.id);
- if (it != jobPromises.end()) {
- it->second.set_value(result);
- jobPromises.erase(it);
- }
- }
- activeGenerations.store(0);
- totalJobsProcessed.fetch_add(1);
- std::string completionMsg = "Completed generation request: " + request.id +
- " (success: " + (result.success ? "true" : "false") +
- ", time: " + std::to_string(result.generationTime) + "ms)";
- if (!result.success && !result.errorMessage.empty()) {
- completionMsg += " - Error: " + result.errorMessage;
- }
- LOG_INFO(completionMsg);
- }
- // Progress callback that updates model loading progress
- void updateModelLoadProgress(const std::string& jobId, float modelProgress, uint64_t timeElapsed) {
- std::lock_guard<std::mutex> lock(jobsMutex);
- auto it = activeJobs.find(jobId);
- if (it != activeJobs.end()) {
- // Clamp model loading progress to valid range [0.0, 1.0]
- modelProgress = std::max(0.0f, std::min(1.0f, modelProgress));
- it->second.modelLoadProgress = modelProgress;
- it->second.generationProgress = 0.0f;
- // Overall progress during model loading is just the model loading progress
- it->second.progress = modelProgress;
- it->second.timeElapsed = static_cast<int64_t>(timeElapsed);
- // Update status message to reflect model loading
- if (!it->second.prompt.empty()) {
- size_t bracketPos = it->second.prompt.find(" [Loading model:");
- if (bracketPos == std::string::npos) {
- it->second.prompt = it->second.prompt + " [Loading model: " + std::to_string(static_cast<int>(modelProgress * 100)) + "%]";
- } else {
- // Update existing loading message
- it->second.prompt = it->second.prompt.substr(0, bracketPos) + " [Loading model: " + std::to_string(static_cast<int>(modelProgress * 100)) + "%]";
- }
- }
- saveJobToFile(it->second);
- }
- }
    // Progress callback invoked per sampling step. Generation progress maps
    // onto the second half of the overall progress bar (model loading owns
    // the first half), and ETA/speed are derived from average step time.
    void updateGenerationProgress(const std::string& jobId, int step, int totalSteps, float genProgress, uint64_t timeElapsed) {
        std::lock_guard<std::mutex> lock(jobsMutex);
        auto it = activeJobs.find(jobId);
        if (it != activeJobs.end()) {
            // Clamp generation progress to valid range [0.0, 1.0].
            genProgress = std::max(0.0f, std::min(1.0f, genProgress));
            it->second.generationProgress = genProgress;
            it->second.currentStep = step;
            it->second.totalSteps = totalSteps;
            it->second.timeElapsed = static_cast<int64_t>(timeElapsed);
            if (it->second.firstGenerationCallback) {
                // The first callback can carry a stale/bogus progress value,
                // so ignore it and pin overall progress to exactly 50%
                // (model loading complete, generation not yet started).
                it->second.generationProgress = 0.0f;
                it->second.progress = 0.5f;
                it->second.firstGenerationCallback = false; // first callback handled
                LOG_DEBUG("First generation callback for job " + jobId +
                          " - Ignored genProgress (" + std::to_string(genProgress) +
                          "), reset generation progress to 0.0 and overall progress to 50%");
            } else {
                // Subsequent callbacks: 50% for model loading + 50% for generation.
                it->second.progress = 0.5f + (genProgress * 0.5f);
            }
            // Clamp overall progress to valid range [0.0, 1.0].
            it->second.progress = std::max(0.0f, std::min(1.0f, it->second.progress));
            // Estimate time remaining and speed from the average step duration.
            if (step > 0 && timeElapsed > 0) {
                double avgStepTime = static_cast<double>(timeElapsed) / step;
                int remainingSteps = totalSteps - step;
                it->second.timeRemaining = static_cast<int64_t>(avgStepTime * remainingSteps);
                it->second.speed = 1000.0 / avgStepTime; // steps per second
            }
            // Strip any "[Loading model: N%]" suffix left over from loading.
            if (!it->second.prompt.empty()) {
                size_t bracketPos = it->second.prompt.find(" [Loading model:");
                if (bracketPos != std::string::npos) {
                    it->second.prompt = it->second.prompt.substr(0, bracketPos);
                }
            }
            // Persist every 10 steps or when nearly done, to limit disk churn.
            if (step % 10 == 0 || genProgress >= 0.99f) {
                saveJobToFile(it->second);
            }
        }
    }
    // Runs the actual image generation for one request: ensures the model is
    // loaded (reporting load progress), builds wrapper parameters, dispatches
    // on request type, and saves the resulting images to disk.
    // Returns a GenerationResult with success=false and errorMessage set on
    // any failure. jobId is used by progress callbacks to locate the job.
    GenerationResult performActualGeneration(const GenerationRequest& request, const std::string& jobId) {
        GenerationResult result;
        result.requestId = request.id;
        result.success = false;
        // Check if model manager is available
        if (!modelManager) {
            result.errorMessage = "Model manager not available";
            return result;
        }
        if (!modelManager->isModelLoaded(request.modelName)) {
            // Flag the job as MODEL_LOADING so status endpoints can report it.
            {
                std::lock_guard<std::mutex> lock(jobsMutex);
                if (activeJobs.find(request.id) != activeJobs.end()) {
                    activeJobs[request.id].status = GenerationStatus::MODEL_LOADING;
                    activeJobs[request.id].progress = 0.0f; // Start at 0% for model loading
                    saveJobToFile(activeJobs[request.id]);
                }
            }
            // Auto-load the model with progress tracking
            auto modelLoadStartTime = std::chrono::system_clock::now();
            // Forward load progress (plus elapsed ms) into the job record.
            auto modelLoadProgressCallback = [this, jobId = request.id, modelLoadStartTime](float loadProgress) {
                auto currentTime = std::chrono::system_clock::now();
                uint64_t timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - modelLoadStartTime).count();
                updateModelLoadProgress(jobId, loadProgress, timeElapsed);
            };
            bool loadSuccess = modelManager->loadModel(request.modelName, modelLoadProgressCallback);
            if (!loadSuccess || !modelManager->isModelLoaded(request.modelName)) {
                result.errorMessage = "Failed to load model: " + request.modelName;
                // Mark the job FAILED immediately; the caller also sees the error.
                {
                    std::lock_guard<std::mutex> lock(jobsMutex);
                    if (activeJobs.find(request.id) != activeJobs.end()) {
                        activeJobs[request.id].status = GenerationStatus::FAILED;
                        activeJobs[request.id].errorMessage = result.errorMessage;
                        saveJobToFile(activeJobs[request.id]);
                    }
                }
                return result;
            }
            // Model load succeeded: clean up the prompt and set progress to 50%.
            {
                std::lock_guard<std::mutex> lock(jobsMutex);
                if (activeJobs.find(request.id) != activeJobs.end()) {
                    // Remove the "[Loading model: N%]" suffix from the prompt.
                    size_t bracketPos = activeJobs[request.id].prompt.find(" [Loading model:");
                    if (bracketPos != std::string::npos) {
                        activeJobs[request.id].prompt = activeJobs[request.id].prompt.substr(0, bracketPos);
                    }
                    // Loading complete: overall progress sits at exactly 50%
                    // (the generation phase owns the second half).
                    activeJobs[request.id].modelLoadProgress = 1.0f;
                    activeJobs[request.id].generationProgress = 0.0f;
                    activeJobs[request.id].progress = 0.5f;
                    activeJobs[request.id].firstGenerationCallback = true; // Initialize first callback flag
                    saveJobToFile(activeJobs[request.id]);
                }
            }
        }
        // Get the model wrapper from the shared model manager
        auto* modelWrapper = modelManager->getModel(request.modelName);
        if (!modelWrapper) {
            result.errorMessage = "Model not found or not loaded: " + request.modelName;
            return result;
        }
        // Translate the request into wrapper generation parameters.
        StableDiffusionWrapper::GenerationParams params;
        params.prompt = request.prompt;
        params.negativePrompt = request.negativePrompt;
        params.width = request.width;
        params.height = request.height;
        params.batchCount = request.batchCount;
        params.steps = request.steps;
        params.cfgScale = request.cfgScale;
        params.samplingMethod = samplingMethodToString(request.samplingMethod);
        params.scheduler = schedulerToString(request.scheduler);
        params.clipSkip = request.clipSkip;
        params.strength = request.strength;
        params.controlStrength = request.controlStrength;
        params.nThreads = request.nThreads;
        params.offloadParamsToCpu = request.offloadParamsToCpu;
        params.clipOnCpu = request.clipOnCpu;
        params.vaeOnCpu = request.vaeOnCpu;
        params.diffusionFlashAttn = request.diffusionFlashAttn;
        params.diffusionConvDirect = request.diffusionConvDirect;
        params.vaeConvDirect = request.vaeConvDirect;
        // Auxiliary model paths (empty strings mean "not used").
        params.modelPath = modelManager->getModelInfo(request.modelName).path;
        params.clipLPath = request.clipLPath;
        params.clipGPath = request.clipGPath;
        params.vaePath = request.vaePath;
        params.taesdPath = request.taesdPath;
        params.controlNetPath = request.controlNetPath;
        params.embeddingDir = request.embeddingDir;
        params.loraModelDir = request.loraModelDir;
        // Seed: "random" draws a fresh one; otherwise parse, falling back to 42.
        if (request.seed == "random") {
            std::random_device rd;
            std::mt19937 gen(rd());
            std::uniform_int_distribution<int64_t> dis;
            params.seed = dis(gen);
        } else {
            try {
                params.seed = std::stoll(request.seed);
            } catch (...) {
                params.seed = 42; // Default seed
            }
        }
        result.actualSeed = params.seed;
        // Dispatch on request type; all paths share the same progress callback.
        try {
            std::vector<StableDiffusionWrapper::GeneratedImage> generatedImages;
            // Per-step callback: computes elapsed time from the start time
            // passed through userData and forwards everything to the job record.
            auto progressCallback = [this, jobId](int step, int totalSteps, float progress, void* userData) {
                // userData is a system_clock::time_point* holding the start time.
                auto startTime = userData ? *static_cast<std::chrono::system_clock::time_point*>(userData) : std::chrono::system_clock::now();
                auto currentTime = std::chrono::system_clock::now();
                uint64_t timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - startTime).count();
                // Use the new generation progress update function
                updateGenerationProgress(jobId, step, totalSteps, progress, timeElapsed);
            };
            // Store start time to pass as user data
            auto generationStartTime = std::chrono::system_clock::now();
            switch (request.requestType) {
                case GenerationRequest::RequestType::TEXT2IMG:
                    generatedImages = modelWrapper->generateImage(params, progressCallback, &generationStartTime);
                    break;
                case GenerationRequest::RequestType::IMG2IMG:
                    if (request.initImageData.empty()) {
                        result.errorMessage = "No init image data provided for img2img";
                        return result;
                    }
                    generatedImages = modelWrapper->generateImageImg2Img(
                        params,
                        request.initImageData,
                        request.initImageWidth,
                        request.initImageHeight,
                        progressCallback,
                        &generationStartTime
                    );
                    break;
                case GenerationRequest::RequestType::CONTROLNET:
                    if (request.controlImageData.empty()) {
                        result.errorMessage = "No control image data provided for ControlNet";
                        return result;
                    }
                    generatedImages = modelWrapper->generateImageControlNet(
                        params,
                        request.controlImageData,
                        request.controlImageWidth,
                        request.controlImageHeight,
                        progressCallback,
                        &generationStartTime
                    );
                    break;
                case GenerationRequest::RequestType::UPSCALER:
                    if (request.initImageData.empty()) {
                        result.errorMessage = "No input image data provided for upscaling";
                        return result;
                    }
                    if (request.esrganPath.empty()) {
                        result.errorMessage = "No ESRGAN model path provided for upscaling";
                        return result;
                    }
                    {
                        // Upscaling produces a single image and takes no
                        // per-step progress callback.
                        auto upscaledImage = modelWrapper->upscaleImage(
                            request.esrganPath,
                            request.initImageData,
                            request.initImageWidth,
                            request.initImageHeight,
                            request.initImageChannels,
                            request.upscaleFactor,
                            request.nThreads,
                            request.offloadParamsToCpu,
                            request.diffusionConvDirect
                        );
                        generatedImages.push_back(upscaledImage);
                    }
                    break;
                case GenerationRequest::RequestType::INPAINTING:
                    if (request.initImageData.empty()) {
                        result.errorMessage = "No source image data provided for inpainting";
                        return result;
                    }
                    if (request.maskImageData.empty()) {
                        result.errorMessage = "No mask image data provided for inpainting";
                        return result;
                    }
                    generatedImages = modelWrapper->generateImageInpainting(
                        params,
                        request.initImageData,
                        request.initImageWidth,
                        request.initImageHeight,
                        request.maskImageData,
                        request.maskImageWidth,
                        request.maskImageHeight,
                        progressCallback,
                        &generationStartTime
                    );
                    break;
                default:
                    result.errorMessage = "Unknown request type";
                    return result;
            }
            if (generatedImages.empty()) {
                result.errorMessage = "Failed to generate images: " + modelWrapper->getLastError();
                return result;
            }
            // Save generated images to files (timed for diagnostics).
            LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Starting post-processing (image saving) phase");
            auto postProcessingStart = std::chrono::system_clock::now();
            for (size_t i = 0; i < generatedImages.size(); i++) {
                const auto& image = generatedImages[i];
                LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Saving image " + std::to_string(i+1) + "/" + std::to_string(generatedImages.size()));
                auto imageSaveStart = std::chrono::system_clock::now();
                std::string imagePath = saveImageToFile(image, request.id, i);
                auto imageSaveEnd = std::chrono::system_clock::now();
                auto imageSaveTime = std::chrono::duration_cast<std::chrono::milliseconds>(imageSaveEnd - imageSaveStart).count();
                LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Image " + std::to_string(i+1) + " save took " + std::to_string(imageSaveTime) + "ms");
                if (!imagePath.empty()) {
                    result.imagePaths.push_back(imagePath);
                } else {
                    // A failed save aborts the whole job (partial results discarded).
                    result.errorMessage = "Failed to save generated image " + std::to_string(i);
                    return result;
                }
            }
            auto postProcessingEnd = std::chrono::system_clock::now();
            auto postProcessingTime = std::chrono::duration_cast<std::chrono::milliseconds>(postProcessingEnd - postProcessingStart).count();
            LOG_DEBUG("[TIMING_ANALYSIS] Job " + request.id + " - Post-processing completed in " + std::to_string(postProcessingTime) + "ms");
            result.success = true;
            // generationTime here is the wrapper-reported time of the first
            // image; processRequest() later overwrites it with wall-clock time.
            result.generationTime = generatedImages.empty() ? 0 : generatedImages[0].generationTime;
            result.errorMessage = "";
        } catch (const std::exception& e) {
            result.errorMessage = "Exception during generation: " + std::string(e.what());
        }
        return result;
    }
- std::string saveImageToFile(const StableDiffusionWrapper::GeneratedImage& image, const std::string& requestId, size_t index) {
- // Create job-specific output directory
- std::string jobOutputDir = outputDir + "/" + requestId;
- std::error_code ec;
- std::filesystem::create_directories(jobOutputDir, ec);
- if (ec) {
- LOG_ERROR("Failed to create output directory " + jobOutputDir + ": " + ec.message());
- return "";
- }
- // Generate filename
- std::stringstream ss;
- ss << jobOutputDir << "/" << requestId << "_" << index << ".png";
- std::string filename = ss.str();
- LOG_DEBUG("Attempting to save image to: " + filename);
- // Check if image data is valid
- if (image.data.empty() || image.width <= 0 || image.height <= 0) {
- LOG_ERROR("Invalid image data for " + requestId + "_" + std::to_string(index) +
- ": width=" + std::to_string(image.width) +
- ", height=" + std::to_string(image.height) +
- ", channels=" + std::to_string(image.channels) +
- ", data_size=" + std::to_string(image.data.size()));
- return "";
- }
- // Validate image data integrity
- const size_t expectedDataSize = static_cast<size_t>(image.width) * image.height * image.channels;
- if (image.data.size() != expectedDataSize) {
- LOG_WARNING("Image data size mismatch for " + requestId + "_" + std::to_string(index) +
- ": expected=" + std::to_string(expectedDataSize) +
- ", actual=" + std::to_string(image.data.size()));
- // Continue anyway, but log the warning
- }
- // Check if we can write to the directory
- std::ofstream testFile(filename + ".test");
- if (!testFile.is_open()) {
- LOG_ERROR("Cannot write to directory " + jobOutputDir +
- ": permission denied or disk full");
- return "";
- }
- testFile.close();
- std::filesystem::remove(filename + ".test");
- // Write PNG file using stb_image_write with detailed error logging
- LOG_DEBUG("Writing PNG file: " + filename +
- " (size: " + std::to_string(image.width) + "x" + std::to_string(image.height) +
- "x" + std::to_string(image.channels) + ")");
- int result = stbi_write_png(
- filename.c_str(),
- image.width,
- image.height,
- image.channels,
- image.data.data(),
- image.width * image.channels // stride in bytes
- );
- if (result == 0) {
- LOG_ERROR("stbi_write_png failed for " + filename);
- // Try to get more detailed error information
- LOG_ERROR("Image details:");
- LOG_ERROR(" Dimensions: " + std::to_string(image.width) + "x" + std::to_string(image.height));
- LOG_ERROR(" Channels: " + std::to_string(image.channels));
- LOG_ERROR(" Data size: " + std::to_string(image.data.size()) + " bytes");
- LOG_ERROR(" Expected size: " + std::to_string(expectedDataSize) + " bytes");
- LOG_ERROR(" Stride: " + std::to_string(image.width * image.channels) + " bytes");
- // Check if file was created but is empty
- if (std::filesystem::exists(filename)) {
- auto fileSize = std::filesystem::file_size(filename);
- LOG_ERROR(" File exists but size is: " + std::to_string(fileSize) + " bytes");
- if (fileSize == 0) {
- LOG_ERROR(" ERROR: Zero-byte file created - stbi_write_png returned false but file exists");
- }
- } else {
- LOG_ERROR(" File was not created");
- }
- // Check disk space
- try {
- auto space = std::filesystem::space(jobOutputDir);
- LOG_ERROR(" Available disk space: " + std::to_string(space.available / (1024 * 1024)) + " MB");
- } catch (const std::exception& e) {
- LOG_ERROR(" Could not check disk space: " + std::string(e.what()));
- }
- return "";
- }
- // Verify the file was created successfully and has content
- if (!std::filesystem::exists(filename)) {
- LOG_ERROR("ERROR: stbi_write_png returned success but file does not exist: " + filename);
- return "";
- }
- auto fileSize = std::filesystem::file_size(filename);
- if (fileSize == 0) {
- LOG_ERROR("ERROR: stbi_write_png returned success but created zero-byte file: " + filename);
- return "";
- }
- LOG_DEBUG("Successfully saved generated image to: " + filename +
- " (" + std::to_string(image.width) + "x" + std::to_string(image.height) +
- ", " + std::to_string(image.channels) + " channels, "
- + std::to_string(image.data.size()) + " data bytes, "
- + std::to_string(fileSize) + " file bytes)");
- return filename;
- }
- std::string samplingMethodToString(SamplingMethod method) {
- switch (method) {
- case SamplingMethod::EULER: return "euler";
- case SamplingMethod::EULER_A: return "euler_a";
- case SamplingMethod::HEUN: return "heun";
- case SamplingMethod::DPM2: return "dpm2";
- case SamplingMethod::DPMPP2S_A: return "dpmpp2s_a";
- case SamplingMethod::DPMPP2M: return "dpmpp2m";
- case SamplingMethod::DPMPP2MV2: return "dpmpp2mv2";
- case SamplingMethod::IPNDM: return "ipndm";
- case SamplingMethod::IPNDM_V: return "ipndm_v";
- case SamplingMethod::LCM: return "lcm";
- case SamplingMethod::DDIM_TRAILING: return "ddim_trailing";
- case SamplingMethod::TCD: return "tcd";
- default: return "euler";
- }
- }
- std::string schedulerToString(Scheduler scheduler) {
- switch (scheduler) {
- case Scheduler::DISCRETE: return "discrete";
- case Scheduler::KARRAS: return "karras";
- case Scheduler::EXPONENTIAL: return "exponential";
- case Scheduler::AYS: return "ays";
- case Scheduler::GITS: return "gits";
- case Scheduler::SMOOTHSTEP: return "smoothstep";
- case Scheduler::SGM_UNIFORM: return "sgm_uniform";
- case Scheduler::SIMPLE: return "simple";
- default: return "default";
- }
- }
- std::string jobStatusToString(GenerationStatus status) {
- switch (status) {
- case GenerationStatus::QUEUED: return "queued";
- case GenerationStatus::MODEL_LOADING: return "loading";
- case GenerationStatus::PROCESSING: return "processing";
- case GenerationStatus::COMPLETED: return "completed";
- case GenerationStatus::FAILED: return "failed";
- default: return "unknown";
- }
- }
- GenerationStatus stringToJobStatus(const std::string& status) {
- if (status == "queued") return GenerationStatus::QUEUED;
- if (status == "loading") return GenerationStatus::MODEL_LOADING;
- if (status == "processing") return GenerationStatus::PROCESSING;
- if (status == "completed") return GenerationStatus::COMPLETED;
- if (status == "failed") return GenerationStatus::FAILED;
- return GenerationStatus::QUEUED;
- }
- std::string jobTypeToString(JobType type) {
- switch (type) {
- case JobType::GENERATION: return "generation";
- case JobType::HASHING: return "hashing";
- default: return "unknown";
- }
- }
- JobType stringToJobType(const std::string& type) {
- if (type == "generation") return JobType::GENERATION;
- if (type == "hashing") return JobType::HASHING;
- return JobType::GENERATION;
- }
- SamplingMethod stringToSamplingMethod(const std::string& method) {
- if (method == "euler") return SamplingMethod::EULER;
- if (method == "euler_a") return SamplingMethod::EULER_A;
- if (method == "heun") return SamplingMethod::HEUN;
- if (method == "dpm2") return SamplingMethod::DPM2;
- if (method == "dpmpp2s_a") return SamplingMethod::DPMPP2S_A;
- if (method == "dpmpp2m") return SamplingMethod::DPMPP2M;
- if (method == "dpmpp2mv2") return SamplingMethod::DPMPP2MV2;
- if (method == "ipndm") return SamplingMethod::IPNDM;
- if (method == "ipndm_v") return SamplingMethod::IPNDM_V;
- if (method == "lcm") return SamplingMethod::LCM;
- if (method == "ddim_trailing") return SamplingMethod::DDIM_TRAILING;
- if (method == "tcd") return SamplingMethod::TCD;
- return SamplingMethod::DEFAULT;
- }
- Scheduler stringToScheduler(const std::string& scheduler) {
- if (scheduler == "discrete") return Scheduler::DISCRETE;
- if (scheduler == "karras") return Scheduler::KARRAS;
- if (scheduler == "exponential") return Scheduler::EXPONENTIAL;
- if (scheduler == "ays") return Scheduler::AYS;
- if (scheduler == "gits") return Scheduler::GITS;
- if (scheduler == "smoothstep") return Scheduler::SMOOTHSTEP;
- if (scheduler == "sgm_uniform") return Scheduler::SGM_UNIFORM;
- if (scheduler == "simple") return Scheduler::SIMPLE;
- return Scheduler::DEFAULT;
- }
- void saveJobToFile(const JobInfo& job) {
- try {
- // Create queue directory if it doesn't exist
- std::filesystem::create_directories(queueDir);
- // Create JSON object with enhanced fields
- nlohmann::json jobJson;
- // Basic fields
- jobJson["id"] = job.id;
- jobJson["type"] = jobTypeToString(job.type);
- jobJson["status"] = jobStatusToString(job.status);
- jobJson["prompt"] = job.prompt;
- jobJson["position"] = job.position;
- // Convert time points to milliseconds since epoch
- auto queuedMs = std::chrono::duration_cast<std::chrono::milliseconds>(
- job.queuedTime.time_since_epoch()).count();
- jobJson["queued_time"] = queuedMs;
- if (job.status != GenerationStatus::QUEUED) {
- auto startMs = std::chrono::duration_cast<std::chrono::milliseconds>(
- job.startTime.time_since_epoch()).count();
- jobJson["start_time"] = startMs;
- }
- if (job.status == GenerationStatus::COMPLETED || job.status == GenerationStatus::FAILED) {
- auto endMs = std::chrono::duration_cast<std::chrono::milliseconds>(
- job.endTime.time_since_epoch()).count();
- jobJson["end_time"] = endMs;
- }
- // Enhanced fields for repeatable generation
- if (!job.modelName.empty()) {
- jobJson["model_name"] = job.modelName;
- jobJson["model_hash"] = job.modelHash;
- jobJson["model_path"] = job.modelPath;
- }
- if (!job.negativePrompt.empty()) {
- jobJson["negative_prompt"] = job.negativePrompt;
- }
- jobJson["width"] = job.width;
- jobJson["height"] = job.height;
- jobJson["batch_count"] = job.batchCount;
- jobJson["steps"] = job.steps;
- jobJson["cfg_scale"] = job.cfgScale;
- jobJson["sampling_method"] = samplingMethodToString(job.samplingMethod);
- jobJson["scheduler"] = schedulerToString(job.scheduler);
- jobJson["seed"] = job.seed;
- jobJson["actual_seed"] = job.actualSeed;
- jobJson["request_type"] = job.requestType;
- jobJson["strength"] = job.strength;
- jobJson["control_strength"] = job.controlStrength;
- jobJson["clip_skip"] = job.clipSkip;
- jobJson["n_threads"] = job.nThreads;
- jobJson["offload_params_to_cpu"] = job.offloadParamsToCpu;
- jobJson["clip_on_cpu"] = job.clipOnCpu;
- jobJson["vae_on_cpu"] = job.vaeOnCpu;
- jobJson["diffusion_flash_attn"] = job.diffusionFlashAttn;
- jobJson["diffusion_conv_direct"] = job.diffusionConvDirect;
- jobJson["vae_conv_direct"] = job.vaeConvDirect;
- jobJson["generation_time"] = job.generationTime;
- // Image paths for complex operations
- if (!job.initImageData.empty()) {
- jobJson["init_image_path"] = job.initImageData;
- }
- if (!job.controlImageData.empty()) {
- jobJson["control_image_path"] = job.controlImageData;
- }
- if (!job.maskImageData.empty()) {
- jobJson["mask_image_path"] = job.maskImageData;
- }
- // Model paths for advanced usage
- if (!job.clipLPath.empty()) {
- jobJson["clip_l_path"] = job.clipLPath;
- }
- if (!job.clipGPath.empty()) {
- jobJson["clip_g_path"] = job.clipGPath;
- }
- if (!job.vaePath.empty()) {
- jobJson["vae_path"] = job.vaePath;
- }
- if (!job.taesdPath.empty()) {
- jobJson["taesd_path"] = job.taesdPath;
- }
- if (!job.controlNetPath.empty()) {
- jobJson["controlnet_path"] = job.controlNetPath;
- }
- if (!job.embeddingDir.empty()) {
- jobJson["embedding_dir"] = job.embeddingDir;
- }
- if (!job.loraModelDir.empty()) {
- jobJson["lora_model_dir"] = job.loraModelDir;
- }
- if (!job.esrganPath.empty()) {
- jobJson["esrgan_path"] = job.esrganPath;
- }
- jobJson["upscale_factor"] = job.upscaleFactor;
- // Progress and timing information
- jobJson["progress"] = job.progress;
- jobJson["model_load_progress"] = job.modelLoadProgress;
- jobJson["generation_progress"] = job.generationProgress;
- jobJson["current_step"] = job.currentStep;
- jobJson["total_steps"] = job.totalSteps;
- jobJson["time_elapsed"] = job.timeElapsed;
- jobJson["time_remaining"] = job.timeRemaining;
- jobJson["speed"] = job.speed;
- jobJson["first_generation_callback"] = job.firstGenerationCallback;
- // Output information
- jobJson["output_files"] = job.outputFiles;
- jobJson["error_message"] = job.errorMessage;
- // Write to file
- std::string filename = queueDir + "/" + job.id + ".json";
- std::ofstream file(filename);
- if (file.is_open()) {
- file << jobJson.dump(2);
- file.close();
- }
- } catch (const std::exception& e) {
- LOG_ERROR("Error saving job to file: " + std::string(e.what()));
- }
- }
// Rebuild activeJobs from the JSON files saveJobToFile wrote. Called once
// at construction, before the worker thread starts. Per-file parse errors
// are logged and skipped so one corrupt file cannot block startup.
// Jobs found in PROCESSING state are relabeled FAILED (the process that
// was running them is gone).
void loadJobsFromDisk() {
    try {
        if (!std::filesystem::exists(queueDir)) {
            // First run: nothing persisted yet.
            return;
        }
        LOG_INFO("Loading persisted jobs from: " + queueDir);
        int loadedCount = 0;
        for (const auto& entry : std::filesystem::directory_iterator(queueDir)) {
            if (entry.path().extension() != ".json") {
                continue;
            }
            try {
                std::ifstream file(entry.path());
                if (!file.is_open()) {
                    continue;
                }
                nlohmann::json jobJson = nlohmann::json::parse(file);
                file.close();
                // Reconstruct JobInfo. The fields below (through "position")
                // are mandatory: missing keys throw and skip this file.
                JobInfo job;
                job.id = jobJson["id"];
                job.type = stringToJobType(jobJson["type"]);
                job.status = stringToJobStatus(jobJson["status"]);
                job.prompt = jobJson["prompt"];
                job.position = jobJson["position"];
                // Reconstruct time points from ms-since-epoch.
                auto queuedMs = jobJson["queued_time"].get<int64_t>();
                job.queuedTime = std::chrono::system_clock::time_point(
                    std::chrono::milliseconds(queuedMs));
                if (jobJson.contains("start_time")) {
                    auto startMs = jobJson["start_time"].get<int64_t>();
                    job.startTime = std::chrono::system_clock::time_point(
                        std::chrono::milliseconds(startMs));
                }
                if (jobJson.contains("end_time")) {
                    auto endMs = jobJson["end_time"].get<int64_t>();
                    job.endTime = std::chrono::system_clock::time_point(
                        std::chrono::milliseconds(endMs));
                }
                if (jobJson.contains("output_files")) {
                    job.outputFiles = jobJson["output_files"].get<std::vector<std::string>>();
                }
                if (jobJson.contains("error_message")) {
                    job.errorMessage = jobJson["error_message"];
                }
                // Load enhanced fields (with backward compatibility —
                // older job files may lack any of these keys).
                if (jobJson.contains("model_name")) {
                    job.modelName = jobJson["model_name"];
                    job.modelHash = jobJson.value("model_hash", "");
                    job.modelPath = jobJson.value("model_path", "");
                }
                if (jobJson.contains("negative_prompt")) {
                    job.negativePrompt = jobJson["negative_prompt"];
                }
                if (jobJson.contains("width")) job.width = jobJson["width"];
                if (jobJson.contains("height")) job.height = jobJson["height"];
                if (jobJson.contains("batch_count")) job.batchCount = jobJson["batch_count"];
                if (jobJson.contains("steps")) job.steps = jobJson["steps"];
                if (jobJson.contains("cfg_scale")) job.cfgScale = jobJson["cfg_scale"];
                if (jobJson.contains("sampling_method")) {
                    std::string samplingMethodStr = jobJson["sampling_method"];
                    job.samplingMethod = stringToSamplingMethod(samplingMethodStr);
                }
                if (jobJson.contains("scheduler")) {
                    std::string schedulerStr = jobJson["scheduler"];
                    job.scheduler = stringToScheduler(schedulerStr);
                }
                if (jobJson.contains("seed")) job.seed = jobJson["seed"];
                if (jobJson.contains("actual_seed")) job.actualSeed = jobJson["actual_seed"];
                if (jobJson.contains("request_type")) job.requestType = jobJson["request_type"];
                if (jobJson.contains("strength")) job.strength = jobJson["strength"];
                if (jobJson.contains("control_strength")) job.controlStrength = jobJson["control_strength"];
                if (jobJson.contains("clip_skip")) job.clipSkip = jobJson["clip_skip"];
                if (jobJson.contains("n_threads")) job.nThreads = jobJson["n_threads"];
                if (jobJson.contains("offload_params_to_cpu")) job.offloadParamsToCpu = jobJson["offload_params_to_cpu"];
                if (jobJson.contains("clip_on_cpu")) job.clipOnCpu = jobJson["clip_on_cpu"];
                if (jobJson.contains("vae_on_cpu")) job.vaeOnCpu = jobJson["vae_on_cpu"];
                if (jobJson.contains("diffusion_flash_attn")) job.diffusionFlashAttn = jobJson["diffusion_flash_attn"];
                if (jobJson.contains("diffusion_conv_direct")) job.diffusionConvDirect = jobJson["diffusion_conv_direct"];
                if (jobJson.contains("vae_conv_direct")) job.vaeConvDirect = jobJson["vae_conv_direct"];
                if (jobJson.contains("generation_time")) job.generationTime = jobJson["generation_time"];
                // Image paths for complex operations (paths, not bytes).
                if (jobJson.contains("init_image_path")) job.initImageData = jobJson["init_image_path"];
                if (jobJson.contains("control_image_path")) job.controlImageData = jobJson["control_image_path"];
                if (jobJson.contains("mask_image_path")) job.maskImageData = jobJson["mask_image_path"];
                // Model paths for advanced usage
                if (jobJson.contains("clip_l_path")) job.clipLPath = jobJson["clip_l_path"];
                if (jobJson.contains("clip_g_path")) job.clipGPath = jobJson["clip_g_path"];
                if (jobJson.contains("vae_path")) job.vaePath = jobJson["vae_path"];
                if (jobJson.contains("taesd_path")) job.taesdPath = jobJson["taesd_path"];
                if (jobJson.contains("controlnet_path")) job.controlNetPath = jobJson["controlnet_path"];
                if (jobJson.contains("embedding_dir")) job.embeddingDir = jobJson["embedding_dir"];
                if (jobJson.contains("lora_model_dir")) job.loraModelDir = jobJson["lora_model_dir"];
                if (jobJson.contains("esrgan_path")) job.esrganPath = jobJson["esrgan_path"];
                if (jobJson.contains("upscale_factor")) job.upscaleFactor = jobJson["upscale_factor"];
                // Progress and timing information
                if (jobJson.contains("progress")) job.progress = jobJson["progress"];
                if (jobJson.contains("model_load_progress")) job.modelLoadProgress = jobJson["model_load_progress"];
                if (jobJson.contains("generation_progress")) job.generationProgress = jobJson["generation_progress"];
                if (jobJson.contains("current_step")) job.currentStep = jobJson["current_step"];
                if (jobJson.contains("total_steps")) job.totalSteps = jobJson["total_steps"];
                if (jobJson.contains("time_elapsed")) job.timeElapsed = jobJson["time_elapsed"];
                if (jobJson.contains("time_remaining")) job.timeRemaining = jobJson["time_remaining"];
                if (jobJson.contains("speed")) job.speed = jobJson["speed"];
                if (jobJson.contains("first_generation_callback")) {
                    job.firstGenerationCallback = jobJson["first_generation_callback"];
                } else {
                    // For backward compatibility, default to true for loaded jobs
                    job.firstGenerationCallback = true;
                }
                // Clean up stale processing jobs from server restart
                if (job.status == GenerationStatus::PROCESSING) {
                    job.status = GenerationStatus::FAILED;
                    job.errorMessage = "Server restarted while job was processing";
                    job.endTime = std::chrono::system_clock::now();
                    LOG_DEBUG("Marked stale job as failed: " + job.id);
                    // Persist updated status to disk
                    saveJobToFile(job);
                }
                // Add to active jobs. Lock is taken per-file and released at
                // the end of this try-block's scope.
                std::lock_guard<std::mutex> lock(jobsMutex);
                activeJobs[job.id] = job;
                loadedCount++;
            } catch (const std::exception& e) {
                LOG_ERROR("Error loading job from " + entry.path().string() + ": " + std::string(e.what()));
            }
        }
        if (loadedCount > 0) {
            LOG_INFO("Loaded " + std::to_string(loadedCount) + " persisted job(s)");
        }
    } catch (const std::exception& e) {
        LOG_ERROR("Error loading jobs from disk: " + std::string(e.what()));
    }
}
- HashResult performHashJob(const HashRequest& request) {
- HashResult result;
- result.requestId = request.id;
- result.success = false;
- result.modelsHashed = 0;
- auto startTime = std::chrono::system_clock::now();
- if (!modelManager) {
- result.errorMessage = "Model manager not available";
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Get list of models to hash
- std::vector<std::string> modelsToHash;
- if (request.modelNames.empty()) {
- // Hash all models without hashes
- auto allModels = modelManager->getAllModels();
- for (const auto& [name, info] : allModels) {
- if (info.sha256.empty() || request.forceRehash) {
- modelsToHash.push_back(name);
- }
- }
- } else {
- modelsToHash = request.modelNames;
- }
- LOG_DEBUG("Hashing " + std::to_string(modelsToHash.size()) + " model(s)...");
- // Hash each model
- for (const auto& modelName : modelsToHash) {
- std::string hash = modelManager->ensureModelHash(modelName, request.forceRehash);
- if (!hash.empty()) {
- result.modelHashes[modelName] = hash;
- result.modelsHashed++;
- } else {
- LOG_ERROR("Failed to hash model: " + modelName);
- }
- }
- auto endTime = std::chrono::system_clock::now();
- result.hashingTime = std::chrono::duration_cast<std::chrono::milliseconds>(
- endTime - startTime).count();
- result.success = result.modelsHashed > 0;
- result.status = result.success ? GenerationStatus::COMPLETED : GenerationStatus::FAILED;
- if (!result.success) {
- result.errorMessage = "Failed to hash any models";
- }
- return result;
- }
- ConversionResult performConversionJob(const ConversionRequest& request) {
- ConversionResult result;
- result.requestId = request.id;
- result.success = false;
- auto startTime = std::chrono::system_clock::now();
- // Conversion start output removed from stdout
- // Check if input file exists
- namespace fs = std::filesystem;
- if (!fs::exists(request.modelPath)) {
- result.errorMessage = "Input model file not found: " + request.modelPath;
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Get original file size
- try {
- auto originalSize = fs::file_size(request.modelPath);
- result.originalSize = formatFileSize(originalSize);
- } catch (const std::exception& e) {
- result.originalSize = "Unknown";
- }
- // Build conversion command
- // Get the sd binary path from the CMake installation directory
- std::string sdBinaryPath = "../build/stable-diffusion.cpp-install/bin/sd";
- std::stringstream cmd;
- cmd << sdBinaryPath << " --mode convert";
- cmd << " -m \"" << request.modelPath << "\"";
- cmd << " -o \"" << request.outputPath << "\"";
- cmd << " --type " << request.quantizationType;
- cmd << " 2>&1"; // Capture stderr
- // Command execution output removed from stdout
- // Execute conversion
- FILE* pipe = popen(cmd.str().c_str(), "r");
- if (!pipe) {
- result.errorMessage = "Failed to execute conversion command";
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Read command output
- char buffer[256];
- std::string output;
- while (fgets(buffer, sizeof(buffer), pipe) != nullptr) {
- output += buffer;
- // Progress output removed from stdout
- }
- int exitCode = pclose(pipe);
- auto endTime = std::chrono::system_clock::now();
- result.conversionTime = std::chrono::duration_cast<std::chrono::milliseconds>(
- endTime - startTime).count();
- if (exitCode != 0) {
- result.errorMessage = "Conversion failed with exit code " + std::to_string(exitCode);
- if (!output.empty()) {
- result.errorMessage += "\nOutput: " + output;
- }
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Check if output file was created
- if (!fs::exists(request.outputPath)) {
- result.errorMessage = "Output file was not created: " + request.outputPath;
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Get converted file size
- try {
- auto convertedSize = fs::file_size(request.outputPath);
- result.convertedSize = formatFileSize(convertedSize);
- } catch (const std::exception& e) {
- result.convertedSize = "Unknown";
- }
- result.success = true;
- result.status = GenerationStatus::COMPLETED;
- result.outputPath = request.outputPath;
- LOG_DEBUG("Conversion completed successfully!");
- LOG_DEBUG(" Original size: " + result.originalSize);
- LOG_DEBUG(" Converted size: " + result.convertedSize);
- LOG_DEBUG(" Time: " + std::to_string(result.conversionTime) + "ms");
- // Trigger model rescan after successful conversion
- if (modelManager) {
- LOG_DEBUG("Triggering model rescan...");
- modelManager->scanModelsDirectory();
- }
- return result;
- }
// Render a byte count as a human-readable string with two decimals,
// e.g. 1536 -> "1.50 KB". Values >= 1024 TB stay in TB.
std::string formatFileSize(size_t bytes) {
    static const char* kUnits[] = {"B", "KB", "MB", "GB", "TB"};
    double value = static_cast<double>(bytes);
    int unit = 0;
    for (; value >= 1024.0 && unit < 4; ++unit) {
        value /= 1024.0;
    }
    std::ostringstream out;
    out << std::fixed << std::setprecision(2) << value << " " << kUnits[unit];
    return out.str();
}
- // Helper function to populate JobInfo from GenerationRequest
- void populateJobInfoFromRequest(JobInfo& jobInfo, const GenerationRequest& request) {
- // Model information
- jobInfo.modelName = request.modelName;
- jobInfo.modelHash = modelManager ? modelManager->getModelInfo(request.modelName).sha256 : "";
- jobInfo.modelPath = modelManager ? modelManager->getModelInfo(request.modelName).path : "";
- // Generation parameters
- jobInfo.negativePrompt = request.negativePrompt;
- jobInfo.width = request.width;
- jobInfo.height = request.height;
- jobInfo.batchCount = request.batchCount;
- jobInfo.steps = request.steps;
- jobInfo.cfgScale = request.cfgScale;
- jobInfo.samplingMethod = request.samplingMethod;
- jobInfo.scheduler = request.scheduler;
- jobInfo.seed = request.seed;
- jobInfo.strength = request.strength;
- jobInfo.controlStrength = request.controlStrength;
- jobInfo.clipSkip = request.clipSkip;
- jobInfo.nThreads = request.nThreads;
- jobInfo.offloadParamsToCpu = request.offloadParamsToCpu;
- jobInfo.clipOnCpu = request.clipOnCpu;
- jobInfo.vaeOnCpu = request.vaeOnCpu;
- jobInfo.diffusionFlashAttn = request.diffusionFlashAttn;
- jobInfo.diffusionConvDirect = request.diffusionConvDirect;
- jobInfo.vaeConvDirect = request.vaeConvDirect;
- // Request type - store image paths instead of image data
- switch (request.requestType) {
- case GenerationRequest::RequestType::TEXT2IMG:
- jobInfo.requestType = "text2img";
- break;
- case GenerationRequest::RequestType::IMG2IMG:
- jobInfo.requestType = "img2img";
- // Store path to init image instead of the image data
- jobInfo.initImageData = request.initImagePath; // Store path, not base64
- break;
- case GenerationRequest::RequestType::CONTROLNET:
- jobInfo.requestType = "controlnet";
- // Store path to control image instead of the image data
- jobInfo.controlImageData = request.controlImagePath; // Store path, not base64
- break;
- case GenerationRequest::RequestType::UPSCALER:
- jobInfo.requestType = "upscaler";
- // Store path to input image instead of the image data
- jobInfo.initImageData = request.initImagePath; // Store path, not base64
- break;
- case GenerationRequest::RequestType::INPAINTING:
- jobInfo.requestType = "inpainting";
- // Store paths to images instead of the image data
- jobInfo.initImageData = request.initImagePath; // Store path, not base64
- jobInfo.maskImageData = request.maskImagePath; // Store path, not base64
- break;
- }
- // Model paths
- jobInfo.clipLPath = request.clipLPath;
- jobInfo.clipGPath = request.clipGPath;
- jobInfo.vaePath = request.vaePath;
- jobInfo.taesdPath = request.taesdPath;
- jobInfo.controlNetPath = request.controlNetPath;
- jobInfo.embeddingDir = request.embeddingDir;
- jobInfo.loraModelDir = request.loraModelDir;
- jobInfo.esrganPath = request.esrganPath;
- jobInfo.upscaleFactor = request.upscaleFactor;
- }
- };
- GenerationQueue::GenerationQueue(ModelManager* modelManager, int maxConcurrentGenerations,
- const std::string& queueDir, const std::string& outputDir)
- : pImpl(std::make_unique<Impl>()) {
- pImpl->modelManager = modelManager;
- pImpl->maxConcurrentGenerations = maxConcurrentGenerations;
- pImpl->queueDir = queueDir;
- pImpl->outputDir = outputDir;
- LOG_INFO("GenerationQueue initialized");
- LOG_INFO(" Max concurrent generations: " + std::to_string(maxConcurrentGenerations));
- LOG_INFO(" Queue directory: " + queueDir);
- LOG_INFO(" Output directory: " + outputDir);
- // Load any existing jobs from disk
- pImpl->loadJobsFromDisk();
- }
GenerationQueue::~GenerationQueue() {
    // Join the worker and flush outstanding promises before pImpl (and
    // the mutexes/condition variable it owns) is destroyed.
    stop();
}
// Queue a generation request and return a future the caller can wait on.
// The future is resolved by the worker thread (or by cancel/clear/stop
// with an error result). The job is also persisted to disk immediately.
std::future<GenerationResult> GenerationQueue::enqueueRequest(const GenerationRequest& request) {
    LOG_DEBUG("Enqueuing generation request: " + request.id);
    LOG_DEBUG(" Prompt: " + request.prompt.substr(0, 100) +
              (request.prompt.length() > 100 ? "..." : ""));
    LOG_DEBUG(" Model: " + request.modelName);
    LOG_DEBUG(" Size: " + std::to_string(request.width) + "x" + std::to_string(request.height));
    LOG_DEBUG(" Steps: " + std::to_string(request.steps) + ", CFG: " + std::to_string(request.cfgScale));
    // Create promise and future; the future is taken before the promise
    // is moved into the map.
    auto promise = std::make_shared<std::promise<GenerationResult>>();
    auto future = promise->get_future();
    // Store the promise (jobsMutex only; released before queueMutex below).
    {
        std::lock_guard<std::mutex> lock(pImpl->jobsMutex);
        pImpl->jobPromises[request.id] = std::move(*promise);
    }
    // Add to queue. Lock order here is queueMutex -> jobsMutex, matching
    // cancelJob()/clearQueue().
    {
        std::lock_guard<std::mutex> lock(pImpl->queueMutex);
        // Create job info with enhanced data
        JobInfo jobInfo;
        jobInfo.id = request.id;
        jobInfo.type = JobType::GENERATION;
        jobInfo.status = GenerationStatus::QUEUED;
        jobInfo.prompt = request.prompt; // Store full prompt
        jobInfo.queuedTime = std::chrono::system_clock::now();
        // 1-based position in the pending queue at enqueue time.
        jobInfo.position = pImpl->requestQueue.size() + 1;
        // Populate all enhanced fields from the request
        pImpl->populateJobInfoFromRequest(jobInfo, request);
        // Store job info
        {
            std::lock_guard<std::mutex> jobsLock(pImpl->jobsMutex);
            pImpl->activeJobs[request.id] = jobInfo;
        }
        // Persist to disk
        pImpl->saveJobToFile(jobInfo);
        pImpl->requestQueue.push(request);
        // Keep the lock-free size counter in sync with the real queue.
        pImpl->queueSize.store(pImpl->requestQueue.size());
    }
    // Notify worker thread
    pImpl->queueCondition.notify_one();
    return future;
}
- std::future<HashResult> GenerationQueue::enqueueHashRequest(const HashRequest& request) {
- auto promise = std::make_shared<std::promise<HashResult>>();
- auto future = promise->get_future();
- std::unique_lock<std::mutex> lock(pImpl->queueMutex);
- // Create a generation request that acts as a placeholder for hash job
- GenerationRequest hashJobPlaceholder;
- hashJobPlaceholder.id = request.id;
- hashJobPlaceholder.prompt = "HASH_JOB"; // Special marker
- hashJobPlaceholder.modelName = request.modelNames.empty() ? "ALL_MODELS" : request.modelNames[0];
- // Store promise for retrieval later
- pImpl->hashPromises[request.id] = promise;
- pImpl->hashRequests[request.id] = request;
- pImpl->requestQueue.push(hashJobPlaceholder);
- pImpl->queueCondition.notify_one();
- LOG_DEBUG("Enqueued hash request: " + request.id);
- return future;
- }
- std::future<ConversionResult> GenerationQueue::enqueueConversionRequest(const ConversionRequest& request) {
- auto promise = std::make_shared<std::promise<ConversionResult>>();
- auto future = promise->get_future();
- std::unique_lock<std::mutex> lock(pImpl->queueMutex);
- // Create a generation request that acts as a placeholder for conversion job
- GenerationRequest conversionJobPlaceholder;
- conversionJobPlaceholder.id = request.id;
- conversionJobPlaceholder.prompt = "CONVERSION_JOB"; // Special marker
- conversionJobPlaceholder.modelName = request.modelName;
- // Store promise for retrieval later
- pImpl->conversionPromises[request.id] = promise;
- pImpl->conversionRequests[request.id] = request;
- pImpl->requestQueue.push(conversionJobPlaceholder);
- pImpl->queueCondition.notify_one();
- LOG_DEBUG("Enqueued conversion request: " + request.id + " (model: " + request.modelName + ", type: " + request.quantizationType + ")");
- return future;
- }
// Lock-free read of the cached pending-queue length (maintained by the
// enqueue/cancel/clear paths).
size_t GenerationQueue::getQueueSize() const {
    return pImpl->queueSize.load();
}
// Lock-free read of the number of generations currently executing.
size_t GenerationQueue::getActiveGenerations() const {
    return pImpl->activeGenerations.load();
}
- std::vector<JobInfo> GenerationQueue::getQueueStatus() const {
- std::vector<JobInfo> jobs;
- std::lock_guard<std::mutex> lock(pImpl->jobsMutex);
- jobs.reserve(pImpl->activeJobs.size());
- for (const auto& pair : pImpl->activeJobs) {
- jobs.push_back(pair.second);
- }
- // Sort by queued time, then by status
- std::sort(jobs.begin(), jobs.end(), [](const JobInfo& a, const JobInfo& b) {
- if (a.status != b.status) {
- return static_cast<int>(a.status) < static_cast<int>(b.status);
- }
- return a.queuedTime < b.queuedTime;
- });
- return jobs;
- }
- JobInfo GenerationQueue::getJobInfo(const std::string& jobId) const {
- std::lock_guard<std::mutex> lock(pImpl->jobsMutex);
- auto it = pImpl->activeJobs.find(jobId);
- if (it != pImpl->activeJobs.end()) {
- return it->second;
- }
- return JobInfo{}; // Return empty job info if not found
- }
- bool GenerationQueue::cancelJob(const std::string& jobId) {
- std::lock_guard<std::mutex> queueLock(pImpl->queueMutex);
- std::lock_guard<std::mutex> jobsLock(pImpl->jobsMutex);
- // Check if job is still queued
- std::queue<GenerationRequest> newQueue;
- bool found = false;
- while (!pImpl->requestQueue.empty()) {
- GenerationRequest request = pImpl->requestQueue.front();
- pImpl->requestQueue.pop();
- if (request.id == jobId) {
- found = true;
- // Update job status
- auto it = pImpl->activeJobs.find(jobId);
- if (it != pImpl->activeJobs.end()) {
- it->second.status = GenerationStatus::FAILED;
- it->second.endTime = std::chrono::system_clock::now();
- }
- // Set promise with cancellation error
- auto promiseIt = pImpl->jobPromises.find(jobId);
- if (promiseIt != pImpl->jobPromises.end()) {
- GenerationResult result;
- result.requestId = jobId;
- result.success = false;
- result.errorMessage = "Job cancelled by user";
- result.generationTime = 0;
- promiseIt->second.set_value(result);
- pImpl->jobPromises.erase(promiseIt);
- }
- } else {
- newQueue.push(request);
- }
- }
- pImpl->requestQueue = newQueue;
- pImpl->queueSize.store(pImpl->requestQueue.size());
- return found;
- }
- void GenerationQueue::clearQueue() {
- LOG_DEBUG("Clearing generation queue");
- std::lock_guard<std::mutex> queueLock(pImpl->queueMutex);
- std::lock_guard<std::mutex> jobsLock(pImpl->jobsMutex);
- // Cancel all queued jobs
- while (!pImpl->requestQueue.empty()) {
- GenerationRequest request = pImpl->requestQueue.front();
- pImpl->requestQueue.pop();
- // Update job status
- auto it = pImpl->activeJobs.find(request.id);
- if (it != pImpl->activeJobs.end()) {
- it->second.status = GenerationStatus::FAILED;
- it->second.endTime = std::chrono::system_clock::now();
- }
- // Set promise with cancellation error
- auto promiseIt = pImpl->jobPromises.find(request.id);
- if (promiseIt != pImpl->jobPromises.end()) {
- GenerationResult result;
- result.requestId = request.id;
- result.success = false;
- result.errorMessage = "Queue cleared";
- result.generationTime = 0;
- promiseIt->second.set_value(result);
- pImpl->jobPromises.erase(promiseIt);
- }
- }
- pImpl->queueSize.store(0);
- }
// Start the single worker thread. Idempotent: calling start() while
// already running is a logged no-op. Flags are set before the thread is
// spawned so the worker observes a consistent state.
void GenerationQueue::start() {
    if (pImpl->running.load()) {
        LOG_DEBUG("GenerationQueue is already running");
        return;
    }
    pImpl->running.store(true);
    pImpl->stopRequested.store(false);
    pImpl->workerThread = std::thread(&Impl::workerThreadFunction, pImpl.get());
    LOG_DEBUG("GenerationQueue started");
}
// Stop the worker thread and fail any promises the worker never
// resolved. The ordering matters: set stopRequested, wake the worker,
// join it, and only then sweep the promise map — so the worker cannot
// race a set_value() against ours.
void GenerationQueue::stop() {
    if (!pImpl->running.load()) {
        return;
    }
    LOG_DEBUG("Stopping GenerationQueue...");
    pImpl->stopRequested.store(true);
    pImpl->queueCondition.notify_all();
    if (pImpl->workerThread.joinable()) {
        pImpl->workerThread.join();
    }
    pImpl->running.store(false);
    // Clear any remaining promises with a "Queue stopped" error so
    // callers blocked on futures are released.
    std::lock_guard<std::mutex> lock(pImpl->jobsMutex);
    for (auto& pair : pImpl->jobPromises) {
        GenerationResult result;
        result.requestId = pair.first;
        result.success = false;
        result.errorMessage = "Queue stopped";
        result.generationTime = 0;
        pair.second.set_value(result);
    }
    pImpl->jobPromises.clear();
    LOG_DEBUG("GenerationQueue stopped");
}
// True between a successful start() and the matching stop().
bool GenerationQueue::isRunning() const {
    return pImpl->running.load();
}
// Adjust the concurrency limit at runtime. Takes effect the next time
// the worker consults maxConcurrentGenerations; in-flight work is not
// interrupted.
void GenerationQueue::setMaxConcurrentGenerations(int maxConcurrent) {
    pImpl->maxConcurrentGenerations = maxConcurrent;
    LOG_DEBUG("GenerationQueue max concurrent generations set to: " + std::to_string(maxConcurrent));
}
|