Skip to content

Commit

Permalink
ORC-1286: [C++] replace DataBuffer with BlockBuffer in class Buffered…
Browse files Browse the repository at this point in the history
…OutputStream

### What changes were proposed in this pull request?
This PR reduces the large amount of memory held by BufferedOutputStream and refactors the data-writing logic in class CompressionStreamBase.

### Why are the changes needed?
This patch uses BlockBuffer to replace the DataBuffer in class BufferedOutputStream in order to solve the [issue](#1240).

### How was this patch tested?
The unit tests in TestBufferedOutputStream.cc and TestCompression.cc cover this patch. A new TestBlockBuffer.write_to unit test is also added.

Closes #1275 from coderex2522/ORC-1280-PART2.

Authored-by: coderex2522 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
coderex2522 authored and dongjoon-hyun committed Oct 26, 2022
1 parent b887371 commit 4da1acb
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 65 deletions.
53 changes: 53 additions & 0 deletions c++/src/BlockBuffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/

#include "BlockBuffer.hh"
#include "orc/Writer.hh"
#include "orc/OrcFile.hh"

#include <algorithm>

Expand Down Expand Up @@ -82,4 +84,55 @@ namespace orc {
}
}
}

/**
 * Write the buffered content into the given output stream.
 *
 * Nothing is written when the buffer is empty. When the buffer consists of a
 * single block that fits into one chunk, that block is written directly.
 * Otherwise the blocks are copied into a temporary chunk allocated from
 * memoryPool and the chunk is written every time it fills up, followed by
 * one last write for any remaining partial chunk.
 *
 * @param output the output stream to write to
 * @param metrics the metrics of the writer; may be nullptr. When non-null,
 *                IOCount is increased by the number of write calls issued.
 * @throws std::logic_error if the natural write size of output is zero
 */
void BlockBuffer::writeTo(OutputStream* output,
                          WriterMetrics* metrics) {
  if (currentSize == 0) {
    return;
  }
  // upper bound for the size of the temporary chunk (1 GiB)
  static constexpr uint64_t MAX_CHUNK_SIZE = uint64_t{1024} * 1024 * 1024;
  const uint64_t chunkSize =
      std::min(output->getNaturalWriteSize(), MAX_CHUNK_SIZE);
  if (chunkSize == 0) {
    throw std::logic_error("Natural write size cannot be zero");
  }
  uint64_t ioCount = 0;
  const uint64_t blockNumber = getBlockNumber();
  // if only one block exists, currentSize is equal to the first block size
  if (blockNumber == 1 && currentSize <= chunkSize) {
    Block block = getBlock(0);
    output->write(block.data, block.size);
    ++ioCount;
  } else {
    char* chunk = memoryPool.malloc(chunkSize);
    // The chunk is a raw allocation: make sure it is handed back to the
    // pool even if getBlock() or output->write() throws.
    try {
      uint64_t chunkOffset = 0;
      for (uint64_t i = 0; i < blockNumber; ++i) {
        Block block = getBlock(i);
        uint64_t blockOffset = 0;
        while (blockOffset < block.size) {
          // copy as much of the current block as still fits into the chunk
          const uint64_t copySize =
              std::min(chunkSize - chunkOffset, block.size - blockOffset);
          memcpy(chunk + chunkOffset, block.data + blockOffset, copySize);
          chunkOffset += copySize;
          blockOffset += copySize;

          // chunk is full
          if (chunkOffset >= chunkSize) {
            output->write(chunk, chunkSize);
            chunkOffset = 0;
            ++ioCount;
          }
        }
      }
      // write the remaining partially filled chunk
      if (chunkOffset != 0) {
        output->write(chunk, chunkOffset);
        ++ioCount;
      }
    } catch (...) {
      memoryPool.free(chunk);
      throw;
    }
    memoryPool.free(chunk);
  }

  if (metrics != nullptr) {
    metrics->IOCount.fetch_add(ioCount);
  }
}
} // namespace orc
9 changes: 9 additions & 0 deletions c++/src/BlockBuffer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

namespace orc {

class OutputStream;
struct WriterMetrics;
/**
* BlockBuffer implements a memory allocation policy based on
* equal-length blocks. BlockBuffer will reserve multiple blocks
Expand Down Expand Up @@ -110,6 +112,13 @@ namespace orc {
* @param newCapacity new capacity of BlockBuffer
*/
void reserve(uint64_t newCapacity);
/**
* Write the BlockBuffer content into OutputStream.
* Nothing is written when the buffer is empty. A buffer made of a single
* block that fits into one chunk is written directly from that block;
* otherwise the blocks are copied into a temporary chunk that is written
* each time it fills up. The chunk size is the natural write size of the
* output stream, capped at 1 GiB.
* @param output the output stream to write to
* @param metrics the metrics of the writer; may be nullptr, otherwise its
*                IOCount is increased by the number of write calls issued
* @throws std::logic_error if the natural write size of output is zero
*/
void writeTo(OutputStream* output,
WriterMetrics* metrics);
};
} // namespace orc

Expand Down
114 changes: 62 additions & 52 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "lz4.h"

#include <algorithm>
#include <array>
#include <iomanip>
#include <iostream>
#include <sstream>
Expand Down Expand Up @@ -68,10 +69,12 @@ namespace orc {
virtual uint64_t getSize() const override;

protected:
void writeHeader(char * buffer, size_t compressedSize, bool original) {
buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
buffer[1] = static_cast<char>(compressedSize >> 7);
buffer[2] = static_cast<char>(compressedSize >> 15);
void writeData(const unsigned char* data, int size);

// Fill in the 3-byte compression block header through the pointers stored
// in the `header` array (one pointer per header byte).
// @param compressedSize the size value to encode into the header
// @param original true sets the lowest bit of the first header byte
// NOTE(review): every entry of `header` is dereferenced without a null check;
// presumably ensureHeader() has assigned them beforehand - confirm at call sites.
void writeHeader(size_t compressedSize, bool original) {
// byte 0: low byte of (compressedSize << 1) plus the "original" flag in bit 0
*header[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
// byte 1: compressedSize shifted right by 7, truncated to one byte
*header[1] = static_cast<char>(compressedSize >> 7);
// byte 2: compressedSize shifted right by 15, truncated to one byte
*header[2] = static_cast<char>(compressedSize >> 15);
}

// ensure enough room for compression block header
Expand All @@ -94,6 +97,10 @@ namespace orc {

// Compress output buffer size
int outputSize;

// Compression block header pointer array
static const uint32_t HEADER_SIZE = 3;
std::array<char*, HEADER_SIZE> header;
};

CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
Expand All @@ -113,7 +120,8 @@ namespace orc {
bufferSize(0),
outputPosition(0),
outputSize(0) {
// PASS
// init header pointer array
header.fill(nullptr);
}

void CompressionStreamBase::BackUp(int count) {
Expand Down Expand Up @@ -145,19 +153,46 @@ namespace orc {
static_cast<uint64_t>(outputSize - outputPosition);
}

// Copy `size` bytes starting at `data` into outputBuffer, asking
// BufferedOutputStream::Next for a fresh buffer whenever the current one
// has been filled completely.
// Throws std::runtime_error when no further buffer can be obtained and
// std::logic_error when outputPosition has moved past outputSize.
void CompressionStreamBase::writeData(const unsigned char* data, int size) {
  for (int written = 0; written < size;) {
    if (outputPosition > outputSize) {
      // for safety this will unlikely happen
      throw std::logic_error(
          "Write to an out-of-bound place during compression!");
    }
    if (outputPosition == outputSize) {
      // current buffer is exhausted, switch to the next one
      void** nextBuffer = reinterpret_cast<void **>(&outputBuffer);
      if (!BufferedOutputStream::Next(nextBuffer, &outputSize)) {
        throw std::runtime_error(
            "Failed to get next output buffer from output stream.");
      }
      outputPosition = 0;
    }
    // copy no more than what is left in the buffer and in the input
    const int room = outputSize - outputPosition;
    const int pending = size - written;
    const int bytesToCopy = room < pending ? room : pending;
    memcpy(outputBuffer + outputPosition,
           data + written,
           static_cast<size_t>(bytesToCopy));
    written += bytesToCopy;
    outputPosition += bytesToCopy;
  }
}

void CompressionStreamBase::ensureHeader() {
// adjust 3 bytes for the compression header
if (outputPosition + 3 >= outputSize) {
int newPosition = outputPosition + 3 - outputSize;
if (!BufferedOutputStream::Next(
reinterpret_cast<void **>(&outputBuffer),
&outputSize)) {
for (uint32_t i = 0; i < HEADER_SIZE; ++i) {
if (outputPosition >= outputSize) {
if (!BufferedOutputStream::Next(
reinterpret_cast<void **>(&outputBuffer),
&outputSize)) {
throw std::runtime_error(
"Failed to get next output buffer from output stream.");
}
outputPosition = 0;
}
outputPosition = newPosition;
} else {
outputPosition += 3;
header[i] = outputBuffer + outputPosition;
++outputPosition;
}
}

Expand Down Expand Up @@ -200,22 +235,20 @@ namespace orc {
if (bufferSize != 0) {
ensureHeader();

uint64_t preSize = getSize();
uint64_t totalCompressedSize = doStreamingCompression();

char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
writeHeader(header, static_cast<size_t>(bufferSize), true);
memcpy(
header + 3,
rawInputBuffer.data(),
static_cast<size_t>(bufferSize));

int backup = static_cast<int>(totalCompressedSize) - bufferSize;
BufferedOutputStream::BackUp(backup);
outputPosition -= backup;
outputSize -= backup;
writeHeader(static_cast<size_t>(bufferSize), true);
// reset output buffer
outputBuffer = nullptr;
outputPosition = outputSize = 0;
uint64_t backup = getSize() - preSize;
BufferedOutputStream::BackUp(static_cast<int>(backup));

// copy raw input buffer into block buffer
writeData(rawInputBuffer.data(), bufferSize);
} else {
writeHeader(header, totalCompressedSize, false);
writeHeader(totalCompressedSize, false);
}
}

Expand Down Expand Up @@ -987,41 +1020,18 @@ DIAGNOSTIC_POP

const unsigned char * dataToWrite = nullptr;
int totalSizeToWrite = 0;
char * header = outputBuffer + outputPosition - 3;

if (totalCompressedSize >= static_cast<size_t>(bufferSize)) {
writeHeader(header, static_cast<size_t>(bufferSize), true);
writeHeader(static_cast<size_t>(bufferSize), true);
dataToWrite = rawInputBuffer.data();
totalSizeToWrite = bufferSize;
} else {
writeHeader(header, totalCompressedSize, false);
writeHeader(totalCompressedSize, false);
dataToWrite = compressorBuffer.data();
totalSizeToWrite = static_cast<int>(totalCompressedSize);
}

char * dst = header + 3;
while (totalSizeToWrite > 0) {
if (outputPosition == outputSize) {
if (!BufferedOutputStream::Next(reinterpret_cast<void **>(&outputBuffer),
&outputSize)) {
throw std::logic_error(
"Failed to get next output buffer from output stream.");
}
outputPosition = 0;
dst = outputBuffer;
} else if (outputPosition > outputSize) {
// this will unlikely happen, but we have seen a few on zstd v1.1.0
throw std::logic_error("Write to an out-of-bound place!");
}

int sizeToWrite = std::min(totalSizeToWrite, outputSize - outputPosition);
std::memcpy(dst, dataToWrite, static_cast<size_t>(sizeToWrite));

outputPosition += sizeToWrite;
dataToWrite += sizeToWrite;
totalSizeToWrite -= sizeToWrite;
dst += sizeToWrite;
}
writeData(dataToWrite, totalSizeToWrite);
}

*data = rawInputBuffer.data();
Expand Down
22 changes: 10 additions & 12 deletions c++/src/io/OutputStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace orc {
: outputStream(outStream),
blockSize(blockSize_),
metrics(metrics_) {
dataBuffer.reset(new DataBuffer<char>(pool));
dataBuffer.reset(new BlockBuffer(pool, blockSize));
dataBuffer->reserve(capacity_);
}

Expand All @@ -46,16 +46,12 @@ namespace orc {
}

bool BufferedOutputStream::Next(void** buffer, int* size) {
*size = static_cast<int>(blockSize);
uint64_t oldSize = dataBuffer->size();
uint64_t newSize = oldSize + blockSize;
uint64_t newCapacity = dataBuffer->capacity();
while (newCapacity < newSize) {
newCapacity += dataBuffer->capacity();
auto block = dataBuffer->getNextBlock();
if (block.data == nullptr) {
throw std::logic_error("Failed to get next buffer from block buffer.");
}
dataBuffer->reserve(newCapacity);
dataBuffer->resize(newSize);
*buffer = dataBuffer->data() + oldSize;
*buffer = block.data;
*size = static_cast<int>(block.size);
return true;
}

Expand Down Expand Up @@ -95,9 +91,11 @@ namespace orc {

uint64_t BufferedOutputStream::flush() {
uint64_t dataSize = dataBuffer->size();
// flush data buffer into outputStream
if (dataSize > 0)
{
SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
outputStream->write(dataBuffer->data(), dataSize);
SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
dataBuffer->writeTo(outputStream, metrics);
}
dataBuffer->resize(0);
return dataSize;
Expand Down
3 changes: 2 additions & 1 deletion c++/src/io/OutputStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define ORC_OUTPUTSTREAM_HH

#include "Adaptor.hh"
#include "BlockBuffer.hh"
#include "orc/OrcFile.hh"
#include "wrap/zero-copy-stream-wrapper.h"

Expand Down Expand Up @@ -49,7 +50,7 @@ DIAGNOSTIC_PUSH
class BufferedOutputStream: public google::protobuf::io::ZeroCopyOutputStream {
private:
OutputStream * outputStream;
std::unique_ptr<DataBuffer<char> > dataBuffer;
std::unique_ptr<BlockBuffer> dataBuffer;
uint64_t blockSize;
WriterMetrics* metrics;

Expand Down
1 change: 1 addition & 0 deletions c++/test/MemoryOutputStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace orc {
// Create an empty in-memory stream backed by a freshly allocated array of
// `capacity` chars; the natural write size is fixed to 2048.
MemoryOutputStream(ssize_t capacity) : name("MemoryOutputStream") {
  length = 0;
  naturalWriteSize = 2048;
  data = new char[capacity];
}

virtual ~MemoryOutputStream() override;
Expand Down
40 changes: 40 additions & 0 deletions c++/test/TestBlockBuffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/

#include "BlockBuffer.hh"
#include "MemoryOutputStream.hh"
#include "orc/OrcFile.hh"
#include "wrap/gtest-wrapper.h"

namespace orc {
const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M

TEST(TestBlockBuffer, size_and_capacity) {
MemoryPool* pool = getDefaultPool();
Expand Down Expand Up @@ -78,4 +80,42 @@ namespace orc {
}
}
}

void writeToOutputStream(uint64_t blockSize) {
MemoryOutputStream outputStream(DEFAULT_MEM_STREAM_SIZE);
MemoryPool* pool = getDefaultPool();
BlockBuffer buffer(*pool, blockSize);
uint64_t totalBufferSize = 10240;
while (buffer.size() < totalBufferSize) {
BlockBuffer::Block block = buffer.getNextBlock();
uint64_t blockNumber = buffer.getBlockNumber();
for (uint64_t j = 0; j < block.size; ++j) {
if (blockNumber % 2 == 0) {
block.data[j] = static_cast<char>('A' + (blockNumber + j) % 26);
} else {
block.data[j] = static_cast<char>('a' + (blockNumber + j) % 26);
}
}
}
buffer.resize(totalBufferSize);
// flush data buffer into output stream
buffer.writeTo(&outputStream, nullptr);
// verify data buffer
uint64_t dataIndex = 0;
for (uint64_t i = 0; i < buffer.getBlockNumber(); ++i) {
BlockBuffer::Block block = buffer.getBlock(i);
for (uint64_t j = 0; j < block.size; ++j) {
EXPECT_EQ(outputStream.getData()[dataIndex++], block.data[j]);
}
}
}

TEST(TestBlockBuffer, write_to) {
  // block sizes below, equal to and above the natural write size (2048)
  // of MemoryOutputStream, in that order
  const uint64_t blockSizes[] = {1024, 2048, 4096};
  for (uint64_t blockSize : blockSizes) {
    writeToOutputStream(blockSize);
  }
}
}

0 comments on commit 4da1acb

Please sign in to comment.