Dynamic Streams
Dynamic streams enable manifest-only connectors to generate streams at runtime based on external data sources such as API responses or user configuration. This lets a connector adapt to varying API structures and user requirements without hardcoding every possible stream.
Overview
Dynamic streams are implemented using DynamicDeclarativeStream with component resolvers that fetch stream configuration values from external sources. Unlike static Stream Templates that use predefined parameter sets, dynamic streams determine their configuration at runtime.
dynamic_streams:
  - type: DynamicDeclarativeStream
    stream_template:
      type: DeclarativeStream
      name: "{{ stream_name }}" # Will be populated dynamically
      retriever:
        # Template configuration with placeholders
    components_resolver:
      # Resolver that fetches values from external sources
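The relationship between the template and the resolver can be sketched in plain Python. This is only an illustration of the idea, not the CDK's implementation: the helper names `render` and `generate_streams` are hypothetical, and a small regular expression stands in for the Jinja2 interpolation the real framework uses.

```python
import re
from typing import Any

# Matches simple "{{ name }}" placeholders only (no filters or expressions).
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(node: Any, values: dict[str, Any]) -> Any:
    """Recursively replace placeholders in a nested template structure."""
    if isinstance(node, dict):
        return {key: render(child, values) for key, child in node.items()}
    if isinstance(node, list):
        return [render(child, values) for child in node]
    if isinstance(node, str):
        # Raises KeyError if the resolver did not supply a referenced value.
        return PLACEHOLDER.sub(lambda m: str(values[m.group(1)]), node)
    return node


def generate_streams(template: dict[str, Any], resolved: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Produce one concrete stream definition per set of resolved values."""
    return [render(template, values) for values in resolved]


template = {"type": "DeclarativeStream", "name": "{{ stream_name }}"}
streams = generate_streams(template, [{"stream_name": "orders"}, {"stream_name": "refunds"}])
print([stream["name"] for stream in streams])
```

The template itself is never mutated, so the same template can be expanded again whenever the resolver returns a different set of values.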
Component Resolvers
Dynamic streams use three types of component resolvers to fetch configuration values:
HttpComponentsResolver
Fetches stream configuration from API responses. This is useful when the API provides metadata about available streams or when stream parameters depend on API responses.
Here's an example from the Google Search Console connector that creates custom report streams:
dynamic_streams:
  - type: DynamicDeclarativeStream
    stream_template:
      type: DeclarativeStream
      name: "search_analytics_by_custom_dimensions"
      primary_key:
        - site_url
        - search_type
      retriever:
        type: SimpleRetriever
        requester:
          type: HttpRequester
          url_base: "https://www.googleapis.com/webmasters/v3"
          path: "/sites/{{ sanitize_url(stream_partition.get('site_url')) }}/searchAnalytics/query"
          http_method: POST
          request_body_json:
            startDate: "{{ stream_interval.get('start_time') }}"
            endDate: "{{ stream_interval.get('end_time') }}"
            dimensions: "{{ dimensions }}" # Will be replaced dynamically
            type: "{{ stream_partition.get('search_type') }}"
        partition_router:
          - type: ListPartitionRouter
            values: "{{ config['site_urls'] }}"
            cursor_field: site_url
          - type: ListPartitionRouter
            values: ["web", "news", "image", "video"]
            cursor_field: search_type
    components_resolver:
      type: HttpComponentsResolver
      retriever:
        type: SimpleRetriever
        requester:
          type: HttpRequester
          url_base: "https://www.googleapis.com/webmasters/v3"
          path: "/custom-reports"
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: ["reports"]
      components_mapping:
        stream_name: "{{ response.name }}"
        dimensions: "{{ response.dimensions }}"
        primary_key: "{{ response.key_fields }}"
The HttpComponentsResolver makes an API call to fetch available custom reports, then uses the response to populate the stream template with the actual report configurations.
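To make the data flow concrete, the following sketch walks a canned response body the way the example's extractor and mapping would. It makes no network call. The function names and the sample payload are hypothetical; only the field names (`reports`, `name`, `dimensions`, `key_fields`) come from the example above, and the plain key walk is a simplified stand-in for DpathExtractor.

```python
from typing import Any, Iterator


def extract_records(body: dict[str, Any], field_path: list[str]) -> list[dict[str, Any]]:
    """Follow field_path into the response body and return the records found there."""
    node: Any = body
    for key in field_path:
        node = node[key]
    return node


def resolve_components(body: dict[str, Any], field_path: list[str]) -> Iterator[dict[str, Any]]:
    """Yield one set of template values per record, mirroring components_mapping."""
    for record in extract_records(body, field_path):
        yield {
            "stream_name": record["name"],
            "dimensions": record["dimensions"],
            "primary_key": record["key_fields"],
        }


# Hypothetical response body; a real API defines its own shape.
sample_body = {
    "reports": [
        {"name": "by_country", "dimensions": ["country"], "key_fields": ["site_url", "country"]},
        {"name": "by_device", "dimensions": ["device"], "key_fields": ["site_url", "device"]},
    ]
}

components = list(resolve_components(sample_body, ["reports"]))
print([component["stream_name"] for component in components])
```

Each yielded dictionary corresponds to one generated stream, so a response listing two reports results in two streams.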
ConfigComponentsResolver
Uses values from the user's connector configuration. This allows users to define their own streams through the connector's configuration interface.
Here's an example from the Google Sheets connector that creates streams for user-specified spreadsheets:
dynamic_streams:
  - type: DynamicDeclarativeStream
    stream_template:
      type: DeclarativeStream
      name: "{{ stream_name }}"
      primary_key: ["_airbyte_row_num"]
      retriever:
        type: SimpleRetriever
        requester:
          type: HttpRequester
          url_base: "https://sheets.googleapis.com/v4/spreadsheets"
          path: "/{{ spreadsheet_id }}/values/{{ sheet_name }}"
          authenticator: "#/definitions/authenticator"
        record_selector:
          type: RecordSelector
          extractor:
            type: DpathExtractor
            field_path: ["values"]
    components_resolver:
      type: ConfigComponentsResolver
      stream_config: "{{ config['spreadsheet_configs'] }}"
      components_mapping:
        stream_name: "{{ stream_config.name }}"
        spreadsheet_id: "{{ stream_config.spreadsheet_id }}"
        sheet_name: "{{ stream_config.sheet_name }}"
In this example, users can configure multiple spreadsheets in their connector configuration, and each one becomes a separate stream.
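As a rough illustration of what such a configuration might look like and how it fans out into streams, consider the sketch below. The sample values and the `streams_from_config` helper are hypothetical; the keys `name`, `spreadsheet_id`, and `sheet_name` follow the mapping above.

```python
from typing import Any

# Hypothetical user configuration with two spreadsheet entries.
config = {
    "spreadsheet_configs": [
        {"name": "sales", "spreadsheet_id": "abc123", "sheet_name": "Q1"},
        {"name": "inventory", "spreadsheet_id": "def456", "sheet_name": "Stock"},
    ]
}


def streams_from_config(config: dict[str, Any]) -> list[dict[str, str]]:
    """Build one stream description per configured spreadsheet entry."""
    streams = []
    for entry in config.get("spreadsheet_configs", []):
        streams.append(
            {
                "name": entry["name"],
                # Same shape as the template's path expression.
                "path": f"/{entry['spreadsheet_id']}/values/{entry['sheet_name']}",
            }
        )
    return streams


for stream in streams_from_config(config):
    print(stream["name"], stream["path"])
```

An empty or missing `spreadsheet_configs` list simply yields no streams in this sketch.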
ParametrizedComponentsResolver
While primarily used for static templates, ParametrizedComponentsResolver can also support dynamic scenarios when the parameter list is generated programmatically. See Stream Templates for detailed examples.
Configuration Mapping
The components_mapping section defines how values from the resolver's data source map to placeholders in the stream template:
components_mapping:
  # Template placeholder: Source value expression
  stream_name: "{{ response.report_name }}"
  api_endpoint: "{{ response.endpoint_url }}"
  primary_key: "{{ response.key_fields }}"
  schema_fields: "{{ response.schema.fields }}"
Mapping Expressions
- Use Jinja2 template syntax for dynamic value extraction
- Access resolver data through context variables (response, stream_config, etc.)
- Apply transformations and filters as needed
- Handle nested data structures with dot notation
Advanced Patterns
Conditional Stream Generation
Use conditions to control when streams are generated:
components_resolver:
  type: ConfigComponentsResolver
  stream_config: "{{ config['custom_streams'] }}"
  components_mapping:
    stream_name: "{{ stream_config.name }}"
    enabled: "{{ stream_config.get('enabled', true) }}"
    # Only generate streams where enabled is true
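The intended effect of the `enabled` flag can be sketched as a simple filter over the configured entries. This is an illustration of the selection logic only: the `enabled_streams` helper and the sample entries are hypothetical, and the snippet does not show how the framework itself applies the condition.

```python
from typing import Any


def enabled_streams(custom_streams: list[dict[str, Any]]) -> list[str]:
    """Return the names of entries that are enabled, treating a missing flag as enabled."""
    return [entry["name"] for entry in custom_streams if entry.get("enabled", True)]


# Hypothetical contents of config['custom_streams'].
custom_streams = [
    {"name": "orders"},
    {"name": "drafts", "enabled": False},
    {"name": "refunds", "enabled": True},
]

print(enabled_streams(custom_streams))
```

Defaulting to enabled means existing configurations that predate the flag keep generating their streams.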
Schema Transformation
Transform API schemas to match your stream requirements:
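components_mapping:
  stream_name: "{{ response.name | lower | replace(' ', '_') }}"
  # extract_field_info is not a built-in Jinja2 filter; this line assumes a custom filter with that name is available
  schema_fields: "{{ response.fields | map('extract_field_info') | list }}"
  primary_key: "{{ response.key_field | default('id') }}"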
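For readers less familiar with Jinja2 filters, the name and primary key expressions correspond roughly to the Python below. The helper names and sample reports are hypothetical. The fallback applies only when the key is absent, which matches how `default` behaves without its boolean flag.

```python
from typing import Any


def normalize_stream_name(name: str) -> str:
    """Equivalent of `| lower | replace(' ', '_')`."""
    return name.lower().replace(" ", "_")


def primary_key_for(report: dict[str, Any]) -> Any:
    """Equivalent of `| default('id')`: fall back only when key_field is absent."""
    return report.get("key_field", "id")


# Hypothetical resolver records.
with_key = {"name": "Monthly Sales Report", "key_field": "report_row_id"}
without_key = {"name": "Daily Traffic"}

print(normalize_stream_name(with_key["name"]), primary_key_for(with_key))
print(normalize_stream_name(without_key["name"]), primary_key_for(without_key))
```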
Multi-level Nesting
Handle complex nested configurations:
components_mapping:
  stream_name: "{{ stream_config.metadata.display_name }}"
  api_config:
    endpoint: "{{ stream_config.api.endpoint }}"
    method: "{{ stream_config.api.method | default('GET') }}"
    headers: "{{ stream_config.api.headers | default({}) }}"
Best Practices
Performance Considerations
- Cache resolver responses: Avoid making the same API calls repeatedly
- Limit stream generation: Don't generate excessive numbers of streams
- Use efficient selectors: Optimize record extraction from resolver responses
- Handle rate limits: Implement appropriate delays for resolver API calls
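The caching advice can be illustrated with the standard library's `functools.lru_cache`. This is a minimal sketch under stated assumptions: `fetch_reports_uncached` is a hypothetical stand-in for the resolver's HTTP request, the URL is a placeholder, and the call counter exists only to show that the second lookup never reaches the API.

```python
from functools import lru_cache

CALLS = {"count": 0}


def fetch_reports_uncached(url: str) -> tuple[str, ...]:
    """Hypothetical stand-in for the resolver's API call."""
    CALLS["count"] += 1
    return ("by_country", "by_device")


@lru_cache(maxsize=32)
def fetch_reports(url: str) -> tuple[str, ...]:
    """Memoize resolver results per URL for the lifetime of the process."""
    return fetch_reports_uncached(url)


fetch_reports("https://api.example.com/custom-reports")
fetch_reports("https://api.example.com/custom-reports")  # served from the cache
print(CALLS["count"])
```

Returning a tuple rather than a list keeps the cached value immutable, so one caller cannot accidentally alter what later callers receive.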
Error Handling
- Validate resolver data: Check that required fields are present
- Provide fallbacks: Use default values when optional fields are missing
- Handle API failures: Gracefully handle resolver API errors
- Log generation issues: Provide clear error messages for debugging
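A sketch of these checks applied to a single resolver record follows. The required field, the default values, and the `validate_record` helper are all assumptions chosen for illustration; a real connector would pick them to match its API.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("dynamic_streams")

REQUIRED_FIELDS = ("name",)          # assumed: a stream cannot be generated without a name
DEFAULTS = {"key_fields": ["id"]}    # assumed fallback for an optional field


def validate_record(record: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return the record with defaults applied, or None if a required field is missing."""
    missing = [field for field in REQUIRED_FIELDS if field not in record]
    if missing:
        logger.warning("Skipping resolver record %r: missing %s", record, missing)
        return None
    # Values present in the record take precedence over the defaults.
    return {**DEFAULTS, **record}


print(validate_record({"name": "orders"}))
print(validate_record({"dimensions": ["country"]}))
```

Skipping an invalid record with a logged warning lets the remaining streams still be generated, while the message points at the exact record that caused the problem.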
Security
- Validate user input: Sanitize configuration values used in stream generation
- Limit API access: Ensure resolver APIs have appropriate authentication
- Avoid injection: Be careful with dynamic path and parameter construction
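One way to reduce injection risk when a user-supplied value ends up in a request path is to percent-encode it as a single path segment. The sketch below uses `urllib.parse.quote` from the standard library. The helper names are hypothetical, and the path shape is borrowed from the Google Sheets example.

```python
from urllib.parse import quote


def safe_path_segment(value: str) -> str:
    """Percent-encode a value so it cannot introduce extra path or query components."""
    # safe="" also encodes "/", which quote() leaves untouched by default.
    return quote(value, safe="")


def sheet_path(spreadsheet_id: str, sheet_name: str) -> str:
    """Build a request path from user-supplied values, encoding each segment."""
    return f"/{safe_path_segment(spreadsheet_id)}/values/{safe_path_segment(sheet_name)}"


print(sheet_path("abc123", "Q1 Sales"))
print(safe_path_segment("../admin?x=1"))
```

Encoding each segment separately keeps the separators in the path template under the connector's control rather than the user's.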
Common Use Cases
Custom Reporting APIs
Many APIs allow users to create custom reports with varying schemas:
# Generate streams for user-defined reports
components_resolver:
  type: HttpComponentsResolver
  # Fetch available custom reports from API
  components_mapping:
    stream_name: "custom_report_{{ response.id }}"
    report_config: "{{ response.configuration }}"
    schema_fields: "{{ response.schema }}"
Multi-tenant SaaS APIs
For APIs serving multiple tenants or organizations:
# Generate streams for each accessible tenant
components_resolver:
  type: HttpComponentsResolver
  # Fetch list of accessible tenants
  components_mapping:
    stream_name: "{{ response.tenant_name }}_data"
    tenant_id: "{{ response.tenant_id }}"
    api_base: "{{ response.api_endpoint }}"
User-configured Data Sources
Allow users to specify their own data sources:
# Generate streams for user-specified databases/tables
components_resolver:
  type: ConfigComponentsResolver
  stream_config: "{{ config['data_sources'] }}"
  components_mapping:
    stream_name: "{{ stream_config.table_name }}"
    connection_string: "{{ stream_config.connection }}"
    query: "{{ stream_config.sql_query }}"
Related Documentation
- Stream Templates - Static stream template patterns
- YAML Overview - Understanding the overall YAML structure
- Partition Router - Handling multiple data partitions
- Record Selector - Extracting records from API responses