Advanced Features¶
Explore advanced patterns, optimization techniques, and best practices for the High-Level Python SDK. This guide covers error handling, performance optimization, logging, debugging, and production deployment strategies.
Error Handling and Exception Management¶
Complete Exception Hierarchy¶
The High-Level SDK provides a comprehensive exception hierarchy for different error scenarios:
import lakefs
from lakefs.exceptions import (
# Base exceptions
LakeFSException,
ServerException,
# Authentication and authorization
NoAuthenticationFound,
NotAuthorizedException,
ForbiddenException,
PermissionException,
# Resource errors
NotFoundException,
ObjectNotFoundException,
ConflictException,
ObjectExistsException,
# Request errors
BadRequestException,
UnsupportedOperationException,
InvalidRangeException,
# SDK-specific errors
ImportManagerException,
TransactionException,
# Configuration errors
UnsupportedCredentialsProviderType,
InvalidEnvVarFormat
)
Comprehensive Error Handling Patterns¶
def robust_repository_operations(repo_id, storage_namespace):
"""Demonstrate comprehensive error handling for repository operations"""
try:
# Attempt to create repository
repo = lakefs.repository(repo_id).create(
storage_namespace=storage_namespace,
exist_ok=False
)
print(f"Repository created: {repo_id}")
return repo
except ConflictException:
print(f"Repository {repo_id} already exists, connecting to existing")
return lakefs.repository(repo_id)
except NotAuthorizedException:
print("Authentication failed - check credentials")
raise
except ForbiddenException:
print("Operation forbidden - insufficient permissions")
raise
except BadRequestException as e:
print(f"Invalid request parameters: {e}")
raise
except ServerException as e:
print(f"Server error (HTTP {e.status_code}): {e.reason}")
if e.body:
print(f"Error details: {e.body}")
raise
except LakeFSException as e:
print(f"lakeFS SDK error: {e}")
raise
except Exception as e:
print(f"Unexpected error: {e}")
raise
# Usage
try:
repo = robust_repository_operations("my-repo", "s3://my-bucket/repos/my-repo")
except Exception as e:
print(f"Failed to set up repository: {e}")
Object-Level Error Handling¶
def safe_object_operations(branch, operations):
"""Safely perform multiple object operations with detailed error handling"""
results = []
for operation in operations:
op_type = operation["type"]
path = operation["path"]
try:
if op_type == "upload":
obj = branch.object(path).upload(
data=operation["data"],
mode=operation.get("mode", "w")
)
results.append({"path": path, "status": "uploaded", "object": obj})
            elif op_type == "download":
                obj = branch.object(path)
                if not obj.exists():
                    print(f"Object not found: {path}")
                    results.append({"path": path, "status": "not_found"})
                    continue
                content = obj.reader().read()
                results.append({"path": path, "status": "downloaded", "content": content})
elif op_type == "delete":
obj = branch.object(path)
obj.delete()
results.append({"path": path, "status": "deleted"})
except ObjectNotFoundException:
print(f"Object not found: {path}")
results.append({"path": path, "status": "not_found"})
except ObjectExistsException:
print(f"Object already exists (exclusive mode): {path}")
results.append({"path": path, "status": "exists"})
except PermissionException:
print(f"Permission denied for object: {path}")
results.append({"path": path, "status": "permission_denied"})
except InvalidRangeException:
print(f"Invalid range request for object: {path}")
results.append({"path": path, "status": "invalid_range"})
except Exception as e:
print(f"Unexpected error with object {path}: {e}")
results.append({"path": path, "status": "error", "error": str(e)})
return results
# Usage
operations = [
{"type": "upload", "path": "data/file1.txt", "data": "content1"},
{"type": "upload", "path": "data/file2.txt", "data": "content2", "mode": "x"},
{"type": "download", "path": "data/file1.txt"},
{"type": "delete", "path": "data/old-file.txt"}
]
results = safe_object_operations(branch, operations)
for result in results:
print(f"{result['path']}: {result['status']}")
Custom Exception Handling¶
import functools
import json

class DataPipelineException(LakeFSException):
"""Custom exception for data pipeline operations"""
def __init__(self, stage, message, original_exception=None):
self.stage = stage
self.original_exception = original_exception
super().__init__(f"Pipeline failed at stage '{stage}': {message}")
def pipeline_stage_wrapper(stage_name):
    """Decorator for pipeline stages with custom error handling"""
    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except LakeFSException as e:
                raise DataPipelineException(stage_name, str(e), e) from e
            except Exception as e:
                raise DataPipelineException(stage_name, f"Unexpected error: {e}", e) from e
        return wrapper
    return decorator
@pipeline_stage_wrapper("data_extraction")
def extract_data(branch, source_path):
"""Extract data with error handling"""
obj = branch.object(source_path)
return json.loads(obj.reader().read())
@pipeline_stage_wrapper("data_transformation")
def transform_data(data):
"""Transform data with error handling"""
return [{"id": item["id"], "value": item["value"] * 2} for item in data]
@pipeline_stage_wrapper("data_loading")
def load_data(branch, target_path, data):
"""Load data with error handling"""
branch.object(target_path).upload(
data=json.dumps(data, indent=2),
content_type="application/json"
)
# Usage
try:
data = extract_data(branch, "raw/input.json")
transformed = transform_data(data)
load_data(branch, "processed/output.json", transformed)
print("Pipeline completed successfully")
except DataPipelineException as e:
print(f"Pipeline failed: {e}")
print(f"Failed stage: {e.stage}")
if e.original_exception:
print(f"Original error: {e.original_exception}")
Performance Optimization¶
Advanced Client Configuration¶
from lakefs.client import Client
import os
def create_optimized_client():
"""Create a performance-optimized client"""
return Client(
host=os.getenv('LAKEFS_ENDPOINT'),
username=os.getenv('LAKEFS_ACCESS_KEY_ID'),
password=os.getenv('LAKEFS_SECRET_ACCESS_KEY'),
# Connection pooling for high throughput
pool_connections=50,
pool_maxsize=100,
pool_block=False,
# Retry configuration
max_retries=5,
backoff_factor=0.3,
retry_on_status=[500, 502, 503, 504],
# Timeout settings
timeout=60,
connect_timeout=10,
read_timeout=50,
# SSL and security
verify_ssl=True,
ssl_ca_cert=os.getenv('LAKEFS_CA_CERT_PATH'),
# Proxy configuration
proxy=os.getenv('HTTPS_PROXY'),
proxy_headers={'User-Agent': 'MyApp/1.0'}
)
# Create singleton client for reuse
_optimized_client = None
def get_optimized_client():
"""Get or create optimized client singleton"""
global _optimized_client
if _optimized_client is None:
_optimized_client = create_optimized_client()
return _optimized_client
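A short usage sketch: pass the pooled client explicitly when constructing SDK objects, the same pattern the ConnectionManager below uses:
# Reuse the singleton client for every repository handle
repo = lakefs.Repository("my-repo", client=get_optimized_client())
branch = repo.branch("main")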
Batch Operations and Bulk Processing¶
import concurrent.futures
import threading
from collections import defaultdict
class BulkOperationManager:
"""Manager for efficient bulk operations"""
def __init__(self, branch, max_workers=10, batch_size=100):
self.branch = branch
self.max_workers = max_workers
self.batch_size = batch_size
self.results = defaultdict(list)
self.lock = threading.Lock()
def bulk_upload(self, file_data_pairs):
"""Upload multiple files efficiently using threading"""
def upload_batch(batch):
"""Upload a batch of files in a transaction"""
batch_results = []
try:
with self.branch.transact(
commit_message=f"Bulk upload batch ({len(batch)} files)"
) as tx:
for path, data in batch:
obj = tx.object(path).upload(data=data)
batch_results.append({"path": path, "status": "success", "object": obj})
except Exception as e:
for path, _ in batch:
batch_results.append({"path": path, "status": "error", "error": str(e)})
with self.lock:
self.results["uploads"].extend(batch_results)
return batch_results
# Split into batches
batches = [
file_data_pairs[i:i + self.batch_size]
for i in range(0, len(file_data_pairs), self.batch_size)
]
# Process batches concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(upload_batch, batch) for batch in batches]
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
print(f"Batch upload failed: {e}")
return self.results["uploads"]
def bulk_download(self, paths):
"""Download multiple files efficiently"""
def download_file(path):
"""Download a single file"""
try:
obj = self.branch.object(path)
if obj.exists():
content = obj.reader().read()
return {"path": path, "status": "success", "content": content}
else:
return {"path": path, "status": "not_found"}
except Exception as e:
return {"path": path, "status": "error", "error": str(e)}
# Download files concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {executor.submit(download_file, path): path for path in paths}
results = []
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return results
# Usage
bulk_manager = BulkOperationManager(branch, max_workers=20, batch_size=50)
# Bulk upload
files_to_upload = [
(f"data/file_{i}.txt", f"Content for file {i}")
for i in range(1000)
]
upload_results = bulk_manager.bulk_upload(files_to_upload)
success_count = len([r for r in upload_results if r["status"] == "success"])
print(f"Successfully uploaded {success_count} files")
# Bulk download
paths_to_download = [f"data/file_{i}.txt" for i in range(100)]
download_results = bulk_manager.bulk_download(paths_to_download)
Memory-Efficient Streaming¶
import hashlib
import os
import time
class StreamingProcessor:
"""Efficient streaming processor for large files"""
def __init__(self, chunk_size=64*1024): # 64KB chunks
self.chunk_size = chunk_size
def stream_upload_with_progress(self, branch, local_path, remote_path,
progress_callback=None):
"""Upload large file with progress tracking"""
file_size = os.path.getsize(local_path)
uploaded_bytes = 0
start_time = time.time()
obj = branch.object(remote_path)
with open(local_path, 'rb') as local_file:
with obj.writer(mode='wb') as remote_writer:
while True:
chunk = local_file.read(self.chunk_size)
if not chunk:
break
remote_writer.write(chunk)
uploaded_bytes += len(chunk)
if progress_callback:
progress = (uploaded_bytes / file_size) * 100
elapsed = time.time() - start_time
speed = uploaded_bytes / elapsed if elapsed > 0 else 0
progress_callback({
"progress": progress,
"uploaded_bytes": uploaded_bytes,
"total_bytes": file_size,
"speed_bps": speed,
"elapsed_time": elapsed
})
return obj
def stream_download_with_verification(self, branch, remote_path, local_path,
verify_checksum=True):
"""Download large file with checksum verification"""
obj = branch.object(remote_path)
# Get object stats for verification
stats = obj.stat()
expected_size = stats.size_bytes
expected_checksum = stats.checksum if verify_checksum else None
downloaded_bytes = 0
        # lakeFS records the checksum reported by the underlying object store
        # (commonly an MD5-based ETag), so hash with MD5 here; adjust the
        # algorithm if your backend reports something else.
        hasher = hashlib.md5() if verify_checksum else None
with obj.reader(mode='rb') as remote_reader:
with open(local_path, 'wb') as local_file:
while True:
chunk = remote_reader.read(self.chunk_size)
if not chunk:
break
local_file.write(chunk)
downloaded_bytes += len(chunk)
if hasher:
hasher.update(chunk)
# Verify download
if downloaded_bytes != expected_size:
raise ValueError(f"Size mismatch: expected {expected_size}, got {downloaded_bytes}")
        if verify_checksum and expected_checksum:
            actual_checksum = hasher.hexdigest()
            # NOTE: objects written via multipart upload may carry a composite
            # ETag that a plain MD5 of the content will not match; in that
            # case the size check above is the reliable verification.
            if actual_checksum != expected_checksum:
                raise ValueError(f"Checksum mismatch: expected {expected_checksum}, got {actual_checksum}")
return {
"local_path": local_path,
"remote_path": remote_path,
"size_bytes": downloaded_bytes,
"checksum_verified": verify_checksum
}
# Usage with progress tracking
def progress_callback(info):
print(f"Upload progress: {info['progress']:.1f}% "
f"({info['uploaded_bytes']}/{info['total_bytes']} bytes) "
f"Speed: {info['speed_bps']/1024/1024:.1f} MB/s")
processor = StreamingProcessor()
# Upload large file
large_obj = processor.stream_upload_with_progress(
branch,
"large_dataset.csv",
"data/large_dataset.csv",
progress_callback=progress_callback
)
# Download with verification
result = processor.stream_download_with_verification(
branch,
"data/large_dataset.csv",
"downloaded_dataset.csv",
verify_checksum=True
)
print(f"Download verified: {result}")
Connection Pooling and Resource Management¶
import atexit
from contextlib import contextmanager
class ConnectionManager:
"""Manage lakeFS connections and resources efficiently"""
def __init__(self):
self.clients = {}
self.active_connections = 0
self.max_connections = 50
# Register cleanup on exit
atexit.register(self.cleanup_all)
def get_client(self, config_name="default", **client_kwargs):
"""Get or create a client with connection pooling"""
if config_name not in self.clients:
if self.active_connections >= self.max_connections:
raise RuntimeError(f"Maximum connections ({self.max_connections}) exceeded")
client_config = {
"pool_connections": 10,
"pool_maxsize": 20,
"max_retries": 3,
**client_kwargs
}
self.clients[config_name] = Client(**client_config)
self.active_connections += 1
return self.clients[config_name]
@contextmanager
def managed_repository(self, repo_id, config_name="default", **client_kwargs):
"""Context manager for repository operations with automatic cleanup"""
client = self.get_client(config_name, **client_kwargs)
repo = lakefs.Repository(repo_id, client=client)
try:
yield repo
finally:
# Cleanup could be added here if needed
pass
def cleanup_all(self):
"""Cleanup all connections"""
for client in self.clients.values():
# Perform any necessary cleanup
pass
self.clients.clear()
self.active_connections = 0
# Global connection manager
connection_manager = ConnectionManager()
# Usage
with connection_manager.managed_repository("my-repo") as repo:
branch = repo.branch("main")
# Perform operations
branch.object("data/test.txt").upload(data="test content")
# Repository and client are automatically managed
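Named configurations let the same manager talk to more than one lakeFS endpoint. A hedged sketch (the "analytics" configuration name and environment variables are hypothetical):
with connection_manager.managed_repository(
    "analytics-repo",
    config_name="analytics",  # hypothetical second configuration
    host=os.getenv('ANALYTICS_LAKEFS_ENDPOINT'),
    username=os.getenv('ANALYTICS_LAKEFS_ACCESS_KEY_ID'),
    password=os.getenv('ANALYTICS_LAKEFS_SECRET_ACCESS_KEY'),
) as repo:
    print(repo.branch("main").object("reports/summary.txt").exists())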
Advanced I/O Patterns¶
Custom Serialization and Formats¶
import pickle
import json
import io
import gzip
from typing import Any
class AdvancedSerializer:
"""Advanced serialization for different data types and formats"""
@staticmethod
def serialize_python_object(obj: Any, compression=True) -> bytes:
"""Serialize Python object with optional compression"""
data = pickle.dumps(obj)
if compression:
data = gzip.compress(data)
return data
@staticmethod
def deserialize_python_object(data: bytes, compression=True) -> Any:
"""Deserialize Python object with optional decompression"""
if compression:
data = gzip.decompress(data)
return pickle.loads(data)
@staticmethod
def serialize_dataframe(df, format='parquet', compression=True) -> bytes:
"""Serialize pandas DataFrame in various formats"""
buffer = io.BytesIO()
if format == 'parquet':
df.to_parquet(buffer, compression='gzip' if compression else None)
elif format == 'csv':
csv_data = df.to_csv(index=False)
if compression:
csv_data = gzip.compress(csv_data.encode('utf-8'))
else:
csv_data = csv_data.encode('utf-8')
buffer.write(csv_data)
        elif format == 'json':
            json_data = df.to_json(orient='records', indent=2)
            if compression:
                json_data = gzip.compress(json_data.encode('utf-8'))
            else:
                json_data = json_data.encode('utf-8')
            buffer.write(json_data)
        else:
            raise ValueError(f"Unsupported format: {format}")
        return buffer.getvalue()
@staticmethod
def deserialize_dataframe(data: bytes, format='parquet', compression=True):
"""Deserialize pandas DataFrame from various formats"""
import pandas as pd
if compression and format in ['csv', 'json']:
data = gzip.decompress(data)
buffer = io.BytesIO(data)
        if format == 'parquet':
            return pd.read_parquet(buffer)
        elif format == 'csv':
            return pd.read_csv(buffer)
        elif format == 'json':
            return pd.read_json(buffer, orient='records')
        raise ValueError(f"Unsupported format: {format}")
def advanced_data_storage(branch, data_items):
"""Store various data types with optimal serialization"""
serializer = AdvancedSerializer()
for item_name, item_data in data_items.items():
if isinstance(item_data, dict):
# Store as compressed JSON
json_data = json.dumps(item_data, indent=2)
compressed_data = gzip.compress(json_data.encode('utf-8'))
branch.object(f"data/{item_name}.json.gz").upload(
data=compressed_data,
content_type="application/gzip",
metadata={"format": "json", "compression": "gzip"}
)
elif hasattr(item_data, 'to_parquet'): # pandas DataFrame
# Store as compressed Parquet
parquet_data = serializer.serialize_dataframe(
item_data, format='parquet', compression=True
)
branch.object(f"data/{item_name}.parquet").upload(
data=parquet_data,
content_type="application/octet-stream",
metadata={"format": "parquet", "compression": "gzip"}
)
else:
# Store as compressed pickle for arbitrary Python objects
pickle_data = serializer.serialize_python_object(
item_data, compression=True
)
branch.object(f"data/{item_name}.pkl.gz").upload(
data=pickle_data,
content_type="application/octet-stream",
metadata={"format": "pickle", "compression": "gzip"}
)
# Usage
import pandas as pd
data_items = {
"config": {"version": "1.0", "settings": {"debug": True}},
"users": pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]}),
"model": {"weights": [0.1, 0.2, 0.3], "bias": 0.05}
}
advanced_data_storage(branch, data_items)
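Reading the data back reverses each branch of the storage logic. A minimal sketch for the compressed-JSON case, assuming the objects were written by advanced_data_storage above:
import gzip
import json

def load_json_item(branch, item_name):
    """Read back an item stored as compressed JSON by advanced_data_storage."""
    raw = branch.object(f"data/{item_name}.json.gz").reader(mode='rb').read()
    return json.loads(gzip.decompress(raw).decode('utf-8'))

config = load_json_item(branch, "config")
print(config["version"])  # "1.0"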
Pre-signed URL Management¶
import time

import requests
class PreSignedURLManager:
"""Manage pre-signed URLs for direct storage access"""
def __init__(self, branch):
self.branch = branch
self.url_cache = {}
        self.cache_duration = 3600  # 1 hour; keep this below the server's pre-sign URL expiry
    def get_presigned_upload_url(self, path, content_type=None, cache=True):
        """Get pre-signed URL for direct upload.

        NOTE: the High-Level SDK does not expose upload pre-signing as a
        public API; this peeks at a private attribute on the writer and may
        return None on SDK versions where it is unavailable, in which case
        callers fall back to a regular upload.
        """
        cache_key = f"upload:{path}:{content_type}"
        if cache and cache_key in self.url_cache:
            cached_url, timestamp = self.url_cache[cache_key]
            if time.time() - timestamp < self.cache_duration:
                return cached_url
        obj = self.branch.object(path)
        # Open a pre-signed writer only to inspect its URL. Deliberately not a
        # `with` block: closing the writer would finalize an (empty) upload.
        writer = obj.writer(mode='wb', pre_sign=True, content_type=content_type)
        presigned_url = getattr(writer, '_pre_signed_url', None)
        if cache and presigned_url:
            self.url_cache[cache_key] = (presigned_url, time.time())
        return presigned_url
def get_presigned_download_url(self, path, cache=True):
"""Get pre-signed URL for direct download"""
cache_key = f"download:{path}"
if cache and cache_key in self.url_cache:
cached_url, timestamp = self.url_cache[cache_key]
if time.time() - timestamp < self.cache_duration:
return cached_url
# Get object stats with pre-sign enabled
obj = self.branch.object(path)
stats = obj.stat(pre_sign=True)
presigned_url = stats.physical_address
if cache and presigned_url:
self.url_cache[cache_key] = (presigned_url, time.time())
return presigned_url
def direct_upload_via_presigned(self, path, data, content_type=None):
"""Upload data directly using pre-signed URL"""
presigned_url = self.get_presigned_upload_url(path, content_type)
if not presigned_url:
# Fallback to regular upload
return self.branch.object(path).upload(
data=data,
content_type=content_type
)
# Direct upload to storage
headers = {}
if content_type:
headers['Content-Type'] = content_type
response = requests.put(presigned_url, data=data, headers=headers)
response.raise_for_status()
return self.branch.object(path)
def direct_download_via_presigned(self, path):
"""Download data directly using pre-signed URL"""
presigned_url = self.get_presigned_download_url(path)
if not presigned_url:
# Fallback to regular download
return self.branch.object(path).reader().read()
# Direct download from storage
response = requests.get(presigned_url)
response.raise_for_status()
return response.content
# Usage
url_manager = PreSignedURLManager(branch)
# Direct upload
data = b"Large binary data that benefits from direct upload"
obj = url_manager.direct_upload_via_presigned(
"data/large-file.bin",
data,
content_type="application/octet-stream"
)
# Direct download
downloaded_data = url_manager.direct_download_via_presigned("data/large-file.bin")
Logging and Debugging¶
Comprehensive Logging Setup¶
import json
import logging
import sys
import time
from datetime import datetime
class LakeFSLogger:
"""Comprehensive logging setup for lakeFS operations"""
def __init__(self, name="lakefs_app", level=logging.INFO):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
# Prevent duplicate handlers
if not self.logger.handlers:
self._setup_handlers()
def _setup_handlers(self):
"""Set up logging handlers"""
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
# File handler
file_handler = logging.FileHandler(
f"lakefs_{datetime.now().strftime('%Y%m%d')}.log"
)
file_handler.setLevel(logging.DEBUG)
# Formatters
console_format = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_format = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
)
console_handler.setFormatter(console_format)
file_handler.setFormatter(file_format)
self.logger.addHandler(console_handler)
self.logger.addHandler(file_handler)
# Enable lakeFS SDK logging
lakefs_logger = logging.getLogger('lakefs')
lakefs_logger.setLevel(logging.DEBUG)
lakefs_logger.addHandler(file_handler)
def log_operation(self, operation_name, func, *args, **kwargs):
"""Log operation execution with timing"""
start_time = time.time()
self.logger.info(f"Starting operation: {operation_name}")
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
self.logger.info(f"Completed operation: {operation_name} ({duration:.2f}s)")
return result
except Exception as e:
duration = time.time() - start_time
self.logger.error(f"Failed operation: {operation_name} ({duration:.2f}s) - {e}")
raise
def log_transaction(self, tx_func):
"""Decorator for logging transaction operations"""
def wrapper(branch, commit_message, *args, **kwargs):
self.logger.info(f"Starting transaction: {commit_message}")
try:
with branch.transact(commit_message=commit_message) as tx:
result = tx_func(tx, *args, **kwargs)
self.logger.info(f"Transaction completed: {commit_message}")
return result
except Exception as e:
self.logger.error(f"Transaction failed: {commit_message} - {e}")
raise
return wrapper
# Usage
logger = LakeFSLogger("my_app", level=logging.DEBUG)
# Log regular operations
def upload_data(branch, path, data):
return branch.object(path).upload(data=data)
result = logger.log_operation(
"upload_user_data",
upload_data,
branch, "data/users.json", '{"users": []}'
)
# Log transactions
@logger.log_transaction
def process_data_transaction(tx, input_path, output_path):
# Read input
input_obj = tx.object(input_path)
data = json.loads(input_obj.reader().read())
# Process data
processed = [{"id": item["id"], "processed": True} for item in data]
# Write output
tx.object(output_path).upload(
data=json.dumps(processed, indent=2),
content_type="application/json"
)
return len(processed)
# Execute logged transaction
processed_count = process_data_transaction(
branch,
"Transaction: Process user data",
"raw/users.json",
"processed/users.json"
)
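When request-level detail is needed, the HTTP stack's own loggers can be raised as well. A sketch, assuming the generated lakeFS client transports requests over urllib3 (as it typically does):
import logging

# Surface wire-level details: connection reuse, retries, redirects
urllib3_logger = logging.getLogger('urllib3')
urllib3_logger.setLevel(logging.DEBUG)
urllib3_logger.addHandler(logging.StreamHandler())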
Performance Monitoring and Profiling¶
import time
import psutil
import threading
from collections import defaultdict, deque
class PerformanceMonitor:
"""Monitor performance metrics for lakeFS operations"""
def __init__(self, max_history=1000):
self.metrics = defaultdict(deque)
self.max_history = max_history
self.active_operations = {}
self.lock = threading.Lock()
def start_operation(self, operation_id, operation_type):
"""Start monitoring an operation"""
with self.lock:
self.active_operations[operation_id] = {
"type": operation_type,
"start_time": time.time(),
"start_memory": psutil.Process().memory_info().rss,
"start_cpu": psutil.Process().cpu_percent()
}
def end_operation(self, operation_id, success=True, error=None):
"""End monitoring an operation"""
with self.lock:
if operation_id not in self.active_operations:
return
start_info = self.active_operations.pop(operation_id)
end_time = time.time()
end_memory = psutil.Process().memory_info().rss
duration = end_time - start_info["start_time"]
memory_delta = end_memory - start_info["start_memory"]
metric = {
"operation_id": operation_id,
"type": start_info["type"],
"duration": duration,
"memory_delta": memory_delta,
"success": success,
"error": str(error) if error else None,
"timestamp": end_time
}
# Store metric
self.metrics[start_info["type"]].append(metric)
# Limit history
if len(self.metrics[start_info["type"]]) > self.max_history:
self.metrics[start_info["type"]].popleft()
def get_stats(self, operation_type=None):
"""Get performance statistics"""
with self.lock:
if operation_type:
operations = [operation_type] if operation_type in self.metrics else []
else:
operations = list(self.metrics.keys())
stats = {}
for op_type in operations:
metrics = list(self.metrics[op_type])
if not metrics:
continue
successful = [m for m in metrics if m["success"]]
failed = [m for m in metrics if not m["success"]]
durations = [m["duration"] for m in successful]
memory_deltas = [m["memory_delta"] for m in successful]
stats[op_type] = {
"total_operations": len(metrics),
"successful": len(successful),
"failed": len(failed),
"success_rate": len(successful) / len(metrics) * 100 if metrics else 0,
"avg_duration": sum(durations) / len(durations) if durations else 0,
"max_duration": max(durations) if durations else 0,
"min_duration": min(durations) if durations else 0,
"avg_memory_delta": sum(memory_deltas) / len(memory_deltas) if memory_deltas else 0,
"recent_errors": [m["error"] for m in failed[-5:] if m["error"]]
}
return stats
def monitored_operation(self, operation_type):
"""Decorator for monitoring operations"""
def decorator(func):
def wrapper(*args, **kwargs):
operation_id = f"{operation_type}_{int(time.time() * 1000)}"
self.start_operation(operation_id, operation_type)
try:
result = func(*args, **kwargs)
self.end_operation(operation_id, success=True)
return result
except Exception as e:
self.end_operation(operation_id, success=False, error=e)
raise
return wrapper
return decorator
# Global performance monitor
perf_monitor = PerformanceMonitor()
# Usage with decorator
@perf_monitor.monitored_operation("object_upload")
def monitored_upload(branch, path, data):
return branch.object(path).upload(data=data)
@perf_monitor.monitored_operation("object_download")
def monitored_download(branch, path):
return branch.object(path).reader().read()
@perf_monitor.monitored_operation("transaction")
def monitored_transaction(branch, commit_message, operations):
with branch.transact(commit_message=commit_message) as tx:
results = []
for op in operations:
if op["type"] == "upload":
result = tx.object(op["path"]).upload(data=op["data"])
results.append(result)
return results
# Perform monitored operations
for i in range(10):
monitored_upload(branch, f"test/file_{i}.txt", f"Content {i}")
content = monitored_download(branch, f"test/file_{i}.txt")
# Get performance statistics
stats = perf_monitor.get_stats()
for op_type, metrics in stats.items():
print(f"\n{op_type.upper()} Statistics:")
print(f" Total operations: {metrics['total_operations']}")
print(f" Success rate: {metrics['success_rate']:.1f}%")
print(f" Average duration: {metrics['avg_duration']:.3f}s")
print(f" Max duration: {metrics['max_duration']:.3f}s")
print(f" Average memory delta: {metrics['avg_memory_delta']/1024/1024:.1f} MB")
Production Deployment Strategies¶
Configuration Management¶
import logging
import os
import time
import yaml
from dataclasses import dataclass
from typing import Optional, Dict, Any

# Module-level logger used by the production client below
logger = logging.getLogger(__name__)
@dataclass
class LakeFSConfig:
"""Configuration class for lakeFS deployment"""
# Connection settings
endpoint: str
access_key_id: str
secret_access_key: str
# SSL/TLS settings
verify_ssl: bool = True
ssl_ca_cert: Optional[str] = None
# Connection pooling
pool_connections: int = 20
pool_maxsize: int = 50
pool_block: bool = False
# Retry settings
max_retries: int = 3
backoff_factor: float = 0.3
    retry_on_status: Optional[list] = None
# Timeout settings
timeout: int = 60
connect_timeout: int = 10
read_timeout: int = 50
# Proxy settings
proxy: Optional[str] = None
proxy_headers: Optional[Dict[str, str]] = None
# Application settings
default_branch: str = "main"
batch_size: int = 100
max_workers: int = 10
def __post_init__(self):
if self.retry_on_status is None:
self.retry_on_status = [500, 502, 503, 504]
class ConfigManager:
"""Manage lakeFS configuration from multiple sources"""
@staticmethod
def from_environment() -> LakeFSConfig:
"""Load configuration from environment variables"""
return LakeFSConfig(
endpoint=os.getenv('LAKEFS_ENDPOINT', 'http://localhost:8000'),
access_key_id=os.getenv('LAKEFS_ACCESS_KEY_ID'),
secret_access_key=os.getenv('LAKEFS_SECRET_ACCESS_KEY'),
verify_ssl=os.getenv('LAKEFS_VERIFY_SSL', 'true').lower() == 'true',
ssl_ca_cert=os.getenv('LAKEFS_CA_CERT_PATH'),
pool_connections=int(os.getenv('LAKEFS_POOL_CONNECTIONS', '20')),
pool_maxsize=int(os.getenv('LAKEFS_POOL_MAXSIZE', '50')),
max_retries=int(os.getenv('LAKEFS_MAX_RETRIES', '3')),
timeout=int(os.getenv('LAKEFS_TIMEOUT', '60')),
proxy=os.getenv('HTTPS_PROXY'),
default_branch=os.getenv('LAKEFS_DEFAULT_BRANCH', 'main'),
batch_size=int(os.getenv('LAKEFS_BATCH_SIZE', '100')),
max_workers=int(os.getenv('LAKEFS_MAX_WORKERS', '10'))
)
@staticmethod
def from_file(config_path: str) -> LakeFSConfig:
"""Load configuration from YAML file"""
with open(config_path, 'r') as f:
config_data = yaml.safe_load(f)
return LakeFSConfig(**config_data.get('lakefs', {}))
@staticmethod
def from_dict(config_dict: Dict[str, Any]) -> LakeFSConfig:
"""Load configuration from dictionary"""
return LakeFSConfig(**config_dict)
class ProductionLakeFSClient:
"""Production-ready lakeFS client with comprehensive configuration"""
def __init__(self, config: LakeFSConfig):
self.config = config
self._client = None
self._health_check_interval = 300 # 5 minutes
self._last_health_check = 0
@property
def client(self) -> Client:
"""Get or create lakeFS client"""
if self._client is None:
self._client = Client(
host=self.config.endpoint,
username=self.config.access_key_id,
password=self.config.secret_access_key,
verify_ssl=self.config.verify_ssl,
ssl_ca_cert=self.config.ssl_ca_cert,
pool_connections=self.config.pool_connections,
pool_maxsize=self.config.pool_maxsize,
pool_block=self.config.pool_block,
max_retries=self.config.max_retries,
backoff_factor=self.config.backoff_factor,
retry_on_status=self.config.retry_on_status,
timeout=self.config.timeout,
connect_timeout=self.config.connect_timeout,
read_timeout=self.config.read_timeout,
proxy=self.config.proxy,
proxy_headers=self.config.proxy_headers
)
return self._client
def health_check(self, force=False) -> bool:
"""Perform health check with caching"""
current_time = time.time()
if not force and (current_time - self._last_health_check) < self._health_check_interval:
return True # Assume healthy if recently checked
try:
# Simple operation to test connectivity
list(lakefs.repositories(client=self.client, max_amount=1))
self._last_health_check = current_time
return True
except Exception as e:
logger.error(f"Health check failed: {e}")
return False
def get_repository(self, repo_id: str):
"""Get repository with health check"""
if not self.health_check():
raise RuntimeError("lakeFS health check failed")
return lakefs.Repository(repo_id, client=self.client)
# Usage
# Load configuration
config = ConfigManager.from_environment()
# Create production client
prod_client = ProductionLakeFSClient(config)
# Use with health checking
try:
repo = prod_client.get_repository("my-repo")
branch = repo.branch(config.default_branch)
# Perform operations
branch.object("data/production.txt").upload(data="Production data")
except Exception as e:
logger.error(f"Production operation failed: {e}")
Key Points¶
- Comprehensive error handling: Use specific exception types for robust error recovery
- Performance optimization: Implement connection pooling, batch operations, and streaming
- Resource management: Use context managers and connection pooling for efficient resource usage
- Monitoring and logging: Implement comprehensive logging and performance monitoring
- Production readiness: Use proper configuration management and health checking
- Advanced I/O: Leverage pre-signed URLs and custom serialization for optimal performance
- Debugging support: Enable detailed logging and performance profiling for troubleshooting
See Also¶
- Repository Management - Creating and managing repositories
- Branch Operations - Version control operations
- Object Operations - Individual object management
- Transactions - Atomic multi-operation workflows
- Import Operations - Bulk data operations
- Best Practices - Production deployment guidance
- Troubleshooting - Common issues and solutions