""" ParcelGen Backend v5 Fixes: - Auto-roads follow the polygon's dominant (OBB) axis — not always N-S/E-W - Cul-de-sacs: only on genuinely inaccessible block ends; correct size; no parcel overlap - No perimeter access ring (removed); access via user/auto roads only - Hard 100 ha site-area limit - Building footprint forces parcel to be large enough to contain it """ from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Optional, Dict, Any, Tuple import math, uuid, traceback, asyncio from shapely.geometry import ( Polygon, MultiPolygon, LineString, MultiLineString, Point, GeometryCollection, mapping, shape ) from shapely.ops import unary_union from shapely.affinity import rotate, translate from shapely.validation import make_valid app = FastAPI(title="ParcelGen", version="5.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) MAX_SITE_AREA_HA = 100.0 # hard cap LAND_USE_PRESETS = { "residential_low": {"label": "Residential – Low Density", "min_frontage": 15, "min_depth": 30, "min_area": 450, "max_area": 2000, "color": "#58a6ff"}, "residential_medium": {"label": "Residential – Medium Density", "min_frontage": 10, "min_depth": 25, "min_area": 250, "max_area": 1000, "color": "#3fb950"}, "residential_high": {"label": "Residential – High Density", "min_frontage": 6, "min_depth": 20, "min_area": 120, "max_area": 500, "color": "#f0c84a"}, "commercial": {"label": "Commercial", "min_frontage": 12, "min_depth": 20, "min_area": 300, "max_area": 5000, "color": "#f97316"}, "mixed_use": {"label": "Mixed Use", "min_frontage": 8, "min_depth": 20, "min_area": 200, "max_area": 2000, "color": "#bc8cff"}, "industrial": {"label": "Industrial", "min_frontage": 20, "min_depth": 40, "min_area": 1000, "max_area": 20000, "color": "#f78166"}, "open_space": {"label": "Open Space / Park", "min_frontage": 30, "min_depth": 30, "min_area": 500, "max_area": 50000, "color": "#34d399"}, } ROAD_TYPE_DEFAULTS = { "primary": {"label": "Primary Road", "width": 20, "color": "#f78166"}, "secondary": {"label": "Secondary Road", "width": 14, "color": "#f0c84a"}, "local": {"label": "Local Road", "width": 9, "color": "#8b949e"}, "lane": {"label": "Lane / Alley", "width": 4, "color": "#58a6ff"}, } # ─── Models ─────────────────────────────────────────────────────────────────── class RoadInput(BaseModel): geometry: Dict[str, Any] road_type: str = "local" width: Optional[float] = None label: Optional[str] = None class SubdivisionConfig(BaseModel): min_frontage: float = 10.0 min_plot_depth: float = 25.0 min_area: Optional[float] = None max_area: Optional[float] = None default_road_width: float = 9.0 splay_radius: float = 3.0 max_block_length: float = 120.0 allow_culdesac: bool = True land_use: str = "residential_medium" parcel_prefix: str = "P" zone_prefix: str = "Z" start_number: int = 1 def model_post_init(self, _): preset = LAND_USE_PRESETS.get(self.land_use, LAND_USE_PRESETS["residential_medium"]) if self.min_area is None: self.min_area = preset["min_area"] if self.max_area is None: self.max_area = preset["max_area"] class SubdivisionRequest(BaseModel): boundary: Dict[str, Any] roads: Optional[List[RoadInput]] = [] existing_features: Optional[List[Dict[str, Any]]] = [] zones: Optional[List] = [] config: Optional[SubdivisionConfig] = None session_id: Optional[str] = None _progress: Dict[str, Dict] = {} # ─── Geometry helpers ───────────────────────────────────────────────────────── def valid(geom): if geom is None or geom.is_empty: return geom if not geom.is_valid: geom = make_valid(geom) return geom def extract_geometry(obj): if obj is None: raise ValueError("Null geometry") geo = obj.get("geometry", obj) if obj.get("type") == "Feature" else obj if geo is None or not geo.get("type"): raise ValueError(f"Cannot extract geometry from keys: {list(obj.keys())}") return shape(geo) def flatten_polygons(geom) -> List[Polygon]: if geom is None or geom.is_empty: return [] if isinstance(geom, Polygon): return [geom] if isinstance(geom, (MultiPolygon, GeometryCollection)): out = [] for g in geom.geoms: out.extend(flatten_polygons(g)) return out return [] def obb(polygon: Polygon) -> Tuple[float, float, float]: """Minimum rotated rectangle → (short_side, long_side, angle_deg_of_long_axis).""" mbr = polygon.minimum_rotated_rectangle coords = list(mbr.exterior.coords) e1 = math.hypot(coords[1][0]-coords[0][0], coords[1][1]-coords[0][1]) e2 = math.hypot(coords[2][0]-coords[1][0], coords[2][1]-coords[1][1]) if e1 >= e2: ang = math.degrees(math.atan2(coords[1][1]-coords[0][1], coords[1][0]-coords[0][0])) return e2, e1, ang else: ang = math.degrees(math.atan2(coords[2][1]-coords[1][1], coords[2][0]-coords[1][0])) return e1, e2, ang def dominant_angle(boundary: Polygon) -> float: """Return the angle (degrees) of the longest edge of the boundary's OBB. Auto-roads are generated parallel/perpendicular to this axis.""" _, _, ang = obb(boundary) # Normalise to [0, 90) so roads run in sensible directions ang = ang % 180 if ang > 90: ang -= 90 return ang def estimate_scale(boundary: Polygon) -> float: lat = boundary.centroid.y return (111_320.0 + 111_320.0 * math.cos(math.radians(lat))) / 2.0 def deg(metres: float, sc: float) -> float: return metres / sc def buffer_line(line, width_deg: float) -> Polygon: return line.buffer(width_deg / 2, cap_style=2, join_style=2) def make_rotated_line(cx, cy, angle_deg, length) -> LineString: """Line of given length centred at (cx, cy) at angle_deg from E axis.""" rad = math.radians(angle_deg) dx = math.cos(rad) * length / 2 dy = math.sin(rad) * length / 2 return LineString([(cx - dx, cy - dy), (cx + dx, cy + dy)]) # ─── Road network ───────────────────────────────────────────────────────────── def build_road_network( boundary: Polygon, road_inputs: List[RoadInput], config: SubdivisionConfig, sc: float ) -> Tuple[List[Dict], Polygon]: """ Returns (road_feature_dicts, road_union_polygon). Auto-roads are aligned to the boundary's dominant OBB axis. No perimeter ring is added. """ road_features = [] road_polys: List[Polygon] = [] default_rw = deg(config.default_road_width, sc) # ── 1. User-drawn roads ─────────────────────────────────────────────────── for ri in road_inputs: try: line = extract_geometry(ri.geometry if isinstance(ri, RoadInput) else ri) except Exception: continue if not isinstance(line, (LineString, MultiLineString)): continue lines = list(line.geoms) if isinstance(line, MultiLineString) else [line] rtype = ri.road_type if isinstance(ri, RoadInput) else "local" rwidth_m = (ri.width if isinstance(ri, RoadInput) and ri.width else ROAD_TYPE_DEFAULTS.get(rtype, ROAD_TYPE_DEFAULTS["local"])["width"]) rw = deg(rwidth_m, sc) color = ROAD_TYPE_DEFAULTS.get(rtype, ROAD_TYPE_DEFAULTS["local"])["color"] for lp in lines: cl = valid(lp.intersection(boundary)) if cl and not cl.is_empty: rp = valid(buffer_line(cl, rw).intersection(boundary)) if rp and not rp.is_empty: road_polys.append(rp) road_features.append({ "type": "Feature", "geometry": mapping(rp), "properties": { "type": "road_surface", "road_type": rtype, "width_m": rwidth_m, "label": (ri.label if isinstance(ri, RoadInput) else None) or f"{rtype.capitalize()} Road", "color": color, } }) # ── 2. Auto-fill grid roads aligned to boundary's dominant axis ─────────── existing_union = valid(unary_union(road_polys)) if road_polys else Polygon() bl = deg(config.max_block_length, sc) rw = default_rw step = bl + rw # Dominant angle: roads run at `axis_ang` and `axis_ang + 90` axis_ang = dominant_angle(boundary) cx_bnd, cy_bnd = boundary.centroid.x, boundary.centroid.y # Diagonal of bounding box — long enough to cross the entire boundary bminx, bminy, bmaxx, bmaxy = boundary.bounds diag = math.hypot(bmaxx - bminx, bmaxy - bminy) * 1.5 def try_add_auto(base_line: LineString): cl = valid(base_line.intersection(boundary)) if not cl or cl.is_empty or cl.length < deg(config.min_frontage * 2, sc): return rp = valid(buffer_line(cl, rw).intersection(boundary)) if not rp or rp.is_empty: return if not existing_union.is_empty and rp.intersection(existing_union).area / rp.area > 0.6: return road_polys.append(rp) road_features.append({ "type": "Feature", "geometry": mapping(rp), "properties": { "type": "road_surface", "road_type": "local", "width_m": config.default_road_width, "label": "Local Road (auto)", "color": ROAD_TYPE_DEFAULTS["local"]["color"], } }) def sweep_roads(sweep_angle: float): """ Generate parallel roads perpendicular to sweep_angle, spaced by `step`, centred on the boundary centroid. sweep_angle is the direction we step across (perp to road direction). """ perp_rad = math.radians(sweep_angle) # Unit vector across which we step ux, uy = math.cos(perp_rad), math.sin(perp_rad) # Project all boundary vertices onto this axis to find extent bcoords = list(boundary.exterior.coords) projs = [x*ux + y*uy for x, y in bcoords] p_min, p_max = min(projs), max(projs) positions = [] pos = p_min + step while pos < p_max - rw: positions.append(pos) pos += step if not positions: # Site fits in one block: add one road through the centre if deep enough extent = p_max - p_min if extent > deg(config.min_plot_depth * 2 + config.default_road_width, sc): positions = [(p_min + p_max) / 2] for pos in positions: # Centre of this road line in geographic coordinates lx = cx_bnd + (pos - (cx_bnd*ux + cy_bnd*uy)) * ux ly = cy_bnd + (pos - (cx_bnd*ux + cy_bnd*uy)) * uy road_line = make_rotated_line(lx, ly, sweep_angle + 90, diag) try_add_auto(road_line) # Roads parallel to long axis (sweep perpendicular to long axis = axis_ang + 90) sweep_roads(axis_ang + 90) # Roads perpendicular to long axis (sweep along long axis = axis_ang) sweep_roads(axis_ang) road_union = valid(unary_union(road_polys)) if road_polys else Polygon() return road_features, road_union # ─── Cul-de-sac ─────────────────────────────────────────────────────────────── def block_has_dead_end(block: Polygon, road_union: Polygon, sc: float) -> bool: """ True if the block's 'far end' (relative to its long axis) is NOT touching any road — indicating a dead end that could benefit from a cul-de-sac. We test both terminal thirds of the block; if either misses road_union, it's a dead end. """ if road_union is None or road_union.is_empty: return False _, _, angle = obb(block) centroid = block.centroid rotated = valid(rotate(block, -angle, origin=centroid)) bx0, by0, bx1, by1 = rotated.bounds bh = by1 - by0 tol = deg(1.0, sc) # Bottom third in rotated space bottom_strip = valid(rotate( Polygon([(bx0, by0), (bx1, by0), (bx1, by0 + bh*0.35), (bx0, by0 + bh*0.35)]), angle, origin=centroid )) # Top third top_strip = valid(rotate( Polygon([(bx0, by1 - bh*0.35), (bx1, by1 - bh*0.35), (bx1, by1), (bx0, by1)]), angle, origin=centroid )) bottom_ok = bottom_strip and bottom_strip.buffer(tol).intersects(road_union) top_ok = top_strip and top_strip.buffer(tol).intersects(road_union) return not bottom_ok or not top_ok def generate_culdesacs(blocks, road_union, config, sc): """ Only attach a cul-de-sac to a block when: 1. The block exceeds max_block_length 2. At least one end of the block has no road access (genuine dead end) 3. The resulting geometry is valid and leaves enough room for parcels """ cds_polys, new_blocks = [], [] bl = deg(config.max_block_length, sc) for block in blocks: _, h, _ = obb(block) if h > bl and config.allow_culdesac and block_has_dead_end(block, road_union, sc): cds, trimmed = _try_culdesac(block, config, sc) if cds and trimmed and not trimmed.is_empty: cds_polys.append(cds) new_blocks.append(trimmed) continue new_blocks.append(block) return cds_polys, new_blocks def _try_culdesac(block: Polygon, config: SubdivisionConfig, sc: float): """ Insert a cul-de-sac at the dead end of the block, aligned to the block's OBB. Radius = road_width (not 1.8×); stub width = road_width. Verifies the trimmed block can still hold at least two minimum parcels. """ rw_deg = deg(config.default_road_width, sc) radius_deg = rw_deg # 1× road width, not 1.8× min_a_deg2 = config.min_area / sc**2 _, _, angle = obb(block) centroid = block.centroid rotated = valid(rotate(block, -angle, origin=centroid)) bx0, by0, bx1, by1 = rotated.bounds bw, bh = bx1 - bx0, by1 - by0 bcx = (bx0 + bx1) / 2 # Stub runs from the bottom edge upward, centred on block stub_len = min(deg(config.max_block_length * 0.3, sc), bh * 0.4) stub_rot = LineString([(bcx, by0), (bcx, by0 + stub_len)]) end_y = by0 + stub_len stub_buf = valid(buffer_line(stub_rot, rw_deg)) circle = Point(bcx, end_y).buffer(radius_deg) cds_rot = valid(unary_union([stub_buf, circle]).intersection(rotated)) trim_rot = valid(rotated.difference(cds_rot)) if trim_rot.is_empty: return None, None # Rotate back cds_geom = valid(rotate(cds_rot, angle, origin=centroid)) trim_geom = valid(rotate(trim_rot, angle, origin=centroid)) # Must leave room for at least 2 minimum parcels if trim_geom.area < min_a_deg2 * 2: return None, None # Must not have the cds overlapping too much of remaining block if not cds_geom.is_empty and cds_geom.area > block.area * 0.4: return None, None return cds_geom, trim_geom # ─── Block subdivider ───────────────────────────────────────────────────────── def subdivide_block(block, config, road_union, block_id, start_num, sc, zone_label=None, zone_color=None): """ Subdivide a block into parcels oriented along the block's own OBB axis. Frontage faces the nearest road edge. """ min_f = deg(config.min_frontage, sc) min_d = deg(config.min_plot_depth, sc) min_a = config.min_area / sc**2 max_a = (config.max_area or 1e18) / sc**2 _, _, angle = obb(block) centroid = block.centroid rotated = valid(rotate(block, -angle, origin=centroid)) rminx, rminy, rmaxx, rmaxy = rotated.bounds bw = rmaxx - rminx bh = rmaxy - rminy num = start_num parcels = [] def emit(x0, y0, x1, y1): nonlocal num cell = Polygon([(x0,y0),(x1,y0),(x1,y1),(x0,y1)]) p = valid(cell.intersection(rotated)) if p is None or p.is_empty or p.area < min_a * 0.45: return p = valid(rotate(p, angle, origin=centroid)) feats = _make_parcel(p, config, block_id, num, sc, road_union, zone_label, zone_color) parcels.extend(feats) num += 1 # Decide orientation based on block shape: # Wide block (bw >= bh): parcels stacked in columns along X, fronting Y roads # Tall block (bh > bw): parcels stacked in rows along Y, fronting X roads if bw >= bh: n_cols = max(1, round(bw / min_f)) col_w = bw / n_cols # Double-bank if block deep enough for two rows of parcels if bh >= min_d * 2.2: rows = [(rminy, rminy + bh/2), (rminy + bh/2, rmaxy)] else: rows = [(rminy, rmaxy)] for y0, y1 in rows: for c in range(n_cols): x0 = rminx + c * col_w emit(x0, y0, x0 + col_w, y1) else: n_rows = max(1, round(bh / min_f)) row_h = bh / n_rows if bw >= min_d * 2.2: cols = [(rminx, rminx + bw/2), (rminx + bw/2, rmaxx)] else: cols = [(rminx, rmaxx)] for x0, x1 in cols: for r in range(n_rows): y0 = rminy + r * row_h emit(x0, y0, x1, y0 + row_h) return parcels def _make_parcel(geom, config, block_id, num, sc, road_union, zone_label, zone_color): out = [] tol = deg(0.5, sc) preset = LAND_USE_PRESETS.get(config.land_use, LAND_USE_PRESETS["residential_medium"]) parts = flatten_polygons(geom) if not isinstance(geom, Polygon) else [geom] for g in parts: if g.is_empty: continue area_m2 = round(g.area * sc**2, 1) if area_m2 < config.min_area * 0.4: continue pw, ph, _ = obb(g) frontage_m = round(min(pw, ph) * sc, 1) plot_depth_m = round(max(pw, ph) * sc, 1) has_access = (road_union is not None and not road_union.is_empty and g.buffer(tol).intersects(road_union)) max_a = config.max_area or 1e18 out.append({ "type": "Feature", "geometry": mapping(g), "properties": { "parcel_id": f"{config.parcel_prefix}{num:04d}", "parcel_num": num, "block_id": block_id, "area_m2": area_m2, "area_ha": round(area_m2 / 10000, 4), "frontage_m": frontage_m, "plot_depth_m": plot_depth_m, "address": f"Block {block_id}, Plot {num}", "land_use": config.land_use, "land_use_label": zone_label or preset["label"], "zone_color": zone_color or preset["color"], "zone": zone_label or preset["label"], "status": "vacant", "has_access": has_access, "frontage_ok": frontage_m >= config.min_frontage * 0.85, "area_ok": config.min_area * 0.8 <= area_m2 <= max_a * 1.2, "within_max_area": area_m2 <= max_a * 1.1, } }) return out # ─── Quality passes ─────────────────────────────────────────────────────────── def absorb_bad_parcels(parcels, config, sc): tol = deg(0.5, sc) changed = True while changed: changed = False geoms = [shape(p["geometry"]) for p in parcels] for i, p in enumerate(parcels): g = geoms[i] am2 = g.area * sc**2 w, h, _ = obb(g) compact = 4 * math.pi * g.area / (g.length**2) if g.length else 0 ok = (am2 >= config.min_area * 0.8 and compact >= 0.18 and (h == 0 or w / h >= 0.12)) if ok: continue best_j, best_touch = -1, 0.0 for j, q in enumerate(parcels): if j == i: continue if p["properties"]["block_id"] != q["properties"]["block_id"]: continue touch = g.buffer(tol).intersection(geoms[j]).area if touch > best_touch: best_touch, best_j = touch, j if best_j >= 0 and best_touch > 0: merged = valid(g.union(geoms[best_j])) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2_new = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) acc = (p["properties"].get("has_access") or parcels[best_j]["properties"].get("has_access")) parcels[best_j]["geometry"] = mapping(merged) parcels[best_j]["properties"].update({ "area_m2": am2_new, "frontage_m": round(min(pw,ph)*sc, 1), "plot_depth_m": round(max(pw,ph)*sc, 1), "has_access": acc, "area_ok": config.min_area*0.8 <= am2_new <= (config.max_area or 1e18)*1.2, }) geoms[best_j] = merged; parcels.pop(i); changed = True; break return parcels def enforce_access(parcels, road_union, config, sc): if not road_union or road_union.is_empty: return parcels tol = deg(1.0, sc) changed = True while changed: changed = False geoms = [shape(p["geometry"]) for p in parcels] for i, p in enumerate(parcels): if p["properties"].get("has_access"): continue g = geoms[i]; best_j, best_sh = -1, 0.0 for j, q in enumerate(parcels): if j == i: continue if p["properties"]["block_id"] != q["properties"]["block_id"]: continue if not q["properties"].get("has_access"): continue sh = g.buffer(tol).intersection(geoms[j]).area if sh > best_sh: best_sh, best_j = sh, j if best_j >= 0: merged = valid(g.union(geoms[best_j])) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2 = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) parcels[best_j]["geometry"] = mapping(merged) parcels[best_j]["properties"].update({ "area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1), "plot_depth_m": round(max(pw,ph)*sc,1), "has_access": True, "area_ok": config.min_area*0.8 <= am2 <= (config.max_area or 1e18)*1.2, }) geoms[best_j] = merged; parcels.pop(i); changed = True; break # Final recheck for p in parcels: g = shape(p["geometry"]) p["properties"]["has_access"] = (not road_union.is_empty and g.buffer(tol).intersects(road_union)) return parcels def apply_existing_buildings(building_geoms, parcels, sc, config): """ For each building footprint: 1. Find all parcels that intersect it. 2. Merge them until one parcel contains the building. 3. If the merged parcel is still smaller than the building's bounding box (with a 2 m margin), expand it by merging with neighbouring parcels. 4. Mark the parcel status='built'. """ if not building_geoms or not parcels: return parcels tol = deg(0.5, sc) margin = deg(2.0, sc) for bldg in building_geoms: bldg_buf = bldg.buffer(tol) hit = [i for i, p in enumerate(parcels) if shape(p["geometry"]).intersects(bldg_buf)] if not hit: continue # Merge all hit parcels while len(hit) > 1: i, j = hit[0], hit[1] gi = shape(parcels[i]["geometry"]) gj = shape(parcels[j]["geometry"]) merged = valid(gi.union(gj)) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2 = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) parcels[i]["geometry"] = mapping(merged) parcels[i]["properties"].update({ "area_m2": am2, "frontage_m": round(min(pw,ph)*sc, 1), "plot_depth_m": round(max(pw,ph)*sc, 1), "has_access": (parcels[i]["properties"].get("has_access") or parcels[j]["properties"].get("has_access")), }) parcels.pop(j) hit = [hit[0]] + [k-(1 if k>j else 0) for k in hit[2:]] # Check the merged parcel truly contains the building (with margin) target_idx = hit[0] parcel_geom = shape(parcels[target_idx]["geometry"]) # If building does not fit, expand into adjacent parcels expansion_attempts = 0 while not parcel_geom.buffer(margin).contains(bldg) and expansion_attempts < 6: expansion_attempts += 1 geoms_all = [shape(p["geometry"]) for p in parcels] bid = parcels[target_idx]["properties"]["block_id"] # Find touching neighbour (same block preferred, any block if not found) best_nb, best_touch = -1, 0.0 for k, q in enumerate(parcels): if k == target_idx: continue touch = parcel_geom.buffer(tol).intersection(geoms_all[k]).area if touch > best_touch: best_touch, best_nb = touch, k if best_nb < 0 or best_touch == 0: break merged = valid(parcel_geom.union(geoms_all[best_nb])) if isinstance(merged, MultiPolygon): merged = max(merged.geoms, key=lambda x: x.area) am2 = round(merged.area * sc**2, 1) pw, ph, _ = obb(merged) parcels[target_idx]["geometry"] = mapping(merged) parcels[target_idx]["properties"].update({ "area_m2": am2, "frontage_m": round(min(pw,ph)*sc, 1), "plot_depth_m": round(max(pw,ph)*sc, 1), "has_access": (parcels[target_idx]["properties"].get("has_access") or parcels[best_nb]["properties"].get("has_access")), }) parcels.pop(best_nb) if best_nb < target_idx: target_idx -= 1 parcel_geom = shape(parcels[target_idx]["geometry"]) parcels[target_idx]["properties"]["status"] = "built" parcels[target_idx]["properties"]["building_area_m2"] = round(bldg.area * sc**2, 1) parcels[target_idx]["properties"]["building_fits"] = parcel_geom.buffer(margin).contains(bldg) return parcels # ─── Validation ─────────────────────────────────────────────────────────────── def validate_config(boundary: Polygon, config: SubdivisionConfig, sc: float): site_area = boundary.area * sc**2 warnings = [] if site_area > MAX_SITE_AREA_HA * 10_000: raise HTTPException( 400, f"Site area ({site_area/10000:.1f} ha) exceeds the maximum allowed " f"planning area of {MAX_SITE_AREA_HA} ha. Please split the site into smaller sections." ) if config.min_area and config.min_area > site_area * 0.5: warnings.append( f"min_area ({config.min_area} m²) is more than half the site area " f"({site_area:.0f} m²) — very few parcels will fit" ) if config.max_area and config.max_area < config.min_area: warnings.append("max_area is less than min_area") if config.min_frontage > config.min_plot_depth: warnings.append("min_frontage exceeds min_plot_depth — parcels will be wider than deep") if config.default_road_width >= config.min_frontage * 2: warnings.append("Road width is very large relative to parcel frontage") return warnings # ─── Main endpoint ──────────────────────────────────────────────────────────── @app.post("/subdivide") async def subdivide(request: SubdivisionRequest): session_id = request.session_id or str(uuid.uuid4()) _progress[session_id] = {"pct": 0, "msg": "Starting…"} def prog(pct, msg): _progress[session_id] = {"pct": pct, "msg": msg} try: config = request.config or SubdivisionConfig() prog(5, "Parsing boundary…") boundary = valid(extract_geometry(request.boundary)) if isinstance(boundary, MultiPolygon): boundary = max(boundary.geoms, key=lambda g: g.area) if not isinstance(boundary, Polygon): raise HTTPException(400, "Boundary must be a polygon") sc = estimate_scale(boundary) warnings = validate_config(boundary, config, sc) # may raise 400 prog(10, "Parsing roads and buildings…") road_inputs: List[RoadInput] = [] for r in (request.roads or []): if isinstance(r, dict): try: road_inputs.append(RoadInput(**r)) except Exception: try: road_inputs.append(RoadInput(geometry=r)) except Exception: pass else: road_inputs.append(r) building_geoms = [] for f in (request.existing_features or []): try: g = valid(extract_geometry(f).intersection(boundary)) building_geoms.extend(flatten_polygons(g)) except Exception: pass prog(20, "Building road network (axis-aligned)…") road_features, road_union = build_road_network(boundary, road_inputs, config, sc) prog(30, "Computing buildable area…") obstacles = road_union if building_geoms: bldg_union = valid(unary_union(building_geoms)) obstacles = valid(unary_union([road_union, bldg_union])) if not road_union.is_empty else bldg_union buildable = valid(boundary.difference(obstacles)) if not obstacles.is_empty else boundary blocks = [b for b in flatten_polygons(buildable) if b.area * sc**2 > config.min_area] prog(40, "Generating cul-de-sacs (where needed)…") cds_polys = [] if config.allow_culdesac and blocks: cds_polys, blocks = generate_culdesacs(blocks, road_union, config, sc) if cds_polys: road_union = valid(unary_union([road_union] + cds_polys)) prog(55, "Subdividing blocks…") all_parcels: List[Dict] = [] num = config.start_number for bid, block in enumerate(blocks, 1): bp = subdivide_block(block, config, road_union, bid, num, sc) all_parcels.extend(bp) num += len(bp) prog(55 + int(20 * bid / max(len(blocks), 1)), f"Block {bid}/{len(blocks)}…") prog(78, "Shape quality pass…") all_parcels = absorb_bad_parcels(all_parcels, config, sc) prog(86, "Access enforcement…") all_parcels = enforce_access(all_parcels, road_union, config, sc) prog(92, "Fitting existing buildings into parcels…") if building_geoms: all_parcels = apply_existing_buildings(building_geoms, all_parcels, sc, config) prog(96, "Numbering…") for i, p in enumerate(all_parcels, config.start_number): p["properties"]["parcel_num"] = i p["properties"]["parcel_id"] = f"{config.parcel_prefix}{i:04d}" block_features = [ {"type":"Feature","geometry":mapping(b), "properties":{"block_id":i+1,"area_m2":round(b.area*sc**2,1)}} for i, b in enumerate(blocks) ] cds_features = [ {"type":"Feature","geometry":mapping(c), "properties":{"type":"culdesac","id":f"CDS{i+1:03d}"}} for i, c in enumerate(cds_polys) ] areas = [p["properties"]["area_m2"] for p in all_parcels] no_acc = sum(1 for p in all_parcels if not p["properties"].get("has_access")) built = sum(1 for p in all_parcels if p["properties"].get("status") == "built") stats = { "total_parcels": len(all_parcels), "total_blocks": len(blocks), "total_roads": len(road_features), "culdesacs": len(cds_polys), "parcels_no_access": no_acc, "parcels_built": built, "parcels_vacant": len(all_parcels) - built, "boundary_area_m2": round(boundary.area * sc**2, 1), "road_area_m2": round(road_union.area * sc**2, 1) if not road_union.is_empty else 0, "buildable_area_m2": round(sum(b.area*sc**2 for b in blocks), 1), "avg_parcel_area_m2": round(sum(areas)/len(areas), 1) if areas else 0, "min_parcel_area_m2": round(min(areas), 1) if areas else 0, "max_parcel_area_m2": round(max(areas), 1) if areas else 0, "existing_buildings": len(building_geoms), "user_roads_drawn": len(road_inputs), "warnings": warnings, "land_use": config.land_use, "land_use_label": LAND_USE_PRESETS.get(config.land_use, {}).get("label", config.land_use), "dominant_axis_deg": round(dominant_angle(boundary), 1), "config": { "min_frontage": config.min_frontage, "min_plot_depth": config.min_plot_depth, "min_area": config.min_area, "max_area": config.max_area, "default_road_width": config.default_road_width, "splay_radius": config.splay_radius, "max_block_length": config.max_block_length, } } prog(100, "Done") return { "parcels": all_parcels, "roads": road_features, "blocks": block_features, "culdesacs": cds_features, "stats": stats, "session_id": session_id, } except HTTPException: raise except Exception as e: prog(-1, f"Error: {e}") raise HTTPException(500, f"{e}\n{traceback.format_exc()}") finally: async def _cleanup(): await asyncio.sleep(10) _progress.pop(session_id, None) asyncio.create_task(_cleanup()) @app.get("/progress/{session_id}") async def get_progress(session_id: str): return _progress.get(session_id, {"pct": -1, "msg": "Not found"}) @app.get("/land-use-presets") async def land_use_presets_ep(): return LAND_USE_PRESETS @app.get("/road-type-defaults") async def road_type_defaults_ep(): return ROAD_TYPE_DEFAULTS @app.get("/health") async def health(): return {"status": "ok", "version": "5.0.0", "max_site_ha": MAX_SITE_AREA_HA} @app.get("/config/defaults") async def get_defaults(): return SubdivisionConfig().model_dump()