Skip to content

Commit

Permalink
Almost finish blob stuff + do some todos in DataQueue (nodejs#20)
Browse files Browse the repository at this point in the history
* Almost finish up blob work

* Finish some todos in DataQueue
  • Loading branch information
flakey5 authored Dec 1, 2022
1 parent 2adc7d5 commit 72df8ff
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 20 deletions.
22 changes: 13 additions & 9 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ using v8::Value;
using v8::String;
using v8::Global;
using v8::Function;
using v8::Int32;
using v8::Uint32;

namespace {
// ============================================================================
Expand Down Expand Up @@ -959,8 +961,8 @@ class StreamEntry final : public EntryBase {
// Creating the callback failed for whatever reason. The error will propagate
// thank to the callback scope, but let's end the reader and fail this read.
ended_ = true;
std::move(next)(bob::STATUS_EOS, nullptr, 0, [](size_t) {});
return bob::STATUS_EOS;
std::move(next)(bob::STATUS_FAILED, nullptr, 0, [](size_t) {});
return bob::STATUS_FAILED;
}

Local<Value> argv[] = { callback };
Expand All @@ -969,10 +971,9 @@ class StreamEntry final : public EntryBase {
if (!pull->Call(isolate->GetCurrentContext(), wrap, arraysize(argv), argv).ToLocal(&ret)) {
// The call failed for whatever reason. The error will propagate thanks to the
// callback scope, but let's end the reader and fail this read.
// TODO(@jasnell, @flakey5): Bob streams need a proper error status...
ended_ = true;
std::move(next)(bob::STATUS_EOS, nullptr, 0, [](size_t) {});
return bob::STATUS_EOS;
std::move(next)(bob::STATUS_FAILED, nullptr, 0, [](size_t) {});
return bob::STATUS_FAILED;
}

return bob::STATUS_WAIT;
Expand Down Expand Up @@ -1386,10 +1387,13 @@ class FdEntry final : public EntryBase {
CHECK(args.IsConstructCall());
Environment* env = Environment::GetCurrent(args);

// TODO(dataqueue): Get these from the arguments
int fd = 0;
size_t start = 0;
size_t end = 0;
CHECK(args[0]->IsInt32());
CHECK(args[1]->IsUint32());
CHECK(args[2]->IsUint32());

int fd = args[0].As<Int32>()->Value();
size_t start = args[1].As<Uint32>()->Value();
size_t end = args[1].As<Uint32>()->Value();

new Wrap(env, args.This(), fd, start, Just(end));
}
Expand Down
63 changes: 53 additions & 10 deletions src/node_blob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ bool Blob::HasInstance(Environment* env, v8::Local<v8::Value> object) {
return GetConstructorTemplate(env)->HasInstance(object);
}

BaseObjectPtr<Blob> Blob::Create(Environment* env, std::shared_ptr<DataQueue> data_queue) {
BaseObjectPtr<Blob> Blob::Create(
Environment* env, std::shared_ptr<DataQueue> data_queue) {
HandleScope scope(env->isolate());

Local<Function> ctor;
Expand All @@ -90,9 +91,8 @@ void Blob::New(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsArray()); // sources
CHECK(args[1]->IsUint32()); // length

// TODO(@flakey5): revisit when DataQueue is complete
// TODO(@flakey5): delete
//std::vector<BlobEntry> entries;

//size_t length = args[1].As<Uint32>()->Value();
//size_t len = 0;
//Local<Array> ary = args[0].As<Array>();
Expand All @@ -119,10 +119,32 @@ void Blob::New(const FunctionCallbackInfo<Value>& args) {
//}
//CHECK_EQ(length, len);

// TODO(@flakey5): get dataqueue from js
std::shared_ptr<DataQueue> data_queue = nullptr;
Local<Array> array = args[0].As<Array>();
size_t length = args[1].As<Uint32>()->Value();
std::vector<std::unique_ptr<DataQueue::Entry>> entries(length);

for (size_t i = 0; i < array->Length(); i++) {
Local<Value> entry;
if (!array->Get(env->context(), i).ToLocal(&entry)) {
return;
}

// TODO(@flakey5): check for different entry types
CHECK(entry->IsArrayBufferView() || Blob::HasInstance(env, entry));
if (entry->IsArrayBufferView()) {
Local<ArrayBufferView> view = entry.As<ArrayBufferView>();
CHECK_EQ(view->ByteOffset(), 0);

BaseObjectPtr<Blob> blob = Create(env, data_queue);
entries[i] = DataQueue::CreateInMemoryEntryFromView(view);
} else {
Blob* blob;
ASSIGN_OR_RETURN_UNWRAP(&blob, entry);

entries[i] = DataQueue::CreateDataQueueEntry(blob->data_queue_);
}
}

BaseObjectPtr<Blob> blob = Create(env, DataQueue::CreateIdempotent(entries));
if (blob)
args.GetReturnValue().Set(blob->object());
}
Expand Down Expand Up @@ -150,16 +172,37 @@ void Blob::ToSlice(const FunctionCallbackInfo<Value>& args) {
}

void Blob::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackFieldWithSize("store", length());
tracker->TrackField("data_queue_", data_queue_);
}

MaybeLocal<Value> Blob::GetArrayBuffer(Environment* env) {
// TODO(@flakey5): figure out how this'll work
EscapableHandleScope scope(env->isolate());
size_t len = length();
std::shared_ptr<BackingStore> store =
ArrayBuffer::NewBackingStore(env->isolate(), len);

if (len > 0) {
std::unique_ptr<DataQueue::Reader> reader = this->data_queue_->getReader();
DataQueue::Vec vec;
size_t offset = 0;
while (reader->Pull(
[&store, &offset](int,
const DataQueue::Vec* vec,
size_t count,
bob::Done done) {
for (size_t i = 0; i < count; i++) {
memcpy(
static_cast<uintptr_t*>(store->Data()) + offset,
vec[i].base,
vec[i].len);
offset += vec[i].len;
}
},
bob::Options::OPTIONS_NONE,
&vec,
len) != bob::Status::STATUS_EOS);
}

/*if (len > 0) {
unsigned char* dest = static_cast<unsigned char*>(store->Data());
size_t total = 0;
Expand Down Expand Up @@ -281,7 +324,7 @@ void Blob::GetDataObject(const v8::FunctionCallbackInfo<v8::Value>& args) {
}
}

// TODO(@flakey5): revisit when DataQueue is complete
// TODO(@flakey5): delete
//FixedSizeBlobCopyJob::FixedSizeBlobCopyJob(
// Environment* env,
// Local<Object> object,
Expand Down Expand Up @@ -471,7 +514,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Blob::GetDataObject);
registry->Register(Blob::RevokeDataObject);

FixedSizeBlobCopyJob::RegisterExternalReferences(registry);
//FixedSizeBlobCopyJob::RegisterExternalReferences(registry);
}

} // namespace node
Expand Down
2 changes: 1 addition & 1 deletion src/node_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class Blob : public BaseObject {
std::shared_ptr<DataQueue> data_queue_;
};

// TODO(@flakey5): revisit when DataQueue is complete
// TODO(@flakey5): delete
//class FixedSizeBlobCopyJob : public AsyncWrap, public ThreadPoolWork {
// public:
// enum class Mode {
Expand Down
4 changes: 4 additions & 0 deletions src/node_bob.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ constexpr size_t kMaxCountHint = 16;

// Negative status codes indicate error conditions.
enum Status : int {
// Indicates that there was an error while pulling.
// Should be treated similar to STATUS_EOS
STATUS_FAILED = -2,

// Indicates that an attempt was made to pull after end.
STATUS_EOS = -1,

Expand Down

0 comments on commit 72df8ff

Please sign in to comment.