Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -997,9 +997,14 @@ Future<Result, BrokerConsumerStatsImpl> ClientConnection::newConsumerStats(uint6
lock.unlock();
LOG_ERROR(cnxString_ << " Client is not connected to the broker");
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}
pendingConsumerStatsMap_.insert(std::make_pair(requestId, promise));
lock.unlock();
if (mockingRequests_.load(std::memory_order_acquire) && mockServer_ != nullptr &&
mockServer_->sendRequest("CONSUMER_STATS", requestId)) {
return promise.getFuture();
}
sendCommand(Commands::newConsumerStats(consumerId, requestId));
return promise.getFuture();
}
Expand Down
4 changes: 2 additions & 2 deletions lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
mockingRequests_.store(true, std::memory_order_release);
}

void handleKeepAliveTimeout();

private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
Expand Down Expand Up @@ -284,8 +286,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

void handleGetLastMessageIdTimeout(const ASIO_ERROR&, const LastMessageIdRequestData& data);

void handleKeepAliveTimeout();

template <typename Handler>
inline AllocHandler<Handler> customAllocReadHandler(Handler h) {
return AllocHandler<Handler>(readHandlerAllocator_, h);
Expand Down
17 changes: 12 additions & 5 deletions lib/MockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,18 @@ class MockServer : public std::enable_shared_from_this<MockServer> {
}
});
}
schedule(connection, request + std::to_string(requestId), iter->second, [connection, requestId] {
proto::CommandSuccess success;
success.set_request_id(requestId);
connection->handleSuccess(success);
});
schedule(connection, request + std::to_string(requestId), iter->second,
[connection, request, requestId] {
if (request == "CONSUMER_STATS") {
proto::CommandConsumerStatsResponse response;
response.set_request_id(requestId);
connection->handleConsumerStatsResponse(response);
} else {
proto::CommandSuccess success;
success.set_request_id(requestId);
connection->handleSuccess(success);
}
});
return true;
} else {
return false;
Expand Down
63 changes: 31 additions & 32 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -847,48 +847,47 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(const BrokerConsumerSt
Lock lock(mutex_);
MultiTopicsBrokerConsumerStatsPtr statsPtr =
std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
auto latchPtr = std::make_shared<std::atomic_size_t>(numberTopicPartitions_->load());
lock.unlock();

size_t i = 0;
consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
size_t index = i++;
auto weakSelf = weak_from_this();
consumer->getBrokerConsumerStatsAsync([this, weakSelf, latchPtr, statsPtr, index, callback](
Result result, const BrokerConsumerStats& stats) {
auto self = weakSelf.lock();
if (self) {
handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
}
auto failedResult = std::make_shared<std::atomic<Result>>(ResultOk);
consumers_.forEachValue(
[this, &latchPtr, &statsPtr, &i, callback, &failedResult](const ConsumerImplPtr& consumer) {
size_t index = i++;
auto weakSelf = weak_from_this();
consumer->getBrokerConsumerStatsAsync(
[this, weakSelf, latchPtr, statsPtr, index, callback, failedResult](
Result result, const BrokerConsumerStats& stats) {
auto self = weakSelf.lock();
if (!self) {
return;
}
if (result == ResultOk) {
std::lock_guard<std::mutex> lock{mutex_};
statsPtr->add(stats, index);
} else {
// Store the first failed result as the final failed result
auto expected = ResultOk;
failedResult->compare_exchange_strong(expected, result);
}
if (--*latchPtr == 0) {
if (auto firstFailedResult = failedResult->load(std::memory_order_acquire);
firstFailedResult == ResultOk) {
callback(ResultOk, BrokerConsumerStats{statsPtr});
} else {
// Fail the whole operation if any of the consumers failed
callback(firstFailedResult, {});
}
}
});
});
});
}

void MultiTopicsConsumerImpl::getLastMessageIdAsync(const BrokerGetLastMessageIdCallback& callback) {
callback(ResultOperationNotSupported, GetLastMessageIdResponse());
}

void MultiTopicsConsumerImpl::handleGetConsumerStats(Result res,
const BrokerConsumerStats& brokerConsumerStats,
const LatchPtr& latchPtr,
const MultiTopicsBrokerConsumerStatsPtr& statsPtr,
size_t index,
const BrokerConsumerStatsCallback& callback) {
Lock lock(mutex_);
if (res == ResultOk) {
latchPtr->countdown();
statsPtr->add(brokerConsumerStats, index);
} else {
lock.unlock();
callback(res, BrokerConsumerStats());
return;
}
if (latchPtr->getCount() == 0) {
lock.unlock();
callback(ResultOk, BrokerConsumerStats(statsPtr));
}
}

std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::vector<std::string>& topics) {
TopicNamePtr topicNamePtr = std::shared_ptr<TopicName>();

Expand Down
4 changes: 0 additions & 4 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "ConsumerImpl.h"
#include "ConsumerInterceptors.h"
#include "Future.h"
#include "Latch.h"
#include "LookupDataResult.h"
#include "SynchronizedHashMap.h"
#include "TestUtil.h"
Expand Down Expand Up @@ -100,9 +99,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
uint64_t getNumberOfConnectedConsumer() override;
void hasMessageAvailableAsync(const HasMessageAvailableCallback& callback) override;

void handleGetConsumerStats(Result, const BrokerConsumerStats&, const LatchPtr&,
const MultiTopicsBrokerConsumerStatsPtr&, size_t,
const BrokerConsumerStatsCallback&);
// return first topic name when all topics name valid, or return null pointer
static std::shared_ptr<TopicName> topicNamesValid(const std::vector<std::string>& topics);
void unsubscribeOneTopicAsync(const std::string& topic, const ResultCallback& callback);
Expand Down
1 change: 1 addition & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "WaitUtils.h"
#include "lib/ClientConnection.h"
#include "lib/Future.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
#include "lib/MessageIdUtil.h"
#include "lib/MultiTopicsConsumerImpl.h"
Expand Down
30 changes: 30 additions & 0 deletions tests/MultiTopicsConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
#include <pulsar/Client.h>

#include <chrono>
#include <future>
#include <thread>

#include "ThreadSafeMessages.h"
#include "lib/LogUtils.h"
#include "lib/MockServer.h"
#include "tests/PulsarFriend.h"

static const std::string lookupUrl = "pulsar://localhost:6650";

Expand Down Expand Up @@ -142,3 +146,29 @@ TEST(MultiTopicsConsumerTest, testAcknowledgeInvalidMessageId) {

client.close();
}

TEST(MultiTopicsConsumerTest, testGetConsumerStatsFail) {
Client client{lookupUrl};
std::vector<std::string> topics{"testGetConsumerStatsFail0", "testGetConsumerStatsFail1"};
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", consumer));

auto connection = *PulsarFriend::getConnections(client).begin();
auto mockServer = std::make_shared<MockServer>(connection);
connection->attachMockServer(mockServer);

mockServer->setRequestDelay({{"CONSUMER_STATS", 3000}});
auto future = std::async(std::launch::async, [&consumer]() {
BrokerConsumerStats stats;
return consumer.getBrokerConsumerStats(stats);
});
// Trigger the `getBrokerConsumerStats` in a new thread
future.wait_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::milliseconds(100));

connection->handleKeepAliveTimeout();
ASSERT_EQ(ResultDisconnected, future.get());

mockServer->close();
client.close();
}
Loading