Transaction Handling¶
Learn how to use transactions for atomic operations that ensure data consistency in your lakeFS repositories. Transactions provide ACID-like properties for complex multi-object operations.
Transaction Concepts¶
How Transactions Work¶
lakeFS transactions use an ephemeral branch pattern:

1. Create ephemeral branch - A temporary branch is created from the source branch
2. Perform operations - All changes are made on the ephemeral branch
3. Commit changes - Changes are committed to the ephemeral branch
4. Merge back - The ephemeral branch is merged back to the source branch
5. Cleanup - The ephemeral branch is deleted
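Conceptually, this is the same flow you could perform by hand with the SDK. The sketch below is a simplified illustration of those five steps, not the transact() implementation itself; the ephemeral branch name is made up for the example, and the real context manager also skips the merge and cleans up if an error occurs.

import lakefs

repo = lakefs.repository("my-repo")
main = repo.branch("main")

# 1. Create an ephemeral branch from the source branch (name chosen for illustration)
tx_branch = repo.branch("tx-example-branch").create(source_reference="main")

# 2. Perform operations on the ephemeral branch
tx_branch.object("data/file1.txt").upload(data="New content 1")

# 3. Commit the changes on the ephemeral branch
tx_branch.commit(message="Atomic data update")

# 4. Merge the ephemeral branch back into the source branch
tx_branch.merge_into(main)

# 5. Clean up the ephemeral branch
tx_branch.delete()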
Transaction Properties¶
- Atomic: All operations succeed or all fail together
- Consistent: Repository remains in a valid state
- Isolated: Operations don't interfere with concurrent changes
- Durable: Committed changes are permanently stored
Important Notes¶
- Transactions don't "lock" the source branch
- Concurrent changes to the source branch may cause transaction conflicts
- Transactions are designed for related operations that should succeed or fail together
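Because the source branch is never locked, the merge at the end of a transaction can fail if conflicting changes landed on the source branch in the meantime. The snippet below is a minimal sketch of guarding for that case; it assumes the conflict surfaces as the TransactionException from lakefs.exceptions that the retry pattern later on this page also catches.

from lakefs.exceptions import TransactionException

try:
    with branch.transact(commit_message="Guarded update") as tx:
        tx.object("data/file1.txt").upload(data="New content 1")
except TransactionException as e:
    # The ephemeral branch was not merged; decide whether to retry or give up
    print(f"Transaction could not be merged: {e}")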
Basic Transaction Usage¶
Simple Transaction Pattern¶
import lakefs

branch = lakefs.repository("my-repo").branch("main")

# Basic transaction with context manager
with branch.transact(commit_message="Atomic data update") as tx:
    # All operations within this block are atomic
    tx.object("data/file1.txt").upload(data="New content 1")
    tx.object("data/file2.txt").upload(data="New content 2")

    # If we reach the end without exceptions, changes are committed
    # If an exception occurs, all changes are rolled back

print("Transaction completed successfully")
Expected Output:

Transaction completed successfully
Transaction with Metadata¶
# Transaction with rich metadata
with branch.transact(
    commit_message="Update user profiles and preferences",
    commit_metadata={
        "operation": "user_data_update",
        "version": "1.2.0",
        "updated_by": "data_pipeline",
        "batch_id": "batch_001"
    }
) as tx:
    # Upload user data
    tx.object("users/profiles.json").upload(
        data='{"users": [{"id": 1, "name": "Alice"}]}',
        content_type="application/json"
    )

    # Upload preferences
    tx.object("users/preferences.json").upload(
        data='{"theme": "dark", "notifications": true}',
        content_type="application/json"
    )

print("User data updated atomically")
Accessing Transaction Properties¶
with branch.transact(commit_message="Initial message") as tx:
# Access transaction properties
print(f"Transaction ID: {tx.id}")
print(f"Source branch: {tx.source_id}")
print(f"Commit message: {tx.commit_message}")
# Modify transaction properties during execution
tx.commit_message = "Updated commit message"
tx.commit_metadata = {"updated": "true"}
# Perform operations
tx.object("data/example.txt").upload(data="Transaction example")
Transaction Operations¶
File Operations in Transactions¶
with branch.transact(commit_message="Comprehensive file operations") as tx:
# Upload new files
tx.object("data/new-dataset.csv").upload(
data="id,name,value\n1,Alice,100\n2,Bob,200",
content_type="text/csv"
)
# Update existing files
config_obj = tx.object("config/settings.json")
if config_obj.exists():
# Read current config
current_config = json.loads(config_obj.reader().read())
# Update configuration
current_config["version"] = "2.0"
current_config["last_updated"] = "2024-01-15"
# Write updated config
config_obj.upload(
data=json.dumps(current_config, indent=2),
content_type="application/json"
)
# Delete old files
old_files = list(tx.objects(prefix="data/old/"))
for obj_info in old_files:
tx.object(obj_info.path).delete()
# Batch delete using branch method
temp_files = ["temp/file1.txt", "temp/file2.txt", "temp/file3.txt"]
tx.delete_objects(temp_files)
print("File operations completed atomically")
Data Processing Pipeline¶
import json
import csv
from datetime import datetime

def process_data_pipeline(branch, input_path, output_path):
    """Process data through multiple stages atomically"""

    with branch.transact(
        commit_message=f"Data pipeline: {input_path} -> {output_path}",
        commit_metadata={
            "pipeline": "data_transformation",
            "input": input_path,
            "output": output_path,
            "timestamp": datetime.now().isoformat()
        }
    ) as tx:
        # Stage 1: Read and validate input
        input_obj = tx.object(input_path)
        if not input_obj.exists():
            raise ValueError(f"Input file {input_path} not found")

        raw_data = json.loads(input_obj.reader().read())

        # Stage 2: Transform data
        processed_records = []
        for record in raw_data.get("records", []):
            processed_record = {
                "id": record["id"],
                "name": record["name"].upper(),
                "value": record["value"] * 1.1,  # Apply 10% increase
                "processed_at": datetime.now().isoformat(),
                "status": "processed"
            }
            processed_records.append(processed_record)

        # Stage 3: Write processed data
        output_obj = tx.object(output_path)
        with output_obj.writer(mode='w', content_type="text/csv") as writer:
            if processed_records:
                fieldnames = processed_records[0].keys()
                csv_writer = csv.DictWriter(writer, fieldnames=fieldnames)
                csv_writer.writeheader()
                csv_writer.writerows(processed_records)

        # Stage 4: Create processing summary
        summary = {
            "input_file": input_path,
            "output_file": output_path,
            "records_processed": len(processed_records),
            "processing_time": datetime.now().isoformat(),
            "status": "completed"
        }

        summary_path = f"summaries/processing_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        tx.object(summary_path).upload(
            data=json.dumps(summary, indent=2),
            content_type="application/json"
        )

        # Stage 5: Archive input file
        archive_path = f"archive/{input_path}"
        input_obj.copy(tx.source_id, archive_path)
        input_obj.delete()

        return len(processed_records)

# Usage
processed_count = process_data_pipeline(
    branch,
    "raw/user_data.json",
    "processed/user_data.csv"
)
print(f"Pipeline completed: {processed_count} records processed")
Error Handling and Rollback¶
Automatic Rollback on Exceptions¶
def demonstrate_rollback(branch):
    """Demonstrate automatic rollback on transaction failure"""

    # Check initial state
    initial_objects = list(branch.objects(prefix="demo/"))
    print(f"Initial objects: {len(initial_objects)}")

    try:
        with branch.transact(commit_message="Failing transaction") as tx:
            # These operations will be rolled back
            tx.object("demo/file1.txt").upload(data="Content 1")
            tx.object("demo/file2.txt").upload(data="Content 2")
            tx.object("demo/file3.txt").upload(data="Content 3")

            print("Files uploaded in transaction...")

            # Check objects within transaction
            tx_objects = list(tx.objects(prefix="demo/"))
            print(f"Objects in transaction: {len(tx_objects)}")

            # This will cause the transaction to fail and rollback
            raise ValueError("Simulated error - transaction will rollback!")

            # This line won't be reached
            tx.object("demo/file4.txt").upload(data="Content 4")

    except ValueError as e:
        print(f"Transaction failed: {e}")

    # Check final state - should be same as initial
    final_objects = list(branch.objects(prefix="demo/"))
    print(f"Final objects: {len(final_objects)}")
    print(f"Rollback successful: {len(initial_objects) == len(final_objects)}")

# Run demonstration
demonstrate_rollback(branch)
Expected Output:
Initial objects: 0
Files uploaded in transaction...
Objects in transaction: 3
Transaction failed: Simulated error - transaction will rollback!
Final objects: 0
Rollback successful: True
Conditional Transaction Execution¶
def conditional_update(branch, condition_check):
    """Execute transaction only if conditions are met"""

    with branch.transact(commit_message="Conditional update") as tx:
        # Check conditions before proceeding
        config_obj = tx.object("config/settings.json")

        if not config_obj.exists():
            raise ValueError("Configuration file not found")

        config = json.loads(config_obj.reader().read())

        # Apply condition check
        if not condition_check(config):
            raise ValueError("Conditions not met for update")

        # Proceed with updates
        config["last_updated"] = datetime.now().isoformat()
        config["update_count"] = config.get("update_count", 0) + 1

        config_obj.upload(
            data=json.dumps(config, indent=2),
            content_type="application/json"
        )

        # Log the update
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "action": "conditional_update",
            "config_version": config.get("version", "unknown")
        }

        tx.object("logs/updates.json").upload(
            data=json.dumps(log_entry),
            content_type="application/json"
        )

        return config

# Example condition check
def version_check(config):
    return config.get("version", 0) < 3

try:
    updated_config = conditional_update(branch, version_check)
    print("Conditional update successful")
except ValueError as e:
    print(f"Update skipped: {e}")
Transaction Retry Pattern¶
from lakefs.exceptions import TransactionException
import time
import random

def retry_transaction(branch, operation_func, max_retries=3, base_delay=1):
    """Execute transaction with exponential backoff retry"""

    for attempt in range(max_retries):
        try:
            with branch.transact(
                commit_message=f"Retry operation (attempt {attempt + 1})"
            ) as tx:
                result = operation_func(tx)
                print(f"Transaction succeeded on attempt {attempt + 1}")
                return result

        except TransactionException as e:
            if attempt == max_retries - 1:
                print(f"Transaction failed after {max_retries} attempts")
                raise

            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...")
            time.sleep(delay)

        except Exception as e:
            # Non-transaction errors shouldn't be retried
            print(f"Non-retryable error: {e}")
            raise

def risky_operation(tx):
    """Simulate an operation that might fail due to conflicts"""
    # Simulate random failure
    if random.random() < 0.7:  # 70% chance of failure
        raise TransactionException("Simulated conflict")

    tx.object("data/result.txt").upload(data="Operation successful")
    return "success"

# Usage
try:
    result = retry_transaction(branch, risky_operation)
    print(f"Final result: {result}")
except Exception as e:
    print(f"Operation ultimately failed: {e}")
Advanced Transaction Patterns¶
Multi-Stage Data Processing¶
class DataProcessor:
    """Multi-stage data processor using transactions"""

    def __init__(self, branch):
        self.branch = branch

    def process_dataset(self, dataset_name, stages):
        """Process dataset through multiple stages atomically"""

        with self.branch.transact(
            commit_message=f"Process dataset: {dataset_name}",
            commit_metadata={
                "dataset": dataset_name,
                "stages": str(len(stages)),
                "processor": "DataProcessor"
            }
        ) as tx:
            current_data = None
            stage_results = []

            for i, stage in enumerate(stages):
                stage_name = stage.__name__
                print(f"Executing stage {i+1}: {stage_name}")

                try:
                    # Execute stage
                    stage_result = stage(tx, current_data, dataset_name)
                    current_data = stage_result.get("data")

                    stage_results.append({
                        "stage": stage_name,
                        "status": "success",
                        "output": stage_result.get("output_path"),
                        "records": stage_result.get("record_count", 0)
                    })

                except Exception as e:
                    print(f"Stage {stage_name} failed: {e}")
                    raise  # This will rollback the entire transaction

            # Create processing report
            report = {
                "dataset": dataset_name,
                "processing_time": datetime.now().isoformat(),
                "stages_completed": len(stage_results),
                "stage_results": stage_results,
                "status": "completed"
            }

            report_path = f"reports/{dataset_name}_processing_report.json"
            tx.object(report_path).upload(
                data=json.dumps(report, indent=2),
                content_type="application/json"
            )

            return report

def extract_stage(tx, previous_data, dataset_name):
    """Extract data from source"""
    source_path = f"raw/{dataset_name}.json"
    source_obj = tx.object(source_path)

    if not source_obj.exists():
        raise ValueError(f"Source data not found: {source_path}")

    data = json.loads(source_obj.reader().read())
    return {
        "data": data,
        "output_path": source_path,
        "record_count": len(data.get("records", []))
    }

def transform_stage(tx, input_data, dataset_name):
    """Transform extracted data"""
    if not input_data:
        raise ValueError("No input data for transformation")

    # Transform records
    transformed_records = []
    for record in input_data.get("records", []):
        transformed_record = {
            "id": record["id"],
            "name": record["name"].strip().title(),
            "value": float(record["value"]) * 1.1,
            "transformed_at": datetime.now().isoformat()
        }
        transformed_records.append(transformed_record)

    # Save transformed data
    output_path = f"transformed/{dataset_name}.json"
    transformed_data = {"records": transformed_records}

    tx.object(output_path).upload(
        data=json.dumps(transformed_data, indent=2),
        content_type="application/json"
    )

    return {
        "data": transformed_data,
        "output_path": output_path,
        "record_count": len(transformed_records)
    }

def load_stage(tx, input_data, dataset_name):
    """Load transformed data to final destination"""
    if not input_data:
        raise ValueError("No input data for loading")

    # Convert to CSV format
    output_path = f"processed/{dataset_name}.csv"

    with tx.object(output_path).writer(mode='w', content_type="text/csv") as writer:
        records = input_data.get("records", [])
        if records:
            fieldnames = records[0].keys()
            csv_writer = csv.DictWriter(writer, fieldnames=fieldnames)
            csv_writer.writeheader()
            csv_writer.writerows(records)

    return {
        "data": input_data,
        "output_path": output_path,
        "record_count": len(input_data.get("records", []))
    }

# Usage
processor = DataProcessor(branch)
stages = [extract_stage, transform_stage, load_stage]

try:
    report = processor.process_dataset("user_data", stages)
    print(f"Processing completed: {report['stages_completed']} stages")
except Exception as e:
    print(f"Processing failed and rolled back: {e}")
Concurrent Transaction Handling¶
import threading
import queue
import time

class TransactionWorker:
    """Worker for handling concurrent transactions"""

    def __init__(self, branch, worker_id):
        self.branch = branch
        self.worker_id = worker_id
        self.results = []

    def process_work_item(self, work_item):
        """Process a single work item in a transaction"""
        item_id = work_item["id"]
        operation = work_item["operation"]
        data = work_item["data"]

        try:
            with self.branch.transact(
                commit_message=f"Worker {self.worker_id}: Process item {item_id}",
                commit_metadata={
                    "worker_id": str(self.worker_id),
                    "item_id": str(item_id),
                    "operation": operation
                }
            ) as tx:
                if operation == "create":
                    path = f"work_items/{item_id}.json"
                    tx.object(path).upload(
                        data=json.dumps(data, indent=2),
                        content_type="application/json"
                    )

                elif operation == "update":
                    path = f"work_items/{item_id}.json"
                    existing_obj = tx.object(path)

                    if existing_obj.exists():
                        existing_data = json.loads(existing_obj.reader().read())
                        existing_data.update(data)
                        existing_data["updated_by"] = f"worker_{self.worker_id}"
                        existing_data["updated_at"] = datetime.now().isoformat()

                        existing_obj.upload(
                            data=json.dumps(existing_data, indent=2),
                            content_type="application/json"
                        )
                    else:
                        raise ValueError(f"Item {item_id} not found for update")

                elif operation == "delete":
                    path = f"work_items/{item_id}.json"
                    tx.object(path).delete()

                # Log the operation
                log_entry = {
                    "worker_id": self.worker_id,
                    "item_id": item_id,
                    "operation": operation,
                    "timestamp": datetime.now().isoformat(),
                    "status": "completed"
                }

                log_path = f"logs/worker_{self.worker_id}_{int(time.time())}.json"
                tx.object(log_path).upload(
                    data=json.dumps(log_entry),
                    content_type="application/json"
                )

            return {"status": "success", "item_id": item_id}

        except Exception as e:
            return {"status": "error", "item_id": item_id, "error": str(e)}
def concurrent_transaction_demo(branch, work_items, num_workers=3):
    """Demonstrate concurrent transaction processing"""

    work_queue = queue.Queue()
    result_queue = queue.Queue()

    # Add work items to queue
    for item in work_items:
        work_queue.put(item)

    def worker_thread(worker_id):
        """Worker thread function"""
        worker = TransactionWorker(branch, worker_id)

        while True:
            try:
                work_item = work_queue.get(timeout=1)
                result = worker.process_work_item(work_item)
                result_queue.put(result)
                work_queue.task_done()
            except queue.Empty:
                break
            except Exception as e:
                result_queue.put({
                    "status": "error",
                    "worker_id": worker_id,
                    "error": str(e)
                })

    # Start worker threads
    threads = []
    for i in range(num_workers):
        thread = threading.Thread(target=worker_thread, args=(i,))
        thread.start()
        threads.append(thread)

    # Wait for completion
    work_queue.join()

    # Collect results
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())

    # Wait for threads to finish
    for thread in threads:
        thread.join()

    return results

# Example usage
work_items = [
    {"id": 1, "operation": "create", "data": {"name": "Item 1", "value": 100}},
    {"id": 2, "operation": "create", "data": {"name": "Item 2", "value": 200}},
    {"id": 3, "operation": "create", "data": {"name": "Item 3", "value": 300}},
    {"id": 1, "operation": "update", "data": {"value": 150}},
    {"id": 2, "operation": "delete", "data": {}},
]

results = concurrent_transaction_demo(branch, work_items)
print(f"Processed {len(results)} items concurrently")

success_count = len([r for r in results if r["status"] == "success"])
error_count = len([r for r in results if r["status"] == "error"])
print(f"Success: {success_count}, Errors: {error_count}")
Transaction Best Practices¶
Transaction Scope and Size¶
# ✅ Good: Focused, related operations
def good_transaction_example(branch):
    with branch.transact(commit_message="Update user profile") as tx:
        # Related operations that should succeed/fail together
        tx.object("users/profile.json").upload(data=profile_data)
        tx.object("users/preferences.json").upload(data=preferences_data)
        tx.object("users/index.json").upload(data=updated_index)

# ❌ Avoid: Unrelated operations in single transaction
def avoid_large_transaction(branch):
    with branch.transact(commit_message="Daily batch processing") as tx:
        # Too many unrelated operations
        # This makes the transaction prone to conflicts and hard to debug
        for i in range(1000):  # Too many operations
            tx.object(f"data/file_{i}.txt").upload(data=f"content {i}")

        # Unrelated operations mixed together
        tx.object("config/settings.json").upload(data=config_data)
        tx.object("logs/daily.log").upload(data=log_data)
Optimal Batch Sizes¶
def optimal_batch_processing(branch, items, batch_size=50):
    """Process items in optimal batch sizes"""

    total_batches = (len(items) + batch_size - 1) // batch_size
    results = []

    for batch_num in range(total_batches):
        start_idx = batch_num * batch_size
        end_idx = min(start_idx + batch_size, len(items))
        batch_items = items[start_idx:end_idx]

        try:
            with branch.transact(
                commit_message=f"Process batch {batch_num + 1}/{total_batches}",
                commit_metadata={
                    "batch_number": str(batch_num + 1),
                    "total_batches": str(total_batches),
                    "items_in_batch": str(len(batch_items))
                }
            ) as tx:
                batch_results = []

                for item in batch_items:
                    # Process individual item
                    result = process_item(tx, item)
                    batch_results.append(result)

                # Create batch summary
                summary = {
                    "batch_number": batch_num + 1,
                    "items_processed": len(batch_results),
                    "timestamp": datetime.now().isoformat()
                }

                tx.object(f"summaries/batch_{batch_num + 1}.json").upload(
                    data=json.dumps(summary, indent=2),
                    content_type="application/json"
                )

            results.extend(batch_results)
            print(f"Completed batch {batch_num + 1}/{total_batches}")

        except Exception as e:
            print(f"Batch {batch_num + 1} failed: {e}")
            # Continue with next batch rather than failing everything
            continue

    return results

def process_item(tx, item):
    """Process a single item within transaction"""
    path = f"processed/{item['id']}.json"
    tx.object(path).upload(
        data=json.dumps(item, indent=2),
        content_type="application/json"
    )
    return {"id": item["id"], "status": "processed"}
Error Handling Strategies¶
class TransactionManager:
    """Advanced transaction management with error handling"""

    def __init__(self, branch):
        self.branch = branch
        self.failed_operations = []

    def execute_with_fallback(self, primary_operation, fallback_operation,
                              commit_message):
        """Execute operation with fallback on failure"""

        # Try primary operation
        try:
            with self.branch.transact(commit_message=commit_message) as tx:
                result = primary_operation(tx)
                return {"status": "success", "method": "primary", "result": result}

        except Exception as primary_error:
            print(f"Primary operation failed: {primary_error}")

            # Try fallback operation
            try:
                with self.branch.transact(
                    commit_message=f"{commit_message} (fallback)"
                ) as tx:
                    result = fallback_operation(tx)
                    return {"status": "success", "method": "fallback", "result": result}

            except Exception as fallback_error:
                print(f"Fallback operation also failed: {fallback_error}")
                self.failed_operations.append({
                    "commit_message": commit_message,
                    "primary_error": str(primary_error),
                    "fallback_error": str(fallback_error),
                    "timestamp": datetime.now().isoformat()
                })
                return {"status": "failed", "errors": [primary_error, fallback_error]}

    def get_failed_operations(self):
        """Get list of failed operations for analysis"""
        return self.failed_operations

# Example usage
def primary_data_update(tx):
    """Primary method - more complex but preferred"""
    # Complex data transformation
    tx.object("data/complex_result.json").upload(data='{"method": "complex"}')
    return "complex_processing_complete"

def fallback_data_update(tx):
    """Fallback method - simpler but reliable"""
    # Simple data update
    tx.object("data/simple_result.json").upload(data='{"method": "simple"}')
    return "simple_processing_complete"

manager = TransactionManager(branch)
result = manager.execute_with_fallback(
    primary_data_update,
    fallback_data_update,
    "Update data with fallback"
)
print(f"Operation result: {result}")
Transaction Monitoring and Debugging¶
class TransactionMonitor:
    """Monitor transaction performance and behavior"""

    def __init__(self):
        self.transaction_logs = []

    def monitored_transaction(self, branch, commit_message, operation_func,
                              commit_metadata=None):
        """Execute transaction with monitoring"""

        start_time = time.time()
        transaction_id = f"tx_{int(start_time)}"

        log_entry = {
            "transaction_id": transaction_id,
            "commit_message": commit_message,
            "start_time": start_time,
            "metadata": commit_metadata
        }

        try:
            with branch.transact(
                commit_message=commit_message,
                commit_metadata=commit_metadata
            ) as tx:
                # Log transaction start
                print(f"🚀 Starting transaction: {transaction_id}")

                # Execute operation
                result = operation_func(tx)

                # Calculate duration
                duration = time.time() - start_time

                # Log success
                log_entry.update({
                    "status": "success",
                    "duration": duration,
                    "end_time": time.time(),
                    "result": str(result)[:100]  # Truncate long results
                })

                print(f"✅ Transaction completed: {transaction_id} ({duration:.2f}s)")
                return result

        except Exception as e:
            # Calculate duration
            duration = time.time() - start_time

            # Log failure
            log_entry.update({
                "status": "failed",
                "duration": duration,
                "end_time": time.time(),
                "error": str(e)
            })

            print(f"❌ Transaction failed: {transaction_id} ({duration:.2f}s) - {e}")
            raise

        finally:
            self.transaction_logs.append(log_entry)

    def get_transaction_stats(self):
        """Get transaction statistics"""
        if not self.transaction_logs:
            return {"message": "No transactions recorded"}

        successful = [log for log in self.transaction_logs if log["status"] == "success"]
        failed = [log for log in self.transaction_logs if log["status"] == "failed"]

        avg_duration = sum(log["duration"] for log in self.transaction_logs) / len(self.transaction_logs)

        return {
            "total_transactions": len(self.transaction_logs),
            "successful": len(successful),
            "failed": len(failed),
            "success_rate": len(successful) / len(self.transaction_logs) * 100,
            "average_duration": avg_duration,
            "longest_transaction": max(self.transaction_logs, key=lambda x: x["duration"])["duration"],
            "shortest_transaction": min(self.transaction_logs, key=lambda x: x["duration"])["duration"]
        }

# Usage example
monitor = TransactionMonitor()

def sample_operation(tx):
    tx.object("test/file1.txt").upload(data="test content 1")
    tx.object("test/file2.txt").upload(data="test content 2")
    time.sleep(0.1)  # Simulate processing time
    return "operation_complete"

# Execute monitored transactions
for i in range(5):
    try:
        monitor.monitored_transaction(
            branch,
            f"Test transaction {i+1}",
            sample_operation,
            {"test_run": str(i+1)}
        )
    except Exception:
        pass  # Continue with other transactions

# Get statistics
stats = monitor.get_transaction_stats()
print(f"\n📊 Transaction Statistics:")
for key, value in stats.items():
    print(f"  {key}: {value}")
Key Points¶
- Atomic operations: All changes in a transaction succeed or fail together
- Ephemeral branches: Transactions use temporary branches for isolation
- No locking: Transactions don't lock the source branch, so conflicts are possible
- Context managers: Always use with statements for proper cleanup
- Error handling: Implement comprehensive error handling and fallback strategies
- Batch sizing: Keep transactions focused and reasonably sized
- Monitoring: Monitor transaction performance and success rates
- Rollback: Failed transactions automatically clean up ephemeral branches
See Also¶
- Repository Management - Creating and managing repositories
- Branch Operations - Version control operations
- Object Operations - Individual object management
- Import Operations - Bulk data operations
- Advanced Features - Advanced patterns and optimization
- Best Practices - Production deployment guidance