| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196 |
#include "generation_queue.h"
#include "model_manager.h"
#include "stable_diffusion_wrapper.h"
#include "utils.h"
#include <algorithm>
#include <cstdio>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <random>
#include <sstream>
#ifndef _WIN32
#include <sys/wait.h>
#endif
#include <nlohmann/json.hpp>
#define STB_IMAGE_WRITE_IMPLEMENTATION
#include "../stable-diffusion.cpp-src/thirdparty/stb_image_write.h"
#define STB_IMAGE_IMPLEMENTATION
#include "../stable-diffusion.cpp-src/thirdparty/stb_image.h"
- class GenerationQueue::Impl {
- public:
- // Model manager reference
- ModelManager* modelManager = nullptr;
- // Thread management
- std::thread workerThread;
- std::atomic<bool> running{false};
- std::atomic<bool> stopRequested{false};
- // Queue management
- mutable std::mutex queueMutex;
- std::condition_variable queueCondition;
- std::queue<GenerationRequest> requestQueue;
- // Job tracking
- mutable std::mutex jobsMutex;
- std::unordered_map<std::string, JobInfo> activeJobs;
- std::unordered_map<std::string, std::promise<GenerationResult>> jobPromises;
- // Hash job tracking
- std::map<std::string, std::shared_ptr<std::promise<HashResult>>> hashPromises;
- std::map<std::string, HashRequest> hashRequests;
- // Conversion job tracking
- std::map<std::string, std::shared_ptr<std::promise<ConversionResult>>> conversionPromises;
- std::map<std::string, ConversionRequest> conversionRequests;
- // Configuration
- int maxConcurrentGenerations = 1;
- std::string queueDir = "./queue";
- std::string outputDir = "./output";
- // Statistics
- std::atomic<size_t> queueSize{0};
- std::atomic<size_t> activeGenerations{0};
- std::atomic<uint64_t> totalJobsProcessed{0};
- // Worker thread function
- void workerThreadFunction() {
- std::cout << "GenerationQueue worker thread started" << std::endl;
- while (running.load() && !stopRequested.load()) {
- std::unique_lock<std::mutex> lock(queueMutex);
- // Wait for a request or stop signal
- queueCondition.wait(lock, [this] {
- return !requestQueue.empty() || stopRequested.load();
- });
- if (stopRequested.load()) {
- break;
- }
- if (requestQueue.empty()) {
- continue;
- }
- // Get the next request
- GenerationRequest request = requestQueue.front();
- requestQueue.pop();
- queueSize.store(requestQueue.size());
- lock.unlock();
- // Process the request
- processRequest(request);
- }
- std::cout << "GenerationQueue worker thread stopped" << std::endl;
- }
- void processRequest(const GenerationRequest& request) {
- // Check if this is a hash job
- if (request.prompt == "HASH_JOB") {
- auto hashIt = hashRequests.find(request.id);
- if (hashIt != hashRequests.end()) {
- HashResult result = performHashJob(hashIt->second);
- auto promiseIt = hashPromises.find(request.id);
- if (promiseIt != hashPromises.end()) {
- promiseIt->second->set_value(result);
- hashPromises.erase(promiseIt);
- }
- hashRequests.erase(hashIt);
- }
- return;
- }
- // Check if this is a conversion job
- if (request.prompt == "CONVERSION_JOB") {
- auto convIt = conversionRequests.find(request.id);
- if (convIt != conversionRequests.end()) {
- ConversionResult result = performConversionJob(convIt->second);
- auto promiseIt = conversionPromises.find(request.id);
- if (promiseIt != conversionPromises.end()) {
- promiseIt->second->set_value(result);
- conversionPromises.erase(promiseIt);
- }
- conversionRequests.erase(convIt);
- }
- return;
- }
- auto startTime = std::chrono::steady_clock::now();
- // Update job status to PROCESSING
- {
- std::lock_guard<std::mutex> lock(jobsMutex);
- if (activeJobs.find(request.id) != activeJobs.end()) {
- activeJobs[request.id].status = GenerationStatus::PROCESSING;
- activeJobs[request.id].startTime = startTime;
- activeJobs[request.id].progress = 0.0f;
- activeJobs[request.id].currentStep = 0;
- activeJobs[request.id].totalSteps = 0;
- activeJobs[request.id].timeElapsed = 0;
- activeJobs[request.id].timeRemaining = 0;
- activeJobs[request.id].speed = 0.0f;
- saveJobToFile(activeJobs[request.id]);
- }
- }
- activeGenerations.store(1); // Only one generation at a time
- std::cout << "Processing generation request: " << request.id
- << " (prompt: " << request.prompt.substr(0, 50)
- << (request.prompt.length() > 50 ? "..." : "") << ")" << std::endl;
- // Real generation logic using stable-diffusion.cpp with progress tracking
- GenerationResult result = performActualGeneration(request, request.id);
- auto endTime = std::chrono::steady_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime);
- result.generationTime = duration.count();
- // Update job status to COMPLETED/FAILED
- {
- std::lock_guard<std::mutex> lock(jobsMutex);
- if (activeJobs.find(request.id) != activeJobs.end()) {
- activeJobs[request.id].status = result.success ? GenerationStatus::COMPLETED : GenerationStatus::FAILED;
- activeJobs[request.id].endTime = endTime;
- // Set final progress to 100% if successful
- if (result.success) {
- activeJobs[request.id].progress = 1.0f;
- if (activeJobs[request.id].totalSteps > 0) {
- activeJobs[request.id].currentStep = activeJobs[request.id].totalSteps;
- }
- }
- // Store output files and error message
- activeJobs[request.id].outputFiles = result.imagePaths;
- activeJobs[request.id].errorMessage = result.errorMessage;
- // Persist to disk
- saveJobToFile(activeJobs[request.id]);
- }
- // Set the promise value
- auto it = jobPromises.find(request.id);
- if (it != jobPromises.end()) {
- it->second.set_value(result);
- jobPromises.erase(it);
- }
- }
- activeGenerations.store(0);
- totalJobsProcessed.fetch_add(1);
- std::cout << "Completed generation request: " << request.id
- << " (success: " << (result.success ? "true" : "false")
- << ", time: " << result.generationTime << "ms)";
- if (!result.success && !result.errorMessage.empty()) {
- std::cout << " - Error: " << result.errorMessage;
- }
- std::cout << std::endl;
- }
- // Progress callback that updates the job info
- void updateJobProgress(const std::string& jobId, int step, int totalSteps, float progress, uint64_t timeElapsed) {
- std::lock_guard<std::mutex> lock(jobsMutex);
- auto it = activeJobs.find(jobId);
- if (it != activeJobs.end()) {
- it->second.progress = progress;
- it->second.currentStep = step;
- it->second.totalSteps = totalSteps;
- it->second.timeElapsed = static_cast<int64_t>(timeElapsed);
- // Calculate time remaining and speed
- if (step > 0 && timeElapsed > 0) {
- double avgStepTime = static_cast<double>(timeElapsed) / step;
- int remainingSteps = totalSteps - step;
- it->second.timeRemaining = static_cast<int64_t>(avgStepTime * remainingSteps);
- it->second.speed = 1000.0 / avgStepTime; // steps per second
- }
- // Save progress to file periodically (every 10 steps or on significant progress changes)
- if (step % 10 == 0 || progress >= 0.99f) {
- saveJobToFile(it->second);
- }
- }
- }
- GenerationResult performActualGeneration(const GenerationRequest& request, const std::string& jobId) {
- GenerationResult result;
- result.requestId = request.id;
- result.success = false;
- // Check if model manager is available
- if (!modelManager) {
- result.errorMessage = "Model manager not available";
- return result;
- }
- // Check if the model is loaded (DO NOT auto-load)
- if (!modelManager->isModelLoaded(request.modelName)) {
- result.errorMessage = "Model not loaded: " + request.modelName + ". Please load the model first using POST /api/models/{hash}/load";
- return result;
- }
- // Get the model wrapper from the shared model manager
- auto* modelWrapper = modelManager->getModel(request.modelName);
- if (!modelWrapper) {
- result.errorMessage = "Model not found or not loaded: " + request.modelName;
- return result;
- }
- // Prepare generation parameters
- StableDiffusionWrapper::GenerationParams params;
- params.prompt = request.prompt;
- params.negativePrompt = request.negativePrompt;
- params.width = request.width;
- params.height = request.height;
- params.batchCount = request.batchCount;
- params.steps = request.steps;
- params.cfgScale = request.cfgScale;
- params.samplingMethod = samplingMethodToString(request.samplingMethod);
- params.scheduler = schedulerToString(request.scheduler);
- params.clipSkip = request.clipSkip;
- params.strength = request.strength;
- params.controlStrength = request.controlStrength;
- params.nThreads = request.nThreads;
- params.offloadParamsToCpu = request.offloadParamsToCpu;
- params.clipOnCpu = request.clipOnCpu;
- params.vaeOnCpu = request.vaeOnCpu;
- params.diffusionFlashAttn = request.diffusionFlashAttn;
- params.diffusionConvDirect = request.diffusionConvDirect;
- params.vaeConvDirect = request.vaeConvDirect;
- // Set model paths if provided
- params.modelPath = modelManager->getModelInfo(request.modelName).path;
- params.clipLPath = request.clipLPath;
- params.clipGPath = request.clipGPath;
- params.vaePath = request.vaePath;
- params.taesdPath = request.taesdPath;
- params.controlNetPath = request.controlNetPath;
- params.embeddingDir = request.embeddingDir;
- params.loraModelDir = request.loraModelDir;
- // Parse seed
- if (request.seed == "random") {
- std::random_device rd;
- std::mt19937 gen(rd());
- std::uniform_int_distribution<int64_t> dis;
- params.seed = dis(gen);
- } else {
- try {
- params.seed = std::stoll(request.seed);
- } catch (...) {
- params.seed = 42; // Default seed
- }
- }
- result.actualSeed = params.seed;
- // Generate images based on request type with progress tracking
- try {
- std::vector<StableDiffusionWrapper::GeneratedImage> generatedImages;
- // Create progress callback that updates job info
- auto progressCallback = [this, jobId](int step, int totalSteps, float progress, void* userData) {
- // Calculate time elapsed from start time (stored in userData)
- auto startTime = userData ? *static_cast<std::chrono::steady_clock::time_point*>(userData) : std::chrono::steady_clock::now();
- auto currentTime = std::chrono::steady_clock::now();
- uint64_t timeElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - startTime).count();
- updateJobProgress(jobId, step, totalSteps, progress, timeElapsed);
- };
- // Store start time to pass as user data
- auto generationStartTime = std::chrono::steady_clock::now();
- switch (request.requestType) {
- case GenerationRequest::RequestType::TEXT2IMG:
- generatedImages = modelWrapper->generateImage(params, progressCallback, &generationStartTime);
- break;
- case GenerationRequest::RequestType::IMG2IMG:
- if (request.initImageData.empty()) {
- result.errorMessage = "No init image data provided for img2img";
- return result;
- }
- generatedImages = modelWrapper->generateImageImg2Img(
- params,
- request.initImageData,
- request.initImageWidth,
- request.initImageHeight,
- progressCallback,
- &generationStartTime
- );
- break;
- case GenerationRequest::RequestType::CONTROLNET:
- if (request.controlImageData.empty()) {
- result.errorMessage = "No control image data provided for ControlNet";
- return result;
- }
- generatedImages = modelWrapper->generateImageControlNet(
- params,
- request.controlImageData,
- request.controlImageWidth,
- request.controlImageHeight,
- progressCallback,
- &generationStartTime
- );
- break;
- case GenerationRequest::RequestType::UPSCALER:
- if (request.initImageData.empty()) {
- result.errorMessage = "No input image data provided for upscaling";
- return result;
- }
- if (request.esrganPath.empty()) {
- result.errorMessage = "No ESRGAN model path provided for upscaling";
- return result;
- }
- {
- auto upscaledImage = modelWrapper->upscaleImage(
- request.esrganPath,
- request.initImageData,
- request.initImageWidth,
- request.initImageHeight,
- request.initImageChannels,
- request.upscaleFactor,
- request.nThreads,
- request.offloadParamsToCpu,
- request.diffusionConvDirect
- );
- generatedImages.push_back(upscaledImage);
- }
- break;
- case GenerationRequest::RequestType::INPAINTING:
- if (request.initImageData.empty()) {
- result.errorMessage = "No source image data provided for inpainting";
- return result;
- }
- if (request.maskImageData.empty()) {
- result.errorMessage = "No mask image data provided for inpainting";
- return result;
- }
- generatedImages = modelWrapper->generateImageInpainting(
- params,
- request.initImageData,
- request.initImageWidth,
- request.initImageHeight,
- request.maskImageData,
- request.maskImageWidth,
- request.maskImageHeight,
- progressCallback,
- &generationStartTime
- );
- break;
- default:
- result.errorMessage = "Unknown request type";
- return result;
- }
- if (generatedImages.empty()) {
- result.errorMessage = "Failed to generate images: " + modelWrapper->getLastError();
- return result;
- }
- // Save generated images to files
- for (size_t i = 0; i < generatedImages.size(); i++) {
- const auto& image = generatedImages[i];
- std::string imagePath = saveImageToFile(image, request.id, i);
- if (!imagePath.empty()) {
- result.imagePaths.push_back(imagePath);
- } else {
- result.errorMessage = "Failed to save generated image " + std::to_string(i);
- return result;
- }
- }
- result.success = true;
- result.generationTime = generatedImages.empty() ? 0 : generatedImages[0].generationTime;
- result.errorMessage = "";
- } catch (const std::exception& e) {
- result.errorMessage = "Exception during generation: " + std::string(e.what());
- }
- return result;
- }
- std::string saveImageToFile(const StableDiffusionWrapper::GeneratedImage& image, const std::string& requestId, size_t index) {
- // Create job-specific output directory
- std::string jobOutputDir = outputDir + "/" + requestId;
- std::error_code ec;
- std::filesystem::create_directories(jobOutputDir, ec);
- if (ec) {
- std::cerr << "Failed to create output directory " << jobOutputDir
- << ": " << ec.message() << std::endl;
- return "";
- }
- // Generate filename
- std::stringstream ss;
- ss << jobOutputDir << "/" << requestId << "_" << index << ".png";
- std::string filename = ss.str();
- std::cout << "Attempting to save image to: " << filename << std::endl;
- // Check if image data is valid
- if (image.data.empty() || image.width <= 0 || image.height <= 0) {
- std::cerr << "Invalid image data for " << requestId << "_" << index
- << ": width=" << image.width
- << ", height=" << image.height
- << ", channels=" << image.channels
- << ", data_size=" << image.data.size() << std::endl;
- return "";
- }
- // Validate image data integrity
- const size_t expectedDataSize = static_cast<size_t>(image.width) * image.height * image.channels;
- if (image.data.size() != expectedDataSize) {
- std::cerr << "Image data size mismatch for " << requestId << "_" << index
- << ": expected=" << expectedDataSize
- << ", actual=" << image.data.size() << std::endl;
- // Continue anyway, but log the warning
- }
- // Check if we can write to the directory
- std::ofstream testFile(filename + ".test");
- if (!testFile.is_open()) {
- std::cerr << "Cannot write to directory " << jobOutputDir
- << ": permission denied or disk full" << std::endl;
- return "";
- }
- testFile.close();
- std::filesystem::remove(filename + ".test");
- // Write PNG file using stb_image_write with detailed error logging
- std::cout << "Writing PNG file: " << filename
- << " (size: " << image.width << "x" << image.height
- << "x" << image.channels << ")" << std::endl;
- int result = stbi_write_png(
- filename.c_str(),
- image.width,
- image.height,
- image.channels,
- image.data.data(),
- image.width * image.channels // stride in bytes
- );
- if (result == 0) {
- std::cerr << "stbi_write_png failed for " << filename << std::endl;
- // Try to get more detailed error information
- std::cerr << "Image details:" << std::endl;
- std::cerr << " Dimensions: " << image.width << "x" << image.height << std::endl;
- std::cerr << " Channels: " << image.channels << std::endl;
- std::cerr << " Data size: " << image.data.size() << " bytes" << std::endl;
- std::cerr << " Expected size: " << expectedDataSize << " bytes" << std::endl;
- std::cerr << " Stride: " << (image.width * image.channels) << " bytes" << std::endl;
- // Check if file was created but is empty
- if (std::filesystem::exists(filename)) {
- auto fileSize = std::filesystem::file_size(filename);
- std::cerr << " File exists but size is: " << fileSize << " bytes" << std::endl;
- if (fileSize == 0) {
- std::cerr << " ERROR: Zero-byte file created - stbi_write_png returned false but file exists" << std::endl;
- }
- } else {
- std::cerr << " File was not created" << std::endl;
- }
- // Check disk space
- try {
- auto space = std::filesystem::space(jobOutputDir);
- std::cerr << " Available disk space: " << (space.available / (1024 * 1024)) << " MB" << std::endl;
- } catch (const std::exception& e) {
- std::cerr << " Could not check disk space: " << e.what() << std::endl;
- }
- return "";
- }
- // Verify the file was created successfully and has content
- if (!std::filesystem::exists(filename)) {
- std::cerr << "ERROR: stbi_write_png returned success but file does not exist: " << filename << std::endl;
- return "";
- }
- auto fileSize = std::filesystem::file_size(filename);
- if (fileSize == 0) {
- std::cerr << "ERROR: stbi_write_png returned success but created zero-byte file: " << filename << std::endl;
- return "";
- }
- std::cout << "Successfully saved generated image to: " << filename
- << " (" << image.width << "x" << image.height
- << ", " << image.channels << " channels, "
- << image.data.size() << " data bytes, "
- << fileSize << " file bytes)" << std::endl;
- return filename;
- }
- std::string samplingMethodToString(SamplingMethod method) {
- switch (method) {
- case SamplingMethod::EULER: return "euler";
- case SamplingMethod::EULER_A: return "euler_a";
- case SamplingMethod::HEUN: return "heun";
- case SamplingMethod::DPM2: return "dpm2";
- case SamplingMethod::DPMPP2S_A: return "dpmpp2s_a";
- case SamplingMethod::DPMPP2M: return "dpmpp2m";
- case SamplingMethod::DPMPP2MV2: return "dpmpp2mv2";
- case SamplingMethod::IPNDM: return "ipndm";
- case SamplingMethod::IPNDM_V: return "ipndm_v";
- case SamplingMethod::LCM: return "lcm";
- case SamplingMethod::DDIM_TRAILING: return "ddim_trailing";
- case SamplingMethod::TCD: return "tcd";
- default: return "euler";
- }
- }
- std::string schedulerToString(Scheduler scheduler) {
- switch (scheduler) {
- case Scheduler::DISCRETE: return "discrete";
- case Scheduler::KARRAS: return "karras";
- case Scheduler::EXPONENTIAL: return "exponential";
- case Scheduler::AYS: return "ays";
- case Scheduler::GITS: return "gits";
- case Scheduler::SMOOTHSTEP: return "smoothstep";
- case Scheduler::SGM_UNIFORM: return "sgm_uniform";
- case Scheduler::SIMPLE: return "simple";
- default: return "default";
- }
- }
- std::string jobStatusToString(GenerationStatus status) {
- switch (status) {
- case GenerationStatus::QUEUED: return "queued";
- case GenerationStatus::PROCESSING: return "processing";
- case GenerationStatus::COMPLETED: return "completed";
- case GenerationStatus::FAILED: return "failed";
- default: return "unknown";
- }
- }
- GenerationStatus stringToJobStatus(const std::string& status) {
- if (status == "queued") return GenerationStatus::QUEUED;
- if (status == "processing") return GenerationStatus::PROCESSING;
- if (status == "completed") return GenerationStatus::COMPLETED;
- if (status == "failed") return GenerationStatus::FAILED;
- return GenerationStatus::QUEUED;
- }
- std::string jobTypeToString(JobType type) {
- switch (type) {
- case JobType::GENERATION: return "generation";
- case JobType::HASHING: return "hashing";
- default: return "unknown";
- }
- }
- JobType stringToJobType(const std::string& type) {
- if (type == "generation") return JobType::GENERATION;
- if (type == "hashing") return JobType::HASHING;
- return JobType::GENERATION;
- }
- void saveJobToFile(const JobInfo& job) {
- try {
- // Create queue directory if it doesn't exist
- std::filesystem::create_directories(queueDir);
- // Create JSON object
- nlohmann::json jobJson;
- jobJson["id"] = job.id;
- jobJson["type"] = jobTypeToString(job.type);
- jobJson["status"] = jobStatusToString(job.status);
- jobJson["prompt"] = job.prompt;
- jobJson["position"] = job.position;
- // Convert time points to milliseconds since epoch
- auto queuedMs = std::chrono::duration_cast<std::chrono::milliseconds>(
- job.queuedTime.time_since_epoch()).count();
- jobJson["queued_time"] = queuedMs;
- if (job.status != GenerationStatus::QUEUED) {
- auto startMs = std::chrono::duration_cast<std::chrono::milliseconds>(
- job.startTime.time_since_epoch()).count();
- jobJson["start_time"] = startMs;
- }
- if (job.status == GenerationStatus::COMPLETED || job.status == GenerationStatus::FAILED) {
- auto endMs = std::chrono::duration_cast<std::chrono::milliseconds>(
- job.endTime.time_since_epoch()).count();
- jobJson["end_time"] = endMs;
- }
- jobJson["output_files"] = job.outputFiles;
- jobJson["error_message"] = job.errorMessage;
- // Write to file
- std::string filename = queueDir + "/" + job.id + ".json";
- std::ofstream file(filename);
- if (file.is_open()) {
- file << jobJson.dump(2);
- file.close();
- }
- } catch (const std::exception& e) {
- std::cerr << "Error saving job to file: " << e.what() << std::endl;
- }
- }
- void loadJobsFromDisk() {
- try {
- if (!std::filesystem::exists(queueDir)) {
- return;
- }
- std::cout << "Loading persisted jobs from: " << queueDir << std::endl;
- int loadedCount = 0;
- for (const auto& entry : std::filesystem::directory_iterator(queueDir)) {
- if (entry.path().extension() != ".json") {
- continue;
- }
- try {
- std::ifstream file(entry.path());
- if (!file.is_open()) {
- continue;
- }
- nlohmann::json jobJson = nlohmann::json::parse(file);
- file.close();
- // Reconstruct JobInfo
- JobInfo job;
- job.id = jobJson["id"];
- job.type = stringToJobType(jobJson["type"]);
- job.status = stringToJobStatus(jobJson["status"]);
- job.prompt = jobJson["prompt"];
- job.position = jobJson["position"];
- // Reconstruct time points
- auto queuedMs = jobJson["queued_time"].get<int64_t>();
- job.queuedTime = std::chrono::steady_clock::time_point(
- std::chrono::milliseconds(queuedMs));
- if (jobJson.contains("start_time")) {
- auto startMs = jobJson["start_time"].get<int64_t>();
- job.startTime = std::chrono::steady_clock::time_point(
- std::chrono::milliseconds(startMs));
- }
- if (jobJson.contains("end_time")) {
- auto endMs = jobJson["end_time"].get<int64_t>();
- job.endTime = std::chrono::steady_clock::time_point(
- std::chrono::milliseconds(endMs));
- }
- if (jobJson.contains("output_files")) {
- job.outputFiles = jobJson["output_files"].get<std::vector<std::string>>();
- }
- if (jobJson.contains("error_message")) {
- job.errorMessage = jobJson["error_message"];
- }
- // Clean up stale processing jobs from server restart
- if (job.status == GenerationStatus::PROCESSING) {
- job.status = GenerationStatus::FAILED;
- job.errorMessage = "Server restarted while job was processing";
- job.endTime = std::chrono::steady_clock::now();
- std::cout << "Marked stale job as failed: " << job.id << std::endl;
- // Persist updated status to disk
- saveJobToFile(job);
- }
- // Add to active jobs
- std::lock_guard<std::mutex> lock(jobsMutex);
- activeJobs[job.id] = job;
- loadedCount++;
- } catch (const std::exception& e) {
- std::cerr << "Error loading job from " << entry.path() << ": " << e.what() << std::endl;
- }
- }
- if (loadedCount > 0) {
- std::cout << "Loaded " << loadedCount << " persisted job(s)" << std::endl;
- }
- } catch (const std::exception& e) {
- std::cerr << "Error loading jobs from disk: " << e.what() << std::endl;
- }
- }
- HashResult performHashJob(const HashRequest& request) {
- HashResult result;
- result.requestId = request.id;
- result.success = false;
- result.modelsHashed = 0;
- auto startTime = std::chrono::steady_clock::now();
- if (!modelManager) {
- result.errorMessage = "Model manager not available";
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Get list of models to hash
- std::vector<std::string> modelsToHash;
- if (request.modelNames.empty()) {
- // Hash all models without hashes
- auto allModels = modelManager->getAllModels();
- for (const auto& [name, info] : allModels) {
- if (info.sha256.empty() || request.forceRehash) {
- modelsToHash.push_back(name);
- }
- }
- } else {
- modelsToHash = request.modelNames;
- }
- std::cout << "Hashing " << modelsToHash.size() << " model(s)..." << std::endl;
- // Hash each model
- for (const auto& modelName : modelsToHash) {
- std::string hash = modelManager->ensureModelHash(modelName, request.forceRehash);
- if (!hash.empty()) {
- result.modelHashes[modelName] = hash;
- result.modelsHashed++;
- } else {
- std::cerr << "Failed to hash model: " << modelName << std::endl;
- }
- }
- auto endTime = std::chrono::steady_clock::now();
- result.hashingTime = std::chrono::duration_cast<std::chrono::milliseconds>(
- endTime - startTime).count();
- result.success = result.modelsHashed > 0;
- result.status = result.success ? GenerationStatus::COMPLETED : GenerationStatus::FAILED;
- if (!result.success) {
- result.errorMessage = "Failed to hash any models";
- }
- return result;
- }
- ConversionResult performConversionJob(const ConversionRequest& request) {
- ConversionResult result;
- result.requestId = request.id;
- result.success = false;
- auto startTime = std::chrono::steady_clock::now();
- std::cout << "Starting model conversion: " << request.modelName << std::endl;
- std::cout << " Input: " << request.modelPath << std::endl;
- std::cout << " Output: " << request.outputPath << std::endl;
- std::cout << " Quantization: " << request.quantizationType << std::endl;
- // Check if input file exists
- namespace fs = std::filesystem;
- if (!fs::exists(request.modelPath)) {
- result.errorMessage = "Input model file not found: " + request.modelPath;
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Get original file size
- try {
- auto originalSize = fs::file_size(request.modelPath);
- result.originalSize = formatFileSize(originalSize);
- } catch (const std::exception& e) {
- result.originalSize = "Unknown";
- }
- // Build conversion command
- // Get the sd binary path from the CMake installation directory
- std::string sdBinaryPath = "../build/stable-diffusion.cpp-install/bin/sd";
- std::stringstream cmd;
- cmd << sdBinaryPath << " --mode convert";
- cmd << " -m \"" << request.modelPath << "\"";
- cmd << " -o \"" << request.outputPath << "\"";
- cmd << " --type " << request.quantizationType;
- cmd << " 2>&1"; // Capture stderr
- std::cout << "Executing: " << cmd.str() << std::endl;
- // Execute conversion
- FILE* pipe = popen(cmd.str().c_str(), "r");
- if (!pipe) {
- result.errorMessage = "Failed to execute conversion command";
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Read command output
- char buffer[256];
- std::string output;
- while (fgets(buffer, sizeof(buffer), pipe) != nullptr) {
- output += buffer;
- std::cout << buffer; // Print progress
- }
- int exitCode = pclose(pipe);
- auto endTime = std::chrono::steady_clock::now();
- result.conversionTime = std::chrono::duration_cast<std::chrono::milliseconds>(
- endTime - startTime).count();
- if (exitCode != 0) {
- result.errorMessage = "Conversion failed with exit code " + std::to_string(exitCode);
- if (!output.empty()) {
- result.errorMessage += "\nOutput: " + output;
- }
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Check if output file was created
- if (!fs::exists(request.outputPath)) {
- result.errorMessage = "Output file was not created: " + request.outputPath;
- result.status = GenerationStatus::FAILED;
- return result;
- }
- // Get converted file size
- try {
- auto convertedSize = fs::file_size(request.outputPath);
- result.convertedSize = formatFileSize(convertedSize);
- } catch (const std::exception& e) {
- result.convertedSize = "Unknown";
- }
- result.success = true;
- result.status = GenerationStatus::COMPLETED;
- result.outputPath = request.outputPath;
- std::cout << "Conversion completed successfully!" << std::endl;
- std::cout << " Original size: " << result.originalSize << std::endl;
- std::cout << " Converted size: " << result.convertedSize << std::endl;
- std::cout << " Time: " << result.conversionTime << "ms" << std::endl;
- // Trigger model rescan after successful conversion
- if (modelManager) {
- std::cout << "Triggering model rescan..." << std::endl;
- modelManager->scanModelsDirectory();
- }
- return result;
- }
/// Renders a byte count as a human-readable size with two decimal places,
/// e.g. 1536 -> "1.50 KB". The value is divided by 1024 per unit step and the
/// unit is capped at "TB", so anything larger still prints in TB
/// (e.g. "2048.00 TB").
std::string formatFileSize(size_t bytes) {
    static const char* const kUnitNames[] = {"B", "KB", "MB", "GB", "TB"};
    constexpr int kLastUnit = 4;  // index of "TB"

    double scaled = static_cast<double>(bytes);
    int unit = 0;
    for (; unit < kLastUnit && scaled >= 1024.0; ++unit) {
        scaled /= 1024.0;
    }

    std::ostringstream out;
    out.setf(std::ios::fixed, std::ios::floatfield);
    out.precision(2);
    out << scaled << " " << kUnitNames[unit];
    return out.str();
}
- };
- GenerationQueue::GenerationQueue(ModelManager* modelManager, int maxConcurrentGenerations,
- const std::string& queueDir, const std::string& outputDir)
- : pImpl(std::make_unique<Impl>()) {
- pImpl->modelManager = modelManager;
- pImpl->maxConcurrentGenerations = maxConcurrentGenerations;
- pImpl->queueDir = queueDir;
- pImpl->outputDir = outputDir;
- std::cout << "GenerationQueue initialized" << std::endl;
- std::cout << " Max concurrent generations: " << maxConcurrentGenerations << std::endl;
- std::cout << " Queue directory: " << queueDir << std::endl;
- std::cout << " Output directory: " << outputDir << std::endl;
- // Load any existing jobs from disk
- pImpl->loadJobsFromDisk();
- }
- GenerationQueue::~GenerationQueue() {
- stop();
- }
- std::future<GenerationResult> GenerationQueue::enqueueRequest(const GenerationRequest& request) {
- std::cout << "Enqueuing generation request: " << request.id << std::endl;
- std::cout << " Prompt: " << request.prompt.substr(0, 100)
- << (request.prompt.length() > 100 ? "..." : "") << std::endl;
- std::cout << " Model: " << request.modelName << std::endl;
- std::cout << " Size: " << request.width << "x" << request.height << std::endl;
- std::cout << " Steps: " << request.steps << ", CFG: " << request.cfgScale << std::endl;
- // Create promise and future
- auto promise = std::make_shared<std::promise<GenerationResult>>();
- auto future = promise->get_future();
- // Store the promise
- {
- std::lock_guard<std::mutex> lock(pImpl->jobsMutex);
- pImpl->jobPromises[request.id] = std::move(*promise);
- }
- // Add to queue
- {
- std::lock_guard<std::mutex> lock(pImpl->queueMutex);
- // Create job info
- JobInfo jobInfo;
- jobInfo.id = request.id;
- jobInfo.type = JobType::GENERATION;
- jobInfo.status = GenerationStatus::QUEUED;
- jobInfo.prompt = request.prompt; // Store full prompt
- jobInfo.queuedTime = std::chrono::steady_clock::now();
- jobInfo.position = pImpl->requestQueue.size() + 1;
- // Store job info
- {
- std::lock_guard<std::mutex> jobsLock(pImpl->jobsMutex);
- pImpl->activeJobs[request.id] = jobInfo;
- }
- // Persist to disk
- pImpl->saveJobToFile(jobInfo);
- pImpl->requestQueue.push(request);
- pImpl->queueSize.store(pImpl->requestQueue.size());
- }
- // Notify worker thread
- pImpl->queueCondition.notify_one();
- return future;
- }
- std::future<HashResult> GenerationQueue::enqueueHashRequest(const HashRequest& request) {
- auto promise = std::make_shared<std::promise<HashResult>>();
- auto future = promise->get_future();
- std::unique_lock<std::mutex> lock(pImpl->queueMutex);
- // Create a generation request that acts as a placeholder for hash job
- GenerationRequest hashJobPlaceholder;
- hashJobPlaceholder.id = request.id;
- hashJobPlaceholder.prompt = "HASH_JOB"; // Special marker
- hashJobPlaceholder.modelName = request.modelNames.empty() ? "ALL_MODELS" : request.modelNames[0];
- // Store promise for retrieval later
- pImpl->hashPromises[request.id] = promise;
- pImpl->hashRequests[request.id] = request;
- pImpl->requestQueue.push(hashJobPlaceholder);
- pImpl->queueCondition.notify_one();
- std::cout << "Enqueued hash request: " << request.id << std::endl;
- return future;
- }
- std::future<ConversionResult> GenerationQueue::enqueueConversionRequest(const ConversionRequest& request) {
- auto promise = std::make_shared<std::promise<ConversionResult>>();
- auto future = promise->get_future();
- std::unique_lock<std::mutex> lock(pImpl->queueMutex);
- // Create a generation request that acts as a placeholder for conversion job
- GenerationRequest conversionJobPlaceholder;
- conversionJobPlaceholder.id = request.id;
- conversionJobPlaceholder.prompt = "CONVERSION_JOB"; // Special marker
- conversionJobPlaceholder.modelName = request.modelName;
- // Store promise for retrieval later
- pImpl->conversionPromises[request.id] = promise;
- pImpl->conversionRequests[request.id] = request;
- pImpl->requestQueue.push(conversionJobPlaceholder);
- pImpl->queueCondition.notify_one();
- std::cout << "Enqueued conversion request: " << request.id << " (model: " << request.modelName << ", type: " << request.quantizationType << ")" << std::endl;
- return future;
- }
- size_t GenerationQueue::getQueueSize() const {
- return pImpl->queueSize.load();
- }
- size_t GenerationQueue::getActiveGenerations() const {
- return pImpl->activeGenerations.load();
- }
- std::vector<JobInfo> GenerationQueue::getQueueStatus() const {
- std::vector<JobInfo> jobs;
- std::lock_guard<std::mutex> lock(pImpl->jobsMutex);
- jobs.reserve(pImpl->activeJobs.size());
- for (const auto& pair : pImpl->activeJobs) {
- jobs.push_back(pair.second);
- }
- // Sort by queued time, then by status
- std::sort(jobs.begin(), jobs.end(), [](const JobInfo& a, const JobInfo& b) {
- if (a.status != b.status) {
- return static_cast<int>(a.status) < static_cast<int>(b.status);
- }
- return a.queuedTime < b.queuedTime;
- });
- return jobs;
- }
- JobInfo GenerationQueue::getJobInfo(const std::string& jobId) const {
- std::lock_guard<std::mutex> lock(pImpl->jobsMutex);
- auto it = pImpl->activeJobs.find(jobId);
- if (it != pImpl->activeJobs.end()) {
- return it->second;
- }
- return JobInfo{}; // Return empty job info if not found
- }
- bool GenerationQueue::cancelJob(const std::string& jobId) {
- std::lock_guard<std::mutex> queueLock(pImpl->queueMutex);
- std::lock_guard<std::mutex> jobsLock(pImpl->jobsMutex);
- // Check if job is still queued
- std::queue<GenerationRequest> newQueue;
- bool found = false;
- while (!pImpl->requestQueue.empty()) {
- GenerationRequest request = pImpl->requestQueue.front();
- pImpl->requestQueue.pop();
- if (request.id == jobId) {
- found = true;
- // Update job status
- auto it = pImpl->activeJobs.find(jobId);
- if (it != pImpl->activeJobs.end()) {
- it->second.status = GenerationStatus::FAILED;
- it->second.endTime = std::chrono::steady_clock::now();
- }
- // Set promise with cancellation error
- auto promiseIt = pImpl->jobPromises.find(jobId);
- if (promiseIt != pImpl->jobPromises.end()) {
- GenerationResult result;
- result.requestId = jobId;
- result.success = false;
- result.errorMessage = "Job cancelled by user";
- result.generationTime = 0;
- promiseIt->second.set_value(result);
- pImpl->jobPromises.erase(promiseIt);
- }
- } else {
- newQueue.push(request);
- }
- }
- pImpl->requestQueue = newQueue;
- pImpl->queueSize.store(pImpl->requestQueue.size());
- return found;
- }
- void GenerationQueue::clearQueue() {
- std::cout << "Clearing generation queue" << std::endl;
- std::lock_guard<std::mutex> queueLock(pImpl->queueMutex);
- std::lock_guard<std::mutex> jobsLock(pImpl->jobsMutex);
- // Cancel all queued jobs
- while (!pImpl->requestQueue.empty()) {
- GenerationRequest request = pImpl->requestQueue.front();
- pImpl->requestQueue.pop();
- // Update job status
- auto it = pImpl->activeJobs.find(request.id);
- if (it != pImpl->activeJobs.end()) {
- it->second.status = GenerationStatus::FAILED;
- it->second.endTime = std::chrono::steady_clock::now();
- }
- // Set promise with cancellation error
- auto promiseIt = pImpl->jobPromises.find(request.id);
- if (promiseIt != pImpl->jobPromises.end()) {
- GenerationResult result;
- result.requestId = request.id;
- result.success = false;
- result.errorMessage = "Queue cleared";
- result.generationTime = 0;
- promiseIt->second.set_value(result);
- pImpl->jobPromises.erase(promiseIt);
- }
- }
- pImpl->queueSize.store(0);
- }
- void GenerationQueue::start() {
- if (pImpl->running.load()) {
- std::cout << "GenerationQueue is already running" << std::endl;
- return;
- }
- pImpl->running.store(true);
- pImpl->stopRequested.store(false);
- pImpl->workerThread = std::thread(&Impl::workerThreadFunction, pImpl.get());
- std::cout << "GenerationQueue started" << std::endl;
- }
- void GenerationQueue::stop() {
- if (!pImpl->running.load()) {
- return;
- }
- std::cout << "Stopping GenerationQueue..." << std::endl;
- pImpl->stopRequested.store(true);
- pImpl->queueCondition.notify_all();
- if (pImpl->workerThread.joinable()) {
- pImpl->workerThread.join();
- }
- pImpl->running.store(false);
- // Clear any remaining promises
- std::lock_guard<std::mutex> lock(pImpl->jobsMutex);
- for (auto& pair : pImpl->jobPromises) {
- GenerationResult result;
- result.requestId = pair.first;
- result.success = false;
- result.errorMessage = "Queue stopped";
- result.generationTime = 0;
- pair.second.set_value(result);
- }
- pImpl->jobPromises.clear();
- std::cout << "GenerationQueue stopped" << std::endl;
- }
- bool GenerationQueue::isRunning() const {
- return pImpl->running.load();
- }
- void GenerationQueue::setMaxConcurrentGenerations(int maxConcurrent) {
- pImpl->maxConcurrentGenerations = maxConcurrent;
- std::cout << "GenerationQueue max concurrent generations set to: " << maxConcurrent << std::endl;
- }
|