Skip to content

Commit

Permalink
Merge pull request globalsign#2 from eaglerayp/feature/bulkUpdateError
Browse files Browse the repository at this point in the history
Feature/bulk update error
  • Loading branch information
domodwyer authored Mar 29, 2017
2 parents 3e2c238 + b873cea commit 3e4b3fe
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
22 changes: 22 additions & 0 deletions bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,28 @@ func (s *S) TestBulkUpdate(c *C) {
c.Assert(res, DeepEquals, []doc{{10}, {20}, {30}})
}

func (s *S) TestBulkUpdateOver1000(c *C) {
session, err := mgo.Dial("localhost:40001")
c.Assert(err, IsNil)
defer session.Close()

coll := session.DB("mydb").C("mycoll")

bulk := coll.Bulk()
for i := 0; i < 1010; i++ {
bulk.Insert(M{"n": i})
}
_, err = bulk.Run()
c.Assert(err, IsNil)
bulk = coll.Bulk()
for i := 0; i < 1010; i++ {
bulk.Update(M{"n": i}, M{"$set": M{"m": i}})
}
// if not handle well, mongo will return error here
_, err = bulk.Run()
c.Assert(err, IsNil)
}

func (s *S) TestBulkUpdateError(c *C) {
session, err := mgo.Dial("localhost:40001")
c.Assert(err, IsNil)
Expand Down
52 changes: 52 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4766,6 +4766,58 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
}
return &lerr, nil
}
if updateOp, ok := op.(bulkUpdateOp); ok && len(updateOp) > 1000 {
var lerr LastError

// Maximum batch size is 1000. Must split out in separate operations for compatibility.
for i := 0; i < len(updateOp); i += 1000 {
l := i + 1000
if l > len(updateOp) {
l = len(updateOp)
}

oplerr, err := c.writeOpCommand(socket, safeOp, updateOp[i:l], ordered, bypassValidation)

lerr.N += oplerr.N
lerr.modified += oplerr.modified
if err != nil {
lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
if ordered {
break
}
}
}
if len(lerr.ecases) != 0 {
return &lerr, lerr.ecases[0].Err
}
return &lerr, nil
}
if deleteOps, ok := op.(bulkDeleteOp); ok && len(deleteOps) > 1000 {
var lerr LastError

// Maximum batch size is 1000. Must split out in separate operations for compatibility.
for i := 0; i < len(deleteOps); i += 1000 {
l := i + 1000
if l > len(deleteOps) {
l = len(deleteOps)
}

oplerr, err := c.writeOpCommand(socket, safeOp, deleteOps[i:l], ordered, bypassValidation)

lerr.N += oplerr.N
lerr.modified += oplerr.modified
if err != nil {
lerr.ecases = append(lerr.ecases, BulkErrorCase{i, err})
if ordered {
break
}
}
}
if len(lerr.ecases) != 0 {
return &lerr, lerr.ecases[0].Err
}
return &lerr, nil
}
return c.writeOpCommand(socket, safeOp, op, ordered, bypassValidation)
} else if updateOps, ok := op.(bulkUpdateOp); ok {
var lerr LastError
Expand Down

0 comments on commit 3e4b3fe

Please sign in to comment.