リアクティブシステム用のMQTTクライアントの実装
MQTT-Reactiveは、LiamBindleのMQTT-Cライブラリから派生したMQTTv3.1.1クライアントです。 MQTT-Reactiveの目的は、リアクティブ組み込みシステムで使用するために、Cで記述されたポータブルで非ブロッキングのMQTTクライアントを提供することです。まず、この記事では、リアクティブシステムとは何かについて説明します。次に、そのようなシステムに適したソフトウェア構造を設計する方法について説明します。最後に、この記事では、ステートマシンとイベント駆動型パラダイムを使用して、リアクティブシステムでMQTT-Reactiveライブラリを使用する方法を示します。そのために、この記事では実際のIoTデバイスを実例として使用し、ステートマシン、インタラクション、構造などのUML図を使用してソフトウェア構造と状態ベースの動作を説明しています。この記事では、IoTデバイスのMQTT-ReactiveクライアントをC言語で実装するためのガイドラインも提供しています。
多くの組み込みシステムは反応的です。つまり、内部または外部のイベントに反応します。これらの反応が完了すると、ソフトウェアは戻って次のイベントを待ちます。そのため、イベント駆動型システムはリアクティブシステムとも呼ばれます。
イベントドリブンプログラミング、またはリアクティブプログラミングは、リアクティブシステム用の柔軟で予測可能で保守可能なソフトウェアを実現するための最も適切なプログラミングパラダイムの1つです。このパラダイムでは、プログラムのフローはイベントによって決定されます。多くの場合、リアクティブソフトウェアの構造は、アクティブオブジェクトと呼ばれる複数の同時ユニットで構成されており、さまざまな種類のイベントを待機して処理します。各アクティブオブジェクトは、制御スレッドと、着信イベントを処理するためのイベントキューを所有しています。リアクティブシステムでは、アクティブオブジェクトは通常、ステートチャートで定義された状態ベースの動作をします。
複数の同時タスクがあり、ステートマシンとイベント駆動型パラダイムの両方を使用するリアクティブシステムでMQTT-Reactiveライブラリを使用する方法を探るために、例としてIoTデバイスを使用します。
MQTTプロトコルを使用するというアイデアは、鉄道会社向けにIoTデバイスが開発されているときに生まれました。このデバイスは、次のことができる明確な反応システムでした。
- いくつかのデジタル入力の変更を検出して保存する
- いくつかのアナログ信号を取得、フィルタリング、保存します
- 保存された情報を定期的にリモートサーバーに送信する
- GSMネットワークを介したMQTTプロトコルを介した情報の送受信
MQTTが選択されたのは、軽量のパブリッシャーサブスクライバーベースのメッセージングプロトコルであり、GSMネットワークなどの高遅延および低データレートのリンクが予想されるIoTおよびネットワーキングアプリケーションで一般的に使用されているためです。
前述のIoTデバイスのMQTT機能は、LiamBindleのMQTT-Cの修正バージョンを使用して実現されました。そのデバイスのソフトウェアはリアクティブソフトウェアとして設計されていたため、非同期イベントを交換してシステムの他の部分と通信するようにMQTT-Cを変更する必要がありました。これらのイベントは、ネットワークを介したトラフィックの送受信、および機密情報のサーバーへの接続と公開に使用されました。結果として得られたソフトウェアライブラリはMQTT-Reactiveと呼ばれていました。
ステートマシン
MQTT-Reactiveは、MQTT-Reactiveクライアントの基本的な動作をモデル化した図1に示すように、ステートマシンを介して使用されました。これは、MqttMgr(MQTT Manager)と呼ばれるアクティブなオブジェクトでした。図1のステートマシンのアクションは、MQTT-Reactiveライブラリをステートマシンから使用する方法を示しています。図1ではC言語がアクション言語として使用されていますが、任意のコンピューターまたは形式言語を使用できます。
クリックすると拡大画像が表示されます
図1.MQTT-Reactiveクライアントのステートマシン(出典:VortexMakes)
図1のステートマシンは、WaitingForNetConnection状態で起動します。サーバーへのネットワーク接続が確立された後、WaitingForNetConnectionはActivateイベントを受信し、ステートマシンはWaitingForSync状態に移行します。この状態でのみ、ステートマシンステージのMQTTメッセージを、それぞれConnectイベントとPublishイベントを介してCONNECTやPUBLISHなどのブローカーに送信できます。同期状態は、同期状態の内部コンパートメントに含まれるdeferキーワードによって指定されたPublishイベントを延期するために、UMLの特別なメカニズムを使用します。 Syncが現在の状態であるときにPublishイベントが発生した場合、SMがWaitingForSyncやWaitingForNetConnectionなどの遅延イベントリストに公開イベントが含まれていない状態になるまで、今後の処理のために保存(遅延)されます。このような状態に入ると、ステートマシンは、保存されたPublishイベントを自動的に呼び出し、遷移ターゲットの状態に応じてこのイベントを消費または破棄します。
SyncTimeミリ秒ごとに、ステートマシンはSync複合状態に移行します。これは、ReceiveイベントとSendイベントをネットワークマネージャーに送信することにより、ネットワークからのトラフィックの実際の送受信を行います。これは、ネットワークの問題を処理する並行エンティティです。
導入されたMqttMgrはCONNECTパケットとPUBLISHパケットのみをサポートしますが、かなり簡単な変更でSUBSCRIBEパケットをサポートできます。
ステートマシンアクションは、paramsキーワードを使用して、消費されたイベントのパラメーターにアクセスします。たとえば、次の遷移では、ConnectイベントはclientIdとkeepAliveの2つのパラメータを運びます。これらの値は、対応するMqttMgrオブジェクトの属性を更新するために使用されます。
Connect(clientId、keepAlive)/ me-> clientId =params-> clientId; me-> keepAlive =params-> keepAlive; me-> operRes =mqtt_connect(&me-> client、me-> clientId、NULL、NULL、0、NULL、NULL、0、me-> keepAlive);
この例では、Connect(clientId、keepAlive)イベントが遷移のトリガーであり、mqtt_connect()呼び出しが結果として実行されるアクションの一部です。つまり、MqttMgrオブジェクトが 'publishing_client'および '400'のパラメーターを持つConnect(clientId、keepAlive)イベントを受信すると、Connect( "publishing_client"、400)、MqttMgrのclientIdおよびkeepAlive属性が値 'で更新されます。その結果、publishing_client 'と' 400 'になります。
イベントを作成して送信するには、ステートマシンのアクションでGEN()マクロを使用します。たとえば、次のステートメントは、ReceiveイベントをCollectorオブジェクトに送信します。このオブジェクトは、そのCollectorポインターによってMqttMgrオブジェクトの属性として参照されます。
GEN(me-> itsCollector、Receive());
GEN()ステートメントの最初の引数はイベントを受信するオブジェクトですが、2番目の引数は送信されるイベントであり、イベント引数(存在する場合)を含みます。引数はイベントパラメータと一致する必要があります。たとえば、次のステートメントはConnRefused(code)イベントを生成し、それをCollectorオブジェクトに送信して、ブローカーから返されたコードをイベントパラメーターとして渡します。
GEN(me-> itsCollector、ConRefused(code));
paramsキーワードを使用して消費されたイベントのパラメーターにアクセスし、GEN()マクロを使用してアクションからイベントを生成するというアイデアは、純粋に説明のためにRational RhapsodyDeveloperのコードジェネレーターから採用されました。
図1のステートマシンのデフォルトのアクションは、ブローカーから接続の受け入れを受信するたびにMQTT-Reactiveによって呼び出されるコールバックを設定します。このコールバックは、MqttMgrコード内に実装する必要があります。このコールバックは、以下に示すように、Collectorオブジェクトに送信するためにConnAcceptedまたはConnRefused(code)イベントのいずれかを生成する必要があります。
静的 無効 connack_response_callback (列挙型 MQTTConnackReturnCode return_code){/ * ... * / if (return_code ==MQTT_CONNACK_ACCEPTED){GEN(me-> itsCollector、ConnAccepted()); } その他 {GEN(me-> itsCollector、ConnRefused(return_code)); }}
モデルの実装
図1のモデルは、お気に入りのソフトウェアツールまたは独自のステートマシン実装のいずれかを使用して、CまたはC ++で実装できます。これを行うためにインターネット上で利用できるさまざまなツールがあります。たとえば、RKHフレームワーク、QPフレームワーク、Yakinduステートチャートツール、Rational RhapsodyDeveloperなどです。それらはすべて、ステートチャートとC / C ++言語をサポートしています。さらに、それらのいくつかには、ステートチャート図を描画し、そこからコードを生成するためのツールが含まれています。
このステートマシンは、MqttMgr(MQTT Manager)と呼ばれるアクティブオブジェクトから実行されました。このオブジェクトは、MQTT-Reactiveコードの厳密なカプセル化を提供し、MQTT-Reactive関数の呼び出しまたはMQTT-Reactiveデータへのアクセスを許可された唯一のエンティティでした。システム内の他の同時エンティティとISRは、MqttMgrとイベントを交換することによって間接的にのみMQTT-Reactiveを使用できました。このメカニズムを使用して同時エンティティを同期し、それらの間でデータを共有することで、セマフォ、ミューテックス、遅延、イベントフラグなどの従来のブロッキングメカニズムの危険に対処する必要がなくなります。これらのメカニズムは、診断と修正が困難で面倒な予期しない誤動作を引き起こす可能性があります。
MqttMgrアクティブオブジェクトは、その属性をデータ項目のセットとしてカプセル化します。データ項目は、名前と型を使用して変数を指定します。型は実際にはデータ型です。 MqttMgrオブジェクトのデータ項目は、オブジェクトの構造体のメンバーにマップされます。メンバーの名前とタイプは、オブジェクトのデータと同じです。たとえば、MqttMgrオブジェクトタイプのクライアント属性は、MqttMgr構造内のデータメンバーとして値によって埋め込まれます。
struct MqttMgr { / * ... * / 構造体 mqtt_client クライアント; / *属性クライアント* / LocalRecvAll localRecv; / *属性localRecv * / };
MqttMgrオブジェクトのデータは、アクセサーまたはミューテーター操作を使用せずに直接アクセスおよび変更されます。たとえば、clientとlocalRecvには、MqttMgrのインスタンスを指すmeポインターを介してアクセスします。
mqtt_recvMsgError(&me-> client、&me-> localRecv);
MqttMgrには、表1に示す属性のリストがあります。
表1.MqttMgr属性
図2の構造は、関係するアクター間の関係を覚えておくのに役立ちます。それらは次のとおりです。ブローカーに情報を送信するコレクターオブジェクト。ネットワークを処理するNetMgrオブジェクト。およびMqttMgrオブジェクト。
図2.IoTシステム構造のドラフト(出典:VortexMakes)
図3のシーケンス図は、MQTTサーバーとのセッションを開く必要がある場合に、MqttMgrオブジェクトがシステムの他の部分とどのように相互作用するかを示しています。この図では、MqttMgr状態と交換された非同期メッセージが、Collector、MqttMgr、およびNetMgrアクター間で示されています。
図3.MQTTブローカーへの接続(出典:VortexMakes)
NetMgrオブジェクトによってブローカーへのネットワーク接続が確立された後、MqttMgrからMQTTサーバーに送信される最初のパケットはCONNECTパケットである必要があります。したがって、コレクターアクターはConnect(clientId、keepAlive)イベントをMqttMgrアクターに送信します。このイベントには、クライアント識別子とキープアライブの時間間隔が含まれている必要があります。サーバーが接続要求を受け入れると、MqttMgrアクターはConnAcceptedイベントをCollectorアクターに送信して、この状況を通知します。それ以降、コレクターアクターはそのブローカーに情報メッセージを公開できます。
サーバーが接続要求を拒否した場合、MqttMgrアクターはConnRefusedイベントをCollectorアクターに送信します。このイベントには、図4に示すように、拒否の原因を通知するコードが含まれています。MQTTv3.1.1セクション3.2.2.3を参照してください。
図4.ブローカーは接続要求を拒否します(出典:VortexMakes)
図5は、メッセージが公開されたときの対話フローを示しています。これを行うために、コレクターアクターはPublish(data、size、topic、qos)イベントを送信します。このイベントには、公開する情報(data)、情報の長さ(バイト単位)(size)、トピック名が含まれます。情報が公開され(トピック)、このメッセージを配信するための保証レベル(qos)。前述のIoTデバイスでは、公開された情報はJSON仕様を使用してフォーマットされていました。これは、人間が読めるテキストの属性と値のペアを持つデータオブジェクトを含むオープンスタンダード形式です。この形式は、Cで記述されたシンプルで軽量なライブラリであるjWriteを使用して実現されました。
図5.ブローカーへのデータの公開(出典:VortexMakes)
図6は、ネットワークへのMQTTメッセージの受信と送信が失敗するシナリオを示しています。ネットワークマネージャーがネットワークからトラフィックを受信できない場合、ReceiveFailをMqttMgrアクターに送信します。同様に、ネットワークマネージャーがネットワークにデータを送信できない場合、SendFailをMqttMgrアクターに送信します。
図6.ネットワークの障害(出典:VortexMakes)
表2は、示されているシナリオに関連するイベントをまとめたものです。
表2.イベント
結論
セマフォ、ミューテックス、遅延、イベントフラグなど、従来のブロッキングメカニズムの危険を回避することで、MQTT-Reactiveライブラリ、ステートマシン、およびこの記事で提案するソフトウェアアーキテクチャにより、リアクティブな組み込みシステムでMQTTクライアントを新規に実装できます。仕方。これは、MQTT-Reactiveコードをアクティブオブジェクトと呼ばれる同時実行ユニット内にカプセル化することで実現されます。 、その状態ベースの動作は、提案されたステートマシンで定義されています。このアクティブオブジェクトは、ネットワークを介したトラフィックの送受信だけでなく、モノのインターネットアプリケーションのサーバーへの情報の接続と公開にも使用される非同期イベントを交換することにより、システムの他の部分と通信します。
埋め込み
- IIoTの分類法
- インダストリー4.0向けのフレキシブル生産システムの構築
- マイクロコントローラと組み込みシステムのIC技術に関する簡単な説明
- RTLSソリューションを正常に実装するための6つの防弾のヒント
- WürthElektronikeiSosがスマートシステムの新しいコンポーネントを発表
- ロボットシステムのモーター制御の設計
- Syslogic:建設機械用の頑丈なコンピューターとHMIシステム
- KontronおよびSYSGO:セーフティクリティカルシステム向けのSAFe-VXコンピューティングプラットフォーム
- 回路の望ましい状態構成
- 企業はインテリジェントシステムの期限を設定します
- メーカー向けのトップ10ワークフロー