116 lines
3.5 KiB
Python
116 lines
3.5 KiB
Python
from fastapi import FastAPI, Request
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
import uvicorn
|
|
import bambulabs_api as bl
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
|
|
app = FastAPI(title="X1C Connection")
|
|
|
|
IP = "10.0.2.10"
|
|
ACCESS_CODE = "e0f815a7"
|
|
SERIAL = "00M09A360300272"
|
|
|
|
printer = bl.Printer(IP, ACCESS_CODE, SERIAL)
|
|
|
|
# Serve /static/*
|
|
app.mount("/static", StaticFiles(directory="app/static"), name="static")
|
|
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
# track connection state in-memory for UI (simple flag)
|
|
connectionState = False
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
def home(request: Request):
|
|
# Render page and pass current connection state
|
|
try:
|
|
connected = bool(connectionState)
|
|
except NameError:
|
|
connected = False
|
|
return templates.TemplateResponse("index.html", {"request": request, "connected": connected})
|
|
|
|
|
|
@app.post("/connect", response_class=HTMLResponse)
|
|
def connect(request: Request):
|
|
global connectionState
|
|
try:
|
|
printer.mqtt_start()
|
|
if(printer.mqtt_start()):
|
|
connectionState = True
|
|
except Exception:
|
|
connectionState = False
|
|
|
|
# Return the connect area so HTMX can swap the button immediately
|
|
return templates.TemplateResponse("_connect_area.html", {"request": request, "connected": connectionState})
|
|
|
|
|
|
@app.post("/disconnect", response_class=HTMLResponse)
|
|
def disconnect(request: Request):
|
|
global connectionState
|
|
try:
|
|
printer.mqtt_stop()
|
|
connectionState = False
|
|
except Exception:
|
|
connectionState = False
|
|
|
|
# Return the connect area so HTMX can swap the button immediately
|
|
return templates.TemplateResponse("_connect_area.html", {"request": request, "connected": connectionState})
|
|
|
|
|
|
@app.post("/status", response_class=HTMLResponse)
|
|
def status(request: Request):
|
|
|
|
|
|
try:
|
|
# Preferred direct calls (if the API exposes them)
|
|
status = {}
|
|
status['bed_temp'] = printer.get_bed_temperature()
|
|
status['nozzle_temp'] = printer.get_nozzle_temperature()
|
|
status['chamber_temp'] = printer.get_chamber_temperature()
|
|
status['percentage'] = printer.get_percentage()
|
|
remaining_time = printer.get_time()
|
|
current = printer.get_file_name()
|
|
except Exception as e:
|
|
return templates.TemplateResponse("_status.html", {"request": request, "status": {"error": str(e)}})
|
|
|
|
# Normalize current file: remove /data/Metadata/ prefix or use basename
|
|
current_file = str(current or '')
|
|
if current_file.startswith('/data/Metadata/'):
|
|
current_file = current_file[len('/data/Metadata/'):]
|
|
else:
|
|
current_file = os.path.basename(current_file)
|
|
status['current_file'] = current_file or 'N/A'
|
|
|
|
if remaining_time is not None:
|
|
finish_time = datetime.now() + timedelta(seconds=int(remaining_time))
|
|
status['finish_time'] = finish_time.strftime("%Y-%m-%d %H:%M:%S")
|
|
status['remaining_time'] = timedelta(seconds=int(remaining_time))
|
|
|
|
else:
|
|
status['finish_time'] = "NA"
|
|
status['remaining_time'] = "N/A"
|
|
|
|
|
|
|
|
|
|
# stringify values
|
|
for k in status:
|
|
try:
|
|
status[k] = str(status[k])
|
|
except Exception:
|
|
status[k] = 'N/A'
|
|
|
|
return templates.TemplateResponse("_status.html", {"request": request, "status": status})
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Dev convenience; in Docker we launch via CMD
|
|
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)
|
|
|