Skip to content
Open
16 changes: 16 additions & 0 deletions io/io/inc/TBufferFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ class TBufferFile : public TBufferIO {
TStreamerInfo *fInfo{nullptr}; ///< Pointer to TStreamerInfo object writing/reading the buffer
InfoList_t fInfoStack; ///< Stack of pointers to the TStreamerInfos

using ByteCountLocator_t = std::size_t; // This might become a pair<chunk_number, local_offset> if we implement chunked keys
using ByteCountStack_t = std::vector<ByteCountLocator_t>;
ByteCountStack_t fByteCountStack; ///<! Stack to keep track of byte count storage positions

using ByteCount_t = std::uint64_t; ///< Type used to store byte count values, can be changed to uint32_t if we implement chunked keys
using ByteCountFinder_t = std::unordered_map<ByteCountLocator_t, ByteCount_t>;
// fByteCounts will be stored either in the header/summary tkey or at the end
// of the last segment/chunk for a large TKey.
ByteCountFinder_t fByteCounts; ///< Map to find the byte count value for a given position

// Default ctor
TBufferFile() {} // NOLINT: not allowed to use = default because of TObject::kIsOnHeap detection, see ROOT-10300

Expand All @@ -62,6 +72,7 @@ class TBufferFile : public TBufferIO {
Long64_t CheckByteCount(ULong64_t startpos, ULong64_t bcnt, const TClass *clss, const char* classname);
void CheckCount(UInt_t offset) override;
UInt_t CheckObject(UInt_t offset, const TClass *cl, Bool_t readClass = kFALSE);
UInt_t ReserveByteCount();

void WriteObjectClass(const void *actualObjStart, const TClass *actualClass, Bool_t cacheReuse) override;

Expand Down Expand Up @@ -106,6 +117,11 @@ class TBufferFile : public TBufferIO {

TObject *ReadObject(const TClass *cl) override;

const ByteCountFinder_t &GetByteCounts() const { return fByteCounts; }
void SetByteCounts(ByteCountFinder_t &&byteCounts) { fByteCounts = std::move(byteCounts); }

void ResetMap() override;

using TBufferIO::CheckObject;

// basic types and arrays of basic types
Expand Down
211 changes: 180 additions & 31 deletions io/io/src/TBufferFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@
\ingroup IO

The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket.

Limits:
Byte Count positions (refered to as `cntpos` or `startpos` in the code)
- Internally they are passed as a ULong64_t
- In legacy user code they are held in UInt_t variables (See type used in
TBufferFile::WriteVersion and TBufferFile::ReadVersion)
- The positions passed by value are limited to 4GB (kOverflowPosition)
and the positions are held in fByteCountStack but retrieved only when
higher than 4GB (kOverflowCount).
Byte Count values:
- Internally they are passed as a ULong64_t
- In legacy user code they are held in UInt_t variables (See type used in
TBufferFile::WriteVersion and TBufferFile::ReadVersion)
- The values passed by value are limited to 1GB (kMaxMapCount)
- The values larger than kMaxMapCount are held in fByteCounts.
*/

#include <cstring>
Expand Down Expand Up @@ -46,15 +61,18 @@ The concrete implementation of TBuffer for writing/reading to/from a ROOT file o
#include "Bswapcpy.h"
#endif

constexpr UInt_t kOverflowPosition = 0xFFFFFFFF;
constexpr UInt_t kOverflowCount = 0xFFFFFFFF;
constexpr UInt_t kMaxCountPosition = 0xFFFFFFFE; // We reserve the highest value to mark overflow positions

const UInt_t kNewClassTag = 0xFFFFFFFF;
const UInt_t kClassMask = 0x80000000; // OR the class index with this
const UInt_t kByteCountMask = 0x40000000; // OR the byte count with this
const UInt_t kMaxMapCount = 0x3FFFFFFE; // last valid fMapCount and byte count
const Version_t kByteCountVMask = 0x4000; // OR the version byte count with this
const Version_t kMaxVersion = 0x3FFF; // highest possible version number
const Int_t kMapOffset = 2; // first 2 map entries are taken by null obj and self obj

constexpr UInt_t kNewClassTag = 0xFFFFFFFF;
constexpr UInt_t kClassMask = 0x80000000; // OR the class index with this
constexpr UInt_t kByteCountMask = 0x40000000; // OR the byte count with this
constexpr UInt_t kMaxMapCount = 0x3FFFFFFE; // last valid fMapCount and byte count
constexpr UInt_t kMaxRecordByteCount = kMaxMapCount; // last valid byte count
constexpr Version_t kByteCountVMask = 0x4000; // OR the version byte count with this
constexpr Version_t kMaxVersion = 0x3FFF; // highest possible version number
constexpr Int_t kMapOffset = 2; // first 2 map entries are taken by null obj and self obj


////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -114,6 +132,16 @@ TBufferFile::~TBufferFile()
{
}

////////////////////////////////////////////////////////////////////////////////
/// Reset buffer maps and clear byte-count tracking structures.

void TBufferFile::ResetMap()
{
TBufferIO::ResetMap();
fByteCountStack.clear();
fByteCounts.clear();
}

////////////////////////////////////////////////////////////////////////////////
/// Increment level.

Expand Down Expand Up @@ -313,17 +341,37 @@ void TBufferFile::WriteCharStar(char *s)
}

////////////////////////////////////////////////////////////////////////////////
/// Set byte count at position cntpos in the buffer. Generate warning if
/// count larger than kMaxMapCount. The count is excluded its own size.
/// Set byte count at position cntpos in the buffer.
/// The count is excluding its own size.
/// \note If underflow or overflow, an Error ir raised (stricter checks in Debug mode)

void TBufferFile::SetByteCount(ULong64_t cntpos, Bool_t packInVersion)
{
assert( cntpos <= kMaxUInt && (sizeof(UInt_t) + cntpos) < static_cast<UInt_t>(fBufCur - fBuffer)
assert( cntpos <= kOverflowPosition && (sizeof(UInt_t) + cntpos) < static_cast<UInt_t>(fBufCur - fBuffer)
&& (fBufCur >= fBuffer)
&& static_cast<ULong64_t>(fBufCur - fBuffer) <= std::numeric_limits<UInt_t>::max()
&& "Byte count position is after the end of the buffer");
const UInt_t cnt = UInt_t(fBufCur - fBuffer) - UInt_t(cntpos) - sizeof(UInt_t);

// We can either make this unconditional or we could split the routine
// in two, one with a new signature and guarantee to get the 64bit position
// (which may be chunk number + local offset) and one with the old signature
// which uses the stack to get the position and call the new one.
// This (of course) also requires that we do the 'same' to the WriteVersion
// routines.
R__ASSERT(!fByteCountStack.empty());
if (cntpos == kOverflowPosition) {
// The position is above 4GB but was cached using a 32 bit variable.
cntpos = fByteCountStack.back();
}
fByteCountStack.pop_back();
// if we are not in the same TKey chunk or if the cntpos is too large to fit in UInt_t
// let's postpone the writing of the byte count
const ULong64_t full_cnt = ULong64_t(fBufCur - fBuffer) - cntpos - sizeof(UInt_t);
if (full_cnt >= kMaxRecordByteCount) {
fByteCounts[cntpos] = full_cnt;
return;
}
UInt_t cnt = static_cast<UInt_t>(full_cnt);
char *buf = (char *)(fBuffer + cntpos);

// if true, pack byte count in two consecutive shorts, so it can
Expand All @@ -344,9 +392,11 @@ void TBufferFile::SetByteCount(ULong64_t cntpos, Bool_t packInVersion)
} else
tobuf(buf, cnt | kByteCountMask);

if (cnt >= kMaxMapCount) {
Error("WriteByteCount", "bytecount too large (more than %d)", kMaxMapCount);
// exception
if (cnt >= kMaxRecordByteCount) {
// NOTE: This should now be unreachable.
Fatal("WriteByteCount",
"Control flow error, code should be unreachable and wrote a bytecount that is too large (more than %d)",
kMaxRecordByteCount);
}
}

Expand All @@ -360,10 +410,32 @@ void TBufferFile::SetByteCount(ULong64_t cntpos, Bool_t packInVersion)

Long64_t TBufferFile::CheckByteCount(ULong64_t startpos, ULong64_t bcnt, const TClass *clss, const char *classname)
{
if (!bcnt) return 0;
R__ASSERT(startpos <= kMaxUInt && bcnt <= kMaxUInt);
Long64_t offset = 0;
R__ASSERT(!fByteCountStack.empty() && startpos <= kOverflowPosition && bcnt <= kOverflowCount
&& "Byte count stack is empty or invalid startpos/bcnt");
if (startpos == kOverflowPosition) {
// The position is above 4GB but was cached using a 32 bit variable.
startpos = fByteCountStack.back();
}
if (bcnt == kOverflowCount) {
// in case we are checking a byte count for which we postponed
// the writing because it was too large, we retrieve it now
auto it = fByteCounts.find(startpos);
if (it != fByteCounts.end()) {
bcnt = it->second;
} else {
bcnt = 0;
Error("CheckByteCount",
"Could not find byte count for position %llu in the byte count map (size=%zu)",
startpos, fByteCounts.size());
}
}
fByteCountStack.pop_back();

if (!bcnt)
return 0;

Long64_t offset = 0;
// End position is buffer location + start position + byte count + size of byte count field
Longptr_t endpos = Longptr_t(fBuffer) + startpos + bcnt + sizeof(UInt_t);

if (Longptr_t(fBufCur) != endpos) {
Expand All @@ -390,7 +462,8 @@ Long64_t TBufferFile::CheckByteCount(ULong64_t startpos, ULong64_t bcnt, const T
if ( ((char *)endpos) > fBufMax ) {
offset = fBufMax-fBufCur;
Error("CheckByteCount",
"Byte count probably corrupted around buffer position %llu:\n\t%llu for a possible maximum of %lld",
"Byte count probably corrupted around buffer position %llu:\n\tByte count is %llu while the buffer size "
"is %lld",
startpos, bcnt, offset);
fBufCur = fBufMax;

Expand Down Expand Up @@ -2694,8 +2767,7 @@ void TBufferFile::WriteObjectClass(const void *actualObjectStart, const TClass *
}

// reserve space for leading byte count
UInt_t cntpos = UInt_t(fBufCur-fBuffer);
fBufCur += sizeof(UInt_t);
UInt_t cntpos = ReserveByteCount();

// write class of object first
Int_t mapsize = fMap->Capacity(); // The slot depends on the capacity and WriteClass might induce an increase.
Expand Down Expand Up @@ -2749,6 +2821,10 @@ TClass *TBufferFile::ReadClass(const TClass *clReq, UInt_t *objTag)
bcnt = 0;
} else {
fVersion = 1;
// When objTag is not used, the caller is not interested in the byte
// count and will not (can not) call CheckByteCount.
if (objTag)
fByteCountStack.push_back(fBufCur - fBuffer);
startpos = UInt_t(fBufCur-fBuffer);
*this >> tag;
}
Expand Down Expand Up @@ -2925,14 +3001,20 @@ void TBufferFile::SkipVersion(const TClass *cl)

////////////////////////////////////////////////////////////////////////////////
/// Read class version from I/O buffer.
/// If passing startpos and bcnt, CheckByteCount must be called later with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it valid to set only one of them? If not, we can assert both are set or both are nullptr and simplify a bit the logic.

/// the same startpos and bcnt (this is needed to properly handle the byte
/// count stack used to support very large buffers)

Version_t TBufferFile::ReadVersion(UInt_t *startpos, UInt_t *bcnt, const TClass *cl)
{
Version_t version;

if (startpos) {
// before reading object save start position
*startpos = UInt_t(fBufCur-fBuffer);
auto full_startpos = fBufCur - fBuffer;
*startpos = full_startpos <= kMaxCountPosition ? UInt_t(full_startpos) : kOverflowPosition;
if (bcnt)
fByteCountStack.push_back(full_startpos);
}

// read byte count (older files don't have byte count)
Expand All @@ -2956,9 +3038,30 @@ Version_t TBufferFile::ReadVersion(UInt_t *startpos, UInt_t *bcnt, const TClass
fBufCur -= sizeof(UInt_t);
v.cnt = 0;
}
if (bcnt) *bcnt = (v.cnt & ~kByteCountMask);
if (bcnt) {
if (!v.cnt) {
// no byte count stored
*bcnt = 0;
if (startpos) // Undo the push_back only if it happened.
fByteCountStack.pop_back();
} else {
*bcnt = (v.cnt & ~kByteCountMask);
if (*bcnt == 0) {
// The byte count was stored but is zero, this means the data
// did not fit and thus we stored it in 'fByteCounts' instead.
// Mark this case by setting startpos to kOverflowCount.
*bcnt = kOverflowCount;
if (startpos)
*startpos = kOverflowPosition;
}
}
}
frombuf(this->fBufCur,&version);

// NOTE: The code above is not straightforward to refactor by a call
// to ReadVersionNoCheckSum because of the following code need the
// 'raw' value in `v`.

if (version<=1) {
if (version <= 0) {
if (cl) {
Expand Down Expand Up @@ -3031,14 +3134,26 @@ Version_t TBufferFile::ReadVersion(UInt_t *startpos, UInt_t *bcnt, const TClass
////////////////////////////////////////////////////////////////////////////////
/// Read class version from I/O buffer, when the caller knows for sure that
/// there is no checksum written/involved.
///
/// This routine can be used from custom streamers when it is known that the
/// class was always versioned (and thus never stored a checksum within the buffer).
/// This allows to disambiguate the case where the class used to have a version
/// number equal to zero and did not save a byte count and is now versioned.
///
/// When reading the version number being zero, without the byte count we have
/// no way to guess whether the class was version un-versioned or had previously
/// a version number equal to zero.

Version_t TBufferFile::ReadVersionNoCheckSum(UInt_t *startpos, UInt_t *bcnt)
{
Version_t version;

if (startpos) {
// before reading object save start position
*startpos = UInt_t(fBufCur-fBuffer);
auto full_startpos = fBufCur - fBuffer;
*startpos = full_startpos < kMaxCountPosition ? UInt_t(full_startpos) : kOverflowPosition;
if (bcnt)
fByteCountStack.push_back(full_startpos);
}

// read byte count (older files don't have byte count)
Expand All @@ -3062,7 +3177,24 @@ Version_t TBufferFile::ReadVersionNoCheckSum(UInt_t *startpos, UInt_t *bcnt)
fBufCur -= sizeof(UInt_t);
v.cnt = 0;
}
if (bcnt) *bcnt = (v.cnt & ~kByteCountMask);
if (bcnt) {
if (!v.cnt) {
// no byte count stored
*bcnt = 0;
if (startpos) // Undo the push_back only if it happened.
fByteCountStack.pop_back();
} else {
*bcnt = (v.cnt & ~kByteCountMask);
if (*bcnt == 0) {
// The byte count was stored but is zero, this means the data
// did not fit and thus we stored it in 'fByteCounts' instead.
// Mark this case by setting startpos to kOverflowCount.
*bcnt = kOverflowCount;
if (startpos)
*startpos = kOverflowPosition;
}
}
}
frombuf(this->fBufCur,&version);

return version;
Expand Down Expand Up @@ -3148,8 +3280,7 @@ UInt_t TBufferFile::WriteVersion(const TClass *cl, Bool_t useBcnt)
UInt_t cntpos = 0;
if (useBcnt) {
// reserve space for leading byte count
cntpos = UInt_t(fBufCur-fBuffer);
fBufCur += sizeof(UInt_t);
cntpos = ReserveByteCount();
}

Version_t version = cl->GetClassVersion();
Expand Down Expand Up @@ -3178,8 +3309,7 @@ UInt_t TBufferFile::WriteVersionMemberWise(const TClass *cl, Bool_t useBcnt)
UInt_t cntpos = 0;
if (useBcnt) {
// reserve space for leading byte count
cntpos = UInt_t(fBufCur-fBuffer);
fBufCur += sizeof(UInt_t);
cntpos = ReserveByteCount();
}

Version_t version = cl->GetClassVersion();
Expand Down Expand Up @@ -3242,8 +3372,12 @@ void TBufferFile::StreamObject(TObject *obj)

void TBufferFile::CheckCount(UInt_t offset)
{
if (IsWriting()) {
if (offset >= kMaxMapCount) {
// FIXME:
// We could continue to issue the error for cases where we don't want to
// support storing more than 1GB in a single TBufferFile.
const bool bufferLimitedToMaxMapCount = false;
if (bufferLimitedToMaxMapCount && IsWriting()) {
if (offset >= kMaxRecordByteCount) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (offset >= kMaxRecordByteCount) {
if (offset > kMaxRecordByteCount) {

Error("CheckCount", "buffer offset too large (larger than %d)", kMaxMapCount);
// exception
}
Expand Down Expand Up @@ -3327,6 +3461,21 @@ UInt_t TBufferFile::CheckObject(UInt_t offset, const TClass *cl, Bool_t readClas
return offset;
}

////////////////////////////////////////////////////////////////////////////////
/// Reserve space for a leading byte count and return the position where to
/// store the byte count value.
///
/// \return The position (cntpos) where the byte count should be stored later,
/// or kOverflowPosition if the position exceeds kMaxCountPosition

UInt_t TBufferFile::ReserveByteCount()
{
// reserve space for leading byte count
auto full_cntpos = fBufCur - fBuffer;
fByteCountStack.push_back(full_cntpos);
*this << (UInt_t)kByteCountMask; // placeholder for byte count
return full_cntpos < kMaxCountPosition ? full_cntpos : kOverflowPosition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return full_cntpos < kMaxCountPosition ? full_cntpos : kOverflowPosition;
return full_cntpos <= kMaxCountPosition ? full_cntpos : kOverflowPosition;

}

////////////////////////////////////////////////////////////////////////////////
/// Read max bytes from the I/O buffer into buf. The function returns
Expand Down
4 changes: 4 additions & 0 deletions io/io/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,7 @@ endif()
# ROOT_EXECUTABLE(TMapFileTest TMapFileTest.cxx LIBRARIES RIO Hist New)
# ROOT_ADD_TEST(io-io-test-TMapFileTest COMMAND TMapFileTest complete)
#endif()

ROOTTEST_ADD_TEST(testLargeByteCounts
MACRO testByteCount.C
OUTREF testByteCount.ref)
Loading
Loading