Filesystem API Operations¶
Learn how to perform standard filesystem operations using lakefs-spec's filesystem interface.
Basic Setup¶
Initialize Filesystem¶
from lakefs_spec import LakeFSFileSystem
# Auto-discover credentials from ~/.lakectl.yaml or environment
fs = LakeFSFileSystem()
# Or specify credentials explicitly
fs = LakeFSFileSystem(
host="http://localhost:8000",
username="AKIAIOSFODNN7EXAMPLE",
password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
URI Format¶
lakefs-spec uses the `lakefs://` URI scheme:
lakefs://repository/branch/path/to/file
lakefs://my-repo/main/data/file.txt
lakefs://my-repo/feature-branch/datasets/data.parquet
File Operations¶
Writing Files¶
Write Text Files¶
from pathlib import Path
# Write string content directly
fs.write_text("lakefs://my-repo/main/data/sample.txt", "Hello, lakeFS!")
# Write from local file
local_file = Path("local_data.txt")
local_file.write_text("Local content")
fs.put_file("local_data.txt", "lakefs://my-repo/main/data/uploaded.txt")
Write Binary Files¶
# Write binary data
binary_data = b"Binary content here"
fs.write_bytes("lakefs://my-repo/main/data/binary.dat", binary_data)
# Upload local binary file
fs.put_file("image.jpg", "lakefs://my-repo/main/images/uploaded.jpg")
Streaming Write¶
# Write large files with streaming
with fs.open("lakefs://my-repo/main/data/large_file.txt", "w") as f:
for i in range(1000):
f.write(f"Line {i}\n")
Reading Files¶
Read Text Files¶
# Read entire file as string
content = fs.read_text("lakefs://my-repo/main/data/sample.txt")
print(content) # "Hello, lakeFS!"
# Download to local file
fs.get_file("lakefs://my-repo/main/data/sample.txt", "downloaded.txt")
Read Binary Files¶
# Read binary data
binary_content = fs.read_bytes("lakefs://my-repo/main/data/binary.dat")
# Download binary file
fs.get_file("lakefs://my-repo/main/images/uploaded.jpg", "downloaded.jpg")
Streaming Read¶
# Read large files with streaming
with fs.open("lakefs://my-repo/main/data/large_file.txt", "r") as f:
for line in f:
print(line.strip())
File Information¶
Check File Existence¶
# Check if file exists
if fs.exists("lakefs://my-repo/main/data/sample.txt"):
print("File exists")
else:
print("File not found")
Get File Information¶
# Get file info/stats
info = fs.info("lakefs://my-repo/main/data/sample.txt")
print(f"Size: {info['size']} bytes")
print(f"Type: {info['type']}")
print(f"Modified: {info['mtime']}")
# Get detailed stats
stats = fs.stat("lakefs://my-repo/main/data/sample.txt")
print(f"Checksum: {stats.get('checksum')}")
print(f"Content-Type: {stats.get('content_type')}")
Directory Operations¶
Listing Contents¶
List Files and Directories¶
# List all files in a directory
files = fs.ls("lakefs://my-repo/main/data/")
for file_path in files:
print(file_path)
# List with details
files_detailed = fs.ls("lakefs://my-repo/main/data/", detail=True)
for file_info in files_detailed:
print(f"{file_info['name']} - {file_info['size']} bytes")
Recursive Listing¶
# List all files recursively
all_files = fs.find("lakefs://my-repo/main/")
for file_path in all_files:
print(file_path)
# Find specific file types
csv_files = fs.glob("lakefs://my-repo/main/**/*.csv")
for csv_file in csv_files:
print(csv_file)
Directory Tree¶
# Walk directory tree
for root, dirs, files in fs.walk("lakefs://my-repo/main/data/"):
print(f"Directory: {root}")
for file in files:
print(f" File: {file}")
Creating Directories¶
# Create directory (implicit with file creation)
fs.makedirs("lakefs://my-repo/main/new_directory/", exist_ok=True)
# Directories are created automatically when writing files
fs.write_text("lakefs://my-repo/main/new_dir/file.txt", "Content")
File Management¶
Copying Files¶
# Copy within same repository
fs.copy(
"lakefs://my-repo/main/data/source.txt",
"lakefs://my-repo/main/data/copy.txt"
)
# Copy between branches
fs.copy(
"lakefs://my-repo/main/data/file.txt",
"lakefs://my-repo/feature-branch/data/file.txt"
)
# Copy multiple files
source_files = fs.glob("lakefs://my-repo/main/data/*.txt")
for source in source_files:
target = source.replace("/main/", "/backup/")
fs.copy(source, target)
Moving/Renaming Files¶
# Move/rename file
fs.move(
"lakefs://my-repo/main/data/old_name.txt",
"lakefs://my-repo/main/data/new_name.txt"
)
# Move to different directory
fs.move(
"lakefs://my-repo/main/temp/file.txt",
"lakefs://my-repo/main/permanent/file.txt"
)
Deleting Files¶
# Delete single file
fs.rm("lakefs://my-repo/main/data/unwanted.txt")
# Delete multiple files
files_to_delete = [
"lakefs://my-repo/main/temp/file1.txt",
"lakefs://my-repo/main/temp/file2.txt"
]
fs.rm(files_to_delete)
# Delete directory and all contents
fs.rm("lakefs://my-repo/main/temp/", recursive=True)
Advanced Operations¶
Batch Operations¶
def batch_upload(fs, local_dir, remote_base):
    """Upload every file under *local_dir* to *remote_base*.

    Walks the local directory recursively, mirrors its layout under the
    remote base URI, and returns the list of remote paths written.

    Args:
        fs: filesystem object exposing ``put_file(local, remote)``.
        local_dir: path of the local directory to upload.
        remote_base: target base URI (trailing "/" is tolerated).

    Returns:
        List of remote paths that were uploaded.
    """
    from pathlib import Path

    local_path = Path(local_dir)
    # Normalize so a trailing "/" in remote_base doesn't produce "//" paths.
    base = remote_base.rstrip("/")
    uploaded_files = []
    for local_file in local_path.rglob("*"):
        if local_file.is_file():
            # as_posix() keeps forward slashes in the remote key on Windows.
            rel_path = local_file.relative_to(local_path).as_posix()
            remote_path = f"{base}/{rel_path}"
            fs.put_file(str(local_file), remote_path)
            uploaded_files.append(remote_path)
    return uploaded_files
# Usage
uploaded = batch_upload(fs, "local_data/", "lakefs://my-repo/main/uploaded/")
print(f"Uploaded {len(uploaded)} files")
Working with Metadata¶
# Files with custom metadata are handled through the underlying lakeFS API
# lakefs-spec focuses on filesystem operations
# Get file info including lakeFS-specific metadata
info = fs.info("lakefs://my-repo/main/data/file.txt")
if 'lakefs_metadata' in info:
print(f"lakeFS metadata: {info['lakefs_metadata']}")
Error Handling¶
# NOTE: FileNotFoundError is a Python builtin -- no import is needed
# (fsspec.exceptions does not provide one; importing it would fail and
# shadow the builtin even if it did).
def safe_file_operation(fs, path):
    """Safely read a text file, returning its content or None on failure.

    Tries the read directly (EAFP) instead of calling ``fs.exists`` first:
    the pre-check costs an extra round trip and is race-prone, and the
    missing-file case is already handled by ``except FileNotFoundError``.

    Args:
        fs: filesystem object exposing ``read_text(path)``.
        path: URI of the file to read.

    Returns:
        The file content as a string, or None on any error.
    """
    try:
        return fs.read_text(path)
    except FileNotFoundError:
        print(f"File not found: {path}")
        return None
    except PermissionError:
        print(f"Permission denied: {path}")
        return None
    except Exception as e:  # boundary: report anything unexpected
        print(f"Unexpected error: {e}")
        return None
# Usage
content = safe_file_operation(fs, "lakefs://my-repo/main/data/file.txt")
Performance Tips¶
Efficient File Operations¶
# Use batch operations when possible
files_to_upload = ["file1.txt", "file2.txt", "file3.txt"]
remote_paths = [f"lakefs://my-repo/main/data/{f}" for f in files_to_upload]
# Upload in batch
for local, remote in zip(files_to_upload, remote_paths):
fs.put_file(local, remote)
# Use streaming for large files
def stream_large_file(fs, local_path, remote_path):
    """Copy a local file to a remote path in fixed-size chunks.

    Streams the data instead of loading the whole file into memory,
    which keeps the footprint constant regardless of file size.
    """
    CHUNK_SIZE = 8192  # 8 KB per read
    with open(local_path, "rb") as src, fs.open(remote_path, "wb") as dst:
        while chunk := src.read(CHUNK_SIZE):
            dst.write(chunk)
Caching and Connection Management¶
# lakefs-spec handles connection pooling automatically
# For better performance with many operations, reuse the filesystem instance
class LakeFSManager:
    """Holds one LakeFSFileSystem instance and reuses it for all operations.

    Reusing a single filesystem instance lets connection pooling work
    across many uploads instead of re-establishing state per call.
    """

    def __init__(self):
        self.fs = LakeFSFileSystem()

    def upload_dataset(self, local_files, remote_base):
        """Upload each local file to *remote_base*, keeping its base name.

        Args:
            local_files: iterable of local file paths.
            remote_base: target base URI (trailing "/" is tolerated).
        """
        from pathlib import Path

        # rstrip avoids a double slash when remote_base ends with "/".
        base = remote_base.rstrip("/")
        for local_file in local_files:
            self.fs.put_file(local_file, f"{base}/{Path(local_file).name}")
# Usage
manager = LakeFSManager()
manager.upload_dataset(["data1.csv", "data2.csv"], "lakefs://my-repo/main/datasets/")
Next Steps¶
- Learn about data science integrations
- Explore transaction patterns
- Check the lakefs-spec documentation