Skip to content

Database Connector Component

A complete example of a database component with connection pooling.

Overview

This component executes SQL queries against a database. It demonstrates:

  • Connection pool management
  • Parameterized queries
  • Transaction support (not implemented in this example; listed under Extension Ideas)
  • Multiple output types (rows, affected count)

Complete Implementation

go
package database

import (
    "context"
    "database/sql"
    "fmt"
    "strings"
    "time"

    _ "github.com/lib/pq" // PostgreSQL driver
    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/pkg/module"
)

// ComponentName is the identifier this component reports as its Name in GetInfo.
const ComponentName = "database_connector"

// Component is the database connector. It keeps the most recently received
// settings and the connection pool opened from them.
type Component struct {
    settings Settings // last value received on the settings port
    db       *sql.DB  // nil until connect succeeds; set back to nil on disconnect
}

// Settings is the connection configuration delivered on the settings port.
//
// Every struct tag is kept on a single line on purpose: reflect.StructTag
// stops parsing at the first newline, so keys written on a continuation line
// (enum, description, ...) would be silently dropped.
type Settings struct {
    // Driver is the database/sql driver name handed to sql.Open.
    // NOTE(review): only the postgres driver is imported in the visible part
    // of this file; mysql and sqlite3 need their drivers registered elsewhere.
    Driver string `json:"driver" title:"Driver" default:"postgres" enum:"postgres,mysql,sqlite3" description:"Database driver"`
    // Host and Port locate the server (used by the postgres and mysql DSNs).
    Host string `json:"host" title:"Host" required:"true" default:"localhost" description:"Database host"`
    Port int    `json:"port" title:"Port" default:"5432" description:"Database port"`
    // Database is the database name; for the fallback (sqlite3) branch of
    // buildDSN it is used verbatim as the DSN.
    Database string `json:"database" title:"Database" required:"true" description:"Database name"`
    // Username and Password are the login credentials.
    Username string `json:"username" title:"Username" required:"true" description:"Database username"`
    Password string `json:"password" title:"Password" format:"password" description:"Database password"`
    // SSLMode is only used when building the postgres DSN.
    SSLMode string `json:"sslMode" title:"SSL Mode" default:"disable" enum:"disable,require,verify-ca,verify-full" description:"SSL connection mode"`
    // MaxOpenConns and MaxIdleConns are applied to the pool in connect.
    MaxOpenConns int `json:"maxOpenConns" title:"Max Open Connections" default:"10" description:"Maximum number of open connections"`
    MaxIdleConns int `json:"maxIdleConns" title:"Max Idle Connections" default:"5" description:"Maximum number of idle connections"`
}

// QueryInput is the message accepted on the "query" port.
//
// Every struct tag is kept on a single line on purpose: reflect.StructTag
// stops parsing at the first newline, so keys written on a continuation line
// (configurable, enum, description, ...) would be silently dropped.
type QueryInput struct {
    // Query is the SQL text passed to the driver unchanged.
    Query string `json:"query" title:"SQL Query" required:"true" format:"textarea" configurable:"true" description:"SQL query to execute"`
    // Parameters are forwarded as the positional arguments of the statement.
    Parameters []any `json:"parameters,omitempty" title:"Parameters" configurable:"true" description:"Query parameters (for prepared statements)"`
    // Operation selects the code path: "query" reads rows, "execute" reports
    // the affected count. An empty value triggers auto-detection in handleQuery.
    Operation string `json:"operation" title:"Operation" default:"query" enum:"query,execute" configurable:"true" description:"query=SELECT (returns rows), execute=INSERT/UPDATE/DELETE (returns affected count)"`
}

// QueryOutput is the message sent on the "result" port after a "query"
// operation has read its rows.
type QueryOutput struct {
    // Rows holds one map per result row, keyed by column name. []byte column
    // values are converted to string before being stored.
    Rows       []map[string]any `json:"rows" title:"Result Rows"`
    // RowCount is len(Rows).
    RowCount   int              `json:"rowCount" title:"Row Count"`
    // Columns lists the column names as reported by the result set.
    Columns    []string         `json:"columns" title:"Column Names"`
    // Query echoes the SQL text that was executed.
    Query      string           `json:"query" title:"Executed Query"`
    // DurationMs is the difference between two timeNow readings taken around
    // the operation.
    DurationMs int64            `json:"durationMs" title:"Duration (ms)"`
}

// ExecuteOutput is the message sent on the "executed" port after an
// "execute" operation succeeds.
type ExecuteOutput struct {
    // RowsAffected is the count reported by the driver's sql.Result.
    RowsAffected int64  `json:"rowsAffected" title:"Rows Affected"`
    // LastInsertID is the id reported by the driver's sql.Result; it is left
    // out of the JSON when zero (omitempty).
    LastInsertID int64  `json:"lastInsertId,omitempty" title:"Last Insert ID"`
    // Query echoes the SQL text that was executed.
    Query        string `json:"query" title:"Executed Query"`
    // DurationMs is the difference between two timeNow readings taken around
    // the operation.
    DurationMs   int64  `json:"durationMs" title:"Duration (ms)"`
}

// ErrorOutput is the message sent on the "error" port when connecting fails,
// the operation name is unknown, or the driver returns an error.
type ErrorOutput struct {
    // Error is the error text.
    Error      string `json:"error" title:"Error Message"`
    // Query echoes the SQL text of the request that failed.
    Query      string `json:"query" title:"Failed Query"`
    // DurationMs is filled in for driver errors only; connection failures and
    // unknown operations leave it at zero.
    DurationMs int64  `json:"durationMs" title:"Duration (ms)"`
}

// ControlState is the configuration of the control port and the value
// returned by HandleControl.
type ControlState struct {
    // Status is "disconnected", "connected", or "error: <message>" after a
    // failed connect.
    Status      string `json:"status" readonly:"true" title:"Connection Status"`
    // OpenConns and IdleConns mirror sql.DBStats (OpenConnections and Idle)
    // at the time the state was built.
    OpenConns   int    `json:"openConns" readonly:"true" title:"Open Connections"`
    IdleConns   int    `json:"idleConns" readonly:"true" title:"Idle Connections"`
    // Connect and Disconnect are declared with format "button"; their JSON
    // names match the port names HandleControl switches on.
    // NOTE(review): presumably rendered as buttons by the host UI - confirm.
    Connect     bool   `json:"connect" format:"button" title:"Connect" colSpan:"col-span-6"`
    Disconnect  bool   `json:"disconnect" format:"button" title:"Disconnect" colSpan:"col-span-6"`
}

// Compile-time assertions that *Component satisfies module.Component and
// module.ControlHandler.
var _ module.Component = (*Component)(nil)
var _ module.ControlHandler = (*Component)(nil)

// GetInfo returns the static metadata of the component: its name, display
// title, description, category and search tags.
func (c *Component) GetInfo() module.ComponentInfo {
    var info module.ComponentInfo

    info.Name = ComponentName
    info.Title = "Database"
    info.Description = "Executes SQL queries against relational databases"
    info.Category = "Database"
    info.Tags = []string{"database", "sql", "postgres", "mysql"}

    return info
}

// Ports declares the six ports of the component, in this order: settings and
// control (top), query (left), result and executed (right), error (bottom).
// Settings, control and query are declared with Source set to true; the
// remaining three with Source set to false.
func (c *Component) Ports() []module.Port {
    settingsPort := module.Port{
        Name:          v1alpha1.SettingsPort,
        Label:         "Settings",
        Position:      module.PositionTop,
        Source:        true,
        Configuration: Settings{},
    }
    controlPort := module.Port{
        Name:          v1alpha1.ControlPort,
        Label:         "Control",
        Position:      module.PositionTop,
        Source:        true,
        Configuration: ControlState{Status: "disconnected"},
    }
    queryPort := module.Port{
        Name:          "query",
        Label:         "Query",
        Position:      module.PositionLeft,
        Source:        true,
        Configuration: QueryInput{},
    }
    resultPort := module.Port{
        Name:          "result",
        Label:         "Result",
        Position:      module.PositionRight,
        Source:        false,
        Configuration: QueryOutput{},
    }
    executedPort := module.Port{
        Name:          "executed",
        Label:         "Executed",
        Position:      module.PositionRight,
        Source:        false,
        Configuration: ExecuteOutput{},
    }
    errorPort := module.Port{
        Name:          "error",
        Label:         "Error",
        Position:      module.PositionBottom,
        Source:        false,
        Configuration: ErrorOutput{},
    }

    return []module.Port{
        settingsPort,
        controlPort,
        queryPort,
        resultPort,
        executedPort,
        errorPort,
    }
}

// Handle dispatches an incoming message by port name.
//
// The settings port stores the received Settings on the component; an
// already-open pool is left untouched, so new settings take effect on the
// next connect. The "query" port runs the request through handleQuery.
//
// It returns an error for an unknown port or when msg does not have the type
// the port expects (instead of panicking on a failed type assertion).
func (c *Component) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    msg any,
) error {
    switch port {
    case v1alpha1.SettingsPort:
        settings, ok := msg.(Settings)
        if !ok {
            return fmt.Errorf("invalid settings message: %T", msg)
        }
        c.settings = settings
        return nil

    case "query":
        input, ok := msg.(QueryInput)
        if !ok {
            return fmt.Errorf("invalid query message: %T", msg)
        }
        return c.handleQuery(ctx, output, input)

    default:
        return fmt.Errorf("unknown port: %s", port)
    }
}

// HandleControl reacts to the "connect" and "disconnect" control actions and
// returns the control state to display. The returned error is always nil: a
// failed connect is reported through the Status field instead.
//
// For any other port the incoming state is returned unchanged, or a fresh
// "disconnected" state when none (or one of an unexpected type) was supplied.
func (c *Component) HandleControl(
    ctx context.Context,
    state any,
    port string,
    msg any,
) (any, error) {
    current, ok := state.(*ControlState)
    if !ok || current == nil {
        current = &ControlState{Status: "disconnected"}
    }

    if port == "connect" {
        // Only dial when there is no pool yet; an existing pool is reused.
        if c.db == nil {
            if err := c.connect(); err != nil {
                return &ControlState{Status: "error: " + err.Error()}, nil
            }
        }
        return c.getConnectionState(), nil
    }

    if port == "disconnect" {
        if c.db != nil {
            // The Close error is not inspected; the pool is dropped either way.
            c.db.Close()
            c.db = nil
        }
        return &ControlState{Status: "disconnected"}, nil
    }

    return current, nil
}

// connect opens a pool for the current settings, applies the pool limits and
// verifies the connection with a ping. c.db is assigned only when the ping
// succeeds; on a failed ping the freshly opened pool is closed again.
func (c *Component) connect() error {
    pool, err := sql.Open(c.settings.Driver, c.buildDSN())
    if err != nil {
        return fmt.Errorf("failed to open database: %w", err)
    }

    pool.SetMaxOpenConns(c.settings.MaxOpenConns)
    pool.SetMaxIdleConns(c.settings.MaxIdleConns)

    pingErr := pool.Ping()
    if pingErr == nil {
        c.db = pool
        return nil
    }

    pool.Close()
    return fmt.Errorf("failed to ping database: %w", pingErr)
}

// buildDSN renders the data source name for the configured driver.
//
//   - postgres: a key/value connection string. Every string value is
//     single-quoted and escaped, so empty values and values containing
//     spaces, quotes or backslashes cannot swallow the following key or
//     inject extra options.
//   - mysql: user:password@tcp(host:port)/database.
//   - anything else (sqlite3): the Database setting verbatim.
func (c *Component) buildDSN() string {
    s := c.settings

    switch s.Driver {
    case "postgres":
        return fmt.Sprintf(
            "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
            quoteDSNValue(s.Host),
            s.Port,
            quoteDSNValue(s.Username),
            quoteDSNValue(s.Password),
            quoteDSNValue(s.Database),
            quoteDSNValue(s.SSLMode),
        )
    case "mysql":
        return fmt.Sprintf(
            "%s:%s@tcp(%s:%d)/%s",
            s.Username,
            s.Password,
            s.Host,
            s.Port,
            s.Database,
        )
    default:
        return s.Database // sqlite3
    }
}

// quoteDSNValue wraps v in single quotes for a key/value connection string,
// backslash-escaping embedded backslashes and single quotes.
func quoteDSNValue(v string) string {
    // Backslashes first, otherwise the escapes added for quotes get doubled.
    v = strings.ReplaceAll(v, `\`, `\\`)
    v = strings.ReplaceAll(v, `'`, `\'`)
    return "'" + v + "'"
}

// getConnectionState builds a fresh ControlState describing the pool:
// "disconnected" with zero counters when there is no pool, otherwise
// "connected" with the open and idle connection counts from db.Stats().
func (c *Component) getConnectionState() *ControlState {
    state := &ControlState{Status: "disconnected"}

    if c.db != nil {
        poolStats := c.db.Stats()
        state.Status = "connected"
        state.OpenConns = poolStats.OpenConnections
        state.IdleConns = poolStats.Idle
    }

    return state
}

// handleQuery runs one QueryInput. It connects first when no pool exists,
// resolves the operation and delegates to executeQuery or executeStatement.
//
// An empty Operation is auto-detected from the query text: a statement whose
// trimmed, upper-cased text starts with "SELECT" is a "query", anything else
// an "execute". Connection failures and unknown operations are reported on
// the "error" port.
func (c *Component) handleQuery(
    ctx context.Context,
    output module.Handler,
    input QueryInput,
) error {
    // Lazily connect on first use.
    if c.db == nil {
        if err := c.connect(); err != nil {
            return output(ctx, "error", ErrorOutput{
                Error: "connection failed: " + err.Error(),
                Query: input.Query,
            })
        }
    }

    startedAt := timeNow()

    op := input.Operation
    if op == "" {
        op = "execute"
        normalized := strings.ToUpper(strings.TrimSpace(input.Query))
        if strings.HasPrefix(normalized, "SELECT") {
            op = "query"
        }
    }

    if op == "query" {
        return c.executeQuery(ctx, output, input, startedAt)
    }
    if op == "execute" {
        return c.executeStatement(ctx, output, input, startedAt)
    }

    return output(ctx, "error", ErrorOutput{
        Error: fmt.Sprintf("unknown operation: %s", op),
        Query: input.Query,
    })
}

// executeQuery runs input.Query with input.Parameters, reads the whole result
// set into memory and sends it on the "result" port as a QueryOutput.
//
// start is the timeNow reading taken before the operation; it is used to
// compute DurationMs.
//
// Any failure - running the query, reading the column names, scanning a row,
// or an iteration error reported by rows.Err - is sent on the "error" port
// instead, so a partially read result is never reported as a success.
// The returned error is whatever the output handler returns.
func (c *Component) executeQuery(
    ctx context.Context,
    output module.Handler,
    input QueryInput,
    start int64,
) error {
    // fail reports err on the error port together with the elapsed time.
    fail := func(err error) error {
        return output(ctx, "error", ErrorOutput{
            Error:      err.Error(),
            Query:      input.Query,
            DurationMs: timeNow() - start,
        })
    }

    rows, err := c.db.QueryContext(ctx, input.Query, input.Parameters...)
    if err != nil {
        return fail(err)
    }
    defer rows.Close()

    columns, err := rows.Columns()
    if err != nil {
        return fail(err)
    }

    // Start from an empty (non-nil) slice so that a query matching no rows
    // is encoded as [] rather than null.
    resultRows := make([]map[string]any, 0)
    for rows.Next() {
        // Scan into a slice of any through pointers, one slot per column.
        values := make([]any, len(columns))
        valuePointers := make([]any, len(columns))
        for i := range values {
            valuePointers[i] = &values[i]
        }

        if err := rows.Scan(valuePointers...); err != nil {
            return fail(fmt.Errorf("scanning row %d: %w", len(resultRows)+1, err))
        }

        row := make(map[string]any, len(columns))
        for i, col := range columns {
            // Convert []byte values to string.
            if b, ok := values[i].([]byte); ok {
                row[col] = string(b)
                continue
            }
            row[col] = values[i]
        }
        resultRows = append(resultRows, row)
    }

    // rows.Next returns false both at the end of the data and on error;
    // rows.Err tells the two apart.
    if err := rows.Err(); err != nil {
        return fail(err)
    }

    return output(ctx, "result", QueryOutput{
        Rows:       resultRows,
        RowCount:   len(resultRows),
        Columns:    columns,
        Query:      input.Query,
        DurationMs: timeNow() - start,
    })
}

// executeStatement runs input.Query with input.Parameters through ExecContext
// and sends an ExecuteOutput on the "executed" port, or an ErrorOutput on the
// "error" port when the driver rejects the statement.
//
// start is the timeNow reading taken before the operation; it is used to
// compute DurationMs.
func (c *Component) executeStatement(
    ctx context.Context,
    output module.Handler,
    input QueryInput,
    start int64,
) error {
    res, execErr := c.db.ExecContext(ctx, input.Query, input.Parameters...)
    if execErr != nil {
        failure := ErrorOutput{Error: execErr.Error(), Query: input.Query}
        failure.DurationMs = timeNow() - start
        return output(ctx, "error", failure)
    }

    // Errors from RowsAffected and LastInsertId are deliberately discarded:
    // the counters are reported on a best-effort basis.
    // NOTE(review): not every driver supports both calls - confirm this is
    // the intended reason.
    var summary ExecuteOutput
    summary.RowsAffected, _ = res.RowsAffected()
    summary.LastInsertID, _ = res.LastInsertId()
    summary.Query = input.Query
    summary.DurationMs = timeNow() - start

    return output(ctx, "executed", summary)
}

// timeNow returns the current time in milliseconds since the Unix epoch, as
// reported by timeNowFunc. Durations in the outputs are computed as the
// difference of two such readings.
func timeNow() int64 {
    return timeNowFunc().UnixMilli()
}

// timeNowFunc is the clock behind timeNow. It defaults to time.Now and is a
// package-level variable so tests can substitute a fixed clock.
var timeNowFunc = time.Now

// Instance returns a new zero-valued Component: empty settings and no
// connection pool. Nothing is copied from the receiver.
func (c *Component) Instance() module.Component {
    return new(Component)
}

Usage Examples

SELECT Query

yaml
edges:
  - port: _settings
    data:
      driver: "postgres"
      host: "localhost"
      port: 5432
      database: "myapp"
      username: "{{$.secrets.dbUser}}"
      password: "{{$.secrets.dbPassword}}"
      sslMode: "require"

  - port: query
    data:
      query: "SELECT id, name, email FROM users WHERE status = $1 LIMIT $2"
      parameters:
        - "active"
        - 100
      operation: "query"

INSERT Statement

yaml
edges:
  - port: query
    data:
      query: "INSERT INTO users (name, email) VALUES ($1, $2)"
      parameters:
        - "{{$.user.name}}"
        - "{{$.user.email}}"
      operation: "execute"

Dynamic Query Building

yaml
edges:
  - port: query
    data:
      query: "{{$.queryTemplate}}"
      parameters: "{{$.queryParams}}"

Response Examples

Query Result (SELECT)

json
{
  "rows": [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"}
  ],
  "rowCount": 2,
  "columns": ["id", "name", "email"],
  "query": "SELECT id, name, email FROM users WHERE status = $1 LIMIT $2",
  "durationMs": 12
}

Execute Result (INSERT/UPDATE/DELETE)

json
{
  "rowsAffected": 1,
  "lastInsertId": 42,
  "query": "INSERT INTO users (name, email) VALUES ($1, $2)",
  "durationMs": 8
}

Error Result

json
{
  "error": "pq: relation \"users\" does not exist",
  "query": "SELECT * FROM users",
  "durationMs": 3
}

Key Patterns Demonstrated

1. Connection Pooling

go
db.SetMaxOpenConns(c.settings.MaxOpenConns)
db.SetMaxIdleConns(c.settings.MaxIdleConns)

2. Parameterized Queries

Pass values as query parameters instead of concatenating them into the SQL string; this prevents SQL injection:

go
rows, err := c.db.QueryContext(ctx, input.Query, input.Parameters...)

3. Auto-Connect

Lazy connection on first query:

go
if c.db == nil {
    if err := c.connect(); err != nil {
        return output(ctx, "error", ...)
    }
}

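4. Operation Auto-Detection

When `operation` is empty, the component inspects the query text: statements starting with SELECT run as a query, and everything else runs as an execute: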

go
queryUpper := strings.ToUpper(strings.TrimSpace(input.Query))
if strings.HasPrefix(queryUpper, "SELECT") {
    operation = "query"
} else {
    operation = "execute"
}

Visual Flow

┌─────────────────────────────────────────────────────────────┐
│                    Database Connector                        │
│                                                              │
│   Query Input                                                │
│   ┌────────────────────────┐                                 │
│   │ SELECT * FROM users    │                                 │
│   │ WHERE status = $1      │                                 │
│   │ parameters: ["active"] │                                 │
│   └───────────┬────────────┘                                 │
│               │                                              │
│               ▼                                              │
│   ┌────────────────────────────────────────────────────┐    │
│   │              Connection Pool                        │    │
│   │                                                     │    │
│   │   ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐         │    │
│   │   │Conn1│ │Conn2│ │Conn3│ │Conn4│ │Conn5│         │    │
│   │   └──┬──┘ └─────┘ └─────┘ └─────┘ └─────┘         │    │
│   │      │                                             │    │
│   └──────┼─────────────────────────────────────────────┘    │
│          │                                                   │
│          ▼                                                   │
│   ┌─────────────────┐                                       │
│   │    Database     │                                       │
│   │   (PostgreSQL)  │                                       │
│   └────────┬────────┘                                       │
│            │                                                 │
│     ┌──────┴──────┐                                         │
│     │             │                                         │
│     ▼             ▼                                         │
│ ┌────────┐   ┌────────┐                                     │
│ │ Result │   │ Error  │                                     │
│ │  Port  │   │  Port  │                                     │
│ └────────┘   └────────┘                                     │
└─────────────────────────────────────────────────────────────┘

Security Best Practices

1. Use Secrets for Credentials

yaml
password: "{{$.secrets.dbPassword}}"

2. Enable SSL in Production

yaml
sslMode: "verify-full"

3. Always Use Parameterized Queries

yaml
# GOOD
query: "SELECT * FROM users WHERE id = $1"
parameters: ["{{$.userId}}"]

# BAD - SQL Injection vulnerable!
query: "SELECT * FROM users WHERE id = {{$.userId}}"

Extension Ideas

  1. Transaction Support: BEGIN/COMMIT/ROLLBACK
  2. Batch Inserts: Efficient bulk data insertion
  3. Query Templates: Named queries with parameter substitution
  4. Connection Health: Periodic ping and reconnection
  5. Query Logging: Log all queries for debugging

Build flow-based applications on Kubernetes