Production Best Practices¶
This guide covers essential practices for deploying and operating Python applications with lakeFS in production, including performance optimization, security, monitoring and observability, and operational resilience.
Connection Management and Performance¶
Connection Pooling¶
Proper connection management is crucial for performance and resource utilization in production: reusing pooled HTTP connections avoids re-establishing TCP and TLS sessions on every request.
High-Level SDK Connection Pooling¶
import lakefs
from lakefs.config import Config

# Configure connection pooling
config = Config(
    host="https://lakefs.example.com",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",
    # Connection pool settings
    pool_connections=30,
    pool_maxsize=30,
    max_retries=3,
    backoff_factor=0.3
)

# Create client with optimized settings
client = lakefs.Client(config=config)

# Reuse client across your application
class DataService:
    def __init__(self):
        self.client = client  # Reuse the same client instance

    def process_data(self, repo_name, branch_name):
        repo = self.client.repository(repo_name)
        branch = repo.branch(branch_name)
        # Operations use the pooled connections
        return branch.objects()
Generated SDK Connection Configuration¶
from lakefs_sdk import Configuration, ApiClient
import urllib3

# Configure connection pooling for Generated SDK
configuration = Configuration(
    host="https://lakefs.example.com",
    access_token="your-access-token"
)

# Configure urllib3 pool manager
http = urllib3.PoolManager(
    num_pools=10,
    maxsize=30,
    retries=urllib3.Retry(
        total=3,
        backoff_factor=0.3,
        status_forcelist=[500, 502, 503, 504]
    )
)

# Create API client with custom pool manager
api_client = ApiClient(configuration)
api_client.rest_client.pool_manager = http
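Once the pool manager is attached, API objects created from this client share the pooled connections. A minimal sketch, assuming the generated SDK's RepositoriesApi and its list_repositories call; adjust the import path and model attributes to match your installed lakefs_sdk version:

from lakefs_sdk.api.repositories_api import RepositoriesApi

# API objects created from the configured client reuse its connection pool
repos_api = RepositoriesApi(api_client)

# Smoke test: list a single repository through the pooled client
response = repos_api.list_repositories(amount=1)
for repo in response.results:
    print(repo.id)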
lakefs-spec Connection Optimization¶
import lakefs_spec
import fsspec

# Configure lakefs-spec with connection pooling
fs = fsspec.filesystem(
    'lakefs',
    host='https://lakefs.example.com',
    access_key_id='your-access-key',
    secret_access_key='your-secret-key',
    # Connection pool settings
    client_kwargs={
        'pool_connections': 30,
        'pool_maxsize': 30,
        'max_retries': 3
    }
)

# Use the same filesystem instance across operations
def process_files(file_paths):
    for path in file_paths:
        with fs.open(path, 'rb') as f:
            # Process file using the pooled connection
            data = f.read()
            yield process_data(data)
Performance Optimization Techniques¶
Batch Operations¶
# High-Level SDK: Efficient batch operations
import lakefs
from concurrent.futures import ThreadPoolExecutor, as_completed

client = lakefs.Client()
repo = client.repository("my-repo")
branch = repo.branch("main")

# Batch upload with threading
def upload_file(file_info):
    path, data = file_info
    return branch.object(path).upload(data)

files_to_upload = [
    ("data/file1.csv", data1),
    ("data/file2.csv", data2),
    # ... more files
]

# Use thread pool for concurrent uploads
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(upload_file, file_info)
               for file_info in files_to_upload]

    for future in as_completed(futures):
        try:
            result = future.result()
            print(f"Upload completed: {result}")
        except Exception as e:
            print(f"Upload failed: {e}")
Streaming for Large Files¶
# High-Level SDK: Streaming large files
def stream_large_file(repo, branch_name, object_path, local_path):
    branch = repo.branch(branch_name)
    obj = branch.object(object_path)

    # Stream upload for large files
    with open(local_path, 'rb') as f:
        obj.upload(f, mode='rb')

    # Stream download for large files
    with obj.reader() as reader:
        with open(local_path + '.downloaded', 'wb') as f:
            for chunk in reader:
                f.write(chunk)

# lakefs-spec: Efficient streaming
import fsspec
import lakefs_spec  # registers the 'lakefs' protocol with fsspec

def stream_with_lakefs_spec(source_path, dest_path):
    # Direct streaming without loading the whole file into memory
    with fsspec.open(source_path, 'rb') as src:
        with fsspec.open(dest_path, 'wb') as dst:
            # Stream in chunks
            while True:
                chunk = src.read(8192)  # 8KB chunks
                if not chunk:
                    break
                dst.write(chunk)
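lakefs-spec addresses objects with lakefs://&lt;repository&gt;/&lt;ref&gt;/&lt;path&gt; URIs, so the helper above can copy between a local file and a versioned object. A short usage sketch; the repository, branch, and paths are placeholders:

# Copy a local file into a branch, then stream it back out
stream_with_lakefs_spec(
    "file:///tmp/input.csv",
    "lakefs://my-repo/main/data/input.csv"
)
stream_with_lakefs_spec(
    "lakefs://my-repo/main/data/input.csv",
    "file:///tmp/output.csv"
)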
Caching Strategies¶
import functools
import time
from typing import Dict, Any

# Repository metadata caching
class CachedLakeFSClient:
    def __init__(self, client):
        self.client = client
        self._repo_cache = {}
        self._cache_ttl = 300  # 5 minutes

    @functools.lru_cache(maxsize=128)
    def get_repository_info(self, repo_name: str) -> Dict[str, Any]:
        """Cache repository metadata"""
        repo = self.client.repository(repo_name)
        return {
            'name': repo.id,
            'creation_date': repo.creation_date,
            'default_branch': repo.default_branch
        }

    def get_branch_with_cache(self, repo_name: str, branch_name: str):
        """Cache branch objects for reuse"""
        cache_key = f"{repo_name}:{branch_name}"
        current_time = time.time()

        if (cache_key in self._repo_cache and
                current_time - self._repo_cache[cache_key]['timestamp'] < self._cache_ttl):
            return self._repo_cache[cache_key]['branch']

        repo = self.client.repository(repo_name)
        branch = repo.branch(branch_name)
        self._repo_cache[cache_key] = {
            'branch': branch,
            'timestamp': current_time
        }
        return branch
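A usage sketch for the cache wrapper above, with client construction as in the earlier examples. Note that functools.lru_cache on a bound method keys the cache on the instance as well, which keeps the wrapper alive for the lifetime of the cache:

client = lakefs.Client()
cached = CachedLakeFSClient(client)

# First call hits the API; repeated calls within the TTL reuse cached results
info = cached.get_repository_info("my-repo")
branch = cached.get_branch_with_cache("my-repo", "main")
print(info['default_branch'], branch)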
Security Best Practices¶
Credential Management¶
Environment-Based Configuration¶
import os
import lakefs
from lakefs.config import Config

# Production credential management
def create_secure_client():
    # Never hardcode credentials
    config = Config(
        host=os.environ['LAKEFS_HOST'],
        access_key_id=os.environ['LAKEFS_ACCESS_KEY_ID'],
        secret_access_key=os.environ['LAKEFS_SECRET_ACCESS_KEY']
    )

    # Validate configuration
    if not all([config.host, config.access_key_id, config.secret_access_key]):
        raise ValueError("Missing required lakeFS credentials")

    return lakefs.Client(config=config)

# Use AWS Secrets Manager or similar
import boto3
import json

def get_lakefs_credentials_from_secrets():
    """Retrieve credentials from AWS Secrets Manager"""
    secrets_client = boto3.client('secretsmanager')

    try:
        response = secrets_client.get_secret_value(
            SecretId='lakefs/production/credentials'
        )
        credentials = json.loads(response['SecretString'])

        return Config(
            host=credentials['host'],
            access_key_id=credentials['access_key_id'],
            secret_access_key=credentials['secret_access_key']
        )
    except Exception as e:
        raise RuntimeError(f"Failed to retrieve credentials: {e}")
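Wiring the two helpers together: fetch the Config from Secrets Manager at startup and build a single client to reuse. A sketch; the secret name and rotation policy are deployment-specific:

def create_client_from_secrets() -> lakefs.Client:
    """Build a lakeFS client from credentials stored in Secrets Manager"""
    config = get_lakefs_credentials_from_secrets()
    return lakefs.Client(config=config)

# Create once at startup and reuse across the application
client = create_client_from_secrets()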
SSL/TLS Configuration¶
import os
import ssl
import lakefs
from lakefs.config import Config

# Production SSL configuration
def create_secure_ssl_client():
    config = Config(
        host="https://lakefs.example.com",
        access_key_id=os.environ['LAKEFS_ACCESS_KEY_ID'],
        secret_access_key=os.environ['LAKEFS_SECRET_ACCESS_KEY'],
        # SSL configuration
        verify_ssl=True,
        ssl_ca_cert="/path/to/ca-bundle.crt",  # Custom CA if needed
        cert_file="/path/to/client.crt",       # Client certificate
        key_file="/path/to/client.key"         # Client private key
    )
    return lakefs.Client(config=config)

# For Generated SDK
from lakefs_sdk import Configuration
import urllib3

def configure_ssl_for_generated_sdk():
    configuration = Configuration(
        host="https://lakefs.example.com",
        access_token="your-token"
    )

    # Custom SSL context
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = True
    ssl_context.verify_mode = ssl.CERT_REQUIRED

    # Configure urllib3 with custom SSL
    http = urllib3.PoolManager(
        ssl_context=ssl_context,
        cert_reqs='CERT_REQUIRED',
        ca_certs='/path/to/ca-bundle.crt'
    )

    return configuration, http
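As in the connection-pooling example earlier, the returned configuration and pool manager still need to be attached to an ApiClient. A short sketch mirroring that example:

from lakefs_sdk import ApiClient

configuration, http = configure_ssl_for_generated_sdk()

# Attach the TLS-enforcing pool manager to the API client
api_client = ApiClient(configuration)
api_client.rest_client.pool_manager = http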
Access Control and Permissions¶
import os
import lakefs
from typing import Dict, List

class SecureLakeFSService:
    """Production service with access control"""

    def __init__(self, config):
        self.client = lakefs.Client(config)
        self.allowed_repositories = self._load_allowed_repositories()
        self.user_permissions = self._load_user_permissions()

    def _load_allowed_repositories(self) -> List[str]:
        """Load allowed repositories from configuration"""
        return os.environ.get('ALLOWED_REPOSITORIES', '').split(',')

    def _load_user_permissions(self) -> Dict[str, List[str]]:
        """Load per-user repository permissions (placeholder: wire to your own policy source)"""
        return {}

    def _validate_repository_access(self, repo_name: str, user_id: str):
        """Validate user has access to repository"""
        if repo_name not in self.allowed_repositories:
            raise PermissionError(f"Access denied to repository: {repo_name}")

        user_perms = self.user_permissions.get(user_id, [])
        if repo_name not in user_perms:
            raise PermissionError(f"User {user_id} lacks access to {repo_name}")

    def safe_repository_access(self, repo_name: str, user_id: str):
        """Safely access repository with validation"""
        self._validate_repository_access(repo_name, user_id)
        return self.client.repository(repo_name)
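A usage sketch; the user identifier comes from your own authentication layer, config is the Config object built earlier in this guide, and ALLOWED_REPOSITORIES is the comma-separated environment variable read above:

service = SecureLakeFSService(config)

try:
    repo = service.safe_repository_access("analytics", user_id="alice")
    print(repo.id)
except PermissionError as e:
    # Deny by default and surface the reason to your audit log
    print(f"Access rejected: {e}")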
Deployment Best Practices¶
Containerization¶
Docker Configuration¶
# Production Dockerfile
FROM python:3.11-slim
# Create non-root user
RUN groupadd -r lakefs && useradd -r -g lakefs lakefs
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY --chown=lakefs:lakefs . /app
WORKDIR /app
# Switch to non-root user
USER lakefs
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import lakefs; client = lakefs.Client(); client.repositories.list()" || exit 1
CMD ["python", "app.py"]
Kubernetes Deployment¶
# production-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: lakefs-python-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: lakefs-python-app
  template:
    metadata:
      labels:
        app: lakefs-python-app
    spec:
      containers:
      - name: app
        image: your-registry/lakefs-python-app:latest
        env:
        - name: LAKEFS_HOST
          valueFrom:
            secretKeyRef:
              name: lakefs-credentials
              key: host
        - name: LAKEFS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: lakefs-credentials
              key: access_key_id
        - name: LAKEFS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: lakefs-credentials
              key: secret_access_key
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
Configuration Management¶
import os
from dataclasses import dataclass
from typing import Optional
import lakefs

@dataclass
class ProductionConfig:
    """Production configuration management"""
    lakefs_host: str
    lakefs_access_key_id: str
    lakefs_secret_access_key: str

    # Performance settings
    connection_pool_size: int = 30
    max_retries: int = 3
    timeout: int = 30

    # Security settings
    verify_ssl: bool = True
    ssl_ca_cert: Optional[str] = None

    # Operational settings
    log_level: str = "INFO"
    metrics_enabled: bool = True

    @classmethod
    def from_environment(cls):
        """Load configuration from environment variables"""
        return cls(
            lakefs_host=os.environ['LAKEFS_HOST'],
            lakefs_access_key_id=os.environ['LAKEFS_ACCESS_KEY_ID'],
            lakefs_secret_access_key=os.environ['LAKEFS_SECRET_ACCESS_KEY'],
            connection_pool_size=int(os.environ.get('LAKEFS_POOL_SIZE', '30')),
            max_retries=int(os.environ.get('LAKEFS_MAX_RETRIES', '3')),
            timeout=int(os.environ.get('LAKEFS_TIMEOUT', '30')),
            verify_ssl=os.environ.get('LAKEFS_VERIFY_SSL', 'true').lower() == 'true',
            ssl_ca_cert=os.environ.get('LAKEFS_SSL_CA_CERT'),
            log_level=os.environ.get('LOG_LEVEL', 'INFO'),
            metrics_enabled=os.environ.get('METRICS_ENABLED', 'true').lower() == 'true'
        )

    def create_client(self) -> lakefs.Client:
        """Create configured lakeFS client"""
        config = lakefs.Config(
            host=self.lakefs_host,
            access_key_id=self.lakefs_access_key_id,
            secret_access_key=self.lakefs_secret_access_key,
            pool_connections=self.connection_pool_size,
            pool_maxsize=self.connection_pool_size,
            max_retries=self.max_retries,
            verify_ssl=self.verify_ssl,
            ssl_ca_cert=self.ssl_ca_cert
        )
        return lakefs.Client(config=config)
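Typical startup wiring loads the configuration once from the environment and derives everything else from it. A sketch; the metrics port mirrors the Prometheus example in the monitoring section below:

import logging
from prometheus_client import start_http_server

production_config = ProductionConfig.from_environment()

# Derive logging, client, and metrics behavior from a single config object
logging.basicConfig(level=production_config.log_level)
client = production_config.create_client()

if production_config.metrics_enabled:
    start_http_server(8000)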
Monitoring and Observability¶
Logging Configuration¶
import logging
import structlog
import lakefs
from pythonjsonlogger import jsonlogger

def setup_production_logging():
    """Configure structured logging for production"""
    # Configure structlog
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.JSONRenderer()
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # Configure standard logging
    handler = logging.StreamHandler()
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s'
    )
    handler.setFormatter(formatter)

    # Set up lakeFS client logging
    lakefs_logger = logging.getLogger('lakefs')
    lakefs_logger.setLevel(logging.INFO)
    lakefs_logger.addHandler(handler)

    return structlog.get_logger()

# Usage in application
logger = setup_production_logging()

class LakeFSService:
    def __init__(self):
        self.client = lakefs.Client()
        self.logger = logger.bind(service="lakefs")

    def upload_data(self, repo_name, branch_name, path, data):
        self.logger.info(
            "Starting data upload",
            repo=repo_name,
            branch=branch_name,
            path=path,
            size=len(data)
        )

        try:
            repo = self.client.repository(repo_name)
            branch = repo.branch(branch_name)
            result = branch.object(path).upload(data)

            self.logger.info(
                "Data upload completed",
                repo=repo_name,
                branch=branch_name,
                path=path,
                checksum=result.checksum
            )
            return result
        except Exception as e:
            self.logger.error(
                "Data upload failed",
                repo=repo_name,
                branch=branch_name,
                path=path,
                error=str(e),
                exc_info=True
            )
            raise
Metrics and Monitoring¶
import time
import functools
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import lakefs

# Prometheus metrics
LAKEFS_OPERATIONS = Counter(
    'lakefs_operations_total',
    'Total lakeFS operations',
    ['operation', 'repository', 'status']
)

LAKEFS_OPERATION_DURATION = Histogram(
    'lakefs_operation_duration_seconds',
    'Duration of lakeFS operations',
    ['operation', 'repository']
)

LAKEFS_ACTIVE_CONNECTIONS = Gauge(
    'lakefs_active_connections',
    'Number of active lakeFS connections'
)

def monitor_lakefs_operation(operation_name):
    """Decorator to monitor lakeFS operations"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            repo_name = kwargs.get('repo_name', 'unknown')
            start_time = time.time()

            LAKEFS_ACTIVE_CONNECTIONS.inc()
            try:
                result = func(*args, **kwargs)
                LAKEFS_OPERATIONS.labels(
                    operation=operation_name,
                    repository=repo_name,
                    status='success'
                ).inc()
                return result
            except Exception:
                LAKEFS_OPERATIONS.labels(
                    operation=operation_name,
                    repository=repo_name,
                    status='error'
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                LAKEFS_OPERATION_DURATION.labels(
                    operation=operation_name,
                    repository=repo_name
                ).observe(duration)
                LAKEFS_ACTIVE_CONNECTIONS.dec()
        return wrapper
    return decorator

# Usage
class MonitoredLakeFSService:
    def __init__(self):
        self.client = lakefs.Client()

    @monitor_lakefs_operation('upload')
    def upload_file(self, repo_name, branch_name, path, data):
        repo = self.client.repository(repo_name)
        branch = repo.branch(branch_name)
        return branch.object(path).upload(data)

    @monitor_lakefs_operation('download')
    def download_file(self, repo_name, branch_name, path):
        repo = self.client.repository(repo_name)
        branch = repo.branch(branch_name)
        return branch.object(path).reader().read()

# Start metrics server
start_http_server(8000)
Health Checks¶
import lakefs
from typing import Dict, Any
import time

class HealthChecker:
    """Production health checking for lakeFS connectivity"""

    def __init__(self, client: lakefs.Client):
        self.client = client
        self.last_check = 0
        self.check_interval = 30  # seconds
        self.cached_status = None

    def check_health(self) -> Dict[str, Any]:
        """Comprehensive health check"""
        current_time = time.time()

        # Use cached result if recent
        if (self.cached_status and
                current_time - self.last_check < self.check_interval):
            return self.cached_status

        health_status = {
            'timestamp': current_time,
            'status': 'healthy',
            'checks': {}
        }

        # Check basic connectivity
        try:
            start_time = time.time()
            repos = list(self.client.repositories.list(amount=1))
            response_time = time.time() - start_time

            health_status['checks']['connectivity'] = {
                'status': 'pass',
                'response_time': response_time
            }
        except Exception as e:
            health_status['status'] = 'unhealthy'
            health_status['checks']['connectivity'] = {
                'status': 'fail',
                'error': str(e)
            }

        # Check authentication
        try:
            # Try to access user info or perform authenticated operation
            self.client.repositories.list(amount=1)
            health_status['checks']['authentication'] = {
                'status': 'pass'
            }
        except Exception as e:
            health_status['status'] = 'unhealthy'
            health_status['checks']['authentication'] = {
                'status': 'fail',
                'error': str(e)
            }

        self.cached_status = health_status
        self.last_check = current_time
        return health_status

    def is_healthy(self) -> bool:
        """Simple boolean health check"""
        return self.check_health()['status'] == 'healthy'

# Flask health endpoint example
from flask import Flask, jsonify

app = Flask(__name__)
health_checker = HealthChecker(lakefs.Client())

@app.route('/health')
def health():
    health_status = health_checker.check_health()
    status_code = 200 if health_status['status'] == 'healthy' else 503
    return jsonify(health_status), status_code

@app.route('/ready')
def ready():
    # Readiness check - can be more strict than health
    if health_checker.is_healthy():
        return jsonify({'status': 'ready'}), 200
    else:
        return jsonify({'status': 'not ready'}), 503
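When this service runs in the Kubernetes deployment above, the port the app listens on must match the probe port. A sketch assuming the container serves HTTP on 8080:

if __name__ == '__main__':
    # Port 8080 matches the liveness/readiness probes in the deployment manifest
    app.run(host='0.0.0.0', port=8080)

In production you would typically run the app behind a WSGI server such as gunicorn rather than the Flask development server.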
Error Handling and Resilience¶
Retry Strategies¶
import time
import random
from functools import wraps
from typing import Callable, Type, Tuple
import lakefs

def exponential_backoff_retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,)
):
    """Exponential backoff retry decorator"""
    def decorator(func: Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e

                    if attempt == max_retries:
                        raise

                    # Calculate delay with jitter
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    time.sleep(delay + jitter)

                    logger.warning(
                        f"Retry attempt {attempt + 1}/{max_retries} for {func.__name__}",
                        error=str(e),
                        delay=delay + jitter
                    )

            raise last_exception
        return wrapper
    return decorator

# Usage
class ResilientLakeFSService:
    def __init__(self):
        self.client = lakefs.Client()

    @exponential_backoff_retry(
        max_retries=3,
        exceptions=(lakefs.exceptions.ServerException, ConnectionError)
    )
    def upload_with_retry(self, repo_name, branch_name, path, data):
        repo = self.client.repository(repo_name)
        branch = repo.branch(branch_name)
        return branch.object(path).upload(data)
Circuit Breaker Pattern¶
import time
from enum import Enum
from typing import Any, Callable, Type
import lakefs

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker for lakeFS operations"""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: float = 60.0,
        expected_exception: Type[Exception] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        return (time.time() - self.last_failure_time) >= self.timeout

    def _on_success(self):
        """Handle successful operation"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle failed operation"""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
lakefs_circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    timeout=60.0,
    expected_exception=lakefs.exceptions.ServerException
)

class ProtectedLakeFSService:
    def __init__(self):
        self.client = lakefs.Client()

    def safe_upload(self, repo_name, branch_name, path, data):
        """Upload with circuit breaker protection"""
        def upload_operation():
            repo = self.client.repository(repo_name)
            branch = repo.branch(branch_name)
            return branch.object(path).upload(data)

        return lakefs_circuit_breaker.call(upload_operation)
Performance Monitoring and Optimization¶
Performance Profiling¶
import cProfile
import pstats
import io
from contextlib import contextmanager
import lakefs

@contextmanager
def profile_lakefs_operation(operation_name: str):
    """Context manager for profiling lakeFS operations"""
    profiler = cProfile.Profile()
    profiler.enable()

    try:
        yield
    finally:
        profiler.disable()

        # Analyze results
        s = io.StringIO()
        ps = pstats.Stats(profiler, stream=s)
        ps.sort_stats('cumulative')
        ps.print_stats(20)  # Top 20 functions

        logger.info(
            f"Performance profile for {operation_name}",
            profile_data=s.getvalue()
        )

# Usage
def analyze_performance():
    client = lakefs.Client()
    repo = client.repository("my-repo")
    branch = repo.branch("main")

    with profile_lakefs_operation("batch_upload"):
        # Perform operations to profile
        for i in range(100):
            branch.object(f"data/file_{i}.txt").upload(f"data_{i}")
Memory Usage Optimization¶
import gc
import psutil
import os
from typing import Any, Iterator
import lakefs

class MemoryEfficientProcessor:
    """Memory-efficient processing of large datasets"""

    def __init__(self, client: lakefs.Client):
        self.client = client
        self.process = psutil.Process(os.getpid())

    def get_memory_usage(self) -> float:
        """Get current memory usage in MB"""
        return self.process.memory_info().rss / 1024 / 1024

    def process_large_dataset(
        self,
        repo_name: str,
        branch_name: str,
        file_paths: list,
        chunk_size: int = 1000
    ) -> Iterator[Any]:
        """Process large dataset in memory-efficient chunks"""
        repo = self.client.repository(repo_name)
        branch = repo.branch(branch_name)

        initial_memory = self.get_memory_usage()
        logger.info(f"Starting processing, initial memory: {initial_memory:.2f} MB")

        for i in range(0, len(file_paths), chunk_size):
            chunk = file_paths[i:i + chunk_size]

            # Process chunk
            for file_path in chunk:
                obj = branch.object(file_path)
                with obj.reader() as reader:
                    # Process data in streaming fashion
                    for line in reader:
                        yield self.process_line(line)

            # Force garbage collection after each chunk
            gc.collect()

            current_memory = self.get_memory_usage()
            logger.info(
                f"Processed chunk {i//chunk_size + 1}, "
                f"memory: {current_memory:.2f} MB, "
                f"delta: {current_memory - initial_memory:.2f} MB"
            )

    def process_line(self, line: bytes) -> Any:
        """Process individual line - implement your logic here"""
        return line.decode().strip()
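A usage sketch for the processor above; the repository name and paths are placeholders, and results are consumed lazily so only one chunk's worth of lines is in flight at a time:

client = lakefs.Client()
processor = MemoryEfficientProcessor(client)

paths = [f"logs/part-{i:05d}.txt" for i in range(10_000)]
for record in processor.process_large_dataset("my-repo", "main", paths, chunk_size=500):
    # Handle each processed line without materializing the full dataset
    pass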
See Also¶
- API Comparison - Choose the right SDK for your use case
- Troubleshooting Guide - Common issues and solutions
- High-Level SDK Advanced Features - Advanced SDK capabilities
- Generated SDK Examples - Direct API usage patterns
- Security Documentation - lakeFS security best practices
- Deployment Guide - lakeFS deployment options