Skip to content

Commit

Permalink
Rework price scraping for TOTAL_MARKET
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanIvanoff committed Aug 21, 2023
1 parent 18420b8 commit b430cdb
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 69 deletions.
35 changes: 10 additions & 25 deletions lib/sanbase/external_services/coinmarketcap/coinmarketcap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -270,22 +270,11 @@ defmodule Sanbase.ExternalServices.Coinmarketcap do
Registry.register(Sanbase.Registry, key, :running)
Logger.info("Fetch and process prices for #{project.slug}")

case last_price_datetime(project) do
{:ok, datetime} ->
Logger.info(
"[CMC] Latest price datetime for #{Project.describe(project)} - #{datetime}"
)

WebApi.fetch_and_store_prices(project, datetime)

Registry.unregister(Sanbase.Registry, key)
:ok

_ ->
err_msg = "[CMC] Cannot fetch the last price datetime for project #{project.slug}"

Logger.warning(err_msg)
{:error, err_msg}
with {:ok, datetime} <- last_price_datetime(project),
:ok <- WebApi.fetch_and_store_prices(project, datetime) do
:ok = Registry.unregister(Sanbase.Registry, key)
else
error -> error
end
else
Logger.info(
Expand Down Expand Up @@ -327,15 +316,11 @@ defmodule Sanbase.ExternalServices.Coinmarketcap do
end

defp fetch_total_market_data() do
case last_price_datetime("TOTAL_MARKET") do
{:ok, %DateTime{} = datetime} ->
WebApi.fetch_and_store_prices("TOTAL_MARKET", datetime)
:ok

_ ->
err_msg = "[CMC] Cannot fetch the last price datetime for TOTAL_MARKET"
Logger.warning(err_msg)
{:error, err_msg}
with {:ok, %DateTime{} = datetime} <- last_price_datetime("TOTAL_MARKET"),
:ok <- WebApi.fetch_and_store_prices("TOTAL_MARKET", datetime) do
:ok
else
error -> error
end
end

Expand Down
58 changes: 46 additions & 12 deletions lib/sanbase/external_services/coinmarketcap/web_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do
# in case the next fetch succeeds and we store a later progress datetime
price_stream(coinmarketcap_integer_id, last_fetched_datetime, DateTime.utc_now())
|> Stream.take(10)
|> Enum.reduce_while(%{}, fn
|> Enum.reduce_while(:ok, fn
{:ok, result, interval}, acc ->
store_price_points(project, result, interval)
{:cont, acc}

_, acc ->
{:halt, acc}
error, _acc ->
{:halt, {:error, "Error in fetch_and_store_prices/2 for project: #{inspect(error)}"}}
end)
end
end
Expand All @@ -110,13 +110,13 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do
# in case the next fetch succeeds and we store a later progress datetime
price_stream("TOTAL_MARKET", last_fetched_datetime, DateTime.utc_now())
|> Stream.take(10)
|> Enum.reduce_while(%{}, fn
|> Enum.reduce_while(:ok, fn
{:ok, result, interval}, acc ->
store_price_points("TOTAL_MARKET", result, interval)
{:cont, acc}

_, acc ->
{:halt, acc}
error, _acc ->
{:halt, {:error, "Error in fetch_and_store_prices/2 for TOTAL_MARKET: #{inspect(error)}"}}
end)
end

Expand Down Expand Up @@ -170,11 +170,19 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do
defp json_to_price_points(json, "TOTAL_MARKET", interval) do
with {:ok, decoded} <- Jason.decode(json),
%{
"data" => data,
"status" => %{"error_code" => 0, "error_message" => nil}
"data" => %{"quotes" => quotes},
"status" => %{"error_code" => "0"}
} <- decoded do
result =
Enum.map(data, fn {datetime_iso8601, [marketcap_usd, volume_usd]} ->
Enum.map(quotes, fn %{
"quote" => [
%{
"timestamp" => datetime_iso8601,
"totalMarketCap" => marketcap_usd,
"totalVolume24H" => volume_usd
}
]
} ->
%PricePoint{
marketcap_usd: marketcap_usd |> Sanbase.Math.to_integer(),
volume_usd: volume_usd |> Sanbase.Math.to_integer(),
Expand Down Expand Up @@ -217,12 +225,38 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do
end
end

defp extract_price_points_for_interval(
"TOTAL_MARKET" = total_market,
{from_unix, to_unix} = interval
) do
"https://api.coinmarketcap.com/data-api/v3/global-metrics/quotes/historical?format=chart&interval=5m&timeEnd=#{to_unix}&timeStart=#{from_unix}"
|> get()
|> case do
{:ok, %Tesla.Env{status: 429} = resp} ->
wait_rate_limit(resp)
extract_price_points_for_interval(total_market, interval)

{:ok, %Tesla.Env{status: 200, body: body}} ->
json_to_price_points(body, total_market, interval)

{:ok, %Tesla.Env{status: status}} ->
error_msg = "[CMC] Error fetching data for #{total_market}. Status code: #{status}"
Logger.error(error_msg)
{:error, error_msg}

{:error, error} ->
error_msg = "[CMC] Error fetching data for #{total_market}. Reason: #{inspect(error)}"
Logger.error(error_msg)
{:error, error_msg}
end
end

defp extract_price_points_for_interval(
"TOTAL_MARKET" = total_market,
{from_unix, to_unix} = interval
) do
Logger.info("""
[CMC] Extracting price points for TOTAL_MARKET and interval [#{DateTime.from_unix!(from_unix)} - #{DateTime.from_unix!(to_unix)}]
[CMC] Extracting price points for #{total_market} and interval [#{DateTime.from_unix!(from_unix)} - #{DateTime.from_unix!(to_unix)}]
""")

"/v1.1/global-metrics/quotes/historical?format=chart&interval=5m&time_start=#{from_unix}&time_end=#{to_unix}"
Expand All @@ -236,12 +270,12 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApi do
json_to_price_points(body, total_market, interval)

{:ok, %Tesla.Env{status: status}} ->
error_msg = "[CMC] Error fetching data for TOTAL_MARKET. Status code: #{status}"
error_msg = "[CMC] Error fetching data for #{total_market}. Status code: #{status}"
Logger.error(error_msg)
{:error, error_msg}

{:error, error} ->
error_msg = "[CMC] Error fetching data for TOTAL_MARKET. Reason: #{inspect(error)}"
error_msg = "[CMC] Error fetching data for #{total_market}. Reason: #{inspect(error)}"
Logger.error(error_msg)
{:error, error_msg}
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sanbase/external_services/rate_limiting/middleware.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Sanbase.ExternalServices.RateLimiting.Middleware do
alias Sanbase.ExternalServices.RateLimiting.Server

def call(env, next, options) do
Server.wait(Keyword.get(options, :name))
# Server.wait(Keyword.get(options, :name))

Tesla.run(env, next)
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,63 @@
{
"status": {
"timestamp": "2019-12-20T09:29:01.436Z",
"error_code": 0,
"error_message": null,
"elapsed": 9,
"credit_count": 1,
"notice": null
},
"data": {
"2018-01-02T00:00:00.000Z": [
615633059840,
27340439552
],
"2018-01-03T00:00:00.000Z": [
673426702336,
43249201152
],
"2018-01-04T00:00:00.000Z": [
739147579392,
52711313408
],
"2018-01-05T00:00:00.000Z": [
755427835904,
67713847296
"quotes": [
{
"timestamp": "2023-08-21T12:35:00.000Z",
"searchInterval": "5",
"btcDominance": 48.0836,
"ethDominance": 19.0837,
"activeCryptocurrencies": 9553,
"activeExchanges": 663,
"activeMarketPairs": 63424,
"quote": [
{
"name": "USD",
"timestamp": "2023-08-21T12:35:00.000Z",
"totalMarketCap": 1053345319615.14,
"totalVolume24H": 24975771712.74,
"totalVolume24HReported": 88842480163.87,
"altcoinVolume24H": 14141463173.83,
"altcoinVolume24HReported": 53743544558.18,
"altcoinMarketCap": 546859232684.86,
"originId": "1253416"
}
],
"totalCryptocurrencies": 26575,
"totalExchanges": 6585,
"score": 1692621300000
},
{
"timestamp": "2023-08-21T12:40:00.000Z",
"searchInterval": "5",
"btcDominance": 48.0949,
"ethDominance": 19.088,
"activeCryptocurrencies": 9553,
"activeExchanges": 663,
"activeMarketPairs": 63424,
"quote": [
{
"name": "USD",
"timestamp": "2023-08-21T12:40:00.000Z",
"totalMarketCap": 1053950205502.36,
"totalVolume24H": 24998401100.74,
"totalVolume24HReported": 89033858863.01,
"altcoinVolume24H": 14156727577.48,
"altcoinVolume24HReported": 53805498718.86,
"altcoinMarketCap": 547053824537.45,
"originId": "1253421"
}
],
"totalCryptocurrencies": 26575,
"totalExchanges": 6587,
"score": 1692621600000
}
]
},
"status": {
"timestamp": "2023-08-21T12:54:32.540Z",
"error_code": "0",
"error_message": "SUCCESS",
"elapsed": "30",
"credit_count": 0
}
}
}
14 changes: 7 additions & 7 deletions test/sanbase/external_services/coinmarketcap/web_api_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApiTest do
}
end)

WebApi.fetch_and_store_prices(context.project, ~U[2023-07-19 00:00:00Z])
:ok = WebApi.fetch_and_store_prices(context.project, ~U[2023-07-19 00:00:00Z])
prices = Sanbase.InMemoryKafka.Producer.get_state() |> Map.get("asset_prices")

filtered_record =
Expand All @@ -56,7 +56,7 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApiTest do

from_datetime = ~U[2023-07-19 00:00:00Z]

WebApi.fetch_and_store_prices(context.project, from_datetime)
:ok = WebApi.fetch_and_store_prices(context.project, from_datetime)
state = Sanbase.InMemoryKafka.Producer.get_state()
prices = state["asset_prices"]
assert length(prices) > 0
Expand Down Expand Up @@ -100,16 +100,16 @@ defmodule Sanbase.ExternalServices.Coinmarketcap.WebApiTest do
}
end)

WebApi.fetch_and_store_prices("TOTAL_MARKET", ~U[2018-01-01 00:00:00Z])
:ok = WebApi.fetch_and_store_prices("TOTAL_MARKET", ~U[2018-01-01 00:00:00Z])

state = Sanbase.InMemoryKafka.Producer.get_state()
prices = state["asset_prices"]
assert length(prices) > 0

record =
{"coinmarketcap_TOTAL_MARKET_2018-01-03T00:00:00.000Z",
"{\"marketcap_usd\":673426702336,\"price_btc\":null,\"price_usd\":null,\"slug\":\"TOTAL_MARKET\",\"source\":\"coinmarketcap\",\"timestamp\":1514937600,\"volume_usd\":43249201152}"}
assert {"coinmarketcap_TOTAL_MARKET_2023-08-21T12:35:00.000Z",
"{\"marketcap_usd\":1053345319615,\"price_btc\":null,\"price_usd\":null,\"slug\":\"TOTAL_MARKET\",\"source\":\"coinmarketcap\",\"timestamp\":1692621300,\"volume_usd\":24975771713}"} in prices

assert record in prices
assert {"coinmarketcap_TOTAL_MARKET_2023-08-21T12:35:00.000Z",
"{\"marketcap_usd\":1053345319615,\"price_btc\":null,\"price_usd\":null,\"slug\":\"TOTAL_MARKET\",\"source\":\"coinmarketcap\",\"timestamp\":1692621300,\"volume_usd\":24975771713}"} in prices
end
end

0 comments on commit b430cdb

Please sign in to comment.