読者です 読者をやめる 読者になる 読者になる

SDN開発エンジニアを目指した活動ブログ

〜SDNなオープンソース製品を実際に使って試してみる〜

OpenStackメッセージング機構を探ってみる(2) 〜Fanout(同報通知)編〜

前回、OpenStackメッセージング機構を探るための実験環境が構築できた。
OpenStackメッセージング機構を探ってみる(1) 〜環境構築編〜 - SDN開発エンジニアを目指した活動ブログ
今回から、OpenStack開発エンジニアを目指して、OpenStackメッセージング機構を体感してみたい。

◆はじめに

OpenStackメッセージング機構を体感するために、以下の例題を解決する実現モデルを考えてみる。

1. 例題

実験環境は、サーバ装置1台と、クライアント装置2台で構成されている。
ある特定のクライアント装置が、サーバ装置のマシン時刻を知りたい場合、どうような解決方法が存在するのだろうか?

2. 実現モデル

ここでは、OpenStackメッセージング機構の理解を目的とするので、実現モデルの優位性とかは問題視しません。
案1)同報通知モデル「定期的に、サーバ装置が、全クライアント装置に、同報的に通知する」
案2)特定通知モデル「定期的に、サーバ装置が、特定のクライアント装置に、通知する。」
案3)情報照会モデル「特定のクライアント装置が、サーバ装置にマシン時刻を問い合わせる」

◆案1「同報通知モデル」を試してみた

このモデルは、PublisherからConsumer宛に同報通知する仕組み。fanout_castというメソッドが用意されている。

サーバ側サンプルコード"send_fanout.py"
import os
import datetime
import time
import logging
import eventlet
eventlet.monkey_patch()

from openstack.common import context
from openstack.common import rpc 
from openstack.common.rpc import proxy
from openstack.common.rpc import dispatcher
from oslo.config import cfg

logging.basicConfig()
logging.getLogger('qpid').setLevel(logging.INFO)
logging.getLogger('amqp').setLevel(logging.INFO)
logging.getLogger('openstack.common').setLevel(logging.INFO)
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

CONF = cfg.CONF
CONF(default_config_files=['server.conf'])

topic = "sample1"
my_host = os.uname()[1]

class Server(proxy.RpcProxy):
    OPTS = []
    BASE_RPC_API_VERSION = '1.0'

    def __init__(self):
        self.context = context.get_admin_context()
        super(Server, self).__init__(
            topic=topic,
            default_version=self.BASE_RPC_API_VERSION)
        eventlet.spawn(self._periodic_fanout)

    def _periodic_fanout(self):
        while True:
            time.sleep(1)
            self.func_fanout()

    def func_fanout(self):
        self.nowtime = datetime.datetime.today()
        LOG.debug('send fanout [ hostname=%s, nowtime=%s ]'
            %(my_host, self.nowtime))
        method = "func_fanout"
        self.fanout_cast(
            self.context,
            self.make_msg(
                method, hostname=my_host, nowtime=self.nowtime
            ),
            topic=self.topic)


if __name__ == '__main__':
    Server()

    while True:
        time.sleep(1)
サーバ側の設定ファイル"server.conf"
[DEFAULT]
qpid_hostname = 192.168.100.101
rpc_backend = openstack.common.rpc.impl_qpid
クライアント側サンプルコード"receive_fanout.py"
import time
import logging
import eventlet
eventlet.monkey_patch()

from openstack.common import context
from openstack.common import rpc 
from openstack.common.rpc import dispatcher
from oslo.config import cfg

logging.basicConfig()
logging.getLogger('qpid').setLevel(logging.INFO)
logging.getLogger('amqp').setLevel(logging.INFO)
logging.getLogger('openstack.common').setLevel(logging.INFO)

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

CONF = cfg.CONF
CONF(default_config_files=['client.conf'])

topic = "sample1"

class Client(object):
    OPTS = []

    def __init__(self):
        self.context = context.get_admin_context()
        self.dispatcher = dispatcher.RpcDispatcher([self])
        self.conn = rpc.create_connection(new=True)
        self.conn.create_consumer(topic, self.dispatcher, fanout=True)
        self.conn.consume_in_thread()

    def func_fanout(self, ctxt, **kwargs):
        LOG.debug('received fanout [ hostname=%s, nowtime=%s ]'
            %(kwargs["hostname"],kwargs["nowtime"]))


if __name__ == '__main__':
    Client()

    while True:
        time.sleep(1)
クライアント側の設定ファイル"client.conf"
[DEFAULT]
qpid_hostname = 192.168.100.101
rpc_backend = openstack.common.rpc.impl_qpid
実行結果

サンプルプログラムの実行結果は、こんな感じ。(上段がサーバ装置、中段、下段がクライアント装置)
約1秒ごとに、上段のサーバ装置のマシン時刻が各クライアント装置に同報通知されている様子が確認できた。
f:id:ttsubo:20140614092914j:plain

fanout動作概要

サンプルプログラム実行結果に関わるfanout動作部分を図式化してみた。
実際、qpid-toolsを活用すると、Qpid動作情報も、いろいろと確認できて、Good !
f:id:ttsubo:20140614092901j:plain

◆おわりに

今回は、Qpid基本動作として、同報通知モデルのfanoutメソッド活用方法が理解できた。
OpenStack基盤での開発方法は、なかなか参考文献が少なく、かなり手探りなところがあるが、実際に動作できたので結果オーライでしょうか。
次回は、案2「特定通知モデル」を試してみるつもり。