May 3, 2013

FuelPHP x RatchetでWAMPのPubSubとRPCを試してみた

2013/05/05 追記:
当記事の内容を改良して、デモサイトに追加しました。
http://fuelratchet.madroom.org/ratchet/wamp/api

--

先日、以下の記事を書きました。

WebSocketとWAMPとRatchetに関するメモ
http://madroom-project.blogspot.jp/2013/05/websocketwampratchet.html


実際にFuelPHPのRatchetパッケージで、WAMPのPubSubとRPCを試してみました。
https://github.com/mp-php/fuel-packages-ratchet

具体的には、以下の機能を作ってみました。
* 指定したトピックに配信する (PubSub)
* 指定したトピックを購読する (PubSub)
* 指定したトピックを購読解除する (PubSub)
* 購読中のトピック一覧を取得する (RPC)


以下、htmlと、RatchetパッケージのRatchet_Wampクラスを継承したクラスのソースです。

-- public/wamp_test.html(実際にはviewファイル) --
* autobahn.min.jsはWampServerと通信を行うためのJSライブラリです。
* autobahn.min.jsは http://autobahn.ws/js/downloads からダウンロード出来ます。
* autobahn.min.jsのライセンスは http://autobahn.ws/js に"MIT"と記されています。
* "wsuri"の値は変更して下さい。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>FuelPHP x Ratchet WAMP Test</title>

<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js"></script>
<script type="text/javascript" src="assets/js/autobahn.min.js"></script>

</head>
<body>

<!--配信 (WampServerのonPublishメソッドが呼ばれる)-->
<div id="publish">
<select>
    <option value="topic_1">Topic 1</option>
    <option value="topic_2">Topic 2</option>
    <option value="topic_3">Topic 3</option>
    <option value="invalid_topic">Invalid Topic</option>
</select>
<input type="text" />
<button>Publish</button>
</div>

<hr />

<!--購読 (WampServerのonSubscribeメソッドが呼ばれる)-->
<div id="subscribe">
<select>
    <option value="topic_1">Topic 1</option>
    <option value="topic_2">Topic 2</option>
    <option value="topic_3">Topic 3</option>
    <option value="invalid_topic">Invalid Topic</option>
</select>
<button>Subscribe</button>
</div>

<hr />

<!--購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)-->
<div id="unsubscribe">
<select>
    <option value="topic_1">Topic 1</option>
    <option value="topic_2">Topic 2</option>
    <option value="topic_3">Topic 3</option>
    <option value="invalid_topic">Invalid Topic</option>
</select>
<button>Unsubscribe</button>
</div>

<hr />

<!--RPC (WampServerのonCallメソッドが呼ばれる)-->
<div id="rpc">
<select>
    <option value="get_subscribing_topics">Get Subscribing Topics</option>
    <option value="invalid_method">Invalid Method</option>
</select>
<button>Call</button>
</div>

<hr />

<p>Check your console.</p>

<script>
$(document).ready(function() {
    var sess; // WampServerとのコネクション
    var wsuri = 'ws://example.com:[ポート番号]';

    /**
     * 使い方等: http://autobahn.ws/js
     */
    sess = new ab.Session(wsuri,

        // コネクション接続時のコールバック関数
        function() {
            console.log("Connected!");
        },

        // コネクション切断時のコールバック関数
        function(reason) {
            switch (reason) {
                case ab.CONNECTION_CLOSED:
                    // 意図した切断の場合?
                    console.log("Connection was closed properly - done.");
                break;
                case ab.CONNECTION_UNREACHABLE:
                    // WampServerに到達できなかった場合?
                    console.log("Connection could not be established.");
                break;
                case ab.CONNECTION_UNSUPPORTED:
                    // ブラウザがWebSocketをサポートしていない場合
                    console.log("Browser does not support WebSocket.");
                break;
                case ab.CONNECTION_LOST:
                    // 意図しない切断の場合?
                    console.log("Connection lost - reconnecting ...");

                    // 1秒後に再接続を試みる
                    window.setTimeout(connect, 1000);
                break;
            }
        }
    );

    // 配信 (WampServerのonPublishメソッドが呼ばれる)
    $("#publish > button").click(function() {
        var input = $("#publish > input");
        var select = $("#publish > select");

        console.log("-- Publish --");
        if(input.val().length == 0) {
            console.log("Input is empty.");
        } else {
            console.log("Topic: " + select.val());
            console.log("Input: " + input.val());

            sess.publish(select.val(), JSON.stringify({input: input.val()}));
            input.val('');
        }
    });

    // 購読 (WampServerのonSubscribeメソッドが呼ばれる)
    $("#subscribe > button").click(function() {
        var select = $("#subscribe > select");

        console.log("-- Subscribe --");
        console.log("Topic: " + select.val());

        sess.subscribe(select.val(), function (topic, event) {
            console.log("-- Received --");
            console.log("Topic: " + topic);
            console.log("event: " + event);
        });
    });

    // 購読解除 (WampServerのonUnSubscribeメソッドが呼ばれる)
    $("#unsubscribe > button").click(function() {
        var select = $("#unsubscribe > select");

        console.log("-- Unsubscribe --");
        console.log("Topic: " + select.val());

        try {
            sess.unsubscribe(select.val());
        } catch(e) {
            console.warn(e);
        }
    });

    // RPC (WampServerのonCallメソッドが呼ばれる)
    $("#rpc > button").click(function() {
        var select = $("#rpc > select");

        console.log("-- RPC --");
        console.log("Method: " + select.val());

        sess.call(select.val()).then(function (result) {
           // do stuff with the result
           console.log(result);
        }, function(error) {
           // handle the error
           console.log(error);
        });
    });

});
</script>

</body>
</html>
-- fuel/app/classes/ratchet/wamp/test.php --
* バリデーションとか、細かな処理は抜けています。
<?php

class Ratchet_Wamp_Test extends Ratchet_Wamp
{
    // トピック一覧
    private $topics = array();

    /**
     * 切断
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     */
    public function onClose(\Ratchet\ConnectionInterface $conn) {
        // 全てのトピックを購読解除
        foreach ($this->topics as $topic)
        {
            $this->onUnSubscribe($conn, $topic);
        }
    }

    /**
     * 配信
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string|\Ratchet\Wamp\Topic $topic
     * @param  string $event
     * @param  array $exclude
     * @param  array $eligible
     */
    public function onPublish(\Ratchet\ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$topic : '.$topic);
        Log::debug('$event : '.$event);
        Log::debug('$exclude : '.print_r($exclude, true));
        Log::debug('$eligible : '.print_r($eligible, true));
        Log::debug('********** '.__FUNCTION__.' end **********');

        // 不正なトピック
        if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
        {
            return;
        }

        $json = json_decode($event);

        // トピックに対する購読者が存在する場合、配信
        if (array_key_exists($topic->getId(), $this->topics))
        {
            $topic->broadcast(Security::htmlentities($json->input));
        }
    }

    /**
     * 購読
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string|\Ratchet\Wamp\Topic $topic
     */
    public function onSubscribe(\Ratchet\ConnectionInterface $conn, $topic) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$topic : '.$topic);
        Log::debug('********** '.__FUNCTION__.' end **********');

        // 不正なトピック
        if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
        {
            return;
        }

        // トピック一覧にトピックを追加
        if (!array_key_exists($topic->getId(), $this->topics))
        {
            $this->topics[$topic->getId()] = $topic;
        }
    }

    /**
     * 購読解除
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string|\Ratchet\Wamp\Topic $topic
     */
    public function onUnSubscribe(\Ratchet\ConnectionInterface $conn, $topic) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$topic : '.$topic);
        Log::debug('********** '.__FUNCTION__.' end **********');

        // 不正なトピック
        if( ! in_array($topic, array('topic_1', 'topic_2', 'topic_3')))
        {
            return;
        }

        // トピックからコネクションを削除
        $topic->remove($conn);

        // トピックの購読者が存在しない場合、トピック一覧からトピックを削除
        if ($topic->count() == 0)
        {
            unset($this->topics[$topic->getId()]);
        }

    }

    /**
     * RPC
     * 
     * @param  \Ratchet\ConnectionInterface $conn
     * @param  string $id
     * @param  string|\Ratchet\Wamp\Topic $fn
     * @param  array $params
     * @return \Ratchet\Wamp\WampConnection
     */
    public function onCall(\Ratchet\ConnectionInterface $conn, $id, $fn, array $params) {
        Log::debug('********** '.__FUNCTION__.' begin **********');
        Log::debug('$id : '.$id);
        Log::debug('$fn : '.$fn);
        Log::debug('$params : '.print_r($params, true));
        Log::debug('********** '.__FUNCTION__.' end **********');

        switch ($fn) {
            // 購読しているトピック一覧を取得
            case 'get_subscribing_topics':
                $subscribing_topics = array();

                Log::debug('********** Topics begin **********');

                foreach ($this->topics as $topic)
                {
                    Log::debug('$topic : '.$topic);
                    Log::debug('$topic->count() : '.$topic->count());

                    $topic->has($conn) and $subscribing_topics[] = $topic;
                }

                Log::debug('********** Topics end **********');

                return $conn->callResult($id, Security::htmlentities($subscribing_topics));
            break;

            // エラー処理
            default:
                $errorUri = 'errorUri';
                $desc = 'desc';
                $details = 'details';

                /**
                 * \Ratchet\Wamp\WampConnection
                 * 
                 * callError($id, $errorUri, $desc = '', $details = null)
                 */
                return $conn->callError($id, $errorUri, $desc, $details);
            break;
        }
    }

}

/* end of file test.php */
wamp_test.htmlにアクセスすると、以下のような画面になります。

上から順に
* 選択したTopicでメッセージを配信
* 選択したTopicを購読
* 選択したTopicを購読解除
* 選択したメソッドをRPCパターンでコール
となります。尚、各種情報は console.log() しています。

P.S.
手元での確認用ソースとして作りましたが、改良してデモサイトにも追加しておこうかな。
http://fuelratchet.madroom.org/
http://madroom-project.blogspot.jp/2013/04/fuelphp-x-ratchet.html
もし追加したら当記事にも追記します。

次は、ZeroMQでHTTPサーバやタスクからのPublishか。

No comments:

Post a Comment