diff --git a/clickhouse/CMakeLists.txt b/clickhouse/CMakeLists.txt index 67663ec5..b74ff1e1 100644 --- a/clickhouse/CMakeLists.txt +++ b/clickhouse/CMakeLists.txt @@ -20,6 +20,7 @@ SET ( clickhouse-cpp-lib-src columns/nullable.cpp columns/numeric.cpp columns/map.cpp + columns/serialization.cpp columns/string.cpp columns/tuple.cpp columns/uuid.cpp @@ -120,6 +121,7 @@ INSTALL(FILES columns/lowcardinality.h DESTINATION include/clickhouse/columns/) INSTALL(FILES columns/nullable.h DESTINATION include/clickhouse/columns/) INSTALL(FILES columns/numeric.h DESTINATION include/clickhouse/columns/) INSTALL(FILES columns/map.h DESTINATION include/clickhouse/columns/) +INSTALL(FILES columns/serialization.h DESTINATION include/clickhouse/columns/) INSTALL(FILES columns/string.h DESTINATION include/clickhouse/columns/) INSTALL(FILES columns/tuple.h DESTINATION include/clickhouse/columns/) INSTALL(FILES columns/utils.h DESTINATION include/clickhouse/columns/) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index e4b0c7ef..945d4a36 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -42,8 +42,11 @@ #define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54448 #define DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME 54449 #define DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS 54451 +// #define DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING 54452 +#define DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS 54453 +#define DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION 54454 -#define REVISION DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS +#define REVISION DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION namespace clickhouse { @@ -552,7 +555,19 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) { return false; } + uint8_t has_custom_serialization = 0; + if (REVISION >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) { + if (!WireFormat::ReadFixed(input, &has_custom_serialization)) { + return false; + } + } + if (ColumnRef col = CreateColumnByType(type, create_column_settings)) { + + if (has_custom_serialization) { + col->LoadSerializationKind(&input); + } + if (num_rows && !col->Load(&input, num_rows)) { throw ProtocolError("can't load column '" + name + "' of type " + type); } @@ -708,6 +723,16 @@ void Client::Impl::SendQuery(const Query& query) { throw UnimplementedError(std::string("Can't send open telemetry tracing context to a server, server version is too old")); } } + + if (server_info_.revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS) + { + // collaborate_with_initiator + WireFormat::WriteUInt64 (*output_, 0u); + // count_participating_replicas + WireFormat::WriteUInt64 (*output_, 0u); + // number_of_current_replica + WireFormat::WriteUInt64 (*output_, 0u); + } } /// Per query settings @@ -757,6 +782,17 @@ void Client::Impl::WriteBlock(const Block& block, OutputStream& output) { WireFormat::WriteString(output, bi.Name()); WireFormat::WriteString(output, bi.Type()->GetName()); + bool has_custom = bi.Column()->HasCustomSerialization(); + if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) { + WireFormat::WriteFixed(output, static_cast(has_custom)); + if (has_custom) { + bi.Column()->SaveSerializationKind(&output); + } + } else { + // Current implementation works only for server version >= v22.1.2.2-stable + throw UnimplementedError(std::string("Can't send column with custom serialisation to a server, server version is too old")); + } + // Empty columns are not serialized and occupy exactly 0 bytes. // ref https://github.com/ClickHouse/ClickHouse/blob/39b37a3240f74f4871c8c1679910e065af6bea19/src/Formats/NativeWriter.cpp#L163 const bool containsData = block.GetRowCount() > 0; diff --git a/clickhouse/columns/array.cpp b/clickhouse/columns/array.cpp index 9f66b91f..9b8c717c 100644 --- a/clickhouse/columns/array.cpp +++ b/clickhouse/columns/array.cpp @@ -11,14 +11,14 @@ ColumnArray::ColumnArray(ColumnRef data) } ColumnArray::ColumnArray(ColumnRef data, std::shared_ptr offsets) - : Column(Type::CreateArray(data->Type())) + : Column(Type::CreateArray(data->Type()), Serialization::MakeDefault(this)) , data_(data) , offsets_(offsets) { } ColumnArray::ColumnArray(ColumnArray&& other) - : Column(other.Type()) + : Column(other.Type(), Serialization::MakeDefault(this)) , data_(std::move(other.data_)) , offsets_(std::move(other.offsets_)) { @@ -73,35 +73,36 @@ bool ColumnArray::LoadPrefix(InputStream* input, size_t rows) { if (!rows) { return true; } - - return data_->LoadPrefix(input, rows); + return data_->GetSerialization()->LoadPrefix(data_.get(), input, rows); } bool ColumnArray::LoadBody(InputStream* input, size_t rows) { if (!rows) { return true; } - if (!offsets_->LoadBody(input, rows)) { + if (!offsets_->GetSerialization()->LoadBody(offsets_.get(), input, rows)) { return false; } + const auto nested_rows = (*offsets_)[rows - 1]; if (nested_rows == 0) { return true; } - if (!data_->LoadBody(input, nested_rows)) { + if (!data_->GetSerialization()->LoadBody(data_.get(), input, nested_rows)) { return false; } return true; } void ColumnArray::SavePrefix(OutputStream* output) { - data_->SavePrefix(output); + data_->GetSerialization()->SavePrefix(data_.get(), output); } void ColumnArray::SaveBody(OutputStream* output) { - offsets_->SaveBody(output); + offsets_->GetSerialization()->SaveBody(offsets_.get(), output); + if (data_->Size() > 0) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } } @@ -120,6 +121,18 @@ void ColumnArray::Swap(Column& other) { offsets_.swap(col.offsets_); } +void ColumnArray::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + void ColumnArray::OffsetsIncrease(size_t n) { offsets_->Append(n); } diff --git a/clickhouse/columns/array.h b/clickhouse/columns/array.h index 0ea33d5a..544a9117 100644 --- a/clickhouse/columns/array.h +++ b/clickhouse/columns/array.h @@ -50,18 +50,6 @@ class ColumnArray : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column prefix from input stream. - bool LoadPrefix(InputStream* input, size_t rows) override; - - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column prefix to output stream. - void SavePrefix(OutputStream* output) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -73,6 +61,8 @@ class ColumnArray : public Column { ColumnRef CloneEmpty() const override; void Swap(Column&) override; + void SetSerializationKind(Serialization::Kind kind) override; + void OffsetsIncrease(size_t); protected: @@ -87,6 +77,20 @@ class ColumnArray : public Column { void Reset(); private: + /// Loads column prefix from input stream. + bool LoadPrefix(InputStream* input, size_t rows); + + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column prefix to output stream. + void SavePrefix(OutputStream* output); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + ColumnRef data_; std::shared_ptr offsets_; }; diff --git a/clickhouse/columns/column.cpp b/clickhouse/columns/column.cpp index 7f881d7c..b688e6ef 100644 --- a/clickhouse/columns/column.cpp +++ b/clickhouse/columns/column.cpp @@ -1,24 +1,44 @@ #include "column.h" +#include "../base/wire_format.h" + namespace clickhouse { -bool Column::LoadPrefix(InputStream*, size_t) { - /// does nothing by default +bool Column::Load(InputStream* input, size_t rows) { + assert(serialization_); + return serialization_->LoadPrefix(this, input, rows) + && serialization_->LoadBody(this, input, rows); +} + +/// Saves column data to output stream. +void Column::Save(OutputStream* output) { + assert(serialization_); + serialization_->SavePrefix(this, output); + serialization_->SaveBody(this,output); +} + +bool Column::LoadSerializationKind(InputStream* input) { + uint8_t kind; + if (!WireFormat::ReadFixed(*input, &kind)) { + return false; + } + SetSerializationKind(static_cast(kind)); return true; } -bool Column::Load(InputStream* input, size_t rows) { - return LoadPrefix(input, rows) && LoadBody(input, rows); +void Column::SaveSerializationKind(OutputStream* output) { + assert(serialization_); + WireFormat::WriteFixed(*output, static_cast(serialization_->GetKind())); } -void Column::SavePrefix(OutputStream*) { - /// does nothing by default +SerializationRef Column::GetSerialization() { + assert(serialization_); + return serialization_; } -/// Saves column data to output stream. -void Column::Save(OutputStream* output) { - SavePrefix(output); - SaveBody(output); +bool Column::HasCustomSerialization() const { + assert(serialization_); + return serialization_->GetKind() != Serialization::Kind::DEFAULT; } } diff --git a/clickhouse/columns/column.h b/clickhouse/columns/column.h index b54cbdee..51994195 100644 --- a/clickhouse/columns/column.h +++ b/clickhouse/columns/column.h @@ -2,6 +2,7 @@ #include "../types/types.h" #include "../columns/itemview.h" +#include "../columns/serialization.h" #include "../exceptions.h" #include @@ -19,7 +20,11 @@ using ColumnRef = std::shared_ptr; */ class Column : public std::enable_shared_from_this { public: - explicit inline Column(TypeRef type) : type_(type) {} + explicit inline Column(TypeRef type, SerializationRef serialization) + : type_(std::move(type)) + , serialization_(std::move(serialization)) + { + } virtual ~Column() {} @@ -56,18 +61,6 @@ class Column : public std::enable_shared_from_this { /// Should be called only once from the client. Derived classes should not call it. bool Load(InputStream* input, size_t rows); - /// Loads column prefix from input stream. - virtual bool LoadPrefix(InputStream* input, size_t rows); - - /// Loads column data from input stream. - virtual bool LoadBody(InputStream* input, size_t rows) = 0; - - /// Saves column prefix to output stream. Column types with prefixes must implement it. - virtual void SavePrefix(OutputStream* output); - - /// Saves column body to output stream. - virtual void SaveBody(OutputStream* output) = 0; - /// Template method to save to output stream. It'll call SavePrefix and SaveBody respectively /// Should be called only once from the client. Derived classes should not call it. /// Save is split in Prefix and Body because some data types require prefixes and specific serialization order. @@ -93,12 +86,23 @@ class Column : public std::enable_shared_from_this { throw UnimplementedError("GetItem() is not supported for column of " + type_->GetName()); } + virtual bool LoadSerializationKind(InputStream* input); + + virtual void SaveSerializationKind(OutputStream* output); + + virtual void SetSerializationKind(Serialization::Kind kind) = 0; + + SerializationRef GetSerialization(); + + virtual bool HasCustomSerialization() const; + friend void swap(Column& left, Column& right) { left.Swap(right); } protected: TypeRef type_; + SerializationRef serialization_; }; } // namespace clickhouse diff --git a/clickhouse/columns/date.cpp b/clickhouse/columns/date.cpp index 1ef67c44..786ab13f 100644 --- a/clickhouse/columns/date.cpp +++ b/clickhouse/columns/date.cpp @@ -3,7 +3,7 @@ namespace clickhouse { ColumnDate::ColumnDate() - : Column(Type::CreateDate()) + : Column(Type::CreateDate(), Serialization::MakeDefault(this)) , data_(std::make_shared()) { } @@ -28,11 +28,11 @@ void ColumnDate::Append(ColumnRef column) { } bool ColumnDate::LoadBody(InputStream* input, size_t rows) { - return data_->LoadBody(input, rows); + return data_->GetSerialization()->LoadBody(data_.get(), input, rows); } void ColumnDate::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } size_t ColumnDate::Size() const { @@ -61,10 +61,23 @@ ItemView ColumnDate::GetItem(size_t index) const { return ItemView(Type::Date, data_->GetItem(index)); } - +void ColumnDate::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, static_cast(0)); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} ColumnDate32::ColumnDate32() - : Column(Type::CreateDate32()) + : Column(Type::CreateDate32(), Serialization::MakeDefault(this)) , data_(std::make_shared()) { } @@ -89,11 +102,11 @@ void ColumnDate32::Append(ColumnRef column) { } bool ColumnDate32::LoadBody(InputStream* input, size_t rows) { - return data_->LoadBody(input, rows); + return data_->GetSerialization()->LoadBody(data_.get(), input, rows); } void ColumnDate32::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } size_t ColumnDate32::Size() const { @@ -122,15 +135,29 @@ ItemView ColumnDate32::GetItem(size_t index) const { return ItemView{Type()->GetCode(), data_->GetItem(index)}; } +void ColumnDate32::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, static_cast(0)); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} ColumnDateTime::ColumnDateTime() - : Column(Type::CreateDateTime()) + : Column(Type::CreateDateTime(), Serialization::MakeDefault(this)) , data_(std::make_shared()) { } ColumnDateTime::ColumnDateTime(std::string timezone) - : Column(Type::CreateDateTime(std::move(timezone))) + : Column(Type::CreateDateTime(std::move(timezone)), Serialization::MakeDefault(this)) , data_(std::make_shared()) { } @@ -154,11 +181,11 @@ void ColumnDateTime::Append(ColumnRef column) { } bool ColumnDateTime::LoadBody(InputStream* input, size_t rows) { - return data_->LoadBody(input, rows); + return data_->GetSerialization()->LoadBody(data_.get(), input, rows); } void ColumnDateTime::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } size_t ColumnDateTime::Size() const { @@ -191,6 +218,21 @@ ItemView ColumnDateTime::GetItem(size_t index) const { return ItemView(Type::DateTime, data_->GetItem(index)); } +void ColumnDateTime::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, static_cast(0)); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + ColumnDateTime64::ColumnDateTime64(size_t precision) : ColumnDateTime64(Type::CreateDateTime64(precision), std::make_shared(18ul, precision)) {} @@ -200,7 +242,7 @@ ColumnDateTime64::ColumnDateTime64(size_t precision, std::string timezone) {} ColumnDateTime64::ColumnDateTime64(TypeRef type, std::shared_ptr data) - : Column(type), + : Column(type, Serialization::MakeDefault(this)), data_(data), precision_(type->As()->GetPrecision()) {} @@ -231,11 +273,11 @@ void ColumnDateTime64::Append(ColumnRef column) { } bool ColumnDateTime64::LoadBody(InputStream* input, size_t rows) { - return data_->LoadBody(input, rows); + return data_->GetSerialization()->LoadBody(data_.get(), input, rows); } void ColumnDateTime64::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } void ColumnDateTime64::Clear() { @@ -249,6 +291,21 @@ ItemView ColumnDateTime64::GetItem(size_t index) const { return ItemView(Type::DateTime64, data_->GetItem(index)); } +void ColumnDateTime64::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, static_cast(0)); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + void ColumnDateTime64::Swap(Column& other) { auto& col = dynamic_cast(other); if (col.GetPrecision() != GetPrecision()) { diff --git a/clickhouse/columns/date.h b/clickhouse/columns/date.h index 2a240c90..328a1476 100644 --- a/clickhouse/columns/date.h +++ b/clickhouse/columns/date.h @@ -25,12 +25,6 @@ class ColumnDate : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -44,7 +38,17 @@ class ColumnDate : public Column { ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::shared_ptr data_; }; @@ -66,12 +70,6 @@ class ColumnDate32 : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -85,7 +83,17 @@ class ColumnDate32 : public Column { ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::shared_ptr data_; }; @@ -111,15 +119,9 @@ class ColumnDateTime : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - /// Clear column data . void Clear() override; - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Returns count of rows in the column. size_t Size() const override; @@ -130,7 +132,17 @@ class ColumnDateTime : public Column { ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::shared_ptr data_; }; @@ -159,15 +171,9 @@ class ColumnDateTime64 : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - /// Clear column data . void Clear() override; - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Returns count of rows in the column. size_t Size() const override; @@ -180,10 +186,19 @@ class ColumnDateTime64 : public Column { size_t GetPrecision() const; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + ColumnDateTime64(TypeRef type, std::shared_ptr data); -private: std::shared_ptr data_; const size_t precision_; }; diff --git a/clickhouse/columns/decimal.cpp b/clickhouse/columns/decimal.cpp index d44dc0c0..b14e48de 100644 --- a/clickhouse/columns/decimal.cpp +++ b/clickhouse/columns/decimal.cpp @@ -100,7 +100,7 @@ inline bool mulOverflow(const Int128 & l, const T & r, Int128 * result) namespace clickhouse { ColumnDecimal::ColumnDecimal(size_t precision, size_t scale) - : Column(Type::CreateDecimal(precision, scale)) + : Column(Type::CreateDecimal(precision, scale), Serialization::MakeDefault(this)) { if (precision <= 9) { data_ = std::make_shared(); @@ -112,7 +112,7 @@ ColumnDecimal::ColumnDecimal(size_t precision, size_t scale) } ColumnDecimal::ColumnDecimal(TypeRef type, ColumnRef data) - : Column(type), + : Column(type, Serialization::MakeDefault(this)), data_(data) { } @@ -198,11 +198,11 @@ void ColumnDecimal::Append(ColumnRef column) { } bool ColumnDecimal::LoadBody(InputStream * input, size_t rows) { - return data_->LoadBody(input, rows); + return data_->GetSerialization()->LoadBody(data_.get(), input, rows); } void ColumnDecimal::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } void ColumnDecimal::Clear() { @@ -232,6 +232,21 @@ ItemView ColumnDecimal::GetItem(size_t index) const { return ItemView{GetType().GetCode(), data_->GetItem(index)}; } +void ColumnDecimal::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, Int128{}); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + size_t ColumnDecimal::GetScale() const { return type_->As()->GetScale(); diff --git a/clickhouse/columns/decimal.h b/clickhouse/columns/decimal.h index d3c05ea2..bcd72c0c 100644 --- a/clickhouse/columns/decimal.h +++ b/clickhouse/columns/decimal.h @@ -21,19 +21,22 @@ class ColumnDecimal : public Column { public: void Append(ColumnRef column) override; - bool LoadBody(InputStream* input, size_t rows) override; - void SaveBody(OutputStream* output) override; void Clear() override; size_t Size() const override; ColumnRef Slice(size_t begin, size_t len) const override; ColumnRef CloneEmpty() const override; void Swap(Column& other) override; ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; size_t GetScale() const; size_t GetPrecision() const; private: + bool LoadBody(InputStream* input, size_t rows); + void SaveBody(OutputStream* output); + friend SerializationDefault; + /// Depending on a precision it can be one of: /// - ColumnInt32 /// - ColumnInt64 diff --git a/clickhouse/columns/enum.cpp b/clickhouse/columns/enum.cpp index 1361e817..7fddf3a0 100644 --- a/clickhouse/columns/enum.cpp +++ b/clickhouse/columns/enum.cpp @@ -9,13 +9,13 @@ namespace clickhouse { template ColumnEnum::ColumnEnum(TypeRef type) - : Column(type) + : Column(type, Serialization::MakeDefault(this)) { } template ColumnEnum::ColumnEnum(TypeRef type, const std::vector& data) - : Column(type) + : Column(type, Serialization::MakeDefault(this)) , data_(data) { } @@ -110,6 +110,22 @@ ItemView ColumnEnum::GetItem(size_t index) const { return ItemView{type_->GetCode(), data_[index]}; } +template +void ColumnEnum::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, static_cast(0)); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + template class ColumnEnum; template class ColumnEnum; diff --git a/clickhouse/columns/enum.h b/clickhouse/columns/enum.h index c31b81ff..b5c3080c 100644 --- a/clickhouse/columns/enum.h +++ b/clickhouse/columns/enum.h @@ -32,12 +32,6 @@ class ColumnEnum : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data. void Clear() override; @@ -50,8 +44,17 @@ class ColumnEnum : public Column { void Swap(Column& other) override; ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::vector data_; }; diff --git a/clickhouse/columns/geo.cpp b/clickhouse/columns/geo.cpp index ebea9895..fc6a592c 100644 --- a/clickhouse/columns/geo.cpp +++ b/clickhouse/columns/geo.cpp @@ -34,13 +34,13 @@ namespace clickhouse { template ColumnGeo::ColumnGeo() - : Column(CreateGeoType()), + : Column(CreateGeoType(), Serialization::MakeDefault(this)), data_(CreateColumn()) { } template ColumnGeo::ColumnGeo(ColumnRef data) - : Column(CreateGeoType()) + : Column(CreateGeoType(), Serialization::MakeDefault(this)) , data_(WrapColumn(std::move(data))) { } @@ -66,16 +66,6 @@ void ColumnGeo::Append(ColumnRef column) { } } -template -bool ColumnGeo::LoadBody(InputStream* input, size_t rows) { - return data_->LoadBody(input, rows); -} - -template -void ColumnGeo::SaveBody(OutputStream* output) { - data_->SaveBody(output); -} - template size_t ColumnGeo::Size() const { return data_->Size(); @@ -97,6 +87,40 @@ void ColumnGeo::Swap(Column& other) { data_.swap(col.data_); } +template +void ColumnGeo::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + +template +bool ColumnGeo::LoadPrefix(InputStream* input, size_t rows) { + return data_->GetSerialization()->LoadPrefix(data_.get(), input, rows); +} + +template +bool ColumnGeo::LoadBody(InputStream* input, size_t rows) { + return data_->GetSerialization()->LoadBody(data_.get(), input, rows); +} + +template +void ColumnGeo::SavePrefix(OutputStream* output) { + data_->GetSerialization()->SavePrefix(data_.get(), output); +} + +template +void ColumnGeo::SaveBody(OutputStream* output) { + data_->GetSerialization()->SaveBody(data_.get(), output); +} + + template class ColumnGeo, Type::Code::Point>; template class ColumnGeo, Type::Code::Ring>; diff --git a/clickhouse/columns/geo.h b/clickhouse/columns/geo.h index 5f3db9b6..24be97bf 100644 --- a/clickhouse/columns/geo.h +++ b/clickhouse/columns/geo.h @@ -32,12 +32,6 @@ class ColumnGeo : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -49,7 +43,23 @@ class ColumnGeo : public Column { ColumnRef CloneEmpty() const override; void Swap(Column& other) override; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column prefix from input stream. + bool LoadPrefix(InputStream* input, size_t rows); + + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column prefix to output stream. + void SavePrefix(OutputStream* output); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault>; + std::shared_ptr data_; }; diff --git a/clickhouse/columns/ip4.cpp b/clickhouse/columns/ip4.cpp index 9c269b85..58bfc41f 100644 --- a/clickhouse/columns/ip4.cpp +++ b/clickhouse/columns/ip4.cpp @@ -3,16 +3,24 @@ #include "../base/socket.h" // for platform-specific IPv4-related functions #include +bool operator==(const in_addr& l, const in_addr& r) { + return l.s_addr == r.s_addr; +} + +bool operator!=(const in_addr& l, const in_addr& r) { + return l.s_addr != r.s_addr; +} + namespace clickhouse { ColumnIPv4::ColumnIPv4() - : Column(Type::CreateIPv4()) + : Column(Type::CreateIPv4(), Serialization::MakeDefault(this)) , data_(std::make_shared()) { } ColumnIPv4::ColumnIPv4(ColumnRef data) - : Column(Type::CreateIPv4()) + : Column(Type::CreateIPv4(), Serialization::MakeDefault(this)) , data_(data ? data->As() : nullptr) { if (!data_) @@ -72,11 +80,11 @@ void ColumnIPv4::Append(ColumnRef column) { } bool ColumnIPv4::LoadBody(InputStream * input, size_t rows) { - return data_->LoadBody(input, rows); + return data_->GetSerialization()->LoadBody(data_.get(), input, rows); } void ColumnIPv4::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } size_t ColumnIPv4::Size() const { @@ -100,4 +108,19 @@ ItemView ColumnIPv4::GetItem(size_t index) const { return ItemView(Type::IPv4, data_->GetItem(index)); } +void ColumnIPv4::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, in_addr()); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + } diff --git a/clickhouse/columns/ip4.h b/clickhouse/columns/ip4.h index 3f25e6d5..abd262e2 100644 --- a/clickhouse/columns/ip4.h +++ b/clickhouse/columns/ip4.h @@ -4,6 +4,10 @@ struct in_addr; +bool operator==(const in_addr& l, const in_addr& r); + +bool operator!=(const in_addr& l, const in_addr& r); + namespace clickhouse { class ColumnIPv4 : public Column { @@ -40,12 +44,6 @@ class ColumnIPv4 : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -58,8 +56,17 @@ class ColumnIPv4 : public Column { void Swap(Column& other) override; ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::shared_ptr data_; }; diff --git a/clickhouse/columns/ip6.cpp b/clickhouse/columns/ip6.cpp index 838bcce0..9faaaf1b 100644 --- a/clickhouse/columns/ip6.cpp +++ b/clickhouse/columns/ip6.cpp @@ -4,18 +4,30 @@ #include +bool operator==(const in6_addr& l, const in6_addr& r) { + const uint64_t* const l_array = reinterpret_cast(&l); + const uint64_t* const r_array = reinterpret_cast(&r); + return l_array[0] == r_array[0] && l_array[1] == r_array[1]; +} + +bool operator!=(const in6_addr& l, const in6_addr& r) { + const uint64_t* const l_array = reinterpret_cast(&l); + const uint64_t* const r_array = reinterpret_cast(&r); + return l_array[0] != r_array[0] || l_array[1] != r_array[1]; +} + namespace clickhouse { static_assert(sizeof(struct in6_addr) == 16, "sizeof in6_addr should be 16 bytes"); ColumnIPv6::ColumnIPv6() - : Column(Type::CreateIPv6()) + : Column(Type::CreateIPv6(), Serialization::MakeDefault(this)) , data_(std::make_shared(16)) { } ColumnIPv6::ColumnIPv6(ColumnRef data) - : Column(Type::CreateIPv6()) + : Column(Type::CreateIPv6(), Serialization::MakeDefault(this)) , data_(data ? data->As() : nullptr) { if (!data_ || data_->FixedSize() != sizeof(in6_addr)) @@ -72,11 +84,11 @@ void ColumnIPv6::Append(ColumnRef column) { } bool ColumnIPv6::LoadBody(InputStream* input, size_t rows) { - return data_->LoadBody(input, rows); + return data_->GetSerialization()->LoadBody(data_.get(), input, rows); } void ColumnIPv6::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } size_t ColumnIPv6::Size() const { @@ -100,4 +112,19 @@ ItemView ColumnIPv6::GetItem(size_t index) const { return ItemView{Type::IPv6, data_->GetItem(index)}; } +void ColumnIPv6::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, in6_addr()); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + } diff --git a/clickhouse/columns/ip6.h b/clickhouse/columns/ip6.h index 74d8c1e1..b5221fd8 100644 --- a/clickhouse/columns/ip6.h +++ b/clickhouse/columns/ip6.h @@ -5,6 +5,10 @@ struct in6_addr; +bool operator==(const in6_addr& l, const in6_addr& r); + +bool operator!=(const in6_addr& l, const in6_addr& r); + namespace clickhouse { class ColumnIPv6 : public Column { @@ -38,12 +42,6 @@ class ColumnIPv6 : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -55,8 +53,17 @@ class ColumnIPv6 : public Column { ColumnRef CloneEmpty() const override; void Swap(Column& other) override; ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::shared_ptr data_; }; diff --git a/clickhouse/columns/lowcardinality.cpp b/clickhouse/columns/lowcardinality.cpp index d3627038..fa528292 100644 --- a/clickhouse/columns/lowcardinality.cpp +++ b/clickhouse/columns/lowcardinality.cpp @@ -155,7 +155,8 @@ inline void AppendToDictionary(Column& dictionary, const ItemView & item) { namespace clickhouse { ColumnLowCardinality::ColumnLowCardinality(ColumnRef dictionary_column) - : Column(Type::CreateLowCardinality(dictionary_column->Type())), + : Column(Type::CreateLowCardinality(dictionary_column->Type()), + Serialization::MakeDefault(this)), dictionary_column_(dictionary_column->CloneEmpty()), // safe way to get an column of the same type. index_column_(std::make_shared()) { @@ -163,7 +164,8 @@ ColumnLowCardinality::ColumnLowCardinality(ColumnRef dictionary_column) } ColumnLowCardinality::ColumnLowCardinality(std::shared_ptr dictionary_column) - : Column(Type::CreateLowCardinality(dictionary_column->Type())), + : Column(Type::CreateLowCardinality(dictionary_column->Type()), + Serialization::MakeDefault(this)), dictionary_column_(dictionary_column->CloneEmpty()), // safe way to get an column of the same type. index_column_(std::make_shared()) { @@ -267,7 +269,7 @@ auto Load(ColumnRef new_dictionary_column, InputStream& input, size_t rows) { dataColumn = nullable->Nested(); } - if (!dataColumn->LoadBody(&input, number_of_keys)) + if (!dataColumn->GetSerialization()->LoadBody(dataColumn.get(), &input, number_of_keys)) throw ProtocolError("Failed to read values of dictionary column."); uint64_t number_of_rows; @@ -277,7 +279,7 @@ auto Load(ColumnRef new_dictionary_column, InputStream& input, size_t rows) { if (number_of_rows != rows) throw AssertionError("LowCardinality column must be read in full."); - new_index_column->LoadBody(&input, number_of_rows); + new_index_column->GetSerialization()->LoadBody(new_index_column.get(), &input, number_of_rows); if (auto nullable = new_dictionary_column->As()) { nullable->Append(true); @@ -339,15 +341,15 @@ void ColumnLowCardinality::SaveBody(OutputStream* output) { WireFormat::WriteFixed(*output, number_of_keys); if (auto columnNullable = dictionary_column_->As()) { - columnNullable->Nested()->SaveBody(output); + columnNullable->Nested()->GetSerialization()->SaveBody(columnNullable->Nested().get(), output); } else { - dictionary_column_->SaveBody(output); + dictionary_column_->GetSerialization()->SaveBody(dictionary_column_.get(), output); } const uint64_t number_of_rows = index_column_->Size(); WireFormat::WriteFixed(*output, number_of_rows); - index_column_->SaveBody(output); + index_column_->GetSerialization()->SaveBody(index_column_.get(), output); } void ColumnLowCardinality::Clear() { @@ -409,6 +411,18 @@ ItemView ColumnLowCardinality::GetItem(size_t index) const { return dictionary_column_->GetItem(dictionaryIndex); } +void ColumnLowCardinality::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + // No checks regarding value type or validity of value is made. void ColumnLowCardinality::AppendUnsafe(const ItemView & value) { const auto key = computeHashKey(value); diff --git a/clickhouse/columns/lowcardinality.h b/clickhouse/columns/lowcardinality.h index afadae22..11bf4a89 100644 --- a/clickhouse/columns/lowcardinality.h +++ b/clickhouse/columns/lowcardinality.h @@ -68,17 +68,6 @@ class ColumnLowCardinality : public Column { /// Appends another LowCardinality column to the end of this one, updating dictionary. void Append(ColumnRef /*column*/) override; - bool LoadPrefix(InputStream* input, size_t rows) override; - - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column prefix to output stream. - void SavePrefix(OutputStream* output) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data. void Clear() override; @@ -90,6 +79,7 @@ class ColumnLowCardinality : public Column { ColumnRef CloneEmpty() const override; void Swap(Column& other) override; ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; size_t GetDictionarySize() const; TypeRef GetNestedType() const; @@ -103,10 +93,22 @@ class ColumnLowCardinality : public Column { void AppendUnsafe(const ItemView &); private: + bool LoadPrefix(InputStream* input, size_t rows); + + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column prefix to output stream. + void SavePrefix(OutputStream* output); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + void Setup(ColumnRef dictionary_column); void AppendNullItem(); void AppendDefaultItem(); + friend SerializationDefault; public: static details::LowCardinalityHashKey computeHashKey(const ItemView &); }; diff --git a/clickhouse/columns/lowcardinalityadaptor.h b/clickhouse/columns/lowcardinalityadaptor.h index bcde1a9b..742dec83 100644 --- a/clickhouse/columns/lowcardinalityadaptor.h +++ b/clickhouse/columns/lowcardinalityadaptor.h @@ -27,21 +27,29 @@ class LowCardinalitySerializationAdaptor : public AdaptedColumnType { public: - using AdaptedColumnType::AdaptedColumnType; + template + LowCardinalitySerializationAdaptor(Args&&...args) + : AdaptedColumnType(std::forward(args)...) + { + Column::serialization_ = Serialization::MakeDefault(this); + } + +private: + friend SerializationDefault>; - bool LoadPrefix(InputStream* input, size_t rows) override { + bool LoadPrefix(InputStream* input, size_t rows) { auto new_data_column = this->Slice(0, 0)->template As(); ColumnLowCardinalityT low_cardinality_col(new_data_column); - return low_cardinality_col.LoadPrefix(input, rows); + return low_cardinality_col.GetSerialization()->LoadPrefix(&low_cardinality_col, input, rows); } /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override { + bool LoadBody(InputStream* input, size_t rows) { auto new_data_column = this->CloneEmpty()->template As(); ColumnLowCardinalityT low_cardinality_col(new_data_column); - if (!low_cardinality_col.LoadBody(input, rows)) + if (!low_cardinality_col.GetSerialization()->LoadBody(&low_cardinality_col, input, rows)) return false; // It safe to reuse `flat_data_column` later since ColumnLowCardinalityT makes a deep copy, but still check just in case. @@ -55,8 +63,21 @@ LowCardinalitySerializationAdaptor : public AdaptedColumnType } /// Saves column data to output stream. - void SaveBody(OutputStream* output) override { - ColumnLowCardinalityT(this->template As()).SaveBody(output); + void SaveBody(OutputStream* output) { + ColumnLowCardinalityT low_cardinality_col(this->template As()); + low_cardinality_col.GetSerialization()->SaveBody(&low_cardinality_col, output); + } + + void SetSerializationKind(Serialization::Kind kind) override { + switch (kind) + { + case Serialization::Kind::DEFAULT: + Column::serialization_ = Serialization::MakeDefault(this); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + Column::Type()->GetName()); + } } }; diff --git a/clickhouse/columns/map.cpp b/clickhouse/columns/map.cpp index 3f5616df..d0e9a059 100644 --- a/clickhouse/columns/map.cpp +++ b/clickhouse/columns/map.cpp @@ -30,7 +30,8 @@ TypeRef GetMapType(const Type& data_type) { namespace clickhouse { ColumnMap::ColumnMap(ColumnRef data) - : Column(GetMapType(data->GetType())), data_(data->As()) { + : Column(GetMapType(data->GetType()), Serialization::MakeDefault(this)) + , data_(data->As()) { } void ColumnMap::Clear() { @@ -44,19 +45,19 @@ void ColumnMap::Append(ColumnRef column) { } bool ColumnMap::LoadPrefix(InputStream* input, size_t rows) { - return data_->LoadPrefix(input, rows); + return data_->GetSerialization()->LoadPrefix(data_.get(), input, rows); } bool ColumnMap::LoadBody(InputStream* input, size_t rows) { - return data_->LoadBody(input, rows); + return data_->GetSerialization()->LoadBody(data_.get(),input, rows); } void ColumnMap::SavePrefix(OutputStream* output) { - data_->SavePrefix(output); + data_->GetSerialization()->SavePrefix(data_.get(), output); } void ColumnMap::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } size_t ColumnMap::Size() const { @@ -76,6 +77,18 @@ void ColumnMap::Swap(Column& other) { data_.swap(col.data_); } +void ColumnMap::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + ColumnRef ColumnMap::GetAsColumn(size_t n) const { return data_->GetAsColumn(n); } diff --git a/clickhouse/columns/map.h b/clickhouse/columns/map.h index 24a8b4ae..1f5af0f8 100644 --- a/clickhouse/columns/map.h +++ b/clickhouse/columns/map.h @@ -28,18 +28,6 @@ class ColumnMap : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column prefix from input stream. - bool LoadPrefix(InputStream* input, size_t rows) override; - - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column prefix to output stream. - void SavePrefix(OutputStream* output) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -51,6 +39,8 @@ class ColumnMap : public Column { ColumnRef CloneEmpty() const override; void Swap(Column&) override; + void SetSerializationKind(Serialization::Kind kind) override; + /// Converts map at pos n to column. /// Type of row is tuple {key, value}. ColumnRef GetAsColumn(size_t n) const; @@ -62,6 +52,20 @@ class ColumnMap : public Column { ColumnMap(ColumnMap&& map); private: + /// Loads column prefix from input stream. + bool LoadPrefix(InputStream* input, size_t rows); + + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column prefix to output stream. + void SavePrefix(OutputStream* output); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::shared_ptr data_; }; diff --git a/clickhouse/columns/nothing.h b/clickhouse/columns/nothing.h index 36ddeaea..3ec57fb6 100644 --- a/clickhouse/columns/nothing.h +++ b/clickhouse/columns/nothing.h @@ -15,13 +15,13 @@ namespace clickhouse { class ColumnNothing : public Column { public: ColumnNothing() - : Column(Type::CreateNothing()) + : Column(Type::CreateNothing(), Serialization::MakeDefault(this)) , size_(0) { } explicit ColumnNothing(size_t n) - : Column(Type::CreateNothing()) + : Column(Type::CreateNothing(), Serialization::MakeDefault(this)) , size_(n) { } @@ -46,6 +46,18 @@ class ColumnNothing : public Column { ItemView GetItem(size_t /*index*/) const override { return ItemView{}; } + void SetSerializationKind(Serialization::Kind kind) override { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } + } + public: /// Appends content of given column to the end of current one. void Append(ColumnRef column) override { @@ -54,18 +66,6 @@ class ColumnNothing : public Column { } } - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override { - input->Skip(rows); - size_ += rows; - return true; - } - - /// Saves column data to output stream. - void SaveBody(OutputStream*) override { - throw UnimplementedError("method SaveBody is not supported for Nothing column"); - } - /// Clear column data . void Clear() override { size_ = 0; } @@ -78,6 +78,20 @@ class ColumnNothing : public Column { } private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows) { + input->Skip(rows); + size_ += rows; + return true; + } + + /// Saves column data to output stream. + void SaveBody(OutputStream*) { + throw UnimplementedError("method SaveBody is not supported for Nothing column"); + } + + friend SerializationDefault; + size_t size_; }; diff --git a/clickhouse/columns/nullable.cpp b/clickhouse/columns/nullable.cpp index dd863545..c75d9917 100644 --- a/clickhouse/columns/nullable.cpp +++ b/clickhouse/columns/nullable.cpp @@ -6,7 +6,7 @@ namespace clickhouse { ColumnNullable::ColumnNullable(ColumnRef nested, ColumnRef nulls) - : Column(Type::CreateNullable(nested->Type())) + : Column(Type::CreateNullable(nested->Type()), Serialization::MakeDefault(this)) , nested_(nested) , nulls_(nulls->As()) { @@ -51,26 +51,26 @@ void ColumnNullable::Clear() { } bool ColumnNullable::LoadPrefix(InputStream* input, size_t rows) { - return nested_->LoadPrefix(input, rows); + return nested_->GetSerialization()->LoadPrefix(nested_.get(), input, rows); } bool ColumnNullable::LoadBody(InputStream* input, size_t rows) { - if (!nulls_->LoadBody(input, rows)) { + if (!nulls_->GetSerialization()->LoadBody(nulls_.get(), input, rows)) { return false; } - if (!nested_->LoadBody(input, rows)) { + if (!nested_->GetSerialization()->LoadBody(nested_.get(), input, rows)) { return false; } return true; } void ColumnNullable::SavePrefix(OutputStream* output) { - nested_->SavePrefix(output); + nested_->GetSerialization()->SavePrefix(nested_.get(), output); } void ColumnNullable::SaveBody(OutputStream* output) { - nulls_->SaveBody(output); - nested_->SaveBody(output); + nulls_->GetSerialization()->SaveBody(nulls_.get(), output); + nested_->GetSerialization()->SaveBody(nested_.get(), output); } size_t ColumnNullable::Size() const { @@ -101,4 +101,16 @@ ItemView ColumnNullable::GetItem(size_t index) const { return nested_->GetItem(index); } +void ColumnNullable::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + } diff --git a/clickhouse/columns/nullable.h b/clickhouse/columns/nullable.h index c1924af0..c36072af 100644 --- a/clickhouse/columns/nullable.h +++ b/clickhouse/columns/nullable.h @@ -30,18 +30,6 @@ class ColumnNullable : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column prefix from input stream. - bool LoadPrefix(InputStream* input, size_t rows) override; - - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column prefix to output stream. - void SavePrefix(OutputStream* output) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -55,7 +43,23 @@ class ColumnNullable : public Column { ItemView GetItem(size_t) const override; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column prefix from input stream. + bool LoadPrefix(InputStream* input, size_t rows); + + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column prefix to output stream. + void SavePrefix(OutputStream* output); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + ColumnRef nested_; std::shared_ptr nulls_; }; diff --git a/clickhouse/columns/numeric.cpp b/clickhouse/columns/numeric.cpp index 4e8d54bf..d077884e 100644 --- a/clickhouse/columns/numeric.cpp +++ b/clickhouse/columns/numeric.cpp @@ -7,20 +7,20 @@ namespace clickhouse { template ColumnVector::ColumnVector() - : Column(Type::CreateSimple()) + : Column(Type::CreateSimple(), Serialization::MakeDefault(this)) { } template ColumnVector::ColumnVector(const std::vector & data) - : Column(Type::CreateSimple()) + : Column(Type::CreateSimple(), Serialization::MakeDefault(this)) , data_(data) { } template ColumnVector::ColumnVector(std::vector && data) - : Column(Type::CreateSimple()) + : Column(Type::CreateSimple(), Serialization::MakeDefault(this)) , data_(std::move(data)) { } @@ -98,6 +98,22 @@ ItemView ColumnVector::GetItem(size_t index) const { return ItemView{type_->GetCode(), data_[index]}; } +template +void ColumnVector::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, static_cast(0)); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + template class ColumnVector; template class ColumnVector; template class ColumnVector; diff --git a/clickhouse/columns/numeric.h b/clickhouse/columns/numeric.h index c6da981e..077906c9 100644 --- a/clickhouse/columns/numeric.h +++ b/clickhouse/columns/numeric.h @@ -34,12 +34,6 @@ class ColumnVector : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -53,7 +47,16 @@ class ColumnVector : public Column { ItemView GetItem(size_t index) const override; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault>; std::vector data_; }; diff --git a/clickhouse/columns/serialization.cpp b/clickhouse/columns/serialization.cpp new file mode 100644 index 00000000..f07a0f4a --- /dev/null +++ b/clickhouse/columns/serialization.cpp @@ -0,0 +1,79 @@ +#include "serialization.h" + +#include "../base/wire_format.h" +#include "column.h" + +namespace { +/// 2^62, because VarInt supports only values < 2^63. +constexpr auto END_OF_GRANULE_FLAG = 1ULL << 62; +} // namespace + +namespace clickhouse { + +SerializationSparse::SerializationSparse(SerializationRef nested) : nested_(std::move(nested)) { + assert(nested_); +} + +Serialization::Kind SerializationSparse::GetKind() const { + return Kind::SPARSE; +} + +bool SerializationSparse::LoadPrefix(Column* column, InputStream* input, size_t rows) const { + return nested_->LoadPrefix(column, input, rows); +} + +bool SerializationSparse::LoadBody(Column* column, InputStream* input, size_t rows) const { + assert(column); + std::vector offsets; + while (true) { + uint64_t group_size; + if (!WireFormat::ReadUInt64(*input, &group_size)) { + return false; + } + + if (group_size & END_OF_GRANULE_FLAG) { + break; + } + + size_t start_of_group = 0; + if (!offsets.empty()) start_of_group = offsets.back() + 1; + + offsets.push_back(start_of_group + group_size); + } + auto nested_column = column->CloneEmpty(); + if (offsets.size() > 0) { + if (!nested_->LoadBody(nested_column.get(), input, offsets.size())) { + return false; + } + } + column->Clear(); + AppendSparseColumn(column, rows, offsets, nested_column.get()); + return true; +} + +void SerializationSparse::SavePrefix(Column* column, OutputStream* output) const { + nested_->SavePrefix(column, output); +} + +void SerializationSparse::SaveBody(Column* column, OutputStream* output) const { + assert(column); + std::vector offsets = GetIndicesOfNonDefaultRows(column); + size_t start = 0; + size_t size = offsets.size(); + for (size_t i = 0; i < size; ++i) { + size_t group_size = offsets[i] - start; + WireFormat::WriteUInt64(*output, group_size); + start += group_size + 1; + } + + size_t group_size = column->Size() - start; + group_size |= END_OF_GRANULE_FLAG; + WireFormat::WriteUInt64(*output, group_size); + if (offsets.size() == 0) { + return; + } + auto nested_column = GetByIndices(column, offsets); + nested_->SaveBody(nested_column.get(), output); +} + +} // namespace clickhouse diff --git a/clickhouse/columns/serialization.h b/clickhouse/columns/serialization.h new file mode 100644 index 00000000..5fd49b58 --- /dev/null +++ b/clickhouse/columns/serialization.h @@ -0,0 +1,205 @@ +#pragma once + +#include +#include + +#include "../exceptions.h" + +namespace clickhouse { + +class InputStream; +class OutputStream; + +class Column; + +using ColumnRef = std::shared_ptr; + +using SerializationRef = std::shared_ptr; + +template +class SerializationDefault; + +template +class SerializationSparseT; + +class Serialization { +public: + enum class Kind : uint8_t { + DEFAULT = 0, + SPARSE = 1, + }; + + Serialization() = default; + virtual ~Serialization() = default; + + virtual Kind GetKind() const = 0; + + /// Loads column prefix from input stream. + virtual bool LoadPrefix(Column* column, InputStream* input, size_t rows) const = 0; + + /// Loads column data from input stream. + virtual bool LoadBody(Column* column, InputStream* input, size_t rows) const = 0; + + /// Saves column prefix to output stream. Column types with prefixes must implement it. + virtual void SavePrefix(Column* column, OutputStream* output) const = 0; + + /// Saves column body to output stream. + virtual void SaveBody(Column* column, OutputStream* output) const = 0; + + template + inline static std::shared_ptr> MakeDefault(C*) { + return std::make_shared>(); + } + + template + inline static std::shared_ptr> MakeSparse(C* column, T default_value) { + return std::make_shared>(MakeDefault(column), std::move(default_value)); + } + +protected: + template + C* ColumnAs(Column* column) const { + C* result = dynamic_cast(column); + if (!result) { + throw ValidationError("Can't cast column"); + } + return result; + } +}; + +template +class SerializationDefault : public Serialization { +public: + Kind GetKind() const override { return Kind::DEFAULT; } + + /// Loads column prefix from input stream. + bool LoadPrefix([[maybe_unused]] Column* column, [[maybe_unused]] InputStream* input, [[maybe_unused]] size_t rows) const override { + if constexpr (HasLoadPrefix::value) { + return ColumnAs(column)->LoadPrefix(input, rows); + } + return true; + }; + + /// Loads column data from input stream. + bool LoadBody(Column* column, InputStream* input, size_t rows) const override { return ColumnAs(column)->LoadBody(input, rows); }; + + /// Saves column prefix to output stream. Column types with prefixes must implement it. + void SavePrefix([[maybe_unused]] Column* column, [[maybe_unused]] OutputStream* output) const override { + if constexpr (HasSavePrefix::value) { + ColumnAs(column)->SavePrefix(output); + } + }; + + /// Saves column body to output stream. + void SaveBody(Column* column, OutputStream* output) const override { ColumnAs(column)->SaveBody(output); }; + +private: + template + struct HasLoadPrefix { + private: + static int detect(...); + template + static decltype(std::declval().LoadPrefix(std::declval(), 0)) detect(const U&); + + public: + static constexpr bool value = std::is_same()))>::value; + }; + + template + struct HasSavePrefix { + private: + static int detect(...); + template + static decltype(std::declval().SavePrefix(std::declval())) detect(const U&); + + public: + static constexpr bool value = std::is_same()))>::value; + }; +}; + +/* + * The main purpose of sparse serialization is to reduce the amount of transmitted data + * in the case when the column is filled with default values. + * Default values depend on the type of column. For instance, it is an empty string in case of string type, + * zero in case of an integer, etc. Sparse serialization saves and loads only non-default values and their indices. + * Saving contains the following steps: + * - Takes indices all non-default values and writes differences between neighboring to stream; + * - Makes a column of the same type containing only non-default values and writes it to stream using nested serialization.. + * Loading makes the opposite work: restores indices and non-default values and appends them to the target column + */ + +class SerializationSparse : public Serialization { +public: + explicit SerializationSparse(SerializationRef nested); + + Kind GetKind() const override; + + /// Loads column prefix from input stream. + bool LoadPrefix(Column* column, InputStream* input, size_t rows) const override; + + /// Loads column data from input stream. + bool LoadBody(Column* column, InputStream* input, size_t rows) const override; + + /// Saves column prefix to output stream. Column types with prefixes must implement it. + void SavePrefix(Column* column, OutputStream* output) const override; + + /// Saves column body to output stream. + void SaveBody(Column* column, OutputStream* output) const override; + +protected: + virtual std::vector GetIndicesOfNonDefaultRows(Column* column) const = 0; + virtual ColumnRef GetByIndices(Column* column, const std::vector& indeces) const = 0; + virtual void AppendSparseColumn(Column* column, size_t rows, const std::vector& indices, Column* values) const = 0; + +private: + SerializationRef nested_; +}; + +template +class SerializationSparseT : public SerializationSparse { +public: + explicit SerializationSparseT(SerializationRef nested, T default_value) + : SerializationSparse(std::move(nested)), default_value_(std::move(default_value)) {} + +protected: + std::vector GetIndicesOfNonDefaultRows(Column* column) const override { + C* typed_column = ColumnAs(column); + std::vector result; + const size_t size = typed_column->Size(); + result.reserve(size); + for (size_t i = 0; i < size; ++i) { + if (typed_column->At(i) != default_value_) { + result.push_back(i); + } + } + return result; + } + + ColumnRef GetByIndices(Column* column, const std::vector& indeces) const override { + C* typed_column = ColumnAs(column); + std::shared_ptr result = typed_column->CloneEmpty()->template AsStrict(); + for (size_t index : indeces) { + result->Append(typed_column->At(index)); + } + return result; + } + + void AppendSparseColumn(Column* column, size_t rows, const std::vector& indices, Column* values) const override { + C* target = ColumnAs(column); + C* source = ColumnAs(values); + auto it = indices.begin(); + for (size_t i = 0; i < rows; ++i) { + if (it == indices.end() || i != *it) { + target->Append(default_value_); + continue; + } + target->Append(source->At(it - indices.begin())); + ++it; + } + } + +private: + T default_value_; +}; + +} // namespace clickhouse diff --git a/clickhouse/columns/string.cpp b/clickhouse/columns/string.cpp index f6597bb4..586a9f78 100644 --- a/clickhouse/columns/string.cpp +++ b/clickhouse/columns/string.cpp @@ -25,7 +25,7 @@ size_t ComputeTotalSize(const Container & strings, size_t begin = 0, size_t len namespace clickhouse { ColumnFixedString::ColumnFixedString(size_t n) - : Column(Type::CreateString(n)) + : Column(Type::CreateString(n), Serialization::MakeDefault(this)) , string_size_(n) { } @@ -118,6 +118,21 @@ ItemView ColumnFixedString::GetItem(size_t index) const { return ItemView{Type::FixedString, this->At(index)}; } +void ColumnFixedString::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, std::string(FixedSize(), '\0')); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + struct ColumnString::Block { using CharT = typename std::string::value_type; @@ -157,12 +172,12 @@ struct ColumnString::Block }; ColumnString::ColumnString() - : Column(Type::CreateString()) + : Column(Type::CreateString(), Serialization::MakeDefault(this)) { } ColumnString::ColumnString(size_t element_count) - : Column(Type::CreateString()) + : Column(Type::CreateString(), Serialization::MakeDefault(this)) { items_.reserve(element_count); // 100 is arbitrary number, assumption that string values are about ~40 bytes long. @@ -328,4 +343,19 @@ ItemView ColumnString::GetItem(size_t index) const { return ItemView{Type::String, this->At(index)}; } +void ColumnString::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, std::string()); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + } diff --git a/clickhouse/columns/string.h b/clickhouse/columns/string.h index 9b83a088..5ff56a3d 100644 --- a/clickhouse/columns/string.h +++ b/clickhouse/columns/string.h @@ -43,12 +43,6 @@ class ColumnFixedString : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -62,7 +56,17 @@ class ColumnFixedString : public Column { ItemView GetItem(size_t) const override; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + size_t string_size_; std::string data_; }; @@ -107,12 +111,6 @@ class ColumnString : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -124,10 +122,19 @@ class ColumnString : public Column { ColumnRef CloneEmpty() const override; void Swap(Column& other) override; ItemView GetItem(size_t) const override; + void SetSerializationKind(Serialization::Kind kind) override; private: void AppendUnsafe(std::string_view); + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + private: struct Block; diff --git a/clickhouse/columns/tuple.cpp b/clickhouse/columns/tuple.cpp index 42dc6e63..cb612364 100644 --- a/clickhouse/columns/tuple.cpp +++ b/clickhouse/columns/tuple.cpp @@ -11,7 +11,7 @@ static std::vector CollectTypes(const std::vector& columns) } ColumnTuple::ColumnTuple(const std::vector& columns) - : Column(Type::CreateTuple(CollectTypes(columns))) + : Column(Type::CreateTuple(CollectTypes(columns)), Serialization::MakeDefault(this)) , columns_(columns) { } @@ -57,8 +57,8 @@ ColumnRef ColumnTuple::CloneEmpty() const { } bool ColumnTuple::LoadPrefix(InputStream* input, size_t rows) { - for (auto ci = columns_.begin(); ci != columns_.end(); ++ci) { - if (!(*ci)->LoadPrefix(input, rows)) { + for (auto & column : columns_) { + if (!(column->GetSerialization()->LoadPrefix(column.get(), input, rows))) { return false; } } @@ -67,8 +67,8 @@ bool ColumnTuple::LoadPrefix(InputStream* input, size_t rows) { } bool ColumnTuple::LoadBody(InputStream* input, size_t rows) { - for (auto ci = columns_.begin(); ci != columns_.end(); ++ci) { - if (!(*ci)->LoadBody(input, rows)) { + for (auto & column : columns_) { + if (!(column->GetSerialization()->LoadBody(column.get(), input, rows))) { return false; } } @@ -78,13 +78,13 @@ bool ColumnTuple::LoadBody(InputStream* input, size_t rows) { void ColumnTuple::SavePrefix(OutputStream* output) { for (auto & column : columns_) { - column->SavePrefix(output); + column->GetSerialization()->SavePrefix(column.get(), output); } } void ColumnTuple::SaveBody(OutputStream* output) { for (auto & column : columns_) { - column->SaveBody(output); + column->GetSerialization()->SaveBody(column.get(), output); } } @@ -97,4 +97,44 @@ void ColumnTuple::Swap(Column& other) { columns_.swap(col.columns_); } +bool ColumnTuple::LoadSerializationKind(InputStream* input) { + if (!Column::LoadSerializationKind(input)) { + return false; + } + for (auto & column : columns_) { + if (!(column->LoadSerializationKind(input))) { + return false; + } + } + return true; +} + +void ColumnTuple::SaveSerializationKind(OutputStream* output) { + Column::SaveSerializationKind(output); + for (auto & column : columns_) { + column->SaveSerializationKind(output); + } +} + +void ColumnTuple::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + +bool ColumnTuple::HasCustomSerialization() const { + for (auto & column : columns_) { + if (column->HasCustomSerialization()) { + return true; + } + } + return false; +} + } diff --git a/clickhouse/columns/tuple.h b/clickhouse/columns/tuple.h index b1b5ad31..7335e159 100644 --- a/clickhouse/columns/tuple.h +++ b/clickhouse/columns/tuple.h @@ -29,18 +29,6 @@ class ColumnTuple : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column prefix from input stream. - bool LoadPrefix(InputStream* input, size_t rows) override; - - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column prefix to output stream. - void SavePrefix(OutputStream* output) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -52,7 +40,29 @@ class ColumnTuple : public Column { ColumnRef CloneEmpty() const override; void Swap(Column& other) override; + bool LoadSerializationKind(InputStream* input) override; + + void SaveSerializationKind(OutputStream* output) override; + + void SetSerializationKind(Serialization::Kind kind) override; + + bool HasCustomSerialization() const override; + private: + /// Loads column prefix from input stream. + bool LoadPrefix(InputStream* input, size_t rows); + + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column prefix to output stream. + void SavePrefix(OutputStream* output); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::vector columns_; }; diff --git a/clickhouse/columns/uuid.cpp b/clickhouse/columns/uuid.cpp index 19e94761..0b86f99c 100644 --- a/clickhouse/columns/uuid.cpp +++ b/clickhouse/columns/uuid.cpp @@ -7,13 +7,13 @@ namespace clickhouse { ColumnUUID::ColumnUUID() - : Column(Type::CreateUUID()) + : Column(Type::CreateUUID(), Serialization::MakeDefault(this)) , data_(std::make_shared()) { } ColumnUUID::ColumnUUID(ColumnRef data) - : Column(Type::CreateUUID()) + : Column(Type::CreateUUID(), Serialization::MakeDefault(this)) , data_(data->As()) { if (data_->Size() % 2 != 0) { @@ -45,11 +45,11 @@ void ColumnUUID::Append(ColumnRef column) { } bool ColumnUUID::LoadBody(InputStream* input, size_t rows) { - return data_->LoadBody(input, rows * 2); + return data_->GetSerialization()->LoadBody(data_.get(), input, rows * 2); } void ColumnUUID::SaveBody(OutputStream* output) { - data_->SaveBody(output); + data_->GetSerialization()->SaveBody(data_.get(), output); } size_t ColumnUUID::Size() const { @@ -77,4 +77,19 @@ ItemView ColumnUUID::GetItem(size_t index) const { return ItemView{Type::UUID, std::string_view{data_item_view.data.data(), data_item_view.data.size() * 2}}; } +void ColumnUUID::SetSerializationKind(Serialization::Kind kind) { + switch (kind) + { + case Serialization::Kind::DEFAULT: + serialization_ = Serialization::MakeDefault(this); + break; + case Serialization::Kind::SPARSE: + serialization_ = Serialization::MakeSparse(this, UUID{0,0}); + break; + default: + throw UnimplementedError("Serialization kind:" + std::to_string(static_cast(kind)) + + " is not supported for column of " + type_->GetName()); + } +} + } diff --git a/clickhouse/columns/uuid.h b/clickhouse/columns/uuid.h index dd7d0b9d..27dead1b 100644 --- a/clickhouse/columns/uuid.h +++ b/clickhouse/columns/uuid.h @@ -29,12 +29,6 @@ class ColumnUUID : public Column { /// Appends content of given column to the end of current one. void Append(ColumnRef column) override; - /// Loads column data from input stream. - bool LoadBody(InputStream* input, size_t rows) override; - - /// Saves column data to output stream. - void SaveBody(OutputStream* output) override; - /// Clear column data . void Clear() override; @@ -48,7 +42,17 @@ class ColumnUUID : public Column { ItemView GetItem(size_t) const override; + void SetSerializationKind(Serialization::Kind kind) override; + private: + /// Loads column data from input stream. + bool LoadBody(InputStream* input, size_t rows); + + /// Saves column data to output stream. + void SaveBody(OutputStream* output); + + friend SerializationDefault; + std::shared_ptr data_; }; diff --git a/ut/CMakeLists.txt b/ut/CMakeLists.txt index 2c9f6ee5..a0fe33f0 100644 --- a/ut/CMakeLists.txt +++ b/ut/CMakeLists.txt @@ -25,6 +25,7 @@ SET ( clickhouse-cpp-ut-src utils.cpp value_generators.cpp low_cardinality_nullable_tests.cpp + sparse_ut.cpp ) IF (WITH_OPENSSL) diff --git a/ut/sparse_ut.cpp b/ut/sparse_ut.cpp new file mode 100644 index 00000000..c341662c --- /dev/null +++ b/ut/sparse_ut.cpp @@ -0,0 +1,355 @@ +#include +#include +#include +#include + +#include "clickhouse/client.h" +#include "utils.h" + +namespace { +using namespace clickhouse; +} + +static const auto localHostEndpoint = ClientOptions() + .SetHost(getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) + .SetPort(getEnvOrDefault("CLICKHOUSE_PORT", "9000")) + .SetUser(getEnvOrDefault("CLICKHOUSE_USER", "default")) + .SetPassword(getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) + .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default")); + +template +class Generator; + +template +class GeneratorNumeric { +public: + static void Generate(std::shared_ptr& col) { + for (size_t i = 0; i < 1000; ++i) { + if (i % 10 == 0) { + col->Append(static_cast(i)); + } else { + col->Append(static_cast(0)); + } + } + } +}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template <> +class Generator : public GeneratorNumeric {}; + +template +class GeneratorString { +public: + static void Generate(std::shared_ptr& col) { + for (size_t i = 0; i < 1000; ++i) { + if (i % 10 == 0) { + col->Append(std::to_string(i)); + } else { + col->Append(""); + } + } + } +}; + +template <> +class Generator : public GeneratorString {}; + +template <> +class Generator : public GeneratorString {}; + +template <> +class Generator { +public: + static void Generate(std::shared_ptr& col) { + for (size_t i = 0; i < 1000; ++i) { + if (i % 10 == 0) { + col->Append(i); + } else { + col->Append(0); + } + } + } +}; + +template <> +class Generator { +public: + static void Generate(std::shared_ptr& col) { + unsigned char default_value[16]; + memset(default_value, 0, 16); + for (size_t i = 0; i < 1000; ++i) { + if (i % 10 == 0) { + unsigned char value[16]; + memcpy(value, &i, sizeof(i)); + col->Append(reinterpret_cast(value)); + } else { + col->Append(reinterpret_cast(default_value)); + } + } + } +}; + +template <> +class Generator { +public: + static void Generate(std::shared_ptr& col) { + UUID default_value{0, 0}; + for (size_t i = 0; i < 1000; ++i) { + if (i % 10 == 0) { + UUID value{i, i}; + col->Append(value); + } else { + col->Append(default_value); + } + } + } +}; + +template +class GeneratorEnum { +public: + static void Generate(std::shared_ptr& col) { + for (size_t i = 0; i < 1000; ++i) { + if (i % 10 == 0) { + col->Append(1); + } else { + col->Append(0); + } + } + } +}; + +template <> +class Generator : public GeneratorEnum {}; + +template <> +class Generator : public GeneratorEnum {}; + +template <> +class Generator { +public: + static void Generate(std::shared_ptr& col) { + auto int_col = (*col)[0]->template AsStrict(); + Generator::Generate(int_col); + auto string_col = (*col)[1]->template AsStrict(); + for (size_t i = 0; i < 1000; ++i) { + string_col->Append(std::to_string(i)); + } + } +}; + +template +class GenericSparseColumnTest : public testing::Test { +public: + using ColumnType = std::decay_t; + + static auto MakeColumn() { + if constexpr (std::is_same_v) { + return std::make_shared(12); + } else if constexpr (std::is_same_v) { + return std::make_shared(3); + } else if constexpr (std::is_same_v) { + return std::make_shared(10, 5); + } else if constexpr (std::is_same_v) { + return std::make_shared(Type::CreateEnum8({{"Zero", 0}, {"One", 1}, {"Two", 2}})); + } else if constexpr (std::is_same_v) { + return std::make_shared(Type::CreateEnum16({{"Zero", 0}, {"One", 1}, {"Two", 2}})); + } else if constexpr (std::is_same_v) { + return std::make_shared( + std::vector({std::make_shared(), std::make_shared()})); + } else { + return std::make_shared(); + } + } + + static auto MakeColumnWithValues() { + auto col = MakeColumn(); + Generator::Generate(col); + return col; + } + + static auto Compare(const ColumnType& left, const ColumnType& right) { + if constexpr (std::is_same_v) { + auto result = CompareRecursive(*left[0]->template AsStrict(), *right[0]->template AsStrict()); + if (!result) return result; + result = CompareRecursive(*left[1]->template AsStrict(), *right[1]->template AsStrict()); + return result; + } else { + return CompareRecursive(left, right); + } + } + + static void SetSerializationKind(ColumnType& column, Serialization::Kind kind) { + if constexpr (std::is_same_v) { + column[0]->SetSerializationKind(kind); + } else { + column.SetSerializationKind(kind); + } + } +}; + +using ValueColumns = + ::testing::Types; + +TYPED_TEST_SUITE(GenericSparseColumnTest, ValueColumns); + +TYPED_TEST(GenericSparseColumnTest, LoadAndSave) { + auto column_A = this->MakeColumnWithValues(); + + this->SetSerializationKind(*column_A, Serialization::Kind::SPARSE); + + char buffer[16 * 1024] = {'\0'}; + { + ArrayOutput output(buffer, sizeof(buffer)); + // Save + EXPECT_NO_THROW(column_A->Save(&output)); + } + + auto column_B = this->MakeColumn(); + + this->SetSerializationKind(*column_B, Serialization::Kind::SPARSE); + + { + ArrayInput input(buffer, sizeof(buffer)); + // Load + EXPECT_TRUE(column_B->Load(&input, column_A->Size())); + } + + EXPECT_TRUE(this->Compare(*column_A, *column_B)); +} + +TYPED_TEST(GenericSparseColumnTest, SaveSparse) { + auto column = this->MakeColumnWithValues(); + + clickhouse::Client client(localHostEndpoint); + + if (versionNumber(client.GetServerInfo()) < versionNumber(22, 1)) { + GTEST_SKIP() << "Sparse serialization is available since v22.1.2.2-stable and can't be tested against server: " + << client.GetServerInfo(); + } + + const std::string table_name = "test_clickhouse_cpp_test_ut_sparse_table"; + const std::string column_name = "test_column"; + const auto type_name = column->GetType().GetName(); + + client.Execute("DROP TEMPORARY TABLE IF EXISTS " + table_name + ";"); + client.Execute("CREATE TEMPORARY TABLE IF NOT EXISTS " + table_name + "( " + column_name + " " + type_name + " )"); + + Block block; + block.AppendColumn(column_name, column); + + this->SetSerializationKind(*column, Serialization::Kind::SPARSE); + + client.Insert(table_name, block); + + client.Select("SELECT " + column_name + " FROM " + table_name, [&](const Block& block) { + if (block.GetRowCount() == 0) return; + ASSERT_EQ(1U, block.GetColumnCount()); + auto result = block[0]->template AsStrict(); + EXPECT_TRUE(this->Compare(*column, *result)); + }); +} + +TYPED_TEST(GenericSparseColumnTest, LoadSparse) { + auto column = this->MakeColumnWithValues(); + + clickhouse::Client client(localHostEndpoint); + + if (versionNumber(client.GetServerInfo()) < versionNumber(22, 1)) { + GTEST_SKIP() << "Sparse serialization is available since v22.1.2.2-stable and can't be tested against server: " + << client.GetServerInfo(); + } + + const std::string table_name = "test_clickhouse_cpp_test_ut_sparse_table"; + const std::string column_name = "test_column"; + const auto type_name = column->GetType().GetName(); + + client.Execute("DROP TABLE IF EXISTS " + table_name + ";"); + try { + client.Execute("CREATE TABLE IF NOT EXISTS " + table_name + "( id UInt64," + column_name + " " + type_name + + " )" + " ENGINE = MergeTree ORDER BY id " + " SETTINGS index_granularity = 32, " + " ratio_of_defaults_for_sparse_serialization = 0.1;"); + } catch (const std::exception& e) { + std::cerr << "Got error while create table: " << e.what() << std::endl; + // DB::Exception: clickhouse_cpp_cicd: Cannot execute query in readonly mode + if (std::string(e.what()).find("Cannot execute query in readonly mode") != std::string::npos) { + GTEST_SKIP() << "Database in readonly mode"; + } + // DB::Exception: clickhouse_cpp_cicd: Not enough privileges. To execute this query it's necessary to have grant CREATE TABLE ON + // default.test_clickhouse_cpp_test_ut_sparse_table + if (std::string(e.what()).find("Not enough privileges") != std::string::npos) { + GTEST_SKIP() << "Not enough privileges"; + } + throw; + } + + Block block; + block.AppendColumn(column_name, column); + + client.Insert(table_name, block); + + client.Select("SELECT " + column_name + " FROM " + table_name, [&](const Block& block) { + if (block.GetRowCount() == 0) return; + ASSERT_EQ(1U, block.GetColumnCount()); + auto result = block[0]->template AsStrict(); + if constexpr (std::is_same_v) { + (*column)[0]->SetSerializationKind(Serialization::Kind::DEFAULT); + ASSERT_EQ((*result)[0]->GetSerialization()->GetKind(), Serialization::Kind::SPARSE); + ASSERT_EQ((*result)[1]->GetSerialization()->GetKind(), Serialization::Kind::DEFAULT); + } else { + ASSERT_EQ(result->GetSerialization()->GetKind(), Serialization::Kind::SPARSE); + } + EXPECT_TRUE(this->Compare(*column, *result)); + }); + + client.Execute("DROP TABLE IF EXISTS " + table_name + ";"); +}