Client Pool

The Client Pool manages gRPC connections to remote modules. It provides connection reuse, health monitoring, and automatic reconnection.

Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                         CLIENT POOL ARCHITECTURE                             │
└─────────────────────────────────────────────────────────────────────────────┘

                    ┌─────────────────────────────────────┐
                    │           CLIENT POOL               │
                    │                                     │
                    │  connections map:                   │
                    │  ┌────────────────────────────────┐ │
                    │  │ common-module-v1 → *grpc.Conn  │ │
                    │  │ http-module-v1   → *grpc.Conn  │ │
                    │  │ api-module-v1    → *grpc.Conn  │ │
                    │  └────────────────────────────────┘ │
                    │                                     │
                    └──────────────────┬──────────────────┘

                    ┌──────────────────┼──────────────────┐
                    │                  │                  │
                    ▼                  ▼                  ▼
           common-module-v1     http-module-v1     api-module-v1
             :50051               :50051              :50051

Pool Implementation

go
import (
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
)

type Pool struct {
    connections map[string]*grpc.ClientConn // one connection per module
    mu          sync.RWMutex                // guards connections
    dialOpts    []grpc.DialOption           // shared dial options
}

func NewPool() *Pool {
    return &Pool{
        connections: make(map[string]*grpc.ClientConn),
        dialOpts: []grpc.DialOption{
            grpc.WithTransportCredentials(insecure.NewCredentials()),
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:                10 * time.Second, // ping every 10s when idle
                Timeout:             3 * time.Second,  // fail the ping after 3s
                PermitWithoutStream: true,             // ping even with no active RPCs
            }),
        },
    }
}
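
For context, a sketch of how the pool might be constructed and handed to the discovery reconciler shown below (the reconciler's other fields are omitted here; this wiring is an assumption, not from the source):

go
// Sketch: build the pool once at startup and share it with the reconciler.
pool := NewPool()
reconciler := &TinyModuleReconciler{ClientPool: pool}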

Core Operations

Register

Register a module when discovered:

go
func (p *Pool) Register(moduleName, address string) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    // Already registered with same address?
    if conn, exists := p.connections[moduleName]; exists {
        if conn.Target() == address {
            return nil // No change needed
        }
        // Address changed - close old connection
        conn.Close()
    }

    // Create new connection
    conn, err := grpc.Dial(address, p.dialOpts...)
    if err != nil {
        return fmt.Errorf("failed to dial %s: %w", address, err)
    }

    p.connections[moduleName] = conn
    log.Info("registered module", "name", moduleName, "address", address)

    return nil
}

Get

Get a connection for a module:

go
func (p *Pool) Get(moduleName string) (*grpc.ClientConn, bool) {
    p.mu.RLock()
    defer p.mu.RUnlock()

    conn, ok := p.connections[moduleName]
    if !ok {
        return nil, false
    }

    // Check connection health
    state := conn.GetState()
    if state == connectivity.Shutdown {
        return nil, false
    }

    return conn, true
}

Unregister

Remove a module when its CR is deleted:

go
func (p *Pool) Unregister(moduleName string) {
    p.mu.Lock()
    defer p.mu.Unlock()

    if conn, exists := p.connections[moduleName]; exists {
        conn.Close()
        delete(p.connections, moduleName)
        log.Info("unregistered module", "name", moduleName)
    }
}

Integration with Discovery

The TinyModuleReconciler updates the pool:

go
func (r *TinyModuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &v1alpha1.TinyModule{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        if errors.IsNotFound(err) {
            // Module removed - drop its connection
            r.ClientPool.Unregister(req.Name)
            return ctrl.Result{}, nil
        }
        // Any other error: requeue
        return ctrl.Result{}, err
    }

    // Skip our own module
    if req.Name == r.Module.GetNameSanitised() {
        return ctrl.Result{}, nil
    }

    // Register the remote module once it has an address
    if instance.Status.Addr != "" {
        if err := r.ClientPool.Register(req.Name, instance.Status.Addr); err != nil {
            return ctrl.Result{}, err
        }
    }

    return ctrl.Result{}, nil
}
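
How the reconciler is registered with the manager is not shown here; with controller-runtime it would typically look like this (a sketch of the standard pattern, assumed rather than taken from the source):

go
// Sketch: standard controller-runtime wiring for watching TinyModule CRs.
func (r *TinyModuleReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&v1alpha1.TinyModule{}).
        Complete(r)
}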

Connection Lifecycle

┌─────────────────────────────────────────────────────────────────────────────┐
│                       CONNECTION LIFECYCLE                                   │
└─────────────────────────────────────────────────────────────────────────────┘

1. TinyModule CR created

2. Reconciler sees the new module

3. Pool.Register() called
   ├─▶ grpc.Dial() - create connection
   └─▶ store in connections map

4. Connection used for messages
   ├─▶ keepalive maintains health
   └─▶ auto-reconnect on failure

5. TinyModule CR deleted

6. Pool.Unregister() called
   └─▶ conn.Close() - cleanup

Health Monitoring

Connection States

go
func (p *Pool) GetHealth() map[string]string {
    p.mu.RLock()
    defer p.mu.RUnlock()

    health := make(map[string]string)
    for name, conn := range p.connections {
        health[name] = conn.GetState().String()
    }
    return health
}
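
GetHealth can be surfaced however you like; for example, as a JSON debug endpoint (the healthHandler helper below is hypothetical, not part of the pool API):

go
// Sketch: expose pool health as JSON on a debug endpoint.
func healthHandler(p *Pool) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        _ = json.NewEncoder(w).Encode(p.GetHealth())
    }
}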

Proactive Health Checks

go
func (p *Pool) HealthCheck(ctx context.Context) {
    p.mu.RLock()
    modules := make([]string, 0, len(p.connections))
    for name := range p.connections {
        modules = append(modules, name)
    }
    p.mu.RUnlock()

    for _, name := range modules {
        conn, ok := p.Get(name)
        if !ok {
            continue
        }

        state := conn.GetState()
        if state != connectivity.Ready {
            log.Warn("connection not ready",
                "module", name,
                "state", state.String(),
            )
            // Trigger reconnection
            conn.Connect()
        }
    }
}
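
Polling is simple, but gRPC also offers an event-driven alternative via WaitForStateChange; a sketch of watching a single connection (the watchConn helper is illustrative):

go
// Sketch: log every state transition of one connection until ctx is cancelled.
func watchConn(ctx context.Context, name string, conn *grpc.ClientConn) {
    for {
        state := conn.GetState()
        if !conn.WaitForStateChange(ctx, state) {
            return // context cancelled
        }
        log.Info("connection state changed",
            "module", name,
            "state", conn.GetState().String(),
        )
    }
}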

Reconnection

gRPC handles reconnection automatically with exponential backoff:

go
// Default backoff configuration
grpc.WithConnectParams(grpc.ConnectParams{
    Backoff: backoff.Config{
        BaseDelay:  1 * time.Second,
        Multiplier: 1.6,
        Jitter:     0.2,
        MaxDelay:   120 * time.Second,
    },
    MinConnectTimeout: 20 * time.Second,
})
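
To override these defaults, the ConnectParams can simply be appended to the pool's dial options (a sketch building on the NewPool shown earlier; the specific values are illustrative):

go
// Sketch: customize reconnect backoff when constructing the pool.
p := NewPool()
p.dialOpts = append(p.dialOpts, grpc.WithConnectParams(grpc.ConnectParams{
    Backoff: backoff.Config{
        BaseDelay:  500 * time.Millisecond, // reconnect faster than the default
        Multiplier: 1.6,
        Jitter:     0.2,
        MaxDelay:   30 * time.Second,
    },
    MinConnectTimeout: 10 * time.Second,
}))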

Load Balancing

Round-Robin

For services with multiple pods:

go
grpc.WithDefaultServiceConfig(`{
    "loadBalancingPolicy": "round_robin"
}`)

DNS Resolution

gRPC resolves DNS to find all endpoints:

go
// Address format for DNS resolution
address := "dns:///common-module-v1.tinysystems.svc.cluster.local:50051"
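
Putting the two together: registering a module through a headless Service with the dns scheme lets round_robin spread load across every pod (sketch; the Service name is illustrative):

go
// Sketch: dns:/// + headless Service → one sub-connection per pod.
if err := pool.Register(
    "http-module-v1",
    "dns:///http-module-v1.tinysystems.svc.cluster.local:50051",
); err != nil {
    log.Error("registration failed", "error", err)
}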

Error Handling

Connection Failure

go
conn, ok := p.Get(moduleName)
if !ok {
    return fmt.Errorf("module not available: %s", moduleName)
}

// Check state before sending
state := conn.GetState()
if state != connectivity.Ready && state != connectivity.Idle {
    return fmt.Errorf("connection not ready: %s (state: %s)", moduleName, state)
}

Send Failure

go
client := pb.NewModuleServiceClient(conn)
resp, err := client.Send(ctx, msg)
if err != nil {
    st, ok := status.FromError(err)
    if ok {
        switch st.Code() {
        case codes.Unavailable:
            // Module is down
            log.Error("module unavailable", "module", moduleName)
        case codes.DeadlineExceeded:
            // Timeout
            log.Error("request timed out", "module", moduleName)
        }
    }
    return err
}
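
For transient Unavailable errors it can be worth retrying a few times before giving up. A minimal sketch (the sendWithRetry helper and the pb.Message/pb.Response type names are assumptions):

go
// Sketch: retry only transient Unavailable errors, with a short linear backoff.
func sendWithRetry(ctx context.Context, client pb.ModuleServiceClient, msg *pb.Message) (*pb.Response, error) {
    var lastErr error
    for attempt := 1; attempt <= 3; attempt++ {
        resp, err := client.Send(ctx, msg)
        if err == nil {
            return resp, nil
        }
        if status.Code(err) != codes.Unavailable {
            return nil, err // non-transient: fail fast
        }
        lastErr = err
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(time.Duration(attempt) * 100 * time.Millisecond):
        }
    }
    return nil, lastErr
}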

Metrics

Pool Metrics

go
var (
    poolConnections = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "tinysystems_client_pool_connections",
            Help: "Number of connections by state",
        },
        []string{"module", "state"},
    )

    poolOperations = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "tinysystems_client_pool_operations_total",
            Help: "Pool operations by type",
        },
        []string{"operation", "status"},
    )
)

func (p *Pool) updateMetrics() {
    p.mu.RLock()
    defer p.mu.RUnlock()

    // Clear previous samples so stale (module, state) pairs don't linger
    poolConnections.Reset()
    for name, conn := range p.connections {
        state := conn.GetState().String()
        poolConnections.WithLabelValues(name, state).Set(1)
    }
}
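
The collectors need to be registered before they are scraped (sketch, assuming the default Prometheus registry):

go
// Sketch: register the pool collectors with the default registry.
func init() {
    prometheus.MustRegister(poolConnections, poolOperations)
}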

Debugging

List Connections

go
func (p *Pool) Debug() {
    p.mu.RLock()
    defer p.mu.RUnlock()

    for name, conn := range p.connections {
        log.Info("pool connection",
            "module", name,
            "target", conn.Target(),
            "state", conn.GetState().String(),
        )
    }
}

Log Connection Events

go
// Enable gRPC logging
os.Setenv("GRPC_GO_LOG_VERBOSITY_LEVEL", "99")
os.Setenv("GRPC_GO_LOG_SEVERITY_LEVEL", "info")

Best Practices

1. Lazy Connection

Don't connect until needed:

go
grpc.WithBlock() // Don't use - blocks startup
// Instead, let connections establish on first use

2. Share Connections

One connection per module is sufficient:

go
// Good: a single connection multiplexes many concurrent RPCs
conn, ok := pool.Get("http-module-v1")
if !ok {
    return fmt.Errorf("module not available")
}
client1 := pb.NewModuleServiceClient(conn)
client2 := pb.NewModuleServiceClient(conn)

// Bad: dialing a new connection per call
3. Handle Graceful Shutdown

go
func (p *Pool) Close() {
    p.mu.Lock()
    defer p.mu.Unlock()

    for name, conn := range p.connections {
        if err := conn.Close(); err != nil {
            log.Error("failed to close connection", "module", name, "error", err)
        }
    }
    p.connections = make(map[string]*grpc.ClientConn)
}
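
Pool.Close fits naturally at the end of the process lifecycle, for example behind a signal handler (sketch; the surrounding main function is assumed):

go
// Sketch: close all pooled connections on SIGTERM/SIGINT.
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
defer stop()

<-ctx.Done() // wait for shutdown signal
pool.Close()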

4. Monitor Pool Health

go
// Periodic health check
go func() {
    ticker := time.NewTicker(30 * time.Second)
    for range ticker.C {
        pool.HealthCheck(context.Background())
        pool.updateMetrics()
    }
}()

Next Steps

Build flow-based applications on Kubernetes