Troubleshooting Guide¶
This comprehensive guide covers common issues, error patterns, debugging techniques, and performance optimization tips for Python lakeFS integrations across all SDK options.
Common Error Types and Solutions¶
Authentication and Connection Errors¶
Error: Unauthorized (401)¶
Symptom: API calls fail with an HTTP 401 status, typically surfaced as lakefs.exceptions.UnauthorizedException.
Causes and Solutions:
- Invalid Credentials
# Check your credentials
import lakefs

# Verify credentials are set correctly
client = lakefs.Client(
    host="http://localhost:8000",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key"
)

# Test connection
try:
    repos = list(client.repositories.list(amount=1))
    print("Authentication successful")
except lakefs.exceptions.UnauthorizedException:
    print("Invalid credentials")
- Environment Variables Not Set (see the check after this list)
- Token Expiration (for JWT tokens): re-authenticate to obtain a fresh token; the exact refresh flow depends on how your deployment issues tokens
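To rule out the environment-variable cause, check what the process actually sees. This sketch uses only the standard library; the variable names match those checked by the Configuration Validator at the end of this guide.

import os

# Verify the lakeFS variables are visible to this process
for var in ("LAKEFS_HOST", "LAKEFS_ACCESS_KEY_ID", "LAKEFS_SECRET_ACCESS_KEY"):
    value = os.environ.get(var)
    if value:
        print(f"{var} is set ({value[:4]}...)")  # avoid printing secrets in full
    else:
        print(f"{var} is NOT set")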
Error: Connection refused or Connection timeout¶
Symptom:
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=8000):
Max retries exceeded with url: /api/v1/repositories
Solutions:
- Verify lakeFS Server is Running (for local setups, check that the lakeFS process or container is up and listening on the expected port)
- Check Network Configuration
import requests

# Test basic connectivity
try:
    response = requests.get("http://localhost:8000/api/v1/healthcheck", timeout=5)
    print(f"Server responding: {response.status_code}")
except requests.exceptions.ConnectionError:
    print("Cannot connect to lakeFS server")
except requests.exceptions.Timeout:
    print("Connection timeout - server may be slow")
- Proxy Configuration Issues (see the sketch after this list)
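If traffic must go through a proxy, the requests/urllib3 stack underneath the SDK honors the standard proxy environment variables. A minimal sketch with a hypothetical proxy address:

import os

# Route lakeFS traffic through a corporate proxy (hypothetical address)
os.environ["HTTP_PROXY"] = "http://proxy.internal:3128"
os.environ["HTTPS_PROXY"] = "http://proxy.internal:3128"
# Bypass the proxy for a local lakeFS server
os.environ["NO_PROXY"] = "localhost,127.0.0.1"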
Error: SSL Certificate Verification Failed¶
Symptom:
requests.exceptions.SSLError: HTTPSConnectionPool(host='lakefs.example.com', port=443):
Max retries exceeded with url: /api/v1/repositories (Caused by SSLError(SSLCertVerificationError))
Solutions:
- Configure SSL Verification
import lakefs

# Option 1: Disable SSL verification (NOT recommended for production)
client = lakefs.Client(
    host="https://lakefs.example.com",
    access_key_id="your-key",
    secret_access_key="your-secret",
    verify_ssl=False
)

# Option 2: Provide custom CA certificate
client = lakefs.Client(
    host="https://lakefs.example.com",
    access_key_id="your-key",
    secret_access_key="your-secret",
    ssl_ca_cert="/path/to/ca-bundle.crt"
)

# Option 3: Use system CA bundle
import certifi

client = lakefs.Client(
    host="https://lakefs.example.com",
    access_key_id="your-key",
    secret_access_key="your-secret",
    ssl_ca_cert=certifi.where()
)
Repository and Branch Errors¶
Error: Repository not found (404)¶
Symptom: Operations against the repository raise lakefs.exceptions.NotFoundException (HTTP 404).
Solutions:
- Verify Repository Exists
import lakefs

client = lakefs.Client()

# List all repositories
repos = list(client.repositories.list())
print("Available repositories:")
for repo in repos:
    print(f"  - {repo.id}")

# Check specific repository
try:
    repo = client.repository("my-repo")
    print(f"Repository found: {repo.id}")
except lakefs.exceptions.NotFoundException:
    print("Repository does not exist")
- Create Repository if Missing (see the sketch below)
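A create-if-missing sketch in the client style used throughout this guide; the create call and the storage namespace are assumptions to adapt to your SDK version and backing store.

import lakefs

client = lakefs.Client()

try:
    repo = client.repository("my-repo")
except lakefs.exceptions.NotFoundException:
    # Hypothetical create call; point the namespace at your object store
    repo = client.repositories.create(
        name="my-repo",
        storage_namespace="s3://my-bucket/my-repo"
    )
print(f"Using repository: {repo.id}")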
Error: Branch not found (404)¶
Symptom: Branch lookups raise lakefs.exceptions.NotFoundException (HTTP 404).
Solutions:
- List Available Branches (see the sketch below)
- Create Branch if Missing (see the sketch below)
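A combined sketch of both steps, using the branch accessors this guide already uses; the create(source_reference=...) signature is an assumption to verify against your SDK version.

import lakefs

client = lakefs.Client()
repo = client.repository("my-repo")

# List available branches to verify the name you are using
names = [b.id for b in repo.branches.list()]
print("Available branches:", names)

# Create the branch from main if it is missing
if "feature-x" not in names:
    repo.branch("feature-x").create(source_reference="main")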
Object Operation Errors¶
Error: Object not found (404)¶
Symptom: stat() or read operations on an object raise lakefs.exceptions.NotFoundException (HTTP 404).
Solutions:
- Verify Object Path and Branch
repo = client.repository("my-repo")
branch = repo.branch("main")

# List objects to verify path
objects = list(branch.objects.list(prefix="path/to/"))
print("Objects in path:")
for obj in objects:
    print(f"  - {obj.path}")

# Check if object exists
try:
    obj = branch.object("path/to/file.txt")
    stat = obj.stat()
    print(f"Object found: {stat.path}, size: {stat.size_bytes}")
except lakefs.exceptions.NotFoundException:
    print("Object does not exist")
- Handle Missing Objects Gracefully (see the sketch below)
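One way to make missing objects non-fatal is a helper that returns a default instead of raising, using the exception type shown above. The reader() call is an assumed streaming accessor; substitute your SDK version's read method if it differs.

import lakefs


def read_or_default(branch, path, default=None):
    """Return the object's contents, or default if the object is missing."""
    try:
        with branch.object(path).reader() as r:
            return r.read()
    except lakefs.exceptions.NotFoundException:
        return default

# Usage: data = read_or_default(branch, "path/to/file.txt", default=b"")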
Error: Conflict (409) during Upload¶
Symptom: Uploads fail with lakefs.exceptions.ConflictException (HTTP 409), typically when the same path is modified concurrently.
Solutions:
- Handle Concurrent Modifications
import time
import random

import lakefs


def upload_with_retry(branch, path, data, max_retries=3):
    for attempt in range(max_retries):
        try:
            return branch.object(path).upload(data)
        except lakefs.exceptions.ConflictException:
            if attempt == max_retries - 1:
                raise
            # Wait with exponential backoff
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait_time)
- Use Transactions for Atomic Operations (see the sketch below)
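Transactions, shown again under High-Level SDK Issues below, group writes so they land in a single commit; if one write fails, nothing is applied. A minimal sketch:

data1 = b"first part"
data2 = b"second part"

# Group related writes so they are committed together
with repo.branch("main").transaction() as tx:
    tx.upload("data/part-1.txt", data1)
    tx.upload("data/part-2.txt", data2)
# If any upload raises, the transaction is rolled back rather than
# leaving the branch with a partial update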
Performance Issues¶
Slow Upload/Download Performance¶
Symptoms:
- Uploads taking much longer than expected
- High memory usage during file operations
- Timeouts on large files
Solutions:
- Use Streaming for Large Files (see the streaming sketch after this list)
- Configure Connection Pooling (see the Connection Optimization section later in this guide)
- Use Batch Operations
from concurrent.futures import ThreadPoolExecutor, as_completed


def upload_files_concurrently(branch, files, max_workers=10):
    def upload_single_file(file_info):
        path, data = file_info
        return branch.object(path).upload(data)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(upload_single_file, file_info)
                   for file_info in files]
        results = []
        for future in as_completed(futures):
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                print(f"Upload failed: {e}")
        return results
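For streaming large files, memory stays flat regardless of file size. A sketch assuming a writer()-style streaming handle on the object; adapt the call to your SDK version.

import shutil

# Stream a large local file to lakeFS without buffering it in memory
obj = branch.object("data/large-file.bin")
with open("/tmp/large-file.bin", "rb") as src, obj.writer(mode="wb") as dst:
    shutil.copyfileobj(src, dst, length=8 * 1024 * 1024)  # 8 MB chunks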
Memory Usage Issues¶
Symptoms:
- High memory consumption
- Out of memory errors
- Memory leaks in long-running processes
Solutions:
- Use Streaming Readers (see the sketch below)
- Implement Proper Resource Management (see the sketch below)
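A sketch combining both ideas: read in bounded chunks instead of one large buffer, and rely on context managers so handles are released even when processing fails. reader() is an assumed streaming interface and process() a hypothetical callback.

def process_object_in_chunks(branch, path, process, chunk_size=4 * 1024 * 1024):
    # The context manager guarantees the handle is closed even if
    # process() raises partway through
    with branch.object(path).reader() as r:
        while True:
            chunk = r.read(chunk_size)  # never more than chunk_size in memory
            if not chunk:
                break
            process(chunk)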
SDK-Specific Issues¶
High-Level SDK Issues¶
Error: Transaction Rollback
# Handle transaction failures gracefully
try:
    with repo.branch("main").transaction() as tx:
        tx.upload("file1.txt", data1)
        tx.upload("file2.txt", data2)
        # Some operation fails here
        raise Exception("Simulated failure")
except Exception as e:
    print(f"Transaction rolled back: {e}")
    # Handle rollback scenario
Generated SDK Issues¶
Error: API Response Parsing
from lakefs_sdk import ApiException
import json

try:
    # Generated SDK API call; api_client is assumed to be a configured
    # API instance from your earlier setup
    response = api_client.list_repositories()
except ApiException as e:
    print(f"API Error: {e.status}")
    print(f"Reason: {e.reason}")
    # Parse error details
    try:
        error_details = json.loads(e.body)
        print(f"Error message: {error_details.get('message')}")
    except json.JSONDecodeError:
        print(f"Raw error: {e.body}")
lakefs-spec Issues¶
Error: fsspec Configuration
import fsspec

# Debug fsspec configuration
def debug_lakefs_spec():
    try:
        fs = fsspec.filesystem(
            'lakefs',
            host='http://localhost:8000',
            access_key_id='your-key',
            secret_access_key='your-secret'
        )
        # Test basic operation
        files = fs.ls('repo/branch/')
        print(f"Found {len(files)} files")
    except Exception as e:
        print(f"lakefs-spec error: {e}")

# Check if lakefs-spec is properly installed
try:
    import lakefs_spec
    print(f"lakefs-spec version: {lakefs_spec.__version__}")
except ImportError:
    print("lakefs-spec not installed")
Boto3 Issues¶
Error: S3 Compatibility Issues
import boto3
from botocore.exceptions import ClientError

# Debug Boto3 with lakeFS
def debug_boto3_lakefs():
    s3 = boto3.client(
        's3',
        endpoint_url='http://localhost:8000',
        aws_access_key_id='your-key',
        aws_secret_access_key='your-secret'
    )
    try:
        # Test basic operation
        response = s3.list_objects_v2(Bucket='repo', Prefix='branch/')
        print(f"Found {response.get('KeyCount', 0)} objects")
    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']
        print(f"S3 Error {error_code}: {error_message}")
        # Common S3 compatibility issues
        if error_code == 'NoSuchBucket':
            print("Repository may not exist or incorrect format")
        elif error_code == 'AccessDenied':
            print("Check credentials and permissions")
Debugging Techniques¶
Enable Debug Logging¶
import logging
import lakefs
# Enable debug logging for lakeFS
logging.basicConfig(level=logging.DEBUG)
lakefs_logger = logging.getLogger('lakefs')
lakefs_logger.setLevel(logging.DEBUG)
# Enable debug logging for requests
requests_logger = logging.getLogger('urllib3')
requests_logger.setLevel(logging.DEBUG)
# Enable debug logging for specific SDK
import lakefs_sdk
lakefs_sdk_logger = logging.getLogger('lakefs_sdk')
lakefs_sdk_logger.setLevel(logging.DEBUG)
Network Traffic Inspection¶
import requests
import lakefs
# Enable request/response logging
import http.client as http_client
http_client.HTTPConnection.debuglevel = 1
# Create client with debug session
session = requests.Session()
session.hooks['response'].append(lambda r, *args, **kwargs: print(f"Response: {r.status_code} {r.url}"))
client = lakefs.Client(session=session)
Performance Profiling¶
import cProfile
import pstats
import io

import lakefs


def profile_operation():
    profiler = cProfile.Profile()
    profiler.enable()

    # Your lakeFS operations here
    client = lakefs.Client()
    repo = client.repository("my-repo")
    branch = repo.branch("main")

    # Perform operations to profile
    for i in range(100):
        branch.object(f"test_{i}.txt").upload(f"data_{i}")

    profiler.disable()

    # Analyze results
    s = io.StringIO()
    ps = pstats.Stats(profiler, stream=s)
    ps.sort_stats('cumulative')
    ps.print_stats(20)
    print(s.getvalue())

profile_operation()
Memory Usage Analysis¶
import tracemalloc

import lakefs


def analyze_memory_usage():
    # Start tracing
    tracemalloc.start()

    # Your operations
    client = lakefs.Client()
    repo = client.repository("my-repo")
    branch = repo.branch("main")

    # Take snapshot before operations
    snapshot1 = tracemalloc.take_snapshot()

    # Perform memory-intensive operations
    large_data = "x" * (10 * 1024 * 1024)  # 10MB string
    for i in range(10):
        branch.object(f"large_{i}.txt").upload(large_data)

    # Take snapshot after operations
    snapshot2 = tracemalloc.take_snapshot()

    # Compare snapshots
    top_stats = snapshot2.compare_to(snapshot1, 'lineno')
    print("Top 10 memory allocations:")
    for stat in top_stats[:10]:
        print(stat)

analyze_memory_usage()
Performance Optimization Tips¶
Connection Optimization¶
import lakefs
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

# Create optimized session
session = requests.Session()

# Configure retry strategy
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504],
)

# Configure adapter with retry strategy
adapter = HTTPAdapter(
    pool_connections=20,
    pool_maxsize=20,
    max_retries=retry_strategy
)
session.mount("http://", adapter)
session.mount("https://", adapter)

# Use optimized session with client
client = lakefs.Client(session=session)
Batch Processing Optimization¶
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


class OptimizedBatchProcessor:
    def __init__(self, client, max_workers=10, batch_size=100):
        self.client = client
        self.max_workers = max_workers
        self.batch_size = batch_size

    def process_files_in_batches(self, repo_name, branch_name, files):
        repo = self.client.repository(repo_name)
        branch = repo.branch(branch_name)

        # Process files in batches
        for i in range(0, len(files), self.batch_size):
            batch = files[i:i + self.batch_size]
            self._process_batch(branch, batch)
            # Small delay between batches to avoid overwhelming the server
            time.sleep(0.1)

    def _process_batch(self, branch, batch):
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for file_info in batch:
                future = executor.submit(self._process_single_file, branch, file_info)
                futures.append(future)

            # Wait for all files in batch to complete
            for future in as_completed(futures):
                try:
                    result = future.result()
                    print(f"Processed: {result}")
                except Exception as e:
                    print(f"Error processing file: {e}")

    def _process_single_file(self, branch, file_info):
        path, data = file_info
        return branch.object(path).upload(data)
Caching Strategies¶
import functools
import time


class CachedLakeFSClient:
    def __init__(self, client, cache_ttl=300):  # 5 minutes
        self.client = client
        self.cache_ttl = cache_ttl
        self._cache = {}

    def _is_cache_valid(self, key):
        if key not in self._cache:
            return False
        return time.time() - self._cache[key]['timestamp'] < self.cache_ttl

    def get_repository_cached(self, repo_name):
        cache_key = f"repo:{repo_name}"
        if self._is_cache_valid(cache_key):
            return self._cache[cache_key]['data']
        repo = self.client.repository(repo_name)
        self._cache[cache_key] = {
            'data': repo,
            'timestamp': time.time()
        }
        return repo

    # Note: lru_cache on a method also keys on self and keeps the
    # instance alive for as long as entries remain in the cache
    @functools.lru_cache(maxsize=128)
    def get_branch_metadata(self, repo_name, branch_name):
        """Cache branch metadata using an LRU cache."""
        repo = self.get_repository_cached(repo_name)
        branch = repo.branch(branch_name)
        return {
            'id': branch.id,
            'commit_id': branch.head.id if branch.head else None
        }
Diagnostic Tools and Scripts¶
Health Check Script¶
#!/usr/bin/env python3
"""
lakeFS Python SDK Health Check Script
"""
import sys
import time
import lakefs
from typing import Dict, Any


def check_connectivity(client: lakefs.Client) -> Dict[str, Any]:
    """Check basic connectivity to lakeFS server"""
    try:
        start_time = time.time()
        repos = list(client.repositories.list(amount=1))
        response_time = time.time() - start_time
        return {
            'status': 'pass',
            'response_time': response_time,
            'repository_count': len(repos)
        }
    except Exception as e:
        return {
            'status': 'fail',
            'error': str(e)
        }


def check_authentication(client: lakefs.Client) -> Dict[str, Any]:
    """Check authentication status"""
    try:
        # Try to perform an authenticated operation
        repos = list(client.repositories.list(amount=1))
        return {'status': 'pass'}
    except lakefs.exceptions.UnauthorizedException:
        return {'status': 'fail', 'error': 'Invalid credentials'}
    except Exception as e:
        return {'status': 'fail', 'error': str(e)}


def check_repository_operations(client: lakefs.Client, repo_name: str) -> Dict[str, Any]:
    """Check repository operations"""
    try:
        repo = client.repository(repo_name)
        branches = list(repo.branches.list(amount=1))
        return {
            'status': 'pass',
            'branch_count': len(branches)
        }
    except lakefs.exceptions.NotFoundException:
        return {'status': 'fail', 'error': f'Repository {repo_name} not found'}
    except Exception as e:
        return {'status': 'fail', 'error': str(e)}


def main():
    """Run comprehensive health check"""
    print("lakeFS Python SDK Health Check")
    print("=" * 40)

    try:
        client = lakefs.Client()
    except Exception as e:
        print(f"Failed to create client: {e}")
        sys.exit(1)

    # Run checks
    checks = {
        'Connectivity': check_connectivity(client),
        'Authentication': check_authentication(client),
    }

    # Add repository check if specified
    if len(sys.argv) > 1:
        repo_name = sys.argv[1]
        checks[f'Repository ({repo_name})'] = check_repository_operations(client, repo_name)

    # Print results
    all_passed = True
    for check_name, result in checks.items():
        status = "✓ PASS" if result['status'] == 'pass' else "✗ FAIL"
        print(f"{check_name}: {status}")
        if result['status'] == 'fail':
            all_passed = False
            print(f"  Error: {result.get('error', 'Unknown error')}")
        elif 'response_time' in result:
            print(f"  Response time: {result['response_time']:.3f}s")

    print("\n" + "=" * 40)
    if all_passed:
        print("All checks passed!")
        sys.exit(0)
    else:
        print("Some checks failed!")
        sys.exit(1)


if __name__ == "__main__":
    main()
Configuration Validator¶
#!/usr/bin/env python3
"""
lakeFS Configuration Validator
"""
import os
import sys
import requests
import lakefs
from urllib.parse import urlparse


def validate_environment_variables():
    """Validate required environment variables"""
    required_vars = [
        'LAKEFS_HOST',
        'LAKEFS_ACCESS_KEY_ID',
        'LAKEFS_SECRET_ACCESS_KEY'
    ]
    missing_vars = []
    for var in required_vars:
        if not os.environ.get(var):
            missing_vars.append(var)
    if missing_vars:
        print(f"Missing environment variables: {', '.join(missing_vars)}")
        return False
    print("✓ All required environment variables are set")
    return True


def validate_host_format(host: str):
    """Validate host URL format"""
    try:
        parsed = urlparse(host)
        if not parsed.scheme or not parsed.netloc:
            print(f"✗ Invalid host format: {host}")
            return False
        print(f"✓ Host format is valid: {host}")
        return True
    except Exception as e:
        print(f"✗ Error parsing host: {e}")
        return False


def validate_connectivity(host: str):
    """Validate network connectivity"""
    try:
        response = requests.get(f"{host}/api/v1/healthcheck", timeout=10)
        if response.status_code == 200:
            print("✓ Server is reachable and healthy")
            return True
        else:
            print(f"✗ Server returned status code: {response.status_code}")
            return False
    except requests.exceptions.ConnectionError:
        print("✗ Cannot connect to server")
        return False
    except requests.exceptions.Timeout:
        print("✗ Connection timeout")
        return False
    except Exception as e:
        print(f"✗ Connection error: {e}")
        return False


def main():
    """Run configuration validation"""
    print("lakeFS Configuration Validator")
    print("=" * 40)

    all_valid = True

    # Check environment variables
    if not validate_environment_variables():
        all_valid = False

    # Check host format
    host = os.environ.get('LAKEFS_HOST', '')
    if host and not validate_host_format(host):
        all_valid = False

    # Check connectivity
    if host and not validate_connectivity(host):
        all_valid = False

    # Test client creation
    try:
        client = lakefs.Client()
        print("✓ Client created successfully")
    except Exception as e:
        print(f"✗ Failed to create client: {e}")
        all_valid = False

    print("\n" + "=" * 40)
    if all_valid:
        print("Configuration is valid!")
        sys.exit(0)
    else:
        print("Configuration has issues!")
        sys.exit(1)


if __name__ == "__main__":
    main()
Getting Help¶
Community Resources¶
- GitHub Issues: lakeFS Python SDK Issues
- Slack Community: Join lakeFS Slack
- Documentation: Official lakeFS Documentation
Reporting Issues¶
When reporting issues, please include:
- Environment Information: Python version, operating system, SDK name and version, and lakeFS server version
- Minimal Reproduction Case: the smallest self-contained script that triggers the problem
- Error Messages: Include full error messages and stack traces
- Configuration: Describe your lakeFS server setup and network configuration
Advanced Debugging¶
For complex issues, consider:
- Enable verbose logging for all components
- Use network traffic inspection tools like Wireshark
- Profile memory and CPU usage during operations
- Test with minimal configurations to isolate issues
- Compare behavior across different SDK options
See Also¶
- API Comparison - Choose the right SDK for your use case
- Best Practices - Production deployment guidelines
- Getting Started Guide - Initial setup and configuration
- High-Level SDK Documentation - Comprehensive SDK guide
- Generated SDK Documentation - Direct API access
- lakefs-spec Documentation - Filesystem interface
- Boto3 Integration - S3-compatible operations