目次

Ratchet Push統合

Version 0.4.3

y2sunlight 2020-11-24

Ratchet に戻る

関連記事

リンク

本章は、Ratchet 公式サイトのチュートリアル(Push to an Existing Site)のレビューです。本章は2つのパートから構成されています。



概要

問題点

これまでのチュートリアルは全てクールでしたが、開発者の焦点は、ユーザーが永続性なしでWebSocketを介して完全な対話のできるロングライフなアプリケーションを作成することです。これは、既存のサイトに組み込むには大変な作業になります。コードをリポジトリから取り出し、新しいRatchetアプリケーションに移植する必要があるかもしれません。また、以前に機能していたページが引き続き機能することを確認するには、全く新しいテストフェーズを実行する必要があるかもしれません。

目標

管理者ユーザーであれ、ブログにコメントを投稿しているユーザーであれ、ユーザーがフォーム送信(または AJAX)を介してPOSTを実行すると、その変更がそのページの他の全ての訪問者に直ぐにプッシュされるようにします。サイトへの変更はリアルタイムに行い、コードの基盤を壊したり、現在の安定性に影響に与えたりすることはしません。

このチュートリアルでは、あなたがWebサイトにブログ記事を公開していると仮定します(コードの雛形はありませんが読み取って下さい)。そして、訪問者は、あなはがそれを公開するとすぐに、その記事をポップアップ表示で見ることでしょう。

ネットワーク構成

  1. クライアントは、webサーバーに要求を行い応答を受け取り、ページをレンダリングします。その後、開いているWebSocketに対して接続を確立します。(クライアント2と3は同じことを行います)

  2. Client1 は、フォーム送信またはAJAXを介してWebサーバーにPOSTバックを実行します。(WebSocketにまだ接続してることに注意してください)

  3. webサーバーは、POSTリクエストを処理(データベースへの保存など)している間、ZeroMQトランスポートを使用してメッセージをWebSocketスタックに直接送信します。

  4. WebSocketスタックはZeroMQのメッセージを処理し、開いているWebSocket接続を介してそのメッセージを適切なクライアントに送信します。WebブラウザーはWebSocketからの着信メッセージを処理し、それに応じてWebページをJavascriptで更新します。

このワークフローは控えめで、既存のWebサイトに簡単に導入できます。サイトへの唯一の変更は、サーバーに ZeroMQ を追加し、WebSocket サーバーからの着信メッセージを処理するためにクライアントに Javascript ファイルを追加することだけです。


チュートリアルの説明

本項は、公式サイトのチュートリアル(Push to an Existing Site)の Requirement の項以降の翻訳ですが、筆者のWindows環境(PHP7.2.22)では、再現することができませんでした。

その原因は、PHPのCLI版では正常に動作した ZeroMQ が Apache を介したPHP拡張モジュールとし動作しなかったからです。phpinfo()でも ZeroMQ のロードが確認出来ており、ZMQSocket はパースエラーもなくそのインスタンスは正常に作成できていたにも関わらず、ZMQSocketのsend()メソッドが働きませんでした。この現象につき情報をお持ちの方は、お手数でもコメント頂戴できれば幸いです。

従って、本項はチュートリアルの翻訳だけに留め、その代替案として ZeroMQ ではなくて、PHPのソケット関数を使いチュートリアルを実行しました。その実装方法については、次章「チュートリアルの実行」を参照して下さい。

尚、ZeroMQのPHP拡張モジュールのWindowsバイナリは現段階でPHP7.2までしか正式に公開されていませんでしたので、PHP7.2.22の環境下でテストしております。(2020-11-25時点)


要件

ZeroMQ

実行中のスクリプトと通信するには、開いているソケットでリッスンしている必要があります。私たちのアプリケーションは、着信中の WebSocket 接続のポート8080をリッスンしますが、他のPHPスクリプトからの更新も取得するにはどうすればよいでしょうか。ZeroMQ|ZeroMQ を使いましょう。Ratchetを構築しているようなrawソケットを使用することもできますが、ZeroMQは、簡単にソケットだけを作ってくれるライブラリです。

ZeroMQは、ライブラリー(libzmq)であり、それはPHPバインディング用のPECL拡張機能と同様にインストールする必要があります。インストールは簡単で、多くのオペレーティングシステム用のものがWebサイトで提供されています。

ZeroMQのインストール方法は以下を参照して下さい。

ZeroMQのPHP拡張モジュールのWindowsバイナリは現段階でPHP7.2までしか正式に公開されていません。(2020-11-25時点)

React/ZMQ

Ratchet は、React と呼ばれるソケットライブラリの上に構築された WebSocket ライブラリです。React は、接続と Ratchet に対する raw I/O を処理します。私達は Ratchet に付属している React に加えて、React スイートの一部である別のライブラリ React/ZMQ が必要となります。このライブラリは、ZeroMQ ソケットをReactorコアにバインドし、WebSocket と ZeroMQ ソケットの両方を処理できるようにします。

React/ZMQ をインストールするための composer.json ファイルについては次項を参照して下さい。


コーディングの開始

さあコードに取り掛かりましょう! まず、アプリケーション スタブクラスから始めます。ここでは、Pub/Subパターンでの使いやすさのためにWAMP( WampServerInterface )を使用します。これにより、クライアントは特定のページの更新を購読でき、購読しているユーザーにのみ更新をプッシュします。

Pusher.php
<?php
namespace MyApp;
use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;
 
class Pusher implements WampServerInterface {
    public function onSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    }
    public function onOpen(ConnectionInterface $conn) {
    }
    public function onClose(ConnectionInterface $conn) {
    }
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
        // In this application if clients send data it's because the user hacked around in console
        // このアプリケーションでは、クライアントがデータを送信する場合、それはユーザーがコンソールでハッキングしたためです
        $conn->callError($id, $topic, 'You are not allowed to make calls')->close();
    }
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
        // In this application if clients send data it's because the user hacked around in console
        $conn->close();
    }
    public function onError(ConnectionInterface $conn, \Exception $e) {
    }
}

これを /src/Pusher.php に保存します。WAMPに必要なメソッドを作成し、誰からもデータを送信されないようにし、送信された場合はその接続を閉じます。ここでは、プッシュアプリケーションを作成しており、WebSocketからの受信メッセージは受け付けていません。これらのメッセージはすべてAJAXから送信されます。


ブログ送信の編集

次に、新しいブログ投稿を処理する既存のWebサイトのコードに ZeroMQ のおまじないを少し追加します。ここでのコードは、あなたが実際にブログで設置している Drupal や WordPress のような高度なアーキテクチャと比較すると、少し基本的で古風なものかもしれませんが、ここでは基本的なことに焦点を当てています。

<?php
    // post.php ???
    // This all was here before  ;)
    $entryData = array(
        'category' => $_POST['category']
      , 'title'    => $_POST['title']
      , 'article'  => $_POST['article']
      , 'when'     => time()
    );
 
    $pdo->prepare("INSERT INTO blogs (title, article, category, published) VALUES (?, ?, ?, ?)")
        ->execute($entryData['title'], $entryData['article'], $entryData['category'], $entryData['when']);
 
    // This is our new stuff
    $context = new ZMQContext();
    $socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
    $socket->connect("tcp://localhost:5555");
 
    $socket->send(json_encode($entryData));

ブログ投稿をデータベースに記録した後、ソケットサーバーへの ZeroMQ 接続を開き、投稿と同じ情報を含むシリアル化されたメッセージを配信します。(注:適切な消毒を行ってください。これはとりあえずの汚い例です)


ZeroMQ メッセージの処理

アプリケーションスタブクラスに戻りましょう。そのままにしておくと、WebSocket 接続のみを処理することになります。前回のコードスニペットで見たように、データを送信するポート 5555 でローカルホストへの接続を開きました。ZeroMQメッセージに対する処理を追加し、WebSocketクライアントに再送信するようにしましょう。

<?php
namespace MyApp;
use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;
 
class Pusher implements WampServerInterface {
    /**
     * A lookup of all the topics clients have subscribed to
     * クライアントが購読している全てのトピックの探索配列
     */
    protected $subscribedTopics = array();
 
    public function onSubscribe(ConnectionInterface $conn, $topic) {
        $this->subscribedTopics[$topic->getId()] = $topic;
    }
 
    /**
     * @param string JSON'ified string we'll receive from ZeroMQ
     */
    public function onBlogEntry($entry) {
        $entryData = json_decode($entry, true);
 
        // If the lookup topic object isn't set there is no one to publish to
        // トピック探索オブジェクトがセットされていない場合、公開する相手はありません
        if (!array_key_exists($entryData['category'], $this->subscribedTopics)) {
            return;
        }
 
        $topic = $this->subscribedTopics[$entryData['category']];
 
        // re-send the data to all the clients subscribed to that category
        // そのカテゴリを購読している全てのクライアントにデータを再送信します
        $topic->broadcast($entryData);
    }
 
    /* The rest of our methods were as they were, omitted from docs to save space
       残りのメソッドはそのままで、スペースを節約するためにドキュメントから省略しました
    */
}


全てを結合する

これまで、メッセージの送信、受信、および処理のすべてのロジックについて説明してきました。次に、すべてを結合して、すべてを管理する実行可能なスクリプトを作成します。I/O、WebSocket、WAMP、ZeroMQ の各コンポーネントを使用してRatchetアプリケーションを構築し、イベントループを実行します。

<?php
    require dirname(__DIR__) . '/vendor/autoload.php';
 
    $loop   = React\EventLoop\Factory::create();
    $pusher = new MyApp\Pusher;
 
    // Listen for the web server to make a ZeroMQ push after an ajax request
    // ajaxリクエストの後にZeroMQプッシュを行うためにWebサーバーをリッスンします
    $context = new React\ZMQ\Context($loop);
    $pull = $context->getSocket(ZMQ::SOCKET_PULL);
    $pull->bind('tcp://127.0.0.1:5555'); // Binding to 127.0.0.1 means the only client that can connect is itself
    $pull->on('message', array($pusher, 'onBlogEntry'));
 
    // Set up our WebSocket server for clients wanting real-time updates
    // リアルタイム更新が必要なクライアント向けにWebSocketサーバーをセットアップします
    $webSock = new React\Socket\Server('0.0.0.0:8080', $loop); // Binding to 0.0.0.0 means remotes can connect
    $webServer = new Ratchet\Server\IoServer(
        new Ratchet\Http\HttpServer(
            new Ratchet\WebSocket\WsServer(
                new Ratchet\Wamp\WampServer(
                    $pusher
                )
            )
        ),
        $webSock
    );
 
    $loop->run();

コードを /bin/push-server.php として保存し、実行します。

$ php bin/push-server.php


クライアントサイド

サーバーサイドのコードが完成し、稼働しているので、これらのリアルタイムの投稿を取得する時が来ました!これらの更新で具体的に何を行うのかは、このドキュメントの範囲を超えています。これらのメッセージをデバッグコンソールに出力するだけにします。

<script src="https://gist.githubusercontent.com/cboden/fcae978cfc016d506639c5241f94e772/raw/e974ce895df527c83b8e010124a034cfcf6c9f4b/autobahn.js"></script>
<script>
    var conn = new ab.Session('ws://localhost:8080',
        function() {
            conn.subscribe('kittensCategory', function(topic, data) {
                // This is where you would add the new article to the DOM (beyond the scope of this tutorial)
                // ここで、新しい記事をDOMに追加します(これは、このチュートリアルの範囲を超えています)
                console.log('New article published to category "' + topic + '" : ' + data.title);
            });
        },
        function() {
            console.warn('WebSocket connection closed');
        },
        {'skipSubprotocolCheck': true}
    );
</script>

最後に、このJavascriptを配置したページを1つのブラウザウィンドウで開き、別のブラウザから「kittensCategory」にブログエントリを投稿して、最初からコンソールのログを確認します。それが機能していたら、次のステップは、受信したデータを操作性の良いDOMの中に組み込むことです。

これがローカルで機能している場合(ローカルホストが開発環境であると想定)、ローカルホストの参照と、場合によっては適切なサーバーホスト名/IPアドレスへの結合を変更することができます。


チュートリアルの実行

本項ではチュートリアルの実行について説明します。前項でも述べたように、筆者の環境では ZeroMQ を使った実装がWindowsで出来なかったので、以下の環境に変更してチュートリアルの実行を行いました。

以下では、実行で使用したプロジェクトとソースコードを掲載しますが、変更点以外は、前項のチュートリアルの説明と同じです。


プロジェクトの作成

まず、プロジェクトフォルダ(以下 {Project-Folder} と呼びます)を作成し、そこにComposerファイルを設置します。

composer.json
{
    "autoload": {
        "psr-4": {
            "MyApp\\": "src"
        }
    },
    "require": {
        "cboden/ratchet": "^0.4"
    }
}

上の設定では、プロジェクトフォルダ下の src フォルダにアプリケーションを設置し、そこを MyApp 名前空間として定義します。 Composer の規則に従い、ratchet は vender/cboden/ratchet の下に保存されます。

また、プロジェクトフォルダ下に以下のフォルダを作成します:

準備が出来たら、コマンドプロンプトでプロジェクトフォルダに移動して、composer install を実行します。


WebSockサーバー

以下のWebSockサーバーを {Project-Folder}/bin の下に配置します。

{Project-Folder}/bin/push-server.php

push-server.php
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
 
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;
use React\Socket\Server;
 
$pusher = new MyApp\Pusher;
$ws_port = '0.0.0.0:8080';       // WebSocketサーバー (0.0.0.0は公開を表す)
$pusher_port = '127.0.0.1:5555'; // プッシュ用のメッセージ受信ソケット(ローカル用)
 
// イベントループの作成
$loop = Factory::create();
 
// リアルタイム更新が必要なクライアント向けにWebSocketサーバーをセットアップします。
// リアルタイム更新を待ちます
$webSock = new React\Socket\Server($ws_port, $loop);
$webServer = new Ratchet\Server\IoServer(
    new Ratchet\Http\HttpServer(
        new Ratchet\WebSocket\WsServer(
            new Ratchet\Wamp\WampServer(
                $pusher
                )
            )
        ),
    $webSock
    );
 
// プッシュ用のメッセージ受信のために着信TCPソケットをセットアップします。
// postメッセージがrawソケットを使って、このTCPポートに送信されてきます
$inSocket = new Server($pusher_port, $loop);
$inSocket->on('connection', function (ConnectionInterface $conn) use ($pusher) {
    // ソケットでメッセージを受信した時
    $conn->on('data', function ($data) use ($conn, $pusher) {
        //ブログ投稿の更新をプッシュします
        $pusher->onBlogEntry($data);
        $conn->close();
    });
});
 
// イベントループ
$loop->run();

Pusherクラスについては、デバッグライトを追加した以外は、オリジナルのコードと同じです。これは、{Project-Folder}/src に配置します。

{Project-Folder}/src/Pusher.php

Pusher.php
<?php
namespace MyApp;
use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;
 
class Pusher implements WampServerInterface {
    /**
     * クライアントが購読している全てのトピックの探索配列
     */
    protected $subscribedTopics = array();
 
    public function onSubscribe(ConnectionInterface $conn, $topic) {
        echo "onSubscribe({$conn->remoteAddress},{$conn->resourceId},'{$topic->getId()}')\n";
        $this->subscribedTopics[$topic->getId()] = $topic;
    }
 
    /**
     * @param string ZeroMQから受信したJSON化された文字列
     */
    public function onBlogEntry($entry) {
        echo "onBlogEntry():\n";
        $entryData = json_decode($entry, true);
 
        // トピック探索オブジェクトがセットされていない場合、公開する相手はありません
        if (!array_key_exists($entryData['category'], $this->subscribedTopics)) {
            return;
        }
 
        $topic = $this->subscribedTopics[$entryData['category']];
 
        // そのカテゴリを購読している全てのクライアントにデータを再送信します
        $topic->broadcast($entryData);
        echo json_encode($entryData)."\n";
    }
 
    public function onUnSubscribe(ConnectionInterface $conn, $topic) {
        echo "onUnSubscribe({$conn->remoteAddress},{$conn->resourceId})\n";
    }
    public function onOpen(ConnectionInterface $conn) {
        echo "onOpen({$conn->remoteAddress},{$conn->resourceId})\n";
    }
    public function onClose(ConnectionInterface $conn) {
        echo "onClose({$conn->remoteAddress},{$conn->resourceId})\n";
    }
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
        echo "onCall({$conn->remoteAddress},{$conn->resourceId})\n";
        // このアプリケーションでは、クライアントがデータを送信する場合、それはユーザーがコンソールでハッキングしたためです
        $conn->callError($id, $topic, 'You are not allowed to make calls')->close();
    }
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
        echo "onPublish({$conn->remoteAddress},{$conn->resourceId})\n";
        // このアプリケーションでは、クライアントがデータを送信する場合、それはユーザーがコンソールでハッキングしたためです
        $conn->close();
    }
    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "onError({$conn->remoteAddress},{$conn->resourceId})\n";
    }
}


ブログ投稿ページ

新しいブログ投稿を処理する為のコードは、新規に作成し直しました。このコードは {Project-Folder}/blog に配置します。

{Project-Folder}/blog/entry.php

entry.php
<?php
putenv('app-version=1.0.0');
 
if ($_SERVER ['REQUEST_METHOD'] === 'POST')
{
    $entryData = array(
        'category' => $_POST['category']
        , 'title'    => $_POST['title']
        , 'article'  => $_POST['article']
        , 'when'     => time()
    );
 
    $instance = stream_socket_client('tcp://127.0.0.1:5555');
    fwrite($instance, json_encode($entryData));
 
    header("Location: {$_SERVER['REQUEST_URI']}");
    exit();
}
?>
<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Blog Stab</title>
    <!-- Bootstrap -->
    <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
    <script src="https://code.jquery.com/jquery-3.3.1.slim.min.js" integrity="sha384-q8i/X+965DzO0rT7abK41JStQIAqVgRVzpbzo5smXKp4YfRvH+8abtTE1Pi6jizo" crossorigin="anonymous"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.14.7/umd/popper.min.js" integrity="sha384-UO2eT0CpHqdSJQ6hJty5KVphtPhzWj9WO1clHTMGa3JDZwrnQq4sF86dIHNDz0W1" crossorigin="anonymous"></script>
    <script src="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js" integrity="sha384-JjSmVgyd0p3pXB1rRibZUAYoIIy6OrQ6VrjIEaFf/nJGzIxFDsf4x0xIM+B07jRM" crossorigin="anonymous"></script>
    <!-- javascript -->
    <script src="js/autobahn.js"></script>
    <script src="js/main.js?v=<?php echo getenv('app-version')?>"></script>
 
</head>
<body>
    <main class="mt-3 mb-5">
        <div class="container">
            <div class="row justify-content-center">
                <div class="col-md-12">
<?php include('template/entry.template.php')?>
                </div>
            </div>
            <div class="mt-2">
                <dl id="new-articles"><dl>
            </div>
        </div>
    </main>
</body>
</html>


クライアントサイド

クライアントサイドのメインのソースコードは上に掲載した entry.php 内のHTMLです。メインで使っている他のソースコードは以下の通りです。

以下は、ブログ投稿画面のHTMLです。このコードは {Project-Folder}/blog/template に配置します。

{Project-Folder}/blog/template/entry.template.php

entry.template.php
<h1>Blog Entry</h1>
<form method="POST" name="fm">
 
    <div class="form-group row">
        <label for="category" class="col-md-2 col-form-label">Category</label>
        <div class="col-md-10">
            <select name="category" id="category" class="form-control" required>
                <option value="kittensCategory">kittensCategory</option>
                <option value="category2">Category2</option>
                <option value="category3">Category3</option>
            </select>
        </div>
    </div>
 
    <div class="form-group row">
        <label for="title" class="col-md-2 col-form-label">Title</label>
        <div class="col-md-10">
            <input type="text" name="title" id="title" class="form-control" required>
        </div>
    </div>
 
    <div class="form-group row">
        <label for="article" class="col-md-2 col-form-label">Article</label>
        <div class="col-md-10">
            <textarea name="article" id="article" class="form-control" required></textarea>
        </div>
    </div>
 
    <div class="mt-4">
        <input type="submit" id="btn_submit" class="btn btn-primary">
    </div>
 
</form>

以下は、WebSocketとの接続、購読を行うJavaScriptです。このコードは {Project-Folder}/blog/js に配置します。

{Project-Folder}/blog/js/main.js

main.js
/*
 * Subscribe Session
 */
var conn = new ab.Session('ws://localhost:8080',
    function() {
        conn.subscribe('kittensCategory', function(topic, data) {
            console.log('New article published to category "' + topic + '" : ' + data.title);
            $('#new-articles').prepend('<dt>'+data.title+'</dt><dd>'+data.article+'</dd>');
        });
    },
    function() {
        console.warn('WebSocket connection closed');
    },
    {'skipSubprotocolCheck': true}
);

WAMP(V1)について

WAMP(V1)のJavaScriptライブラリーは、以下に設置します:

{Project-Folder}/blog/js/autobahn.js

このライブラリーは、以下より取得して設置しました。

https://gist.githubusercontent.com/cboden/fcae978cfc016d506639c5241f94e772/raw/e974ce895df527c83b8e010124a034cfcf6c9f4b/autobahn.js

この autobahn.js は、WAMP のバージョン1対応のライブラリです。Ratchet は WAMP のバージョン2をサポートしていないので、敢えて AutobahnJS の古いライブラリーを使用しています。

補足

WAMPとは WebSocket Application Messaging Protocol の略で、次の機能を提供するWebSocketのサブプロトコルです。

詳しくは以下を参照:


プログラムの実行

コマンドプロンプトでプロジェクトフォルダに移動してWebSocketサーバーを起動します。

php bin/push-server.php

ブラウザで entry.php を2つのブラウザーからアクセスします。一方のブラウザでプログを投稿すると、その内容は他方のブラウザに表示されます。