CombineGRPC provides two flavours of functionality, call and handle. Use call to make gRPC calls on the client side, and handle to handle incoming requests on the server side. The library provides versions of call and handle for all RPC styles. Here are the input and output types for each.
When you make a unary call, you provide a request message and get back a response publisher. The response publisher will either publish a single response or fail with an RPCError. Similarly, if you are handling a unary RPC call, you provide a handler that takes a request parameter and returns an AnyPublisher<Response, RPCError>.
You can follow the same intuition to understand the types for the other RPC styles. The only difference is that publishers for the streaming RPCs may publish zero or more messages instead of the single response message that is expected from the unary response publisher.
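The difference between the unary and streaming shapes can be sketched with Combine alone. This is only an illustration: SketchRPCError, unaryEcho and streamingEcho are hypothetical stand-ins, not CombineGRPC declarations, and no network call is made.

```swift
import Combine

// Hypothetical stand-in for the library's RPCError, for illustration only.
struct SketchRPCError: Error {
  let message: String
}

// Unary shape: one request in, a publisher of a single response (or a failure) out.
func unaryEcho(_ request: String) -> AnyPublisher<String, SketchRPCError> {
  guard !request.isEmpty else {
    return Fail<String, SketchRPCError>(error: SketchRPCError(message: "empty request"))
      .eraseToAnyPublisher()
  }
  return Just(request)
    .setFailureType(to: SketchRPCError.self)
    .eraseToAnyPublisher()
}

// Streaming shape: a publisher of requests in, zero or more responses out.
func streamingEcho(_ requests: AnyPublisher<String, Error>) -> AnyPublisher<String, SketchRPCError> {
  return requests
    .mapError { SketchRPCError(message: String(describing: $0)) }
    .eraseToAnyPublisher()
}

// Collect what the unary publisher emits.
var unaryResponses: [String] = []
let unaryCancellable = unaryEcho("hello").sink(
  receiveCompletion: { _ in },
  receiveValue: { unaryResponses.append($0) }
)

// Collect what the streaming publisher emits for three requests.
var streamedResponses: [String] = []
let sketchRequests = ["a", "b", "c"].publisher
  .setFailureType(to: Error.self)
  .eraseToAnyPublisher()
let streamingCancellable = streamingEcho(sketchRequests).sink(
  receiveCompletion: { _ in },
  receiveValue: { streamedResponses.append($0) }
)
```

The unary publisher emits once and completes, while the streaming publisher emits one value per request; an empty request stream would complete without emitting anything.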
Quick Tour
Let’s see a quick example. Consider a protobuf definition for a simple echo service. The service defines one bidirectional RPC: you send it a stream of messages and it echoes the messages back to you.
Server Side
To implement the server, you provide a handler function that takes an input stream AnyPublisher<EchoRequest, Error> and returns an output stream AnyPublisher<EchoResponse, RPCError>.
import Foundation
import Combine
import CombineGRPC
import GRPC
import NIO

class EchoServiceProvider: EchoProvider {

  // Simple bidirectional RPC that echoes back each request message
  func sayItBack(context: StreamingResponseCallContext<EchoResponse>) -> EventLoopFuture<(StreamEvent<EchoRequest>) -> Void> {
    CombineGRPC.handle(context) { requests in
      requests
        .map { req in
          EchoResponse.with { $0.message = req.message }
        }
        .setFailureType(to: RPCError.self)
        .eraseToAnyPublisher()
    }
  }
}
Start the server. This is the same process as with Swift gRPC.
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
  try! eventLoopGroup.syncShutdownGracefully()
}

// Start the gRPC server and wait until it shuts down.
_ = try Server
  .insecure(group: eventLoopGroup)
  .withServiceProviders([EchoServiceProvider()])
  .bind(host: "localhost", port: 8080)
  .flatMap { $0.onClose }
  .wait()
Client Side
Now let’s set up our client. Again, it’s the same process that you would go through when using Swift gRPC.
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let channel = ClientConnection
  .insecure(group: eventLoopGroup)
  .connect(host: "localhost", port: 8080)
let echoClient = EchoServiceNIOClient(channel: channel)
To call the service, create a GRPCExecutor and use its call method. You provide it with a stream of requests AnyPublisher<EchoRequest, Error> and you get back a stream AnyPublisher<EchoResponse, RPCError> of responses from the server.
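let requests = repeatElement(EchoRequest.with { $0.message = "hello" }, count: 10)
let requestStream: AnyPublisher<EchoRequest, Error> =
  Publishers.Sequence(sequence: requests).eraseToAnyPublisher()
let grpc = GRPCExecutor()

// Keep the returned cancellable alive for as long as the call should run.
let cancellable = grpc.call(echoClient.sayItBack)(requestStream)
  .filter { $0.message == "hello" }
  .count()
  .sink(
    // The failure type is RPCError, so the completion must be handled as well.
    receiveCompletion: { completion in
      if case .failure(let error) = completion {
        print("Call failed: \(error)")
      }
    },
    receiveValue: { count in
      assert(count == 10)
    }
  )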
That’s it! You have set up bidirectional streaming between a server and client. The method sayItBack of EchoServiceNIOClient is generated by Swift gRPC. Notice that call is curried. You can preselect RPC calls using partial application:
let sayItBack = grpc.call(echoClient.sayItBack)

sayItBack(requestStream).map { response in
  // ...
}
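If currying is unfamiliar, the idea can be shown in plain Swift without the library. In this sketch, sketchSayItBack and sketchCall are hypothetical stand-ins for a generated client method and for the executor's call method; they only illustrate the two-step application.

```swift
// Hypothetical stand-in for a generated client method: takes a request, returns a response.
func sketchSayItBack(_ message: String) -> String {
  return "echo: \(message)"
}

// A curried call: the first application selects the RPC,
// the second application supplies the request.
func sketchCall<Request, Response>(
  _ rpc: @escaping (Request) -> Response
) -> (Request) -> Response {
  return { request in rpc(request) }
}

// Partial application: preselect the RPC once, then reuse the resulting function.
let echo = sketchCall(sketchSayItBack)
let first = echo("hello")
let second = echo("world")
```

Preselecting the RPC this way gives you an ordinary function value that can be stored, passed around, or called repeatedly with different requests.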
Configuring RPC Calls
The GRPCExecutor allows you to configure CallOptions for your RPC calls. You can provide the GRPCExecutor’s initializer with a stream AnyPublisher<CallOptions, Never>, and the latest CallOptions value will be used when making calls.
let timeoutOptions = CallOptions(timeout: try! .seconds(5))
let grpc = GRPCExecutor(callOptions: Just(timeoutOptions).eraseToAnyPublisher())
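The "latest value wins" behaviour can be illustrated with Combine alone. The sketch below is an assumption about the general pattern, not the library's implementation: SketchOptions and SketchExecutor are hypothetical types that merely remember the most recently published options.

```swift
import Combine

// Hypothetical stand-in for CallOptions, for illustration only.
struct SketchOptions: Equatable {
  var timeoutSeconds: Int
}

// Hypothetical executor that remembers the most recent options it has received.
final class SketchExecutor {
  private(set) var currentOptions = SketchOptions(timeoutSeconds: 0)
  private var subscription: AnyCancellable?

  init(options: AnyPublisher<SketchOptions, Never>) {
    // Every published value replaces the previously stored one.
    subscription = options.sink { [weak self] in self?.currentOptions = $0 }
  }

  // A real call would go over the network; this only reports the options in effect.
  func call(_ request: String) -> String {
    return "\(request) (timeout: \(currentOptions.timeoutSeconds)s)"
  }
}

let optionsSubject = CurrentValueSubject<SketchOptions, Never>(SketchOptions(timeoutSeconds: 5))
let executor = SketchExecutor(options: optionsSubject.eraseToAnyPublisher())

let before = executor.call("ping")                       // uses the initial options
optionsSubject.send(SketchOptions(timeoutSeconds: 30))   // publish new options
let after = executor.call("ping")                        // uses the updated options
```

Because the options arrive as a publisher, code elsewhere in your app can change them at any time and later calls pick up the change without the executor being recreated.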
Retry Policy
You can also configure GRPCExecutor to automatically retry failed calls by specifying a RetryPolicy. In the following example, we retry calls that fail with status .unauthenticated. We use CallOptions to add a Bearer token to the authorization header, and then retry the call.
// Default CallOptions with no authentication
let callOptions = CurrentValueSubject<CallOptions, Never>(CallOptions())

let grpc = GRPCExecutor(
  callOptions: callOptions.eraseToAnyPublisher(),
  retry: .failedCall(
    upTo: 1,
    when: { error in
      error.status.code == .unauthenticated
    },
    delayUntilNext: { retryCount, error in  // Useful for implementing exponential backoff
      // Retry the call with authentication
      callOptions.send(CallOptions(customMetadata: HTTPHeaders([("authorization", "Bearer xxx")])))
      return Just(()).eraseToAnyPublisher()
    },
    didGiveUp: {
      print("Authenticated call failed.")
    }
  )
)

grpc.call(client.authenticatedRpc)(request)
  .map { response in
    // ...
  }
You can imagine doing something along those lines to seamlessly retry calls when an ID token expires. The back-end service replies with status .unauthenticated, you obtain a new ID token using your refresh token, and the call is retried.
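The refresh-and-retry flow described above can be sketched with Combine's catch operator. This is a simplified illustration under stated assumptions, not how RetryPolicy is implemented: SketchError, retrying, fetchProfile and the token variable are all hypothetical, and the back end is simulated in memory.

```swift
import Combine

// Hypothetical error type for illustration; not a CombineGRPC type.
enum SketchError: Error, Equatable {
  case unauthenticated
  case other
}

// Makes a fresh call when `shouldRetry` accepts the error, at most `maxRetries` times.
// `beforeRetry` runs before each new attempt, for example to refresh a token.
func retrying<Output>(
  upTo maxRetries: Int,
  when shouldRetry: @escaping (SketchError) -> Bool,
  beforeRetry: @escaping () -> Void,
  _ makeCall: @escaping () -> AnyPublisher<Output, SketchError>
) -> AnyPublisher<Output, SketchError> {
  return makeCall()
    .catch { error -> AnyPublisher<Output, SketchError> in
      guard maxRetries > 0, shouldRetry(error) else {
        return Fail<Output, SketchError>(error: error).eraseToAnyPublisher()
      }
      beforeRetry()
      return retrying(upTo: maxRetries - 1, when: shouldRetry, beforeRetry: beforeRetry, makeCall)
    }
    .eraseToAnyPublisher()
}

// Simulated back end: rejects calls until a token has been set.
var token: String? = nil
var attempts = 0

func fetchProfile() -> AnyPublisher<String, SketchError> {
  attempts += 1
  guard let token = token else {
    return Fail<String, SketchError>(error: .unauthenticated).eraseToAnyPublisher()
  }
  return Just("profile for \(token)")
    .setFailureType(to: SketchError.self)
    .eraseToAnyPublisher()
}

var received: [String] = []
let retryCancellable = retrying(
  upTo: 1,
  when: { $0 == .unauthenticated },
  beforeRetry: { token = "refreshed-token" },  // stands in for a real token refresh
  fetchProfile
).sink(
  receiveCompletion: { _ in },
  receiveValue: { received.append($0) }
)
```

The first attempt fails because no token is set, the retry hook supplies one, and the second attempt succeeds, so the subscriber never sees the intermediate failure.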
More Examples
Check out the CombineGRPC tests for examples of all the different RPC calls and handler implementations. You can find the matching protobuf here.
Logistics
Generating Swift Code from Protobuf
To generate Swift code from your .proto files, you’ll need to first install the protoc Protocol Buffer compiler.
brew install protobuf swift-protobuf grpc-swift
Now you are ready to generate Swift code from protobuf interface definition files.
Let’s generate the message types, gRPC server and gRPC client for Swift.
CombineGRPC
CombineGRPC is a library that provides Combine framework integration for Swift gRPC.
The input and output types of call and handle for each RPC style:
Unary: Request -> AnyPublisher<Response, RPCError>
Server streaming: Request -> AnyPublisher<Response, RPCError>
Client streaming: AnyPublisher<Request, Error> -> AnyPublisher<Response, RPCError>
Bidirectional streaming: AnyPublisher<Request, Error> -> AnyPublisher<Response, RPCError>
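After you run protoc on your .proto file, you’ll see that it has created two source files: one containing the message types and one containing the gRPC client and server code.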
Adding CombineGRPC to Your Project
You can easily add CombineGRPC to your project using Swift Package Manager by adding it as a package dependency in your Package.swift.
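A manifest along these lines is one way to declare the dependency. Treat it as a sketch: the repository URL, the version requirement, the tools version and the MyApp target name are assumptions made for illustration, so check the project page for the current values before copying it.

```swift
// swift-tools-version:5.2
import PackageDescription

let package = Package(
  name: "MyApp",  // hypothetical package name
  // Combine requires these minimum OS versions.
  platforms: [.macOS(.v10_15), .iOS(.v13), .tvOS(.v13), .watchOS(.v6)],
  dependencies: [
    // Assumed repository URL and version; verify both before use.
    .package(url: "https://github.com/vyshane/grpc-swift-combine.git", from: "1.0.0"),
  ],
  targets: [
    .target(
      name: "MyApp",
      dependencies: [
        // The product name matches the module imported in the examples.
        .product(name: "CombineGRPC", package: "grpc-swift-combine"),
      ]
    ),
  ]
)
```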
Compatibility
Since this library integrates with Combine, it only works on platforms that support Combine. Combine itself requires at least macOS 10.15, iOS 13, tvOS 13, or watchOS 6.
Feature Status
RPC Client Calls
Server Side Handlers
End-to-end Tests
Contributing
Generate Swift source for the protobuf that is used in tests:
You can then open Package.swift in Xcode, build and run the tests.