Compare commits
No commits in common. "c14f5ead380a51677021a92ae8102e09923754a6" and "59e27534b0f0dd3eb2b5d5b878b636a4319e4bf9" have entirely different histories.
c14f5ead38
...
59e27534b0
|
@ -1,133 +0,0 @@
|
|||
from pydantic import BaseModel, Field
|
||||
from typing import Optional, List, Dict, Any
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
|
||||
from .green_space import Coordinates
|
||||
|
||||
class TreeCategory(str, Enum):
|
||||
STREET_TREE = "street_tree"
|
||||
PARK_TREE = "park_tree"
|
||||
ANLAGEBAUM = "anlagebaum"
|
||||
ALLEE_TREE = "allee_tree"
|
||||
|
||||
class TreeHealthStatus(str, Enum):
|
||||
EXCELLENT = "excellent"
|
||||
GOOD = "good"
|
||||
FAIR = "fair"
|
||||
POOR = "poor"
|
||||
CRITICAL = "critical"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
class TreeGenus(str, Enum):
|
||||
AHORN = "ahorn"
|
||||
LINDE = "linde"
|
||||
KASTANIE = "kastanie"
|
||||
EICHE = "eiche"
|
||||
PLATANE = "platane"
|
||||
BIRKE = "birke"
|
||||
WEISSDORN = "weissdorn"
|
||||
ROSSKASTANIE = "rosskastanie"
|
||||
PAPPEL = "pappel"
|
||||
ESCHE = "esche"
|
||||
OTHER = "other"
|
||||
|
||||
class StreetTree(BaseModel):
|
||||
"""Individual street tree model based on Berlin Baumkataster data."""
|
||||
|
||||
id: str
|
||||
object_id: Optional[int] = None
|
||||
tree_id: Optional[str] = None
|
||||
location_number: Optional[str] = None
|
||||
identifier: Optional[str] = None
|
||||
|
||||
object_name: Optional[str] = None
|
||||
species_german: Optional[str] = None
|
||||
species_botanical: Optional[str] = None
|
||||
genus_german: Optional[str] = None
|
||||
genus_botanical: Optional[str] = None
|
||||
genus_category: Optional[TreeGenus] = None
|
||||
|
||||
coordinates: Coordinates
|
||||
district: Optional[str] = None
|
||||
owner: Optional[str] = None
|
||||
category: Optional[str] = None
|
||||
|
||||
street: Optional[str] = None
|
||||
house_number: Optional[str] = None
|
||||
address_addition: Optional[str] = None
|
||||
full_address: Optional[str] = None
|
||||
|
||||
planting_year: Optional[int] = None
|
||||
age: Optional[int] = None
|
||||
crown_diameter_m: Optional[float] = None
|
||||
trunk_circumference_cm: Optional[int] = None
|
||||
height_m: Optional[float] = None
|
||||
|
||||
health_status: TreeHealthStatus = TreeHealthStatus.UNKNOWN
|
||||
confidence_score: int = Field(80, ge=0, le=100)
|
||||
|
||||
last_updated: datetime = Field(default_factory=datetime.now)
|
||||
|
||||
class TreeDensityMetrics(BaseModel):
|
||||
"""Tree density and coverage metrics for an area."""
|
||||
|
||||
total_trees: int = 0
|
||||
trees_per_hectare: float = 0.0
|
||||
average_tree_age: Optional[float] = None
|
||||
average_height: Optional[float] = None
|
||||
average_crown_diameter: Optional[float] = None
|
||||
|
||||
shade_coverage_percent: float = Field(0.0, ge=0, le=100)
|
||||
mature_trees_count: int = 0 # Trees older than 20 years
|
||||
young_trees_count: int = 0 # Trees younger than 10 years
|
||||
|
||||
dominant_species: List[str] = []
|
||||
species_diversity_score: int = Field(0, ge=0, le=100)
|
||||
|
||||
class TreeShadeAnalysis(BaseModel):
|
||||
"""Shade analysis for picnic spot evaluation."""
|
||||
|
||||
has_nearby_trees: bool = False
|
||||
trees_within_50m: int = 0
|
||||
trees_within_100m: int = 0
|
||||
|
||||
estimated_shade_coverage: int = Field(0, ge=0, le=100)
|
||||
shade_quality_score: int = Field(0, ge=0, le=100)
|
||||
|
||||
best_shade_times: List[str] = [] # Time periods with best shade
|
||||
seasonal_shade_variation: Optional[str] = None
|
||||
|
||||
nearby_large_trees: List[StreetTree] = []
|
||||
canopy_density: Optional[float] = None
|
||||
|
||||
class TreesSearchFilters(BaseModel):
|
||||
"""Filters for searching trees."""
|
||||
|
||||
species: Optional[List[str]] = None
|
||||
genus: Optional[List[TreeGenus]] = None
|
||||
min_age: Optional[int] = None
|
||||
max_age: Optional[int] = None
|
||||
min_height: Optional[float] = None
|
||||
max_height: Optional[float] = None
|
||||
min_crown_diameter: Optional[float] = None
|
||||
district: Optional[str] = None
|
||||
category: Optional[str] = None
|
||||
|
||||
within_radius_m: Optional[int] = None
|
||||
center_lat: Optional[float] = None
|
||||
center_lng: Optional[float] = None
|
||||
|
||||
class TreesNearLocationResponse(BaseModel):
|
||||
"""Response for trees near a location query."""
|
||||
|
||||
location: Coordinates
|
||||
radius_m: int
|
||||
trees: List[StreetTree]
|
||||
metrics: TreeDensityMetrics
|
||||
shade_analysis: TreeShadeAnalysis
|
||||
|
||||
total_found: int
|
||||
query_time_ms: Optional[int] = None
|
||||
data_source: str = "baumkataster"
|
||||
last_updated: datetime = Field(default_factory=datetime.now)
|
|
@ -11,7 +11,6 @@ from app.models.green_space import (
|
|||
EnvironmentalFeatures, AccessibilityFeatures, RecreationFeatures,
|
||||
NoiseLevel, LocationScore
|
||||
)
|
||||
from app.services.street_tree_service import StreetTreeService
|
||||
|
||||
class BerlinDataService:
|
||||
"""Service for accessing Berlin open data and external APIs."""
|
||||
|
@ -20,9 +19,7 @@ class BerlinDataService:
|
|||
self.cache = {}
|
||||
self.last_refresh = None
|
||||
self._toilets_cache = None
|
||||
self._street_trees_index = None
|
||||
self.data_dir = Path("app/data")
|
||||
self.street_tree_service = StreetTreeService()
|
||||
|
||||
async def search_green_spaces(
|
||||
self,
|
||||
|
@ -74,7 +71,7 @@ class BerlinDataService:
|
|||
return None
|
||||
|
||||
async def get_green_space_at_location(self, lat: float, lng: float) -> Optional[GreenSpace]:
|
||||
"""Check if a location is within a green space and enhance it with real tree data."""
|
||||
"""Check if a location is within a green space."""
|
||||
spaces = await self._get_mock_green_spaces()
|
||||
for space in spaces:
|
||||
# Simple distance check - in reality you'd use proper polygon containment
|
||||
|
@ -82,10 +79,8 @@ class BerlinDataService:
|
|||
(lat, lng),
|
||||
(space.coordinates.lat, space.coordinates.lng)
|
||||
).meters
|
||||
if distance < 500: # Within 500m of center (larger radius for better coverage)
|
||||
# Enhance the green space with real tree data
|
||||
enhanced_space = await self._enhance_green_space_with_real_trees(space, lat, lng)
|
||||
return enhanced_space
|
||||
if distance < 100: # Within 100m of center
|
||||
return space
|
||||
return None
|
||||
|
||||
async def get_green_spaces_within_radius(
|
||||
|
@ -343,57 +338,6 @@ class BerlinDataService:
|
|||
|
||||
return sorted(nearby_toilets, key=lambda x: x['distance_meters'])
|
||||
|
||||
async def _enhance_green_space_with_real_trees(self, green_space: GreenSpace, actual_lat: float, actual_lng: float) -> GreenSpace:
|
||||
"""Enhance green space environmental features with real tree data."""
|
||||
try:
|
||||
# Get real tree data for the actual location (not just the park center)
|
||||
tree_response = await self.street_tree_service.get_trees_near_location(
|
||||
actual_lat, actual_lng, radius_m=300
|
||||
)
|
||||
|
||||
# Calculate enhanced environmental features using real tree data
|
||||
tree_coverage = max(
|
||||
green_space.environmental.tree_coverage_percent,
|
||||
int(tree_response.shade_analysis.estimated_shade_coverage)
|
||||
)
|
||||
|
||||
shade_quality = max(
|
||||
green_space.environmental.shade_quality,
|
||||
tree_response.shade_analysis.shade_quality_score
|
||||
)
|
||||
|
||||
wildlife_diversity = max(
|
||||
green_space.environmental.wildlife_diversity_score,
|
||||
tree_response.metrics.species_diversity_score
|
||||
)
|
||||
|
||||
# Create enhanced environmental features
|
||||
enhanced_environmental = EnvironmentalFeatures(
|
||||
tree_coverage_percent=min(100, tree_coverage),
|
||||
shade_quality=min(100, shade_quality),
|
||||
noise_level=green_space.environmental.noise_level,
|
||||
wildlife_diversity_score=min(100, wildlife_diversity),
|
||||
water_features=green_space.environmental.water_features,
|
||||
natural_surface_percent=green_space.environmental.natural_surface_percent
|
||||
)
|
||||
|
||||
# Create enhanced green space with real tree data
|
||||
enhanced_space = green_space.model_copy(update={
|
||||
"environmental": enhanced_environmental,
|
||||
"coordinates": Coordinates(lat=actual_lat, lng=actual_lng) # Use actual query location
|
||||
})
|
||||
|
||||
# Update data sources to indicate real tree data is used
|
||||
if "real_street_trees" not in enhanced_space.data_sources:
|
||||
enhanced_space.data_sources.append("real_street_trees")
|
||||
|
||||
return enhanced_space
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing green space with real tree data: {e}")
|
||||
# Return original space if enhancement fails
|
||||
return green_space
|
||||
|
||||
async def _get_mock_green_spaces(self) -> List[GreenSpace]:
|
||||
"""Get mock green spaces data for development."""
|
||||
# This would be replaced with real data fetching in production
|
||||
|
|
|
@ -8,14 +8,12 @@ from app.models.green_space import (
|
|||
Coordinates, Amenity, AmenityType
|
||||
)
|
||||
from app.services.berlin_data_service import BerlinDataService
|
||||
from app.services.street_tree_service import StreetTreeService
|
||||
|
||||
class ScoringEngine:
|
||||
"""Dynamic scoring engine for green spaces based on personality preferences."""
|
||||
|
||||
def __init__(self):
|
||||
self.berlin_data = BerlinDataService()
|
||||
self.street_tree_service = StreetTreeService()
|
||||
self.personality_weights = self._initialize_personality_weights()
|
||||
|
||||
def _initialize_personality_weights(self) -> Dict[str, Dict[str, float]]:
|
||||
|
@ -99,12 +97,9 @@ class ScoringEngine:
|
|||
if not weights:
|
||||
raise ValueError(f"Unknown personality type: {personality}")
|
||||
|
||||
# Pre-fetch tree data once for all calculations
|
||||
tree_data = await self._fetch_tree_data_once(green_space)
|
||||
|
||||
# Calculate component scores with cached tree data
|
||||
# Calculate component scores
|
||||
component_scores = await self._calculate_component_scores(
|
||||
green_space, personality, user_location, tree_data
|
||||
green_space, personality, user_location
|
||||
)
|
||||
|
||||
# Calculate weighted final score
|
||||
|
@ -137,26 +132,11 @@ class ScoringEngine:
|
|||
recommendations=recommendations
|
||||
)
|
||||
|
||||
async def _fetch_tree_data_once(self, green_space: GreenSpace) -> Optional[Any]:
|
||||
"""Fetch tree data once and reuse for all calculations."""
|
||||
try:
|
||||
# Use the largest radius needed across all methods (400m)
|
||||
tree_response = await self.street_tree_service.get_trees_near_location(
|
||||
green_space.coordinates.lat,
|
||||
green_space.coordinates.lng,
|
||||
radius_m=400
|
||||
)
|
||||
return tree_response
|
||||
except Exception as e:
|
||||
print(f"Error fetching tree data: {e}")
|
||||
return None
|
||||
|
||||
async def _calculate_component_scores(
|
||||
self,
|
||||
green_space: GreenSpace,
|
||||
personality: str,
|
||||
user_location: Optional[Tuple[float, float]] = None,
|
||||
tree_data: Optional[Any] = None
|
||||
user_location: Optional[Tuple[float, float]] = None
|
||||
) -> Dict[str, int]:
|
||||
"""Calculate individual component scores."""
|
||||
scores = {}
|
||||
|
@ -169,7 +149,7 @@ class ScoringEngine:
|
|||
# Personality-specific components
|
||||
if personality == "little_adventurers":
|
||||
scores["playground_quality"] = green_space.recreation.playground_quality
|
||||
scores["shade_quality"] = self._score_shade_quality_with_trees(green_space, tree_data)
|
||||
scores["shade_quality"] = green_space.environmental.shade_quality
|
||||
scores["toilet_proximity"] = await self._score_toilet_proximity(green_space)
|
||||
scores["family_amenities"] = await self._score_family_amenities(green_space)
|
||||
|
||||
|
@ -189,11 +169,11 @@ class ScoringEngine:
|
|||
|
||||
elif personality == "zen_masters":
|
||||
scores["quietness"] = self._score_quietness(green_space)
|
||||
scores["nature_immersion"] = self._score_nature_immersion_with_trees(green_space, tree_data)
|
||||
scores["nature_immersion"] = self._score_nature_immersion(green_space)
|
||||
scores["crowd_density"] = await self._score_crowd_density(green_space)
|
||||
scores["water_features"] = 100 if green_space.environmental.water_features else 0
|
||||
scores["meditation_spots"] = self._score_meditation_spots_with_trees(green_space, tree_data)
|
||||
scores["air_quality"] = self._score_air_quality_with_trees(green_space, tree_data)
|
||||
scores["meditation_spots"] = self._score_meditation_spots(green_space)
|
||||
scores["air_quality"] = await self._score_air_quality(green_space)
|
||||
|
||||
elif personality == "active_lifestyle":
|
||||
scores["fitness_facilities"] = 100 if green_space.recreation.sports_facilities else 0
|
||||
|
@ -202,11 +182,11 @@ class ScoringEngine:
|
|||
scores["terrain_variety"] = self._score_terrain_variety(green_space)
|
||||
|
||||
elif personality == "wildlife_lover":
|
||||
scores["wildlife_diversity"] = self._score_wildlife_diversity_with_trees(green_space, tree_data)
|
||||
scores["natural_habitat"] = self._score_natural_habitat_with_trees(green_space, tree_data)
|
||||
scores["wildlife_diversity"] = green_space.environmental.wildlife_diversity_score
|
||||
scores["natural_habitat"] = self._score_natural_habitat(green_space)
|
||||
scores["water_features"] = 100 if green_space.environmental.water_features else 0
|
||||
scores["tree_coverage"] = self._score_tree_coverage_with_real_data(green_space, tree_data)
|
||||
scores["observation_spots"] = self._score_observation_spots_with_trees(green_space, tree_data)
|
||||
scores["tree_coverage"] = green_space.environmental.tree_coverage_percent
|
||||
scores["observation_spots"] = self._score_observation_spots(green_space)
|
||||
|
||||
elif personality == "art_nerd":
|
||||
scores["cultural_proximity"] = await self._score_cultural_proximity(green_space)
|
||||
|
@ -409,7 +389,7 @@ class ScoringEngine:
|
|||
score += 25
|
||||
return min(100, score)
|
||||
|
||||
def _score_air_quality(self, green_space: GreenSpace) -> int:
|
||||
async def _score_air_quality(self, green_space: GreenSpace) -> int:
|
||||
"""Score air quality."""
|
||||
score = green_space.environmental.tree_coverage_percent
|
||||
if green_space.environmental.natural_surface_percent > 80:
|
||||
|
@ -588,7 +568,7 @@ class ScoringEngine:
|
|||
personality: str,
|
||||
radius: int
|
||||
) -> Dict[str, Any]:
|
||||
"""Score a specific location with optimized performance."""
|
||||
"""Score a specific location."""
|
||||
# Check if location is in a green space
|
||||
green_space = await self.berlin_data.get_green_space_at_location(lat, lng)
|
||||
|
||||
|
@ -600,7 +580,7 @@ class ScoringEngine:
|
|||
"personality": personality
|
||||
}
|
||||
|
||||
# Score the green space (this now uses cached tree data internally)
|
||||
# Score the green space
|
||||
personality_score = await self.score_green_space(green_space, personality, (lat, lng))
|
||||
|
||||
return {
|
||||
|
@ -643,195 +623,3 @@ class ScoringEngine:
|
|||
locations.append(location)
|
||||
|
||||
return locations
|
||||
|
||||
# === ENHANCED TREE-BASED SCORING METHODS ===
|
||||
|
||||
def _score_tree_coverage_with_real_data(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
|
||||
"""Enhanced tree coverage scoring using real street tree data."""
|
||||
if not tree_data:
|
||||
return green_space.environmental.tree_coverage_percent
|
||||
|
||||
try:
|
||||
# Combine base environmental score with real tree data
|
||||
base_score = green_space.environmental.tree_coverage_percent
|
||||
tree_shade_coverage = tree_data.shade_analysis.estimated_shade_coverage
|
||||
|
||||
# Use the higher of the two scores, with bonus for high tree density
|
||||
enhanced_score = max(base_score, tree_shade_coverage)
|
||||
|
||||
# Bonus for high tree density
|
||||
if tree_data.metrics.trees_per_hectare > 50:
|
||||
enhanced_score = min(100, enhanced_score + 15)
|
||||
elif tree_data.metrics.trees_per_hectare > 20:
|
||||
enhanced_score = min(100, enhanced_score + 10)
|
||||
|
||||
return int(enhanced_score)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing tree coverage score: {e}")
|
||||
return green_space.environmental.tree_coverage_percent
|
||||
|
||||
def _score_wildlife_diversity_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
|
||||
"""Enhanced wildlife diversity scoring using real tree species data."""
|
||||
if not tree_data:
|
||||
return green_space.environmental.wildlife_diversity_score
|
||||
|
||||
try:
|
||||
base_score = green_space.environmental.wildlife_diversity_score
|
||||
tree_diversity = tree_data.metrics.species_diversity_score
|
||||
mature_trees_bonus = min(20, tree_data.metrics.mature_trees_count)
|
||||
|
||||
# Combine scores with weighting
|
||||
enhanced_score = int((base_score * 0.6) + (tree_diversity * 0.4) + mature_trees_bonus)
|
||||
|
||||
return min(100, enhanced_score)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing wildlife diversity score: {e}")
|
||||
return green_space.environmental.wildlife_diversity_score
|
||||
|
||||
def _score_shade_quality_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
|
||||
"""Enhanced shade quality scoring using real tree data."""
|
||||
if not tree_data:
|
||||
return green_space.environmental.shade_quality
|
||||
|
||||
try:
|
||||
base_shade = green_space.environmental.shade_quality
|
||||
tree_shade_quality = tree_data.shade_analysis.shade_quality_score
|
||||
|
||||
# Use the better of the two scores
|
||||
enhanced_score = max(base_shade, tree_shade_quality)
|
||||
|
||||
# Bonus for large nearby trees
|
||||
large_trees_count = len(tree_data.shade_analysis.nearby_large_trees)
|
||||
if large_trees_count > 5:
|
||||
enhanced_score = min(100, enhanced_score + 15)
|
||||
elif large_trees_count > 2:
|
||||
enhanced_score = min(100, enhanced_score + 10)
|
||||
|
||||
return int(enhanced_score)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing shade quality score: {e}")
|
||||
return green_space.environmental.shade_quality
|
||||
|
||||
def _score_nature_immersion_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
|
||||
"""Enhanced nature immersion scoring using real tree data."""
|
||||
if not tree_data:
|
||||
return self._score_nature_immersion(green_space)
|
||||
|
||||
try:
|
||||
# Base score from existing method
|
||||
base_score = green_space.environmental.tree_coverage_percent
|
||||
base_score += green_space.environmental.natural_surface_percent // 2
|
||||
if green_space.environmental.water_features:
|
||||
base_score += 15
|
||||
|
||||
# Enhancement from tree data
|
||||
tree_density_score = min(30, tree_data.metrics.trees_per_hectare)
|
||||
canopy_density_bonus = int(tree_data.shade_analysis.canopy_density * 20) if tree_data.shade_analysis.canopy_density else 0
|
||||
species_diversity_bonus = min(15, tree_data.metrics.species_diversity_score // 5)
|
||||
|
||||
enhanced_score = base_score + tree_density_score + canopy_density_bonus + species_diversity_bonus
|
||||
|
||||
return min(100, int(enhanced_score))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing nature immersion score: {e}")
|
||||
return self._score_nature_immersion(green_space)
|
||||
|
||||
def _score_natural_habitat_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
|
||||
"""Enhanced natural habitat scoring using real tree data."""
|
||||
if not tree_data:
|
||||
return self._score_natural_habitat(green_space)
|
||||
|
||||
try:
|
||||
base_score = green_space.environmental.tree_coverage_percent
|
||||
base_score += green_space.environmental.natural_surface_percent // 2
|
||||
if green_space.environmental.water_features:
|
||||
base_score += 15
|
||||
|
||||
# Tree habitat quality factors
|
||||
mature_trees_score = min(25, tree_data.metrics.mature_trees_count // 2)
|
||||
species_diversity_score = min(20, tree_data.metrics.species_diversity_score // 3)
|
||||
|
||||
enhanced_score = base_score + mature_trees_score + species_diversity_score
|
||||
|
||||
return min(100, int(enhanced_score))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing natural habitat score: {e}")
|
||||
return self._score_natural_habitat(green_space)
|
||||
|
||||
def _score_observation_spots_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
|
||||
"""Enhanced wildlife observation scoring using real tree data."""
|
||||
if not tree_data:
|
||||
return self._score_observation_spots(green_space)
|
||||
|
||||
try:
|
||||
base_score = green_space.environmental.tree_coverage_percent // 2
|
||||
if green_space.environmental.water_features:
|
||||
base_score += 30
|
||||
if green_space.environmental.noise_level.value <= 2:
|
||||
base_score += 20
|
||||
|
||||
# Large trees provide better observation opportunities
|
||||
large_trees_count = len(tree_data.shade_analysis.nearby_large_trees)
|
||||
observation_bonus = min(25, large_trees_count * 3)
|
||||
|
||||
# Species diversity attracts more wildlife to observe
|
||||
diversity_bonus = min(15, tree_data.metrics.species_diversity_score // 4)
|
||||
|
||||
enhanced_score = base_score + observation_bonus + diversity_bonus
|
||||
|
||||
return min(100, int(enhanced_score))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing observation spots score: {e}")
|
||||
return self._score_observation_spots(green_space)
|
||||
|
||||
def _score_meditation_spots_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
|
||||
"""Enhanced meditation spots scoring using real tree data."""
|
||||
if not tree_data:
|
||||
return self._score_meditation_spots(green_space)
|
||||
|
||||
try:
|
||||
base_score = green_space.environmental.tree_coverage_percent // 2
|
||||
if green_space.environmental.water_features:
|
||||
base_score += 25
|
||||
if green_space.environmental.noise_level.value <= 2:
|
||||
base_score += 25
|
||||
|
||||
# Trees enhance meditation through natural sounds and shade
|
||||
shade_quality_bonus = min(20, tree_data.shade_analysis.shade_quality_score // 4)
|
||||
canopy_bonus = int(tree_data.shade_analysis.canopy_density * 15) if tree_data.shade_analysis.canopy_density else 0
|
||||
|
||||
enhanced_score = base_score + shade_quality_bonus + canopy_bonus
|
||||
|
||||
return min(100, int(enhanced_score))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing meditation spots score: {e}")
|
||||
return self._score_meditation_spots(green_space)
|
||||
|
||||
def _score_air_quality_with_trees(self, green_space: GreenSpace, tree_data: Optional[Any] = None) -> int:
|
||||
"""Enhanced air quality scoring using real tree data."""
|
||||
if not tree_data:
|
||||
return self._score_air_quality(green_space)
|
||||
|
||||
try:
|
||||
base_score = green_space.environmental.tree_coverage_percent
|
||||
if green_space.environmental.natural_surface_percent > 80:
|
||||
base_score += 20
|
||||
|
||||
# More trees = better air quality
|
||||
tree_density_bonus = min(25, tree_data.metrics.trees_per_hectare // 2)
|
||||
mature_trees_bonus = min(15, tree_data.metrics.mature_trees_count // 3)
|
||||
|
||||
enhanced_score = base_score + tree_density_bonus + mature_trees_bonus
|
||||
|
||||
return min(100, int(enhanced_score))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error enhancing air quality score: {e}")
|
||||
return self._score_air_quality(green_space)
|
||||
|
|
|
@ -1,353 +0,0 @@
|
|||
import json
|
||||
import math
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Tuple, Dict, Any
|
||||
from datetime import datetime
|
||||
from geopy.distance import geodesic
|
||||
|
||||
from app.models.street_tree import (
|
||||
StreetTree, TreeDensityMetrics, TreeShadeAnalysis, TreesSearchFilters,
|
||||
TreesNearLocationResponse, TreeGenus, TreeHealthStatus
|
||||
)
|
||||
from app.models.green_space import Coordinates
|
||||
|
||||
class StreetTreeService:
|
||||
"""Service for accessing and analyzing Berlin street trees data."""
|
||||
|
||||
def __init__(self):
|
||||
self._trees_cache = None
|
||||
self._trees_index = None
|
||||
self.data_dir = Path("app/data")
|
||||
|
||||
def _load_trees(self) -> List[Dict]:
|
||||
"""Load street trees data from JSON file."""
|
||||
if self._trees_cache is None:
|
||||
trees_file = self.data_dir / "processed" / "street_trees.json"
|
||||
if trees_file.exists():
|
||||
with open(trees_file, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
self._trees_cache = data.get("street_trees", [])
|
||||
else:
|
||||
print("Warning: street_trees.json not found. Run process_street_trees.py first.")
|
||||
self._trees_cache = []
|
||||
return self._trees_cache
|
||||
|
||||
def _create_tree_from_dict(self, tree_data: Dict) -> StreetTree:
|
||||
"""Convert tree dictionary to StreetTree model."""
|
||||
|
||||
# Map genus to enum
|
||||
genus_mapping = {
|
||||
"AHORN": TreeGenus.AHORN,
|
||||
"LINDE": TreeGenus.LINDE,
|
||||
"KASTANIE": TreeGenus.KASTANIE,
|
||||
"ROSSKASTANIE": TreeGenus.ROSSKASTANIE,
|
||||
"EICHE": TreeGenus.EICHE,
|
||||
"PLATANE": TreeGenus.PLATANE,
|
||||
"BIRKE": TreeGenus.BIRKE,
|
||||
"WEIßDORN": TreeGenus.WEISSDORN,
|
||||
"PAPPEL": TreeGenus.PAPPEL,
|
||||
"ESCHE": TreeGenus.ESCHE,
|
||||
}
|
||||
|
||||
genus_german = (tree_data.get('genus_german') or '').upper()
|
||||
genus_category = genus_mapping.get(genus_german, TreeGenus.OTHER)
|
||||
|
||||
# Determine health status based on available data
|
||||
health_status = TreeHealthStatus.UNKNOWN
|
||||
if tree_data.get('age'):
|
||||
age = tree_data['age']
|
||||
if age > 80:
|
||||
health_status = TreeHealthStatus.FAIR
|
||||
elif age > 50:
|
||||
health_status = TreeHealthStatus.GOOD
|
||||
elif age > 0:
|
||||
health_status = TreeHealthStatus.EXCELLENT
|
||||
|
||||
return StreetTree(
|
||||
id=tree_data.get('id', ''),
|
||||
object_id=tree_data.get('object_id'),
|
||||
tree_id=tree_data.get('tree_id'),
|
||||
location_number=tree_data.get('location_number'),
|
||||
identifier=tree_data.get('identifier'),
|
||||
object_name=tree_data.get('object_name'),
|
||||
species_german=tree_data.get('species_german'),
|
||||
species_botanical=tree_data.get('species_botanical'),
|
||||
genus_german=tree_data.get('genus_german'),
|
||||
genus_botanical=tree_data.get('genus_botanical'),
|
||||
genus_category=genus_category,
|
||||
coordinates=Coordinates(
|
||||
lat=tree_data.get('lat', 0.0),
|
||||
lng=tree_data.get('lng', 0.0)
|
||||
),
|
||||
district=tree_data.get('district'),
|
||||
owner=tree_data.get('owner'),
|
||||
category=tree_data.get('category'),
|
||||
street=tree_data.get('street'),
|
||||
house_number=tree_data.get('house_number'),
|
||||
address_addition=tree_data.get('address_addition'),
|
||||
planting_year=tree_data.get('planting_year'),
|
||||
age=tree_data.get('age'),
|
||||
crown_diameter_m=tree_data.get('crown_diameter_m'),
|
||||
trunk_circumference_cm=tree_data.get('trunk_circumference_cm'),
|
||||
height_m=tree_data.get('height_m'),
|
||||
health_status=health_status,
|
||||
last_updated=datetime.now()
|
||||
)
|
||||
|
||||
async def get_trees_near_location(
|
||||
self,
|
||||
lat: float,
|
||||
lng: float,
|
||||
radius_m: int = 500,
|
||||
limit: Optional[int] = None
|
||||
) -> TreesNearLocationResponse:
|
||||
"""Get street trees within a radius of a location."""
|
||||
start_time = datetime.now()
|
||||
|
||||
trees_data = self._load_trees()
|
||||
nearby_trees = []
|
||||
|
||||
for tree_data in trees_data:
|
||||
tree_lat = tree_data.get('lat')
|
||||
tree_lng = tree_data.get('lng')
|
||||
|
||||
if tree_lat is None or tree_lng is None:
|
||||
continue
|
||||
|
||||
distance = geodesic((lat, lng), (tree_lat, tree_lng)).meters
|
||||
if distance <= radius_m:
|
||||
tree = self._create_tree_from_dict(tree_data)
|
||||
nearby_trees.append(tree)
|
||||
|
||||
if limit and len(nearby_trees) >= limit:
|
||||
break
|
||||
|
||||
# Sort by distance
|
||||
nearby_trees.sort(
|
||||
key=lambda t: geodesic((lat, lng), (t.coordinates.lat, t.coordinates.lng)).meters
|
||||
)
|
||||
|
||||
# Calculate metrics
|
||||
metrics = self._calculate_tree_density_metrics(nearby_trees, radius_m)
|
||||
shade_analysis = self._analyze_shade_coverage(lat, lng, nearby_trees)
|
||||
|
||||
query_time = (datetime.now() - start_time).total_seconds() * 1000
|
||||
|
||||
return TreesNearLocationResponse(
|
||||
location=Coordinates(lat=lat, lng=lng),
|
||||
radius_m=radius_m,
|
||||
trees=nearby_trees,
|
||||
metrics=metrics,
|
||||
shade_analysis=shade_analysis,
|
||||
total_found=len(nearby_trees),
|
||||
query_time_ms=int(query_time)
|
||||
)
|
||||
|
||||
def _calculate_tree_density_metrics(
|
||||
self,
|
||||
trees: List[StreetTree],
|
||||
radius_m: int
|
||||
) -> TreeDensityMetrics:
|
||||
"""Calculate tree density and coverage metrics."""
|
||||
if not trees:
|
||||
return TreeDensityMetrics()
|
||||
|
||||
area_hectares = (math.pi * radius_m * radius_m) / 10000 # Convert to hectares
|
||||
|
||||
# Calculate averages
|
||||
ages = [t.age for t in trees if t.age is not None]
|
||||
heights = [t.height_m for t in trees if t.height_m is not None]
|
||||
crowns = [t.crown_diameter_m for t in trees if t.crown_diameter_m is not None]
|
||||
|
||||
avg_age = sum(ages) / len(ages) if ages else None
|
||||
avg_height = sum(heights) / len(heights) if heights else None
|
||||
avg_crown = sum(crowns) / len(crowns) if crowns else None
|
||||
|
||||
# Count mature vs young trees
|
||||
mature_trees = len([t for t in trees if t.age and t.age > 20])
|
||||
young_trees = len([t for t in trees if t.age and t.age < 10])
|
||||
|
||||
# Calculate shade coverage (rough estimate)
|
||||
shade_coverage = 0.0
|
||||
if crowns:
|
||||
total_crown_area = sum(math.pi * (d/2)**2 for d in crowns if d > 0)
|
||||
shade_coverage = min(100.0, (total_crown_area / (math.pi * radius_m * radius_m)) * 100)
|
||||
|
||||
# Get dominant species
|
||||
species_count = {}
|
||||
for tree in trees:
|
||||
if tree.species_german:
|
||||
species_count[tree.species_german] = species_count.get(tree.species_german, 0) + 1
|
||||
|
||||
dominant_species = sorted(species_count.items(), key=lambda x: x[1], reverse=True)[:3]
|
||||
dominant_species_names = [species[0] for species in dominant_species]
|
||||
|
||||
# Calculate species diversity (simple calculation)
|
||||
unique_species = len(species_count)
|
||||
diversity_score = min(100, (unique_species * 10)) if unique_species > 0 else 0
|
||||
|
||||
return TreeDensityMetrics(
|
||||
total_trees=len(trees),
|
||||
trees_per_hectare=len(trees) / area_hectares if area_hectares > 0 else 0,
|
||||
average_tree_age=avg_age,
|
||||
average_height=avg_height,
|
||||
average_crown_diameter=avg_crown,
|
||||
shade_coverage_percent=shade_coverage,
|
||||
mature_trees_count=mature_trees,
|
||||
young_trees_count=young_trees,
|
||||
dominant_species=dominant_species_names,
|
||||
species_diversity_score=diversity_score
|
||||
)
|
||||
|
||||
def _analyze_shade_coverage(
|
||||
self,
|
||||
lat: float,
|
||||
lng: float,
|
||||
trees: List[StreetTree]
|
||||
) -> TreeShadeAnalysis:
|
||||
"""Analyze shade coverage for picnic spot evaluation."""
|
||||
|
||||
trees_50m = 0
|
||||
trees_100m = 0
|
||||
large_trees = []
|
||||
|
||||
for tree in trees:
|
||||
distance = geodesic((lat, lng), (tree.coordinates.lat, tree.coordinates.lng)).meters
|
||||
|
||||
if distance <= 50:
|
||||
trees_50m += 1
|
||||
if distance <= 100:
|
||||
trees_100m += 1
|
||||
|
||||
# Consider large trees (good crown diameter or height)
|
||||
if ((tree.crown_diameter_m and tree.crown_diameter_m > 8) or
|
||||
(tree.height_m and tree.height_m > 15) or
|
||||
(tree.age and tree.age > 30)):
|
||||
large_trees.append(tree)
|
||||
|
||||
# Estimate shade coverage
|
||||
shade_coverage = 0
|
||||
if trees_50m > 0:
|
||||
shade_coverage = min(100, trees_50m * 15) # Rough estimate
|
||||
|
||||
# Shade quality based on tree density and size
|
||||
shade_quality = 0
|
||||
if trees_50m > 3:
|
||||
shade_quality = 80
|
||||
elif trees_50m > 1:
|
||||
shade_quality = 60
|
||||
elif trees_100m > 5:
|
||||
shade_quality = 40
|
||||
elif trees_100m > 2:
|
||||
shade_quality = 20
|
||||
|
||||
# Best shade times (simplified)
|
||||
best_times = []
|
||||
if shade_quality > 60:
|
||||
best_times = ["10:00-12:00", "14:00-16:00"]
|
||||
elif shade_quality > 30:
|
||||
best_times = ["11:00-13:00"]
|
||||
|
||||
return TreeShadeAnalysis(
|
||||
has_nearby_trees=len(trees) > 0,
|
||||
trees_within_50m=trees_50m,
|
||||
trees_within_100m=trees_100m,
|
||||
estimated_shade_coverage=shade_coverage,
|
||||
shade_quality_score=shade_quality,
|
||||
best_shade_times=best_times,
|
||||
nearby_large_trees=large_trees[:5], # Limit to 5 for response size
|
||||
canopy_density=len(large_trees) / max(1, len(trees)) if trees else 0
|
||||
)
|
||||
|
||||
async def search_trees(self, filters: TreesSearchFilters) -> List[StreetTree]:
|
||||
"""Search trees with filters."""
|
||||
trees_data = self._load_trees()
|
||||
filtered_trees = []
|
||||
|
||||
for tree_data in trees_data:
|
||||
# Apply location filter first if specified
|
||||
if (filters.center_lat and filters.center_lng and filters.within_radius_m):
|
||||
tree_lat = tree_data.get('lat')
|
||||
tree_lng = tree_data.get('lng')
|
||||
if tree_lat is None or tree_lng is None:
|
||||
continue
|
||||
|
||||
distance = geodesic(
|
||||
(filters.center_lat, filters.center_lng),
|
||||
(tree_lat, tree_lng)
|
||||
).meters
|
||||
if distance > filters.within_radius_m:
|
||||
continue
|
||||
|
||||
# Apply other filters
|
||||
if filters.species and tree_data.get('species_german') not in filters.species:
|
||||
continue
|
||||
|
||||
if filters.district and tree_data.get('district') != filters.district:
|
||||
continue
|
||||
|
||||
if filters.min_age and (not tree_data.get('age') or tree_data['age'] < filters.min_age):
|
||||
continue
|
||||
|
||||
if filters.max_age and (not tree_data.get('age') or tree_data['age'] > filters.max_age):
|
||||
continue
|
||||
|
||||
if filters.min_height and (not tree_data.get('height_m') or tree_data['height_m'] < filters.min_height):
|
||||
continue
|
||||
|
||||
if filters.max_height and (not tree_data.get('height_m') or tree_data['height_m'] > filters.max_height):
|
||||
continue
|
||||
|
||||
tree = self._create_tree_from_dict(tree_data)
|
||||
filtered_trees.append(tree)
|
||||
|
||||
return filtered_trees
|
||||
|
||||
async def get_tree_stats(self) -> Dict[str, Any]:
|
||||
"""Get overall statistics about Berlin street trees."""
|
||||
trees_data = self._load_trees()
|
||||
|
||||
if not trees_data:
|
||||
return {"error": "No tree data available"}
|
||||
|
||||
# Count by district
|
||||
district_counts = {}
|
||||
species_counts = {}
|
||||
age_distribution = {"0-10": 0, "11-20": 0, "21-50": 0, "51+": 0, "unknown": 0}
|
||||
|
||||
for tree in trees_data:
|
||||
# District stats
|
||||
district = tree.get('district')
|
||||
if district:
|
||||
district_counts[district] = district_counts.get(district, 0) + 1
|
||||
|
||||
# Species stats
|
||||
species = tree.get('species_german')
|
||||
if species:
|
||||
species_counts[species] = species_counts.get(species, 0) + 1
|
||||
|
||||
# Age distribution
|
||||
age = tree.get('age')
|
||||
if age is None:
|
||||
age_distribution["unknown"] += 1
|
||||
elif age <= 10:
|
||||
age_distribution["0-10"] += 1
|
||||
elif age <= 20:
|
||||
age_distribution["11-20"] += 1
|
||||
elif age <= 50:
|
||||
age_distribution["21-50"] += 1
|
||||
else:
|
||||
age_distribution["51+"] += 1
|
||||
|
||||
# Top 10 species
|
||||
top_species = sorted(species_counts.items(), key=lambda x: x[1], reverse=True)[:10]
|
||||
|
||||
return {
|
||||
"total_trees": len(trees_data),
|
||||
"districts": len(district_counts),
|
||||
"unique_species": len(species_counts),
|
||||
"district_counts": district_counts,
|
||||
"age_distribution": age_distribution,
|
||||
"top_species": dict(top_species),
|
||||
"last_updated": datetime.now().isoformat()
|
||||
}
|
|
@ -1,89 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Inspect the street trees JSON file structure without loading the entire file.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
|
||||
def inspect_street_trees():
|
||||
"""Inspect the street trees JSON file structure."""
|
||||
|
||||
file_path = "app/data/processed/street_trees.json"
|
||||
|
||||
try:
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
# Read just the beginning to get metadata
|
||||
content = f.read(2000) # Read first 2KB
|
||||
|
||||
# Find the metadata section
|
||||
if '"street_trees":' in content:
|
||||
# Extract metadata before the trees array
|
||||
metadata_end = content.find('"street_trees":')
|
||||
metadata_part = content[:metadata_end]
|
||||
|
||||
# Try to parse what we can
|
||||
print("File structure inspection:")
|
||||
print(f"File size: ~414MB")
|
||||
|
||||
# Look for key metadata fields
|
||||
if '"count":' in content:
|
||||
count_start = content.find('"count":') + 8
|
||||
count_end = content.find(',', count_start)
|
||||
if count_end == -1:
|
||||
count_end = content.find('}', count_start)
|
||||
count_str = content[count_start:count_end].strip()
|
||||
print(f"Tree count: {count_str}")
|
||||
|
||||
if '"processed_count":' in content:
|
||||
proc_start = content.find('"processed_count":') + 18
|
||||
proc_end = content.find(',', proc_start)
|
||||
proc_str = content[proc_start:proc_end].strip()
|
||||
print(f"Processed count: {proc_str}")
|
||||
|
||||
if '"skipped_count":' in content:
|
||||
skip_start = content.find('"skipped_count":') + 16
|
||||
skip_end = content.find(',', skip_start)
|
||||
skip_str = content[skip_start:skip_end].strip()
|
||||
print(f"Skipped count: {skip_str}")
|
||||
|
||||
# Now let's find the first tree to see the structure
|
||||
trees_start = content.find('"street_trees": [')
|
||||
if trees_start != -1:
|
||||
# Read a bit more to get the first tree
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
f.seek(trees_start + 17) # Skip to after the array start
|
||||
tree_content = f.read(1000) # Read 1KB to get first tree
|
||||
|
||||
# Find the first complete tree object
|
||||
first_brace = tree_content.find('{')
|
||||
if first_brace != -1:
|
||||
brace_count = 0
|
||||
end_pos = first_brace
|
||||
for i, char in enumerate(tree_content[first_brace:], first_brace):
|
||||
if char == '{':
|
||||
brace_count += 1
|
||||
elif char == '}':
|
||||
brace_count -= 1
|
||||
if brace_count == 0:
|
||||
end_pos = i + 1
|
||||
break
|
||||
|
||||
first_tree_str = tree_content[first_brace:end_pos]
|
||||
try:
|
||||
first_tree = json.loads(first_tree_str)
|
||||
print("\nFirst tree structure:")
|
||||
for key, value in first_tree.items():
|
||||
print(f" {key}: {type(value).__name__} = {value}")
|
||||
except json.JSONDecodeError:
|
||||
print("\nCould not parse first tree, but file exists and has data")
|
||||
|
||||
print("\nFile appears to be processed successfully!")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error inspecting file: {e}")
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
inspect_street_trees()
|
|
@ -1,176 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Process Berlin Street Trees (Baumkataster) CSV data.
|
||||
Converts the raw CSV into a structured JSON format for use in the picnic API.
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
import json
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add the app directory to the Python path
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
def process_street_trees():
|
||||
"""Process the street trees CSV file and create a JSON file."""
|
||||
|
||||
# File paths
|
||||
raw_file = Path("app/data/raw/Baumkataster_Berlin_-1586189165523919690.csv")
|
||||
processed_file = Path("app/data/processed/street_trees.json")
|
||||
|
||||
# Ensure processed directory exists
|
||||
processed_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
print(f"Reading street trees data from: {raw_file}")
|
||||
|
||||
if not raw_file.exists():
|
||||
print(f"Error: Raw file not found at {raw_file}")
|
||||
return False
|
||||
|
||||
try:
|
||||
# Read the CSV file
|
||||
df = pd.read_csv(raw_file, encoding='utf-8')
|
||||
print(f"Loaded {len(df)} street trees from CSV")
|
||||
|
||||
# Display column names for debugging
|
||||
print("Columns in CSV:", df.columns.tolist())
|
||||
|
||||
# Clean and process the data
|
||||
trees = []
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
|
||||
for idx, row in df.iterrows():
|
||||
try:
|
||||
# Extract coordinates
|
||||
x_coord = row.get('x')
|
||||
y_coord = row.get('y')
|
||||
|
||||
# Skip rows with missing coordinates
|
||||
if pd.isna(x_coord) or pd.isna(y_coord):
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Convert coordinates to lat/lng (assuming they're in EPSG:25833 - ETRS89 / UTM zone 33N)
|
||||
# For now, we'll use them as-is and convert later if needed
|
||||
# In a real implementation, you'd use a proper coordinate transformation
|
||||
|
||||
# Basic coordinate validation (Berlin area check)
|
||||
if not (1480000 <= x_coord <= 1520000 and 6870000 <= y_coord <= 6920000):
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Convert UTM to approximate lat/lng for Berlin area
|
||||
# This is a rough approximation - in production use proper coordinate transformation
|
||||
lat = 52.3 + (y_coord - 6870000) / 111000 # Rough conversion
|
||||
lng = 13.0 + (x_coord - 1480000) / 71000 # Rough conversion
|
||||
|
||||
# Validate converted coordinates
|
||||
if not (52.3 <= lat <= 52.7 and 13.0 <= lng <= 13.8):
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Extract tree information
|
||||
tree_data = {
|
||||
"id": f"tree_{processed_count + 1}",
|
||||
"object_id": row.get('OBJECTID'),
|
||||
"tree_id": row.get('Baum ID'),
|
||||
"location_number": row.get('Standort Nr'),
|
||||
"identifier": row.get('Kennzeich'),
|
||||
"object_name": row.get('Objektname'),
|
||||
"species_german": row.get('Art'),
|
||||
"species_botanical": row.get('Art Botanisch'),
|
||||
"genus_german": row.get('Gattung'),
|
||||
"genus_botanical": row.get('Gattung Botanisch'),
|
||||
"planting_year": row.get('Pflanzjahr'),
|
||||
"age": row.get('Standalter'),
|
||||
"crown_diameter_m": row.get('Krone Durchschnitt (m)'),
|
||||
"trunk_circumference_cm": row.get('Stammumfang (cm)'),
|
||||
"height_m": row.get('Höhe (m)'),
|
||||
"district": row.get('Bezirk'),
|
||||
"owner": row.get('Eigentümer'),
|
||||
"category": row.get('Kategorie'),
|
||||
"street": row.get('Straße'),
|
||||
"house_number": row.get('Haus Nr'),
|
||||
"address_addition": row.get('Adresszusatz'),
|
||||
"lat": round(lat, 6),
|
||||
"lng": round(lng, 6),
|
||||
"x_coord": x_coord,
|
||||
"y_coord": y_coord
|
||||
}
|
||||
|
||||
# Clean up None values and convert to appropriate types
|
||||
for key, value in tree_data.items():
|
||||
if pd.isna(value):
|
||||
tree_data[key] = None
|
||||
elif key in ['planting_year', 'age', 'trunk_circumference_cm'] and value is not None:
|
||||
try:
|
||||
tree_data[key] = int(float(value))
|
||||
except (ValueError, TypeError):
|
||||
tree_data[key] = None
|
||||
elif key in ['crown_diameter_m', 'height_m'] and value is not None:
|
||||
try:
|
||||
tree_data[key] = float(value)
|
||||
except (ValueError, TypeError):
|
||||
tree_data[key] = None
|
||||
elif isinstance(value, str):
|
||||
tree_data[key] = value.strip()
|
||||
|
||||
trees.append(tree_data)
|
||||
processed_count += 1
|
||||
|
||||
# Progress indicator
|
||||
if processed_count % 10000 == 0:
|
||||
print(f"Processed {processed_count} trees...")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing row {idx}: {e}")
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Create the final data structure
|
||||
output_data = {
|
||||
"street_trees": trees,
|
||||
"count": len(trees),
|
||||
"processed_count": processed_count,
|
||||
"skipped_count": skipped_count,
|
||||
"last_updated": datetime.now().isoformat(),
|
||||
"source": "baumkataster_csv",
|
||||
"coordinate_system": "EPSG:25833_converted_to_WGS84",
|
||||
"note": "Coordinates converted from UTM to approximate WGS84. Use proper coordinate transformation in production."
|
||||
}
|
||||
|
||||
# Write to JSON file
|
||||
print(f"Writing {len(trees)} trees to: {processed_file}")
|
||||
with open(processed_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(output_data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"Successfully processed street trees data:")
|
||||
print(f" - Total rows in CSV: {len(df)}")
|
||||
print(f" - Successfully processed: {processed_count}")
|
||||
print(f" - Skipped (invalid data): {skipped_count}")
|
||||
print(f" - Output file: {processed_file}")
|
||||
|
||||
# Display some sample data
|
||||
if trees:
|
||||
print("\nSample tree data:")
|
||||
sample_tree = trees[0]
|
||||
for key, value in sample_tree.items():
|
||||
print(f" {key}: {value}")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing street trees data: {e}")
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = process_street_trees()
|
||||
if success:
|
||||
print("\nStreet trees processing completed successfully!")
|
||||
else:
|
||||
print("\nStreet trees processing failed!")
|
||||
sys.exit(1)
|
Loading…
Reference in New Issue