#include "freertos/FreeRTOS.h"
#include <Arduino.h>
#include <WiFi.h>
#include <Ticker.h>
#include <AsyncMqttClient.h>
// Wi-Fi credentials handed to WiFi.begin() in connectToWiFi().
// NOTE(review): "SSID"/"PASSWORD" look like placeholders - confirm and replace before flashing.
#define WIFI_SSID "SSID"
#define WIFI_PASS "PASSWORD"
// MQTT broker address and TCP port handed to mqttClient.setServer() in mqttTask().
#define MQTT_HOST IPAddress(192, 168, 10, 40)
#define MQTT_PORT 1883
/**
 * One MQTT message as it travels through the FreeRTOS queues in this file.
 * The struct is copied by value into/out of the queues, so it only holds
 * fixed-size buffers.
 */
struct MQTT_t {
  char topic[64];    // NUL-terminated topic name
  char payload[64];  // NUL-terminated payload text
  int qos;           // QoS value passed to / reported by the MQTT client
};
// Outgoing messages: filled by pubTask(), drained by mqttTask(). Created in setup().
QueueHandle_t xQueue1;
// Incoming messages: filled by onMqttMessage(), drained by subTask(). Created in setup().
QueueHandle_t xQueue2;
// The single MQTT client instance shared by the callbacks and mqttTask().
AsyncMqttClient mqttClient;
// Only referenced from the disabled (#if 0) reconnect code in onMqttDisconnect().
Ticker mqttReconnectTimer;
/**
 * Start the Wi-Fi station with WIFI_SSID / WIFI_PASS and block until
 * WiFi.status() reports WL_CONNECTED, logging a line every 500 ms while
 * waiting. There is no timeout.
 */
void connectToWiFi() {
  Serial.println("Connecting to WiFi...");
  WiFi.begin(WIFI_SSID, WIFI_PASS);

  for (;;) {
    if (WiFi.status() == WL_CONNECTED) {
      return;
    }
    Serial.println("Wait for WiFi...");
    delay(500);
  }
}
/**
 * Ask the MQTT client to connect, then poll mqttClient.connected() every
 * 500 ms until it returns true. A "Wait for MQTT..." line is logged before
 * every poll (so at least once). connect() is issued only once and there is
 * no timeout.
 */
void connectToMqtt() {
  Serial.println("Connecting to MQTT...");
  mqttClient.connect();

  for (;;) {
    Serial.println("Wait for MQTT...");
    if (mqttClient.connected()) {
      return;
    }
    delay(500);
  }
}
/**
 * Callback registered via mqttClient.onConnect(): logs the session state and
 * subscribes to "test/receive" at QoS 2.
 *
 * @param sessionPresent value reported by the client for the new connection.
 */
void onMqttConnect(bool sessionPresent) {
  Serial.println("Connected to MQTT.");
  Serial.print("Session present: ");
  Serial.println(sessionPresent);

  // The old "test/lol" subscription is kept only as a comment. It had been
  // split over two lines, which left the subscribe call itself active.
  // uint16_t packetIdSub = mqttClient.subscribe("test/lol", 2);
  uint16_t packetIdSub = mqttClient.subscribe("test/receive", 2);
  Serial.print("Subscribing at QoS 2, packetId: ");
  Serial.println(packetIdSub);

#if 0
  // Disabled sample publishes at QoS 0/1/2.
  mqttClient.publish("test/lol", 0, true, "test 1");
  Serial.println("Publishing at QoS 0");
  uint16_t packetIdPub1 = mqttClient.publish("test/lol", 1, true, "QoS 1");
  Serial.print("Publishing at QoS 1, packetId: ");
  Serial.println(packetIdPub1);
  uint16_t packetIdPub2 = mqttClient.publish("test/lol", 2, true, "QoS 2");
  Serial.print("Publishing at QoS 2, packetId: ");
  Serial.println(packetIdPub2);
#endif
}
/**
 * Callback registered via mqttClient.onDisconnect(): logs the disconnect.
 * Reconnection is not triggered here; mqttTask() notices the lost connection
 * the next time it tries to publish and calls connectToMqtt() again.
 *
 * @param reason disconnect reason supplied by the client (only used by the
 *               disabled code below).
 */
void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
  (void)reason;  // only referenced inside the disabled block
  Serial.println("Disconnected from MQTT.");

#if 0
  // Disabled timer-based reconnect. The string literal below used to be
  // split across two lines, which would not compile if this were re-enabled.
  if (reason == AsyncMqttClientDisconnectReason::TLS_BAD_FINGERPRINT) {
    Serial.println("Bad server fingerprint.");
  }
  if (WiFi.isConnected()) {
    mqttReconnectTimer.once(2, connectToMqtt);
  }
#endif
}
/**
 * Callback registered via mqttClient.onSubscribe(): logs the acknowledged
 * subscription.
 *
 * @param packetId packet id reported with the acknowledgement.
 * @param qos      QoS value reported with the acknowledgement.
 */
void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
  Serial.println("Subscribe acknowledged.");

  Serial.print(" packetId: ");
  Serial.println(packetId);

  Serial.print(" qos: ");
  Serial.println(qos);
}
/**
 * Callback registered via mqttClient.onUnsubscribe(): logs the acknowledged
 * unsubscribe request.
 *
 * @param packetId packet id reported with the acknowledgement.
 */
void onMqttUnsubscribe(uint16_t packetId) {
  Serial.println("Unsubscribe acknowledged.");

  Serial.print(" packetId: ");
  Serial.println(packetId);
}
void onMqttMessage(char* topic, char* payload,
AsyncMqttClientMessageProperties properties, size_t len,
size_t index, size_t total) {
Serial.println("Publish received.");
Serial.print(" topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
Serial.print(" payload: ");
MQTT_t _receive;
strcpy(_receive.topic,topic);
_receive.qos = properties.qos;;
for(int pos=0; pos<len; pos++) {
Serial.print(payload[pos]);
_receive.payload[pos] = payload[pos];
_receive.payload[pos+1] = 0;
}
Serial.println();
xQueueSendFromISR(xQueue2, &_receive, 0);
}
/**
 * Callback registered via mqttClient.onPublish(): logs the acknowledged
 * publish.
 *
 * @param packetId packet id reported with the acknowledgement.
 */
void onMqttPublish(uint16_t packetId) {
  Serial.println("Publish acknowledged.");

  Serial.print(" packetId: ");
  Serial.println(packetId);
}
void mqttTask(void *pvParameter)
{
char * taskName = pcTaskGetTaskName( NULL );
UBaseType_t taskPrio = uxTaskPriorityGet( NULL );
TickType_t nowTick;
nowTick = xTaskGetTickCount();
printf("[%s:%d] start
Priority=%d\n",taskName,nowTick,taskPrio);
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onPublish(onMqttPublish);
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
MQTT_t send;
while(1) {
connectToMqtt();
while(1) {
xQueueReceive(xQueue1,
&send, portMAX_DELAY);
nowTick =
xTaskGetTickCount();
printf("[%s:%d] topic=[%s]
payload=[%s]
qos=%d\n",taskName,nowTick,send.topic,send.payload,send.qos);
if (mqttClient.connected())
{
uint16_t
packetIdPub = mqttClient.publish(send.topic, send.qos,
true, send.payload);
//
mqttClient.publish("test/send", 0, true, "test 1");
Serial.print("Publishing at QoS ");
Serial.println(send.qos);
Serial.print("Publishing packetId: ");
Serial.println(packetIdPub);
} else {
Serial.println("MQTT Not connected");
if
(WiFi.isConnected()) {
Serial.println("WiFi connected");
} else {
Serial.println("WiFi Not connected");
connectToWiFi();
}
break;
}
} // end while
} // end while
}
void pubTask(void *pvParameter)
{
uint32_t interval = ( uint32_t ) pvParameter;
char * taskName = pcTaskGetTaskName( NULL );
UBaseType_t taskPrio = uxTaskPriorityGet( NULL );
TickType_t nowTick;
nowTick = xTaskGetTickCount();
printf("[%s:%d] start Priority=%d
interval=%d\n",taskName,nowTick,taskPrio,interval);
MQTT_t send;
strcpy(send.topic,"test/send");
send.qos = 0;
while(1) {
nowTick = xTaskGetTickCount();
sprintf(send.payload, "%d", nowTick);
xQueueSend(xQueue1, &send, 0);
vTaskDelay(interval /
portTICK_PERIOD_MS);
}
}
void subTask(void *pvParameter)
{
char * taskName = pcTaskGetTaskName( NULL );
UBaseType_t taskPrio = uxTaskPriorityGet( NULL );
TickType_t nowTick;
nowTick = xTaskGetTickCount();
printf("[%s:%d] start
Priority=%d\n",taskName,nowTick,taskPrio);
MQTT_t receive;
while(1) {
xQueueReceive(xQueue2, &receive,
portMAX_DELAY);
nowTick = xTaskGetTickCount();
printf("[%s:%d] topic=[%s] payload=[%s]
qos=%d\n",taskName,nowTick,receive.topic,receive.payload,receive.qos);
}
}
void setup() {
delay(2000); Serial.begin(115200);
Serial.println("ESP32 setup start");
#if 0
WiFi.begin(WIFI_SSID, WIFI_PASS);
Serial.println("Wait for WiFi...");
while (WiFi.status() != WL_CONNECTED) {
delay(500);
}
#endif
connectToWiFi();
#if 0
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onPublish(onMqttPublish);
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
connectToMqtt();
#endif
/* Create the queues and semaphores that will be
contained in the set. */
xQueue1 = xQueueCreate( 10, sizeof(MQTT_t) );
xQueue2 = xQueueCreate( 10, sizeof(MQTT_t) );
/* Check everything was created. */
configASSERT( xQueue1 );
configASSERT( xQueue2 );
/* Start MQTT Task */
xTaskCreate(mqttTask, "MQTT", 4096, NULL, 1, NULL);
/* Start PUBLISH Task */
xTaskCreate(pubTask, "PUBLISH", 4096, (void *)
3000, 1, NULL);
/* Start SUBSCRIBE Task */
xTaskCreate(subTask, "SUBSCRIBE", 4096, NULL, 1,
NULL);
/* stop loop task */
vTaskDelete( NULL );
}
/**
 * Arduino loop hook. Intentionally empty: setup() ends by calling
 * vTaskDelete(NULL) on the calling task, and all work is done by the
 * FreeRTOS tasks created there.
 */
void loop() {
  // Never run
}