diff --git a/cmd/warc/cmd/root.go b/cmd/warc/cmd/root.go index 6cd5ddc..c9aa72e 100644 --- a/cmd/warc/cmd/root.go +++ b/cmd/warc/cmd/root.go @@ -17,8 +17,6 @@ package cmd import ( "fmt" - "runtime" - "github.com/fsnotify/fsnotify" "github.com/nlnwa/gowarc/cmd/warc/cmd/cat" "github.com/nlnwa/gowarc/cmd/warc/cmd/index" @@ -27,6 +25,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" + "strings" ) type conf struct { @@ -41,16 +40,7 @@ func NewCommand() *cobra.Command { Use: "warc", Short: "A tool for handling warc files", Long: ``, - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - // Increase GOMAXPROCS as recommended by badger - // https://github.com/dgraph-io/badger#are-there-any-go-specific-settings-that-i-should-use - runtime.GOMAXPROCS(128) - - if c.logLevel == "" { - c.logLevel = viper.GetString("loglevel") - } - level, err := log.ParseLevel(c.logLevel) if err != nil { return fmt.Errorf("'%s' is not part of the valid levels: 'panic', 'fatal', 'error', 'warn', 'warning', 'info', 'debug', 'trace'", c.logLevel) @@ -63,10 +53,13 @@ func NewCommand() *cobra.Command { cobra.OnInitialize(func() { c.initConfig() }) + // Flags - cmd.PersistentFlags().StringVarP(&c.logLevel, "log-level", "l", "", "set the log level of gowarc, it will take precedence over config 'loglevel'") - cmd.PersistentFlags().StringVar(&c.cfgFile, "config", "", "config file. If not set, /etc/warc/, $HOME/.warc/ and current working dir will be searched for file config.yaml") - viper.BindPFlag("config", cmd.PersistentFlags().Lookup("config")) + cmd.PersistentFlags().StringVarP(&c.logLevel, "log-level", "l", "info", "fatal, error, warn, info, debug or trace") + cmd.PersistentFlags().StringVar(&c.cfgFile, "config", "c", "config file. If not set, /etc/warc/, $HOME/.warc/ and current working dir will be searched for file config.yaml") + if err := viper.BindPFlags(cmd.PersistentFlags()); err != nil { + log.Fatalf("Failed to bind serve flags: %v", err) + } // Subcommands cmd.AddCommand(ls.NewCommand()) @@ -80,14 +73,9 @@ func NewCommand() *cobra.Command { // initConfig reads in config file and ENV variables if set. func (c *conf) initConfig() { viper.SetTypeByDefaultValue(true) - viper.SetDefault("warcdir", []string{"."}) - viper.SetDefault("indexdir", ".") - viper.SetDefault("autoindex", true) - viper.SetDefault("warcport", 9999) - viper.SetDefault("loglevel", "info") viper.AutomaticEnv() // read in environment variables that match - + viper.EnvKeyReplacer(strings.NewReplacer("-", "_")) if viper.IsSet("config") { // Use config file from the flag. viper.SetConfigFile(viper.GetString("config")) @@ -110,7 +98,7 @@ func (c *conf) initConfig() { // Config file not found; ignore error } else { // Config file was found but another error was produced - log.Fatalf("error reading config file: %v", err) + log.Fatalf("Failed to read config file: %v", err) } } diff --git a/cmd/warc/cmd/serve/serve.go b/cmd/warc/cmd/serve/serve.go index 35402d4..cb4d79a 100644 --- a/cmd/warc/cmd/serve/serve.go +++ b/cmd/warc/cmd/serve/serve.go @@ -16,60 +16,94 @@ package serve import ( + "context" + "fmt" + "github.com/gorilla/handlers" "github.com/nlnwa/gowarc/pkg/index" "github.com/nlnwa/gowarc/pkg/server" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" + "net/http" + "os" + "os/signal" + "runtime" + "syscall" + "time" ) -type conf struct { - port int - warcDirs []string - watchDepth int -} - func NewCommand() *cobra.Command { - c := &conf{} - var cmd = &cobra.Command{ + cmd := &cobra.Command{ Use: "serve", Short: "Start the warc server to serve warc records", Long: ``, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + // Increase GOMAXPROCS as recommended by badger + // https://github.com/dgraph-io/badger#are-there-any-go-specific-settings-that-i-should-use + runtime.GOMAXPROCS(128) + }, RunE: func(cmd *cobra.Command, args []string) error { if len(args) > 0 { - c.warcDirs = args - } else { - c.warcDirs = viper.GetStringSlice("warcdir") + viper.Set("warc-dir", args) } - return runE(c) + return runE() }, } - cmd.Flags().IntVarP(&c.port, "port", "p", -1, "the port that should be used to serve, will use config value otherwise") - cmd.Flags().IntVarP(&c.watchDepth, "watch-depth", "w", 4, "The maximum depth when indexing warc") + cmd.Flags().IntP("port", "p", 9999, "Server listening port") + cmd.Flags().StringP("path-prefix", "", "/", "Path prefix") + cmd.Flags().IntP("watch-depth", "d", 4, "The maximum depth when indexing warc") + cmd.Flags().BoolP("auto-index", "", true, "Enable automatic indexing") + + cmd.Flags().StringP("index-dir", "", ".", "Index directory") + cmd.Flags().StringSliceP("warc-dir", "", []string{"."}, "List of directories containing warcfiles") + cmd.Flags().StringP("cdx-cache-size", "", "", "Size of cdx index cache in bytes") + cmd.Flags().StringP("file-cache-size", "", "", "Size of file index cache in bytes") + cmd.Flags().StringP("id-cache-size", "", "", "Size of id index cache") + if err := viper.BindPFlags(cmd.Flags()); err != nil { + log.Fatalf("Failed to bind serve flags: %v", err) + } return cmd } -func runE(c *conf) error { - if c.port < 0 { - c.port = viper.GetInt("warcport") - } +func runE() error { + opts := index.DefaultOptions(). + WithDir(viper.GetString("index-dir")). + WithIdCacheSize(int64(viper.GetSizeInBytes("id-cache-size"))). + WithFileCacheSize(int64(viper.GetSizeInBytes("file-cache-size"))). + WithCdxCacheSize(int64(viper.GetSizeInBytes("cdx-cache-size"))) - dbDir := viper.GetString("indexdir") - db, err := index.NewIndexDb(dbDir) + db, err := index.NewIndexDb(opts) if err != nil { log.Fatal(err) } defer db.Close() - if viper.GetBool("autoindex") { + if viper.GetBool("auto-index") { log.Infof("Starting autoindexer") - autoindexer := index.NewAutoIndexer(db, c.warcDirs, c.watchDepth) + autoindexer := index.NewAutoIndexer(db, viper.GetStringSlice("warc-dir"), viper.GetInt("watch-depth")) defer autoindexer.Shutdown() } - log.Infof("Starting web server at http://localhost:%v", c.port) - server.Serve(db, c.port) - return nil + loggingMw := func(h http.Handler) http.Handler { + return handlers.CombinedLoggingHandler(os.Stdout, h) + } + server.Handler(db, loggingMw) + + httpServer := &http.Server{ + Addr: fmt.Sprintf(":%v", viper.GetString("port")), + } + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) + go func() { + <-sigs + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = httpServer.Shutdown(ctx) + }() + + log.Infof("Starting web server at http://localhost:%v", viper.GetInt("port")) + return httpServer.ListenAndServe() } diff --git a/go.mod b/go.mod index 1a8a627..4e9fea8 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/nlnwa/gowarc go 1.13 require ( - github.com/dgraph-io/badger/v2 v2.0.2 + github.com/dgraph-io/badger/v2 v2.2007.2 github.com/fsnotify/fsnotify v1.4.7 github.com/golang/protobuf v1.4.0-rc.4 github.com/gorilla/handlers v1.4.2 diff --git a/go.sum b/go.sum index e013b82..b62615e 100644 --- a/go.sum +++ b/go.sum @@ -25,8 +25,12 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger/v2 v2.0.2 h1:uBAA5oM9Gz9TrP01v9LxBGztE5rhtGeBxpF1IvxGGtw= github.com/dgraph-io/badger/v2 v2.0.2/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= +github.com/dgraph-io/badger/v2 v2.2007.2 h1:EjjK0KqwaFMlPin1ajhP943VPENHJdEz1KLIegjaI3k= +github.com/dgraph-io/badger/v2 v2.2007.2/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA= +github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= diff --git a/k8s/overlays/prod/config.yaml b/k8s/overlays/prod/config.yaml new file mode 100644 index 0000000..0fb9f61 --- /dev/null +++ b/k8s/overlays/prod/config.yaml @@ -0,0 +1,5 @@ +autoindex: true +indexdir: /warcindex +warcdir: + - /warcfiles +loglevel: info diff --git a/k8s/overlays/prod/deployment_volume_patch.yaml b/k8s/overlays/prod/deployment_volume_patch.yaml new file mode 100644 index 0000000..ff27888 --- /dev/null +++ b/k8s/overlays/prod/deployment_volume_patch.yaml @@ -0,0 +1,32 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gowarc-server +spec: + template: + spec: + containers: + - name: gowarc-server + resources: + limits: + memory: 30Gi + volumeMounts: + - mountPath: /etc/warc + name: gowarc-config + - mountPath: /warcindex + name: warcindex + subPath: nettaviser/nettaviser_2020 + - mountPath: /warcfiles + name: warcfiles + subPath: veidemann/validwarcs/nettaviser/nettaviser_2020 + volumes: + - name: gowarc-config + configMap: + name: gowarc-config + - name: warcindex + persistentVolumeClaim: + claimName: loke-b11 + - name: warcfiles + persistentVolumeClaim: + claimName: nettarkivet-0 + readOnly: true diff --git a/k8s/overlays/prod/kustomization.yaml b/k8s/overlays/prod/kustomization.yaml new file mode 100644 index 0000000..9bfa55f --- /dev/null +++ b/k8s/overlays/prod/kustomization.yaml @@ -0,0 +1,17 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: nettarkivet +namePrefix: "loke-" +nameSuffix: "-nettaviser" + +resources: + - ../../base + +configMapGenerator: + - name: gowarc-config + files: + - config.yaml + +patchesStrategicMerge: + - deployment_volume_patch.yaml diff --git a/pkg/index/db.go b/pkg/index/db.go index 1efda68..eac4fe3 100644 --- a/pkg/index/db.go +++ b/pkg/index/db.go @@ -46,6 +46,11 @@ type Db struct { cdxIndex *badger.DB dbGcInterval *time.Ticker + // cache settings + fileIndexCacheSize int64 + cdxIndexCacheSize int64 + idIndexCacheSize int64 + // batch settings batchMaxSize int batchMaxWait time.Duration @@ -55,8 +60,8 @@ type Db struct { batchFlushChan chan []*record } -func NewIndexDb(dbDir string) (*Db, error) { - dbDir = path.Join(dbDir, "warcdb") +func NewIndexDb(opts Options) (*Db, error) { + dbDir := path.Join(opts.Dir, "warcdb") idIndexDir := path.Join(dbDir, "id-index") fileIndexDir := path.Join(dbDir, "file-index") cdxIndexDir := path.Join(dbDir, "cdx-index") @@ -93,17 +98,17 @@ func NewIndexDb(dbDir string) (*Db, error) { // Open db var err error - d.idIndex, err = openIndex(idIndexDir) + d.idIndex, err = openIndex(idIndexDir, opts.IdCacheSize) if err != nil { return nil, err } - d.fileIndex, err = openIndex(fileIndexDir) + d.fileIndex, err = openIndex(fileIndexDir, opts.FileCacheSize) if err != nil { return nil, err } - d.cdxIndex, err = openIndex(cdxIndexDir) + d.cdxIndex, err = openIndex(cdxIndexDir, opts.CdxCacheSize) if err != nil { return nil, err } @@ -117,11 +122,11 @@ func NewIndexDb(dbDir string) (*Db, error) { return d, nil } -func openIndex(indexDir string) (db *badger.DB, err error) { - if err := os.MkdirAll(indexDir, 0777); err != nil { +func openIndex(dir string, cacheSize int64) (db *badger.DB, err error) { + if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } - opts := badger.DefaultOptions(indexDir) + opts := badger.DefaultOptions(dir).WithIndexCacheSize(cacheSize) opts.Logger = log.StandardLogger() db, err = badger.Open(opts) return @@ -221,12 +226,18 @@ func (d *Db) UpdateFilePath(filePath string) { } func (d *Db) AddBatch(records []*record) { - log.Debugf("flushing batch to DB") + log.Debug("Flushing batch to DB") var err error err = d.idIndex.Update(func(txn *badger.Txn) error { for _, r := range records { + if r == nil { + log.Warn("Record is nil") + continue + } else if r.filePath == "" { + log.Warn("Empty filepath") + } r.filePath, err = filepath.Abs(r.filePath) if err != nil { log.Errorf("%v", err) @@ -274,10 +285,7 @@ func (d *Db) Flush() { return } - copiedItems := make([]*record, len(d.batchItems)) - for idx, i := range d.batchItems { - copiedItems[idx] = i - } + copiedItems := d.batchItems d.batchItems = d.batchItems[:0] d.batchFlushChan <- copiedItems } diff --git a/pkg/index/indexworker.go b/pkg/index/indexworker.go index 25dcf1f..fda6408 100644 --- a/pkg/index/indexworker.go +++ b/pkg/index/indexworker.go @@ -102,7 +102,7 @@ func indexFile(db *Db, fileName string) { // Check if file is indexed and has not changed since indexing stat, err := os.Stat(fileName) if err != nil { - log.Errorf("%v", err) + log.Errorf("Failed to stat file %v: %v", fileName, err) } fileSize := stat.Size() @@ -111,7 +111,7 @@ func indexFile(db *Db, fileName string) { if fileInfo, err := db.GetFilePath(fn); err == nil { fileInfoLastModified, err := ptypes.Timestamp(fileInfo.LastModified) if err != nil { - log.Errorf("%v", err) + log.Errorf("Failed to convert timestamp: %v", err) } if fileInfo.Size == fileSize && fileInfoLastModified.Equal(fileLastModified) { log.Debugf("Already indexed %v", fileName) diff --git a/pkg/index/indexwriter.go b/pkg/index/indexwriter.go index 6021016..fc6a308 100644 --- a/pkg/index/indexwriter.go +++ b/pkg/index/indexwriter.go @@ -22,7 +22,6 @@ import ( "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/nlnwa/gowarc/warcrecord" - "github.com/spf13/viper" ) type CdxWriter interface { @@ -44,8 +43,7 @@ type CdxDb struct { } func (c *CdxDb) Init() (err error) { - dbDir := viper.GetString("indexdir") - c.db, err = NewIndexDb(dbDir) + c.db, err = NewIndexDb(DefaultOptions()) if err != nil { return err } diff --git a/pkg/index/options.go b/pkg/index/options.go new file mode 100644 index 0000000..d409308 --- /dev/null +++ b/pkg/index/options.go @@ -0,0 +1,50 @@ +/* + * Copyright 2020 National Library of Norway. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package index + +type Options struct { + Dir string + FileCacheSize int64 + IdCacheSize int64 + CdxCacheSize int64 +} + +func DefaultOptions() Options { + return Options{ + Dir: "", + FileCacheSize: 0, + IdCacheSize: 0, + CdxCacheSize: 0, + } +} + +func (opt Options) WithDir(val string) Options { + opt.Dir = val + return opt +} +func (opt Options) WithFileCacheSize(val int64) Options { + opt.FileCacheSize = val + return opt +} +func (opt Options) WithIdCacheSize(val int64) Options { + opt.IdCacheSize = val + return opt +} +func (opt Options) WithCdxCacheSize(val int64) Options { + opt.CdxCacheSize = val + return opt +} diff --git a/pkg/server/server.go b/pkg/server/server.go index f40578f..f2b0f5f 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -17,24 +17,14 @@ package server import ( - "context" - "fmt" - "net/http" - "os" - "os/signal" - "strconv" - "syscall" - "time" - - "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/nlnwa/gowarc/pkg/index" "github.com/nlnwa/gowarc/pkg/loader" "github.com/nlnwa/gowarc/pkg/server/warcserver" - log "github.com/sirupsen/logrus" + "net/http" ) -func Serve(db *index.Db, port int) { +func Handler(db *index.Db, middlewares ...func (http.Handler) http.Handler) http.Handler { l := &loader.Loader{ Resolver: &storageRefResolver{db: db}, Loader: &loader.FileStorageLoader{FilePathResolver: func(fileName string) (filePath string, err error) { @@ -50,28 +40,12 @@ func Serve(db *index.Db, port int) { r.Handle("/search", &searchHandler{l, db}) warcserverRoutes := r.PathPrefix("/warcserver").Subrouter() warcserver.RegisterRoutes(warcserverRoutes, db, l) - http.Handle("/", r) - loggingMw := func(h http.Handler) http.Handler { - return handlers.CombinedLoggingHandler(os.Stdout, h) + for _, middleware := range middlewares { + r.Use(middleware) } - r.Use(loggingMw) - - portStr := strconv.Itoa(port) - httpServer := &http.Server{ - Addr: fmt.Sprintf(":%v", portStr), - } - - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, os.Interrupt, syscall.SIGTERM) - go func() { - <-sigs - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - httpServer.Shutdown(ctx) - }() - log.Info(httpServer.ListenAndServe()) + return r } type storageRefResolver struct {