Examples & Tutorials
This section provides practical examples and tutorials for common SDK usage patterns. Each example is self-contained and ready to run.
More Examples
For additional comprehensive examples and real-world scenarios, see the Examples & Recipes page in our Nexla SDK guides.
Basic Examples
List and Explore Resources
List All Flows
from nexla_sdk import NexlaClient

# Shared client; the later snippets on this page reuse `client` and `flows`.
client = NexlaClient()

# Get all flows with basic info
flows = client.flows.list(flows_only=True)
print(f"š Found {len(flows)} flows")

for flow in flows[:5]:  # Show first 5
    print(f" š {flow.name} (ID: {flow.id})")
    print(f" Status: {flow.status}")
    print(f" Updated: {flow.updated_time}")
    print()  # blank line between flows
Explore Flow Details
Get Flow Details
# Get detailed information about a specific flow
if flows:  # flows[0] below needs at least one flow
    flow_id = flows[0].id
    detailed_flow = client.flows.get(flow_id)

    print(f"š Flow Details: {detailed_flow.name}")
    print(f" Description: {detailed_flow.description or 'No description'}")
    print(f" Created: {detailed_flow.created_time}")
    print(f" Project: {detailed_flow.project_id}")

    # `sources` / `destinations` may be empty or unset; report 0 in that case.
    source_count = len(detailed_flow.sources) if detailed_flow.sources else 0
    print(f" Sources: {source_count}")
    destination_count = len(detailed_flow.destinations) if detailed_flow.destinations else 0
    print(f" Destinations: {destination_count}")
Working with Data Sources
List Data Sources
from collections import Counter

# Get all data sources
sources = client.sources.list()
print(f"š„ Found {len(sources)} data sources")

# Group by type: Counter tallies how many sources share each `type` value
# and, like a plain dict, iterates in first-seen order.
source_types = Counter(source.type for source in sources)

print("\nš Sources by type:")
for source_type, count in source_types.items():
    print(f" {source_type}: {count}")
Advanced Examples
Resource Summary Dashboard
Dashboard Example
def show_dashboard():
    """Display a summary dashboard of all resources.

    Prints how many flows, sources, destinations and projects the
    module-level ``client`` returns, followed by the number of flows in
    each status.  Any exception raised while loading or printing the data
    is caught and reported as a single error line instead of propagating.
    """
    from collections import Counter

    print("š¢ NEXLA DASHBOARD")
    print("=" * 50)

    try:
        # Get counts for each resource type
        flows = client.flows.list(flows_only=True)
        sources = client.sources.list()
        destinations = client.destinations.list()
        projects = client.projects.list()

        print(f"š Flows: {len(flows)}")
        print(f"š„ Sources: {len(sources)}")
        print(f"š¤ Destinations: {len(destinations)}")
        print(f"š Projects: {len(projects)}")

        # Show flow status breakdown (statuses listed in first-seen order)
        status_count = Counter(flow.status for flow in flows)

        print("\nš„ Flow Status:")
        for status, count in status_count.items():
            print(f" {status.title()}: {count}")
    except Exception as e:
        # Example-level boundary: report the failure rather than crash.
        print(f"ā Error loading dashboard: {e}")


# Run the dashboard
show_dashboard()
Search Flows by Name
Search Example
def find_flows_by_name(search_term: str) -> list:
    """Find flows containing a specific term in their name.

    The match is a case-insensitive substring test against ``flow.name``.
    A summary line and one line per match are printed, and the matches
    are also returned.

    Args:
        search_term: Text to look for inside each flow name.

    Returns:
        The flows whose name contains ``search_term``, in the order
        ``client.flows.list()`` returned them.
    """
    flows = client.flows.list()

    needle = search_term.lower()  # lower-case once, not once per flow
    matching_flows = [flow for flow in flows if needle in flow.name.lower()]

    print(f"š Found {len(matching_flows)} flows matching '{search_term}':")
    for flow in matching_flows:
        print(f" š {flow.name} (ID: {flow.id})")

    return matching_flows


# Example usage
# matching_flows = find_flows_by_name("test")
End-to-End Pipeline Example
S3 → Nexset → S3 Pipeline
Complete Pipeline
from datetime import datetime, timedelta, timezone

from nexla_sdk import NexlaClient
from nexla_sdk.models.credentials.requests import CredentialCreate
from nexla_sdk.models.sources.requests import SourceCreate
from nexla_sdk.models.nexsets.requests import NexsetCreate
from nexla_sdk.models.destinations.requests import DestinationCreate

client = NexlaClient(service_key="<SERVICE_KEY>")

# 1. Register reusable credentials for Amazon S3
credential = client.credentials.create(CredentialCreate(
    name="Analytics S3",
    credentials_type="s3",
    credentials={
        "access_key_id": "AKIA...",
        "secret_access_key": "***",
        "region": "us-east-1"
    }
))

# 2. Create and activate an S3 source that reads JSON files nightly
source = client.sources.create(SourceCreate(
    name="Orders Raw",
    source_type="s3",
    data_credentials_id=credential.id,
    source_config={
        "path": "my-ingest-bucket/data/",
        "file_format": "json",
        "start.cron": "0 2 * * *"  # 02:00 UTC daily pull
    }
))
client.sources.activate(source.id)

# 3. Discover the detected nexset (dataset) produced by the source
detected = client.nexsets.list()
source_nexsets = [n for n in detected if n.data_source_id == source.id]
if not source_nexsets:
    # Nothing is listed for this source yet, so there is no parent to build on.
    raise RuntimeError("Source has not emitted data yet; rerun after the first poll.")
parent = source_nexsets[0]

# 4. Create a derived nexset with a light transform
transformed = client.nexsets.create(NexsetCreate(
    name="Orders Clean",
    parent_data_set_id=parent.id,
    has_custom_transform=True,
    transform={
        "version": 1,
        "operations": [
            {
                "operation": "shift",
                "spec": {"*": "&"}  # passthrough schema
            }
        ]
    }
))

# 5. Land the transformed data back to S3 and activate the sink
destination = client.destinations.create(DestinationCreate(
    name="Orders S3 Export",
    sink_type="s3",
    data_set_id=transformed.id,
    data_credentials_id=credential.id,
    sink_config={
        "path": "my-analytics-bucket/exports/orders/",
        "data_format": "parquet",
        "file_compression": "snappy"
    }
))
client.destinations.activate(destination.id)

# 6. Monitor operational health with metrics
# Take today's UTC date from a timezone-aware datetime; datetime.utcnow()
# is deprecated as of Python 3.12.
date_to = datetime.now(timezone.utc).date()
date_from = date_to - timedelta(days=1)
stats = client.metrics.get_resource_daily_metrics(
    resource_type="data_sources",
    resource_id=source.id,
    from_date=str(date_from),  # str(date) yields ISO format, YYYY-MM-DD
    to_date=str(date_to)
)
print(stats.status)
Error Handling Examples
Safe API Calls
Error Handling
from nexla_sdk.exceptions import NexlaAPIError, AuthenticationError
def safe_api_call():
    """Example of proper error handling.

    Lists flows through the module-level ``client`` and prints either a
    success line or a set of troubleshooting hints.  Handlers are tried in
    the order written: ``AuthenticationError``, ``NexlaAPIError``, then any
    other ``Exception``.  Nothing is returned or re-raised.
    """
    try:
        # This might fail
        flows = client.flows.list()
    except AuthenticationError:
        # NOTE(review): listed first on the assumption that it is more
        # specific than NexlaAPIError; keep this order.
        print("ā Authentication failed:")
        print(" ⢠Check your NEXLA_SERVICE_KEY")
        print(" ⢠Verify the key is active")
        print(" ⢠Ensure you have proper permissions")
    except NexlaAPIError as e:
        print(f"ā API Error (Status {e.status_code}): {e.message}")
        print(" ⢠Check your network connection")
        print(" ⢠Verify the API endpoint")
    except Exception as e:
        print(f"ā Unexpected error: {e}")
        print(" ⢠Check the documentation")
        print(" ⢠Contact support if the issue persists")
    else:
        # Success path kept out of the try body so only the API call is guarded.
        print(f"✅ Successfully retrieved {len(flows)} flows")


safe_api_call()
Next Steps
- Operations - Monitoring and troubleshooting
- API Reference - Complete method documentation
- Core Concepts - Understanding Nexla's data model