Skip to main content

Examples & Tutorials

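This section provides practical examples and tutorials for common SDK usage patterns. Snippets that begin with their own imports are self-contained and ready to run; the shorter ones reuse the `client` (and, where used, the `flows` list) created in the first example, so run that one first.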

More Examples

For additional comprehensive examples and real-world scenarios, see the Examples & Recipes page in our Nexla SDK guides.

Basic Examples

List and Explore Resources

List All Flows
from nexla_sdk import NexlaClient

# No arguments: the client presumably reads its credentials from the
# environment (e.g. NEXLA_SERVICE_KEY) — confirm against your SDK setup.
client = NexlaClient()

# Get all flows with basic info
flows = client.flows.list(flows_only=True)
print(f"📊 Found {len(flows)} flows")

for flow in flows[:5]:  # Show first 5
    print(f" 🔄 {flow.name} (ID: {flow.id})")
    print(f" Status: {flow.status}")
    print(f" Updated: {flow.updated_time}")
    print()  # blank line between flows

Explore Flow Details

Get Flow Details
# Get detailed information about a specific flow.
# Reuses `client` and `flows` from the "List All Flows" example.
if flows:
    flow_id = flows[0].id
    detailed_flow = client.flows.get(flow_id)

    print(f"🔍 Flow Details: {detailed_flow.name}")
    print(f" Description: {detailed_flow.description or 'No description'}")
    print(f" Created: {detailed_flow.created_time}")
    print(f" Project: {detailed_flow.project_id}")
    # `or []` covers both a missing (None) and an empty collection
    print(f" Sources: {len(detailed_flow.sources or [])}")
    print(f" Destinations: {len(detailed_flow.destinations or [])}")

Working with Data Sources

List Data Sources
from collections import Counter

# Get all data sources (reuses `client` from the first example)
sources = client.sources.list()
print(f"📥 Found {len(sources)} data sources")

# Group by type; Counter keeps first-seen order, like a manual dict tally
source_types = Counter(source.type for source in sources)

print("\n📊 Sources by type:")
for source_type, count in source_types.items():
    print(f" {source_type}: {count}")

Advanced Examples

Resource Summary Dashboard

Dashboard Example
def show_dashboard():
    """Print a summary dashboard of flows, sources, destinations and projects.

    Uses the module-level ``client``. Any ``Exception`` raised while fetching
    or printing the data is caught and reported on stdout instead of being
    propagated to the caller.
    """
    from collections import Counter

    print("🏢 NEXLA DASHBOARD")
    print("=" * 50)

    try:
        # Get counts for each resource type
        flows = client.flows.list(flows_only=True)
        sources = client.sources.list()
        destinations = client.destinations.list()
        projects = client.projects.list()

        print(f"📊 Flows: {len(flows)}")
        print(f"📥 Sources: {len(sources)}")
        print(f"📤 Destinations: {len(destinations)}")
        print(f"📁 Projects: {len(projects)}")

        # Show flow status breakdown
        status_count = Counter(flow.status for flow in flows)

        print("\n🏥 Flow Status:")
        for status, count in status_count.items():
            print(f" {status.title()}: {count}")

    except Exception as e:
        # Top-level boundary of the example: report the failure, don't crash
        print(f"❌ Error loading dashboard: {e}")


# Run the dashboard
show_dashboard()

Search Flows by Name

Search Example
def find_flows_by_name(search_term, flows=None):
    """Find flows whose name contains *search_term*, ignoring case.

    Args:
        search_term: Substring to look for in each flow name.
        flows: Optional iterable of already-fetched flows to search. When
            omitted, all flows are fetched with ``client.flows.list()``.

    Returns:
        A list of the matching flow objects, in their original order.
    """
    if flows is None:
        flows = client.flows.list()

    needle = search_term.lower()  # lower-case once, not once per flow
    matching_flows = [
        flow for flow in flows
        # A flow without a name (None) is treated as an empty name
        if needle in (flow.name or "").lower()
    ]

    print(f"🔍 Found {len(matching_flows)} flows matching '{search_term}':")
    for flow in matching_flows:
        print(f" 🔄 {flow.name} (ID: {flow.id})")

    return matching_flows


# Example usage
# matching_flows = find_flows_by_name("test")

End-to-End Pipeline Example

S3 → Nexset → S3 Pipeline

Complete Pipeline
from datetime import datetime, timedelta, timezone

from nexla_sdk import NexlaClient
from nexla_sdk.models.credentials.requests import CredentialCreate
from nexla_sdk.models.sources.requests import SourceCreate
from nexla_sdk.models.nexsets.requests import NexsetCreate
from nexla_sdk.models.destinations.requests import DestinationCreate

client = NexlaClient(service_key="<SERVICE_KEY>")

# 1. Register reusable credentials for Amazon S3
# (placeholder values — supply real keys from a secret store, not source code)
credential = client.credentials.create(CredentialCreate(
    name="Analytics S3",
    credentials_type="s3",
    credentials={
        "access_key_id": "AKIA...",
        "secret_access_key": "***",
        "region": "us-east-1",
    },
))

# 2. Create and activate an S3 source that reads JSON files nightly
source = client.sources.create(SourceCreate(
    name="Orders Raw",
    source_type="s3",
    data_credentials_id=credential.id,
    source_config={
        "path": "my-ingest-bucket/data/",
        "file_format": "json",
        "start.cron": "0 2 * * *",  # 02:00 UTC daily pull
    },
))
client.sources.activate(source.id)

# 3. Discover the detected nexset (dataset) produced by the source
detected = client.nexsets.list()
source_nexsets = [n for n in detected if n.data_source_id == source.id]
if not source_nexsets:
    raise RuntimeError("Source has not emitted data yet; rerun after the first poll.")

parent = source_nexsets[0]

# 4. Create a derived nexset with a light transform
transformed = client.nexsets.create(NexsetCreate(
    name="Orders Clean",
    parent_data_set_id=parent.id,
    has_custom_transform=True,
    transform={
        "version": 1,
        "operations": [
            {
                "operation": "shift",
                "spec": {"*": "&"},  # passthrough schema
            },
        ],
    },
))

# 5. Land the transformed data back to S3 and activate the sink
destination = client.destinations.create(DestinationCreate(
    name="Orders S3 Export",
    sink_type="s3",
    data_set_id=transformed.id,
    data_credentials_id=credential.id,
    sink_config={
        "path": "my-analytics-bucket/exports/orders/",
        "data_format": "parquet",
        "file_compression": "snappy",
    },
))
client.destinations.activate(destination.id)

# 6. Monitor operational health with metrics
# datetime.utcnow() is deprecated since Python 3.12; take today's UTC date
# from a timezone-aware "now" instead.
date_to = datetime.now(timezone.utc).date()
date_from = date_to - timedelta(days=1)
stats = client.metrics.get_resource_daily_metrics(
    resource_type="data_sources",
    resource_id=source.id,
    from_date=str(date_from),  # str(date) gives ISO format, YYYY-MM-DD
    to_date=str(date_to),
)
print(stats.status)

Error Handling Examples

Safe API Calls

Error Handling
from nexla_sdk.exceptions import NexlaAPIError, AuthenticationError


def safe_api_call():
    """List flows and print a troubleshooting hint for each kind of failure.

    Uses the module-level ``client``. Handlers run from most to least
    specific: authentication failures, other API errors, then anything else.
    Nothing is re-raised; each failure is reported on stdout.
    """
    try:
        # This might fail
        flows = client.flows.list()

    except AuthenticationError:
        print("❌ Authentication failed:")
        print(" • Check your NEXLA_SERVICE_KEY")
        print(" • Verify the key is active")
        print(" • Ensure you have proper permissions")

    except NexlaAPIError as e:
        print(f"❌ API Error (Status {e.status_code}): {e.message}")
        print(" • Check your network connection")
        print(" • Verify the API endpoint")

    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        print(" • Check the documentation")
        print(" • Contact support if the issue persists")

    else:
        # Success path kept out of the try so only the API call is guarded
        print(f"✅ Successfully retrieved {len(flows)} flows")


safe_api_call()

Next Steps