Control a Data Flow

The flows API lets you activate and pause a data flow from any resource in it. By default, a control operation applies to the resource you specify and to every resource downstream of it; an optional query parameter extends the operation to the entire flow.

Flow Control Endpoints

Each control operation has one endpoint per resource type, so you can start from a flow node, a data source, a data set, or a data sink:

Activate Flow

Activate a specific resource and all flow components downstream of it, or activate the entire flow.

Activate Flow: Request
# Activate from flow node
PUT /flows/{flow-node-id}/activate

# Activate from data source
PUT /data_sources/{data-source-id}/flow/activate

# Activate from data set
PUT /data_sets/{data-set-id}/flow/activate

# Activate from data sink
PUT /data_sinks/{data-sink-id}/flow/activate

Pause Flow

Pause a specific resource and all flow components downstream of it, or pause the entire flow.

Pause Flow: Request
# Pause from flow node
PUT /flows/{flow-node-id}/pause

# Pause from data source
PUT /data_sources/{data-source-id}/flow/pause

# Pause from data set
PUT /data_sets/{data-set-id}/flow/pause

# Pause from data sink
PUT /data_sinks/{data-sink-id}/flow/pause
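
The activate and pause endpoints above follow one pattern per resource type, so the path can be built from three inputs. The following is a minimal sketch, not part of the API: the function name control_path and the resource-type keys ("flow_node", "data_source", and so on) are labels chosen for this example, while the path templates are copied from the listings above.

```python
# Path prefixes per resource type, copied from the endpoint listings above.
# The dictionary keys are labels chosen for this example only.
_PREFIXES = {
    "flow_node": "/flows/{id}",
    "data_source": "/data_sources/{id}/flow",
    "data_set": "/data_sets/{id}/flow",
    "data_sink": "/data_sinks/{id}/flow",
}


def control_path(resource_type: str, resource_id: int, action: str) -> str:
    """Return the request path for activating or pausing from a resource."""
    if action not in ("activate", "pause"):
        raise ValueError(f"unsupported action: {action!r}")
    try:
        prefix = _PREFIXES[resource_type]
    except KeyError:
        raise ValueError(f"unsupported resource type: {resource_type!r}") from None
    return f"{prefix.format(id=resource_id)}/{action}"


print(control_path("data_set", 5061, "pause"))
print(control_path("flow_node", 10001, "activate"))
```

Note that flow nodes are the one case without a /flow segment: the action follows the node ID directly.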

Control Options

Flow control operations offer two levels of scope. Understanding these options helps you control flows precisely without affecting more resources than necessary.

Control Downstream Only (Default)

By default, flow control operations affect only the specified resource and all downstream resources:

PUT /data_sets/{data-set-id}/flow/activate
PUT /flows/{flow-node-id}/pause

This will control:

  • The specified resource
  • All downstream data sets
  • All downstream data sinks
  • Associated transforms and data maps

Control Entire Flow

To control the entire flow including upstream resources, include the ?all=1 or ?full_tree=1 query parameter:

PUT /data_sets/{data-set-id}/flow/activate?all=1
PUT /flows/{flow-node-id}/pause?full_tree=1

This will control:

  • The entire flow from the origin data source
  • All data sets in the flow
  • All data sinks in the flow
  • Associated transforms and data maps
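
The two scopes differ only in the query string, which a small helper can make explicit. This is a sketch under the assumption that all=1 and full_tree=1 are interchangeable, as the text above suggests; the function name scoped_path and its arguments are invented for this example.

```python
from urllib.parse import urlencode


def scoped_path(path: str, entire_flow: bool = False, param: str = "all") -> str:
    """Append the entire-flow query parameter to a control path when requested.

    `param` is either "all" or "full_tree", the two names given above.
    """
    if param not in ("all", "full_tree"):
        raise ValueError(f"unsupported scope parameter: {param!r}")
    if not entire_flow:
        # Default scope: the specified resource and everything downstream.
        return path
    return f"{path}?{urlencode({param: 1})}"


print(scoped_path("/data_sets/5061/flow/activate"))
print(scoped_path("/data_sets/5061/flow/activate", entire_flow=True))
print(scoped_path("/flows/10001/pause", entire_flow=True, param="full_tree"))
```

Keeping entire_flow off by default mirrors the API's own default, so a caller has to opt in before upstream resources are affected.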

Control Examples

These examples apply the flow control endpoints to specific resource IDs, covering both the default downstream scope and the entire-flow scope.

Example 1: Activate from Data Source

Activate all flows originating from a specific data source:

PUT /data_sources/5023/flow/activate

Example 2: Pause from Data Set

Pause a data set and all downstream resources:

PUT /data_sets/5061/flow/pause

Example 3: Activate Entire Flow

Activate the entire flow from a downstream resource:

PUT /data_sets/5061/flow/activate?all=1

Example 4: Control from Flow Node

Control a flow starting from a specific flow node:

PUT /flows/10001/activate
PUT /flows/10001/pause
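
The requests above can be prepared from Python with the standard library alone. The sketch below only builds the request object and does not send it. The host in BASE_URL and the bearer-token Authorization header are placeholders assumed for illustration; this page does not specify either, so substitute the values for your environment.

```python
from urllib.request import Request

# Assumption: placeholder host, not taken from this page.
BASE_URL = "https://api.example.com"


def build_control_request(path: str, token: str) -> Request:
    """Build (but do not send) a PUT request for a flow control path."""
    return Request(
        url=BASE_URL + path,
        method="PUT",
        headers={
            # Assumption: bearer-token authentication.
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        },
    )


request = build_control_request("/data_sources/5023/flow/activate", "YOUR_TOKEN")
print(request.get_method(), request.full_url)
```

To send the request, pass it to urllib.request.urlopen and decode the JSON body of the response.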

Control Response

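A successful control operation returns the affected flows as a tree of flow nodes, together with the data sources, data sets, data sinks, and data credentials they reference. Check the status fields to confirm that each resource reached the expected state.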

Successful Activation

Activate Flow: Response
{
  "flows": [
    {
      "id": 10001,
      "flow_node_id": 10001,
      "origin_node_id": 10001,
      "flow_type": "streaming",
      "status": "ACTIVE",
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z",
      "data_source": {
        "id": 5001,
        "name": "Example Data Source",
        "status": "ACTIVE"
      },
      "data_sinks": [],
      "sharers": {
        "sharers": [],
        "external_sharers": []
      },
      "children": [
        {
          "id": 10002,
          "flow_node_id": 10002,
          "parent_flow_node_id": 10001,
          "status": "ACTIVE",
          "data_sinks": [
            {
              "id": 6001,
              "name": "Example Data Sink",
              "status": "ACTIVE"
            }
          ],
          "sharers": {
            "sharers": [],
            "external_sharers": []
          },
          "children": []
        }
      ]
    }
  ],
  "data_sources": [
    {
      "id": 5001,
      "owner_id": 2,
      "org_id": 1,
      "name": "Example Data Source",
      "status": "ACTIVE",
      "description": "Example data source for demonstration",
      "connector_type": "s3",
      "tags": [],
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z",
      "data_credentials": [5008]
    }
  ],
  "data_sets": [
    {
      "id": 5001,
      "owner_id": 2,
      "org_id": 1,
      "parent_data_set_id": null,
      "data_source_id": 5001,
      "name": "Example Data Set",
      "description": "Example data set for demonstration",
      "status": "ACTIVE",
      "tags": [],
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z"
    }
  ],
  "data_sinks": [
    {
      "id": 6001,
      "owner_id": 2,
      "org_id": 1,
      "name": "Example Data Sink",
      "status": "ACTIVE",
      "description": "Example data sink for demonstration",
      "sink_type": "s3",
      "tags": [],
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z",
      "data_credentials": [5008]
    }
  ],
  "data_credentials": [
    {
      "id": 5008,
      "owner_id": 2,
      "org_id": 1,
      "name": "Example Credentials",
      "description": "Example credentials for demonstration",
      "credentials_type": "s3",
      "verified_status": "200 Ok",
      "tags": [],
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z"
    }
  ]
}

Successful Pause

Pause Flow: Response
{
  "flows": [
    {
      "id": 10001,
      "flow_node_id": 10001,
      "origin_node_id": 10001,
      "flow_type": "streaming",
      "status": "PAUSED",
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z",
      "data_source": {
        "id": 5001,
        "name": "Example Data Source",
        "status": "PAUSED"
      },
      "data_sinks": [],
      "sharers": {
        "sharers": [],
        "external_sharers": []
      },
      "children": [
        {
          "id": 10002,
          "flow_node_id": 10002,
          "parent_flow_node_id": 10001,
          "status": "PAUSED",
          "data_sinks": [
            {
              "id": 6001,
              "name": "Example Data Sink",
              "status": "PAUSED"
            }
          ],
          "sharers": {
            "sharers": [],
            "external_sharers": []
          },
          "children": []
        }
      ]
    }
  ],
  "data_sources": [
    {
      "id": 5001,
      "owner_id": 2,
      "org_id": 1,
      "name": "Example Data Source",
      "status": "PAUSED",
      "description": "Example data source for demonstration",
      "connector_type": "s3",
      "tags": [],
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z",
      "data_credentials": [5008]
    }
  ],
  "data_sets": [
    {
      "id": 5001,
      "owner_id": 2,
      "org_id": 1,
      "parent_data_set_id": null,
      "data_source_id": 5001,
      "name": "Example Data Set",
      "description": "Example data set for demonstration",
      "status": "PAUSED",
      "tags": [],
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z"
    }
  ],
  "data_sinks": [
    {
      "id": 6001,
      "owner_id": 2,
      "org_id": 1,
      "name": "Example Data Sink",
      "status": "PAUSED",
      "description": "Example data sink for demonstration",
      "sink_type": "s3",
      "tags": [],
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z",
      "data_credentials": [5008]
    }
  ],
  "data_credentials": [
    {
      "id": 5008,
      "owner_id": 2,
      "org_id": 1,
      "name": "Example Credentials",
      "description": "Example credentials for demonstration",
      "credentials_type": "s3",
      "verified_status": "200 Ok",
      "tags": [],
      "created_at": "2023-01-15T10:30:00.000Z",
      "updated_at": "2023-01-15T10:30:00.000Z"
    }
  ]
}
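
Both responses nest flow nodes under "children", so reading every node's status means walking that tree. The sketch below does this for a parsed response; it relies only on the "flows", "flow_node_id", "status", and "children" fields shown in the examples above. The function name iter_flow_nodes and the trimmed sample are written for this example.

```python
from typing import Iterator


def iter_flow_nodes(response: dict) -> Iterator[tuple[int, str]]:
    """Yield (flow_node_id, status) for every node in the response's flow trees."""
    # An explicit stack avoids recursion limits on deep flows.
    stack = list(response.get("flows", []))
    while stack:
        node = stack.pop()
        yield node["flow_node_id"], node["status"]
        stack.extend(node.get("children", []))


# Trimmed version of the pause response shown above.
sample = {
    "flows": [
        {
            "flow_node_id": 10001,
            "status": "PAUSED",
            "children": [
                {"flow_node_id": 10002, "status": "PAUSED", "children": []}
            ],
        }
    ]
}

statuses = dict(iter_flow_nodes(sample))
print(statuses)
```

Collecting the pairs into a dictionary gives a quick lookup from flow node ID to status for the whole tree.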

Control Behavior

Activation Behavior

When activating a flow:

  1. Resource Validation: All resources in the flow are validated
  2. Dependency Check: Dependencies between resources are verified
  3. State Transition: Resources transition from PAUSED to ACTIVE state
  4. Flow Execution: Data processing begins according to the flow configuration

Pause Behavior

When pausing a flow:

  1. Graceful Shutdown: Resources are paused gracefully to avoid data loss
  2. State Transition: Resources transition from ACTIVE to PAUSED state
  3. Flow Halt: Data processing stops at the specified point
  4. Resource Cleanup: Temporary resources and connections are cleaned up

Control Scenarios

Scenario 1: Selective Flow Control

Control only a portion of a flow while leaving other parts running:

# Pause only downstream from a specific data set
PUT /data_sets/5061/flow/pause

# Activate only downstream from a specific data set
PUT /data_sets/5061/flow/activate

Scenario 2: Full Flow Control

Control the entire flow from any point:

# Pause entire flow from a downstream resource
PUT /data_sets/5061/flow/pause?all=1

# Activate entire flow from a downstream resource
PUT /data_sets/5061/flow/activate?all=1

Scenario 3: Origin-Based Control

Control flows from their origin data source:

# Activate all flows from a data source
PUT /data_sources/5023/flow/activate

# Pause all flows from a data source
PUT /data_sources/5023/flow/pause

Best Practices

  1. Use Selective Control: Start with downstream-only control unless you need to control the entire flow
  2. Monitor State Changes: Verify that resources transition to the expected state
  3. Handle Errors Gracefully: Implement proper error handling for control operations
  4. Test Control Workflows: Test control operations in non-production environments first
  5. Document Control Points: Document which resources are used as control points for your flows
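
For the second practice, monitoring state changes, one option is to compare each resource in the response against the state you requested. The following sketch assumes the response shape shown under Control Response; the function name unexpected_statuses is invented for this example. Whether a downstream-only response also lists resources outside the controlled scope is not stated here, so treat a mismatch as a prompt to investigate rather than as proof of failure.

```python
def unexpected_statuses(response: dict, expected: str) -> list[str]:
    """List resources in a control response whose status differs from `expected`."""
    problems = []
    for kind in ("data_sources", "data_sets", "data_sinks"):
        for resource in response.get(kind, []):
            status = resource.get("status")
            if status != expected:
                problems.append(f"{kind}/{resource['id']}: {status}")
    return problems


# Hypothetical response in which one sink did not reach the requested state.
response = {
    "data_sources": [{"id": 5001, "status": "ACTIVE"}],
    "data_sets": [{"id": 5001, "status": "ACTIVE"}],
    "data_sinks": [{"id": 6001, "status": "PAUSED"}],
}

for problem in unexpected_statuses(response, "ACTIVE"):
    print("unexpected state:", problem)
```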

Legacy Endpoints

Note: The legacy /data_flows endpoints are still available for backward compatibility but are deprecated:

  • PUT /data_flows/data_source/{data_source_id}/activate
  • PUT /data_flows/data_source/{data_source_id}/pause
  • PUT /data_flows/{data_set_id}/activate
  • PUT /data_flows/{data_set_id}/pause
  • PUT /data_flows/data_sink/{data_sink_id}/activate
  • PUT /data_flows/data_sink/{data_sink_id}/pause

Use the flows API endpoints described above for all new development.
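
When migrating existing callers, the legacy paths can be rewritten mechanically. The sketch below assumes that each legacy endpoint corresponds to the current endpoint for the same resource type and action; this page lists both sets but does not state that correspondence, so verify it before relying on it. The function name migrate_legacy_path is invented for this example.

```python
import re

# Matches the six legacy paths listed above. A path with no resource-type
# segment refers to a data set.
_LEGACY = re.compile(
    r"^/data_flows/(?:(?P<kind>data_source|data_sink)/)?"
    r"(?P<id>\d+)/(?P<action>activate|pause)$"
)


def migrate_legacy_path(path: str) -> str:
    """Rewrite a legacy /data_flows path to the assumed current equivalent."""
    match = _LEGACY.match(path)
    if match is None:
        raise ValueError(f"not a legacy flow control path: {path!r}")
    kind = match.group("kind") or "data_set"
    return f"/{kind}s/{match.group('id')}/flow/{match.group('action')}"


print(migrate_legacy_path("/data_flows/data_source/5023/activate"))
print(migrate_legacy_path("/data_flows/5061/pause"))
```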