Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cq_cache decorator #17

Merged
merged 27 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions plugins/cq_cache/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# cq_cache

This plugin provides a decorator function that allows you to add a file based cache to your functions to allow you to speed up the execution of your computationally heavy cadquery functions.

## Installation

To install this plugin, the following line should be used.

```
pip install -e "git+https:/CadQuery/cadquery-plugins.git#egg=cq_cache&subdirectory=plugins/cq_cache"
```
You can also clone the repository of the plugin and run in the repository the following command :
```
python setup.py install
```

## Dependencies

This plugin has no dependencies other than the cadquery library.

## Usage

To use this plugin after it has been installed, just import it and use the decorator on your functions

```python
# decorate your functions that build computationally heavy shapes
from cq_cache import cq_cache, clear_cq_cache

@cq_cache(cache_size=1)
def make_cube(a,b,c):
cube = cq.Workplane().box(a,b,c)
return cube

for i in range(200):
make_cube(1,1,1+i)

clear_cq_cache()
# >>> Cache cleared for 1.036 MB
```

## Speed gain example
```python
import cadquery as cq
import time, functools
from cq_cache import cq_cache
from itertools import cycle

@cq_cache()
def lofting(nb_sec):
wires = []
radius = cycle([2,5,8,6,10])
for i in range(nb_sec):
if i%2==0:
Y = 1
else:
Y = 0
wires.append(cq.Wire.makeCircle(next(radius),cq.Vector(0,0,5*i),cq.Vector(0,Y,1)))
loft = cq.Solid.makeLoft(wires)
return loft

lofting(500)

# First script run :
# >>> 4500 ms
# Second script run :
# >>> 20 ms
```
Empty file added plugins/cq_cache/__init__.py
Empty file.
202 changes: 202 additions & 0 deletions plugins/cq_cache/cq_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import cadquery as cq
import cadquery
from cadquery import exporters, importers
from functools import wraps
import tempfile
import os
import inspect
import base64
from OCP.BRepTools import BRepTools
from OCP.BRep import BRep_Builder
from OCP.TopoDS import TopoDS_Shape


TEMPDIR_PATH = tempfile.gettempdir()
CACHE_DIR_NAME = "cadquery_geom_cache"
CACHE_DIR_PATH = os.path.join(TEMPDIR_PATH, CACHE_DIR_NAME)
CQ_TYPES = [
cq.Shape,
cq.Solid,
cq.Shell,
cq.Compound,
cq.Face,
cq.Wire,
cq.Edge,
cq.Vertex,
TopoDS_Shape,
cq.Workplane,
]

if CACHE_DIR_NAME not in os.listdir(TEMPDIR_PATH):
os.mkdir(CACHE_DIR_PATH)


def importBrep(file_path):
"""
Import a boundary representation model
Returns a TopoDS_Shape object
"""
builder = BRep_Builder()
shape = TopoDS_Shape()
return_code = BRepTools.Read_s(shape, file_path, builder)
if return_code is False:
raise ValueError("Import failed, check file name")
return shape


def get_cache_dir_size(cache_dir_path):
"""
Returns size of the specified directory in bytes
"""
total_size = 0
for dirpath, dirnames, filenames in os.walk(cache_dir_path):
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
return total_size


def delete_oldest_file(cache_dir_path):
"""
When the cache directory size exceed the limit, this function is called
deleting the oldest file of the cache
"""
cwd = os.getcwd()
os.chdir(cache_dir_path)
files = sorted(os.listdir(os.getcwd()), key=os.path.getmtime)
oldest = files[0]
os.remove(os.path.join(cache_dir_path, oldest))
os.chdir(cwd)


def build_file_name(fct, *args, **kwargs):
"""
Returns a file name given the specified function and args.
If the function and the args are the same this function returns the same filename
"""
SPACER = "_"
file_name = fct.__name__
for arg in args:
if isinstance(arg, cq.Workplane):
raise TypeError(
"Can not cache a function that accepts Workplane objects as argument"
)
file_name += SPACER + str(hash(arg))
for kwarg_value in kwargs.values():
if isinstance(kwarg_value, cq.Workplane):
raise TypeError(
"Can not cache a function that accepts Workplane objects as argument"
)
file_name += SPACER + str(hash(kwarg_value))
file_name = bytes(file_name, "utf-8")
return base64.urlsafe_b64encode(file_name).decode(
"utf-8"
) # compacts the long string of hash ints into a urlsafe string


def clear_cq_cache():
"""
Removes all the files from the cq cache
"""
cache_size = get_cache_dir_size(CACHE_DIR_PATH)
for cache_file in os.listdir(CACHE_DIR_PATH):
os.remove(os.path.join(CACHE_DIR_PATH, cache_file))
print(f"Cache cleared for {round(cache_size*1e-6,3)} MB ")


def using_same_function(fct, file_name):
"""
Checks if this exact function call has been cached.
Take care of the eventuality where the user cache a function but
modify the body of the function afterwards.
It assure that if the function has been modify, the cache won't load a wrong cached file
"""
with open(file_name, "r") as f:
cached_function = "".join(f.readlines()[:-1])

caching_function = inspect.getsource(fct)
if cached_function == caching_function:
return True
else:
return False


def return_right_wrapper(source, target_file):
"""
Cast the TopoDS_Shape object loaded by importBrep as the right type that the original function is returning
"""

with open(target_file, "r") as tf:
target = tf.readlines()[-1]
target = (
target.replace("class ", "").lstrip("<'").rstrip("'>")
) # eval cannot evaluate this "<class 'cadquery.cq.Workplane'>"" but this "cadquery.cq.Workplane" is ok
target = eval(target) # by the checking above forbids malicious excecution

for cq_type in CQ_TYPES:
if target == cq_type:
if cq_type == cq.Workplane:
shape = cq.Shape(source)
shape = cq.Workplane(obj=shape)
else:
shape = cq_type(source)
return shape


def cq_cache(cache_size=500):
"""
cache_size : Maximum cache memory in MB

This function save the model created by the cached function as a BREP file and
loads it if the cached function is called several time with the same arguments.

Note that it is primarly made for caching function with simple types as argument.
Classes passed as argument without __hash__ function or with __hash__ function that returns
a similar value for different object will fail but might not raise an error, keep that in mind.

"""

def _cq_cache(function):
@wraps(function)
def wrapper(*args, **kwargs):
file_name = build_file_name(function, *args, **kwargs)
file_path = os.path.join(CACHE_DIR_PATH, file_name)

if file_name in os.listdir(CACHE_DIR_PATH) and using_same_function(
function, file_path
): # check that a change in function passed doesn't load up an old BREP file.
shape = importBrep(
os.path.join(CACHE_DIR_PATH, file_name + ".brep")
) # If implemented in cadquery, could switch to the cadquery version of importBrep
return return_right_wrapper(shape, file_path)

else:
shape = function(*args, **kwargs)
shape_type = type(shape)
if shape_type not in CQ_TYPES:
raise TypeError(f"cq_cache cannot wrap {shape_type} objects")
try:
shape_export = (
shape.val()
) # if shape is a workplane retrive only the shape object
except AttributeError:
shape_export = shape

shape_export.exportBrep(
os.path.join(CACHE_DIR_PATH, file_name) + ".brep"
)

with open(os.path.join(CACHE_DIR_PATH, file_name), "w") as fun_file:
fun_file.write(inspect.getsource(function))
fun_file.write(str(shape_type))

cache_dir_size = get_cache_dir_size(CACHE_DIR_PATH)
while (cache_dir_size * 1e-6) > cache_size:
delete_oldest_file(CACHE_DIR_PATH)
cache_dir_size = get_cache_dir_size(CACHE_DIR_PATH)

return shape

return wrapper

return _cq_cache
45 changes: 45 additions & 0 deletions plugins/cq_cache/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from setuptools import setup, find_packages

version = "1.0.0" # Please update this version number when updating the plugin
plugin_name = "cq_cache"
description = "File based cache decorator"
long_description = "Allow to use file based cache to not have to rebuild every cadquery model from scratch"
author = "Romain FERRU"
author_email = "[email protected]"
install_requires = (
[]
) # Any dependencies that pip also needs to install to make this plugin work


setup(
name=plugin_name,
version=version,
url="https:/CadQuery/cadquery-plugins",
license="Apache Public License 2.0",
author=author,
author_email=author_email,
description=description,
long_description=long_description,
packages=find_packages(where="cq_cache"),
install_requires=install_requires,
include_package_data=True,
zip_safe=False,
platforms="any",
test_suite="tests",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Intended Audience :: End Users/Desktop",
"Intended Audience :: Information Technology",
"Intended Audience :: Science/Research",
"Intended Audience :: System Administrators",
"License :: OSI Approved :: Apache Software License",
"Operating System :: POSIX",
"Operating System :: MacOS",
"Operating System :: Unix",
"Programming Language :: Python",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Internet",
"Topic :: Scientific/Engineering",
],
)
63 changes: 63 additions & 0 deletions tests/test_cq_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import cadquery as cq
import cadquery
from plugins.cq_cache.cq_cache import cq_cache, clear_cq_cache, get_cache_dir_size
import tempfile
import os

TEMPDIR_PATH = tempfile.gettempdir()
CACHE_DIR_NAME = "cadquery_geom_cache"
CACHE_DIR_PATH = os.path.join(TEMPDIR_PATH,CACHE_DIR_NAME)
CACHE_SIZE = 0.1
for f in os.listdir(CACHE_DIR_PATH):
os.remove(os.path.join(CACHE_DIR_PATH,f))

@cq_cache(CACHE_SIZE)
def cube(a,b,c):
cube = cq.Workplane().box(a,b,c)
return cube.val()

def test_get_cache_dir_size():
with open(os.path.join(CACHE_DIR_PATH,"fill.txt"), "w") as f:
f.write("test")
assert get_cache_dir_size(CACHE_DIR_PATH) == 4

def test_clear_cache():
with open(os.path.join(CACHE_DIR_PATH,"fill.txt"), "w") as f:
f.write("test")
assert len(os.listdir(CACHE_DIR_PATH)) == 1
clear_cq_cache()
assert len(os.listdir(CACHE_DIR_PATH)) == 0

def test_cache_file_creation():
clear_cq_cache()
cube1 = cube(1,1,1)
cube2 = cube(1,1,1)
files = os.listdir(CACHE_DIR_PATH)
assert len(files) == 2
assert "Y3ViZV8xXzFfMQ==.brep" in files
assert "Y3ViZV8xXzFfMQ==" in files

def test_not_exceeding_size():
clear_cq_cache()
for i in range(20):
cube(1,1,1+i)
assert get_cache_dir_size(CACHE_DIR_PATH) < CACHE_SIZE*1e6

def test_cache_type_return():
cube1 = cube(1,1,1) #at first call get the type directly from function call
cube2 = cube(1,1,1) #at second call the decorator retrives the right type with some logic that may fail
assert isinstance(cube1,cadquery.occ_impl.shapes.Solid)
assert isinstance(cube2,cadquery.occ_impl.shapes.Solid)

def test_cache_type_return_with_modified_function():

@cq_cache(CACHE_SIZE)
def cube(a,b,c):
cube = cq.Workplane().box(a,b,c)
return cube

cube1 = cube(1,1,1) #at first call get the type directly from function call
cube2 = cube(1,1,1) #at second call the decorator retrives the right type with some logic that may fail
assert isinstance(cube1,cq.Workplane)
assert isinstance(cube2,cq.Workplane)