Client Pool
The Client Pool manages gRPC connections to remote modules. It provides connection reuse, health monitoring, and automatic reconnection.
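At a glance, a caller registers a module once and then reuses the pooled connection for every RPC. A minimal sketch, assuming the Register/Get methods defined below and an already-built protobuf client (the address, ctx, and msg are illustrative):

```go
pool := NewPool()

// Register once, e.g. when discovery reports the module's address.
if err := pool.Register("http-module-v1", "10.0.0.12:50051"); err != nil {
    log.Error("failed to register module", "error", err)
}

// Reuse the same connection for every call to that module.
if conn, ok := pool.Get("http-module-v1"); ok {
    client := pb.NewModuleServiceClient(conn)
    if _, err := client.Send(ctx, msg); err != nil {
        log.Error("send failed", "error", err)
    }
}
```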
Overview
```
┌─────────────────────────────────────────────────────────────────────────────┐
│                          CLIENT POOL ARCHITECTURE                           │
└─────────────────────────────────────────────────────────────────────────────┘

          ┌─────────────────────────────────────┐
          │             CLIENT POOL             │
          │                                     │
          │  connections map:                   │
          │  ┌────────────────────────────────┐ │
          │  │ common-module-v1 → *grpc.Conn  │ │
          │  │ http-module-v1   → *grpc.Conn  │ │
          │  │ api-module-v1    → *grpc.Conn  │ │
          │  └────────────────────────────────┘ │
          │                                     │
          └──────────────────┬──────────────────┘
                             │
          ┌──────────────────┼──────────────────┐
          │                  │                  │
          ▼                  ▼                  ▼
  common-module-v1     http-module-v1      api-module-v1
       :50051               :50051              :50051
```

Pool Implementation
```go
type Pool struct {
    connections map[string]*grpc.ClientConn
    mu          sync.RWMutex
    dialOpts    []grpc.DialOption
}

func NewPool() *Pool {
    return &Pool{
        connections: make(map[string]*grpc.ClientConn),
        dialOpts: []grpc.DialOption{
            grpc.WithTransportCredentials(insecure.NewCredentials()),
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:                10 * time.Second,
                Timeout:             3 * time.Second,
                PermitWithoutStream: true,
            }),
        },
    }
}
```

Core Operations
Register
Register a module when discovered:
```go
func (p *Pool) Register(moduleName, address string) error {
    p.mu.Lock()
    defer p.mu.Unlock()

    // Already registered with same address?
    if conn, exists := p.connections[moduleName]; exists {
        if conn.Target() == address {
            return nil // No change needed
        }
        // Address changed - close old connection
        conn.Close()
    }

    // Create new connection
    conn, err := grpc.Dial(address, p.dialOpts...)
    if err != nil {
        return fmt.Errorf("failed to dial %s: %w", address, err)
    }

    p.connections[moduleName] = conn
    log.Info("registered module", "name", moduleName, "address", address)
    return nil
}
```

Get
Get a connection for a module:
```go
func (p *Pool) Get(moduleName string) (*grpc.ClientConn, bool) {
    p.mu.RLock()
    defer p.mu.RUnlock()

    conn, ok := p.connections[moduleName]
    if !ok {
        return nil, false
    }

    // Check connection health
    state := conn.GetState()
    if state == connectivity.Shutdown {
        return nil, false
    }
    return conn, true
}
```

Unregister
Remove a module when it goes away:
```go
func (p *Pool) Unregister(moduleName string) {
    p.mu.Lock()
    defer p.mu.Unlock()

    if conn, exists := p.connections[moduleName]; exists {
        conn.Close()
        delete(p.connections, moduleName)
        log.Info("unregistered module", "name", moduleName)
    }
}
```

Integration with Discovery
The TinyModuleReconciler updates the pool:
```go
func (r *TinyModuleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    instance := &v1alpha1.TinyModule{}
    if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
        if errors.IsNotFound(err) {
            // Module removed
            r.ClientPool.Unregister(req.Name)
            return ctrl.Result{}, nil
        }
        return ctrl.Result{}, err
    }

    // Skip our own module
    if req.Name == r.Module.GetNameSanitised() {
        return ctrl.Result{}, nil
    }

    // Register remote module
    if instance.Status.Addr != "" {
        if err := r.ClientPool.Register(req.Name, instance.Status.Addr); err != nil {
            return ctrl.Result{}, err
        }
    }
    return ctrl.Result{}, nil
}
```
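For completeness, the reconciler has to watch TinyModule resources so every add, update, and delete reaches the pool. A minimal sketch using the standard controller-runtime wiring (the exact setup in this project may differ):

```go
// Watch TinyModule custom resources and route their events to Reconcile.
func (r *TinyModuleReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&v1alpha1.TinyModule{}).
        Complete(r)
}
```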
Connection Lifecycle

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                            CONNECTION LIFECYCLE                             │
└─────────────────────────────────────────────────────────────────────────────┘

1. TinyModule CR Created
        │
        ▼
2. Reconciler sees new module
        │
        ▼
3. Pool.Register() called
        │
        ├─▶ grpc.Dial() - Create connection
        │
        ├─▶ Store in connections map
        │
        ▼
4. Connection used for messages
        │
        ├─▶ Keepalive maintains health
        │
        ├─▶ Auto-reconnect on failure
        │
        ▼
5. TinyModule CR Deleted
        │
        ▼
6. Pool.Unregister() called
        │
        └─▶ conn.Close() - Cleanup
```

Health Monitoring
Connection States
```go
func (p *Pool) GetHealth() map[string]string {
    p.mu.RLock()
    defer p.mu.RUnlock()

    health := make(map[string]string)
    for name, conn := range p.connections {
        health[name] = conn.GetState().String()
    }
    return health
}
```

Proactive Health Checks
```go
func (p *Pool) HealthCheck(ctx context.Context) {
    p.mu.RLock()
    modules := make([]string, 0, len(p.connections))
    for name := range p.connections {
        modules = append(modules, name)
    }
    p.mu.RUnlock()

    for _, name := range modules {
        conn, ok := p.Get(name)
        if !ok {
            continue
        }
        state := conn.GetState()
        if state != connectivity.Ready {
            log.Warn("connection not ready",
                "module", name,
                "state", state.String(),
            )
            // Trigger reconnection
            conn.Connect()
        }
    }
}
```

Reconnection
gRPC handles reconnection automatically with exponential backoff:
```go
// Default backoff configuration
grpc.WithConnectParams(grpc.ConnectParams{
    Backoff: backoff.Config{
        BaseDelay:  1 * time.Second,
        Multiplier: 1.6,
        Jitter:     0.2,
        MaxDelay:   120 * time.Second,
    },
    MinConnectTimeout: 20 * time.Second,
})
```

Load Balancing
Round-Robin
For services with multiple pods:
```go
grpc.WithDefaultServiceConfig(`{
    "loadBalancingPolicy": "round_robin"
}`)
```

DNS Resolution
The dns resolver lets gRPC discover every backend address behind a service name (resolving individual pod IPs typically requires a headless Service):
```go
// Address format for DNS resolution
address := "dns:///common-module-v1.tinysystems.svc.cluster.local:50051"
```
Error Handling

Connection Failure

```go
conn, ok := p.Get(moduleName)
if !ok {
    return fmt.Errorf("module not available: %s", moduleName)
}

// Check state before sending
state := conn.GetState()
if state != connectivity.Ready && state != connectivity.Idle {
    return fmt.Errorf("connection not ready: %s (state: %s)", moduleName, state)
}
```

Send Failure
```go
client := pb.NewModuleServiceClient(conn)
resp, err := client.Send(ctx, msg)
if err != nil {
    st, ok := status.FromError(err)
    if ok {
        switch st.Code() {
        case codes.Unavailable:
            // Module is down
            log.Error("module unavailable", "module", moduleName)
        case codes.DeadlineExceeded:
            // Timeout
            log.Error("request timed out", "module", moduleName)
        }
    }
    return err
}
```
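Whether to retry is application-specific; here is a minimal sketch that retries only on transient Unavailable errors (the sendWithRetry helper and the pb.Message/pb.Response types are illustrative):

```go
// sendWithRetry retries Send a few times when the target module is
// temporarily unavailable, pausing briefly between attempts.
func sendWithRetry(ctx context.Context, client pb.ModuleServiceClient, msg *pb.Message) (*pb.Response, error) {
    var lastErr error
    for attempt := 0; attempt < 3; attempt++ {
        resp, err := client.Send(ctx, msg)
        if err == nil {
            return resp, nil
        }
        lastErr = err
        if status.Code(err) != codes.Unavailable {
            break // only transient unavailability is worth retrying
        }
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-time.After(500 * time.Millisecond):
        }
    }
    return nil, lastErr
}
```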
Metrics

Pool Metrics

```go
var (
    poolConnections = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "tinysystems_client_pool_connections",
            Help: "Number of connections by state",
        },
        []string{"module", "state"},
    )
    poolOperations = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "tinysystems_client_pool_operations_total",
            Help: "Pool operations by type",
        },
        []string{"operation", "status"},
    )
)

func (p *Pool) updateMetrics() {
    p.mu.RLock()
    defer p.mu.RUnlock()

    // Clear previous samples so stale module/state label pairs don't linger.
    poolConnections.Reset()
    for name, conn := range p.connections {
        state := conn.GetState().String()
        poolConnections.WithLabelValues(name, state).Set(1)
    }
}
```
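These collectors also have to be registered before they appear in /metrics, and poolOperations has to be incremented by the pool operations themselves. A sketch of one way to do that (the init hook, the recordOperation helper, and the label values are assumptions, not part of the pool above):

```go
func init() {
    // Register the collectors with the default Prometheus registry.
    prometheus.MustRegister(poolConnections, poolOperations)
}

// recordOperation could be called at the end of Register/Unregister.
func recordOperation(op string, err error) {
    result := "success"
    if err != nil {
        result = "error"
    }
    poolOperations.WithLabelValues(op, result).Inc()
}
```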
Debugging

List Connections

```go
func (p *Pool) Debug() {
    p.mu.RLock()
    defer p.mu.RUnlock()

    for name, conn := range p.connections {
        log.Info("pool connection",
            "module", name,
            "target", conn.Target(),
            "state", conn.GetState().String(),
        )
    }
}
```

Log Connection Events
```go
// Enable gRPC logging. These variables are read when the grpc packages
// initialize, so in practice set them in the process environment (e.g. the
// container spec) rather than late at runtime.
os.Setenv("GRPC_GO_LOG_VERBOSITY_LEVEL", "99")
os.Setenv("GRPC_GO_LOG_SEVERITY_LEVEL", "info")
```

Best Practices
1. Lazy Connection
Don't connect until needed:
```go
grpc.WithBlock() // Don't use - blocks startup

// Instead, let connections establish on first use
```

2. Share Connections
One connection per module is sufficient:
go
// Good: Single connection handles many RPCs
conn := pool.Get("http-module-v1")
client1 := pb.NewModuleServiceClient(conn)
client2 := pb.NewModuleServiceClient(conn)
// Bad: Creating new connections per call3. Handle Graceful Shutdown
```go
func (p *Pool) Close() {
    p.mu.Lock()
    defer p.mu.Unlock()

    for name, conn := range p.connections {
        if err := conn.Close(); err != nil {
            log.Error("failed to close connection", "module", name, "error", err)
        }
    }
    p.connections = make(map[string]*grpc.ClientConn)
}
```

4. Monitor Pool Health
```go
// Periodic health check
go func() {
    ticker := time.NewTicker(30 * time.Second)
    for range ticker.C {
        pool.HealthCheck(context.Background())
        pool.updateMetrics()
    }
}()
```

Next Steps
- Module Discovery - How modules are discovered
- Cross-Module Communication - Using the pool
- gRPC Fundamentals - gRPC details