測試環(huán)境:stm32F401RCT6、RT-Thread版本: v4.1.0、RT-Thread Studio版本: 2.2.6、網(wǎng)絡(luò)硬件使用ec800m移植at_socket使用sal框架。
1、移植介紹
RyanMqtt 庫希望應(yīng)用程序?yàn)橐韵?a target="_blank">接口提供實(shí)現(xiàn):
system 接口
RyanMqtt 需要 RTOS 支持,必須實(shí)現(xiàn)如下接口才可以保證 mqtt 客戶端的正常運(yùn)行
network 接口
RyanMqtt 依賴于底層傳輸接口 API,必須實(shí)現(xiàn)該接口 API 才能在網(wǎng)絡(luò)上發(fā)送和接收數(shù)據(jù)包
MQTT 協(xié)議要求基礎(chǔ)傳輸層能夠提供有序的、可靠的、雙向傳輸(從客戶端到服務(wù)端 和從服務(wù)端到客戶端)的字節(jié)流
time 接口
RyanMqtt 依靠函數(shù)生成毫秒時(shí)間戳,用于計(jì)算持續(xù)時(shí)間和超時(shí),內(nèi)部已經(jīng)做了數(shù)值溢出處理
2、開始移植
得益于RT-Thread驅(qū)動(dòng)應(yīng)用層分離的思想和SAL框架,platform/rtthread的適配層可以適應(yīng)任何RT-Thread代碼,所以我們就不拿RT-Thread來移植了。
使用FreeRTOS內(nèi)核來移植,使用CMSIS-RTOS V2兼容層。
system 接口
系統(tǒng)接口,需要移植RTOS的接口。為方便管理類型使用平臺(tái)結(jié)構(gòu)體,修改platformSystem.h里面的結(jié)構(gòu)體
就是線程和互斥鎖
typedef struct
{
osThreadId_t thread;
} platformThread_t;
typedef struct
{
osMutexId_t mutex;
} platformMutex_t;
再來實(shí)現(xiàn)platformSystem.c里面的函數(shù)定義
注意里面的 platformPrint 函數(shù),由于FreeRTOS沒有官方的打印接口。記得修改為你的打印接口
#include "platformSystem.h"
/**
- @brief 申請(qǐng)內(nèi)存
- @param size
- @return void*
/
void platformMemoryMalloc(size_t size)
{
return pvPortMalloc(size);
}
/ - @brief 釋放內(nèi)存
- @param ptr
/
void platformMemoryFree(void ptr)
{
vPortFree(ptr);
}
/ - @brief ms延時(shí)
- @param ms
/
void platformDelay(uint32_t ms)
{
osDelay(ms);
}
/ * - @brief 打印字符串函數(shù),可通過串口打印出去
- @param str
- @param strLen
/
void platformPrint(char str, uint16_t strLen)
{
}
/ - @brief 初始化并運(yùn)行線程
- @param userData
- @param platformThread
- @param name
- @param entry
- @param param
- @param stackSize
- @param priority
- @return RyanMqttError_e
*/
RyanMqttError_e platformThreadInit(void *userData,
platformThread_t *platformThread,
const char *name,
void (*entry)(void ),
void const param,
uint32_t stackSize,
uint32_t priority)
{
const osThreadAttr_t myTask02_attributes = {
.name = name,
.stack_size = stackSize,
.priority = (osPriority_t)priority,
};
platformThread->thread = osThreadNew(entry, param, &myTask02_attributes);
if (NULL == platformThread->thread)
return RyanMqttNoRescourceError;
return RyanMqttSuccessError;
}
/ - @brief 銷毀自身線程
- @param userData
- @param platformThread
- @return RyanMqttError_e
*/
RyanMqttError_e platformThreadDestroy(void userData, platformThread_t platformThread)
{
osThreadTerminate(platformThread->thread);
return RyanMqttSuccessError;
}
/ - @brief 開啟線程
- @param userData
- @param platformThread
- @return RyanMqttError_e
*/
RyanMqttError_e platformThreadStart(void userData, platformThread_t platformThread)
{
osThreadResume(platformThread->thread);
return RyanMqttSuccessError;
}
/ - @brief 掛起線程
- @param userData
- @param platformThread
- @return RyanMqttError_e
*/
RyanMqttError_e platformThreadStop(void userData, platformThread_t platformThread)
{
osThreadSuspend(platformThread->thread);
return RyanMqttSuccessError;
}
/ - @brief 互斥鎖初始化
- @param userData
- @param platformMutex
- @return RyanMqttError_e
*/
RyanMqttError_e platformMutexInit(void userData, platformMutex_t platformMutex)
{
const osMutexAttr_t myMutex01_attributes = {
.name = "mqttMutex"};
platformMutex->mutex = osMutexNew(&myMutex01_attributes);
return RyanMqttSuccessError;
}
/ - @brief 銷毀互斥鎖
- @param userData
- @param platformMutex
- @return RyanMqttError_e
*/
RyanMqttError_e platformMutexDestroy(void userData, platformMutex_t platformMutex)
{
osMutexDelete(platformMutex->mutex);
return RyanMqttSuccessError;
}
/ - @brief 阻塞獲取互斥鎖
- @param userData
- @param platformMutex
- @return RyanMqttError_e
*/
RyanMqttError_e platformMutexLock(void userData, platformMutex_t platformMutex)
{
osMutexAcquire(platformMutex->mutex, osWaitForever);
return RyanMqttSuccessError;
}
/ - @brief 釋放互斥鎖
- @param userData
- @param platformMutex
- @return RyanMqttError_e
*/
RyanMqttError_e platformMutexUnLock(void userData, platformMutex_t platformMutex)
{
osMutexRelease(platformMutex->mutex);
return RyanMqttSuccessError;
}
/ - @brief 進(jìn)入臨界區(qū) / 關(guān)中斷
/
void platformCriticalEnter(void)
{
osKernelLock();
}
/ *
- @brief 退出臨界區(qū) / 開中斷
*/
void platformCriticalExit(void)
{
osKernelUnlock();
}
time 接口
time接口,只需要提供一個(gè)ms時(shí)間戳就行,直接修改函數(shù),這里使用FreeRTOS的心跳。
uint32_t platformUptimeMs(void)
{
if (1000 == osKernelGetTickFreq())
return (uint32_t)osKernelGetTickCount();
else
{
uint32_t tick = 0;
tick = osKernelGetTickCount() * 1000;
return (uint32_t)((tick + osKernelGetTickCount() - 1) / osKernelGetTickCount());
}
}
network 接口
MQTT 協(xié)議要求基礎(chǔ)傳輸層能夠提供有序的、可靠的、雙向傳輸(從客戶端到服務(wù)端 和從服務(wù)端到客戶端)的字節(jié)流
由于FreeRTOS沒有規(guī)定標(biāo)準(zhǔn)的網(wǎng)絡(luò)層,你可以選擇 FreeRTOS-Plus-TCP / FreeRTOS-Cellular-Interface/ lwip / W5500等網(wǎng)絡(luò)方法,幾乎RT-Thread支持的你也可以在FreeRTOS倉庫找到。
這里以lwip為例,使用socket接口來實(shí)現(xiàn),網(wǎng)絡(luò)阻塞發(fā)送和接收使用 SO_SNDTIMEO 和 SO_RCVTIMEO 來實(shí)現(xiàn),你也可以選擇select / poll / epoll等方式。
修改 platformNetwork_t 結(jié)構(gòu)體以支持 socket
typedef struct
{
int socket;
} platformNetwork_t;
接著實(shí)現(xiàn)platformNetwork.c里面的函數(shù)
#define rlogEnable 1 // 是否使能日志
#define rlogColorEnable 1 // 是否使能日志顏色
#define rlogLevel (rlogLvlWarning) // 日志打印等級(jí)
#define rlogTag "RyanMqttNet" // 日志tag
#include "platformNetwork.h"
#include "RyanMqttLog.h"
/**
- @brief 連接mqtt服務(wù)器
- @param userData
- @param platformNetwork
- @param host
- @param port
- @return RyanMqttError_e
- 成功返回RyanMqttSuccessError, 失敗返回錯(cuò)誤信息
*/
RyanMqttError_e platformNetworkConnect(void *userData, platformNetwork_t *platformNetwork, const char *host, const char port)
{
RyanMqttError_e result = RyanMqttSuccessError;
struct addrinfo addrList = NULL;
struct addrinfo hints = {
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM,
.ai_protocol = IPPROTO_TCP};
if (getaddrinfo(host, port, &hints, &addrList) != 0)
{
result = RyanSocketFailedError;
goto exit;
}
platformNetwork->socket = socket(addrList->ai_family, addrList->ai_socktype, addrList->ai_protocol);
if (platformNetwork->socket < 0)
{
result = RyanSocketFailedError;
goto exit;
}
if (connect(platformNetwork->socket, addrList->ai_addr, addrList->ai_addrlen) != 0)
{
platformNetworkClose(userData, platformNetwork);
result = RyanMqttSocketConnectFailError;
goto exit;
}
exit:
if (NULL != addrList)
freeaddrinfo(addrList);
return result;
}
/ - @brief 非阻塞接收數(shù)據(jù)
- @param userData
- @param platformNetwork
- @param recvBuf
- @param recvLen
- @param timeout
- @return RyanMqttError_e
- socket錯(cuò)誤返回 RyanSocketFailedError
- 接收超時(shí)或者接收數(shù)據(jù)長度不等于期待數(shù)據(jù)接受長度 RyanMqttRecvPacketTimeOutError
- 接收成功 RyanMqttSuccessError
*/
RyanMqttError_e platformNetworkRecvAsync(void *userData, platformNetwork_t *platformNetwork, char recvBuf, int recvLen, int timeout)
{
int32_t recvResult = 0;
int32_t offset = 0;
int32_t timeOut2 = timeout;
struct timeval tv = {0};
platformTimer_t timer = {0};
if (-1 == platformNetwork->socket)
return RyanSocketFailedError;
platformTimerCutdown(&timer, timeout);
while ((offset < recvLen) && (0 != timeOut2))
{
tv.tv_sec = timeOut2 / 1000;
tv.tv_usec = timeOut2 % 1000 * 1000;
if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
{
tv.tv_sec = 0;
tv.tv_usec = 100;
}
setsockopt(platformNetwork->socket, SOL_SOCKET, SO_RCVTIMEO, (char )&tv, sizeof(struct timeval)); // 設(shè)置錯(cuò)做模式為非阻塞
recvResult = recv(platformNetwork->socket, recvBuf + offset, recvLen - offset, 0);
if (recvResult <= 0) // 小于零,表示錯(cuò)誤,個(gè)別錯(cuò)誤不代表socket錯(cuò)誤
{
// 下列3種表示沒問題,但需要推出發(fā)送
if ((errno == EAGAIN || // 套接字已標(biāo)記為非阻塞,而接收操作被阻塞或者接收超時(shí)
errno == EWOULDBLOCK || // 發(fā)送時(shí)套接字發(fā)送緩沖區(qū)已滿,或接收時(shí)套接字接收緩沖區(qū)為空
errno == EINTR)) // 操作被信號(hào)中斷
break;
return RyanSocketFailedError;
}
offset += recvResult;
timeOut2 = platformTimerRemain(&timer);
}
if (offset != recvLen)
return RyanMqttRecvPacketTimeOutError;
return RyanMqttSuccessError;
}
/ - @brief 非阻塞發(fā)送數(shù)據(jù)
- @param userData
- @param platformNetwork
- @param sendBuf
- @param sendLen
- @param timeout
- @return RyanMqttError_e
- socket錯(cuò)誤返回 RyanSocketFailedError
- 接收超時(shí)或者接收數(shù)據(jù)長度不等于期待數(shù)據(jù)接受長度 RyanMqttRecvPacketTimeOutError
- 接收成功 RyanMqttSuccessError
*/
RyanMqttError_e platformNetworkSendAsync(void *userData, platformNetwork_t *platformNetwork, char sendBuf, int sendLen, int timeout)
{
int32_t sendResult = 0;
int32_t offset = 0;
int32_t timeOut2 = timeout;
struct timeval tv = {0};
platformTimer_t timer = {0};
if (-1 == platformNetwork->socket)
return RyanSocketFailedError;
platformTimerCutdown(&timer, timeout);
while ((offset < sendLen) && (0 != timeOut2))
{
tv.tv_sec = timeOut2 / 1000;
tv.tv_usec = timeOut2 % 1000 * 1000;
if (tv.tv_sec <= 0 && tv.tv_usec <= 100)
{
tv.tv_sec = 0;
tv.tv_usec = 100;
}
setsockopt(platformNetwork->socket, SOL_SOCKET, SO_SNDTIMEO, (char )&tv, sizeof(struct timeval)); // 設(shè)置錯(cuò)做模式為非阻塞
sendResult = send(platformNetwork->socket, sendBuf + offset, sendLen - offset, 0);
if (sendResult <= 0) // 小于零,表示錯(cuò)誤,個(gè)別錯(cuò)誤不代表socket錯(cuò)誤
{
// 下列3種表示沒問題,但需要推出發(fā)送
if ((errno == EAGAIN || // 套接字已標(biāo)記為非阻塞,而接收操作被阻塞或者接收超時(shí)
errno == EWOULDBLOCK || // 發(fā)送時(shí)套接字發(fā)送緩沖區(qū)已滿,或接收時(shí)套接字接收緩沖區(qū)為空
errno == EINTR)) // 操作被信號(hào)中斷
break;
return RyanSocketFailedError;
}
offset += sendResult;
timeOut2 = platformTimerRemain(&timer);
}
if (offset != sendLen)
return RyanMqttSendPacketTimeOutError;
return RyanMqttSuccessError;
}
/
@brief 斷開mqtt服務(wù)器連接
@param userData
@param platformNetwork
@return RyanMqttError_e
*/
RyanMqttError_e platformNetworkClose(void *userData, platformNetwork_t *platformNetwork)
{
if (platformNetwork->socket >= 0)
{
closesocket(platformNetwork->socket);
platformNetwork->socket = -1;
}
return RyanMqttSuccessError;
}
3、總結(jié)
可以看到,RyanMqtt移植非常簡單,有專門的platform層用來移植。
-
FreeRTOS
+關(guān)注
關(guān)注
12文章
484瀏覽量
62181 -
RT-Thread
+關(guān)注
關(guān)注
31文章
1289瀏覽量
40134 -
STM32F401
+關(guān)注
關(guān)注
1文章
16瀏覽量
10496 -
MQTT協(xié)議
+關(guān)注
關(guān)注
0文章
97瀏覽量
5379 -
TCP通信
+關(guān)注
關(guān)注
0文章
146瀏覽量
4223
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論