2026-04-30 20:46:07 +03:00

652 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ParcelGen Backend v4
- Land use zones (Residential Low/Med/High, Commercial, Mixed, Industrial, Open Space)
- Road types (Primary, Secondary, Local, Lane) with per-type widths
- Min/max parcel area validation before running
- Splay (corner radius) converted correctly from display units
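- Multi-user safe: geometry processing keeps no shared state; the only module-level
mutable state is the per-session progress store
- Session-aware progress endpoint (polled via GET /progress/{session_id})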
- Better boundary adherence via edge-touching strip layout
- Correct terminology: frontage (road-facing width), plot depth (perpendicular)
- User roads + auto-fill roads combined (not either/or)
- Existing building footprints respected: carved from buildable area,
containing parcel flagged status=built
- Perimeter access road always present
- Access enforcement pass
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any, Tuple
import math, uuid, traceback, asyncio, json
from shapely.geometry import (
Polygon, MultiPolygon, LineString, MultiLineString,
Point, GeometryCollection, mapping, shape
)
from shapely.ops import unary_union
from shapely.affinity import rotate
from shapely.validation import make_valid
# FastAPI application object; every route below is registered on it.
app = FastAPI(title="ParcelGen", version="4.0.0")
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended outside local development.
app.add_middleware(
    CORSMiddleware, allow_origins=["*"], allow_credentials=True,
    allow_methods=["*"], allow_headers=["*"],
)
# ─── Land use zone definitions ────────────────────────────────────────────────
# Per-land-use parcel presets, keyed by the `land_use` identifier.
# - min_area / max_area seed SubdivisionConfig when the request leaves them unset
#   (they are compared against parcel areas expressed in m²).
# - label / color are copied onto generated parcels when no zone override is given.
# NOTE(review): min_frontage / min_depth are presumably metres, but nothing in this
# file reads them — confirm whether they were meant to override the config values.
LAND_USE_PRESETS = {
"residential_low": {"label": "Residential Low Density", "min_frontage": 15, "min_depth": 30, "min_area": 450, "max_area": 2000, "color": "#58a6ff"},
"residential_medium": {"label": "Residential Medium Density", "min_frontage": 10, "min_depth": 25, "min_area": 250, "max_area": 1000, "color": "#3fb950"},
"residential_high": {"label": "Residential High Density", "min_frontage": 6, "min_depth": 20, "min_area": 120, "max_area": 500, "color": "#f0c84a"},
"commercial": {"label": "Commercial", "min_frontage": 12, "min_depth": 20, "min_area": 300, "max_area": 5000, "color": "#f97316"},
"mixed_use": {"label": "Mixed Use", "min_frontage": 8, "min_depth": 20, "min_area": 200, "max_area": 2000, "color": "#bc8cff"},
"industrial": {"label": "Industrial", "min_frontage": 20, "min_depth": 40, "min_area": 1000, "max_area": 20000,"color": "#f78166"},
"open_space": {"label": "Open Space / Park", "min_frontage": 30, "min_depth": 30, "min_area": 500, "max_area": 50000,"color": "#34d399"},
}
# ─── Road type definitions ─────────────────────────────────────────────────────
# Default width (metres — it is passed through deg() and reported as `width_m`)
# and display colour per road type. A RoadInput.width overrides the width;
# unknown road types fall back to the "local" entry.
ROAD_TYPE_DEFAULTS = {
"primary": {"label": "Primary Road", "width": 20, "color": "#f78166"},
"secondary": {"label": "Secondary Road", "width": 14, "color": "#f0c84a"},
"local": {"label": "Local Road", "width": 9, "color": "#8b949e"},
"lane": {"label": "Lane / Alley", "width": 4, "color": "#58a6ff"},
}
# ─── Models ───────────────────────────────────────────────────────────────────
class RoadInput(BaseModel):
    """A user-drawn road: centreline geometry plus optional type/width/label."""
    # GeoJSON geometry or Feature dict; only LineString / MultiLineString results are used.
    geometry: Dict[str, Any]
    road_type: str = "local" # primary / secondary / local / lane
    width: Optional[float] = None # overrides type default if set
    # Display label; when empty the road is labelled "<Road_type> Road".
    label: Optional[str] = None
class ZoneInput(BaseModel):
    """A land-use zone supplied with the request.

    NOTE(review): zones are accepted on SubdivisionRequest but no code in this
    file reads them — confirm whether zone handling lives elsewhere.
    """
    geometry: Dict[str, Any]  # presumably a GeoJSON polygon — TODO confirm
    land_use: str = "residential_medium"  # presumably a LAND_USE_PRESETS key
    label: Optional[str] = None
class SubdivisionConfig(BaseModel):
    """Tunable parameters for one subdivision run.

    Lengths are given in metres and areas in m²; the pipeline converts them to
    map units with the scale factor it derives from the boundary.

    Numeric fields are range-checked so that degenerate values are rejected at
    validation time instead of failing deep inside the pipeline: a zero
    ``min_frontage`` would otherwise divide by zero when blocks are sliced, and
    a non-positive ``max_block_length`` could keep the auto-road grid loop from
    ever terminating.
    """
    # Parcel dimensions
    min_frontage: float = Field(12.0, gt=0)
    min_plot_depth: float = Field(25.0, gt=0)  # renamed from min_depth for clarity
    min_area: Optional[float] = Field(None, ge=0)  # auto from land_use if None
    max_area: Optional[float] = Field(None, ge=0)  # auto from land_use if None; 0 = unlimited
    # Roads
    default_road_width: float = Field(9.0, ge=0)
    splay_radius: float = Field(3.0, ge=0)  # was corner_radius; splay is the correct survey term
    max_block_length: float = Field(120.0, gt=0)
    allow_culdesac: bool = True
    # Land use
    land_use: str = "residential_medium"
    # Numbering
    parcel_prefix: str = "P"
    zone_prefix: str = "Z"
    start_number: int = 1

    def model_post_init(self, _):
        """Fill ``min_area`` / ``max_area`` from the land-use preset when unset.

        An unknown ``land_use`` falls back to the "residential_medium" preset.
        """
        preset = LAND_USE_PRESETS.get(self.land_use, LAND_USE_PRESETS["residential_medium"])
        if self.min_area is None:
            self.min_area = preset["min_area"]
        if self.max_area is None:
            self.max_area = preset["max_area"]
class SubdivisionRequest(BaseModel):
    """Request body for POST /subdivide."""
    # GeoJSON geometry or Feature; must resolve to a Polygon (for a MultiPolygon
    # the largest part is used).
    boundary: Dict[str, Any]
    # User-drawn roads; they are combined with the auto-generated grid.
    roads: Optional[List[RoadInput]] = []
    # Existing building footprints; polygon parts inside the boundary are used.
    existing_features: Optional[List[Dict[str, Any]]] = []
    # NOTE(review): accepted but not read by /subdivide in this file.
    zones: Optional[List[ZoneInput]] = []
    # A default SubdivisionConfig() is used when omitted.
    config: Optional[SubdivisionConfig] = None
    session_id: Optional[str] = None # for progress tracking
# In-memory progress store keyed by session_id. Entries are written by
# /subdivide, read by /progress/{session_id}, and removed roughly 10 seconds
# after the run finishes. It is a plain module-level dict, so it lives only in
# this process.
_progress: Dict[str, Dict] = {}
# ─── Geometry helpers ─────────────────────────────────────────────────────────
def valid(geom):
    """Return *geom*, repaired with ``make_valid`` if it is invalid.

    ``None`` and empty geometries are passed through untouched.
    """
    needs_repair = geom is not None and not geom.is_empty and not geom.is_valid
    return make_valid(geom) if needs_repair else geom
def extract_geometry(obj: Dict) -> Any:
    """Turn a GeoJSON geometry or Feature dict into a shapely geometry.

    For a Feature the nested ``geometry`` member is used; any other dict is
    treated as the geometry itself.

    Raises:
        ValueError: if *obj* is ``None`` or no geometry ``type`` can be found.
    """
    if obj is None:
        raise ValueError("Null geometry")
    if obj.get("type") == "Feature":
        geo = obj.get("geometry", obj)
    else:
        geo = obj
    has_type = geo is not None and bool(geo.get("type"))
    if not has_type:
        raise ValueError(f"Cannot extract geometry from: {list(obj.keys())}")
    return shape(geo)
def flatten_polygons(geom) -> List[Polygon]:
    """Collect every Polygon contained in *geom*, recursing into collections.

    ``None``, empty geometries and non-areal geometries yield an empty list.
    """
    if geom is None or geom.is_empty:
        return []
    if isinstance(geom, Polygon):
        return [geom]
    if not isinstance(geom, (MultiPolygon, GeometryCollection)):
        return []
    return [poly for member in geom.geoms for poly in flatten_polygons(member)]
def obb(polygon: Polygon) -> Tuple[float, float, float]:
    """Oriented bounding box → (short, long, angle_deg).

    The side lengths come from the first two edges of the minimum rotated
    rectangle; the angle is that of its first edge, in degrees.
    """
    corners = list(polygon.minimum_rotated_rectangle.exterior.coords)
    p0, p1, p2 = corners[0], corners[1], corners[2]
    dx, dy = p1[0] - p0[0], p1[1] - p0[1]
    first_edge = math.hypot(dx, dy)
    second_edge = math.hypot(p2[0] - p1[0], p2[1] - p1[1])
    angle_deg = math.degrees(math.atan2(dy, dx))
    return (min(first_edge, second_edge), max(first_edge, second_edge), angle_deg)
def estimate_scale(boundary: Polygon) -> float:
    """Approximate metres-per-unit at the boundary centroid.

    Averages 111 320 with 111 320 × cos(centroid y) to give one isotropic
    factor. NOTE(review): this assumes coordinates are lon/lat degrees with y
    as latitude — confirm against the callers.
    """
    metres_per_deg_lat = 111_320.0
    latitude = boundary.centroid.y
    metres_per_deg_lon = metres_per_deg_lat * math.cos(math.radians(latitude))
    return (metres_per_deg_lat + metres_per_deg_lon) / 2.0
def deg(metres: float, sc: float) -> float:
    """Convert a length in metres to map units, given *sc* metres per unit."""
    length_in_units = metres / sc
    return length_in_units
def buffer_line(line, width_deg: float) -> Polygon:
    """Widen *line* into a strip ``width_deg`` across (half on each side).

    Uses cap_style=2 and join_style=2 (flat ends and mitred corners in
    shapely's numbering).
    """
    half_width = width_deg / 2
    return line.buffer(half_width, cap_style=2, join_style=2)
# ─── Road network ─────────────────────────────────────────────────────────────
def build_road_network(
    boundary: Polygon,
    road_inputs: List[RoadInput],
    config: SubdivisionConfig,
    sc: float
) -> Tuple[List[Dict], Polygon]:
    """Build the road surfaces for a site.

    Three layers are combined:

    1. user-drawn roads, clipped to the boundary and buffered to their width;
    2. an automatic grid of local roads spaced one block length apart
       (a candidate is dropped when more than 60 % of it overlaps user roads);
    3. a perimeter ring half a default road-width deep.

    Args:
        boundary: Site polygon in map units.
        road_inputs: User roads. Items that are not ``RoadInput`` instances are
            treated as raw geometry dicts of type "local".
        config: Run configuration (lengths in metres).
        sc: Metres per map unit.

    Returns:
        ``(road_features, road_union)``: GeoJSON-style feature dicts for the
        displayable roads (the perimeter ring is not included) and the union
        of every road polygon, ring included.
    """
    road_features: List[Dict] = []  # dicts with geometry + metadata
    road_polys = []
    local_defaults = ROAD_TYPE_DEFAULTS["local"]

    # ── User roads ────────────────────────────────────────────────────────────
    for ri in road_inputs:
        is_model = isinstance(ri, RoadInput)
        try:
            line = extract_geometry(ri.geometry if is_model else ri)
        except Exception:
            # Best effort: an unparseable road is skipped, not fatal.
            continue
        if not isinstance(line, (LineString, MultiLineString)):
            continue
        segments = list(line.geoms) if isinstance(line, MultiLineString) else [line]
        rtype = ri.road_type if is_model else "local"
        type_defaults = ROAD_TYPE_DEFAULTS.get(rtype, local_defaults)
        # An explicit non-zero width overrides the per-type default.
        rwidth_m = ri.width if is_model and ri.width else type_defaults["width"]
        user_rw = deg(rwidth_m, sc)
        label = (ri.label if is_model else None) or f"{rtype.capitalize()} Road"
        for segment in segments:
            cl = valid(segment.intersection(boundary))
            if not cl or cl.is_empty:
                continue
            rp = valid(buffer_line(cl, user_rw).intersection(boundary))
            if not rp or rp.is_empty:
                continue
            road_polys.append(rp)
            road_features.append({
                "type": "Feature",
                "geometry": mapping(rp),
                "properties": {
                    "type": "road_surface",
                    "road_type": rtype,
                    "width_m": rwidth_m,
                    "label": label,
                    "color": type_defaults["color"],
                },
            })

    # ── Auto-fill grid roads ──────────────────────────────────────────────────
    # Overlap is tested against the user roads only, not against earlier auto roads.
    existing_union = valid(unary_union(road_polys)) if road_polys else Polygon()
    bl = deg(config.max_block_length, sc)
    rw = deg(config.default_road_width, sc)
    step = bl + rw
    minx, miny, maxx, maxy = boundary.bounds
    site_w, site_h = maxx - minx, maxy - miny
    min_auto_len = deg(config.min_frontage * 2, sc)
    # A site narrower than one grid step still gets a single centre road when
    # it can hold two plot depths plus the road itself.
    single_road_span = deg(config.min_plot_depth * 2 + config.default_road_width, sc)

    def try_add_auto(line: LineString):
        cl = valid(line.intersection(boundary))
        if not cl or cl.is_empty or cl.length < min_auto_len:
            return
        rp = valid(buffer_line(cl, rw).intersection(boundary))
        # A zero-area clip is not a usable road surface and would make the
        # overlap ratio below divide by zero.
        if not rp or rp.is_empty or rp.area <= 0:
            return
        if not existing_union.is_empty and rp.intersection(existing_union).area / rp.area > 0.6:
            return
        road_polys.append(rp)
        road_features.append({
            "type": "Feature", "geometry": mapping(rp),
            "properties": {"type": "road_surface", "road_type": "local",
                           "width_m": config.default_road_width, "label": "Local Road (auto)",
                           "color": ROAD_TYPE_DEFAULTS["local"]["color"]}
        })

    def grid_positions(lo: float, hi: float, extent: float) -> List[float]:
        """Road centreline offsets along one axis, one grid step apart."""
        positions = []
        # A non-positive step can never reach the far edge, so no regular grid
        # is laid out in that case (the centre-road fallback below still applies).
        if step > 0:
            pos = lo + step
            while pos < hi - rw:
                positions.append(pos)
                nxt = pos + step
                if nxt <= pos:
                    # Step is too small to advance in floating point; stop
                    # instead of looping forever.
                    break
                pos = nxt
        if not positions and extent > single_road_span:
            positions = [lo + extent / 2]
        return positions

    # Lines overshoot the bounds by 1 map unit so clipping always reaches the edge.
    for x in grid_positions(minx, maxx, site_w):
        try_add_auto(LineString([(x, miny - 1), (x, maxy + 1)]))
    for y in grid_positions(miny, maxy, site_h):
        try_add_auto(LineString([(minx - 1, y), (maxx + 1, y)]))

    # ── Perimeter access ring ─────────────────────────────────────────────────
    perim_w = deg(config.default_road_width * 0.5, sc)
    inner = valid(boundary.buffer(-perim_w))
    if inner and not inner.is_empty and inner.area > 0:
        perim = valid(boundary.difference(inner))
        if perim and not perim.is_empty:
            # Don't add to road_features (perimeter ring is structural, not displayed)
            road_polys.append(perim)

    road_union = valid(unary_union(road_polys)) if road_polys else Polygon()
    return road_features, road_union
# ─── Cul-de-sac ───────────────────────────────────────────────────────────────
def generate_culdesacs(blocks, road_union, config, sc):
    """Carve a cul-de-sac into every block longer than the maximum block length.

    Blocks that are short enough, or for which no cul-de-sac could be fitted,
    are passed through unchanged. ``road_union`` is accepted but not used.

    Returns:
        ``(cds_polys, new_blocks)``: the cul-de-sac road polygons and the
        block list with carved blocks replaced by their remainder.
    """
    max_len = deg(config.max_block_length, sc)
    road_w = deg(config.default_road_width, sc)
    cds_polys = []
    new_blocks = []
    for block in blocks:
        long_side = obb(block)[1]
        carved = None
        if long_side > max_len and config.allow_culdesac:
            cds, remainder = _try_culdesac(block, road_w, config, sc)
            if cds and remainder and not remainder.is_empty:
                carved = (cds, remainder)
        if carved is None:
            new_blocks.append(block)
        else:
            cds_polys.append(carved[0])
            new_blocks.append(carved[1])
    return cds_polys, new_blocks
def _try_culdesac(block, rw_deg, config, sc):
    """Try to fit a stub road plus turning circle into *block*.

    The stub runs straight up from the bottom of the block's bounding box at
    the centroid's x, for 45 % of the maximum block length; the turning circle
    (radius 1.8 × the default road width) sits at its far end.

    Returns:
        ``(cds_area, trimmed)`` — the cul-de-sac polygon and what is left of
        the block — or ``(None, None)`` when the stub does not fit or the
        remainder would be smaller than ``config.min_area``.
    """
    cx = block.centroid.x
    _, miny, _, _ = block.bounds
    radius = deg(config.default_road_width * 1.8, sc)
    stub_len = deg(config.max_block_length * 0.45, sc)
    stub = LineString([(cx, miny), (cx, miny + stub_len)]).intersection(block)
    if not isinstance(stub, LineString):
        # A concave block can cut the stub into several pieces (MultiLineString
        # or GeometryCollection); those have no .coords, so keep the longest
        # linear piece instead of crashing.
        pieces = [g for g in getattr(stub, "geoms", []) if isinstance(g, LineString)]
        if not pieces:
            return None, None
        stub = max(pieces, key=lambda g: g.length)
    if stub.is_empty or stub.length < rw_deg:
        return None, None
    stub_road = valid(buffer_line(stub, rw_deg).intersection(block))
    # The stub is drawn upwards, so its far end is the coordinate with the
    # greatest y regardless of the orientation the intersection returns.
    end_pt = max(stub.coords, key=lambda c: c[1])
    circle = valid(Point(end_pt).buffer(radius).intersection(block))
    cds_area = valid(unary_union([stub_road, circle]).intersection(block))
    trimmed = valid(block.difference(cds_area))
    if trimmed.is_empty or trimmed.area * sc**2 < config.min_area:
        return None, None
    return cds_area, trimmed
# ─── Block subdivider ─────────────────────────────────────────────────────────
def subdivide_block(block, config, road_union, block_id, start_num, sc, zone_label=None, zone_color=None):
    """Slice one block into parcels laid out on its oriented bounding box.

    The block is rotated so its bounding rectangle is axis-aligned, cut into
    strips along its longer side (about one ``min_frontage`` wide each) and
    into two rows across when it is at least two plot depths deep; every cell
    is clipped to the block and rotated back.

    NOTE(review): the strip count uses ``round()``, so a strip can come out
    narrower than ``min_frontage``.

    Returns:
        List of parcel feature dicts, numbered from ``start_num``.
    """
    min_f = deg(config.min_frontage, sc)
    min_d = deg(config.min_plot_depth, sc)
    min_a = config.min_area / sc**2
    _, _, angle = obb(block)
    centroid = block.centroid
    rotated = valid(rotate(block, -angle, origin=centroid))
    rminx, rminy, rmaxx, rmaxy = rotated.bounds
    bw, bh = rmaxx - rminx, rmaxy - rminy

    def bands(lo, hi, extent):
        # Two half-depth bands when two full plot depths fit, otherwise one.
        if extent >= min_d * 2:
            mid = lo + extent / 2
            return [(lo, mid), (mid, hi)]
        return [(lo, hi)]

    # Cell rectangles (x0, y0, x1, y1) in the rotated frame, in emission order.
    cells = []
    if bw >= bh:
        n_cols = max(1, round(bw / min_f))
        col_w = bw / n_cols
        for y0, y1 in bands(rminy, rmaxy, bh):
            for c in range(n_cols):
                x0 = rminx + c * col_w
                cells.append((x0, y0, x0 + col_w, y1))
    else:
        n_rows = max(1, round(bh / min_f))
        row_h = bh / n_rows
        for x0, x1 in bands(rminx, rmaxx, bw):
            for r in range(n_rows):
                y0 = rminy + r * row_h
                cells.append((x0, y0, x1, y0 + row_h))

    parcels = []
    num = start_num
    for x0, y0, x1, y1 in cells:
        cell = valid(Polygon([(x0, y0), (x1, y0), (x1, y1), (x0, y1)]).intersection(rotated))
        # Slivers under half the minimum area are dropped outright.
        if cell is None or cell.is_empty or cell.area < min_a * 0.5:
            continue
        cell = valid(rotate(cell, angle, origin=centroid))
        parcels.extend(_make_parcel(cell, config, block_id, num, sc, road_union,
                                    zone_label, zone_color, start_num))
        num += 1
    return parcels
def _make_parcel(geom, config, block_id, num, sc, road_union, zone_label, zone_color, base):
    """Build parcel feature dicts for *geom* (one per polygon part).

    Parts that are empty or smaller than 40 % of ``config.min_area`` are
    skipped. All parts share the same parcel number. ``base`` is accepted but
    not used.
    """
    pieces = [geom] if isinstance(geom, Polygon) else flatten_polygons(geom)
    tol = deg(0.5, sc)
    preset = LAND_USE_PRESETS.get(config.land_use, LAND_USE_PRESETS["residential_medium"])
    label = zone_label or preset["label"]
    colour = zone_color or preset["color"]
    max_a = config.max_area or 1e18
    area_floor = config.min_area * 0.4
    parcel_id = f"{config.parcel_prefix}{num:04d}"
    roads_present = road_union is not None and not road_union.is_empty

    features = []
    for piece in pieces:
        if piece.is_empty:
            continue
        area_m2 = round(piece.area * sc**2, 1)
        if area_m2 < area_floor:
            continue
        side_a, side_b, _ = obb(piece)
        frontage_m = round(min(side_a, side_b) * sc, 1)
        plot_depth_m = round(max(side_a, side_b) * sc, 1)
        # Access = the parcel, grown by 0.5 m, reaches any road surface.
        has_access = roads_present and piece.buffer(tol).intersects(road_union)
        properties = {
            "parcel_id": parcel_id,
            "parcel_num": num,
            "block_id": block_id,
            "area_m2": area_m2,
            "area_ha": round(area_m2 / 10000, 4),
            "frontage_m": frontage_m,  # shorter side of the oriented bounding box
            "plot_depth_m": plot_depth_m,  # longer side of the oriented bounding box
            "address": f"Block {block_id}, Plot {num}",
            "land_use": config.land_use,
            "land_use_label": label,
            "zone_color": colour,
            "zone": label,
            "status": "vacant",
            "has_access": has_access,
            "frontage_ok": frontage_m >= config.min_frontage * 0.85,
            "area_ok": config.min_area * 0.8 <= area_m2 <= max_a * 1.2,
            "within_max_area": area_m2 <= max_a * 1.1,
        }
        features.append({"type": "Feature", "geometry": mapping(piece), "properties": properties})
    return features
# ─── Quality passes ───────────────────────────────────────────────────────────
def _merge_geoms(a, b):
    """Union two parcel geometries; if the result is disjoint keep its largest part."""
    merged = valid(a.union(b))
    if isinstance(merged, MultiPolygon):
        merged = max(merged.geoms, key=lambda part: part.area)
    return merged


def _refresh_parcel_metrics(props, geom, config, sc):
    """Recompute every geometry-derived parcel property in place.

    Uses the same formulas and tolerances as ``_make_parcel`` so a merged
    parcel never carries stale ``area_ha`` / ``frontage_ok`` / ``area_ok`` /
    ``within_max_area`` values from before the merge.
    """
    area_m2 = round(geom.area * sc**2, 1)
    short_side, long_side, _ = obb(geom)
    frontage_m = round(short_side * sc, 1)
    max_a = config.max_area or 1e18
    props.update({
        "area_m2": area_m2,
        "area_ha": round(area_m2 / 10000, 4),
        "frontage_m": frontage_m,
        "plot_depth_m": round(long_side * sc, 1),
        "frontage_ok": frontage_m >= config.min_frontage * 0.85,
        "area_ok": config.min_area * 0.8 <= area_m2 <= max_a * 1.2,
        "within_max_area": area_m2 <= max_a * 1.1,
    })


def _best_neighbour(i, parcels, geoms, tol, require_access=False):
    """Index of the same-block parcel sharing the most edge with parcel *i*, or -1.

    "Sharing" is measured as the area of the neighbour covered by parcel *i*
    grown by *tol*. With ``require_access`` only neighbours flagged
    ``has_access`` are considered.
    """
    block_id = parcels[i]["properties"]["block_id"]
    probe = geoms[i].buffer(tol)  # loop-invariant, so buffered once
    best_j, best_shared = -1, 0.0
    for j, q in enumerate(parcels):
        if j == i or q["properties"]["block_id"] != block_id:
            continue
        if require_access and not q["properties"].get("has_access"):
            continue
        shared = probe.intersection(geoms[j]).area
        if shared > best_shared:
            best_shared, best_j = shared, j
    return best_j


def absorb_bad_parcels(parcels, config, sc):
    """Merge undersized or badly shaped parcels into their best neighbour.

    A parcel is "bad" when its area is under 80 % of ``config.min_area``, its
    compactness (4πA / P²) is under 0.18, or its short/long side ratio is
    under 0.12. Each bad parcel is merged into the same-block neighbour it
    shares the most edge with; the pass repeats until nothing changes.

    The list is modified in place and also returned.
    """
    tol = deg(0.5, sc)
    changed = True
    while changed:
        changed = False
        geoms = [shape(p["geometry"]) for p in parcels]
        for i, p in enumerate(parcels):
            g = geoms[i]
            area_m2 = g.area * sc**2
            w, h, _ = obb(g)
            compact = 4 * math.pi * g.area / (g.length**2) if g.length else 0
            acceptable = (area_m2 >= config.min_area * 0.8 and compact >= 0.18
                          and (h == 0 or w / h >= 0.12))
            if acceptable:
                continue
            j = _best_neighbour(i, parcels, geoms, tol)
            if j < 0:
                continue  # isolated in its block: nothing to merge into
            merged = _merge_geoms(g, geoms[j])
            target = parcels[j]["properties"]
            access = p["properties"].get("has_access") or target.get("has_access")
            parcels[j]["geometry"] = mapping(merged)
            _refresh_parcel_metrics(target, merged, config, sc)
            target["has_access"] = access
            # Indices shift after the pop, so restart the scan from scratch.
            parcels.pop(i)
            changed = True
            break
    return parcels


def enforce_access(parcels, road_union, config, sc):
    """Merge parcels without road access into an accessible same-block neighbour.

    Parcels with no accessible neighbour are left as they are. Afterwards the
    ``has_access`` flag of every parcel is recomputed against *road_union*
    using a 1 m tolerance.

    The list is modified in place and also returned; it is returned untouched
    when there are no roads.
    """
    if not road_union or road_union.is_empty:
        return parcels
    tol = deg(1.0, sc)
    changed = True
    while changed:
        changed = False
        geoms = [shape(p["geometry"]) for p in parcels]
        for i, p in enumerate(parcels):
            if p["properties"].get("has_access"):
                continue
            j = _best_neighbour(i, parcels, geoms, tol, require_access=True)
            if j < 0:
                continue
            merged = _merge_geoms(geoms[i], geoms[j])
            target = parcels[j]["properties"]
            parcels[j]["geometry"] = mapping(merged)
            _refresh_parcel_metrics(target, merged, config, sc)
            target["has_access"] = True
            # Indices shift after the pop, so restart the scan from scratch.
            parcels.pop(i)
            changed = True
            break
    for p in parcels:
        g = shape(p["geometry"])
        p["properties"]["has_access"] = g.buffer(tol).intersects(road_union)
    return parcels


def apply_existing_buildings(building_geoms, parcels, sc, config):
    """Flag the parcel around each existing building as ``status="built"``.

    All parcels lying within 0.5 m of a building are merged into the first of
    them; that parcel gets ``status="built"`` and ``building_area_m2``.

    The list is modified in place and also returned.
    """
    if not building_geoms or not parcels:
        return parcels
    tol = deg(0.5, sc)
    for bldg in building_geoms:
        bldg_buf = bldg.buffer(tol)
        hit = [i for i, p in enumerate(parcels) if shape(p["geometry"]).intersects(bldg_buf)]
        if not hit:
            continue
        keep, absorbed = hit[0], hit[1:]
        if absorbed:
            target = parcels[keep]["properties"]
            merged = shape(parcels[keep]["geometry"])
            access = target.get("has_access")
            # Fold the other hits in, in index order.
            for j in absorbed:
                merged = _merge_geoms(merged, shape(parcels[j]["geometry"]))
                access = access or parcels[j]["properties"].get("has_access")
            parcels[keep]["geometry"] = mapping(merged)
            _refresh_parcel_metrics(target, merged, config, sc)
            target["has_access"] = access
            # Pop from the end so the remaining indices (all > keep) stay valid.
            for j in reversed(absorbed):
                parcels.pop(j)
        parcels[keep]["properties"]["status"] = "built"
        parcels[keep]["properties"]["building_area_m2"] = round(bldg.area * sc**2, 1)
    return parcels
# ─── Validation ───────────────────────────────────────────────────────────────
def validate_config(boundary: Polygon, config: SubdivisionConfig, sc: float):
    """Return list of warning strings (empty = OK to run).

    The checks are advisory only: they compare the configured parcel and road
    sizes with each other and with the site area (in m²) and never raise.
    """
    site_area = boundary.area * sc**2
    min_site_for_parcels = (config.min_area + config.default_road_width * 20) * 4
    checks = [
        (
            config.min_area > site_area * 0.5,
            f"min_area ({config.min_area} m²) is more than half the site area ({site_area:.0f} m²) — very few parcels will fit",
        ),
        (
            config.max_area and config.max_area < config.min_area,
            f"max_area ({config.max_area} m²) is less than min_area ({config.min_area} m²)",
        ),
        (
            config.min_frontage > config.min_plot_depth,
            "min_frontage is larger than min_plot_depth — parcels will be wider than deep",
        ),
        (
            config.default_road_width >= config.min_frontage * 2,
            "Road width is very large relative to parcel frontage — most area will be road",
        ),
        (
            site_area < min_site_for_parcels,
            f"Site area ({site_area:.0f} m²) may be too small to fit meaningful parcels with the current parameters",
        ),
    ]
    return [message for triggered, message in checks if triggered]
# ─── Main endpoint ────────────────────────────────────────────────────────────
def _run_subdivision(request: SubdivisionRequest, session_id: str) -> Dict[str, Any]:
    """Run the whole (synchronous, CPU-bound) subdivision pipeline.

    Progress is published to ``_progress[session_id]`` as the stages complete.

    Raises:
        HTTPException: 400 when the boundary is not a polygon; 500 for any
            other failure (progress is then set to pct=-1 with the message).
    """
    def prog(pct, msg):
        _progress[session_id] = {"pct": pct, "msg": msg}

    try:
        config = request.config or SubdivisionConfig()
        prog(5, "Parsing boundary…")
        boundary = valid(extract_geometry(request.boundary))
        if isinstance(boundary, MultiPolygon):
            boundary = max(boundary.geoms, key=lambda g: g.area)
        if not isinstance(boundary, Polygon):
            raise HTTPException(400, "Boundary must be a polygon")
        sc = estimate_scale(boundary)
        # Validate before running
        warnings = validate_config(boundary, config, sc)

        prog(10, "Parsing roads and features…")
        # Parse user roads (accept both RoadInput dicts and legacy plain dicts)
        road_inputs: List[RoadInput] = []
        for idx, r in enumerate(request.roads or [], 1):
            if not isinstance(r, dict):
                road_inputs.append(r)
                continue
            try:
                road_inputs.append(RoadInput(**r))
            except Exception:
                try:
                    road_inputs.append(RoadInput(geometry=r))
                except Exception:
                    # Still best effort, but tell the caller instead of dropping it silently.
                    warnings.append(f"Road #{idx} could not be parsed and was ignored")

        # Parse buildings
        building_geoms = []
        for idx, f in enumerate(request.existing_features or [], 1):
            try:
                g = valid(extract_geometry(f).intersection(boundary))
                building_geoms.extend(flatten_polygons(g))
            except Exception:
                # Still best effort, but tell the caller instead of dropping it silently.
                warnings.append(f"Existing feature #{idx} could not be parsed and was ignored")

        prog(20, "Building road network…")
        road_features, road_union = build_road_network(boundary, road_inputs, config, sc)

        # Carve obstacles
        prog(30, "Computing buildable area…")
        obstacles = road_union
        if building_geoms:
            bldg_union = valid(unary_union(building_geoms))
            obstacles = valid(unary_union([road_union, bldg_union]))
        buildable = valid(boundary.difference(obstacles)) if not obstacles.is_empty else boundary
        blocks = [b for b in flatten_polygons(buildable) if b.area * sc**2 > config.min_area]

        prog(40, "Generating cul-de-sacs…")
        cds_polys = []
        if config.allow_culdesac and blocks:
            cds_polys, blocks = generate_culdesacs(blocks, road_union, config, sc)
            if cds_polys:
                road_union = valid(unary_union([road_union] + cds_polys))

        prog(55, "Subdividing blocks into parcels…")
        all_parcels = []
        num = config.start_number
        for bid, block in enumerate(blocks, 1):
            bp = subdivide_block(block, config, road_union, bid, num, sc)
            all_parcels.extend(bp)
            num += len(bp)
            prog(55 + int(20 * (bid / max(len(blocks), 1))), f"Subdividing block {bid}/{len(blocks)}")

        prog(78, "Quality control…")
        all_parcels = absorb_bad_parcels(all_parcels, config, sc)
        prog(85, "Enforcing road access…")
        all_parcels = enforce_access(all_parcels, road_union, config, sc)
        prog(90, "Applying existing buildings…")
        if building_geoms:
            all_parcels = apply_existing_buildings(building_geoms, all_parcels, sc, config)

        prog(95, "Numbering and finalising…")
        # Merges leave gaps in the numbering, so parcels are renumbered consecutively.
        for i, p in enumerate(all_parcels, config.start_number):
            p["properties"]["parcel_num"] = i
            p["properties"]["parcel_id"] = f"{config.parcel_prefix}{i:04d}"

        # Build response
        block_features = [
            {"type": "Feature", "geometry": mapping(b),
             "properties": {"block_id": i + 1, "area_m2": round(b.area * sc**2, 1)}}
            for i, b in enumerate(blocks)
        ]
        cds_features = [
            {"type": "Feature", "geometry": mapping(c),
             "properties": {"type": "culdesac", "id": f"CDS{i+1:03d}"}}
            for i, c in enumerate(cds_polys)
        ]
        areas = [p["properties"]["area_m2"] for p in all_parcels]
        no_acc = sum(1 for p in all_parcels if not p["properties"].get("has_access"))
        built = sum(1 for p in all_parcels if p["properties"].get("status") == "built")
        stats = {
            "total_parcels": len(all_parcels), "total_blocks": len(blocks),
            "total_roads": len(road_features), "culdesacs": len(cds_polys),
            "parcels_no_access": no_acc, "parcels_built": built,
            "parcels_vacant": len(all_parcels) - built,
            "boundary_area_m2": round(boundary.area * sc**2, 1),
            "road_area_m2": round(road_union.area * sc**2, 1) if not road_union.is_empty else 0,
            "buildable_area_m2": round(sum(b.area * sc**2 for b in blocks), 1),
            "avg_parcel_area_m2": round(sum(areas) / len(areas), 1) if areas else 0,
            "min_parcel_area_m2": round(min(areas), 1) if areas else 0,
            "max_parcel_area_m2": round(max(areas), 1) if areas else 0,
            "existing_buildings": len(building_geoms),
            "user_roads_drawn": len(road_inputs),
            "warnings": warnings,
            "land_use": config.land_use,
            "land_use_label": LAND_USE_PRESETS.get(config.land_use, {}).get("label", config.land_use),
            "config": {
                "min_frontage": config.min_frontage, "min_plot_depth": config.min_plot_depth,
                "min_area": config.min_area, "max_area": config.max_area,
                "default_road_width": config.default_road_width, "splay_radius": config.splay_radius,
                "max_block_length": config.max_block_length,
            }
        }
        prog(100, "Done")
        return {
            "parcels": all_parcels, "roads": road_features,
            "blocks": block_features, "culdesacs": cds_features,
            "stats": stats, "session_id": session_id,
        }
    except HTTPException:
        raise
    except Exception as e:
        prog(-1, f"Error: {e}")
        # NOTE(review): the traceback is returned to the client in the error
        # detail — confirm that is acceptable outside development.
        raise HTTPException(500, f"{e}\n{traceback.format_exc()}") from e


@app.post("/subdivide")
async def subdivide(request: SubdivisionRequest):
    """Subdivide a site boundary into roads, blocks and parcels.

    The pipeline is CPU-bound, so it runs in a worker thread: the event loop
    stays free to answer ``GET /progress/{session_id}`` while a run is in
    flight. The progress record is dropped 10 seconds after the run ends so a
    client can still read the final state.

    Returns:
        Dict with ``parcels``, ``roads``, ``blocks``, ``culdesacs`` (lists of
        GeoJSON-style features), ``stats`` and the ``session_id`` used.
    """
    session_id = request.session_id or str(uuid.uuid4())
    _progress[session_id] = {"pct": 0, "msg": "Starting…"}
    loop = asyncio.get_running_loop()
    try:
        return await loop.run_in_executor(None, _run_subdivision, request, session_id)
    finally:
        # Keep progress for a short while so client can read 100%.
        # call_later is owned by the loop, unlike an unreferenced task, which
        # may be garbage-collected before it runs.
        loop.call_later(10, _progress.pop, session_id, None)
@app.get("/progress/{session_id}")
async def get_progress(session_id: str):
    """Return the latest progress record for *session_id*.

    Unknown (or already cleaned-up) sessions get a pct=-1 "Not found" record.
    """
    not_found = {"pct": -1, "msg": "Not found"}
    return _progress.get(session_id, not_found)
@app.get("/land-use-presets")
async def land_use_presets():
    """Return the land-use preset table unchanged."""
    presets = LAND_USE_PRESETS
    return presets
@app.get("/road-type-defaults")
async def road_type_defaults():
    """Return the per-road-type default table unchanged."""
    defaults = ROAD_TYPE_DEFAULTS
    return defaults
@app.get("/health")
async def health():
    """Liveness probe: reports a fixed status and the service version."""
    payload = {"status": "ok", "version": "4.0.0"}
    return payload
@app.get("/config/defaults")
async def get_defaults():
    """Return the field values of a default-constructed SubdivisionConfig."""
    default_config = SubdivisionConfig()
    return default_config.model_dump()