Database Connector Component
A complete example of a database component with connection pooling.
Overview
This component executes SQL queries against a database. It demonstrates:
- Connection pool management
- Parameterized queries
- Transaction support (not implemented in this example; listed under Extension Ideas as a possible addition)
- Multiple output types (rows, affected count)
Complete Implementation
go
package database
import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	_ "github.com/lib/pq" // PostgreSQL driver
	"github.com/tiny-systems/module/api/v1alpha1"
	"github.com/tiny-systems/module/pkg/module"
)
// ComponentName is the identifier this component reports as its Name in GetInfo.
const ComponentName = "database_connector"
// Component is the database connector. It keeps the most recently received
// Settings and the connection pool opened from them.
type Component struct {
	// settings is the last value delivered on the settings port.
	settings Settings
	// db is the open connection pool; it is nil while disconnected.
	db *sql.DB
}
// Settings is the connection configuration delivered on the settings port.
//
// Every struct tag is kept on a single line on purpose: reflect.StructTag
// stops parsing at the first newline, so a tag wrapped across lines silently
// loses every key after the break (enum, description, ...).
type Settings struct {
	// Driver is the database/sql driver name passed to sql.Open.
	Driver string `json:"driver" title:"Driver" default:"postgres" enum:"postgres,mysql,sqlite3" description:"Database driver"`
	// Host is the database server host name or address.
	Host string `json:"host" title:"Host" required:"true" default:"localhost" description:"Database host"`
	// Port is the database server TCP port.
	Port int `json:"port" title:"Port" default:"5432" description:"Database port"`
	// Database is the database name; for the fallback (sqlite3) DSN it is used
	// as the whole data source name.
	Database string `json:"database" title:"Database" required:"true" description:"Database name"`
	// Username is the login user.
	Username string `json:"username" title:"Username" required:"true" description:"Database username"`
	// Password is the login password; it is optional and may be empty.
	Password string `json:"password" title:"Password" format:"password" description:"Database password"`
	// SSLMode is forwarded as the sslmode option of the postgres DSN.
	SSLMode string `json:"sslMode" title:"SSL Mode" default:"disable" enum:"disable,require,verify-ca,verify-full" description:"SSL connection mode"`
	// MaxOpenConns is passed to (*sql.DB).SetMaxOpenConns.
	MaxOpenConns int `json:"maxOpenConns" title:"Max Open Connections" default:"10" description:"Maximum number of open connections"`
	// MaxIdleConns is passed to (*sql.DB).SetMaxIdleConns.
	MaxIdleConns int `json:"maxIdleConns" title:"Max Idle Connections" default:"5" description:"Maximum number of idle connections"`
}
// QueryInput is the message accepted on the "query" port.
//
// Every struct tag is kept on a single line on purpose: reflect.StructTag
// stops parsing at the first newline, so a wrapped tag silently loses every
// key after the break (configurable, enum, description).
type QueryInput struct {
	// Query is the SQL text handed to the driver unchanged.
	Query string `json:"query" title:"SQL Query" required:"true" format:"textarea" configurable:"true" description:"SQL query to execute"`
	// Parameters are the positional arguments bound to the placeholders in Query.
	Parameters []any `json:"parameters,omitempty" title:"Parameters" configurable:"true" description:"Query parameters (for prepared statements)"`
	// Operation selects the execution path: "query" returns rows, "execute"
	// returns an affected-row count. When empty, handleQuery picks one from the
	// statement's leading keyword.
	Operation string `json:"operation" title:"Operation" default:"query" enum:"query,execute" configurable:"true" description:"query=SELECT (returns rows), execute=INSERT/UPDATE/DELETE (returns affected count)"`
}
// QueryOutput is the message emitted on the "result" port after a
// row-returning query completes.
type QueryOutput struct {
	// Rows holds one map per result row, keyed by column name.
	Rows []map[string]any `json:"rows" title:"Result Rows"`
	// RowCount is the number of entries in Rows.
	RowCount int `json:"rowCount" title:"Row Count"`
	// Columns is the column name list reported by the result set.
	Columns []string `json:"columns" title:"Column Names"`
	// Query echoes the SQL text that was executed.
	Query string `json:"query" title:"Executed Query"`
	// DurationMs is the elapsed time in milliseconds measured by the component.
	DurationMs int64 `json:"durationMs" title:"Duration (ms)"`
}
// ExecuteOutput is the message emitted on the "executed" port after a
// statement that does not return rows completes.
type ExecuteOutput struct {
	// RowsAffected is the value reported by sql.Result.RowsAffected.
	RowsAffected int64 `json:"rowsAffected" title:"Rows Affected"`
	// LastInsertID is the value reported by sql.Result.LastInsertId; it is
	// omitted from JSON when zero.
	LastInsertID int64 `json:"lastInsertId,omitempty" title:"Last Insert ID"`
	// Query echoes the SQL text that was executed.
	Query string `json:"query" title:"Executed Query"`
	// DurationMs is the elapsed time in milliseconds measured by the component.
	DurationMs int64 `json:"durationMs" title:"Duration (ms)"`
}
// ErrorOutput is the message emitted on the "error" port when connecting or
// running a query fails.
type ErrorOutput struct {
	// Error is the failure message text.
	Error string `json:"error" title:"Error Message"`
	// Query echoes the SQL text that was being processed.
	Query string `json:"query" title:"Failed Query"`
	// DurationMs is the elapsed time in milliseconds measured by the component.
	DurationMs int64 `json:"durationMs" title:"Duration (ms)"`
}
// ControlState is the value shown on the control port: read-only connection
// status fields plus the Connect and Disconnect buttons.
type ControlState struct {
	// Status is "disconnected", "connected", or "error: <message>".
	Status string `json:"status" readonly:"true" title:"Connection Status"`
	// OpenConns mirrors sql.DBStats.OpenConnections at the time the state was built.
	OpenConns int `json:"openConns" readonly:"true" title:"Open Connections"`
	// IdleConns mirrors sql.DBStats.Idle at the time the state was built.
	IdleConns int `json:"idleConns" readonly:"true" title:"Idle Connections"`
	// Connect is rendered as a button (format:"button").
	Connect bool `json:"connect" format:"button" title:"Connect" colSpan:"col-span-6"`
	// Disconnect is rendered as a button (format:"button").
	Disconnect bool `json:"disconnect" format:"button" title:"Disconnect" colSpan:"col-span-6"`
}
// Compile-time assertions that *Component implements module.Component and
// module.ControlHandler.
var _ module.Component = (*Component)(nil)
var _ module.ControlHandler = (*Component)(nil)
// GetInfo returns the component's static metadata: its name (ComponentName),
// display title, description, category and search tags.
func (c *Component) GetInfo() module.ComponentInfo {
	var info module.ComponentInfo
	info.Name = ComponentName
	info.Title = "Database"
	info.Description = "Executes SQL queries against relational databases"
	info.Category = "Database"
	info.Tags = []string{"database", "sql", "postgres", "mysql"}
	return info
}
// Ports describes the six ports of the component, in this order: settings,
// control, query, result, executed, error. The first three are declared with
// Source set to true and the last three with Source set to false; each port
// carries a sample value of the message type it uses.
func (c *Component) Ports() []module.Port {
	settingsPort := module.Port{
		Name:          v1alpha1.SettingsPort,
		Label:         "Settings",
		Position:      module.PositionTop,
		Source:        true,
		Configuration: Settings{},
	}
	controlPort := module.Port{
		Name:     v1alpha1.ControlPort,
		Label:    "Control",
		Position: module.PositionTop,
		Source:   true,
		// The control panel starts out in the disconnected state.
		Configuration: ControlState{Status: "disconnected"},
	}
	queryPort := module.Port{
		Name:          "query",
		Label:         "Query",
		Position:      module.PositionLeft,
		Source:        true,
		Configuration: QueryInput{},
	}
	resultPort := module.Port{
		Name:          "result",
		Label:         "Result",
		Position:      module.PositionRight,
		Source:        false,
		Configuration: QueryOutput{},
	}
	executedPort := module.Port{
		Name:          "executed",
		Label:         "Executed",
		Position:      module.PositionRight,
		Source:        false,
		Configuration: ExecuteOutput{},
	}
	errorPort := module.Port{
		Name:          "error",
		Label:         "Error",
		Position:      module.PositionBottom,
		Source:        false,
		Configuration: ErrorOutput{},
	}

	ports := make([]module.Port, 0, 6)
	ports = append(ports, settingsPort, controlPort, queryPort, resultPort, executedPort, errorPort)
	return ports
}
// Handle dispatches an incoming message according to the port it arrived on.
//
//   - settings port: msg must be a Settings value; it is stored for the next
//     connect. An already open pool is left untouched, so changed settings
//     only take effect after a disconnect.
//   - "query": msg must be a QueryInput; it is executed via handleQuery.
//
// A message of the wrong type or an unknown port yields an error instead of
// panicking on a failed type assertion.
func (c *Component) Handle(
	ctx context.Context,
	output module.Handler,
	port string,
	msg any,
) error {
	switch port {
	case v1alpha1.SettingsPort:
		settings, ok := msg.(Settings)
		if !ok {
			return fmt.Errorf("invalid message on port %s: expected %T, got %T", port, Settings{}, msg)
		}
		c.settings = settings
		return nil
	case "query":
		input, ok := msg.(QueryInput)
		if !ok {
			return fmt.Errorf("invalid message on port %s: expected %T, got %T", port, QueryInput{}, msg)
		}
		return c.handleQuery(ctx, output, input)
	default:
		return fmt.Errorf("unknown port: %s", port)
	}
}
// HandleControl reacts to the control-panel buttons and returns the state to
// display next. It never returns a non-nil error; a failed connect is
// reported through the Status field instead.
//
//   - "connect": opens the pool unless one is already open, then reports the
//     live connection state (or "error: <message>" if connecting failed).
//   - "disconnect": closes and forgets the pool, then reports "disconnected".
//   - anything else: echoes the incoming state, or a fresh "disconnected"
//     state when none (or a non-*ControlState value) was supplied.
func (c *Component) HandleControl(
	ctx context.Context,
	state any,
	port string,
	msg any,
) (any, error) {
	switch port {
	case "connect":
		if c.db == nil {
			if err := c.connect(); err != nil {
				return &ControlState{Status: fmt.Sprintf("error: %s", err)}, nil
			}
		}
		return c.getConnectionState(), nil

	case "disconnect":
		if c.db != nil {
			// Best effort: the pool is dropped regardless of what Close reports.
			_ = c.db.Close()
			c.db = nil
		}
		return &ControlState{Status: "disconnected"}, nil
	}

	if current, ok := state.(*ControlState); ok && current != nil {
		return current, nil
	}
	return &ControlState{Status: "disconnected"}, nil
}
// connect opens a pool for the current settings, applies the pool limits and
// verifies the connection with a ping. c.db is only assigned once the ping
// succeeds; on a ping failure the freshly opened pool is closed again.
//
// NOTE(review): only the lib/pq driver is imported in this file, so the
// "mysql" and "sqlite3" driver options presumably need their drivers
// registered elsewhere — confirm.
func (c *Component) connect() error {
	pool, openErr := sql.Open(c.settings.Driver, c.buildDSN())
	if openErr != nil {
		return fmt.Errorf("failed to open database: %w", openErr)
	}

	// Pool sizing comes straight from the settings.
	pool.SetMaxOpenConns(c.settings.MaxOpenConns)
	pool.SetMaxIdleConns(c.settings.MaxIdleConns)

	// sql.Open does not necessarily dial; Ping forces a real round trip.
	pingErr := pool.Ping()
	if pingErr == nil {
		c.db = pool
		return nil
	}

	pool.Close()
	return fmt.Errorf("failed to ping database: %w", pingErr)
}
// buildDSN renders the data source name for the configured driver.
//
//   - "postgres": a libpq keyword/value string. Every string value is
//     single-quoted and escaped, so an empty password or a value containing
//     spaces, quotes or backslashes cannot bleed into the next keyword.
//   - "mysql": user:password@tcp(host:port)/database.
//   - anything else (sqlite3): the Database setting is used verbatim as the DSN.
func (c *Component) buildDSN() string {
	s := c.settings
	switch s.Driver {
	case "postgres":
		return fmt.Sprintf(
			"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
			quotePostgresValue(s.Host),
			s.Port,
			quotePostgresValue(s.Username),
			quotePostgresValue(s.Password),
			quotePostgresValue(s.Database),
			quotePostgresValue(s.SSLMode),
		)
	case "mysql":
		return fmt.Sprintf(
			"%s:%s@tcp(%s:%d)/%s",
			s.Username,
			s.Password,
			s.Host,
			s.Port,
			s.Database,
		)
	default:
		return s.Database // sqlite3
	}
}

// postgresValueEscaper backslash-escapes the two characters that are special
// inside a single-quoted libpq connection-string value.
var postgresValueEscaper = strings.NewReplacer(`\`, `\\`, `'`, `\'`)

// quotePostgresValue returns v as a single-quoted libpq connection-string
// value, which is the form required for empty values and values with spaces.
func quotePostgresValue(v string) string {
	return "'" + postgresValueEscaper.Replace(v) + "'"
}
// getConnectionState builds a fresh ControlState snapshot: "disconnected"
// with zero counters when no pool is open, otherwise "connected" together
// with the pool's current open and idle connection counts.
func (c *Component) getConnectionState() *ControlState {
	snapshot := &ControlState{Status: "disconnected"}
	if c.db != nil {
		poolStats := c.db.Stats()
		snapshot.Status = "connected"
		snapshot.OpenConns = poolStats.OpenConnections
		snapshot.IdleConns = poolStats.Idle
	}
	return snapshot
}
// handleQuery runs one QueryInput and emits exactly one message on output:
// "result" or "executed" on success, "error" otherwise. The returned error is
// whatever the output handler returns.
//
// If no pool is open yet, a connection is attempted first; a failure there is
// reported on the "error" port with the time spent connecting. Every error
// message carries DurationMs, matching the errors emitted by executeQuery and
// executeStatement.
//
// When input.Operation is empty the operation is inferred from the statement:
// text starting with SELECT (case-insensitive, after trimming whitespace) is a
// "query", everything else is an "execute". Other row-returning statements
// (for example ones starting with WITH) therefore need Operation set
// explicitly.
func (c *Component) handleQuery(
	ctx context.Context,
	output module.Handler,
	input QueryInput,
) error {
	// Auto-connect if not connected.
	if c.db == nil {
		connectStart := timeNow()
		if err := c.connect(); err != nil {
			return output(ctx, "error", ErrorOutput{
				Error:      fmt.Sprintf("connection failed: %s", err.Error()),
				Query:      input.Query,
				DurationMs: timeNow() - connectStart,
			})
		}
	}

	// Query timing starts after the connection exists, so the reported
	// duration of a successful query does not include connection setup.
	start := timeNow()

	operation := input.Operation
	if operation == "" {
		queryUpper := strings.ToUpper(strings.TrimSpace(input.Query))
		if strings.HasPrefix(queryUpper, "SELECT") {
			operation = "query"
		} else {
			operation = "execute"
		}
	}

	switch operation {
	case "query":
		return c.executeQuery(ctx, output, input, start)
	case "execute":
		return c.executeStatement(ctx, output, input, start)
	default:
		return output(ctx, "error", ErrorOutput{
			Error:      fmt.Sprintf("unknown operation: %s", operation),
			Query:      input.Query,
			DurationMs: timeNow() - start,
		})
	}
}
// executeQuery runs input.Query with input.Parameters through QueryContext,
// reads the complete result set into memory and emits it on the "result"
// port. start is the timeNow() value taken when handling began and is used to
// compute DurationMs.
//
// Any failure — running the query, reading the column list, scanning a row,
// or an iteration error reported by rows.Err — is emitted on the "error" port
// instead of a partial result. The returned error is whatever the output
// handler returns.
//
// []byte column values are converted to string. If two columns share a name,
// the later one wins in the row map. A query with no rows yields an empty,
// non-nil Rows slice so it serializes as [] rather than null.
func (c *Component) executeQuery(
	ctx context.Context,
	output module.Handler,
	input QueryInput,
	start int64,
) error {
	// fail reports err on the error port with the elapsed time so far.
	fail := func(err error) error {
		return output(ctx, "error", ErrorOutput{
			Error:      err.Error(),
			Query:      input.Query,
			DurationMs: timeNow() - start,
		})
	}

	rows, err := c.db.QueryContext(ctx, input.Query, input.Parameters...)
	if err != nil {
		return fail(err)
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return fail(err)
	}

	// Scan targets are allocated once and reused: Scan overwrites every slot
	// on each row, and the values are copied into the row map before the next
	// call.
	values := make([]any, len(columns))
	valuePointers := make([]any, len(columns))
	for i := range values {
		valuePointers[i] = &values[i]
	}

	resultRows := make([]map[string]any, 0)
	for rows.Next() {
		if err := rows.Scan(valuePointers...); err != nil {
			return fail(err)
		}

		row := make(map[string]any, len(columns))
		for i, col := range columns {
			if b, ok := values[i].([]byte); ok {
				row[col] = string(b)
			} else {
				row[col] = values[i]
			}
		}
		resultRows = append(resultRows, row)
	}
	// Next returns false both at the end of the data and on failure; only
	// Err distinguishes the two.
	if err := rows.Err(); err != nil {
		return fail(err)
	}

	return output(ctx, "result", QueryOutput{
		Rows:       resultRows,
		RowCount:   len(resultRows),
		Columns:    columns,
		Query:      input.Query,
		DurationMs: timeNow() - start,
	})
}
// executeStatement runs input.Query with input.Parameters through ExecContext
// and emits an ExecuteOutput on the "executed" port, or an ErrorOutput on the
// "error" port if the statement fails. start is the timeNow() value taken
// when handling began. The returned error is whatever the output handler
// returns.
func (c *Component) executeStatement(
	ctx context.Context,
	output module.Handler,
	input QueryInput,
	start int64,
) error {
	res, execErr := c.db.ExecContext(ctx, input.Query, input.Parameters...)
	if execErr != nil {
		failure := ErrorOutput{Error: execErr.Error(), Query: input.Query}
		failure.DurationMs = timeNow() - start
		return output(ctx, "error", failure)
	}

	// Both counters are best effort: their errors are deliberately dropped and
	// the returned number is reported as-is, since not every driver supports
	// both (presumably lib/pq has no LastInsertId — confirm).
	affected, _ := res.RowsAffected()
	insertID, _ := res.LastInsertId()

	summary := ExecuteOutput{Query: input.Query}
	summary.RowsAffected = affected
	summary.LastInsertID = insertID
	summary.DurationMs = timeNow() - start
	return output(ctx, "executed", summary)
}
// timeNow returns the current time in milliseconds since the Unix epoch, as
// reported by timeNowFunc. Callers subtract two readings to obtain a duration
// in milliseconds.
func timeNow() int64 {
	return timeNowFunc().UnixMilli()
}

// timeNowFunc is the clock behind timeNow. It defaults to the wall clock
// (time.Now) and is a variable so that it can be replaced with a fixed clock.
var timeNowFunc = func() interface{ UnixMilli() int64 } {
	return time.Now()
}
// Instance returns a new zero-valued Component: no settings stored and no
// connection pool open.
func (c *Component) Instance() module.Component {
	return &Component{}
}Usage Examples
SELECT Query
yaml
edges:
- port: _settings
data:
driver: "postgres"
host: "localhost"
port: 5432
database: "myapp"
username: "{{$.secrets.dbUser}}"
password: "{{$.secrets.dbPassword}}"
sslMode: "require"
- port: query
data:
query: "SELECT id, name, email FROM users WHERE status = $1 LIMIT $2"
parameters:
- "active"
- 100
operation: "query"

INSERT Statement
yaml
edges:
- port: query
data:
query: "INSERT INTO users (name, email) VALUES ($1, $2)"
parameters:
- "{{$.user.name}}"
- "{{$.user.email}}"
operation: "execute"

Dynamic Query Building
yaml
edges:
- port: query
data:
query: "{{$.queryTemplate}}"
parameters: "{{$.queryParams}}"

Response Examples
Query Result (SELECT)
json
{
"rows": [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"}
],
"rowCount": 2,
"columns": ["id", "name", "email"],
"query": "SELECT id, name, email FROM users WHERE status = $1 LIMIT $2",
"durationMs": 12
}

Execute Result (INSERT/UPDATE/DELETE)
json
{
"rowsAffected": 1,
"lastInsertId": 42,
"query": "INSERT INTO users (name, email) VALUES ($1, $2)",
"durationMs": 8
}

Error Result
json
{
"error": "pq: relation \"users\" does not exist",
"query": "SELECT * FROM users",
"durationMs": 3
}

Key Patterns Demonstrated
1. Connection Pooling
go
db.SetMaxOpenConns(c.settings.MaxOpenConns)
db.SetMaxIdleConns(c.settings.MaxIdleConns)

2. Parameterized Queries
Prevent SQL injection:
go
rows, err := c.db.QueryContext(ctx, input.Query, input.Parameters...)

3. Auto-Connect
Lazy connection on first query:
go
if c.db == nil {
if err := c.connect(); err != nil {
return output(ctx, "error", ...)
}
}

4. Operation Auto-Detection
go
queryUpper := strings.ToUpper(strings.TrimSpace(input.Query))
if strings.HasPrefix(queryUpper, "SELECT") {
operation = "query"
} else {
operation = "execute"
}

Visual Flow
┌─────────────────────────────────────────────────────────────┐
│ Database Connector │
│ │
│ Query Input │
│ ┌────────────────────────┐ │
│ │ SELECT * FROM users │ │
│ │ WHERE status = $1 │ │
│ │ parameters: ["active"] │ │
│ └───────────┬────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────────────────────────────────────┐ │
│ │ Connection Pool │ │
│ │ │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │Conn1│ │Conn2│ │Conn3│ │Conn4│ │Conn5│ │ │
│ │ └──┬──┘ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ │ │ │ │
│ └──────┼─────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Database │ │
│ │ (PostgreSQL) │ │
│ └────────┬────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌────────┐ ┌────────┐ │
│ │ Result │ │ Error │ │
│ │ Port │ │ Port │ │
│ └────────┘ └────────┘ │
└─────────────────────────────────────────────────────────────┘

Security Best Practices
1. Use Secrets for Credentials
yaml
password: "{{$.secrets.dbPassword}}"

2. Enable SSL in Production
yaml
sslMode: "verify-full"

3. Always Use Parameterized Queries
yaml
# GOOD
query: "SELECT * FROM users WHERE id = $1"
parameters: ["{{$.userId}}"]
# BAD - SQL Injection vulnerable!
query: "SELECT * FROM users WHERE id = {{$.userId}}"

Extension Ideas
- Transaction Support: BEGIN/COMMIT/ROLLBACK
- Batch Inserts: Efficient bulk data insertion
- Query Templates: Named queries with parameter substitution
- Connection Health: Periodic ping and reconnection
- Query Logging: Log all queries for debugging