forked from F33RNI/GPT-Telegramus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
UsersHandler.py
128 lines (108 loc) · 4.04 KB
/
UsersHandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Copyright (C) 2023 Fern Lane, GPT-Telegramus
Licensed under the GNU Affero General Public License, Version 3.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.gnu.org/licenses/agpl-3.0.en.html
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
"""
import logging
import multiprocessing
from typing import List, Dict
import JSONReaderWriter
# Value stored as "user_name" in records built by UsersHandler._create_user
DEFAULT_USER_NAME = "Noname"
def get_key_or_none(dictionary: dict, key, default_value=None):
    """
    Safely gets value of key from dictionary
    :param dictionary: dictionary to look in (None is treated as having no keys)
    :param key: key to look up (None always yields default_value)
    :param default_value: value returned if key is missing or its value is None
    :return: key value or default_value if not found
    """
    # Nothing to look up (or nothing to look in): fall back to the default
    if dictionary is None or key is None:
        return default_value

    # A stored None is treated exactly like a missing key
    value = dictionary.get(key)
    return default_value if value is None else value
class UsersHandler:
    """
    Reads, creates and updates user records stored in the JSON users database
    (the file at config["files"]["users_database"]).
    Every load and save of that file is performed while holding self.lock.
    """

    def __init__(self, config: dict, messages: List[Dict]):
        """
        Initializes handler
        :param config: configuration dictionary. This class reads
        config["files"]["users_database"], config["telegram"]["admin_ids"],
        config["telegram"]["ban_by_default"] and config["modules"]["default_module"]
        :param messages: list of message dictionaries.
        messages[0]["ban_reason_default"] is used as ban reason of new users
        """
        self.config = config
        self.messages = messages

        # Guards every access to the users database file
        self.lock = multiprocessing.Lock()

    def _load_users_unlocked(self) -> list:
        """
        Loads users from database WITHOUT acquiring self.lock.
        Caller must already hold self.lock (multiprocessing.Lock is not recursive,
        so locked methods cannot call each other)
        :return: users as list of dictionaries or [] if not found
        """
        users = JSONReaderWriter.load_json(self.config["files"]["users_database"])
        return [] if users is None else users

    def read_users(self) -> list:
        """
        Reads users data from database
        :return: users as list of dictionaries or [] if not found
        """
        with self.lock:
            return self._load_users_unlocked()

    def get_user_by_id(self, user_id: int) -> dict:
        """
        Returns user (or create new one) as dictionary from database using user_id
        :param user_id: value to match against "user_id" of stored users
        :return: dictionary
        """
        for user in self.read_users():
            if user["user_id"] == user_id:
                return user

        # If we are here then user doesn't exist
        return self._create_user(user_id)

    def save_user(self, user_data: dict) -> None:
        """
        Saves user_data to database.
        If a user with the same "user_id" already exists, keys of user_data are
        merged into the stored record, otherwise user_data is appended as new user
        :param user_data: user dictionary (must contain "user_id") or None to do nothing
        :return:
        """
        if user_data is None:
            return

        # Load, modify and save inside ONE critical section, so a concurrent
        # save_user() cannot write between our read and our write
        with self.lock:
            users = self._load_users_unlocked()

            for existing_user in users:
                # User exists -> merge new keys into stored record
                if existing_user["user_id"] == user_data["user_id"]:
                    existing_user.update(user_data)
                    break
            else:
                # New user
                users.append(user_data)

            # Save to database
            JSONReaderWriter.save_json(self.config["files"]["users_database"], users)

    def _create_user(self, user_id: int) -> dict:
        """
        Creates and saves new user
        :param user_id: ID of the new user
        :return: created user as dictionary
        """
        logging.info("Creating new user with id: %s", user_id)

        is_admin = user_id in self.config["telegram"]["admin_ids"]
        user = {
            "user_id": user_id,
            "user_name": DEFAULT_USER_NAME,
            "user_type": "",
            "admin": is_admin,
            # Admins are never banned on creation, everyone else follows config
            "banned": False if is_admin else self.config["telegram"]["ban_by_default"],
            # Turn literal "\n" sequences of the message into real line breaks
            "ban_reason": self.messages[0]["ban_reason_default"].replace("\\n", "\n"),
            "module": self.config["modules"]["default_module"],
            "requests_total": 0,
            "reply_message_id_last": -1,
        }
        self.save_user(user)
        return user