Compare commits

..

No commits in common. "c14f5ead380a51677021a92ae8102e09923754a6" and "59e27534b0f0dd3eb2b5d5b878b636a4319e4bf9" have entirely different histories.

6 changed files with 17 additions and 1036 deletions

View File

@ -1,133 +0,0 @@
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from enum import Enum
from .green_space import Coordinates
class TreeCategory(str, Enum):
    """Tree category labels; every member is also a plain ``str`` value."""

    STREET_TREE = "street_tree"
    PARK_TREE = "park_tree"
    ANLAGEBAUM = "anlagebaum"
    ALLEE_TREE = "allee_tree"
class TreeHealthStatus(str, Enum):
    """Health rating of a tree, with UNKNOWN for trees that have no rating."""

    EXCELLENT = "excellent"
    GOOD = "good"
    FAIR = "fair"
    POOR = "poor"
    CRITICAL = "critical"
    UNKNOWN = "unknown"
class TreeGenus(str, Enum):
    """Tree genus buckets named by their German genus name, plus OTHER."""

    AHORN = "ahorn"
    LINDE = "linde"
    KASTANIE = "kastanie"
    EICHE = "eiche"
    PLATANE = "platane"
    BIRKE = "birke"
    WEISSDORN = "weissdorn"
    ROSSKASTANIE = "rosskastanie"
    PAPPEL = "pappel"
    ESCHE = "esche"
    OTHER = "other"
class StreetTree(BaseModel):
    """A single street tree record modelled on Berlin Baumkataster data.

    Only ``id`` and ``coordinates`` are required; every other attribute is
    optional or carries a default.
    """

    # Identifiers
    id: str
    object_id: Optional[int] = None
    tree_id: Optional[str] = None
    location_number: Optional[str] = None
    identifier: Optional[str] = None
    object_name: Optional[str] = None

    # Species and genus (German and botanical names)
    species_german: Optional[str] = None
    species_botanical: Optional[str] = None
    genus_german: Optional[str] = None
    genus_botanical: Optional[str] = None
    genus_category: Optional[TreeGenus] = None

    # Position and administrative data
    coordinates: Coordinates
    district: Optional[str] = None
    owner: Optional[str] = None
    category: Optional[str] = None

    # Address
    street: Optional[str] = None
    house_number: Optional[str] = None
    address_addition: Optional[str] = None
    full_address: Optional[str] = None

    # Age and dimensions (units are given by the field suffixes)
    planting_year: Optional[int] = None
    age: Optional[int] = None
    crown_diameter_m: Optional[float] = None
    trunk_circumference_cm: Optional[int] = None
    height_m: Optional[float] = None

    # Assessment and bookkeeping
    health_status: TreeHealthStatus = TreeHealthStatus.UNKNOWN
    confidence_score: int = Field(80, ge=0, le=100)  # defaults to 80, limited to 0..100
    last_updated: datetime = Field(default_factory=datetime.now)  # set when the model is created
class TreeDensityMetrics(BaseModel):
    """Aggregate tree density and coverage figures for an area.

    All fields have defaults, so ``TreeDensityMetrics()`` describes an area
    without trees.
    """

    # Counts and density
    total_trees: int = 0
    trees_per_hectare: float = 0.0

    # Averages; None when no tree supplied the underlying value
    average_tree_age: Optional[float] = None
    average_height: Optional[float] = None
    average_crown_diameter: Optional[float] = None

    shade_coverage_percent: float = Field(0.0, ge=0, le=100)

    # Age structure
    mature_trees_count: int = 0  # Trees older than 20 years
    young_trees_count: int = 0  # Trees younger than 10 years

    # Species mix
    dominant_species: List[str] = []
    species_diversity_score: int = Field(0, ge=0, le=100)
class TreeShadeAnalysis(BaseModel):
    """Shade assessment of a location, used when evaluating picnic spots."""

    # Tree presence around the location
    has_nearby_trees: bool = False
    trees_within_50m: int = 0
    trees_within_100m: int = 0

    # Shade estimates, both limited to 0..100
    estimated_shade_coverage: int = Field(0, ge=0, le=100)
    shade_quality_score: int = Field(0, ge=0, le=100)

    best_shade_times: List[str] = []  # Time periods with best shade
    seasonal_shade_variation: Optional[str] = None

    nearby_large_trees: List[StreetTree] = []
    canopy_density: Optional[float] = None
class TreesSearchFilters(BaseModel):
    """Optional criteria for a tree search; a field left at None is not set."""

    # Species and genus
    species: Optional[List[str]] = None
    genus: Optional[List[TreeGenus]] = None

    # Age and size bounds
    min_age: Optional[int] = None
    max_age: Optional[int] = None
    min_height: Optional[float] = None
    max_height: Optional[float] = None
    min_crown_diameter: Optional[float] = None

    # Administrative attributes
    district: Optional[str] = None
    category: Optional[str] = None

    # Circular search area: centre point plus radius in metres
    within_radius_m: Optional[int] = None
    center_lat: Optional[float] = None
    center_lng: Optional[float] = None
class TreesNearLocationResponse(BaseModel):
    """Result of a "trees near a location" query."""

    # The query itself
    location: Coordinates
    radius_m: int

    # Matching trees and the figures derived from them
    trees: List[StreetTree]
    metrics: TreeDensityMetrics
    shade_analysis: TreeShadeAnalysis
    total_found: int

    # Bookkeeping
    query_time_ms: Optional[int] = None
    data_source: str = "baumkataster"
    last_updated: datetime = Field(default_factory=datetime.now)  # set when the response is built

View File

@ -11,7 +11,6 @@ from app.models.green_space import (
EnvironmentalFeatures, AccessibilityFeatures, RecreationFeatures, EnvironmentalFeatures, AccessibilityFeatures, RecreationFeatures,
NoiseLevel, LocationScore NoiseLevel, LocationScore
) )
from app.services.street_tree_service import StreetTreeService
class BerlinDataService: class BerlinDataService:
"""Service for accessing Berlin open data and external APIs.""" """Service for accessing Berlin open data and external APIs."""
@ -20,9 +19,7 @@ class BerlinDataService:
self.cache = {} self.cache = {}
self.last_refresh = None self.last_refresh = None
self._toilets_cache = None self._toilets_cache = None
self._street_trees_index = None
self.data_dir = Path("app/data") self.data_dir = Path("app/data")
self.street_tree_service = StreetTreeService()
async def search_green_spaces( async def search_green_spaces(
self, self,
@ -74,7 +71,7 @@ class BerlinDataService:
return None return None
async def get_green_space_at_location(self, lat: float, lng: float) -> Optional[GreenSpace]: async def get_green_space_at_location(self, lat: float, lng: float) -> Optional[GreenSpace]:
"""Check if a location is within a green space and enhance it with real tree data.""" """Check if a location is within a green space."""
spaces = await self._get_mock_green_spaces() spaces = await self._get_mock_green_spaces()
for space in spaces: for space in spaces:
# Simple distance check - in reality you'd use proper polygon containment # Simple distance check - in reality you'd use proper polygon containment
@ -82,10 +79,8 @@ class BerlinDataService:
(lat, lng), (lat, lng),
(space.coordinates.lat, space.coordinates.lng) (space.coordinates.lat, space.coordinates.lng)
).meters ).meters
if distance < 500: # Within 500m of center (larger radius for better coverage) if distance < 100: # Within 100m of center
# Enhance the green space with real tree data return space
enhanced_space = await self._enhance_green_space_with_real_trees(space, lat, lng)
return enhanced_space
return None return None
async def get_green_spaces_within_radius( async def get_green_spaces_within_radius(
@ -343,57 +338,6 @@ class BerlinDataService:
return sorted(nearby_toilets, key=lambda x: x['distance_meters']) return sorted(nearby_toilets, key=lambda x: x['distance_meters'])
async def _enhance_green_space_with_real_trees(self, green_space: GreenSpace, actual_lat: float, actual_lng: float) -> GreenSpace:
    """Return a copy of ``green_space`` enriched with real street-tree data.

    Trees are looked up within 300 m of the queried point (not the park
    centre). Tree coverage, shade quality and wildlife diversity are each
    raised to the tree-derived value when that value is higher, and capped
    at 100. The copy's coordinates are replaced by the queried point and
    ``"real_street_trees"`` is added to its data sources.

    Args:
        green_space: The space to enhance; it is never modified.
        actual_lat: Latitude of the queried location.
        actual_lng: Longitude of the queried location.

    Returns:
        The enhanced copy, or ``green_space`` itself if anything fails.
    """
    try:
        # Get real tree data for the actual location (not just the park center)
        tree_response = await self.street_tree_service.get_trees_near_location(
            actual_lat, actual_lng, radius_m=300
        )

        environmental = green_space.environmental
        shade = tree_response.shade_analysis

        # Tree data may only improve the stored values, never lower them.
        tree_coverage = max(
            environmental.tree_coverage_percent,
            int(shade.estimated_shade_coverage)
        )
        shade_quality = max(
            environmental.shade_quality,
            shade.shade_quality_score
        )
        wildlife_diversity = max(
            environmental.wildlife_diversity_score,
            tree_response.metrics.species_diversity_score
        )

        enhanced_environmental = EnvironmentalFeatures(
            tree_coverage_percent=min(100, tree_coverage),
            shade_quality=min(100, shade_quality),
            noise_level=environmental.noise_level,
            wildlife_diversity_score=min(100, wildlife_diversity),
            water_features=environmental.water_features,
            natural_surface_percent=environmental.natural_surface_percent
        )

        # model_copy() is shallow, so the copy would share the original's
        # data_sources list; appending to it would also change the source
        # green space. Build a separate list and pass it through `update`.
        data_sources = list(green_space.data_sources)
        if "real_street_trees" not in data_sources:
            data_sources.append("real_street_trees")

        return green_space.model_copy(update={
            "environmental": enhanced_environmental,
            "coordinates": Coordinates(lat=actual_lat, lng=actual_lng),  # Use actual query location
            "data_sources": data_sources,
        })

    except Exception as e:
        print(f"Error enhancing green space with real tree data: {e}")
        # Return original space if enhancement fails
        return green_space
async def _get_mock_green_spaces(self) -> List[GreenSpace]: async def _get_mock_green_spaces(self) -> List[GreenSpace]:
"""Get mock green spaces data for development.""" """Get mock green spaces data for development."""
# This would be replaced with real data fetching in production # This would be replaced with real data fetching in production

View File

@ -8,14 +8,12 @@ from app.models.green_space import (
Coordinates, Amenity, AmenityType Coordinates, Amenity, AmenityType
) )
from app.services.berlin_data_service import BerlinDataService from app.services.berlin_data_service import BerlinDataService
from app.services.street_tree_service import StreetTreeService
class ScoringEngine: class ScoringEngine:
"""Dynamic scoring engine for green spaces based on personality preferences.""" """Dynamic scoring engine for green spaces based on personality preferences."""
def __init__(self): def __init__(self):
self.berlin_data = BerlinDataService() self.berlin_data = BerlinDataService()
self.street_tree_service = StreetTreeService()
self.personality_weights = self._initialize_personality_weights() self.personality_weights = self._initialize_personality_weights()
def _initialize_personality_weights(self) -> Dict[str, Dict[str, float]]: def _initialize_personality_weights(self) -> Dict[str, Dict[str, float]]:
@ -99,12 +97,9 @@ class ScoringEngine:
if not weights: if not weights:
raise ValueError(f"Unknown personality type: {personality}") raise ValueError(f"Unknown personality type: {personality}")
# Pre-fetch tree data once for all calculations # Calculate component scores
tree_data = await self._fetch_tree_data_once(green_space)
# Calculate component scores with cached tree data
component_scores = await self._calculate_component_scores( component_scores = await self._calculate_component_scores(
green_space, personality, user_location, tree_data green_space, personality, user_location
) )
# Calculate weighted final score # Calculate weighted final score
@ -137,26 +132,11 @@ class ScoringEngine:
recommendations=recommendations recommendations=recommendations
) )
async def _fetch_tree_data_once(self, green_space: GreenSpace) -> Optional[Any]:
    """Look up the trees around a green space a single time for reuse.

    Args:
        green_space: Space whose coordinates are the centre of the lookup.

    Returns:
        The tree service's response for a 400 m radius, or ``None`` when the
        lookup raises (the error is printed).
    """
    try:
        # 400 m is the largest radius needed across all scoring methods.
        return await self.street_tree_service.get_trees_near_location(
            green_space.coordinates.lat,
            green_space.coordinates.lng,
            radius_m=400,
        )
    except Exception as e:
        print(f"Error fetching tree data: {e}")
        return None
async def _calculate_component_scores( async def _calculate_component_scores(
self, self,
green_space: GreenSpace, green_space: GreenSpace,
personality: str, personality: str,
user_location: Optional[Tuple[float, float]] = None, user_location: Optional[Tuple[float, float]] = None
tree_data: Optional[Any] = None
) -> Dict[str, int]: ) -> Dict[str, int]:
"""Calculate individual component scores.""" """Calculate individual component scores."""
scores = {} scores = {}
@ -169,7 +149,7 @@ class ScoringEngine:
# Personality-specific components # Personality-specific components
if personality == "little_adventurers": if personality == "little_adventurers":
scores["playground_quality"] = green_space.recreation.playground_quality scores["playground_quality"] = green_space.recreation.playground_quality
scores["shade_quality"] = self._score_shade_quality_with_trees(green_space, tree_data) scores["shade_quality"] = green_space.environmental.shade_quality
scores["toilet_proximity"] = await self._score_toilet_proximity(green_space) scores["toilet_proximity"] = await self._score_toilet_proximity(green_space)
scores["family_amenities"] = await self._score_family_amenities(green_space) scores["family_amenities"] = await self._score_family_amenities(green_space)
@ -189,11 +169,11 @@ class ScoringEngine:
elif personality == "zen_masters": elif personality == "zen_masters":
scores["quietness"] = self._score_quietness(green_space) scores["quietness"] = self._score_quietness(green_space)
scores["nature_immersion"] = self._score_nature_immersion_with_trees(green_space, tree_data) scores["nature_immersion"] = self._score_nature_immersion(green_space)
scores["crowd_density"] = await self._score_crowd_density(green_space) scores["crowd_density"] = await self._score_crowd_density(green_space)
scores["water_features"] = 100 if green_space.environmental.water_features else 0 scores["water_features"] = 100 if green_space.environmental.water_features else 0
scores["meditation_spots"] = self._score_meditation_spots_with_trees(green_space, tree_data) scores["meditation_spots"] = self._score_meditation_spots(green_space)
scores["air_quality"] = self._score_air_quality_with_trees(green_space, tree_data) scores["air_quality"] = await self._score_air_quality(green_space)
elif personality == "active_lifestyle": elif personality == "active_lifestyle":
scores["fitness_facilities"] = 100 if green_space.recreation.sports_facilities else 0 scores["fitness_facilities"] = 100 if green_space.recreation.sports_facilities else 0
@ -202,11 +182,11 @@ class ScoringEngine:
scores["terrain_variety"] = self._score_terrain_variety(green_space) scores["terrain_variety"] = self._score_terrain_variety(green_space)
elif personality == "wildlife_lover": elif personality == "wildlife_lover":
scores["wildlife_diversity"] = self._score_wildlife_diversity_with_trees(green_space, tree_data) scores["wildlife_diversity"] = green_space.environmental.wildlife_diversity_score
scores["natural_habitat"] = self._score_natural_habitat_with_trees(green_space, tree_data) scores["natural_habitat"] = self._score_natural_habitat(green_space)
scores["water_features"] = 100 if green_space.environmental.water_features else 0 scores["water_features"] = 100 if green_space.environmental.water_features else 0
scores["tree_coverage"] = self._score_tree_coverage_with_real_data(green_space, tree_data) scores["tree_coverage"] = green_space.environmental.tree_coverage_percent
scores["observation_spots"] = self._score_observation_spots_with_trees(green_space, tree_data) scores["observation_spots"] = self._score_observation_spots(green_space)
elif personality == "art_nerd": elif personality == "art_nerd":
scores["cultural_proximity"] = await self._score_cultural_proximity(green_space) scores["cultural_proximity"] = await self._score_cultural_proximity(green_space)
@ -409,7 +389,7 @@ class ScoringEngine:
score += 25 score += 25
return min(100, score) return min(100, score)
def _score_air_quality(self, green_space: GreenSpace) -> int: async def _score_air_quality(self, green_space: GreenSpace) -> int:
"""Score air quality.""" """Score air quality."""
score = green_space.environmental.tree_coverage_percent score = green_space.environmental.tree_coverage_percent
if green_space.environmental.natural_surface_percent > 80: if green_space.environmental.natural_surface_percent > 80:
@ -588,7 +568,7 @@ class ScoringEngine:
personality: str, personality: str,
radius: int radius: int
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Score a specific location with optimized performance.""" """Score a specific location."""
# Check if location is in a green space # Check if location is in a green space
green_space = await self.berlin_data.get_green_space_at_location(lat, lng) green_space = await self.berlin_data.get_green_space_at_location(lat, lng)
@ -600,7 +580,7 @@ class ScoringEngine:
"personality": personality "personality": personality
} }
# Score the green space (this now uses cached tree data internally) # Score the green space
personality_score = await self.score_green_space(green_space, personality, (lat, lng)) personality_score = await self.score_green_space(green_space, personality, (lat, lng))
return { return {
@ -643,195 +623,3 @@ class ScoringEngine:
locations.append(location) locations.append(location)
return locations return locations
# === ENHANCED TREE-BASED SCORING METHODS ===
def _score_tree_coverage_with_real_data(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
    """Score tree coverage, using real street-tree data when it is available.

    Args:
        green_space: Space whose ``environmental.tree_coverage_percent`` is
            the baseline score.
        tree_data: Optional tree lookup result exposing
            ``shade_analysis.estimated_shade_coverage`` and
            ``metrics.trees_per_hectare``.

    Returns:
        The baseline when ``tree_data`` is falsy or reading it fails.
        Otherwise the larger of baseline and estimated shade coverage, raised
        by 15 above 50 trees/ha or by 10 above 20 trees/ha; a raised score is
        capped at 100.
    """
    if not tree_data:
        return green_space.environmental.tree_coverage_percent

    try:
        coverage = max(
            green_space.environmental.tree_coverage_percent,
            tree_data.shade_analysis.estimated_shade_coverage,
        )

        # Dense stands earn a flat bonus on top of the better estimate.
        density = tree_data.metrics.trees_per_hectare
        if density > 50:
            bonus = 15
        elif density > 20:
            bonus = 10
        else:
            bonus = 0

        if bonus:
            coverage = min(100, coverage + bonus)
        return int(coverage)

    except Exception as e:
        print(f"Error enhancing tree coverage score: {e}")
        return green_space.environmental.tree_coverage_percent
def _score_wildlife_diversity_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
    """Score wildlife diversity, blending in real tree species data.

    Args:
        green_space: Space whose ``environmental.wildlife_diversity_score``
            is the baseline.
        tree_data: Optional tree lookup result exposing
            ``metrics.species_diversity_score`` and
            ``metrics.mature_trees_count``.

    Returns:
        The baseline when ``tree_data`` is falsy or reading it fails.
        Otherwise 60 % baseline plus 40 % tree species diversity plus one
        point per mature tree (at most 20), truncated to int and capped at
        100.
    """
    if not tree_data:
        return green_space.environmental.wildlife_diversity_score

    try:
        baseline = green_space.environmental.wildlife_diversity_score
        metrics = tree_data.metrics
        species_diversity = metrics.species_diversity_score
        mature_bonus = min(20, metrics.mature_trees_count)

        blended = (baseline * 0.6) + (species_diversity * 0.4) + mature_bonus
        return min(100, int(blended))

    except Exception as e:
        print(f"Error enhancing wildlife diversity score: {e}")
        return green_space.environmental.wildlife_diversity_score
def _score_shade_quality_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
    """Score shade quality, using real street-tree data when it is available.

    Args:
        green_space: Space whose ``environmental.shade_quality`` is the
            baseline score.
        tree_data: Optional tree lookup result exposing
            ``shade_analysis.shade_quality_score`` and
            ``shade_analysis.nearby_large_trees``.

    Returns:
        The baseline when ``tree_data`` is falsy or reading it fails.
        Otherwise the larger of baseline and tree-derived shade quality,
        raised by 15 for five or more large trees or by 10 for three or
        four; a raised score is capped at 100.
    """
    if not tree_data:
        return green_space.environmental.shade_quality

    try:
        base_shade = green_space.environmental.shade_quality
        tree_shade_quality = tree_data.shade_analysis.shade_quality_score

        # Use the better of the two scores
        enhanced_score = max(base_shade, tree_shade_quality)

        # Bonus for large nearby trees. The tree service returns at most five
        # entries in nearby_large_trees, so the top tier has to include
        # exactly five; a "> 5" test could never be true.
        large_trees_count = len(tree_data.shade_analysis.nearby_large_trees)
        if large_trees_count >= 5:
            enhanced_score = min(100, enhanced_score + 15)
        elif large_trees_count > 2:
            enhanced_score = min(100, enhanced_score + 10)

        return int(enhanced_score)

    except Exception as e:
        print(f"Error enhancing shade quality score: {e}")
        return green_space.environmental.shade_quality
def _score_nature_immersion_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
    """Score nature immersion, adding bonuses from real street-tree data.

    Args:
        green_space: Space providing tree coverage, natural surface share
            and the water-features flag.
        tree_data: Optional tree lookup result exposing
            ``metrics.trees_per_hectare``, ``metrics.species_diversity_score``
            and ``shade_analysis.canopy_density``.

    Returns:
        ``self._score_nature_immersion(green_space)`` when ``tree_data`` is
        falsy or reading it fails. Otherwise tree coverage + half the natural
        surface share + 15 for water, plus tree density (max 30), canopy
        density * 20 and a fifth of the species diversity (max 15); the sum
        is truncated to int and capped at 100.
    """
    if not tree_data:
        return self._score_nature_immersion(green_space)

    try:
        env = green_space.environmental

        # Same base as the plain scorer: coverage, natural ground, water.
        total = env.tree_coverage_percent
        total += env.natural_surface_percent // 2
        if env.water_features:
            total += 15

        # Contributions that only real tree data can provide.
        total = total + min(30, tree_data.metrics.trees_per_hectare)
        canopy = tree_data.shade_analysis.canopy_density
        total = total + (int(canopy * 20) if canopy else 0)
        total = total + min(15, tree_data.metrics.species_diversity_score // 5)

        return min(100, int(total))

    except Exception as e:
        print(f"Error enhancing nature immersion score: {e}")
        return self._score_nature_immersion(green_space)
def _score_natural_habitat_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
    """Score natural habitat quality, adding bonuses from real tree data.

    Args:
        green_space: Space providing tree coverage, natural surface share
            and the water-features flag.
        tree_data: Optional tree lookup result exposing
            ``metrics.mature_trees_count`` and
            ``metrics.species_diversity_score``.

    Returns:
        ``self._score_natural_habitat(green_space)`` when ``tree_data`` is
        falsy or reading it fails. Otherwise tree coverage + half the natural
        surface share + 15 for water, plus half the mature tree count (max
        25) and a third of the species diversity (max 20), capped at 100.
    """
    if not tree_data:
        return self._score_natural_habitat(green_space)

    try:
        env = green_space.environmental
        metrics = tree_data.metrics

        habitat = env.tree_coverage_percent
        habitat += env.natural_surface_percent // 2
        if env.water_features:
            habitat += 15

        # Old trees and a varied species mix make for better habitat.
        mature_part = min(25, metrics.mature_trees_count // 2)
        diversity_part = min(20, metrics.species_diversity_score // 3)

        return min(100, int(habitat + mature_part + diversity_part))

    except Exception as e:
        print(f"Error enhancing natural habitat score: {e}")
        return self._score_natural_habitat(green_space)
def _score_observation_spots_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
    """Score wildlife observation opportunities using real tree data.

    Args:
        green_space: Space providing tree coverage, the water-features flag
            and ``environmental.noise_level``.
        tree_data: Optional tree lookup result exposing
            ``shade_analysis.nearby_large_trees`` and
            ``metrics.species_diversity_score``.

    Returns:
        ``self._score_observation_spots(green_space)`` when ``tree_data`` is
        falsy or reading it fails. Otherwise half the tree coverage, + 30 for
        water, + 20 when ``noise_level.value <= 2``, + 3 per large tree (max
        25) and a quarter of the species diversity (max 15), capped at 100.
    """
    if not tree_data:
        return self._score_observation_spots(green_space)

    try:
        env = green_space.environmental

        parts = [env.tree_coverage_percent // 2]
        if env.water_features:
            parts.append(30)
        if env.noise_level.value <= 2:
            parts.append(20)

        # Large trees provide better observation opportunities.
        parts.append(min(25, len(tree_data.shade_analysis.nearby_large_trees) * 3))
        # Species diversity attracts more wildlife to observe.
        parts.append(min(15, tree_data.metrics.species_diversity_score // 4))

        return min(100, int(sum(parts)))

    except Exception as e:
        print(f"Error enhancing observation spots score: {e}")
        return self._score_observation_spots(green_space)
def _score_meditation_spots_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
    """Score suitability for meditation, adding bonuses from real tree data.

    Args:
        green_space: Space providing tree coverage, the water-features flag
            and ``environmental.noise_level``.
        tree_data: Optional tree lookup result exposing
            ``shade_analysis.shade_quality_score`` and
            ``shade_analysis.canopy_density``.

    Returns:
        ``self._score_meditation_spots(green_space)`` when ``tree_data`` is
        falsy or reading it fails. Otherwise half the tree coverage, + 25 for
        water, + 25 when ``noise_level.value <= 2``, plus a quarter of the
        shade quality (max 20) and canopy density * 15, capped at 100.
    """
    if not tree_data:
        return self._score_meditation_spots(green_space)

    try:
        env = green_space.environmental
        shade = tree_data.shade_analysis

        calm = env.tree_coverage_percent // 2
        if env.water_features:
            calm += 25
        if env.noise_level.value <= 2:
            calm += 25

        # Trees enhance meditation through natural sounds and shade.
        shade_part = min(20, shade.shade_quality_score // 4)
        canopy_part = int(shade.canopy_density * 15) if shade.canopy_density else 0

        return min(100, int(calm + shade_part + canopy_part))

    except Exception as e:
        print(f"Error enhancing meditation spots score: {e}")
        return self._score_meditation_spots(green_space)
def _score_air_quality_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
    """Score air quality, adding bonuses from real street-tree data.

    Args:
        green_space: Space providing tree coverage and natural surface share.
        tree_data: Optional tree lookup result exposing
            ``metrics.trees_per_hectare`` and ``metrics.mature_trees_count``.

    Returns:
        ``self._score_air_quality(green_space)`` when ``tree_data`` is falsy
        or reading it fails. Otherwise tree coverage, + 20 when more than
        80 % of the surface is natural, plus half the tree density (max 25)
        and a third of the mature tree count (max 15); truncated to int and
        capped at 100.
    """
    if not tree_data:
        return self._score_air_quality(green_space)

    try:
        env = green_space.environmental
        metrics = tree_data.metrics

        air = env.tree_coverage_percent
        if env.natural_surface_percent > 80:
            air += 20

        # More trees = better air quality
        density_part = min(25, metrics.trees_per_hectare // 2)
        mature_part = min(15, metrics.mature_trees_count // 3)

        return min(100, int(air + density_part + mature_part))

    except Exception as e:
        print(f"Error enhancing air quality score: {e}")
        return self._score_air_quality(green_space)

View File

@ -1,353 +0,0 @@
import json
import math
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any
from datetime import datetime
from geopy.distance import geodesic
from app.models.street_tree import (
StreetTree, TreeDensityMetrics, TreeShadeAnalysis, TreesSearchFilters,
TreesNearLocationResponse, TreeGenus, TreeHealthStatus
)
from app.models.green_space import Coordinates
class StreetTreeService:
"""Service for accessing and analyzing Berlin street trees data."""
def __init__(self):
self._trees_cache = None
self._trees_index = None
self.data_dir = Path("app/data")
def _load_trees(self) -> List[Dict]:
"""Load street trees data from JSON file."""
if self._trees_cache is None:
trees_file = self.data_dir / "processed" / "street_trees.json"
if trees_file.exists():
with open(trees_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self._trees_cache = data.get("street_trees", [])
else:
print("Warning: street_trees.json not found. Run process_street_trees.py first.")
self._trees_cache = []
return self._trees_cache
def _create_tree_from_dict(self, tree_data: Dict) -> StreetTree:
"""Convert tree dictionary to StreetTree model."""
# Map genus to enum
genus_mapping = {
"AHORN": TreeGenus.AHORN,
"LINDE": TreeGenus.LINDE,
"KASTANIE": TreeGenus.KASTANIE,
"ROSSKASTANIE": TreeGenus.ROSSKASTANIE,
"EICHE": TreeGenus.EICHE,
"PLATANE": TreeGenus.PLATANE,
"BIRKE": TreeGenus.BIRKE,
"WEIßDORN": TreeGenus.WEISSDORN,
"PAPPEL": TreeGenus.PAPPEL,
"ESCHE": TreeGenus.ESCHE,
}
genus_german = (tree_data.get('genus_german') or '').upper()
genus_category = genus_mapping.get(genus_german, TreeGenus.OTHER)
# Determine health status based on available data
health_status = TreeHealthStatus.UNKNOWN
if tree_data.get('age'):
age = tree_data['age']
if age > 80:
health_status = TreeHealthStatus.FAIR
elif age > 50:
health_status = TreeHealthStatus.GOOD
elif age > 0:
health_status = TreeHealthStatus.EXCELLENT
return StreetTree(
id=tree_data.get('id', ''),
object_id=tree_data.get('object_id'),
tree_id=tree_data.get('tree_id'),
location_number=tree_data.get('location_number'),
identifier=tree_data.get('identifier'),
object_name=tree_data.get('object_name'),
species_german=tree_data.get('species_german'),
species_botanical=tree_data.get('species_botanical'),
genus_german=tree_data.get('genus_german'),
genus_botanical=tree_data.get('genus_botanical'),
genus_category=genus_category,
coordinates=Coordinates(
lat=tree_data.get('lat', 0.0),
lng=tree_data.get('lng', 0.0)
),
district=tree_data.get('district'),
owner=tree_data.get('owner'),
category=tree_data.get('category'),
street=tree_data.get('street'),
house_number=tree_data.get('house_number'),
address_addition=tree_data.get('address_addition'),
planting_year=tree_data.get('planting_year'),
age=tree_data.get('age'),
crown_diameter_m=tree_data.get('crown_diameter_m'),
trunk_circumference_cm=tree_data.get('trunk_circumference_cm'),
height_m=tree_data.get('height_m'),
health_status=health_status,
last_updated=datetime.now()
)
async def get_trees_near_location(
self,
lat: float,
lng: float,
radius_m: int = 500,
limit: Optional[int] = None
) -> TreesNearLocationResponse:
"""Get street trees within a radius of a location."""
start_time = datetime.now()
trees_data = self._load_trees()
nearby_trees = []
for tree_data in trees_data:
tree_lat = tree_data.get('lat')
tree_lng = tree_data.get('lng')
if tree_lat is None or tree_lng is None:
continue
distance = geodesic((lat, lng), (tree_lat, tree_lng)).meters
if distance <= radius_m:
tree = self._create_tree_from_dict(tree_data)
nearby_trees.append(tree)
if limit and len(nearby_trees) >= limit:
break
# Sort by distance
nearby_trees.sort(
key=lambda t: geodesic((lat, lng), (t.coordinates.lat, t.coordinates.lng)).meters
)
# Calculate metrics
metrics = self._calculate_tree_density_metrics(nearby_trees, radius_m)
shade_analysis = self._analyze_shade_coverage(lat, lng, nearby_trees)
query_time = (datetime.now() - start_time).total_seconds() * 1000
return TreesNearLocationResponse(
location=Coordinates(lat=lat, lng=lng),
radius_m=radius_m,
trees=nearby_trees,
metrics=metrics,
shade_analysis=shade_analysis,
total_found=len(nearby_trees),
query_time_ms=int(query_time)
)
def _calculate_tree_density_metrics(
self,
trees: List[StreetTree],
radius_m: int
) -> TreeDensityMetrics:
"""Calculate tree density and coverage metrics."""
if not trees:
return TreeDensityMetrics()
area_hectares = (math.pi * radius_m * radius_m) / 10000 # Convert to hectares
# Calculate averages
ages = [t.age for t in trees if t.age is not None]
heights = [t.height_m for t in trees if t.height_m is not None]
crowns = [t.crown_diameter_m for t in trees if t.crown_diameter_m is not None]
avg_age = sum(ages) / len(ages) if ages else None
avg_height = sum(heights) / len(heights) if heights else None
avg_crown = sum(crowns) / len(crowns) if crowns else None
# Count mature vs young trees
mature_trees = len([t for t in trees if t.age and t.age > 20])
young_trees = len([t for t in trees if t.age and t.age < 10])
# Calculate shade coverage (rough estimate)
shade_coverage = 0.0
if crowns:
total_crown_area = sum(math.pi * (d/2)**2 for d in crowns if d > 0)
shade_coverage = min(100.0, (total_crown_area / (math.pi * radius_m * radius_m)) * 100)
# Get dominant species
species_count = {}
for tree in trees:
if tree.species_german:
species_count[tree.species_german] = species_count.get(tree.species_german, 0) + 1
dominant_species = sorted(species_count.items(), key=lambda x: x[1], reverse=True)[:3]
dominant_species_names = [species[0] for species in dominant_species]
# Calculate species diversity (simple calculation)
unique_species = len(species_count)
diversity_score = min(100, (unique_species * 10)) if unique_species > 0 else 0
return TreeDensityMetrics(
total_trees=len(trees),
trees_per_hectare=len(trees) / area_hectares if area_hectares > 0 else 0,
average_tree_age=avg_age,
average_height=avg_height,
average_crown_diameter=avg_crown,
shade_coverage_percent=shade_coverage,
mature_trees_count=mature_trees,
young_trees_count=young_trees,
dominant_species=dominant_species_names,
species_diversity_score=diversity_score
)
def _analyze_shade_coverage(
self,
lat: float,
lng: float,
trees: List[StreetTree]
) -> TreeShadeAnalysis:
"""Analyze shade coverage for picnic spot evaluation."""
trees_50m = 0
trees_100m = 0
large_trees = []
for tree in trees:
distance = geodesic((lat, lng), (tree.coordinates.lat, tree.coordinates.lng)).meters
if distance <= 50:
trees_50m += 1
if distance <= 100:
trees_100m += 1
# Consider large trees (good crown diameter or height)
if ((tree.crown_diameter_m and tree.crown_diameter_m > 8) or
(tree.height_m and tree.height_m > 15) or
(tree.age and tree.age > 30)):
large_trees.append(tree)
# Estimate shade coverage
shade_coverage = 0
if trees_50m > 0:
shade_coverage = min(100, trees_50m * 15) # Rough estimate
# Shade quality based on tree density and size
shade_quality = 0
if trees_50m > 3:
shade_quality = 80
elif trees_50m > 1:
shade_quality = 60
elif trees_100m > 5:
shade_quality = 40
elif trees_100m > 2:
shade_quality = 20
# Best shade times (simplified)
best_times = []
if shade_quality > 60:
best_times = ["10:00-12:00", "14:00-16:00"]
elif shade_quality > 30:
best_times = ["11:00-13:00"]
return TreeShadeAnalysis(
has_nearby_trees=len(trees) > 0,
trees_within_50m=trees_50m,
trees_within_100m=trees_100m,
estimated_shade_coverage=shade_coverage,
shade_quality_score=shade_quality,
best_shade_times=best_times,
nearby_large_trees=large_trees[:5], # Limit to 5 for response size
canopy_density=len(large_trees) / max(1, len(trees)) if trees else 0
)
async def search_trees(self, filters: TreesSearchFilters) -> List[StreetTree]:
"""Search trees with filters."""
trees_data = self._load_trees()
filtered_trees = []
for tree_data in trees_data:
# Apply location filter first if specified
if (filters.center_lat and filters.center_lng and filters.within_radius_m):
tree_lat = tree_data.get('lat')
tree_lng = tree_data.get('lng')
if tree_lat is None or tree_lng is None:
continue
distance = geodesic(
(filters.center_lat, filters.center_lng),
(tree_lat, tree_lng)
).meters
if distance > filters.within_radius_m:
continue
# Apply other filters
if filters.species and tree_data.get('species_german') not in filters.species:
continue
if filters.district and tree_data.get('district') != filters.district:
continue
if filters.min_age and (not tree_data.get('age') or tree_data['age'] < filters.min_age):
continue
if filters.max_age and (not tree_data.get('age') or tree_data['age'] > filters.max_age):
continue
if filters.min_height and (not tree_data.get('height_m') or tree_data['height_m'] < filters.min_height):
continue
if filters.max_height and (not tree_data.get('height_m') or tree_data['height_m'] > filters.max_height):
continue
tree = self._create_tree_from_dict(tree_data)
filtered_trees.append(tree)
return filtered_trees
async def get_tree_stats(self) -> Dict[str, Any]:
    """Summarise the loaded tree records by district, species and age band.

    Returns:
        ``{"error": "No tree data available"}`` when ``self._load_trees()``
        yields nothing. Otherwise a dict with the total record count, the
        number of distinct districts and species, the per-district counts,
        the age-band histogram, the ten most frequent species and the ISO
        timestamp at which the summary was produced.
    """
    records = self._load_trees()
    if not records:
        return {"error": "No tree data available"}

    per_district = {}
    per_species = {}
    age_bands = {"0-10": 0, "11-20": 0, "21-50": 0, "51+": 0, "unknown": 0}
    # Inclusive upper bound of each band, checked in ascending order.
    band_limits = ((10, "0-10"), (20, "11-20"), (50, "21-50"))

    for record in records:
        # Records with a missing/empty district or species are not counted
        # in the respective tally.
        district_name = record.get('district')
        if district_name:
            if district_name in per_district:
                per_district[district_name] += 1
            else:
                per_district[district_name] = 1

        species_name = record.get('species_german')
        if species_name:
            if species_name in per_species:
                per_species[species_name] += 1
            else:
                per_species[species_name] = 1

        tree_age = record.get('age')
        if tree_age is None:
            band = "unknown"
        else:
            # First band whose upper bound covers the age; older trees
            # fall through to the open-ended band.
            band = next(
                (label for limit, label in band_limits if tree_age <= limit),
                "51+",
            )
        age_bands[band] += 1

    # Most frequent species first; ties keep first-seen order (stable sort).
    ranked_species = sorted(
        per_species.items(), key=lambda pair: pair[1], reverse=True
    )

    summary = {
        "total_trees": len(records),
        "districts": len(per_district),
        "unique_species": len(per_species),
        "district_counts": per_district,
        "age_distribution": age_bands,
        "top_species": dict(ranked_species[:10]),
        "last_updated": datetime.now().isoformat()
    }
    return summary

View File

@ -1,89 +0,0 @@
#!/usr/bin/env python3
"""
Inspect the street trees JSON file structure without loading the entire file.
"""
import json
import sys
# Number of bytes sampled from the start and from the end of the file.
_INSPECT_HEAD_BYTES = 8192
_INSPECT_TAIL_BYTES = 2048

# Metadata keys to report, paired with the label printed for each.
_INSPECT_METADATA_FIELDS = (
    ("count", "Tree count"),
    ("processed_count", "Processed count"),
    ("skipped_count", "Skipped count"),
)


def _extract_metadata_value(text, key):
    """Return the raw text following ``"key":`` in *text*, or None if absent."""
    marker = f'"{key}":'
    start = text.find(marker)
    if start == -1:
        return None
    start += len(marker)
    # The value runs up to the next comma or closing brace, whichever comes
    # first; if neither follows, take the rest of the sample.
    stops = [pos for pos in (text.find(',', start), text.find('}', start)) if pos != -1]
    end = min(stops) if stops else len(text)
    return text[start:end].strip()


def inspect_street_trees(file_path="app/data/processed/street_trees.json"):
    """Print a summary of a street-trees JSON file without loading all of it.

    Only a small sample from the start and from the end of the file is read.
    The metadata keys may be located after the (potentially very large)
    ``"street_trees"`` array, so each key is looked up in the head sample
    first and in the tail sample as a fallback. The first tree object is
    decoded from the head sample with ``json.JSONDecoder.raw_decode``.

    Args:
        file_path: Path of the JSON file to inspect.

    Returns:
        True if the file could be read, False if opening or reading it
        failed (the error is printed).
    """
    try:
        # Binary mode: byte offsets are well defined for seek/tell, unlike
        # arbitrary character offsets on a text-mode file.
        with open(file_path, 'rb') as f:
            head = f.read(_INSPECT_HEAD_BYTES)
            f.seek(0, 2)  # whence=2: position relative to the end of the file
            file_size = f.tell()
            tail = b""
            if file_size > len(head):
                # Never re-read bytes that are already part of the head.
                f.seek(max(len(head), file_size - _INSPECT_TAIL_BYTES))
                tail = f.read()
    except OSError as e:
        print(f"Error inspecting file: {e}")
        return False

    # A sample boundary may split a multi-byte character; replace the
    # fragment instead of failing.
    head_text = head.decode('utf-8', errors='replace')
    tail_text = tail.decode('utf-8', errors='replace')

    if '"street_trees":' in head_text:
        print("File structure inspection:")
        print(f"File size: ~{file_size / (1024 * 1024):.1f}MB")
        for key, label in _INSPECT_METADATA_FIELDS:
            value = _extract_metadata_value(head_text, key)
            if value is None:
                value = _extract_metadata_value(tail_text, key)
            if value is not None:
                print(f"{label}: {value}")

    # Show the structure of the first tree, if the array starts with one.
    array_marker = '"street_trees": ['
    array_start = head_text.find(array_marker)
    if array_start != -1:
        remainder = head_text[array_start + len(array_marker):].lstrip()
        if remainder.startswith('{'):
            try:
                # raw_decode parses exactly one JSON value and ignores
                # whatever follows it in the sample.
                first_tree, _ = json.JSONDecoder().raw_decode(remainder)
            except json.JSONDecodeError:
                print("\nCould not parse first tree, but file exists and has data")
            else:
                print("\nFirst tree structure:")
                for key, value in first_tree.items():
                    print(f" {key}: {type(value).__name__} = {value}")

    print("\nFile appears to be processed successfully!")
    return True
if __name__ == "__main__":
    # Report failure through the exit status (0 = inspected, 1 = error)
    # instead of discarding the boolean result.
    sys.exit(0 if inspect_street_trees() else 1)

View File

@ -1,176 +0,0 @@
#!/usr/bin/env python3
"""
Process Berlin Street Trees (Baumkataster) CSV data.
Converts the raw CSV into a structured JSON format for use in the picnic API.
"""
import pandas as pd
import json
from pathlib import Path
from datetime import datetime
import sys
import os
# Add the app directory to the Python path
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
# WGS84 semi-major axis in metres: the sphere radius used by the spherical
# Web Mercator projection (EPSG:3857).
_WEB_MERCATOR_RADIUS_M = 6378137.0

# Output key -> CSV column, in the order the keys appear in each record.
_CSV_FIELD_MAP = (
    ("object_id", 'OBJECTID'),
    ("tree_id", 'Baum ID'),
    ("location_number", 'Standort Nr'),
    ("identifier", 'Kennzeich'),
    ("object_name", 'Objektname'),
    ("species_german", 'Art'),
    ("species_botanical", 'Art Botanisch'),
    ("genus_german", 'Gattung'),
    ("genus_botanical", 'Gattung Botanisch'),
    ("planting_year", 'Pflanzjahr'),
    ("age", 'Standalter'),
    ("crown_diameter_m", 'Krone Durchschnitt (m)'),
    ("trunk_circumference_cm", 'Stammumfang (cm)'),
    ("height_m", 'Höhe (m)'),
    ("district", 'Bezirk'),
    ("owner", 'Eigentümer'),
    ("category", 'Kategorie'),
    ("street", 'Straße'),
    ("house_number", 'Haus Nr'),
    ("address_addition", 'Adresszusatz'),
)

_INT_FIELDS = frozenset({'planting_year', 'age', 'trunk_circumference_cm'})
_FLOAT_FIELDS = frozenset({'crown_diameter_m', 'height_m'})


def _web_mercator_to_wgs84(x, y):
    """Convert Web Mercator (EPSG:3857) metres to ``(lat, lng)`` in degrees."""
    import math  # local import: the module header does not import math

    lng = math.degrees(x / _WEB_MERCATOR_RADIUS_M)
    lat = math.degrees(
        2.0 * math.atan(math.exp(y / _WEB_MERCATOR_RADIUS_M)) - math.pi / 2.0
    )
    return lat, lng


def _clean_tree_value(key, value):
    """Normalise one raw CSV cell: missing -> None, typed fields -> int/float, strings stripped."""
    if pd.isna(value):
        return None
    if key in _INT_FIELDS:
        try:
            # Go through float so values such as "1987.0" are accepted.
            return int(float(value))
        except (ValueError, TypeError):
            return None
    if key in _FLOAT_FIELDS:
        try:
            return float(value)
        except (ValueError, TypeError):
            return None
    if isinstance(value, str):
        return value.strip()
    return value


def _build_tree_record(row, tree_number, lat, lng, x_coord, y_coord):
    """Build the JSON-ready dict for one CSV row."""
    tree_data = {"id": f"tree_{tree_number}"}
    for key, column in _CSV_FIELD_MAP:
        tree_data[key] = _clean_tree_value(key, row.get(column))
    tree_data["lat"] = round(lat, 6)
    tree_data["lng"] = round(lng, 6)
    tree_data["x_coord"] = x_coord
    tree_data["y_coord"] = y_coord
    return tree_data


def process_street_trees(
    raw_file="app/data/raw/Baumkataster_Berlin_-1586189165523919690.csv",
    processed_file="app/data/processed/street_trees.json",
):
    """Convert the street-trees CSV into the JSON file used by the picnic API.

    Rows are skipped (and counted as skipped) when a coordinate is missing,
    when the coordinates fall outside the accepted projected range, when the
    converted latitude/longitude is outside the Berlin sanity box, or when
    processing the row raises.

    The accepted ``x``/``y`` range (about 1.48-1.52 million / 6.87-6.92
    million) matches Berlin only in Web Mercator (EPSG:3857); in UTM zone 33N
    Berlin lies near x = 0.39 million, y = 5.82 million and every row would
    be rejected. The coordinates are therefore converted with the exact
    inverse spherical Mercator projection.
    NOTE(review): confirm the CRS against the data source's metadata.

    Args:
        raw_file: Path of the input CSV.
        processed_file: Path of the JSON file to write; its parent directory
            is created if necessary.

    Returns:
        True when the JSON file was written, False when the input file is
        missing or processing failed (the error is printed).
    """
    raw_file = Path(raw_file)
    processed_file = Path(processed_file)

    # Ensure processed directory exists
    processed_file.parent.mkdir(parents=True, exist_ok=True)

    print(f"Reading street trees data from: {raw_file}")
    if not raw_file.exists():
        print(f"Error: Raw file not found at {raw_file}")
        return False

    try:
        df = pd.read_csv(raw_file, encoding='utf-8')
        print(f"Loaded {len(df)} street trees from CSV")
        # Display column names for debugging
        print("Columns in CSV:", df.columns.tolist())

        trees = []
        processed_count = 0
        skipped_count = 0

        for idx, row in df.iterrows():
            try:
                x_coord = row.get('x')
                y_coord = row.get('y')

                # Skip rows with missing coordinates
                if pd.isna(x_coord) or pd.isna(y_coord):
                    skipped_count += 1
                    continue

                # Projected-coordinate sanity check.
                # NOTE(review): this window corresponds to roughly
                # 13.30-13.65 E, narrower than the post-conversion box
                # below - confirm it is not dropping valid rows.
                if not (1480000 <= x_coord <= 1520000 and 6870000 <= y_coord <= 6920000):
                    skipped_count += 1
                    continue

                lat, lng = _web_mercator_to_wgs84(x_coord, y_coord)

                # Validate converted coordinates
                if not (52.3 <= lat <= 52.7 and 13.0 <= lng <= 13.8):
                    skipped_count += 1
                    continue

                trees.append(
                    _build_tree_record(
                        row, processed_count + 1, lat, lng, x_coord, y_coord
                    )
                )
                processed_count += 1

                # Progress indicator
                if processed_count % 10000 == 0:
                    print(f"Processed {processed_count} trees...")
            except Exception as e:
                # Deliberate best effort: one malformed row must not abort
                # the whole import; report it and count it as skipped.
                print(f"Error processing row {idx}: {e}")
                skipped_count += 1
                continue

        # Create the final data structure
        output_data = {
            "street_trees": trees,
            "count": len(trees),
            "processed_count": processed_count,
            "skipped_count": skipped_count,
            "last_updated": datetime.now().isoformat(),
            "source": "baumkataster_csv",
            "coordinate_system": "EPSG:3857_converted_to_WGS84",
            "note": "Coordinates converted from Web Mercator (EPSG:3857) to WGS84 using the inverse spherical Mercator projection."
        }

        # Write to JSON file
        print(f"Writing {len(trees)} trees to: {processed_file}")
        with open(processed_file, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2, ensure_ascii=False)

        print("Successfully processed street trees data:")
        print(f" - Total rows in CSV: {len(df)}")
        print(f" - Successfully processed: {processed_count}")
        print(f" - Skipped (invalid data): {skipped_count}")
        print(f" - Output file: {processed_file}")

        # Display some sample data
        if trees:
            print("\nSample tree data:")
            for key, value in trees[0].items():
                print(f" {key}: {value}")

        return True
    except Exception as e:
        # Top-level boundary of the script: report and signal failure.
        print(f"Error processing street trees data: {e}")
        return False
if __name__ == "__main__":
    # Guard clause: report failure and exit non-zero, otherwise fall through
    # to the success message.
    if not process_street_trees():
        print("\nStreet trees processing failed!")
        sys.exit(1)
    print("\nStreet trees processing completed successfully!")