Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1280:[C++] Implement block-based buffer(Part I) #1271

Merged
merged 12 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions c++/src/BlockBuffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "BlockBuffer.hh"

#include <algorithm>

namespace orc {

BlockBuffer::BlockBuffer(MemoryPool& pool, uint64_t _blockSize)
: memoryPool(pool),
currentSize(0),
currentCapacity(0),
blockSize(_blockSize) {
if (blockSize == 0) {
throw std::logic_error("Block size cannot be zero");
}
reserve(blockSize);
}

BlockBuffer::~BlockBuffer() {
for (size_t i = 0; i < blocks.size(); ++i) {
memoryPool.free(blocks[i]);
}
blocks.clear();
currentSize = currentCapacity = 0;
}

uint64_t BlockBuffer::getBlockNumber() const {
return (currentSize + blockSize - 1) / blockSize;
}

Block BlockBuffer::getBlock(uint64_t blockIndex) {
if (blockIndex >= getBlockNumber()) {
throw std::out_of_range("Block index out of range");
}
return Block(blocks[blockIndex],
std::min(currentSize - blockIndex * blockSize, blockSize));
}

Block BlockBuffer::getEmptyBlock() {
if (currentSize < currentCapacity) {
Block emptyBlock(blocks[currentSize / blockSize] + currentSize % blockSize,
blockSize - currentSize % blockSize);
currentSize = (currentSize / blockSize + 1) * blockSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we provide a function to update currentSize to reflect the actual size written? Maybe something setSize() or backup(). This looks weird when the returned block is written partially meaning that currentSize is larger than used

To provide better usability, we may provide a function like void append(const char data, size_t size)* to append data to the buffer and manage the blocks internally. This can be a separate patch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user only uses part of the block, the size of BlockBuffer can currently be set via the resize() function. BlockBuffer class is temporarily used to replace DataBuffer in BufferedOutputStream, and the existing functions conform to BufferedOutputStream's usage behavior.

The append function can be added additionally later if there are usage scenarios.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the flush() function of class BufferedOutputStream needs to access all allocated memory blocks, it needs class BlockBuffer to provide an interface in order for BufferedOutputStream to access all allocated memory blocks.

return emptyBlock;
} else {
resize(currentSize + blockSize);
return Block(blocks.back(), blockSize);
}
}

void BlockBuffer::reserve(uint64_t capacity) {
while (currentCapacity < capacity) {
char* newBlockPtr = memoryPool.malloc(blockSize);
if (newBlockPtr != nullptr) {
blocks.push_back(newBlockPtr);
currentCapacity += blockSize;
} else {
break;
}
}
}

void BlockBuffer::resize(uint64_t size) {
reserve(size);
if (currentCapacity >= size) {
currentSize = size;
} else {
throw std::logic_error("Block buffer resize error");
}
}
} // namespace orc
90 changes: 90 additions & 0 deletions c++/src/BlockBuffer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant space.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ORC_MEMORYPOOL_IMPL_HH
#define ORC_MEMORYPOOL_IMPL_HH

#include "orc/MemoryPool.hh"

#include <vector>

namespace orc {

struct Block {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to make it nested class of BlockBuffer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it stands struct Block only serves class BlockBuffer, so it makes reasonable to treat it as a nested class of class BlockBuffer.

char* data;
uint64_t size;

Block() : data(nullptr), size(0) {}
Block(char* _data, uint64_t _size) : data(_data), size(_size) {}
Block(const Block& block) = default;
~Block() {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

~Block() = default;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

};

class BlockBuffer {
private:
MemoryPool& memoryPool;
// current buffer size
uint64_t currentSize;
// maximal capacity (actual allocated memory)
uint64_t currentCapacity;
// unit for buffer expansion
const uint64_t blockSize;
// pointers to the start of each block
std::vector<char*> blocks;

// non-copy-constructible
BlockBuffer(BlockBuffer& buffer) = delete;
BlockBuffer& operator=(BlockBuffer& buffer) = delete;
BlockBuffer(BlockBuffer&& buffer) = delete;
BlockBuffer& operator=(BlockBuffer&& buffer) = delete;

public:
BlockBuffer(MemoryPool& pool, uint64_t blockSize);

~BlockBuffer();

/**
* Get the Block object
*/
Block getBlock(uint64_t blockIndex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some detail explanation with regard to the mechanism and behavior of the BlockBuffer. Especially concepts like block, block index and block number.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some description for block, blockbuffer, block index and block number.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we provide mutable and immutable overloads?

Copy link
Contributor Author

@coderex2522 coderex2522 Oct 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the getBlock function does not modify class member variables, the const attribute is added to the function.


/**
* Get a empty block or allocate a new block if the buffer is exhausted
*/
Block getEmptyBlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to getNextBlock or getBlockToWrite?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Rename to getNextBlock.


/**
* Get the Block number
*/
uint64_t getBlockNumber() const;

uint64_t size() const {
return currentSize;
}

uint64_t capacity() const {
return currentCapacity;
}

void resize(uint64_t size);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support shrinkToFit parameter?

Copy link
Contributor Author

@coderex2522 coderex2522 Oct 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class DataBuffer only recycles memory during destructor, so class BlockBuffer does not support shrink function, which can be further optimized later.

void reserve(uint64_t capacity);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Insert a new line before reserve function. Better to explain what will happen in the reserve function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments to explain the reserve function.

};
} // namespace
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// namespace orc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


#endif
1 change: 1 addition & 0 deletions c++/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ set(SOURCE_FILES
sargs/TruthValue.cc
wrap/orc-proto-wrapper.cc
Adaptor.cc
BlockBuffer.cc
BloomFilter.cc
ByteRLE.cc
ColumnPrinter.cc
Expand Down
1 change: 1 addition & 0 deletions c++/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ add_executable (orc-test
MemoryInputStream.cc
MemoryOutputStream.cc
TestAttributes.cc
TestBlockBuffer.cc
TestBufferedOutputStream.cc
TestBloomFilter.cc
TestByteRle.cc
Expand Down
82 changes: 82 additions & 0 deletions c++/test/TestBlockBuffer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant space.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "BlockBuffer.hh"
#include "orc/OrcFile.hh"
#include "wrap/gtest-wrapper.h"

namespace orc {

TEST(TestBlockBuffer, size_and_capacity) {
MemoryPool* pool = getDefaultPool();
BlockBuffer buffer(*pool, 1024);

// block buffer will preallocate one block during initialization
EXPECT_EQ(buffer.getBlockNumber(), 0);
EXPECT_EQ(buffer.size(), 0);
EXPECT_EQ(buffer.capacity(), 1024);

buffer.reserve(128 * 1024);
EXPECT_EQ(buffer.getBlockNumber(), 0);
EXPECT_EQ(buffer.size(), 0);
EXPECT_EQ(buffer.capacity(), 128 * 1024);

// new size < old capacity
buffer.resize(64 * 1024);
EXPECT_EQ(buffer.getBlockNumber(), 64);
EXPECT_EQ(buffer.size(), 64 * 1024);
EXPECT_EQ(buffer.capacity(), 128 * 1024);

// new size > old capacity
buffer.resize(256 * 1024);
EXPECT_EQ(buffer.getBlockNumber(), 256);
EXPECT_EQ(buffer.size(), 256 * 1024);
EXPECT_EQ(buffer.capacity(), 256 * 1024);
}

TEST(TestBlockBuffer, get_block) {
MemoryPool* pool = getDefaultPool();
BlockBuffer buffer(*pool, 1024);

EXPECT_EQ(buffer.getBlockNumber(), 0);
for (uint64_t i = 0; i < 10; ++i) {
Block block = buffer.getEmptyBlock();
EXPECT_EQ(buffer.getBlockNumber(), i + 1);
for (uint64_t j = 0; j < block.size; ++j) {
if (i % 2 == 0) {
block.data[j] = static_cast<char>('A' + (i + j) % 26);
} else {
block.data[j] = static_cast<char>('a' + (i + j) % 26);
}
}
}

// verify the block data
for (uint64_t i = 0; i < buffer.getBlockNumber(); ++i) {
Block block = buffer.getBlock(i);
for (uint64_t j = 0; j < block.size; ++j) {
if (i % 2 == 0) {
EXPECT_EQ(block.data[j], 'A' + (i + j) % 26);
} else {
EXPECT_EQ(block.data[j], 'a' + (i + j) % 26);
}
}
}
}
}