Skip to content

Commit

Permalink
Merge pull request #1 from spenceralger/segment_in_courier
Browse files Browse the repository at this point in the history
Segmented fetch in courier
  • Loading branch information
grouma committed Dec 16, 2014
2 parents 67df143 + eb3d904 commit 90aed85
Show file tree
Hide file tree
Showing 13 changed files with 622 additions and 625 deletions.
54 changes: 37 additions & 17 deletions src/kibana/components/courier/data_source/_abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,34 +154,38 @@ define(function (require) {

/**
 * Fetch just this source ASAP
 *
 * ONLY USE IF YOU NEED THE RESULTS OTHERWISE USE .fetchPending()
 * TO TRIGGER FETCHING ALL PENDING REQUESTS
 *
 * @async
 * @returns {Promise} resolves with this source's response
 */
SourceAbstract.prototype.fetch = function () {
  var self = this;

  // reuse a request already queued for this source if one exists,
  // otherwise queue a fresh one so fetchPending() has something to send
  var req = _.first(self._myPending());
  if (!req) {
    req = self._createRequest();
    pendingRequests.push(req);
  }

  self.fetchPending();
  return req.defer.promise;
};

/**
 * Fetch all pending requests for this source ASAP
 * @async
 * @returns {Promise} resolves when the fetch completes
 */
SourceAbstract.prototype.fetchPending = function () {
  var mine = this._pullMyPending();
  return courierFetch.these(mine);
};

/**
 * Cancel all pending requests for this dataSource
 * @return {undefined}
 */
SourceAbstract.prototype.cancelPending = function () {
  // pull (and discard) every queued request owned by this source;
  // _pullMyPending already removes them from the shared queue, so no
  // separate _.where/_.pull pass is needed
  this._pullMyPending();
};

/**
Expand All @@ -196,12 +200,28 @@ define(function (require) {
* PRIVATE API
*****/

/**
 * Collect (without removing) the pending requests that belong to
 * this source.
 *
 * Uses strict identity on req.source to agree with _pullMyPending,
 * rather than lodash _.where's deep property comparison.
 *
 * @returns {Array} this source's queued requests
 */
SourceAbstract.prototype._myPending = function () {
  var self = this;
  return pendingRequests.filter(function (req) {
    return req.source === self;
  });
};

/**
 * Remove this source's requests from the shared pending queue and
 * hand them back, requeueing everyone else's requests in order.
 *
 * @returns {Array} the requests that were pulled
 */
SourceAbstract.prototype._pullMyPending = function () {
  var self = this;
  var mine = [];

  // drain the shared queue, then push back whatever isn't ours
  pendingRequests.splice(0).forEach(function (req) {
    if (req.source === self) {
      mine.push(req);
    } else {
      pendingRequests.push(req);
    }
  });

  return mine;
};

SourceAbstract.prototype._createRequest = function (defer) {
var self = this;

var req = {
source: self,
defer: defer || Promise.defer()
defer: defer || Promise.defer(),
strategy: self._fetchStrategy
};

if (self.history) {
Expand Down
21 changes: 21 additions & 0 deletions src/kibana/components/courier/data_source/search_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ define(function (require) {
var _ = require('lodash');
var errors = require('errors');
var SourceAbstract = Private(require('components/courier/data_source/_abstract'));
var pendingRequests = Private(require('components/courier/_pending_requests'));
var segmentedStrategy = Private(require('components/courier/fetch/strategy/segmented'));

var FetchFailure = errors.FetchFailure;
var RequestFailure = errors.RequestFailure;
Expand Down Expand Up @@ -125,6 +127,25 @@ define(function (require) {
return normal;
};

/**
 * Continuously queue segmented-fetch requests for this source.
 * Each time one request completes, the next is queued immediately.
 *
 * @param {Function} init - passed along on each request as req.init
 * @returns {Promise} a chain that surfaces any request failure
 */
SearchSource.prototype.onBeginSegmentedFetch = function (init) {
  var self = this;

  return Promise.try(function queueSegmentedReq() {
    var defer = Promise.defer();

    // tag the request so the fetch module routes it to the
    // segmented strategy instead of this source's default one
    var req = self._createRequest(defer);
    req.segmented = true;
    req.strategy = segmentedStrategy;
    req.init = init;

    pendingRequests.push(req);

    // queue the next request once this one resolves; returning the
    // chain keeps errors bubbling out to the caller
    return defer.promise.then(queueSegmentedReq);
  });
};

/******
* PRIVATE APIS
******/
Expand Down
151 changes: 151 additions & 0 deletions src/kibana/components/courier/fetch/_fetch_these.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
define(function (require) {
return function FetchTheseProvider(Private, Promise, es, Notifier, sessionId, configFile) {
var _ = require('lodash');
var moment = require('moment');
var errors = require('errors');
var pendingRequests = Private(require('components/courier/_pending_requests'));

var notify = new Notifier({
location: 'Courier Fetch'
});

/**
 * Partition requests by their fetch strategy and invoke `block`
 * once per group, re-running a group for as long as its strategy
 * reports incomplete requests.
 *
 * @param {Array} requests
 * @param {Function} block - receives (requests, strategy); may return a promise
 * @returns {Promise} resolves with one result per strategy group
 */
function eachStrategy(requests, block) {
  block = Promise.method(block);

  // sets[i] = [strategy, requestsUsingThatStrategy]
  var sets = [];
  _.each(requests, function (req) {
    var set = _.find(sets, { 0: req.strategy });
    if (!set) {
      sets.push([req.strategy, [req]]);
    } else {
      set[1].push(req);
    }
  });

  var runs = sets.map(function (set) {
    var strategy = set[0];

    return (function run(queue) {
      return block(queue, strategy).then(function (result) {
        // some strategies (e.g. segmented) leave partially-complete
        // requests behind; keep fetching until none remain
        if (_.isFunction(strategy.getIncompleteRequests)) {
          var incomplete = strategy.getIncompleteRequests(pendingRequests);
          if (incomplete.length) return run(incomplete);
        }
        return result;
      });
    }(set[1]));
  });

  return Promise.all(runs).catch(notify.fatal);
}

/**
 * Mark a request as in-flight: bump its source's active-fetch
 * counter and record the start time for later duration math.
 */
function initRequest(req) {
  var source = req.source;
  // first fetch for a source leaves activeFetchCount undefined
  source.activeFetchCount = (source.activeFetchCount || 0) + 1;
  req.moment = moment();
}

/**
 * Drain `requests` and return one request per source. Extra
 * requests for an already-seen source are attached to the kept
 * request's `_merged` list so they can share its response later.
 *
 * @param {Array} requests - emptied as a side effect
 * @returns {Array} the unique (kept) requests
 */
function mergeDuplicateRequests(requests) {
  var seen = {};

  return requests.splice(0).filter(function (req) {
    var iid = req.source._instanceid;
    var first = seen[iid];

    if (!first) {
      // first request for this source — keep it
      seen[iid] = req;
      return true;
    }

    // duplicate — park it on the kept request and drop it from the list
    if (first._merged) {
      first._merged.push(req);
    } else {
      first._merged = [first, req];
    }
    return false;
  });
}

/**
 * Finalize a single request with its response: record timing and
 * completion state, decrement the source's active-fetch counter,
 * then hand off to the strategy (or to the error handler when the
 * response carries an error).
 */
function reqComplete(req, resp, errorHandler) {
  if (resp.timed_out) {
    notify.warning(new errors.SearchTimeout());
  }

  var elapsed = -req.moment.diff();
  req.complete = true;
  req.resp = resp;
  req.ms = elapsed;
  req.source.activeFetchCount -= 1;

  if (resp.error) {
    return errorHandler.handle(req, new errors.FetchFailure(resp));
  }

  req.strategy.resolveRequest(req, resp);
}

/**
 * Fetch the given requests against es, grouped by strategy.
 *
 * @param {Array} requests - pending request objects ({source, defer, strategy, ...})
 * @param {Object} errorHandler - object exposing .handle(req, err)
 * @returns {Promise} resolves once every strategy group has been processed
 */
function fetchThese(requests, errorHandler) {
  return eachStrategy(requests, function (requests, strategy) {
    // mark every request as in-flight and stamp its start time
    requests.forEach(initRequest);

    // collapse requests for the same source; duplicates end up on _merged
    var uniq = mergeDuplicateRequests(requests);
    var states; // per-request source states, filled in below

    return Promise.map(uniq, function (req) {
      return strategy.getSourceStateFromRequest(req);
    })
    .then(function (_states_) {
      states = _states_;

      // all requests must have been disabled
      if (!states.length) return Promise.resolve(false);

      // one batched es call for the entire strategy group
      return es[strategy.clientMethod]({
        timeout: configFile.shard_timeout,
        preference: sessionId,
        body: strategy.convertStatesToBody(states)
      });
    })
    // NOTE(review): getResponses is passed unbound — confirm it does not
    // rely on `this`. It also receives `false` when states was empty;
    // presumably it tolerates that — verify against the strategy impls.
    .then(strategy.getResponses)
    .then(function (responses) {

      // responses come back in request order; pair each one back up
      // with its (possibly merged) request and its state via shift()
      responses.forEach(function (resp) {
        var req = uniq.shift();
        var state = states.shift();
        if (!req._merged) {
          req.state = state;
          reqComplete(req, resp, errorHandler);
        } else {
          // each merged request gets its own clone of the response so
          // downstream consumers cannot clobber each other
          req._merged.forEach(function (mergedReq) {
            mergedReq.state = state;
            var respClone = _.cloneDeep(resp);
            reqComplete(mergedReq, respClone, errorHandler);
          });
        }
      });

      return responses;
    })
    .catch(function (err) {

      // undo initRequest's activeFetchCount bump and notify the error
      // handler for every request left in this group
      function sendFailure(req) {
        req.source.activeFetchCount -= 1;
        errorHandler.handle(req, err);
      }

      uniq.forEach(function (req) {
        if (!req._merged) sendFailure(req);
        else req._merged.forEach(sendFailure);
      });
      // rethrow so eachStrategy's fatal handler still sees the failure
      throw err;
    });
  });
}

return fetchThese;
};
});
Loading

0 comments on commit 90aed85

Please sign in to comment.