Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: make applier use ReadTx() in Txn() instead of ConcurrentReadTx() #12896

Merged
merged 3 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions server/etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra
resp.Header = &pb.ResponseHeader{}

if txn == nil {
txn = a.s.kv.Read(trace)
txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txn.End()
}

Expand Down Expand Up @@ -434,7 +434,15 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
}
isWrite := !isTxnReadonly(rt)
txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace))

// When the transaction contains write operations, we use ReadTx instead of
// ConcurrentReadTx to avoid extra overhead of copying buffer.
var txn mvcc.TxnWrite
if isWrite {
wilsonwang371 marked this conversation as resolved.
Show resolved Hide resolved
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
} else {
txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
}

var txnPath []bool
trace.StepWithFunction(
Expand Down
11 changes: 10 additions & 1 deletion server/mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,21 @@ func (trw *txnReadWrite) Changes() []mvccpb.KeyValue { return nil }

func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite { return &txnReadWrite{txn} }

type ReadTxMode uint32

const (
// Use ConcurrentReadTx and the txReadBuffer is copied
ConcurrentReadTxMode = ReadTxMode(1)
// Use backend ReadTx and txReadBuffer is not copied
SharedBufReadTxMode = ReadTxMode(2)
)

type KV interface {
ReadView
WriteView

// Read creates a read transaction.
Read(trace *traceutil.Trace) TxnRead
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead

// Write creates a write transaction.
Write(trace *traceutil.Trace) TxnWrite
Expand Down
2 changes: 1 addition & 1 deletion server/mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var (
return kv.Range(context.TODO(), key, end, ro)
}
txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
txn := kv.Read(traceutil.TODO())
txn := kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer txn.End()
return txn.Range(context.TODO(), key, end, ro)
}
Expand Down
6 changes: 3 additions & 3 deletions server/mvcc/kv_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ import (
type readView struct{ kv KV }

func (rv *readView) FirstRev() int64 {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.FirstRev()
}

func (rv *readView) Rev() int64 {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.Rev()
}

func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
tr := rv.kv.Read(traceutil.TODO())
tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
defer tr.End()
return tr.Range(ctx, key, end, ro)
}
Expand Down
6 changes: 3 additions & 3 deletions server/mvcc/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

// readTx simulates a long read request
readTx1 := s.Read(traceutil.TODO())
readTx1 := s.Read(ConcurrentReadTxMode, traceutil.TODO())

// write should not be blocked by reads
done := make(chan struct{}, 1)
Expand All @@ -673,7 +673,7 @@ func TestConcurrentReadNotBlockingWrite(t *testing.T) {
}

// readTx2 simulates a short read request
readTx2 := s.Read(traceutil.TODO())
readTx2 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro)
if err != nil {
Expand Down Expand Up @@ -756,7 +756,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) {
mu.Lock()
wKVs := make(kvs, len(committedKVs))
copy(wKVs, committedKVs)
tx := s.Read(traceutil.TODO())
tx := s.Read(ConcurrentReadTxMode, traceutil.TODO())
mu.Unlock()
// get all keys in backend store, and compare with wKVs
ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
Expand Down
10 changes: 8 additions & 2 deletions server/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,18 @@ type storeTxnRead struct {
trace *traceutil.Trace
}

func (s *store) Read(trace *traceutil.Trace) TxnRead {
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
var tx backend.ReadTx
s.mu.RLock()
s.revMu.RLock()
// backend holds b.readTx.RLock() only when creating the concurrentReadTx. After
// ConcurrentReadTx is created, it will not block write transaction.
tx := s.b.ConcurrentReadTx()
if mode == ConcurrentReadTxMode {
wilsonwang371 marked this conversation as resolved.
Show resolved Hide resolved
tx = s.b.ConcurrentReadTx()
} else {
tx = s.b.ReadTx()
}

tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
Expand Down