Skip to content

Commit

Permalink
Merge pull request #223 from atlanticwave-sdx/197.rabbitmq-auth
Browse files Browse the repository at this point in the history
Use password authentication with RabbitMQ
  • Loading branch information
sajith authored Mar 28, 2024
2 parents ca5d85a + 810fc1a commit 81df847
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ jobs:
SDX_VERSION: '1.0.0'
SDX_NAME: 'sdx-controller-test'
MQ_HOST: 'localhost'
MQ_PORT: '5672'
MQ_USER: 'guest'
MQ_PASS: 'guest'
SUB_QUEUE: 'sdx-controller-test-queue'
DB_NAME: 'sdx-controllder-test-db'
DB_CONFIG_TABLE_NAME: 'sdx-controller-test-table'
Expand Down
3 changes: 3 additions & 0 deletions env.template
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ export SDX_NAME="sdx-controller-test"

# Message queue settings for SDX Controller.
export MQ_HOST="aw-sdx-monitor.renci.org"
export MQ_PORT=5672
export MQ_USER="guest"
export MQ_PASS="guest"
export SUB_QUEUE="topo"

# MongoDB settings for SDX Controller.
Expand Down
34 changes: 30 additions & 4 deletions sdx_controller/messaging/message_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

import pika

MQ_HOST = os.environ.get("MQ_HOST")
MQ_HOST = os.getenv("MQ_HOST")
MQ_PORT = os.getenv("MQ_PORT") or 5672
MQ_USER = os.getenv("MQ_USER") or "guest"
MQ_PASS = os.getenv("MQ_PASS") or "guest"


class MessageQueue:
Expand All @@ -21,16 +24,33 @@ def __call__(cls, *args, **kwargs):


class RabbitMqServerConfigure(metaclass=MetaClass):
def __init__(self, host=MQ_HOST, queue="hello"):
def __init__(
self,
host=MQ_HOST,
port=MQ_PORT,
username=MQ_USER,
password=MQ_PASS,
queue="hello",
):
self.host = host
self.port = port
self.username = username
self.password = password
self.queue = queue


class rabbitmqServer:
def __init__(self, server):
self.server = server
self._connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.server.host)
pika.ConnectionParameters(
host=self.server.host,
port=self.server.port,
credentials=pika.PlainCredentials(
username=self.server.username,
password=self.server.password,
),
)
)
self._channel = self._connection.channel()
self._tem = self._channel.queue_declare(queue=self.server.queue)
Expand All @@ -54,7 +74,13 @@ def startserver(self):


if __name__ == "__main__":
serverconfigure = RabbitMqServerConfigure(host=MQ_HOST, queue="hello")
serverconfigure = RabbitMqServerConfigure(
host=MQ_HOST,
port=MQ_PORT,
username=MQ_USER,
password=MQ_PASS,
queue="hello",
)

server = rabbitmqServer(server=serverconfigure)
server.startserver()
15 changes: 12 additions & 3 deletions sdx_controller/messaging/rpc_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,27 @@
from sdx_controller.handlers.lc_message_handler import LcMessageHandler
from sdx_controller.utils.parse_helper import ParseHelper

MQ_HOST = os.environ.get("MQ_HOST")
MQ_HOST = os.getenv("MQ_HOST")
MQ_PORT = os.getenv("MQ_PORT") or 5672
MQ_USER = os.getenv("MQ_USER") or "guest"
MQ_PASS = os.getenv("MQ_PASS") or "guest"

# subscribe to the corresponding queue
SUB_QUEUE = os.environ.get("SUB_QUEUE")
SUB_QUEUE = os.getenv("SUB_QUEUE")

logger = logging.getLogger(__name__)


class RpcConsumer(object):
def __init__(self, thread_queue, exchange_name, te_manager):
self.logger = logging.getLogger(__name__)

self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=MQ_HOST)
pika.ConnectionParameters(
host=MQ_HOST,
port=MQ_PORT,
credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
)
)

self.channel = self.connection.channel()
Expand Down
11 changes: 9 additions & 2 deletions sdx_controller/messaging/rpc_queue_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,21 @@

import pika

MQ_HOST = os.environ.get("MQ_HOST")
MQ_HOST = os.getenv("MQ_HOST")
MQ_PORT = os.getenv("MQ_PORT") or 5672
MQ_USER = os.getenv("MQ_USER") or "guest"
MQ_PASS = os.getenv("MQ_PASS") or "guest"


class RpcProducer(object):
def __init__(self, timeout, exchange_name, routing_key):
self.logger = logging.getLogger(__name__)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=MQ_HOST)
pika.ConnectionParameters(
host=MQ_HOST,
port=MQ_PORT,
credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
)
)

self.channel = self.connection.channel()
Expand Down
17 changes: 12 additions & 5 deletions sdx_controller/messaging/topic_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@

import pika

MQ_HOST = os.environ.get("MQ_HOST")
MQ_HOST = os.getenv("MQ_HOST")
MQ_PORT = os.getenv("MQ_PORT") or 5672
MQ_USER = os.getenv("MQ_USER") or "guest"
MQ_PASS = os.getenv("MQ_PASS") or "guest"
# subscribe to the corresponding queue
SUB_QUEUE = os.environ.get("SUB_QUEUE")
SUB_TOPIC = os.environ.get("SUB_TOPIC")
SUB_EXCHANGE = os.environ.get("SUB_EXCHANGE")
SUB_QUEUE = os.getenv("SUB_QUEUE")
SUB_TOPIC = os.getenv("SUB_TOPIC")
SUB_EXCHANGE = os.getenv("SUB_EXCHANGE")


class TopicQueueConsumer(object):
def __init__(self, thread_queue, exchange_name):
self.logger = logging.getLogger(__name__)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=MQ_HOST)
pika.ConnectionParameters(
host=MQ_HOST,
port=MQ_PORT,
credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
)
)

self.channel = self.connection.channel()
Expand Down
11 changes: 9 additions & 2 deletions sdx_controller/messaging/topic_queue_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

import pika

MQ_HOST = os.environ.get("MQ_HOST")
MQ_HOST = os.getenv("MQ_HOST")
MQ_PORT = os.getenv("MQ_PORT") or 5672
MQ_USER = os.getenv("MQ_USER") or "guest"
MQ_PASS = os.getenv("MQ_PASS") or "guest"


class TopicQueueProducer(object):
Expand All @@ -15,7 +18,11 @@ class TopicQueueProducer(object):
def __init__(self, timeout, exchange_name, routing_key):
self.logger = logging.getLogger(__name__)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=MQ_HOST)
pika.ConnectionParameters(
host=MQ_HOST,
port=MQ_PORT,
credentials=pika.PlainCredentials(username=MQ_USER, password=MQ_PASS),
)
)

self.channel = self.connection.channel()
Expand Down

0 comments on commit 81df847

Please sign in to comment.