Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ typedef std::map<ChannelBase*, Socket*> ChannelToIdMap;
class SubChannel : public SocketUser {
public:
ChannelBase* chan;
ChannelOwnership ownership;

// internal channel is deleted after the fake Socket is SetFailed
void BeforeRecycle(Socket*) {
delete chan;
if (ownership == OWNS_CHANNEL) {
delete chan;
}
delete this;
}

Expand Down Expand Up @@ -83,7 +86,8 @@ class ChannelBalancer : public SharedLoadBalancer {
ChannelBalancer() {}
~ChannelBalancer();
int Init(const char* lb_name);
int AddChannel(ChannelBase* sub_channel, const std::string& tag,
int AddChannel(ChannelBase* sub_channel,
const SelectiveChannel::SubChannelOptions& subopt,
SelectiveChannel::ChannelHandle* handle);
void RemoveAndDestroyChannel(const SelectiveChannel::ChannelHandle& handle);
int SelectChannel(const LoadBalancer::SelectIn& in, SelectOut* out);
Expand Down Expand Up @@ -168,7 +172,8 @@ int ChannelBalancer::Init(const char* lb_name) {
return SharedLoadBalancer::Init(lb_name);
}

int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag,
int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
const SelectiveChannel::SubChannelOptions& subopt,
SelectiveChannel::ChannelHandle* handle) {
if (NULL == sub_channel) {
LOG(ERROR) << "Parameter[sub_channel] is NULL";
Expand All @@ -185,6 +190,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag
return -1;
}
sub_chan->chan = sub_channel;
sub_chan->ownership = subopt.ownership;
SocketId sock_id;
SocketOptions options;
options.user = sub_chan;
Expand All @@ -206,7 +212,7 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag
<< sock_id << " is disabled";
return -1;
}
if (!AddServer(ServerId(sock_id, tag))) {
if (!AddServer(ServerId(sock_id, subopt.tag))) {
LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
// sub_chan will be deleted when the socket is recycled.
ptr->SetFailed();
Expand All @@ -215,10 +221,10 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, const std::string& tag
return -1;
}
// The health-check-related reference has been held on created.
_chan_map[sub_channel]= ptr.get();
_chan_map[sub_channel] = ptr.get();
if (handle) {
handle->id = sock_id;
handle->tag = tag;
handle->tag = subopt.tag;
}
return 0;
}
Expand Down Expand Up @@ -532,20 +538,15 @@ bool SelectiveChannel::initialized() const {
}

int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
ChannelHandle* handle) {
return AddChannel(sub_channel, "", handle);
}

int SelectiveChannel::AddChannel(ChannelBase* sub_channel,
const std::string& tag,
const SubChannelOptions& option,
ChannelHandle* handle) {
schan::ChannelBalancer* lb =
static_cast<schan::ChannelBalancer*>(_chan._lb.get());
if (lb == NULL) {
LOG(ERROR) << "You must call Init() to initialize a SelectiveChannel";
return -1;
}
return lb->AddChannel(sub_channel, tag, handle);
return lb->AddChannel(sub_channel, option, handle);
}

void SelectiveChannel::RemoveAndDestroyChannel(const ChannelHandle& handle) {
Expand Down
17 changes: 15 additions & 2 deletions src/brpc/selective_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ class SelectiveChannel : public ChannelBase/*non-copyable*/ {
std::string tag;
};

struct SubChannelOptions {
std::string tag;
ChannelOwnership ownership = OWNS_CHANNEL;
};

SelectiveChannel();
~SelectiveChannel();

Expand All @@ -69,8 +74,16 @@ class SelectiveChannel : public ChannelBase/*non-copyable*/ {
// On success, handle is set with the key for removal.
// NOTE: Different from pchan, schan can add channels at any time.
// Returns 0 on success, -1 otherwise.
int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle);
int AddChannel(ChannelBase* sub_channel, const std::string& tag, ChannelHandle* handle);
int AddChannel(ChannelBase* sub_channel, ChannelHandle* handle) {
return AddChannel(sub_channel, SubChannelOptions(), handle);
}
int AddChannel(ChannelBase* sub_channel, const std::string& tag, ChannelHandle* handle) {
SubChannelOptions option;
option.tag = tag;
return AddChannel(sub_channel, option, handle);
}
int AddChannel(ChannelBase* sub_channel, const SubChannelOptions& option,
ChannelHandle* handle);

// Remove and destroy the sub_channel associated with `handle'.
void RemoveAndDestroyChannel(const ChannelHandle& handle);
Expand Down
Loading