gRPC Fundamentals
TinySystems uses gRPC for communication between modules. Understanding gRPC basics helps when building and debugging distributed flows.
Overview
+-----------------------------------------------------------------------------+
| GRPC IN TINYSYSTEMS |
+-----------------------------------------------------------------------------+
Module A Kubernetes Module B
(common-module) Network (http-module)
| | |
| Message to | |
| http-module | |
| | | |
| v | |
| Scheduler | |
| | | |
| | Not local? | |
| | | |
| v | |
| ClientPool.Get() | |
| | | |
| v | |
| gRPC Client -------------------------------> gRPC Server
| | |
| | Scheduler.Handle()
| | |
+--------------------------+-------------------------------+
What is gRPC?
gRPC is a high-performance RPC framework that uses:
- Protocol Buffers: Efficient binary serialization
- HTTP/2: Multiplexing, streaming, header compression
- Service Definition: Strongly-typed APIs
TinySystems gRPC Service
The SDK defines a simple gRPC service:
protobuf
service ModuleService {
  rpc Send(Message) returns (Response);
}

message Message {
  string to = 1;                     // "node-name.port-name"
  string from = 2;                   // "source-node.source-port"
  bytes data = 3;                    // Serialized message data
  map<string, string> metadata = 4;
}

message Response {
  bytes data = 1;    // Response data (if any)
  string error = 2;  // Error message (if failed)
}
Server Implementation
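The Message.to field packs node and port into a single "node-name.port-name" string. A hypothetical helper (not the SDK's actual code) shows how it could be split before routing:

```go
package main

import (
	"fmt"
	"strings"
)

// splitAddress is an illustrative helper, not part of the SDK:
// it separates the "node-name.port-name" form used by Message.to.
func splitAddress(to string) (node, port string, ok bool) {
	return strings.Cut(to, ".")
}

func main() {
	node, port, ok := splitAddress("http-server-xyz.request")
	fmt.Println(node, port, ok) // http-server-xyz request true
}
```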
Each module runs a gRPC server:
go
// Simplified from SDK
type Server struct {
	scheduler *Scheduler
	pb.UnimplementedModuleServiceServer
}

func (s *Server) Send(ctx context.Context, msg *pb.Message) (*pb.Response, error) {
	// Deserialize the message
	data, err := deserialize(msg.Data)
	if err != nil {
		return nil, err
	}

	// Route to the scheduler
	result := s.scheduler.Handle(ctx, runner.Msg{
		To:   msg.To,
		From: msg.From,
		Data: data,
	})

	// Return response
	if err, ok := result.(error); ok {
		return &pb.Response{Error: err.Error()}, nil
	}
	respData, _ := serialize(result)
	return &pb.Response{Data: respData}, nil
}
Client Implementation
The SDK maintains a pool of gRPC clients:
go
func (s *Scheduler) sendToRemote(ctx context.Context, moduleName string, msg runner.Msg) error {
	conn, ok := s.clientPool.Get(moduleName)
	if !ok {
		return fmt.Errorf("module not discovered: %s", moduleName)
	}
	client := pb.NewModuleServiceClient(conn)

	data, err := serialize(msg.Data)
	if err != nil {
		return err
	}
	resp, err := client.Send(ctx, &pb.Message{
		To:   msg.To,
		From: msg.From,
		Data: data,
	})
	if err != nil {
		return err
	}
	if resp.Error != "" {
		return errors.New(resp.Error)
	}
	return nil
}
Connection Management
Keepalive
gRPC connections use keepalive to detect dead connections:
go
// Client-side keepalive
conn, err := grpc.Dial(address,
	grpc.WithKeepaliveParams(keepalive.ClientParameters{
		Time:                10 * time.Second, // Ping every 10s
		Timeout:             3 * time.Second,  // Wait 3s for pong
		PermitWithoutStream: true,             // Ping even when idle
	}),
)

// Server-side keepalive
server := grpc.NewServer(
	grpc.KeepaliveParams(keepalive.ServerParameters{
		Time:    10 * time.Second,
		Timeout: 3 * time.Second,
	}),
	grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
		MinTime:             5 * time.Second,
		PermitWithoutStream: true,
	}),
)
Connection States
+---------------------------------------------------------------------------+
| GRPC CONNECTION STATES |
+---------------------------------------------------------------------------+
IDLE ------> CONNECTING ------> READY
| | |
| | |
| v v
| TRANSIENT_FAILURE <-------
| |
| | Retry
| v
+--------> CONNECTING
|
v
SHUTDOWN
| State | Description |
|---|---|
| IDLE | No activity, will connect on demand |
| CONNECTING | Establishing connection |
| READY | Connected and healthy |
| TRANSIENT_FAILURE | Failed, will retry |
| SHUTDOWN | Connection closed |
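The legal transitions in the diagram can be modeled as a small lookup table (an illustrative sketch, not the grpc-go connectivity API):

```go
package main

import "fmt"

type state string

// next lists the transitions shown in the state diagram above.
var next = map[state][]state{
	"IDLE":              {"CONNECTING", "SHUTDOWN"},
	"CONNECTING":        {"READY", "TRANSIENT_FAILURE"},
	"READY":             {"TRANSIENT_FAILURE"},
	"TRANSIENT_FAILURE": {"CONNECTING"}, // retry
	"SHUTDOWN":          {},             // terminal
}

func canTransition(from, to state) bool {
	for _, s := range next[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("CONNECTING", "READY"))             // true
	fmt.Println(canTransition("TRANSIENT_FAILURE", "CONNECTING")) // true
	fmt.Println(canTransition("SHUTDOWN", "READY"))               // false
}
```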
Error Handling
gRPC Status Codes
| Code | Name | Description |
|---|---|---|
| 0 | OK | Success |
| 1 | CANCELLED | Operation cancelled |
| 2 | UNKNOWN | Unknown error |
| 3 | INVALID_ARGUMENT | Invalid request |
| 4 | DEADLINE_EXCEEDED | Timeout |
| 5 | NOT_FOUND | Resource not found |
| 13 | INTERNAL | Internal error |
| 14 | UNAVAILABLE | Service unavailable |
Handling Errors
go
import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
resp, err := client.Send(ctx, msg)
if err != nil {
	st, ok := status.FromError(err)
	if ok {
		switch st.Code() {
		case codes.Unavailable:
			// Retry later
		case codes.DeadlineExceeded:
			// Timeout
		default:
			// Other error
		}
	}
}
Kubernetes Service
gRPC traffic routes through Kubernetes Services:
yaml
apiVersion: v1
kind: Service
metadata:
  name: my-module-v1
  namespace: tinysystems
spec:
  type: ClusterIP
  selector:
    app: my-module
  ports:
    - name: grpc
      port: 50051
      targetPort: 50051
Load Balancing
Client-Side (Default)
gRPC uses client-side load balancing:
go
// Round-robin across endpoints
conn, _ := grpc.Dial(
	"dns:///my-module-v1.tinysystems.svc.cluster.local:50051",
	grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
Service-Side
The Kubernetes Service load-balances only the initial TCP connection. Because gRPC connections are long-lived, all subsequent requests stick to whichever pod was chosen, which is why client-side balancing matters.
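The client-side round-robin policy can be sketched as a minimal picker (an illustration of the policy only, not grpc-go's balancer API):

```go
package main

import "fmt"

// rr cycles through resolved addresses in order —
// the essence of the round_robin policy.
type rr struct {
	addrs []string
	next  int
}

func (p *rr) pick() string {
	a := p.addrs[p.next%len(p.addrs)]
	p.next++
	return a
}

func main() {
	p := &rr{addrs: []string{"10.0.0.1:50051", "10.0.0.2:50051"}}
	fmt.Println(p.pick(), p.pick(), p.pick()) // 10.0.0.1:50051 10.0.0.2:50051 10.0.0.1:50051
}
```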
Debugging gRPC
Enable Logging
bash
# Set gRPC verbosity
export GRPC_GO_LOG_VERBOSITY_LEVEL=99
export GRPC_GO_LOG_SEVERITY_LEVEL=info
Check Connectivity
bash
# Test gRPC endpoint
grpcurl -plaintext my-module-v1.tinysystems:50051 list
# Health check
grpcurl -plaintext my-module-v1.tinysystems:50051 grpc.health.v1.Health/Check
Monitor Connections
Look for these log patterns:
# Successful connection
"connected to module" module=http-module-v1 address=http-module-v1:50051
# Connection failure
"failed to connect to module" module=http-module-v1 error="connection refused"
# Message sent
"sent message via gRPC" to=http-server-xyz.request
Performance Considerations
Connection Reuse
Connections are pooled and reused:
go
// One connection per remote module
conn, _ := s.clientPool.Get(moduleName)
// Connection is reused for all messages to that module
Message Size
Default limits:
- Max send: 4MB
- Max receive: 4MB
For larger messages, consider streaming or chunking.
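Chunking can be sketched as a plain byte-slice split (a hypothetical helper, not an SDK function; real chunking also needs reassembly metadata such as sequence numbers):

```go
package main

import "fmt"

// chunk splits data into pieces of at most size bytes — one way to
// stay under the 4MB default when streaming is not an option.
func chunk(data []byte, size int) [][]byte {
	var out [][]byte
	for len(data) > size {
		out = append(out, data[:size])
		data = data[size:]
	}
	return append(out, data)
}

func main() {
	parts := chunk(make([]byte, 10), 4)
	for _, p := range parts {
		fmt.Println(len(p)) // 4, 4, 2
	}
}
```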
Timeouts
Set appropriate timeouts:
go
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
resp, err := client.Send(ctx, msg)
Security
mTLS (Production)
In production, enable mutual TLS:
go
// Server-authenticated TLS: the client verifies the server against the CA
creds, err := credentials.NewClientTLSFromFile("ca.crt", "")
if err != nil {
	log.Fatal(err)
}
// Full mutual TLS additionally presents a client certificate, e.g. by
// loading a key pair with tls.LoadX509KeyPair and passing a *tls.Config
// to credentials.NewTLS.
conn, err := grpc.Dial(address,
	grpc.WithTransportCredentials(creds),
)
Network Policies
Restrict gRPC traffic:
yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-grpc
  namespace: tinysystems
spec:
  podSelector:
    matchLabels:
      app: my-module
  ingress:
    - from:
        - namespaceSelector:
            matchLabels:
              name: tinysystems
      ports:
        - port: 50051
          protocol: TCP
Next Steps
- Internal Routing - In-module routing
- Cross-Module Communication - Between modules
- Module Discovery - Service discovery