PlatformIOでESP32を開発する

MQTTクライアント(AsyncMqttClient)


MQTTクライアントを実装する場合、ESP8266ではPubSubClientライブラリを使いますが、ESP32ではこのライブラリが使えません。
ESP32ではこちらのライブラリ(AsyncMqttClient)が使えます。

ライブラリのインストールはPlatformIOのライブラリマネージャーを使います。
私はグローバルライブラリとしてインストールしました。
$ pio lib --global install "AsyncMqttClient"
Library Storage: /home/pi/.platformio/lib
Looking for AsyncMqttClient library in registry
Found: https://platformio.org/lib/show/346/AsyncMqttClient
LibraryManager: Installing id=346
Downloading...
Unpacking  [####################################]  100%
AsyncMqttClient @ 0.8.2 has been successfully installed!
Installing dependencies
Looking for ESPAsyncTCP library in registry
Found: https://platformio.org/lib/show/305/ESPAsyncTCP
LibraryManager: Installing id=305 @ ^1.1.0
Downloading...
Unpacking  [####################################]  100%
ESPAsyncTCP @ 1.2.0 has been successfully installed!
Looking for AsyncTCP library in registry
Found: https://platformio.org/lib/show/1826/AsyncTCP
LibraryManager: Installing id=1826 @ ^1.0.0
Downloading...
Unpacking  [####################################]  100%
AsyncTCP @ 1.0.3 has been successfully installed!



こちらにPlatformIOのサンプルコードが有りますが、ESP8266用のコードなので、そのままではビルドが通りません。
そこで、FreeRTOSのマルチタスク機能を使って、PublisherとSubscriberを分離してみました。

以下のコードでは、3つのタスクを起動しています。

・mqttTask
MQTTの送受信を行うタスクです。

・pubTask
MQTTのPublishを行うタスクです。

・subTask
Subscribeしたデータを表示するタスクです。

SSID、PASSWORD、MQTT_HOSTは適当に変更してください。
#include "freertos/FreeRTOS.h"
#include <Arduino.h>
#include <WiFi.h>
#include <Ticker.h>
#include <AsyncMqttClient.h>

// Wi-Fi credentials handed to WiFi.begin(); replace the placeholders before building.
#define WIFI_SSID "SSID"
#define WIFI_PASS "PASSWORD"

// MQTT broker address and TCP port handed to mqttClient.setServer().
#define MQTT_HOST IPAddress(192, 168, 10, 40)
#define MQTT_PORT 1883

/**
 * Fixed-size message record copied by value through the FreeRTOS queues.
 *
 * Both text fields are used as NUL-terminated C strings by the tasks in
 * this file, so each can hold at most 63 characters plus the terminator.
 */
struct MQTT_t {
  char topic[64];    // MQTT topic name
  char payload[64];  // message body as text
  int qos;           // QoS value passed to / reported by the MQTT client
};

// Outbound queue: pubTask sends MQTT_t records, mqttTask receives and publishes them.
QueueHandle_t xQueue1;
// Inbound queue: onMqttMessage sends MQTT_t records, subTask receives and prints them.
QueueHandle_t xQueue2;

// Single MQTT client instance shared by the callbacks and mqttTask.
AsyncMqttClient mqttClient;
// Only referenced from the disabled (#if 0) reconnect code in onMqttDisconnect.
Ticker mqttReconnectTimer;

void connectToWiFi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(WIFI_SSID, WIFI_PASS);
  while (WiFi.status() != WL_CONNECTED) {
    Serial.println("Wait for WiFi...");
    delay(500);
  }
}

void connectToMqtt() {
  Serial.println("Connecting to MQTT...");
  mqttClient.connect();
  while (1) {
    Serial.println("Wait for MQTT...");
    if (mqttClient.connected()) break;
    delay(500);
  }
}

void onMqttConnect(bool sessionPresent) {
  Serial.println("Connected to MQTT.");
  Serial.print("Session present: ");
  Serial.println(sessionPresent);
//  uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2);
  uint16_t packetIdSub = mqttClient.subscribe("test/receive", 2);
  Serial.print("Subscribing at QoS 2, packetId: ");
  Serial.println(packetIdSub);

#if 0
  mqttClient.publish("test/lol", 0, true, "test 1");
  Serial.println("Publishing at QoS 0");
  uint16_t packetIdPub1 = mqttClient.publish("test/lol", 1, true, "QoS 1");
  Serial.print("Publishing at QoS 1, packetId: ");
  Serial.println(packetIdPub1);
  uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "QoS 2");
  Serial.print("Publishing at QoS 2, packetId: ");
  Serial.println(packetIdPub2);
#endif
}

// Callback registered with mqttClient.onDisconnect().
// Only logs the event; `reason` is not used by the active code.
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
  Serial.println("Disconnected from MQTT.");
  // Timer-based reconnect from the upstream example is compiled out;
  // mqttTask calls connectToMqtt() itself once it sees the client is
  // no longer connected.
#if 0
  if (reason == AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT) {
    Serial.println("Bad server fingerprint.");
  }

  if (WiFi.isConnected()) {
    mqttReconnectTimer.once(2, connectToMqtt);
  }
#endif
}

// Callback registered with mqttClient.onSubscribe().
// Prints the packet id and QoS values passed in by the client library.
void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
  Serial.println("Subscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
  Serial.print("  qos: ");
  Serial.println(qos);
}

// Callback registered with mqttClient.onUnsubscribe().
// Prints the packet id passed in by the client library.
void onMqttUnsubscribe(uint16_t packetId) {
  Serial.println("Unsubscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("  payload: ");

  MQTT_t _receive;
  strcpy(_receive.topic,topic);
  _receive.qos = properties.qos;;
  for(int pos=0; pos<len; pos++) {
    Serial.print(payload[pos]);
    _receive.payload[pos] = payload[pos];
    _receive.payload[pos+1] = 0;
  }
  Serial.println();
  xQueueSendFromISR(xQueue2, &_receive, 0);
}

// Callback registered with mqttClient.onPublish().
// Prints the packet id passed in by the client library.
void onMqttPublish(uint16_t packetId) {
  Serial.println("Publish acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}


void mqttTask(void *pvParameter)
{
  char * taskName = pcTaskGetTaskName( NULL );
  UBaseType_t taskPrio = uxTaskPriorityGet( NULL );
  TickType_t nowTick;

  nowTick = xTaskGetTickCount();
  printf("[%s:%d] start Priority=%d\n",taskName,nowTick,taskPrio);

  mqttClient.onConnect(onMqttConnect);
  mqttClient.onDisconnect(onMqttDisconnect);
  mqttClient.onSubscribe(onMqttSubscribe);
  mqttClient.onUnsubscribe(onMqttUnsubscribe);
  mqttClient.onMessage(onMqttMessage);
  mqttClient.onPublish(onMqttPublish);
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);

  MQTT_t send;
  while(1) {
    connectToMqtt();

    while(1) {
      xQueueReceive(xQueue1, &send, portMAX_DELAY);
      nowTick = xTaskGetTickCount();
      printf("[%s:%d] topic=[%s] payload=[%s] qos=%d\n",taskName,nowTick,send.topic,send.payload,send.qos);
      if (mqttClient.connected()) {
        uint16_t packetIdPub = mqttClient.publish(send.topic, send.qos, true, send.payload);
//      mqttClient.publish("test/send", 0, true, "test 1");
        Serial.print("Publishing at QoS ");
        Serial.println(send.qos);
        Serial.print("Publishing packetId: ");
        Serial.println(packetIdPub);
      } else {
        Serial.println("MQTT Not connected");
        if (WiFi.isConnected()) {
          Serial.println("WiFi connected");
        } else {
          Serial.println("WiFi Not connected");
          connectToWiFi();
        }
        break;
      }
    } // end while
  } // end while
}

void pubTask(void *pvParameter)
{
  uint32_t interval = ( uint32_t ) pvParameter;
  char * taskName = pcTaskGetTaskName( NULL );
  UBaseType_t taskPrio = uxTaskPriorityGet( NULL );
  TickType_t nowTick;

  nowTick = xTaskGetTickCount();
  printf("[%s:%d] start Priority=%d interval=%d\n",taskName,nowTick,taskPrio,interval);

  MQTT_t send;
  strcpy(send.topic,"test/send");
  send.qos = 0;
  while(1) {
    nowTick = xTaskGetTickCount();
    sprintf(send.payload, "%d", nowTick);
    xQueueSend(xQueue1, &send, 0);
    vTaskDelay(interval / portTICK_PERIOD_MS);
  }
}

void subTask(void *pvParameter)
{
  char * taskName = pcTaskGetTaskName( NULL );
  UBaseType_t taskPrio = uxTaskPriorityGet( NULL );
  TickType_t nowTick;

  nowTick = xTaskGetTickCount();
  printf("[%s:%d] start Priority=%d\n",taskName,nowTick,taskPrio);

  MQTT_t receive;
  while(1) {
    xQueueReceive(xQueue2, &receive, portMAX_DELAY);
    nowTick = xTaskGetTickCount();
    printf("[%s:%d] topic=[%s] payload=[%s] qos=%d\n",taskName,nowTick,receive.topic,receive.payload,receive.qos);
  }
}


void setup() {
  delay(2000); Serial.begin(115200);
  Serial.println("ESP32 setup start");

#if 0
  WiFi.begin(WIFI_SSID, WIFI_PASS);

  Serial.println("Wait for WiFi...");
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
  }
#endif
  connectToWiFi();

#if 0
  mqttClient.onConnect(onMqttConnect);
  mqttClient.onDisconnect(onMqttDisconnect);
  mqttClient.onSubscribe(onMqttSubscribe);
  mqttClient.onUnsubscribe(onMqttUnsubscribe);
  mqttClient.onMessage(onMqttMessage);
  mqttClient.onPublish(onMqttPublish);
  mqttClient.setServer(MQTT_HOST, MQTT_PORT);

  connectToMqtt();
#endif

  /* Create the queues and semaphores that will be contained in the set. */
  xQueue1 = xQueueCreate( 10, sizeof(MQTT_t) );
  xQueue2 = xQueueCreate( 10, sizeof(MQTT_t) );
  /* Check everything was created. */
  configASSERT( xQueue1 );
  configASSERT( xQueue2 );

  /* Start MQTT Task */
  xTaskCreate(mqttTask, "MQTT", 4096, NULL, 1, NULL);

  /* Start PUBLISH Task */
  xTaskCreate(pubTask, "PUBLISH", 4096, (void *) 3000, 1, NULL);

  /* Start SUBSCRIBE Task */
  xTaskCreate(subTask, "SUBSCRIBE", 4096, NULL, 1, NULL);

  /* stop loop task */
  vTaskDelete( NULL );

}

// Required by the Arduino core but never executed: setup() ends by calling
// vTaskDelete(NULL) on the task that would otherwise run it.
void loop() { // Never run
}

ビルドして実行すると、3000ミリ秒毎に「test/send」のトピックをPublishします。
また、「test/receive」のトピックをSubscribeしています。


別の端末でSubscribeしてみました。


別の端末から「test/receive」のトピックをPublishしてみました。




続く....