Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use password authentication with RabbitMQ #223

Merged
merged 7 commits into from
Mar 28, 2024
Merged

Use password authentication with RabbitMQ #223

merged 7 commits into from
Mar 28, 2024

Conversation

sajith
Copy link
Member

@sajith sajith commented Feb 7, 2024

Resolves #197.

@coveralls
Copy link

coveralls commented Feb 7, 2024

Pull Request Test Coverage Report for Build 8468322182

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 9 of 9 (100.0%) changed or added relevant lines in 2 files are covered.
  • 52 unchanged lines in 6 files lost coverage.
  • Overall coverage increased (+12.8%) to 50.923%

Files with Coverage Reduction New Missed Lines %
sdx_controller/handlers/lc_message_handler.py 2 22.22%
sdx_controller/init.py 2 88.89%
sdx_controller/controllers/topology_controller.py 5 68.42%
sdx_controller/messaging/rpc_queue_consumer.py 12 50.94%
sdx_controller/models/location.py 14 74.58%
sdx_controller/handlers/connection_handler.py 17 61.36%
Totals Coverage Status
Change from base Build 7821205707: 12.8%
Covered Lines: 705
Relevant Lines: 1415

💛 - Coveralls

Copy link
Contributor

@italovalcy italovalcy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @sajith excellent to see you already have submitted a code to handle RabbitMQ auth, which is an important step on the environment setup (since Luis setup is also using authentication). The code seems clear and neat. Very nice!

I just have one doubt. I do see pika.ConnectionParameters being used on other places that you didn't cover on this PR, as well as RabbitMqServerConfigure which references MQ_HOST variable.

can you please take a look on the following occurrences:

./sdx_controller/messaging/topic_queue_consumer.py:MQ_HOST = os.environ.get("MQ_HOST")
./sdx_controller/messaging/topic_queue_consumer.py:            pika.ConnectionParameters(host=MQ_HOST)
./sdx_controller/messaging/topic_queue_consumer.py:    #         pika.ConnectionParameters(host=MQ_HOST)
./sdx_controller/messaging/message_queue_consumer.py:MQ_HOST = os.environ.get("MQ_HOST")
./sdx_controller/messaging/message_queue_consumer.py:    def __init__(self, host=MQ_HOST, queue="hello"):
./sdx_controller/messaging/message_queue_consumer.py:    serverconfigure = RabbitMqServerConfigure(host=MQ_HOST, queue="hello")
./sdx_controller/messaging/rpc_queue_producer.py:MQ_HOST = os.environ.get("MQ_HOST")
./sdx_controller/messaging/rpc_queue_producer.py:            pika.ConnectionParameters(host=MQ_HOST)
./sdx_controller/messaging/topic_queue_producer.py:MQ_HOST = os.environ.get("MQ_HOST")
./sdx_controller/messaging/topic_queue_producer.py:            pika.ConnectionParameters(host=MQ_HOST)
./sdx_controller/messaging/topic_queue_producer.py:        #     self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=MQ_HOST))

@italovalcy
Copy link
Contributor

italovalcy commented Feb 9, 2024

Hi @sajith excellent to see you already have submitted a code to handle RabbitMQ auth, which is an important step on the environment setup (since Luis setup is also using authentication). The code seems clear and neat. Very nice!

I just have one doubt. I do see pika.ConnectionParameters being used on other places that you didn't cover on this PR, as well as RabbitMqServerConfigure which references MQ_HOST variable.

can you please take a look on the following occurrences:

./sdx_controller/messaging/topic_queue_consumer.py:MQ_HOST = os.environ.get("MQ_HOST")
./sdx_controller/messaging/topic_queue_consumer.py:            pika.ConnectionParameters(host=MQ_HOST)
./sdx_controller/messaging/topic_queue_consumer.py:    #         pika.ConnectionParameters(host=MQ_HOST)
./sdx_controller/messaging/message_queue_consumer.py:MQ_HOST = os.environ.get("MQ_HOST")
./sdx_controller/messaging/message_queue_consumer.py:    def __init__(self, host=MQ_HOST, queue="hello"):
./sdx_controller/messaging/message_queue_consumer.py:    serverconfigure = RabbitMqServerConfigure(host=MQ_HOST, queue="hello")
./sdx_controller/messaging/rpc_queue_producer.py:MQ_HOST = os.environ.get("MQ_HOST")
./sdx_controller/messaging/rpc_queue_producer.py:            pika.ConnectionParameters(host=MQ_HOST)
./sdx_controller/messaging/topic_queue_producer.py:MQ_HOST = os.environ.get("MQ_HOST")
./sdx_controller/messaging/topic_queue_producer.py:            pika.ConnectionParameters(host=MQ_HOST)
./sdx_controller/messaging/topic_queue_producer.py:        #     self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=MQ_HOST))

@sajith if you allow my contribution here, below is a augmented version of the changes you made to also include the files above (I tried to follow the same approach/code-style as you):

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 12847e8..9d6163a 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -79,6 +79,9 @@ jobs:
           SDX_VERSION: '1.0.0'
           SDX_NAME: 'sdx-controller-test'
           MQ_HOST: 'localhost'
+          MQ_PORT: '5672'
+          MQ_USER: 'guest'
+          MQ_PASS: 'guest'
           SUB_QUEUE: 'sdx-controller-test-queue'
           DB_NAME: 'sdx-controllder-test-db'
           DB_CONFIG_TABLE_NAME: 'sdx-controller-test-table'
diff --git a/env.template b/env.template
index e45fefe..f70209d 100644
--- a/env.template
+++ b/env.template
@@ -7,6 +7,9 @@ export SDX_NAME="sdx-controller-test"

 # Message queue settings for SDX Controller.
 export MQ_HOST="aw-sdx-monitor.renci.org"
+export MQ_PORT=5672
+export MQ_USER="guest"
+export MQ_PASS="guest"
 export SUB_QUEUE="topo"

 # MongoDB settings for SDX Controller.
diff --git a/sdx_controller/messaging/message_queue_consumer.py b/sdx_controller/messaging/message_queue_consumer.py
index af4f71b..6485d96 100644
--- a/sdx_controller/messaging/message_queue_consumer.py
+++ b/sdx_controller/messaging/message_queue_consumer.py
@@ -3,7 +3,10 @@ import os

 import pika

-MQ_HOST = os.environ.get("MQ_HOST")
+MQ_HOST = os.getenv("MQ_HOST")
+MQ_PORT = os.getenv("MQ_PORT") or 5672
+MQ_USER = os.getenv("MQ_USER") or "guest"
+MQ_PASS = os.getenv("MQ_PASS") or "guest"


 class MessageQueue:
@@ -21,8 +24,18 @@ class MetaClass(type):


 class RabbitMqServerConfigure(metaclass=MetaClass):
-    def __init__(self, host=MQ_HOST, queue="hello"):
+    def __init__(
+        self,
+        host=MQ_HOST,
+        port=MQ_PORT,
+        username=MQ_USER,
+        password=MQ_PASS,
+        queue="hello",
+    ):
         self.host = host
+        self.port = port
+        self.username = username
+        self.password = password
         self.queue = queue


@@ -30,7 +43,14 @@ class rabbitmqServer:
     def __init__(self, server):
         self.server = server
         self._connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host=self.server.host)
+            pika.ConnectionParameters(
+                host=self.server.host,
+                port=self.server.port,
+                credentials=pika.PlainCredentials(
+                    username=self.server.username,
+                    password=self.server.password,
+                ),
+            )
         )
         self._channel = self._connection.channel()
         self._tem = self._channel.queue_declare(queue=self.server.queue)
@@ -54,7 +74,13 @@ class rabbitmqServer:


 if __name__ == "__main__":
-    serverconfigure = RabbitMqServerConfigure(host=MQ_HOST, queue="hello")
+    serverconfigure = RabbitMqServerConfigure(
+        host=MQ_HOST,
+        port=MQ_PORT,
+        username=MQ_USER,
+        password=MQ_PASS,
+        queue="hello",
+    )

     server = rabbitmqServer(server=serverconfigure)
     server.startserver()
diff --git a/sdx_controller/messaging/rpc_queue_consumer.py b/sdx_controller/messaging/rpc_queue_consumer.py
index dcaf849..7dfb4b6 100644
--- a/sdx_controller/messaging/rpc_queue_consumer.py
+++ b/sdx_controller/messaging/rpc_queue_consumer.py
@@ -10,9 +10,13 @@ import pika
 from sdx_controller.handlers.lc_message_handler import LcMessageHandler
 from sdx_controller.utils.parse_helper import ParseHelper

-MQ_HOST = os.environ.get("MQ_HOST")
+MQ_HOST = os.getenv("MQ_HOST")
+MQ_PORT = os.getenv("MQ_PORT") or 5672
+MQ_USER = os.getenv("MQ_USER") or "guest"
+MQ_PASS = os.getenv("MQ_PASS") or "guest"
+
 # subscribe to the corresponding queue
-SUB_QUEUE = os.environ.get("SUB_QUEUE")
+SUB_QUEUE = os.getenv("SUB_QUEUE")

 logger = logging.getLogger(__name__)

@@ -20,8 +24,13 @@ logger = logging.getLogger(__name__)
 class RpcConsumer(object):
     def __init__(self, thread_queue, exchange_name, topology_manager):
         self.logger = logging.getLogger(__name__)
+
         self.connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host=MQ_HOST)
+            pika.ConnectionParameters(
+                host=MQ_HOST,
+                port=MQ_PORT,
+                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
+            )
         )

         self.channel = self.connection.channel()
@@ -39,7 +48,11 @@ class RpcConsumer(object):
         self._thread_queue.put(message_body)

         self.connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host=MQ_HOST)
+            pika.ConnectionParameters(
+                host=MQ_HOST,
+                port=MQ_PORT,
+                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
+            )
         )
         self.channel = self.connection.channel()

diff --git a/sdx_controller/messaging/rpc_queue_producer.py b/sdx_controller/messaging/rpc_queue_producer.py
index 357ffbe..dce3900 100644
--- a/sdx_controller/messaging/rpc_queue_producer.py
+++ b/sdx_controller/messaging/rpc_queue_producer.py
@@ -7,14 +7,21 @@ import uuid

 import pika

-MQ_HOST = os.environ.get("MQ_HOST")
+MQ_HOST = os.getenv("MQ_HOST")
+MQ_PORT = os.getenv("MQ_PORT") or 5672
+MQ_USER = os.getenv("MQ_USER") or "guest"
+MQ_PASS = os.getenv("MQ_PASS") or "guest"


 class RpcProducer(object):
     def __init__(self, timeout, exchange_name, routing_key):
         self.logger = logging.getLogger(__name__)
         self.connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host=MQ_HOST)
+            pika.ConnectionParameters(
+                host=MQ_HOST,
+                port=MQ_PORT,
+                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
+            )
         )

         self.channel = self.connection.channel()
diff --git a/sdx_controller/messaging/topic_queue_consumer.py b/sdx_controller/messaging/topic_queue_consumer.py
index e1b4ccf..926d052 100644
--- a/sdx_controller/messaging/topic_queue_consumer.py
+++ b/sdx_controller/messaging/topic_queue_consumer.py
@@ -6,18 +6,25 @@ from queue import Queue

 import pika

-MQ_HOST = os.environ.get("MQ_HOST")
+MQ_HOST = os.getenv("MQ_HOST")
+MQ_PORT = os.getenv("MQ_PORT") or 5672
+MQ_USER = os.getenv("MQ_USER") or "guest"
+MQ_PASS = os.getenv("MQ_PASS") or "guest"
 # subscribe to the corresponding queue
-SUB_QUEUE = os.environ.get("SUB_QUEUE")
-SUB_TOPIC = os.environ.get("SUB_TOPIC")
-SUB_EXCHANGE = os.environ.get("SUB_EXCHANGE")
+SUB_QUEUE = os.getenv("SUB_QUEUE")
+SUB_TOPIC = os.getenv("SUB_TOPIC")
+SUB_EXCHANGE = os.getenv("SUB_EXCHANGE")


 class TopicQueueConsumer(object):
     def __init__(self, thread_queue, exchange_name):
         self.logger = logging.getLogger(__name__)
         self.connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host=MQ_HOST)
+            pika.ConnectionParameters(
+                host=MQ_HOST,
+                port=MQ_PORT,
+                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
+            )
         )

         self.channel = self.connection.channel()
diff --git a/sdx_controller/messaging/topic_queue_producer.py b/sdx_controller/messaging/topic_queue_producer.py
index 3d10575..4c9710b 100644
--- a/sdx_controller/messaging/topic_queue_producer.py
+++ b/sdx_controller/messaging/topic_queue_producer.py
@@ -6,7 +6,10 @@ import uuid

 import pika

-MQ_HOST = os.environ.get("MQ_HOST")
+MQ_HOST = os.getenv("MQ_HOST")
+MQ_PORT = os.getenv("MQ_PORT") or 5672
+MQ_USER = os.getenv("MQ_USER") or "guest"
+MQ_PASS = os.getenv("MQ_PASS") or "guest"


 class TopicQueueProducer(object):
@@ -15,7 +18,11 @@ class TopicQueueProducer(object):
     def __init__(self, timeout, exchange_name, routing_key):
         self.logger = logging.getLogger(__name__)
         self.connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host=MQ_HOST)
+            pika.ConnectionParameters(
+                host=MQ_HOST,
+                port=MQ_PORT,
+                credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
+            )
         )

         self.channel = self.connection.channel()

@lmarinve
Copy link
Collaborator

lmarinve commented Feb 9, 2024

Thank you for the update, @sajith! I will now test using the main branch with this change.

@lmarinve lmarinve closed this Feb 9, 2024
@lmarinve lmarinve reopened this Feb 9, 2024
@sajith
Copy link
Member Author

sajith commented Mar 4, 2024

@italovalcy Good catch Italo! Thank you. And sorry about the latency -- I've been away. As you've noticed, in this PR, I only addressed the subscribe part, and I missed the publish part.

There are two places where SDX Controller uses the message queue. This is where it listens for incoming messages:

app.rpc_consumer = RpcConsumer(thread_queue, "", topology_manager)
rpc_thread = threading.Thread(
target=app.rpc_consumer.start_sdx_consumer,
kwargs={"thread_queue": thread_queue, "db_instance": app.db_instance},
daemon=True,
)
rpc_thread.start()

And this is where it publishes:

producer = TopicQueueProducer(
timeout=5, exchange_name=exchange_name, routing_key=domain_name
)
producer.call(json.dumps(link))

That is it, as far as I know. There are several classes under sdx_controller/messaging, but I am not very familiar with them. It probably would be useful to simplify the code under that directory and remove the unused code (or make it a library that both sdx-controller and sdx-lc can use), but I am not sure if/when to do that or if it is worth the effort. I believe @congwang09 might have opinions on it.

Your contributions are totally welcome, and your approach is certainly better. Would you be willing to start a PR?

@lmarinve
Copy link
Collaborator

lmarinve commented Mar 4, 2024

Good afternoon, I suggested isolating the RabbitMQ process from other submodules as a microservice. like here: atlanticwave-sdx/sdx-continuous-development@e7cfa41

@italovalcy
Copy link
Contributor

@italovalcy Good catch Italo! Thank you. And sorry about the latency -- I've been away. As you've noticed, in this PR, I only addressed the subscribe part, and I missed the publish part.

There are two places where SDX Controller uses the message queue. This is where it listens for incoming messages:

app.rpc_consumer = RpcConsumer(thread_queue, "", topology_manager)
rpc_thread = threading.Thread(
target=app.rpc_consumer.start_sdx_consumer,
kwargs={"thread_queue": thread_queue, "db_instance": app.db_instance},
daemon=True,
)
rpc_thread.start()

And this is where it publishes:

producer = TopicQueueProducer(
timeout=5, exchange_name=exchange_name, routing_key=domain_name
)
producer.call(json.dumps(link))

That is it, as far as I know. There are several classes under sdx_controller/messaging, but I am not very familiar with them. It probably would be useful to simplify the code under that directory and remove the unused code (or make it a library that both sdx-controller and sdx-lc can use), but I am not sure if/when to do that or if it is worth the effort. I believe @congwang09 might have opinions on it.

Your contributions are totally welcome, and your approach is certainly better. Would you be willing to start a PR?

Hi @sajith great points you raised. Thanks for continue the discussion and I agree with you regarding simplifying the code under messaging folder. I also agree with the strategy you mentioned about we having another PR as an addition to this one with the missing files I mentioned above. And I'm willing to contribute authoring the new PR. Let's do this: following the above comments, I will open an issue to let @congwang09 and @YufengXin decide what can be simplified under messaging folder, as well as work in a new PR that will sits over this one with the missing changes.

Thanks again for you comments, @sajith!

@italovalcy
Copy link
Contributor

Hi @sajith great points you raised. Thanks for continue the discussion and I agree with you regarding simplifying the code under messaging folder. I also agree with the strategy you mentioned about we having another PR as an addition to this one with the missing files I mentioned above. And I'm willing to contribute authoring the new PR. Let's do this: following the above comments, I will open an issue to let @congwang09 and @YufengXin decide what can be simplified under messaging folder, as well as work in a new PR that will sits over this one with the missing changes.

Thanks again for you comments, @sajith!

Opened Issue #249 to discuss code cleaning opportunities under messaging subfolder. Opened PR #250 to handle additional changes for RabbitMQ authentication.

@sajith with that said, I will update my review and approve your PR.

Copy link
Contributor

@italovalcy italovalcy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Additional changes to allow RabbitMQ authentication
@sajith sajith merged commit 81df847 into main Mar 28, 2024
11 checks passed
@sajith sajith deleted the 197.rabbitmq-auth branch March 28, 2024 14:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

adding credentials to rabbitmq
4 participants