2026-04-30 20:46:07 +03:00

824 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
ParcelGen Backend v5
Fixes:
- Auto-roads follow the polygon's dominant (OBB) axis — not always N-S/E-W
- Cul-de-sacs: only on genuinely inaccessible block ends; correct size; no parcel overlap
- No perimeter access ring (removed); access via user/auto roads only
- Hard 100 ha site-area limit
- Building footprint forces parcel to be large enough to contain it
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Dict, Any, Tuple
import math, uuid, traceback, asyncio
from shapely.geometry import (
Polygon, MultiPolygon, LineString, MultiLineString,
Point, GeometryCollection, mapping, shape
)
from shapely.ops import unary_union
from shapely.affinity import rotate, translate
from shapely.validation import make_valid
app = FastAPI(title="ParcelGen", version="5.0.0")
app.add_middleware(
CORSMiddleware, allow_origins=["*"], allow_credentials=True,
allow_methods=["*"], allow_headers=["*"],
)
MAX_SITE_AREA_HA = 100.0 # hard cap
LAND_USE_PRESETS = {
"residential_low": {"label": "Residential Low Density", "min_frontage": 15, "min_depth": 30, "min_area": 450, "max_area": 2000, "color": "#58a6ff"},
"residential_medium": {"label": "Residential Medium Density", "min_frontage": 10, "min_depth": 25, "min_area": 250, "max_area": 1000, "color": "#3fb950"},
"residential_high": {"label": "Residential High Density", "min_frontage": 6, "min_depth": 20, "min_area": 120, "max_area": 500, "color": "#f0c84a"},
"commercial": {"label": "Commercial", "min_frontage": 12, "min_depth": 20, "min_area": 300, "max_area": 5000, "color": "#f97316"},
"mixed_use": {"label": "Mixed Use", "min_frontage": 8, "min_depth": 20, "min_area": 200, "max_area": 2000, "color": "#bc8cff"},
"industrial": {"label": "Industrial", "min_frontage": 20, "min_depth": 40, "min_area": 1000, "max_area": 20000, "color": "#f78166"},
"open_space": {"label": "Open Space / Park", "min_frontage": 30, "min_depth": 30, "min_area": 500, "max_area": 50000, "color": "#34d399"},
}
ROAD_TYPE_DEFAULTS = {
"primary": {"label": "Primary Road", "width": 20, "color": "#f78166"},
"secondary": {"label": "Secondary Road", "width": 14, "color": "#f0c84a"},
"local": {"label": "Local Road", "width": 9, "color": "#8b949e"},
"lane": {"label": "Lane / Alley", "width": 4, "color": "#58a6ff"},
}
# ─── Models ───────────────────────────────────────────────────────────────────
class RoadInput(BaseModel):
geometry: Dict[str, Any]
road_type: str = "local"
width: Optional[float] = None
label: Optional[str] = None
class SubdivisionConfig(BaseModel):
min_frontage: float = 10.0
min_plot_depth: float = 25.0
min_area: Optional[float] = None
max_area: Optional[float] = None
default_road_width: float = 9.0
splay_radius: float = 3.0
max_block_length: float = 120.0
allow_culdesac: bool = True
land_use: str = "residential_medium"
parcel_prefix: str = "P"
zone_prefix: str = "Z"
start_number: int = 1
def model_post_init(self, _):
preset = LAND_USE_PRESETS.get(self.land_use, LAND_USE_PRESETS["residential_medium"])
if self.min_area is None:
self.min_area = preset["min_area"]
if self.max_area is None:
self.max_area = preset["max_area"]
class SubdivisionRequest(BaseModel):
boundary: Dict[str, Any]
roads: Optional[List[RoadInput]] = []
existing_features: Optional[List[Dict[str, Any]]] = []
zones: Optional[List] = []
config: Optional[SubdivisionConfig] = None
session_id: Optional[str] = None
_progress: Dict[str, Dict] = {}
# ─── Geometry helpers ─────────────────────────────────────────────────────────
def valid(geom):
if geom is None or geom.is_empty: return geom
if not geom.is_valid: geom = make_valid(geom)
return geom
def extract_geometry(obj):
if obj is None: raise ValueError("Null geometry")
geo = obj.get("geometry", obj) if obj.get("type") == "Feature" else obj
if geo is None or not geo.get("type"):
raise ValueError(f"Cannot extract geometry from keys: {list(obj.keys())}")
return shape(geo)
def flatten_polygons(geom) -> List[Polygon]:
if geom is None or geom.is_empty: return []
if isinstance(geom, Polygon): return [geom]
if isinstance(geom, (MultiPolygon, GeometryCollection)):
out = []
for g in geom.geoms: out.extend(flatten_polygons(g))
return out
return []
def obb(polygon: Polygon) -> Tuple[float, float, float]:
"""Minimum rotated rectangle → (short_side, long_side, angle_deg_of_long_axis)."""
mbr = polygon.minimum_rotated_rectangle
coords = list(mbr.exterior.coords)
e1 = math.hypot(coords[1][0]-coords[0][0], coords[1][1]-coords[0][1])
e2 = math.hypot(coords[2][0]-coords[1][0], coords[2][1]-coords[1][1])
if e1 >= e2:
ang = math.degrees(math.atan2(coords[1][1]-coords[0][1], coords[1][0]-coords[0][0]))
return e2, e1, ang
else:
ang = math.degrees(math.atan2(coords[2][1]-coords[1][1], coords[2][0]-coords[1][0]))
return e1, e2, ang
def dominant_angle(boundary: Polygon) -> float:
"""Return the angle (degrees) of the longest edge of the boundary's OBB.
Auto-roads are generated parallel/perpendicular to this axis."""
_, _, ang = obb(boundary)
# Normalise to [0, 90) so roads run in sensible directions
ang = ang % 180
if ang > 90:
ang -= 90
return ang
def estimate_scale(boundary: Polygon) -> float:
lat = boundary.centroid.y
return (111_320.0 + 111_320.0 * math.cos(math.radians(lat))) / 2.0
def deg(metres: float, sc: float) -> float:
return metres / sc
def buffer_line(line, width_deg: float) -> Polygon:
return line.buffer(width_deg / 2, cap_style=2, join_style=2)
def make_rotated_line(cx, cy, angle_deg, length) -> LineString:
"""Line of given length centred at (cx, cy) at angle_deg from E axis."""
rad = math.radians(angle_deg)
dx = math.cos(rad) * length / 2
dy = math.sin(rad) * length / 2
return LineString([(cx - dx, cy - dy), (cx + dx, cy + dy)])
# ─── Road network ─────────────────────────────────────────────────────────────
def build_road_network(
boundary: Polygon,
road_inputs: List[RoadInput],
config: SubdivisionConfig,
sc: float
) -> Tuple[List[Dict], Polygon]:
"""
Returns (road_feature_dicts, road_union_polygon).
Auto-roads are aligned to the boundary's dominant OBB axis.
No perimeter ring is added.
"""
road_features = []
road_polys: List[Polygon] = []
default_rw = deg(config.default_road_width, sc)
# ── 1. User-drawn roads ───────────────────────────────────────────────────
for ri in road_inputs:
try:
line = extract_geometry(ri.geometry if isinstance(ri, RoadInput) else ri)
except Exception:
continue
if not isinstance(line, (LineString, MultiLineString)):
continue
lines = list(line.geoms) if isinstance(line, MultiLineString) else [line]
rtype = ri.road_type if isinstance(ri, RoadInput) else "local"
rwidth_m = (ri.width if isinstance(ri, RoadInput) and ri.width
else ROAD_TYPE_DEFAULTS.get(rtype, ROAD_TYPE_DEFAULTS["local"])["width"])
rw = deg(rwidth_m, sc)
color = ROAD_TYPE_DEFAULTS.get(rtype, ROAD_TYPE_DEFAULTS["local"])["color"]
for lp in lines:
cl = valid(lp.intersection(boundary))
if cl and not cl.is_empty:
rp = valid(buffer_line(cl, rw).intersection(boundary))
if rp and not rp.is_empty:
road_polys.append(rp)
road_features.append({
"type": "Feature", "geometry": mapping(rp),
"properties": {
"type": "road_surface", "road_type": rtype,
"width_m": rwidth_m,
"label": (ri.label if isinstance(ri, RoadInput) else None)
or f"{rtype.capitalize()} Road",
"color": color,
}
})
# ── 2. Auto-fill grid roads aligned to boundary's dominant axis ───────────
existing_union = valid(unary_union(road_polys)) if road_polys else Polygon()
bl = deg(config.max_block_length, sc)
rw = default_rw
step = bl + rw
# Dominant angle: roads run at `axis_ang` and `axis_ang + 90`
axis_ang = dominant_angle(boundary)
cx_bnd, cy_bnd = boundary.centroid.x, boundary.centroid.y
# Diagonal of bounding box — long enough to cross the entire boundary
bminx, bminy, bmaxx, bmaxy = boundary.bounds
diag = math.hypot(bmaxx - bminx, bmaxy - bminy) * 1.5
def try_add_auto(base_line: LineString):
cl = valid(base_line.intersection(boundary))
if not cl or cl.is_empty or cl.length < deg(config.min_frontage * 2, sc): return
rp = valid(buffer_line(cl, rw).intersection(boundary))
if not rp or rp.is_empty: return
if not existing_union.is_empty and rp.intersection(existing_union).area / rp.area > 0.6: return
road_polys.append(rp)
road_features.append({
"type": "Feature", "geometry": mapping(rp),
"properties": {
"type": "road_surface", "road_type": "local",
"width_m": config.default_road_width,
"label": "Local Road (auto)",
"color": ROAD_TYPE_DEFAULTS["local"]["color"],
}
})
def sweep_roads(sweep_angle: float):
"""
Generate parallel roads perpendicular to sweep_angle,
spaced by `step`, centred on the boundary centroid.
sweep_angle is the direction we step across (perp to road direction).
"""
perp_rad = math.radians(sweep_angle)
# Unit vector across which we step
ux, uy = math.cos(perp_rad), math.sin(perp_rad)
# Project all boundary vertices onto this axis to find extent
bcoords = list(boundary.exterior.coords)
projs = [x*ux + y*uy for x, y in bcoords]
p_min, p_max = min(projs), max(projs)
positions = []
pos = p_min + step
while pos < p_max - rw:
positions.append(pos)
pos += step
if not positions:
# Site fits in one block: add one road through the centre if deep enough
extent = p_max - p_min
if extent > deg(config.min_plot_depth * 2 + config.default_road_width, sc):
positions = [(p_min + p_max) / 2]
for pos in positions:
# Centre of this road line in geographic coordinates
lx = cx_bnd + (pos - (cx_bnd*ux + cy_bnd*uy)) * ux
ly = cy_bnd + (pos - (cx_bnd*ux + cy_bnd*uy)) * uy
road_line = make_rotated_line(lx, ly, sweep_angle + 90, diag)
try_add_auto(road_line)
# Roads parallel to long axis (sweep perpendicular to long axis = axis_ang + 90)
sweep_roads(axis_ang + 90)
# Roads perpendicular to long axis (sweep along long axis = axis_ang)
sweep_roads(axis_ang)
road_union = valid(unary_union(road_polys)) if road_polys else Polygon()
return road_features, road_union
# ─── Cul-de-sac ───────────────────────────────────────────────────────────────
def block_has_dead_end(block: Polygon, road_union: Polygon, sc: float) -> bool:
"""
True if the block's 'far end' (relative to its long axis) is NOT touching
any road — indicating a dead end that could benefit from a cul-de-sac.
We test both terminal thirds of the block; if either misses road_union, it's a dead end.
"""
if road_union is None or road_union.is_empty:
return False
_, _, angle = obb(block)
centroid = block.centroid
rotated = valid(rotate(block, -angle, origin=centroid))
bx0, by0, bx1, by1 = rotated.bounds
bh = by1 - by0
tol = deg(1.0, sc)
# Bottom third in rotated space
bottom_strip = valid(rotate(
Polygon([(bx0, by0), (bx1, by0), (bx1, by0 + bh*0.35), (bx0, by0 + bh*0.35)]),
angle, origin=centroid
))
# Top third
top_strip = valid(rotate(
Polygon([(bx0, by1 - bh*0.35), (bx1, by1 - bh*0.35), (bx1, by1), (bx0, by1)]),
angle, origin=centroid
))
bottom_ok = bottom_strip and bottom_strip.buffer(tol).intersects(road_union)
top_ok = top_strip and top_strip.buffer(tol).intersects(road_union)
return not bottom_ok or not top_ok
def generate_culdesacs(blocks, road_union, config, sc):
"""
Only attach a cul-de-sac to a block when:
1. The block exceeds max_block_length
2. At least one end of the block has no road access (genuine dead end)
3. The resulting geometry is valid and leaves enough room for parcels
"""
cds_polys, new_blocks = [], []
bl = deg(config.max_block_length, sc)
for block in blocks:
_, h, _ = obb(block)
if h > bl and config.allow_culdesac and block_has_dead_end(block, road_union, sc):
cds, trimmed = _try_culdesac(block, config, sc)
if cds and trimmed and not trimmed.is_empty:
cds_polys.append(cds)
new_blocks.append(trimmed)
continue
new_blocks.append(block)
return cds_polys, new_blocks
def _try_culdesac(block: Polygon, config: SubdivisionConfig, sc: float):
"""
Insert a cul-de-sac at the dead end of the block, aligned to the block's OBB.
Radius = road_width (not 1.8×); stub width = road_width.
Verifies the trimmed block can still hold at least two minimum parcels.
"""
rw_deg = deg(config.default_road_width, sc)
radius_deg = rw_deg # 1× road width, not 1.8×
min_a_deg2 = config.min_area / sc**2
_, _, angle = obb(block)
centroid = block.centroid
rotated = valid(rotate(block, -angle, origin=centroid))
bx0, by0, bx1, by1 = rotated.bounds
bw, bh = bx1 - bx0, by1 - by0
bcx = (bx0 + bx1) / 2
# Stub runs from the bottom edge upward, centred on block
stub_len = min(deg(config.max_block_length * 0.3, sc), bh * 0.4)
stub_rot = LineString([(bcx, by0), (bcx, by0 + stub_len)])
end_y = by0 + stub_len
stub_buf = valid(buffer_line(stub_rot, rw_deg))
circle = Point(bcx, end_y).buffer(radius_deg)
cds_rot = valid(unary_union([stub_buf, circle]).intersection(rotated))
trim_rot = valid(rotated.difference(cds_rot))
if trim_rot.is_empty:
return None, None
# Rotate back
cds_geom = valid(rotate(cds_rot, angle, origin=centroid))
trim_geom = valid(rotate(trim_rot, angle, origin=centroid))
# Must leave room for at least 2 minimum parcels
if trim_geom.area < min_a_deg2 * 2:
return None, None
# Must not have the cds overlapping too much of remaining block
if not cds_geom.is_empty and cds_geom.area > block.area * 0.4:
return None, None
return cds_geom, trim_geom
# ─── Block subdivider ─────────────────────────────────────────────────────────
def subdivide_block(block, config, road_union, block_id, start_num, sc,
zone_label=None, zone_color=None):
"""
Subdivide a block into parcels oriented along the block's own OBB axis.
Frontage faces the nearest road edge.
"""
min_f = deg(config.min_frontage, sc)
min_d = deg(config.min_plot_depth, sc)
min_a = config.min_area / sc**2
max_a = (config.max_area or 1e18) / sc**2
_, _, angle = obb(block)
centroid = block.centroid
rotated = valid(rotate(block, -angle, origin=centroid))
rminx, rminy, rmaxx, rmaxy = rotated.bounds
bw = rmaxx - rminx
bh = rmaxy - rminy
num = start_num
parcels = []
def emit(x0, y0, x1, y1):
nonlocal num
cell = Polygon([(x0,y0),(x1,y0),(x1,y1),(x0,y1)])
p = valid(cell.intersection(rotated))
if p is None or p.is_empty or p.area < min_a * 0.45: return
p = valid(rotate(p, angle, origin=centroid))
feats = _make_parcel(p, config, block_id, num, sc, road_union,
zone_label, zone_color)
parcels.extend(feats)
num += 1
# Decide orientation based on block shape:
# Wide block (bw >= bh): parcels stacked in columns along X, fronting Y roads
# Tall block (bh > bw): parcels stacked in rows along Y, fronting X roads
if bw >= bh:
n_cols = max(1, round(bw / min_f))
col_w = bw / n_cols
# Double-bank if block deep enough for two rows of parcels
if bh >= min_d * 2.2:
rows = [(rminy, rminy + bh/2), (rminy + bh/2, rmaxy)]
else:
rows = [(rminy, rmaxy)]
for y0, y1 in rows:
for c in range(n_cols):
x0 = rminx + c * col_w
emit(x0, y0, x0 + col_w, y1)
else:
n_rows = max(1, round(bh / min_f))
row_h = bh / n_rows
if bw >= min_d * 2.2:
cols = [(rminx, rminx + bw/2), (rminx + bw/2, rmaxx)]
else:
cols = [(rminx, rmaxx)]
for x0, x1 in cols:
for r in range(n_rows):
y0 = rminy + r * row_h
emit(x0, y0, x1, y0 + row_h)
return parcels
def _make_parcel(geom, config, block_id, num, sc, road_union,
zone_label, zone_color):
out = []
tol = deg(0.5, sc)
preset = LAND_USE_PRESETS.get(config.land_use, LAND_USE_PRESETS["residential_medium"])
parts = flatten_polygons(geom) if not isinstance(geom, Polygon) else [geom]
for g in parts:
if g.is_empty: continue
area_m2 = round(g.area * sc**2, 1)
if area_m2 < config.min_area * 0.4: continue
pw, ph, _ = obb(g)
frontage_m = round(min(pw, ph) * sc, 1)
plot_depth_m = round(max(pw, ph) * sc, 1)
has_access = (road_union is not None and not road_union.is_empty
and g.buffer(tol).intersects(road_union))
max_a = config.max_area or 1e18
out.append({
"type": "Feature",
"geometry": mapping(g),
"properties": {
"parcel_id": f"{config.parcel_prefix}{num:04d}",
"parcel_num": num,
"block_id": block_id,
"area_m2": area_m2,
"area_ha": round(area_m2 / 10000, 4),
"frontage_m": frontage_m,
"plot_depth_m": plot_depth_m,
"address": f"Block {block_id}, Plot {num}",
"land_use": config.land_use,
"land_use_label": zone_label or preset["label"],
"zone_color": zone_color or preset["color"],
"zone": zone_label or preset["label"],
"status": "vacant",
"has_access": has_access,
"frontage_ok": frontage_m >= config.min_frontage * 0.85,
"area_ok": config.min_area * 0.8 <= area_m2 <= max_a * 1.2,
"within_max_area": area_m2 <= max_a * 1.1,
}
})
return out
# ─── Quality passes ───────────────────────────────────────────────────────────
def absorb_bad_parcels(parcels, config, sc):
tol = deg(0.5, sc)
changed = True
while changed:
changed = False
geoms = [shape(p["geometry"]) for p in parcels]
for i, p in enumerate(parcels):
g = geoms[i]
am2 = g.area * sc**2
w, h, _ = obb(g)
compact = 4 * math.pi * g.area / (g.length**2) if g.length else 0
ok = (am2 >= config.min_area * 0.8
and compact >= 0.18
and (h == 0 or w / h >= 0.12))
if ok: continue
best_j, best_touch = -1, 0.0
for j, q in enumerate(parcels):
if j == i: continue
if p["properties"]["block_id"] != q["properties"]["block_id"]: continue
touch = g.buffer(tol).intersection(geoms[j]).area
if touch > best_touch: best_touch, best_j = touch, j
if best_j >= 0 and best_touch > 0:
merged = valid(g.union(geoms[best_j]))
if isinstance(merged, MultiPolygon):
merged = max(merged.geoms, key=lambda x: x.area)
am2_new = round(merged.area * sc**2, 1)
pw, ph, _ = obb(merged)
acc = (p["properties"].get("has_access")
or parcels[best_j]["properties"].get("has_access"))
parcels[best_j]["geometry"] = mapping(merged)
parcels[best_j]["properties"].update({
"area_m2": am2_new,
"frontage_m": round(min(pw,ph)*sc, 1),
"plot_depth_m": round(max(pw,ph)*sc, 1),
"has_access": acc,
"area_ok": config.min_area*0.8 <= am2_new <= (config.max_area or 1e18)*1.2,
})
geoms[best_j] = merged; parcels.pop(i); changed = True; break
return parcels
def enforce_access(parcels, road_union, config, sc):
if not road_union or road_union.is_empty: return parcels
tol = deg(1.0, sc)
changed = True
while changed:
changed = False
geoms = [shape(p["geometry"]) for p in parcels]
for i, p in enumerate(parcels):
if p["properties"].get("has_access"): continue
g = geoms[i]; best_j, best_sh = -1, 0.0
for j, q in enumerate(parcels):
if j == i: continue
if p["properties"]["block_id"] != q["properties"]["block_id"]: continue
if not q["properties"].get("has_access"): continue
sh = g.buffer(tol).intersection(geoms[j]).area
if sh > best_sh: best_sh, best_j = sh, j
if best_j >= 0:
merged = valid(g.union(geoms[best_j]))
if isinstance(merged, MultiPolygon):
merged = max(merged.geoms, key=lambda x: x.area)
am2 = round(merged.area * sc**2, 1)
pw, ph, _ = obb(merged)
parcels[best_j]["geometry"] = mapping(merged)
parcels[best_j]["properties"].update({
"area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1),
"plot_depth_m": round(max(pw,ph)*sc,1), "has_access": True,
"area_ok": config.min_area*0.8 <= am2 <= (config.max_area or 1e18)*1.2,
})
geoms[best_j] = merged; parcels.pop(i); changed = True; break
# Final recheck
for p in parcels:
g = shape(p["geometry"])
p["properties"]["has_access"] = (not road_union.is_empty
and g.buffer(tol).intersects(road_union))
return parcels
def apply_existing_buildings(building_geoms, parcels, sc, config):
"""
For each building footprint:
1. Find all parcels that intersect it.
2. Merge them until one parcel contains the building.
3. If the merged parcel is still smaller than the building's bounding box
(with a 2 m margin), expand it by merging with neighbouring parcels.
4. Mark the parcel status='built'.
"""
if not building_geoms or not parcels: return parcels
tol = deg(0.5, sc)
margin = deg(2.0, sc)
for bldg in building_geoms:
bldg_buf = bldg.buffer(tol)
hit = [i for i, p in enumerate(parcels)
if shape(p["geometry"]).intersects(bldg_buf)]
if not hit: continue
# Merge all hit parcels
while len(hit) > 1:
i, j = hit[0], hit[1]
gi = shape(parcels[i]["geometry"])
gj = shape(parcels[j]["geometry"])
merged = valid(gi.union(gj))
if isinstance(merged, MultiPolygon):
merged = max(merged.geoms, key=lambda x: x.area)
am2 = round(merged.area * sc**2, 1)
pw, ph, _ = obb(merged)
parcels[i]["geometry"] = mapping(merged)
parcels[i]["properties"].update({
"area_m2": am2,
"frontage_m": round(min(pw,ph)*sc, 1),
"plot_depth_m": round(max(pw,ph)*sc, 1),
"has_access": (parcels[i]["properties"].get("has_access")
or parcels[j]["properties"].get("has_access")),
})
parcels.pop(j)
hit = [hit[0]] + [k-(1 if k>j else 0) for k in hit[2:]]
# Check the merged parcel truly contains the building (with margin)
target_idx = hit[0]
parcel_geom = shape(parcels[target_idx]["geometry"])
# If building does not fit, expand into adjacent parcels
expansion_attempts = 0
while not parcel_geom.buffer(margin).contains(bldg) and expansion_attempts < 6:
expansion_attempts += 1
geoms_all = [shape(p["geometry"]) for p in parcels]
bid = parcels[target_idx]["properties"]["block_id"]
# Find touching neighbour (same block preferred, any block if not found)
best_nb, best_touch = -1, 0.0
for k, q in enumerate(parcels):
if k == target_idx: continue
touch = parcel_geom.buffer(tol).intersection(geoms_all[k]).area
if touch > best_touch:
best_touch, best_nb = touch, k
if best_nb < 0 or best_touch == 0:
break
merged = valid(parcel_geom.union(geoms_all[best_nb]))
if isinstance(merged, MultiPolygon):
merged = max(merged.geoms, key=lambda x: x.area)
am2 = round(merged.area * sc**2, 1)
pw, ph, _ = obb(merged)
parcels[target_idx]["geometry"] = mapping(merged)
parcels[target_idx]["properties"].update({
"area_m2": am2,
"frontage_m": round(min(pw,ph)*sc, 1),
"plot_depth_m": round(max(pw,ph)*sc, 1),
"has_access": (parcels[target_idx]["properties"].get("has_access")
or parcels[best_nb]["properties"].get("has_access")),
})
parcels.pop(best_nb)
if best_nb < target_idx:
target_idx -= 1
parcel_geom = shape(parcels[target_idx]["geometry"])
parcels[target_idx]["properties"]["status"] = "built"
parcels[target_idx]["properties"]["building_area_m2"] = round(bldg.area * sc**2, 1)
parcels[target_idx]["properties"]["building_fits"] = parcel_geom.buffer(margin).contains(bldg)
return parcels
# ─── Validation ───────────────────────────────────────────────────────────────
def validate_config(boundary: Polygon, config: SubdivisionConfig, sc: float):
site_area = boundary.area * sc**2
warnings = []
if site_area > MAX_SITE_AREA_HA * 10_000:
raise HTTPException(
400,
f"Site area ({site_area/10000:.1f} ha) exceeds the maximum allowed "
f"planning area of {MAX_SITE_AREA_HA} ha. Please split the site into smaller sections."
)
if config.min_area and config.min_area > site_area * 0.5:
warnings.append(
f"min_area ({config.min_area} m²) is more than half the site area "
f"({site_area:.0f} m²) — very few parcels will fit"
)
if config.max_area and config.max_area < config.min_area:
warnings.append("max_area is less than min_area")
if config.min_frontage > config.min_plot_depth:
warnings.append("min_frontage exceeds min_plot_depth — parcels will be wider than deep")
if config.default_road_width >= config.min_frontage * 2:
warnings.append("Road width is very large relative to parcel frontage")
return warnings
# ─── Main endpoint ────────────────────────────────────────────────────────────
@app.post("/subdivide")
async def subdivide(request: SubdivisionRequest):
session_id = request.session_id or str(uuid.uuid4())
_progress[session_id] = {"pct": 0, "msg": "Starting…"}
def prog(pct, msg):
_progress[session_id] = {"pct": pct, "msg": msg}
try:
config = request.config or SubdivisionConfig()
prog(5, "Parsing boundary…")
boundary = valid(extract_geometry(request.boundary))
if isinstance(boundary, MultiPolygon):
boundary = max(boundary.geoms, key=lambda g: g.area)
if not isinstance(boundary, Polygon):
raise HTTPException(400, "Boundary must be a polygon")
sc = estimate_scale(boundary)
warnings = validate_config(boundary, config, sc) # may raise 400
prog(10, "Parsing roads and buildings…")
road_inputs: List[RoadInput] = []
for r in (request.roads or []):
if isinstance(r, dict):
try:
road_inputs.append(RoadInput(**r))
except Exception:
try: road_inputs.append(RoadInput(geometry=r))
except Exception: pass
else:
road_inputs.append(r)
building_geoms = []
for f in (request.existing_features or []):
try:
g = valid(extract_geometry(f).intersection(boundary))
building_geoms.extend(flatten_polygons(g))
except Exception: pass
prog(20, "Building road network (axis-aligned)…")
road_features, road_union = build_road_network(boundary, road_inputs, config, sc)
prog(30, "Computing buildable area…")
obstacles = road_union
if building_geoms:
bldg_union = valid(unary_union(building_geoms))
obstacles = valid(unary_union([road_union, bldg_union])) if not road_union.is_empty else bldg_union
buildable = valid(boundary.difference(obstacles)) if not obstacles.is_empty else boundary
blocks = [b for b in flatten_polygons(buildable)
if b.area * sc**2 > config.min_area]
prog(40, "Generating cul-de-sacs (where needed)…")
cds_polys = []
if config.allow_culdesac and blocks:
cds_polys, blocks = generate_culdesacs(blocks, road_union, config, sc)
if cds_polys:
road_union = valid(unary_union([road_union] + cds_polys))
prog(55, "Subdividing blocks…")
all_parcels: List[Dict] = []
num = config.start_number
for bid, block in enumerate(blocks, 1):
bp = subdivide_block(block, config, road_union, bid, num, sc)
all_parcels.extend(bp)
num += len(bp)
prog(55 + int(20 * bid / max(len(blocks), 1)),
f"Block {bid}/{len(blocks)}")
prog(78, "Shape quality pass…")
all_parcels = absorb_bad_parcels(all_parcels, config, sc)
prog(86, "Access enforcement…")
all_parcels = enforce_access(all_parcels, road_union, config, sc)
prog(92, "Fitting existing buildings into parcels…")
if building_geoms:
all_parcels = apply_existing_buildings(building_geoms, all_parcels, sc, config)
prog(96, "Numbering…")
for i, p in enumerate(all_parcels, config.start_number):
p["properties"]["parcel_num"] = i
p["properties"]["parcel_id"] = f"{config.parcel_prefix}{i:04d}"
block_features = [
{"type":"Feature","geometry":mapping(b),
"properties":{"block_id":i+1,"area_m2":round(b.area*sc**2,1)}}
for i, b in enumerate(blocks)
]
cds_features = [
{"type":"Feature","geometry":mapping(c),
"properties":{"type":"culdesac","id":f"CDS{i+1:03d}"}}
for i, c in enumerate(cds_polys)
]
areas = [p["properties"]["area_m2"] for p in all_parcels]
no_acc = sum(1 for p in all_parcels if not p["properties"].get("has_access"))
built = sum(1 for p in all_parcels if p["properties"].get("status") == "built")
stats = {
"total_parcels": len(all_parcels), "total_blocks": len(blocks),
"total_roads": len(road_features), "culdesacs": len(cds_polys),
"parcels_no_access": no_acc, "parcels_built": built,
"parcels_vacant": len(all_parcels) - built,
"boundary_area_m2": round(boundary.area * sc**2, 1),
"road_area_m2": round(road_union.area * sc**2, 1) if not road_union.is_empty else 0,
"buildable_area_m2": round(sum(b.area*sc**2 for b in blocks), 1),
"avg_parcel_area_m2": round(sum(areas)/len(areas), 1) if areas else 0,
"min_parcel_area_m2": round(min(areas), 1) if areas else 0,
"max_parcel_area_m2": round(max(areas), 1) if areas else 0,
"existing_buildings": len(building_geoms),
"user_roads_drawn": len(road_inputs),
"warnings": warnings,
"land_use": config.land_use,
"land_use_label": LAND_USE_PRESETS.get(config.land_use, {}).get("label", config.land_use),
"dominant_axis_deg": round(dominant_angle(boundary), 1),
"config": {
"min_frontage": config.min_frontage, "min_plot_depth": config.min_plot_depth,
"min_area": config.min_area, "max_area": config.max_area,
"default_road_width": config.default_road_width,
"splay_radius": config.splay_radius,
"max_block_length": config.max_block_length,
}
}
prog(100, "Done")
return {
"parcels": all_parcels, "roads": road_features,
"blocks": block_features, "culdesacs": cds_features,
"stats": stats, "session_id": session_id,
}
except HTTPException: raise
except Exception as e:
prog(-1, f"Error: {e}")
raise HTTPException(500, f"{e}\n{traceback.format_exc()}")
finally:
async def _cleanup():
await asyncio.sleep(10)
_progress.pop(session_id, None)
asyncio.create_task(_cleanup())
@app.get("/progress/{session_id}")
async def get_progress(session_id: str):
return _progress.get(session_id, {"pct": -1, "msg": "Not found"})
@app.get("/land-use-presets")
async def land_use_presets_ep():
return LAND_USE_PRESETS
@app.get("/road-type-defaults")
async def road_type_defaults_ep():
return ROAD_TYPE_DEFAULTS
@app.get("/health")
async def health():
return {"status": "ok", "version": "5.0.0", "max_site_ha": MAX_SITE_AREA_HA}
@app.get("/config/defaults")
async def get_defaults():
return SubdivisionConfig().model_dump()