Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query-service] implement load_references_from_dataset #9914

Merged
merged 2 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion hail/python/hail/backend/service_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ def get_reference(self, name):
return self.socket.request('references/get', name=name)

def load_references_from_dataset(self, path):
raise NotImplementedError("ServiceBackend does not support 'load_references_from_dataset'")
return self.socket.request('load_references_from_dataset',
path=path,
billing_project=self._billing_project,
bucket=self._bucket)

def add_sequence(self, name, fasta_file, index_file):
raise NotImplementedError("ServiceBackend does not support 'add_sequence'")
Expand Down
21 changes: 19 additions & 2 deletions hail/src/main/scala/is/hail/backend/service/ServiceBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ class ServiceTaskContext(val partitionId: Int) extends HailTaskContext {

object Worker {
def main(args: Array[String]): Unit = {
if (args.length != 2)
throw new IllegalArgumentException(s"expected one argument, not: ${ args.length }")
if (args.length != 2) {
throw new IllegalArgumentException(s"expected two arguments, not: ${ args.length }")
}

val root = args(0)
val i = args(1).toInt
Expand Down Expand Up @@ -437,4 +438,20 @@ class ServiceBackend() extends Backend {
def getPersistedBlockMatrix(backendContext: BackendContext, id: String): BlockMatrix = ???

def getPersistedBlockMatrixType(backendContext: BackendContext, id: String): BlockMatrixType = ???

def loadReferencesFromDataset(
username: String,
sessionID: String,
billingProject: String,
bucket: String,
path: String
): Response = {
statusForException {
ExecutionTimer.logTime("ServiceBackend.loadReferencesFromDataset") { timer =>
userContext(username, timer) { ctx =>
ReferenceGenome.fromHailDataset(ctx.fs, path)
}
}
}
}
}
1 change: 1 addition & 0 deletions hail/src/main/scala/is/hail/io/fs/GoogleStorageFS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ class GoogleStorageFS(serviceAccountKey: String) extends FS {

blobs.getValues.iterator.asScala
.map(b => GoogleStorageFileStatus(b))
.filter(fs => !(fs.isDirectory && fs.getPath == path))
.toArray
}

Expand Down
20 changes: 11 additions & 9 deletions hail/src/test/scala/is/hail/fs/FSSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import org.testng.annotations.Test

trait FSSuite {
def root: String

def fsResourcesRoot: String

def fs: FS

def tmpdir: String
Expand All @@ -38,9 +38,9 @@ trait FSSuite {
p.drop(root.length)
}.toSet
}

def pathsRelResourcesRoot(statuses: Array[FileStatus]): Set[String] = pathsRelRoot(fsResourcesRoot, statuses)

@Test def testExists(): Unit = {
assert(fs.exists(r("/a")))

Expand Down Expand Up @@ -140,15 +140,17 @@ trait FSSuite {
val statuses = fs.glob(r("/does_not_exist_dir/does_not_exist"))
assert(pathsRelResourcesRoot(statuses) == Set())
}

@Test def testGlobFilename(): Unit = {
val statuses = fs.glob(r("/a*"))
assert(pathsRelResourcesRoot(statuses) == Set("/a", "/adir", "/az"))
assert(pathsRelResourcesRoot(statuses) == Set("/a", "/adir", "/az"),
s"${statuses} ${pathsRelResourcesRoot(statuses)} ${Set("/a", "/adir", "/az")}")
}

@Test def testGlobMatchDir(): Unit = {
val statuses = fs.glob(r("/*dir/x"))
assert(pathsRelResourcesRoot(statuses) == Set("/adir/x", "/dir/x"))
assert(pathsRelResourcesRoot(statuses) == Set("/adir/x", "/dir/x"),
s"${statuses} ${pathsRelResourcesRoot(statuses)} ${Set("/adir/x", "/dir/x")}")
}

@Test def testGlobRoot(): Unit = {
Expand Down Expand Up @@ -218,8 +220,8 @@ trait FSSuite {

class HadoopFSSuite extends HailSuite with FSSuite {
val root: String = "file:/"

lazy val fsResourcesRoot: String = "file:" + new java.io.File("./src/test/resources/fs").getCanonicalPath

lazy val tmpdir: String = ctx.tmpdir
}
11 changes: 11 additions & 0 deletions query/query/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def blocking_execute(jbackend, userdata, body):
return jbackend.execute(userdata['username'], userdata['session_id'], body['billing_project'], body['bucket'], body['code'])


def blocking_load_references_from_dataset(jbackend, userdata, body):
return jbackend.loadReferencesFromDataset(
userdata['username'], userdata['session_id'], body['billing_project'], body['bucket'], body['path'])


def blocking_value_type(jbackend, userdata, body):
return jbackend.valueType(userdata['username'], body['code'])

Expand Down Expand Up @@ -126,6 +131,12 @@ async def execute(request, userdata):
return await handle_ws_response(request, userdata, 'execute', blocking_execute)


@routes.get('/api/v1alpha/load_references_from_dataset')
@rest_authenticated_users_only
async def load_references_from_dataset(request, userdata):
return await handle_ws_response(request, userdata, 'load_references_from_dataset', blocking_load_references_from_dataset)


@routes.get('/api/v1alpha/type/value')
@rest_authenticated_users_only
async def value_type(request, userdata):
Expand Down