""" Parcel Subdivision Engine - FastAPI Backend v3 - User roads + auto-fill roads combined (not either/or) - Existing building footprints respected: carved from buildable area, containing parcel flagged status=built - Perimeter access road always present - Access enforcement pass """ from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Optional, Tuple, Dict, Any import math, uuid, traceback from shapely.geometry import ( Polygon, MultiPolygon, LineString, MultiLineString, Point, GeometryCollection, mapping, shape ) from shapely.ops import unary_union from shapely.affinity import rotate from shapely.validation import make_valid app = FastAPI(title="Parcel Subdivision Engine", version="3.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ─── Models ─────────────────────────────────────────────────────────────────── class SubdivisionConfig(BaseModel): min_frontage: float = 12.0 min_depth: float = 25.0 road_width: float = 9.0 max_block_length: float = 120.0 allow_culdesac: bool = True corner_radius: float = 3.0 min_area: Optional[float] = None def model_post_init(self, _): if self.min_area is None: self.min_area = self.min_frontage * self.min_depth class SubdivisionRequest(BaseModel): boundary: Dict[str, Any] roads: Optional[List[Dict[str, Any]]] = [] existing_features: Optional[List[Dict[str, Any]]] = [] zones: Optional[List[Dict[str, Any]]] = [] config: Optional[SubdivisionConfig] = None # ─── Geometry helpers ───────────────────────────────────────────────────────── def valid(geom): if geom is None or geom.is_empty: return geom if not geom.is_valid: geom = make_valid(geom) return geom def extract_geometry(obj: Dict) -> Any: """Accept GeoJSON Feature wrapper or bare geometry dict.""" if obj is None: raise ValueError("Null geometry") geo = obj.get("geometry", obj) if obj.get("type") == "Feature" else obj if geo is None or not geo.get("type"): raise ValueError(f"Cannot extract geometry from: {list(obj.keys())}") return shape(geo) def flatten_polygons(geom) -> List[Polygon]: """Recursively extract all Polygon parts from any geometry.""" if geom is None or geom.is_empty: return [] if isinstance(geom, Polygon): return [geom] if isinstance(geom, (MultiPolygon, GeometryCollection)): out = [] for g in geom.geoms: out.extend(flatten_polygons(g)) return out return [] def obb(polygon: Polygon) -> Tuple[float, float, float]: """Oriented bounding box → (short, long, angle_deg).""" mbr = polygon.minimum_rotated_rectangle coords = list(mbr.exterior.coords) e1 = math.hypot(coords[1][0]-coords[0][0], coords[1][1]-coords[0][1]) e2 = math.hypot(coords[2][0]-coords[1][0], coords[2][1]-coords[1][1]) ang = math.degrees(math.atan2(coords[1][1]-coords[0][1], coords[1][0]-coords[0][0])) return (min(e1,e2), max(e1,e2), ang) def estimate_scale(boundary: Polygon) -> float: """Metres per degree at boundary centroid latitude.""" lat = boundary.centroid.y return (111_320.0 + 111_320.0 * math.cos(math.radians(lat))) / 2.0 def deg(metres: float, sc: float) -> float: """Convert metres → degrees using scale factor.""" return metres / sc def buffer_line(line, width_deg: float) -> Polygon: return line.buffer(width_deg / 2, cap_style=2, join_style=2) # ─── Road network ───────────────────────────────────────────────────────────── def build_road_network( boundary: Polygon, user_roads: List[LineString], config: SubdivisionConfig, sc: float ) -> Tuple[List[Polygon], Polygon]: """ Build the complete road network: 1. Buffer every user-drawn road centreline. 2. Auto-generate grid roads for any block that exceeds max_block_length (regardless of whether the user drew some roads already). 3. Add a perimeter half-road ring so every outer block has access. Returns (road_polygons_for_display, road_union_polygon). """ rw = deg(config.road_width, sc) bl = deg(config.max_block_length, sc) step = bl + rw # spacing between road centrelines road_polys: List[Polygon] = [] # ── Step 1: user roads ──────────────────────────────────────────────────── for line in user_roads: cl = valid(line.intersection(boundary)) if cl and not cl.is_empty: rp = valid(buffer_line(cl, rw).intersection(boundary)) if rp and not rp.is_empty: road_polys.append(rp) # ── Step 2: auto-fill grid roads ────────────────────────────────────────── # Always run auto-generation, but only add roads that fall in areas NOT # already covered by user roads (avoids duplicate overlapping roads). existing_road_union = valid(unary_union(road_polys)) if road_polys else Polygon() minx, miny, maxx, maxy = boundary.bounds site_w = maxx - minx site_h = maxy - miny def try_add(line: LineString): cl = valid(line.intersection(boundary)) if not cl or cl.is_empty or cl.length < deg(config.min_frontage * 2, sc): return rp = valid(buffer_line(cl, rw).intersection(boundary)) if not rp or rp.is_empty: return # Skip if this road is already substantially covered by user roads if not existing_road_union.is_empty: overlap = rp.intersection(existing_road_union).area / rp.area if overlap > 0.6: return road_polys.append(rp) # Vertical auto-roads x_positions = [] x = minx + step while x < maxx - rw: x_positions.append(x) x += step if not x_positions and site_w > deg(config.min_depth * 2 + config.road_width, sc): x_positions = [minx + site_w / 2] for x in x_positions: try_add(LineString([(x, miny - 1), (x, maxy + 1)])) # Horizontal auto-roads y_positions = [] y = miny + step while y < maxy - rw: y_positions.append(y) y += step if not y_positions and site_h > deg(config.min_depth * 2 + config.road_width, sc): y_positions = [miny + site_h / 2] for y in y_positions: try_add(LineString([(minx - 1, y), (maxx + 1, y)])) # ── Step 3: perimeter access ring ──────────────────────────────────────── perim_width = deg(config.road_width * 0.5, sc) inner = valid(boundary.buffer(-perim_width)) if inner and not inner.is_empty and inner.area > 0: perim = valid(boundary.difference(inner)) if perim and not perim.is_empty: road_polys.append(perim) # Final road union road_union = valid(unary_union(road_polys)) if road_polys else Polygon() return road_polys, road_union # ─── Existing features (buildings) ─────────────────────────────────────────── def process_existing_features( building_geoms: List[Polygon], buildable: Polygon, parcels: List[Dict], sc: float, config: SubdivisionConfig ) -> List[Dict]: """ For each existing building footprint: - Find which parcel(s) it overlaps. - If it spans multiple parcels, merge them into one. - Mark the containing parcel status='built', store building info. - Remove building area from the parcel geometry is NOT done (building sits inside the parcel, that's realistic). Buildings that fall entirely outside any parcel are ignored. """ if not building_geoms or not parcels: return parcels tol = deg(0.5, sc) for bldg in building_geoms: bldg_buf = bldg.buffer(tol) hit_indices = [] for i, p in enumerate(parcels): pg = shape(p["geometry"]) if pg.intersects(bldg_buf): hit_indices.append(i) if not hit_indices: continue # building outside all parcels — skip # Merge all hit parcels into one while len(hit_indices) > 1: i, j = hit_indices[0], hit_indices[1] gi = shape(parcels[i]["geometry"]) gj = shape(parcels[j]["geometry"]) merged = valid(gi.union(gj)) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) area_m2 = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) parcels[i]["geometry"] = mapping(merged) parcels[i]["properties"].update({ "area_m2": area_m2, "frontage_m": round(min(pw,ph)*sc, 1), "depth_m": round(max(pw,ph)*sc, 1), "has_access": (parcels[i]["properties"].get("has_access") or parcels[j]["properties"].get("has_access")), "area_ok": area_m2 >= config.min_area * 0.8, }) parcels.pop(j) # Rebuild indices after pop hit_indices = [hit_indices[0]] + [ k - (1 if k > j else 0) for k in hit_indices[2:] ] # Mark the surviving parcel as built target = hit_indices[0] parcels[target]["properties"]["status"] = "built" parcels[target]["properties"]["building_area_m2"] = round(bldg.area * sc**2, 1) return parcels # ─── Cul-de-sac generator ───────────────────────────────────────────────────── def generate_culdesacs( blocks: List[Polygon], road_union: Polygon, config: SubdivisionConfig, sc: float ) -> Tuple[List[Polygon], List[Polygon]]: cds_polys, new_blocks = [], [] bl = deg(config.max_block_length, sc) rw = deg(config.road_width, sc) for block in blocks: _, h, _ = obb(block) if h > bl and config.allow_culdesac: cds, trimmed = _try_culdesac(block, rw, config, sc) if cds and trimmed and not trimmed.is_empty: cds_polys.append(cds) new_blocks.append(trimmed) continue new_blocks.append(block) return cds_polys, new_blocks def _try_culdesac(block, rw_deg, config, sc): cx, _ = block.centroid.x, block.centroid.y minx, miny, maxx, maxy = block.bounds radius = deg(config.road_width * 1.8, sc) stub_len = deg(config.max_block_length * 0.45, sc) stub = LineString([(cx, miny), (cx, miny + stub_len)]).intersection(block) if stub.is_empty or stub.length < rw_deg: return None, None stub_road = valid(buffer_line(stub, rw_deg).intersection(block)) end_pt = list(stub.coords)[-1] circle = valid(Point(end_pt).buffer(radius).intersection(block)) cds_area = valid(unary_union([stub_road, circle]).intersection(block)) trimmed = valid(block.difference(cds_area)) if trimmed.is_empty or trimmed.area * sc**2 < config.min_area: return None, None return cds_area, trimmed # ─── Block subdivider ───────────────────────────────────────────────────────── def subdivide_block( block: Polygon, config: SubdivisionConfig, road_union: Polygon, block_id: int, start_num: int, sc: float ) -> List[Dict]: min_f = deg(config.min_frontage, sc) min_d = deg(config.min_depth, sc) min_a = config.min_area / sc**2 _, _, angle = obb(block) centroid = block.centroid rotated = valid(rotate(block, -angle, origin=centroid)) rminx, rminy, rmaxx, rmaxy = rotated.bounds bw = rmaxx - rminx bh = rmaxy - rminy num = start_num parcels = [] def emit(rect_in_rotated): nonlocal num p = valid(rect_in_rotated.intersection(rotated)) if p is None or p.is_empty or p.area < min_a * 0.5: return p = valid(rotate(p, angle, origin=centroid)) feats = _make_parcel_features(p, config, block_id, num, sc, road_union) parcels.extend(feats) num += 1 if bw >= bh: # Wide block: frontage along X, depth along Y n_cols = max(1, round(bw / min_f)) col_w = bw / n_cols rows = ([(rminy, rminy + bh/2), (rminy + bh/2, rmaxy)] if bh >= min_d * 2 else [(rminy, rmaxy)]) for y0, y1 in rows: for c in range(n_cols): x0 = rminx + c * col_w emit(Polygon([(x0,y0),(x0+col_w,y0),(x0+col_w,y1),(x0,y1)])) else: # Tall block: frontage along Y, depth along X n_rows = max(1, round(bh / min_f)) row_h = bh / n_rows cols = ([(rminx, rminx + bw/2), (rminx + bw/2, rmaxx)] if bw >= min_d * 2 else [(rminx, rmaxx)]) for x0, x1 in cols: for r in range(n_rows): y0 = rminy + r * row_h emit(Polygon([(x0,y0),(x1,y0),(x1,y0+row_h),(x0,y0+row_h)])) return parcels def _make_parcel_features(geom, config, block_id, num, sc, road_union): out = [] parts = flatten_polygons(geom) if not isinstance(geom, Polygon) else [geom] tol = deg(0.5, sc) for g in parts: if g.is_empty: continue area_m2 = round(g.area * sc**2, 1) if area_m2 < config.min_area * 0.4: continue pw, ph, _ = obb(g) has_access = (road_union is not None and not road_union.is_empty and g.buffer(tol).intersects(road_union)) out.append({ "type": "Feature", "geometry": mapping(g), "properties": { "parcel_id": f"P{num:04d}", "parcel_num": num, "block_id": block_id, "area_m2": area_m2, "area_ha": round(area_m2 / 10000, 4), "frontage_m": round(min(pw,ph) * sc, 1), "depth_m": round(max(pw,ph) * sc, 1), "address": f"Block {block_id}, Plot {num}", "zone": "Residential", "status": "vacant", "has_access": has_access, "frontage_ok": round(min(pw,ph)*sc,1) >= config.min_frontage * 0.85, "area_ok": area_m2 >= config.min_area * 0.80, } }) return out # ─── Quality passes ─────────────────────────────────────────────────────────── def absorb_bad_parcels(parcels, config, sc): """Merge undersized / badly-shaped parcels into best same-block neighbour.""" tol = deg(0.5, sc) changed = True while changed: changed = False geoms = [shape(p["geometry"]) for p in parcels] for i, p in enumerate(parcels): g = geoms[i] area_m2 = g.area * sc**2 w, h, _ = obb(g) compact = 4 * math.pi * g.area / (g.length**2) if g.length else 0 ok = (area_m2 >= config.min_area * 0.8 and compact >= 0.18 and (h == 0 or w/h >= 0.12)) if ok: continue best_j, best_touch = -1, 0.0 for j, q in enumerate(parcels): if j == i: continue if p["properties"]["block_id"] != q["properties"]["block_id"]: continue touch = g.buffer(tol).intersection(geoms[j]).area if touch > best_touch: best_touch, best_j = touch, j if best_j >= 0 and best_touch > 0: merged = valid(g.union(geoms[best_j])) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2 = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) acc = (p["properties"].get("has_access") or parcels[best_j]["properties"].get("has_access")) parcels[best_j]["geometry"] = mapping(merged) parcels[best_j]["properties"].update({ "area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1), "depth_m": round(max(pw,ph)*sc,1), "has_access": acc, "area_ok": am2 >= config.min_area * 0.8, }) geoms[best_j] = merged parcels.pop(i) changed = True break return parcels def enforce_access(parcels, road_union, config, sc): """Merge no-access parcels into nearest accessed neighbour in same block.""" if not road_union or road_union.is_empty: return parcels tol = deg(1.0, sc) changed = True while changed: changed = False geoms = [shape(p["geometry"]) for p in parcels] for i, p in enumerate(parcels): if p["properties"].get("has_access"): continue g = geoms[i] best_j, best_shared = -1, 0.0 for j, q in enumerate(parcels): if j == i: continue if p["properties"]["block_id"] != q["properties"]["block_id"]: continue if not q["properties"].get("has_access"): continue shared = g.buffer(tol).intersection(geoms[j]).area if shared > best_shared: best_shared, best_j = shared, j if best_j >= 0: merged = valid(g.union(geoms[best_j])) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2 = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) parcels[best_j]["geometry"] = mapping(merged) parcels[best_j]["properties"].update({ "area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1), "depth_m": round(max(pw,ph)*sc,1), "has_access": True, "area_ok": am2 >= config.min_area * 0.8, }) geoms[best_j] = merged parcels.pop(i) changed = True break # Final access recheck for p in parcels: g = shape(p["geometry"]) p["properties"]["has_access"] = ( not road_union.is_empty and g.buffer(tol).intersects(road_union) ) return parcels # ─── Main endpoint ──────────────────────────────────────────────────────────── @app.post("/api/subdivide") async def subdivide(request: SubdivisionRequest): try: config = request.config or SubdivisionConfig() # 1. Parse & validate boundary boundary = valid(extract_geometry(request.boundary)) if isinstance(boundary, MultiPolygon): boundary = max(boundary.geoms, key=lambda g: g.area) if not isinstance(boundary, Polygon): raise HTTPException(400, "Boundary must be a polygon") sc = estimate_scale(boundary) # 2. Parse user-drawn road centrelines user_roads: List[LineString] = [] for r in (request.roads or []): try: g= shape(r["geometry"]) #g = extract_geometry(r) if isinstance(g, LineString): user_roads.append(g) elif isinstance(g, MultiLineString): user_roads.extend(g.geoms) except Exception: pass # print("roads") # print(request.roads) # print(user_roads) # 3. Parse existing building footprints print(request.existing_features) building_geoms: List[Polygon] = [] for f in (request.existing_features or []): try: #g = extract_geometry(f) g = shape(f["geometry"]) # Clip to boundary g = valid(g.intersection(boundary)) building_geoms.extend(flatten_polygons(g)) except Exception: pass #print("buildings") #print( building_geoms) # 4. Build road network (user + auto-fill + perimeter) road_polys, road_union = build_road_network(boundary, user_roads, config, sc) # 5. Buildable area = boundary − roads − existing buildings # Buildings are carved out so subdivision doesn't cut through them. obstacles = road_union if building_geoms: bldg_union = valid(unary_union(building_geoms)) obstacles = valid(unary_union([road_union, bldg_union])) buildable = valid(boundary.difference(obstacles)) # 6. Extract blocks min_block_area = config.min_area / sc**2 blocks = [b for b in flatten_polygons(buildable) if b.area > min_block_area] # 7. Cul-de-sacs cds_polys: List[Polygon] = [] if config.allow_culdesac and blocks: cds_polys, blocks = generate_culdesacs(blocks, road_union, config, sc) if cds_polys: road_union = valid(unary_union([road_union] + cds_polys)) # 8. Subdivide every block all_parcels: List[Dict] = [] num = 1 for bid, block in enumerate(blocks, 1): bp = subdivide_block(block, config, road_union, bid, num, sc) all_parcels.extend(bp) num += len(bp) # 9. Quality passes all_parcels = absorb_bad_parcels(all_parcels, config, sc) all_parcels = enforce_access(all_parcels, road_union, config, sc) # 10. Apply existing building footprints → merge spanning parcels, mark built if building_geoms: all_parcels = process_existing_features( building_geoms, buildable, all_parcels, sc, config ) # 11. Re-number for i, p in enumerate(all_parcels, 1): p["properties"]["parcel_num"] = i p["properties"]["parcel_id"] = f"P{i:04d}" # 12. Build response collections # Roads: exclude the perimeter ring (last element) from display features # because it is very thin and would look ugly — the boundary line already # shows where the perimeter is. display_road_polys = road_polys[:-1] if road_polys else road_polys road_features = [ {"type": "Feature", "geometry": mapping(rp), "properties": {"type": "road_surface", "id": f"R{i+1:03d}", "width_m": config.road_width}} for i, rp in enumerate(display_road_polys) ] block_features = [ {"type": "Feature", "geometry": mapping(b), "properties": {"block_id": i+1, "area_m2": round(b.area*sc**2, 1)}} for i, b in enumerate(blocks) ] cds_features = [ {"type": "Feature", "geometry": mapping(c), "properties": {"type": "culdesac", "id": f"CDS{i+1:03d}"}} for i, c in enumerate(cds_polys) ] # 13. Stats areas = [p["properties"]["area_m2"] for p in all_parcels] no_acc = sum(1 for p in all_parcels if not p["properties"].get("has_access")) built = sum(1 for p in all_parcels if p["properties"].get("status") == "built") stats = { "total_parcels": len(all_parcels), "total_blocks": len(blocks), "total_roads": len(display_road_polys), "culdesacs": len(cds_polys), "parcels_no_access": no_acc, "parcels_built": built, "parcels_vacant": len(all_parcels) - built, "boundary_area_m2": round(boundary.area * sc**2, 1), "road_area_m2": round(road_union.area * sc**2, 1) if not road_union.is_empty else 0, "buildable_area_m2": round(sum(b.area * sc**2 for b in blocks), 1), "avg_parcel_area_m2": round(sum(areas)/len(areas), 1) if areas else 0, "min_parcel_area_m2": round(min(areas), 1) if areas else 0, "existing_buildings": len(building_geoms), "user_roads_drawn": len(user_roads), "config": { "min_frontage": config.min_frontage, "min_depth": config.min_depth, "road_width": config.road_width, "max_block_length": config.max_block_length, } } return { "parcels": all_parcels, "roads": road_features, "blocks": block_features, "culdesacs": cds_features, "stats": stats, } except HTTPException: raise except Exception as e: raise HTTPException(500, f"{e}\n{traceback.format_exc()}") @app.get("/health") async def health(): return {"status": "ok", "version": "3.0.0"} @app.get("/config/defaults") async def get_defaults(): return SubdivisionConfig().model_dump()