add basic models and toilet

This commit is contained in:
Gal 2025-06-18 22:14:26 +02:00
parent 897cbfdca8
commit 59e27534b0
Signed by: gal
GPG Key ID: F035BC65003BC00B
17 changed files with 2838 additions and 5 deletions

78
.gitignore vendored
View File

@ -1,10 +1,82 @@
# Python-generated files
# .gitignore
# Python
__pycache__/
*.py[oc]
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Virtual environments
venv/
.venv/
ENV/
env/
.env
.venv
# uv
.uv/
uv.lock
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Project specific
app/data/raw/*
app/data/processed/*
app/data/cache/*
!app/data/raw/.gitkeep
!app/data/processed/.gitkeep
!app/data/cache/.gitkeep
# Logs
*.log
logs/
docs/
# Database
*.db
*.sqlite3
# Testing
.coverage
htmlcov/
.pytest_cache/
.tox/
# MyPy
.mypy_cache/
.dmypy.json
dmypy.json
# Ruff
.ruff_cache/

157
README.md
View File

@ -0,0 +1,157 @@
# Berlin Picnic Zone Finder API
Find the perfect picnic zones in Berlin based on your personality and preferences.
## 🚀 Quick Start
### Prerequisites
- Python 3.11+
- [uv](https://github.com/astral-sh/uv) package manager
### Installation
```bash
# Clone the repository
git clone https://github.com/yourusername/berlin-picnic-api.git
cd berlin-picnic-api
# Install dependencies
uv sync --dev
# Create environment file
cp .env.example .env
# Start the development server
uv run uvicorn app.main:app --reload
```
The API will be available at `http://localhost:8000`
## 📚 API Documentation
- **Swagger UI**: http://localhost:8000/docs
- **ReDoc**: http://localhost:8000/redoc
## 🏗️ Project Structure
```
app/
├── main.py # FastAPI application
├── config.py # Configuration settings
├── models/ # Pydantic models
├── services/ # Business logic
├── utils/ # Utility functions
├── routers/ # API route handlers
└── data/ # Data storage
```
## 🔧 Development
### Code Quality
```bash
# Format code
uv run ruff format .
# Lint code
uv run ruff check . --fix
# Type checking
uv run mypy app/
# Run tests
uv run pytest
# Run all checks
uv run ruff check . && uv run ruff format . && uv run mypy app/ && uv run pytest
```
### Data Management
```bash
# Fetch Berlin open data
uv run python scripts/fetch_berlin_data.py
# Process zone data
uv run python scripts/process_zones.py
```
## 🌍 API Endpoints
### Zones
- `GET /zones` - Get zones filtered by personality and preferences
- `GET /zones/{zone_id}` - Get specific zone details
- `GET /zones/{zone_id}/scores` - Get personality scores for a zone
### Neighborhoods
- `GET /neighborhoods` - Get all Berlin neighborhoods
### Health
- `GET /health` - Basic health check
- `GET /health/ready` - Readiness check
## 🎯 Personality Types
- `little_adventurers` - Families with kids
- `date_night` - Romantic couples
- `squad_goals` - Friend groups
- `zen_masters` - Peace seekers
- `active_lifestyle` - Fitness enthusiasts
- `wildlife_lover` - Nature enthusiasts
- `art_nerd` - Art and culture lovers
- `history_geek` - History enthusiasts
## 🗺️ Berlin Neighborhoods
- Mitte, Kreuzberg, Friedrichshain, Prenzlauer Berg
- Charlottenburg, Wilmersdorf, Schöneberg, Tempelhof
- Neukölln, Pankow, Wedding, Moabit
## 📊 Scoring Factors
- Tree coverage and shade
- Noise levels
- Wildlife diversity
- Nearby amenities (Späti, restaurants, toilets)
- Playgrounds and water features
- Public transport accessibility
- Fitness facilities and paths
## 🚀 Deployment
### Docker
```bash
# Build image
docker build -t berlin-picnic-api .
# Run container
docker run -p 8000:8000 berlin-picnic-api
```
### Docker Compose
```bash
# Start all services
docker-compose up -d
```
## 🤝 Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Run tests and linting
5. Submit a pull request
## 📄 License
MIT License - see LICENSE file for details
## 📞 Support
- GitHub Issues: [Create an issue](https://github.com/yourusername/berlin-picnic-api/issues)
- Email: your.email@example.com

70
app/config.py Normal file
View File

@ -0,0 +1,70 @@
# app/config.py
"""Application configuration."""
from typing import List
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application settings."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
)
# API Configuration
API_TITLE: str = "Berlin Picnic Zone Finder API"
API_VERSION: str = "1.0.0"
DEBUG: bool = False
HOST: str = "0.0.0.0"
PORT: int = 8000
# CORS Settings
ALLOWED_ORIGINS: List[str] = [
"http://localhost:3000",
"http://127.0.0.1:3000",
"https://your-frontend-domain.com",
]
# Berlin Open Data APIs
BERLIN_OPEN_DATA_API_KEY: str = ""
EBIRD_API_KEY: str = ""
INATURALIST_API_URL: str = "https://api.inaturalist.org/v1"
# Database
DATABASE_URL: str = "sqlite:///./berlin_zones.db"
# Redis Cache
REDIS_URL: str = "redis://localhost:6379"
CACHE_TTL: int = 3600
# External APIs
OPENSTREETMAP_API_URL: str = "https://nominatim.openstreetmap.org"
WEATHER_API_KEY: str = ""
# Logging
LOG_LEVEL: str = "INFO"
LOG_FORMAT: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# Data Processing
MAX_ZONES_PER_REQUEST: int = 100
DEFAULT_SEARCH_RADIUS: int = 1000 # meters
SCORING_CACHE_TTL: int = 1800 # seconds
# Berlin specific
BERLIN_CENTER_LAT: float = 52.5200
BERLIN_CENTER_LNG: float = 13.4050
BERLIN_BBOX: List[float] = [
13.0883,
52.3382,
13.7611,
52.6755,
] # [min_lng, min_lat, max_lng, max_lat]
# Create settings instance
settings = Settings()

39
app/main.py Normal file
View File

@ -0,0 +1,39 @@
# app/main.py - Refactored API Structure
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.routers import health, green_spaces, amenities, info, admin
# Create FastAPI app
app = FastAPI(
title="Berlin Green Space Personality Scorer",
description="Score any green space in Berlin based on personality preferences",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(health.router)
app.include_router(green_spaces.router)
app.include_router(amenities.router)
app.include_router(info.router)
app.include_router(admin.router)
@app.get("/")
async def root():
"""Root endpoint with API information."""
return {
"message": "Berlin Green Space Personality Scorer API",
"version": "1.0.0",
"docs": "/docs",
"health": "/health"
}

139
app/models/green_space.py Normal file
View File

@ -0,0 +1,139 @@
# app/models/green_space.py
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Tuple, Any
from datetime import datetime
from enum import Enum
class GreenSpaceType(str, Enum):
PARK = "park"
GARDEN = "garden"
FOREST = "forest"
RIVERSIDE = "riverside"
CEMETERY = "cemetery"
PLAYGROUND = "playground"
SPORTS_FIELD = "sports_field"
ALLOTMENT = "allotment"
SQUARE = "square"
OTHER = "other"
class NoiseLevel(int, Enum):
VERY_QUIET = 1
QUIET = 2
MODERATE = 3
NOISY = 4
VERY_NOISY = 5
class AmenityType(str, Enum):
TOILET = "toilet"
RESTAURANT = "restaurant"
CAFE = "cafe"
ICE_CREAM = "ice_cream"
SPATI = "spati"
PLAYGROUND = "playground"
WATER_FEATURE = "water_feature"
FITNESS = "fitness"
BIKE_RENTAL = "bike_rental"
PARKING = "parking"
PUBLIC_TRANSPORT = "public_transport"
class Coordinates(BaseModel):
lat: float = Field(..., ge=-90, le=90)
lng: float = Field(..., ge=-180, le=180)
class Amenity(BaseModel):
id: str
name: str
type: AmenityType
coordinates: Coordinates
distance_meters: int
rating: Optional[float] = Field(None, ge=0, le=5)
opening_hours: Optional[str] = None
description: Optional[str] = None
class EnvironmentalFeatures(BaseModel):
"""Environmental characteristics of the green space."""
tree_coverage_percent: int = Field(..., ge=0, le=100, description="Percentage of area covered by trees")
shade_quality: int = Field(..., ge=0, le=100, description="Quality of shade availability")
noise_level: NoiseLevel = Field(..., description="Average noise level")
wildlife_diversity_score: int = Field(..., ge=0, le=100, description="Biodiversity indicator")
water_features: bool = Field(False, description="Has ponds, fountains, or streams")
natural_surface_percent: int = Field(..., ge=0, le=100, description="Percentage of natural vs paved surface")
class AccessibilityFeatures(BaseModel):
"""Accessibility and infrastructure features."""
wheelchair_accessible: bool = Field(False)
public_transport_score: int = Field(..., ge=1, le=5, description="Public transport accessibility")
cycling_infrastructure: bool = Field(False, description="Has bike paths or bike parking")
parking_availability: int = Field(..., ge=1, le=5, description="Car parking availability")
lighting_quality: int = Field(..., ge=1, le=5, description="Evening lighting quality")
class RecreationFeatures(BaseModel):
"""Recreation and activity features."""
playground_quality: int = Field(0, ge=0, le=100, description="Playground facilities quality")
sports_facilities: bool = Field(False, description="Has sports courts, fitness equipment")
running_paths: bool = Field(False, description="Has designated running/walking paths")
cycling_paths: bool = Field(False, description="Has cycling paths")
dog_friendly: bool = Field(True, description="Dogs allowed")
bbq_allowed: bool = Field(False, description="BBQ/grilling allowed")
class PersonalityScore(BaseModel):
"""Personality-specific score with explanation."""
personality: str
score: int = Field(..., ge=0, le=100)
explanation: str
key_factors: List[str]
recommendations: List[str]
class GreenSpace(BaseModel):
"""Complete green space model with all features and scoring."""
# Basic info
id: str
name: str
description: Optional[str] = None
type: GreenSpaceType
# Location
coordinates: Coordinates
neighborhood: str
address: Optional[str] = None
# Physical characteristics
area_sqm: Optional[int] = Field(None, ge=1, description="Area in square meters")
perimeter_m: Optional[int] = Field(None, ge=1, description="Perimeter in meters")
# Features
environmental: EnvironmentalFeatures
accessibility: AccessibilityFeatures
recreation: RecreationFeatures
# Nearby amenities (within search radius)
nearby_amenities: List[Amenity] = []
# Personality scoring (for current request)
current_personality_score: Optional[PersonalityScore] = None
all_personality_scores: Optional[Dict[str, int]] = None
# Metadata
last_updated: datetime
data_sources: List[str] = []
confidence_score: int = Field(..., ge=0, le=100, description="Data quality confidence")
class ScoringRequest(BaseModel):
"""Request parameters for green space scoring."""
personality: str
location: Optional[Tuple[float, float]] = None # (lat, lng)
radius: int = Field(2000, ge=100, le=10000)
neighborhood: Optional[str] = None
min_score: int = Field(60, ge=0, le=100)
filters: Dict[str, Any] = {}
limit: int = Field(20, ge=1, le=100)
include_amenities: bool = False
class LocationScore(BaseModel):
"""Score for a specific location within a green space."""
coordinates: Coordinates
score: int = Field(..., ge=0, le=100)
explanation: str
nearby_features: List[str]
best_for: List[str] # Best personality types for this location
recommendations: List[str]

37
app/models/response.py Normal file
View File

@ -0,0 +1,37 @@
# app/models/response.py
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from app.models.green_space import GreenSpace, Coordinates, Amenity, PersonalityScore, LocationScore
class GreenSpaceResponse(BaseModel):
"""Response model for green space search."""
green_spaces: List[GreenSpace]
total_found: int
search_params: Dict[str, Any]
personality: str
search_location: Optional[Coordinates] = None
class DetailedAnalysis(BaseModel):
"""Detailed analysis of a green space."""
green_space: GreenSpace
personality_breakdown: Dict[str, PersonalityScore]
best_locations_within: List[LocationScore] # Best spots within this space
seasonal_recommendations: Dict[str, str] # Spring, Summer, Fall, Winter
optimal_visit_times: List[str]
similar_spaces: List[str] # IDs of similar green spaces
class NearbyAmenitiesResponse(BaseModel):
"""Response for amenity search."""
location: Coordinates
radius: int
amenities: Dict[str, List[Amenity]] # Grouped by type
summary: Dict[str, int] # Count by type
class DiscoveryResult(BaseModel):
"""Result for green space discovery."""
green_space: GreenSpace
distance_meters: int
travel_time_walking: int # minutes
travel_time_cycling: int # minutes
why_recommended: str
best_route_description: str

18
app/routers/admin.py Normal file
View File

@ -0,0 +1,18 @@
# app/routers/admin.py
from fastapi import APIRouter, HTTPException
from app.services.berlin_data_service import BerlinDataService
router = APIRouter(prefix="/admin", tags=["admin"])
# Services
berlin_data = BerlinDataService()
@router.post("/refresh-data")
async def refresh_berlin_data():
"""Refresh cached Berlin open data (admin endpoint)."""
try:
result = await berlin_data.refresh_all_data()
return {"status": "success", "refreshed": result}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Refresh failed: {str(e)}")

34
app/routers/amenities.py Normal file
View File

@ -0,0 +1,34 @@
# app/routers/amenities.py
from fastapi import APIRouter, Query, HTTPException
from typing import Optional, List
from app.models.response import NearbyAmenitiesResponse
from app.services.berlin_data_service import BerlinDataService
router = APIRouter(prefix="/amenities", tags=["amenities"])
# Services
berlin_data = BerlinDataService()
@router.get("/near-location")
async def get_amenities_near_location(
lat: float = Query(..., description="Latitude"),
lng: float = Query(..., description="Longitude"),
radius: int = Query(500, ge=100, le=2000, description="Search radius in meters"),
types: Optional[List[str]] = Query(None, description="Amenity types to include"),
):
"""Get all amenities near a specific location."""
try:
amenities = await berlin_data.get_amenities_near_point(
lat, lng, radius, types
)
return NearbyAmenitiesResponse(
location={"lat": lat, "lng": lng},
radius=radius,
amenities={"all": amenities}, # Group by type in real implementation
summary=berlin_data.summarize_amenities(amenities)
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Amenity search failed: {str(e)}")

187
app/routers/green_spaces.py Normal file
View File

@ -0,0 +1,187 @@
# app/routers/green_spaces.py
from fastapi import APIRouter, Query, HTTPException
from typing import Optional, List
from enum import Enum
from app.models.green_space import ScoringRequest
from app.models.response import GreenSpaceResponse, NearbyAmenitiesResponse
from app.services.green_space_service import GreenSpaceService
from app.services.berlin_data_service import BerlinDataService
router = APIRouter(prefix="/green-spaces", tags=["green-spaces"])
# Services
green_space_service = GreenSpaceService()
berlin_data = BerlinDataService()
class PersonalityType(str, Enum):
LITTLE_ADVENTURERS = "little_adventurers"
DATE_NIGHT = "date_night"
SQUAD_GOALS = "squad_goals"
ZEN_MASTERS = "zen_masters"
ACTIVE_LIFESTYLE = "active_lifestyle"
WILDLIFE_LOVER = "wildlife_lover"
ART_NERD = "art_nerd"
HISTORY_GEEK = "history_geek"
@router.get("/search", response_model=GreenSpaceResponse)
async def search_green_spaces(
# Location filters
lat: Optional[float] = Query(None, description="Latitude for location-based search"),
lng: Optional[float] = Query(None, description="Longitude for location-based search"),
radius: int = Query(2000, ge=100, le=10000, description="Search radius in meters"),
# Area filters
neighborhood: Optional[str] = Query(None, description="Berlin neighborhood"),
# Personality scoring
personality: PersonalityType = Query(..., description="Personality type for scoring"),
min_score: int = Query(60, ge=0, le=100, description="Minimum personality score"),
# Feature filters
min_size: Optional[int] = Query(None, ge=100, description="Minimum area in square meters"),
has_water: Optional[bool] = Query(None, description="Must have water features"),
has_playground: Optional[bool] = Query(None, description="Must have playground nearby"),
max_noise_level: Optional[int] = Query(None, ge=1, le=5, description="Maximum noise level"),
# Response options
limit: int = Query(20, ge=1, le=100, description="Maximum results"),
include_amenities: bool = Query(False, description="Include detailed amenity data"),
):
"""
Search for green spaces in Berlin and score them for a personality type.
This endpoint dynamically analyzes Berlin's green spaces and scores them
based on the selected personality preferences.
"""
try:
# Build search request
search_request = ScoringRequest(
personality=personality.value,
location=(lat, lng) if lat and lng else None,
radius=radius,
neighborhood=neighborhood,
min_score=min_score,
filters={
"min_size": min_size,
"has_water": has_water,
"has_playground": has_playground,
"max_noise_level": max_noise_level,
},
limit=limit,
include_amenities=include_amenities
)
# Get and score green spaces
results = await green_space_service.search_and_score(search_request)
return GreenSpaceResponse(
green_spaces=results,
total_found=len(results),
search_params=search_request.dict(),
personality=personality.value
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
@router.get("/score-location")
async def score_specific_location(
lat: float = Query(..., description="Latitude of location to score"),
lng: float = Query(..., description="Longitude of location to score"),
personality: PersonalityType = Query(..., description="Personality type"),
radius: int = Query(300, ge=50, le=1000, description="Analysis radius in meters"),
):
"""
Score a specific location for a personality type.
Analyzes the immediate area around the given coordinates and provides
a personality-based score with detailed explanations.
"""
try:
# Check if location is in a green space
green_space = await berlin_data.get_green_space_at_location(lat, lng)
if not green_space:
return {
"score": 0,
"explanation": "Location is not in a recognized green space",
"location": {"lat": lat, "lng": lng},
"personality": personality.value
}
# Score the location
from app.services.scoring_engine import ScoringEngine
scoring_engine = ScoringEngine()
score_result = await scoring_engine.score_location(
lat, lng, personality.value, radius
)
return score_result
except Exception as e:
raise HTTPException(status_code=500, detail=f"Scoring failed: {str(e)}")
@router.get("/discover")
async def discover_green_spaces(
lat: float = Query(..., description="Your current latitude"),
lng: float = Query(..., description="Your current longitude"),
max_distance: int = Query(5000, ge=500, le=20000, description="Maximum distance in meters"),
personality: PersonalityType = Query(..., description="Your personality type"),
):
"""
Discover highly-rated green spaces near your location.
Returns the best green spaces within walking/cycling distance,
scored for your personality type.
"""
try:
discoveries = await green_space_service.discover_nearby(
lat, lng, max_distance, personality.value
)
return {
"discoveries": discoveries,
"your_location": {"lat": lat, "lng": lng},
"search_radius": max_distance,
"personality": personality.value
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Discovery failed: {str(e)}")
@router.get("/{space_id}/analysis")
async def analyze_green_space(
space_id: str,
personality: Optional[PersonalityType] = Query(None, description="Personality for focused analysis"),
):
"""
Get detailed analysis of a specific green space.
Provides comprehensive scoring across all personalities or
focused analysis for a specific personality type.
"""
try:
analysis = await green_space_service.get_detailed_analysis(
space_id, personality.value if personality else None
)
if not analysis:
raise HTTPException(status_code=404, detail="Green space not found")
return analysis
except Exception as e:
raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
@router.get("/conditions")
async def get_current_conditions(
lat: float = Query(..., description="Latitude"),
lng: float = Query(..., description="Longitude"),
):
"""Get current conditions at a green space (weather, crowds, etc.)."""
try:
conditions = await berlin_data.get_current_conditions(lat, lng)
return conditions
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get conditions: {str(e)}")

100
app/routers/health.py Normal file
View File

@ -0,0 +1,100 @@
# app/routers/health.py
"""Health check endpoints."""
from fastapi import APIRouter
from datetime import datetime
router = APIRouter()
@router.get("/")
async def health_check():
"""Basic health check endpoint."""
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"service": "berlin-picnic-api",
}
@router.get("/ready")
async def readiness_check():
"""Readiness check - indicates if service is ready to serve traffic."""
# Add checks for database, external APIs, etc.
return {
"status": "ready",
"timestamp": datetime.utcnow().isoformat(),
"checks": {
"database": "ok",
"cache": "ok",
"external_apis": "ok",
},
}
# app/routers/zones.py
"""Zone-related endpoints."""
from fastapi import APIRouter, Query
from typing import Optional, List
router = APIRouter()
@router.get("/")
async def get_zones(
personality: str = Query(..., description="Personality type"),
neighborhood: Optional[str] = Query(None, description="Berlin neighborhood"),
min_score: int = Query(60, ge=0, le=100, description="Minimum score"),
limit: int = Query(20, ge=1, le=100, description="Maximum results"),
):
"""Get zones filtered by personality and preferences."""
# Placeholder response - will be implemented with actual logic
return {
"zones": [],
"total_count": 0,
"personality": personality,
"filters_applied": {
"neighborhood": neighborhood,
"min_score": min_score,
"limit": limit,
},
}
@router.get("/{zone_id}")
async def get_zone_details(zone_id: str):
"""Get detailed information about a specific zone."""
return {
"zone_id": zone_id,
"message": "Zone details endpoint - implementation coming soon",
}
# app/routers/neighborhoods.py
"""Neighborhood-related endpoints."""
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
async def get_neighborhoods():
"""Get all Berlin neighborhoods with zone counts."""
# Placeholder - will return actual neighborhood data
return {
"neighborhoods": [
{"name": "mitte", "display_name": "Mitte", "zone_count": 15},
{"name": "kreuzberg", "display_name": "Kreuzberg", "zone_count": 12},
{
"name": "prenzlauer_berg",
"display_name": "Prenzlauer Berg",
"zone_count": 10,
},
]
}
# app/routers/__init__.py
"""Router modules."""

82
app/routers/info.py Normal file
View File

@ -0,0 +1,82 @@
# app/routers/info.py
from fastapi import APIRouter, HTTPException
from app.services.berlin_data_service import BerlinDataService
router = APIRouter(tags=["info"])
# Services
berlin_data = BerlinDataService()
@router.get("/neighborhoods")
async def get_neighborhoods():
"""Get all Berlin neighborhoods with green space statistics."""
try:
neighborhoods = await berlin_data.get_neighborhood_stats()
return neighborhoods
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get neighborhoods: {str(e)}")
@router.get("/personalities")
async def get_personality_descriptions():
"""Get descriptions of all personality types and their preferences."""
return {
"personalities": [
{
"id": "little_adventurers",
"name": "Little Adventurers",
"description": "Families with kids seeking safe, fun spaces",
"key_factors": ["playgrounds", "safety", "toilets", "shade", "family_activities"],
"icon": "🧸"
},
{
"id": "date_night",
"name": "Date Night",
"description": "Couples seeking romantic, peaceful settings",
"key_factors": ["scenery", "quiet", "romantic_spots", "restaurants", "privacy"],
"icon": "💕"
},
{
"id": "squad_goals",
"name": "Squad Goals",
"description": "Friend groups needing space for gatherings",
"key_factors": ["large_space", "group_activities", "accessibility", "nearby_food"],
"icon": "🎉"
},
{
"id": "zen_masters",
"name": "Zen Masters",
"description": "Peace seekers wanting tranquil environments",
"key_factors": ["quietness", "nature", "meditation_spots", "low_crowds"],
"icon": "🧘"
},
{
"id": "active_lifestyle",
"name": "Active Lifestyle",
"description": "Fitness enthusiasts seeking exercise opportunities",
"key_factors": ["running_paths", "fitness_equipment", "cycling", "sports_areas"],
"icon": "🏃"
},
{
"id": "wildlife_lover",
"name": "Wildlife Lover",
"description": "Nature enthusiasts seeking animal encounters",
"key_factors": ["wildlife_diversity", "bird_watching", "natural_habitats", "quiet_observation"],
"icon": "🦋"
},
{
"id": "art_nerd",
"name": "Art Nerd",
"description": "Culture lovers seeking artistic inspiration",
"key_factors": ["museums_nearby", "artistic_installations", "cultural_venues", "creative_atmosphere"],
"icon": "🎨"
},
{
"id": "history_geek",
"name": "History Geek",
"description": "History buffs seeking sites with historical significance",
"key_factors": ["historical_sites", "monuments", "cultural_heritage", "educational_value"],
"icon": "🏛️"
}
]
}

View File

@ -0,0 +1,459 @@
# app/services/berlin_data_service.py
from typing import List, Optional, Tuple, Dict, Any
import asyncio
import json
from datetime import datetime
from pathlib import Path
from geopy.distance import geodesic
from app.models.green_space import (
GreenSpace, Coordinates, Amenity, AmenityType, GreenSpaceType,
EnvironmentalFeatures, AccessibilityFeatures, RecreationFeatures,
NoiseLevel, LocationScore
)
class BerlinDataService:
"""Service for accessing Berlin open data and external APIs."""
def __init__(self):
self.cache = {}
self.last_refresh = None
self._toilets_cache = None
self.data_dir = Path("app/data")
async def search_green_spaces(
self,
location: Optional[Tuple[float, float]] = None,
radius: int = 2000,
neighborhood: Optional[str] = None,
filters: Dict[str, Any] = None
) -> List[GreenSpace]:
"""Search for green spaces based on criteria."""
# This is a mock implementation - in a real app, this would query Berlin's open data
mock_spaces = await self._get_mock_green_spaces()
filtered_spaces = []
for space in mock_spaces:
# Apply location filter
if location:
distance = geodesic(
location,
(space.coordinates.lat, space.coordinates.lng)
).meters
if distance > radius:
continue
# Apply neighborhood filter
if neighborhood and space.neighborhood.lower() != neighborhood.lower():
continue
# Apply other filters
if filters:
if filters.get("min_size") and space.area_sqm and space.area_sqm < filters["min_size"]:
continue
if filters.get("has_water") and not space.environmental.water_features:
continue
if filters.get("has_playground") and space.recreation.playground_quality == 0:
continue
if filters.get("max_noise_level") and space.environmental.noise_level.value > filters["max_noise_level"]:
continue
filtered_spaces.append(space)
return filtered_spaces
async def get_green_space_by_id(self, space_id: str) -> Optional[GreenSpace]:
"""Get a specific green space by ID."""
spaces = await self._get_mock_green_spaces()
for space in spaces:
if space.id == space_id:
return space
return None
async def get_green_space_at_location(self, lat: float, lng: float) -> Optional[GreenSpace]:
"""Check if a location is within a green space."""
spaces = await self._get_mock_green_spaces()
for space in spaces:
# Simple distance check - in reality you'd use proper polygon containment
distance = geodesic(
(lat, lng),
(space.coordinates.lat, space.coordinates.lng)
).meters
if distance < 100: # Within 100m of center
return space
return None
async def get_green_spaces_within_radius(
self,
lat: float,
lng: float,
radius: int
) -> List[GreenSpace]:
"""Get all green spaces within a radius."""
spaces = await self._get_mock_green_spaces()
nearby_spaces = []
for space in spaces:
distance = geodesic(
(lat, lng),
(space.coordinates.lat, space.coordinates.lng)
).meters
if distance <= radius:
nearby_spaces.append(space)
return nearby_spaces
async def get_amenities_near_point(
self,
lat: float,
lng: float,
radius: int = 500,
amenity_types: Optional[List[str]] = None
) -> List[Amenity]:
"""Get amenities near a specific point."""
# Mock amenities data
mock_amenities = [
Amenity(
id="toilet_1",
name="Public Toilet",
type=AmenityType.TOILET,
coordinates=Coordinates(lat=lat + 0.001, lng=lng + 0.001),
distance_meters=100,
rating=3.5
),
Amenity(
id="cafe_1",
name="Park Café",
type=AmenityType.CAFE,
coordinates=Coordinates(lat=lat + 0.002, lng=lng - 0.001),
distance_meters=200,
rating=4.2,
opening_hours="8:00-18:00"
),
Amenity(
id="playground_1",
name="Children's Playground",
type=AmenityType.PLAYGROUND,
coordinates=Coordinates(lat=lat - 0.001, lng=lng + 0.002),
distance_meters=150,
rating=4.0
)
]
# Filter by types if specified
if amenity_types:
mock_amenities = [
amenity for amenity in mock_amenities
if amenity.type.value in amenity_types
]
# Filter by radius
filtered_amenities = []
for amenity in mock_amenities:
distance = geodesic(
(lat, lng),
(amenity.coordinates.lat, amenity.coordinates.lng)
).meters
if distance <= radius:
amenity.distance_meters = int(distance)
filtered_amenities.append(amenity)
return filtered_amenities
async def calculate_distance(
self,
lat1: float,
lng1: float,
lat2: float,
lng2: float
) -> int:
"""Calculate distance between two points in meters."""
return int(geodesic((lat1, lng1), (lat2, lng2)).meters)
async def find_similar_green_spaces(
self,
green_space: GreenSpace,
limit: int = 5
) -> List[GreenSpace]:
"""Find green spaces similar to the given one."""
all_spaces = await self._get_mock_green_spaces()
similar_spaces = []
for space in all_spaces:
if space.id == green_space.id:
continue
# Simple similarity based on type and features
similarity_score = 0
if space.type == green_space.type:
similarity_score += 30
if space.neighborhood == green_space.neighborhood:
similarity_score += 20
if space.environmental.water_features == green_space.environmental.water_features:
similarity_score += 15
if abs(space.environmental.tree_coverage_percent - green_space.environmental.tree_coverage_percent) < 20:
similarity_score += 15
if space.recreation.sports_facilities == green_space.recreation.sports_facilities:
similarity_score += 10
if similarity_score >= 50: # Threshold for similarity
similar_spaces.append(space)
# Sort by similarity and return top results
return similar_spaces[:limit]
async def get_neighborhood_stats(self) -> Dict[str, Any]:
"""Get statistics for Berlin neighborhoods."""
return {
"neighborhoods": [
{
"name": "mitte",
"display_name": "Mitte",
"green_space_count": 15,
"avg_personality_scores": {
"little_adventurers": 75,
"date_night": 80,
"squad_goals": 70,
"zen_masters": 65
}
},
{
"name": "kreuzberg",
"display_name": "Kreuzberg",
"green_space_count": 12,
"avg_personality_scores": {
"little_adventurers": 70,
"date_night": 75,
"squad_goals": 85,
"zen_masters": 60
}
},
{
"name": "prenzlauer_berg",
"display_name": "Prenzlauer Berg",
"green_space_count": 18,
"avg_personality_scores": {
"little_adventurers": 90,
"date_night": 70,
"squad_goals": 75,
"zen_masters": 70
}
}
]
}
async def get_current_conditions(self, lat: float, lng: float) -> Dict[str, Any]:
"""Get current conditions at a location."""
# Mock current conditions
return {
"weather": {
"temperature": 22,
"condition": "partly_cloudy",
"humidity": 65,
"wind_speed": 10
},
"crowd_level": "moderate",
"air_quality": "good",
"noise_level": 2,
"last_updated": datetime.now().isoformat()
}
async def refresh_all_data(self) -> Dict[str, str]:
"""Refresh all cached data."""
self.cache.clear()
self.last_refresh = datetime.now()
return {
"green_spaces": "refreshed",
"amenities": "refreshed",
"neighborhoods": "refreshed",
"timestamp": self.last_refresh.isoformat()
}
def summarize_amenities(self, amenities: List[Amenity]) -> Dict[str, int]:
"""Summarize amenities by type."""
summary = {}
for amenity in amenities:
amenity_type = amenity.type.value
summary[amenity_type] = summary.get(amenity_type, 0) + 1
return summary
def _load_toilets(self) -> List[Dict]:
"""Load toilets data from JSON file"""
if self._toilets_cache is None:
toilets_file = self.data_dir / "processed" / "toilets.json"
if toilets_file.exists():
with open(toilets_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self._toilets_cache = data.get("toilets", [])
else:
print("Warning: toilets.json not found. Run process_toilet_csv.py first.")
self._toilets_cache = []
return self._toilets_cache
async def get_toilets_near_point(
self,
lat: float,
lng: float,
radius: int = 500
) -> List[Dict]:
"""Get toilets near a point for picnic scoring"""
toilets = self._load_toilets()
nearby_toilets = []
for toilet in toilets:
# Skip toilets with invalid coordinates
toilet_lat = toilet.get('lat')
toilet_lng = toilet.get('lng')
if toilet_lat is None or toilet_lng is None:
continue
# Check if coordinates are valid numbers
try:
toilet_lat = float(toilet_lat)
toilet_lng = float(toilet_lng)
# Check for NaN or infinite values
if not (isinstance(toilet_lat, (int, float)) and isinstance(toilet_lng, (int, float))):
continue
if toilet_lat != toilet_lat or toilet_lng != toilet_lng: # NaN check
continue
if abs(toilet_lat) > 90 or abs(toilet_lng) > 180: # Invalid coordinate range
continue
except (ValueError, TypeError):
continue
distance = geodesic((lat, lng), (toilet_lat, toilet_lng)).meters
if distance <= radius:
toilet_copy = toilet.copy()
toilet_copy['distance_meters'] = int(distance)
nearby_toilets.append(toilet_copy)
return sorted(nearby_toilets, key=lambda x: x['distance_meters'])
async def _get_mock_green_spaces(self) -> List[GreenSpace]:
"""Get mock green spaces data for development."""
# This would be replaced with real data fetching in production
return [
GreenSpace(
id="tiergarten_1",
name="Tiergarten",
description="Berlin's most famous park in the heart of the city",
type=GreenSpaceType.PARK,
coordinates=Coordinates(lat=52.5145, lng=13.3501),
neighborhood="Mitte",
address="Tiergarten, 10557 Berlin",
area_sqm=210000,
perimeter_m=5800,
environmental=EnvironmentalFeatures(
tree_coverage_percent=85,
shade_quality=90,
noise_level=NoiseLevel.MODERATE,
wildlife_diversity_score=80,
water_features=True,
natural_surface_percent=95
),
accessibility=AccessibilityFeatures(
wheelchair_accessible=True,
public_transport_score=5,
cycling_infrastructure=True,
parking_availability=3,
lighting_quality=4
),
recreation=RecreationFeatures(
playground_quality=70,
sports_facilities=True,
running_paths=True,
cycling_paths=True,
dog_friendly=True,
bbq_allowed=False
),
nearby_amenities=[],
last_updated=datetime.now(),
data_sources=["berlin_open_data", "osm"],
confidence_score=95
),
GreenSpace(
id="volkspark_friedrichshain",
name="Volkspark Friedrichshain",
description="Historic park with fairy tale fountain and sports facilities",
type=GreenSpaceType.PARK,
coordinates=Coordinates(lat=52.5263, lng=13.4317),
neighborhood="Friedrichshain",
address="Friedrichshain, 10249 Berlin",
area_sqm=49000,
perimeter_m=2800,
environmental=EnvironmentalFeatures(
tree_coverage_percent=70,
shade_quality=75,
noise_level=NoiseLevel.QUIET,
wildlife_diversity_score=65,
water_features=True,
natural_surface_percent=80
),
accessibility=AccessibilityFeatures(
wheelchair_accessible=True,
public_transport_score=4,
cycling_infrastructure=True,
parking_availability=2,
lighting_quality=3
),
recreation=RecreationFeatures(
playground_quality=85,
sports_facilities=True,
running_paths=True,
cycling_paths=True,
dog_friendly=True,
bbq_allowed=True
),
nearby_amenities=[],
last_updated=datetime.now(),
data_sources=["berlin_open_data", "osm"],
confidence_score=90
),
GreenSpace(
id="tempelhofer_feld",
name="Tempelhofer Feld",
description="Former airport turned into unique urban park",
type=GreenSpaceType.PARK,
coordinates=Coordinates(lat=52.4732, lng=13.4015),
neighborhood="Tempelhof",
address="Tempelhofer Damm, 12101 Berlin",
area_sqm=300000,
perimeter_m=6200,
environmental=EnvironmentalFeatures(
tree_coverage_percent=15,
shade_quality=20,
noise_level=NoiseLevel.MODERATE,
wildlife_diversity_score=40,
water_features=False,
natural_surface_percent=60
),
accessibility=AccessibilityFeatures(
wheelchair_accessible=True,
public_transport_score=4,
cycling_infrastructure=True,
parking_availability=4,
lighting_quality=2
),
recreation=RecreationFeatures(
playground_quality=30,
sports_facilities=False,
running_paths=True,
cycling_paths=True,
dog_friendly=True,
bbq_allowed=True
),
nearby_amenities=[],
last_updated=datetime.now(),
data_sources=["berlin_open_data", "osm"],
confidence_score=85
)
]

View File

@ -0,0 +1,11 @@
"""
Data integration layer for Berlin Picnic API.
This module provides integration with various data sources:
- Berlin Open Data Portal
- OpenStreetMap (future integration)
"""
from .berlin_open_data import BerlinOpenDataClient
__all__ = ["BerlinOpenDataClient"]

View File

@ -0,0 +1,400 @@
"""
Berlin Open Data integration client.
This module handles fetching and processing data from Berlin's Open Data Portal.
"""
import httpx
import json
from typing import List, Dict, Any, Optional
from pathlib import Path
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class BerlinOpenDataClient:
"""Client for fetching data from Berlin Open Data Portal."""
def __init__(self):
# Updated URLs based on user specification
self.green_spaces_wfs = "https://fbinter.stadt-berlin.de/fb/wfs/data/senstadt/s_gruenanlage"
self.green_spaces_wms = "https://fbinter.stadt-berlin.de/fb/wms/senstadt/k_gruenanlage"
self.base_url = "https://fbinter.stadt-berlin.de/fb/wfs/data/senstadt"
self.data_dir = Path("app/data")
self.timeout = 30.0
# Parameters for WFS request as specified by user
self.green_spaces_params = {
"service": "WFS",
"version": "2.0.0",
"request": "GetFeature",
"typeName": "fis:s_gruenanlage",
"outputFormat": "application/json",
"srsName": "EPSG:4326" # WGS84 coordinate system
}
# Create data directories
self.raw_dir = self.data_dir / "raw"
self.processed_dir = self.data_dir / "processed"
self.raw_dir.mkdir(parents=True, exist_ok=True)
self.processed_dir.mkdir(parents=True, exist_ok=True)
async def fetch_green_spaces(self) -> List[Dict[str, Any]]:
"""
Fetch green spaces from Berlin WFS service using the specified s_gruenanlage endpoint.
Returns:
List of GeoJSON features representing green spaces
Raises:
httpx.HTTPError: If the request fails
ValueError: If the response format is invalid
"""
logger.info("Fetching green spaces from Berlin Open Data (s_gruenanlage)...")
# Primary endpoint using user-specified parameters
primary_endpoint = {
'url': self.green_spaces_wfs,
'params': self.green_spaces_params.copy()
}
# Fallback endpoints with alternative parameters
fallback_endpoints = [
{
'url': self.green_spaces_wfs,
'params': {
'service': 'WFS',
'version': '1.1.0',
'request': 'GetFeature',
'typeName': 'fis:s_gruenanlage',
'outputFormat': 'application/json',
'srsName': 'EPSG:4326'
}
},
{
'url': self.green_spaces_wfs,
'params': {
'service': 'WFS',
'version': '2.0.0',
'request': 'GetFeature',
'typeNames': 'fis:s_gruenanlage',
'outputFormat': 'application/json',
'srsName': 'EPSG:4326'
}
},
{
'url': self.green_spaces_wfs,
'params': {
'service': 'WFS',
'version': '1.1.0',
'request': 'GetFeature',
'typeName': 's_gruenanlage',
'outputFormat': 'application/json',
'srsName': 'EPSG:4326'
}
}
]
# Combine primary and fallback endpoints
endpoints_to_try = [primary_endpoint] + fallback_endpoints
last_error = None
for i, endpoint in enumerate(endpoints_to_try):
try:
url = endpoint['url']
params = endpoint['params']
logger.info(f"Trying endpoint {i+1}/{len(endpoints_to_try)}: {url}")
logger.debug(f"Parameters: {params}")
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(url, params=params)
# Log response details for debugging
logger.debug(f"Response status: {response.status_code}")
logger.debug(f"Response headers: {dict(response.headers)}")
if response.status_code == 200:
# Parse JSON response
data = response.json()
# Validate response structure
if 'features' in data:
features = data['features']
logger.info(f"Successfully fetched {len(features)} green spaces using endpoint {i+1}")
# Save raw data for debugging/backup
await self._save_raw_data(data, "berlin_green_spaces_gruenanlage.geojson")
return features
else:
logger.warning(f"Endpoint {i+1} returned data without 'features' field")
# Log the response structure for debugging
logger.debug(f"Response keys: {list(data.keys()) if isinstance(data, dict) else 'Not a dict'}")
continue
else:
logger.warning(f"Endpoint {i+1} returned status {response.status_code}")
# Try to get error details
try:
error_text = response.text[:500] # First 500 chars
logger.debug(f"Error response: {error_text}")
except:
pass
continue
except Exception as e:
logger.warning(f"Endpoint {i+1} failed: {e}")
last_error = e
continue
# If we get here, all endpoints failed
if last_error:
raise last_error
else:
raise ValueError("All WFS endpoints failed to return valid data")
def process_green_space_feature(self, feature: Dict) -> Optional[Dict[str, Any]]:
"""
Process a single green space feature into our standardized format.
Args:
feature: GeoJSON feature from Berlin Open Data
Returns:
Processed green space data or None if processing fails
"""
try:
properties = feature.get('properties', {})
geometry = feature.get('geometry', {})
# Skip features without essential data
if not properties.get('gruenanlage') or not geometry:
logger.warning(f"Skipping feature with missing essential data: {properties.get('gml_id', 'unknown')}")
return None
# Extract coordinates (centroid for polygon)
coords = self._extract_centroid(geometry)
if not coords:
logger.warning(f"Could not extract coordinates for feature: {properties.get('gml_id', 'unknown')}")
return None
# Calculate area in square meters
area_ha = properties.get('flaeche_ha')
area_sqm = 0
if area_ha:
try:
area_sqm = int(float(area_ha) * 10000)
except (ValueError, TypeError):
logger.warning(f"Invalid area value for feature {properties.get('gml_id')}: {area_ha}")
# Clean and validate name
name = str(properties.get('gruenanlage', 'Unnamed Green Space')).strip()
if not name or name.lower() in ['null', 'none', '']:
name = 'Unnamed Green Space'
# Clean district and sub-district names
district = str(properties.get('bezirk', '')).strip()
sub_district = str(properties.get('ortsteil', '')).strip()
# Normalize category
category = str(properties.get('kategorie', 'park')).strip().lower()
processed_data = {
'id': f"berlin_{properties.get('gml_id', 'unknown')}",
'name': name,
'district': district,
'sub_district': sub_district,
'category': category,
'area_sqm': area_sqm,
'coordinates': coords,
'geometry': geometry, # Keep full geometry for future spatial operations
'data_source': 'berlin_open_data',
'last_updated': datetime.now().isoformat(),
'raw_properties': properties # Keep all original data for debugging
}
return processed_data
except Exception as e:
logger.error(f"Error processing green space feature: {e}")
return None
def _extract_centroid(self, geometry: Dict) -> Optional[Dict[str, float]]:
"""
Extract centroid coordinates from GeoJSON geometry.
Args:
geometry: GeoJSON geometry object
Returns:
Dictionary with 'lat' and 'lng' keys or None if extraction fails
"""
try:
geom_type = geometry.get('type')
coordinates = geometry.get('coordinates')
if not coordinates:
return None
if geom_type == 'Polygon':
# For polygon, use centroid of outer ring
outer_ring = coordinates[0]
if len(outer_ring) < 3:
return None
# Calculate centroid
lats = [coord[1] for coord in outer_ring if len(coord) >= 2]
lngs = [coord[0] for coord in outer_ring if len(coord) >= 2]
if not lats or not lngs:
return None
return {
'lat': sum(lats) / len(lats),
'lng': sum(lngs) / len(lngs)
}
elif geom_type == 'Point':
if len(coordinates) >= 2:
return {
'lat': coordinates[1],
'lng': coordinates[0]
}
elif geom_type == 'MultiPolygon':
# For multipolygon, use centroid of first polygon
if coordinates and len(coordinates) > 0:
first_polygon = coordinates[0]
if first_polygon and len(first_polygon) > 0:
outer_ring = first_polygon[0]
lats = [coord[1] for coord in outer_ring if len(coord) >= 2]
lngs = [coord[0] for coord in outer_ring if len(coord) >= 2]
if lats and lngs:
return {
'lat': sum(lats) / len(lats),
'lng': sum(lngs) / len(lngs)
}
# Fallback: return None for unsupported geometry types
logger.warning(f"Unsupported geometry type: {geom_type}")
return None
except Exception as e:
logger.error(f"Error extracting centroid: {e}")
return None
async def _save_raw_data(self, data: Dict, filename: str) -> None:
"""
Save raw data to file for backup/debugging.
Args:
data: Raw data to save
filename: Name of the file to save to
"""
try:
raw_file = self.raw_dir / filename
with open(raw_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.debug(f"Saved raw data to {raw_file}")
except Exception as e:
logger.warning(f"Failed to save raw data: {e}")
def validate_coordinates(self, lat: float, lng: float) -> bool:
"""
Validate that coordinates are within Berlin bounds.
Args:
lat: Latitude
lng: Longitude
Returns:
True if coordinates are within Berlin bounds
"""
# Berlin approximate bounds
BERLIN_BOUNDS = {
'lat_min': 52.3,
'lat_max': 52.7,
'lng_min': 13.0,
'lng_max': 13.8
}
return (
BERLIN_BOUNDS['lat_min'] <= lat <= BERLIN_BOUNDS['lat_max'] and
BERLIN_BOUNDS['lng_min'] <= lng <= BERLIN_BOUNDS['lng_max']
)
async def process_and_save_green_spaces(self) -> Dict[str, Any]:
"""
Fetch, process, and save green spaces data.
Returns:
Summary of processing results
"""
logger.info("Starting green spaces data processing...")
try:
# Fetch raw data
raw_features = await self.fetch_green_spaces()
# Process features
processed_parks = []
skipped_count = 0
invalid_coords_count = 0
for feature in raw_features:
processed_park = self.process_green_space_feature(feature)
if processed_park is None:
skipped_count += 1
continue
# Validate coordinates
coords = processed_park['coordinates']
if not self.validate_coordinates(coords['lat'], coords['lng']):
invalid_coords_count += 1
logger.warning(f"Invalid coordinates for park {processed_park['name']}: {coords}")
continue
processed_parks.append(processed_park)
# Save processed data
output_data = {
'parks': processed_parks,
'total_count': len(processed_parks),
'data_source': 'berlin_open_data',
'last_updated': datetime.now().isoformat(),
'processing_stats': {
'raw_features': len(raw_features),
'processed_parks': len(processed_parks),
'skipped_features': skipped_count,
'invalid_coordinates': invalid_coords_count
}
}
processed_file = self.processed_dir / "parks.json"
with open(processed_file, 'w', encoding='utf-8') as f:
json.dump(output_data, f, ensure_ascii=False, indent=2)
logger.info(f"Successfully processed {len(processed_parks)} parks")
logger.info(f"Skipped {skipped_count} features, {invalid_coords_count} had invalid coordinates")
return output_data
except Exception as e:
logger.error(f"Error in process_and_save_green_spaces: {e}")
raise
# Convenience function for easy usage
async def fetch_and_process_berlin_green_spaces() -> Dict[str, Any]:
"""
Convenience function to fetch and process Berlin green spaces.
Returns:
Processing results summary
"""
client = BerlinOpenDataClient()
return await client.process_and_save_green_spaces()

View File

@ -0,0 +1,233 @@
# app/services/green_space_service.py
from typing import List, Optional, Tuple, Dict, Any
from app.models.green_space import GreenSpace, ScoringRequest
from app.models.response import DetailedAnalysis, DiscoveryResult
from app.services.scoring_engine import ScoringEngine
from app.services.berlin_data_service import BerlinDataService
class GreenSpaceService:
"""Service for managing green space operations and scoring."""
def __init__(self):
self.scoring_engine = ScoringEngine()
self.berlin_data = BerlinDataService()
async def search_and_score(self, request: ScoringRequest) -> List[GreenSpace]:
"""Search for green spaces and score them based on personality preferences."""
try:
# Get green spaces based on search criteria
green_spaces = await self.berlin_data.search_green_spaces(
location=request.location,
radius=request.radius,
neighborhood=request.neighborhood,
filters=request.filters
)
# Score each green space for the personality
scored_spaces = []
for space in green_spaces:
personality_score = await self.scoring_engine.score_green_space(
space, request.personality, request.location
)
# Only include if meets minimum score
if personality_score.score >= request.min_score:
space.current_personality_score = personality_score
# Include amenities if requested
if request.include_amenities:
space.nearby_amenities = await self.berlin_data.get_amenities_near_point(
space.coordinates.lat,
space.coordinates.lng,
500 # 500m radius for amenities
)
scored_spaces.append(space)
# Sort by score (highest first) and limit results
scored_spaces.sort(
key=lambda x: x.current_personality_score.score if x.current_personality_score else 0,
reverse=True
)
return scored_spaces[:request.limit]
except Exception as e:
raise Exception(f"Failed to search and score green spaces: {str(e)}")
async def discover_nearby(
self,
lat: float,
lng: float,
max_distance: int,
personality: str
) -> List[DiscoveryResult]:
"""Discover highly-rated green spaces near a location."""
try:
# Get nearby green spaces
nearby_spaces = await self.berlin_data.get_green_spaces_within_radius(
lat, lng, max_distance
)
discoveries = []
for space in nearby_spaces:
# Score the space
personality_score = await self.scoring_engine.score_green_space(
space, personality, (lat, lng)
)
# Only include high-scoring spaces
if personality_score.score >= 70:
space.current_personality_score = personality_score
# Calculate travel info
distance = await self.berlin_data.calculate_distance(
lat, lng, space.coordinates.lat, space.coordinates.lng
)
discovery = DiscoveryResult(
green_space=space,
distance_meters=distance,
travel_time_walking=max(5, distance // 80), # ~5 km/h walking speed
travel_time_cycling=max(2, distance // 250), # ~15 km/h cycling speed
why_recommended=personality_score.explanation,
best_route_description=await self._get_route_description(
lat, lng, space.coordinates.lat, space.coordinates.lng
)
)
discoveries.append(discovery)
# Sort by score and distance
discoveries.sort(
key=lambda x: (x.green_space.current_personality_score.score, -x.distance_meters),
reverse=True
)
return discoveries[:10] # Top 10 discoveries
except Exception as e:
raise Exception(f"Failed to discover green spaces: {str(e)}")
async def get_detailed_analysis(
self,
space_id: str,
focus_personality: Optional[str] = None
) -> Optional[DetailedAnalysis]:
"""Get detailed analysis of a specific green space."""
try:
# Get the green space
green_space = await self.berlin_data.get_green_space_by_id(space_id)
if not green_space:
return None
# Score for all personalities or just the focused one
personality_breakdown = {}
personalities = [focus_personality] if focus_personality else [
"little_adventurers", "date_night", "squad_goals", "zen_masters",
"active_lifestyle", "wildlife_lover", "art_nerd", "history_geek"
]
for personality in personalities:
if personality: # Skip None values
score = await self.scoring_engine.score_green_space(
green_space, personality
)
personality_breakdown[personality] = score
# Find best locations within the space
best_locations = await self.scoring_engine.find_best_locations_within(
green_space, focus_personality or "zen_masters"
)
# Generate seasonal recommendations
seasonal_recommendations = self._generate_seasonal_recommendations(
green_space, focus_personality
)
# Find similar spaces
similar_spaces = await self.berlin_data.find_similar_green_spaces(
green_space, limit=5
)
return DetailedAnalysis(
green_space=green_space,
personality_breakdown=personality_breakdown,
best_locations_within=best_locations,
seasonal_recommendations=seasonal_recommendations,
optimal_visit_times=self._get_optimal_visit_times(green_space),
similar_spaces=[space.id for space in similar_spaces]
)
except Exception as e:
raise Exception(f"Failed to analyze green space: {str(e)}")
def _generate_seasonal_recommendations(
self,
green_space: GreenSpace,
personality: Optional[str]
) -> Dict[str, str]:
"""Generate seasonal visit recommendations."""
recommendations = {}
# Base recommendations on green space features
if green_space.environmental.water_features:
recommendations["summer"] = "Perfect for cooling off by the water features"
recommendations["spring"] = "Beautiful reflections in the water as nature awakens"
if green_space.environmental.tree_coverage_percent > 70:
recommendations["autumn"] = "Stunning fall foliage display"
recommendations["spring"] = "Fresh green canopy and blooming trees"
if green_space.recreation.sports_facilities:
recommendations["summer"] = "Great weather for outdoor sports and activities"
recommendations["winter"] = "Crisp air perfect for winter sports if available"
# Fill in any missing seasons with generic recommendations
seasons = ["spring", "summer", "autumn", "winter"]
for season in seasons:
if season not in recommendations:
recommendations[season] = f"Enjoy the peaceful {season} atmosphere"
return recommendations
def _get_optimal_visit_times(self, green_space: GreenSpace) -> List[str]:
"""Get optimal visit times based on green space characteristics."""
times = []
if green_space.environmental.noise_level.value <= 2:
times.append("Early morning (7-9 AM) for peaceful solitude")
if green_space.recreation.playground_quality > 50:
times.append("Mid-morning (9-11 AM) for family activities")
if green_space.accessibility.lighting_quality >= 4:
times.append("Evening (6-8 PM) for romantic walks")
if green_space.recreation.sports_facilities:
times.append("Late afternoon (4-6 PM) for sports activities")
# Always include at least one recommendation
if not times:
times.append("Midday (11 AM - 2 PM) for general enjoyment")
return times
async def _get_route_description(
self,
from_lat: float,
from_lng: float,
to_lat: float,
to_lng: float
) -> str:
"""Get a simple route description."""
# This is a simplified version - in a real app you'd use a routing service
distance = await self.berlin_data.calculate_distance(from_lat, from_lng, to_lat, to_lng)
if distance < 500:
return "Short walk through the neighborhood"
elif distance < 1500:
return "Pleasant walk or quick bike ride"
elif distance < 3000:
return "Good cycling distance or longer walk"
else:
return "Best reached by bike or public transport"

View File

@ -0,0 +1,625 @@
# app/services/scoring_engine.py
from typing import Any, Dict, List, Optional, Tuple
import math
from geopy.distance import geodesic
from app.models.green_space import (
GreenSpace, PersonalityScore, LocationScore,
Coordinates, Amenity, AmenityType
)
from app.services.berlin_data_service import BerlinDataService
class ScoringEngine:
"""Dynamic scoring engine for green spaces based on personality preferences."""
def __init__(self):
self.berlin_data = BerlinDataService()
self.personality_weights = self._initialize_personality_weights()
def _initialize_personality_weights(self) -> Dict[str, Dict[str, float]]:
"""Define scoring weights for each personality type."""
return {
"little_adventurers": {
"playground_quality": 25,
"safety_score": 20,
"toilet_proximity": 15,
"shade_quality": 10,
"noise_level": 10, # Lower noise = higher score
"family_amenities": 15,
"accessibility": 5
},
"date_night": {
"romantic_atmosphere": 25,
"noise_level": 20, # Quiet = romantic
"scenic_beauty": 20,
"restaurant_proximity": 15,
"privacy_score": 10,
"lighting_quality": 10
},
"squad_goals": {
"space_size": 25,
"group_amenities": 20,
"accessibility": 15,
"food_options": 15,
"parking_availability": 10,
"activity_options": 15
},
"zen_masters": {
"quietness": 30,
"nature_immersion": 25,
"crowd_density": 15, # Lower crowds = higher score
"water_features": 10,
"meditation_spots": 10,
"air_quality": 10
},
"active_lifestyle": {
"fitness_facilities": 25,
"running_cycling_paths": 25,
"sports_areas": 20,
"accessibility": 15,
"terrain_variety": 10,
"safety_score": 5
},
"wildlife_lover": {
"wildlife_diversity": 30,
"natural_habitat": 25,
"water_features": 15,
"tree_coverage": 15,
"noise_level": 10, # Quiet for animals
"observation_spots": 5
},
"art_nerd": {
"cultural_proximity": 30,
"artistic_features": 20,
"museum_accessibility": 20,
"creative_atmosphere": 15,
"cafe_culture": 10,
"inspiring_views": 5
},
"history_geek": {
"historical_significance": 35,
"monument_proximity": 25,
"educational_value": 20,
"architectural_features": 10,
"cultural_context": 10
}
}
async def score_green_space(
self,
green_space: GreenSpace,
personality: str,
user_location: Optional[Tuple[float, float]] = None
) -> PersonalityScore:
"""Score a green space for a specific personality type."""
weights = self.personality_weights.get(personality, {})
if not weights:
raise ValueError(f"Unknown personality type: {personality}")
# Calculate component scores
component_scores = await self._calculate_component_scores(
green_space, personality, user_location
)
# Calculate weighted final score
final_score = 0
explanation_parts = []
key_factors = []
for component, weight in weights.items():
if component in component_scores:
component_score = component_scores[component]
weighted_score = (component_score * weight) / 100
final_score += weighted_score
if component_score > 70: # Highlight strong factors
key_factors.append(component)
explanation_parts.append(
f"{component.replace('_', ' ')}: {component_score}/100"
)
# Generate recommendations
recommendations = self._generate_recommendations(
green_space, personality, component_scores
)
return PersonalityScore(
personality=personality,
score=min(100, max(0, int(final_score))),
explanation=self._create_explanation(personality, explanation_parts),
key_factors=key_factors,
recommendations=recommendations
)
async def _calculate_component_scores(
self,
green_space: GreenSpace,
personality: str,
user_location: Optional[Tuple[float, float]] = None
) -> Dict[str, int]:
"""Calculate individual component scores."""
scores = {}
# Universal components
scores["accessibility"] = self._score_accessibility(green_space, user_location)
scores["safety_score"] = self._score_safety(green_space)
scores["noise_level"] = self._score_noise_level(green_space)
# Personality-specific components
if personality == "little_adventurers":
scores["playground_quality"] = green_space.recreation.playground_quality
scores["shade_quality"] = green_space.environmental.shade_quality
scores["toilet_proximity"] = await self._score_toilet_proximity(green_space)
scores["family_amenities"] = await self._score_family_amenities(green_space)
elif personality == "date_night":
scores["romantic_atmosphere"] = await self._score_romantic_atmosphere(green_space)
scores["scenic_beauty"] = self._score_scenic_beauty(green_space)
scores["restaurant_proximity"] = await self._score_restaurant_proximity(green_space)
scores["privacy_score"] = self._score_privacy(green_space)
scores["lighting_quality"] = green_space.accessibility.lighting_quality * 20
elif personality == "squad_goals":
scores["space_size"] = self._score_space_size(green_space)
scores["group_amenities"] = await self._score_group_amenities(green_space)
scores["food_options"] = await self._score_food_options(green_space)
scores["parking_availability"] = green_space.accessibility.parking_availability * 20
scores["activity_options"] = self._score_activity_options(green_space)
elif personality == "zen_masters":
scores["quietness"] = self._score_quietness(green_space)
scores["nature_immersion"] = self._score_nature_immersion(green_space)
scores["crowd_density"] = await self._score_crowd_density(green_space)
scores["water_features"] = 100 if green_space.environmental.water_features else 0
scores["meditation_spots"] = self._score_meditation_spots(green_space)
scores["air_quality"] = await self._score_air_quality(green_space)
elif personality == "active_lifestyle":
scores["fitness_facilities"] = 100 if green_space.recreation.sports_facilities else 0
scores["running_cycling_paths"] = self._score_running_cycling_paths(green_space)
scores["sports_areas"] = self._score_sports_areas(green_space)
scores["terrain_variety"] = self._score_terrain_variety(green_space)
elif personality == "wildlife_lover":
scores["wildlife_diversity"] = green_space.environmental.wildlife_diversity_score
scores["natural_habitat"] = self._score_natural_habitat(green_space)
scores["water_features"] = 100 if green_space.environmental.water_features else 0
scores["tree_coverage"] = green_space.environmental.tree_coverage_percent
scores["observation_spots"] = self._score_observation_spots(green_space)
elif personality == "art_nerd":
scores["cultural_proximity"] = await self._score_cultural_proximity(green_space)
scores["artistic_features"] = await self._score_artistic_features(green_space)
scores["museum_accessibility"] = await self._score_museum_accessibility(green_space)
scores["creative_atmosphere"] = self._score_creative_atmosphere(green_space)
scores["cafe_culture"] = await self._score_cafe_culture(green_space)
scores["inspiring_views"] = self._score_inspiring_views(green_space)
elif personality == "history_geek":
scores["historical_significance"] = await self._score_historical_significance(green_space)
scores["monument_proximity"] = await self._score_monument_proximity(green_space)
scores["educational_value"] = self._score_educational_value(green_space)
scores["architectural_features"] = self._score_architectural_features(green_space)
scores["cultural_context"] = await self._score_cultural_context(green_space)
return scores
# === COMPONENT SCORING METHODS ===
def _score_accessibility(self, green_space: GreenSpace, user_location: Optional[Tuple[float, float]]) -> int:
"""Score overall accessibility."""
score = green_space.accessibility.public_transport_score * 20
if green_space.accessibility.wheelchair_accessible:
score += 20
if user_location:
distance = geodesic(
user_location,
(green_space.coordinates.lat, green_space.coordinates.lng)
).meters
# Score based on distance (closer = better)
if distance <= 500:
score += 20
elif distance <= 1000:
score += 15
elif distance <= 2000:
score += 10
else:
score += 5
return min(100, score)
def _score_safety(self, green_space: GreenSpace) -> int:
"""Score safety based on lighting and accessibility."""
score = green_space.accessibility.lighting_quality * 15
if green_space.accessibility.wheelchair_accessible:
score += 20
score += green_space.accessibility.public_transport_score * 10
return min(100, score)
def _score_noise_level(self, green_space: GreenSpace) -> int:
"""Score based on noise level (lower noise = higher score)."""
return (6 - green_space.environmental.noise_level.value) * 20
async def _score_toilet_proximity(self, green_space: GreenSpace) -> int:
"""Score based on toilet proximity."""
toilets = [a for a in green_space.nearby_amenities if a.type == AmenityType.TOILET]
if not toilets:
return 20 # Assume some basic facilities
closest_distance = min(toilet.distance_meters for toilet in toilets)
if closest_distance <= 100:
return 100
elif closest_distance <= 300:
return 80
elif closest_distance <= 500:
return 60
else:
return 40
async def _score_family_amenities(self, green_space: GreenSpace) -> int:
"""Score family-friendly amenities."""
score = green_space.recreation.playground_quality
if green_space.environmental.shade_quality > 70:
score += 20
if green_space.accessibility.wheelchair_accessible:
score += 10
return min(100, score)
async def _score_romantic_atmosphere(self, green_space: GreenSpace) -> int:
"""Score romantic atmosphere."""
score = 0
if green_space.environmental.water_features:
score += 30
if green_space.environmental.noise_level.value <= 2:
score += 25
score += green_space.environmental.tree_coverage_percent // 2
score += green_space.accessibility.lighting_quality * 5
return min(100, score)
def _score_scenic_beauty(self, green_space: GreenSpace) -> int:
"""Score scenic beauty."""
score = green_space.environmental.tree_coverage_percent
if green_space.environmental.water_features:
score += 20
score += green_space.environmental.shade_quality // 2
return min(100, score)
async def _score_restaurant_proximity(self, green_space: GreenSpace) -> int:
"""Score restaurant proximity."""
restaurants = [a for a in green_space.nearby_amenities
if a.type in [AmenityType.RESTAURANT, AmenityType.CAFE]]
if not restaurants:
return 30
closest_distance = min(r.distance_meters for r in restaurants)
if closest_distance <= 200:
return 100
elif closest_distance <= 500:
return 80
else:
return 60
def _score_privacy(self, green_space: GreenSpace) -> int:
"""Score privacy based on size and tree coverage."""
score = 0
if green_space.area_sqm and green_space.area_sqm > 50000:
score += 40
elif green_space.area_sqm and green_space.area_sqm > 10000:
score += 20
score += green_space.environmental.tree_coverage_percent // 2
return min(100, score)
def _score_space_size(self, green_space: GreenSpace) -> int:
"""Score based on space size for groups."""
if not green_space.area_sqm:
return 50
if green_space.area_sqm > 100000:
return 100
elif green_space.area_sqm > 50000:
return 80
elif green_space.area_sqm > 20000:
return 60
else:
return 40
async def _score_group_amenities(self, green_space: GreenSpace) -> int:
"""Score amenities suitable for groups."""
score = 0
if green_space.recreation.bbq_allowed:
score += 30
if green_space.recreation.sports_facilities:
score += 25
toilets = [a for a in green_space.nearby_amenities if a.type == AmenityType.TOILET]
if toilets:
score += 20
score += green_space.accessibility.parking_availability * 5
return min(100, score)
async def _score_food_options(self, green_space: GreenSpace) -> int:
"""Score food options nearby."""
food_amenities = [a for a in green_space.nearby_amenities
if a.type in [AmenityType.RESTAURANT, AmenityType.CAFE,
AmenityType.ICE_CREAM, AmenityType.SPATI]]
return min(100, len(food_amenities) * 25)
def _score_activity_options(self, green_space: GreenSpace) -> int:
"""Score activity options."""
score = 0
if green_space.recreation.sports_facilities:
score += 30
if green_space.recreation.running_paths:
score += 20
if green_space.recreation.cycling_paths:
score += 20
if green_space.recreation.playground_quality > 0:
score += 15
if green_space.recreation.bbq_allowed:
score += 15
return min(100, score)
def _score_quietness(self, green_space: GreenSpace) -> int:
"""Score quietness for zen seekers."""
return (6 - green_space.environmental.noise_level.value) * 25
def _score_nature_immersion(self, green_space: GreenSpace) -> int:
"""Score nature immersion."""
score = green_space.environmental.tree_coverage_percent
score += green_space.environmental.natural_surface_percent // 2
if green_space.environmental.water_features:
score += 15
return min(100, score)
async def _score_crowd_density(self, green_space: GreenSpace) -> int:
"""Score based on crowd density (lower crowds = higher score)."""
# Mock implementation - in reality would use real-time data
base_score = 70
if green_space.area_sqm and green_space.area_sqm > 100000:
base_score += 20
if green_space.accessibility.public_transport_score <= 3:
base_score += 10
return min(100, base_score)
def _score_meditation_spots(self, green_space: GreenSpace) -> int:
"""Score meditation spot quality."""
score = green_space.environmental.tree_coverage_percent // 2
if green_space.environmental.water_features:
score += 25
if green_space.environmental.noise_level.value <= 2:
score += 25
return min(100, score)
async def _score_air_quality(self, green_space: GreenSpace) -> int:
"""Score air quality."""
score = green_space.environmental.tree_coverage_percent
if green_space.environmental.natural_surface_percent > 80:
score += 20
return min(100, score)
def _score_running_cycling_paths(self, green_space: GreenSpace) -> int:
"""Score running and cycling infrastructure."""
score = 0
if green_space.recreation.running_paths:
score += 50
if green_space.recreation.cycling_paths:
score += 50
return score
def _score_sports_areas(self, green_space: GreenSpace) -> int:
"""Score sports areas."""
return 100 if green_space.recreation.sports_facilities else 30
def _score_terrain_variety(self, green_space: GreenSpace) -> int:
"""Score terrain variety for active users."""
score = 50 # Base score
if green_space.area_sqm and green_space.area_sqm > 50000:
score += 30
if green_space.environmental.natural_surface_percent > 70:
score += 20
return min(100, score)
def _score_natural_habitat(self, green_space: GreenSpace) -> int:
"""Score natural habitat quality."""
score = green_space.environmental.tree_coverage_percent
score += green_space.environmental.natural_surface_percent // 2
if green_space.environmental.water_features:
score += 15
return min(100, score)
def _score_observation_spots(self, green_space: GreenSpace) -> int:
"""Score wildlife observation opportunities."""
score = green_space.environmental.tree_coverage_percent // 2
if green_space.environmental.water_features:
score += 30
if green_space.environmental.noise_level.value <= 2:
score += 20
return min(100, score)
async def _score_cultural_proximity(self, green_space: GreenSpace) -> int:
"""Score proximity to cultural venues."""
# Mock implementation - would check for nearby museums, galleries, etc.
if green_space.neighborhood.lower() in ["mitte", "kreuzberg", "prenzlauer berg"]:
return 80
return 50
async def _score_artistic_features(self, green_space: GreenSpace) -> int:
"""Score artistic features in the space."""
# Mock implementation - would check for sculptures, installations, etc.
return 60 # Base score
async def _score_museum_accessibility(self, green_space: GreenSpace) -> int:
"""Score museum accessibility."""
# Mock implementation
if green_space.neighborhood.lower() == "mitte":
return 90
elif green_space.neighborhood.lower() in ["kreuzberg", "friedrichshain"]:
return 70
return 40
def _score_creative_atmosphere(self, green_space: GreenSpace) -> int:
"""Score creative atmosphere."""
score = 50 # Base score
if green_space.environmental.tree_coverage_percent > 60:
score += 20
if green_space.environmental.water_features:
score += 15
if green_space.accessibility.lighting_quality >= 4:
score += 15
return min(100, score)
async def _score_cafe_culture(self, green_space: GreenSpace) -> int:
"""Score cafe culture nearby."""
cafes = [a for a in green_space.nearby_amenities if a.type == AmenityType.CAFE]
return min(100, len(cafes) * 30 + 40)
def _score_inspiring_views(self, green_space: GreenSpace) -> int:
"""Score inspiring views."""
score = green_space.environmental.tree_coverage_percent // 2
if green_space.environmental.water_features:
score += 25
if green_space.area_sqm and green_space.area_sqm > 50000:
score += 25
return min(100, score)
async def _score_historical_significance(self, green_space: GreenSpace) -> int:
"""Score historical significance."""
# Mock implementation - would check historical databases
historical_neighborhoods = ["mitte", "kreuzberg", "charlottenburg"]
if green_space.neighborhood.lower() in historical_neighborhoods:
return 80
return 40
async def _score_monument_proximity(self, green_space: GreenSpace) -> int:
"""Score proximity to monuments."""
# Mock implementation
if green_space.neighborhood.lower() == "mitte":
return 90
return 50
def _score_educational_value(self, green_space: GreenSpace) -> int:
"""Score educational value."""
score = 50 # Base score
if green_space.environmental.wildlife_diversity_score > 70:
score += 25
if green_space.environmental.water_features:
score += 15
if green_space.area_sqm and green_space.area_sqm > 100000:
score += 10
return min(100, score)
def _score_architectural_features(self, green_space: GreenSpace) -> int:
"""Score architectural features."""
# Mock implementation
return 60
async def _score_cultural_context(self, green_space: GreenSpace) -> int:
"""Score cultural context."""
# Mock implementation
cultural_neighborhoods = ["mitte", "kreuzberg", "prenzlauer berg", "friedrichshain"]
if green_space.neighborhood.lower() in cultural_neighborhoods:
return 75
return 45
def _create_explanation(self, personality: str, explanation_parts: List[str]) -> str:
"""Create a human-readable explanation."""
if not explanation_parts:
return f"This space has moderate appeal for {personality.replace('_', ' ')} personality type."
return f"Great for {personality.replace('_', ' ')}: " + ", ".join(explanation_parts[:3])
def _generate_recommendations(
self,
green_space: GreenSpace,
personality: str,
component_scores: Dict[str, int]
) -> List[str]:
"""Generate personalized recommendations."""
recommendations = []
if personality == "little_adventurers":
if green_space.recreation.playground_quality > 70:
recommendations.append("Perfect playground for kids to explore")
if green_space.environmental.shade_quality > 70:
recommendations.append("Plenty of shade for family comfort")
elif personality == "date_night":
if green_space.environmental.water_features:
recommendations.append("Romantic walks by the water")
if component_scores.get("lighting_quality", 0) > 70:
recommendations.append("Beautiful evening atmosphere")
elif personality == "zen_masters":
if component_scores.get("quietness", 0) > 80:
recommendations.append("Perfect for meditation and reflection")
if green_space.environmental.water_features:
recommendations.append("Peaceful water sounds enhance tranquility")
# Add generic recommendations if none specific
if not recommendations:
recommendations.append("Enjoy the natural beauty of this space")
recommendations.append("Great for relaxation and outdoor activities")
return recommendations
async def score_location(
self,
lat: float,
lng: float,
personality: str,
radius: int
) -> Dict[str, Any]:
"""Score a specific location."""
# Check if location is in a green space
green_space = await self.berlin_data.get_green_space_at_location(lat, lng)
if not green_space:
return {
"score": 0,
"explanation": "Location is not in a recognized green space",
"location": {"lat": lat, "lng": lng},
"personality": personality
}
# Score the green space
personality_score = await self.score_green_space(green_space, personality, (lat, lng))
return {
"score": personality_score.score,
"explanation": personality_score.explanation,
"location": {"lat": lat, "lng": lng},
"personality": personality,
"green_space": green_space.name,
"recommendations": personality_score.recommendations
}
async def find_best_locations_within(
self,
green_space: GreenSpace,
personality: str
) -> List[LocationScore]:
"""Find best locations within a green space."""
# Mock implementation - in reality would analyze specific spots
locations = []
# Generate a few sample locations within the space
base_lat, base_lng = green_space.coordinates.lat, green_space.coordinates.lng
for i, (offset_lat, offset_lng, description) in enumerate([
(0.001, 0.001, "Northeast corner with mature trees"),
(-0.001, 0.001, "Southeast area near water feature"),
(0.0, -0.001, "Western meadow area")
]):
location = LocationScore(
coordinates=Coordinates(
lat=base_lat + offset_lat,
lng=base_lng + offset_lng
),
score=85 - i * 5, # Decreasing scores
explanation=f"Excellent spot: {description}",
nearby_features=["trees", "seating", "paths"],
best_for=[personality],
recommendations=[f"Perfect for {personality.replace('_', ' ')} activities"]
)
locations.append(location)
return locations

View File

@ -1,7 +1,177 @@
[project]
name = "berlin-picnic-api"
version = "0.1.0"
description = "Add your description here"
description = "API for finding perfect picnic zones in Berlin based on personality"
authors = [
{ name = "Your Name", email = "your.email@example.com" }
]
readme = "README.md"
requires-python = ">=3.11"
dependencies = []
license = { text = "MIT" }
keywords = ["fastapi", "berlin", "picnic", "zones", "personality"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"fastapi>=0.104.0",
"uvicorn[standard]>=0.24.0",
"pydantic>=2.5.0",
"pydantic-settings>=2.1.0",
"pandas>=2.1.0",
"shapely>=2.0.0", # Modern geometric operations
"geojson>=3.1.0", # GeoJSON handling
"geopy>=2.4.0", # Geocoding and distance calculations
"haversine>=2.8.0", # Fast distance calculations
"requests>=2.31.0",
"httpx>=0.25.0",
"python-multipart>=0.0.6",
"python-jose[cryptography]>=3.3.0",
"redis>=5.0.0",
"aiofiles>=23.2.0",
"openpyxl>=3.1.5",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.1.0",
"httpx>=0.25.0", # For testing
"ruff>=0.1.0",
"mypy>=1.7.0",
"pre-commit>=3.5.0",
"black>=23.11.0", # Backup formatter
]
test = [
"pytest>=7.4.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.1.0",
"httpx>=0.25.0",
]
[project.urls]
Homepage = "https://github.com/yourusername/berlin-picnic-api"
Repository = "https://github.com/yourusername/berlin-picnic-api"
Issues = "https://github.com/yourusername/berlin-picnic-api/issues"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["app"]
[tool.uv]
dev-dependencies = [
"pytest>=7.4.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.1.0",
"httpx>=0.25.0",
"ruff>=0.1.0",
"mypy>=1.7.0",
"pre-commit>=3.5.0",
]
# Ruff configuration
[tool.ruff]
target-version = "py311"
line-length = 88
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
"N", # pep8-naming
"S", # bandit
"T20", # flake8-print
"SIM", # flake8-simplify
"TCH", # flake8-type-checking
]
ignore = [
"E501", # line too long, handled by black
"B008", # do not perform function calls in argument defaults
"S101", # use of assert
"T201", # print statements (allowed in scripts)
]
[tool.ruff.per-file-ignores]
"__init__.py" = ["F401"] # Allow unused imports in __init__.py
"tests/**/*" = ["S101", "T201", "ARG", "FBT"] # Allow prints and asserts in tests
"scripts/**/*" = ["T201", "S101"] # Allow prints in scripts
[tool.ruff.isort]
known-first-party = ["app"]
# MyPy configuration
[tool.mypy]
python_version = "3.11"
check_untyped_defs = true
disallow_any_generics = true
disallow_incomplete_defs = true
disallow_untyped_defs = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_unreachable = true
strict_equality = true
[[tool.mypy.overrides]]
module = [
"shapely.*",
"geojson.*",
"geopy.*",
"haversine.*",
"pandas.*",
"redis.*",
]
ignore_missing_imports = true
# Pytest configuration
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
"--strict-markers",
"--strict-config",
"--cov=app",
"--cov-report=term-missing",
"--cov-report=html",
"--cov-fail-under=80",
]
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"integration: marks tests as integration tests",
]
# Coverage configuration
[tool.coverage.run]
source = ["app"]
omit = [
"*/tests/*",
"*/venv/*",
"*/__pycache__/*",
"*/migrations/*",
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise AssertionError",
"raise NotImplementedError",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]