提交 9b7ae987 编写于 作者: I Ian Craggs

Allow topic and payload to be returned in publish onSuccess/onFailure callbacks #1133

上级 72491060
......@@ -487,7 +487,9 @@ void MQTTAsync_destroy(MQTTAsync* handle)
MQTTAsync_closeSession(m->c, MQTTREASONCODE_SUCCESS, NULL);
MQTTAsync_NULLPublishResponses(m);
MQTTAsync_freeResponses(m);
MQTTAsync_NULLPublishCommands(m);
MQTTAsync_freeCommands(m);
ListFree(m->responses);
......
......@@ -1053,6 +1053,7 @@ static void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
free(command->key);
}
static void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
{
MQTTAsync_freeCommand1(command);
......@@ -1461,11 +1462,6 @@ static int MQTTAsync_processCommand(void)
command->client->pending_write = &command->command;
}
}
else
{
command->command.details.pub.payload = NULL; /* this will be freed by the protocol code */
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
}
free(p); /* should this be done if the write isn't complete? */
}
else if (command->command.type == DISCONNECT)
......@@ -2422,6 +2418,65 @@ static int clientStructCompare(void* a, void* b)
}
/*
* Set destinationName and payload to NULL in all responses
* for a client, so that these memory locations aren't freed twice as they
* are also stored by MQTTProtocol_storePublication.
* @param m the client to process
*/
void MQTTAsync_NULLPublishResponses(MQTTAsyncs* m)
{
FUNC_ENTRY;
if (m->responses)
{
ListElement* cur_response = NULL;
while (ListNextElement(m->responses, &cur_response))
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(cur_response->content);
if (command->command.type == PUBLISH)
{
/* these values are going to be freed in RemovePublication */
command->command.details.pub.destinationName = NULL;
command->command.details.pub.payload = NULL;
}
}
}
FUNC_EXIT;
}
/*
* Set destinationName and payload to NULL in all commands
* for a client, so that these memory locations aren't freed twice as they
* are also stored by MQTTProtocol_storePublication.
* @param m the client to process
*/
void MQTTAsync_NULLPublishCommands(MQTTAsyncs* m)
{
ListElement* current = NULL;
ListElement *next = NULL;
FUNC_ENTRY;
current = ListNextElement(MQTTAsync_commands, &next);
ListNextElement(MQTTAsync_commands, &next);
while (current)
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->client == m && command->command.type == PUBLISH)
{
/* these values are going to be freed in RemovePublication */
command->command.details.pub.destinationName = NULL;
command->command.details.pub.payload = NULL;
}
current = next;
ListNextElement(MQTTAsync_commands, &next);
}
FUNC_EXIT;
}
/**
* Clean the MQTT session data. This includes the MQTT inflight messages, because
* that is part of the MQTT state that will be cleared by the MQTT broker too.
......@@ -2443,6 +2498,7 @@ static int MQTTAsync_cleanSession(Clients* client)
if ((found = ListFindItem(MQTTAsync_handles, client, clientStructCompare)) != NULL)
{
MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
MQTTAsync_NULLPublishResponses(m);
MQTTAsync_freeResponses(m);
}
else
......@@ -2964,6 +3020,7 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
ackrc = 0,
mqttversion = 0;
MQTTProperties msgprops = MQTTProperties_initializer;
Publications* pubToRemove = NULL;
/* This block is so that the ack variable is local and isn't accidentally reused */
{
......@@ -2981,11 +3038,11 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
}
if (pack->header.bits.type == PUBCOMP)
*rc = MQTTProtocol_handlePubcomps(pack, *sock);
*rc = MQTTProtocol_handlePubcomps(pack, *sock, &pubToRemove);
else if (pack->header.bits.type == PUBREC)
*rc = MQTTProtocol_handlePubrecs(pack, *sock);
*rc = MQTTProtocol_handlePubrecs(pack, *sock, &pubToRemove);
else if (pack->header.bits.type == PUBACK)
*rc = MQTTProtocol_handlePubacks(pack, *sock);
*rc = MQTTProtocol_handlePubacks(pack, *sock, &pubToRemove);
if (!m)
Log(LOG_ERROR, -1, "PUBCOMP, PUBACK or PUBREC received for no client, msgid %d", msgid);
if (m && (msgtype != PUBREC || ackrc >= MQTTREASONCODE_UNSPECIFIED_ERROR))
......@@ -3043,6 +3100,16 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
Log(TRACE_MIN, -1, "Calling publish failure for client %s", m->c->clientID);
(*(command->command.onFailure5))(command->command.context, &data);
}
if (pubToRemove != NULL)
{
MQTTProtocol_removePublication(pubToRemove);
pubToRemove = NULL;
/* removePublication has freed the topic and payload memory, so here we indicate that
* so freeCommand doesn't try to free them again.
*/
command->command.details.pub.destinationName = NULL;
command->command.details.pub.payload = NULL;
}
MQTTAsync_freeCommand(command);
break;
}
......@@ -3050,6 +3117,8 @@ static MQTTPacket* MQTTAsync_cycle(SOCKET* sock, unsigned long timeout, int* rc)
if (mqttversion >= MQTTVERSION_5)
MQTTProperties_free(&msgprops);
}
if (pubToRemove != NULL)
MQTTProtocol_removePublication(pubToRemove);
}
else if (pack->header.bits.type == PUBREL)
*rc = MQTTProtocol_handlePubrels(pack, *sock);
......
......@@ -172,6 +172,8 @@ int MQTTAsync_getNoBufferedMessages(MQTTAsyncs* m);
void MQTTAsync_writeContinue(SOCKET socket);
void MQTTAsync_writeComplete(SOCKET socket, int rc);
void setRetryLoopInterval(int keepalive);
void MQTTAsync_NULLPublishResponses(MQTTAsyncs* m);
void MQTTAsync_NULLPublishCommands(MQTTAsyncs* m);
#if defined(_WIN32) || defined(_WIN64)
#else
......
......@@ -2570,7 +2570,7 @@ static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int
(*(m->published))(m->published_context, msgid, pack->header.bits.type, &ack.properties, ack.rc);
}
*rc = (pack->header.bits.type == PUBCOMP) ?
MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock);
MQTTProtocol_handlePubcomps(pack, *sock, NULL) : MQTTProtocol_handlePubacks(pack, *sock, NULL);
if (m && m->dc)
{
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
......@@ -2587,7 +2587,7 @@ static MQTTPacket* MQTTClient_cycle(SOCKET* sock, ELAPSED_TIME_TYPE timeout, int
(*(m->published))(m->published_context, pubrec->msgId, pack->header.bits.type,
&pubrec->properties, pubrec->rc);
}
*rc = MQTTProtocol_handlePubrecs(pack, *sock);
*rc = MQTTProtocol_handlePubrecs(pack, *sock, NULL);
}
else if (pack->header.bits.type == PUBREL)
*rc = MQTTProtocol_handlePubrels(pack, *sock);
......
......@@ -297,10 +297,16 @@ void MQTTProtocol_removePublication(Publications* p)
FUNC_ENTRY;
if (p && --(p->refcount) == 0)
{
free(p->payload);
p->payload = NULL;
free(p->topic);
p->topic = NULL;
if (p->payload)
{
free(p->payload);
p->payload = NULL;
}
if (p->topic)
{
free(p->topic);
p->topic = NULL;
}
ListRemove(&(state.publications), p);
}
FUNC_EXIT;
......@@ -427,7 +433,7 @@ exit:
* @param sock the socket on which the packet was received
* @return completion code
*/
int MQTTProtocol_handlePubacks(void* pack, SOCKET sock)
int MQTTProtocol_handlePubacks(void* pack, SOCKET sock, Publications** pubToRemove)
{
Puback* puback = (Puback*)pack;
Clients* client = NULL;
......@@ -453,7 +459,10 @@ int MQTTProtocol_handlePubacks(void* pack, SOCKET sock)
(m->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
m->qos, puback->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (pubToRemove != NULL)
*pubToRemove = m->publish;
else
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m);
......@@ -473,7 +482,7 @@ int MQTTProtocol_handlePubacks(void* pack, SOCKET sock)
* @param sock the socket on which the packet was received
* @return completion code
*/
int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock)
int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock, Publications** pubToRemove)
{
Pubrec* pubrec = (Pubrec*)pack;
Clients* client = NULL;
......@@ -515,7 +524,10 @@ int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock)
(pubrec->MQTTVersion >= MQTTVERSION_5) ? PERSISTENCE_V5_PUBLISH_SENT : PERSISTENCE_PUBLISH_SENT,
m->qos, pubrec->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (pubToRemove != NULL)
*pubToRemove = m->publish;
else
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m);
......@@ -627,7 +639,7 @@ int MQTTProtocol_handlePubrels(void* pack, SOCKET sock)
* @param sock the socket on which the packet was received
* @return completion code
*/
int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock)
int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock, Publications** pubToRemove)
{
Pubcomp* pubcomp = (Pubcomp*)pack;
Clients* client = NULL;
......@@ -662,7 +674,10 @@ int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock)
if (rc != 0)
Log(LOG_ERROR, -1, "Error removing PUBCOMP for client id %s msgid %d from persistence", client->clientID, pubcomp->msgId);
#endif
MQTTProtocol_removePublication(m->publish);
if (pubToRemove != NULL)
*pubToRemove = m->publish;
else
MQTTProtocol_removePublication(m->publish);
if (m->MQTTVersion >= MQTTVERSION_5)
MQTTProperties_free(&m->properties);
ListRemove(client->outboundMsgs, m);
......
......@@ -40,10 +40,10 @@ void MQTTProtocol_removePublication(Publications* p);
void Protocol_processPublication(Publish* publish, Clients* client, int allocatePayload);
int MQTTProtocol_handlePublishes(void* pack, SOCKET sock);
int MQTTProtocol_handlePubacks(void* pack, SOCKET sock);
int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock);
int MQTTProtocol_handlePubacks(void* pack, SOCKET sock, Publications** pubToRemove);
int MQTTProtocol_handlePubrecs(void* pack, SOCKET sock, Publications** pubToRemove);
int MQTTProtocol_handlePubrels(void* pack, SOCKET sock);
int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock);
int MQTTProtocol_handlePubcomps(void* pack, SOCKET sock, Publications** pubToRemove);
void MQTTProtocol_closeSession(Clients* c, int sendwill);
void MQTTProtocol_keepalive(START_TIME_TYPE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册