Connector Authentication
Different connector types in Nexla may require specific authentication configurations. Understanding connector-specific authentication requirements is essential for setting up secure and reliable data connections.
Overview
This section covers the main connector types, their authentication requirements, and how to store, test, and monitor connector credentials securely.
It is organized around four areas:
- Connector Types: Different categories of data connectors
- Authentication Methods: Specific authentication requirements for each connector type
- Credential Storage: How to store and manage connector-specific credentials
- Security Considerations: Best practices for connector authentication
Connector Types and Authentication
Different connector types require specific authentication methods and configurations. This section covers the authentication requirements for databases, cloud storage, APIs, and file systems, along with configuration examples for each type.
Database Connectors
Database connectors require specific authentication credentials and connection parameters:
PostgreSQL
{
  "connector_type": "postgres",
  "credentials": {
    "host": "your-postgres-host.com",
    "port": 5432,
    "database": "your_database",
    "username": "your_username",
    "password": "your_password",
    "ssl_mode": "require"
  },
  "connection_options": {
    "connect_timeout": 30,
    "application_name": "nexla_connector"
  }
}
MySQL
{
  "connector_type": "mysql",
  "credentials": {
    "host": "your-mysql-host.com",
    "port": 3306,
    "database": "your_database",
    "username": "your_username",
    "password": "your_password",
    "ssl_ca": "/path/to/ca-cert.pem"
  },
  "connection_options": {
    "charset": "utf8mb4",
    "autocommit": true
  }
}
Snowflake
{
  "connector_type": "snowflake",
  "credentials": {
    "account": "your-account.snowflakecomputing.com",
    "warehouse": "your_warehouse",
    "database": "your_database",
    "schema": "your_schema",
    "username": "your_username",
    "password": "your_password"
  },
  "connection_options": {
    "role": "your_role",
    "timezone": "UTC"
  }
}
Cloud Storage Connectors
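Cloud storage connectors authenticate with provider-specific credentials, such as access keys (AWS S3), a service account key (Google Cloud Storage), or an account key or connection string (Azure Blob Storage):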
AWS S3
{
  "connector_type": "s3",
  "credentials": {
    "access_key_id": "your_access_key_id",
    "secret_access_key": "your_secret_access_key",
    "region": "us-east-1"
  },
  "connection_options": {
    "bucket_name": "your-bucket-name",
    "encryption": "AES256"
  }
}
Google Cloud Storage
{
  "connector_type": "gcs",
  "credentials": {
    "service_account_key": "path/to/service-account-key.json",
    "project_id": "your-project-id"
  },
  "connection_options": {
    "bucket_name": "your-bucket-name",
    "encryption": "customer-managed"
  }
}
Azure Blob Storage
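Supply either account_name with account_key or a connection_string. An Azure Storage connection string normally embeds the account name and key already, so both forms appear here only for reference.
{
  "connector_type": "azure_blob",
  "credentials": {
    "account_name": "your_storage_account",
    "account_key": "your_account_key",
    "connection_string": "your_connection_string"
  },
  "connection_options": {
    "container_name": "your-container-name",
    "encryption_scope": "your-encryption-scope"
  }
}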
API Connectors
API connectors authenticate with Basic Auth, OAuth 2.0, or an API key, depending on what the target API supports:
REST API with Basic Auth
{
  "connector_type": "rest",
  "credentials": {
    "base_url": "https://api.example.com",
    "username": "your_username",
    "password": "your_password"
  },
  "connection_options": {
    "timeout": 30,
    "retry_attempts": 3,
    "rate_limit": 100
  }
}
REST API with OAuth 2.0
{
  "connector_type": "rest_oauth2",
  "credentials": {
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "authorization_url": "https://auth.example.com/oauth/authorize",
    "token_url": "https://auth.example.com/oauth/token",
    "redirect_uri": "https://your-app.com/callback"
  },
  "connection_options": {
    "scope": "read write",
    "grant_type": "authorization_code"
  }
}
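The OAuth 2.0 fields above map onto the authorization code flow roughly as follows. This is a minimal sketch using only the standard library. The function names build_authorization_url and build_token_request are hypothetical, and the code only prepares the two requests without sending them. Sending the client credentials in the form body is one option; some providers expect them in a Basic Authorization header instead.

```python
import secrets
from typing import Any, Dict, Tuple
from urllib.parse import urlencode


def build_authorization_url(credentials: Dict[str, Any], scope: str) -> Tuple[str, str]:
    """Return the URL to send the user to, plus the state value to verify on callback."""
    state = secrets.token_urlsafe(16)  # random value that guards against CSRF
    query = urlencode({
        "response_type": "code",
        "client_id": credentials["client_id"],
        "redirect_uri": credentials["redirect_uri"],
        "scope": scope,
        "state": state,
    })
    return f"{credentials['authorization_url']}?{query}", state


def build_token_request(credentials: Dict[str, Any], code: str) -> Tuple[str, Dict[str, str]]:
    """Return the token endpoint and the form fields that exchange a code for tokens."""
    form = {
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": credentials["redirect_uri"],
        "client_id": credentials["client_id"],
        "client_secret": credentials["client_secret"],
    }
    return credentials["token_url"], form


# Placeholder values copied from the configuration above
example_credentials = {
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "authorization_url": "https://auth.example.com/oauth/authorize",
    "token_url": "https://auth.example.com/oauth/token",
    "redirect_uri": "https://your-app.com/callback",
}
auth_url, state = build_authorization_url(example_credentials, "read write")
token_url, form = build_token_request(example_credentials, "code-from-callback")
print(auth_url)
```

The state value returned by build_authorization_url should be stored and compared with the state parameter on the callback before the code is exchanged.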
REST API with API Key
{
  "connector_type": "rest_api_key",
  "credentials": {
    "base_url": "https://api.example.com",
    "api_key": "your_api_key",
    "api_key_header": "X-API-Key"
  },
  "connection_options": {
    "timeout": 30,
    "headers": {
      "User-Agent": "Nexla-Connector/1.0"
    }
  }
}
File System Connectors
File system connectors authenticate with a password or private key (SFTP) or with a service account key (Google Drive):
SFTP
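Use either password or private_key_path, depending on how the server authenticates users. Key-based authentication is generally preferable where the server supports it.
{
  "connector_type": "sftp",
  "credentials": {
    "host": "your-sftp-host.com",
    "port": 22,
    "username": "your_username",
    "password": "your_password",
    "private_key_path": "/path/to/private_key"
  },
  "connection_options": {
    "timeout": 30,
    "keepalive_interval": 60
  }
}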
Google Drive
{
  "connector_type": "google_drive",
  "credentials": {
    "service_account_key": "path/to/service-account-key.json",
    "scopes": ["https://www.googleapis.com/auth/drive.readonly"]
  },
  "connection_options": {
    "folder_id": "your_folder_id",
    "file_types": ["csv", "json", "xlsx"]
  }
}
Connector Authentication Flow
The connector authentication flow follows a systematic process to ensure secure and reliable connections. This section walks through the steps from connector selection to ongoing monitoring and maintenance.
1. Select Connector Type
Choose the appropriate connector for your data source or destination:
from enum import Enum
from typing import Any, Dict


class ConnectorType(Enum):
    # Values match the "connector_type" strings used in the configurations above
    POSTGRES = "postgres"
    MYSQL = "mysql"
    SNOWFLAKE = "snowflake"
    S3 = "s3"
    GCS = "gcs"
    AZURE_BLOB = "azure_blob"
    REST = "rest"
    REST_OAUTH2 = "rest_oauth2"
    REST_API_KEY = "rest_api_key"
    SFTP = "sftp"
    GOOGLE_DRIVE = "google_drive"


class ConnectorFactory:
    @staticmethod
    def create_connector(connector_type: ConnectorType, credentials: Dict[str, Any]):
        """Create connector instance based on type"""
        # PostgresConnector, S3Connector, and RESTConnector are defined under
        # Implementation Examples; other types need their own connector classes
        if connector_type == ConnectorType.POSTGRES:
            return PostgresConnector(credentials)
        elif connector_type == ConnectorType.S3:
            return S3Connector(credentials)
        elif connector_type == ConnectorType.REST:
            return RESTConnector(credentials)
        else:
            raise ValueError(f"Unsupported connector type: {connector_type}")
2. Configure Credentials
Set up authentication credentials specific to the connector type:
from typing import Any, Dict


class CredentialManager:
    def __init__(self):
        # Maps connector_id to its encrypted credentials (in memory only)
        self.encrypted_credentials = {}

    def store_credentials(self, connector_id: str, credentials: Dict[str, Any]):
        """Store encrypted credentials for a connector"""
        # Encrypt sensitive credential data
        encrypted = self._encrypt_credentials(credentials)
        self.encrypted_credentials[connector_id] = encrypted

    def get_credentials(self, connector_id: str) -> Dict[str, Any]:
        """Retrieve and decrypt credentials for a connector"""
        if connector_id not in self.encrypted_credentials:
            raise ValueError(f"No credentials found for connector: {connector_id}")
        encrypted = self.encrypted_credentials[connector_id]
        return self._decrypt_credentials(encrypted)

    def _encrypt_credentials(self, credentials: Dict[str, Any]) -> Dict[str, Any]:
        """Encrypt sensitive credential data"""
        # Implementation depends on your encryption library.
        # WARNING: this placeholder returns the credentials unencrypted;
        # replace it with real encryption before storing anything sensitive.
        return credentials

    def _decrypt_credentials(self, encrypted_credentials: Dict[str, Any]) -> Dict[str, Any]:
        """Decrypt sensitive credential data"""
        # Implementation depends on your encryption library.
        # This placeholder must mirror whatever _encrypt_credentials does.
        return encrypted_credentials
3. Test Connection
Validate that the connector can authenticate and connect to the external system:
from datetime import datetime, timezone
from typing import Any, Dict


class ConnectionTester:
    def __init__(self, connector):
        # The connector is expected to provide test_connectivity(),
        # test_authentication(), test_operations(), get_credentials(),
        # and validate_credential_format()
        self.connector = connector

    def test_connection(self) -> Dict[str, Any]:
        """Test connector connection and authentication"""
        try:
            # Test basic connectivity
            connection_result = self.connector.test_connectivity()
            # Test authentication
            auth_result = self.connector.test_authentication()
            # Test basic operations
            operation_result = self.connector.test_operations()
            return {
                # Succeed only if every check passed; a check that returns
                # False without raising must not be reported as success
                'success': bool(connection_result and auth_result and operation_result),
                'connectivity': connection_result,
                'authentication': auth_result,
                'operations': operation_result,
                'timestamp': datetime.now(timezone.utc).isoformat()
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'error_type': type(e).__name__,
                'timestamp': datetime.now(timezone.utc).isoformat()
            }

    def validate_credentials(self) -> bool:
        """Validate that credentials are properly formatted"""
        try:
            credentials = self.connector.get_credentials()
            return self.connector.validate_credential_format(credentials)
        except Exception:
            return False
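The validate_credential_format call above is not defined in this section. One possible approach is a required-field check per connector type. The sketch below is an assumption, not a Nexla API: the REQUIRED_FIELDS table and the missing_credential_fields helper are hypothetical names, and the field lists are taken from the example configurations earlier in this section.

```python
from typing import Any, Dict, List

# Required credential keys per connector type, based on the example
# configurations in this section. Extend or adjust for your own connectors.
REQUIRED_FIELDS: Dict[str, List[str]] = {
    "postgres": ["host", "port", "database", "username", "password"],
    "mysql": ["host", "port", "database", "username", "password"],
    "s3": ["access_key_id", "secret_access_key", "region"],
    "rest": ["base_url", "username", "password"],
    "rest_api_key": ["base_url", "api_key"],
}


def missing_credential_fields(connector_type: str, credentials: Dict[str, Any]) -> List[str]:
    """Return required keys that are absent or empty; an empty list means the format is valid."""
    if connector_type not in REQUIRED_FIELDS:
        raise ValueError(f"No field rules for connector type: {connector_type}")
    return [
        key for key in REQUIRED_FIELDS[connector_type]
        if credentials.get(key) in (None, "")
    ]


missing = missing_credential_fields(
    "s3", {"access_key_id": "example-key-id", "region": "us-east-1"}
)
print(missing)  # ['secret_access_key']
```

Returning the list of missing keys, instead of a bare boolean, lets the caller report exactly which field to fix.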
4. Monitor Authentication
Track authentication status and handle credential expiration:
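import logging
from datetime import datetime, timezone
from typing import Any, Dict


class AuthenticationMonitor:
    def __init__(self):
        self.connector_status = {}

    def monitor_connector_auth(self, connector_id: str) -> bool:
        """Monitor authentication status for a connector"""
        # validate_credentials() and test_connection() are ConnectionTester
        # methods (step 3), so wrap the connector before checking it
        tester = ConnectionTester(self._get_connector(connector_id))
        try:
            # Check if credentials are still valid
            if not tester.validate_credentials():
                self._handle_credential_expiration(connector_id)
                self._update_status(connector_id, 'credentials_invalid', {})
                return False
            # Test connection
            connection_test = tester.test_connection()
            if connection_test['success']:
                self._update_status(connector_id, 'authenticated', connection_test)
                return True
            else:
                self._update_status(connector_id, 'authentication_failed', connection_test)
                return False
        except Exception as e:
            self._update_status(connector_id, 'error', {'error': str(e)})
            return False

    def _handle_credential_expiration(self, connector_id: str):
        """Handle expired credentials"""
        # Log credential expiration
        logging.warning(f"Credentials expired for connector: {connector_id}")
        # Attempt credential refresh if possible
        if self._can_refresh_credentials(connector_id):
            self._refresh_credentials(connector_id)
        else:
            # Notify administrators
            self._notify_credential_expiration(connector_id)

    def _update_status(self, connector_id: str, status: str, details: Dict[str, Any]):
        """Update connector authentication status"""
        self.connector_status[connector_id] = {
            'status': status,
            'last_check': datetime.now(timezone.utc).isoformat(),
            'details': details
        }

    # The methods below are integration points; implement them for your
    # connector registry, refresh mechanism, and alerting system.
    def _get_connector(self, connector_id: str):
        raise NotImplementedError

    def _can_refresh_credentials(self, connector_id: str) -> bool:
        raise NotImplementedError

    def _refresh_credentials(self, connector_id: str):
        raise NotImplementedError

    def _notify_credential_expiration(self, connector_id: str):
        raise NotImplementedError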
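Expiration is easier to handle when the monitor can tell that a credential is about to expire, not only that it already has. The sketch below assumes each credential carries an expiry timestamp; that timestamp, the needs_refresh helper, and the five-minute margin are illustrative assumptions and do not appear in the configurations above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def needs_refresh(expires_at: datetime, now: Optional[datetime] = None,
                  margin: timedelta = timedelta(minutes=5)) -> bool:
    """Return True when the credential expires within `margin` or has already expired."""
    now = now or datetime.now(timezone.utc)
    return expires_at - now <= margin


# A fixed reference time keeps the example deterministic
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
expiring_soon = needs_refresh(now + timedelta(minutes=3), now=now)
still_valid = needs_refresh(now + timedelta(hours=1), now=now)
already_expired = needs_refresh(now - timedelta(minutes=1), now=now)
print(expiring_soon, still_valid, already_expired)  # True False True
```

Refreshing inside the margin avoids a window in which a request is sent with a token that expires while the request is in flight.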
Security Best Practices
Implementing security best practices is crucial for protecting connector credentials and maintaining secure connections. This section covers credential storage, connection security, and authentication method best practices.
Credential Storage
Secure credential storage is essential for protecting sensitive authentication information:
- Encryption: Always encrypt credentials at rest
- Access Control: Limit access to credential storage
- Rotation: Implement regular credential rotation
- Audit Logging: Log all credential access and modifications
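Audit logging should record that credentials were accessed without writing the secrets themselves to the log. A minimal sketch of that idea follows; the redact_credentials helper and the SENSITIVE_KEYS set are hypothetical names, with the keys drawn from the example configurations in this section.

```python
from typing import Any, Dict

# Keys treated as secrets; adjust the set to match your own connector fields
SENSITIVE_KEYS = {
    "password", "secret_access_key", "account_key", "connection_string",
    "client_secret", "api_key", "token",
}


def redact_credentials(credentials: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy that is safe to log: secret values are replaced with a mask."""
    return {
        key: "***REDACTED***" if key in SENSITIVE_KEYS else value
        for key, value in credentials.items()
    }


safe = redact_credentials(
    {"host": "db.example.com", "username": "svc_nexla", "password": "s3cret"}
)
print(safe)
```

The original dictionary is left untouched, so the connector can keep using it while only the redacted copy reaches the audit log.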
Connection Security
Connection security measures protect data in transit and ensure secure communication:
- TLS/SSL: Use encrypted connections whenever possible
- Network Isolation: Restrict network access to connector endpoints
- Certificate Validation: Validate SSL certificates
- Connection Timeouts: Implement appropriate connection timeouts
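In Python, certificate validation and a minimum TLS version can be enforced with the standard ssl module. The sketch below is one way to do it; the strict_tls_context name is hypothetical, and requiring TLS 1.2 or newer is an assumed policy to adapt to your own requirements.

```python
import ssl


def strict_tls_context() -> ssl.SSLContext:
    """Build a client-side TLS context that verifies certificates and hostnames."""
    context = ssl.create_default_context()  # loads the system CA certificates
    context.minimum_version = ssl.TLSVersion.TLSv1_2  # refuse older protocol versions
    # create_default_context() already enables both checks; failing loudly here
    # guards against a later change silently turning verification off.
    if context.verify_mode != ssl.CERT_REQUIRED or not context.check_hostname:
        raise RuntimeError("TLS certificate verification is disabled")
    return context


context = strict_tls_context()
print(context.verify_mode, context.check_hostname)
```

The resulting context can be passed to standard-library clients, for example http.client.HTTPSConnection(host, context=context), together with a timeout argument to cover the connection timeout recommendation.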
Authentication Methods
Authentication methods should follow security best practices to minimize risk:
- Service Accounts: Use dedicated service accounts for connectors
- Principle of Least Privilege: Grant minimal necessary permissions
- Multi-Factor Authentication: Enable MFA where supported
- Regular Review: Periodically review connector permissions
Implementation Examples
Practical implementation examples demonstrate how to create secure and reliable connectors for different data sources. These examples show best practices for authentication, error handling, and connection management.
Database Connector Implementation
import logging
from typing import Any, Dict, Optional

import psycopg2


class PostgresConnector:
    def __init__(self, credentials: Dict[str, Any]):
        self.credentials = credentials
        self.connection = None

    def connect(self) -> bool:
        """Establish database connection"""
        try:
            self.connection = psycopg2.connect(
                host=self.credentials['host'],
                port=self.credentials['port'],
                dbname=self.credentials['database'],
                user=self.credentials['username'],
                password=self.credentials['password'],
                # 'prefer' falls back to an unencrypted connection if TLS is
                # unavailable; set ssl_mode to 'require' or stricter in production
                sslmode=self.credentials.get('ssl_mode', 'prefer')
            )
            return True
        except Exception as e:
            logging.error(f"PostgreSQL connection failed: {e}")
            return False

    def test_authentication(self) -> bool:
        """Test database authentication"""
        try:
            if not self.connection or self.connection.closed:
                # Stop here if connecting fails; connect() has already logged why
                if not self.connect():
                    return False
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                result = cursor.fetchone()
            return result[0] == 1
        except Exception as e:
            logging.error(f"Authentication test failed: {e}")
            return False

    def execute_query(self, query: str, params: Optional[tuple] = None) -> list:
        """Execute a query that returns rows (for example, a SELECT)"""
        if not self.connection or self.connection.closed:
            if not self.connect():
                raise ConnectionError("Could not connect to PostgreSQL")
        try:
            # The with block closes the cursor even if the query raises
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                return cursor.fetchall()
        except psycopg2.Error as e:
            logging.error(f"Query execution failed: {e}")
            # Roll back so the connection is usable for the next query
            self.connection.rollback()
            raise
Cloud Storage Connector Implementation
import logging
from typing import Any, Dict, List

import boto3
from botocore.exceptions import BotoCoreError, ClientError


class S3Connector:
    def __init__(self, credentials: Dict[str, Any]):
        self.credentials = credentials
        self.s3_client = None

    def connect(self) -> bool:
        """Initialize S3 client"""
        try:
            # Creating the client does not contact AWS; the credentials are
            # only checked when the first request is made
            self.s3_client = boto3.client(
                's3',
                aws_access_key_id=self.credentials['access_key_id'],
                aws_secret_access_key=self.credentials['secret_access_key'],
                region_name=self.credentials['region']
            )
            return True
        except Exception as e:
            logging.error(f"S3 client initialization failed: {e}")
            return False

    def test_authentication(self) -> bool:
        """Test S3 authentication"""
        # Create the client on first use instead of failing on None
        if self.s3_client is None and not self.connect():
            return False
        try:
            # Listing buckets exercises the credentials; it requires the
            # s3:ListAllMyBuckets permission
            self.s3_client.list_buckets()
            return True
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'InvalidAccessKeyId':
                logging.error("Invalid AWS access key ID")
            elif error_code == 'SignatureDoesNotMatch':
                logging.error("Invalid AWS secret access key")
            else:
                logging.error(f"S3 authentication error: {error_code}")
            return False
        except BotoCoreError as e:
            # Network or configuration problems raised before AWS responds
            logging.error(f"S3 request could not be sent: {e}")
            return False

    def list_objects(self, bucket: str, prefix: str = "") -> List[str]:
        """List objects in S3 bucket"""
        if self.s3_client is None and not self.connect():
            raise ConnectionError("Could not initialize S3 client")
        try:
            # list_objects_v2 returns at most 1,000 keys per call, so use a
            # paginator to collect every matching key
            paginator = self.s3_client.get_paginator('list_objects_v2')
            objects = []
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for obj in page.get('Contents', []):
                    objects.append(obj['Key'])
            return objects
        except ClientError as e:
            logging.error(f"Failed to list S3 objects: {e}")
            raise
API Connector Implementation
import logging
from typing import Any, Dict

import requests


class RESTConnector:
    def __init__(self, credentials: Dict[str, Any]):
        self.credentials = credentials
        self.session = requests.Session()
        self._setup_authentication()

    def _setup_authentication(self):
        """Setup authentication for the session"""
        auth_type = self.credentials.get('auth_type', 'basic')
        if auth_type == 'basic':
            # requests builds the Basic Authorization header from this tuple
            self.session.auth = (self.credentials['username'], self.credentials['password'])
        elif auth_type == 'api_key':
            api_key = self.credentials['api_key']
            header_name = self.credentials.get('api_key_header', 'X-API-Key')
            self.session.headers[header_name] = api_key
        elif auth_type == 'bearer':
            token = self.credentials['token']
            self.session.headers['Authorization'] = f"Bearer {token}"
        else:
            # Fail early instead of sending unauthenticated requests
            raise ValueError(f"Unsupported auth_type: {auth_type}")

    def test_authentication(self) -> bool:
        """Test API authentication"""
        try:
            # Make a simple request to test authentication. The /health path
            # is a placeholder; substitute an endpoint your API exposes.
            test_endpoint = f"{self.credentials['base_url'].rstrip('/')}/health"
            response = self.session.get(test_endpoint, timeout=10)
            if response.status_code == 200:
                return True
            elif response.status_code in (401, 403):
                logging.error(f"API authentication failed: HTTP {response.status_code}")
                return False
            else:
                # Other statuses (for example, 404 when the test endpoint does
                # not exist) do not show that the credentials were rejected,
                # so authentication is assumed to be working
                logging.warning(f"Unexpected response: {response.status_code}")
                return True
        except requests.exceptions.RequestException as e:
            logging.error(f"API connection test failed: {e}")
            return False

    def make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Make an authenticated API request"""
        url = f"{self.credentials['base_url'].rstrip('/')}/{endpoint.lstrip('/')}"
        # requests has no default timeout, so set one unless the caller did
        kwargs.setdefault('timeout', 30)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            logging.error(f"API request failed: {e}")
            raise
Troubleshooting
When connector authentication issues arise, systematic troubleshooting helps identify and resolve problems quickly. This section covers common issues, their causes, and solutions to get your connectors working properly.
Common Issues
Most connector authentication issues fall into the categories below. Each is listed with its typical problem, a solution, and a prevention strategy.
Database Connection Failures
- Problem: Connection timeout or refused
- Solution: Verify network connectivity and firewall rules
- Prevention: Use connection pooling and implement retry logic
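The retry logic mentioned above can be sketched as exponential backoff around the connection attempt. The retry_with_backoff helper, its defaults, and the flaky_connect stand-in are illustrative assumptions; the sleep function is injectable so the example runs without real delays.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(operation: Callable[[], T], attempts: int = 3, base_delay: float = 0.5,
                       retry_on: Tuple[Type[Exception], ...] = (ConnectionError, TimeoutError),
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `operation`, retrying on transient errors with delays of base_delay * 2**n."""
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")


calls = {"count": 0}


def flaky_connect() -> str:
    """Stand-in for a connection attempt that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("connection refused")
    return "connected"


delays = []
result = retry_with_backoff(flaky_connect, sleep=delays.append)
print(result, delays)  # connected [0.5, 1.0]
```

Only errors listed in retry_on are retried, so a wrong password fails immediately instead of being attempted repeatedly. With psycopg2, connection failures surface as psycopg2.OperationalError, which can be passed through retry_on.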
Cloud Storage Authentication Errors
- Problem: Invalid credentials or expired tokens
- Solution: Verify credentials and refresh if necessary
- Prevention: Implement automatic credential rotation
API Rate Limiting
- Problem: 429 Too Many Requests errors
- Solution: Implement client-side rate limiting, and back off after a 429 response, honoring the Retry-After header when the API sends one
- Prevention: Monitor API usage and optimize request patterns
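Client-side rate limiting is often implemented as a token bucket. The sketch below is one such limiter, not a Nexla component: the TokenBucket class is a hypothetical name, and the clock is injectable so the behavior can be shown deterministically.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate` requests per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return False when the caller should wait."""
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


fake_time = [0.0]
bucket = TokenBucket(rate=2, capacity=2, clock=lambda: fake_time[0])
first_burst = [bucket.try_acquire() for _ in range(3)]
fake_time[0] = 0.5  # half a second later, one token has been refilled
after_wait = bucket.try_acquire()
print(first_burst, after_wait)  # [True, True, False] True
```

A connector would call try_acquire() before each request and wait briefly when it returns False, which keeps request volume under the API's limit instead of reacting to 429 responses after the fact.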
Debugging Tips
These debugging steps help you systematically identify and resolve connector authentication issues:
- Check Credentials: Verify credentials are correct and not expired
- Test Connectivity: Ensure network connectivity to external systems
- Review Logs: Check connector logs for detailed error information
- Validate Configuration: Verify connector configuration parameters
- Test Manually: Test connections manually to isolate issues
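The connectivity check can be done without involving credentials at all, which separates network problems from authentication problems. A minimal sketch using the standard socket module follows; the can_reach helper is a hypothetical name, and the demo opens a throwaway local listener so it runs without an external host.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts, and DNS failures
        return False


# Demo: listen on an ephemeral local port, then probe it
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]
reachable = can_reach("127.0.0.1", port, timeout=2.0)
listener.close()
print(reachable)  # True
```

If can_reach returns False for a database or SFTP host, the cause is likely DNS, routing, or firewall rules, and checking credentials will not help until the port is reachable.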
Next Steps
- Security Best Practices: Follow comprehensive security guidelines
- Error Handling: Learn about authentication error handling
- Rate Limiting: Understand rate limiting and throttling
- Advanced Features: Explore organization context and impersonation
For comprehensive authentication information, return to the Authentication Overview.