diff --git a/lib/sanbase/clickhouse/metric/file_handler.ex b/lib/sanbase/clickhouse/metric/file_handler.ex
index 5b6e3f63b..e9d3c13c2 100644
--- a/lib/sanbase/clickhouse/metric/file_handler.ex
+++ b/lib/sanbase/clickhouse/metric/file_handler.ex
@@ -148,27 +148,27 @@ defmodule Sanbase.Clickhouse.MetricAdapter.FileHandler do
   @external_resource path_to_deprecated.("deprecated_labeled_exchange_flow_metrics.json")
   @external_resource path_to_deprecated.("deprecated_social_metrics.json")
 
-  @metrics_json_pre_alias_expand Enum.reduce(
-                                   @external_resource,
-                                   [],
-                                   fn file, acc ->
-                                     try do
-                                       (File.read!(file) |> Jason.decode!()) ++ acc
-                                     rescue
-                                       e in [Jason.DecodeError] ->
-                                         IO.warn("Jason decoding error in file #{file}")
-                                         reraise e, __STACKTRACE__
-                                     end
-                                   end
-                                 )
-
-  def pre_alias(), do: @metrics_json_pre_alias_expand
+  @raw_metrics_json Enum.reduce(
+                      @external_resource,
+                      [],
+                      fn file, acc ->
+                        try do
+                          (File.read!(file) |> Jason.decode!()) ++ acc
+                        rescue
+                          e in [Jason.DecodeError] ->
+                            IO.warn("Jason decoding error in file #{file}")
+                            reraise e, __STACKTRACE__
+                        end
+                      end
+                    )
+
+  def pre_alias(), do: @raw_metrics_json
   # Allow the same metric to be defined more than once if it differs in the `data_type`.
   # Also allow the same metric to be used if different `fixed_parameters` are provided.
   # In this case the metric is exposed with some of the parameters (like labels) already fixed,
   # like: balance of funds, balance of whales, etc.
   Enum.group_by(
-    @metrics_json_pre_alias_expand,
+    @raw_metrics_json,
     fn metric -> {metric["metric"], metric["data_type"], metric["fixed_parameters"]} end
   )
   |> Map.values()
@@ -185,7 +185,7 @@ defmodule Sanbase.Clickhouse.MetricAdapter.FileHandler do
     end)
 
   @metrics_json_pre_expand_patterns Enum.flat_map(
-                                      @metrics_json_pre_alias_expand,
+                                      @raw_metrics_json,
                                       fn metric ->
                                         Map.get(metric, "aliases", [])
                                         |> Helper.resolve_metric_aliases(metric)
@@ -314,6 +314,8 @@ defmodule Sanbase.Clickhouse.MetricAdapter.FileHandler do
     """)
   end
 
+  def metrics_json(), do: @metrics_json
+  def raw_metrics_json(), do: @raw_metrics_json
   def aggregations(), do: @aggregations
   def access_map(), do: @access_map |> transform()
   def table_map(), do: @table_map |> transform()
diff --git a/lib/sanbase/metric/registry/populate.ex b/lib/sanbase/metric/registry/populate.ex
new file mode 100644
index 000000000..8d5da91d6
--- /dev/null
+++ b/lib/sanbase/metric/registry/populate.ex
@@ -0,0 +1,51 @@
+defmodule Sanbase.Metric.Registry.Populate do
+  @moduledoc ~s"""
+  Migrate the info about Clickhouse metrics from local JSON files to DB
+  """
+  def run() do
+    Sanbase.Repo.transaction(fn ->
+      populate()
+    end)
+  end
+
+  def populate() do
+    Sanbase.Clickhouse.MetricAdapter.FileHandler.raw_metrics_json()
+    |> Enum.reduce_while([], fn map, acc ->
+      {:ok, captures} = Sanbase.TemplateEngine.Captures.extract_captures(map["metric"])
+      is_template_metric = captures != []
+
+      changeset =
+        %Sanbase.Metric.Registry{}
+        |> Sanbase.Metric.Registry.changeset(%{
+          metric: map["name"],
+          internal_metric: map["metric"],
+          human_readable_name: map["human_readable_name"],
+          aliases: Map.get(map, "aliases", []),
+          table: map["table"] |> List.wrap(),
+          aggregation: map["aggregation"],
+          min_interval: map["min_interval"],
+          is_template_metric: is_template_metric,
+          parameters: Map.get(map, "parameters", []),
+          is_deprecated: Map.get(map, "is_deprecated", false),
+          hard_deprecate_after: map["hard_deprecate_after"],
+          has_incomplete_data: Map.get(map, "has_incomplete_data", false),
+          data_type: map["data_type"],
Map.get(map, "docs_links", []), + is_timebound: Map.get(map, "is_timebound", false) + }) + + case Sanbase.Repo.insert(changeset, on_conflict: :nothing) do + {:ok, result} -> {:cont, [result | acc]} + {:error, error} -> {:halt, {:error, error}} + end + end) + |> case do + list when is_list(list) -> + {:ok, list} + + {:error, error} -> + IO.puts("Error: #{Sanbase.Utils.ErrorHandling.changeset_errors_string(error)}") + {:error, error} + end + end +end diff --git a/lib/sanbase/metric/registry/registry.ex b/lib/sanbase/metric/registry/registry.ex new file mode 100644 index 000000000..19daa0cc0 --- /dev/null +++ b/lib/sanbase/metric/registry/registry.ex @@ -0,0 +1,83 @@ +defmodule Sanbase.Metric.Registry do + use Ecto.Schema + + import Ecto.Changeset + + alias __MODULE__.Validation + + @timestamps_opts [type: :utc_datetime] + schema "metric_registry" do + # How the metric is exposed to external users + field(:metric, :string) + field(:human_readable_name, :string) + field(:aliases, {:array, :string}, default: []) + + # What is the name of the metric in the DB and where to find it + field(:internal_metric, :string) + field(:table, {:array, :string}) + + field(:aggregation, :string) + field(:min_interval, :string) + + # If the metric is a template metric, then the parameters need to be used + # to define the full set of metrics + field(:is_template_metric, :boolean, default: false) + field(:parameters, {:array, :map}, default: []) + field(:fixed_parameters, :map, default: %{}) + + field(:is_timebound, :boolean, default: false) + field(:has_incomplete_data, :boolean, default: false) + + field(:is_exposed, :boolean, default: true) + field(:exposed_environments, :string, default: "all") + + field(:is_deprecated, :boolean, default: false) + field(:hard_deprecate_after, :utc_datetime, default: nil) + + field(:data_type, :string, default: "timeseries") + field(:docs_links, {:array, :string}, default: []) + + timestamps() + end + + def changeset(%__MODULE__{} = metric_description, attrs) do + metric_description + |> cast(attrs, [ + :metric, + :human_readable_name, + :aliases, + :internal_metric, + :table, + :aggregation, + :min_interval, + :is_template_metric, + :parameters, + :fixed_parameters, + :is_deprecated, + :hard_deprecate_after, + :is_timebound, + :has_incomplete_data, + :is_exposed, + :exposed_environments, + :data_type, + :docs_links + ]) + |> validate_required([ + :metric, + :human_readable_name, + :internal_metric, + :table, + :has_incomplete_data, + :aggregation, + :min_interval + ]) + |> validate_inclusion(:aggregation, ["sum", "last", "count", "avg", "max", "min", "first"]) + |> validate_inclusion(:data_type, ["timeseries", "histogram", "table"]) + |> validate_inclusion(:exposed_environments, ["all", "stage", "prod"]) + |> validate_change(:min_interval, &Validation.validate_min_interval/2) + |> Validation.validate_template_fields() + |> unique_constraint([:metric, :data_type, :fixed_parameters], + name: :metric_registry_composite_unique_index + ) + end +end diff --git a/lib/sanbase/metric/registry/validation.ex b/lib/sanbase/metric/registry/validation.ex new file mode 100644 index 000000000..a78788aa9 --- /dev/null +++ b/lib/sanbase/metric/registry/validation.ex @@ -0,0 +1,65 @@ +defmodule Sanbase.Metric.Registry.Validation do + import Ecto.Changeset + + def validate_min_interval(:min_interval, min_interval) do + if Sanbase.DateTimeUtils.valid_compound_duration?(min_interval) do + [] + else + [ + min_interval: + "The provided min_interval #{min_interval} is not a valid duration - a number 
+          "The provided min_interval #{min_interval} is not a valid duration - a number followed by one of: s (second), m (minute), h (hour) or d (day)"
+      ]
+    end
+  end
+
+  def validate_template_fields(%Ecto.Changeset{} = changeset) do
+    is_template_metric = get_field(changeset, :is_template_metric)
+    parameters = get_field(changeset, :parameters)
+
+    cond do
+      is_template_metric and parameters == [] ->
+        add_error(
+          changeset,
+          :parameters,
+          "When the metric is labeled as a template metric, the parameters cannot be empty"
+        )
+
+      not is_template_metric and parameters != [] ->
+        add_error(
+          changeset,
+          :parameters,
+          "When the metric is not labeled as a template metric, the parameters must be empty"
+        )
+
+      is_template_metric and parameters != [] ->
+        validate_parameters_match_captures(changeset)
+
+      true ->
+        changeset
+    end
+  end
+
+  def validate_parameters_match_captures(changeset) do
+    parameters = get_field(changeset, :parameters)
+    metric = get_field(changeset, :metric)
+    internal_metric = get_field(changeset, :internal_metric)
+    {:ok, captures1} = Sanbase.TemplateEngine.Captures.extract_captures(metric)
+    {:ok, captures2} = Sanbase.TemplateEngine.Captures.extract_captures(internal_metric)
+    captures = Enum.map(captures1 ++ captures2, & &1.inner_content) |> Enum.uniq() |> Enum.sort()
+    parameter_keys = Enum.flat_map(parameters, &Map.keys/1) |> Enum.uniq() |> Enum.sort()
+
+    if captures == parameter_keys do
+      changeset
+    else
+      add_error(
+        changeset,
+        :parameters,
+        """
+        The provided parameters do not match the captures in the metric #{metric}.
+        Captures: #{Enum.join(captures, ", ")}
+        Parameters: #{Enum.join(parameter_keys, ", ")}
+        """
+      )
+    end
+  end
+end
diff --git a/priv/repo/migrations/20241018115340_create_metric_registry.exs b/priv/repo/migrations/20241018115340_create_metric_registry.exs
new file mode 100644
index 000000000..cb7c50b58
--- /dev/null
+++ b/priv/repo/migrations/20241018115340_create_metric_registry.exs
@@ -0,0 +1,101 @@
+defmodule Sanbase.Repo.Migrations.CreateMetricRegistry do
+  use Ecto.Migration
+
+  # We have JSON records that define single metrics:
+
+  # {
+  #   "human_readable_name": "USD Price",
+  #   "name": "price_usd_5m",
+  #   "metric": "price_usd",
+  #   "version": "2019-01-01",
+  #   "access": "free",
+  #   "selectors": [
+  #     "slug"
+  #   ],
+  #   "min_plan": {
+  #     "SANAPI": "free",
+  #     "SANBASE": "free"
+  #   },
+  #   "aggregation": "last",
+  #   "min_interval": "5m",
+  #   "table": "intraday_metrics",
+  #   "has_incomplete_data": false,
+  #   "data_type": "timeseries",
+  #   "docs_links": ["https://academy.santiment.net/metrics/price"]
+  # }
+
+  # But we also have JSON records that define multiple metrics using templates
+
+  # {
+  #   "human_readable_name": "Mean Realized USD Price for coins that moved in the past {{timebound:human_readable}}",
+  #   "name": "mean_realized_price_usd_{{timebound}}",
+  #   "metric": "mean_realized_price_usd_{{timebound}}",
+  #   "parameters": [
+  #     { "timebound": "1d" },
+  #     { "timebound": "7d" },
+  #     { "timebound": "30d" },
+  #     { "timebound": "60d" },
+  #     { "timebound": "90d" },
+  #     { "timebound": "180d" },
+  #     { "timebound": "365d" },
+  #     { "timebound": "2y" },
+  #     { "timebound": "3y" },
+  #     { "timebound": "5y" },
+  #     { "timebound": "10y" }
+  #   ],
+  #   "is_timebound": true,
+  #   "version": "2019-01-01",
+  #   "access": "restricted",
+  #   "selectors": [ "slug" ],
+  #   "min_plan": {
+  #     "SANAPI": "free",
+  #     "SANBASE": "free"
+  #   },
+  #   "aggregation": "avg",
+  #   "min_interval": "1d",
+  #   "table": "daily_metrics_v2",
+  #   "has_incomplete_data": true,
+  #   "data_type": "timeseries",
+  #   "docs_links": ["https://academy.santiment.net/metrics/mean-realized-price"]
["https://academy.santiment.net/metrics/mean-realized-price"] + # } + def change do + create table(:metric_registry) do + add(:metric, :string, null: false) + add(:internal_metric, :string, null: false) + add(:human_readable_name, :string, null: false) + add(:aliases, {:array, :string}, null: false, default: []) + add(:table, {:array, :string}, null: false) + + add(:is_template_metric, :boolean, null: false, default: false) + add(:parameters, {:array, :jsonb}, null: false, default: []) + add(:fixed_parameters, :jsonb, null: "false", default: "{}") + + add(:is_timebound, :boolean, null: false, null: false) + add(:is_exposed, :boolean, null: false, default: true) + add(:exposed_environments, :string, null: false, default: "all") + + add(:version, :string) + + add(:selectors, {:array, :string}, null: false, default: []) + add(:required_selectors, {:array, :string}, null: false, default: []) + + add(:aggregation, :string, null: false) + add(:min_interval, :string, null: false) + add(:has_incomplete_data, :boolean, null: false) + + add(:data_type, :string, null: false, default: "timeseries") + add(:docs_links, {:array, :string}, null: false, default: []) + + add(:is_deprecated, :boolean, null: false, default: false) + add(:hard_deprecate_after, :utc_datetime, null: true, default: nil) + + timestamps(type: :timestamptz) + end + + create( + unique_index(:metric_registry, [:metric, :data_type, :fixed_parameters], + name: :metric_registry_composite_unique_index + ) + ) + end +end diff --git a/priv/repo/structure.sql b/priv/repo/structure.sql index 3231910e6..e734d1138 100644 --- a/priv/repo/structure.sql +++ b/priv/repo/structure.sql @@ -2,8 +2,8 @@ -- PostgreSQL database dump -- --- Dumped from database version 15.1 (Homebrew) --- Dumped by pg_dump version 15.1 (Homebrew) +-- Dumped from database version 14.12 (Homebrew) +-- Dumped by pg_dump version 14.12 (Homebrew) SET statement_timeout = 0; SET lock_timeout = 0; @@ -2227,6 +2227,57 @@ CREATE SEQUENCE public.menus_id_seq ALTER SEQUENCE public.menus_id_seq OWNED BY public.menus.id; +-- +-- Name: metric_registry; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.metric_registry ( + id bigint NOT NULL, + metric character varying(255) NOT NULL, + internal_metric character varying(255) NOT NULL, + human_readable_name character varying(255) NOT NULL, + aliases character varying(255)[] DEFAULT ARRAY[]::character varying[] NOT NULL, + "table" character varying(255)[] NOT NULL, + is_template_metric boolean DEFAULT false NOT NULL, + parameters jsonb[] DEFAULT ARRAY[]::jsonb[] NOT NULL, + fixed_parameters jsonb DEFAULT '{}'::jsonb, + is_timebound boolean NOT NULL, + is_exposed boolean DEFAULT true NOT NULL, + exposed_environments character varying(255) DEFAULT 'all'::character varying NOT NULL, + version character varying(255), + selectors character varying(255)[] DEFAULT ARRAY[]::character varying[] NOT NULL, + required_selectors character varying(255)[] DEFAULT ARRAY[]::character varying[] NOT NULL, + aggregation character varying(255) NOT NULL, + min_interval character varying(255) NOT NULL, + has_incomplete_data boolean NOT NULL, + data_type character varying(255) DEFAULT 'timeseries'::character varying NOT NULL, + docs_links character varying(255)[] DEFAULT ARRAY[]::character varying[] NOT NULL, + is_deprecated boolean DEFAULT false NOT NULL, + hard_deprecate_after timestamp(0) without time zone DEFAULT NULL::timestamp without time zone, + inserted_at timestamp with time zone NOT NULL, + updated_at timestamp with time zone NOT NULL +); 
+
+
+--
+-- Name: metric_registry_id_seq; Type: SEQUENCE; Schema: public; Owner: -
+--
+
+CREATE SEQUENCE public.metric_registry_id_seq
+    START WITH 1
+    INCREMENT BY 1
+    NO MINVALUE
+    NO MAXVALUE
+    CACHE 1;
+
+
+--
+-- Name: metric_registry_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
+--
+
+ALTER SEQUENCE public.metric_registry_id_seq OWNED BY public.metric_registry.id;
+
+
 --
 -- Name: metrics; Type: TABLE; Schema: public; Owner: -
 --
@@ -4998,6 +5049,13 @@ ALTER TABLE ONLY public.menu_items ALTER COLUMN id SET DEFAULT nextval('public.m
 ALTER TABLE ONLY public.menus ALTER COLUMN id SET DEFAULT nextval('public.menus_id_seq'::regclass);
 
 
+--
+-- Name: metric_registry id; Type: DEFAULT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.metric_registry ALTER COLUMN id SET DEFAULT nextval('public.metric_registry_id_seq'::regclass);
+
+
 --
 -- Name: metrics id; Type: DEFAULT; Schema: public; Owner: -
 --
@@ -5874,6 +5932,14 @@ ALTER TABLE ONLY public.menus
     ADD CONSTRAINT menus_pkey PRIMARY KEY (id);
 
 
+--
+-- Name: metric_registry metric_registry_pkey; Type: CONSTRAINT; Schema: public; Owner: -
+--
+
+ALTER TABLE ONLY public.metric_registry
+    ADD CONSTRAINT metric_registry_pkey PRIMARY KEY (id);
+
+
 --
 -- Name: metrics metrics_pkey; Type: CONSTRAINT; Schema: public; Owner: -
 --
@@ -6932,6 +6998,13 @@ CREATE UNIQUE INDEX market_segments_name_index ON public.market_segments USING b
 CREATE INDEX menus_user_id_index ON public.menus USING btree (user_id);
 
 
+--
+-- Name: metric_registry_composite_unique_index; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE UNIQUE INDEX metric_registry_composite_unique_index ON public.metric_registry USING btree (metric, data_type, fixed_parameters);
+
+
 --
 -- Name: metrics_name_index; Type: INDEX; Schema: public; Owner: -
 --
@@ -9295,4 +9368,6 @@ INSERT INTO public."schema_migrations" (version) VALUES (20240809122904);
 INSERT INTO public."schema_migrations" (version) VALUES (20240904135651);
 INSERT INTO public."schema_migrations" (version) VALUES (20240926130910);
 INSERT INTO public."schema_migrations" (version) VALUES (20240926135951);
+INSERT INTO public."schema_migrations" (version) VALUES (20241014115340);
 INSERT INTO public."schema_migrations" (version) VALUES (20241017092520);
+INSERT INTO public."schema_migrations" (version) VALUES (20241018115340);
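
A minimal usage sketch, assuming the migration above has been run and the new modules are compiled; this invocation is not part of the patch itself. Note that `Populate.run/0` wraps `populate/0` in `Repo.transaction/1`, so the success value comes back nested.

# Hypothetical call, e.g. from an IEx session or a one-off release task.
# populate/0 returns {:ok, records} | {:error, changeset}; the transaction
# wrapper in run/0 adds one more {:ok, _} layer around that result.
{:ok, result} = Sanbase.Metric.Registry.Populate.run()

case result do
  {:ok, records} ->
    IO.puts("Inserted (or skipped as already present) #{length(records)} metric registry rows")

  {:error, changeset} ->
    IO.inspect(changeset.errors, label: "metric registry validation errors")
end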