简介
Channel 是 brpc 的通信类，继承自 RpcChannel；RpcChannel 是 protobuf 中的类，用于服务通信。
Channel
- init方法
// Three overloads of Init. They differ only in how the server address is
// supplied; each also takes a pointer to ChannelOptions.
// NOTE(review): the meaning of the int return value is not shown here —
// confirm against the brpc headers.

// Server given as an already-built butil::EndPoint.
int Init(butil::EndPoint server_addr_and_port, const ChannelOptions* options);
// Server given as a single C string holding both address and port.
int Init(const char* server_addr_and_port, const ChannelOptions* options);
// Server given as an address string plus a separate port number.
int Init(const char* server_addr, int port, const ChannelOptions* options);
RpcChannel
生成服务实现类
服务端
// Code-generator template for the server-side service class. The `$...$`
// tokens are placeholders substituted when the class is generated, so this
// text is not compilable as-is.
// The resulting class derives from ::$proto_ns$::Service, is non-copyable,
// and has a protected default constructor.
class $dllexport_decl $$classname$ : public ::$proto_ns$::Service {
protected:
$classname$() = default;
public:
// Alias for the matching client-side stub type.
using Stub = $classname$_Stub;
$classname$(const $classname$&) = delete;
$classname$& operator=(const $classname$&) = delete;
virtual ~$classname$() = default;
// Static accessor for this service's ServiceDescriptor.
static const ::$proto_ns$::ServiceDescriptor* descriptor();
// NOTE(review): presumably expands to one virtual method declaration per
// RPC method of the service — confirm against the generator.
$virts$;
// implements Service ----------------------------------------------
const ::$proto_ns$::ServiceDescriptor* GetDescriptor() override;
// Generic entry point that receives the method descriptor plus untyped
// request/response messages.
void CallMethod(const ::$proto_ns$::MethodDescriptor* method,
::$proto_ns$::RpcController* controller,
const ::$proto_ns$::Message* request,
::$proto_ns$::Message* response,
::google::protobuf::Closure* done) override;
// Prototype message accessors keyed by method descriptor.
const ::$proto_ns$::Message& GetRequestPrototype(
const ::$proto_ns$::MethodDescriptor* method) const override;
const ::$proto_ns$::Message& GetResponsePrototype(
const ::$proto_ns$::MethodDescriptor* method) const override;
};
// Code-generator template for the server-side dispatcher (`$...$` tokens are
// placeholders). It switches on method->index() and forwards to the typed
// handler `$name$`, down-casting the generic request/response messages to
// the concrete `$input$` / `$output$` types.
void $classname$::CallMethod(
const ::$proto_ns$::MethodDescriptor* method,
::$proto_ns$::RpcController* controller,
const ::$proto_ns$::Message* request,
::$proto_ns$::Message* response, ::google::protobuf::Closure* done) {
// Debug-only check that the method descriptor belongs to this service.
ABSL_DCHECK_EQ(method->service(), $file_level_service_descriptors$[$index$]);
switch (method->index()) {
// NOTE(review): presumably the generator emits one such case per RPC
// method — confirm against the generator.
case $index$:
$name$(controller,
::$proto_ns$::internal::DownCast<const $input$*>(request),
::$proto_ns$::internal::DownCast<$output$*>(response), done);
break;
default:
// An index that matches no case is logged at FATAL severity.
ABSL_LOG(FATAL) << "Bad method index; this should never happen.";
break;
}
}
客户端
// Code-generator template for the client-side stub (`$...$` tokens are
// placeholders). The stub derives from the service class, is non-copyable,
// and holds a pointer to the RpcChannel it sends calls through.
class $dllexport_decl $$classname$_Stub final : public $classname$ {
public:
// Constructors: channel only, or channel plus an explicit ownership mode.
$classname$_Stub(::$proto_ns$::RpcChannel* channel);
$classname$_Stub(::$proto_ns$::RpcChannel* channel,
::$proto_ns$::Service::ChannelOwnership ownership);
$classname$_Stub(const $classname$_Stub&) = delete;
$classname$_Stub& operator=(const $classname$_Stub&) = delete;
~$classname$_Stub() override;
// Returns the channel this stub was given.
inline ::$proto_ns$::RpcChannel* channel() { return channel_; }
// NOTE(review): this is a qualified (out-of-class style) method definition
// placed inside the class body; it looks like it was pasted here in these
// notes for illustration.
// Each stub method forwards to channel_->CallMethod, passing the method
// descriptor at position $index$ together with the caller's arguments.
void $classname$_Stub::$name$(::$proto_ns$::RpcController* controller,
const $input$* request,
$output$* response, ::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method($index$), controller,
request, response, done);
}
// implements $classname$ ------------------------------------------
$impls$;
private:
::$proto_ns$::RpcChannel* channel_;
// NOTE(review): presumably records whether the stub is responsible for
// destroying channel_ — confirm against the generated destructor.
bool owns_channel_;
};
调用
调用是通过 CallMethod 完成的：
// Signature of Channel::CallMethod only; the body is not reproduced in these
// notes. The parameters are the method descriptor, the controller, the
// request and response messages, and the completion closure `done`.
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done)
callid 是通过 id_create_impl 函数创建的：该函数创建 Id，并设置 join_butex、first_ver 和 locked_ver。
// Allocates an Id slot, initialises it and writes the resulting handle to
// `*id`.
//
// `data`, `on_error` and `on_error2` are stored verbatim in the slot.
// Returns 0 on success, or ENOMEM when no slot could be obtained.
static int id_create_impl(
    bthread_id_t* id, void* data,
    int (*on_error)(bthread_id_t, void*, int),
    int (*on_error2)(bthread_id_t, void*, int, const std::string&)) {
    IdResourceId slot;
    Id* const meta = get_resource(&slot);
    if (!meta) {
        // No slot available.
        return ENOMEM;
    }

    meta->data = data;
    meta->on_error = on_error;
    meta->on_error2 = on_error2;
    CHECK(meta->pending_q.empty());

    uint32_t* version_word = meta->butex;
    // Restart from 1 when the stored version is 0 (so that a bthread_id_t is
    // never 0) or when adding the id range would wrap around (so that later
    // version comparisons stay simple).
    if (0 == *version_word ||
        *version_word + ID_MAX_RANGE + 2 < *version_word) {
        *version_word = 1;
    }

    // Publish the starting version to the join word and the version bounds.
    *meta->join_butex = *version_word;
    meta->first_ver = *version_word;
    meta->locked_ver = *version_word + 1;

    *id = make_id(*version_word, slot);
    return 0;
}
同步调用
当 done 为 NULL 时是同步调用。
Join 会等待响应结果：
// Waits on the given call id by delegating to bthread_id_join. The status
// code returned by bthread_id_join is intentionally not propagated.
void Join(CallId id) {
    static_cast<void>(bthread_id_join(id));
}
// Blocks the caller until the slot behind `id` no longer reports the
// version carried by `id`.
//
// Returns 0 once that happens, EINVAL when the slot cannot be addressed,
// or the errno left by butex_wait for any failure other than
// EWOULDBLOCK / EINTR.
int bthread_id_join(bthread_id_t id) {
    const bthread::IdResourceId slot = bthread::get_slot(id);
    bthread::Id* const meta = address_resource(slot);
    if (meta == NULL) {
        // The id has not been created yet, so joining it is a caller error.
        return EINVAL;
    }

    const uint32_t wanted_ver = bthread::get_version(id);
    uint32_t* join_word = meta->join_butex;
    for (;;) {
        // Sample both the liveness of the version and the current join word
        // under the slot mutex so the two observations are consistent.
        meta->mutex.lock();
        const bool still_valid = meta->has_version(wanted_ver);
        const uint32_t snapshot = *join_word;
        meta->mutex.unlock();

        if (!still_valid) {
            return 0;
        }

        // Sleep until the join word differs from the snapshot. EWOULDBLOCK
        // and EINTR are not treated as errors; the loop simply re-checks.
        if (bthread::butex_wait(join_word, snapshot, NULL) >= 0) {
            continue;
        }
        if (errno != EWOULDBLOCK && errno != EINTR) {
            return errno;
        }
    }
}
处理响应
响应是通过 ProcessRpcResponse 处理的：它调用 ControllerPrivateAccessor::OnResponse 方法，而该方法内部仍然是调用 Controller::OnVersionedRPCReturned 方法。