MQTT経由でPLCnext ControlをApache Kafkaに接続
技術的背景
カフカ
Apache Kafka は、データの取り込み、保存、処理、および再配布のためのフレームワークです。現在では、世界中の企業で広く導入されています。 Kafka の公式 Web サイトでは、そのアイデアとその展開方法に関する詳細情報が提供されています。その主な機能の 1 つは、MQTT などの他のアプリケーションや通信プロトコルへの既存のコネクタが多数存在することです。
MQTT
MQTT は軽量の TCP ベースのメッセージング プロトコルであり、堅牢でフットプリントが小さいため、IoT 通信によく使用されます。 OASIS 標準 MQTT の詳細については、その Web サイトを参照してください。
ここでは、Eclipse からの MQTT 実装である PLCnext 用に mosquitto をクロスコンパイルする方法に関する Makers Blog 記事を見つけることができます。または、PLCnext Store ですぐに使える MQTT アプリを提供しています。
要件
- PLCnext 上の MQTT クライアント (実装のヒントについては、前のセクションを参照してください)
- コントローラが PC/VM に接続されている
- PC/VM 上の MQTT ブローカー (mosquitto など)
- PC/VM 上の Kafka インスタンス (Kafka のクイックスタート ガイドを参照)
セットアップ
次の図は、PLCnext コントロールから Kafka にデータを取り込むために実装するセットアップの概要を示しています。 Confluent の MQTT Proxy を Kafka のバージョンに使用することは可能ですが (2)、より一般的なソリューション (1) に焦点を当てます。これは、クライアントがメッセージに接続してパブリッシュする MQTT ブローカーと、ブローカーでトピックにサブスクライブし、メッセージを処理して Kafka に転送するコネクターで構成されます。
コネクタの作成
このチュートリアルでは、コネクターは GitHub の evokly/kafka-connect-mqtt リポジトリーに基づいており、MIT ライセンス (詳細なライセンス情報) の下でライセンスされています。まず、リポジトリをダウンロードして抽出します。最新のリポジトリ バージョンは 2016 年末なので、build.gradle
を更新します。 ファイル、古い依存関係を新しいバージョンに置き換えることにより:
ext { kafkaVersion = '2.6.0' }
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.13'
compile "org.apache.kafka:connect-api:$kafkaVersion"
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
compile 'org.bouncycastle:bcprov-jdk15on:1.67'
compile 'org.bouncycastle:bcpkix-jdk15on:1.67'
compile 'org.bouncycastle:bcpg-jdk15on:1.67'
compile 'commons-io:commons-io:2.8.0'
compile 'org.slf4j:slf4j-api:1.7.30'
testCompile 'org.slf4j:slf4j-simple:1.7.30'
}
この例では、プレーンな String メッセージを Kafka に送信します。したがって、Java クラス DumbProcessor.java
を編集する必要があります。 /kafka-connect-mqtt-master/src/main/java/com/evokly/kafka/connect/mqtt
フォルダー内 、これはデフォルトのメッセージ プロセッサです:
@Override
public SourceRecord[] getRecords(String kafkaTopic) {
return new SourceRecord[]{new SourceRecord(null, //sourcePartition
null, //sourceOffset
kafkaTopic, //topic
null, //partition
null, //keySchema
mTopic, //key
null, //valueSchema
mMessage.toString(), //value
new Long(123L))}; //long timestamp
}
その後、依存関係を含む Java アーカイブ ファイル (JAR) を作成します:./gradlew clean jar
.出力 JAR kafka-connect-mqtt-1.1-SNAPSHOT.jar
をコピーします フォルダ /kafka-connect-mqtt-master/build/libs
にあります libs
まで Kafka のディレクトリ。
Kafka の libs ディレクトリにある org.eclipse.paho.client.mqttv3-1.2.5.jar アーカイブのコピーも必要です。ここからダウンロードできます。
さらに、コネクタ mqtt.properties
の構成ファイルを作成する必要があります。 Kafka の config
で フォルダ。ファイルの内容は次のとおりです:
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
# converters for plain String messages without schemas
key.converter = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
kafka.topic=test_in # Kafka destination topic for the MQTT messages
mqtt.client_id=mqtt-kafka-123
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://172.17.0.1:1883 # address of the MQTT broker
mqtt.topic=test/# # MQTT topic where the messages should be collected
#if we want to use our own processor class
#message_processor_class=com.evokly.kafka.connect.mqtt.sample.OwnProcessor
ローカル テスト
これで、コネクタをローカルでテストできます。 Kafka のディレクトリに移動し、ZooKeeper および Broker インスタンスを開始します。
# start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# start Kafka:
bin/kafka-server-start.sh config/server.properties
# start an MQTT-Broker (here a mosquitto docker container)
sudo docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto
# start the MQTT-Kafka connector
bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
# start a Kafka console consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_in --from-beginning --property print.value=true --property print.timestamp=true
# publish an MQTT message
mosquitto_pub -h 172.17.0.1 -p 1883 -t test/1 -m test123
メッセージはコンソール コンシューマーに表示されます。
産業技術