Skip to content

Commit

Permalink
[RSDK-5915] Unimplemented subtypes (#3607)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjperez authored Feb 29, 2024
1 parent dbb6dab commit eb1f095
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 24 deletions.
56 changes: 32 additions & 24 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,18 @@ type RobotClient struct {
address string
dialOptions []rpc.DialOption

mu sync.RWMutex
resourceNames []resource.Name
resourceRPCAPIs []resource.RPCAPI
resourceClients map[resource.Name]resource.Resource
remoteNameMap map[resource.Name]resource.Name
changeChan chan bool
notifyParent func()
conn grpc.ReconfigurableClientConn
client pb.RobotServiceClient
refClient *grpcreflect.Client
connected atomic.Bool
mu sync.RWMutex
resourceNames []resource.Name
resourceRPCAPIs []resource.RPCAPI
resourceClients map[resource.Name]resource.Resource
remoteNameMap map[resource.Name]resource.Name
changeChan chan bool
notifyParent func()
conn grpc.ReconfigurableClientConn
client pb.RobotServiceClient
refClient *grpcreflect.Client
connected atomic.Bool
rpcSubtypesUnimplemented bool

activeBackgroundWorkers sync.WaitGroup
backgroundCtx context.Context
Expand Down Expand Up @@ -578,12 +579,25 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour
ctx, cancel = contextutils.ContextWithTimeoutIfNoDeadline(ctx, defaultResourcesTimeout)
defer cancel()
}

resp, err := rc.client.ResourceNames(ctx, &pb.ResourceNamesRequest{})
if err != nil {
return nil, nil, err
}

var resTypes []resource.RPCAPI

resources := make([]resource.Name, 0, len(resp.Resources))
for _, name := range resp.Resources {
newName := rprotoutils.ResourceNameFromProto(name)
resources = append(resources, newName)
}

// resource has previously returned an unimplemented response, skip rpc call
if rc.rpcSubtypesUnimplemented {
return resources, resTypes, nil
}

typesResp, err := rc.client.ResourceRPCSubtypes(ctx, &pb.ResourceRPCSubtypesRequest{})
if err == nil {
reflSource := grpcurl.DescriptorSourceFromServer(ctx, rc.refClient)
Expand Down Expand Up @@ -613,13 +627,8 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour
if s, ok := status.FromError(err); !(ok && (s.Code() == codes.Unimplemented)) {
return nil, nil, err
}
}

resources := make([]resource.Name, 0, len(resp.Resources))

for _, name := range resp.Resources {
newName := rprotoutils.ResourceNameFromProto(name)
resources = append(resources, newName)
// prevent future calls to ResourceRPCSubtypes
rc.rpcSubtypesUnimplemented = true
}

return resources, resTypes, nil
Expand All @@ -635,16 +644,15 @@ func (rc *RobotClient) Refresh(ctx context.Context) (err error) {

func (rc *RobotClient) updateResources(ctx context.Context) error {
// call metadata service.

names, rpcAPIs, err := rc.resources(ctx)
// only return if it is not unimplemented - means a bigger error came up
if err != nil && status.Code(err) != codes.Unimplemented {
return err
}
if err == nil {
rc.resourceNames = make([]resource.Name, 0, len(names))
rc.resourceNames = append(rc.resourceNames, names...)
rc.resourceRPCAPIs = rpcAPIs
}

rc.resourceNames = make([]resource.Name, 0, len(names))
rc.resourceNames = append(rc.resourceNames, names...)
rc.resourceRPCAPIs = rpcAPIs

rc.updateRemoteNameMap()

Expand Down
181 changes: 181 additions & 0 deletions robot/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"
"time"

"github.com/edaniels/golog"
"github.com/golang/geo/r3"
"github.com/google/uuid"
"github.com/jhump/protoreflect/grpcreflect"
Expand Down Expand Up @@ -98,6 +99,186 @@ var finalResources = []resource.Name{

var pose1 = spatialmath.NewZeroPose()

type mockRPCSubtypesUnimplemented struct {
pb.UnimplementedRobotServiceServer
ResourceNamesFunc func(*pb.ResourceNamesRequest) (*pb.ResourceNamesResponse, error)
}

func (ms *mockRPCSubtypesUnimplemented) ResourceNames(
ctx context.Context, req *pb.ResourceNamesRequest,
) (*pb.ResourceNamesResponse, error) {
return ms.ResourceNamesFunc(req)
}

type mockRPCSubtypesImplemented struct {
mockRPCSubtypesUnimplemented
ResourceNamesFunc func(*pb.ResourceNamesRequest) (*pb.ResourceNamesResponse, error)
}

func (ms *mockRPCSubtypesImplemented) ResourceRPCSubtypes(
ctx context.Context, _ *pb.ResourceRPCSubtypesRequest,
) (*pb.ResourceRPCSubtypesResponse, error) {
return &pb.ResourceRPCSubtypesResponse{}, nil
}

func (ms *mockRPCSubtypesImplemented) ResourceNames(
ctx context.Context, req *pb.ResourceNamesRequest,
) (*pb.ResourceNamesResponse, error) {
return ms.ResourceNamesFunc(req)
}

var resourceFunc1 = func(*pb.ResourceNamesRequest) (*pb.ResourceNamesResponse, error) {
board1 := board.Named("board1")
rNames := []*commonpb.ResourceName{
{
Namespace: string(board1.API.Type.Namespace),
Type: board1.API.Type.Name,
Subtype: board1.API.SubtypeName,
Name: board1.Name,
},
}
return &pb.ResourceNamesResponse{Resources: rNames}, nil
}

var resourceFunc2 = func(*pb.ResourceNamesRequest) (*pb.ResourceNamesResponse, error) {
board1 := board.Named("board1")
board2 := board.Named("board2")
rNames := []*commonpb.ResourceName{
{
Namespace: string(board1.API.Type.Namespace),
Type: board1.API.Type.Name,
Subtype: board1.API.SubtypeName,
Name: board1.Name,
},
{
Namespace: string(board2.API.Type.Namespace),
Type: board2.API.Type.Name,
Subtype: board2.API.SubtypeName,
Name: board2.Name,
},
}
return &pb.ResourceNamesResponse{Resources: rNames}, nil
}

func makeRPCServer(logger golog.Logger, option rpc.ServerOption) (rpc.Server, net.Listener, error) {
err := errors.New("failed to make rpc server")
var addr string
var listener net.Listener
var server rpc.Server

for i := 0; i < 10; i++ {
port, err := utils.TryReserveRandomPort()
if err != nil {
continue
}

addr = fmt.Sprint("localhost:", port)
listener, err = net.Listen("tcp", addr)
if err != nil {
continue
}

server, err = rpc.NewServer(logger, option)
if err != nil {
continue
}
return server, listener, nil
}
return nil, nil, err
}

func TestUnimplementedRPCSubtypes(t *testing.T) {
var client1 *RobotClient // test implemented
var client2 *RobotClient // test unimplemented
ctx1, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
ctx2, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
logger1 := logging.NewTestLogger(t)
logger2 := logging.NewTestLogger(t)

rpcServer1, listener1, err := makeRPCServer(logger1.AsZap(), rpc.WithUnauthenticated())
test.That(t, err, test.ShouldBeNil)

rpcServer2, listener2, err := makeRPCServer(logger2.AsZap(), rpc.WithUnauthenticated())
test.That(t, err, test.ShouldBeNil)

defer func() {
test.That(t, rpcServer2.Stop(), test.ShouldBeNil)
}()
defer func() {
test.That(t, rpcServer1.Stop(), test.ShouldBeNil)
}()

implementedService := mockRPCSubtypesImplemented{
ResourceNamesFunc: resourceFunc1,
}

unimplementedService := mockRPCSubtypesUnimplemented{
ResourceNamesFunc: resourceFunc1,
}

err = rpcServer1.RegisterServiceServer(
ctx1,
&pb.RobotService_ServiceDesc,
&implementedService,
pb.RegisterRobotServiceHandlerFromEndpoint,
)
test.That(t, err, test.ShouldBeNil)

err = rpcServer2.RegisterServiceServer(
ctx2,
&pb.RobotService_ServiceDesc,
&unimplementedService,
pb.RegisterRobotServiceHandlerFromEndpoint)
test.That(t, err, test.ShouldBeNil)

go func() {
test.That(t, rpcServer1.Serve(listener1), test.ShouldBeNil)
}()
go func() {
test.That(t, rpcServer2.Serve(listener2), test.ShouldBeNil)
}()

client1, err = New(
ctx1,
listener1.Addr().String(),
logger1,
)
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, client1.Close(ctx1), test.ShouldBeNil)
}()
test.That(t, client1.Connected(), test.ShouldBeTrue)
test.That(t, client1.rpcSubtypesUnimplemented, test.ShouldBeFalse)

client2, err = New(
ctx2,
listener2.Addr().String(),
logger2,
)
test.That(t, err, test.ShouldBeNil)
defer func() {
test.That(t, client2.Close(ctx2), test.ShouldBeNil)
}()
test.That(t, client2.Connected(), test.ShouldBeTrue)
test.That(t, client2.rpcSubtypesUnimplemented, test.ShouldBeTrue)

// verify that the unimplemented check does not affect calls to ResourceNames
test.That(t, len(client2.ResourceNames()), test.ShouldEqual, 1)
_, err = client2.ResourceByName(board.Named("board1"))
test.That(t, err, test.ShouldBeNil)

// still unimplemented, but with two resources
unimplementedService.ResourceNamesFunc = resourceFunc2
err = client2.Refresh(ctx2)
test.That(t, err, test.ShouldBeNil)

test.That(t, len(client2.ResourceNames()), test.ShouldEqual, 2)
_, err = client2.ResourceByName(board.Named("board2"))
test.That(t, err, test.ShouldBeNil)
}

func TestStatusClient(t *testing.T) {
logger := logging.NewTestLogger(t)
listener1, err := net.Listen("tcp", "localhost:0")
Expand Down

0 comments on commit eb1f095

Please sign in to comment.