# berlin-picnic-api/app/services/street_tree_service.py
# (353 lines, 13 KiB, Python)

import json
import math
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any

from geopy.distance import geodesic

from app.models.green_space import Coordinates
from app.models.street_tree import (
    StreetTree, TreeDensityMetrics, TreeShadeAnalysis, TreesSearchFilters,
    TreesNearLocationResponse, TreeGenus, TreeHealthStatus
)
class StreetTreeService:
    """Service for accessing and analyzing Berlin street trees data.

    Raw tree records are read lazily from
    ``<data_dir>/processed/street_trees.json`` on first use and then kept in
    memory for the lifetime of the instance.
    """

    def __init__(self, data_dir: Optional[Path] = None):
        """Create the service.

        Args:
            data_dir: Directory that contains ``processed/street_trees.json``.
                When omitted, ``app/data`` (relative to the current working
                directory) is used.
        """
        self._trees_cache: Optional[List[Dict[str, Any]]] = None
        # Not read or written by any method of this class.
        self._trees_index = None
        self.data_dir = Path(data_dir) if data_dir is not None else Path("app/data")

    def _load_trees(self) -> List[Dict]:
        """Return the raw tree records, loading them from disk on first call.

        The records are taken from the ``"street_trees"`` key of the JSON
        document (a bare top-level list is accepted as well). If the file is
        missing, a warning is printed and an empty list is cached.

        Returns:
            The cached list of tree dictionaries (never ``None``).
        """
        if self._trees_cache is None:
            trees_file = self.data_dir / "processed" / "street_trees.json"
            if trees_file.exists():
                with open(trees_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                records = data.get("street_trees") if isinstance(data, dict) else data
                # A null / non-list payload must not be cached as-is: every
                # caller iterates the result, and a cached None would also
                # trigger a reload on each call.
                self._trees_cache = records if isinstance(records, list) else []
            else:
                print("Warning: street_trees.json not found. Run process_street_trees.py first.")
                self._trees_cache = []
        return self._trees_cache

    def _create_tree_from_dict(self, tree_data: Dict) -> StreetTree:
        """Convert a raw tree dictionary to a ``StreetTree`` model.

        The genus category is looked up from the upper-cased German genus
        name; unknown or missing names map to ``TreeGenus.OTHER``. The health
        status is derived from the tree's age only.

        Args:
            tree_data: One record as returned by ``_load_trees``.

        Returns:
            The populated ``StreetTree``; ``last_updated`` is set to the
            current local time.
        """
        # Keys must be spelled the way str.upper() emits them: upper-casing
        # expands "ß" to "SS", so a key containing "ß" could never match.
        genus_mapping = {
            "AHORN": TreeGenus.AHORN,
            "LINDE": TreeGenus.LINDE,
            "KASTANIE": TreeGenus.KASTANIE,
            "ROSSKASTANIE": TreeGenus.ROSSKASTANIE,
            "EICHE": TreeGenus.EICHE,
            "PLATANE": TreeGenus.PLATANE,
            "BIRKE": TreeGenus.BIRKE,
            "WEISSDORN": TreeGenus.WEISSDORN,
            "PAPPEL": TreeGenus.PAPPEL,
            "ESCHE": TreeGenus.ESCHE,
        }
        genus_german = (tree_data.get('genus_german') or '').upper()
        genus_category = genus_mapping.get(genus_german, TreeGenus.OTHER)

        # Age is the only input to the health status: missing, zero or
        # negative ages stay UNKNOWN.
        health_status = TreeHealthStatus.UNKNOWN
        age = tree_data.get('age')
        if age:
            if age > 80:
                health_status = TreeHealthStatus.FAIR
            elif age > 50:
                health_status = TreeHealthStatus.GOOD
            elif age > 0:
                health_status = TreeHealthStatus.EXCELLENT

        return StreetTree(
            id=tree_data.get('id', ''),
            object_id=tree_data.get('object_id'),
            tree_id=tree_data.get('tree_id'),
            location_number=tree_data.get('location_number'),
            identifier=tree_data.get('identifier'),
            object_name=tree_data.get('object_name'),
            species_german=tree_data.get('species_german'),
            species_botanical=tree_data.get('species_botanical'),
            genus_german=tree_data.get('genus_german'),
            genus_botanical=tree_data.get('genus_botanical'),
            genus_category=genus_category,
            coordinates=Coordinates(
                lat=tree_data.get('lat', 0.0),
                lng=tree_data.get('lng', 0.0)
            ),
            district=tree_data.get('district'),
            owner=tree_data.get('owner'),
            category=tree_data.get('category'),
            street=tree_data.get('street'),
            house_number=tree_data.get('house_number'),
            address_addition=tree_data.get('address_addition'),
            planting_year=tree_data.get('planting_year'),
            age=tree_data.get('age'),
            crown_diameter_m=tree_data.get('crown_diameter_m'),
            trunk_circumference_cm=tree_data.get('trunk_circumference_cm'),
            height_m=tree_data.get('height_m'),
            health_status=health_status,
            last_updated=datetime.now()
        )

    async def get_trees_near_location(
        self,
        lat: float,
        lng: float,
        radius_m: int = 500,
        limit: Optional[int] = None
    ) -> TreesNearLocationResponse:
        """Get street trees within a radius of a location, nearest first.

        Args:
            lat: Latitude of the query point.
            lng: Longitude of the query point.
            radius_m: Search radius in metres.
            limit: Maximum number of trees to return. ``None`` or ``0`` means
                no cap.

        Returns:
            A response whose ``trees`` are the nearest ``limit`` trees inside
            the radius. ``metrics``, ``shade_analysis`` and ``total_found``
            describe every tree inside the radius, not just the returned
            ones.
        """
        started = time.perf_counter()
        origin = (lat, lng)

        # Every record with coordinates is measured; the scan cannot stop at
        # `limit` because the records are not ordered by distance.
        in_range = []
        for tree_data in self._load_trees():
            tree_lat = tree_data.get('lat')
            tree_lng = tree_data.get('lng')
            if tree_lat is None or tree_lng is None:
                continue
            distance = geodesic(origin, (tree_lat, tree_lng)).meters
            if distance <= radius_m:
                in_range.append((distance, tree_data))

        # Sort on the distance computed above instead of re-measuring.
        in_range.sort(key=lambda pair: pair[0])
        all_trees = [self._create_tree_from_dict(record) for _, record in in_range]

        metrics = self._calculate_tree_density_metrics(all_trees, radius_m)
        shade_analysis = self._analyze_shade_coverage(lat, lng, all_trees)

        returned = all_trees[:limit] if limit and limit > 0 else all_trees
        query_time = (time.perf_counter() - started) * 1000

        return TreesNearLocationResponse(
            location=Coordinates(lat=lat, lng=lng),
            radius_m=radius_m,
            trees=returned,
            metrics=metrics,
            shade_analysis=shade_analysis,
            total_found=len(all_trees),
            query_time_ms=int(query_time)
        )

    def _calculate_tree_density_metrics(
        self,
        trees: List[StreetTree],
        radius_m: int
    ) -> TreeDensityMetrics:
        """Calculate tree density and coverage metrics for a search circle.

        Args:
            trees: Trees found inside the circle.
            radius_m: Radius of the circle in metres.

        Returns:
            Default-constructed metrics when ``trees`` is empty; otherwise
            counts, averages (``None`` where no tree has the attribute), a
            crown-area based shade estimate capped at 100 %, the three most
            frequent species and a diversity score of 10 points per distinct
            species capped at 100.
        """
        if not trees:
            return TreeDensityMetrics()

        area_m2 = math.pi * radius_m * radius_m
        area_hectares = area_m2 / 10000  # 1 ha = 10,000 m^2

        ages = [t.age for t in trees if t.age is not None]
        heights = [t.height_m for t in trees if t.height_m is not None]
        crowns = [t.crown_diameter_m for t in trees if t.crown_diameter_m is not None]

        avg_age = sum(ages) / len(ages) if ages else None
        avg_height = sum(heights) / len(heights) if heights else None
        avg_crown = sum(crowns) / len(crowns) if crowns else None

        # Trees aged 10-20 fall into neither bucket.
        mature_trees = sum(1 for t in trees if t.age and t.age > 20)
        young_trees = sum(1 for t in trees if t.age and t.age < 10)

        # Rough estimate: summed crown discs relative to the circle area,
        # ignoring overlap. Guarded so a zero radius cannot divide by zero.
        shade_coverage = 0.0
        if crowns and area_m2 > 0:
            total_crown_area = sum(math.pi * (d / 2) ** 2 for d in crowns if d > 0)
            shade_coverage = min(100.0, (total_crown_area / area_m2) * 100)

        species_count = Counter(t.species_german for t in trees if t.species_german)
        dominant_species_names = [name for name, _ in species_count.most_common(3)]
        diversity_score = min(100, len(species_count) * 10)

        return TreeDensityMetrics(
            total_trees=len(trees),
            trees_per_hectare=len(trees) / area_hectares if area_hectares > 0 else 0,
            average_tree_age=avg_age,
            average_height=avg_height,
            average_crown_diameter=avg_crown,
            shade_coverage_percent=shade_coverage,
            mature_trees_count=mature_trees,
            young_trees_count=young_trees,
            dominant_species=dominant_species_names,
            species_diversity_score=diversity_score
        )

    def _analyze_shade_coverage(
        self,
        lat: float,
        lng: float,
        trees: List[StreetTree]
    ) -> TreeShadeAnalysis:
        """Analyze shade coverage around a point for picnic spot evaluation.

        Args:
            lat: Latitude of the spot.
            lng: Longitude of the spot.
            trees: Candidate trees; the first five large ones are reported, so
                callers wanting the nearest ones should pass them sorted by
                distance.

        Returns:
            Counts of trees within 50 m and 100 m, a coverage estimate of
            15 points per tree within 50 m (capped at 100), a stepwise
            quality score, suggested time windows and the share of large
            trees among ``trees``.
        """
        trees_50m = 0
        trees_100m = 0
        large_trees = []

        for tree in trees:
            distance = geodesic((lat, lng), (tree.coordinates.lat, tree.coordinates.lng)).meters
            if distance <= 50:
                trees_50m += 1
            if distance <= 100:
                trees_100m += 1

            # NOTE(review): every tree passed in is classified here (not only
            # those within 100 m) so that canopy_density below is a ratio over
            # a single population - confirm this matches the intended nesting.
            is_large = (
                (tree.crown_diameter_m and tree.crown_diameter_m > 8)
                or (tree.height_m and tree.height_m > 15)
                or (tree.age and tree.age > 30)
            )
            if is_large:
                large_trees.append(tree)

        # Rough estimate: each tree within 50 m contributes 15 points.
        shade_coverage = min(100, trees_50m * 15) if trees_50m > 0 else 0

        # Close trees dominate the score; the 100 m count only matters when
        # at most one tree stands within 50 m.
        shade_quality = 0
        if trees_50m > 3:
            shade_quality = 80
        elif trees_50m > 1:
            shade_quality = 60
        elif trees_100m > 5:
            shade_quality = 40
        elif trees_100m > 2:
            shade_quality = 20

        # Simplified, fixed time windows keyed on the quality score.
        best_times = []
        if shade_quality > 60:
            best_times = ["10:00-12:00", "14:00-16:00"]
        elif shade_quality > 30:
            best_times = ["11:00-13:00"]

        return TreeShadeAnalysis(
            has_nearby_trees=len(trees) > 0,
            trees_within_50m=trees_50m,
            trees_within_100m=trees_100m,
            estimated_shade_coverage=shade_coverage,
            shade_quality_score=shade_quality,
            best_shade_times=best_times,
            nearby_large_trees=large_trees[:5],  # Limit to 5 for response size
            canopy_density=len(large_trees) / max(1, len(trees)) if trees else 0
        )

    @staticmethod
    def _outside_bounds(value: Any, minimum: Any, maximum: Any) -> bool:
        """Return True when ``value`` fails an active min/max bound.

        A bound is active when it is truthy. While any bound is active, a
        missing or zero ``value`` counts as failing it.
        """
        if minimum and (not value or value < minimum):
            return True
        if maximum and (not value or value > maximum):
            return True
        return False

    async def search_trees(self, filters: TreesSearchFilters) -> List[StreetTree]:
        """Search trees with filters.

        The location filter applies when both centre coordinates are set (not
        ``None``) and ``within_radius_m`` is non-zero; records without
        coordinates are then skipped. Species, district, age and height
        filters apply only when set.

        Args:
            filters: Search criteria.

        Returns:
            Matching trees in the order they appear in the data file.
        """
        # Compare against None so that a coordinate of exactly 0.0 still
        # activates the location filter. Presumably unset coordinates are
        # None on the filter model - confirm against TreesSearchFilters.
        use_location = (
            filters.center_lat is not None
            and filters.center_lng is not None
            and bool(filters.within_radius_m)
        )

        filtered_trees = []
        for tree_data in self._load_trees():
            if use_location:
                tree_lat = tree_data.get('lat')
                tree_lng = tree_data.get('lng')
                if tree_lat is None or tree_lng is None:
                    continue
                distance = geodesic(
                    (filters.center_lat, filters.center_lng),
                    (tree_lat, tree_lng)
                ).meters
                if distance > filters.within_radius_m:
                    continue

            if filters.species and tree_data.get('species_german') not in filters.species:
                continue
            if filters.district and tree_data.get('district') != filters.district:
                continue
            if self._outside_bounds(tree_data.get('age'), filters.min_age, filters.max_age):
                continue
            if self._outside_bounds(
                tree_data.get('height_m'), filters.min_height, filters.max_height
            ):
                continue

            filtered_trees.append(self._create_tree_from_dict(tree_data))

        return filtered_trees

    async def get_tree_stats(self) -> Dict[str, Any]:
        """Get overall statistics about Berlin street trees.

        Returns:
            ``{"error": ...}`` when no records are loaded; otherwise totals,
            per-district counts, an age histogram (records whose age is
            ``None`` are counted as ``"unknown"``), the ten most frequent
            species and the current local time as an ISO string.
        """
        trees_data = self._load_trees()
        if not trees_data:
            return {"error": "No tree data available"}

        district_counts = Counter()
        species_counts = Counter()
        age_distribution = {"0-10": 0, "11-20": 0, "21-50": 0, "51+": 0, "unknown": 0}

        for tree in trees_data:
            district = tree.get('district')
            if district:
                district_counts[district] += 1

            species = tree.get('species_german')
            if species:
                species_counts[species] += 1

            age = tree.get('age')
            if age is None:
                bucket = "unknown"
            elif age <= 10:
                bucket = "0-10"
            elif age <= 20:
                bucket = "11-20"
            elif age <= 50:
                bucket = "21-50"
            else:
                bucket = "51+"
            age_distribution[bucket] += 1

        return {
            "total_trees": len(trees_data),
            "districts": len(district_counts),
            "unique_species": len(species_counts),
            # Plain dicts keep the response free of Counter-specific behaviour.
            "district_counts": dict(district_counts),
            "age_distribution": age_distribution,
            "top_species": dict(species_counts.most_common(10)),
            "last_updated": datetime.now().isoformat()
        }