From fab1632664b107b5f5966ae0bf89e934f13410bf Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Tue, 19 Dec 2023 16:34:58 +0530 Subject: [PATCH 1/4] model schema log --- runtime/reconcilers/model.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/runtime/reconcilers/model.go b/runtime/reconcilers/model.go index 87e93313c6c..2e4aa2d62f3 100644 --- a/runtime/reconcilers/model.go +++ b/runtime/reconcilers/model.go @@ -7,11 +7,14 @@ import ( "encoding/hex" "errors" "fmt" + "strings" "time" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" compilerv1 "github.com/rilldata/rill/runtime/compilers/rillv1" + "github.com/rilldata/rill/runtime/drivers" + "go.uber.org/zap" "golang.org/x/exp/slog" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -222,6 +225,10 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa if createErr != nil { createErr = fmt.Errorf("failed to create model: %w", createErr) } + + // temporarily for debugging + r.logModelNameAndType(ctx, self, stagingTableName) + if createErr == nil && stage { // Rename the staging table to main view/table err = olapForceRenameTable(ctx, r.C, connector, stagingTableName, !materialize, tableName) @@ -459,3 +466,32 @@ func (r *ModelReconciler) createModel(ctx context.Context, self *runtimev1.Resou return olap.CreateTableAsSelect(ctx, tableName, view, sql) } + +func (r *ModelReconciler) logModelNameAndType(ctx context.Context, self *runtimev1.Resource, name string) { + olap, release, err := r.C.AcquireOLAP(ctx, self.GetModel().Spec.Connector) + if err != nil { + r.C.Logger.Error("ModelReconciler: failed to acquire OLAP", zap.Error(err)) + return + } + defer release() + + res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT column_name, data_type FROM information_schema.columns WHERE table_name=? ORDER BY column_name ASC", Args: []any{name}}) + if err != nil { + r.C.Logger.Error("ModelReconciler: failed information_schema.columns", zap.Error(err)) + return + } + defer res.Close() + + colTyp := make([]string, 0) + var col, typ string + for res.Next() { + err = res.Scan(&col, &typ) + if err != nil { + r.C.Logger.Error("ModelReconciler: failed scan", zap.Error(err)) + return + } + colTyp = append(colTyp, fmt.Sprintf("%s:%s", col, typ)) + } + + r.C.Logger.Info("ModelReconciler: ", slog.String("name", name), slog.String("schema", strings.Join(colTyp, ", "))) +} From 152970a57a98d3c40ef1813002e367c7ff8e4574 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Tue, 19 Dec 2023 16:37:13 +0530 Subject: [PATCH 2/4] remove zap --- runtime/reconcilers/model.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/runtime/reconcilers/model.go b/runtime/reconcilers/model.go index 2e4aa2d62f3..e12f4846c8c 100644 --- a/runtime/reconcilers/model.go +++ b/runtime/reconcilers/model.go @@ -14,7 +14,6 @@ import ( "github.com/rilldata/rill/runtime" compilerv1 "github.com/rilldata/rill/runtime/compilers/rillv1" "github.com/rilldata/rill/runtime/drivers" - "go.uber.org/zap" "golang.org/x/exp/slog" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -470,14 +469,14 @@ func (r *ModelReconciler) createModel(ctx context.Context, self *runtimev1.Resou func (r *ModelReconciler) logModelNameAndType(ctx context.Context, self *runtimev1.Resource, name string) { olap, release, err := r.C.AcquireOLAP(ctx, self.GetModel().Spec.Connector) if err != nil { - r.C.Logger.Error("ModelReconciler: failed to acquire OLAP", zap.Error(err)) + r.C.Logger.Error("ModelReconciler: failed to acquire OLAP", slog.Any("err", err)) return } defer release() res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT column_name, data_type FROM information_schema.columns WHERE table_name=? ORDER BY column_name ASC", Args: []any{name}}) if err != nil { - r.C.Logger.Error("ModelReconciler: failed information_schema.columns", zap.Error(err)) + r.C.Logger.Error("ModelReconciler: failed information_schema.columns", slog.Any("err", err)) return } defer res.Close() @@ -487,7 +486,7 @@ func (r *ModelReconciler) logModelNameAndType(ctx context.Context, self *runtime for res.Next() { err = res.Scan(&col, &typ) if err != nil { - r.C.Logger.Error("ModelReconciler: failed scan", zap.Error(err)) + r.C.Logger.Error("ModelReconciler: failed scan", slog.Any("err", err)) return } colTyp = append(colTyp, fmt.Sprintf("%s:%s", col, typ)) From 8800def73cc53ceb24f00b6a4ab09fa6f6b01f2f Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Tue, 19 Dec 2023 16:49:21 +0530 Subject: [PATCH 3/4] also add to source --- runtime/reconcilers/model.go | 39 +++++------------------------------ runtime/reconcilers/source.go | 6 ++++++ runtime/reconcilers/util.go | 30 +++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 34 deletions(-) diff --git a/runtime/reconcilers/model.go b/runtime/reconcilers/model.go index e12f4846c8c..6744e90e5c3 100644 --- a/runtime/reconcilers/model.go +++ b/runtime/reconcilers/model.go @@ -7,13 +7,11 @@ import ( "encoding/hex" "errors" "fmt" - "strings" "time" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" compilerv1 "github.com/rilldata/rill/runtime/compilers/rillv1" - "github.com/rilldata/rill/runtime/drivers" "golang.org/x/exp/slog" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -223,11 +221,13 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa createErr := r.createModel(ctx, self, stagingTableName, !materialize) if createErr != nil { createErr = fmt.Errorf("failed to create model: %w", createErr) + } else { + if !r.C.Runtime.AllowHostAccess() { + // temporarily for debugging + logTableNameAndType(ctx, r.C, connector, stagingTableName) + } } - // temporarily for debugging - r.logModelNameAndType(ctx, self, stagingTableName) - if createErr == nil && stage { // Rename the staging table to main view/table err = olapForceRenameTable(ctx, r.C, connector, stagingTableName, !materialize, tableName) @@ -465,32 +465,3 @@ func (r *ModelReconciler) createModel(ctx context.Context, self *runtimev1.Resou return olap.CreateTableAsSelect(ctx, tableName, view, sql) } - -func (r *ModelReconciler) logModelNameAndType(ctx context.Context, self *runtimev1.Resource, name string) { - olap, release, err := r.C.AcquireOLAP(ctx, self.GetModel().Spec.Connector) - if err != nil { - r.C.Logger.Error("ModelReconciler: failed to acquire OLAP", slog.Any("err", err)) - return - } - defer release() - - res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT column_name, data_type FROM information_schema.columns WHERE table_name=? ORDER BY column_name ASC", Args: []any{name}}) - if err != nil { - r.C.Logger.Error("ModelReconciler: failed information_schema.columns", slog.Any("err", err)) - return - } - defer res.Close() - - colTyp := make([]string, 0) - var col, typ string - for res.Next() { - err = res.Scan(&col, &typ) - if err != nil { - r.C.Logger.Error("ModelReconciler: failed scan", slog.Any("err", err)) - return - } - colTyp = append(colTyp, fmt.Sprintf("%s:%s", col, typ)) - } - - r.C.Logger.Info("ModelReconciler: ", slog.String("name", name), slog.String("schema", strings.Join(colTyp, ", "))) -} diff --git a/runtime/reconcilers/source.go b/runtime/reconcilers/source.go index 442e094d7bc..7305e3163b4 100644 --- a/runtime/reconcilers/source.go +++ b/runtime/reconcilers/source.go @@ -179,7 +179,13 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN ingestErr := r.ingestSource(ctx, src.Spec, stagingTableName) if ingestErr != nil { ingestErr = fmt.Errorf("failed to ingest source: %w", ingestErr) + } else { + if !r.C.Runtime.AllowHostAccess() { + // temporarily for debugging + logTableNameAndType(ctx, r.C, connector, stagingTableName) + } } + if ingestErr == nil && src.Spec.StageChanges { // Rename staging table to main table err = olapForceRenameTable(ctx, r.C, connector, stagingTableName, false, tableName) diff --git a/runtime/reconcilers/util.go b/runtime/reconcilers/util.go index d9105d8c334..a0c34ff6a39 100644 --- a/runtime/reconcilers/util.go +++ b/runtime/reconcilers/util.go @@ -11,6 +11,7 @@ import ( "github.com/rilldata/rill/runtime" "github.com/rilldata/rill/runtime/drivers" "github.com/robfig/cron/v3" + "golang.org/x/exp/slog" ) // checkRefs checks that all refs exist, are idle, and have no errors. @@ -154,3 +155,32 @@ func safeSQLName(name string) string { } return fmt.Sprintf("\"%s\"", strings.ReplaceAll(name, "\"", "\"\"")) } + +func logTableNameAndType(ctx context.Context, c *runtime.Controller, connector, name string) { + olap, release, err := c.AcquireOLAP(ctx, connector) + if err != nil { + c.Logger.Error("LogTableNameAndType: failed to acquire OLAP", slog.Any("err", err)) + return + } + defer release() + + res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT column_name, data_type FROM information_schema.columns WHERE table_name=? ORDER BY column_name ASC", Args: []any{name}}) + if err != nil { + c.Logger.Error("LogTableNameAndType: failed information_schema.columns", slog.Any("err", err)) + return + } + defer res.Close() + + colTyp := make([]string, 0) + var col, typ string + for res.Next() { + err = res.Scan(&col, &typ) + if err != nil { + c.Logger.Error("LogTableNameAndType: failed scan", slog.Any("err", err)) + return + } + colTyp = append(colTyp, fmt.Sprintf("%s:%s", col, typ)) + } + + c.Logger.Info("LogTableNameAndType: ", slog.String("name", name), slog.String("schema", strings.Join(colTyp, ", "))) +} From d75e70ee670caeb50d84ae7c8dcaf711264f1276 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal Date: Tue, 19 Dec 2023 16:53:42 +0530 Subject: [PATCH 4/4] lint fix --- runtime/reconcilers/model.go | 8 +++----- runtime/reconcilers/source.go | 8 +++----- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/runtime/reconcilers/model.go b/runtime/reconcilers/model.go index 6744e90e5c3..7b6a2f21be9 100644 --- a/runtime/reconcilers/model.go +++ b/runtime/reconcilers/model.go @@ -221,11 +221,9 @@ func (r *ModelReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa createErr := r.createModel(ctx, self, stagingTableName, !materialize) if createErr != nil { createErr = fmt.Errorf("failed to create model: %w", createErr) - } else { - if !r.C.Runtime.AllowHostAccess() { - // temporarily for debugging - logTableNameAndType(ctx, r.C, connector, stagingTableName) - } + } else if !r.C.Runtime.AllowHostAccess() { + // temporarily for debugging + logTableNameAndType(ctx, r.C, connector, stagingTableName) } if createErr == nil && stage { diff --git a/runtime/reconcilers/source.go b/runtime/reconcilers/source.go index 7305e3163b4..a78db2e5272 100644 --- a/runtime/reconcilers/source.go +++ b/runtime/reconcilers/source.go @@ -179,11 +179,9 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceN ingestErr := r.ingestSource(ctx, src.Spec, stagingTableName) if ingestErr != nil { ingestErr = fmt.Errorf("failed to ingest source: %w", ingestErr) - } else { - if !r.C.Runtime.AllowHostAccess() { - // temporarily for debugging - logTableNameAndType(ctx, r.C, connector, stagingTableName) - } + } else if !r.C.Runtime.AllowHostAccess() { + // temporarily for debugging + logTableNameAndType(ctx, r.C, connector, stagingTableName) } if ingestErr == nil && src.Spec.StageChanges {