From b50b1c18940dd3d893e849d8264032005e382dfb Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Thu, 20 Jun 2024 15:24:04 -0700 Subject: [PATCH] [ENH] Min compaction size (#2346) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - I overhauled the property tests for the log service. There were bugs and it was also underspecified - The main bug was that the len() being used was over collectionData, not the collectionData[C] in question. - I added invariants using rapid's "" action, which is the way to add invariants. - The invariants check that the log is as we expect and also that the collections we expect get returned - I actually use the enumeration offset in the model now - The model will _actually_ purge the log it has. - The prop test will check the fields of its records for equality - New functionality - This PR introduces the "Min compaction size" argument on GetCollectionsToCompact, making it so that the compactors can skip logs with only a handful of entries. - This min compaction size is configurable for the compactor - I updated the Rust in-memory log to respect the min size. - The property test is extended to check that min_compaction_size works. Other notes: - The log service types all use int where they should be uint. I added a cleanup task to go convert the incorrect types. The new type I introduce is correct. 
## Test plan *How are these changes tested?* - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --- chromadb/proto/chroma_pb2_grpc.py | 4 +- chromadb/proto/coordinator_pb2_grpc.py | 3 +- chromadb/proto/logservice_pb2.py | 20 +- chromadb/proto/logservice_pb2.pyi | 6 +- chromadb/proto/logservice_pb2_grpc.py | 3 +- go/database/log/db/copyfrom.go | 2 +- go/database/log/db/db.go | 2 +- go/database/log/db/models.go | 4 +- go/database/log/db/queries.sql.go | 7 +- go/database/log/queries/queries.sql | 1 + go/pkg/log/repository/log.go | 7 +- go/pkg/log/server/property_test.go | 400 +++++++++++++++--- go/pkg/log/server/server.go | 3 +- go/pkg/proto/coordinatorpb/chroma.pb.go | 103 ++--- go/pkg/proto/coordinatorpb/chroma_grpc.pb.go | 20 +- go/pkg/proto/coordinatorpb/coordinator.pb.go | 82 ++-- .../coordinatorpb/coordinator_grpc.pb.go | 56 +-- go/pkg/proto/logservicepb/logservice.pb.go | 44 +- .../proto/logservicepb/logservice_grpc.pb.go | 20 +- idl/chromadb/proto/logservice.proto | 4 +- rust/worker/chroma_config.yaml | 1 + .../src/compactor/compaction_manager.rs | 9 + rust/worker/src/compactor/config.rs | 1 + rust/worker/src/compactor/scheduler.rs | 10 +- rust/worker/src/config.rs | 4 + rust/worker/src/log/log.rs | 16 +- 26 files changed, 574 insertions(+), 258 deletions(-) diff --git a/chromadb/proto/chroma_pb2_grpc.py b/chromadb/proto/chroma_pb2_grpc.py index ec3b9fe126f..b61baf8d5a1 100644 --- a/chromadb/proto/chroma_pb2_grpc.py +++ b/chromadb/proto/chroma_pb2_grpc.py @@ -5,7 +5,7 @@ from chromadb.proto import chroma_pb2 as chromadb_dot_proto_dot_chroma__pb2 -GRPC_GENERATED_VERSION = '1.63.0' +GRPC_GENERATED_VERSION = '1.64.1' GRPC_VERSION = grpc.__version__ EXPECTED_ERROR_RELEASE = '1.65.0' SCHEDULED_RELEASE_DATE = 'June 25, 2024' @@ -87,6 +87,7 @@ def add_MetadataReaderServicer_to_server(servicer, server): generic_handler = grpc.method_handlers_generic_handler( 'chroma.MetadataReader', 
rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('chroma.MetadataReader', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. @@ -207,6 +208,7 @@ def add_VectorReaderServicer_to_server(servicer, server): generic_handler = grpc.method_handlers_generic_handler( 'chroma.VectorReader', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('chroma.VectorReader', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. diff --git a/chromadb/proto/coordinator_pb2_grpc.py b/chromadb/proto/coordinator_pb2_grpc.py index ecd983ef7b4..0c81aa1b5a4 100644 --- a/chromadb/proto/coordinator_pb2_grpc.py +++ b/chromadb/proto/coordinator_pb2_grpc.py @@ -6,7 +6,7 @@ from chromadb.proto import coordinator_pb2 as chromadb_dot_proto_dot_coordinator__pb2 from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 -GRPC_GENERATED_VERSION = '1.63.0' +GRPC_GENERATED_VERSION = '1.64.1' GRPC_VERSION = grpc.__version__ EXPECTED_ERROR_RELEASE = '1.65.0' SCHEDULED_RELEASE_DATE = 'June 25, 2024' @@ -308,6 +308,7 @@ def add_SysDBServicer_to_server(servicer, server): generic_handler = grpc.method_handlers_generic_handler( 'chroma.SysDB', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('chroma.SysDB', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. 
diff --git a/chromadb/proto/logservice_pb2.py b/chromadb/proto/logservice_pb2.py index a08a261c265..6724352d107 100644 --- a/chromadb/proto/logservice_pb2.py +++ b/chromadb/proto/logservice_pb2.py @@ -15,7 +15,7 @@ from chromadb.proto import chroma_pb2 as chromadb_dot_proto_dot_chroma__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1f\x63hromadb/proto/logservice.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto\"R\n\x0fPushLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12(\n\x07records\x18\x02 \x03(\x0b\x32\x17.chroma.OperationRecord\"(\n\x10PushLogsResponse\x12\x14\n\x0crecord_count\x18\x01 \x01(\x05\"n\n\x0fPullLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x19\n\x11start_from_offset\x18\x02 \x01(\x03\x12\x12\n\nbatch_size\x18\x03 \x01(\x05\x12\x15\n\rend_timestamp\x18\x04 \x01(\x03\"H\n\tLogRecord\x12\x12\n\nlog_offset\x18\x01 \x01(\x03\x12\'\n\x06record\x18\x02 \x01(\x0b\x32\x17.chroma.OperationRecord\"6\n\x10PullLogsResponse\x12\"\n\x07records\x18\x01 \x03(\x0b\x32\x11.chroma.LogRecord\"W\n\x0e\x43ollectionInfo\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x18\n\x10\x66irst_log_offset\x18\x02 \x01(\x03\x12\x14\n\x0c\x66irst_log_ts\x18\x03 \x01(\x03\"&\n$GetAllCollectionInfoToCompactRequest\"\\\n%GetAllCollectionInfoToCompactResponse\x12\x33\n\x13\x61ll_collection_info\x18\x01 \x03(\x0b\x32\x16.chroma.CollectionInfo\"M\n UpdateCollectionLogOffsetRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x12\n\nlog_offset\x18\x02 
\x01(\x03\"#\n!UpdateCollectionLogOffsetResponse2\x82\x03\n\nLogService\x12?\n\x08PushLogs\x12\x17.chroma.PushLogsRequest\x1a\x18.chroma.PushLogsResponse\"\x00\x12?\n\x08PullLogs\x12\x17.chroma.PullLogsRequest\x1a\x18.chroma.PullLogsResponse\"\x00\x12~\n\x1dGetAllCollectionInfoToCompact\x12,.chroma.GetAllCollectionInfoToCompactRequest\x1a-.chroma.GetAllCollectionInfoToCompactResponse\"\x00\x12r\n\x19UpdateCollectionLogOffset\x12(.chroma.UpdateCollectionLogOffsetRequest\x1a).chroma.UpdateCollectionLogOffsetResponse\"\x00\x42\x39Z7github.com/chroma-core/chroma/go/pkg/proto/logservicepbb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1f\x63hromadb/proto/logservice.proto\x12\x06\x63hroma\x1a\x1b\x63hromadb/proto/chroma.proto\"R\n\x0fPushLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12(\n\x07records\x18\x02 \x03(\x0b\x32\x17.chroma.OperationRecord\"(\n\x10PushLogsResponse\x12\x14\n\x0crecord_count\x18\x01 \x01(\x05\"n\n\x0fPullLogsRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x19\n\x11start_from_offset\x18\x02 \x01(\x03\x12\x12\n\nbatch_size\x18\x03 \x01(\x05\x12\x15\n\rend_timestamp\x18\x04 \x01(\x03\"H\n\tLogRecord\x12\x12\n\nlog_offset\x18\x01 \x01(\x03\x12\'\n\x06record\x18\x02 \x01(\x0b\x32\x17.chroma.OperationRecord\"6\n\x10PullLogsResponse\x12\"\n\x07records\x18\x01 \x03(\x0b\x32\x11.chroma.LogRecord\"W\n\x0e\x43ollectionInfo\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x18\n\x10\x66irst_log_offset\x18\x02 \x01(\x03\x12\x14\n\x0c\x66irst_log_ts\x18\x03 \x01(\x03\"C\n$GetAllCollectionInfoToCompactRequest\x12\x1b\n\x13min_compaction_size\x18\x01 \x01(\x04\"\\\n%GetAllCollectionInfoToCompactResponse\x12\x33\n\x13\x61ll_collection_info\x18\x01 \x03(\x0b\x32\x16.chroma.CollectionInfo\"M\n UpdateCollectionLogOffsetRequest\x12\x15\n\rcollection_id\x18\x01 \x01(\t\x12\x12\n\nlog_offset\x18\x02 
\x01(\x03\"#\n!UpdateCollectionLogOffsetResponse2\x82\x03\n\nLogService\x12?\n\x08PushLogs\x12\x17.chroma.PushLogsRequest\x1a\x18.chroma.PushLogsResponse\"\x00\x12?\n\x08PullLogs\x12\x17.chroma.PullLogsRequest\x1a\x18.chroma.PullLogsResponse\"\x00\x12~\n\x1dGetAllCollectionInfoToCompact\x12,.chroma.GetAllCollectionInfoToCompactRequest\x1a-.chroma.GetAllCollectionInfoToCompactResponse\"\x00\x12r\n\x19UpdateCollectionLogOffset\x12(.chroma.UpdateCollectionLogOffsetRequest\x1a).chroma.UpdateCollectionLogOffsetResponse\"\x00\x42\x39Z7github.com/chroma-core/chroma/go/pkg/proto/logservicepbb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -36,13 +36,13 @@ _globals['_COLLECTIONINFO']._serialized_start=440 _globals['_COLLECTIONINFO']._serialized_end=527 _globals['_GETALLCOLLECTIONINFOTOCOMPACTREQUEST']._serialized_start=529 - _globals['_GETALLCOLLECTIONINFOTOCOMPACTREQUEST']._serialized_end=567 - _globals['_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE']._serialized_start=569 - _globals['_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE']._serialized_end=661 - _globals['_UPDATECOLLECTIONLOGOFFSETREQUEST']._serialized_start=663 - _globals['_UPDATECOLLECTIONLOGOFFSETREQUEST']._serialized_end=740 - _globals['_UPDATECOLLECTIONLOGOFFSETRESPONSE']._serialized_start=742 - _globals['_UPDATECOLLECTIONLOGOFFSETRESPONSE']._serialized_end=777 - _globals['_LOGSERVICE']._serialized_start=780 - _globals['_LOGSERVICE']._serialized_end=1166 + _globals['_GETALLCOLLECTIONINFOTOCOMPACTREQUEST']._serialized_end=596 + _globals['_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE']._serialized_start=598 + _globals['_GETALLCOLLECTIONINFOTOCOMPACTRESPONSE']._serialized_end=690 + _globals['_UPDATECOLLECTIONLOGOFFSETREQUEST']._serialized_start=692 + _globals['_UPDATECOLLECTIONLOGOFFSETREQUEST']._serialized_end=769 + _globals['_UPDATECOLLECTIONLOGOFFSETRESPONSE']._serialized_start=771 + _globals['_UPDATECOLLECTIONLOGOFFSETRESPONSE']._serialized_end=806 + 
_globals['_LOGSERVICE']._serialized_start=809 + _globals['_LOGSERVICE']._serialized_end=1195 # @@protoc_insertion_point(module_scope) diff --git a/chromadb/proto/logservice_pb2.pyi b/chromadb/proto/logservice_pb2.pyi index ac0b8428e24..5c388f85a5d 100644 --- a/chromadb/proto/logservice_pb2.pyi +++ b/chromadb/proto/logservice_pb2.pyi @@ -57,8 +57,10 @@ class CollectionInfo(_message.Message): def __init__(self, collection_id: _Optional[str] = ..., first_log_offset: _Optional[int] = ..., first_log_ts: _Optional[int] = ...) -> None: ... class GetAllCollectionInfoToCompactRequest(_message.Message): - __slots__ = () - def __init__(self) -> None: ... + __slots__ = ("min_compaction_size",) + MIN_COMPACTION_SIZE_FIELD_NUMBER: _ClassVar[int] + min_compaction_size: int + def __init__(self, min_compaction_size: _Optional[int] = ...) -> None: ... class GetAllCollectionInfoToCompactResponse(_message.Message): __slots__ = ("all_collection_info",) diff --git a/chromadb/proto/logservice_pb2_grpc.py b/chromadb/proto/logservice_pb2_grpc.py index 46312f21895..788f2df60ce 100644 --- a/chromadb/proto/logservice_pb2_grpc.py +++ b/chromadb/proto/logservice_pb2_grpc.py @@ -5,7 +5,7 @@ from chromadb.proto import logservice_pb2 as chromadb_dot_proto_dot_logservice__pb2 -GRPC_GENERATED_VERSION = '1.63.0' +GRPC_GENERATED_VERSION = '1.64.1' GRPC_VERSION = grpc.__version__ EXPECTED_ERROR_RELEASE = '1.65.0' SCHEDULED_RELEASE_DATE = 'June 25, 2024' @@ -115,6 +115,7 @@ def add_LogServiceServicer_to_server(servicer, server): generic_handler = grpc.method_handlers_generic_handler( 'chroma.LogService', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('chroma.LogService', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. 
diff --git a/go/database/log/db/copyfrom.go b/go/database/log/db/copyfrom.go index 08355528484..bc99481622e 100644 --- a/go/database/log/db/copyfrom.go +++ b/go/database/log/db/copyfrom.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.25.0 +// sqlc v1.26.0 // source: copyfrom.go package log diff --git a/go/database/log/db/db.go b/go/database/log/db/db.go index c0e8f2d798b..30d17edb0c8 100644 --- a/go/database/log/db/db.go +++ b/go/database/log/db/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.25.0 +// sqlc v1.26.0 package log diff --git a/go/database/log/db/models.go b/go/database/log/db/models.go index 29e3889e08b..2fcce42bf15 100644 --- a/go/database/log/db/models.go +++ b/go/database/log/db/models.go @@ -1,11 +1,9 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.25.0 +// sqlc v1.26.0 package log -import () - type Collection struct { ID string RecordCompactionOffsetPosition int64 diff --git a/go/database/log/db/queries.sql.go b/go/database/log/db/queries.sql.go index b4a306bd582..e1769fa9eee 100644 --- a/go/database/log/db/queries.sql.go +++ b/go/database/log/db/queries.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. 
// versions: -// sqlc v1.25.0 +// sqlc v1.26.0 // source: queries.sql package log @@ -14,6 +14,7 @@ with summary as ( select r.collection_id, r.offset, r.timestamp, row_number() over(partition by r.collection_id order by r.offset) as rank from record_log r, collection c where r.collection_id = c.id + and (c.record_enumeration_offset_position - c.record_compaction_offset_position) >= $1 and r.offset > c.record_compaction_offset_position ) select collection_id, "offset", timestamp, rank from summary @@ -28,8 +29,8 @@ type GetAllCollectionsToCompactRow struct { Rank int64 } -func (q *Queries) GetAllCollectionsToCompact(ctx context.Context) ([]GetAllCollectionsToCompactRow, error) { - rows, err := q.db.Query(ctx, getAllCollectionsToCompact) +func (q *Queries) GetAllCollectionsToCompact(ctx context.Context, minCompactionSize int64) ([]GetAllCollectionsToCompactRow, error) { + rows, err := q.db.Query(ctx, getAllCollectionsToCompact, minCompactionSize) if err != nil { return nil, err } diff --git a/go/database/log/queries/queries.sql b/go/database/log/queries/queries.sql index 98e27a7a4be..e5e0686eb0e 100644 --- a/go/database/log/queries/queries.sql +++ b/go/database/log/queries/queries.sql @@ -15,6 +15,7 @@ with summary as ( select r.collection_id, r.offset, r.timestamp, row_number() over(partition by r.collection_id order by r.offset) as rank from record_log r, collection c where r.collection_id = c.id + and (c.record_enumeration_offset_position - c.record_compaction_offset_position) >= sqlc.arg(min_compaction_size) and r.offset > c.record_compaction_offset_position ) select * from summary diff --git a/go/pkg/log/repository/log.go b/go/pkg/log/repository/log.go index a267e589276..be6d43eab19 100644 --- a/go/pkg/log/repository/log.go +++ b/go/pkg/log/repository/log.go @@ -3,10 +3,11 @@ package repository import ( "context" "errors" + "time" + log "github.com/chroma-core/chroma/go/database/log/db" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "time" ) type 
LogRepository struct { @@ -76,8 +77,8 @@ func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, of return } -func (r *LogRepository) GetAllCollectionInfoToCompact(ctx context.Context) (collectionToCompact []log.GetAllCollectionsToCompactRow, err error) { - collectionToCompact, err = r.queries.GetAllCollectionsToCompact(ctx) +func (r *LogRepository) GetAllCollectionInfoToCompact(ctx context.Context, minCompactionSize uint64) (collectionToCompact []log.GetAllCollectionsToCompactRow, err error) { + collectionToCompact, err = r.queries.GetAllCollectionsToCompact(ctx, int64(minCompactionSize)) if collectionToCompact == nil { collectionToCompact = []log.GetAllCollectionsToCompactRow{} } diff --git a/go/pkg/log/server/property_test.go b/go/pkg/log/server/property_test.go index ace7c4dba6b..e167cf49c99 100644 --- a/go/pkg/log/server/property_test.go +++ b/go/pkg/log/server/property_test.go @@ -2,6 +2,10 @@ package server import ( "context" + "math" + "testing" + "time" + log "github.com/chroma-core/chroma/go/database/log/db" "github.com/chroma-core/chroma/go/pkg/log/configuration" "github.com/chroma-core/chroma/go/pkg/log/repository" @@ -12,15 +16,23 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "google.golang.org/protobuf/proto" "pgregory.net/rapid" - "testing" - "time" ) type ModelState struct { - CollectionEnumerationOffset map[types.UniqueID]int64 - CollectionData map[types.UniqueID][]*coordinatorpb.OperationRecord - CollectionCompactionOffset map[types.UniqueID]int64 + // The current max offset for each collection + CollectionEnumerationOffset map[types.UniqueID]uint64 + // The current non-purged log for each collection and its offset + CollectionData map[types.UniqueID][]ModelLogRecord + // The current compaction offset for each collection (the last offset that was compacted) + CollectionCompactionOffset map[types.UniqueID]uint64 +} + +// A log entry in the model (for 
testing only) +type ModelLogRecord struct { + offset uint64 + record *coordinatorpb.OperationRecord } type LogServerTestSuite struct { @@ -45,22 +57,244 @@ func (suite *LogServerTestSuite) SetupSuite() { suite.lr = repository.NewLogRepository(conn) suite.logServer = NewLogServer(suite.lr) suite.model = ModelState{ - CollectionData: map[types.UniqueID][]*coordinatorpb.OperationRecord{}, - CollectionCompactionOffset: map[types.UniqueID]int64{}, + CollectionEnumerationOffset: map[types.UniqueID]uint64{}, + CollectionData: map[types.UniqueID][]ModelLogRecord{}, + CollectionCompactionOffset: map[types.UniqueID]uint64{}, + } +} + +// Invariants + +// Check that the correct set of collections are returned for compaction +// The set of collections returned for compaction should be the set of collections +// where the difference between the enumeration offset and the compaction offset +// is greater than the minimum compaction size (if specified) +// Additionally, we should never return a collection if it is not dirty +func (suite *LogServerTestSuite) invariantAllDirtyCollectionsAreReturnedForCompaction(ctx context.Context, t *rapid.T) { + result, err := suite.logServer.GetAllCollectionInfoToCompact(ctx, &logservicepb.GetAllCollectionInfoToCompactRequest{}) + assert.NoError(suite.t, err) + numCollectionsNeedingCompaction := 0 + // Iterate over collections with log data + for collectionId, _ := range suite.model.CollectionData { + compactionOffset, ok := suite.model.CollectionCompactionOffset[collectionId] + if !ok { + compactionOffset = 0 + } + + enumerationOffset, ok := suite.model.CollectionEnumerationOffset[collectionId] + if !ok { + t.Fatalf("State inconsistency: collection %s has no enumeration offset, yet has log data", collectionId) + } + + if enumerationOffset-compactionOffset > 0 { + numCollectionsNeedingCompaction++ + // Expect to find the collection in the result + found := false + for _, collection := range result.AllCollectionInfo { + id, err := 
types.Parse(collection.CollectionId) + if err != nil { + t.Fatal(err) + } + if id == collectionId { + found = true + break + } + } + if !found { + t.Fatalf("collection %s not found in result", collectionId) + } + } + } + if numCollectionsNeedingCompaction != len(result.AllCollectionInfo) { + t.Fatalf("expected %d collections needing compaction, got %d", numCollectionsNeedingCompaction, len(result.AllCollectionInfo)) + } +} + +func compareModelLogRecordToRecordLog(t *rapid.T, modelLogRecord ModelLogRecord, recordLog log.RecordLog) { + record := &coordinatorpb.OperationRecord{} + if err := proto.Unmarshal(recordLog.Record, record); err != nil { + t.Fatal(err) + } + if int64(modelLogRecord.offset) != recordLog.Offset { + t.Fatalf("expected offset %d, got %d", modelLogRecord.offset, recordLog.Offset) + } + if modelLogRecord.record.Id != record.Id { + t.Fatalf("expected record id %s, got %s", modelLogRecord.record.Id, record.Id) + } + if string(modelLogRecord.record.Vector.Vector) != string(record.Vector.Vector) { + t.Fatalf("expected record vector %s, got %s", string(modelLogRecord.record.Vector.Vector), string(record.Vector.Vector)) + } + if modelLogRecord.record.Vector.Encoding != record.Vector.Encoding { + t.Fatalf("expected record encoding %s, got %s", modelLogRecord.record.Vector.Encoding, record.Vector.Encoding) + } + if modelLogRecord.record.Vector.Dimension != record.Vector.Dimension { + t.Fatalf("expected record dimension %d, got %d", modelLogRecord.record.Vector.Dimension, record.Vector.Dimension) + } + if modelLogRecord.record.Operation != record.Operation { + t.Fatalf("expected record operation %s, got %s", modelLogRecord.record.Operation, record.Operation) + } + if modelLogRecord.record.Metadata != record.Metadata { + t.Fatalf("expected record metadata %s, got %s", modelLogRecord.record.Metadata, record.Metadata) + } +} + +func compareModelLogRecordToLogRecord(t *rapid.T, modelLogRecord ModelLogRecord, recordLog *logservicepb.LogRecord) { + if 
int64(modelLogRecord.offset) != recordLog.LogOffset { + t.Fatalf("expected offset %d, got %d", modelLogRecord.offset, recordLog.LogOffset) + } + if modelLogRecord.record.Id != recordLog.Record.Id { + t.Fatalf("expected record id %s, got %s", modelLogRecord.record.Id, recordLog.Record.Id) + } + if string(modelLogRecord.record.Vector.Vector) != string(recordLog.Record.Vector.Vector) { + t.Fatalf("expected record vector %s, got %s", string(modelLogRecord.record.Vector.Vector), string(recordLog.Record.Vector.Vector)) + } + if modelLogRecord.record.Vector.Encoding != recordLog.Record.Vector.Encoding { + t.Fatalf("expected record encoding %s, got %s", modelLogRecord.record.Vector.Encoding, recordLog.Record.Vector.Encoding) + } + if modelLogRecord.record.Vector.Dimension != recordLog.Record.Vector.Dimension { + t.Fatalf("expected record dimension %d, got %d", modelLogRecord.record.Vector.Dimension, recordLog.Record.Vector.Dimension) + } + if modelLogRecord.record.Operation != recordLog.Record.Operation { + t.Fatalf("expected record operation %s, got %s", modelLogRecord.record.Operation, recordLog.Record.Operation) + } + if modelLogRecord.record.Metadata != recordLog.Record.Metadata { + t.Fatalf("expected record metadata %s, got %s", modelLogRecord.record.Metadata, recordLog.Record.Metadata) + } +} + +// Check that the set of logs from the compaction offset onwards +// is the same in both the model and the SUT +func (suite *LogServerTestSuite) invariantLogsAreTheSame(ctx context.Context, t *rapid.T) { + for id, model_log := range suite.model.CollectionData { + pulled_log, err := suite.lr.PullRecords(ctx, id.String(), 0, len(model_log), time.Now().UnixNano()) + if err != nil { + t.Fatal(err) + } + // Length of log should be the same + if len(model_log) != len(pulled_log) { + t.Fatalf("expected log length %d, got %d", len(model_log), len(pulled_log)) + } + + // Each record should be the same + for i, modelLogRecord := range model_log { + // Compare the record + 
compareModelLogRecordToRecordLog(t, modelLogRecord, pulled_log[i]) + } } } +func (suite *LogServerTestSuite) modelPushLogs(ctx context.Context, t *rapid.T, collectionId types.UniqueID, recordsToPush []*coordinatorpb.OperationRecord) { + // Update the model + startEnumerationOffset, ok := suite.model.CollectionEnumerationOffset[collectionId] + if !ok { + startEnumerationOffset = 0 + } + // Enumeration offset is 1 based and should always be + // 1 greater than the last offset + startEnumerationOffset++ + + for i, record := range recordsToPush { + modelRecord := ModelLogRecord{ + offset: startEnumerationOffset + uint64(i), + record: record, + } + suite.model.CollectionData[collectionId] = append(suite.model.CollectionData[collectionId], modelRecord) + suite.model.CollectionEnumerationOffset[collectionId] = startEnumerationOffset + uint64(i) + } + +} + +func (suite *LogServerTestSuite) modelPullLogs(ctx context.Context, t *rapid.T, c types.UniqueID) ([]ModelLogRecord, uint64, uint32) { + startOffset := rapid.Uint64Range(suite.model.CollectionCompactionOffset[c], suite.model.CollectionEnumerationOffset[c]).Draw(t, "start_offset") + // If start offset is 0, we need to set it to 1 as the offset is 1 based + if startOffset == 0 { + startOffset = 1 + } + batchSize := rapid.Uint32Range(1, 20).Draw(t, "batch_size") + + // Pull logs from the model + modelLogs := suite.model.CollectionData[c] + // Find start offset in the model + startIndex := -1 + for i, record := range modelLogs { + if record.offset == startOffset { + startIndex = i + break + } + } + if startIndex == -1 { + t.Fatalf("start offset %d not found in model", startOffset) + } + endIndex := startIndex + int(batchSize) + if endIndex > len(modelLogs) { + endIndex = len(modelLogs) + } + expectedRecords := modelLogs[startIndex:endIndex] + return expectedRecords, startOffset, batchSize +} + +func (suite *LogServerTestSuite) modelPurgeLogs(ctx context.Context, t *rapid.T) { + for id, log := range 
suite.model.CollectionData { + compactionOffset, ok := suite.model.CollectionCompactionOffset[id] + if !ok { + // No compaction has occurred yet, so we can't purge + continue + } + + new_log := []ModelLogRecord{} + for _, record := range log { + // TODO: It is odd that the SUT purge behavior keeps the record + // with the compaction offset. Shouldn't we be able to purge this + // record? + if record.offset >= compactionOffset { + new_log = append(new_log, record) + } + } + suite.model.CollectionData[id] = new_log + } +} + +func (suite *LogServerTestSuite) modelGetAllCollectionInfoToCompact(ctx context.Context, t *rapid.T) (uint64, uint64, map[types.UniqueID]uint64, bool) { + minCompactionSize := uint64(math.MaxUint64) + maxCompactionSize := uint64(0) + actualCompactionSizes := make(map[types.UniqueID]uint64) + allEmpty := true + for id, log := range suite.model.CollectionData { + if len(log) > 0 { + allEmpty = false + } + + enumerationOffset := suite.model.CollectionEnumerationOffset[id] + compactionOffset, ok := suite.model.CollectionCompactionOffset[id] + if !ok { + compactionOffset = 0 + } + delta := enumerationOffset - compactionOffset + actualCompactionSizes[id] = delta + if delta < minCompactionSize { + minCompactionSize = delta + } + if delta > maxCompactionSize { + maxCompactionSize = delta + } + } + return minCompactionSize, maxCompactionSize, actualCompactionSizes, allEmpty +} + +// State machine func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() { ctx := context.Background() - // Generate collection ids - collections := make([]types.UniqueID, 10) - for i := 0; i < len(collections); i++ { - collections[i] = types.NewUniqueID() - } + maxCollections := 100 + collections := make(map[int]types.UniqueID) collectionGen := rapid.Custom(func(t *rapid.T) types.UniqueID { - return collections[rapid.IntRange(0, len(collections)-1).Draw(t, "collection_id")] + position := rapid.IntRange(0, maxCollections-1).Draw(t, "collection_position") + if _, ok := 
collections[position]; !ok { + collections[position] = types.NewUniqueID() + } + return collections[position] }) + recordGen := rapid.SliceOf(rapid.Custom(func(t *rapid.T) *coordinatorpb.OperationRecord { data := rapid.SliceOf(rapid.Byte()).Draw(t, "record_data") id := rapid.String().Draw(t, "record_id") @@ -74,8 +308,14 @@ func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() { rapid.Check(suite.t, func(t *rapid.T) { t.Repeat(map[string]func(*rapid.T){ "pushLogs": func(t *rapid.T) { + // Generate data c := collectionGen.Draw(t, "collection") records := recordGen.Draw(t, "record") + + // Update the model + suite.modelPushLogs(ctx, t, c, records) + + // Update the SUT r, err := suite.logServer.PushLogs(ctx, &logservicepb.PushLogsRequest{ CollectionId: c.String(), Records: records, @@ -86,87 +326,122 @@ func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() { if int32(len(records)) != r.RecordCount { t.Fatal("record count mismatch", len(records), r.RecordCount) } - suite.model.CollectionData[c] = append(suite.model.CollectionData[c], records...) 
}, - "getAllCollectionsToCompact": func(t *rapid.T) { + "compaction": func(t *rapid.T) { + // Query the SUT for all collections to compact (We could query the model too + // it doesn't really matter since we just want to know that if compacted + // the output is the same in getallcollections, which we have another + // transition for) result, err := suite.logServer.GetAllCollectionInfoToCompact(ctx, &logservicepb.GetAllCollectionInfoToCompactRequest{}) assert.NoError(suite.t, err) + + // For reach collection the SUT wants to compact, perform a compaction for _, collection := range result.AllCollectionInfo { id, err := types.Parse(collection.CollectionId) if err != nil { t.Fatal(err) } - compactionOffset := rapid.Int64Range(suite.model.CollectionCompactionOffset[id], int64(len(suite.model.CollectionData))).Draw(t, "new_position") + enumerationOffset := suite.model.CollectionEnumerationOffset[id] + + // Update the SUT + compactionOffset := rapid.Uint64Range(suite.model.CollectionCompactionOffset[id], enumerationOffset).Draw(t, "new_position") _, err = suite.logServer.UpdateCollectionLogOffset(ctx, &logservicepb.UpdateCollectionLogOffsetRequest{ CollectionId: id.String(), - LogOffset: compactionOffset, + LogOffset: int64(compactionOffset), }) if err != nil { t.Fatal(err) } + + // Update the model suite.model.CollectionCompactionOffset[id] = compactionOffset } }, + "getAllCollectionsToCompactWithMinCompactionSize": func(t *rapid.T) { + if len(suite.model.CollectionData) == 0 { + // Nothing to do if no data + return + } + + // Query the model + minCompactionSize, maxCompactionSize, actualCompactionSizes, allEmpty := suite.modelGetAllCollectionInfoToCompact(ctx, t) + if allEmpty { + // Nothing to do if no data + return + } + + // Query the SUT + requestMinCompactionSize := rapid.Uint64Range(minCompactionSize, maxCompactionSize).Draw(t, "min_compaction_size") + result, err := suite.logServer.GetAllCollectionInfoToCompact(ctx, 
&logservicepb.GetAllCollectionInfoToCompactRequest{ + MinCompactionSize: requestMinCompactionSize, + }) + assert.NoError(suite.t, err) + + // Verify that the result is correct + for _, collection := range result.AllCollectionInfo { + id, err := types.Parse(collection.CollectionId) + if err != nil { + t.Fatal(err) + } + + actualCompactionSize := actualCompactionSizes[id] + if actualCompactionSize < requestMinCompactionSize { + t.Fatalf("compaction size %d is less than request min compaction size %d", actualCompactionSize, requestMinCompactionSize) + } + } + + // Verify that the length of the results is correct + model_expects := 0 + for _, size := range actualCompactionSizes { + if size >= requestMinCompactionSize && size > 0 { + model_expects++ + } + } + if model_expects != len(result.AllCollectionInfo) { + t.Fatalf("expected %d collections, got %d", model_expects, len(result.AllCollectionInfo)) + } + }, "pullLogs": func(t *rapid.T) { c := collectionGen.Draw(t, "collection") - startOffset := rapid.Int64Range(suite.model.CollectionCompactionOffset[c], int64(len(suite.model.CollectionData))).Draw(t, "start_offset") - // If start offset is 0, we need to set it to 1 as the offset is 1 based - if startOffset == 0 { - startOffset = 1 + + // Pull logs from the model + // If the collection has no data, we can't pull logs + if len(suite.model.CollectionData[c]) == 0 { + return } - batchSize := rapid.Int32Range(1, 20).Draw(t, "batch_size") + + expectedRecords, startOffset, batchSize := suite.modelPullLogs(ctx, t, c) + + // Pull logs from the SUT response, err := suite.logServer.PullLogs(ctx, &logservicepb.PullLogsRequest{ CollectionId: c.String(), - StartFromOffset: startOffset, - BatchSize: batchSize, + StartFromOffset: int64(startOffset), + BatchSize: int32(batchSize), EndTimestamp: time.Now().UnixNano(), }) if err != nil { t.Fatal(err) } + // Verify that the number of records returned is correct - if len(suite.model.CollectionData[c]) > int(startOffset) { - if 
len(suite.model.CollectionData[c])-int(startOffset) < int(batchSize) { - suite.Equal(len(response.Records), len(suite.model.CollectionData[c])-int(startOffset)+1) - } else { - suite.Equal(len(response.Records), int(batchSize)) - } - } - // Verify that the first record offset is correct - if len(response.Records) > 0 { - suite.Equal(response.Records[0].LogOffset, startOffset) - } - // Verify that record returned is matching the expected record - for _, record := range response.Records { - expectedRecord := suite.model.CollectionData[c][record.LogOffset-1] - if string(expectedRecord.Vector.Vector) != string(record.Record.Vector.Vector) { - t.Fatalf("expect record vector %s, got %s", string(expectedRecord.Vector.Vector), string(record.Record.Vector.Vector)) - } - if expectedRecord.Id != record.Record.Id { - t.Fatalf("expect record id %s, got %s", expectedRecord.Id, record.Record.Id) - } + if int64(len(response.Records)) != int64(len(expectedRecords)) { + t.Fatalf("expected %d records, got %d", len(expectedRecords), len(response.Records)) } - // Verify that the first and last record offset is correct - if len(response.Records) > 0 { - lastRecord := response.Records[len(response.Records)-1] - firstRecord := response.Records[0] - // - expectedLastOffset := startOffset + int64(batchSize) - 1 - if expectedLastOffset > int64(len(suite.model.CollectionData[c])) { - expectedLastOffset = int64(len(suite.model.CollectionData[c])) - } - if lastRecord.LogOffset != expectedLastOffset { - t.Fatalf("expect last record %d, got %d", lastRecord.LogOffset, expectedLastOffset) - } - if firstRecord.LogOffset != startOffset { - t.Fatalf("expect first record %d, got %d", startOffset, firstRecord.LogOffset) - } + // Verify the record data is the same + for i, logRecord := range response.Records { + expectedLogRecord := expectedRecords[i] + compareModelLogRecordToLogRecord(t, expectedLogRecord, logRecord) } }, "purgeLogs": func(t *rapid.T) { + // Purge the model + suite.modelPurgeLogs(ctx, t) 
+ + // Purge the SUT err := suite.lr.PurgeRecords(ctx) suite.NoError(err) + // Verify that all record logs are purged for id, offset := range suite.model.CollectionCompactionOffset { if offset != 0 { @@ -174,11 +449,16 @@ func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() { records, err = suite.lr.PullRecords(ctx, id.String(), 0, 1, time.Now().UnixNano()) suite.NoError(err) if len(records) > 0 { - suite.Equal(offset, records[0].Offset) + suite.Equal(int64(offset), records[0].Offset) } } } }, + "": func(t *rapid.T) { + // "" is the invariant check function in rapid + suite.invariantAllDirtyCollectionsAreReturnedForCompaction(ctx, t) + suite.invariantLogsAreTheSame(ctx, t) + }, }) }) } diff --git a/go/pkg/log/server/server.go b/go/pkg/log/server/server.go index 504f74960b0..c768a055c60 100644 --- a/go/pkg/log/server/server.go +++ b/go/pkg/log/server/server.go @@ -2,6 +2,7 @@ package server import ( "context" + log "github.com/chroma-core/chroma/go/database/log/db" "github.com/chroma-core/chroma/go/pkg/log/repository" "github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb" @@ -73,7 +74,7 @@ func (s *logServer) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequ func (s *logServer) GetAllCollectionInfoToCompact(ctx context.Context, req *logservicepb.GetAllCollectionInfoToCompactRequest) (res *logservicepb.GetAllCollectionInfoToCompactResponse, err error) { var collectionToCompact []log.GetAllCollectionsToCompactRow - collectionToCompact, err = s.lr.GetAllCollectionInfoToCompact(ctx) + collectionToCompact, err = s.lr.GetAllCollectionInfoToCompact(ctx, req.MinCompactionSize) if err != nil { return } diff --git a/go/pkg/proto/coordinatorpb/chroma.pb.go b/go/pkg/proto/coordinatorpb/chroma.pb.go index 3dc3631928a..b8f1c6a7d82 100644 --- a/go/pkg/proto/coordinatorpb/chroma.pb.go +++ b/go/pkg/proto/coordinatorpb/chroma.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.34.2 -// protoc v5.26.1 +// protoc-gen-go v1.33.0 +// protoc v4.23.4 // source: chromadb/proto/chroma.proto package coordinatorpb @@ -888,6 +888,9 @@ type UpdateMetadataValue struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + // Not set if user wants to delete the key. + // TODO(Sanket): Should we make this more explicit? + // // Types that are assignable to Value: // // *UpdateMetadataValue_StringValue @@ -3209,7 +3212,7 @@ func file_chromadb_proto_chroma_proto_rawDescGZIP() []byte { var file_chromadb_proto_chroma_proto_enumTypes = make([]protoimpl.EnumInfo, 8) var file_chromadb_proto_chroma_proto_msgTypes = make([]protoimpl.MessageInfo, 38) -var file_chromadb_proto_chroma_proto_goTypes = []any{ +var file_chromadb_proto_chroma_proto_goTypes = []interface{}{ (Operation)(0), // 0: chroma.Operation (ScalarEncoding)(0), // 1: chroma.ScalarEncoding (SegmentScope)(0), // 2: chroma.SegmentScope @@ -3327,7 +3330,7 @@ func file_chromadb_proto_chroma_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_chromadb_proto_chroma_proto_msgTypes[0].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Status); i { case 0: return &v.state @@ -3339,7 +3342,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[1].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Vector); i { case 0: return &v.state @@ -3351,7 +3354,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[2].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*FilePaths); i { case 0: return &v.state @@ -3363,7 +3366,7 @@ func 
file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[3].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Segment); i { case 0: return &v.state @@ -3375,7 +3378,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[4].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Collection); i { case 0: return &v.state @@ -3387,7 +3390,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[5].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Database); i { case 0: return &v.state @@ -3399,7 +3402,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[6].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Tenant); i { case 0: return &v.state @@ -3411,7 +3414,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[7].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UpdateMetadataValue); i { case 0: return &v.state @@ -3423,7 +3426,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[8].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UpdateMetadata); i { case 0: return &v.state @@ -3435,7 +3438,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } 
- file_chromadb_proto_chroma_proto_msgTypes[9].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*OperationRecord); i { case 0: return &v.state @@ -3447,7 +3450,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[10].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CountRecordsRequest); i { case 0: return &v.state @@ -3459,7 +3462,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[11].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CountRecordsResponse); i { case 0: return &v.state @@ -3471,7 +3474,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[12].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QueryMetadataRequest); i { case 0: return &v.state @@ -3483,7 +3486,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[13].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QueryMetadataResponse); i { case 0: return &v.state @@ -3495,7 +3498,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[14].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*MetadataEmbeddingRecord); i { case 0: return &v.state @@ -3507,7 +3510,7 @@ func file_chromadb_proto_chroma_proto_init() { return 
nil } } - file_chromadb_proto_chroma_proto_msgTypes[15].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*WhereDocument); i { case 0: return &v.state @@ -3519,7 +3522,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[16].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DirectWhereDocument); i { case 0: return &v.state @@ -3531,7 +3534,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[17].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*WhereDocumentChildren); i { case 0: return &v.state @@ -3543,7 +3546,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[18].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[18].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Where); i { case 0: return &v.state @@ -3555,7 +3558,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[19].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[19].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DirectComparison); i { case 0: return &v.state @@ -3567,7 +3570,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[20].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*WhereChildren); i { case 0: return &v.state @@ -3579,7 +3582,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - 
file_chromadb_proto_chroma_proto_msgTypes[21].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*StringListComparison); i { case 0: return &v.state @@ -3591,7 +3594,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[22].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[22].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SingleStringComparison); i { case 0: return &v.state @@ -3603,7 +3606,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[23].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[23].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SingleBoolComparison); i { case 0: return &v.state @@ -3615,7 +3618,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[24].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[24].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*IntListComparison); i { case 0: return &v.state @@ -3627,7 +3630,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[25].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SingleIntComparison); i { case 0: return &v.state @@ -3639,7 +3642,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[26].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DoubleListComparison); i { case 0: return &v.state @@ -3651,7 +3654,7 @@ func file_chromadb_proto_chroma_proto_init() { return 
nil } } - file_chromadb_proto_chroma_proto_msgTypes[27].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*BoolListComparison); i { case 0: return &v.state @@ -3663,7 +3666,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[28].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SingleDoubleComparison); i { case 0: return &v.state @@ -3675,7 +3678,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[29].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetVectorsRequest); i { case 0: return &v.state @@ -3687,7 +3690,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[30].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[30].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetVectorsResponse); i { case 0: return &v.state @@ -3699,7 +3702,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[31].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[31].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*VectorEmbeddingRecord); i { case 0: return &v.state @@ -3711,7 +3714,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[32].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[32].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QueryVectorsRequest); i { case 0: return &v.state @@ -3723,7 +3726,7 @@ func file_chromadb_proto_chroma_proto_init() { 
return nil } } - file_chromadb_proto_chroma_proto_msgTypes[33].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[33].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*QueryVectorsResponse); i { case 0: return &v.state @@ -3735,7 +3738,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[34].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[34].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*VectorQueryResults); i { case 0: return &v.state @@ -3747,7 +3750,7 @@ func file_chromadb_proto_chroma_proto_init() { return nil } } - file_chromadb_proto_chroma_proto_msgTypes[35].Exporter = func(v any, i int) any { + file_chromadb_proto_chroma_proto_msgTypes[35].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*VectorQueryResult); i { case 0: return &v.state @@ -3760,25 +3763,25 @@ func file_chromadb_proto_chroma_proto_init() { } } } - file_chromadb_proto_chroma_proto_msgTypes[3].OneofWrappers = []any{} - file_chromadb_proto_chroma_proto_msgTypes[4].OneofWrappers = []any{} - file_chromadb_proto_chroma_proto_msgTypes[7].OneofWrappers = []any{ + file_chromadb_proto_chroma_proto_msgTypes[3].OneofWrappers = []interface{}{} + file_chromadb_proto_chroma_proto_msgTypes[4].OneofWrappers = []interface{}{} + file_chromadb_proto_chroma_proto_msgTypes[7].OneofWrappers = []interface{}{ (*UpdateMetadataValue_StringValue)(nil), (*UpdateMetadataValue_IntValue)(nil), (*UpdateMetadataValue_FloatValue)(nil), (*UpdateMetadataValue_BoolValue)(nil), } - file_chromadb_proto_chroma_proto_msgTypes[9].OneofWrappers = []any{} - file_chromadb_proto_chroma_proto_msgTypes[12].OneofWrappers = []any{} - file_chromadb_proto_chroma_proto_msgTypes[15].OneofWrappers = []any{ + file_chromadb_proto_chroma_proto_msgTypes[9].OneofWrappers = []interface{}{} + file_chromadb_proto_chroma_proto_msgTypes[12].OneofWrappers = []interface{}{} + 
file_chromadb_proto_chroma_proto_msgTypes[15].OneofWrappers = []interface{}{ (*WhereDocument_Direct)(nil), (*WhereDocument_Children)(nil), } - file_chromadb_proto_chroma_proto_msgTypes[18].OneofWrappers = []any{ + file_chromadb_proto_chroma_proto_msgTypes[18].OneofWrappers = []interface{}{ (*Where_DirectComparison)(nil), (*Where_Children)(nil), } - file_chromadb_proto_chroma_proto_msgTypes[19].OneofWrappers = []any{ + file_chromadb_proto_chroma_proto_msgTypes[19].OneofWrappers = []interface{}{ (*DirectComparison_SingleStringOperand)(nil), (*DirectComparison_StringListOperand)(nil), (*DirectComparison_SingleIntOperand)(nil), @@ -3788,15 +3791,15 @@ func file_chromadb_proto_chroma_proto_init() { (*DirectComparison_BoolListOperand)(nil), (*DirectComparison_SingleBoolOperand)(nil), } - file_chromadb_proto_chroma_proto_msgTypes[25].OneofWrappers = []any{ + file_chromadb_proto_chroma_proto_msgTypes[25].OneofWrappers = []interface{}{ (*SingleIntComparison_GenericComparator)(nil), (*SingleIntComparison_NumberComparator)(nil), } - file_chromadb_proto_chroma_proto_msgTypes[28].OneofWrappers = []any{ + file_chromadb_proto_chroma_proto_msgTypes[28].OneofWrappers = []interface{}{ (*SingleDoubleComparison_GenericComparator)(nil), (*SingleDoubleComparison_NumberComparator)(nil), } - file_chromadb_proto_chroma_proto_msgTypes[35].OneofWrappers = []any{} + file_chromadb_proto_chroma_proto_msgTypes[35].OneofWrappers = []interface{}{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/go/pkg/proto/coordinatorpb/chroma_grpc.pb.go b/go/pkg/proto/coordinatorpb/chroma_grpc.pb.go index 7338ae575f5..a99c2632a5b 100644 --- a/go/pkg/proto/coordinatorpb/chroma_grpc.pb.go +++ b/go/pkg/proto/coordinatorpb/chroma_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: -// - protoc-gen-go-grpc v1.4.0 -// - protoc v5.26.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 // source: chromadb/proto/chroma.proto package coordinatorpb @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 const ( MetadataReader_QueryMetadata_FullMethodName = "/chroma.MetadataReader/QueryMetadata" @@ -40,9 +40,8 @@ func NewMetadataReaderClient(cc grpc.ClientConnInterface) MetadataReaderClient { } func (c *metadataReaderClient) QueryMetadata(ctx context.Context, in *QueryMetadataRequest, opts ...grpc.CallOption) (*QueryMetadataResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(QueryMetadataResponse) - err := c.cc.Invoke(ctx, MetadataReader_QueryMetadata_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, MetadataReader_QueryMetadata_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -50,9 +49,8 @@ func (c *metadataReaderClient) QueryMetadata(ctx context.Context, in *QueryMetad } func (c *metadataReaderClient) CountRecords(ctx context.Context, in *CountRecordsRequest, opts ...grpc.CallOption) (*CountRecordsResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(CountRecordsResponse) - err := c.cc.Invoke(ctx, MetadataReader_CountRecords_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, MetadataReader_CountRecords_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -169,9 +167,8 @@ func NewVectorReaderClient(cc grpc.ClientConnInterface) VectorReaderClient { } func (c *vectorReaderClient) GetVectors(ctx context.Context, in *GetVectorsRequest, opts ...grpc.CallOption) (*GetVectorsResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GetVectorsResponse) - err := c.cc.Invoke(ctx, VectorReader_GetVectors_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, VectorReader_GetVectors_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -179,9 +176,8 @@ func (c *vectorReaderClient) GetVectors(ctx context.Context, in *GetVectorsReque } func (c *vectorReaderClient) QueryVectors(ctx context.Context, in *QueryVectorsRequest, opts ...grpc.CallOption) (*QueryVectorsResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(QueryVectorsResponse) - err := c.cc.Invoke(ctx, VectorReader_QueryVectors_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, VectorReader_QueryVectors_FullMethodName, in, out, opts...) if err != nil { return nil, err } diff --git a/go/pkg/proto/coordinatorpb/coordinator.pb.go b/go/pkg/proto/coordinatorpb/coordinator.pb.go index 80eb941f22f..ffbc2a9339f 100644 --- a/go/pkg/proto/coordinatorpb/coordinator.pb.go +++ b/go/pkg/proto/coordinatorpb/coordinator.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.34.2 -// protoc v5.26.1 +// protoc-gen-go v1.33.0 +// protoc v4.23.4 // source: chromadb/proto/coordinator.proto package coordinatorpb @@ -2368,7 +2368,7 @@ func file_chromadb_proto_coordinator_proto_rawDescGZIP() []byte { } var file_chromadb_proto_coordinator_proto_msgTypes = make([]protoimpl.MessageInfo, 34) -var file_chromadb_proto_coordinator_proto_goTypes = []any{ +var file_chromadb_proto_coordinator_proto_goTypes = []interface{}{ (*CreateDatabaseRequest)(nil), // 0: chroma.CreateDatabaseRequest (*CreateDatabaseResponse)(nil), // 1: chroma.CreateDatabaseResponse (*GetDatabaseRequest)(nil), // 2: chroma.GetDatabaseRequest @@ -2488,7 +2488,7 @@ func file_chromadb_proto_coordinator_proto_init() { } file_chromadb_proto_chroma_proto_init() if !protoimpl.UnsafeEnabled { - file_chromadb_proto_coordinator_proto_msgTypes[0].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateDatabaseRequest); i { case 0: return &v.state @@ -2500,7 +2500,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[1].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateDatabaseResponse); i { case 0: return &v.state @@ -2512,7 +2512,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[2].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetDatabaseRequest); i { case 0: return &v.state @@ -2524,7 +2524,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[3].Exporter = func(v any, i int) any { + 
file_chromadb_proto_coordinator_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetDatabaseResponse); i { case 0: return &v.state @@ -2536,7 +2536,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[4].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateTenantRequest); i { case 0: return &v.state @@ -2548,7 +2548,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[5].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateTenantResponse); i { case 0: return &v.state @@ -2560,7 +2560,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[6].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetTenantRequest); i { case 0: return &v.state @@ -2572,7 +2572,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[7].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetTenantResponse); i { case 0: return &v.state @@ -2584,7 +2584,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[8].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateSegmentRequest); i { case 0: return &v.state @@ -2596,7 +2596,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - 
file_chromadb_proto_coordinator_proto_msgTypes[9].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateSegmentResponse); i { case 0: return &v.state @@ -2608,7 +2608,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[10].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DeleteSegmentRequest); i { case 0: return &v.state @@ -2620,7 +2620,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[11].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DeleteSegmentResponse); i { case 0: return &v.state @@ -2632,7 +2632,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[12].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetSegmentsRequest); i { case 0: return &v.state @@ -2644,7 +2644,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[13].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetSegmentsResponse); i { case 0: return &v.state @@ -2656,7 +2656,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[14].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UpdateSegmentRequest); i { case 0: return 
&v.state @@ -2668,7 +2668,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[15].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UpdateSegmentResponse); i { case 0: return &v.state @@ -2680,7 +2680,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[16].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateCollectionRequest); i { case 0: return &v.state @@ -2692,7 +2692,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[17].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CreateCollectionResponse); i { case 0: return &v.state @@ -2704,7 +2704,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[18].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[18].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DeleteCollectionRequest); i { case 0: return &v.state @@ -2716,7 +2716,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[19].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[19].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DeleteCollectionResponse); i { case 0: return &v.state @@ -2728,7 +2728,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[20].Exporter = func(v any, i int) any { + 
file_chromadb_proto_coordinator_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetCollectionsRequest); i { case 0: return &v.state @@ -2740,7 +2740,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[21].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetCollectionsResponse); i { case 0: return &v.state @@ -2752,7 +2752,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[22].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[22].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UpdateCollectionRequest); i { case 0: return &v.state @@ -2764,7 +2764,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[23].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[23].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UpdateCollectionResponse); i { case 0: return &v.state @@ -2776,7 +2776,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[24].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[24].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Notification); i { case 0: return &v.state @@ -2788,7 +2788,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[25].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ResetStateResponse); i { case 0: return &v.state @@ -2800,7 +2800,7 @@ func file_chromadb_proto_coordinator_proto_init() { 
return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[26].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetLastCompactionTimeForTenantRequest); i { case 0: return &v.state @@ -2812,7 +2812,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[27].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*TenantLastCompactionTime); i { case 0: return &v.state @@ -2824,7 +2824,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[28].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetLastCompactionTimeForTenantResponse); i { case 0: return &v.state @@ -2836,7 +2836,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[29].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SetLastCompactionTimeForTenantRequest); i { case 0: return &v.state @@ -2848,7 +2848,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[30].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[30].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*FlushSegmentCompactionInfo); i { case 0: return &v.state @@ -2860,7 +2860,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[31].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[31].Exporter = func(v interface{}, 
i int) interface{} { switch v := v.(*FlushCollectionCompactionRequest); i { case 0: return &v.state @@ -2872,7 +2872,7 @@ func file_chromadb_proto_coordinator_proto_init() { return nil } } - file_chromadb_proto_coordinator_proto_msgTypes[32].Exporter = func(v any, i int) any { + file_chromadb_proto_coordinator_proto_msgTypes[32].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*FlushCollectionCompactionResponse); i { case 0: return &v.state @@ -2885,16 +2885,16 @@ func file_chromadb_proto_coordinator_proto_init() { } } } - file_chromadb_proto_coordinator_proto_msgTypes[12].OneofWrappers = []any{} - file_chromadb_proto_coordinator_proto_msgTypes[14].OneofWrappers = []any{ + file_chromadb_proto_coordinator_proto_msgTypes[12].OneofWrappers = []interface{}{} + file_chromadb_proto_coordinator_proto_msgTypes[14].OneofWrappers = []interface{}{ (*UpdateSegmentRequest_Collection)(nil), (*UpdateSegmentRequest_ResetCollection)(nil), (*UpdateSegmentRequest_Metadata)(nil), (*UpdateSegmentRequest_ResetMetadata)(nil), } - file_chromadb_proto_coordinator_proto_msgTypes[16].OneofWrappers = []any{} - file_chromadb_proto_coordinator_proto_msgTypes[20].OneofWrappers = []any{} - file_chromadb_proto_coordinator_proto_msgTypes[22].OneofWrappers = []any{ + file_chromadb_proto_coordinator_proto_msgTypes[16].OneofWrappers = []interface{}{} + file_chromadb_proto_coordinator_proto_msgTypes[20].OneofWrappers = []interface{}{} + file_chromadb_proto_coordinator_proto_msgTypes[22].OneofWrappers = []interface{}{ (*UpdateCollectionRequest_Metadata)(nil), (*UpdateCollectionRequest_ResetMetadata)(nil), } diff --git a/go/pkg/proto/coordinatorpb/coordinator_grpc.pb.go b/go/pkg/proto/coordinatorpb/coordinator_grpc.pb.go index 0a5088ae91e..1306dbc1793 100644 --- a/go/pkg/proto/coordinatorpb/coordinator_grpc.pb.go +++ b/go/pkg/proto/coordinatorpb/coordinator_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: -// - protoc-gen-go-grpc v1.4.0 -// - protoc v5.26.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 // source: chromadb/proto/coordinator.proto package coordinatorpb @@ -16,8 +16,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 const ( SysDB_CreateDatabase_FullMethodName = "/chroma.SysDB/CreateDatabase" @@ -69,9 +69,8 @@ func NewSysDBClient(cc grpc.ClientConnInterface) SysDBClient { } func (c *sysDBClient) CreateDatabase(ctx context.Context, in *CreateDatabaseRequest, opts ...grpc.CallOption) (*CreateDatabaseResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(CreateDatabaseResponse) - err := c.cc.Invoke(ctx, SysDB_CreateDatabase_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_CreateDatabase_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -79,9 +78,8 @@ func (c *sysDBClient) CreateDatabase(ctx context.Context, in *CreateDatabaseRequ } func (c *sysDBClient) GetDatabase(ctx context.Context, in *GetDatabaseRequest, opts ...grpc.CallOption) (*GetDatabaseResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GetDatabaseResponse) - err := c.cc.Invoke(ctx, SysDB_GetDatabase_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_GetDatabase_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -89,9 +87,8 @@ func (c *sysDBClient) GetDatabase(ctx context.Context, in *GetDatabaseRequest, o } func (c *sysDBClient) CreateTenant(ctx context.Context, in *CreateTenantRequest, opts ...grpc.CallOption) (*CreateTenantResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) 
out := new(CreateTenantResponse) - err := c.cc.Invoke(ctx, SysDB_CreateTenant_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_CreateTenant_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -99,9 +96,8 @@ func (c *sysDBClient) CreateTenant(ctx context.Context, in *CreateTenantRequest, } func (c *sysDBClient) GetTenant(ctx context.Context, in *GetTenantRequest, opts ...grpc.CallOption) (*GetTenantResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GetTenantResponse) - err := c.cc.Invoke(ctx, SysDB_GetTenant_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_GetTenant_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -109,9 +105,8 @@ func (c *sysDBClient) GetTenant(ctx context.Context, in *GetTenantRequest, opts } func (c *sysDBClient) CreateSegment(ctx context.Context, in *CreateSegmentRequest, opts ...grpc.CallOption) (*CreateSegmentResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(CreateSegmentResponse) - err := c.cc.Invoke(ctx, SysDB_CreateSegment_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_CreateSegment_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -119,9 +114,8 @@ func (c *sysDBClient) CreateSegment(ctx context.Context, in *CreateSegmentReques } func (c *sysDBClient) DeleteSegment(ctx context.Context, in *DeleteSegmentRequest, opts ...grpc.CallOption) (*DeleteSegmentResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(DeleteSegmentResponse) - err := c.cc.Invoke(ctx, SysDB_DeleteSegment_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_DeleteSegment_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -129,9 +123,8 @@ func (c *sysDBClient) DeleteSegment(ctx context.Context, in *DeleteSegmentReques } func (c *sysDBClient) GetSegments(ctx context.Context, in *GetSegmentsRequest, opts ...grpc.CallOption) (*GetSegmentsResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GetSegmentsResponse) - err := c.cc.Invoke(ctx, SysDB_GetSegments_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_GetSegments_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -139,9 +132,8 @@ func (c *sysDBClient) GetSegments(ctx context.Context, in *GetSegmentsRequest, o } func (c *sysDBClient) UpdateSegment(ctx context.Context, in *UpdateSegmentRequest, opts ...grpc.CallOption) (*UpdateSegmentResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(UpdateSegmentResponse) - err := c.cc.Invoke(ctx, SysDB_UpdateSegment_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_UpdateSegment_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -149,9 +141,8 @@ func (c *sysDBClient) UpdateSegment(ctx context.Context, in *UpdateSegmentReques } func (c *sysDBClient) CreateCollection(ctx context.Context, in *CreateCollectionRequest, opts ...grpc.CallOption) (*CreateCollectionResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(CreateCollectionResponse) - err := c.cc.Invoke(ctx, SysDB_CreateCollection_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_CreateCollection_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -159,9 +150,8 @@ func (c *sysDBClient) CreateCollection(ctx context.Context, in *CreateCollection } func (c *sysDBClient) DeleteCollection(ctx context.Context, in *DeleteCollectionRequest, opts ...grpc.CallOption) (*DeleteCollectionResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) 
out := new(DeleteCollectionResponse) - err := c.cc.Invoke(ctx, SysDB_DeleteCollection_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_DeleteCollection_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -169,9 +159,8 @@ func (c *sysDBClient) DeleteCollection(ctx context.Context, in *DeleteCollection } func (c *sysDBClient) GetCollections(ctx context.Context, in *GetCollectionsRequest, opts ...grpc.CallOption) (*GetCollectionsResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GetCollectionsResponse) - err := c.cc.Invoke(ctx, SysDB_GetCollections_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_GetCollections_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -179,9 +168,8 @@ func (c *sysDBClient) GetCollections(ctx context.Context, in *GetCollectionsRequ } func (c *sysDBClient) UpdateCollection(ctx context.Context, in *UpdateCollectionRequest, opts ...grpc.CallOption) (*UpdateCollectionResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(UpdateCollectionResponse) - err := c.cc.Invoke(ctx, SysDB_UpdateCollection_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_UpdateCollection_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -189,9 +177,8 @@ func (c *sysDBClient) UpdateCollection(ctx context.Context, in *UpdateCollection } func (c *sysDBClient) ResetState(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ResetStateResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(ResetStateResponse) - err := c.cc.Invoke(ctx, SysDB_ResetState_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_ResetState_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -199,9 +186,8 @@ func (c *sysDBClient) ResetState(ctx context.Context, in *emptypb.Empty, opts .. 
} func (c *sysDBClient) GetLastCompactionTimeForTenant(ctx context.Context, in *GetLastCompactionTimeForTenantRequest, opts ...grpc.CallOption) (*GetLastCompactionTimeForTenantResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GetLastCompactionTimeForTenantResponse) - err := c.cc.Invoke(ctx, SysDB_GetLastCompactionTimeForTenant_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_GetLastCompactionTimeForTenant_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -209,9 +195,8 @@ func (c *sysDBClient) GetLastCompactionTimeForTenant(ctx context.Context, in *Ge } func (c *sysDBClient) SetLastCompactionTimeForTenant(ctx context.Context, in *SetLastCompactionTimeForTenantRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, SysDB_SetLastCompactionTimeForTenant_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_SetLastCompactionTimeForTenant_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -219,9 +204,8 @@ func (c *sysDBClient) SetLastCompactionTimeForTenant(ctx context.Context, in *Se } func (c *sysDBClient) FlushCollectionCompaction(ctx context.Context, in *FlushCollectionCompactionRequest, opts ...grpc.CallOption) (*FlushCollectionCompactionResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(FlushCollectionCompactionResponse) - err := c.cc.Invoke(ctx, SysDB_FlushCollectionCompaction_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, SysDB_FlushCollectionCompaction_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } diff --git a/go/pkg/proto/logservicepb/logservice.pb.go b/go/pkg/proto/logservicepb/logservice.pb.go index 82fad7e3df3..c5600dac447 100644 --- a/go/pkg/proto/logservicepb/logservice.pb.go +++ b/go/pkg/proto/logservicepb/logservice.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.2 -// protoc v5.26.1 +// protoc-gen-go v1.33.0 +// protoc v4.23.4 // source: chromadb/proto/logservice.proto package logservicepb @@ -366,6 +366,10 @@ type GetAllCollectionInfoToCompactRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + // The minimum number of log entries that a collection should have before it should + // be returned for compaction + MinCompactionSize uint64 `protobuf:"varint,1,opt,name=min_compaction_size,json=minCompactionSize,proto3" json:"min_compaction_size,omitempty"` } func (x *GetAllCollectionInfoToCompactRequest) Reset() { @@ -400,6 +404,13 @@ func (*GetAllCollectionInfoToCompactRequest) Descriptor() ([]byte, []int) { return file_chromadb_proto_logservice_proto_rawDescGZIP(), []int{6} } +func (x *GetAllCollectionInfoToCompactRequest) GetMinCompactionSize() uint64 { + if x != nil { + return x.MinCompactionSize + } + return 0 +} + type GetAllCollectionInfoToCompactResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -586,9 +597,12 @@ var file_chromadb_proto_logservice_proto_rawDesc = []byte{ 0x03, 0x52, 0x0e, 0x66, 0x69, 0x72, 0x73, 0x74, 0x4c, 0x6f, 0x67, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x20, 0x0a, 0x0c, 0x66, 0x69, 0x72, 0x73, 0x74, 0x5f, 0x6c, 0x6f, 0x67, 0x5f, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x66, 0x69, 0x72, 0x73, 0x74, 0x4c, 0x6f, - 0x67, 0x54, 0x73, 0x22, 0x26, 0x0a, 0x24, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, + 0x67, 0x54, 0x73, 0x22, 0x56, 0x0a, 0x24, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, - 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x6f, 0x0a, 0x25, 0x47, + 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x13, 0x6d, + 0x69, 0x6e, 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x69, + 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x11, 0x6d, 0x69, 0x6e, 0x43, 0x6f, 0x6d, + 0x70, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x69, 0x7a, 0x65, 0x22, 0x6f, 0x0a, 0x25, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x6e, 0x66, 0x6f, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x13, 0x61, 0x6c, 0x6c, 0x5f, 0x63, 0x6f, 0x6c, 0x6c, @@ -649,7 +663,7 @@ func file_chromadb_proto_logservice_proto_rawDescGZIP() []byte { } var file_chromadb_proto_logservice_proto_msgTypes = make([]protoimpl.MessageInfo, 10) -var file_chromadb_proto_logservice_proto_goTypes = []any{ +var file_chromadb_proto_logservice_proto_goTypes = []interface{}{ (*PushLogsRequest)(nil), // 0: chroma.PushLogsRequest (*PushLogsResponse)(nil), // 1: chroma.PushLogsResponse (*PullLogsRequest)(nil), // 2: chroma.PullLogsRequest @@ -688,7 +702,7 @@ func file_chromadb_proto_logservice_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_chromadb_proto_logservice_proto_msgTypes[0].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PushLogsRequest); i { case 0: return &v.state @@ -700,7 +714,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[1].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PushLogsResponse); i 
{ case 0: return &v.state @@ -712,7 +726,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[2].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PullLogsRequest); i { case 0: return &v.state @@ -724,7 +738,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[3].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*LogRecord); i { case 0: return &v.state @@ -736,7 +750,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[4].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PullLogsResponse); i { case 0: return &v.state @@ -748,7 +762,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[5].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*CollectionInfo); i { case 0: return &v.state @@ -760,7 +774,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[6].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*GetAllCollectionInfoToCompactRequest); i { case 0: return &v.state @@ -772,7 +786,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[7].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[7].Exporter = func(v interface{}, i int) 
interface{} { switch v := v.(*GetAllCollectionInfoToCompactResponse); i { case 0: return &v.state @@ -784,7 +798,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[8].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UpdateCollectionLogOffsetRequest); i { case 0: return &v.state @@ -796,7 +810,7 @@ func file_chromadb_proto_logservice_proto_init() { return nil } } - file_chromadb_proto_logservice_proto_msgTypes[9].Exporter = func(v any, i int) any { + file_chromadb_proto_logservice_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*UpdateCollectionLogOffsetResponse); i { case 0: return &v.state diff --git a/go/pkg/proto/logservicepb/logservice_grpc.pb.go b/go/pkg/proto/logservicepb/logservice_grpc.pb.go index ad525bfe495..57d0ab8a42f 100644 --- a/go/pkg/proto/logservicepb/logservice_grpc.pb.go +++ b/go/pkg/proto/logservicepb/logservice_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.4.0 -// - protoc v5.26.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 // source: chromadb/proto/logservice.proto package logservicepb @@ -15,8 +15,8 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.62.0 or later. -const _ = grpc.SupportPackageIsVersion8 +// Requires gRPC-Go v1.32.0 or later. 
+const _ = grpc.SupportPackageIsVersion7 const ( LogService_PushLogs_FullMethodName = "/chroma.LogService/PushLogs" @@ -44,9 +44,8 @@ func NewLogServiceClient(cc grpc.ClientConnInterface) LogServiceClient { } func (c *logServiceClient) PushLogs(ctx context.Context, in *PushLogsRequest, opts ...grpc.CallOption) (*PushLogsResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(PushLogsResponse) - err := c.cc.Invoke(ctx, LogService_PushLogs_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, LogService_PushLogs_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -54,9 +53,8 @@ func (c *logServiceClient) PushLogs(ctx context.Context, in *PushLogsRequest, op } func (c *logServiceClient) PullLogs(ctx context.Context, in *PullLogsRequest, opts ...grpc.CallOption) (*PullLogsResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(PullLogsResponse) - err := c.cc.Invoke(ctx, LogService_PullLogs_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, LogService_PullLogs_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -64,9 +62,8 @@ func (c *logServiceClient) PullLogs(ctx context.Context, in *PullLogsRequest, op } func (c *logServiceClient) GetAllCollectionInfoToCompact(ctx context.Context, in *GetAllCollectionInfoToCompactRequest, opts ...grpc.CallOption) (*GetAllCollectionInfoToCompactResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(GetAllCollectionInfoToCompactResponse) - err := c.cc.Invoke(ctx, LogService_GetAllCollectionInfoToCompact_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, LogService_GetAllCollectionInfoToCompact_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -74,9 +71,8 @@ func (c *logServiceClient) GetAllCollectionInfoToCompact(ctx context.Context, in } func (c *logServiceClient) UpdateCollectionLogOffset(ctx context.Context, in *UpdateCollectionLogOffsetRequest, opts ...grpc.CallOption) (*UpdateCollectionLogOffsetResponse, error) { - cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(UpdateCollectionLogOffsetResponse) - err := c.cc.Invoke(ctx, LogService_UpdateCollectionLogOffset_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, LogService_UpdateCollectionLogOffset_FullMethodName, in, out, opts...) if err != nil { return nil, err } diff --git a/idl/chromadb/proto/logservice.proto b/idl/chromadb/proto/logservice.proto index b038e7ef245..be01b786c30 100644 --- a/idl/chromadb/proto/logservice.proto +++ b/idl/chromadb/proto/logservice.proto @@ -40,7 +40,9 @@ message CollectionInfo { } message GetAllCollectionInfoToCompactRequest { - // Empty + // The minimum number of log entries that a collection should have before it should + // be returned for compaction + uint64 min_compaction_size = 1; } message GetAllCollectionInfoToCompactResponse { diff --git a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index 373016fa832..d5318a940d1 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -78,3 +78,4 @@ compaction_service: compaction_manager_queue_size: 1000 max_concurrent_jobs: 100 compaction_interval_sec: 60 + min_compaction_size: 10 diff --git a/rust/worker/src/compactor/compaction_manager.rs b/rust/worker/src/compactor/compaction_manager.rs index 8329eb40a48..869c5ac563e 100644 --- a/rust/worker/src/compactor/compaction_manager.rs +++ b/rust/worker/src/compactor/compaction_manager.rs @@ -46,6 +46,7 @@ pub(crate) struct CompactionManager { // Config compaction_manager_queue_size: usize, compaction_interval: Duration, + min_compaction_size: usize, } #[derive(Error, Debug)] @@ -72,6 +73,7 @@ impl 
CompactionManager { hnsw_index_provider: HnswIndexProvider, compaction_manager_queue_size: usize, compaction_interval: Duration, + min_compaction_size: usize, ) -> Self { CompactionManager { system: None, @@ -84,6 +86,7 @@ impl CompactionManager { dispatcher: None, compaction_manager_queue_size, compaction_interval, + min_compaction_size, } } @@ -199,6 +202,7 @@ impl Configurable for CompactionManager { let compaction_interval_sec = config.compactor.compaction_interval_sec; let max_concurrent_jobs = config.compactor.max_concurrent_jobs; let compaction_manager_queue_size = config.compactor.compaction_manager_queue_size; + let min_compaction_size = config.compactor.min_compaction_size; let assignment_policy_config = &config.assignment_policy; let assignment_policy = match crate::assignment::from_config(assignment_policy_config).await @@ -214,6 +218,7 @@ impl Configurable for CompactionManager { sysdb.clone(), policy, max_concurrent_jobs, + min_compaction_size, assignment_policy, ); @@ -230,6 +235,7 @@ impl Configurable for CompactionManager { HnswIndexProvider::new(storage.clone(), path), compaction_manager_queue_size, Duration::from_secs(compaction_interval_sec), + min_compaction_size, )) } } @@ -468,6 +474,7 @@ mod tests { let compaction_manager_queue_size = 1000; let max_concurrent_jobs = 10; let compaction_interval = Duration::from_secs(1); + let min_compaction_size = 0; // Set assignment policy let mut assignment_policy = Box::new(RendezvousHashingAssignmentPolicy::new()); @@ -479,6 +486,7 @@ mod tests { sysdb.clone(), Box::new(LasCompactionTimeSchedulerPolicy {}), max_concurrent_jobs, + min_compaction_size, assignment_policy, ); // Set memberlist @@ -493,6 +501,7 @@ mod tests { HnswIndexProvider::new(storage, PathBuf::from(tmpdir.path().to_str().unwrap())), compaction_manager_queue_size, compaction_interval, + min_compaction_size, ); let system = System::new(); diff --git a/rust/worker/src/compactor/config.rs b/rust/worker/src/compactor/config.rs index 
729a2e2d1da..f4bf610f2f4 100644 --- a/rust/worker/src/compactor/config.rs +++ b/rust/worker/src/compactor/config.rs @@ -5,4 +5,5 @@ pub(crate) struct CompactorConfig { pub(crate) compaction_manager_queue_size: usize, pub(crate) max_concurrent_jobs: usize, pub(crate) compaction_interval_sec: u64, + pub(crate) min_compaction_size: usize, } diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs index 4d039ada373..24bf4c88c97 100644 --- a/rust/worker/src/compactor/scheduler.rs +++ b/rust/worker/src/compactor/scheduler.rs @@ -15,6 +15,7 @@ pub(crate) struct Scheduler { policy: Box, job_queue: Vec, max_concurrent_jobs: usize, + min_compaction_size: usize, memberlist: Option, assignment_policy: Box, } @@ -26,12 +27,14 @@ impl Scheduler { sysdb: Box, policy: Box, max_concurrent_jobs: usize, + min_compaction_size: usize, assignment_policy: Box, ) -> Scheduler { Scheduler { my_ip, log, sysdb, + min_compaction_size, policy, job_queue: Vec::with_capacity(max_concurrent_jobs), max_concurrent_jobs, @@ -41,7 +44,10 @@ impl Scheduler { } async fn get_collections_with_new_data(&mut self) -> Vec { - let collections = self.log.get_collections_with_new_data().await; + let collections = self + .log + .get_collections_with_new_data(self.min_compaction_size as u64) + .await; // TODO: filter collecitons based on memberlist let collections = match collections { Ok(collections) => collections, @@ -299,6 +305,7 @@ mod tests { sysdb.clone(), scheduler_policy, max_concurrent_jobs, + 1, assignment_policy, ); // Scheduler does nothing without memberlist @@ -474,6 +481,7 @@ mod tests { sysdb.clone(), scheduler_policy, max_concurrent_jobs, + 1, assignment_policy, ); diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 48483e11417..e636b1f23af 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -228,6 +228,7 @@ mod tests { compaction_manager_queue_size: 1000 max_concurrent_jobs: 100 compaction_interval_sec: 60 + 
min_compaction_size: 10 "#, ); let config = RootConfig::load(); @@ -324,6 +325,7 @@ mod tests { compaction_manager_queue_size: 1000 max_concurrent_jobs: 100 compaction_interval_sec: 60 + min_compaction_size: 10 "#, ); let config = RootConfig::load_from_path("random_path.yaml"); @@ -438,6 +440,7 @@ mod tests { compaction_manager_queue_size: 1000 max_concurrent_jobs: 100 compaction_interval_sec: 60 + min_compaction_size: 10 "#, ); let config = RootConfig::load(); @@ -538,6 +541,7 @@ mod tests { compaction_manager_queue_size: 1000 max_concurrent_jobs: 100 compaction_interval_sec: 60 + min_compaction_size: 10 "#, ); let config = RootConfig::load(); diff --git a/rust/worker/src/log/log.rs b/rust/worker/src/log/log.rs index 0d7797f8e37..039b2643315 100644 --- a/rust/worker/src/log/log.rs +++ b/rust/worker/src/log/log.rs @@ -68,10 +68,11 @@ impl Log { pub(crate) async fn get_collections_with_new_data( &mut self, + min_compaction_size: u64, ) -> Result, GetCollectionsWithNewDataError> { match self { - Log::Grpc(log) => log.get_collections_with_new_data().await, - Log::InMemory(log) => log.get_collections_with_new_data().await, + Log::Grpc(log) => log.get_collections_with_new_data(min_compaction_size).await, + Log::InMemory(log) => log.get_collections_with_new_data(min_compaction_size).await, } } @@ -217,11 +218,14 @@ impl GrpcLog { async fn get_collections_with_new_data( &mut self, + min_compaction_size: u64, ) -> Result, GetCollectionsWithNewDataError> { let response = self .client .get_all_collection_info_to_compact( - chroma_proto::GetAllCollectionInfoToCompactRequest {}, + chroma_proto::GetAllCollectionInfoToCompactRequest { + min_compaction_size: min_compaction_size, + }, ) .await; @@ -397,6 +401,7 @@ impl InMemoryLog { async fn get_collections_with_new_data( &mut self, + min_compaction_size: u64, ) -> Result, GetCollectionsWithNewDataError> { let mut collections = Vec::new(); for (collection_id, log_records) in self.collection_to_log.iter() { @@ -414,6 +419,11 @@ 
impl InMemoryLog { } None => &log_records[..], }; + + if (filtered_records.len() as u64) < min_compaction_size { + continue; + } + let mut logs = filtered_records.to_vec(); logs.sort_by(|a, b| a.log_offset.cmp(&b.log_offset)); collections.push(CollectionInfo {