"""Printer Manager: a small FastAPI UI for Bambu Lab printers.

Printers are declared in ``config.json`` next to this module.  Each route
returns an HTML fragment (Jinja2 template) that the front end swaps into the
page; connection state and the last rendered status are kept in memory only.
"""

import asyncio
import html
import json
import logging
import math
import os
import time
from datetime import datetime, timedelta
from typing import Dict, Optional
from zoneinfo import ZoneInfo

import bambulabs_api as bl
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Pause between the two mqtt_start() calls made by /connect.
CONNECT_RETRY_DELAY_SECONDS = 1.0
# Grace period after mqtt_stop() before the card is re-rendered.
DISCONNECT_SETTLE_SECONDS = 0.5
# A status poll arriving sooner than this after the previous one is served
# from the cached status instead of querying the printer again.
STATUS_CACHE_SECONDS = 0.8

# Maps the form field "type" to the printer method that sets that heater.
_TEMPERATURE_SETTERS = {
    'nozzle': 'set_nozzle_temperature',
    'bed': 'set_bed_temperature',
}

app = FastAPI(title="Printer Manager")


class PrinterInstance:
    """In-memory record for one configured printer.

    Wraps the ``bambulabs_api`` client together with the UI state this app
    tracks for it: the connection flag, the last rendered status dict, the
    last target temperatures set through this app, and the time of the last
    status refresh.
    """

    def __init__(self, config: dict):
        """Build the instance from one entry of ``config['printers']``.

        Raises:
            KeyError: if ``id``, ``name``, ``ip``, ``access_code`` or
                ``serial`` is missing from ``config``.
        """
        self.id = config['id']
        self.name = config['name']
        self.connected = False
        self.printer = bl.Printer(
            config['ip'],
            config['access_code'],
            config['serial']
        )
        self.status = {}
        self.target_temps = {
            'bed': 0,
            'nozzle': 0
        }
        self.last_status_update = 0  # time.time() of the last status refresh


# Dictionary to store printer instances
printers: Dict[str, PrinterInstance] = {}

# Load configuration from JSON file
try:
    config_path = os.path.join(os.path.dirname(__file__), 'config.json')
    logger.info("Loading config from: %s", config_path)
    with open(config_path, 'r', encoding='utf-8') as f:
        config = json.load(f)
    logger.info("Found %d printers in config", len(config['printers']))
    for printer_config in config['printers']:
        logger.info(
            "Loading printer: %s (ID: %s)",
            printer_config['name'],
            printer_config['id'],
        )
        printers[printer_config['id']] = PrinterInstance(printer_config)
    logger.info("Successfully loaded %d printers", len(printers))
except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
    # A broken or missing config leaves the app running with no printers.
    logger.error("Error loading configuration: %s", e)
    printers = {}

# Serve /static/*
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")

# track connection state in-memory for UI (simple flag)
connectionState = False


def _header_safe(text) -> str:
    """Return ``text`` in a form that is safe to send as an HTTP header value.

    All whitespace runs (including CR/LF) are collapsed to single spaces and
    characters outside latin-1 are replaced with ``?``, because header values
    are encoded as latin-1 and must not contain line breaks.
    """
    collapsed = " ".join(str(text).split())
    return collapsed.encode("latin-1", errors="replace").decode("latin-1")


def _printer_card(request: Request, printer_instance, error: Optional[str] = None):
    """Render ``_printer_card.html``; ``error`` is added only when given."""
    context = {"request": request, "printer": printer_instance}
    if error is not None:
        context["error"] = error
    return templates.TemplateResponse("_printer_card.html", context)


def _status_fragment(request: Request, printer_instance, status: dict):
    """Render ``_status.html`` for ``printer_instance`` with ``status``."""
    return templates.TemplateResponse("_status.html", {
        "request": request,
        "printer": printer_instance,
        "status": status
    })


def _read_temperature(printer, kind: str) -> int:
    """Read ``printer.get_<kind>_temperature()`` as a whole number.

    Returns 0 when the printer reports ``None`` or when the call or the
    conversion fails; failures are logged as warnings.
    """
    try:
        raw = getattr(printer, f"get_{kind}_temperature")()
        logger.debug("%s temp raw: %s", kind.capitalize(), raw)
        return int(float(raw)) if raw is not None else 0
    except Exception as e:
        logger.warning("Error getting %s temp: %s", kind, e)
        return 0


def _query(printer, method_name: str, debug_label: str, error_label: str):
    """Call ``printer.<method_name>()``; return ``None`` (and log) on failure."""
    try:
        value = getattr(printer, method_name)()
        logger.debug("%s: %s", debug_label, value)
        return value
    except Exception as e:
        logger.warning("Error getting %s: %s", error_label, e)
        return None


def _remaining_seconds(raw) -> Optional[int]:
    """Convert the printer's remaining-time value to a positive ``int``.

    Returns ``None`` when there is nothing meaningful to show: the value is
    ``None``, is not numeric, or is zero or negative.
    """
    if raw is None:
        return None
    try:
        seconds = int(raw)
    except (TypeError, ValueError):
        # presumably the API can hand back a placeholder string while no
        # estimate is available -- TODO confirm against bambulabs_api
        return None
    return seconds if seconds > 0 else None


def _display_file_name(current) -> str:
    """Strip the ``/data/Metadata/`` prefix, or else reduce to the basename."""
    current_file = str(current or '')
    prefix = '/data/Metadata/'
    if current_file.startswith(prefix):
        current_file = current_file[len(prefix):]
    else:
        current_file = os.path.basename(current_file)
    return current_file or 'N/A'


def _collect_status(printer_instance: PrinterInstance) -> dict:
    """Query the printer and build the (not yet stringified) status dict."""
    printer = printer_instance.printer
    status = {
        # Add debug info
        'debug_info': {
            'id': printer_instance.id,
            'name': printer_instance.name,
            'connected': printer_instance.connected
        }
    }
    logger.debug("Getting status for printer %s", printer_instance.id)

    status['bed_temp'] = _read_temperature(printer, 'bed')
    status['bed_target'] = printer_instance.target_temps['bed']
    status['nozzle_temp'] = _read_temperature(printer, 'nozzle')
    status['nozzle_target'] = printer_instance.target_temps['nozzle']
    status['chamber_temp'] = _read_temperature(printer, 'chamber')

    percentage = _query(printer, 'get_percentage', 'Progress percentage', 'percentage')
    status['percentage'] = percentage if percentage is not None else "0%"

    remaining_time = _query(printer, 'get_time', 'Remaining time', 'time')
    current = _query(printer, 'get_file_name', 'Current file', 'file name')

    seconds = _remaining_seconds(remaining_time)
    if seconds is None:
        status['current_file'] = 'N/A'
        status['finish_time'] = 'N/A'
        status['remaining_time'] = 'N/A'
        return status

    status['current_file'] = _display_file_name(current)

    # astimezone() with no argument attaches the system's local timezone and
    # works even where the tz database has no "localtime" entry.
    now = datetime.now().astimezone()
    remaining = timedelta(seconds=seconds)
    finish_time = now + remaining
    if finish_time.date() > now.date():
        # Include date if not today
        status['finish_time'] = finish_time.strftime("%Y-%m-%d %I:%M %p")
    else:
        # Show time in 12-hour format with AM/PM
        status['finish_time'] = finish_time.strftime("%I:%M %p")
    status['remaining_time'] = str(remaining)
    return status


@app.get("/", response_class=HTMLResponse)
def home(request: Request):
    """Render the main page listing every configured printer."""
    return templates.TemplateResponse("index.html", {
        "request": request,
        "printers": printers
    })


@app.post("/connect-all", response_class=HTMLResponse)
async def connect_all(request: Request):
    """Try to connect every printer that is not connected yet.

    A failure on one printer is logged and does not stop the others.
    Returns the re-rendered ``_printer_grid.html`` fragment.
    """
    for printer_instance in printers.values():
        if printer_instance.connected:
            continue
        try:
            if printer_instance.printer.mqtt_start():
                printer_instance.connected = True
        except Exception as e:
            # Skip failed connections and continue with others
            logger.warning("Failed to connect printer %s: %s", printer_instance.id, e)

    # Render the updated container contents
    html_content = templates.get_template("_printer_grid.html").render(
        request=request,
        printers=printers
    )
    return HTMLResponse(content=html_content)


@app.post("/connect/{printer_id}", response_class=HTMLResponse)
async def connect(request: Request, printer_id: str):
    """Connect one printer and return its re-rendered card.

    ``mqtt_start()`` is called twice with a short pause in between; only the
    result of the second call decides success.
    """
    printer_instance = printers.get(printer_id)
    if printer_instance is None:
        return _printer_card(request, None, "Printer not found")

    try:
        logger.info("Attempting to connect printer %s", printer_id)

        # First attempt
        logger.debug("Making first connection attempt for %s", printer_id)
        first_attempt = printer_instance.printer.mqtt_start()
        logger.debug("First connection attempt result: %s", first_attempt)

        # Delay between attempts; awaited so other requests keep being served.
        await asyncio.sleep(CONNECT_RETRY_DELAY_SECONDS)

        # Second attempt (known workaround for API behavior)
        # NOTE(review): success is judged by the truthiness of the second
        # return value only -- confirm what mqtt_start() actually returns.
        logger.debug("Making second connection attempt for %s", printer_id)
        success = printer_instance.printer.mqtt_start()
        logger.debug("Second connection attempt result: %s", success)

        if not success:
            raise Exception("Failed to establish MQTT connection")

        printer_instance.connected = True
        logger.info("Successfully connected printer %s", printer_id)
    except Exception as e:
        logger.error("Failed to connect printer %s: %s", printer_id, e)
        printer_instance.connected = False
        return _printer_card(request, printer_instance, f"Failed to connect: {str(e)}")

    # Return the entire printer card to update the UI
    return _printer_card(request, printer_instance)


@app.post("/disconnect/{printer_id}", response_class=HTMLResponse)
async def disconnect(request: Request, printer_id: str):
    """Stop MQTT for one printer and return its re-rendered card.

    The printer is marked disconnected even when stopping raises.
    """
    printer_instance = printers.get(printer_id)
    if printer_instance is None:
        return _printer_card(request, None, "Printer not found")

    try:
        logger.info("Stopping MQTT for printer %s", printer_id)
        printer_instance.printer.mqtt_stop()
        # Give it time to fully disconnect, without blocking the event loop.
        await asyncio.sleep(DISCONNECT_SETTLE_SECONDS)
        logger.info("Successfully disconnected printer %s", printer_id)
    except Exception as e:
        logger.error("Error during disconnect: %s", e)
    finally:
        printer_instance.connected = False

    # Return the entire printer card to update the UI
    return _printer_card(request, printer_instance)


@app.post("/set-temperature/{printer_id}", response_class=HTMLResponse)
async def set_temperature(request: Request, printer_id: str):
    """Set the nozzle or bed target temperature from form fields.

    Form fields: ``type`` (``"nozzle"`` or ``"bed"``) and ``value`` (number).
    Returns the accepted value as text, 404 for an unknown printer, or 400
    with an ``HX-Trigger`` header when the value is invalid or the printer
    rejects the command.
    """
    printer_instance = printers.get(printer_id)
    if printer_instance is None:
        logger.warning("Printer %s not found", printer_id)
        return HTMLResponse(content="Printer not found", status_code=404)

    form_data = await request.form()
    temp_type = form_data.get("type")
    raw_value = form_data.get("value")
    logger.info(
        "Setting %s temperature to %s for printer %s",
        temp_type, raw_value, printer_id,
    )

    try:
        temp_value = float(raw_value)
        # Reject NaN/inf before anything is sent to the printer.
        if not math.isfinite(temp_value):
            raise ValueError(f"temperature must be a finite number, got {raw_value!r}")

        success = False
        setter_name = _TEMPERATURE_SETTERS.get(temp_type)
        if setter_name is not None:
            success = getattr(printer_instance.printer, setter_name)(temp_value)
            if success:
                printer_instance.target_temps[temp_type] = int(temp_value)
            logger.info(
                "Set %s temp to %s: %s",
                temp_type, temp_value, 'success' if success else 'failed',
            )

        if not success:
            error_msg = f"Failed to set {temp_type} temperature to {temp_value}"
            logger.error(error_msg)
            return HTMLResponse(
                # temp_type comes straight from the form: escape it.
                content=html.escape(error_msg),
                status_code=400,
                headers={"HX-Trigger": "showMessage:Failed to set temperature"}
            )

        return HTMLResponse(content=str(temp_value))
    except Exception as e:
        logger.error("Error setting temperature: %s", e)
        return HTMLResponse(
            # The message may quote user input, so escape the body and
            # sanitise the header value.
            content=html.escape(f"Error: {str(e)}"),
            status_code=400,
            headers={"HX-Trigger": _header_safe(f"showMessage:{str(e)}")}
        )


@app.post("/status/{printer_id}", response_class=HTMLResponse)
async def status(request: Request, printer_id: str):
    """Return the ``_status.html`` fragment for one printer.

    Reconnects first if the printer is marked disconnected.  Polls arriving
    within ``STATUS_CACHE_SECONDS`` of the previous refresh get the cached
    status.  Every value in the rendered status dict is a string.
    """
    printer_instance = printers.get(printer_id)
    if printer_instance is None:
        return _status_fragment(request, None, {"error": "Printer not found"})

    if not printer_instance.connected:
        # Try to reconnect if disconnected
        try:
            reconnected = printer_instance.printer.mqtt_start()
        except Exception as e:
            logger.warning("Reconnect failed for printer %s: %s", printer_id, e)
            reconnected = False
        if not reconnected:
            return _status_fragment(
                request,
                printer_instance,
                {"error": "Printer disconnected. Try reconnecting."},
            )
        printer_instance.connected = True

    try:
        current_time = time.time()
        # Too soon after the last refresh: return the cached status.
        if current_time - printer_instance.last_status_update < STATUS_CACHE_SECONDS:
            return _status_fragment(request, printer_instance, printer_instance.status)
        printer_instance.last_status_update = current_time

        snapshot = _collect_status(printer_instance)
    except Exception as e:
        logger.error("Error getting status for printer %s: %s", printer_id, e)
        printer_instance.connected = False  # Mark as disconnected on error
        return _status_fragment(
            request,
            printer_instance,
            {"error": f"Error getting status: {str(e)}"},
        )

    # stringify values
    for key in snapshot:
        try:
            snapshot[key] = str(snapshot[key])
        except Exception:
            snapshot[key] = 'N/A'

    printer_instance.status = snapshot
    return _status_fragment(request, printer_instance, snapshot)


if __name__ == "__main__":
    # Dev convenience; in Docker we launch via CMD
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)