Skip to content

Commit

Permalink
Fix: Mypy error
Browse files Browse the repository at this point in the history
  • Loading branch information
1yam committed Aug 22, 2023
1 parent b79b68c commit 5e21735
Showing 1 changed file with 34 additions and 31 deletions.
65 changes: 34 additions & 31 deletions src/aleph/web/controllers/storage.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,36 @@
import asyncio
import base64
import datetime as dt
import functools
import logging
from hashlib import sha256
from io import StringIO
from typing import Union, Tuple, Dict, Optional
from typing import Union, Tuple

import aio_pika
from eth_account import Account
from eth_account.messages import encode_defunct

from aleph.chains.common import get_verification_buffer
from aleph.jobs.process_pending_messages import PendingMessageProcessor

from aiohttp import web
from aiohttp.web_request import FileField
from aleph_message.models import ItemType
from multidict import MultiDictProxy

from aleph.chains.chain_service import ChainService, LOGGER
from aleph.chains.nuls import NulsConnector
from aleph.db.accessors.balances import get_total_balance
from aleph.db.accessors.files import count_file_pins, get_file
from aleph.db.accessors.messages import get_message_status, message_exists
from aleph.db.connection import make_session_factory
from aleph.db.models import PendingMessageDb
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.services.p2p import init_p2p_client
from aleph.services.storage.engine import StorageEngine
from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
from aleph.storage import StorageService
from aleph.toolkit import json
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.db_session import DbSessionFactory, DbSession
from aleph.types.db_session import DbSession
from aleph.utils import run_in_executor, item_type_from_hash
from aleph.web.controllers.app_state_getters import (
get_session_factory_from_request,
get_storage_service_from_request, get_mq_channel_from_request, get_config_from_request, get_mq_conn_from_request,
get_storage_service_from_request,
get_config_from_request,
)
from aleph.web.controllers.utils import multidict_proxy_to_io
from aleph.schemas.pending_messages import BasePendingMessage

logger = logging.getLogger(__name__)
from aleph.schemas.pending_messages import parse_message

MAX_FILE_SIZE = 100 * 1024 * 1024

Expand Down Expand Up @@ -111,34 +98,47 @@ async def verify_signature(message: BasePendingMessage) -> bool:
return verified


async def get_message_content(post_data: MultiDictProxy[Union[str, bytes, FileField]]) -> Tuple[dict, int]:
async def get_message_content(
post_data: MultiDictProxy[Union[str, bytes, FileField]]
) -> Tuple[dict, int]:
message_bytearray = post_data.get("message", b"")
value = post_data.get("size") or 0
if not message_bytearray:
return {}, int(value) # Empty dictionary if no message content

message_string = message_bytearray.decode("utf-8")
message_dict = json.loads(message_string)
message_dict["time"] = float(message_dict["time"])
if isinstance(message_bytearray, bytearray):
message_string = message_bytearray.decode("utf-8")
message_dict = json.loads(message_string)
message_dict["time"] = float(message_dict["time"])
else:
message_dict = {}

return message_dict, int(value)
return message_dict, int(str(value))


async def init_mq_con(config):
return await aio_pika.connect_robust(
host=config.p2p.mq_host.value, port=config.rabbitmq.port.value, login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value
host=config.p2p.mq_host.value,
port=config.rabbitmq.port.value,
login=config.rabbitmq.username.value,
password=config.rabbitmq.password.value,
)


async def verify_and_handle_request(pending_message_db, file_io, message, size):
async def verify_and_handle_request(
pending_message_db, file_io, message, size, session_factory
):
content = file_io.read(size)
item_content = json.loads(message["item_content"])
actual_item_hash = sha256(content).hexdigest()
c_item_hash = item_content["item_hash"]

is_signature = await verify_signature(message=pending_message_db)

with session_factory() as session:
current_balance = get_total_balance(
session=session, address=pending_message_db.sender
)
if current_balance < len(content):
output = {"status": "Payment Required"}
return web.json_response(output, status=402)
if not is_signature:
output = {"status": "Forbidden"}
return web.json_response(output, status=403)
Expand All @@ -161,9 +161,12 @@ async def storage_add_file_with_message(request: web.Request):
post = await request.post()
file_io = multidict_proxy_to_io(post)
message, size = await get_message_content(post)
pending_message_db = PendingMessageDb.from_message_dict(message_dict=message, reception_time=dt.datetime.now(),
fetched=True)
is_valid_message = await verify_and_handle_request(pending_message_db, file_io, message, size)
pending_message_db = PendingMessageDb.from_message_dict(
message_dict=message, reception_time=dt.datetime.now(), fetched=True
)
is_valid_message = await verify_and_handle_request(
pending_message_db, file_io, message, size, session_factory
)
if is_valid_message is not None:
return is_valid_message

Expand Down

0 comments on commit 5e21735

Please sign in to comment.