import json
import logging
import math
from collections import Counter
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime

from geopy.distance import geodesic

from app.models.street_tree import (
    StreetTree,
    TreeDensityMetrics,
    TreeShadeAnalysis,
    TreesSearchFilters,
    TreesNearLocationResponse,
    TreeGenus,
    TreeHealthStatus,
)
from app.models.green_space import Coordinates

logger = logging.getLogger(__name__)


class StreetTreeService:
    """Service for accessing and analyzing Berlin street trees data.

    Tree records are read lazily from ``<data_dir>/processed/street_trees.json``
    and kept in memory for the lifetime of the instance.
    """

    # Upper-cased German genus name -> genus enum member.
    # NOTE: the lookup key is produced with ``str.upper()``, which converts
    # "ß" to "SS", so the hawthorn entry has to be spelled "WEISSDORN"; a key
    # containing "ß" could never be matched.
    _GENUS_BY_GERMAN_NAME = {
        "AHORN": TreeGenus.AHORN,
        "LINDE": TreeGenus.LINDE,
        "KASTANIE": TreeGenus.KASTANIE,
        "ROSSKASTANIE": TreeGenus.ROSSKASTANIE,
        "EICHE": TreeGenus.EICHE,
        "PLATANE": TreeGenus.PLATANE,
        "BIRKE": TreeGenus.BIRKE,
        "WEISSDORN": TreeGenus.WEISSDORN,
        "PAPPEL": TreeGenus.PAPPEL,
        "ESCHE": TreeGenus.ESCHE,
    }

    def __init__(self):
        self._trees_cache = None
        self._trees_index = None
        self.data_dir = Path("app/data")

    def _load_trees(self) -> List[Dict]:
        """Return the raw tree records, loading them from JSON on first use.

        Reads the ``"street_trees"`` list from
        ``<data_dir>/processed/street_trees.json``. If the file does not
        exist, a warning is logged and an empty list is cached instead.
        """
        if self._trees_cache is not None:
            return self._trees_cache

        trees_file = self.data_dir / "processed" / "street_trees.json"
        if trees_file.exists():
            with open(trees_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self._trees_cache = data.get("street_trees", [])
        else:
            logger.warning(
                "Warning: street_trees.json not found. "
                "Run process_street_trees.py first."
            )
            self._trees_cache = []
        return self._trees_cache

    def _create_tree_from_dict(self, tree_data: Dict) -> StreetTree:
        """Convert a raw tree dictionary to a ``StreetTree`` model.

        The genus category is looked up from the upper-cased
        ``genus_german`` value (``TreeGenus.OTHER`` when unmapped). The
        health status is a heuristic derived from ``age`` only: older than
        80 -> FAIR, older than 50 -> GOOD, any other positive age ->
        EXCELLENT, otherwise UNKNOWN.
        """
        genus_key = (tree_data.get('genus_german') or '').upper()
        genus_category = self._GENUS_BY_GERMAN_NAME.get(genus_key, TreeGenus.OTHER)

        # Health is not read from the record; it is estimated from age alone.
        health_status = TreeHealthStatus.UNKNOWN
        age = tree_data.get('age')
        if age:
            if age > 80:
                health_status = TreeHealthStatus.FAIR
            elif age > 50:
                health_status = TreeHealthStatus.GOOD
            elif age > 0:
                health_status = TreeHealthStatus.EXCELLENT

        return StreetTree(
            id=tree_data.get('id', ''),
            object_id=tree_data.get('object_id'),
            tree_id=tree_data.get('tree_id'),
            location_number=tree_data.get('location_number'),
            identifier=tree_data.get('identifier'),
            object_name=tree_data.get('object_name'),
            species_german=tree_data.get('species_german'),
            species_botanical=tree_data.get('species_botanical'),
            genus_german=tree_data.get('genus_german'),
            genus_botanical=tree_data.get('genus_botanical'),
            genus_category=genus_category,
            coordinates=Coordinates(
                lat=tree_data.get('lat', 0.0),
                lng=tree_data.get('lng', 0.0),
            ),
            district=tree_data.get('district'),
            owner=tree_data.get('owner'),
            category=tree_data.get('category'),
            street=tree_data.get('street'),
            house_number=tree_data.get('house_number'),
            address_addition=tree_data.get('address_addition'),
            planting_year=tree_data.get('planting_year'),
            age=tree_data.get('age'),
            crown_diameter_m=tree_data.get('crown_diameter_m'),
            trunk_circumference_cm=tree_data.get('trunk_circumference_cm'),
            height_m=tree_data.get('height_m'),
            health_status=health_status,
            last_updated=datetime.now(),
        )

    async def get_trees_near_location(
        self,
        lat: float,
        lng: float,
        radius_m: int = 500,
        limit: Optional[int] = None
    ) -> TreesNearLocationResponse:
        """Get street trees within a radius of a location, nearest first.

        Args:
            lat: Latitude of the query point.
            lng: Longitude of the query point.
            radius_m: Search radius in meters (geodesic distance).
            limit: Maximum number of trees to return. ``None``, ``0`` or a
                negative value means "no limit". When set, the *nearest*
                ``limit`` trees are kept.

        Returns:
            A response holding the matching trees plus density metrics and a
            shade analysis, both computed over the returned trees.
        """
        start_time = datetime.now()
        origin = (lat, lng)

        # Collect (distance, record) pairs so every distance is computed
        # once and can be reused as the sort key.
        in_range = []
        for tree_data in self._load_trees():
            tree_lat = tree_data.get('lat')
            tree_lng = tree_data.get('lng')
            if tree_lat is None or tree_lng is None:
                continue

            distance = geodesic(origin, (tree_lat, tree_lng)).meters
            if distance <= radius_m:
                in_range.append((distance, tree_data))

        # Sort before truncating: cutting the scan short at `limit` would
        # return whichever trees come first in the file, not the closest.
        in_range.sort(key=lambda pair: pair[0])
        if limit is not None and limit > 0:
            in_range = in_range[:limit]

        nearby_trees = [
            self._create_tree_from_dict(tree_data) for _, tree_data in in_range
        ]

        metrics = self._calculate_tree_density_metrics(nearby_trees, radius_m)
        shade_analysis = self._analyze_shade_coverage(lat, lng, nearby_trees)

        query_time = (datetime.now() - start_time).total_seconds() * 1000

        return TreesNearLocationResponse(
            location=Coordinates(lat=lat, lng=lng),
            radius_m=radius_m,
            trees=nearby_trees,
            metrics=metrics,
            shade_analysis=shade_analysis,
            total_found=len(nearby_trees),
            query_time_ms=int(query_time),
        )

    def _calculate_tree_density_metrics(
        self,
        trees: List[StreetTree],
        radius_m: int
    ) -> TreeDensityMetrics:
        """Calculate tree density and coverage metrics for a search circle.

        Args:
            trees: Trees found inside the circle.
            radius_m: Radius of the circle in meters.

        Returns:
            Default ``TreeDensityMetrics()`` when ``trees`` is empty,
            otherwise metrics derived from the given trees. Averages are
            ``None`` when no tree carries the corresponding value.
        """
        if not trees:
            return TreeDensityMetrics()

        area_m2 = math.pi * radius_m * radius_m
        area_hectares = area_m2 / 10000  # 1 ha = 10,000 m^2

        ages = [t.age for t in trees if t.age is not None]
        heights = [t.height_m for t in trees if t.height_m is not None]
        crowns = [t.crown_diameter_m for t in trees if t.crown_diameter_m is not None]

        avg_age = sum(ages) / len(ages) if ages else None
        avg_height = sum(heights) / len(heights) if heights else None
        avg_crown = sum(crowns) / len(crowns) if crowns else None

        # Trees aged 10-20 (and trees with a missing/zero age) fall in
        # neither bucket.
        mature_trees = sum(1 for t in trees if t.age and t.age > 20)
        young_trees = sum(1 for t in trees if t.age and t.age < 10)

        # Rough estimate: summed crown-disc area relative to the circle
        # area, capped at 100 %. Guarded so a zero radius cannot divide by
        # zero.
        shade_coverage = 0.0
        if crowns and area_m2 > 0:
            total_crown_area = sum(math.pi * (d / 2) ** 2 for d in crowns if d > 0)
            shade_coverage = min(100.0, (total_crown_area / area_m2) * 100)

        species_count = Counter(t.species_german for t in trees if t.species_german)
        dominant_species_names = [name for name, _ in species_count.most_common(3)]

        # Simple diversity score: 10 points per distinct species, max 100.
        diversity_score = min(100, len(species_count) * 10)

        return TreeDensityMetrics(
            total_trees=len(trees),
            trees_per_hectare=len(trees) / area_hectares if area_hectares > 0 else 0,
            average_tree_age=avg_age,
            average_height=avg_height,
            average_crown_diameter=avg_crown,
            shade_coverage_percent=shade_coverage,
            mature_trees_count=mature_trees,
            young_trees_count=young_trees,
            dominant_species=dominant_species_names,
            species_diversity_score=diversity_score,
        )

    def _analyze_shade_coverage(
        self,
        lat: float,
        lng: float,
        trees: List[StreetTree]
    ) -> TreeShadeAnalysis:
        """Analyze shade coverage around a point for picnic spot evaluation.

        Counts trees within 50 m and 100 m of ``(lat, lng)`` and collects
        "large" trees (crown > 8 m, height > 15 m, or age > 30) among those
        within 100 m. Coverage, quality and best times are coarse heuristics
        based on those counts only.
        """
        origin = (lat, lng)
        trees_50m = 0
        trees_100m = 0
        large_trees = []

        for tree in trees:
            position = (tree.coordinates.lat, tree.coordinates.lng)
            distance = geodesic(origin, position).meters

            if distance <= 50:
                trees_50m += 1
            if distance > 100:
                continue

            trees_100m += 1
            is_large = (
                (tree.crown_diameter_m and tree.crown_diameter_m > 8)
                or (tree.height_m and tree.height_m > 15)
                or (tree.age and tree.age > 30)
            )
            if is_large:
                large_trees.append(tree)

        # Rough estimate: 15 % per tree within 50 m, capped at 100 %.
        shade_coverage = min(100, trees_50m * 15) if trees_50m > 0 else 0

        # Quality tiers, checked from best to worst.
        if trees_50m > 3:
            shade_quality = 80
        elif trees_50m > 1:
            shade_quality = 60
        elif trees_100m > 5:
            shade_quality = 40
        elif trees_100m > 2:
            shade_quality = 20
        else:
            shade_quality = 0

        # Simplified time windows keyed on the quality score.
        if shade_quality > 60:
            best_times = ["10:00-12:00", "14:00-16:00"]
        elif shade_quality > 30:
            best_times = ["11:00-13:00"]
        else:
            best_times = []

        return TreeShadeAnalysis(
            has_nearby_trees=len(trees) > 0,
            trees_within_50m=trees_50m,
            trees_within_100m=trees_100m,
            estimated_shade_coverage=shade_coverage,
            shade_quality_score=shade_quality,
            best_shade_times=best_times,
            nearby_large_trees=large_trees[:5],  # Limit to 5 for response size
            canopy_density=len(large_trees) / len(trees) if trees else 0,
        )

    async def search_trees(self, filters: TreesSearchFilters) -> List[StreetTree]:
        """Search trees with filters.

        A location filter is applied only when ``center_lat``,
        ``center_lng`` and a non-zero ``within_radius_m`` are all given.
        Each of the remaining filters (species, district, age range, height
        range) is skipped when its value is falsy. When an age or height
        bound is active, trees whose age/height is missing or zero are
        excluded.
        """
        # Compare coordinates against None rather than by truthiness so a
        # 0.0 latitude/longitude does not silently disable the filter.
        use_location = (
            filters.center_lat is not None
            and filters.center_lng is not None
            and bool(filters.within_radius_m)
        )
        center = (filters.center_lat, filters.center_lng)

        filtered_trees = []
        for tree_data in self._load_trees():
            if use_location:
                tree_lat = tree_data.get('lat')
                tree_lng = tree_data.get('lng')
                if tree_lat is None or tree_lng is None:
                    continue
                distance = geodesic(center, (tree_lat, tree_lng)).meters
                if distance > filters.within_radius_m:
                    continue

            if filters.species and tree_data.get('species_german') not in filters.species:
                continue

            if filters.district and tree_data.get('district') != filters.district:
                continue

            age = tree_data.get('age')
            if filters.min_age and (not age or age < filters.min_age):
                continue
            if filters.max_age and (not age or age > filters.max_age):
                continue

            height = tree_data.get('height_m')
            if filters.min_height and (not height or height < filters.min_height):
                continue
            if filters.max_height and (not height or height > filters.max_height):
                continue

            filtered_trees.append(self._create_tree_from_dict(tree_data))

        return filtered_trees

    async def get_tree_stats(self) -> Dict[str, Any]:
        """Get overall statistics about Berlin street trees.

        Returns:
            ``{"error": ...}`` when no tree data is loaded; otherwise a dict
            with the total count, per-district counts, an age histogram, the
            ten most frequent species and a generation timestamp.
        """
        trees_data = self._load_trees()

        if not trees_data:
            return {"error": "No tree data available"}

        district_counts = Counter()
        species_counts = Counter()
        age_distribution = {"0-10": 0, "11-20": 0, "21-50": 0, "51+": 0, "unknown": 0}

        for tree in trees_data:
            district = tree.get('district')
            if district:
                district_counts[district] += 1

            species = tree.get('species_german')
            if species:
                species_counts[species] += 1

            age = tree.get('age')
            if age is None:
                bucket = "unknown"
            elif age <= 10:
                bucket = "0-10"
            elif age <= 20:
                bucket = "11-20"
            elif age <= 50:
                bucket = "21-50"
            else:
                bucket = "51+"
            age_distribution[bucket] += 1

        return {
            "total_trees": len(trees_data),
            "districts": len(district_counts),
            "unique_species": len(species_counts),
            # Plain dicts keep the payload free of Counter-specific types.
            "district_counts": dict(district_counts),
            "age_distribution": age_distribution,
            "top_species": dict(species_counts.most_common(10)),
            "last_updated": datetime.now().isoformat(),
        }