1 change: 1 addition & 0 deletions include/iocore/cache/CacheDefs.h
@@ -95,6 +95,7 @@ enum CacheEventType {
CACHE_EVENT_SCAN_OPERATION_BLOCKED = CACHE_EVENT_EVENTS_START + 23,
CACHE_EVENT_SCAN_OPERATION_FAILED = CACHE_EVENT_EVENTS_START + 24,
CACHE_EVENT_SCAN_DONE = CACHE_EVENT_EVENTS_START + 25,
CACHE_EVENT_OPEN_DIR_RETRY = CACHE_EVENT_EVENTS_START + 26,
//////////////////////////
// Internal error codes //
//////////////////////////
191 changes: 191 additions & 0 deletions include/tsutil/Bravo.h
@@ -372,4 +372,195 @@ template <typename T = std::shared_mutex, size_t SLOT_SIZE = 256, int SLOWDOWN_G

using shared_mutex = shared_mutex_impl<>;

/**
ts::bravo::recursive_shared_mutex_impl

A recursive version of shared_mutex_impl that allows the same thread
to acquire exclusive and shared locks multiple times.

Uses DenseThreadId for efficient per-thread state tracking without map overhead; this
also keeps calls to the comparatively expensive std::this_thread::get_id() to a minimum
when checking lock ownership.

Mixed lock semantics:
- Upgrade prevention: A thread holding a shared lock cannot acquire an exclusive lock,
since doing so would deadlock; try_lock() returns false and lock() asserts.
- Downgrade allowed: A thread holding an exclusive lock can acquire a shared lock.
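
Example usage (illustrative sketch; ts::bravo::Token is the token type already used by
shared_mutex_impl for shared locks):

  ts::bravo::recursive_shared_mutex mutex;
  ts::bravo::Token token{0};
  mutex.lock();              // exclusive lock
  mutex.lock();              // recursive exclusive acquisition by the same thread
  mutex.lock_shared(token);  // downgrade: shared lock while the exclusive lock is held
  mutex.unlock_shared(token);
  mutex.unlock();
  mutex.unlock();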
*/
template <typename T = shared_mutex_impl<>, size_t SLOT_SIZE = 256> class recursive_shared_mutex_impl
{
// Use a sentinel value for "no owner" - DenseThreadId values are 0 to SLOT_SIZE-1
static constexpr size_t NO_OWNER = SLOT_SIZE;

public:
recursive_shared_mutex_impl() = default;
~recursive_shared_mutex_impl() = default;

// No copying or moving
recursive_shared_mutex_impl(recursive_shared_mutex_impl const &) = delete;
recursive_shared_mutex_impl &operator=(recursive_shared_mutex_impl const &) = delete;
recursive_shared_mutex_impl(recursive_shared_mutex_impl &&) = delete;
recursive_shared_mutex_impl &operator=(recursive_shared_mutex_impl &&) = delete;

////
// Exclusive locking (recursive)
//
void
lock()
{
size_t tid = DenseThreadId::self();
// Fast path: check if we already own the lock
if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
++_exclusive_count;
return;
}
// Upgrade prevention: cannot acquire exclusive lock while holding shared lock
ThreadState &state = _thread_states[tid];
debug_assert(state.shared_count == 0);
_mutex.lock();
_exclusive_owner.store(tid, std::memory_order_relaxed);
_exclusive_count = 1;
}

bool
try_lock()
{
size_t tid = DenseThreadId::self();
// Fast path: check if we already own the lock
if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
++_exclusive_count;
return true;
}
// Upgrade prevention: cannot acquire exclusive lock while holding shared lock
ThreadState &state = _thread_states[tid];
if (state.shared_count > 0) {
return false;
}
if (_mutex.try_lock()) {
_exclusive_owner.store(tid, std::memory_order_relaxed);
_exclusive_count = 1;
return true;
}
return false;
}

void
unlock()
{
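// Assumes the calling thread holds the exclusive lock; ownership is not re-checked here.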
if (--_exclusive_count == 0) {
_exclusive_owner.store(NO_OWNER, std::memory_order_relaxed);
_mutex.unlock();
}
}

////
// Shared locking (recursive)
//
void
lock_shared(Token &token)
{
size_t tid = DenseThreadId::self();
ThreadState &state = _thread_states[tid];

// Fast path: already holding shared lock - just increment count (most common case)
size_t count = state.shared_count;
if (count > 0) {
state.shared_count = count + 1;
token = state.cached_token;
return;
}

// Check for downgrade: if we hold the exclusive lock, allow the shared lock without acquiring the underlying mutex
if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
state.shared_count = 1;
token = 0; // Special token indicating we're under exclusive lock
return;
}

// Slow path: acquire underlying lock
_mutex.lock_shared(state.cached_token);
state.shared_count = 1;
token = state.cached_token;
}

bool
try_lock_shared(Token &token)
{
size_t tid = DenseThreadId::self();
ThreadState &state = _thread_states[tid];

// Fast path: already holding shared lock - just increment count (most common case)
size_t count = state.shared_count;
if (count > 0) {
state.shared_count = count + 1;
token = state.cached_token;
return true;
}

// Check for downgrade: if we hold the exclusive lock, allow the shared lock without acquiring the underlying mutex
if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
state.shared_count = 1;
token = 0; // Special token indicating we're under exclusive lock
return true;
}

// Slow path: try to acquire underlying lock
if (_mutex.try_lock_shared(state.cached_token)) {
state.shared_count = 1;
token = state.cached_token;
return true;
}
return false;
}

void
unlock_shared(const Token /* token */)
{
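// The caller's token is ignored; the thread's cached token from lock_shared() is used instead.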
size_t tid = DenseThreadId::self();
ThreadState &state = _thread_states[tid];
if (--state.shared_count == 0) {
// Only unlock underlying mutex if we're not holding exclusive lock
if (_exclusive_owner.load(std::memory_order_relaxed) != tid) {
_mutex.unlock_shared(state.cached_token);
}
state.cached_token = 0;
}
}

// Extensions for checking lock ownership
bool
has_unique_lock()
{
return _exclusive_owner.load(std::memory_order_relaxed) == DenseThreadId::self();
}

bool
has_shared_lock()
{
size_t tid = DenseThreadId::self();
ThreadState &state = _thread_states[tid];

if (state.shared_count > 0) {
return true;
} else if (_exclusive_owner.load(std::memory_order_relaxed) == tid) {
return true;
} else {
return false;
}
}

private:
struct ThreadState {
size_t shared_count{0};
Token cached_token{0};
};

T _mutex;
std::atomic<size_t> _exclusive_owner{NO_OWNER};
size_t _exclusive_count{0};
std::array<ThreadState, SLOT_SIZE> _thread_states{};
};

using recursive_shared_mutex = recursive_shared_mutex_impl<>;

} // namespace ts::bravo
61 changes: 39 additions & 22 deletions src/iocore/cache/Cache.cc
@@ -109,6 +109,28 @@ DbgCtl dbg_ctl_cache_init{"cache_init"};
DbgCtl dbg_ctl_cache_hosting{"cache_hosting"};
DbgCtl dbg_ctl_cache_update{"cache_update"};

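// Allocate and initialize a CacheVC for a cache read, consolidating the field setup
// previously done inline in Cache::open_read().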
CacheVC *
new_CacheVC_for_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const HttpConfigAccessor *params,
StripeSM *stripe)
{
CacheVC *cache_vc = new_CacheVC(cont);

cache_vc->first_key = *key;
cache_vc->key = *key;
cache_vc->earliest_key = *key;
cache_vc->stripe = stripe;
cache_vc->vio.op = VIO::READ;
cache_vc->op_type = static_cast<int>(CacheOpType::Read);
cache_vc->frag_type = CACHE_FRAG_TYPE_HTTP;
cache_vc->params = params;
cache_vc->request.copy_shallow(request);

ts::Metrics::Gauge::increment(cache_rsb.status[cache_vc->op_type].active);
ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[cache_vc->op_type].active);

return cache_vc;
}

} // end anonymous namespace

// Global list of the volumes created
@@ -543,20 +565,24 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
OpenDirEntry *od = nullptr;
CacheVC *c = nullptr;

// Read-While-Writer
// This OpenDirEntry lookup doesn't need the stripe mutex lock because OpenDir has its own reader-writer lock
od = stripe->open_read(key);
if (od != nullptr) {
c = new_CacheVC_for_read(cont, key, request, params, stripe);
c->od = od;
cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr);
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
}
return &c->_action;
}

{
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked() || (od = stripe->open_read(key)) || stripe->directory.probe(key, stripe, &result, &last_collision)) {
c = new_CacheVC(cont);
c->first_key = c->key = c->earliest_key = *key;
c->stripe = stripe;
c->vio.op = VIO::READ;
c->op_type = static_cast<int>(CacheOpType::Read);
ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active);
ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active);
c->request.copy_shallow(request);
c->frag_type = CACHE_FRAG_TYPE_HTTP;
c->params = params;
c->od = od;
if (!lock.is_locked() || stripe->directory.probe(key, stripe, &result, &last_collision)) {
c = new_CacheVC_for_read(cont, key, request, params, stripe);
}
if (!lock.is_locked()) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
@@ -566,9 +592,7 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
if (!c) {
goto Lmiss;
}
if (c->od) {
goto Lwriter;
}

// hit
c->dir = c->first_dir = result;
c->last_collision = last_collision;
@@ -587,13 +611,6 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[static_cast<int>(CacheOpType::Read)].failure);
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<void *>(-ECACHE_NO_DOC));
return ACTION_RESULT_DONE;
Lwriter:
cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr);
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
}
return &c->_action;
Lcallreturn:
if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;