Skip to content

Commit

Permalink
add DataFrame filter and count ops
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Hulette committed Jan 12, 2018
1 parent 30f0330 commit 796f45d
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 16 deletions.
23 changes: 22 additions & 1 deletion js/perf/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ for (let {name, buffers, tests} of require('./table_config')) {
const dfIteratorCountSuite = new Benchmark.Suite(`DataFrame Iterator Count "${name}"`, { async: true });
const dfDirectCountSuite = new Benchmark.Suite(`DataFrame Direct Count "${name}"`, { async: true });
const dfScanCountSuite = new Benchmark.Suite(`DataFrame Scan Count "${name}"`, { async: true });
const dfFilterCountSuite = new Benchmark.Suite(`DataFrame Filter Scan Count "${name}"`, { async: true });
const vectorCountSuite = new Benchmark.Suite(`Vector Count "${name}"`, { async: true });
const table = Table.from(buffers);

Expand All @@ -58,10 +59,11 @@ for (let {name, buffers, tests} of require('./table_config')) {
dfIteratorCountSuite.add(createDataFrameIteratorCountTest(table, test.col, test.test, test.value))
dfDirectCountSuite.add(createDataFrameDirectCountTest(table, test.col, test.test, test.value))
dfScanCountSuite.add(createDataFrameScanCountTest(table, test.col, test.test, test.value))
dfFilterCountSuite.add(createDataFrameFilterCountTest(table, test.col, test.test, test.value))
vectorCountSuite.add(createVectorCountTest(table.columns[test.col], test.test, test.value))
}

suites.push(tableIteratorSuite, tableCountSuite, dfIteratorSuite, dfIteratorCountSuite, dfDirectCountSuite, dfScanCountSuite, vectorCountSuite)
suites.push(tableIteratorSuite, tableCountSuite, dfIteratorSuite, dfIteratorCountSuite, dfDirectCountSuite, dfScanCountSuite, dfFilterCountSuite, vectorCountSuite)
}

console.log('Running apache-arrow performance tests...\n');
Expand Down Expand Up @@ -275,6 +277,25 @@ function createDataFrameScanCountTest(table, column, test, value) {
};
}

/**
 * Builds a benchmark case that measures `count()` on a filtered DataFrame.
 *
 * The table is wrapped in a DataFrame, a row predicate for the requested
 * comparison is attached via `filter`, and the returned case simply calls
 * `count()` on the resulting frame each time it runs.
 *
 * @param table  Table to benchmark against.
 * @param column Index of the column the predicate reads.
 * @param test   Comparison to apply: 'gteq' (>=) or 'eq' (==).
 * @param value  Right-hand side of the comparison.
 * @returns A benchmark descriptor with `async`, `name` and `fn`.
 * @throws Error when `test` is neither 'gteq' nor 'eq'.
 */
function createDataFrameFilterCountTest(table, column, test, value) {
    const unfiltered = DataFrame.from(table);

    // Pick the row predicate first, then attach it in one place.
    let predicate;
    if (test == 'gteq') {
        predicate = (idx, cols) => cols[column].get(idx) >= value;
    } else if (test == 'eq') {
        predicate = (idx, cols) => cols[column].get(idx) == value;
    } else {
        throw new Error(`Unrecognized test "${test}"`);
    }
    const filtered = unfiltered.filter(predicate);

    const label = `name: '${table.columns[column].name}', length: ${table.length}, type: ${table.columns[column].type}, test: ${test}, value: ${value}`;

    return {
        async: true,
        name: label,
        fn() {
            filtered.count();
        }
    };
}

function createDataFrameIteratorCountTest(table, column, test, value) {
let df = DataFrame.from(table);

Expand Down
98 changes: 83 additions & 15 deletions js/src/dataframe/dataframe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ import { Vector } from "../vector/vector";
import { StructVector } from "../vector/struct";
import { VirtualVector } from "../vector/virtual";

export type NextFunc = (idx: number, cols: Vector[]) => void;
export type PredicateFunc = (idx: number, cols: Vector[]) => boolean;

export abstract class DataFrame {
constructor(readonly lengths: Uint32Array) {}
public abstract columns: Vector<any>[];
public abstract getBatch(batch: number): Vector[];
public abstract scan(next: (idx: number, cols: Vector[])=>void): void;
public abstract scan(next: NextFunc): void;
/**
 * Creates a filtered view of this frame.
 *
 * No rows are evaluated here; the predicate is handed to a
 * FilteredDataFrame together with this frame.
 *
 * @param predicate Row test receiving a row index and the batch's columns.
 * @returns A DataFrame wrapping this frame and `predicate`.
 */
public filter(predicate: PredicateFunc): DataFrame {
    const view: DataFrame = new FilteredDataFrame(this, predicate);
    return view;
}

static from(table: Vector<any>): DataFrame {
// There are two types of Vectors we might want to make into
// a ChunkedDataFrame:
Expand All @@ -31,23 +39,26 @@ export abstract class DataFrame {
return new SimpleDataFrame([table]);
}
}

/**
 * Total number of rows, computed as the sum of all entries in `lengths`.
 *
 * @returns The summed lengths, or 0 when `lengths` is empty.
 */
count(): number {
    // Seed the fold with 0: reduce() without an initial value throws a
    // TypeError on an empty typed array, which would make count() crash
    // for a frame that has no batches.
    return this.lengths.reduce((acc, val) => acc + val, 0);
}
}

class SimpleDataFrame extends DataFrame {
readonly lengths: Uint32Array;
constructor(public columns: Vector<any>[]) {
super();
super(new Uint32Array([0, columns[0].length]));
if (!this.columns.slice(1).every((v) => v.length === this.columns[0].length)) {
throw new Error("Attempted to create a DataFrame with un-aligned vectors");
}
this.lengths = new Uint32Array([0, this.columns[0].length]);
}

/**
 * Returns the frame's column vectors; the same array is returned
 * regardless of which batch is requested.
 */
public getBatch() {
    const { columns } = this;
    return columns;
}

public scan(next: (idx: number, cols: Vector[])=>void) {
public scan(next: NextFunc) {
for (let idx = -1; ++idx < this.lengths[1];) {
next(idx, this.columns)
}
Expand All @@ -62,24 +73,16 @@ class SimpleDataFrame extends DataFrame {

class ChunkedDataFrame extends DataFrame {
public columns: Vector<any>[];
readonly lengths: Uint32Array;
constructor(private virtuals: VirtualVector<any>[]) {
super();
const offsets = virtuals[0].offsets;
if (!this.virtuals.slice(1).every((v) => v.aligned(virtuals[0]))) {
throw new Error("Attempted to create a DataFrame with un-aligned vectors");
}
this.lengths = new Uint32Array(offsets.length);
offsets.forEach((offset, i) => {
this.lengths[i] = offsets[i+1] - offset;;
});
super(ChunkedDataFrame.getLengths(virtuals));
this.virtuals = virtuals;
}

/**
 * Collects, for every virtual column, the underlying vector stored at
 * position `batch`.
 *
 * @param batch Index into each virtual column's `vectors` list.
 * @returns One vector per column, in column order.
 */
getBatch(batch: number): Vector[] {
    const pickBatch = (virtual: VirtualVector<any>) => virtual.vectors[batch];
    return this.virtuals.map(pickBatch);
}

scan(next: (idx: number, cols: Vector[])=>void) {
scan(next: NextFunc) {
for (let batch = -1; ++batch < this.lengths.length;) {
const length = this.lengths[batch];

Expand All @@ -106,4 +109,69 @@ class ChunkedDataFrame extends DataFrame {
}
}
}

/**
 * Derives the per-batch row counts for a set of virtual columns.
 *
 * Every column after the first must report itself `aligned` with the first
 * one; the lengths are then read from the first column's `vectors`.
 *
 * @param virtuals Columns to validate and measure.
 * @returns One length per entry in the first column's `vectors`.
 * @throws Error when any column is not aligned with the first.
 */
private static getLengths(virtuals: VirtualVector<any>[]): Uint32Array {
    const reference = virtuals[0];
    const others = virtuals.slice(1);

    const misaligned = others.some((candidate) => !candidate.aligned(reference));
    if (misaligned) {
        throw new Error("Attempted to create a DataFrame with un-aligned vectors");
    }

    const batchLengths = reference.vectors.map((chunk) => chunk.length);
    return new Uint32Array(batchLengths);
}
}

/**
 * A lazy, filtered view over another DataFrame.
 *
 * Nothing is materialised: `scan` and `count` walk the parent's batches and
 * evaluate the predicate for every row index on each call.
 *
 * Note that `lengths` is passed through from the parent unchanged, so it
 * describes the batch sizes *before* filtering; use `count()` for the number
 * of rows that pass the predicate.
 */
class FilteredDataFrame extends DataFrame {
    /**
     * Columns of the parent frame. These are not filtered; the predicate is
     * only applied while scanning or counting.
     */
    public columns: Vector<any>[];

    /**
     * @param parent    Frame whose batches are scanned.
     * @param predicate Row test; receives a row index within a batch and
     *                  that batch's columns.
     */
    constructor (readonly parent: DataFrame, private predicate: PredicateFunc) {
        super(parent.lengths);
        // `columns` was previously declared but never assigned, leaving it
        // undefined at runtime; expose the parent's columns instead.
        this.columns = parent.columns;
    }

    /** Returns the parent's batch at index `batch`, without filtering. */
    getBatch(batch: number): Vector[] {
        return this.parent.getBatch(batch);
    }

    /**
     * Invokes `next` for every row that satisfies the predicate, batch by
     * batch in parent order.
     *
     * @param next Callback receiving the row index and the batch's columns.
     */
    scan(next: NextFunc) {
        // inlined version of this:
        // this.parent.scan((idx, columns) => {
        //     if (this.predicate(idx, columns)) next(idx, columns);
        // });
        for (let batch = -1; ++batch < this.parent.lengths.length;) {
            const length = this.parent.lengths[batch];

            // load batches
            const columns = this.parent.getBatch(batch);

            // yield only the indices accepted by the predicate
            for (let idx = -1; ++idx < length;) {
                if (this.predicate(idx, columns)) next(idx, columns);
            }
        }
    }

    /**
     * Counts the rows that satisfy the predicate by evaluating it on every
     * row of every parent batch.
     *
     * @returns Number of rows for which the predicate returned a truthy value.
     */
    count(): number {
        // inlined version of this:
        // let sum = 0;
        // this.parent.scan((idx, columns) => {
        //     if (this.predicate(idx, columns)) ++sum;
        // });
        // return sum;
        let sum = 0;
        for (let batch = -1; ++batch < this.parent.lengths.length;) {
            const length = this.parent.lengths[batch];

            // load batches
            const columns = this.parent.getBatch(batch);

            // tally the indices accepted by the predicate
            for (let idx = -1; ++idx < length;) {
                if (this.predicate(idx, columns)) ++sum;
            }
        }
        return sum;
    }

    /**
     * Adds a further predicate to this view.
     *
     * The result wraps the original parent directly (not this view) with a
     * combined predicate, so chained filters do not nest. The new predicate
     * is only evaluated for rows the existing one accepted.
     *
     * @param predicate Additional row test.
     * @returns A new filtered frame requiring both predicates to pass.
     */
    filter(predicate: PredicateFunc): DataFrame {
        return new FilteredDataFrame(
            this.parent,
            (idx, cols) => this.predicate(idx, cols) && predicate(idx, cols)
        );
    }
}

0 comments on commit 796f45d

Please sign in to comment.