Documentation Index
Fetch the complete documentation index at: https://platform.docs.zenoo.com/llms.txt
Use this file to discover all available pages before exploring further.
Connectors
Connectors enable Hub to interact with external systems, APIs, and services. Hub provides both built-in connectors (like HTTP) and a powerful framework for developing custom connectors.
Table of Contents
Getting Started
Custom Connector Development
Plugin Development
Configuration & Registration
Advanced Topics
Connector Types
Hub supports three types of connectors:
1. Built-in Connectors
2. Custom Connectors
- Embedded Connectors: Spring-managed beans within your Hub application
- Plugin Connectors: OSGI bundles for modular, hot-deployable connectors
3. Integration Patterns
- Wrapper Connectors: Custom connectors that use HTTP connector internally
- Protocol Connectors: For non-HTTP protocols (FTP, SFTP, message queues, etc.)
- Service Connectors: Integration with specific external services
Basic Usage
Connectors are used within exchanges in the DSL:
HTTP Connector
exchange('api-call') {
http {
url 'https://api.example.com/data'
method 'GET'
}
}
Custom Connector
exchange('sms-send') {
connector 'sms'
config mobile: user.phone, template: 'welcome', params: [name: user.name]
}
Connector with Complex Configuration
exchange('external-validation') {
connector 'document-validator'
config {
document: upload.idFront
validationType: 'identity'
options: {
strictMode: true
returnDetails: false
}
}
timeout 30
fixedBackoffRetry {
retry 3
backoff 5
}
}
Quick Start
1. Simple Custom Connector
Create a basic custom connector:
@Component
class GreetingConnector extends CustomConnectorSupport {
@Override
String type() {
return 'greeting'
}
@Override
PayloadSpec config() {
return PayloadSpec.of {
name
language { optional() }
}
}
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
String name = config.name
String language = config.language ?: 'en'
String greeting = language == 'es' ? "¡Hola ${name}!" : "Hello ${name}!"
return Mono.just(AttributeBuilder.of([
greeting: greeting,
timestamp: Instant.now()
]))
}
}
2. Register the Connector
@Bean
public HubConfigurer configurer(GreetingConnector greetingConnector) {
return () -> List.of(
ConnectorActivator.of("greeting", greetingConnector)
);
}
3. Use in DSL
function('greet-user') {
payload ->
exchange('greeting') {
connector 'greeting'
config name: payload.username, language: payload.locale
}
}
Creating Custom Connectors
Interface Implementation
Custom connectors implement the CustomConnector interface or extend CustomConnectorSupport:
import com.zenoo.hub.connector.CustomConnectorSupport
import com.zenoo.hub.connector.CustomConnectorConfig
import com.zenoo.hub.dsl.attribute.Attribute
import com.zenoo.hub.dsl.payload.PayloadSpec
@Component
class MyConnector extends CustomConnectorSupport {
@Override
String type() {
return 'my-connector' // Unique identifier
}
@Override
PayloadSpec config() {
return PayloadSpec.of {
// Define input validation schema
}
}
@Override
PayloadSpec payload() {
return PayloadSpec.of {
// Define expected result schema
}
}
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
// Implement connector logic
return Mono.just(AttributeBuilder.of(result))
}
}
Required Methods
type() - Connector Identifier
Returns the unique identifier used in DSL connector() calls:
@Override
String type() {
return 'email-sender' // Used as: connector('email-sender')
}
Naming Conventions:
- Simple names:
'sms', 'email', 'validation'
- Versioned:
'mambu@mambu:1.2.3'
- Namespaced:
'Screen-Sync@worldcheck.connectors'
exchange() - Core Logic
Implements the connector’s main functionality:
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
// Access configuration
String apiKey = config.apiKey
String endpoint = config.endpoint
// Perform external operation
return externalService.call(apiKey, endpoint)
.map(response -> AttributeBuilder.of([
success: true,
data: response.getData(),
timestamp: Instant.now()
]))
.onErrorMap(ex -> new ConnectorErrorException(
AttributeBuilder.of([error: ex.getMessage()])
))
}
Optional Methods
Defines the schema for connector configuration:
@Override
PayloadSpec config() {
return PayloadSpec.of {
apiKey // Required string
endpoint // Required string
timeout {
number() // Must be number
optional() // Optional field
}
options { // Nested object
retries {
number()
optional()
}
debug {
truefalse() // Boolean
optional()
}
}
}
}
payload() - Result Schema
Defines the expected structure of connector results:
@Override
PayloadSpec payload() {
return PayloadSpec.of {
success { truefalse() }
data // Any data structure
errors {
optional()
list { string() } // List of error strings
}
metadata {
optional()
requestId
duration { number() }
}
}
}
Payload Specifications
Payload specifications define data structure and validation rules for connector inputs and outputs.
Basic Types
PayloadSpec.of {
stringField // Required string
numberField { number() } // Required number
booleanField { truefalse() } // Required boolean
optionalField { optional() } // Optional field
}
Advanced Validation
PayloadSpec.of {
email {
string()
regex ~/^[A-Za-z0-9+_.-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$/
}
age {
number()
// Custom validation logic can be added
}
status {
oneOf 'pending', 'processing', 'completed', 'failed'
}
tags {
list { string() } // List of strings
optional()
}
}
File Handling
PayloadSpec.of {
document { file() } // Any file
image { file 'image' } // Image files only
pdf { file 'application/pdf' } // PDF files only
attachments {
list { file() } // List of files
optional()
}
}
Nested Structures
PayloadSpec.of {
user {
firstName
lastName
address {
street
city
postalCode { regex ~/^\d{5}(-\d{4})?$/ }
country { oneOf 'US', 'CA', 'MX' }
}
contacts {
optional()
email { optional() }
phone { optional() }
}
}
}
Date Validation
PayloadSpec.of {
birthDate { date('yyyy-MM-dd') } // ISO date format
timestamp { date('yyyy-MM-dd HH:mm:ss') } // Custom format
created { date() } // Default format
}
Error Handling
Exception Types
ConnectorErrorException - Retryable Errors
Used for temporary failures that should be retried:
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
return externalApi.call(config)
.onErrorMap(TimeoutException.class, ex ->
new ConnectorErrorException(
AttributeBuilder.of([
error: "API timeout",
retryable: true,
details: ex.getMessage()
])
)
)
}
UnretryableConnectorErrorException - Non-Retryable Errors
Used for permanent failures that should not be retried:
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
if (!isValidApiKey(config.apiKey)) {
return Mono.error(new UnretryableConnectorErrorException(
AttributeBuilder.of([
error: "Invalid API key",
code: "AUTH_FAILED"
])
))
}
// ... rest of implementation
}
ConnectorException - System Errors
Used for unexpected system errors:
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
try {
return performOperation(config)
} catch (IllegalArgumentException ex) {
throw new ConnectorException("Invalid configuration: " + ex.getMessage())
}
}
Error Response Structure
Structure error responses consistently:
// Success response
AttributeBuilder.of([
success: true,
data: result,
metadata: [requestId: uuid, duration: elapsed]
])
// Error response
AttributeBuilder.of([
success: false,
error: [
message: "Operation failed",
code: "EXTERNAL_API_ERROR",
details: errorDetails,
retryable: true
],
metadata: [requestId: uuid, timestamp: now]
])
Testing Connectors
Unit Testing
class MyConnectorSpec extends Specification {
def connector = new MyConnector()
def "should validate configuration"() {
given:
def config = new CustomConnectorConfig([
apiKey: "test-key",
endpoint: "https://api.test.com"
])
when:
def result = connector.validate(config)
then:
!result.hasErrors()
}
def "should handle successful response"() {
given:
def config = new CustomConnectorConfig([
apiKey: "test-key"
])
when:
def result = connector.exchange(config).block()
then:
result.success == true
result.data != null
}
}
Integration Testing with MockConnector
class ComponentIntegrationSpec extends Specification {
def "should handle connector response"() {
given:
def mockConnector = new MockConnector(new MyConnector())
mockConnector.getMockExchange()
.withResult([success: true, data: "test-result"])
// Register mock connector
// Execute component test
expect:
// Verify results
}
}
Mock Connector Patterns
// Success response
mockConnector.getMockExchange()
.withResult([success: true, data: expectedData])
// Error response
mockConnector.getMockExchange()
.withError()
// Delayed response
mockConnector.getMockExchange()
.withDelay(5)
.withResult(data)
// Configuration validation
mockConnector.getMockExchange()
.withConfigConsumer { config ->
assert config.apiKey == "expected-key"
}
OSGI Plugin Connectors
Plugin Connector Implementation
OSGI plugin connectors implement the com.zenoo.hub.plugin.sdk.Connector interface:
package com.example.plugin
import com.zenoo.hub.dsl.attribute.Attribute
import com.zenoo.hub.dsl.attribute.AttributeBuilder
import com.zenoo.hub.plugin.sdk.Connector
import org.osgi.service.component.annotations.Component
import reactor.core.publisher.Mono
@Component
class MyPluginConnector implements Connector {
@Override
String name() {
return 'my-plugin'
}
@Override
Mono<Attribute> exchange(Attribute input) {
// Process input attribute
String data = input.asString()
// Perform operation
String result = processData(data)
// Return result
return Mono.just(AttributeBuilder.of([
processed: result,
timestamp: System.currentTimeMillis()
]))
}
private String processData(String data) {
// Plugin-specific logic
return data.toUpperCase()
}
}
Plugin Structure
my-plugin/
├── src/main/java/
│ └── com/plugin/
│ └── MyPluginConnector.groovy
├── build.gradle
└── META-INF/
└── MANIFEST.MF
build.gradle for Plugin
plugins {
id 'groovy'
id 'osgi'
}
dependencies {
implementation 'com.zenoo.hub:hub-backend:1.0.0'
implementation 'org.osgi:osgi.core:8.0.0'
implementation 'org.osgi:osgi.cmpn:8.0.0'
}
jar {
manifest {
attributes(
'Bundle-Name': 'My Plugin Connector',
'Bundle-SymbolicName': 'com.example.my-plugin',
'Bundle-Version': '1.0.0',
'Export-Package': 'com.example.plugin',
'Import-Package': [
'com.zenoo.hub.plugin.sdk',
'com.zenoo.hub.dsl.attribute',
'reactor.core.publisher',
'org.osgi.service.component.annotations'
].join(',')
)
}
}
Plugin vs Embedded Connectors
| Aspect | Embedded Connectors | Plugin Connectors |
|---|
| Deployment | Built into application | Hot-deployable bundles |
| Interface | CustomConnector | plugin.sdk.Connector |
| Configuration | PayloadSpec validation | Manual input processing |
| Spring Integration | Full Spring support | Limited Spring features |
| Error Handling | Rich exception types | Basic error handling |
| Testing | Full MockConnector support | Limited testing tools |
Connector Registration
Programmatic Registration
@Bean
@Primary
public HubConfigurer configurer(
SmsConnector smsConnector,
EmailConnector emailConnector,
ValidationConnector validationConnector) {
return () -> List.of(
ConnectorActivator.of("sms", smsConnector),
ConnectorActivator.of("email", emailConnector),
ConnectorActivator.of("validation@external:2.1.0", validationConnector)
);
}
ConnectorActivator Patterns
// Simple registration
ConnectorActivator.of("connector-name", connectorInstance)
// Versioned registration
ConnectorActivator.of("service@provider:1.2.3", connectorInstance)
// Namespaced registration
ConnectorActivator.of("connector@namespace.category", connectorInstance)
// Component-scoped registration
ConnectorActivator.of(ComponentId.of("bundle", "1.0"), connectorInstance)
Auto-Configuration
Hub provides auto-configuration for Spring Boot:
// Automatic registration of all CustomConnector beans
@SpringBootApplication
public class MyHubApplication {
@Bean
public MyConnector myConnector() {
return new MyConnector();
}
// Connector is automatically registered as "my-connector"
}
Component Dependencies
Declare connector dependencies in components:
component('user-management') {
dependencies {
connector 'sms' // Simple dependency
connector 'email@notification:1.0' // Versioned dependency
connector 'validation@external' // Namespaced dependency
}
exposed('send-welcome') {
payload ->
exchange('sms') {
connector 'sms'
config mobile: payload.phone, template: 'welcome'
}
}
}
Monitoring & Metrics
Connector Metrics
Hub automatically tracks metrics for all connectors:
Built-in Metrics
- Request Count: Total number of connector invocations
- Duration: Request/response time metrics
- Error Rate: Success/failure ratios
- Retry Count: Number of retry attempts
connector.name: Connector identifier
connector.type: “custom” or “http”
component.name: Calling component name
status: “success” or “error”
Custom Metrics
Add custom metrics in your connector:
@Component
class MetricsEnabledConnector extends CustomConnectorSupport {
private final MeterRegistry meterRegistry
private final Counter requestCounter
private final Timer responseTimer
MetricsEnabledConnector(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry
this.requestCounter = Counter.builder("connector.custom.requests")
.tag("connector", type())
.register(meterRegistry)
this.responseTimer = Timer.builder("connector.custom.duration")
.tag("connector", type())
.register(meterRegistry)
}
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
return Timer.Sample.start(meterRegistry)
.stop(responseTimer)
.then(performExchange(config))
.doOnNext(result -> requestCounter.increment("result", "success"))
.doOnError(error -> requestCounter.increment("result", "error"))
}
}
Logging Best Practices
@Slf4j
@Component
class WellLoggedConnector extends CustomConnectorSupport {
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
String requestId = UUID.randomUUID().toString()
log.info("Starting connector request: {} with config: {}",
requestId, sanitizeConfig(config))
return performOperation(config)
.doOnNext(result ->
log.info("Connector request completed: {} in {}ms",
requestId, duration))
.doOnError(error ->
log.error("Connector request failed: {} - {}",
requestId, error.getMessage(), error))
.contextWrite(Context.of("requestId", requestId))
}
private Object sanitizeConfig(CustomConnectorConfig config) {
// Remove sensitive data from logs
return config.asMap().entrySet().stream()
.filter(entry -> !SENSITIVE_FIELDS.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
}
}
Integration Patterns
HTTP Wrapper Pattern
Use HTTP connector within custom connectors:
@Component
class SmsConnector extends CustomConnectorSupport {
private final HttpConnectorExchange http
@Autowired
SmsConnector(HttpConnectorExchange http) {
this.http = http
}
@Override
PayloadSpec config() {
return PayloadSpec.of {
mobile
template
params { optional() }
}
}
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
def httpSpec = new HttpConnectorSpec().tap {
url "https://sms-service.com/api/send"
method HttpMethod.POST
basicAuth "user", "password"
jsonBody(
customerNumber: config.mobile,
template: config.template,
params: config.params ?: [:]
)
}
return http.exchange(httpSpec)
.map { response -> response.getBody() }
}
}
Service Integration Pattern
@Component
class DatabaseConnector extends CustomConnectorSupport {
private final JdbcTemplate jdbcTemplate
@Autowired
DatabaseConnector(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate
}
@Override
PayloadSpec config() {
return PayloadSpec.of {
query
parameters { optional() }
}
}
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
return Mono.fromCallable(() -> {
List<Map<String, Object>> results = jdbcTemplate.queryForList(
config.query,
config.parameters?.toArray() ?: []
)
return AttributeBuilder.of([
results: results,
count: results.size()
])
})
.subscribeOn(Schedulers.boundedElastic())
.onErrorMap(SQLException.class, ex ->
new ConnectorErrorException(AttributeBuilder.of([
error: "Database query failed",
sqlError: ex.getMessage()
]))
)
}
}
Async Processing Pattern
@Component
class AsyncProcessingConnector extends CustomConnectorSupport {
private final TaskExecutor taskExecutor
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
return Mono.fromCallable(() -> performLongRunningTask(config))
.subscribeOn(Schedulers.fromExecutor(taskExecutor))
.timeout(Duration.ofMinutes(5))
.onErrorMap(TimeoutException.class, ex ->
new ConnectorErrorException(AttributeBuilder.of([
error: "Processing timeout",
duration: "5 minutes"
]))
)
}
}
Best Practices
1. Configuration Design
- Use clear, descriptive field names
- Provide sensible defaults where possible
- Mark optional fields explicitly
- Validate input thoroughly
2. Error Handling
- Use appropriate exception types for different error scenarios
- Provide meaningful error messages and codes
- Include relevant context in error responses
- Log errors appropriately without exposing sensitive data
- Use reactive patterns for non-blocking operations
- Implement appropriate timeouts
- Use connection pooling for external resources
- Consider caching where appropriate
4. Security
- Never log sensitive configuration data
- Use secure credential storage mechanisms
- Validate all inputs to prevent injection attacks
- Follow principle of least privilege
5. Testing
- Write comprehensive unit tests
- Use mock connectors for integration testing
- Test error scenarios and edge cases
- Validate configuration schemas
6. Documentation
- Document connector purpose and capabilities
- Provide clear configuration examples
- Document error conditions and responses
- Include integration examples
Troubleshooting
Common Issues
Connector Not Found
Error: Connector 'my-connector' not found
Solutions:
- Verify connector is registered in
HubConfigurer
- Check connector
type() method returns correct name
- Ensure connector bean is properly configured
- Verify component dependencies include the connector
Configuration Validation Errors
Error: Field 'apiKey' is required
Solutions:
- Check
PayloadSpec configuration matches usage
- Verify all required fields are provided
- Review field names and types
- Test configuration validation in unit tests
Timeout Issues
Error: Exchange timeout after 30 seconds
Solutions:
- Increase exchange timeout:
timeout 60
- Optimize connector implementation
- Check external service availability
- Review retry strategies
Plugin Loading Failures
Error: Unable to load plugin connector
Solutions:
- Verify OSGI bundle manifest
- Check plugin dependencies
- Review class loading issues
- Validate plugin structure
Debugging Tips
-
Enable Debug Logging
logging:
level:
com.zenoo.hub.connector: DEBUG
com.zenoo.hub.exchange: DEBUG
-
Use Request IDs
log.info("Processing request {} with config: {}", requestId, config)
-
Monitor Metrics
- Check connector success/error rates
- Review response times
- Monitor retry patterns
-
Test in Isolation
def connector = new MyConnector()
def config = new CustomConnectorConfig([...])
def result = connector.exchange(config).block()
-
Connection Pooling
@Bean
public WebClient webClient() {
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(
HttpClient.create()
.connectionProvider(ConnectionProvider.builder("custom")
.maxConnections(100)
.build())
))
.build();
}
-
Caching
@Cacheable("connector-results")
public Mono<Attribute> exchange(CustomConnectorConfig config) {
// Implementation
}
-
Async Processing
return Mono.fromCallable(() -> heavyOperation())
.subscribeOn(Schedulers.boundedElastic())
This documentation covers Hub connector development. For specific connector implementations, refer to their individual documentation or source code examples.