Sending empty batch to OpenAI returns 400 #4156

Open
pascalwhoop opened this issue Jul 26, 2024 · 0 comments

Expected Behavior (Mandatory)

We ask the DB to add embeddings to all nodes that don't have p.embedding yet, so that we don't recompute existing embeddings. However, if all nodes already have embeddings, the API still gets called once with an empty batch. That is undesirable because the API then returns a 400. We did adjust our Neo4j setup to fail explicitly when there are errors (before, it would sometimes fail silently and continue, which left us with partial embeddings), but IMO the DB should work like this:

  • you tell it to create embeddings
  • if any batch fails, it raises an error which the user can handle as they want
  • if the user query gives 0 nodes to embed, the DB doesn't actually call the 3rd party API but simply returns 0 results
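
The three points above can be sketched in plain Python. This is only an illustration of the requested semantics, not APOC's implementation; `embed_in_batches` and the `embed_batch` callable are hypothetical names standing in for the procedure and the 3rd party API call.

```python
from typing import Callable, List, Sequence


def embed_in_batches(
    texts: Sequence[str],
    embed_batch: Callable[[List[str]], List[List[float]]],
    batch_size: int,
) -> List[List[float]]:
    """Embed texts batch by batch; never call the provider with an empty batch."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    results: List[List[float]] = []
    # range() over an empty sequence yields nothing, so zero inputs means
    # zero provider calls and an empty result.
    for start in range(0, len(texts), batch_size):
        batch = list(texts[start:start + batch_size])
        # Any exception raised by embed_batch (hypothetical provider call)
        # propagates to the caller, who decides how to handle it.
        results.extend(embed_batch(batch))
    return results
```

With an empty input the provider callable is never invoked and the result is an empty list; a failing batch surfaces as an exception instead of being silently skipped.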

Actual Behavior (Mandatory)

  • The DB sends an empty batch ([]) to OpenAI, which returns a 400. We count that as a failed batch, so our process fails.

How to Reproduce the Problem

    create_function("iterate", {"name": "apoc.periodic.iterate"}, func_raw=True)
    create_function("openai_embedding", {"name": "apoc.ml.openai.embedding"}, func_raw=True)
    create_function("set_property", {"name": "apoc.create.setProperty"}, func_raw=True)

    # Build query
    p = Pypher()

    # Due to f-string limitations
    empty = '\"\"'

    # apoc.periodic.iterate takes stringified cypher queries as input.
    # The first query determines the subset of nodes to include, whereas
    # the second query defines the operation to execute on each batch.
    # https://neo4j.com/labs/apoc/4.1/overview/apoc.periodic/apoc.periodic.iterate/
    p.CALL.iterate(
        # Match every :Entity node that does not have an embedding yet
        cypher.stringify(cypher.MATCH.node("p", labels="Entity").WHERE.p.property("embedding").IS_NULL.RETURN.p),
        # For each batch, execute following statements, the $_batch is a special
        # variable made accessible to access the elements in the batch.
        cypher.stringify(
            [
                # Apply OpenAI embedding in a batched manner, embedding
                # is applied on the concatenation of supplied features for each node.
                cypher.CALL.openai_embedding(f"[item in $_batch | {'+'.join(f'coalesce(item.p.{item}, {empty})' for item in features)}]", "$apiKey", "{endpoint: $endpoint, model: $model}").YIELD("index", "text", "embedding"),
                # Set the attribute property of the node to the embedding
                cypher.CALL.set_property("$_batch[index].p", "$attribute", "embedding").YIELD("node").RETURN("node"),
            ]
        ),
        # The last argument bridges the variables used in the outer query
        # and the variables referenced in the stringified params.
        cypher.map(
            batchMode="BATCH_SINGLE",
            # FUTURE when this is fixed: https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/4153 we should be able to max out
            # our capacity towards the service provider
            parallel="false",
            # parallel="false",
            batchSize=batch_size,
            concurrency=concurrency,
            params=cypher.map(apiKey=api_key, endpoint=endpoint, attribute=attribute, model=model),
        ),
    ).YIELD("batch", "operations").UNWIND("batch").AS("b").WITH("b").WHERE("b.failed > 0").RETURN("b.failed")
    # fmt: on

    failed = []
    with gdb.driver() as driver:
        failed = driver.execute_query(str(p), database_=gdb._database, **p.bound_params)

    if len(failed.records) > 0:
        raise RuntimeError("Failed batches in the embedding step")

    return {"success": "true"}
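
Until the DB skips empty batches itself, one possible client-side workaround is to count the pending nodes first and only run the iterate query when there is something to embed. The sketch below is an assumption about how that could look: `embed_if_needed`, `count_pending` and `run_embedding` are hypothetical names, and the two callables would wrap the actual driver calls.

```python
from typing import Callable


def embed_if_needed(
    count_pending: Callable[[], int],
    run_embedding: Callable[[], int],
) -> dict:
    """Run the embedding step only when at least one node still needs one.

    count_pending: hypothetical callable returning the number of nodes
        without an embedding, e.g. the result of
        MATCH (p:Entity) WHERE p.embedding IS NULL RETURN count(p)
    run_embedding: hypothetical callable that runs the iterate query and
        returns the number of failed batches.
    """
    pending = count_pending()
    if pending == 0:
        # Nothing to embed: skip the iterate call so no empty batch is sent.
        return {"success": "true", "pending": 0}
    failed_batches = run_embedding()
    if failed_batches > 0:
        raise RuntimeError("Failed batches in the embedding step")
    return {"success": "true", "pending": pending}
```

The count and the embedding run are separate queries, so nodes could change in between; this only avoids the empty-batch call in the common case and is not a substitute for a fix in the procedure.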

Screenshots (where it's possible)

Specifications (Mandatory)

Currently used versions

Versions

  • OS: latest docker image
  • Neo4j: 5.21.0
  • Neo4j-Apoc: 5.21.0