Skip to content

Commit

Permalink
Merge pull request #1 from ipfs/mirror-writer-awesome
Browse files Browse the repository at this point in the history
better implementation of mirrorwriter
  • Loading branch information
jbenet committed Nov 3, 2015
2 parents 23ce949 + 90a77d2 commit 24c9b60
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 39 deletions.
2 changes: 1 addition & 1 deletion option.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

// Global writer group for logs to output to
var WriterGroup = new(MirrorWriter)
var WriterGroup = NewMirrorWriter()

type Option func()

Expand Down
250 changes: 212 additions & 38 deletions writer.go
Original file line number Diff line number Diff line change
@@ -1,66 +1,240 @@
package log

import (
"fmt"
"io"
"sync"
"time"
)

// MaxWriterBuffer is the maximum number of bytes a lagging writer may
// have buffered before it is killed and dropped (see bufWriter.loop).
var MaxWriterBuffer = 512 * 1024

// package logger for reporting writer failures
var log = Logger("eventlog")

// MirrorWriter is an io.WriteCloser that mirrors every write to a set of
// dynamically registered writers. A single background routine (logRoutine)
// owns the writer set; Write and AddWriter communicate with it over
// channels, so no lock around the slice is needed.
type MirrorWriter struct {
	// active reports whether any writers are attached; guarded by activelk.
	active   bool
	activelk sync.Mutex

	// channel for incoming writers
	writerAdd chan io.WriteCloser

	// writers is owned exclusively by the log routine.
	writers []*bufWriter

	// synchronization channel for incoming writes
	msgSync chan []byte
}

// writerSync pairs a writer with its message channel.
// NOTE(review): nothing in the visible code uses writerSync (bufWriter
// fills this role) — confirm it is dead and consider removing it.
type writerSync struct {
	w  io.WriteCloser
	br chan []byte
}

// NewMirrorWriter constructs a MirrorWriter and starts its background
// log routine, which fans incoming writes out to registered writers.
func NewMirrorWriter() *MirrorWriter {
	mw := &MirrorWriter{
		// sufficiently large buffer to avoid callers waiting
		msgSync:   make(chan []byte, 64),
		writerAdd: make(chan io.WriteCloser),
	}
	go mw.logRoutine()
	return mw
}

func (mw *MirrorWriter) Write(b []byte) (int, error) {
mw.lk.Lock()
// write to all writers, and nil out the broken ones.
var dropped bool
done := make(chan error, 1)
for i, w := range mw.writers {
go func(out chan error) {
_, err := w.Write(b)
out <- err
}(done)
mycopy := make([]byte, len(b))
copy(mycopy, b)
mw.msgSync <- mycopy
return len(b), nil
}

// Close shuts down the MirrorWriter; the log routine will then close all
// attached writers. It is up to the caller to ensure that Write is not
// called during or after Close.
func (mw *MirrorWriter) Close() error {
	close(mw.msgSync)
	return nil
}

// doClose closes every remaining underlying writer. It runs in the log
// routine as it exits.
func (mw *MirrorWriter) doClose() {
	for _, bw := range mw.writers {
		bw.writer.Close()
	}
}

func (mw *MirrorWriter) logRoutine() {
// rebind to avoid races on nilling out struct fields
msgSync := mw.msgSync
writerAdd := mw.writerAdd

defer mw.doClose()

for {
select {
case err := <-done:
if err != nil {
mw.writers[i].Close()
mw.writers[i] = nil
dropped = true
case b, ok := <-msgSync:
if !ok {
return
}
case <-time.After(time.Millisecond * 500):
mw.writers[i].Close()

// write to all writers
dropped := mw.broadcastMessage(b)

// consolidate the slice
if dropped {
mw.clearDeadWriters()
}
case w := <-writerAdd:
mw.writers = append(mw.writers, newBufWriter(w))

mw.activelk.Lock()
mw.active = true
mw.activelk.Unlock()
}
}
}

// broadcastMessage sends the given message to every writer
// if any writer is killed during the send, 'true' is returned
func (mw *MirrorWriter) broadcastMessage(b []byte) bool {
var dropped bool
for i, w := range mw.writers {
_, err := w.Write(b)
if err != nil {
mw.writers[i] = nil
dropped = true

// clear channel out
done = make(chan error, 1)
}
}
return dropped
}

// consolidate the slice
if dropped {
writers := mw.writers
mw.writers = nil
for _, w := range writers {
if w != nil {
mw.writers = append(mw.writers, w)
}
func (mw *MirrorWriter) clearDeadWriters() {
writers := mw.writers
mw.writers = nil
for _, w := range writers {
if w != nil {
mw.writers = append(mw.writers, w)
}
}
mw.lk.Unlock()
return len(b), nil
if len(mw.writers) == 0 {
mw.activelk.Lock()
mw.active = false
mw.activelk.Unlock()
}
}

// AddWriter registers w to receive a mirror of everything written to mw.
// Registration is performed by the log routine, so this may block briefly.
func (mw *MirrorWriter) AddWriter(w io.WriteCloser) {
	mw.writerAdd <- w
}

// Active reports whether the MirrorWriter currently has writers attached.
// The flag is set when a writer is added and cleared once all writers
// have been dropped.
func (mw *MirrorWriter) Active() (active bool) {
	mw.activelk.Lock()
	active = mw.active
	mw.activelk.Unlock()
	return
}

// newBufWriter wraps w in a bufWriter and starts the goroutine that
// drains its incoming channel onto the underlying writer.
func newBufWriter(w io.WriteCloser) *bufWriter {
	out := &bufWriter{
		incoming: make(chan []byte, 1),
		writer:   w,
	}
	go out.loop()
	return out
}

// bufWriter decouples a potentially slow io.WriteCloser from the
// MirrorWriter broadcast loop by buffering messages in between
// (backlog capped by MaxWriterBuffer; see loop).
type bufWriter struct {
	writer io.WriteCloser

	// incoming carries messages from Write to the buffering loop.
	incoming chan []byte

	// deathLock guards dead, which is set once the writer has failed.
	deathLock sync.Mutex
	dead      bool
}

// errDeadWriter is returned by bufWriter.Write after the writer has died.
var errDeadWriter = fmt.Errorf("writer is dead")

// Write hands b to the buffering loop, or reports errDeadWriter once the
// underlying writer has failed. On the first call after death it closes
// the incoming channel so the draining loop can exit.
func (bw *bufWriter) Write(b []byte) (int, error) {
	bw.deathLock.Lock()
	gone := bw.dead
	bw.deathLock.Unlock()

	if !gone {
		bw.incoming <- b
		return len(b), nil
	}

	// loop has died; shut the channel once so its drain can finish.
	if bw.incoming != nil {
		close(bw.incoming)
		bw.incoming = nil
	}
	return 0, errDeadWriter
}

// die marks the writer dead (so future Writes fail fast) and closes the
// underlying writer. Both happen under deathLock.
func (bw *bufWriter) die() {
	bw.deathLock.Lock()
	defer bw.deathLock.Unlock()
	bw.dead = true
	bw.writer.Close()
}

// loop buffers messages arriving on bw.incoming and feeds them, in order,
// to a companion goroutine that performs the actual (possibly slow)
// writes. If the backlog exceeds MaxWriterBuffer, or a write fails, the
// writer is killed and incoming is drained until the caller notices (via
// Write returning errDeadWriter) and closes the channel.
func (bw *bufWriter) loop() {
	bufsize := 0
	bufBase := make([][]byte, 0, 16) // some initial memory
	buffered := bufBase
	nextCh := make(chan []byte)

	// nextMsg is the message currently offered to the writer goroutine.
	var nextMsg []byte

	// writer goroutine: does the blocking writes so the buffering loop
	// below never stalls on a slow underlying writer.
	go func() {
		for b := range nextCh {
			_, err := bw.writer.Write(b)
			if err != nil {
				// NOTE(review): printf-style args passed to log.Info —
				// confirm this logger formats them (vs. needing Infof).
				log.Info("eventlog write error: %s", err)
				bw.die()
				return
			}
		}
	}()

	// collect and buffer messages
	incoming := bw.incoming
	for {
		if nextMsg == nil || nextCh == nil {
			// nextCh == nil implies we are 'dead' and draining the incoming channel
			// until the caller notices and closes it for us
			select {
			case b, ok := <-incoming:
				if !ok {
					// channel closed by Write after death: we are done
					return
				}
				nextMsg = b
			}
		}

		select {
		case b, ok := <-incoming:
			if !ok {
				return
			}
			bufsize += len(b)
			buffered = append(buffered, b)
			if bufsize > MaxWriterBuffer {
				// if we have too many messages buffered, kill the writer
				bw.die()
				close(nextCh)
				nextCh = nil
				// explicitly keep going here to drain incoming
			}
		case nextCh <- nextMsg:
			// message handed off; promote the oldest buffered one
			nextMsg = nil
			if len(buffered) > 0 {
				nextMsg = buffered[0]
				buffered = buffered[1:]
				bufsize -= len(nextMsg)
			}

			if len(buffered) == 0 {
				// reset slice position
				buffered = bufBase[:0]
			}
		}
	}
}
Loading

0 comments on commit 24c9b60

Please sign in to comment.