Skip to content

Commit

Permalink
Support new result writer and writing directly to sqlite without json…
Browse files Browse the repository at this point in the history
… intermediary (#66)

* New resultwriter

* SQLite result writer is building but still need to handle resolve DM_getPanel to panel id and querying correct data source

* Building and buffered insert

* Basically working!

* Space difference

* Support for prepared

* Builds

* Encode maps as json in sqlite writer

* Fix for caching

* Rename table format

* Fix mod

* Drop redundant

* Add benchmark script
  • Loading branch information
eatonphil authored Jun 14, 2022
1 parent cfd934b commit 240acd1
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ go 1.18
require (
github.com/chzyer/readline v1.5.0
github.com/google/uuid v1.3.0
github.com/multiprocessio/datastation/runner v0.0.0-20220609232347-405d8c1a88b2
github.com/multiprocessio/datastation/runner v0.0.0-20220614050412-e3776fe304bc
github.com/olekukonko/tablewriter v0.0.5
)

Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -445,10 +445,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/multiprocessio/datastation/runner v0.0.0-20220601032709-9bda16b723bb h1:sG23Q6XOfcOtK9bM4QhcmGiqsFVzoXwkZRvb8OJ3EiU=
github.com/multiprocessio/datastation/runner v0.0.0-20220601032709-9bda16b723bb/go.mod h1:UCms/xK08DspNqDDZ5XsaIlc39AuREmeELspFYghMGI=
github.com/multiprocessio/datastation/runner v0.0.0-20220609232347-405d8c1a88b2 h1:WWCPwJPWfBVUhuAfFZJGs6vxemeeqW8ahDRtTtbGyxw=
github.com/multiprocessio/datastation/runner v0.0.0-20220609232347-405d8c1a88b2/go.mod h1:UCms/xK08DspNqDDZ5XsaIlc39AuREmeELspFYghMGI=
github.com/multiprocessio/datastation/runner v0.0.0-20220614050412-e3776fe304bc h1:PZrwLRdb13fjcblFznyzFjTZisKKLl15VTrN5Bp3EFw=
github.com/multiprocessio/datastation/runner v0.0.0-20220614050412-e3776fe304bc/go.mod h1:UCms/xK08DspNqDDZ5XsaIlc39AuREmeELspFYghMGI=
github.com/multiprocessio/go-json v0.0.0-20220308002443-61d497dd7b9e h1:NlPl7amllnQyVAkZgjBvFEkKxJSba/R8ZpaTodc7SIQ=
github.com/multiprocessio/go-json v0.0.0-20220308002443-61d497dd7b9e/go.mod h1:huI4M/MrI5px/SgmXYi0a2byKikSLgDrnMQuXOqKtw4=
github.com/multiprocessio/go-openoffice v0.0.0-20220110232726-064f5dda1956 h1:WVofL03Eq+z3LbDOfH5eKzu2U85LFZZngOMBlNaO/H0=
Expand Down
145 changes: 110 additions & 35 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bufio"
"crypto/sha1"
"encoding/hex"
"encoding/json"
Expand All @@ -11,13 +10,15 @@ import (
"io"
"io/ioutil"
"log"
"math/rand"
"os"
"path"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"

"github.com/chzyer/readline"
"github.com/multiprocessio/datastation/runner"
Expand All @@ -40,7 +41,7 @@ func resolveContentType(fileExtensionOrContentType string) runner.MimeType {
return runner.GetMimeType("x."+fileExtensionOrContentType, runner.ContentTypeInfo{})
}

func evalFileInto(file, mimetype string, convertNumbers bool, out *os.File) error {
func evalFileInto(file, mimetype string, convertNumbers bool, w *runner.ResultWriter) error {
if mimetype == "" {
mimetype = string(runner.GetMimeType(file, runner.ContentTypeInfo{}))
} else {
Expand All @@ -51,23 +52,16 @@ func evalFileInto(file, mimetype string, convertNumbers bool, out *os.File) erro
return fmt.Errorf("Unknown mimetype for file: %s.\n", file)
}

w := bufio.NewWriterSize(out, 1e6)
defer w.Flush()

return runner.TransformFile(file, runner.ContentTypeInfo{
Type: mimetype,
ConvertNumbers: convertNumbers,
}, w)
}

func getShape(resultFile, panelId string) (*runner.Shape, error) {
return runner.ShapeFromFile(resultFile, panelId, runner.DefaultShapeMaxBytesToRead, 100)
}

var tableFileRe = regexp.MustCompile(`({(?P<number>[0-9]+)(((,\s*(?P<numbersinglepath>"(?:[^"\\]|\\.)*\"))?)|(,\s*(?P<numberdoublepath>'(?:[^'\\]|\\.)*\'))?)})|({((((?P<singlepath>"(?:[^"\\]|\\.)*\"))?)|((?P<doublepath>'(?:[^'\\]|\\.)*\'))?)})`)

func rewriteQuery(query string) string {
query = strings.ReplaceAll(query, "{}", "DM_getPanel(0)")
func rewriteQuery(query string, resolveDM_getPanelToId *map[string]string) string {
query = strings.ReplaceAll(query, "{}", "{0}")

query = tableFileRe.ReplaceAllStringFunc(query, func(m string) string {
matchForSubexps := tableFileRe.FindStringSubmatch(m)
Expand All @@ -90,6 +84,10 @@ func rewriteQuery(query string) string {
return fmt.Sprintf("DM_getPanel(%s, %s)", index, path)
}

if resolveDM_getPanelToId != nil {
return "\"" + (*resolveDM_getPanelToId)[index] + "\""
}

return fmt.Sprintf("DM_getPanel(%s)", index)
})

Expand Down Expand Up @@ -214,22 +212,23 @@ func getFilesContentHash(files []string) (string, error) {
return hex.EncodeToString(sha1.Sum(nil)), nil
}

func importFile(projectId string, file, mimetype string, convertNumbers bool, ec runner.EvalContext) (*runner.PanelInfo, error) {
panelId := uuid.New().String()
resultFile := ec.GetPanelResultsFile(projectId, panelId)
out, err := openTruncate(resultFile)
func importFile(projectId, panelId, file, mimetype string, convertNumbers bool, w *runner.ResultWriter, withShape bool) (*runner.PanelInfo, error) {
err := evalFileInto(file, mimetype, convertNumbers, w)
if err != nil {
return nil, err
}
defer out.Close()

if err := evalFileInto(file, mimetype, convertNumbers, out); err != nil {
err = w.Close()
if err != nil {
return nil, err
}

s, err := getShape(resultFile, panelId)
if err != nil {
return nil, err
s := &runner.Shape{}
if withShape {
s, err = w.Shape(panelId, runner.DefaultShapeMaxBytesToRead, 100)
if err != nil {
return nil, err
}
}

return &runner.PanelInfo{
Expand All @@ -241,8 +240,8 @@ func importFile(projectId string, file, mimetype string, convertNumbers bool, ec
}, nil
}

func runQuery(queryRaw string, project *runner.ProjectState, ec *runner.EvalContext, args *args, files []string) error {
query := rewriteQuery(queryRaw)
func runQuery(queryRaw string, project *runner.ProjectState, ec *runner.EvalContext, args *args, files []string, resolveDM_getPanelToId *map[string]string) error {
query := rewriteQuery(queryRaw, resolveDM_getPanelToId)
panel := &runner.PanelInfo{
Type: runner.DatabasePanel,
Content: query,
Expand Down Expand Up @@ -273,7 +272,7 @@ func runQuery(queryRaw string, project *runner.ProjectState, ec *runner.EvalCont
return dumpJSONFile(resultFile, args.pretty, args.schema)
}

func repl(project *runner.ProjectState, ec *runner.EvalContext, args *args, files []string) error {
func repl(project *runner.ProjectState, ec *runner.EvalContext, args *args, files []string, resolveDM_getPanelToId *map[string]string) error {
completer := readline.NewPrefixCompleter(
readline.PcItem("SELECT"),
readline.PcItem("FROM"),
Expand Down Expand Up @@ -326,7 +325,7 @@ func repl(project *runner.ProjectState, ec *runner.EvalContext, args *args, file
return nil
}

err = runQuery(queryRaw, project, ec, args, files)
err = runQuery(queryRaw, project, ec, args, files, resolveDM_getPanelToId)
if err != nil {
return err
}
Expand All @@ -343,6 +342,7 @@ type args struct {
dumpCacheFile bool
isInteractive bool
convertNumbers bool
noSQLiteWriter bool
}

func getArgs() (*args, error) {
Expand Down Expand Up @@ -422,6 +422,11 @@ func getArgs() (*args, error) {
continue
}

if arg == "--no-sqlite-writer" {
args.noSQLiteWriter = true
continue
}

args.nonFlagArgs = append(args.nonFlagArgs, arg)
}

Expand Down Expand Up @@ -457,6 +462,8 @@ Examples:
See the repo for more details: https:/multiprocessio/dsq.`

func _main() error {
rand.Seed(time.Now().UnixNano())

log.SetFlags(0)
runner.Verbose = false

Expand Down Expand Up @@ -562,10 +569,77 @@ func _main() error {
}

ec := runner.NewEvalContext(*runner.DefaultSettings, tmpDir)
connector, err := runner.MakeTmpSQLiteConnector()
if err != nil {
return err
}
if args.cacheSettings.Enabled {
connector.DatabaseConnectorInfo.Database.Database = cachedPath
}

// Check if we can use direct SQLite writer
useSQLiteWriter := !args.noSQLiteWriter && !args.convertNumbers && !args.schema
if useSQLiteWriter && !args.cacheSettings.Enabled {
tmp, err := ioutil.TempFile("", "dsq-sqlite-shared")
if err != nil {
return err
}
defer os.Remove(tmp.Name())
connector.DatabaseConnectorInfo.Database.Database = tmp.Name()
}

for _, file := range files {
mt := mimetypeOverride[file]
if mt == "" {
mt = string(runner.GetMimeType(file, runner.ContentTypeInfo{}))
} else {
mt = string(resolveContentType(mt))
}
mtm := runner.MimeType(mt)
useSQLiteWriter = useSQLiteWriter && (mtm == runner.CSVMimeType ||
mtm == runner.ParquetMimeType ||
mtm == runner.AvroMimeType ||
mtm == runner.TSVMimeType ||
mtm == runner.JSONLinesMimeType ||
mtm == runner.RegexpLinesMimeType)
if !useSQLiteWriter {
break
}
}
// Done checking if we can use SQLiteWriter

// This is going to break sometime. Reset back to original possible values.
if !useSQLiteWriter {
if args.cacheSettings.Enabled {
connector.DatabaseConnectorInfo.Database.Database = cachedPath
} else {
connector.DatabaseConnectorInfo.Database.Database = ":memory:"
}
}

// When dumping schema, need to injest even if cache is on.
if !args.cacheSettings.CachePresent || !args.cacheSettings.Enabled || lastNonFlagArg == "" {
for _, file := range files {
panel, err := importFile(project.Id, file, mimetypeOverride[file], args.convertNumbers, ec)
for i, file := range files {
panelId := uuid.New().String()

var w *runner.ResultWriter
if useSQLiteWriter {
tableName := fmt.Sprintf("t_%d", i)
sw, err := openSQLiteResultItemWriter(connector.DatabaseConnectorInfo.Database.Database, tableName)
if err != nil {
return err
}

w = runner.NewResultWriter(sw)
} else {
// Use JSONWriter
w, err = ec.GetResultWriter(project.Id, panelId)
if err != nil {
return err
}
}

panel, err := importFile(project.Id, panelId, file, mimetypeOverride[file], args.convertNumbers, w, !useSQLiteWriter)
if err != nil {
return err
}
Expand All @@ -584,21 +658,22 @@ func _main() error {
return dumpJSONFile(resultFile, args.pretty, args.schema)
}

connector, err := runner.MakeTmpSQLiteConnector()
if err != nil {
return err
}
project.Connectors = append(project.Connectors, *connector)

if args.cacheSettings.Enabled {
connector.DatabaseConnectorInfo.Database.Database = cachedPath
var resolveDM_getPanelToId *map[string]string
if useSQLiteWriter {
m := map[string]string{}
for i := range files {
m[fmt.Sprintf("%d", i)] = fmt.Sprintf("t_%d", i)
}
resolveDM_getPanelToId = &m
}
project.Connectors = append(project.Connectors, *connector)

if args.isInteractive {
return repl(project, &ec, args, files)
return repl(project, &ec, args, files, resolveDM_getPanelToId)
}

return runQuery(lastNonFlagArg, project, &ec, args, files)
return runQuery(lastNonFlagArg, project, &ec, args, files, resolveDM_getPanelToId)
}

func main() {
Expand Down
6 changes: 6 additions & 0 deletions scripts/benchmark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
hyperfine -w 2 --export-markdown benchmarks.md \
'OCTOSQL_NO_TELEMETRY=1 octosql "SELECT passenger_count, COUNT(*), AVG(total_amount) FROM taxi.csv GROUP BY passenger_count"' \
"q -d ',' -H \"SELECT passenger_count, COUNT(*), AVG(total_amount) FROM taxi.csv GROUP BY passenger_count\"" \
"q -C -d ',' -H \"SELECT passenger_count, COUNT(*), AVG(total_amount) FROM taxi.csv GROUP BY passenger_count\"" \
'dsq taxi.csv "SELECT passenger_count, COUNT(*), AVG(total_amount) FROM {} GROUP BY passenger_count"'
'dsq -C taxi.csv "SELECT passenger_count, COUNT(*), AVG(total_amount) FROM {} GROUP BY passenger_count"'
22 changes: 19 additions & 3 deletions scripts/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tempfile
from datetime import datetime

DEBUG = '-d' in sys.argv or '--debug' in sys.argv
WIN = os.name == 'nt'

def cmd(to_run, bash=False, doNotReplaceWin=False):
Expand All @@ -27,14 +28,23 @@ def cmd(to_run, bash=False, doNotReplaceWin=False):
def test(name, to_run, want, fail=False, sort=False, winSkip=False, within_seconds=None, want_stderr=None):
global tests
global failures

skip = False
for i, arg in enumerate(sys.argv):
if arg == '-f' or arg == '--filter':
if sys.argv[i+1].lower() not in name.lower():
return

tests += 1
skipped = True

t1 = datetime.now()

print('STARTING: ' + name)
if DEBUG:
print(to_run)

if WIN and winSkip:
if WIN and winSkip or skip:
print(' SKIPPED\n')
print()
return
Expand Down Expand Up @@ -94,7 +104,7 @@ def test(name, to_run, want, fail=False, sort=False, winSkip=False, within_secon
t2 = datetime.now()
s = (t2-t1).seconds
if within_seconds and s > within_seconds:
printf(f' FAILURE: completed in {s} seconds. Wanted <{within_seconds}s')
print(f' FAILURE: completed in {s} seconds. Wanted <{within_seconds}s')
failures += 1
return

Expand Down Expand Up @@ -287,7 +297,13 @@ def test(name, to_run, want, fail=False, sort=False, winSkip=False, within_secon
cat taxi.csv | ./dsq --cache -s csv 'SELECT passenger_count, COUNT(*), AVG(total_amount) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC'
"""

test("Caching from pipe (second time so import not required)", to_run, want, sort=True, winSkip=True, within_seconds=5)
test("Caching from pipe (second time so import not required sqlitewriter)", to_run, want, sort=True, winSkip=True, within_seconds=5)

to_run = """
cat taxi.csv | ./dsq --no-sqlite-writer --cache -s csv 'SELECT passenger_count, COUNT(*), AVG(total_amount) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC'
"""

test("Caching from pipe (second time so import not required, jsonwriter)", to_run, want, sort=True, winSkip=True, within_seconds=5)

to_run = """
cat testdata/taxi_trunc.csv | ./dsq --cache -s csv 'SELECT passenger_count, COUNT(*), AVG(total_amount) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC'"""
Expand Down
Loading

0 comments on commit 240acd1

Please sign in to comment.