SDN開発エンジニアを目指した活動ブログ

〜SDNなオープンソース製品を実際に使って試してみる〜

OpenStackメッセージング機構を探ってみる(3) 〜Cast(特定通知)編〜

前回につづいて、OpenStackメッセージング機構を探ってみる。

◆前回のおさらい

OpenStackメッセージング機構を体感するために、以下の例題を解決する実現モデルを考えてみる。

1. 例題

実験環境は、サーバ装置1台と、クライアント装置2台で構成されている。
ある特定のクライアント装置が、サーバ装置のマシン時刻を知りたい場合、どうような解決方法が存在するのだろうか?

2. 実現モデル

ここでは、OpenStackメッセージング機構の理解を目的とするので、実現モデルの優位性とかは問題視しません。
案1)同報通知モデル「定期的に、サーバ装置が、全クライアント装置に、同報的に通知する」
案2)特定通知モデル「定期的に、サーバ装置が、特定のクライアント装置に、通知する。」
案3)情報照会モデル「特定のクライアント装置が、サーバ装置にマシン時刻を問い合わせる」

前回は、案1での実現レベルを確認した。

◆案2「特定通知モデル」の事前調査

まず、OpenStackメッセージング概要を理解するために、OpenStack Developerページを確認してみた。
AMQP and Nova — nova 2014.2.dev38.ga8b2eaa documentation


以下は、同サイトからの引用。

RPC Casts

The diagram below the message flow during an rpc.cast operation:
1. A Topic Publisher is instantiated to send the message request to the queuing system.
2. Once the message is dispatched by the exchange, it is fetched by the Topic Consumer dictated by the routing key (such as ‘topic’) and passed to the Worker in charge of the task.

f:id:ttsubo:20140614094732p:plain

Rabbit mqを前提にした記述になっているが、Apcahe Qpidでも動作原理は同じはず。
OpenStackメッセージング機構でのRPC Castsの理解が深まったところで、さっそく、サンプルコードで試してみる。
あと、routing keyとして、topicキーワードへのホスト名の指定有無によって、動作の違いも気なるため、両方とも試してみる。

◆案2-1:routing keyにホスト名を指定しない場合

このモデルは、PublisherからConsumer宛に特定通知する仕組み。castというメソッドが用意されている。

サーバ側サンプルコード"send_cast1.py"

サーバ側の設定ファイル"server.conf"は、前回と同様のものを使用する。

import os
import datetime
import time
import logging
import eventlet
eventlet.monkey_patch()

from openstack.common import context
from openstack.common import rpc 
from openstack.common.rpc import proxy
from openstack.common.rpc import dispatcher
from oslo.config import cfg

logging.basicConfig()
logging.getLogger('qpid').setLevel(logging.INFO)
logging.getLogger('amqp').setLevel(logging.INFO)
logging.getLogger('openstack.common').setLevel(logging.INFO)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

CONF = cfg.CONF
CONF(default_config_files=['server.conf'])

rpc.set_defaults(control_exchange='sdn')
topic = "sample2"
my_host = os.uname()[1]

class Server(proxy.RpcProxy):
    OPTS = []
    BASE_RPC_API_VERSION = '1.0'

    def __init__(self):
        self.context = context.get_admin_context()
        super(Server, self).__init__(
            topic=topic,
            default_version=self.BASE_RPC_API_VERSION)
        eventlet.spawn(self._periodic_cast)

    def _periodic_cast(self):
        while True:
            time.sleep(1)
            self.func_cast()

    def func_cast(self):
        self.nowtime = datetime.datetime.today()
        LOG.debug('send cast [ hostname=%s, nowtime=%s ]'
            %(my_host, self.nowtime))
        method = "func_cast"
        self.cast(
            self.context,
            self.make_msg(
                method, hostname=my_host, nowtime=self.nowtime
            ),
            topic=self.topic)


if __name__ == '__main__':
    Server()

    while True:
        time.sleep(1)
クライアント側サンプルコード"receive_cast1.py"

クライアント側の設定ファイル"client.conf"は、前回と同様のものを使用する。

import time
import logging
import eventlet
eventlet.monkey_patch()

from openstack.common import context
from openstack.common import rpc 
from openstack.common.rpc import proxy
from openstack.common.rpc import dispatcher
from oslo.config import cfg

logging.basicConfig()
logging.getLogger('qpid').setLevel(logging.INFO)
logging.getLogger('amqp').setLevel(logging.INFO)
logging.getLogger('openstack.common').setLevel(logging.INFO)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

CONF = cfg.CONF
CONF(default_config_files=['client.conf'])

rpc.set_defaults(control_exchange='sdn')
topic = "sample2"

class Client(object):

    def __init__(self):
        self.dispatcher = dispatcher.RpcDispatcher([self])
        self.conn = rpc.create_connection(new=True)
        self.conn.create_consumer(topic, self.dispatcher)
        self.conn.consume_in_thread()

    def func_cast(self, context, **kwargs):
        LOG.debug('received cast [ hostname=%s, nowtime=%s ]'
            %(kwargs["hostname"],kwargs["nowtime"]))


if __name__ == '__main__':
    Client()

    while True:
        time.sleep(1)
実行結果

サンプルプログラムの実行結果は、こんな感じ。(上段がサーバ装置、中段、下段がクライアント装置)
約1秒ごとに、上段のサーバ装置のマシン時刻が各クライアント装置に特定通知されている様子が確認できた。
ここでのポイントは、2台のクライアント装置が、同一Topicでメッセージング処理を実施しているため、特定通知のスコープとして、クライアント装置が2台とも含まれている。これにより、サーバ装置が転送したマシン時刻を、クライアント装置の交互に通知される様子が確認できた。

f:id:ttsubo:20140614102318j:plain

cast動作概要

サンプルプログラム実行結果に関わるcast動作部分を図式化してみた。

f:id:ttsubo:20140614102502j:plain

◆案2-2:routing keyにホスト名を指定する場合

ここでは、通知先クライアント装置を"Client-2"とした。

サーバ側サンプルコード"send_cast2.py"
import os
import datetime
import time
import logging
import eventlet
eventlet.monkey_patch()

from openstack.common import context
from openstack.common import rpc 
from openstack.common.rpc import proxy
from openstack.common.rpc import dispatcher
from oslo.config import cfg

logging.basicConfig()
logging.getLogger('qpid').setLevel(logging.INFO)
logging.getLogger('amqp').setLevel(logging.INFO)
logging.getLogger('openstack.common').setLevel(logging.INFO)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

CONF = cfg.CONF
CONF(default_config_files=['server.conf'])

rpc.set_defaults(control_exchange='sdn')
topic = "sample2"
to_host = "Client-2"

class Server(proxy.RpcProxy):
    OPTS = []
    BASE_RPC_API_VERSION = '1.0'

    def __init__(self):
        self.hostname = os.uname()[1]
        self.context = context.get_admin_context()
        super(Server, self).__init__(
            topic=topic,
            default_version=self.BASE_RPC_API_VERSION)
        eventlet.spawn(self._periodic_cast)

    def _periodic_cast(self):
        while True:
            time.sleep(1)
            self.func_cast()

    def func_cast(self):
        self.nowtime = datetime.datetime.today()
        LOG.debug('send cast [ hostname=%s, nowtime=%s ]'
            %(self.hostname, self.nowtime))
        method = "func_cast"
        self.cast(
            self.context,
            self.make_msg(
                method, hostname=self.hostname, nowtime=self.nowtime
            ),
            topic = "%s.%s" % (topic, to_host))


if __name__ == '__main__':
    Server()

    while True:
        time.sleep(1)
クライアント側サンプルコード"receive_cast2.py"
import os
import time
import logging
import eventlet
eventlet.monkey_patch()

from openstack.common import context
from openstack.common import rpc 
from openstack.common.rpc import proxy
from openstack.common.rpc import dispatcher
from oslo.config import cfg

logging.basicConfig()
logging.getLogger('qpid').setLevel(logging.INFO)
logging.getLogger('amqp').setLevel(logging.INFO)
logging.getLogger('openstack.common').setLevel(logging.INFO)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

CONF = cfg.CONF
CONF(default_config_files=['client.conf'])

rpc.set_defaults(control_exchange='sdn')
topic_base = "sample2"
my_host = os.uname()[1]

class Client(object):

    def __init__(self):
        topic = "%s.%s" % (topic_base, my_host)
        self.dispatcher = dispatcher.RpcDispatcher([self])
        self.conn = rpc.create_connection(new=True)
        self.conn.create_consumer(topic, self.dispatcher)
        self.conn.consume_in_thread()

    def func_cast(self, context, **kwargs):
        LOG.debug('received cast [ hostname=%s, nowtime=%s ]'
            %(kwargs["hostname"],kwargs["nowtime"]))


if __name__ == '__main__':
    Client()

    while True:
        time.sleep(1)
実行結果

サンプルプログラムの実行結果は、こんな感じ。(上段がサーバ装置、中段、下段がクライアント装置)
routing keyに”CLient-2”を含んだ通知メッセージを受信するたびに、上段のサーバ装置のマシン時刻がクライアント装置”Client-2”のみに特定通知されている様子が確認できた。

f:id:ttsubo:20140614142636j:plain

cast動作概要

サンプルプログラム実行結果に関わるcast動作部分を図式化してみた。

f:id:ttsubo:20140614142651j:plain

◆おわりに
今回は、Qpid基本動作として、特定通知モデルのcastメソッド活用方法が理解できた。
次回は、案3「情報照会モデル」を試してみるつもり。