HTTP Server Component
A complete example of an HTTP server component with Kubernetes service exposure.
Overview
This component creates an HTTP server that handles incoming requests. It demonstrates:
- HTTP server lifecycle management
- Kubernetes Service/Ingress creation
- Request/response handling
- Leader-only operation
Complete Implementation
go
package httpserver
import (
"context"
"fmt"
"io"
"net"
"net/http"
"sync"
"time"
"github.com/tiny-systems/module/api/v1alpha1"
"github.com/tiny-systems/module/pkg/module"
"github.com/tiny-systems/module/pkg/resource"
"github.com/tiny-systems/module/pkg/utils"
)
// ComponentName is the name the component reports through GetInfo.
const ComponentName = "http_server"
// Component is the HTTP server component. It keeps the most recently received
// settings and resource manager together with the state of the running server.
type Component struct {
	// settings is the last value received on the settings port.
	settings Settings
	// resourceManager is the value received on the client port; startServer
	// uses it to expose the listening port when hostnames are configured.
	resourceManager *resource.Manager
	// mu is held by startServer, stopServer and isRunning while they touch
	// the server state below.
	mu sync.RWMutex
	// server is non-nil while the HTTP server is considered running.
	server *http.Server
	// listener is the TCP listener opened by startServer.
	listener net.Listener
	// port is the TCP port picked by startServer, or the value read from the
	// node's "http-port" metadata in handleReconcile.
	port int
}
// Settings is the user-facing configuration received on the settings port.
//
// Every struct tag is written on a single line. reflect.StructTag stops
// parsing at a line break, so a key placed after one (description, in the
// previous layout) is invisible to Tag.Get and Tag.Lookup.
type Settings struct {
	// Path is the URL path the server registers its handler on.
	Path string `json:"path" title:"Path" default:"/" required:"true" description:"URL path to listen on"`
	// Methods lists the HTTP methods to accept.
	Methods []string `json:"methods" title:"Allowed Methods" default:"[\"GET\",\"POST\"]" description:"HTTP methods to accept"`
	// Hostnames are the external hostnames to expose the server on; when
	// empty the server is not exposed through the resource manager.
	Hostnames []string `json:"hostnames,omitempty" title:"Hostnames" description:"External hostnames for Ingress (leave empty for internal only)"`
	// Timeout is a time.ParseDuration string; values that do not parse fall
	// back to 30s where the setting is consumed.
	Timeout string `json:"timeout" title:"Request Timeout" default:"30s" description:"Maximum request processing time"`
	// EnableCORS turns on the CORS response headers and preflight handling.
	EnableCORS bool `json:"enableCors" title:"Enable CORS" default:"false" description:"Add CORS headers to responses"`
}
// ControlState is the state shown on the control port: read-only status fields
// plus the Start and Stop buttons.
type ControlState struct {
	// Status is "stopped" or "running", as set by HandleControl.
	Status string `json:"status" readonly:"true" title:"Status"`
	// Port is the listening port reported after a successful start.
	Port int `json:"port,omitempty" readonly:"true" title:"Port"`
	// Requests is carried over by HandleControl on stop; no visible code
	// increments it.
	Requests int64 `json:"requests" readonly:"true" title:"Total Requests"`
	// URL is built from the first configured hostname and the path; it is
	// empty when no hostnames are configured.
	URL string `json:"url,omitempty" readonly:"true" title:"External URL"`
	// Start and Stop render as buttons; presumably they map to the "start"
	// and "stop" ports handled by HandleControl (verify against the framework).
	Start bool `json:"start" format:"button" title:"Start" colSpan:"col-span-6"`
	Stop bool `json:"stop" format:"button" title:"Stop" colSpan:"col-span-6"`
}
// RequestOutput is the message emitted on the "request" port for every
// incoming HTTP request.
type RequestOutput struct {
	// RequestID identifies the pending request; the matching ResponseInput
	// must carry the same value.
	RequestID string `json:"requestId" title:"Request ID"`
	// Method is the HTTP method of the request.
	Method string `json:"method" title:"Method"`
	// Path is the URL path of the request.
	Path string `json:"path" title:"Path"`
	// Query holds the first value of each query parameter.
	Query map[string]string `json:"query,omitempty" title:"Query Parameters"`
	// Headers holds the first value of each request header.
	Headers map[string]string `json:"headers" title:"Headers"`
	// Body is the full request body converted to a string.
	Body string `json:"body,omitempty" title:"Body"`
	// RemoteAddr is the request's RemoteAddr as reported by net/http.
	RemoteAddr string `json:"remoteAddr" title:"Remote Address"`
	// ReceivedAt is the handling time formatted as RFC 3339.
	ReceivedAt string `json:"receivedAt" title:"Received At"`
}
// ResponseInput is the message accepted on the "response" port; it completes
// the pending HTTP request with the same RequestID.
type ResponseInput struct {
	// RequestID selects the pending request to answer.
	RequestID string `json:"requestId" title:"Request ID" required:"true"`
	// StatusCode is the HTTP status to send; 0 is treated as 200.
	StatusCode int `json:"statusCode" title:"Status Code" default:"200"`
	// Headers are set after Content-Type, so an entry here can override it.
	Headers map[string]string `json:"headers,omitempty" title:"Response Headers"`
	// Body is written to the response only when non-empty.
	Body string `json:"body,omitempty" title:"Response Body"`
	// ContentType is sent as the Content-Type header when non-empty.
	ContentType string `json:"contentType" title:"Content Type" default:"application/json"`
}
// Compile-time checks that *Component satisfies the module interfaces it is
// registered as.
var (
	_ module.Component      = (*Component)(nil)
	_ module.ControlHandler = (*Component)(nil)
)
// GetInfo returns the static descriptor of the component: its name, title,
// description, category and search tags.
func (c *Component) GetInfo() module.ComponentInfo {
	var info module.ComponentInfo
	info.Name = ComponentName
	info.Title = "HTTP Server"
	info.Description = "Receives HTTP requests and sends responses"
	info.Category = "HTTP"
	info.Tags = []string{"http", "server", "webhook", "api"}
	return info
}
// Ports lists the component's ports in declaration order: the settings,
// control, client and reconcile ports on the top edge, the "request" port on
// the right and the "response" port on the left.
func (c *Component) Ports() []module.Port {
	settingsPort := module.Port{
		Name:          v1alpha1.SettingsPort,
		Label:         "Settings",
		Position:      module.PositionTop,
		Source:        true,
		Configuration: Settings{},
	}
	controlPort := module.Port{
		Name:          v1alpha1.ControlPort,
		Label:         "Control",
		Position:      module.PositionTop,
		Source:        true,
		Configuration: ControlState{Status: "stopped"},
	}
	clientPort := module.Port{
		Name:          v1alpha1.ClientPort,
		Label:         "Client",
		Position:      module.PositionTop,
		Source:        true,
		Configuration: resource.Manager{},
	}
	reconcilePort := module.Port{
		Name:          v1alpha1.ReconcilePort,
		Label:         "Reconcile",
		Position:      module.PositionTop,
		Source:        true,
		Configuration: v1alpha1.TinyNode{},
	}
	// Carries RequestOutput messages for incoming HTTP requests.
	requestPort := module.Port{
		Name:          "request",
		Label:         "Request",
		Position:      module.PositionRight,
		Source:        false,
		Configuration: RequestOutput{},
	}
	// Accepts ResponseInput messages that complete pending requests.
	responsePort := module.Port{
		Name:          "response",
		Label:         "Response",
		Position:      module.PositionLeft,
		Source:        true,
		Configuration: ResponseInput{},
	}

	return []module.Port{
		settingsPort,
		controlPort,
		clientPort,
		reconcilePort,
		requestPort,
		responsePort,
	}
}
// pendingRequests maps a request ID to the *pendingRequest that is still
// waiting for its response. The zero sync.Map is ready to use.
var pendingRequests sync.Map
// pendingRequest is the bookkeeping kept for an HTTP request whose handler is
// blocked waiting for a message on the "response" port.
type pendingRequest struct {
	// w is the response writer of the waiting request; handleResponse writes
	// the status, headers and body to it.
	w http.ResponseWriter
	// done is closed by handleResponse once the response has been written,
	// which releases the waiting handler.
	done chan struct{}
	// ctx is the context captured when the request was registered; no
	// visible code reads it back.
	ctx context.Context
}
// Handle dispatches a message received on one of the component's ports.
//
//   - settings port: stores the Settings value.
//   - client port: stores the *resource.Manager.
//   - reconcile port: forwards the *v1alpha1.TinyNode to handleReconcile.
//   - "response": completes a pending HTTP request via handleResponse.
//
// A message whose dynamic type does not match the port is reported as an
// error instead of panicking on the type assertion. Any other port yields an
// "unknown port" error.
func (c *Component) Handle(
	ctx context.Context,
	output module.Handler,
	port string,
	msg any,
) error {
	switch port {
	case v1alpha1.SettingsPort:
		settings, ok := msg.(Settings)
		if !ok {
			return fmt.Errorf("port %s: unexpected message type %T", port, msg)
		}
		c.settings = settings
		return nil
	case v1alpha1.ClientPort:
		manager, ok := msg.(*resource.Manager)
		if !ok {
			return fmt.Errorf("port %s: unexpected message type %T", port, msg)
		}
		c.resourceManager = manager
		return nil
	case v1alpha1.ReconcilePort:
		node, ok := msg.(*v1alpha1.TinyNode)
		if !ok {
			return fmt.Errorf("port %s: unexpected message type %T", port, msg)
		}
		return c.handleReconcile(ctx, output, node)
	case "response":
		resp, ok := msg.(ResponseInput)
		if !ok {
			return fmt.Errorf("port %s: unexpected message type %T", port, msg)
		}
		return c.handleResponse(ctx, resp)
	default:
		return fmt.Errorf("unknown port: %s", port)
	}
}
// handleReconcile synchronizes the listening port with the node's
// "http-port" status metadata.
//
// A replica without a local server adopts the published port. A replica that
// is serving keeps the port of its own listener, so stale metadata cannot
// overwrite it. When this replica is the leader and knows a port, it writes
// that port back to the node through output.
//
// It returns an error for a nil node, for the leader's failed write, or for
// "http-port" metadata that is not a valid port number. The parse error is
// only returned when no leader write happened, since that write replaces the
// bad value.
func (c *Component) handleReconcile(
	ctx context.Context,
	output module.Handler,
	node *v1alpha1.TinyNode,
) error {
	if node == nil {
		return fmt.Errorf("reconcile: node is nil")
	}

	var parseErr error

	// c.port is written by startServer under c.mu, so it is read and updated
	// under the same lock here.
	c.mu.Lock()
	if c.server == nil {
		// Reading from a nil map is safe, so no separate nil check is needed.
		if raw, ok := node.Status.Metadata["http-port"]; ok {
			var published int
			if _, err := fmt.Sscanf(raw, "%d", &published); err != nil {
				parseErr = fmt.Errorf("parsing http-port metadata %q: %w", raw, err)
			} else if published < 0 || published > 65535 {
				parseErr = fmt.Errorf("http-port metadata %q is not a valid TCP port", raw)
			} else {
				c.port = published
			}
		}
	}
	port := c.port
	c.mu.Unlock()

	// The lock is released before calling output so a slow write cannot block
	// start/stop; the callback uses the snapshot taken above.
	if utils.IsLeader(ctx) && port > 0 {
		return output(ctx, v1alpha1.ReconcilePort, func(n *v1alpha1.TinyNode) {
			if n.Status.Metadata == nil {
				n.Status.Metadata = make(map[string]string)
			}
			n.Status.Metadata["http-port"] = fmt.Sprintf("%d", port)
		})
	}
	return parseErr
}
// handleResponse writes resp to the HTTP request registered under
// resp.RequestID and releases the handler waiting for it.
//
// The request is removed from pendingRequests atomically, so it can be
// answered at most once; a second response, or one arriving after the handler
// gave up, gets a "not found or already responded" error. The done channel is
// closed on every path after the request has been claimed.
//
// A StatusCode of 0 means 200. A code outside 100..999 would make
// http.ResponseWriter.WriteHeader panic, so it is answered with a 500 and
// reported as an error. A failed body write is returned as an error.
func (c *Component) handleResponse(ctx context.Context, resp ResponseInput) error {
	req, ok := pendingRequests.LoadAndDelete(resp.RequestID)
	if !ok {
		return fmt.Errorf("request %s not found or already responded", resp.RequestID)
	}
	pending := req.(*pendingRequest)
	defer close(pending.done)

	status := resp.StatusCode
	if status == 0 {
		status = http.StatusOK
	}
	if status < 100 || status > 999 {
		// The request is already claimed, so the client must still get an
		// answer; otherwise it would see an empty 200 when the handler returns.
		pending.w.Header().Set("Content-Type", "application/json")
		pending.w.WriteHeader(http.StatusInternalServerError)
		_, _ = pending.w.Write([]byte(`{"error":"invalid response status code"}`))
		return fmt.Errorf("request %s: invalid status code %d", resp.RequestID, resp.StatusCode)
	}

	// Content-Type goes first so an explicit entry in Headers can override it.
	if resp.ContentType != "" {
		pending.w.Header().Set("Content-Type", resp.ContentType)
	}
	for k, v := range resp.Headers {
		pending.w.Header().Set(k, v)
	}

	pending.w.WriteHeader(status)

	if resp.Body != "" {
		if _, err := pending.w.Write([]byte(resp.Body)); err != nil {
			return fmt.Errorf("writing response for request %s: %w", resp.RequestID, err)
		}
	}
	return nil
}
// HandleControl reacts to the "start" and "stop" control ports and returns the
// control state to display.
//
// state may be a *ControlState or a ControlState value (Ports registers a
// value); anything else, including a nil pointer, is treated as a stopped
// server. Both actions are ignored on non-leader replicas, and "start" is
// ignored while the server is already running; in those cases, and for any
// other port, the current state is returned unchanged.
//
// On "start" the server is started and a "running" state is returned with the
// listening port and, when hostnames are configured, the external URL built
// from the first hostname. On "stop" the server is shut down and a "stopped"
// state is returned. The Requests counter is carried over in both cases. A
// start failure is returned wrapped, together with the unchanged state.
func (c *Component) HandleControl(
	ctx context.Context,
	state any,
	port string,
	msg any,
) (any, error) {
	var controlState *ControlState
	switch s := state.(type) {
	case *ControlState:
		controlState = s
	case ControlState:
		controlState = &s
	}
	if controlState == nil {
		controlState = &ControlState{Status: "stopped"}
	}

	switch port {
	case "start":
		if !utils.IsLeader(ctx) || c.isRunning() {
			return controlState, nil
		}
		output := utils.GetOutputHandler(ctx)
		if err := c.startServer(ctx, output); err != nil {
			return controlState, fmt.Errorf("starting http server: %w", err)
		}

		// startServer writes c.port under c.mu; read it the same way.
		c.mu.RLock()
		listenPort := c.port
		c.mu.RUnlock()

		url := ""
		if len(c.settings.Hostnames) > 0 {
			url = fmt.Sprintf("https://%s%s", c.settings.Hostnames[0], c.settings.Path)
		}
		return &ControlState{
			Status:   "running",
			Port:     listenPort,
			Requests: controlState.Requests,
			URL:      url,
		}, nil
	case "stop":
		if !utils.IsLeader(ctx) {
			return controlState, nil
		}
		c.stopServer()
		return &ControlState{
			Status:   "stopped",
			Requests: controlState.Requests,
		}, nil
	}
	return controlState, nil
}
// startServer opens a listener on an OS-assigned TCP port, serves the
// configured path on it and, when a resource manager and hostnames are
// available, exposes the port through the resource manager.
//
// It is a no-op when a server is already running. An empty path defaults to
// "/" because http.ServeMux panics on an empty pattern. A timeout setting that
// does not parse, or is not positive, falls back to 30s.
//
// It returns an error when the listener cannot be opened or the port cannot
// be exposed; in the latter case the server is closed again and the server
// state is reset.
func (c *Component) startServer(ctx context.Context, output module.Handler) error {
	// writeGrace is added to the write deadline. The handler may wait the
	// full request timeout before answering, and with WriteTimeout equal to
	// that wait its timeout response would be written after the deadline.
	const writeGrace = 5 * time.Second

	c.mu.Lock()
	defer c.mu.Unlock()

	// A second start must not replace, and thereby leak, a live server.
	if c.server != nil {
		return nil
	}

	path := c.settings.Path
	if path == "" {
		path = "/"
	}

	timeout, err := time.ParseDuration(c.settings.Timeout)
	if err != nil || timeout <= 0 {
		timeout = 30 * time.Second
	}

	// Port 0 asks the OS for any free port.
	listener, err := net.Listen("tcp", ":0")
	if err != nil {
		return fmt.Errorf("opening listener: %w", err)
	}
	port := listener.Addr().(*net.TCPAddr).Port

	mux := http.NewServeMux()
	mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
		c.handleHTTPRequest(ctx, output, w, r)
	})

	srv := &http.Server{
		Handler:      mux,
		ReadTimeout:  timeout,
		WriteTimeout: timeout + writeGrace,
	}
	c.listener = listener
	c.port = port
	c.server = srv

	// Serve always returns a non-nil error (http.ErrServerClosed after a
	// Shutdown or Close). No logger is in scope here, so it is discarded.
	go func() { _ = srv.Serve(listener) }()

	// Expose via Kubernetes if hostnames are configured.
	if c.resourceManager != nil && len(c.settings.Hostnames) > 0 {
		err := c.resourceManager.ExposePort(ctx, resource.ExposePortRequest{
			Port:      port,
			Hostnames: c.settings.Hostnames,
		})
		if err != nil {
			// Roll back inline: stopServer would try to take c.mu, which is
			// still held here, and sync.RWMutex is not reentrant.
			_ = srv.Close()
			c.server = nil
			c.listener = nil
			c.port = 0
			return fmt.Errorf("exposing port %d: %w", port, err)
		}
	}
	return nil
}
// stopServer shuts the HTTP server down, if one is running, and clears the
// server and listener fields.
//
// In-flight requests get up to five seconds to finish. Handlers can wait for
// a flow response for longer than that, so if the graceful shutdown does not
// complete, the remaining connections are closed forcibly rather than left
// open behind a component that reports itself stopped.
func (c *Component) stopServer() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.server == nil {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := c.server.Shutdown(ctx); err != nil {
		// Nothing more can be done if Close fails as well.
		_ = c.server.Close()
	}
	c.server = nil
	c.listener = nil
}
// isRunning reports whether a server is currently set, reading the field
// under the read lock.
func (c *Component) isRunning() bool {
	c.mu.RLock()
	running := c.server != nil
	c.mu.RUnlock()
	return running
}
// handleHTTPRequest serves one HTTP request by forwarding it to the flow on
// the "request" port and blocking until the flow answers on the "response"
// port.
//
// Steps:
//  1. When CORS is enabled, add the CORS headers and answer OPTIONS
//     preflights directly.
//  2. Reject methods missing from Settings.Methods with 405. An empty list
//     accepts every method; the comparison is case-sensitive, as HTTP method
//     tokens are.
//  3. Read the body (400 on failure) and register the request in
//     pendingRequests under an ID that is unique among pending requests.
//  4. Emit a RequestOutput and wait for the first of: the response being
//     written, the request timeout (504), the emit failing (502), or the
//     client going away.
//
// The ResponseWriter is written by exactly one goroutine. Whoever removes the
// entry from pendingRequests owns it; if handleResponse got there first, this
// handler waits for it to finish, because the writer must not be used after
// the handler returns.
func (c *Component) handleHTTPRequest(
	ctx context.Context,
	output module.Handler,
	w http.ResponseWriter,
	r *http.Request,
) {
	// Add CORS headers if enabled.
	if c.settings.EnableCORS {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
		w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
		if r.Method == "OPTIONS" {
			w.WriteHeader(http.StatusOK)
			return
		}
	}

	// Enforce the configured method list.
	if allowed := c.settings.Methods; len(allowed) > 0 {
		permitted := false
		for _, m := range allowed {
			if m == r.Method {
				permitted = true
				break
			}
		}
		if !permitted {
			for _, m := range allowed {
				w.Header().Add("Allow", m)
			}
			w.WriteHeader(http.StatusMethodNotAllowed)
			_, _ = w.Write([]byte(`{"error":"method not allowed"}`))
			return
		}
	}

	body, err := io.ReadAll(r.Body)
	r.Body.Close()
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(`{"error":"cannot read request body"}`))
		return
	}

	// Keep the first value of each query parameter and header.
	query := make(map[string]string)
	for k, v := range r.URL.Query() {
		if len(v) > 0 {
			query[k] = v[0]
		}
	}
	headers := make(map[string]string)
	for k, v := range r.Header {
		if len(v) > 0 {
			headers[k] = v[0]
		}
	}

	pending := &pendingRequest{
		w:    w,
		done: make(chan struct{}),
		ctx:  ctx,
	}

	// Two requests can observe the same nanosecond timestamp. LoadOrStore
	// refuses to overwrite a live entry, so a collision picks a new ID
	// instead of orphaning the other request.
	var requestID string
	for {
		requestID = fmt.Sprintf("%d", time.Now().UnixNano())
		if _, taken := pendingRequests.LoadOrStore(requestID, pending); !taken {
			break
		}
	}

	// claim reports whether this goroutine removed the entry and so owns w.
	// If handleResponse removed it first, wait until it has finished writing.
	claim := func() bool {
		if _, mine := pendingRequests.LoadAndDelete(requestID); mine {
			return true
		}
		<-pending.done
		return false
	}

	requestOutput := RequestOutput{
		RequestID:  requestID,
		Method:     r.Method,
		Path:       r.URL.Path,
		Query:      query,
		Headers:    headers,
		Body:       string(body),
		RemoteAddr: r.RemoteAddr,
		ReceivedAt: time.Now().Format(time.RFC3339),
	}

	// The emit runs in its own goroutine so that the answer can arrive on the
	// response port while it is still in progress. The channel is buffered so
	// the goroutine can always finish, even after this handler has returned.
	failed := make(chan struct{}, 1)
	go func() {
		if err := output(ctx, "request", requestOutput); err != nil {
			failed <- struct{}{}
		}
	}()

	timeout, err := time.ParseDuration(c.settings.Timeout)
	if err != nil || timeout <= 0 {
		timeout = 30 * time.Second
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-pending.done:
		// Response was sent by handleResponse.
	case <-timer.C:
		if claim() {
			w.WriteHeader(http.StatusGatewayTimeout)
			_, _ = w.Write([]byte(`{"error":"request timeout"}`))
		}
	case <-failed:
		// The flow rejected the request; answer now instead of letting the
		// client wait for the timeout.
		if claim() {
			w.WriteHeader(http.StatusBadGateway)
			_, _ = w.Write([]byte(`{"error":"request could not be processed"}`))
		}
	case <-r.Context().Done():
		// Client disconnected: drop the entry so a late response is reported
		// as "not found" rather than written to a dead connection.
		claim()
	}
}
func (c *Component) Instance() module.Component {
return &Component{}
}Usage Example
Settings Configuration
yaml
edges:
  - port: _settings
    data:
      path: "/webhook"
      methods: ["POST"]
      hostnames: ["api.myapp.example.com"]
      timeout: "30s"
      enableCors: true

Flow: Webhook Processing
HTTP Request ───► HTTP Server ───► Request Port
▲
│
Response Port ◄─── Process Request ◄─── Transform Data

Edge Configuration for Response
yaml
# In the Transform component's output edge
edges:
  - port: output
    target:
      node: http-server
      port: response
    data:
      requestId: "{{$.requestId}}"
      statusCode: 200
      contentType: "application/json"
      body: "{{JSON.stringify($.result)}}"

Key Patterns Demonstrated
1. Resource Manager Integration
Create Kubernetes resources:
go
err := c.resourceManager.ExposePort(ctx, resource.ExposePortRequest{
	Port:      c.port,
	Hostnames: c.settings.Hostnames,
})

2. Request-Response Correlation
Use request IDs to match responses:
go
pendingRequests.Store(requestID, pending)
// Later, in handleResponse:
req, ok := pendingRequests.LoadAndDelete(resp.RequestID)

3. Timeout Handling
go
select {
case <-pending.done:
	// Response was sent
case <-time.After(timeout):
	// Timeout
	w.WriteHeader(http.StatusGatewayTimeout)
}

4. Port Discovery via CR
Share port across replicas:
go
// Leader writes
n.Status.Metadata["http-port"] = fmt.Sprintf("%d", c.port)
// Readers read
fmt.Sscanf(portStr, "%d", &c.port)

Visual Flow
┌─────────────────────────────────────────────────────────────┐
│ HTTP Server │
│ │
│ External │
│ Request ───────────────────────────────────────────┐ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────┐ │ │
│ │ Kubernetes │ │ │
│ │ Ingress/Service │ │ │
│ └────────┬────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────┐ ┌────────────┐ │ │
│ │ HTTP Server │─────►│ Request │────────────┼────►│ To Flow
│ │ (Goroutine) │ │ Output │ │ │
│ └────────┬────────┘ └────────────┘ │ │
│ │ │ │
│ │ Wait │ │
│ │ │ │
│ ┌────────▼────────┐ ┌────────────┐ │ │
│ │ Pending │◄─────│ Response │◄───────────┼─────│ From Flow
│ │ Request Map │ │ Input │ │ │
│ └─────────────────┘ └────────────┘ │ │
│ │
└─────────────────────────────────────────────────────────────┘

Extension Ideas
- Route Patterns: Support path parameters like /users/:id
- Rate Limiting: Add request rate limiting
- Authentication: Built-in auth middleware
- Metrics: Expose Prometheus metrics
- WebSocket: Add WebSocket support