2026-03-03 11:33:31 +03:00

651 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Parcel Subdivision Engine - FastAPI Backend v3
- User roads + auto-fill roads combined (not either/or)
- Existing building footprints respected: carved from buildable area,
containing parcel flagged status=built
- Perimeter access road always present
- Access enforcement pass
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Tuple, Dict, Any
import math, uuid, traceback
from shapely.geometry import (
Polygon, MultiPolygon, LineString, MultiLineString,
Point, GeometryCollection, mapping, shape
)
from shapely.ops import unary_union
from shapely.affinity import rotate
from shapely.validation import make_valid
app = FastAPI(title="Parcel Subdivision Engine", version="3.0.0")
app.add_middleware(
CORSMiddleware, allow_origins=["*"], allow_credentials=True,
allow_methods=["*"], allow_headers=["*"],
)
# ─── Models ───────────────────────────────────────────────────────────────────
class SubdivisionConfig(BaseModel):
min_frontage: float = 12.0
min_depth: float = 25.0
road_width: float = 9.0
max_block_length: float = 120.0
allow_culdesac: bool = True
corner_radius: float = 3.0
min_area: Optional[float] = None
def model_post_init(self, _):
if self.min_area is None:
self.min_area = self.min_frontage * self.min_depth
class SubdivisionRequest(BaseModel):
boundary: Dict[str, Any]
roads: Optional[List[Dict[str, Any]]] = []
existing_features: Optional[List[Dict[str, Any]]] = []
zones: Optional[List[Dict[str, Any]]] = []
config: Optional[SubdivisionConfig] = None
# ─── Geometry helpers ─────────────────────────────────────────────────────────
def valid(geom):
if geom is None or geom.is_empty:
return geom
if not geom.is_valid:
geom = make_valid(geom)
return geom
def extract_geometry(obj: Dict) -> Any:
"""Accept GeoJSON Feature wrapper or bare geometry dict."""
if obj is None:
raise ValueError("Null geometry")
geo = obj.get("geometry", obj) if obj.get("type") == "Feature" else obj
if geo is None or not geo.get("type"):
raise ValueError(f"Cannot extract geometry from: {list(obj.keys())}")
return shape(geo)
def flatten_polygons(geom) -> List[Polygon]:
"""Recursively extract all Polygon parts from any geometry."""
if geom is None or geom.is_empty:
return []
if isinstance(geom, Polygon):
return [geom]
if isinstance(geom, (MultiPolygon, GeometryCollection)):
out = []
for g in geom.geoms:
out.extend(flatten_polygons(g))
return out
return []
def obb(polygon: Polygon) -> Tuple[float, float, float]:
"""Oriented bounding box → (short, long, angle_deg)."""
mbr = polygon.minimum_rotated_rectangle
coords = list(mbr.exterior.coords)
e1 = math.hypot(coords[1][0]-coords[0][0], coords[1][1]-coords[0][1])
e2 = math.hypot(coords[2][0]-coords[1][0], coords[2][1]-coords[1][1])
ang = math.degrees(math.atan2(coords[1][1]-coords[0][1], coords[1][0]-coords[0][0]))
return (min(e1,e2), max(e1,e2), ang)
def estimate_scale(boundary: Polygon) -> float:
"""Metres per degree at boundary centroid latitude."""
lat = boundary.centroid.y
return (111_320.0 + 111_320.0 * math.cos(math.radians(lat))) / 2.0
def deg(metres: float, sc: float) -> float:
"""Convert metres → degrees using scale factor."""
return metres / sc
def buffer_line(line, width_deg: float) -> Polygon:
return line.buffer(width_deg / 2, cap_style=2, join_style=2)
# ─── Road network ─────────────────────────────────────────────────────────────
def build_road_network(
boundary: Polygon,
user_roads: List[LineString],
config: SubdivisionConfig,
sc: float
) -> Tuple[List[Polygon], Polygon]:
"""
Build the complete road network:
1. Buffer every user-drawn road centreline.
2. Auto-generate grid roads for any block that exceeds max_block_length
(regardless of whether the user drew some roads already).
3. Add a perimeter half-road ring so every outer block has access.
Returns (road_polygons_for_display, road_union_polygon).
"""
rw = deg(config.road_width, sc)
bl = deg(config.max_block_length, sc)
step = bl + rw # spacing between road centrelines
road_polys: List[Polygon] = []
# ── Step 1: user roads ────────────────────────────────────────────────────
for line in user_roads:
cl = valid(line.intersection(boundary))
if cl and not cl.is_empty:
rp = valid(buffer_line(cl, rw).intersection(boundary))
if rp and not rp.is_empty:
road_polys.append(rp)
# ── Step 2: auto-fill grid roads ──────────────────────────────────────────
# Always run auto-generation, but only add roads that fall in areas NOT
# already covered by user roads (avoids duplicate overlapping roads).
existing_road_union = valid(unary_union(road_polys)) if road_polys else Polygon()
minx, miny, maxx, maxy = boundary.bounds
site_w = maxx - minx
site_h = maxy - miny
def try_add(line: LineString):
cl = valid(line.intersection(boundary))
if not cl or cl.is_empty or cl.length < deg(config.min_frontage * 2, sc):
return
rp = valid(buffer_line(cl, rw).intersection(boundary))
if not rp or rp.is_empty:
return
# Skip if this road is already substantially covered by user roads
if not existing_road_union.is_empty:
overlap = rp.intersection(existing_road_union).area / rp.area
if overlap > 0.6:
return
road_polys.append(rp)
# Vertical auto-roads
x_positions = []
x = minx + step
while x < maxx - rw:
x_positions.append(x)
x += step
if not x_positions and site_w > deg(config.min_depth * 2 + config.road_width, sc):
x_positions = [minx + site_w / 2]
for x in x_positions:
try_add(LineString([(x, miny - 1), (x, maxy + 1)]))
# Horizontal auto-roads
y_positions = []
y = miny + step
while y < maxy - rw:
y_positions.append(y)
y += step
if not y_positions and site_h > deg(config.min_depth * 2 + config.road_width, sc):
y_positions = [miny + site_h / 2]
for y in y_positions:
try_add(LineString([(minx - 1, y), (maxx + 1, y)]))
# ── Step 3: perimeter access ring ────────────────────────────────────────
perim_width = deg(config.road_width * 0.5, sc)
inner = valid(boundary.buffer(-perim_width))
if inner and not inner.is_empty and inner.area > 0:
perim = valid(boundary.difference(inner))
if perim and not perim.is_empty:
road_polys.append(perim)
# Final road union
road_union = valid(unary_union(road_polys)) if road_polys else Polygon()
return road_polys, road_union
# ─── Existing features (buildings) ───────────────────────────────────────────
def process_existing_features(
building_geoms: List[Polygon],
buildable: Polygon,
parcels: List[Dict],
sc: float,
config: SubdivisionConfig
) -> List[Dict]:
"""
For each existing building footprint:
- Find which parcel(s) it overlaps.
- If it spans multiple parcels, merge them into one.
- Mark the containing parcel status='built', store building info.
- Remove building area from the parcel geometry is NOT done
(building sits inside the parcel, that's realistic).
Buildings that fall entirely outside any parcel are ignored.
"""
if not building_geoms or not parcels:
return parcels
tol = deg(0.5, sc)
for bldg in building_geoms:
bldg_buf = bldg.buffer(tol)
hit_indices = []
for i, p in enumerate(parcels):
pg = shape(p["geometry"])
if pg.intersects(bldg_buf):
hit_indices.append(i)
if not hit_indices:
continue # building outside all parcels — skip
# Merge all hit parcels into one
while len(hit_indices) > 1:
i, j = hit_indices[0], hit_indices[1]
gi = shape(parcels[i]["geometry"])
gj = shape(parcels[j]["geometry"])
merged = valid(gi.union(gj))
if isinstance(merged, MultiPolygon):
merged = max(merged.geoms, key=lambda x: x.area)
area_m2 = round(merged.area * sc**2, 1)
pw, ph, _ = obb(merged)
parcels[i]["geometry"] = mapping(merged)
parcels[i]["properties"].update({
"area_m2": area_m2,
"frontage_m": round(min(pw,ph)*sc, 1),
"depth_m": round(max(pw,ph)*sc, 1),
"has_access": (parcels[i]["properties"].get("has_access")
or parcels[j]["properties"].get("has_access")),
"area_ok": area_m2 >= config.min_area * 0.8,
})
parcels.pop(j)
# Rebuild indices after pop
hit_indices = [hit_indices[0]] + [
k - (1 if k > j else 0) for k in hit_indices[2:]
]
# Mark the surviving parcel as built
target = hit_indices[0]
parcels[target]["properties"]["status"] = "built"
parcels[target]["properties"]["building_area_m2"] = round(bldg.area * sc**2, 1)
return parcels
# ─── Cul-de-sac generator ─────────────────────────────────────────────────────
def generate_culdesacs(
blocks: List[Polygon],
road_union: Polygon,
config: SubdivisionConfig,
sc: float
) -> Tuple[List[Polygon], List[Polygon]]:
cds_polys, new_blocks = [], []
bl = deg(config.max_block_length, sc)
rw = deg(config.road_width, sc)
for block in blocks:
_, h, _ = obb(block)
if h > bl and config.allow_culdesac:
cds, trimmed = _try_culdesac(block, rw, config, sc)
if cds and trimmed and not trimmed.is_empty:
cds_polys.append(cds)
new_blocks.append(trimmed)
continue
new_blocks.append(block)
return cds_polys, new_blocks
def _try_culdesac(block, rw_deg, config, sc):
cx, _ = block.centroid.x, block.centroid.y
minx, miny, maxx, maxy = block.bounds
radius = deg(config.road_width * 1.8, sc)
stub_len = deg(config.max_block_length * 0.45, sc)
stub = LineString([(cx, miny), (cx, miny + stub_len)]).intersection(block)
if stub.is_empty or stub.length < rw_deg:
return None, None
stub_road = valid(buffer_line(stub, rw_deg).intersection(block))
end_pt = list(stub.coords)[-1]
circle = valid(Point(end_pt).buffer(radius).intersection(block))
cds_area = valid(unary_union([stub_road, circle]).intersection(block))
trimmed = valid(block.difference(cds_area))
if trimmed.is_empty or trimmed.area * sc**2 < config.min_area:
return None, None
return cds_area, trimmed
# ─── Block subdivider ─────────────────────────────────────────────────────────
def subdivide_block(
block: Polygon,
config: SubdivisionConfig,
road_union: Polygon,
block_id: int,
start_num: int,
sc: float
) -> List[Dict]:
min_f = deg(config.min_frontage, sc)
min_d = deg(config.min_depth, sc)
min_a = config.min_area / sc**2
_, _, angle = obb(block)
centroid = block.centroid
rotated = valid(rotate(block, -angle, origin=centroid))
rminx, rminy, rmaxx, rmaxy = rotated.bounds
bw = rmaxx - rminx
bh = rmaxy - rminy
num = start_num
parcels = []
def emit(rect_in_rotated):
nonlocal num
p = valid(rect_in_rotated.intersection(rotated))
if p is None or p.is_empty or p.area < min_a * 0.5:
return
p = valid(rotate(p, angle, origin=centroid))
feats = _make_parcel_features(p, config, block_id, num, sc, road_union)
parcels.extend(feats)
num += 1
if bw >= bh:
# Wide block: frontage along X, depth along Y
n_cols = max(1, round(bw / min_f))
col_w = bw / n_cols
rows = ([(rminy, rminy + bh/2), (rminy + bh/2, rmaxy)]
if bh >= min_d * 2 else [(rminy, rmaxy)])
for y0, y1 in rows:
for c in range(n_cols):
x0 = rminx + c * col_w
emit(Polygon([(x0,y0),(x0+col_w,y0),(x0+col_w,y1),(x0,y1)]))
else:
# Tall block: frontage along Y, depth along X
n_rows = max(1, round(bh / min_f))
row_h = bh / n_rows
cols = ([(rminx, rminx + bw/2), (rminx + bw/2, rmaxx)]
if bw >= min_d * 2 else [(rminx, rmaxx)])
for x0, x1 in cols:
for r in range(n_rows):
y0 = rminy + r * row_h
emit(Polygon([(x0,y0),(x1,y0),(x1,y0+row_h),(x0,y0+row_h)]))
return parcels
def _make_parcel_features(geom, config, block_id, num, sc, road_union):
out = []
parts = flatten_polygons(geom) if not isinstance(geom, Polygon) else [geom]
tol = deg(0.5, sc)
for g in parts:
if g.is_empty:
continue
area_m2 = round(g.area * sc**2, 1)
if area_m2 < config.min_area * 0.4:
continue
pw, ph, _ = obb(g)
has_access = (road_union is not None
and not road_union.is_empty
and g.buffer(tol).intersects(road_union))
out.append({
"type": "Feature",
"geometry": mapping(g),
"properties": {
"parcel_id": f"P{num:04d}",
"parcel_num": num,
"block_id": block_id,
"area_m2": area_m2,
"area_ha": round(area_m2 / 10000, 4),
"frontage_m": round(min(pw,ph) * sc, 1),
"depth_m": round(max(pw,ph) * sc, 1),
"address": f"Block {block_id}, Plot {num}",
"zone": "Residential",
"status": "vacant",
"has_access": has_access,
"frontage_ok": round(min(pw,ph)*sc,1) >= config.min_frontage * 0.85,
"area_ok": area_m2 >= config.min_area * 0.80,
}
})
return out
# ─── Quality passes ───────────────────────────────────────────────────────────
def absorb_bad_parcels(parcels, config, sc):
"""Merge undersized / badly-shaped parcels into best same-block neighbour."""
tol = deg(0.5, sc)
changed = True
while changed:
changed = False
geoms = [shape(p["geometry"]) for p in parcels]
for i, p in enumerate(parcels):
g = geoms[i]
area_m2 = g.area * sc**2
w, h, _ = obb(g)
compact = 4 * math.pi * g.area / (g.length**2) if g.length else 0
ok = (area_m2 >= config.min_area * 0.8
and compact >= 0.18
and (h == 0 or w/h >= 0.12))
if ok:
continue
best_j, best_touch = -1, 0.0
for j, q in enumerate(parcels):
if j == i:
continue
if p["properties"]["block_id"] != q["properties"]["block_id"]:
continue
touch = g.buffer(tol).intersection(geoms[j]).area
if touch > best_touch:
best_touch, best_j = touch, j
if best_j >= 0 and best_touch > 0:
merged = valid(g.union(geoms[best_j]))
if isinstance(merged, MultiPolygon):
merged = max(merged.geoms, key=lambda x: x.area)
am2 = round(merged.area * sc**2, 1)
pw, ph, _ = obb(merged)
acc = (p["properties"].get("has_access")
or parcels[best_j]["properties"].get("has_access"))
parcels[best_j]["geometry"] = mapping(merged)
parcels[best_j]["properties"].update({
"area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1),
"depth_m": round(max(pw,ph)*sc,1), "has_access": acc,
"area_ok": am2 >= config.min_area * 0.8,
})
geoms[best_j] = merged
parcels.pop(i)
changed = True
break
return parcels
def enforce_access(parcels, road_union, config, sc):
"""Merge no-access parcels into nearest accessed neighbour in same block."""
if not road_union or road_union.is_empty:
return parcels
tol = deg(1.0, sc)
changed = True
while changed:
changed = False
geoms = [shape(p["geometry"]) for p in parcels]
for i, p in enumerate(parcels):
if p["properties"].get("has_access"):
continue
g = geoms[i]
best_j, best_shared = -1, 0.0
for j, q in enumerate(parcels):
if j == i:
continue
if p["properties"]["block_id"] != q["properties"]["block_id"]:
continue
if not q["properties"].get("has_access"):
continue
shared = g.buffer(tol).intersection(geoms[j]).area
if shared > best_shared:
best_shared, best_j = shared, j
if best_j >= 0:
merged = valid(g.union(geoms[best_j]))
if isinstance(merged, MultiPolygon):
merged = max(merged.geoms, key=lambda x: x.area)
am2 = round(merged.area * sc**2, 1)
pw, ph, _ = obb(merged)
parcels[best_j]["geometry"] = mapping(merged)
parcels[best_j]["properties"].update({
"area_m2": am2, "frontage_m": round(min(pw,ph)*sc,1),
"depth_m": round(max(pw,ph)*sc,1), "has_access": True,
"area_ok": am2 >= config.min_area * 0.8,
})
geoms[best_j] = merged
parcels.pop(i)
changed = True
break
# Final access recheck
for p in parcels:
g = shape(p["geometry"])
p["properties"]["has_access"] = (
not road_union.is_empty and g.buffer(tol).intersects(road_union)
)
return parcels
# ─── Main endpoint ────────────────────────────────────────────────────────────
@app.post("/api/subdivide")
async def subdivide(request: SubdivisionRequest):
try:
config = request.config or SubdivisionConfig()
# 1. Parse & validate boundary
boundary = valid(extract_geometry(request.boundary))
if isinstance(boundary, MultiPolygon):
boundary = max(boundary.geoms, key=lambda g: g.area)
if not isinstance(boundary, Polygon):
raise HTTPException(400, "Boundary must be a polygon")
sc = estimate_scale(boundary)
# 2. Parse user-drawn road centrelines
user_roads: List[LineString] = []
for r in (request.roads or []):
try:
g= shape(r["geometry"])
#g = extract_geometry(r)
if isinstance(g, LineString):
user_roads.append(g)
elif isinstance(g, MultiLineString):
user_roads.extend(g.geoms)
except Exception:
pass
# print("roads")
# print(request.roads)
# print(user_roads)
# 3. Parse existing building footprints
print(request.existing_features)
building_geoms: List[Polygon] = []
for f in (request.existing_features or []):
try:
#g = extract_geometry(f)
g = shape(f["geometry"])
# Clip to boundary
g = valid(g.intersection(boundary))
building_geoms.extend(flatten_polygons(g))
except Exception:
pass
#print("buildings")
#print( building_geoms)
# 4. Build road network (user + auto-fill + perimeter)
road_polys, road_union = build_road_network(boundary, user_roads, config, sc)
# 5. Buildable area = boundary roads existing buildings
# Buildings are carved out so subdivision doesn't cut through them.
obstacles = road_union
if building_geoms:
bldg_union = valid(unary_union(building_geoms))
obstacles = valid(unary_union([road_union, bldg_union]))
buildable = valid(boundary.difference(obstacles))
# 6. Extract blocks
min_block_area = config.min_area / sc**2
blocks = [b for b in flatten_polygons(buildable) if b.area > min_block_area]
# 7. Cul-de-sacs
cds_polys: List[Polygon] = []
if config.allow_culdesac and blocks:
cds_polys, blocks = generate_culdesacs(blocks, road_union, config, sc)
if cds_polys:
road_union = valid(unary_union([road_union] + cds_polys))
# 8. Subdivide every block
all_parcels: List[Dict] = []
num = 1
for bid, block in enumerate(blocks, 1):
bp = subdivide_block(block, config, road_union, bid, num, sc)
all_parcels.extend(bp)
num += len(bp)
# 9. Quality passes
all_parcels = absorb_bad_parcels(all_parcels, config, sc)
all_parcels = enforce_access(all_parcels, road_union, config, sc)
# 10. Apply existing building footprints → merge spanning parcels, mark built
if building_geoms:
all_parcels = process_existing_features(
building_geoms, buildable, all_parcels, sc, config
)
# 11. Re-number
for i, p in enumerate(all_parcels, 1):
p["properties"]["parcel_num"] = i
p["properties"]["parcel_id"] = f"P{i:04d}"
# 12. Build response collections
# Roads: exclude the perimeter ring (last element) from display features
# because it is very thin and would look ugly — the boundary line already
# shows where the perimeter is.
display_road_polys = road_polys[:-1] if road_polys else road_polys
road_features = [
{"type": "Feature", "geometry": mapping(rp),
"properties": {"type": "road_surface", "id": f"R{i+1:03d}",
"width_m": config.road_width}}
for i, rp in enumerate(display_road_polys)
]
block_features = [
{"type": "Feature", "geometry": mapping(b),
"properties": {"block_id": i+1, "area_m2": round(b.area*sc**2, 1)}}
for i, b in enumerate(blocks)
]
cds_features = [
{"type": "Feature", "geometry": mapping(c),
"properties": {"type": "culdesac", "id": f"CDS{i+1:03d}"}}
for i, c in enumerate(cds_polys)
]
# 13. Stats
areas = [p["properties"]["area_m2"] for p in all_parcels]
no_acc = sum(1 for p in all_parcels if not p["properties"].get("has_access"))
built = sum(1 for p in all_parcels if p["properties"].get("status") == "built")
stats = {
"total_parcels": len(all_parcels),
"total_blocks": len(blocks),
"total_roads": len(display_road_polys),
"culdesacs": len(cds_polys),
"parcels_no_access": no_acc,
"parcels_built": built,
"parcels_vacant": len(all_parcels) - built,
"boundary_area_m2": round(boundary.area * sc**2, 1),
"road_area_m2": round(road_union.area * sc**2, 1) if not road_union.is_empty else 0,
"buildable_area_m2": round(sum(b.area * sc**2 for b in blocks), 1),
"avg_parcel_area_m2": round(sum(areas)/len(areas), 1) if areas else 0,
"min_parcel_area_m2": round(min(areas), 1) if areas else 0,
"existing_buildings": len(building_geoms),
"user_roads_drawn": len(user_roads),
"config": {
"min_frontage": config.min_frontage, "min_depth": config.min_depth,
"road_width": config.road_width, "max_block_length": config.max_block_length,
}
}
return {
"parcels": all_parcels,
"roads": road_features,
"blocks": block_features,
"culdesacs": cds_features,
"stats": stats,
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, f"{e}\n{traceback.format_exc()}")
@app.get("/health")
async def health():
return {"status": "ok", "version": "3.0.0"}
@app.get("/config/defaults")
async def get_defaults():
return SubdivisionConfig().model_dump()