Adds example on how to chunk Hamilton documentation #721

Merged · 4 commits · Mar 5, 2024
7 changes: 7 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/README.md
@@ -0,0 +1,7 @@
# Scraping and Chunking
Scraping and chunking are important parts of any RAG (retrieval-augmented generation) dataflow.

Here we show how you can scale a scraping and chunking dataflow to run in parallel
locally, as well as with Ray, Dask, and even PySpark.

As an example, we'll create chunks of text from the Hamilton documentation.
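
To run it, install the requirements and then execute `run.py`. As a minimal sketch (mirroring what `run.py` does), the driver setup looks like this:

```python
import pipeline

from hamilton import driver
from hamilton.execution import executors

dr = (
    driver.Builder()
    .with_modules(pipeline)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
    .build()
)
result = dr.execute(["collect_chunked_url_text"])
```

Swapping the remote executor is how you would move from local multiprocessing to Ray or Dask.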
151 changes: 151 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/pipeline.py
@@ -0,0 +1,151 @@
"""
Things this module does.

1. takes in a sitemap.xml file and creates a list of all the URLs in the file.
2. takes in a list of URLs and pulls the HTML from each URL.
3. it then strips the HTML to the relevant body of HTML.
html/body/div[class="page"]/div[class="main"]/div[class="content"]/div[class="article-container"]/article
4. it then chunks the HTML into smaller pieces -- returning langchain documents
5. what this doesn't do is create embeddings -- but that would be easy to extend.
"""

import re

import requests
from langchain import text_splitter
from langchain_core import documents

from hamilton.htypes import Collect, Parallelizable


def sitemap_text(sitemap_url: str = "https://hamilton.dagworks.io/en/latest/sitemap.xml") -> str:
"""Takes in a sitemap URL and returns the sitemap.xml file.

:param sitemap_url: the URL of sitemap.xml file
:return:
"""
sitemap = requests.get(sitemap_url)
return sitemap.text


def urls_from_sitemap(sitemap_text: str) -> list[str]:
"""Takes in a sitemap.xml file contents and creates a list of all the URLs in the file.

:param sitemap_text: the contents of a sitemap.xml file
:return: list of URLs
"""
urls = re.findall(r"<loc>(.*?)</loc>", sitemap_text)
return urls


def url(urls_from_sitemap: list[str], max_urls: int = 1000) -> Parallelizable[str]:
"""
Takes in a list of URLs for parallel processing.
"""
for url in urls_from_sitemap[0:max_urls]:
yield url


def article_regex() -> str:
"""This assumes you're using the furo theme for sphinx"""
return r'<article role="main" id="furo-main-content">(.*?)</article>'


def article_text(url: str, article_regex: str) -> str:
"""Pulls URL and takes out relevant HTML.

:param url: the url to pull.
:param article_regex: the regext to use to get the contents out of.
:return: sub-portion of the HTML
"""
html = requests.get(url)
article = re.findall(article_regex, html.text, re.DOTALL)
if not article:
raise ValueError(f"No article found in {url}")
text = article[0].strip()
return text


def html_chunker() -> text_splitter.HTMLHeaderTextSplitter:
"""Return HTML chunker object.

:return:
"""
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
("h3", "Header 3"),
]
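    # Splitting with this object returns langchain Documents whose metadata
    # records the enclosing headers, e.g. {"Header 1": ..., "Header 2": ...}.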
return text_splitter.HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)


def text_chunker(
chunk_size: int = 256, chunk_overlap: int = 32
) -> text_splitter.RecursiveCharacterTextSplitter:
"""Returns the text chunker object.

:param chunk_size:
:param chunk_overlap:
:return:
"""
return text_splitter.RecursiveCharacterTextSplitter(
chunk_size=chunk_size, chunk_overlap=chunk_overlap
)


def chunked_text(
article_text: str,
html_chunker: text_splitter.HTMLHeaderTextSplitter,
text_chunker: text_splitter.RecursiveCharacterTextSplitter,
) -> list[documents.Document]:
"""This function takes in HTML, chunks it, and then chunks it again.

It then outputs a list of langchain "documents". Multiple documents for one HTML header section is possible.

:param article_text:
:param html_chunker:
:param text_chunker:
:return:
"""
header_splits = html_chunker.split_text(article_text)
splits = text_chunker.split_documents(header_splits)
return splits


def url_result(url: str, article_text: str, chunked_text: list[documents.Document]) -> dict:
"""Function to aggregate what we want to return from parallel processing.

    :param url: the URL that was processed.
    :param article_text: the article HTML that was chunked.
    :param chunked_text: the list of document chunks.
    :return: a dictionary with the URL, the article text, and its chunks.
"""
return {"url": url, "article_text": article_text, "chunks": chunked_text}


def collect_chunked_url_text(url_result: Collect[dict]) -> list:
"""Function to collect the results from parallel processing."""
return list(url_result)


if __name__ == "__main__":
    import pprint

    import pipeline

from hamilton import driver
from hamilton.execution import executors

dr = (
driver.Builder()
.with_modules(pipeline)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_config({})
.with_local_executor(executors.SynchronousLocalTaskExecutor())
.with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
.build()
)
dr.display_all_functions("pipeline.png")
result = dr.execute(["collect_chunked_url_text"])
for chunk in result["collect_chunked_url_text"]:
pprint.pprint(type(chunk["chunks"][0]))
3 changes: 3 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/requirements.txt
@@ -0,0 +1,3 @@
langchain
langchain-core
requests
sf-hamilton[visualization]
29 changes: 29 additions & 0 deletions examples/LLM_Workflows/scraping_and_chunking/run.py
@@ -0,0 +1,29 @@
"""
A basic script to run the pipeline defined by Hamilton.
"""

import pipeline

from hamilton import driver
from hamilton.execution import executors

dr = (
driver.Builder()
.with_modules(pipeline)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_config({})
.with_local_executor(executors.SynchronousLocalTaskExecutor())
# could be Ray or Dask
.with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))
.build()
)
dr.display_all_functions("pipeline.png")
result = dr.execute(
["collect_chunked_url_text"],
inputs={"chunk_size": 256, "chunk_overlap": 32},
)
# do something with the result...
# import pprint
#
# for chunk in result["collect_chunked_url_text"]:
# pprint.pprint(chunk)