""" ParcelGen Backend v4 - Land use zones (Residential Low/Med/High, Commercial, Mixed, Industrial, Open Space) - Road types (Primary, Secondary, Local, Lane) with per-type widths - Min/max parcel area validation before running - Splay (corner radius) converted correctly from display units - Multi-user safe (stateless, no globals) - Session-aware progress endpoint (SSE) - Better boundary adherence via edge-touching strip layout - Correct terminology: frontage (road-facing width), plot depth (perpendicular) - User roads + auto-fill roads combined (not either/or) - Existing building footprints respected: carved from buildable area, containing parcel flagged status=built - Perimeter access road always present - Access enforcement pass """ from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field from typing import List, Optional, Dict, Any, Tuple import math, uuid, traceback, asyncio, json from shapely.geometry import ( Polygon, MultiPolygon, LineString, MultiLineString, Point, GeometryCollection, mapping, shape ) from shapely.ops import unary_union from shapely.affinity import rotate from shapely.validation import make_valid app = FastAPI(title="ParcelGen", version="4.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ─── Land use zone definitions ──────────────────────────────────────────────── LAND_USE_PRESETS = { "residential_low": {"label": "Residential – Low Density", "min_frontage": 15, "min_depth": 30, "min_area": 450, "max_area": 2000, "color": "#58a6ff"}, "residential_medium": {"label": "Residential – Medium Density", "min_frontage": 10, "min_depth": 25, "min_area": 250, "max_area": 1000, "color": "#3fb950"}, "residential_high": {"label": "Residential – High Density", "min_frontage": 6, "min_depth": 20, "min_area": 120, "max_area": 500, "color": "#f0c84a"}, "commercial": {"label": "Commercial", "min_frontage": 12, "min_depth": 20, "min_area": 300, "max_area": 5000, "color": "#f97316"}, "mixed_use": {"label": "Mixed Use", "min_frontage": 8, "min_depth": 20, "min_area": 200, "max_area": 2000, "color": "#bc8cff"}, "industrial": {"label": "Industrial", "min_frontage": 20, "min_depth": 40, "min_area": 1000, "max_area": 20000,"color": "#f78166"}, "open_space": {"label": "Open Space / Park", "min_frontage": 30, "min_depth": 30, "min_area": 500, "max_area": 50000,"color": "#34d399"}, } # ─── Road type definitions ───────────────────────────────────────────────────── ROAD_TYPE_DEFAULTS = { "primary": {"label": "Primary Road", "width": 20, "color": "#f78166"}, "secondary": {"label": "Secondary Road", "width": 14, "color": "#f0c84a"}, "local": {"label": "Local Road", "width": 9, "color": "#8b949e"}, "lane": {"label": "Lane / Alley", "width": 4, "color": "#58a6ff"}, } # ─── Models ─────────────────────────────────────────────────────────────────── class RoadInput(BaseModel): geometry: Dict[str, Any] road_type: str = "local" # primary / secondary / local / lane width: Optional[float] = None # overrides type default if set label: Optional[str] = None class ZoneInput(BaseModel): geometry: Dict[str, Any] land_use: str = "residential_medium" label: Optional[str] = None class SubdivisionConfig(BaseModel): # Parcel dimensions min_frontage: float = 12.0 min_plot_depth: float = 25.0 # renamed from min_depth for clarity min_area: Optional[float] = None # auto from land_use if None max_area: Optional[float] = None # 0 = unlimited # Roads default_road_width: float = 9.0 splay_radius: float = 3.0 # was corner_radius; splay is the correct survey term max_block_length: float = 120.0 allow_culdesac: bool = True # Land use land_use: str = "residential_medium" # Numbering parcel_prefix: str = "P" zone_prefix: str = "Z" start_number: int = 1 def model_post_init(self, _): preset = LAND_USE_PRESETS.get(self.land_use, LAND_USE_PRESETS["residential_medium"]) if self.min_area is None: self.min_area = preset["min_area"] if self.max_area is None: self.max_area = preset["max_area"] class SubdivisionRequest(BaseModel): boundary: Dict[str, Any] roads: Optional[List[RoadInput]] = [] existing_features: Optional[List[Dict[str, Any]]] = [] zones: Optional[List[ZoneInput]] = [] config: Optional[SubdivisionConfig] = None session_id: Optional[str] = None # for progress tracking # In-memory progress store (per session_id, cleared after delivery) _progress: Dict[str, Dict] = {} # ─── Geometry helpers ───────────────────────────────────────────────────────── def valid(geom): if geom is None or geom.is_empty: return geom if not geom.is_valid: geom = make_valid(geom) return geom def extract_geometry(obj: Dict) -> Any: if obj is None: raise ValueError("Null geometry") geo = obj.get("geometry", obj) if obj.get("type") == "Feature" else obj if geo is None or not geo.get("type"): raise ValueError(f"Cannot extract geometry from: {list(obj.keys())}") return shape(geo) def flatten_polygons(geom) -> List[Polygon]: if geom is None or geom.is_empty: return [] if isinstance(geom, Polygon): return [geom] if isinstance(geom, (MultiPolygon, GeometryCollection)): out = [] for g in geom.geoms: out.extend(flatten_polygons(g)) return out return [] def obb(polygon: Polygon) -> Tuple[float, float, float]: """Oriented bounding box → (short, long, angle_deg).""" mbr = polygon.minimum_rotated_rectangle coords = list(mbr.exterior.coords) e1 = math.hypot(coords[1][0]-coords[0][0], coords[1][1]-coords[0][1]) e2 = math.hypot(coords[2][0]-coords[1][0], coords[2][1]-coords[1][1]) ang = math.degrees(math.atan2(coords[1][1]-coords[0][1], coords[1][0]-coords[0][0])) return (min(e1,e2), max(e1,e2), ang) def estimate_scale(boundary: Polygon) -> float: lat = boundary.centroid.y return (111_320.0 + 111_320.0 * math.cos(math.radians(lat))) / 2.0 def deg(metres: float, sc: float) -> float: return metres / sc def buffer_line(line, width_deg: float) -> Polygon: return line.buffer(width_deg / 2, cap_style=2, join_style=2) # ─── Road network ───────────────────────────────────────────────────────────── def build_road_network( boundary: Polygon, road_inputs: List[RoadInput], config: SubdivisionConfig, sc: float ) -> Tuple[List[Dict], Polygon]: """Returns (road_feature_dicts, road_union_polygon).""" road_features = [] # dicts with geometry + metadata road_polys = [] default_rw = deg(config.default_road_width, sc) # ── User roads ──────────────────────────────────────────────────────────── for ri in road_inputs: try: line = extract_geometry(ri.geometry if isinstance(ri, RoadInput) else ri) except Exception: continue if not isinstance(line, (LineString, MultiLineString)): continue lines = list(line.geoms) if isinstance(line, MultiLineString) else [line] # Determine width for this road rtype = (ri.road_type if isinstance(ri, RoadInput) else "local") rwidth_m = (ri.width if isinstance(ri, RoadInput) and ri.width else ROAD_TYPE_DEFAULTS.get(rtype, ROAD_TYPE_DEFAULTS["local"])["width"]) rw = deg(rwidth_m, sc) color = ROAD_TYPE_DEFAULTS.get(rtype, ROAD_TYPE_DEFAULTS["local"])["color"] for line_part in lines: cl = valid(line_part.intersection(boundary)) if cl and not cl.is_empty: rp = valid(buffer_line(cl, rw).intersection(boundary)) if rp and not rp.is_empty: road_polys.append(rp) road_features.append({ "type": "Feature", "geometry": mapping(rp), "properties": { "type": "road_surface", "road_type": rtype, "width_m": rwidth_m, "label": (ri.label if isinstance(ri, RoadInput) else None) or f"{rtype.capitalize()} Road", "color": color, } }) # ── Auto-fill grid roads ────────────────────────────────────────────────── existing_union = valid(unary_union(road_polys)) if road_polys else Polygon() bl = deg(config.max_block_length, sc) rw = default_rw step = bl + rw minx, miny, maxx, maxy = boundary.bounds site_w, site_h = maxx - minx, maxy - miny def try_add_auto(line: LineString): cl = valid(line.intersection(boundary)) if not cl or cl.is_empty or cl.length < deg(config.min_frontage * 2, sc): return rp = valid(buffer_line(cl, rw).intersection(boundary)) if not rp or rp.is_empty: return if not existing_union.is_empty and rp.intersection(existing_union).area / rp.area > 0.6: return road_polys.append(rp) road_features.append({ "type": "Feature", "geometry": mapping(rp), "properties": {"type": "road_surface", "road_type": "local", "width_m": config.default_road_width, "label": "Local Road (auto)", "color": ROAD_TYPE_DEFAULTS["local"]["color"]} }) xs = [] x = minx + step while x < maxx - rw: xs.append(x); x += step if not xs and site_w > deg(config.min_plot_depth * 2 + config.default_road_width, sc): xs = [minx + site_w / 2] for x in xs: try_add_auto(LineString([(x, miny - 1), (x, maxy + 1)])) ys = [] y = miny + step while y < maxy - rw: ys.append(y); y += step if not ys and site_h > deg(config.min_plot_depth * 2 + config.default_road_width, sc): ys = [miny + site_h / 2] for y in ys: try_add_auto(LineString([(minx - 1, y), (maxx + 1, y)])) # ── Perimeter access ring ───────────────────────────────────────────────── perim_w = deg(config.default_road_width * 0.5, sc) inner = valid(boundary.buffer(-perim_w)) if inner and not inner.is_empty and inner.area > 0: perim = valid(boundary.difference(inner)) if perim and not perim.is_empty: road_polys.append(perim) # Don't add to road_features (perimeter ring is structural, not displayed) road_union = valid(unary_union(road_polys)) if road_polys else Polygon() return road_features, road_union # ─── Cul-de-sac ─────────────────────────────────────────────────────────────── def generate_culdesacs(blocks, road_union, config, sc): cds_polys, new_blocks = [], [] bl = deg(config.max_block_length, sc) rw = deg(config.default_road_width, sc) for block in blocks: _, h, _ = obb(block) if h > bl and config.allow_culdesac: cds, trimmed = _try_culdesac(block, rw, config, sc) if cds and trimmed and not trimmed.is_empty: cds_polys.append(cds); new_blocks.append(trimmed); continue new_blocks.append(block) return cds_polys, new_blocks def _try_culdesac(block, rw_deg, config, sc): cx, _ = block.centroid.x, block.centroid.y minx, miny, _, _ = block.bounds radius = deg(config.default_road_width * 1.8, sc) stub_len = deg(config.max_block_length * 0.45, sc) stub = LineString([(cx, miny), (cx, miny + stub_len)]).intersection(block) if stub.is_empty or stub.length < rw_deg: return None, None stub_road = valid(buffer_line(stub, rw_deg).intersection(block)) end_pt = list(stub.coords)[-1] circle = valid(Point(end_pt).buffer(radius).intersection(block)) cds_area = valid(unary_union([stub_road, circle]).intersection(block)) trimmed = valid(block.difference(cds_area)) if trimmed.is_empty or trimmed.area * sc**2 < config.min_area: return None, None return cds_area, trimmed # ─── Block subdivider ───────────────────────────────────────────────────────── def subdivide_block(block, config, road_union, block_id, start_num, sc, zone_label=None, zone_color=None): min_f = deg(config.min_frontage, sc) min_d = deg(config.min_plot_depth, sc) min_a = config.min_area / sc**2 max_a = (config.max_area or 1e18) / sc**2 _, _, angle = obb(block) centroid = block.centroid rotated = valid(rotate(block, -angle, origin=centroid)) rminx, rminy, rmaxx, rmaxy = rotated.bounds bw, bh = rmaxx - rminx, rmaxy - rminy num = start_num parcels = [] def emit(x0, y0, x1, y1): nonlocal num p = valid(Polygon([(x0,y0),(x1,y0),(x1,y1),(x0,y1)]).intersection(rotated)) if p is None or p.is_empty or p.area < min_a * 0.5: return p = valid(rotate(p, angle, origin=centroid)) feats = _make_parcel(p, config, block_id, num, sc, road_union, zone_label, zone_color, start_num) parcels.extend(feats) num += 1 if bw >= bh: n_cols = max(1, round(bw / min_f)) col_w = bw / n_cols rows = ([(rminy, rminy + bh/2), (rminy + bh/2, rmaxy)] if bh >= min_d * 2 else [(rminy, rmaxy)]) for y0, y1 in rows: for c in range(n_cols): x0 = rminx + c * col_w emit(x0, y0, x0 + col_w, y1) else: n_rows = max(1, round(bh / min_f)) row_h = bh / n_rows cols = ([(rminx, rminx + bw/2), (rminx + bw/2, rmaxx)] if bw >= min_d * 2 else [(rminx, rmaxx)]) for x0, x1 in cols: for r in range(n_rows): y0 = rminy + r * row_h emit(x0, y0, x1, y0 + row_h) return parcels def _make_parcel(geom, config, block_id, num, sc, road_union, zone_label, zone_color, base): out = [] parts = flatten_polygons(geom) if not isinstance(geom, Polygon) else [geom] tol = deg(0.5, sc) preset = LAND_USE_PRESETS.get(config.land_use, LAND_USE_PRESETS["residential_medium"]) for g in parts: if g.is_empty: continue area_m2 = round(g.area * sc**2, 1) if area_m2 < config.min_area * 0.4: continue pw, ph, _ = obb(g) frontage_m = round(min(pw, ph) * sc, 1) plot_depth_m = round(max(pw, ph) * sc, 1) has_access = (road_union is not None and not road_union.is_empty and g.buffer(tol).intersects(road_union)) max_a = config.max_area or 1e18 parcel_id = f"{config.parcel_prefix}{num:04d}" out.append({ "type": "Feature", "geometry": mapping(g), "properties": { "parcel_id": parcel_id, "parcel_num": num, "block_id": block_id, "area_m2": area_m2, "area_ha": round(area_m2 / 10000, 4), "frontage_m": frontage_m, # road-facing width "plot_depth_m": plot_depth_m, # perpendicular dimension "address": f"Block {block_id}, Plot {num}", "land_use": config.land_use, "land_use_label": zone_label or preset["label"], "zone_color": zone_color or preset["color"], "zone": zone_label or preset["label"], "status": "vacant", "has_access": has_access, "frontage_ok": frontage_m >= config.min_frontage * 0.85, "area_ok": config.min_area * 0.8 <= area_m2 <= max_a * 1.2, "within_max_area": area_m2 <= max_a * 1.1, } }) return out # ─── Quality passes ─────────────────────────────────────────────────────────── def absorb_bad_parcels(parcels, config, sc): tol = deg(0.5, sc) changed = True while changed: changed = False geoms = [shape(p["geometry"]) for p in parcels] for i, p in enumerate(parcels): g = geoms[i]; am2 = g.area * sc**2 w, h, _ = obb(g) compact = 4 * math.pi * g.area / (g.length**2) if g.length else 0 max_a = config.max_area or 1e18 ok = am2 >= config.min_area * 0.8 and compact >= 0.18 and (h == 0 or w/h >= 0.12) if ok: continue best_j, best_touch = -1, 0.0 for j, q in enumerate(parcels): if j == i: continue if p["properties"]["block_id"] != q["properties"]["block_id"]: continue touch = g.buffer(tol).intersection(geoms[j]).area if touch > best_touch: best_touch, best_j = touch, j if best_j >= 0 and best_touch > 0: merged = valid(g.union(geoms[best_j])) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2_new = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) acc = p["properties"].get("has_access") or parcels[best_j]["properties"].get("has_access") parcels[best_j]["geometry"] = mapping(merged) parcels[best_j]["properties"].update({ "area_m2": am2_new, "frontage_m": round(min(pw,ph)*sc,1), "plot_depth_m": round(max(pw,ph)*sc,1), "has_access": acc, "area_ok": config.min_area*0.8 <= am2_new <= (config.max_area or 1e18)*1.2, }) geoms[best_j] = merged; parcels.pop(i); changed = True; break return parcels def enforce_access(parcels, road_union, config, sc): if not road_union or road_union.is_empty: return parcels tol = deg(1.0, sc) changed = True while changed: changed = False geoms = [shape(p["geometry"]) for p in parcels] for i, p in enumerate(parcels): if p["properties"].get("has_access"): continue g = geoms[i]; best_j, best_shared = -1, 0.0 for j, q in enumerate(parcels): if j == i: continue if p["properties"]["block_id"] != q["properties"]["block_id"]: continue if not q["properties"].get("has_access"): continue shared = g.buffer(tol).intersection(geoms[j]).area if shared > best_shared: best_shared, best_j = shared, j if best_j >= 0: merged = valid(g.union(geoms[best_j])) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2 = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) parcels[best_j]["geometry"] = mapping(merged) parcels[best_j]["properties"].update({ "area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1), "plot_depth_m": round(max(pw,ph)*sc,1), "has_access": True, "area_ok": config.min_area*0.8 <= am2 <= (config.max_area or 1e18)*1.2, }) geoms[best_j] = merged; parcels.pop(i); changed = True; break for p in parcels: g = shape(p["geometry"]) p["properties"]["has_access"] = not road_union.is_empty and g.buffer(tol).intersects(road_union) return parcels def apply_existing_buildings(building_geoms, parcels, sc, config): if not building_geoms or not parcels: return parcels tol = deg(0.5, sc) for bldg in building_geoms: bldg_buf = bldg.buffer(tol) hit = [i for i, p in enumerate(parcels) if shape(p["geometry"]).intersects(bldg_buf)] if not hit: continue while len(hit) > 1: i, j = hit[0], hit[1] gi, gj = shape(parcels[i]["geometry"]), shape(parcels[j]["geometry"]) merged = valid(gi.union(gj)) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2 = round(merged.area * sc**2, 1); pw, ph, _ = obb(merged) parcels[i]["geometry"] = mapping(merged) parcels[i]["properties"].update({ "area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1), "plot_depth_m": round(max(pw,ph)*sc,1), "has_access": parcels[i]["properties"].get("has_access") or parcels[j]["properties"].get("has_access"), }) parcels.pop(j) hit = [hit[0]] + [k-(1 if k>j else 0) for k in hit[2:]] parcels[hit[0]]["properties"]["status"] = "built" parcels[hit[0]]["properties"]["building_area_m2"] = round(bldg.area * sc**2, 1) return parcels # ─── Validation ─────────────────────────────────────────────────────────────── def validate_config(boundary: Polygon, config: SubdivisionConfig, sc: float): """Return list of warning strings (empty = OK to run).""" site_area = boundary.area * sc**2 warnings = [] if config.min_area > site_area * 0.5: warnings.append(f"min_area ({config.min_area} m²) is more than half the site area ({site_area:.0f} m²) — very few parcels will fit") if config.max_area and config.max_area < config.min_area: warnings.append(f"max_area ({config.max_area} m²) is less than min_area ({config.min_area} m²)") if config.min_frontage > config.min_plot_depth: warnings.append("min_frontage is larger than min_plot_depth — parcels will be wider than deep") if config.default_road_width >= config.min_frontage * 2: warnings.append("Road width is very large relative to parcel frontage — most area will be road") min_site_for_parcels = (config.min_area + config.default_road_width * 20) * 4 if site_area < min_site_for_parcels: warnings.append(f"Site area ({site_area:.0f} m²) may be too small to fit meaningful parcels with the current parameters") return warnings # ─── Main endpoint ──────────────────────────────────────────────────────────── @app.post("/subdivide") async def subdivide(request: SubdivisionRequest): session_id = request.session_id or str(uuid.uuid4()) _progress[session_id] = {"pct": 0, "msg": "Starting…"} def prog(pct, msg): _progress[session_id] = {"pct": pct, "msg": msg} try: config = request.config or SubdivisionConfig() prog(5, "Parsing boundary…") boundary = valid(extract_geometry(request.boundary)) if isinstance(boundary, MultiPolygon): boundary = max(boundary.geoms, key=lambda g: g.area) if not isinstance(boundary, Polygon): raise HTTPException(400, "Boundary must be a polygon") sc = estimate_scale(boundary) # Validate before running warnings = validate_config(boundary, config, sc) prog(10, "Parsing roads and features…") # Parse user roads (accept both RoadInput dicts and legacy plain dicts) road_inputs: List[RoadInput] = [] for r in (request.roads or []): if isinstance(r, dict): try: road_inputs.append(RoadInput(**r)) except Exception: try: road_inputs.append(RoadInput(geometry=r)) except Exception: pass else: road_inputs.append(r) # Parse buildings building_geoms = [] for f in (request.existing_features or []): try: g = valid(extract_geometry(f).intersection(boundary)) building_geoms.extend(flatten_polygons(g)) except Exception: pass prog(20, "Building road network…") road_features, road_union = build_road_network(boundary, road_inputs, config, sc) # Carve obstacles prog(30, "Computing buildable area…") obstacles = road_union if building_geoms: bldg_union = valid(unary_union(building_geoms)) obstacles = valid(unary_union([road_union, bldg_union])) buildable = valid(boundary.difference(obstacles)) if not obstacles.is_empty else boundary blocks = [b for b in flatten_polygons(buildable) if b.area * sc**2 > config.min_area] prog(40, "Generating cul-de-sacs…") cds_polys = [] if config.allow_culdesac and blocks: cds_polys, blocks = generate_culdesacs(blocks, road_union, config, sc) if cds_polys: road_union = valid(unary_union([road_union] + cds_polys)) prog(55, "Subdividing blocks into parcels…") all_parcels = [] num = config.start_number for bid, block in enumerate(blocks, 1): bp = subdivide_block(block, config, road_union, bid, num, sc) all_parcels.extend(bp) num += len(bp) prog(55 + int(20 * (bid / max(len(blocks), 1))), f"Subdividing block {bid}/{len(blocks)}…") prog(78, "Quality control…") all_parcels = absorb_bad_parcels(all_parcels, config, sc) prog(85, "Enforcing road access…") all_parcels = enforce_access(all_parcels, road_union, config, sc) prog(90, "Applying existing buildings…") if building_geoms: all_parcels = apply_existing_buildings(building_geoms, all_parcels, sc, config) prog(95, "Numbering and finalising…") for i, p in enumerate(all_parcels, config.start_number): p["properties"]["parcel_num"] = i p["properties"]["parcel_id"] = f"{config.parcel_prefix}{i:04d}" # Build response block_features = [ {"type":"Feature","geometry":mapping(b), "properties":{"block_id":i+1,"area_m2":round(b.area*sc**2,1)}} for i, b in enumerate(blocks) ] cds_features = [ {"type":"Feature","geometry":mapping(c), "properties":{"type":"culdesac","id":f"CDS{i+1:03d}"}} for i, c in enumerate(cds_polys) ] areas = [p["properties"]["area_m2"] for p in all_parcels] no_acc = sum(1 for p in all_parcels if not p["properties"].get("has_access")) built = sum(1 for p in all_parcels if p["properties"].get("status") == "built") stats = { "total_parcels": len(all_parcels), "total_blocks": len(blocks), "total_roads": len(road_features), "culdesacs": len(cds_polys), "parcels_no_access": no_acc, "parcels_built": built, "parcels_vacant": len(all_parcels) - built, "boundary_area_m2": round(boundary.area * sc**2, 1), "road_area_m2": round(road_union.area * sc**2, 1) if not road_union.is_empty else 0, "buildable_area_m2": round(sum(b.area * sc**2 for b in blocks), 1), "avg_parcel_area_m2": round(sum(areas)/len(areas), 1) if areas else 0, "min_parcel_area_m2": round(min(areas), 1) if areas else 0, "max_parcel_area_m2": round(max(areas), 1) if areas else 0, "existing_buildings": len(building_geoms), "user_roads_drawn": len(road_inputs), "warnings": warnings, "land_use": config.land_use, "land_use_label": LAND_USE_PRESETS.get(config.land_use, {}).get("label", config.land_use), "config": { "min_frontage": config.min_frontage, "min_plot_depth": config.min_plot_depth, "min_area": config.min_area, "max_area": config.max_area, "default_road_width": config.default_road_width, "splay_radius": config.splay_radius, "max_block_length": config.max_block_length, } } prog(100, "Done") return { "parcels": all_parcels, "roads": road_features, "blocks": block_features, "culdesacs": cds_features, "stats": stats, "session_id": session_id, } except HTTPException: raise except Exception as e: prog(-1, f"Error: {e}") raise HTTPException(500, f"{e}\n{traceback.format_exc()}") finally: # Keep progress for a short while so client can read 100% async def _cleanup(): await asyncio.sleep(10) _progress.pop(session_id, None) asyncio.create_task(_cleanup()) @app.get("/progress/{session_id}") async def get_progress(session_id: str): return _progress.get(session_id, {"pct": -1, "msg": "Not found"}) @app.get("/land-use-presets") async def land_use_presets(): return LAND_USE_PRESETS @app.get("/road-type-defaults") async def road_type_defaults(): return ROAD_TYPE_DEFAULTS @app.get("/health") async def health(): return {"status": "ok", "version": "4.0.0"} @app.get("/config/defaults") async def get_defaults(): return SubdivisionConfig().model_dump()