
Connector Authentication

Different connector types in Nexla may require specific authentication configurations. Understanding connector-specific authentication requirements is essential for setting up secure and reliable data connections.

Overview

This section describes the main connector categories, the credentials each one requires, and how to configure and store those credentials securely.

Connector authentication spans four areas:

  • Connector Types: Different categories of data connectors
  • Authentication Methods: Specific authentication requirements for each connector type
  • Credential Storage: How to store and manage connector-specific credentials
  • Security Considerations: Best practices for connector authentication

Connector Types and Authentication

Different connector types require specific authentication methods and configurations. This section covers the authentication requirements for databases, cloud storage, APIs, and file systems, along with configuration examples for each type.

Database Connectors

Database connectors require specific authentication credentials and connection parameters:

PostgreSQL

PostgreSQL Authentication Configuration
{
  "connector_type": "postgres",
  "credentials": {
    "host": "your-postgres-host.com",
    "port": 5432,
    "database": "your_database",
    "username": "your_username",
    "password": "your_password",
    "ssl_mode": "require"
  },
  "connection_options": {
    "connect_timeout": 30,
    "application_name": "nexla_connector"
  }
}

MySQL

MySQL Authentication Configuration
{
  "connector_type": "mysql",
  "credentials": {
    "host": "your-mysql-host.com",
    "port": 3306,
    "database": "your_database",
    "username": "your_username",
    "password": "your_password",
    "ssl_ca": "/path/to/ca-cert.pem"
  },
  "connection_options": {
    "charset": "utf8mb4",
    "autocommit": true
  }
}

Snowflake

Snowflake Authentication Configuration
{
  "connector_type": "snowflake",
  "credentials": {
    "account": "your-account.snowflakecomputing.com",
    "warehouse": "your_warehouse",
    "database": "your_database",
    "schema": "your_schema",
    "username": "your_username",
    "password": "your_password"
  },
  "connection_options": {
    "role": "your_role",
    "timezone": "UTC"
  }
}

Cloud Storage Connectors

Cloud storage connectors use different authentication mechanisms:

AWS S3

AWS S3 Authentication Configuration
{
  "connector_type": "s3",
  "credentials": {
    "access_key_id": "your_access_key_id",
    "secret_access_key": "your_secret_access_key",
    "region": "us-east-1"
  },
  "connection_options": {
    "bucket_name": "your-bucket-name",
    "encryption": "AES256"
  }
}

Google Cloud Storage

Google Cloud Storage Authentication Configuration
{
  "connector_type": "gcs",
  "credentials": {
    "service_account_key": "path/to/service-account-key.json",
    "project_id": "your-project-id"
  },
  "connection_options": {
    "bucket_name": "your-bucket-name",
    "encryption": "customer-managed"
  }
}

Azure Blob Storage

Azure Blob Storage Authentication Configuration
{
  "connector_type": "azure_blob",
  "credentials": {
    "account_name": "your_storage_account",
    "account_key": "your_account_key",
    "connection_string": "your_connection_string"
  },
  "connection_options": {
    "container_name": "your-container-name",
    "encryption_scope": "your-encryption-scope"
  }
}

API Connectors

API connectors require various authentication methods:

REST API with Basic Auth

REST API Basic Authentication Configuration
{
  "connector_type": "rest",
  "credentials": {
    "base_url": "https://api.example.com",
    "username": "your_username",
    "password": "your_password"
  },
  "connection_options": {
    "timeout": 30,
    "retry_attempts": 3,
    "rate_limit": 100
  }
}

REST API with OAuth 2.0

REST API OAuth 2.0 Configuration
{
  "connector_type": "rest_oauth2",
  "credentials": {
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "authorization_url": "https://auth.example.com/oauth/authorize",
    "token_url": "https://auth.example.com/oauth/token",
    "redirect_uri": "https://your-app.com/callback"
  },
  "connection_options": {
    "scope": "read write",
    "grant_type": "authorization_code"
  }
}
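
To show how these fields fit together, the sketch below builds the first request of the authorization_code grant as described in RFC 6749: the user is sent to authorization_url with the client ID, redirect URI, scope, and a random state value. The helper name build_authorization_url is hypothetical and is not part of Nexla's API. The client_secret is not sent in this step; it is only used later, when the returned code is exchanged at token_url.

```python
import secrets
from typing import Dict, Optional, Tuple
from urllib.parse import urlencode


def build_authorization_url(
    credentials: Dict[str, str], scope: str, state: Optional[str] = None
) -> Tuple[str, str]:
    """Return (url, state) for the first leg of the authorization_code grant."""
    # A random state value lets the callback handler reject forged responses.
    state = state or secrets.token_urlsafe(16)
    query = urlencode({
        "response_type": "code",
        "client_id": credentials["client_id"],
        "redirect_uri": credentials["redirect_uri"],
        "scope": scope,
        "state": state,
    })
    return f"{credentials['authorization_url']}?{query}", state
```

The caller should store the returned state and compare it with the state parameter that arrives at the redirect URI before exchanging the code.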

REST API with API Key

REST API Key Authentication Configuration
{
  "connector_type": "rest_api_key",
  "credentials": {
    "base_url": "https://api.example.com",
    "api_key": "your_api_key",
    "api_key_header": "X-API-Key"
  },
  "connection_options": {
    "timeout": 30,
    "headers": {
      "User-Agent": "Nexla-Connector/1.0"
    }
  }
}

File System Connectors

File system connectors require specific authentication and access methods:

SFTP

SFTP Authentication Configuration
{
  "connector_type": "sftp",
  "credentials": {
    "host": "your-sftp-host.com",
    "port": 22,
    "username": "your_username",
    "password": "your_password",
    "private_key_path": "/path/to/private_key"
  },
  "connection_options": {
    "timeout": 30,
    "keepalive_interval": 60
  }
}

Google Drive

Google Drive Authentication Configuration
{
  "connector_type": "google_drive",
  "credentials": {
    "service_account_key": "path/to/service-account-key.json",
    "scopes": ["https://www.googleapis.com/auth/drive.readonly"]
  },
  "connection_options": {
    "folder_id": "your_folder_id",
    "file_types": ["csv", "json", "xlsx"]
  }
}

Connector Authentication Flow

The connector authentication flow follows a systematic process to ensure secure and reliable connections. This section walks through the steps from connector selection to ongoing monitoring and maintenance.

1. Select Connector Type

Choose the appropriate connector for your data source or destination:

Python Connector Selection
from enum import Enum
from typing import Any, Dict


class ConnectorType(Enum):
    POSTGRES = "postgres"
    MYSQL = "mysql"
    SNOWFLAKE = "snowflake"
    S3 = "s3"
    GCS = "gcs"
    REST = "rest"
    SFTP = "sftp"
    GOOGLE_DRIVE = "google_drive"


class ConnectorFactory:
    @staticmethod
    def create_connector(connector_type: ConnectorType, credentials: Dict[str, Any]):
        """Create a connector instance based on type"""
        # PostgresConnector, S3Connector, and RESTConnector are the example
        # classes defined under Implementation Examples later on this page.
        if connector_type == ConnectorType.POSTGRES:
            return PostgresConnector(credentials)
        elif connector_type == ConnectorType.S3:
            return S3Connector(credentials)
        elif connector_type == ConnectorType.REST:
            return RESTConnector(credentials)
        else:
            # The remaining enum members have no example implementation here.
            raise ValueError(f"Unsupported connector type: {connector_type}")

2. Configure Credentials

Set up authentication credentials specific to the connector type:

Python Credential Configuration
from typing import Any, Dict


class CredentialManager:
    def __init__(self):
        # In-memory store only; contents are lost when the process exits.
        self.encrypted_credentials = {}

    def store_credentials(self, connector_id: str, credentials: Dict[str, Any]):
        """Store encrypted credentials for a connector"""
        # Encrypt sensitive credential data before it is kept anywhere
        encrypted = self._encrypt_credentials(credentials)
        self.encrypted_credentials[connector_id] = encrypted

    def get_credentials(self, connector_id: str) -> Dict[str, Any]:
        """Retrieve and decrypt credentials for a connector"""
        if connector_id not in self.encrypted_credentials:
            raise ValueError(f"No credentials found for connector: {connector_id}")

        encrypted = self.encrypted_credentials[connector_id]
        return self._decrypt_credentials(encrypted)

    def _encrypt_credentials(self, credentials: Dict[str, Any]) -> Dict[str, Any]:
        """Encrypt sensitive credential data"""
        # WARNING: placeholder. This returns the input unchanged, so nothing
        # is encrypted until you plug in your encryption library here.
        return credentials

    def _decrypt_credentials(self, encrypted_credentials: Dict[str, Any]) -> Dict[str, Any]:
        """Decrypt sensitive credential data"""
        # WARNING: placeholder. Must mirror whatever _encrypt_credentials does.
        return encrypted_credentials
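
One way to keep real secrets out of configuration files is to store a reference to an environment variable and resolve it just before the credentials are handed to CredentialManager. The sketch below assumes a "${VAR_NAME}" placeholder convention; that syntax and the resolve_env_references helper are illustrative assumptions, not a documented Nexla feature.

```python
import os
import re
from typing import Any, Dict

# Matches values that consist entirely of a "${VAR_NAME}" reference (assumed syntax).
_ENV_REF = re.compile(r"^\$\{([A-Za-z_][A-Za-z0-9_]*)\}$")


def resolve_env_references(credentials: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of credentials with "${VAR}" values read from the environment."""
    resolved: Dict[str, Any] = {}
    for key, value in credentials.items():
        match = _ENV_REF.match(value) if isinstance(value, str) else None
        if match is None:
            resolved[key] = value  # literal value, keep as-is
            continue
        name = match.group(1)
        if name not in os.environ:
            raise KeyError(f"Environment variable {name} is not set (needed for '{key}')")
        resolved[key] = os.environ[name]
    return resolved
```

With this approach the configuration file can contain "password": "${PG_PASSWORD}" while the actual value lives only in the process environment or in the secret manager that populates it.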

3. Test Connection

Validate that the connector can authenticate and connect to the external system:

Python Connection Testing
from datetime import datetime
from typing import Any, Dict


class ConnectionTester:
    def __init__(self, connector):
        # The connector is expected to provide test_connectivity(),
        # test_authentication(), test_operations(), get_credentials(),
        # and validate_credential_format().
        self.connector = connector

    def test_connection(self) -> Dict[str, Any]:
        """Test connector connection and authentication"""
        try:
            # Test basic connectivity
            connection_result = self.connector.test_connectivity()

            # Test authentication
            auth_result = self.connector.test_authentication()

            # Test basic operations
            operation_result = self.connector.test_operations()

            return {
                'success': True,
                'connectivity': connection_result,
                'authentication': auth_result,
                'operations': operation_result,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            # Report the failure instead of raising so callers always get a dict
            return {
                'success': False,
                'error': str(e),
                'error_type': type(e).__name__,
                'timestamp': datetime.now().isoformat()
            }

    def validate_credentials(self) -> bool:
        """Validate that credentials are properly formatted"""
        try:
            credentials = self.connector.get_credentials()
            return self.connector.validate_credential_format(credentials)
        except Exception:
            return False
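
The validate_credentials method above delegates to a validate_credential_format method that this page does not define. A minimal sketch of one possible check follows: it reports which required fields are missing or empty. The REQUIRED_FIELDS mapping is drawn from the configuration examples earlier on this page and is illustrative only, not an official schema; the function name is hypothetical.

```python
from typing import Any, Dict, List

# Required credential fields per connector type, taken from the configuration
# examples on this page. Illustrative only, not an official schema.
REQUIRED_FIELDS: Dict[str, List[str]] = {
    "postgres": ["host", "port", "database", "username", "password"],
    "mysql": ["host", "port", "database", "username", "password"],
    "s3": ["access_key_id", "secret_access_key", "region"],
    "rest": ["base_url", "username", "password"],
}


def missing_credential_fields(connector_type: str, credentials: Dict[str, Any]) -> List[str]:
    """Return the required fields that are absent or empty."""
    if connector_type not in REQUIRED_FIELDS:
        raise ValueError(f"Unknown connector type: {connector_type}")
    return [
        field
        for field in REQUIRED_FIELDS[connector_type]
        if credentials.get(field) in (None, "")
    ]
```

A connector could implement validate_credential_format by returning True when this list is empty, and log the missing field names (never their values) otherwise.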

4. Monitor Authentication

Track authentication status and handle credential expiration:

Python Authentication Monitoring
import logging
from datetime import datetime
from typing import Any, Dict


class AuthenticationMonitor:
    # _get_connector, _can_refresh_credentials, _refresh_credentials, and
    # _notify_credential_expiration are deployment-specific helpers that this
    # example leaves for you to implement. _get_connector must return an
    # object that provides validate_credentials() and test_connection().
    def __init__(self):
        self.connector_status = {}

    def monitor_connector_auth(self, connector_id: str) -> bool:
        """Monitor authentication status for a connector"""
        connector = self._get_connector(connector_id)

        try:
            # Check if credentials are still valid
            if not connector.validate_credentials():
                self._handle_credential_expiration(connector_id)
                return False

            # Test connection
            connection_test = connector.test_connection()

            if connection_test['success']:
                self._update_status(connector_id, 'authenticated', connection_test)
                return True
            else:
                self._update_status(connector_id, 'authentication_failed', connection_test)
                return False

        except Exception as e:
            self._update_status(connector_id, 'error', {'error': str(e)})
            return False

    def _handle_credential_expiration(self, connector_id: str):
        """Handle expired credentials"""
        # Log credential expiration (the ID only, never the credential values)
        logging.warning("Credentials expired for connector: %s", connector_id)

        # Attempt credential refresh if possible
        if self._can_refresh_credentials(connector_id):
            self._refresh_credentials(connector_id)
        else:
            # Notify administrators
            self._notify_credential_expiration(connector_id)

    def _update_status(self, connector_id: str, status: str, details: Dict[str, Any]):
        """Update connector authentication status"""
        self.connector_status[connector_id] = {
            'status': status,
            'last_check': datetime.now().isoformat(),
            'details': details
        }

Security Best Practices

Implementing security best practices is crucial for protecting connector credentials and maintaining secure connections. This section covers credential storage, connection security, and authentication method best practices.

Credential Storage

Secure credential storage is essential for protecting sensitive authentication information:

  • Encryption: Always encrypt credentials at rest
  • Access Control: Limit access to credential storage
  • Rotation: Implement regular credential rotation
  • Audit Logging: Log all credential access and modifications
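
Audit logging should record that a credential was accessed without writing the secret itself to the log. A minimal sketch, assuming the field names used in the configuration examples on this page; the SENSITIVE_KEYS set and the redact helper are hypothetical and should be extended to match your own credential fields.

```python
from typing import Any, Dict

# Field names treated as secrets (taken from the examples on this page).
SENSITIVE_KEYS = {
    "password",
    "secret_access_key",
    "client_secret",
    "api_key",
    "account_key",
    "connection_string",
    "token",
}


def redact(credentials: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of credentials that is safe to write to logs."""
    return {
        key: "***" if key in SENSITIVE_KEYS else value
        for key, value in credentials.items()
    }
```

For example, logging.info("Credentials read: %s", redact(credentials)) records the host and username while masking the password.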

Connection Security

Connection security measures protect data in transit and ensure secure communication:

  • TLS/SSL: Use encrypted connections whenever possible
  • Network Isolation: Restrict network access to connector endpoints
  • Certificate Validation: Validate SSL certificates
  • Connection Timeouts: Implement appropriate connection timeouts
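
In Python, the standard library's ssl module already provides certificate validation through its default context. The sketch below builds a client context that verifies the server certificate and host name and refuses protocol versions older than TLS 1.2; the helper name strict_tls_context and the TLS 1.2 floor are choices made for this example.

```python
import ssl


def strict_tls_context() -> ssl.SSLContext:
    """Build a client-side TLS context that verifies the server certificate."""
    # create_default_context() enables CERT_REQUIRED and host name checking
    # and loads the system's trusted CA certificates.
    context = ssl.create_default_context()
    # Refuse protocol versions older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context
```

The returned context can be passed to standard library clients that accept one, for example urllib.request.urlopen(url, context=strict_tls_context(), timeout=30), which also covers the connection timeout recommendation.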

Authentication Methods

Authentication methods should follow security best practices to minimize risk:

  • Service Accounts: Use dedicated service accounts for connectors
  • Principle of Least Privilege: Grant minimal necessary permissions
  • Multi-Factor Authentication: Enable MFA where supported
  • Regular Review: Periodically review connector permissions

Implementation Examples

Practical implementation examples demonstrate how to create secure and reliable connectors for different data sources. These examples show best practices for authentication, error handling, and connection management.

Database Connector Implementation

Python Database Connector Example
import logging
from typing import Any, Dict, Optional

import psycopg2


class PostgresConnector:
    def __init__(self, credentials: Dict[str, Any]):
        self.credentials = credentials
        self.connection = None

    def connect(self) -> bool:
        """Establish database connection"""
        try:
            self.connection = psycopg2.connect(
                host=self.credentials['host'],
                port=self.credentials['port'],
                dbname=self.credentials['database'],
                user=self.credentials['username'],
                password=self.credentials['password'],
                # 'prefer' falls back to an unencrypted connection; set
                # ssl_mode to 'require' or stricter to enforce TLS.
                sslmode=self.credentials.get('ssl_mode', 'prefer')
            )
            return True
        except Exception as e:
            logging.error(f"PostgreSQL connection failed: {e}")
            return False

    def _ensure_connected(self) -> bool:
        """Reconnect if there is no open connection"""
        if self.connection is None or self.connection.closed:
            return self.connect()
        return True

    def test_authentication(self) -> bool:
        """Test database authentication"""
        try:
            if not self._ensure_connected():
                return False

            # A trivial query confirms the session is usable
            with self.connection.cursor() as cursor:
                cursor.execute("SELECT 1")
                result = cursor.fetchone()

            return result[0] == 1
        except Exception as e:
            logging.error(f"Authentication test failed: {e}")
            return False

    def execute_query(self, query: str, params: Optional[tuple] = None) -> list:
        """Execute a query that returns rows (for example, SELECT)"""
        if not self._ensure_connected():
            raise ConnectionError("PostgreSQL connection is not available")
        try:
            # Pass values through params so the driver escapes them
            with self.connection.cursor() as cursor:
                cursor.execute(query, params)
                return cursor.fetchall()
        except Exception as e:
            logging.error(f"Query execution failed: {e}")
            raise

Cloud Storage Connector Implementation

Python S3 Connector Example
import logging
from typing import Any, Dict, List

import boto3
from botocore.exceptions import BotoCoreError, ClientError


class S3Connector:
    def __init__(self, credentials: Dict[str, Any]):
        self.credentials = credentials
        self.s3_client = None

    def connect(self) -> bool:
        """Initialize S3 client"""
        try:
            # Creating the client does not contact AWS, so invalid keys are
            # only detected on the first request.
            self.s3_client = boto3.client(
                's3',
                aws_access_key_id=self.credentials['access_key_id'],
                aws_secret_access_key=self.credentials['secret_access_key'],
                region_name=self.credentials['region']
            )
            return True
        except Exception as e:
            logging.error(f"S3 client initialization failed: {e}")
            return False

    def test_authentication(self) -> bool:
        """Test S3 authentication"""
        if self.s3_client is None and not self.connect():
            return False
        try:
            # list_buckets needs the s3:ListAllMyBuckets permission, so a
            # narrowly scoped key can fail here even though it is valid.
            self.s3_client.list_buckets()
            return True
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'InvalidAccessKeyId':
                logging.error("Invalid AWS access key ID")
            elif error_code == 'SignatureDoesNotMatch':
                logging.error("Invalid AWS secret access key")
            else:
                logging.error(f"S3 authentication error: {error_code}")
            return False
        except BotoCoreError as e:
            # Raised before a response arrives, e.g. endpoint unreachable
            logging.error(f"S3 request could not be completed: {e}")
            return False

    def list_objects(self, bucket: str, prefix: str = "") -> List[str]:
        """List object keys in an S3 bucket"""
        try:
            # list_objects_v2 returns at most 1,000 keys per call, so use a
            # paginator to collect every page.
            paginator = self.s3_client.get_paginator('list_objects_v2')
            keys = []
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                keys.extend(obj['Key'] for obj in page.get('Contents', []))
            return keys
        except ClientError as e:
            logging.error(f"Failed to list S3 objects: {e}")
            raise

API Connector Implementation

Python REST API Connector Example
import logging
from typing import Any, Dict

import requests


class RESTConnector:
    def __init__(self, credentials: Dict[str, Any]):
        self.credentials = credentials
        self.session = requests.Session()
        self._setup_authentication()

    def _setup_authentication(self):
        """Set up authentication for the session"""
        auth_type = self.credentials.get('auth_type', 'basic')

        if auth_type == 'basic':
            # requests builds the "Authorization: Basic ..." header from this tuple
            self.session.auth = (
                self.credentials['username'],
                self.credentials['password'],
            )

        elif auth_type == 'api_key':
            api_key = self.credentials['api_key']
            header_name = self.credentials.get('api_key_header', 'X-API-Key')
            self.session.headers[header_name] = api_key

        elif auth_type == 'bearer':
            token = self.credentials['token']
            self.session.headers['Authorization'] = f"Bearer {token}"

        else:
            # Fail early instead of silently sending unauthenticated requests
            raise ValueError(f"Unsupported auth_type: {auth_type}")

    def test_authentication(self) -> bool:
        """Test API authentication"""
        try:
            # Assumes the API exposes a /health endpoint that requires
            # authentication; substitute any cheap authenticated endpoint.
            test_endpoint = f"{self.credentials['base_url']}/health"
            response = self.session.get(test_endpoint, timeout=10)

            if response.status_code == 200:
                return True
            elif response.status_code == 401:
                logging.error("API authentication failed: Unauthorized")
                return False
            else:
                # Other statuses (for example 404 when /health does not exist)
                # say nothing about the credentials, so they are not treated
                # as an authentication failure.
                logging.warning(f"Unexpected response: {response.status_code}")
                return True

        except requests.exceptions.RequestException as e:
            logging.error(f"API connection test failed: {e}")
            return False

    def make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Make an authenticated API request"""
        url = f"{self.credentials['base_url']}{endpoint}"
        # requests has no default timeout; avoid hanging indefinitely
        kwargs.setdefault('timeout', 30)

        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            logging.error(f"API request failed: {e}")
            raise

Troubleshooting

When connector authentication issues arise, systematic troubleshooting helps identify and resolve problems quickly. This section covers common issues, their causes, and solutions to get your connectors working properly.

Common Issues

Most connector authentication issues fall into one of the categories below. Each entry lists the problem, a solution, and a way to prevent it from recurring.

Database Connection Failures

  • Problem: Connection timeout or refused
  • Solution: Verify network connectivity and firewall rules
  • Prevention: Use connection pooling and implement retry logic
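
The retry logic mentioned above can be sketched as a small helper that retries a failing operation with exponentially growing delays. The function name, the default of three attempts, and the choice to retry only on ConnectionError and TimeoutError are assumptions for this example; database drivers raise their own exception types (psycopg2 raises psycopg2.OperationalError, for instance), so adapt the except clause to your driver.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying on connection errors with exponential backoff."""
    for attempt in range(attempts):
        try:
            return operation()
        except (ConnectionError, TimeoutError):
            if attempt == attempts - 1:
                raise  # out of attempts, surface the last error
            # Wait base_delay, then 2x, then 4x, ... before the next try
            sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

The sleep parameter exists so the delay can be replaced in tests. Retry only errors that are likely to be transient; retrying a rejected password just adds load and can trigger account lockouts.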

Cloud Storage Authentication Errors

  • Problem: Invalid credentials or expired tokens
  • Solution: Verify credentials and refresh if necessary
  • Prevention: Implement automatic credential rotation
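
For short-lived tokens, one common way to avoid expiry errors is to cache the token and fetch a new one shortly before the old one runs out. The sketch below is a hypothetical helper: the RefreshingToken class, the fetch callable that returns a token with its lifetime in seconds, and the 60-second safety margin are all assumptions, and the actual token request depends on your cloud provider.

```python
import time
from typing import Callable, Optional, Tuple


class RefreshingToken:
    """Cache a short-lived token and refresh it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        margin_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch  # returns (token, lifetime_in_seconds)
        self._margin = margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        """Return a valid token, fetching a new one when needed."""
        # Refresh when nothing is cached or the cached token is about to expire.
        if self._token is None or self._clock() >= self._expires_at - self._margin:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = self._clock() + lifetime
        return self._token
```

Callers request the token through get() on every use instead of storing it themselves, so a refresh happens transparently once the margin is reached.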

API Rate Limiting

  • Problem: 429 Too Many Requests errors
  • Solution: Implement client-side rate limiting
  • Prevention: Monitor API usage and optimize request patterns
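
Client-side rate limiting can be as simple as spacing requests evenly. The sketch below is one possible approach, assuming the limit is expressed as a number of calls per period; the RateLimiter class is hypothetical, and because this page does not state the unit of the rate_limit option shown in the Basic Auth example, the period is passed explicitly.

```python
import time
from typing import Callable


class RateLimiter:
    """Allow at most max_calls calls per period by spacing them evenly."""

    def __init__(
        self,
        max_calls: int,
        period: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        self._interval = period / max_calls  # minimum gap between calls
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = 0.0

    def wait(self) -> float:
        """Block until the next call is allowed; return the seconds slept."""
        now = self._clock()
        delay = max(0.0, self._next_allowed - now)
        if delay:
            self._sleep(delay)
        # Schedule the following call one interval after this one.
        self._next_allowed = max(now, self._next_allowed) + self._interval
        return delay
```

Call wait() immediately before each request. If the API still responds with 429 and includes a Retry-After header, waiting for that duration before retrying is generally the safest choice.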

Debugging Tips

These debugging steps help you systematically identify and resolve connector authentication issues:

  1. Check Credentials: Verify credentials are correct and not expired
  2. Test Connectivity: Ensure network connectivity to external systems
  3. Review Logs: Check connector logs for detailed error information
  4. Validate Configuration: Verify connector configuration parameters
  5. Test Manually: Test connections manually to isolate issues
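
For the connectivity step, a quick TCP check separates network problems (firewall, DNS, wrong port) from authentication problems before any credentials are involved. A minimal sketch using only the standard library; the can_reach helper name and the 5-second default timeout are choices made for this example.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and DNS resolution failures
        return False
```

For example, can_reach("your-postgres-host.com", 5432) returning False points to a network or firewall issue, while True followed by a login failure points to the credentials themselves.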

Next Steps

For comprehensive authentication information, return to the Authentication Overview.