Skip to main content

Documentation Index

Fetch the complete documentation index at: https://platform.docs.zenoo.com/llms.txt

Use this file to discover all available pages before exploring further.

Connectors

Connectors enable Hub to interact with external systems, APIs, and services. Hub provides both built-in connectors (like HTTP) and a powerful framework for developing custom connectors.

Table of Contents

Getting Started

Custom Connector Development

Plugin Development

Configuration & Registration

Advanced Topics


Connector Types

Hub supports three types of connectors:

1. Built-in Connectors

2. Custom Connectors

  • Embedded Connectors: Spring-managed beans within your Hub application
  • Plugin Connectors: OSGI bundles for modular, hot-deployable connectors

3. Integration Patterns

  • Wrapper Connectors: Custom connectors that use HTTP connector internally
  • Protocol Connectors: For non-HTTP protocols (FTP, SFTP, message queues, etc.)
  • Service Connectors: Integration with specific external services

Basic Usage

Connectors are used within exchanges in the DSL:

HTTP Connector

exchange('api-call') {
    http {
        url 'https://api.example.com/data'
        method 'GET'
    }
}

Custom Connector

exchange('sms-send') {
    connector 'sms'
    config mobile: user.phone, template: 'welcome', params: [name: user.name]
}

Connector with Complex Configuration

exchange('external-validation') {
    connector 'document-validator'
    config {
        document: upload.idFront
        validationType: 'identity'
        options: {
            strictMode: true
            returnDetails: false
        }
    }
    timeout 30
    fixedBackoffRetry {
        retry 3
        backoff 5
    }
}

Quick Start

1. Simple Custom Connector

Create a basic custom connector:
@Component
class GreetingConnector extends CustomConnectorSupport {

    @Override
    String type() {
        return 'greeting'
    }

    @Override
    PayloadSpec config() {
        return PayloadSpec.of {
            name
            language { optional() }
        }
    }

    @Override
    Mono<Attribute> exchange(CustomConnectorConfig config) {
        String name = config.name
        String language = config.language ?: 'en'

        String greeting = language == 'es' ? "¡Hola ${name}!" : "Hello ${name}!"

        return Mono.just(AttributeBuilder.of([
            greeting: greeting,
            timestamp: Instant.now()
        ]))
    }
}

2. Register the Connector

@Bean
public HubConfigurer configurer(GreetingConnector greetingConnector) {
    return () -> List.of(
        ConnectorActivator.of("greeting", greetingConnector)
    );
}

3. Use in DSL

function('greet-user') {
    payload ->
        exchange('greeting') {
            connector 'greeting'
            config name: payload.username, language: payload.locale
        }
}

Creating Custom Connectors

Interface Implementation

Custom connectors implement the CustomConnector interface or extend CustomConnectorSupport:
import com.zenoo.hub.connector.CustomConnectorSupport
import com.zenoo.hub.connector.CustomConnectorConfig
import com.zenoo.hub.dsl.attribute.Attribute
import com.zenoo.hub.dsl.payload.PayloadSpec

@Component
class MyConnector extends CustomConnectorSupport {

    @Override
    String type() {
        return 'my-connector'  // Unique identifier
    }

    @Override
    PayloadSpec config() {
        return PayloadSpec.of {
            // Define input validation schema
        }
    }

    @Override
    PayloadSpec payload() {
        return PayloadSpec.of {
            // Define expected result schema
        }
    }

    @Override
    Mono<Attribute> exchange(CustomConnectorConfig config) {
        // Implement connector logic
        return Mono.just(AttributeBuilder.of(result))
    }
}

Required Methods

type() - Connector Identifier

Returns the unique identifier used in DSL connector() calls:
@Override
String type() {
    return 'email-sender'  // Used as: connector('email-sender')
}
Naming Conventions:
  • Simple names: 'sms', 'email', 'validation'
  • Versioned: 'mambu@mambu:1.2.3'
  • Namespaced: 'Screen-Sync@worldcheck.connectors'

exchange() - Core Logic

Implements the connector’s main functionality:
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
    // Access configuration
    String apiKey = config.apiKey
    String endpoint = config.endpoint

    // Perform external operation
    return externalService.call(apiKey, endpoint)
        .map(response -> AttributeBuilder.of([
            success: true,
            data: response.getData(),
            timestamp: Instant.now()
        ]))
        .onErrorMap(ex -> new ConnectorErrorException(
            AttributeBuilder.of([error: ex.getMessage()])
        ))
}

Optional Methods

config() - Input Validation

Defines the schema for connector configuration:
@Override
PayloadSpec config() {
    return PayloadSpec.of {
        apiKey                          // Required string
        endpoint                        // Required string
        timeout {
            number()                    // Must be number
            optional()                  // Optional field
        }
        options {                       // Nested object
            retries {
                number()
                optional()
            }
            debug {
                truefalse()            // Boolean
                optional()
            }
        }
    }
}

payload() - Result Schema

Defines the expected structure of connector results:
@Override
PayloadSpec payload() {
    return PayloadSpec.of {
        success { truefalse() }
        data                           // Any data structure
        errors {
            optional()
            list { string() }          // List of error strings
        }
        metadata {
            optional()
            requestId
            duration { number() }
        }
    }
}

Payload Specifications

Payload specifications define data structure and validation rules for connector inputs and outputs.

Basic Types

PayloadSpec.of {
    stringField                        // Required string
    numberField { number() }           // Required number
    booleanField { truefalse() }       // Required boolean
    optionalField { optional() }       // Optional field
}

Advanced Validation

PayloadSpec.of {
    email {
        string()
        regex ~/^[A-Za-z0-9+_.-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$/
    }
    age {
        number()
        // Custom validation logic can be added
    }
    status {
        oneOf 'pending', 'processing', 'completed', 'failed'
    }
    tags {
        list { string() }              // List of strings
        optional()
    }
}

File Handling

PayloadSpec.of {
    document { file() }                        // Any file
    image { file 'image' }                     // Image files only
    pdf { file 'application/pdf' }             // PDF files only
    attachments {
        list { file() }                        // List of files
        optional()
    }
}

Nested Structures

PayloadSpec.of {
    user {
        firstName
        lastName
        address {
            street
            city
            postalCode { regex ~/^\d{5}(-\d{4})?$/ }
            country { oneOf 'US', 'CA', 'MX' }
        }
        contacts {
            optional()
            email { optional() }
            phone { optional() }
        }
    }
}

Date Validation

PayloadSpec.of {
    birthDate { date('yyyy-MM-dd') }           // ISO date format
    timestamp { date('yyyy-MM-dd HH:mm:ss') }  // Custom format
    created { date() }                         // Default format
}

Error Handling

Exception Types

ConnectorErrorException - Retryable Errors

Used for temporary failures that should be retried:
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
    return externalApi.call(config)
        .onErrorMap(TimeoutException.class, ex ->
            new ConnectorErrorException(
                AttributeBuilder.of([
                    error: "API timeout",
                    retryable: true,
                    details: ex.getMessage()
                ])
            )
        )
}

UnretryableConnectorErrorException - Non-Retryable Errors

Used for permanent failures that should not be retried:
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
    if (!isValidApiKey(config.apiKey)) {
        return Mono.error(new UnretryableConnectorErrorException(
            AttributeBuilder.of([
                error: "Invalid API key",
                code: "AUTH_FAILED"
            ])
        ))
    }
    // ... rest of implementation
}

ConnectorException - System Errors

Used for unexpected system errors:
@Override
Mono<Attribute> exchange(CustomConnectorConfig config) {
    try {
        return performOperation(config)
    } catch (IllegalArgumentException ex) {
        throw new ConnectorException("Invalid configuration: " + ex.getMessage())
    }
}

Error Response Structure

Structure error responses consistently:
// Success response
AttributeBuilder.of([
    success: true,
    data: result,
    metadata: [requestId: uuid, duration: elapsed]
])

// Error response
AttributeBuilder.of([
    success: false,
    error: [
        message: "Operation failed",
        code: "EXTERNAL_API_ERROR",
        details: errorDetails,
        retryable: true
    ],
    metadata: [requestId: uuid, timestamp: now]
])

Testing Connectors

Unit Testing

class MyConnectorSpec extends Specification {

    def connector = new MyConnector()

    def "should validate configuration"() {
        given:
        def config = new CustomConnectorConfig([
            apiKey: "test-key",
            endpoint: "https://api.test.com"
        ])

        when:
        def result = connector.validate(config)

        then:
        !result.hasErrors()
    }

    def "should handle successful response"() {
        given:
        def config = new CustomConnectorConfig([
            apiKey: "test-key"
        ])

        when:
        def result = connector.exchange(config).block()

        then:
        result.success == true
        result.data != null
    }
}

Integration Testing with MockConnector

class ComponentIntegrationSpec extends Specification {

    def "should handle connector response"() {
        given:
        def mockConnector = new MockConnector(new MyConnector())
        mockConnector.getMockExchange()
            .withResult([success: true, data: "test-result"])

        // Register mock connector
        // Execute component test

        expect:
        // Verify results
    }
}

Mock Connector Patterns

// Success response
mockConnector.getMockExchange()
    .withResult([success: true, data: expectedData])

// Error response
mockConnector.getMockExchange()
    .withError()

// Delayed response
mockConnector.getMockExchange()
    .withDelay(5)
    .withResult(data)

// Configuration validation
mockConnector.getMockExchange()
    .withConfigConsumer { config ->
        assert config.apiKey == "expected-key"
    }

OSGI Plugin Connectors

Plugin Connector Implementation

OSGI plugin connectors implement the com.zenoo.hub.plugin.sdk.Connector interface:
package com.example.plugin

import com.zenoo.hub.dsl.attribute.Attribute
import com.zenoo.hub.dsl.attribute.AttributeBuilder
import com.zenoo.hub.plugin.sdk.Connector
import org.osgi.service.component.annotations.Component
import reactor.core.publisher.Mono

@Component
class MyPluginConnector implements Connector {

    @Override
    String name() {
        return 'my-plugin'
    }

    @Override
    Mono<Attribute> exchange(Attribute input) {
        // Process input attribute
        String data = input.asString()

        // Perform operation
        String result = processData(data)

        // Return result
        return Mono.just(AttributeBuilder.of([
            processed: result,
            timestamp: System.currentTimeMillis()
        ]))
    }

    private String processData(String data) {
        // Plugin-specific logic
        return data.toUpperCase()
    }
}

Plugin Structure

my-plugin/
├── src/main/java/
│   └── com/plugin/
│       └── MyPluginConnector.groovy
├── build.gradle
└── META-INF/
    └── MANIFEST.MF

build.gradle for Plugin

plugins {
    id 'groovy'
    id 'osgi'
}

dependencies {
    implementation 'com.zenoo.hub:hub-backend:1.0.0'
    implementation 'org.osgi:osgi.core:8.0.0'
    implementation 'org.osgi:osgi.cmpn:8.0.0'
}

jar {
    manifest {
        attributes(
            'Bundle-Name': 'My Plugin Connector',
            'Bundle-SymbolicName': 'com.example.my-plugin',
            'Bundle-Version': '1.0.0',
            'Export-Package': 'com.example.plugin',
            'Import-Package': [
                'com.zenoo.hub.plugin.sdk',
                'com.zenoo.hub.dsl.attribute',
                'reactor.core.publisher',
                'org.osgi.service.component.annotations'
            ].join(',')
        )
    }
}

Plugin vs Embedded Connectors

AspectEmbedded ConnectorsPlugin Connectors
DeploymentBuilt into applicationHot-deployable bundles
InterfaceCustomConnectorplugin.sdk.Connector
ConfigurationPayloadSpec validationManual input processing
Spring IntegrationFull Spring supportLimited Spring features
Error HandlingRich exception typesBasic error handling
TestingFull MockConnector supportLimited testing tools

Connector Registration

Programmatic Registration

HubConfigurer Bean

@Bean
@Primary
public HubConfigurer configurer(
    SmsConnector smsConnector,
    EmailConnector emailConnector,
    ValidationConnector validationConnector) {

    return () -> List.of(
        ConnectorActivator.of("sms", smsConnector),
        ConnectorActivator.of("email", emailConnector),
        ConnectorActivator.of("validation@external:2.1.0", validationConnector)
    );
}

ConnectorActivator Patterns

// Simple registration
ConnectorActivator.of("connector-name", connectorInstance)

// Versioned registration
ConnectorActivator.of("service@provider:1.2.3", connectorInstance)

// Namespaced registration
ConnectorActivator.of("connector@namespace.category", connectorInstance)

// Component-scoped registration
ConnectorActivator.of(ComponentId.of("bundle", "1.0"), connectorInstance)

Auto-Configuration

Hub provides auto-configuration for Spring Boot:
// Automatic registration of all CustomConnector beans
@SpringBootApplication
public class MyHubApplication {

    @Bean
    public MyConnector myConnector() {
        return new MyConnector();
    }

    // Connector is automatically registered as "my-connector"
}

Component Dependencies

Declare connector dependencies in components:
component('user-management') {
    dependencies {
        connector 'sms'                           // Simple dependency
        connector 'email@notification:1.0'       // Versioned dependency
        connector 'validation@external'          // Namespaced dependency
    }

    exposed('send-welcome') {
        payload ->
            exchange('sms') {
                connector 'sms'
                config mobile: payload.phone, template: 'welcome'
            }
    }
}

Monitoring & Metrics

Connector Metrics

Hub automatically tracks metrics for all connectors:

Built-in Metrics

  • Request Count: Total number of connector invocations
  • Duration: Request/response time metrics
  • Error Rate: Success/failure ratios
  • Retry Count: Number of retry attempts

Metric Tags

  • connector.name: Connector identifier
  • connector.type: “custom” or “http”
  • component.name: Calling component name
  • status: “success” or “error”

Custom Metrics

Add custom metrics in your connector:
@Component
class MetricsEnabledConnector extends CustomConnectorSupport {

    private final MeterRegistry meterRegistry
    private final Counter requestCounter
    private final Timer responseTimer

    MetricsEnabledConnector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry
        this.requestCounter = Counter.builder("connector.custom.requests")
            .tag("connector", type())
            .register(meterRegistry)
        this.responseTimer = Timer.builder("connector.custom.duration")
            .tag("connector", type())
            .register(meterRegistry)
    }

    @Override
    Mono<Attribute> exchange(CustomConnectorConfig config) {
        return Timer.Sample.start(meterRegistry)
            .stop(responseTimer)
            .then(performExchange(config))
            .doOnNext(result -> requestCounter.increment("result", "success"))
            .doOnError(error -> requestCounter.increment("result", "error"))
    }
}

Logging Best Practices

@Slf4j
@Component
class WellLoggedConnector extends CustomConnectorSupport {

    @Override
    Mono<Attribute> exchange(CustomConnectorConfig config) {
        String requestId = UUID.randomUUID().toString()

        log.info("Starting connector request: {} with config: {}",
                 requestId, sanitizeConfig(config))

        return performOperation(config)
            .doOnNext(result ->
                log.info("Connector request completed: {} in {}ms",
                         requestId, duration))
            .doOnError(error ->
                log.error("Connector request failed: {} - {}",
                          requestId, error.getMessage(), error))
            .contextWrite(Context.of("requestId", requestId))
    }

    private Object sanitizeConfig(CustomConnectorConfig config) {
        // Remove sensitive data from logs
        return config.asMap().entrySet().stream()
            .filter(entry -> !SENSITIVE_FIELDS.contains(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
    }
}

Integration Patterns

HTTP Wrapper Pattern

Use HTTP connector within custom connectors:
@Component
class SmsConnector extends CustomConnectorSupport {

    private final HttpConnectorExchange http

    @Autowired
    SmsConnector(HttpConnectorExchange http) {
        this.http = http
    }

    @Override
    PayloadSpec config() {
        return PayloadSpec.of {
            mobile
            template
            params { optional() }
        }
    }

    @Override
    Mono<Attribute> exchange(CustomConnectorConfig config) {
        def httpSpec = new HttpConnectorSpec().tap {
            url "https://sms-service.com/api/send"
            method HttpMethod.POST
            basicAuth "user", "password"
            jsonBody(
                customerNumber: config.mobile,
                template: config.template,
                params: config.params ?: [:]
            )
        }

        return http.exchange(httpSpec)
            .map { response -> response.getBody() }
    }
}

Service Integration Pattern

@Component
class DatabaseConnector extends CustomConnectorSupport {

    private final JdbcTemplate jdbcTemplate

    @Autowired
    DatabaseConnector(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate
    }

    @Override
    PayloadSpec config() {
        return PayloadSpec.of {
            query
            parameters { optional() }
        }
    }

    @Override
    Mono<Attribute> exchange(CustomConnectorConfig config) {
        return Mono.fromCallable(() -> {
            List<Map<String, Object>> results = jdbcTemplate.queryForList(
                config.query,
                config.parameters?.toArray() ?: []
            )
            return AttributeBuilder.of([
                results: results,
                count: results.size()
            ])
        })
        .subscribeOn(Schedulers.boundedElastic())
        .onErrorMap(SQLException.class, ex ->
            new ConnectorErrorException(AttributeBuilder.of([
                error: "Database query failed",
                sqlError: ex.getMessage()
            ]))
        )
    }
}

Async Processing Pattern

@Component
class AsyncProcessingConnector extends CustomConnectorSupport {

    private final TaskExecutor taskExecutor

    @Override
    Mono<Attribute> exchange(CustomConnectorConfig config) {
        return Mono.fromCallable(() -> performLongRunningTask(config))
            .subscribeOn(Schedulers.fromExecutor(taskExecutor))
            .timeout(Duration.ofMinutes(5))
            .onErrorMap(TimeoutException.class, ex ->
                new ConnectorErrorException(AttributeBuilder.of([
                    error: "Processing timeout",
                    duration: "5 minutes"
                ]))
            )
    }
}

Best Practices

1. Configuration Design

  • Use clear, descriptive field names
  • Provide sensible defaults where possible
  • Mark optional fields explicitly
  • Validate input thoroughly

2. Error Handling

  • Use appropriate exception types for different error scenarios
  • Provide meaningful error messages and codes
  • Include relevant context in error responses
  • Log errors appropriately without exposing sensitive data

3. Performance

  • Use reactive patterns for non-blocking operations
  • Implement appropriate timeouts
  • Use connection pooling for external resources
  • Consider caching where appropriate

4. Security

  • Never log sensitive configuration data
  • Use secure credential storage mechanisms
  • Validate all inputs to prevent injection attacks
  • Follow principle of least privilege

5. Testing

  • Write comprehensive unit tests
  • Use mock connectors for integration testing
  • Test error scenarios and edge cases
  • Validate configuration schemas

6. Documentation

  • Document connector purpose and capabilities
  • Provide clear configuration examples
  • Document error conditions and responses
  • Include integration examples

Troubleshooting

Common Issues

Connector Not Found

Error: Connector 'my-connector' not found
Solutions:
  • Verify connector is registered in HubConfigurer
  • Check connector type() method returns correct name
  • Ensure connector bean is properly configured
  • Verify component dependencies include the connector

Configuration Validation Errors

Error: Field 'apiKey' is required
Solutions:
  • Check PayloadSpec configuration matches usage
  • Verify all required fields are provided
  • Review field names and types
  • Test configuration validation in unit tests

Timeout Issues

Error: Exchange timeout after 30 seconds
Solutions:
  • Increase exchange timeout: timeout 60
  • Optimize connector implementation
  • Check external service availability
  • Review retry strategies

Plugin Loading Failures

Error: Unable to load plugin connector
Solutions:
  • Verify OSGI bundle manifest
  • Check plugin dependencies
  • Review class loading issues
  • Validate plugin structure

Debugging Tips

  1. Enable Debug Logging
    logging:
      level:
        com.zenoo.hub.connector: DEBUG
        com.zenoo.hub.exchange: DEBUG
    
  2. Use Request IDs
    log.info("Processing request {} with config: {}", requestId, config)
    
  3. Monitor Metrics
    • Check connector success/error rates
    • Review response times
    • Monitor retry patterns
  4. Test in Isolation
    def connector = new MyConnector()
    def config = new CustomConnectorConfig([...])
    def result = connector.exchange(config).block()
    

Performance Optimization

  1. Connection Pooling
    @Bean
    public WebClient webClient() {
        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(
                HttpClient.create()
                    .connectionProvider(ConnectionProvider.builder("custom")
                        .maxConnections(100)
                        .build())
            ))
            .build();
    }
    
  2. Caching
    @Cacheable("connector-results")
    public Mono<Attribute> exchange(CustomConnectorConfig config) {
        // Implementation
    }
    
  3. Async Processing
    return Mono.fromCallable(() -> heavyOperation())
        .subscribeOn(Schedulers.boundedElastic())
    


This documentation covers Hub connector development. For specific connector implementations, refer to their individual documentation or source code examples.