Skip to content

Commit

Permalink
nlnwa#3: Expose compression type in CLI/conf
Browse files Browse the repository at this point in the history
This allows the user to set the compression type so save memory footprint at the cost of cpu cycles
  • Loading branch information
Avokadoen committed Feb 25, 2021
1 parent 8a0dfc5 commit 287a18b
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 48 deletions.
31 changes: 15 additions & 16 deletions cmd/warcserver/cmd/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/nlnwa/gowarc/warcoptions"
"github.com/nlnwa/gowarc/warcreader"
"github.com/nlnwa/gowarcserver/pkg/dbfromviper"
"github.com/nlnwa/gowarcserver/pkg/index"
"github.com/spf13/cobra"
)
Expand All @@ -37,7 +38,11 @@ func parseFormat(format string) (index.CdxWriter, error) {
case "cdxpb":
return &index.CdxPb{}, nil
case "db":
return &index.CdxDb{}, nil
db, err := dbfromviper.DbFromViper()
if err != nil {
return nil, err
}
return index.NewCdxDb(db), nil
}
return nil, fmt.Errorf("unknwon format %v, valid formats are: 'cdx', 'cdxj', 'cdxpb', 'db'", format)
}
Expand Down Expand Up @@ -76,41 +81,35 @@ func NewCommand() *cobra.Command {
}

func runE(c *conf) error {
fmt.Printf("Format: %v\n", c.writerFormat)

err := c.writer.Init()
if err != nil {
return err
}
defer c.writer.Close()
fmt.Printf("Format: %v\n", c.writerFormat)

readFile(c)
return nil
return readFile(c)
}

// TODO: return error
func readFile(c *conf) {
func readFile(c *conf) error {
opts := &warcoptions.WarcOptions{Strict: false}
wf, err := warcreader.NewWarcFilename(c.fileName, 0, opts)
if err != nil {
return
return err
}
defer wf.Close()

count := 0

// At the end, print count even if an error occurs
defer fmt.Fprintln(os.Stderr, "Count: ", count)
for {
wr, currentOffset, err := wf.Next()
if err == io.EOF {
break
}
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Error: %v, rec num: %v, Offset %v\n", err.Error(), strconv.Itoa(count), currentOffset)
break
return fmt.Errorf("Error: %v, rec num: %v, Offset %v\n", err.Error(), strconv.Itoa(count), currentOffset)
}
count++

c.writer.Write(wr, c.fileName, currentOffset)
}
fmt.Fprintln(os.Stderr, "Count: ", count)

return nil
}
15 changes: 10 additions & 5 deletions cmd/warcserver/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
type conf struct {
cfgFile string
logLevel string

// DB settings
compression string
}

// NewCommand returns a new cobra.Command implementing the root command for warc
Expand All @@ -45,10 +48,6 @@ func NewCommand() *cobra.Command {
// https:/dgraph-io/badger#are-there-any-go-specific-settings-that-i-should-use
runtime.GOMAXPROCS(128)

if c.logLevel == "" {
c.logLevel = viper.GetString("loglevel")
}

level, err := log.ParseLevel(c.logLevel)
if err != nil {
return fmt.Errorf("'%s' is not part of the valid levels: 'panic', 'fatal', 'error', 'warn', 'warning', 'info', 'debug', 'trace'", c.logLevel)
Expand All @@ -62,8 +61,13 @@ func NewCommand() *cobra.Command {
cobra.OnInitialize(func() { c.initConfig() })

// Flags
cmd.PersistentFlags().StringVarP(&c.logLevel, "log-level", "l", "", "set the log level of gowarc, it will take precedence over config 'loglevel'")
cmd.PersistentFlags().StringVarP(&c.compression, "compression", "c", "none", "DB compression type: 'none', 'snappy', 'zstd'")
cmd.PersistentFlags().StringVarP(&c.logLevel, "loglevel", "l", "info", "set the log level of gowarc, it will take precedence over config 'loglevel'")
cmd.PersistentFlags().StringVar(&c.cfgFile, "config", "", "config file. If not set, /etc/warc/, $HOME/.warc/ and current working dir will be searched for file config.yaml")

// bind flags and config
viper.BindPFlag("compression", cmd.PersistentFlags().Lookup("compression"))
viper.BindPFlag("loglevel", cmd.PersistentFlags().Lookup("loglevel"))
viper.BindPFlag("config", cmd.PersistentFlags().Lookup("config"))

// Subcommands
Expand All @@ -81,6 +85,7 @@ func (c *conf) initConfig() {
viper.SetDefault("autoindex", true)
viper.SetDefault("warcport", 9999)
viper.SetDefault("loglevel", "info")
viper.SetDefault("compression", "none")

viper.AutomaticEnv() // read in environment variables that match

Expand Down
6 changes: 3 additions & 3 deletions cmd/warcserver/cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package serve

import (
"github.com/nlnwa/gowarcserver/pkg/dbfromviper"
"github.com/nlnwa/gowarcserver/pkg/index"
"github.com/nlnwa/gowarcserver/pkg/server"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -56,10 +57,9 @@ func runE(c *conf) error {
c.port = viper.GetInt("warcport")
}

dbDir := viper.GetString("indexdir")
db, err := index.NewIndexDb(dbDir)
db, err := dbfromviper.DbFromViper()
if err != nil {
log.Fatal(err)
return err
}
defer db.Close()

Expand Down
112 changes: 112 additions & 0 deletions pkg/compressiontype/compressiontype_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package compressiontype_test

import (
"testing"

"github.com/dgraph-io/badger/v2/options"
ct "github.com/nlnwa/gowarcserver/pkg/compressiontype"
)

func TestCompressionType_String(t *testing.T) {
tests := []struct {
name string
cType options.CompressionType
expected string
errorState bool
}{
{
"options.None to string none",
options.None,
"none",
false,
},
{
"options.Snappy to string snappy",
options.Snappy,
"snappy",
false,
},
{
"options.ZSTD to string zstd",
options.ZSTD,
"zstd",
false,
},
{
"Illegal to error",
999,
"",
true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ct.CompressionType(tt.cType).String()
if err != nil && !tt.errorState {
t.Errorf("Unexpected failure: %v", err)
} else if err == nil && tt.errorState {
t.Errorf("Expected failure with %v, but got '%v'", tt.cType, got)
}

if tt.expected != got {
t.Errorf("Exptected %v got %v", tt.expected, got)
}
})
}
}

func TestCompressionType_FromString(t *testing.T) {
tests := []struct {
name string
stringValue string
expected ct.CompressionType
errorState bool
}{
{
"string none to options.None",
"none",
ct.CompressionType(options.None),
false,
},
{
"string snappy to options.Snappy",
"snappy",
ct.CompressionType(options.Snappy),
false,
},
{
"string zstd to options.ZSTD",
"zstd",
ct.CompressionType(options.ZSTD),
false,
},
{
"string ZstD to options.ZSTD",
"ZstD",
ct.CompressionType(options.ZSTD),
false,
},
{
"string garbage to error",
"garbage",
0,
true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ct.FromString(tt.stringValue)
if err != nil && !tt.errorState {
t.Errorf("Unexpected failure: %v", err)
} else if err == nil && tt.errorState {
t.Errorf("Expected failure with %v, but got %v", tt.stringValue, got)
}

if tt.expected != got {
t.Errorf("Exptected %v got %v", tt.expected, got)
}
})
}
}
35 changes: 35 additions & 0 deletions pkg/compressiontype/comressiontype.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package compressiontype

import (
"fmt"
"strings"

"github.com/dgraph-io/badger/v2/options"
)

type CompressionType options.CompressionType

// should be const, since it is internal to this file it should be ok as var...
var stringRepresentation = [...]string{"none", "snappy", "zstd"}

func (c CompressionType) String() (string, error) {
if c != CompressionType(options.None) && c != CompressionType(options.Snappy) && c != CompressionType(options.ZSTD) {
return "", fmt.Errorf("CompressionType can only have value %v, %v or %v", options.None, options.Snappy, options.ZSTD)
}
return stringRepresentation[c], nil
}

func FromString(value string) (CompressionType, error) {
lowered := strings.ToLower(value)

// for now we manually check for each type and return if we find it
if lowered == "none" {
return CompressionType(options.None), nil
} else if lowered == "snappy" {
return CompressionType(options.Snappy), nil
} else if lowered == "zstd" {
return CompressionType(options.ZSTD), nil
}

return CompressionType(0), fmt.Errorf("unexpected value '%v', expected any of listed: '%v'", lowered, strings.Join(stringRepresentation[:], ", "))
}
31 changes: 31 additions & 0 deletions pkg/dbfromviper/dbfromviper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package dbfromviper

/*
Usually it should be avoided to reference viper in pkg, but this functionality is used by
cmd index and server, so it's an exception.
*/

import (
"github.com/dgraph-io/badger/v2/options"
"github.com/nlnwa/gowarcserver/pkg/compressiontype"
"github.com/nlnwa/gowarcserver/pkg/index"
"github.com/spf13/viper"
)

// TODO: test somehow?
// Create a database based on the viper settings set by the user
func DbFromViper() (*index.Db, error) {
compressionString := viper.GetString("compression")
compression, cErr := compressiontype.FromString(compressionString)
if cErr != nil {
return nil, cErr
}

dbDir := viper.GetString("indexdir")
db, dbErr := index.NewIndexDb(dbDir, options.CompressionType(compression))
if dbErr != nil {
return nil, dbErr
}

return db, nil
}
15 changes: 8 additions & 7 deletions pkg/index/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
gowarcpb "github.com/nlnwa/gowarc/proto"
Expand Down Expand Up @@ -55,7 +56,7 @@ type Db struct {
batchFlushChan chan []*record
}

func NewIndexDb(dbDir string) (*Db, error) {
func NewIndexDb(dbDir string, compression options.CompressionType) (*Db, error) {
dbDir = path.Join(dbDir, "warcdb")
idIndexDir := path.Join(dbDir, "id-index")
fileIndexDir := path.Join(dbDir, "file-index")
Expand Down Expand Up @@ -93,17 +94,17 @@ func NewIndexDb(dbDir string) (*Db, error) {
// Open db
var err error

d.idIndex, err = openIndex(idIndexDir)
d.idIndex, err = openIndex(idIndexDir, compression)
if err != nil {
return nil, err
}

d.fileIndex, err = openIndex(fileIndexDir)
d.fileIndex, err = openIndex(fileIndexDir, compression)
if err != nil {
return nil, err
}

d.cdxIndex, err = openIndex(cdxIndexDir)
d.cdxIndex, err = openIndex(cdxIndexDir, compression)
if err != nil {
return nil, err
}
Expand All @@ -117,12 +118,12 @@ func NewIndexDb(dbDir string) (*Db, error) {
return d, nil
}

func openIndex(indexDir string) (db *badger.DB, err error) {
func openIndex(indexDir string, compression options.CompressionType) (db *badger.DB, err error) {
if err := os.MkdirAll(indexDir, 0777); err != nil {
return nil, err
}
opts := badger.DefaultOptions(indexDir)
opts.Logger = log.StandardLogger()
opts := badger.DefaultOptions(indexDir).WithLogger(log.StandardLogger())
opts.Compression = compression
db, err = badger.Open(opts)
return
}
Expand Down
Loading

0 comments on commit 287a18b

Please sign in to comment.