Fixes bugs in pyspark @with_columns #730

Merged · 5 commits · Mar 4, 2024
20 changes: 20 additions & 0 deletions ellipsis.yaml
@@ -0,0 +1,20 @@
version: 1.3

about:
- "This is a codebase for the open source library called Hamilton. Hamilton helps data scientists and engineers define testable, modular, self-documenting dataflows, that encode lineage and metadata. Runs and scales everywhere python does."
- "Examples under `examples` do not need to be robust, they just need to be illustrative and simple to follow."


pr_review:
auto_review_enabled: true
auto_summarize_pr: false
rules:
- "Code should be DRY (Dont Repeat Yourself)"
- "There should no secrets or credentials in the code"
- "Extremely Complicated Code Needs Comments"
- "Use Descriptive Variable and Constant Names"
- "Don't log sensitive data"
- "Follow the Single Responsibility Principle"
- "Function and Method Naming Should Follow Consistent Patterns"
- "Ignore pull requests and commits with WIP in the title or body description"
- "Please suggest places to add to the sphinx documentation under `docs/` where appropriate for each pull request"
3 changes: 3 additions & 0 deletions examples/LLM_Workflows/README.md
@@ -3,3 +3,6 @@ in various LLM contexts.

For more examples that you can pull off-the-shelf, we invite you to check out [hub.dagworks.io](https://hub.dagworks.io),
and use the keyword `llm` to filter for LLM related examples.

Note: some examples have alternative implementations, such as PySpark or Ray, to demonstrate how
you can take the same code and scale it easily.
35 changes: 29 additions & 6 deletions examples/LLM_Workflows/pdf_summarizer/run_on_spark/README.md
@@ -1,17 +1,29 @@
# PDF Summarizer on Spark

Here we show how you can run the same Hamilton dataflow that we defined in the backend
folder on Spark (using two approaches). This is useful if you want to run the same dataflow
on a larger dataset, or have to run it on a cluster. Importantly, this means you don't have
to rewrite your code or change where/how you develop!

![Summarization dataflow](spark_summarization.png)

# File organization
- `summarization.py`: this should be a carbon copy of the one in the backend folder.
- `run_with_columns.py`: this contains the code to create a Spark job and run the summarization code
using the `@with_columns` syntax. This is a more ergonomic way to write a full Spark job with Hamilton.
- `run.py`: this contains the code to create a Spark job and run the summarization code as UDFs.

# How `run_with_columns.py` works
We take the dataflow defined by `summarization.py` and execute it as a bunch
of row-based UDFs on Spark. The magic to do this happens in the `@with_columns` decorator.

This approach allows you to put your entire PySpark workflow into Hamilton in an ergonomic way.
You can request any intermediate outputs of `summarization.py` as columns in the dataframe.
This is great for debugging and understanding your dataflow.

![with_columns](spark_with_columns_summarization.png)
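
For reference, this is the core of the pipeline module this PR adds (it appears in full further down in this diff): the decorated function receives the Spark dataframe with the `pdf_source` column, and the selected outputs of `summarization.py` are appended to it as columns.

```python
import pyspark.sql as ps
import summarization

from hamilton.plugins.h_spark import with_columns


@with_columns(
    summarization,                                    # Hamilton module to run as row-level UDFs
    select=["summarized_chunks", "summarized_text"],  # outputs appended as columns
    columns_to_pass=["pdf_source"],                   # existing column fed into the sub-DAG
    config_required=["file_type"],
)
def summarized_pdf_df(pdf_df: ps.DataFrame) -> ps.DataFrame:
    return pdf_df
```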

# How `run.py` works
We take the dataflow defined by `summarization.py` and execute it as a bunch
of row-based UDFs on Spark. The magic to do this happens in the Hamilton `PySparkUDFGraphAdapter`.

@@ -20,7 +32,18 @@
that contains a column that maps to the required input, which in this example
is `pdf_source`. You can request whatever intermediate outputs as columns, which
in this example we do with `["raw_text", "chunked_text", "summarized_text"]`.

![udfs](spark_with_columns_summarization.png)
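
As a rough sketch (not the exact contents of `run.py`; see that file for the real wiring), the adapter-based version looks roughly like this, assuming the adapter lives in `hamilton.plugins.h_spark` and that the inputs dictionary maps the required column name to the Spark dataframe as described above:

```python
import summarization
from pyspark.sql import SparkSession

from hamilton import driver
from hamilton.plugins import h_spark

spark = SparkSession.builder.getOrCreate()
# hypothetical input dataframe with the required `pdf_source` column
df = spark.createDataFrame([("my_paper.pdf",)], ["pdf_source"])

dr = (
    driver.Builder()
    .with_config({"file_type": "pdf"})
    .with_modules(summarization)
    .with_adapter(h_spark.PySparkUDFGraphAdapter())
    .build()
)
# The requested outputs are computed as row-level UDFs and appended as columns.
result = dr.execute(
    ["raw_text", "chunked_text", "summarized_text"],
    inputs={
        "pdf_source": df,  # the Spark dataframe, keyed by the required input column
        "openai_gpt_model": "gpt-3.5-turbo-0613",
        "content_type": "Scientific article",
        "user_query": "Can you ELI5 the paper?",
    },
)
```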

## Running `run_with_columns.py`

1. Make sure you have the right dependencies installed. You can do this by running
`pip install -r requirements.txt`.
2. You can use the single PDF that is provided, or add more paths to
PDF files in `spark_pdf_pipeline.py`.
3. Run the code with `python run_with_columns.py`. Be sure to have your `OPENAI_API_KEY` set in the
environment.

## Running `run.py`

1. Make sure you have the right dependencies installed. You can do this by running
`pip install -r requirements.txt`.
59 changes: 59 additions & 0 deletions examples/LLM_Workflows/pdf_summarizer/run_on_spark/run_with_columns.py
@@ -0,0 +1,59 @@
"""Spark driver and Hamilton driver code."""

import spark_pdf_pipeline
from pyspark.sql import SparkSession

from hamilton import base, driver, log_setup


def my_spark_job(spark: SparkSession, openai_gpt_model: str, content_type: str, user_query: str):
    """Template for a Spark job that uses Hamilton for its feature engineering, i.e. any map operations.

    :param spark: the SparkSession
    :param openai_gpt_model: the model to use for summarization
    :param content_type: the content type of the document to summarize
    :param user_query: the user query to use for summarization
    """
    dr = (
        driver.Builder()
        .with_config({"file_type": "pdf"})
        .with_modules(spark_pdf_pipeline)
        .with_adapter(base.DefaultAdapter())
        .build()
    )
    # create inputs for the dataflow -- these are plain values (plus the SparkSession), not columns.
    execute_inputs = {
        "spark_session": spark,
        "save_path": "summarized_pdf_df.parquet",
        "openai_gpt_model": openai_gpt_model,
        "content_type": content_type,
        "user_query": user_query,
    }
    output = ["saved_summarized_pdf_df"]
    # visualize execution of what is going to be appended
    dr.visualize_execution(
        output,
        "./spark_with_columns_summarization.png",
        inputs=execute_inputs,
        deduplicate_inputs=True,
    )
    # tell Hamilton to tell Spark what to do
    dict_result = dr.execute(output, inputs=execute_inputs)
    return dict_result["saved_summarized_pdf_df"]


if __name__ == "__main__":
    import os

    openai_api_key = os.environ.get("OPENAI_API_KEY")
    log_setup.setup_logging(log_level=log_setup.LOG_LEVELS["INFO"])
    # create the SparkSession -- note in real life, you'd adjust the number of executors to control parallelism.
    spark = SparkSession.builder.config(
        "spark.executorEnv.OPENAI_API_KEY", openai_api_key
    ).getOrCreate()
    spark.sparkContext.setLogLevel("info")
    # run the job
    _df = my_spark_job(spark, "gpt-3.5-turbo-0613", "Scientific article", "Can you ELI5 the paper?")
    # show the dataframe & thus make spark compute something
    _df.show()
    spark.stop()
34 changes: 34 additions & 0 deletions examples/LLM_Workflows/pdf_summarizer/run_on_spark/spark_pdf_pipeline.py
@@ -0,0 +1,34 @@
import pandas as pd
import pyspark.sql as ps
import summarization

from hamilton.plugins.h_spark import with_columns


def pdf_df(spark_session: ps.SparkSession) -> ps.DataFrame:
    pandas_df = pd.DataFrame(
        # TODO: update this to point to a PDF or two.
        {"pdf_source": ["CDMS_HAMILTON_PAPER.pdf"]}
    )
    df = spark_session.createDataFrame(pandas_df)
    return df


@with_columns(
    summarization,
    select=["summarized_chunks", "summarized_text"],
    columns_to_pass=["pdf_source"],
    config_required=["file_type"],
)
def summarized_pdf_df(pdf_df: ps.DataFrame) -> ps.DataFrame:
    return pdf_df


def saved_summarized_pdf_df(
    summarized_pdf_df: ps.DataFrame, save_path: str, persist_before_save: bool = True
) -> ps.DataFrame:
    """Save the summarized PDF dataframe to a parquet file."""
    if persist_before_save:
        summarized_pdf_df.persist()
    summarized_pdf_df.write.parquet(save_path, mode="overwrite")
    return summarized_pdf_df
57 changes: 57 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/spark/README.md
@@ -0,0 +1,57 @@
# PySpark version of the scraping and chunking example

Here we show how you can easily integrate the Hamilton dataflow that we defined previously
into a PySpark job. This is useful if you want to run the same dataflow on a larger dataset,
or have to run it on a cluster.

# Why use PySpark?
PySpark is a great way to scale your data processing. In the case of scraping and chunking it allows
you to parallelize operations. This is useful if you have a large number of documents to process.


# File organization
- `spark_pipeline.py` is the main dataflow for the Spark job.
- `doc_pipeline.py` contains the code to parse and chunk the documents. It has a few adjustments to make
it work with the `@with_columns` integration Hamilton has.

# How to run
If you're on MacOS:
```bash
OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES python spark_pipeline.py
```
Otherwise:
```bash
python spark_pipeline.py
```

# Changes to run on PySpark
Here's why we need to make some minor adjustments to the code:

1. You can only make dataframe columns from Spark-compatible types. Thus if a function returns an object,
e.g. the LangChain chunker in this example, it cannot be used as a column. We have to convert it to a string
or a struct, or not include it in the "sub-DAG" that `@with_columns` is creating.
2. The `@with_columns` decorator assumes that intermediate functions can be saved as columns to the dataframe.
That's how it strings together computation. If this is not possible, you have to tell Hamilton not to include
the function in the "sub-DAG" that `@with_columns` is creating.

## Handling LangChain's Document type
Instead of returning a LangChain Document object, we return a JSON string for simplicity. We could
create a PySpark struct type, but that's a bit more involved.
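
A minimal, self-contained illustration of that workaround (the real logic lives in `chunked_text` in `doc_pipeline.py` below); the `Document` constructed here is just a stand-in:

```python
import json

from langchain_core.documents import Document

# A chunk as LangChain would produce it -- an object Spark can't store directly.
docs = [Document(page_content="some chunk of text", metadata={"Header 1": "Intro"})]

# Serializing each document to a JSON string gives us values that fit in a
# plain string column of a Spark DataFrame.
chunks = [json.dumps(d.to_json()) for d in docs]
```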

## Handling `@with_columns` restrictions
In `doc_pipeline.py` you'll see we define a variable that is a list of the functions we want to include in the
"sub-DAG" that `@with_columns` is creating. This is a quick way to label or specify which transforms
can be made into columns. `@with_columns` will then reference this list of functions to know what to include.
That way `html_chunker` and `text_chunker` are not included in the "sub-DAG" that `@with_columns` is creating, and
are instead run once and bound to the UDFs that Hamilton is creating underneath.
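
A rough sketch of what that wiring could look like (the actual code lives in `spark_pipeline.py`, which isn't part of this diff, so the decorated function and column names here are illustrative; the signature mirrors the module-based `@with_columns` usage shown elsewhere in this PR):

```python
import doc_pipeline
import pyspark.sql as ps

from hamilton.plugins.h_spark import with_columns


@with_columns(
    *doc_pipeline.spark_safe,   # only the functions flagged as column-safe join the sub-DAG
    select=["chunked_text"],    # outputs to append as columns
    columns_to_pass=["url"],    # existing column fed into the sub-DAG
)
def chunked_url_text(url_df: ps.DataFrame) -> ps.DataFrame:
    return url_df
```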

# Caveats to think about in real life
## Partitions & number of executors
PySpark's parallelism is basically controlled by the number of partitions and the number of executors. That is,
the parallelism you get is `min(number of partitions, number of executors)`. The number of partitions relates
to how many independent chunks of data you have. When you load data you can repartition it. The number
of executors relates to how many independent processes you have to do work. The executor count is not exposed
in this example, but you'd set it when you submit the job.

So you'll likely need to tune these for your use case so that you don't DoS a resource you're hitting. This is
especially important if you extend this workflow to create embeddings.
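
A hypothetical sketch of the knobs involved (the partition count and the `spark-submit` flag here are illustrative, not values from this example):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("https://example.com/page1",), ("https://example.com/page2",)], ["url"]
)

# More partitions -> more chunks of work that can run in parallel, as long as
# there are enough executor slots to pick them up.
df = df.repartition(8)

# The executor count itself is set when you submit the job, e.g.:
#   spark-submit --num-executors 8 spark_pipeline.py
```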
84 changes: 84 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/spark/doc_pipeline.py
@@ -0,0 +1,84 @@
import json
import re

import requests
from langchain import text_splitter

# from langchain_core import documents


def article_regex() -> str:
    """This assumes you're using the furo theme for sphinx"""
    return r'<article role="main" id="furo-main-content">(.*?)</article>'


def article_text(url: str, article_regex: str) -> str:
    """Pulls URL and takes out relevant HTML.

    :param url: the url to pull.
    :param article_regex: the regex to use to get the contents out.
    :return: sub-portion of the HTML
    """
    try:
        html = requests.get(url)
    except requests.exceptions.RequestException as e:
        raise Exception(f"Failed to get URL: {url}") from e
    article = re.findall(article_regex, html.text, re.DOTALL)
    if not article:
        raise ValueError(f"No article found in {url}")
    text = article[0].strip()
    return text


def html_chunker() -> text_splitter.HTMLHeaderTextSplitter:
    """Return HTML chunker object.

    :return: the HTML header text splitter.
    """
    headers_to_split_on = [
        ("h1", "Header 1"),
        ("h2", "Header 2"),
        ("h3", "Header 3"),
    ]
    return text_splitter.HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)


def text_chunker(
    chunk_size: int = 256, chunk_overlap: int = 32
) -> text_splitter.RecursiveCharacterTextSplitter:
    """Returns the text chunker object.

    :param chunk_size: the size of each chunk in characters.
    :param chunk_overlap: the overlap between chunks in characters.
    :return: the recursive character text splitter.
    """
    return text_splitter.RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )


def chunked_text(
    article_text: str,
    html_chunker: text_splitter.HTMLHeaderTextSplitter,
    text_chunker: text_splitter.RecursiveCharacterTextSplitter,
) -> list[str]:
    """This function takes in HTML, chunks it, and then chunks it again.

    It then outputs a list of LangChain "documents". Multiple documents for one HTML header section are possible.

    :param article_text: the article text to chunk.
    :param html_chunker: the HTML header splitter to use.
    :param text_chunker: the recursive character splitter to use.
    :return: need to return something we can make a pyspark column with
    """
    header_splits = html_chunker.split_text(article_text)
    splits = text_chunker.split_documents(header_splits)
    # TODO: make this a struct field compatible structure
    return [json.dumps(s.to_json()) for s in splits]


spark_safe = [
    article_regex,
    article_text,
    chunked_text,
]