# Source: berlin-picnic-api/app/services/street_tree_service.py
# (listing metadata from the original file view: 462 lines, 18 KiB, Python)

import json
import math
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
from geopy.distance import geodesic
from rtree import index
import asyncio
import aiofiles
from functools import lru_cache
from app.models.street_tree import (
StreetTree, TreeDensityMetrics, TreeShadeAnalysis, TreesSearchFilters,
TreesNearLocationResponse, TreeGenus, TreeHealthStatus
)
from app.models.green_space import Coordinates
class StreetTreeService:
    """Service for accessing and analyzing Berlin street trees data.

    The class behaves as a singleton: ``__new__`` always returns the same
    instance and ``__init__`` only initialises state the first time, so the
    parsed tree records and the spatial index are cached on that shared
    instance.
    """

    _instance = None
    _initialized = False

    # Metres per degree used to size index query boxes. Deliberately a little
    # below the real length of a degree of latitude (roughly 110.6-111.7 km)
    # so the box errs on the large side; candidates are filtered by exact
    # geodesic distance afterwards.
    _METERS_PER_DEGREE = 110_000.0

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialise caches once; later constructions leave state untouched."""
        if not self._initialized:
            self._trees_cache = None
            self._spatial_index = None
            self._tree_id_to_data = {}
            self.data_dir = Path("app/data")
            self.__class__._initialized = True

    async def _load_trees(self) -> List[Dict]:
        """Load street trees data from JSON file and build spatial index.

        Returns:
            The cached list of raw tree dictionaries. An empty list is cached
            and returned when ``processed/street_trees.json`` does not exist.
        """
        if self._trees_cache is None:
            trees_file = self.data_dir / "processed" / "street_trees.json"
            if trees_file.exists():
                print("🔄 Loading trees data and building spatial index...")
                async with aiofiles.open(trees_file, 'r', encoding='utf-8') as f:
                    content = await f.read()
                data = json.loads(content)
                # ``or []`` keeps an explicit null from being cached as None,
                # which would trigger a reload on every call.
                self._trees_cache = data.get("street_trees") or []
                await self._build_spatial_index()
                print(f"✅ Loaded {len(self._trees_cache)} trees with spatial index")
            else:
                print("Warning: street_trees.json not found. Run process_street_trees.py first.")
                self._trees_cache = []
        return self._trees_cache

    async def _build_spatial_index(self):
        """Build R-tree spatial index for fast location queries.

        Index ids are the positions of the records in ``_trees_cache``;
        records without both ``lat`` and ``lng`` are left out of the index.
        Does nothing if an index already exists or there is no data.
        """
        if self._spatial_index is None and self._trees_cache:
            print("🔨 Building spatial index...")
            self._spatial_index = index.Index()
            self._tree_id_to_data = {}
            for i, tree_data in enumerate(self._trees_cache):
                lat = tree_data.get('lat')
                lng = tree_data.get('lng')
                if lat is not None and lng is not None:
                    # R-tree expects (minx, miny, maxx, maxy); a tree is a
                    # degenerate box at a single point.
                    bbox = (lng, lat, lng, lat)
                    self._spatial_index.insert(i, bbox)
                    self._tree_id_to_data[i] = tree_data
            print(f"✅ Spatial index built for {len(self._tree_id_to_data)} trees")

    @classmethod
    def _query_bbox(cls, lat: float, lng: float, radius_m: float) -> Tuple[float, float, float, float]:
        """Return a (min_lng, min_lat, max_lng, max_lat) box enclosing the search circle.

        A degree of longitude shrinks with ``cos(latitude)``, so the longitude
        half-width is widened accordingly. Using the same degree delta on both
        axes would make the box too narrow east-west and drop valid trees.
        """
        lat_delta = max(radius_m, 0) / cls._METERS_PER_DEGREE
        # Use the poleward edge of the box, where longitude degrees are
        # shortest, so the whole circle stays covered.
        poleward_lat = min(abs(lat) + lat_delta, 90.0)
        cos_lat = math.cos(math.radians(poleward_lat))
        if cos_lat < 1e-6:
            lng_delta = 180.0
        else:
            lng_delta = min(180.0, lat_delta / cos_lat)
        return (lng - lng_delta, lat - lat_delta, lng + lng_delta, lat + lat_delta)

    @staticmethod
    def _genus_lookup_key(genus_german: Optional[str]) -> str:
        """Return the upper-cased genus name used as the mapping key.

        Note that ``str.upper()`` expands ``ß`` to ``SS``.
        """
        return (genus_german or '').upper()

    @staticmethod
    def _passes_range(value: Optional[float], minimum: Optional[float], maximum: Optional[float]) -> bool:
        """Check ``value`` against optional bounds.

        A falsy bound (``None`` or 0) means "no bound". A record whose value
        is missing (``None``) fails any active bound; a value of 0 is a real
        value and is compared normally.
        """
        if minimum and (value is None or value < minimum):
            return False
        if maximum and (value is None or value > maximum):
            return False
        return True

    @staticmethod
    def _nearest_first(matches: List[Tuple[float, Dict]], limit: Optional[int]) -> List[Tuple[float, Dict]]:
        """Sort ``(distance, record)`` pairs by distance and keep the closest ``limit``.

        A falsy or non-positive ``limit`` keeps every match.
        """
        ordered = sorted(matches, key=lambda pair: pair[0])
        if limit and limit > 0:
            return ordered[:limit]
        return ordered

    def _create_tree_from_dict(self, tree_data: Dict) -> StreetTree:
        """Convert tree dictionary to StreetTree model."""
        # Keys must be in the form produced by str.upper(), which turns
        # "ß" into "SS"; a key spelled "WEIßDORN" could never match.
        genus_mapping = {
            "AHORN": TreeGenus.AHORN,
            "LINDE": TreeGenus.LINDE,
            "KASTANIE": TreeGenus.KASTANIE,
            "ROSSKASTANIE": TreeGenus.ROSSKASTANIE,
            "EICHE": TreeGenus.EICHE,
            "PLATANE": TreeGenus.PLATANE,
            "BIRKE": TreeGenus.BIRKE,
            "WEISSDORN": TreeGenus.WEISSDORN,
            "PAPPEL": TreeGenus.PAPPEL,
            "ESCHE": TreeGenus.ESCHE,
        }
        genus_key = self._genus_lookup_key(tree_data.get('genus_german'))
        genus_category = genus_mapping.get(genus_key, TreeGenus.OTHER)

        # Health status is a heuristic derived from age only; a missing or
        # zero age leaves it UNKNOWN.
        health_status = TreeHealthStatus.UNKNOWN
        if tree_data.get('age'):
            age = tree_data['age']
            if age > 80:
                health_status = TreeHealthStatus.FAIR
            elif age > 50:
                health_status = TreeHealthStatus.GOOD
            elif age > 0:
                health_status = TreeHealthStatus.EXCELLENT

        # Fall back to 0.0 for an explicit null as well as a missing key
        # (presumably Coordinates requires numbers - confirm against model).
        lat = tree_data.get('lat')
        lng = tree_data.get('lng')

        return StreetTree(
            id=tree_data.get('id', ''),
            object_id=tree_data.get('object_id'),
            tree_id=tree_data.get('tree_id'),
            location_number=tree_data.get('location_number'),
            identifier=tree_data.get('identifier'),
            object_name=tree_data.get('object_name'),
            species_german=tree_data.get('species_german'),
            species_botanical=tree_data.get('species_botanical'),
            genus_german=tree_data.get('genus_german'),
            genus_botanical=tree_data.get('genus_botanical'),
            genus_category=genus_category,
            coordinates=Coordinates(
                lat=0.0 if lat is None else lat,
                lng=0.0 if lng is None else lng
            ),
            district=tree_data.get('district'),
            owner=tree_data.get('owner'),
            category=tree_data.get('category'),
            street=tree_data.get('street'),
            house_number=tree_data.get('house_number'),
            address_addition=tree_data.get('address_addition'),
            planting_year=tree_data.get('planting_year'),
            age=tree_data.get('age'),
            crown_diameter_m=tree_data.get('crown_diameter_m'),
            trunk_circumference_cm=tree_data.get('trunk_circumference_cm'),
            height_m=tree_data.get('height_m'),
            health_status=health_status,
            last_updated=datetime.now()
        )

    # Static so the cache key is just the four coordinates; caching a bound
    # method would add ``self`` to every key and pin the instance in the cache.
    @staticmethod
    @lru_cache(maxsize=1000)
    def _distance_cache(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
        """Return the geodesic distance in meters between two points (cached)."""
        return geodesic((lat1, lng1), (lat2, lng2)).meters

    def _trees_within_radius(
        self,
        records,
        lat: float,
        lng: float,
        radius_m: float
    ) -> List[Tuple[float, Dict]]:
        """Return ``(distance_m, record)`` for every record within ``radius_m``.

        Empty records and records without both coordinates are skipped.
        """
        matches = []
        for tree_data in records:
            if not tree_data:
                continue
            tree_lat = tree_data.get('lat')
            tree_lng = tree_data.get('lng')
            if tree_lat is None or tree_lng is None:
                continue
            distance = self._distance_cache(lat, lng, tree_lat, tree_lng)
            if distance <= radius_m:
                matches.append((distance, tree_data))
        return matches

    def _build_nearby_response(
        self,
        lat: float,
        lng: float,
        radius_m: int,
        matches: List[Tuple[float, Dict]],
        limit: Optional[int],
        start_time: datetime
    ) -> TreesNearLocationResponse:
        """Assemble the response from all in-radius matches.

        The limit is applied after sorting, so the returned trees are the
        closest ones. Metrics and shade analysis describe the returned trees.
        """
        nearest = self._nearest_first(matches, limit)
        nearby_trees = [self._create_tree_from_dict(tree_data) for _, tree_data in nearest]
        metrics = self._calculate_tree_density_metrics(nearby_trees, radius_m)
        shade_analysis = self._analyze_shade_coverage(lat, lng, nearby_trees)
        query_time = (datetime.now() - start_time).total_seconds() * 1000
        return TreesNearLocationResponse(
            location=Coordinates(lat=lat, lng=lng),
            radius_m=radius_m,
            trees=nearby_trees,
            metrics=metrics,
            shade_analysis=shade_analysis,
            total_found=len(nearby_trees),
            query_time_ms=int(query_time)
        )

    async def get_trees_near_location(
        self,
        lat: float,
        lng: float,
        radius_m: int = 500,
        limit: Optional[int] = None
    ) -> TreesNearLocationResponse:
        """Get street trees within a radius of a location using spatial index.

        Args:
            lat: Latitude of the query point.
            lng: Longitude of the query point.
            radius_m: Search radius in meters.
            limit: Maximum number of trees to return; falsy means no limit.

        Returns:
            Response with the matching trees sorted nearest-first, plus
            density metrics and shade analysis for those trees.
        """
        start_time = datetime.now()
        await self._load_trees()
        if self._spatial_index is None:
            # Fallback to linear search if index is unavailable
            return await self._get_trees_linear_search(lat, lng, radius_m, limit)

        # The index yields candidates inside the bounding box; the exact
        # geodesic check below trims the corners of the box.
        candidate_ids = self._spatial_index.intersection(self._query_bbox(lat, lng, radius_m))
        candidates = (self._tree_id_to_data.get(tree_id) for tree_id in candidate_ids)
        matches = self._trees_within_radius(candidates, lat, lng, radius_m)
        return self._build_nearby_response(lat, lng, radius_m, matches, limit, start_time)

    def _calculate_tree_density_metrics(
        self,
        trees: List[StreetTree],
        radius_m: int
    ) -> TreeDensityMetrics:
        """Calculate tree density and coverage metrics.

        Returns a default ``TreeDensityMetrics()`` when ``trees`` is empty.
        """
        if not trees:
            return TreeDensityMetrics()

        circle_area_m2 = math.pi * radius_m * radius_m
        area_hectares = circle_area_m2 / 10000  # Convert to hectares

        # Averages ignore trees that lack the respective measurement
        ages = [t.age for t in trees if t.age is not None]
        heights = [t.height_m for t in trees if t.height_m is not None]
        crowns = [t.crown_diameter_m for t in trees if t.crown_diameter_m is not None]
        avg_age = sum(ages) / len(ages) if ages else None
        avg_height = sum(heights) / len(heights) if heights else None
        avg_crown = sum(crowns) / len(crowns) if crowns else None

        # Count mature vs young trees
        mature_trees = sum(1 for t in trees if t.age and t.age > 20)
        young_trees = sum(1 for t in trees if t.age and t.age < 10)

        # Shade coverage (rough estimate): summed crown discs over the
        # search circle, capped at 100%. Guarded against a zero radius.
        shade_coverage = 0.0
        if crowns and circle_area_m2 > 0:
            total_crown_area = sum(math.pi * (d / 2) ** 2 for d in crowns if d > 0)
            shade_coverage = min(100.0, (total_crown_area / circle_area_m2) * 100)

        # Dominant species: the three most frequent German species names
        species_count = {}
        for tree in trees:
            if tree.species_german:
                species_count[tree.species_german] = species_count.get(tree.species_german, 0) + 1
        dominant_species = sorted(species_count.items(), key=lambda x: x[1], reverse=True)[:3]
        dominant_species_names = [species[0] for species in dominant_species]

        # Species diversity (simple calculation): 10 points per species, max 100
        unique_species = len(species_count)
        diversity_score = min(100, (unique_species * 10)) if unique_species > 0 else 0

        return TreeDensityMetrics(
            total_trees=len(trees),
            trees_per_hectare=len(trees) / area_hectares if area_hectares > 0 else 0,
            average_tree_age=avg_age,
            average_height=avg_height,
            average_crown_diameter=avg_crown,
            shade_coverage_percent=shade_coverage,
            mature_trees_count=mature_trees,
            young_trees_count=young_trees,
            dominant_species=dominant_species_names,
            species_diversity_score=diversity_score
        )

    def _analyze_shade_coverage(
        self,
        lat: float,
        lng: float,
        trees: List[StreetTree]
    ) -> TreeShadeAnalysis:
        """Analyze shade coverage for picnic spot evaluation."""
        trees_50m = 0
        trees_100m = 0
        large_trees = []
        for tree in trees:
            distance = self._distance_cache(lat, lng, tree.coordinates.lat, tree.coordinates.lng)
            if distance <= 50:
                trees_50m += 1
            # The 100 m count includes the trees already counted within 50 m
            if distance <= 100:
                trees_100m += 1
            # Consider large trees (good crown diameter, height or age)
            if ((tree.crown_diameter_m and tree.crown_diameter_m > 8) or
                    (tree.height_m and tree.height_m > 15) or
                    (tree.age and tree.age > 30)):
                large_trees.append(tree)

        # Estimate shade coverage: 15 points per tree within 50 m, capped at 100
        shade_coverage = 0
        if trees_50m > 0:
            shade_coverage = min(100, trees_50m * 15)  # Rough estimate

        # Shade quality based on tree density
        shade_quality = 0
        if trees_50m > 3:
            shade_quality = 80
        elif trees_50m > 1:
            shade_quality = 60
        elif trees_100m > 5:
            shade_quality = 40
        elif trees_100m > 2:
            shade_quality = 20

        # Best shade times (simplified)
        best_times = []
        if shade_quality > 60:
            best_times = ["10:00-12:00", "14:00-16:00"]
        elif shade_quality > 30:
            best_times = ["11:00-13:00"]

        return TreeShadeAnalysis(
            has_nearby_trees=len(trees) > 0,
            trees_within_50m=trees_50m,
            trees_within_100m=trees_100m,
            estimated_shade_coverage=shade_coverage,
            shade_quality_score=shade_quality,
            best_shade_times=best_times,
            nearby_large_trees=large_trees[:5],  # Limit to 5 for response size
            canopy_density=len(large_trees) / len(trees) if trees else 0
        )

    async def _get_trees_linear_search(
        self,
        lat: float,
        lng: float,
        radius_m: int = 500,
        limit: Optional[int] = None
    ) -> TreesNearLocationResponse:
        """Fallback linear search method.

        Scans every loaded record instead of using the spatial index; the
        result has the same shape and ordering as the indexed query.
        """
        start_time = datetime.now()
        trees_data = await self._load_trees()
        matches = self._trees_within_radius(trees_data, lat, lng, radius_m)
        return self._build_nearby_response(lat, lng, radius_m, matches, limit, start_time)

    async def search_trees(self, filters: TreesSearchFilters) -> List[StreetTree]:
        """Search trees with filters.

        The location filter is active when both center coordinates are set
        (0.0 is a valid coordinate) and ``within_radius_m`` is truthy. When
        the spatial index is available it narrows the candidates first;
        results keep the order of the underlying data file.
        """
        trees_data = await self._load_trees()
        use_location = (
            filters.center_lat is not None
            and filters.center_lng is not None
            and bool(filters.within_radius_m)
        )

        candidates = trees_data
        if use_location and self._spatial_index is not None:
            bbox = self._query_bbox(filters.center_lat, filters.center_lng, filters.within_radius_m)
            # Index ids are positions in the data file, so sorting them
            # preserves the order a full scan would produce.
            candidate_ids = sorted(self._spatial_index.intersection(bbox))
            candidates = [
                self._tree_id_to_data[tree_id]
                for tree_id in candidate_ids
                if tree_id in self._tree_id_to_data
            ]

        filtered_trees = []
        for tree_data in candidates:
            # Apply location filter first if specified
            if use_location:
                tree_lat = tree_data.get('lat')
                tree_lng = tree_data.get('lng')
                if tree_lat is None or tree_lng is None:
                    continue
                distance = self._distance_cache(
                    filters.center_lat, filters.center_lng,
                    tree_lat, tree_lng
                )
                if distance > filters.within_radius_m:
                    continue
            # Apply other filters
            if filters.species and tree_data.get('species_german') not in filters.species:
                continue
            if filters.district and tree_data.get('district') != filters.district:
                continue
            if not self._passes_range(tree_data.get('age'), filters.min_age, filters.max_age):
                continue
            if not self._passes_range(tree_data.get('height_m'), filters.min_height, filters.max_height):
                continue
            filtered_trees.append(self._create_tree_from_dict(tree_data))
        return filtered_trees

    async def get_tree_stats(self) -> Dict[str, Any]:
        """Get overall statistics about Berlin street trees.

        Returns:
            A dictionary with totals, per-district counts, an age histogram
            and the ten most common species, or ``{"error": ...}`` when no
            tree data is loaded.
        """
        trees_data = await self._load_trees()
        if not trees_data:
            return {"error": "No tree data available"}

        district_counts = {}
        species_counts = {}
        age_distribution = {"0-10": 0, "11-20": 0, "21-50": 0, "51+": 0, "unknown": 0}
        for tree in trees_data:
            # District stats
            district = tree.get('district')
            if district:
                district_counts[district] = district_counts.get(district, 0) + 1
            # Species stats
            species = tree.get('species_german')
            if species:
                species_counts[species] = species_counts.get(species, 0) + 1
            # Age distribution
            age = tree.get('age')
            if age is None:
                age_distribution["unknown"] += 1
            elif age <= 10:
                age_distribution["0-10"] += 1
            elif age <= 20:
                age_distribution["11-20"] += 1
            elif age <= 50:
                age_distribution["21-50"] += 1
            else:
                age_distribution["51+"] += 1

        # Top 10 species
        top_species = sorted(species_counts.items(), key=lambda x: x[1], reverse=True)[:10]
        return {
            "total_trees": len(trees_data),
            "districts": len(district_counts),
            "unique_species": len(species_counts),
            "district_counts": district_counts,
            "age_distribution": age_distribution,
            "top_species": dict(top_species),
            "last_updated": datetime.now().isoformat()
        }