Skip to content

Resource Manager

The Resource Manager provides components with access to Kubernetes resources and platform capabilities. It enables components to expose ports, create ingress rules, and manage cluster resources.

Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                        RESOURCE MANAGER                                      │
└─────────────────────────────────────────────────────────────────────────────┘

  Component                   Resource Manager                 Kubernetes
      │                            │                               │
      │  ExposePort()              │                               │
      │ ───────────────────────▶   │                               │
      │                            │  Create Service               │
      │                            │  ───────────────────────────▶ │
      │                            │                               │
      │                            │  Create Ingress               │
      │                            │  ───────────────────────────▶ │
      │                            │                               │
      │  ◀───────────────────────  │                               │
      │  Port exposed              │                               │
      │                            │                               │

Accessing the Resource Manager

Components receive the Resource Manager via context:

go
func (c *Component) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    // Get resource manager from context
    rm := resource.FromContext(ctx)
    if rm == nil {
        return fmt.Errorf("resource manager not available")
    }

    // Use resource manager
    err := rm.ExposePort(ctx, resource.ExposePortRequest{
        Port:      8080,
        Hostnames: []string{"api.example.com"},
    })

    return err
}

Core Operations

ExposePort

Expose a port via Kubernetes Service and Ingress:

go
type ExposePortRequest struct {
    Port        int      // Port to expose
    Hostnames   []string // DNS hostnames for ingress
    TLS         bool     // Enable TLS
    ServiceType string   // ClusterIP, NodePort, LoadBalancer
}

func (c *HTTPServer) exposePort(ctx context.Context, rm resource.Manager, port int) error {
    return rm.ExposePort(ctx, resource.ExposePortRequest{
        Port:      port,
        Hostnames: c.settings.Hostnames,
        TLS:       true,
    })
}

UnexposePort

Remove port exposure:

go
func (c *HTTPServer) unexposePort(ctx context.Context, rm resource.Manager) error {
    return rm.UnexposePort(ctx, c.currentPort)
}

UpdateStatus

Update component status in TinyNode:

go
func (c *Component) updateStatus(ctx context.Context, rm resource.Manager) error {
    return rm.UpdateStatus(ctx, resource.StatusUpdate{
        State:       "running",
        Message:     "Processing messages",
        Metadata: map[string]string{
            "processed": "1000",
            "errors":    "5",
        },
    })
}

HTTP Server Pattern

Complete pattern for HTTP server with ingress:

go
type Server struct {
    settings    Settings
    nodeName    string
    currentPort int
    listener    net.Listener
}

type Settings struct {
    Hostnames []string `json:"hostnames" title:"Hostnames"`
    TLS       bool     `json:"tls" title:"Enable TLS" default:"true"`
}

func (s *Server) Handle(ctx context.Context, output module.Handler, port string, msg any) error {
    switch port {
    case v1alpha1.SettingsPort:
        s.settings = msg.(Settings)
        return nil

    case v1alpha1.ReconcilePort:
        return s.handleReconcile(ctx, output, msg)
    }
    return nil
}

func (s *Server) handleReconcile(ctx context.Context, output module.Handler, msg any) error {
    node, ok := msg.(v1alpha1.TinyNode)
    if !ok {
        return nil
    }

    s.nodeName = node.Name
    rm := resource.FromContext(ctx)

    // Check if port already assigned
    configuredPort := 0
    if portStr, ok := node.Status.Metadata["http-server-port"]; ok {
        configuredPort, _ = strconv.Atoi(portStr)
    }

    if configuredPort == s.currentPort && s.currentPort > 0 {
        return nil // Already running
    }

    // No port yet - leader assigns
    if configuredPort == 0 {
        if utils.IsLeader(ctx) {
            return s.startAndExpose(ctx, output, rm)
        }
        return nil
    }

    // Port assigned - all pods start
    return s.startOnPort(ctx, configuredPort)
}

func (s *Server) startAndExpose(ctx context.Context, output module.Handler, rm resource.Manager) error {
    // Start on random port
    listener, err := net.Listen("tcp", ":0")
    if err != nil {
        return err
    }

    port := listener.Addr().(*net.TCPAddr).Port
    s.listener = listener
    s.currentPort = port

    // Expose via ingress
    if len(s.settings.Hostnames) > 0 {
        err = rm.ExposePort(ctx, resource.ExposePortRequest{
            Port:      port,
            Hostnames: s.settings.Hostnames,
            TLS:       s.settings.TLS,
        })
        if err != nil {
            listener.Close()
            return err
        }
    }

    // Publish port to metadata
    output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
        if node.Status.Metadata == nil {
            node.Status.Metadata = make(map[string]string)
        }
        node.Status.Metadata["http-server-port"] = strconv.Itoa(port)
    })

    // Start serving
    go http.Serve(listener, s.handler())

    return nil
}

Kubernetes Resources Created

Service

yaml
apiVersion: v1
kind: Service
metadata:
  name: http-server-abc123
  namespace: tinysystems
  labels:
    tinysystems.io/node: http-server-abc123
spec:
  type: ClusterIP
  selector:
    app: http-module
  ports:
    - name: http
      port: 8080
      targetPort: 8080

Ingress

yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: http-server-abc123
  namespace: tinysystems
  annotations:
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
    - hosts:
        - api.example.com
      secretName: http-server-abc123-tls
  rules:
    - host: api.example.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: http-server-abc123
                port:
                  number: 8080

Resource Lifecycle

┌─────────────────────────────────────────────────────────────────────────────┐
│                       RESOURCE LIFECYCLE                                     │
└─────────────────────────────────────────────────────────────────────────────┘

1. Component starts


2. Leader calls ExposePort()

   ├─▶ Service created
   ├─▶ Ingress created
   ├─▶ TLS certificate requested


3. Resources ready

   ├─▶ Traffic flows to component


4. Settings change (hostnames)


5. Leader calls ExposePort() again

   ├─▶ Ingress updated


6. TinyNode deleted


7. Resources cleaned up automatically

   ├─▶ Service deleted
   ├─▶ Ingress deleted
   └─▶ TLS secret deleted

Owner References

Resources are owned by the TinyNode:

go
func (rm *Manager) createService(ctx context.Context, req ExposePortRequest) error {
    service := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      rm.nodeName,
            Namespace: rm.namespace,
            OwnerReferences: []metav1.OwnerReference{
                {
                    APIVersion: "operator.tinysystems.io/v1alpha1",
                    Kind:       "TinyNode",
                    Name:       rm.nodeName,
                    UID:        rm.nodeUID,
                    Controller: ptr.To(true),
                },
            },
        },
        Spec: corev1.ServiceSpec{
            // ...
        },
    }

    return rm.client.Create(ctx, service)
}

When the TinyNode is deleted, Kubernetes garbage collection removes the owned resources.

Error Handling

go
func (c *Server) exposeWithRetry(ctx context.Context, rm resource.Manager, req resource.ExposePortRequest) error {
    var lastErr error

    for attempt := 0; attempt < 3; attempt++ {
        err := rm.ExposePort(ctx, req)
        if err == nil {
            return nil
        }

        lastErr = err

        // Check if retryable
        if errors.IsConflict(err) || errors.IsServerTimeout(err) {
            time.Sleep(time.Duration(attempt+1) * time.Second)
            continue
        }

        // Not retryable
        return err
    }

    return fmt.Errorf("failed after 3 attempts: %w", lastErr)
}

Best Practices

1. Leader-Only Resource Creation

go
if utils.IsLeader(ctx) {
    rm.ExposePort(ctx, req)
}

2. Idempotent Operations

go
if s.currentPort == configuredPort {
    return nil // Already exposed
}

3. Cleanup on Error

go
listener, err := net.Listen("tcp", ":0")
if err != nil {
    return err
}

err = rm.ExposePort(ctx, req)
if err != nil {
    listener.Close() // Cleanup on failure
    return err
}

4. Update Metadata After Exposure

go
// After successful exposure
output(ctx, v1alpha1.ReconcilePort, func(node *v1alpha1.TinyNode) {
    node.Status.Metadata["exposed-port"] = strconv.Itoa(port)
    node.Status.Metadata["exposed-at"] = time.Now().Format(time.RFC3339)
})

Next Steps

Build flow-based applications on Kubernetes