
gRPC Fundamentals

TinySystems uses gRPC for communication between modules. Understanding gRPC basics helps when building and debugging distributed flows.

Overview

+-----------------------------------------------------------------------------+
|                         GRPC IN TINYSYSTEMS                                  |
+-----------------------------------------------------------------------------+

  Module A                     Kubernetes                     Module B
  (common-module)              Network                        (http-module)
       |                          |                               |
       |  Message to              |                               |
       |  http-module             |                               |
       |       |                  |                               |
       |       v                  |                               |
       |  Scheduler               |                               |
       |       |                  |                               |
       |       | Not local?       |                               |
       |       |                  |                               |
       |       v                  |                               |
       |  ClientPool.Get()        |                               |
       |       |                  |                               |
       |       v                  |                               |
       |  gRPC Client -------------------------------> gRPC Server
       |                          |                               |
       |                          |                   Scheduler.Handle()
       |                          |                               |
       +--------------------------+-------------------------------+
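
The dispatch step in the diagram can be sketched as follows. This is a simplified model rather than the SDK's scheduler: `Router`, its `Owner` map, and the `Remote` callback are hypothetical names standing in for the local handler table, module discovery, and the gRPC send path.

```go
package main

import (
	"errors"
	"fmt"
)

// Msg mirrors the fields used in this document; the real runner.Msg may differ.
type Msg struct {
	To, From string
	Data     []byte
}

// Router is a hypothetical stand-in for the scheduler's dispatch step.
type Router struct {
	Local  map[string]func(Msg) error       // handlers for nodes hosted in this module
	Owner  map[string]string                // node name -> owning remote module
	Remote func(module string, m Msg) error // stands in for the gRPC send path
}

var ErrUnknownNode = errors.New("unknown node")

// Dispatch delivers locally when the node lives in this module and
// otherwise forwards the message to the module that owns the node.
func (r *Router) Dispatch(node string, m Msg) error {
	if h, ok := r.Local[node]; ok {
		return h(m)
	}
	module, ok := r.Owner[node]
	if !ok {
		return fmt.Errorf("%w: %s", ErrUnknownNode, node)
	}
	return r.Remote(module, m)
}

func main() {
	r := &Router{
		Local: map[string]func(Msg) error{},
		Owner: map[string]string{"http-server-xyz": "http-module-v1"},
		Remote: func(module string, m Msg) error {
			fmt.Println("remote send to", module, "for", m.To)
			return nil
		},
	}
	if err := r.Dispatch("http-server-xyz", Msg{To: "http-server-xyz.request"}); err != nil {
		fmt.Println("dispatch failed:", err)
	}
}
```

Keeping the remote path behind a function value makes the local/remote decision testable without a network.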

What is gRPC?

gRPC is a high-performance RPC framework that uses:

  • Protocol Buffers: Efficient binary serialization
  • HTTP/2: Multiplexing, streaming, header compression
  • Service Definition: Strongly-typed APIs

TinySystems gRPC Service

The SDK defines a simple gRPC service:

protobuf
service ModuleService {
    rpc Send(Message) returns (Response);
}

message Message {
    string to = 1;        // "node-name.port-name"
    string from = 2;      // "source-node.source-port"
    bytes data = 3;       // Serialized message data
    map<string, string> metadata = 4;
}

message Response {
    bytes data = 1;       // Response data (if any)
    string error = 2;     // Error message (if failed)
}
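
The `to` and `from` fields carry addresses of the form "node-name.port-name". A minimal sketch of splitting such an address is shown here; `splitAddress` is a hypothetical helper, and splitting at the last dot is an assumption (it presumes port names contain no dots).

```go
package main

import (
	"fmt"
	"strings"
)

// splitAddress splits "node-name.port-name" at the last dot.
// Hypothetical helper, not part of the SDK.
func splitAddress(addr string) (node, port string, err error) {
	i := strings.LastIndex(addr, ".")
	if i <= 0 || i == len(addr)-1 {
		return "", "", fmt.Errorf("invalid address %q: want \"node.port\"", addr)
	}
	return addr[:i], addr[i+1:], nil
}

func main() {
	node, port, err := splitAddress("http-server-xyz.request")
	fmt.Println(node, port, err)
}
```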

Server Implementation

Each module runs a gRPC server:

go
// Simplified from SDK
type Server struct {
    scheduler *Scheduler
    pb.UnimplementedModuleServiceServer
}

func (s *Server) Send(ctx context.Context, msg *pb.Message) (*pb.Response, error) {
    // Deserialize the message
    data, err := deserialize(msg.Data)
    if err != nil {
        return nil, err
    }

    // Route to the scheduler
    result := s.scheduler.Handle(ctx, runner.Msg{
        To:   msg.To,
        From: msg.From,
        Data: data,
    })

    // Return response
    if err, ok := result.(error); ok {
        return &pb.Response{Error: err.Error()}, nil
    }

    respData, err := serialize(result)
    if err != nil {
        return nil, err
    }
    return &pb.Response{Data: respData}, nil
}
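
The handler above reports application failures in the `error` field of the response rather than as a transport error. The sketch below models that convention on its own. `Response` here is a plain struct, not the generated `pb.Response`, and JSON is only an assumed serialization format; the SDK's `serialize` may work differently.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Response mirrors the two fields of the protobuf message.
type Response struct {
	Data  []byte
	Error string
}

var errBoom = errors.New("boom") // sample handler failure

// toResponse turns a handler result into a Response: errors go into
// the Error field, everything else is serialized into Data.
func toResponse(result any) Response {
	if err, ok := result.(error); ok {
		return Response{Error: err.Error()}
	}
	data, err := json.Marshal(result) // assumption: JSON encoding
	if err != nil {
		return Response{Error: "serialize: " + err.Error()}
	}
	return Response{Data: data}
}

// Err converts the Error field back into a Go error on the caller side.
func (r Response) Err() error {
	if r.Error != "" {
		return errors.New(r.Error)
	}
	return nil
}

func main() {
	ok := toResponse(map[string]int{"a": 1})
	fmt.Println(string(ok.Data), ok.Err())
	failed := toResponse(errBoom)
	fmt.Println(failed.Err())
}
```

With this split, a non-nil gRPC error always means the call itself failed (network, deadline), while `Error` means the remote handler ran and rejected the message.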

Client Implementation

The SDK maintains a pool of gRPC clients:

go
func (s *Scheduler) sendToRemote(ctx context.Context, moduleName string, msg runner.Msg) error {
    conn, ok := s.clientPool.Get(moduleName)
    if !ok {
        return fmt.Errorf("module not discovered: %s", moduleName)
    }

    client := pb.NewModuleServiceClient(conn)

    data, err := serialize(msg.Data)
    if err != nil {
        return fmt.Errorf("serialize message: %w", err)
    }

    resp, err := client.Send(ctx, &pb.Message{
        To:   msg.To,
        From: msg.From,
        Data: data,
    })

    if err != nil {
        return err
    }
    if resp.Error != "" {
        return errors.New(resp.Error)
    }

    return nil
}
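
The `clientPool.Get` call above returns a connection and a found flag. One way such a pool could be built is a mutex-guarded map keyed by module name. `Pool`, `NewPool`, and `Set` are hypothetical names, and the type parameter stands in for `*grpc.ClientConn` so the sketch runs without gRPC.

```go
package main

import (
	"fmt"
	"sync"
)

// Pool is a hypothetical per-module connection cache.
type Pool[C any] struct {
	mu    sync.RWMutex
	conns map[string]C
}

func NewPool[C any]() *Pool[C] {
	return &Pool[C]{conns: make(map[string]C)}
}

// Get returns the cached connection for a module, if one was registered.
func (p *Pool[C]) Get(module string) (C, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	c, ok := p.conns[module]
	return c, ok
}

// Set registers (or replaces) the connection for a module.
func (p *Pool[C]) Set(module string, c C) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.conns[module] = c
}

func main() {
	p := NewPool[string]()
	p.Set("http-module-v1", "http-module-v1:50051")
	addr, ok := p.Get("http-module-v1")
	fmt.Println(addr, ok)
}
```

A read-write lock suits this access pattern because lookups happen on every message while registrations happen only when a module is discovered.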

Connection Management

Keepalive

gRPC connections use keepalive to detect dead connections:

go
// Client-side keepalive
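conn, err := grpc.Dial(address,
    // Plaintext for brevity; gRPC-Go refuses to dial without explicit credentials
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithKeepaliveParams(keepalive.ClientParameters{
        Time:                10 * time.Second, // Ping every 10s
        Timeout:             3 * time.Second,  // Wait 3s for pong
        PermitWithoutStream: true,             // Ping even when idle
    }),
)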

// Server-side keepalive
server := grpc.NewServer(
    grpc.KeepaliveParams(keepalive.ServerParameters{
        Time:    10 * time.Second,
        Timeout: 3 * time.Second,
    }),
    grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
        MinTime:             5 * time.Second,
        PermitWithoutStream: true,
    }),
)
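
The client's `Time` and the server's `MinTime` need to agree: a gRPC-Go server closes connections with a GOAWAY ("too_many_pings") when a client pings more often than the enforcement policy allows. The check below is a small sketch of that constraint; `checkKeepalive` is a hypothetical helper, and the constants copy the values from the snippet above.

```go
package main

import (
	"fmt"
	"time"
)

const (
	clientPing    = 10 * time.Second // ClientParameters.Time above
	serverMinTime = 5 * time.Second  // EnforcementPolicy.MinTime above
)

// checkKeepalive reports an error when the client would ping more
// often than the server's enforcement policy permits.
func checkKeepalive(clientTime, serverMin time.Duration) error {
	if clientTime < serverMin {
		return fmt.Errorf("client pings every %v but server requires at least %v between pings",
			clientTime, serverMin)
	}
	return nil
}

func main() {
	fmt.Println(checkKeepalive(clientPing, serverMinTime))
	fmt.Println(checkKeepalive(2*time.Second, serverMinTime))
}
```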

Connection States

+---------------------------------------------------------------------------+
|                        GRPC CONNECTION STATES                              |
+---------------------------------------------------------------------------+

    IDLE ------> CONNECTING ------> READY
      |              |                 |
      |              |                 |
      |              v                 v
      |         TRANSIENT_FAILURE <-------
      |              |
      |              | Retry
      |              v
      +--------> CONNECTING
                     |
                     v
                 SHUTDOWN

State              Description
-----------------  ------------------------------------
IDLE               No activity, will connect on demand
CONNECTING         Establishing connection
READY              Connected and healthy
TRANSIENT_FAILURE  Failed, will retry
SHUTDOWN           Connection closed
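
The states and arrows above can be written down as a small transition table. This sketch encodes only the transitions drawn in the diagram, plus the rule that any live connection can be closed; it is a simplified model and not gRPC's internal implementation.

```go
package main

import "fmt"

type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
	Shutdown
)

var names = [...]string{"IDLE", "CONNECTING", "READY", "TRANSIENT_FAILURE", "SHUTDOWN"}

func (s State) String() string {
	if s < 0 || int(s) >= len(names) {
		return "UNKNOWN"
	}
	return names[s]
}

// next lists the transitions drawn in the diagram.
var next = map[State][]State{
	Idle:             {Connecting},
	Connecting:       {Ready, TransientFailure},
	Ready:            {TransientFailure},
	TransientFailure: {Connecting},
}

// canTransition reports whether moving from one state to another is
// allowed. SHUTDOWN is terminal and reachable from every other state.
func canTransition(from, to State) bool {
	if from == Shutdown {
		return false
	}
	if to == Shutdown {
		return true
	}
	for _, s := range next[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(Idle, "->", Connecting, canTransition(Idle, Connecting))
	fmt.Println(Ready, "->", Connecting, canTransition(Ready, Connecting))
}
```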

Error Handling

gRPC Status Codes

Code  Name               Description
----  -----------------  --------------------
0     OK                 Success
1     CANCELLED          Operation cancelled
2     UNKNOWN            Unknown error
3     INVALID_ARGUMENT   Invalid request
4     DEADLINE_EXCEEDED  Timeout
5     NOT_FOUND          Resource not found
13    INTERNAL           Internal error
14    UNAVAILABLE        Service unavailable

Handling Errors

go
import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

resp, err := client.Send(ctx, msg)
if err != nil {
    st, ok := status.FromError(err)
    if ok {
        switch st.Code() {
        case codes.Unavailable:
            // Retry later
        case codes.DeadlineExceeded:
            // Timeout
        default:
            // Other error
        }
    }
}
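
The "Retry later" branch is often implemented as a bounded retry with exponential backoff. The sketch below uses only the standard library; `retry`, `demoRetry`, and `errUnavailable` are hypothetical names. With gRPC in scope, the `retryable` predicate could be `status.Code(err) == codes.Unavailable`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry calls op until it succeeds, returns a non-retryable error,
// the attempts run out, or ctx is done. The delay doubles after each
// failed attempt.
func retry(ctx context.Context, attempts int, base time.Duration,
	retryable func(error) bool, op func(context.Context) error) error {
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = op(ctx); err == nil || !retryable(err) {
			return err
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
			delay *= 2
		}
	}
	return err
}

var errUnavailable = errors.New("unavailable") // stand-in for codes.Unavailable

// demoRetry runs an operation that fails the given number of times
// before succeeding, with at most three attempts.
func demoRetry(failures int) (calls int, err error) {
	err = retry(context.Background(), 3, time.Millisecond,
		func(e error) bool { return errors.Is(e, errUnavailable) },
		func(context.Context) error {
			calls++
			if calls <= failures {
				return errUnavailable
			}
			return nil
		})
	return calls, err
}

func main() {
	calls, err := demoRetry(2)
	fmt.Println(calls, err)
}
```

Only retry codes that indicate a transient condition; retrying INVALID_ARGUMENT or NOT_FOUND repeats the same failure.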

Kubernetes Service

gRPC traffic routes through Kubernetes Services:

yaml
apiVersion: v1
kind: Service
metadata:
  name: my-module-v1
  namespace: tinysystems
spec:
  type: ClusterIP
  selector:
    app: my-module
  ports:
    - name: grpc
      port: 50051
      targetPort: 50051

Load Balancing

Client-Side (Default)

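gRPC balances load on the client side. gRPC-Go's built-in default policy is pick_first, so round_robin has to be requested explicitly, and it only spreads calls when the DNS name resolves to individual pod IPs (for example, via a headless Service) rather than a single ClusterIP:

go
// Round-robin across resolved endpoints
conn, err := grpc.Dial(
    "dns:///my-module-v1.tinysystems.svc.cluster.local:50051",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    grpc.WithTransportCredentials(insecure.NewCredentials()), // plaintext; use TLS in production
)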
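
As a mental model, round_robin hands each new RPC to the next resolved address in turn. The toy picker below illustrates that rotation; it is not gRPC's balancer implementation, and the `roundRobin` type and IP addresses are made up for the example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin hands out endpoints in rotation and is safe for
// concurrent use.
type roundRobin struct {
	endpoints []string
	next      atomic.Uint64
}

// pick returns the next endpoint, or "" when none are known.
func (r *roundRobin) pick() string {
	if len(r.endpoints) == 0 {
		return ""
	}
	n := r.next.Add(1) - 1
	return r.endpoints[n%uint64(len(r.endpoints))]
}

func main() {
	rr := &roundRobin{endpoints: []string{"10.0.0.1:50051", "10.0.0.2:50051"}}
	for i := 0; i < 4; i++ {
		fmt.Println(rr.pick())
	}
}
```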

Service-Side

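A Kubernetes Service balances per connection, not per request: a pod is chosen when the TCP connection is opened, and every RPC multiplexed over that long-lived HTTP/2 connection then goes to the same pod.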

Debugging gRPC

Enable Logging

bash
# Set gRPC verbosity
export GRPC_GO_LOG_VERBOSITY_LEVEL=99
export GRPC_GO_LOG_SEVERITY_LEVEL=info

Check Connectivity

bash
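# List services (requires server reflection to be enabled on the module)
grpcurl -plaintext my-module-v1.tinysystems:50051 list

# Health check (requires the standard gRPC health service to be registered)
grpcurl -plaintext my-module-v1.tinysystems:50051 grpc.health.v1.Health/Check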

Monitor Connections

Look for these log patterns:

# Successful connection
"connected to module" module=http-module-v1 address=http-module-v1:50051

# Connection failure
"failed to connect to module" module=http-module-v1 error="connection refused"

# Message sent
"sent message via gRPC" to=http-server-xyz.request

Performance Considerations

Connection Reuse

Connections are pooled and reused:

go
// One connection per remote module
conn, _ := s.clientPool.Get(moduleName)
// Connection is reused for all messages to that module

Message Size

gRPC-Go default limits:

  • Max receive: 4MB per message
  • Max send: no practical limit by default, so the receiver's 4MB limit is the effective cap

For larger messages, consider streaming or chunking.
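
A minimal chunking sketch follows. `chunk` and `chunkSize` are hypothetical, and the 1 MiB chunk size is an arbitrary choice that leaves headroom under a 4MB limit for the message envelope and metadata.

```go
package main

import "fmt"

const chunkSize = 1 << 20 // 1 MiB per chunk (assumed value)

// chunk splits data into consecutive slices of at most size bytes.
// The slices share the backing array of data, so nothing is copied.
func chunk(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var out [][]byte
	for len(data) > size {
		out = append(out, data[:size:size])
		data = data[size:]
	}
	if len(data) > 0 {
		out = append(out, data)
	}
	return out
}

func main() {
	payload := make([]byte, 5*chunkSize/2) // 2.5 MiB
	for i, part := range chunk(payload, chunkSize) {
		fmt.Printf("chunk %d: %d bytes\n", i, len(part))
	}
}
```

The receiver needs a way to reassemble the parts, for example a sequence number and a total count carried in the message metadata.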

Timeouts

Set appropriate timeouts:

go
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

resp, err := client.Send(ctx, msg)
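
To see what the deadline does, the sketch below replaces the RPC with a simulated slow operation. `slowSend` and `timedOut` are hypothetical helpers; in gRPC-Go an expired deadline surfaces to the caller as the DEADLINE_EXCEEDED status code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowSend simulates a call that takes the given time to finish
// unless the context ends first.
func slowSend(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether a call lasting workMs milliseconds hits a
// deadline of timeoutMs milliseconds.
func timedOut(workMs, timeoutMs int) bool {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	err := slowSend(ctx, time.Duration(workMs)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("slow call timed out:", timedOut(500, 10))
	fmt.Println("fast call timed out:", timedOut(1, 2000))
}
```

Deriving the timeout context from the incoming `ctx`, as the snippet above does, means a shorter deadline set upstream still applies.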

Security

mTLS (Production)

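In production, enable TLS between modules. The snippet below authenticates the server only; full mutual TLS also requires the client to present its own certificate.

go
// Trust the CA that signed the server certificate
creds, err := credentials.NewClientTLSFromFile("ca.crt", "")
if err != nil {
    log.Fatalf("load CA certificate: %v", err)
}

conn, err := grpc.Dial(address,
    grpc.WithTransportCredentials(creds),
)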
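
For the mutual half of mTLS, the client also loads its own certificate and key. The sketch below builds such a configuration with the standard library only. `mutualTLSConfig` is a hypothetical helper, and the file names `client.crt` and `client.key` are assumptions. With gRPC in scope, the resulting config can be wrapped with `credentials.NewTLS` and passed to `grpc.WithTransportCredentials`.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// mutualTLSConfig builds a client-side TLS config that verifies the
// server against caFile and presents certFile/keyFile as the client
// identity.
func mutualTLSConfig(caFile, certFile, keyFile string) (*tls.Config, error) {
	caPEM, err := os.ReadFile(caFile)
	if err != nil {
		return nil, fmt.Errorf("read CA: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no certificates found in CA file")
	}
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load client key pair: %w", err)
	}
	return &tls.Config{
		RootCAs:      pool,
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12,
	}, nil
}

func main() {
	cfg, err := mutualTLSConfig("ca.crt", "client.crt", "client.key")
	if err != nil {
		fmt.Println("TLS setup failed:", err)
		return
	}
	fmt.Println("client certificates loaded:", len(cfg.Certificates))
}
```

The server side needs the matching setting: a `tls.Config` with `ClientAuth: tls.RequireAndVerifyClientCert` and a `ClientCAs` pool, otherwise the client certificate is never checked.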

Network Policies

Restrict gRPC traffic:

yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-grpc
  namespace: tinysystems
spec:
  podSelector:
    matchLabels:
      app: my-module
  ingress:
    - from:
        - namespaceSelector:
            matchLabels:
              # Label set automatically on every namespace (Kubernetes 1.22 and later)
              kubernetes.io/metadata.name: tinysystems
      ports:
        - port: 50051
          protocol: TCP
