45 #include "contiki-net.h"
46 #include "contiki-lib.h"
47 #include "lib/random.h"
51 #include "net/rpl/rpl.h"
56 #include "tcp-socket.h"
58 #include "lib/assert.h"
68 #define PRINTF(...) PRINTF(__VA_ARGS__)
74 MQTT_FHDR_MSG_TYPE_CONNECT = 0x10,
75 MQTT_FHDR_MSG_TYPE_CONNACK = 0x20,
76 MQTT_FHDR_MSG_TYPE_PUBLISH = 0x30,
77 MQTT_FHDR_MSG_TYPE_PUBACK = 0x40,
78 MQTT_FHDR_MSG_TYPE_PUBREC = 0x50,
79 MQTT_FHDR_MSG_TYPE_PUBREL = 0x60,
80 MQTT_FHDR_MSG_TYPE_PUBCOMP = 0x70,
81 MQTT_FHDR_MSG_TYPE_SUBSCRIBE = 0x80,
82 MQTT_FHDR_MSG_TYPE_SUBACK = 0x90,
83 MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE = 0xA0,
84 MQTT_FHDR_MSG_TYPE_UNSUBACK = 0xB0,
85 MQTT_FHDR_MSG_TYPE_PINGREQ = 0xC0,
86 MQTT_FHDR_MSG_TYPE_PINGRESP = 0xD0,
87 MQTT_FHDR_MSG_TYPE_DISCONNECT = 0xE0,
89 MQTT_FHDR_DUP_FLAG = 0x08,
91 MQTT_FHDR_QOS_LEVEL_0 = 0x00,
92 MQTT_FHDR_QOS_LEVEL_1 = 0x02,
93 MQTT_FHDR_QOS_LEVEL_2 = 0x04,
95 MQTT_FHDR_RETAIN_FLAG = 0x01,
99 MQTT_VHDR_USERNAME_FLAG = 0x80,
100 MQTT_VHDR_PASSWORD_FLAG = 0x40,
102 MQTT_VHDR_WILL_RETAIN_FLAG = 0x20,
103 MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00,
104 MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08,
105 MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10,
107 MQTT_VHDR_WILL_FLAG = 0x04,
108 MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02,
109 } mqtt_vhdr_conn_fields_t;
112 MQTT_VHDR_CONN_ACCEPTED,
113 MQTT_VHDR_CONN_REJECTED_PROTOCOL,
114 MQTT_VHDR_CONN_REJECTED_IDENTIFIER,
115 MQTT_VHDR_CONN_REJECTED_UNAVAILABLE,
116 MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS,
117 MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED,
118 } mqtt_vhdr_connack_fields_t;
/*
 * Size in bytes of the fixed part of the CONNECT variable header as written
 * by connect_pt: 2-byte name length + 6-byte protocol name + 1 version byte +
 * 1 flags byte + 2-byte keep-alive.
 */
#define MQTT_CONNECT_VHDR_FLAGS_SIZE 12

/* Width in bytes of the length prefix that precedes a string on the wire. */
#define MQTT_STRING_LEN_SIZE 2
/* Width in bytes of a message id (MID) field. */
#define MQTT_MID_SIZE 2
/* Width in bytes of a QoS field. */
#define MQTT_QOS_SIZE 1

/* Interval, ten CLOCK_SECOND units, loaded into conn->t before waiting for a
 * reply from the broker. */
#define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10)

/*
 * Advance the connection's message-id counter by two and yield the new value.
 * The expansion is wrapped in parentheses so that the macro behaves as a
 * single operand when it is used inside a larger expression.
 */
#define INCREMENT_MID(conn) ((conn)->mid_counter += 2)
/*
 * Number of bytes an mqtt_string occupies in a packet: length prefix plus the
 * characters, or 0 when the string is empty.
 */
#define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length))
132 #define PT_MQTT_WRITE_BYTES(conn, data, len) \
133 while(write_bytes(conn, data, len)) { \
134 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
137 #define PT_MQTT_WRITE_BYTE(conn, data) \
138 while(write_byte(conn, data)) { \
139 PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
148 #define PT_MQTT_WAIT_SEND() \
150 process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL); \
151 PROCESS_WAIT_EVENT(); \
152 if(ev == mqtt_abort_now_event) { \
153 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
154 PT_EXIT(&conn->out_proto_thread); \
155 process_post(PROCESS_CURRENT(), ev, data); \
156 } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
157 process_post(PROCESS_CURRENT(), ev, data); \
161 static process_event_t mqtt_do_connect_tcp_event;
162 static process_event_t mqtt_do_connect_mqtt_event;
163 static process_event_t mqtt_do_disconnect_mqtt_event;
164 static process_event_t mqtt_do_subscribe_event;
165 static process_event_t mqtt_do_unsubscribe_event;
166 static process_event_t mqtt_do_publish_event;
167 static process_event_t mqtt_do_pingreq_event;
168 static process_event_t mqtt_continue_send_event;
169 static process_event_t mqtt_abort_now_event;
170 process_event_t mqtt_update_event;
177 static process_event_t mqtt_event_min;
178 static process_event_t mqtt_event_max;
182 tcp_input(
struct tcp_socket *s,
void *ptr,
const uint8_t *input_data_ptr,
185 static void tcp_event(
struct tcp_socket *s,
void *ptr,
186 tcp_socket_event_t event);
188 static void reset_packet(
struct mqtt_in_packet *packet);
190 LIST(mqtt_conn_list);
192 PROCESS(mqtt_process,
"MQTT process");
/*
 * call_event: forward an event and its data to the application by invoking
 * the callback stored in conn->event_callback.
 */
195 call_event(
struct mqtt_connection *conn,
199 conn->event_callback(conn, event, data);
/*
 * reset_defaults: put the per-connection bookkeeping back to its start
 * values: message-id counter at 1, a freshly initialised outgoing
 * protothread, no PINGRESP outstanding, a cleared incoming packet and the
 * out_buffer_sent flag cleared.
 */
204 reset_defaults(
struct mqtt_connection *conn)
206 conn->mid_counter = 1;
207 PT_INIT(&conn->out_proto_thread);
208 conn->waiting_for_pingresp = 0;
210 reset_packet(&conn->in_packet);
211 conn->out_buffer_sent = 0;
/*
 * abort_connection: drop the connection without any MQTT-level exchange.
 * Rewinds the output buffer, clears the queue-full flag, zeroes the outgoing
 * packet, closes and unregisters the TCP socket, zeroes the socket structure
 * and marks the connection as not connected.
 */
215 abort_connection(
struct mqtt_connection *conn)
217 conn->out_buffer_ptr = conn->out_buffer;
218 conn->out_queue_full = 0;
221 memset(&conn->out_packet, 0,
sizeof(conn->out_packet));
223 tcp_socket_close(&conn->socket);
224 tcp_socket_unregister(&conn->socket);
226 memset(&conn->socket, 0,
sizeof(conn->socket));
228 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
/*
 * connect_tcp: start the TCP leg of a connection. Moves the state to
 * TCP_CONNECTING, resets the connection defaults, registers the socket (the
 * input and output sizes passed are MQTT_TCP_INPUT_BUFF_SIZE and
 * MQTT_TCP_OUTPUT_BUFF_SIZE) and initiates the connect to
 * conn->server_ip / conn->server_port.
 */
232 connect_tcp(
struct mqtt_connection *conn)
234 conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
236 reset_defaults(conn);
237 tcp_socket_register(&(conn->socket),
240 MQTT_TCP_INPUT_BUFF_SIZE,
242 MQTT_TCP_OUTPUT_BUFF_SIZE,
245 tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
/*
 * disconnect_tcp: tear down the TCP socket. Sets the state to DISCONNECTING,
 * closes and unregisters the socket, then zeroes the socket structure.
 */
249 disconnect_tcp(
struct mqtt_connection *conn)
251 conn->state = MQTT_CONN_STATE_DISCONNECTING;
252 tcp_socket_close(&(conn->socket));
253 tcp_socket_unregister(&conn->socket);
255 memset(&conn->socket, 0,
sizeof(conn->socket));
/*
 * send_out_buffer: hand the used part of conn->out_buffer (from out_buffer up
 * to out_buffer_ptr) to the TCP socket.
 * With nothing queued, out_buffer_sent is set to 1. When there is data,
 * out_buffer_sent is cleared before tcp_socket_send(); tcp_event sets it
 * again on TCP_SOCKET_DATA_SENT once the socket reports no output data left.
 */
259 send_out_buffer(
struct mqtt_connection *conn)
261 if(conn->out_buffer_ptr - conn->out_buffer == 0) {
262 conn->out_buffer_sent = 1;
265 conn->out_buffer_sent = 0;
267 DBG(
"MQTT - (send_out_buffer) Space used in buffer: %i\n",
268 conn->out_buffer_ptr - conn->out_buffer);
270 tcp_socket_send(&conn->socket, conn->out_buffer,
271 conn->out_buffer_ptr - conn->out_buffer);
/*
 * string_to_mqtt_string: describe a C string as an mqtt_string. The pointer
 * is stored as-is (no copy is made) and the length is either strlen(string)
 * or 0.
 * NOTE(review): the condition that selects between the strlen() and the
 * zero-length assignment is presumably a NULL check on `string` - confirm.
 */
275 string_to_mqtt_string(
struct mqtt_string *mqtt_string,
char *
string)
277 if(mqtt_string ==
NULL) {
280 mqtt_string->string = string;
283 mqtt_string->length = strlen(
string);
285 mqtt_string->length = 0;
/*
 * write_byte: append one byte to the connection's output buffer.
 * When out_buffer_ptr has reached the end of out_buffer
 * (MQTT_TCP_OUTPUT_BUFF_SIZE bytes) the buffer is flushed with
 * send_out_buffer(). PT_MQTT_WRITE_BYTE keeps calling this function, waiting
 * on out_buffer_sent in between, for as long as it yields a non-zero result.
 * NOTE(review): presumably non-zero means "buffer full, byte not stored" and
 * zero means "stored" - confirm against the return statements.
 */
290 write_byte(
struct mqtt_connection *conn, uint8_t data)
292 DBG(
"MQTT - (write_byte) buff_size: %i write: '%02X'\n",
293 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
296 if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
297 send_out_buffer(conn);
301 *conn->out_buffer_ptr = data;
302 conn->out_buffer_ptr++;
/*
 * write_bytes: append up to `len` bytes from `data` to the output buffer,
 * resuming at conn->out_write_pos.
 * Each call copies the smaller of (free space left in out_buffer) and (bytes
 * of `data` not yet written), then advances out_write_pos and out_buffer_ptr
 * by that amount. Once all `len` bytes have been copied, out_write_pos is
 * reset to 0. Otherwise the buffer is flushed with send_out_buffer() and the
 * number of bytes still to be written is returned, which makes
 * PT_MQTT_WRITE_BYTES wait for out_buffer_sent and call again.
 * Note that the local variable shares its name with the function.
 */
307 write_bytes(
struct mqtt_connection *conn, uint8_t *data, uint16_t len)
309 uint16_t write_bytes;
311 MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
312 len - conn->out_write_pos);
314 memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
315 conn->out_write_pos += write_bytes;
316 conn->out_buffer_ptr += write_bytes;
318 DBG(
"MQTT - (write_bytes) len: %u write_pos: %lu\n", len,
319 conn->out_write_pos);
321 if(len - conn->out_write_pos == 0) {
322 conn->out_write_pos = 0;
325 send_out_buffer(conn);
326 return len - conn->out_write_pos;
/*
 * encode_remaining_length: encode `length` into the byte array
 * `remaining_length`, seven bits per byte with the low-order group first, and
 * report the number of bytes produced through `remaining_length_bytes`.
 * Each round takes length % 128 as the digit and divides length by 128. The
 * loop ends when length reaches 0 or five bytes have been written; callers
 * treat a byte count above 4 as a protocol error.
 * NOTE(review): 0x80 is ORed into the digit, presumably only while further
 * groups remain (length > 0) - confirm the guarding condition.
 */
331 encode_remaining_length(uint8_t *remaining_length,
332 uint8_t *remaining_length_bytes,
337 DBG(
"MQTT - Encoding length %lu\n", length);
339 *remaining_length_bytes = 0;
341 digit = length % 128;
342 length = length / 128;
344 digit = digit | 0x80;
347 remaining_length[*remaining_length_bytes] = digit;
348 (*remaining_length_bytes)++;
349 DBG(
"MQTT - Encode len digit '%u' length '%lu'\n", digit, length);
350 }
while(length > 0 && *remaining_length_bytes < 5);
351 DBG(
"MQTT - remaining_length_bytes %u\n", *remaining_length_bytes);
/*
 * keep_alive_callback: keep-alive handler; `ptr` is the mqtt_connection.
 * If a PINGRESP is still outstanding (waiting_for_pingresp set) the broker is
 * considered gone and the TCP connection is torn down with disconnect_tcp().
 * The final statement posts mqtt_do_pingreq_event to the MQTT process so that
 * a PINGREQ gets sent.
 * NOTE(review): presumably the disconnect branch returns before the post -
 * confirm.
 */
355 keep_alive_callback(
void *ptr)
357 struct mqtt_connection *conn = ptr;
359 DBG(
"MQTT - (keep_alive_callback) Called!\n");
362 if(conn->waiting_for_pingresp) {
363 PRINTF(
"MQTT - Disconnect due to no PINGRESP from broker.\n");
364 disconnect_tcp(conn);
368 process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
/*
 * reset_packet: clear an incoming-packet record so that the next packet can
 * be parsed from scratch. Everything is zeroed except remaining_multiplier,
 * which starts at 1 because tcp_input multiplies each remaining-length byte
 * by it and then scales it by 128.
 */
372 reset_packet(
struct mqtt_in_packet *packet)
374 memset(packet, 0,
sizeof(
struct mqtt_in_packet));
375 packet->remaining_multiplier = 1;
379 PT_THREAD(connect_pt(
struct pt *pt,
struct mqtt_connection *conn))
383 DBG(
"MQTT - Sending CONNECT message...\n");
386 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
387 conn->out_packet.remaining_length = 0;
388 conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_FLAGS_SIZE;
389 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
390 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
391 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
392 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
393 conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
394 encode_remaining_length(conn->out_packet.remaining_length_enc,
395 &conn->out_packet.remaining_length_enc_bytes,
396 conn->out_packet.remaining_length);
397 if(conn->out_packet.remaining_length_enc_bytes > 4) {
398 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR,
NULL);
399 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
404 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
405 PT_MQTT_WRITE_BYTES(conn,
406 conn->out_packet.remaining_length_enc,
407 conn->out_packet.remaining_length_enc_bytes);
408 PT_MQTT_WRITE_BYTE(conn, 0);
409 PT_MQTT_WRITE_BYTE(conn, 6);
410 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, 6);
411 PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
412 PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
413 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
414 PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
415 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length << 8);
416 PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
417 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
418 conn->client_id.length);
419 if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
420 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length << 8);
421 PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
422 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
423 conn->will.topic.length);
424 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length << 8);
425 PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
426 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
427 conn->will.message.length);
428 DBG(
"MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
429 conn->will.topic.string,
430 conn->will.topic.length,
431 conn->will.message.string,
432 conn->will.message.length);
434 if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
435 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length << 8);
436 PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
437 PT_MQTT_WRITE_BYTES(conn,
438 (uint8_t *)conn->credentials.username.string,
439 conn->credentials.username.length);
441 if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
442 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length << 8);
443 PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
444 PT_MQTT_WRITE_BYTES(conn,
445 (uint8_t *)conn->credentials.password.string,
446 conn->credentials.password.length);
450 send_out_buffer(conn);
451 conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
453 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
456 reset_packet(&conn->in_packet);
457 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
460 DBG(
"Timeout waiting for CONNACK\n");
464 reset_packet(&conn->in_packet);
466 DBG(
"MQTT - Done sending CONNECT\n");
469 DBG(
"MQTT - CONNECT message sent: \n");
471 for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
472 DBG(
"%02X ", conn->out_buffer[i]);
481 PT_THREAD(disconnect_pt(
struct pt *pt,
struct mqtt_connection *conn))
485 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
486 PT_MQTT_WRITE_BYTE(conn, 0);
488 send_out_buffer(conn);
502 PT_THREAD(subscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
506 DBG(
"MQTT - Sending subscribe message! topic %s topic_length %i\n",
507 conn->out_packet.topic,
508 conn->out_packet.topic_length);
509 DBG(
"MQTT - Buffer space is %i \n",
510 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
513 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
514 conn->out_packet.remaining_length = MQTT_MID_SIZE +
515 MQTT_STRING_LEN_SIZE +
516 conn->out_packet.topic_length +
518 encode_remaining_length(conn->out_packet.remaining_length_enc,
519 &conn->out_packet.remaining_length_enc_bytes,
520 conn->out_packet.remaining_length);
521 if(conn->out_packet.remaining_length_enc_bytes > 4) {
522 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR,
NULL);
523 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
528 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
529 PT_MQTT_WRITE_BYTES(conn,
530 conn->out_packet.remaining_length_enc,
531 conn->out_packet.remaining_length_enc_bytes);
533 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
534 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
536 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
537 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
538 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
539 conn->out_packet.topic_length);
540 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
543 send_out_buffer(conn);
544 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
547 reset_packet(&conn->in_packet);
548 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
552 DBG(
"Timeout waiting for SUBACK\n");
554 reset_packet(&conn->in_packet);
557 conn->out_queue_full = 0;
559 DBG(
"MQTT - Done in send_subscribe!\n");
565 PT_THREAD(unsubscribe_pt(
struct pt *pt,
struct mqtt_connection *conn))
569 DBG(
"MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
570 conn->out_packet.topic,
571 conn->out_packet.topic_length);
572 DBG(
"MQTT - Buffer space is %i \n",
573 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
576 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
577 MQTT_FHDR_QOS_LEVEL_1;
578 conn->out_packet.remaining_length = MQTT_MID_SIZE +
579 MQTT_STRING_LEN_SIZE +
580 conn->out_packet.topic_length;
581 encode_remaining_length(conn->out_packet.remaining_length_enc,
582 &conn->out_packet.remaining_length_enc_bytes,
583 conn->out_packet.remaining_length);
584 if(conn->out_packet.remaining_length_enc_bytes > 4) {
585 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR,
NULL);
586 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
591 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
592 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
593 conn->out_packet.remaining_length_enc_bytes);
595 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
596 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
598 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
599 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
600 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
601 conn->out_packet.topic_length);
604 send_out_buffer(conn);
605 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
608 reset_packet(&conn->in_packet);
609 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
613 DBG(
"Timeout waiting for UNSUBACK\n");
616 reset_packet(&conn->in_packet);
619 conn->out_queue_full = 0;
621 DBG(
"MQTT - Done writing subscribe message to out buffer!\n");
627 PT_THREAD(publish_pt(
struct pt *pt,
struct mqtt_connection *conn))
631 DBG(
"MQTT - Sending publish message! topic %s topic_length %i\n",
632 conn->out_packet.topic,
633 conn->out_packet.topic_length);
634 DBG(
"MQTT - Buffer space is %i \n",
635 &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
638 conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
639 conn->out_packet.qos << 1;
640 if(conn->out_packet.retain == MQTT_RETAIN_ON) {
641 conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
643 conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
644 conn->out_packet.topic_length +
645 conn->out_packet.payload_size;
646 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
647 conn->out_packet.remaining_length += MQTT_MID_SIZE;
649 encode_remaining_length(conn->out_packet.remaining_length_enc,
650 &conn->out_packet.remaining_length_enc_bytes,
651 conn->out_packet.remaining_length);
652 if(conn->out_packet.remaining_length_enc_bytes > 4) {
653 call_event(conn, MQTT_EVENT_PROTOCOL_ERROR,
NULL);
654 PRINTF(
"MQTT - Error, remaining length > 4 bytes\n");
659 PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
660 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
661 conn->out_packet.remaining_length_enc_bytes);
663 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
664 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
665 PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
666 conn->out_packet.topic_length);
667 if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
668 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
669 PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
672 PT_MQTT_WRITE_BYTES(conn,
673 conn->out_packet.payload,
674 conn->out_packet.payload_size);
676 send_out_buffer(conn);
677 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
685 if(conn->out_packet.qos == 0) {
687 }
else if(conn->out_packet.qos == 1) {
689 reset_packet(&conn->in_packet);
690 PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
693 DBG(
"Timeout waiting for PUBACK\n");
695 if(conn->in_packet.mid != conn->out_packet.mid) {
696 DBG(
"MQTT - Warning, got PUBACK with none matching MID. Currently there "
697 "is no support for several concurrent PUBLISH messages.\n");
699 }
else if(conn->out_packet.qos == 2) {
700 DBG(
"MQTT - QoS not implemented yet.\n");
704 reset_packet(&conn->in_packet);
707 conn->out_queue_full = 0;
709 DBG(
"MQTT - Publish Enqueued\n");
715 PT_THREAD(pingreq_pt(
struct pt *pt,
struct mqtt_connection *conn))
719 DBG(
"MQTT - Sending PINGREQ\n");
722 PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
723 PT_MQTT_WRITE_BYTE(conn, 0);
725 send_out_buffer(conn);
728 conn->waiting_for_pingresp = 1;
731 reset_packet(&conn->in_packet);
732 timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
736 reset_packet(&conn->in_packet);
738 conn->waiting_for_pingresp = 0;
744 handle_connack(
struct mqtt_connection *conn)
746 DBG(
"MQTT - Got CONNACK\n");
748 if(conn->in_packet.payload[1] != 0) {
749 PRINTF(
"MQTT - Connection refused with Return Code %i\n",
750 conn->in_packet.payload[1]);
752 MQTT_EVENT_CONNECTION_REFUSED_ERROR,
753 &conn->in_packet.payload[1]);
756 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
759 keep_alive_callback, conn);
762 conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
763 call_event(conn, MQTT_EVENT_CONNECTED,
NULL);
767 handle_pingresp(
struct mqtt_connection *conn)
769 DBG(
"MQTT - Got RINGRESP\n");
773 handle_suback(
struct mqtt_connection *conn)
775 struct mqtt_suback_event suback_event;
777 DBG(
"MQTT - Got SUBACK\n");
780 if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
781 MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
782 DBG(
"MQTT - Error, SUBACK with > 1 topic, not supported.\n");
785 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
787 suback_event.mid = (conn->in_packet.payload[0] << 8) |
788 (conn->in_packet.payload[1]);
789 suback_event.qos_level = conn->in_packet.payload[2];
790 conn->in_packet.mid = suback_event.mid;
792 if(conn->in_packet.mid != conn->out_packet.mid) {
793 DBG(
"MQTT - Warning, got SUBACK with none matching MID. Currently there is"
794 "no support for several concurrent SUBSCRIBE messages.\n");
798 call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
802 handle_unsuback(
struct mqtt_connection *conn)
804 DBG(
"MQTT - Got UNSUBACK\n");
806 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
807 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
808 (conn->in_packet.payload[1]);
810 if(conn->in_packet.mid != conn->out_packet.mid) {
811 DBG(
"MQTT - Warning, got UNSUBACK with none matching MID. Currently there is"
812 "no support for several concurrent UNSUBSCRIBE messages.\n");
815 call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
819 handle_puback(
struct mqtt_connection *conn)
821 DBG(
"MQTT - Got PUBACK\n");
823 conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
824 conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
825 (conn->in_packet.payload[1]);
827 call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
831 handle_publish(
struct mqtt_connection *conn)
833 DBG(
"MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
834 DBG(
"MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
836 DBG(
"MQTT - This chunk is %i bytes\n", conn->in_packet.payload_pos);
838 if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) {
839 PRINTF(
"MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
842 call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
844 if(conn->in_publish_msg.first_chunk == 1) {
845 conn->in_publish_msg.first_chunk = 0;
849 if(conn->in_publish_msg.payload_left == 0) {
854 DBG(
"MQTT - (handle_publish) resetting packet.\n");
855 reset_packet(&conn->in_packet);
/*
 * parse_publish_vhdr: incrementally parse the variable header of an incoming
 * PUBLISH from the current input chunk, advancing *pos and
 * in_packet.byte_counter for every byte consumed.
 * Step 1 reads the two-byte topic length (high byte first) and sets
 * topic_len_received. Step 2 copies topic bytes into in_publish_msg.topic
 * until topic_len bytes have arrived, then NUL-terminates the topic, sets
 * topic_received, derives the payload length as
 * remaining_length - topic_len - 2 and flags the next payload chunk as the
 * first one.
 */
860 parse_publish_vhdr(
struct mqtt_connection *conn,
862 const uint8_t *input_data_ptr,
868 if(conn->in_packet.topic_len_received == 0) {
869 conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
870 conn->in_packet.byte_counter++;
871 if(*pos >= input_data_len) {
874 conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
875 conn->in_packet.byte_counter++;
876 conn->in_packet.topic_len_received = 1;
878 DBG(
"MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
/* Copy as much of the topic as this chunk holds: the smaller of the topic
 * bytes still missing and the input bytes still unread. */
/* NOTE(review): topic_len comes straight from the network and no comparison
 * against the capacity of in_publish_msg.topic is visible before the
 * memcpy() and the '\0' store - confirm that a bound is enforced. */
883 if(conn->in_packet.topic_len_received == 1 &&
884 conn->in_packet.topic_received == 0) {
885 copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
886 input_data_len - *pos);
887 DBG(
"MQTT - topic_pos: %i copy_bytes: %i", conn->in_packet.topic_pos,
889 memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
890 &input_data_ptr[*pos],
892 (*pos) += copy_bytes;
893 conn->in_packet.byte_counter += copy_bytes;
894 conn->in_packet.topic_pos += copy_bytes;
896 if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
897 DBG(
"MQTT - Got topic '%s'", conn->in_publish_msg.topic);
898 conn->in_packet.topic_received = 1;
899 conn->in_publish_msg.topic[conn->in_packet.topic_pos] =
'\0';
900 conn->in_publish_msg.payload_length =
901 conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
902 conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
906 conn->in_publish_msg.first_chunk = 1;
911 tcp_input(
struct tcp_socket *s,
913 const uint8_t *input_data_ptr,
916 struct mqtt_connection *conn = ptr;
918 uint32_t copy_bytes = 0;
921 if(input_data_len == 0) {
925 if(conn->in_packet.packet_received) {
926 reset_packet(&conn->in_packet);
929 DBG(
"tcp_input with %i bytes of data:\n", input_data_len);
932 if(!conn->in_packet.fhdr) {
933 conn->in_packet.fhdr = input_data_ptr[pos++];
934 conn->in_packet.byte_counter++;
936 DBG(
"MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
938 if(pos >= input_data_len) {
944 if(!conn->in_packet.has_remaining_length) {
946 if(pos >= input_data_len) {
950 byte = input_data_ptr[pos++];
951 conn->in_packet.byte_counter++;
952 conn->in_packet.remaining_length_bytes++;
953 DBG(
"MQTT - Read Remaining Length byte\n");
955 if(conn->in_packet.byte_counter > 5) {
956 call_event(conn, MQTT_EVENT_ERROR,
NULL);
957 DBG(
"Received more then 4 byte 'remaining lenght'.");
961 conn->in_packet.remaining_length +=
962 (byte & 127) * conn->in_packet.remaining_multiplier;
963 conn->in_packet.remaining_multiplier *= 128;
964 }
while((byte & 128) != 0);
966 DBG(
"MQTT - Finished reading remaining length byte\n");
967 conn->in_packet.has_remaining_length = 1;
976 if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
977 (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
979 PRINTF(
"MQTT - Error, unsupported payload size for non-PUBLISH message\n");
981 conn->in_packet.byte_counter += input_data_len;
982 if(conn->in_packet.byte_counter >=
983 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
984 conn->in_packet.packet_received = 1;
995 while(conn->in_packet.byte_counter <
996 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
998 if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
999 conn->in_packet.topic_received == 0) {
1000 parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
1004 copy_bytes = MIN(input_data_len - pos,
1005 MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1006 DBG(
"- Copied %lu payload bytes\n", copy_bytes);
1007 memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1008 &input_data_ptr[pos],
1010 conn->in_packet.byte_counter += copy_bytes;
1011 conn->in_packet.payload_pos += copy_bytes;
1015 DBG(
"MQTT - Copied bytes: \n");
1016 for(i = 0; i < copy_bytes; i++) {
1017 DBG(
"%02X ", conn->in_packet.payload[i]);
1022 if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1023 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1024 conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1025 conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1027 handle_publish(conn);
1029 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1030 conn->in_packet.payload_pos = 0;
1033 if(pos >= input_data_len &&
1034 (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1042 DBG(
"MQTT - Finished reading packet!\n");
1044 DBG(
"MQTT - total data was %i bytes of data. \n",
1045 (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
1048 switch(conn->in_packet.fhdr & 0xF0) {
1049 case MQTT_FHDR_MSG_TYPE_CONNACK:
1050 handle_connack(conn);
1052 case MQTT_FHDR_MSG_TYPE_PUBLISH:
1054 conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1055 conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1056 conn->in_publish_msg.payload_left = 0;
1057 handle_publish(conn);
1059 case MQTT_FHDR_MSG_TYPE_PUBACK:
1060 handle_puback(conn);
1062 case MQTT_FHDR_MSG_TYPE_SUBACK:
1063 handle_suback(conn);
1065 case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1066 handle_unsuback(conn);
1068 case MQTT_FHDR_MSG_TYPE_PINGRESP:
1069 handle_pingresp(conn);
1073 case MQTT_FHDR_MSG_TYPE_PUBREC:
1074 case MQTT_FHDR_MSG_TYPE_PUBREL:
1075 case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1076 call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR,
NULL);
1077 PRINTF(
"MQTT - Got unhandled MQTT Message Type '%i'",
1078 (conn->in_packet.fhdr & 0xF0));
1083 PRINTF(
"MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
1087 conn->in_packet.packet_received = 1;
1096 tcp_event(
struct tcp_socket *s,
void *ptr, tcp_socket_event_t event)
1098 struct mqtt_connection *conn = ptr;
1104 case TCP_SOCKET_CLOSED:
1105 case TCP_SOCKET_TIMEDOUT:
1106 case TCP_SOCKET_ABORTED: {
1108 DBG(
"MQTT - Disconnected by tcp event %d\n", event);
1109 process_post(&mqtt_process, mqtt_abort_now_event, conn);
1110 conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1112 call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1113 abort_connection(conn);
1116 if(conn->auto_reconnect == 1) {
1121 case TCP_SOCKET_CONNECTED: {
1122 conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1123 conn->out_buffer_sent = 1;
1125 process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1128 case TCP_SOCKET_DATA_SENT: {
1129 DBG(
"MQTT - Got TCP_DATA_SENT\n");
1131 if(conn->socket.output_data_len == 0) {
1132 conn->out_buffer_sent = 1;
1133 conn->out_buffer_ptr = conn->out_buffer;
1141 DBG(
"MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
1149 static struct mqtt_connection *conn;
1156 if(ev == mqtt_abort_now_event) {
1157 DBG(
"MQTT - Abort\n");
1159 conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1161 abort_connection(conn);
1163 if(ev == mqtt_do_connect_tcp_event) {
1165 DBG(
"MQTT - Got mqtt_do_connect_tcp_event!\n");
1168 if(ev == mqtt_do_connect_mqtt_event) {
1170 conn->socket.output_data_max_seg = conn->max_segment_size;
1171 DBG(
"MQTT - Got mqtt_do_connect_mqtt_event!\n");
1173 if(conn->out_buffer_sent == 1) {
1174 PT_INIT(&conn->out_proto_thread);
1175 while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1176 conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1177 PT_MQTT_WAIT_SEND();
1181 if(ev == mqtt_do_disconnect_mqtt_event) {
1183 DBG(
"MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1186 if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1187 if(conn->out_buffer_sent == 1) {
1188 PT_INIT(&conn->out_proto_thread);
1189 while(disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1190 conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1191 PT_MQTT_WAIT_SEND();
1193 abort_connection(conn);
1194 call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1196 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1200 if(ev == mqtt_do_pingreq_event) {
1202 DBG(
"MQTT - Got mqtt_do_pingreq_event!\n");
1204 if(conn->out_buffer_sent == 1 &&
1205 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1206 PT_INIT(&conn->out_proto_thread);
1207 while(pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1208 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1209 PT_MQTT_WAIT_SEND();
1213 if(ev == mqtt_do_subscribe_event) {
1215 DBG(
"MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1217 if(conn->out_buffer_sent == 1 &&
1218 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1219 PT_INIT(&conn->out_proto_thread);
1220 while(subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1221 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1222 PT_MQTT_WAIT_SEND();
1226 if(ev == mqtt_do_unsubscribe_event) {
1228 DBG(
"MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1230 if(conn->out_buffer_sent == 1 &&
1231 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1232 PT_INIT(&conn->out_proto_thread);
1233 while(unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1234 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1235 PT_MQTT_WAIT_SEND();
1239 if(ev == mqtt_do_publish_event) {
1241 DBG(
"MQTT - Got mqtt_do_publish_mqtt_event!\n");
1243 if(conn->out_buffer_sent == 1 &&
1244 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1245 PT_INIT(&conn->out_proto_thread);
1246 while(publish_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1247 conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1248 PT_MQTT_WAIT_SEND();
1259 static uint8_t inited = 0;
1262 mqtt_event_min = mqtt_do_connect_tcp_event;
1272 mqtt_event_max = mqtt_abort_now_event;
1285 uint16_t max_segment_size)
1287 if(strlen(client_id) < 1) {
1288 return MQTT_STATUS_INVALID_ARGS_ERROR;
1292 memset(conn, 0,
sizeof(
struct mqtt_connection));
1293 string_to_mqtt_string(&conn->client_id, client_id);
1294 conn->event_callback = event_callback;
1295 conn->app_process = app_process;
1296 conn->auto_reconnect = 1;
1297 conn->max_segment_size = max_segment_size;
1298 reset_defaults(conn);
1303 DBG(
"MQTT - Registered successfully\n");
1305 return MQTT_STATUS_OK;
1315 uint16_t keep_alive)
1317 uip_ip6addr_t ip6addr;
1322 if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1323 return MQTT_STATUS_OK;
1326 conn->server_host = host;
1327 conn->keep_alive = keep_alive;
1328 conn->server_port = port;
1329 conn->out_buffer_ptr = conn->out_buffer;
1330 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1331 conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
1334 uiplib_ip6addrconv(host, &ip6addr);
1343 process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1345 return MQTT_STATUS_OK;
1351 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1355 conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1357 process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1362 mqtt_qos_level_t qos_level)
1364 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1365 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1368 DBG(
"MQTT - Call to mqtt_subscribe...\n");
1371 if(conn->out_queue_full) {
1372 DBG(
"MQTT - Not accepted!\n");
1373 return MQTT_STATUS_OUT_QUEUE_FULL;
1375 conn->out_queue_full = 1;
1376 DBG(
"MQTT - Accepted!\n");
1378 conn->out_packet.mid = INCREMENT_MID(conn);
1379 conn->out_packet.topic = topic;
1380 conn->out_packet.topic_length = strlen(topic);
1381 conn->out_packet.qos = qos_level;
1382 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1384 process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1385 return MQTT_STATUS_OK;
1391 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1392 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1395 DBG(
"MQTT - Call to mqtt_unsubscribe...\n");
1397 if(conn->out_queue_full) {
1398 DBG(
"MQTT - Not accepted!\n");
1399 return MQTT_STATUS_OUT_QUEUE_FULL;
1401 conn->out_queue_full = 1;
1402 DBG(
"MQTT - Accepted!\n");
1404 conn->out_packet.mid = INCREMENT_MID(conn);
1405 conn->out_packet.topic = topic;
1406 conn->out_packet.topic_length = strlen(topic);
1407 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1409 process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1410 return MQTT_STATUS_OK;
1415 uint8_t *payload, uint32_t payload_size,
1416 mqtt_qos_level_t qos_level, mqtt_retain_t retain)
1418 if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1419 return MQTT_STATUS_NOT_CONNECTED_ERROR;
1422 DBG(
"MQTT - Call to mqtt_publish...\n");
1425 if(conn->out_queue_full) {
1426 DBG(
"MQTT - Not accepted!\n");
1427 return MQTT_STATUS_OUT_QUEUE_FULL;
1429 conn->out_queue_full = 1;
1430 DBG(
"MQTT - Accepted!\n");
1432 conn->out_packet.mid = INCREMENT_MID(conn);
1433 conn->out_packet.retain = retain;
1434 conn->out_packet.topic = topic;
1435 conn->out_packet.topic_length = strlen(topic);
1436 conn->out_packet.payload = payload;
1437 conn->out_packet.payload_size = payload_size;
1438 conn->out_packet.qos = qos_level;
1439 conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1441 process_post(&mqtt_process, mqtt_do_publish_event, conn);
1442 return MQTT_STATUS_OK;
1450 string_to_mqtt_string(&conn->credentials.username, username);
1451 string_to_mqtt_string(&conn->credentials.password, password);
1454 if(username !=
NULL) {
1455 conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
1457 conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
1459 if(password !=
NULL) {
1460 conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
1462 conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
1468 mqtt_qos_level_t qos)
1471 string_to_mqtt_string(&conn->will.topic, topic);
1472 string_to_mqtt_string(&conn->will.message, message);
1475 conn->will.qos = qos;
1478 conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
1479 MQTT_VHDR_WILL_RETAIN_FLAG;
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive)
Connects to a MQTT broker.
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
Protothreads implementation.
Header file for the Contiki MQTT engine.
#define PT_INIT(pt)
Initialize a protothread.
#define LIST(name)
Declare a linked list.
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain)
Publish to an MQTT topic.
Header file for IPv6-related data structures.
void mqtt_disconnect(struct mqtt_connection *conn)
Disconnects from an MQTT broker.
Default definitions of C compiler quirk work-arounds.
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
#define PROCESS_END()
Define the end of a process.
#define PROCESS(name, strname)
Declare a process.
#define PROCESS_THREAD(name, ev, data)
Define the body of a process.
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
mqtt_event_t
MQTT engine events.
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
process_event_t process_alloc_event(void)
Allocate a global event number.
void list_init(list_t list)
Initialize a list.
#define PT_EXIT(pt)
Exit the protothread.
Header file for the callback timer.
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for an MQTT client.
#define NULL
The null pointer.
#define PT_THREAD(name_args)
Declaration of a protothread.
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
void list_add(list_t list, void *item)
Add an item at the end of a list.
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
#define CLOCK_SECOND
A second, measured in system clock time.
Linked list manipulation routines.
Header file for the uIP TCP/IP stack.
void process_start(struct process *p, process_data_t data)
Start a process.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
#define PT_END(pt)
Declare the end of a protothread.
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos)
Set the last will topic and message for an MQTT client.
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
Unsubscribes from an MQTT topic.
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level)
Subscribes to an MQTT topic.
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
int timer_expired(struct timer *t)
Check if a timer has expired.
#define PROCESS_BEGIN()
Define the beginning of a process.