824 lines
36 KiB
Python
824 lines
36 KiB
Python
"""
|
||
ParcelGen Backend v5
|
||
Fixes:
|
||
- Auto-roads follow the polygon's dominant (OBB) axis — not always N-S/E-W
|
||
- Cul-de-sacs: only on genuinely inaccessible block ends; correct size; no parcel overlap
|
||
- No perimeter access ring (removed); access via user/auto roads only
|
||
- Hard 100 ha site-area limit
|
||
- Building footprint forces parcel to be large enough to contain it
|
||
"""
|
||
|
||
from fastapi import FastAPI, HTTPException
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel
|
||
from typing import List, Optional, Dict, Any, Tuple
|
||
import math, uuid, traceback, asyncio
|
||
from shapely.geometry import (
|
||
Polygon, MultiPolygon, LineString, MultiLineString,
|
||
Point, GeometryCollection, mapping, shape
|
||
)
|
||
from shapely.ops import unary_union
|
||
from shapely.affinity import rotate, translate
|
||
from shapely.validation import make_valid
|
||
|
||
app = FastAPI(title="ParcelGen", version="5.0.0")
|
||
app.add_middleware(
|
||
CORSMiddleware, allow_origins=["*"], allow_credentials=True,
|
||
allow_methods=["*"], allow_headers=["*"],
|
||
)
|
||
|
||
MAX_SITE_AREA_HA = 100.0 # hard cap
|
||
|
||
LAND_USE_PRESETS = {
|
||
"residential_low": {"label": "Residential – Low Density", "min_frontage": 15, "min_depth": 30, "min_area": 450, "max_area": 2000, "color": "#58a6ff"},
|
||
"residential_medium": {"label": "Residential – Medium Density", "min_frontage": 10, "min_depth": 25, "min_area": 250, "max_area": 1000, "color": "#3fb950"},
|
||
"residential_high": {"label": "Residential – High Density", "min_frontage": 6, "min_depth": 20, "min_area": 120, "max_area": 500, "color": "#f0c84a"},
|
||
"commercial": {"label": "Commercial", "min_frontage": 12, "min_depth": 20, "min_area": 300, "max_area": 5000, "color": "#f97316"},
|
||
"mixed_use": {"label": "Mixed Use", "min_frontage": 8, "min_depth": 20, "min_area": 200, "max_area": 2000, "color": "#bc8cff"},
|
||
"industrial": {"label": "Industrial", "min_frontage": 20, "min_depth": 40, "min_area": 1000, "max_area": 20000, "color": "#f78166"},
|
||
"open_space": {"label": "Open Space / Park", "min_frontage": 30, "min_depth": 30, "min_area": 500, "max_area": 50000, "color": "#34d399"},
|
||
}
|
||
|
||
ROAD_TYPE_DEFAULTS = {
|
||
"primary": {"label": "Primary Road", "width": 20, "color": "#f78166"},
|
||
"secondary": {"label": "Secondary Road", "width": 14, "color": "#f0c84a"},
|
||
"local": {"label": "Local Road", "width": 9, "color": "#8b949e"},
|
||
"lane": {"label": "Lane / Alley", "width": 4, "color": "#58a6ff"},
|
||
}
|
||
|
||
# ─── Models ───────────────────────────────────────────────────────────────────
|
||
|
||
class RoadInput(BaseModel):
|
||
geometry: Dict[str, Any]
|
||
road_type: str = "local"
|
||
width: Optional[float] = None
|
||
label: Optional[str] = None
|
||
|
||
class SubdivisionConfig(BaseModel):
|
||
min_frontage: float = 10.0
|
||
min_plot_depth: float = 25.0
|
||
min_area: Optional[float] = None
|
||
max_area: Optional[float] = None
|
||
default_road_width: float = 9.0
|
||
splay_radius: float = 3.0
|
||
max_block_length: float = 120.0
|
||
allow_culdesac: bool = True
|
||
land_use: str = "residential_medium"
|
||
parcel_prefix: str = "P"
|
||
zone_prefix: str = "Z"
|
||
start_number: int = 1
|
||
|
||
def model_post_init(self, _):
|
||
preset = LAND_USE_PRESETS.get(self.land_use, LAND_USE_PRESETS["residential_medium"])
|
||
if self.min_area is None:
|
||
self.min_area = preset["min_area"]
|
||
if self.max_area is None:
|
||
self.max_area = preset["max_area"]
|
||
|
||
class SubdivisionRequest(BaseModel):
|
||
boundary: Dict[str, Any]
|
||
roads: Optional[List[RoadInput]] = []
|
||
existing_features: Optional[List[Dict[str, Any]]] = []
|
||
zones: Optional[List] = []
|
||
config: Optional[SubdivisionConfig] = None
|
||
session_id: Optional[str] = None
|
||
|
||
_progress: Dict[str, Dict] = {}
|
||
|
||
# ─── Geometry helpers ─────────────────────────────────────────────────────────
|
||
|
||
def valid(geom):
|
||
if geom is None or geom.is_empty: return geom
|
||
if not geom.is_valid: geom = make_valid(geom)
|
||
return geom
|
||
|
||
def extract_geometry(obj):
|
||
if obj is None: raise ValueError("Null geometry")
|
||
geo = obj.get("geometry", obj) if obj.get("type") == "Feature" else obj
|
||
if geo is None or not geo.get("type"):
|
||
raise ValueError(f"Cannot extract geometry from keys: {list(obj.keys())}")
|
||
return shape(geo)
|
||
|
||
def flatten_polygons(geom) -> List[Polygon]:
|
||
if geom is None or geom.is_empty: return []
|
||
if isinstance(geom, Polygon): return [geom]
|
||
if isinstance(geom, (MultiPolygon, GeometryCollection)):
|
||
out = []
|
||
for g in geom.geoms: out.extend(flatten_polygons(g))
|
||
return out
|
||
return []
|
||
|
||
def obb(polygon: Polygon) -> Tuple[float, float, float]:
|
||
"""Minimum rotated rectangle → (short_side, long_side, angle_deg_of_long_axis)."""
|
||
mbr = polygon.minimum_rotated_rectangle
|
||
coords = list(mbr.exterior.coords)
|
||
e1 = math.hypot(coords[1][0]-coords[0][0], coords[1][1]-coords[0][1])
|
||
e2 = math.hypot(coords[2][0]-coords[1][0], coords[2][1]-coords[1][1])
|
||
if e1 >= e2:
|
||
ang = math.degrees(math.atan2(coords[1][1]-coords[0][1], coords[1][0]-coords[0][0]))
|
||
return e2, e1, ang
|
||
else:
|
||
ang = math.degrees(math.atan2(coords[2][1]-coords[1][1], coords[2][0]-coords[1][0]))
|
||
return e1, e2, ang
|
||
|
||
def dominant_angle(boundary: Polygon) -> float:
|
||
"""Return the angle (degrees) of the longest edge of the boundary's OBB.
|
||
Auto-roads are generated parallel/perpendicular to this axis."""
|
||
_, _, ang = obb(boundary)
|
||
# Normalise to [0, 90) so roads run in sensible directions
|
||
ang = ang % 180
|
||
if ang > 90:
|
||
ang -= 90
|
||
return ang
|
||
|
||
def estimate_scale(boundary: Polygon) -> float:
|
||
lat = boundary.centroid.y
|
||
return (111_320.0 + 111_320.0 * math.cos(math.radians(lat))) / 2.0
|
||
|
||
def deg(metres: float, sc: float) -> float:
|
||
return metres / sc
|
||
|
||
def buffer_line(line, width_deg: float) -> Polygon:
|
||
return line.buffer(width_deg / 2, cap_style=2, join_style=2)
|
||
|
||
def make_rotated_line(cx, cy, angle_deg, length) -> LineString:
|
||
"""Line of given length centred at (cx, cy) at angle_deg from E axis."""
|
||
rad = math.radians(angle_deg)
|
||
dx = math.cos(rad) * length / 2
|
||
dy = math.sin(rad) * length / 2
|
||
return LineString([(cx - dx, cy - dy), (cx + dx, cy + dy)])
|
||
|
||
# ─── Road network ─────────────────────────────────────────────────────────────
|
||
|
||
def build_road_network(
|
||
boundary: Polygon,
|
||
road_inputs: List[RoadInput],
|
||
config: SubdivisionConfig,
|
||
sc: float
|
||
) -> Tuple[List[Dict], Polygon]:
|
||
"""
|
||
Returns (road_feature_dicts, road_union_polygon).
|
||
Auto-roads are aligned to the boundary's dominant OBB axis.
|
||
No perimeter ring is added.
|
||
"""
|
||
road_features = []
|
||
road_polys: List[Polygon] = []
|
||
default_rw = deg(config.default_road_width, sc)
|
||
|
||
# ── 1. User-drawn roads ───────────────────────────────────────────────────
|
||
for ri in road_inputs:
|
||
try:
|
||
line = extract_geometry(ri.geometry if isinstance(ri, RoadInput) else ri)
|
||
except Exception:
|
||
continue
|
||
if not isinstance(line, (LineString, MultiLineString)):
|
||
continue
|
||
lines = list(line.geoms) if isinstance(line, MultiLineString) else [line]
|
||
rtype = ri.road_type if isinstance(ri, RoadInput) else "local"
|
||
rwidth_m = (ri.width if isinstance(ri, RoadInput) and ri.width
|
||
else ROAD_TYPE_DEFAULTS.get(rtype, ROAD_TYPE_DEFAULTS["local"])["width"])
|
||
rw = deg(rwidth_m, sc)
|
||
color = ROAD_TYPE_DEFAULTS.get(rtype, ROAD_TYPE_DEFAULTS["local"])["color"]
|
||
for lp in lines:
|
||
cl = valid(lp.intersection(boundary))
|
||
if cl and not cl.is_empty:
|
||
rp = valid(buffer_line(cl, rw).intersection(boundary))
|
||
if rp and not rp.is_empty:
|
||
road_polys.append(rp)
|
||
road_features.append({
|
||
"type": "Feature", "geometry": mapping(rp),
|
||
"properties": {
|
||
"type": "road_surface", "road_type": rtype,
|
||
"width_m": rwidth_m,
|
||
"label": (ri.label if isinstance(ri, RoadInput) else None)
|
||
or f"{rtype.capitalize()} Road",
|
||
"color": color,
|
||
}
|
||
})
|
||
|
||
# ── 2. Auto-fill grid roads aligned to boundary's dominant axis ───────────
|
||
existing_union = valid(unary_union(road_polys)) if road_polys else Polygon()
|
||
bl = deg(config.max_block_length, sc)
|
||
rw = default_rw
|
||
step = bl + rw
|
||
|
||
# Dominant angle: roads run at `axis_ang` and `axis_ang + 90`
|
||
axis_ang = dominant_angle(boundary)
|
||
|
||
cx_bnd, cy_bnd = boundary.centroid.x, boundary.centroid.y
|
||
# Diagonal of bounding box — long enough to cross the entire boundary
|
||
bminx, bminy, bmaxx, bmaxy = boundary.bounds
|
||
diag = math.hypot(bmaxx - bminx, bmaxy - bminy) * 1.5
|
||
|
||
def try_add_auto(base_line: LineString):
|
||
cl = valid(base_line.intersection(boundary))
|
||
if not cl or cl.is_empty or cl.length < deg(config.min_frontage * 2, sc): return
|
||
rp = valid(buffer_line(cl, rw).intersection(boundary))
|
||
if not rp or rp.is_empty: return
|
||
if not existing_union.is_empty and rp.intersection(existing_union).area / rp.area > 0.6: return
|
||
road_polys.append(rp)
|
||
road_features.append({
|
||
"type": "Feature", "geometry": mapping(rp),
|
||
"properties": {
|
||
"type": "road_surface", "road_type": "local",
|
||
"width_m": config.default_road_width,
|
||
"label": "Local Road (auto)",
|
||
"color": ROAD_TYPE_DEFAULTS["local"]["color"],
|
||
}
|
||
})
|
||
|
||
def sweep_roads(sweep_angle: float):
|
||
"""
|
||
Generate parallel roads perpendicular to sweep_angle,
|
||
spaced by `step`, centred on the boundary centroid.
|
||
sweep_angle is the direction we step across (perp to road direction).
|
||
"""
|
||
perp_rad = math.radians(sweep_angle)
|
||
# Unit vector across which we step
|
||
ux, uy = math.cos(perp_rad), math.sin(perp_rad)
|
||
# Project all boundary vertices onto this axis to find extent
|
||
bcoords = list(boundary.exterior.coords)
|
||
projs = [x*ux + y*uy for x, y in bcoords]
|
||
p_min, p_max = min(projs), max(projs)
|
||
|
||
positions = []
|
||
pos = p_min + step
|
||
while pos < p_max - rw:
|
||
positions.append(pos)
|
||
pos += step
|
||
|
||
if not positions:
|
||
# Site fits in one block: add one road through the centre if deep enough
|
||
extent = p_max - p_min
|
||
if extent > deg(config.min_plot_depth * 2 + config.default_road_width, sc):
|
||
positions = [(p_min + p_max) / 2]
|
||
|
||
for pos in positions:
|
||
# Centre of this road line in geographic coordinates
|
||
lx = cx_bnd + (pos - (cx_bnd*ux + cy_bnd*uy)) * ux
|
||
ly = cy_bnd + (pos - (cx_bnd*ux + cy_bnd*uy)) * uy
|
||
road_line = make_rotated_line(lx, ly, sweep_angle + 90, diag)
|
||
try_add_auto(road_line)
|
||
|
||
# Roads parallel to long axis (sweep perpendicular to long axis = axis_ang + 90)
|
||
sweep_roads(axis_ang + 90)
|
||
# Roads perpendicular to long axis (sweep along long axis = axis_ang)
|
||
sweep_roads(axis_ang)
|
||
|
||
road_union = valid(unary_union(road_polys)) if road_polys else Polygon()
|
||
return road_features, road_union
|
||
|
||
# ─── Cul-de-sac ───────────────────────────────────────────────────────────────
|
||
|
||
def block_has_dead_end(block: Polygon, road_union: Polygon, sc: float) -> bool:
|
||
"""
|
||
True if the block's 'far end' (relative to its long axis) is NOT touching
|
||
any road — indicating a dead end that could benefit from a cul-de-sac.
|
||
We test both terminal thirds of the block; if either misses road_union, it's a dead end.
|
||
"""
|
||
if road_union is None or road_union.is_empty:
|
||
return False
|
||
_, _, angle = obb(block)
|
||
centroid = block.centroid
|
||
rotated = valid(rotate(block, -angle, origin=centroid))
|
||
bx0, by0, bx1, by1 = rotated.bounds
|
||
bh = by1 - by0
|
||
tol = deg(1.0, sc)
|
||
|
||
# Bottom third in rotated space
|
||
bottom_strip = valid(rotate(
|
||
Polygon([(bx0, by0), (bx1, by0), (bx1, by0 + bh*0.35), (bx0, by0 + bh*0.35)]),
|
||
angle, origin=centroid
|
||
))
|
||
# Top third
|
||
top_strip = valid(rotate(
|
||
Polygon([(bx0, by1 - bh*0.35), (bx1, by1 - bh*0.35), (bx1, by1), (bx0, by1)]),
|
||
angle, origin=centroid
|
||
))
|
||
|
||
bottom_ok = bottom_strip and bottom_strip.buffer(tol).intersects(road_union)
|
||
top_ok = top_strip and top_strip.buffer(tol).intersects(road_union)
|
||
return not bottom_ok or not top_ok
|
||
|
||
def generate_culdesacs(blocks, road_union, config, sc):
|
||
"""
|
||
Only attach a cul-de-sac to a block when:
|
||
1. The block exceeds max_block_length
|
||
2. At least one end of the block has no road access (genuine dead end)
|
||
3. The resulting geometry is valid and leaves enough room for parcels
|
||
"""
|
||
cds_polys, new_blocks = [], []
|
||
bl = deg(config.max_block_length, sc)
|
||
|
||
for block in blocks:
|
||
_, h, _ = obb(block)
|
||
if h > bl and config.allow_culdesac and block_has_dead_end(block, road_union, sc):
|
||
cds, trimmed = _try_culdesac(block, config, sc)
|
||
if cds and trimmed and not trimmed.is_empty:
|
||
cds_polys.append(cds)
|
||
new_blocks.append(trimmed)
|
||
continue
|
||
new_blocks.append(block)
|
||
|
||
return cds_polys, new_blocks
|
||
|
||
def _try_culdesac(block: Polygon, config: SubdivisionConfig, sc: float):
|
||
"""
|
||
Insert a cul-de-sac at the dead end of the block, aligned to the block's OBB.
|
||
Radius = road_width (not 1.8×); stub width = road_width.
|
||
Verifies the trimmed block can still hold at least two minimum parcels.
|
||
"""
|
||
rw_deg = deg(config.default_road_width, sc)
|
||
radius_deg = rw_deg # 1× road width, not 1.8×
|
||
min_a_deg2 = config.min_area / sc**2
|
||
|
||
_, _, angle = obb(block)
|
||
centroid = block.centroid
|
||
rotated = valid(rotate(block, -angle, origin=centroid))
|
||
bx0, by0, bx1, by1 = rotated.bounds
|
||
bw, bh = bx1 - bx0, by1 - by0
|
||
bcx = (bx0 + bx1) / 2
|
||
|
||
# Stub runs from the bottom edge upward, centred on block
|
||
stub_len = min(deg(config.max_block_length * 0.3, sc), bh * 0.4)
|
||
stub_rot = LineString([(bcx, by0), (bcx, by0 + stub_len)])
|
||
end_y = by0 + stub_len
|
||
|
||
stub_buf = valid(buffer_line(stub_rot, rw_deg))
|
||
circle = Point(bcx, end_y).buffer(radius_deg)
|
||
cds_rot = valid(unary_union([stub_buf, circle]).intersection(rotated))
|
||
trim_rot = valid(rotated.difference(cds_rot))
|
||
|
||
if trim_rot.is_empty:
|
||
return None, None
|
||
|
||
# Rotate back
|
||
cds_geom = valid(rotate(cds_rot, angle, origin=centroid))
|
||
trim_geom = valid(rotate(trim_rot, angle, origin=centroid))
|
||
|
||
# Must leave room for at least 2 minimum parcels
|
||
if trim_geom.area < min_a_deg2 * 2:
|
||
return None, None
|
||
|
||
# Must not have the cds overlapping too much of remaining block
|
||
if not cds_geom.is_empty and cds_geom.area > block.area * 0.4:
|
||
return None, None
|
||
|
||
return cds_geom, trim_geom
|
||
|
||
# ─── Block subdivider ─────────────────────────────────────────────────────────
|
||
|
||
def subdivide_block(block, config, road_union, block_id, start_num, sc,
|
||
zone_label=None, zone_color=None):
|
||
"""
|
||
Subdivide a block into parcels oriented along the block's own OBB axis.
|
||
Frontage faces the nearest road edge.
|
||
"""
|
||
min_f = deg(config.min_frontage, sc)
|
||
min_d = deg(config.min_plot_depth, sc)
|
||
min_a = config.min_area / sc**2
|
||
max_a = (config.max_area or 1e18) / sc**2
|
||
|
||
_, _, angle = obb(block)
|
||
centroid = block.centroid
|
||
rotated = valid(rotate(block, -angle, origin=centroid))
|
||
rminx, rminy, rmaxx, rmaxy = rotated.bounds
|
||
bw = rmaxx - rminx
|
||
bh = rmaxy - rminy
|
||
|
||
num = start_num
|
||
parcels = []
|
||
|
||
def emit(x0, y0, x1, y1):
|
||
nonlocal num
|
||
cell = Polygon([(x0,y0),(x1,y0),(x1,y1),(x0,y1)])
|
||
p = valid(cell.intersection(rotated))
|
||
if p is None or p.is_empty or p.area < min_a * 0.45: return
|
||
p = valid(rotate(p, angle, origin=centroid))
|
||
feats = _make_parcel(p, config, block_id, num, sc, road_union,
|
||
zone_label, zone_color)
|
||
parcels.extend(feats)
|
||
num += 1
|
||
|
||
# Decide orientation based on block shape:
|
||
# Wide block (bw >= bh): parcels stacked in columns along X, fronting Y roads
|
||
# Tall block (bh > bw): parcels stacked in rows along Y, fronting X roads
|
||
if bw >= bh:
|
||
n_cols = max(1, round(bw / min_f))
|
||
col_w = bw / n_cols
|
||
# Double-bank if block deep enough for two rows of parcels
|
||
if bh >= min_d * 2.2:
|
||
rows = [(rminy, rminy + bh/2), (rminy + bh/2, rmaxy)]
|
||
else:
|
||
rows = [(rminy, rmaxy)]
|
||
for y0, y1 in rows:
|
||
for c in range(n_cols):
|
||
x0 = rminx + c * col_w
|
||
emit(x0, y0, x0 + col_w, y1)
|
||
else:
|
||
n_rows = max(1, round(bh / min_f))
|
||
row_h = bh / n_rows
|
||
if bw >= min_d * 2.2:
|
||
cols = [(rminx, rminx + bw/2), (rminx + bw/2, rmaxx)]
|
||
else:
|
||
cols = [(rminx, rmaxx)]
|
||
for x0, x1 in cols:
|
||
for r in range(n_rows):
|
||
y0 = rminy + r * row_h
|
||
emit(x0, y0, x1, y0 + row_h)
|
||
|
||
return parcels
|
||
|
||
def _make_parcel(geom, config, block_id, num, sc, road_union,
|
||
zone_label, zone_color):
|
||
out = []
|
||
tol = deg(0.5, sc)
|
||
preset = LAND_USE_PRESETS.get(config.land_use, LAND_USE_PRESETS["residential_medium"])
|
||
parts = flatten_polygons(geom) if not isinstance(geom, Polygon) else [geom]
|
||
|
||
for g in parts:
|
||
if g.is_empty: continue
|
||
area_m2 = round(g.area * sc**2, 1)
|
||
if area_m2 < config.min_area * 0.4: continue
|
||
pw, ph, _ = obb(g)
|
||
frontage_m = round(min(pw, ph) * sc, 1)
|
||
plot_depth_m = round(max(pw, ph) * sc, 1)
|
||
has_access = (road_union is not None and not road_union.is_empty
|
||
and g.buffer(tol).intersects(road_union))
|
||
max_a = config.max_area or 1e18
|
||
out.append({
|
||
"type": "Feature",
|
||
"geometry": mapping(g),
|
||
"properties": {
|
||
"parcel_id": f"{config.parcel_prefix}{num:04d}",
|
||
"parcel_num": num,
|
||
"block_id": block_id,
|
||
"area_m2": area_m2,
|
||
"area_ha": round(area_m2 / 10000, 4),
|
||
"frontage_m": frontage_m,
|
||
"plot_depth_m": plot_depth_m,
|
||
"address": f"Block {block_id}, Plot {num}",
|
||
"land_use": config.land_use,
|
||
"land_use_label": zone_label or preset["label"],
|
||
"zone_color": zone_color or preset["color"],
|
||
"zone": zone_label or preset["label"],
|
||
"status": "vacant",
|
||
"has_access": has_access,
|
||
"frontage_ok": frontage_m >= config.min_frontage * 0.85,
|
||
"area_ok": config.min_area * 0.8 <= area_m2 <= max_a * 1.2,
|
||
"within_max_area": area_m2 <= max_a * 1.1,
|
||
}
|
||
})
|
||
return out
|
||
|
||
# ─── Quality passes ───────────────────────────────────────────────────────────
|
||
|
||
def absorb_bad_parcels(parcels, config, sc):
|
||
tol = deg(0.5, sc)
|
||
changed = True
|
||
while changed:
|
||
changed = False
|
||
geoms = [shape(p["geometry"]) for p in parcels]
|
||
for i, p in enumerate(parcels):
|
||
g = geoms[i]
|
||
am2 = g.area * sc**2
|
||
w, h, _ = obb(g)
|
||
compact = 4 * math.pi * g.area / (g.length**2) if g.length else 0
|
||
ok = (am2 >= config.min_area * 0.8
|
||
and compact >= 0.18
|
||
and (h == 0 or w / h >= 0.12))
|
||
if ok: continue
|
||
best_j, best_touch = -1, 0.0
|
||
for j, q in enumerate(parcels):
|
||
if j == i: continue
|
||
if p["properties"]["block_id"] != q["properties"]["block_id"]: continue
|
||
touch = g.buffer(tol).intersection(geoms[j]).area
|
||
if touch > best_touch: best_touch, best_j = touch, j
|
||
if best_j >= 0 and best_touch > 0:
|
||
merged = valid(g.union(geoms[best_j]))
|
||
if isinstance(merged, MultiPolygon):
|
||
merged = max(merged.geoms, key=lambda x: x.area)
|
||
am2_new = round(merged.area * sc**2, 1)
|
||
pw, ph, _ = obb(merged)
|
||
acc = (p["properties"].get("has_access")
|
||
or parcels[best_j]["properties"].get("has_access"))
|
||
parcels[best_j]["geometry"] = mapping(merged)
|
||
parcels[best_j]["properties"].update({
|
||
"area_m2": am2_new,
|
||
"frontage_m": round(min(pw,ph)*sc, 1),
|
||
"plot_depth_m": round(max(pw,ph)*sc, 1),
|
||
"has_access": acc,
|
||
"area_ok": config.min_area*0.8 <= am2_new <= (config.max_area or 1e18)*1.2,
|
||
})
|
||
geoms[best_j] = merged; parcels.pop(i); changed = True; break
|
||
return parcels
|
||
|
||
def enforce_access(parcels, road_union, config, sc):
|
||
if not road_union or road_union.is_empty: return parcels
|
||
tol = deg(1.0, sc)
|
||
changed = True
|
||
while changed:
|
||
changed = False
|
||
geoms = [shape(p["geometry"]) for p in parcels]
|
||
for i, p in enumerate(parcels):
|
||
if p["properties"].get("has_access"): continue
|
||
g = geoms[i]; best_j, best_sh = -1, 0.0
|
||
for j, q in enumerate(parcels):
|
||
if j == i: continue
|
||
if p["properties"]["block_id"] != q["properties"]["block_id"]: continue
|
||
if not q["properties"].get("has_access"): continue
|
||
sh = g.buffer(tol).intersection(geoms[j]).area
|
||
if sh > best_sh: best_sh, best_j = sh, j
|
||
if best_j >= 0:
|
||
merged = valid(g.union(geoms[best_j]))
|
||
if isinstance(merged, MultiPolygon):
|
||
merged = max(merged.geoms, key=lambda x: x.area)
|
||
am2 = round(merged.area * sc**2, 1)
|
||
pw, ph, _ = obb(merged)
|
||
parcels[best_j]["geometry"] = mapping(merged)
|
||
parcels[best_j]["properties"].update({
|
||
"area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1),
|
||
"plot_depth_m": round(max(pw,ph)*sc,1), "has_access": True,
|
||
"area_ok": config.min_area*0.8 <= am2 <= (config.max_area or 1e18)*1.2,
|
||
})
|
||
geoms[best_j] = merged; parcels.pop(i); changed = True; break
|
||
# Final recheck
|
||
for p in parcels:
|
||
g = shape(p["geometry"])
|
||
p["properties"]["has_access"] = (not road_union.is_empty
|
||
and g.buffer(tol).intersects(road_union))
|
||
return parcels
|
||
|
||
def apply_existing_buildings(building_geoms, parcels, sc, config):
|
||
"""
|
||
For each building footprint:
|
||
1. Find all parcels that intersect it.
|
||
2. Merge them until one parcel contains the building.
|
||
3. If the merged parcel is still smaller than the building's bounding box
|
||
(with a 2 m margin), expand it by merging with neighbouring parcels.
|
||
4. Mark the parcel status='built'.
|
||
"""
|
||
if not building_geoms or not parcels: return parcels
|
||
tol = deg(0.5, sc)
|
||
margin = deg(2.0, sc)
|
||
|
||
for bldg in building_geoms:
|
||
bldg_buf = bldg.buffer(tol)
|
||
hit = [i for i, p in enumerate(parcels)
|
||
if shape(p["geometry"]).intersects(bldg_buf)]
|
||
if not hit: continue
|
||
|
||
# Merge all hit parcels
|
||
while len(hit) > 1:
|
||
i, j = hit[0], hit[1]
|
||
gi = shape(parcels[i]["geometry"])
|
||
gj = shape(parcels[j]["geometry"])
|
||
merged = valid(gi.union(gj))
|
||
if isinstance(merged, MultiPolygon):
|
||
merged = max(merged.geoms, key=lambda x: x.area)
|
||
am2 = round(merged.area * sc**2, 1)
|
||
pw, ph, _ = obb(merged)
|
||
parcels[i]["geometry"] = mapping(merged)
|
||
parcels[i]["properties"].update({
|
||
"area_m2": am2,
|
||
"frontage_m": round(min(pw,ph)*sc, 1),
|
||
"plot_depth_m": round(max(pw,ph)*sc, 1),
|
||
"has_access": (parcels[i]["properties"].get("has_access")
|
||
or parcels[j]["properties"].get("has_access")),
|
||
})
|
||
parcels.pop(j)
|
||
hit = [hit[0]] + [k-(1 if k>j else 0) for k in hit[2:]]
|
||
|
||
# Check the merged parcel truly contains the building (with margin)
|
||
target_idx = hit[0]
|
||
parcel_geom = shape(parcels[target_idx]["geometry"])
|
||
|
||
# If building does not fit, expand into adjacent parcels
|
||
expansion_attempts = 0
|
||
while not parcel_geom.buffer(margin).contains(bldg) and expansion_attempts < 6:
|
||
expansion_attempts += 1
|
||
geoms_all = [shape(p["geometry"]) for p in parcels]
|
||
bid = parcels[target_idx]["properties"]["block_id"]
|
||
# Find touching neighbour (same block preferred, any block if not found)
|
||
best_nb, best_touch = -1, 0.0
|
||
for k, q in enumerate(parcels):
|
||
if k == target_idx: continue
|
||
touch = parcel_geom.buffer(tol).intersection(geoms_all[k]).area
|
||
if touch > best_touch:
|
||
best_touch, best_nb = touch, k
|
||
if best_nb < 0 or best_touch == 0:
|
||
break
|
||
merged = valid(parcel_geom.union(geoms_all[best_nb]))
|
||
if isinstance(merged, MultiPolygon):
|
||
merged = max(merged.geoms, key=lambda x: x.area)
|
||
am2 = round(merged.area * sc**2, 1)
|
||
pw, ph, _ = obb(merged)
|
||
parcels[target_idx]["geometry"] = mapping(merged)
|
||
parcels[target_idx]["properties"].update({
|
||
"area_m2": am2,
|
||
"frontage_m": round(min(pw,ph)*sc, 1),
|
||
"plot_depth_m": round(max(pw,ph)*sc, 1),
|
||
"has_access": (parcels[target_idx]["properties"].get("has_access")
|
||
or parcels[best_nb]["properties"].get("has_access")),
|
||
})
|
||
parcels.pop(best_nb)
|
||
if best_nb < target_idx:
|
||
target_idx -= 1
|
||
parcel_geom = shape(parcels[target_idx]["geometry"])
|
||
|
||
parcels[target_idx]["properties"]["status"] = "built"
|
||
parcels[target_idx]["properties"]["building_area_m2"] = round(bldg.area * sc**2, 1)
|
||
parcels[target_idx]["properties"]["building_fits"] = parcel_geom.buffer(margin).contains(bldg)
|
||
|
||
return parcels
|
||
|
||
# ─── Validation ───────────────────────────────────────────────────────────────
|
||
|
||
def validate_config(boundary: Polygon, config: SubdivisionConfig, sc: float):
|
||
site_area = boundary.area * sc**2
|
||
warnings = []
|
||
|
||
if site_area > MAX_SITE_AREA_HA * 10_000:
|
||
raise HTTPException(
|
||
400,
|
||
f"Site area ({site_area/10000:.1f} ha) exceeds the maximum allowed "
|
||
f"planning area of {MAX_SITE_AREA_HA} ha. Please split the site into smaller sections."
|
||
)
|
||
if config.min_area and config.min_area > site_area * 0.5:
|
||
warnings.append(
|
||
f"min_area ({config.min_area} m²) is more than half the site area "
|
||
f"({site_area:.0f} m²) — very few parcels will fit"
|
||
)
|
||
if config.max_area and config.max_area < config.min_area:
|
||
warnings.append("max_area is less than min_area")
|
||
if config.min_frontage > config.min_plot_depth:
|
||
warnings.append("min_frontage exceeds min_plot_depth — parcels will be wider than deep")
|
||
if config.default_road_width >= config.min_frontage * 2:
|
||
warnings.append("Road width is very large relative to parcel frontage")
|
||
return warnings
|
||
|
||
# ─── Main endpoint ────────────────────────────────────────────────────────────
|
||
|
||
@app.post("/subdivide")
|
||
async def subdivide(request: SubdivisionRequest):
|
||
session_id = request.session_id or str(uuid.uuid4())
|
||
_progress[session_id] = {"pct": 0, "msg": "Starting…"}
|
||
|
||
def prog(pct, msg):
|
||
_progress[session_id] = {"pct": pct, "msg": msg}
|
||
|
||
try:
|
||
config = request.config or SubdivisionConfig()
|
||
prog(5, "Parsing boundary…")
|
||
|
||
boundary = valid(extract_geometry(request.boundary))
|
||
if isinstance(boundary, MultiPolygon):
|
||
boundary = max(boundary.geoms, key=lambda g: g.area)
|
||
if not isinstance(boundary, Polygon):
|
||
raise HTTPException(400, "Boundary must be a polygon")
|
||
|
||
sc = estimate_scale(boundary)
|
||
warnings = validate_config(boundary, config, sc) # may raise 400
|
||
|
||
prog(10, "Parsing roads and buildings…")
|
||
|
||
road_inputs: List[RoadInput] = []
|
||
for r in (request.roads or []):
|
||
if isinstance(r, dict):
|
||
try:
|
||
road_inputs.append(RoadInput(**r))
|
||
except Exception:
|
||
try: road_inputs.append(RoadInput(geometry=r))
|
||
except Exception: pass
|
||
else:
|
||
road_inputs.append(r)
|
||
|
||
building_geoms = []
|
||
for f in (request.existing_features or []):
|
||
try:
|
||
g = valid(extract_geometry(f).intersection(boundary))
|
||
building_geoms.extend(flatten_polygons(g))
|
||
except Exception: pass
|
||
|
||
prog(20, "Building road network (axis-aligned)…")
|
||
road_features, road_union = build_road_network(boundary, road_inputs, config, sc)
|
||
|
||
prog(30, "Computing buildable area…")
|
||
obstacles = road_union
|
||
if building_geoms:
|
||
bldg_union = valid(unary_union(building_geoms))
|
||
obstacles = valid(unary_union([road_union, bldg_union])) if not road_union.is_empty else bldg_union
|
||
buildable = valid(boundary.difference(obstacles)) if not obstacles.is_empty else boundary
|
||
blocks = [b for b in flatten_polygons(buildable)
|
||
if b.area * sc**2 > config.min_area]
|
||
|
||
prog(40, "Generating cul-de-sacs (where needed)…")
|
||
cds_polys = []
|
||
if config.allow_culdesac and blocks:
|
||
cds_polys, blocks = generate_culdesacs(blocks, road_union, config, sc)
|
||
if cds_polys:
|
||
road_union = valid(unary_union([road_union] + cds_polys))
|
||
|
||
prog(55, "Subdividing blocks…")
|
||
all_parcels: List[Dict] = []
|
||
num = config.start_number
|
||
for bid, block in enumerate(blocks, 1):
|
||
bp = subdivide_block(block, config, road_union, bid, num, sc)
|
||
all_parcels.extend(bp)
|
||
num += len(bp)
|
||
prog(55 + int(20 * bid / max(len(blocks), 1)),
|
||
f"Block {bid}/{len(blocks)}…")
|
||
|
||
prog(78, "Shape quality pass…")
|
||
all_parcels = absorb_bad_parcels(all_parcels, config, sc)
|
||
|
||
prog(86, "Access enforcement…")
|
||
all_parcels = enforce_access(all_parcels, road_union, config, sc)
|
||
|
||
prog(92, "Fitting existing buildings into parcels…")
|
||
if building_geoms:
|
||
all_parcels = apply_existing_buildings(building_geoms, all_parcels, sc, config)
|
||
|
||
prog(96, "Numbering…")
|
||
for i, p in enumerate(all_parcels, config.start_number):
|
||
p["properties"]["parcel_num"] = i
|
||
p["properties"]["parcel_id"] = f"{config.parcel_prefix}{i:04d}"
|
||
|
||
block_features = [
|
||
{"type":"Feature","geometry":mapping(b),
|
||
"properties":{"block_id":i+1,"area_m2":round(b.area*sc**2,1)}}
|
||
for i, b in enumerate(blocks)
|
||
]
|
||
cds_features = [
|
||
{"type":"Feature","geometry":mapping(c),
|
||
"properties":{"type":"culdesac","id":f"CDS{i+1:03d}"}}
|
||
for i, c in enumerate(cds_polys)
|
||
]
|
||
|
||
areas = [p["properties"]["area_m2"] for p in all_parcels]
|
||
no_acc = sum(1 for p in all_parcels if not p["properties"].get("has_access"))
|
||
built = sum(1 for p in all_parcels if p["properties"].get("status") == "built")
|
||
|
||
stats = {
|
||
"total_parcels": len(all_parcels), "total_blocks": len(blocks),
|
||
"total_roads": len(road_features), "culdesacs": len(cds_polys),
|
||
"parcels_no_access": no_acc, "parcels_built": built,
|
||
"parcels_vacant": len(all_parcels) - built,
|
||
"boundary_area_m2": round(boundary.area * sc**2, 1),
|
||
"road_area_m2": round(road_union.area * sc**2, 1) if not road_union.is_empty else 0,
|
||
"buildable_area_m2": round(sum(b.area*sc**2 for b in blocks), 1),
|
||
"avg_parcel_area_m2": round(sum(areas)/len(areas), 1) if areas else 0,
|
||
"min_parcel_area_m2": round(min(areas), 1) if areas else 0,
|
||
"max_parcel_area_m2": round(max(areas), 1) if areas else 0,
|
||
"existing_buildings": len(building_geoms),
|
||
"user_roads_drawn": len(road_inputs),
|
||
"warnings": warnings,
|
||
"land_use": config.land_use,
|
||
"land_use_label": LAND_USE_PRESETS.get(config.land_use, {}).get("label", config.land_use),
|
||
"dominant_axis_deg": round(dominant_angle(boundary), 1),
|
||
"config": {
|
||
"min_frontage": config.min_frontage, "min_plot_depth": config.min_plot_depth,
|
||
"min_area": config.min_area, "max_area": config.max_area,
|
||
"default_road_width": config.default_road_width,
|
||
"splay_radius": config.splay_radius,
|
||
"max_block_length": config.max_block_length,
|
||
}
|
||
}
|
||
|
||
prog(100, "Done")
|
||
return {
|
||
"parcels": all_parcels, "roads": road_features,
|
||
"blocks": block_features, "culdesacs": cds_features,
|
||
"stats": stats, "session_id": session_id,
|
||
}
|
||
|
||
except HTTPException: raise
|
||
except Exception as e:
|
||
prog(-1, f"Error: {e}")
|
||
raise HTTPException(500, f"{e}\n{traceback.format_exc()}")
|
||
finally:
|
||
async def _cleanup():
|
||
await asyncio.sleep(10)
|
||
_progress.pop(session_id, None)
|
||
asyncio.create_task(_cleanup())
|
||
|
||
|
||
@app.get("/progress/{session_id}")
|
||
async def get_progress(session_id: str):
|
||
return _progress.get(session_id, {"pct": -1, "msg": "Not found"})
|
||
|
||
@app.get("/land-use-presets")
|
||
async def land_use_presets_ep():
|
||
return LAND_USE_PRESETS
|
||
|
||
@app.get("/road-type-defaults")
|
||
async def road_type_defaults_ep():
|
||
return ROAD_TYPE_DEFAULTS
|
||
|
||
@app.get("/health")
|
||
async def health():
|
||
return {"status": "ok", "version": "5.0.0", "max_site_ha": MAX_SITE_AREA_HA}
|
||
|
||
@app.get("/config/defaults")
|
||
async def get_defaults():
|
||
return SubdivisionConfig().model_dump() |