Skip to content

HTTP Server Component

A complete example of an HTTP server component with Kubernetes service exposure.

Overview

This component creates an HTTP server that handles incoming requests. It demonstrates:

  • HTTP server lifecycle management
  • Kubernetes Service/Ingress creation
  • Request/response handling
  • Leader-only operation

Complete Implementation

go
package httpserver

import (
    "context"
    "fmt"
    "io"
    "net"
    "net/http"
    "sync"
    "time"

    "github.com/tiny-systems/module/api/v1alpha1"
    "github.com/tiny-systems/module/pkg/module"
    "github.com/tiny-systems/module/pkg/resource"
    "github.com/tiny-systems/module/pkg/utils"
)

// ComponentName is the identifier this component reports as its Name in GetInfo.
const ComponentName = "http_server"

// Component is the HTTP server component. It remembers the latest settings and
// resource manager handed to it and owns the state of the running server.
type Component struct {
    // settings is the value most recently received on the settings port.
    settings Settings
    // resourceManager is the manager most recently received on the client
    // port; startServer uses it to expose the listening port.
    resourceManager *resource.Manager

    // mu is held by startServer, stopServer and isRunning while they access
    // the fields below.
    mu sync.RWMutex
    // server is non-nil while the HTTP server counts as running.
    server *http.Server
    // listener is the TCP listener the server was started on.
    listener net.Listener
    // port is the TCP port number the listener was bound to.
    port int
}

// Settings is the configuration accepted on the settings port.
//
// Every struct tag is written on a single line on purpose: reflect.StructTag
// only tolerates spaces between key:"value" pairs, so a tag that is wrapped
// onto a second line silently loses every key after the line break (and
// `go vet` reports it as malformed).
type Settings struct {
    // Path is the pattern registered on the server's request multiplexer.
    Path string `json:"path" title:"Path" default:"/" required:"true" description:"URL path to listen on"`
    // Methods lists the HTTP methods the endpoint is meant to accept.
    Methods []string `json:"methods" title:"Allowed Methods" default:"[\"GET\",\"POST\"]" description:"HTTP methods to accept"`
    // Hostnames, when non-empty, are passed to the resource manager so the
    // listening port is exposed externally.
    Hostnames []string `json:"hostnames,omitempty" title:"Hostnames" description:"External hostnames for Ingress (leave empty for internal only)"`
    // Timeout is a time.ParseDuration string; 30s is used when it is empty or
    // cannot be parsed.
    Timeout string `json:"timeout" title:"Request Timeout" default:"30s" description:"Maximum request processing time"`
    // EnableCORS turns on the permissive CORS headers and the OPTIONS
    // short-circuit in the request handler.
    EnableCORS bool `json:"enableCors" title:"Enable CORS" default:"false" description:"Add CORS headers to responses"`
}

// ControlState is the state object exchanged on the control port. It carries
// read-only status fields plus the two button fields.
type ControlState struct {
    // Status is "running" or "stopped" as set by HandleControl.
    Status string `json:"status" readonly:"true" title:"Status"`
    // Port is the listening port reported after a successful start.
    Port int `json:"port,omitempty" readonly:"true" title:"Port"`
    // Requests is the request counter shown as "Total Requests".
    Requests int64 `json:"requests" readonly:"true" title:"Total Requests"`
    // URL is the external https URL built from the first configured hostname.
    URL string `json:"url,omitempty" readonly:"true" title:"External URL"`
    // Start is declared with format "button"; HandleControl reacts to the
    // "start" port.
    Start bool `json:"start" format:"button" title:"Start" colSpan:"col-span-6"`
    // Stop is declared with format "button"; HandleControl reacts to the
    // "stop" port.
    Stop bool `json:"stop" format:"button" title:"Stop" colSpan:"col-span-6"`
}

// RequestOutput is the message emitted on the "request" port for every
// incoming HTTP request.
type RequestOutput struct {
    // RequestID correlates this request with the ResponseInput that answers it.
    RequestID string `json:"requestId" title:"Request ID"`
    // Method is the HTTP method of the request.
    Method string `json:"method" title:"Method"`
    // Path is the URL path of the request.
    Path string `json:"path" title:"Path"`
    // Query holds the first value of each query parameter.
    Query map[string]string `json:"query,omitempty" title:"Query Parameters"`
    // Headers holds the first value of each request header.
    Headers map[string]string `json:"headers" title:"Headers"`
    // Body is the request body converted to a string.
    Body string `json:"body,omitempty" title:"Body"`
    // RemoteAddr is the remote address reported by net/http.
    RemoteAddr string `json:"remoteAddr" title:"Remote Address"`
    // ReceivedAt is the receive time formatted as RFC 3339.
    ReceivedAt string `json:"receivedAt" title:"Received At"`
}

// ResponseInput is the message accepted on the "response" port; it answers
// the pending HTTP request identified by RequestID.
type ResponseInput struct {
    // RequestID must match the RequestID of a request that is still pending.
    RequestID string `json:"requestId" title:"Request ID" required:"true"`
    // StatusCode is the HTTP status to send; 0 is treated as 200.
    StatusCode int `json:"statusCode" title:"Status Code" default:"200"`
    // Headers are set on the response after Content-Type, so they can
    // override it.
    Headers map[string]string `json:"headers,omitempty" title:"Response Headers"`
    // Body is written as the response body when non-empty.
    Body string `json:"body,omitempty" title:"Response Body"`
    // ContentType, when non-empty, is sent as the Content-Type header.
    ContentType string `json:"contentType" title:"Content Type" default:"application/json"`
}

// Compile-time checks that *Component implements the module interfaces.
var (
    _ module.Component      = (*Component)(nil)
    _ module.ControlHandler = (*Component)(nil)
)

// GetInfo returns the static metadata describing this component: its name,
// title, description, category and search tags.
func (c *Component) GetInfo() module.ComponentInfo {
    var info module.ComponentInfo

    info.Name = ComponentName
    info.Title = "HTTP Server"
    info.Description = "Receives HTTP requests and sends responses"
    info.Category = "HTTP"
    info.Tags = []string{"http", "server", "webhook", "api"}

    return info
}

// Ports declares the six ports of the component: the settings, control,
// client and reconcile ports on the top edge, the "request" port on the right
// and the "response" port on the left. Each port carries an example value of
// the message type it is configured with.
func (c *Component) Ports() []module.Port {
    ports := make([]module.Port, 0, 6)

    // Ports placed on the top edge, all declared with Source set.
    ports = append(ports,
        module.Port{
            Name:          v1alpha1.SettingsPort,
            Label:         "Settings",
            Position:      module.PositionTop,
            Source:        true,
            Configuration: Settings{},
        },
        module.Port{
            Name:          v1alpha1.ControlPort,
            Label:         "Control",
            Position:      module.PositionTop,
            Source:        true,
            Configuration: ControlState{Status: "stopped"},
        },
        module.Port{
            Name:          v1alpha1.ClientPort,
            Label:         "Client",
            Position:      module.PositionTop,
            Source:        true,
            Configuration: resource.Manager{},
        },
        module.Port{
            Name:          v1alpha1.ReconcilePort,
            Label:         "Reconcile",
            Position:      module.PositionTop,
            Source:        true,
            Configuration: v1alpha1.TinyNode{},
        },
    )

    // Flow-facing ports: requests leave on the right, responses enter on the left.
    ports = append(ports,
        module.Port{
            Name:          "request",
            Label:         "Request",
            Position:      module.PositionRight,
            Source:        false,
            Configuration: RequestOutput{},
        },
        module.Port{
            Name:          "response",
            Label:         "Response",
            Position:      module.PositionLeft,
            Source:        true,
            Configuration: ResponseInput{},
        },
    )

    return ports
}

// pendingRequests maps a request ID to the *pendingRequest of an HTTP request
// that is still waiting for its response. The zero sync.Map is ready to use.
var pendingRequests sync.Map

// pendingRequest is the bookkeeping entry for one HTTP request that is waiting
// for a response from the flow.
type pendingRequest struct {
    // w is the writer of the waiting HTTP request.
    w http.ResponseWriter
    // done is closed once the response has been written.
    done chan struct{}
    // ctx is the context the request handler was invoked with.
    ctx context.Context
}

// Handle dispatches a message received on one of the component's ports.
//
//   - settings port:  stores the Settings value.
//   - client port:    stores the *resource.Manager.
//   - reconcile port: forwards the *v1alpha1.TinyNode to handleReconcile.
//   - "response":     forwards the ResponseInput to handleResponse.
//
// A message whose dynamic type does not match the port is reported as an
// error instead of panicking on a failed type assertion. An unknown port
// name is an error as well.
func (c *Component) Handle(
    ctx context.Context,
    output module.Handler,
    port string,
    msg any,
) error {
    switch port {
    case v1alpha1.SettingsPort:
        settings, ok := msg.(Settings)
        if !ok {
            return fmt.Errorf("port %s: unexpected message type %T", port, msg)
        }
        c.settings = settings
        return nil

    case v1alpha1.ClientPort:
        manager, ok := msg.(*resource.Manager)
        if !ok {
            return fmt.Errorf("port %s: unexpected message type %T", port, msg)
        }
        c.resourceManager = manager
        return nil

    case v1alpha1.ReconcilePort:
        node, ok := msg.(*v1alpha1.TinyNode)
        if !ok {
            return fmt.Errorf("port %s: unexpected message type %T", port, msg)
        }
        // handleReconcile dereferences the node, so reject a typed nil here.
        if node == nil {
            return fmt.Errorf("port %s: nil node", port)
        }
        return c.handleReconcile(ctx, output, node)

    case "response":
        resp, ok := msg.(ResponseInput)
        if !ok {
            return fmt.Errorf("port %s: unexpected message type %T", port, msg)
        }
        return c.handleResponse(ctx, resp)

    default:
        return fmt.Errorf("unknown port: %s", port)
    }
}

// handleReconcile synchronises the listening port through the node's status
// metadata (key "http-port").
//
// A non-leader adopts the published port. The leader never reads the
// metadata: its port comes from its own listener, and adopting the stored
// value would replace the real port with a stale one and then republish it.
// The leader writes its port back whenever it has one.
//
// A missing, malformed or out-of-range metadata value is ignored so the last
// known port is kept rather than being clobbered by a partial parse.
func (c *Component) handleReconcile(
    ctx context.Context,
    output module.Handler,
    node *v1alpha1.TinyNode,
) error {
    if node == nil {
        return fmt.Errorf("reconcile: node is nil")
    }

    if !utils.IsLeader(ctx) {
        // Indexing a nil map is safe, so no separate nil check is needed.
        portStr, ok := node.Status.Metadata["http-port"]
        if !ok {
            return nil
        }

        var published int
        if _, err := fmt.Sscanf(portStr, "%d", &published); err != nil {
            return nil
        }
        if published <= 0 || published > 65535 {
            return nil
        }

        // port is written under mu by startServer; use the same lock here.
        c.mu.Lock()
        c.port = published
        c.mu.Unlock()
        return nil
    }

    c.mu.RLock()
    port := c.port
    c.mu.RUnlock()

    if port <= 0 {
        return nil
    }

    // Leader publishes the port it is listening on.
    return output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
        if n.Status.Metadata == nil {
            n.Status.Metadata = make(map[string]string)
        }
        n.Status.Metadata["http-port"] = fmt.Sprintf("%d", port)
    })
}

// handleResponse writes the flow's response to the HTTP request identified by
// resp.RequestID and then releases the waiting request handler.
//
// The status code is validated before the pending entry is claimed:
// http.ResponseWriter.WriteHeader panics for codes outside 100-999, and
// rejecting the message up front leaves the request pending so a corrected
// response (or the handler's own timeout) can still answer it.
//
// It returns an error when the status code is invalid, when no pending
// request matches the ID (unknown, already answered or timed out), or when
// writing the body fails.
func (c *Component) handleResponse(ctx context.Context, resp ResponseInput) error {
    status := resp.StatusCode
    if status == 0 {
        status = 200
    }
    if status < 100 || status > 999 {
        return fmt.Errorf("request %s: invalid status code %d", resp.RequestID, status)
    }

    req, ok := pendingRequests.LoadAndDelete(resp.RequestID)
    if !ok {
        return fmt.Errorf("request %s not found or already responded", resp.RequestID)
    }

    pending := req.(*pendingRequest)
    // Closing done unblocks the HTTP handler; it must happen on every path
    // from here on, including a failed write.
    defer close(pending.done)

    // Content-Type first, so an explicit header of the same name wins.
    header := pending.w.Header()
    if resp.ContentType != "" {
        header.Set("Content-Type", resp.ContentType)
    }
    for k, v := range resp.Headers {
        header.Set(k, v)
    }

    pending.w.WriteHeader(status)

    if resp.Body != "" {
        if _, err := pending.w.Write([]byte(resp.Body)); err != nil {
            return fmt.Errorf("request %s: writing response body: %w", resp.RequestID, err)
        }
    }

    return nil
}

// HandleControl reacts to the "start" and "stop" control ports and returns
// the control state to display afterwards.
//
// state is the previous control state; both *ControlState and ControlState
// are accepted (the control port is declared with a ControlState value), and
// anything else is treated as a stopped server. msg is not used.
//
// Only the leader acts: on a non-leader, or for an unknown port, the previous
// state is returned unchanged. "start" is a no-op when the server is already
// running. The Requests counter is carried over on both transitions so that
// starting does not reset what stopping preserved.
//
// An error is returned only when starting the server fails.
func (c *Component) HandleControl(
    ctx context.Context,
    state any,
    port string,
    msg any,
) (any, error) {
    var controlState *ControlState
    switch s := state.(type) {
    case *ControlState:
        controlState = s
    case ControlState:
        controlState = &s
    }
    if controlState == nil {
        controlState = &ControlState{Status: "stopped"}
    }

    switch port {
    case "start":
        if !utils.IsLeader(ctx) {
            return controlState, nil
        }

        if c.isRunning() {
            return controlState, nil
        }

        output := utils.GetOutputHandler(ctx)
        if err := c.startServer(ctx, output); err != nil {
            return controlState, err
        }

        // port is written under mu by startServer; read it the same way.
        c.mu.RLock()
        listenPort := c.port
        c.mu.RUnlock()

        externalURL := ""
        if len(c.settings.Hostnames) > 0 {
            externalURL = fmt.Sprintf("https://%s%s", c.settings.Hostnames[0], c.settings.Path)
        }

        return &ControlState{
            Status:   "running",
            Port:     listenPort,
            Requests: controlState.Requests,
            URL:      externalURL,
        }, nil

    case "stop":
        if !utils.IsLeader(ctx) {
            return controlState, nil
        }

        c.stopServer()
        return &ControlState{
            Status:   "stopped",
            Requests: controlState.Requests,
        }, nil
    }

    return controlState, nil
}

// startServer binds an ephemeral TCP port, starts serving the configured path
// on it and, when hostnames are configured and a resource manager is
// available, asks the manager to expose the port.
//
// ctx and output are captured by the request handler and used for every
// request forwarded to the flow. Calling startServer while a server is
// already running is a no-op.
//
// It returns an error when the listener cannot be opened or when exposing the
// port fails; in the latter case the freshly started server is closed again
// and the component is left in the stopped state.
func (c *Component) startServer(ctx context.Context, output module.Handler) error {
    // writeGrace is added to the server's write deadline. The handler waits up
    // to the full request timeout before answering 504, so a write deadline
    // equal to that timeout would already have expired when the 504 is written.
    const writeGrace = 5 * time.Second

    c.mu.Lock()
    defer c.mu.Unlock()

    // Re-check under the lock: the caller's isRunning check is not atomic
    // with this call.
    if c.server != nil {
        return nil
    }

    timeout, err := time.ParseDuration(c.settings.Timeout)
    if err != nil || timeout <= 0 {
        timeout = 30 * time.Second
    }

    // ServeMux panics on an empty pattern; fall back to the documented default.
    pattern := c.settings.Path
    if pattern == "" {
        pattern = "/"
    }

    // Port 0 lets the OS pick a free port.
    listener, err := net.Listen("tcp", ":0")
    if err != nil {
        return fmt.Errorf("opening listener: %w", err)
    }
    port := listener.Addr().(*net.TCPAddr).Port

    mux := http.NewServeMux()
    mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
        c.handleHTTPRequest(ctx, output, w, r)
    })

    srv := &http.Server{
        Handler:      mux,
        ReadTimeout:  timeout,
        WriteTimeout: timeout + writeGrace,
    }

    c.listener = listener
    c.port = port
    c.server = srv

    // Serve returns once the server is shut down or closed; its error carries
    // no information this component acts on.
    go srv.Serve(listener)

    // Expose the port only when hostnames are configured.
    if c.resourceManager != nil && len(c.settings.Hostnames) > 0 {
        err := c.resourceManager.ExposePort(ctx, resource.ExposePortRequest{
            Port:      port,
            Hostnames: c.settings.Hostnames,
        })
        if err != nil {
            // Tear down inline: stopServer locks c.mu, which is already held
            // here, so calling it would deadlock.
            _ = srv.Close()
            c.server = nil
            c.listener = nil
            c.port = 0
            return fmt.Errorf("exposing port %d: %w", port, err)
        }
    }

    return nil
}

// stopServer stops the HTTP server if one is running and resets the server
// state. It is a no-op when no server is running.
//
// The server gets five seconds to shut down gracefully. Request handlers may
// wait far longer than that for a response from the flow, so when the grace
// period expires the remaining connections are closed forcibly instead of
// being left open behind a server that is reported as stopped.
func (c *Component) stopServer() {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.server == nil {
        return
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := c.server.Shutdown(ctx); err != nil {
        // Graceful shutdown did not complete; drop whatever is still open.
        // The error from Close adds nothing actionable at this point.
        _ = c.server.Close()
    }

    // Shutdown/Close released the listener; forget it and the port so no
    // stale values are reported.
    c.server = nil
    c.listener = nil
    c.port = 0
}

// isRunning reports whether a server is currently set, reading the field
// under the read lock.
func (c *Component) isRunning() bool {
    c.mu.RLock()
    running := c.server != nil
    c.mu.RUnlock()

    return running
}

// handleHTTPRequest serves one HTTP request by forwarding it to the flow on
// the "request" port and waiting until handleResponse answers it.
//
// Steps:
//  1. Add CORS headers and answer OPTIONS directly when CORS is enabled.
//  2. Reject methods that are not in Settings.Methods with 405 (an empty
//     list accepts every method; methods are compared exactly).
//  3. Read the body; a read failure is answered with 400.
//  4. Register a pending entry under a unique request ID and emit the
//     request to the flow.
//  5. Wait for the response, a delivery error (502), the timeout (504) or
//     the client going away.
//
// Ownership of the ResponseWriter is decided by who removes the pending
// entry: whoever wins LoadAndDelete writes the response. When handleResponse
// won, this handler waits for it to finish so the writer is never used
// concurrently or after the handler has returned.
//
// ctx and output are the values captured when the server was started.
func (c *Component) handleHTTPRequest(
    ctx context.Context,
    output module.Handler,
    w http.ResponseWriter,
    r *http.Request,
) {
    settings := c.settings

    // Add CORS headers if enabled.
    if settings.EnableCORS {
        w.Header().Set("Access-Control-Allow-Origin", "*")
        w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
        w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")

        if r.Method == http.MethodOptions {
            w.WriteHeader(http.StatusOK)
            return
        }
    }

    // Enforce the configured method list.
    if len(settings.Methods) > 0 {
        allowed := false
        allowHeader := ""
        for i, m := range settings.Methods {
            if i > 0 {
                allowHeader += ", "
            }
            allowHeader += m
            if m == r.Method {
                allowed = true
            }
        }
        if !allowed {
            w.Header().Set("Allow", allowHeader)
            w.WriteHeader(http.StatusMethodNotAllowed)
            _, _ = w.Write([]byte(`{"error":"method not allowed"}`))
            return
        }
    }

    // Read body.
    body, err := io.ReadAll(r.Body)
    r.Body.Close()
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        _, _ = w.Write([]byte(`{"error":"failed to read request body"}`))
        return
    }

    // Keep the first value of every query parameter and header.
    query := make(map[string]string)
    for k, v := range r.URL.Query() {
        if len(v) > 0 {
            query[k] = v[0]
        }
    }

    headers := make(map[string]string)
    for k, v := range r.Header {
        if len(v) > 0 {
            headers[k] = v[0]
        }
    }

    pending := &pendingRequest{
        w:    w,
        done: make(chan struct{}),
        ctx:  ctx,
    }

    // The ID is the receive time in nanoseconds. Two requests can observe the
    // same clock value, so registration uses LoadOrStore and appends a
    // counter on collision instead of overwriting another request's entry.
    received := time.Now()
    requestID := fmt.Sprintf("%d", received.UnixNano())
    for n := 1; ; n++ {
        if _, taken := pendingRequests.LoadOrStore(requestID, pending); !taken {
            break
        }
        requestID = fmt.Sprintf("%d-%d", received.UnixNano(), n)
    }

    requestOutput := RequestOutput{
        RequestID:  requestID,
        Method:     r.Method,
        Path:       r.URL.Path,
        Query:      query,
        Headers:    headers,
        Body:       string(body),
        RemoteAddr: r.RemoteAddr,
        ReceivedAt: received.Format(time.RFC3339),
    }

    // Emit without blocking the wait below. The channel is buffered so the
    // goroutine can always finish, even after this handler has returned.
    outputErr := make(chan error, 1)
    go func() {
        outputErr <- output(ctx, "request", requestOutput)
    }()

    timeout, err := time.ParseDuration(settings.Timeout)
    if err != nil || timeout <= 0 {
        timeout = 30 * time.Second
    }
    timer := time.NewTimer(timeout)
    defer timer.Stop()

    // claim takes ownership of the writer. When it reports false,
    // handleResponse already owns it, and claim has waited for that response
    // to be fully written.
    claim := func() bool {
        if _, ours := pendingRequests.LoadAndDelete(requestID); ours {
            return true
        }
        <-pending.done
        return false
    }

    for {
        select {
        case <-pending.done:
            // Response was sent by handleResponse.
            return

        case err := <-outputErr:
            if err == nil {
                // Delivered; keep waiting for the response. A nil channel
                // never becomes ready, which disables this case.
                outputErr = nil
                continue
            }
            if claim() {
                w.WriteHeader(http.StatusBadGateway)
                _, _ = w.Write([]byte(`{"error":"request could not be delivered"}`))
            }
            return

        case <-timer.C:
            if claim() {
                w.WriteHeader(http.StatusGatewayTimeout)
                _, _ = w.Write([]byte(`{"error":"request timeout"}`))
            }
            return

        case <-r.Context().Done():
            // Client went away: drop the entry; there is nobody to answer.
            claim()
            return
        }
    }
}

// Instance returns a new, zero-valued Component.
func (c *Component) Instance() module.Component {
    return new(Component)
}

Usage Example

Settings Configuration

yaml
edges:
  - port: _settings
    data:
      path: "/webhook"
      methods: ["POST"]
      hostnames: ["api.myapp.example.com"]
      timeout: "30s"
      enableCors: true

Flow: Webhook Processing

HTTP Request ────► HTTP Server ───► Request Port ───► Process Request
                                                            │
                                                            ▼
HTTP Response ◄─── HTTP Server ◄─── Response Port ◄─── Transform Data

Edge Configuration for Response

yaml
# In the Transform component's output edge
edges:
  - port: output
    target:
      node: http-server
      port: response
    data:
      requestId: "{{$.requestId}}"
      statusCode: 200
      contentType: "application/json"
      body: "{{JSON.stringify($.result)}}"

Key Patterns Demonstrated

1. Resource Manager Integration

Create Kubernetes resources:

go
err := c.resourceManager.ExposePort(ctx, resource.ExposePortRequest{
    Port:      c.port,
    Hostnames: c.settings.Hostnames,
})

2. Request-Response Correlation

Use request IDs to match responses:

go
pendingRequests.Store(requestID, pending)

// Later, in handleResponse:
req, ok := pendingRequests.LoadAndDelete(resp.RequestID)

3. Timeout Handling

go
select {
case <-pending.done:
    // Response was sent
case <-time.After(timeout):
    // Timeout
    w.WriteHeader(http.StatusGatewayTimeout)
}

4. Port Discovery via CR

Share the listening port across replicas through the TinyNode status metadata:

go
// Leader writes
n.Status.Metadata["http-port"] = fmt.Sprintf("%d", c.port)

// Readers read
fmt.Sscanf(portStr, "%d", &c.port)

Visual Flow

┌─────────────────────────────────────────────────────────────┐
│                       HTTP Server                            │
│                                                              │
│   External                                                   │
│   Request  ───────────────────────────────────────────┐     │
│            │                                          │     │
│            ▼                                          │     │
│   ┌─────────────────┐                                 │     │
│   │ Kubernetes      │                                 │     │
│   │ Ingress/Service │                                 │     │
│   └────────┬────────┘                                 │     │
│            │                                          │     │
│            ▼                                          │     │
│   ┌─────────────────┐      ┌────────────┐            │     │
│   │   HTTP Server   │─────►│  Request   │────────────┼────►│ To Flow
│   │   (Goroutine)   │      │  Output    │            │     │
│   └────────┬────────┘      └────────────┘            │     │
│            │                                          │     │
│            │ Wait                                     │     │
│            │                                          │     │
│   ┌────────▼────────┐      ┌────────────┐            │     │
│   │  Pending        │◄─────│  Response  │◄───────────┼─────│ From Flow
│   │  Request Map    │      │  Input     │            │     │
│   └─────────────────┘      └────────────┘            │     │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Extension Ideas

  1. Route Patterns: Support path parameters like /users/:id
  2. Rate Limiting: Add request rate limiting
  3. Authentication: Built-in auth middleware
  4. Metrics: Expose Prometheus metrics
  5. WebSocket: Add WebSocket support

Build flow-based applications on Kubernetes