Contiki 3.x
mqtt.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015, Texas Instruments Incorporated - http://www.ti.com/
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  * notice, this list of conditions and the following disclaimer in the
12  * documentation and/or other materials provided with the distribution.
13  * 3. Neither the name of the copyright holder nor the names of its
14  * contributors may be used to endorse or promote products derived
15  * from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
20  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
21  * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
26  * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
28  * OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 /*---------------------------------------------------------------------------*/
31 /**
32  * \addtogroup mqtt-engine
33  * @{
34  */
35 /**
36  * \file
37  * Implementation of the Contiki MQTT engine
38  *
39  * \author
40  * Texas Instruments
41  */
42 /*---------------------------------------------------------------------------*/
43 #include "mqtt.h"
44 #include "contiki.h"
45 #include "contiki-net.h"
46 #include "contiki-lib.h"
47 #include "lib/random.h"
48 #include "sys/ctimer.h"
49 #include "sys/etimer.h"
50 #include "sys/pt.h"
51 #include "net/rpl/rpl.h"
52 #include "net/ip/uip.h"
53 #include "net/ipv6/uip-ds6.h"
54 #include "dev/leds.h"
55 
56 #include "tcp-socket.h"
57 
58 #include "lib/assert.h"
59 #include "lib/list.h"
60 #include "sys/cc.h"
61 
62 #include <stdlib.h>
63 #include <stdio.h>
64 #include <string.h>
65 /*---------------------------------------------------------------------------*/
66 #define DEBUG 0
67 #if DEBUG
68 #define PRINTF(...) PRINTF(__VA_ARGS__)
69 #else
70 #define PRINTF(...)
71 #endif
72 /*---------------------------------------------------------------------------*/
73 typedef enum {
74  MQTT_FHDR_MSG_TYPE_CONNECT = 0x10,
75  MQTT_FHDR_MSG_TYPE_CONNACK = 0x20,
76  MQTT_FHDR_MSG_TYPE_PUBLISH = 0x30,
77  MQTT_FHDR_MSG_TYPE_PUBACK = 0x40,
78  MQTT_FHDR_MSG_TYPE_PUBREC = 0x50,
79  MQTT_FHDR_MSG_TYPE_PUBREL = 0x60,
80  MQTT_FHDR_MSG_TYPE_PUBCOMP = 0x70,
81  MQTT_FHDR_MSG_TYPE_SUBSCRIBE = 0x80,
82  MQTT_FHDR_MSG_TYPE_SUBACK = 0x90,
83  MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE = 0xA0,
84  MQTT_FHDR_MSG_TYPE_UNSUBACK = 0xB0,
85  MQTT_FHDR_MSG_TYPE_PINGREQ = 0xC0,
86  MQTT_FHDR_MSG_TYPE_PINGRESP = 0xD0,
87  MQTT_FHDR_MSG_TYPE_DISCONNECT = 0xE0,
88 
89  MQTT_FHDR_DUP_FLAG = 0x08,
90 
91  MQTT_FHDR_QOS_LEVEL_0 = 0x00,
92  MQTT_FHDR_QOS_LEVEL_1 = 0x02,
93  MQTT_FHDR_QOS_LEVEL_2 = 0x04,
94 
95  MQTT_FHDR_RETAIN_FLAG = 0x01,
96 } mqtt_fhdr_fields_t;
97 /*---------------------------------------------------------------------------*/
98 typedef enum {
99  MQTT_VHDR_USERNAME_FLAG = 0x80,
100  MQTT_VHDR_PASSWORD_FLAG = 0x40,
101 
102  MQTT_VHDR_WILL_RETAIN_FLAG = 0x20,
103  MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00,
104  MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08,
105  MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10,
106 
107  MQTT_VHDR_WILL_FLAG = 0x04,
108  MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02,
109 } mqtt_vhdr_conn_fields_t;
110 /*---------------------------------------------------------------------------*/
111 typedef enum {
112  MQTT_VHDR_CONN_ACCEPTED,
113  MQTT_VHDR_CONN_REJECTED_PROTOCOL,
114  MQTT_VHDR_CONN_REJECTED_IDENTIFIER,
115  MQTT_VHDR_CONN_REJECTED_UNAVAILABLE,
116  MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS,
117  MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED,
118 } mqtt_vhdr_connack_fields_t;
119 /*---------------------------------------------------------------------------*/
120 #define MQTT_CONNECT_VHDR_FLAGS_SIZE 12
121 
122 #define MQTT_STRING_LEN_SIZE 2
123 #define MQTT_MID_SIZE 2
124 #define MQTT_QOS_SIZE 1
125 /*---------------------------------------------------------------------------*/
126 #define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10)
127 /*---------------------------------------------------------------------------*/
128 #define INCREMENT_MID(conn) (conn)->mid_counter += 2
129 #define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length))
130 /*---------------------------------------------------------------------------*/
131 /* Protothread send macros */
132 #define PT_MQTT_WRITE_BYTES(conn, data, len) \
133  while(write_bytes(conn, data, len)) { \
134  PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
135  }
136 
137 #define PT_MQTT_WRITE_BYTE(conn, data) \
138  while(write_byte(conn, data)) { \
139  PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
140  }
141 /*---------------------------------------------------------------------------*/
142 /*
143  * Sends the continue send event and wait for that event.
144  *
145  * The reason we cannot use PROCESS_PAUSE() is since we would risk loosing any
146  * events posted during the sending process.
147  */
148 #define PT_MQTT_WAIT_SEND() \
149  do { \
150  process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL); \
151  PROCESS_WAIT_EVENT(); \
152  if(ev == mqtt_abort_now_event) { \
153  conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
154  PT_EXIT(&conn->out_proto_thread); \
155  process_post(PROCESS_CURRENT(), ev, data); \
156  } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
157  process_post(PROCESS_CURRENT(), ev, data); \
158  } \
159  } while(0)
160 /*---------------------------------------------------------------------------*/
161 static process_event_t mqtt_do_connect_tcp_event;
162 static process_event_t mqtt_do_connect_mqtt_event;
163 static process_event_t mqtt_do_disconnect_mqtt_event;
164 static process_event_t mqtt_do_subscribe_event;
165 static process_event_t mqtt_do_unsubscribe_event;
166 static process_event_t mqtt_do_publish_event;
167 static process_event_t mqtt_do_pingreq_event;
168 static process_event_t mqtt_continue_send_event;
169 static process_event_t mqtt_abort_now_event;
170 process_event_t mqtt_update_event;
171 
172 /*
173  * Min and Max event numbers we want to acknowledge while we're in the process
174  * of doing something else. continue_send does not count, therefore must be
175  * allocated last
176  */
177 static process_event_t mqtt_event_min;
178 static process_event_t mqtt_event_max;
179 /*---------------------------------------------------------------------------*/
180 /* Prototypes */
181 static int
182 tcp_input(struct tcp_socket *s, void *ptr, const uint8_t *input_data_ptr,
183  int input_data_len);
184 
185 static void tcp_event(struct tcp_socket *s, void *ptr,
186  tcp_socket_event_t event);
187 
188 static void reset_packet(struct mqtt_in_packet *packet);
189 /*---------------------------------------------------------------------------*/
190 LIST(mqtt_conn_list);
191 /*---------------------------------------------------------------------------*/
192 PROCESS(mqtt_process, "MQTT process");
193 /*---------------------------------------------------------------------------*/
194 static void
195 call_event(struct mqtt_connection *conn,
196  mqtt_event_t event,
197  void *data)
198 {
199  conn->event_callback(conn, event, data);
200  process_post(conn->app_process, mqtt_update_event, NULL);
201 }
202 /*---------------------------------------------------------------------------*/
203 static void
204 reset_defaults(struct mqtt_connection *conn)
205 {
206  conn->mid_counter = 1;
207  PT_INIT(&conn->out_proto_thread);
208  conn->waiting_for_pingresp = 0;
209 
210  reset_packet(&conn->in_packet);
211  conn->out_buffer_sent = 0;
212 }
213 /*---------------------------------------------------------------------------*/
214 static void
215 abort_connection(struct mqtt_connection *conn)
216 {
217  conn->out_buffer_ptr = conn->out_buffer;
218  conn->out_queue_full = 0;
219 
220  /* Reset outgoing packet */
221  memset(&conn->out_packet, 0, sizeof(conn->out_packet));
222 
223  tcp_socket_close(&conn->socket);
224  tcp_socket_unregister(&conn->socket);
225 
226  memset(&conn->socket, 0, sizeof(conn->socket));
227 
228  conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
229 }
230 /*---------------------------------------------------------------------------*/
231 static void
232 connect_tcp(struct mqtt_connection *conn)
233 {
234  conn->state = MQTT_CONN_STATE_TCP_CONNECTING;
235 
236  reset_defaults(conn);
237  tcp_socket_register(&(conn->socket),
238  conn,
239  conn->in_buffer,
240  MQTT_TCP_INPUT_BUFF_SIZE,
241  conn->out_buffer,
242  MQTT_TCP_OUTPUT_BUFF_SIZE,
243  tcp_input,
244  tcp_event);
245  tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port);
246 }
247 /*---------------------------------------------------------------------------*/
248 static void
249 disconnect_tcp(struct mqtt_connection *conn)
250 {
251  conn->state = MQTT_CONN_STATE_DISCONNECTING;
252  tcp_socket_close(&(conn->socket));
253  tcp_socket_unregister(&conn->socket);
254 
255  memset(&conn->socket, 0, sizeof(conn->socket));
256 }
257 /*---------------------------------------------------------------------------*/
258 static void
259 send_out_buffer(struct mqtt_connection *conn)
260 {
261  if(conn->out_buffer_ptr - conn->out_buffer == 0) {
262  conn->out_buffer_sent = 1;
263  return;
264  }
265  conn->out_buffer_sent = 0;
266 
267  DBG("MQTT - (send_out_buffer) Space used in buffer: %i\n",
268  conn->out_buffer_ptr - conn->out_buffer);
269 
270  tcp_socket_send(&conn->socket, conn->out_buffer,
271  conn->out_buffer_ptr - conn->out_buffer);
272 }
273 /*---------------------------------------------------------------------------*/
274 static void
275 string_to_mqtt_string(struct mqtt_string *mqtt_string, char *string)
276 {
277  if(mqtt_string == NULL) {
278  return;
279  }
280  mqtt_string->string = string;
281 
282  if(string != NULL) {
283  mqtt_string->length = strlen(string);
284  } else {
285  mqtt_string->length = 0;
286  }
287 }
288 /*---------------------------------------------------------------------------*/
289 static int
290 write_byte(struct mqtt_connection *conn, uint8_t data)
291 {
292  DBG("MQTT - (write_byte) buff_size: %i write: '%02X'\n",
293  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
294  data);
295 
296  if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) {
297  send_out_buffer(conn);
298  return 1;
299  }
300 
301  *conn->out_buffer_ptr = data;
302  conn->out_buffer_ptr++;
303  return 0;
304 }
305 /*---------------------------------------------------------------------------*/
306 static int
307 write_bytes(struct mqtt_connection *conn, uint8_t *data, uint16_t len)
308 {
309  uint16_t write_bytes;
310  write_bytes =
311  MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr,
312  len - conn->out_write_pos);
313 
314  memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes);
315  conn->out_write_pos += write_bytes;
316  conn->out_buffer_ptr += write_bytes;
317 
318  DBG("MQTT - (write_bytes) len: %u write_pos: %lu\n", len,
319  conn->out_write_pos);
320 
321  if(len - conn->out_write_pos == 0) {
322  conn->out_write_pos = 0;
323  return 0;
324  } else {
325  send_out_buffer(conn);
326  return len - conn->out_write_pos;
327  }
328 }
329 /*---------------------------------------------------------------------------*/
330 static void
331 encode_remaining_length(uint8_t *remaining_length,
332  uint8_t *remaining_length_bytes,
333  uint32_t length)
334 {
335  uint8_t digit;
336 
337  DBG("MQTT - Encoding length %lu\n", length);
338 
339  *remaining_length_bytes = 0;
340  do {
341  digit = length % 128;
342  length = length / 128;
343  if(length > 0) {
344  digit = digit | 0x80;
345  }
346 
347  remaining_length[*remaining_length_bytes] = digit;
348  (*remaining_length_bytes)++;
349  DBG("MQTT - Encode len digit '%u' length '%lu'\n", digit, length);
350  } while(length > 0 && *remaining_length_bytes < 5);
351  DBG("MQTT - remaining_length_bytes %u\n", *remaining_length_bytes);
352 }
353 /*---------------------------------------------------------------------------*/
354 static void
355 keep_alive_callback(void *ptr)
356 {
357  struct mqtt_connection *conn = ptr;
358 
359  DBG("MQTT - (keep_alive_callback) Called!\n");
360 
361  /* The flag is set when the PINGREQ has been sent */
362  if(conn->waiting_for_pingresp) {
363  PRINTF("MQTT - Disconnect due to no PINGRESP from broker.\n");
364  disconnect_tcp(conn);
365  return;
366  }
367 
368  process_post(&mqtt_process, mqtt_do_pingreq_event, conn);
369 }
370 /*---------------------------------------------------------------------------*/
371 static void
372 reset_packet(struct mqtt_in_packet *packet)
373 {
374  memset(packet, 0, sizeof(struct mqtt_in_packet));
375  packet->remaining_multiplier = 1;
376 }
377 /*---------------------------------------------------------------------------*/
378 static
379 PT_THREAD(connect_pt(struct pt *pt, struct mqtt_connection *conn))
380 {
381  PT_BEGIN(pt);
382 
383  DBG("MQTT - Sending CONNECT message...\n");
384 
385  /* Set up FHDR */
386  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT;
387  conn->out_packet.remaining_length = 0;
388  conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_FLAGS_SIZE;
389  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id);
390  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username);
391  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password);
392  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic);
393  conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message);
394  encode_remaining_length(conn->out_packet.remaining_length_enc,
395  &conn->out_packet.remaining_length_enc_bytes,
396  conn->out_packet.remaining_length);
397  if(conn->out_packet.remaining_length_enc_bytes > 4) {
398  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
399  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
400  PT_EXIT(pt);
401  }
402 
403  /* Write Fixed Header */
404  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
405  PT_MQTT_WRITE_BYTES(conn,
406  conn->out_packet.remaining_length_enc,
407  conn->out_packet.remaining_length_enc_bytes);
408  PT_MQTT_WRITE_BYTE(conn, 0);
409  PT_MQTT_WRITE_BYTE(conn, 6);
410  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, 6);
411  PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION);
412  PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags);
413  PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8));
414  PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF));
415  PT_MQTT_WRITE_BYTE(conn, conn->client_id.length << 8);
416  PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF);
417  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string,
418  conn->client_id.length);
419  if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) {
420  PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length << 8);
421  PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF);
422  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string,
423  conn->will.topic.length);
424  PT_MQTT_WRITE_BYTE(conn, conn->will.message.length << 8);
425  PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF);
426  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string,
427  conn->will.message.length);
428  DBG("MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n",
429  conn->will.topic.string,
430  conn->will.topic.length,
431  conn->will.message.string,
432  conn->will.message.length);
433  }
434  if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) {
435  PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length << 8);
436  PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF);
437  PT_MQTT_WRITE_BYTES(conn,
438  (uint8_t *)conn->credentials.username.string,
439  conn->credentials.username.length);
440  }
441  if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) {
442  PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length << 8);
443  PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF);
444  PT_MQTT_WRITE_BYTES(conn,
445  (uint8_t *)conn->credentials.password.string,
446  conn->credentials.password.length);
447  }
448 
449  /* Send out buffer */
450  send_out_buffer(conn);
451  conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER;
452 
453  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
454 
455  /* Wait for CONNACK */
456  reset_packet(&conn->in_packet);
457  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
458  timer_expired(&conn->t));
459  if(timer_expired(&conn->t)) {
460  DBG("Timeout waiting for CONNACK\n");
461  /* We stick to the letter of the spec here: Tear the connection down */
462  mqtt_disconnect(conn);
463  }
464  reset_packet(&conn->in_packet);
465 
466  DBG("MQTT - Done sending CONNECT\n");
467 
468 #if DEBUG_MQTT == 1
469  DBG("MQTT - CONNECT message sent: \n");
470  uint16_t i;
471  for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) {
472  DBG("%02X ", conn->out_buffer[i]);
473  }
474  DBG("\n");
475 #endif
476 
477  PT_END(pt);
478 }
479 /*---------------------------------------------------------------------------*/
480 static
481 PT_THREAD(disconnect_pt(struct pt *pt, struct mqtt_connection *conn))
482 {
483  PT_BEGIN(pt);
484 
485  PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT);
486  PT_MQTT_WRITE_BYTE(conn, 0);
487 
488  send_out_buffer(conn);
489 
490  /*
491  * Wait a couple of seconds for a TCP ACK. We don't really need the ACK,
492  * we do want the TCP/IP stack to actually send this disconnect before we
493  * tear down the session.
494  */
495  timer_set(&conn->t, (CLOCK_SECOND * 2));
496  PT_WAIT_UNTIL(pt, conn->out_buffer_sent || timer_expired(&conn->t));
497 
498  PT_END(pt);
499 }
500 /*---------------------------------------------------------------------------*/
501 static
502 PT_THREAD(subscribe_pt(struct pt *pt, struct mqtt_connection *conn))
503 {
504  PT_BEGIN(pt);
505 
506  DBG("MQTT - Sending subscribe message! topic %s topic_length %i\n",
507  conn->out_packet.topic,
508  conn->out_packet.topic_length);
509  DBG("MQTT - Buffer space is %i \n",
510  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
511 
512  /* Set up FHDR */
513  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1;
514  conn->out_packet.remaining_length = MQTT_MID_SIZE +
515  MQTT_STRING_LEN_SIZE +
516  conn->out_packet.topic_length +
517  MQTT_QOS_SIZE;
518  encode_remaining_length(conn->out_packet.remaining_length_enc,
519  &conn->out_packet.remaining_length_enc_bytes,
520  conn->out_packet.remaining_length);
521  if(conn->out_packet.remaining_length_enc_bytes > 4) {
522  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
523  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
524  PT_EXIT(pt);
525  }
526 
527  /* Write Fixed Header */
528  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
529  PT_MQTT_WRITE_BYTES(conn,
530  conn->out_packet.remaining_length_enc,
531  conn->out_packet.remaining_length_enc_bytes);
532  /* Write Variable Header */
533  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
534  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
535  /* Write Payload */
536  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
537  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
538  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
539  conn->out_packet.topic_length);
540  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos);
541 
542  /* Send out buffer */
543  send_out_buffer(conn);
544  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
545 
546  /* Wait for SUBACK. */
547  reset_packet(&conn->in_packet);
548  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
549  timer_expired(&conn->t));
550 
551  if(timer_expired(&conn->t)) {
552  DBG("Timeout waiting for SUBACK\n");
553  }
554  reset_packet(&conn->in_packet);
555 
556  /* This is clear after the entire transaction is complete */
557  conn->out_queue_full = 0;
558 
559  DBG("MQTT - Done in send_subscribe!\n");
560 
561  PT_END(pt);
562 }
563 /*---------------------------------------------------------------------------*/
564 static
565 PT_THREAD(unsubscribe_pt(struct pt *pt, struct mqtt_connection *conn))
566 {
567  PT_BEGIN(pt);
568 
569  DBG("MQTT - Sending unsubscribe message on topic %s topic_length %i\n",
570  conn->out_packet.topic,
571  conn->out_packet.topic_length);
572  DBG("MQTT - Buffer space is %i \n",
573  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
574 
575  /* Set up FHDR */
576  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE |
577  MQTT_FHDR_QOS_LEVEL_1;
578  conn->out_packet.remaining_length = MQTT_MID_SIZE +
579  MQTT_STRING_LEN_SIZE +
580  conn->out_packet.topic_length;
581  encode_remaining_length(conn->out_packet.remaining_length_enc,
582  &conn->out_packet.remaining_length_enc_bytes,
583  conn->out_packet.remaining_length);
584  if(conn->out_packet.remaining_length_enc_bytes > 4) {
585  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
586  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
587  PT_EXIT(pt);
588  }
589 
590  /* Write Fixed Header */
591  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
592  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
593  conn->out_packet.remaining_length_enc_bytes);
594  /* Write Variable Header */
595  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
596  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
597  /* Write Payload */
598  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
599  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
600  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
601  conn->out_packet.topic_length);
602 
603  /* Send out buffer */
604  send_out_buffer(conn);
605  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
606 
607  /* Wait for UNSUBACK */
608  reset_packet(&conn->in_packet);
609  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
610  timer_expired(&conn->t));
611 
612  if(timer_expired(&conn->t)) {
613  DBG("Timeout waiting for UNSUBACK\n");
614  }
615 
616  reset_packet(&conn->in_packet);
617 
618  /* This is clear after the entire transaction is complete */
619  conn->out_queue_full = 0;
620 
621  DBG("MQTT - Done writing subscribe message to out buffer!\n");
622 
623  PT_END(pt);
624 }
625 /*---------------------------------------------------------------------------*/
626 static
627 PT_THREAD(publish_pt(struct pt *pt, struct mqtt_connection *conn))
628 {
629  PT_BEGIN(pt);
630 
631  DBG("MQTT - Sending publish message! topic %s topic_length %i\n",
632  conn->out_packet.topic,
633  conn->out_packet.topic_length);
634  DBG("MQTT - Buffer space is %i \n",
635  &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr);
636 
637  /* Set up FHDR */
638  conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH |
639  conn->out_packet.qos << 1;
640  if(conn->out_packet.retain == MQTT_RETAIN_ON) {
641  conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG;
642  }
643  conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE +
644  conn->out_packet.topic_length +
645  conn->out_packet.payload_size;
646  if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
647  conn->out_packet.remaining_length += MQTT_MID_SIZE;
648  }
649  encode_remaining_length(conn->out_packet.remaining_length_enc,
650  &conn->out_packet.remaining_length_enc_bytes,
651  conn->out_packet.remaining_length);
652  if(conn->out_packet.remaining_length_enc_bytes > 4) {
653  call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL);
654  PRINTF("MQTT - Error, remaining length > 4 bytes\n");
655  PT_EXIT(pt);
656  }
657 
658  /* Write Fixed Header */
659  PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr);
660  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc,
661  conn->out_packet.remaining_length_enc_bytes);
662  /* Write Variable Header */
663  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8));
664  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF));
665  PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic,
666  conn->out_packet.topic_length);
667  if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) {
668  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8));
669  PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF));
670  }
671  /* Write Payload */
672  PT_MQTT_WRITE_BYTES(conn,
673  conn->out_packet.payload,
674  conn->out_packet.payload_size);
675 
676  send_out_buffer(conn);
677  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
678 
679  /*
680  * If QoS is zero then wait until the message has been sent, since there is
681  * no ACK to wait for.
682  *
683  * Also notify the app will not be notified via PUBACK or PUBCOMP
684  */
685  if(conn->out_packet.qos == 0) {
686  process_post(conn->app_process, mqtt_update_event, NULL);
687  } else if(conn->out_packet.qos == 1) {
688  /* Wait for PUBACK */
689  reset_packet(&conn->in_packet);
690  PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK ||
691  timer_expired(&conn->t));
692  if(timer_expired(&conn->t)) {
693  DBG("Timeout waiting for PUBACK\n");
694  }
695  if(conn->in_packet.mid != conn->out_packet.mid) {
696  DBG("MQTT - Warning, got PUBACK with none matching MID. Currently there "
697  "is no support for several concurrent PUBLISH messages.\n");
698  }
699  } else if(conn->out_packet.qos == 2) {
700  DBG("MQTT - QoS not implemented yet.\n");
701  /* Should wait for PUBREC, send PUBREL and then wait for PUBCOMP */
702  }
703 
704  reset_packet(&conn->in_packet);
705 
706  /* This is clear after the entire transaction is complete */
707  conn->out_queue_full = 0;
708 
709  DBG("MQTT - Publish Enqueued\n");
710 
711  PT_END(pt);
712 }
713 /*---------------------------------------------------------------------------*/
714 static
715 PT_THREAD(pingreq_pt(struct pt *pt, struct mqtt_connection *conn))
716 {
717  PT_BEGIN(pt);
718 
719  DBG("MQTT - Sending PINGREQ\n");
720 
721  /* Write Fixed Header */
722  PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ);
723  PT_MQTT_WRITE_BYTE(conn, 0);
724 
725  send_out_buffer(conn);
726 
727  /* Start timeout for reply. */
728  conn->waiting_for_pingresp = 1;
729 
730  /* Wait for PINGRESP or timeout */
731  reset_packet(&conn->in_packet);
732  timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT);
733 
734  PT_WAIT_UNTIL(pt, conn->in_packet.packet_received || timer_expired(&conn->t));
735 
736  reset_packet(&conn->in_packet);
737 
738  conn->waiting_for_pingresp = 0;
739 
740  PT_END(pt);
741 }
742 /*---------------------------------------------------------------------------*/
743 static void
744 handle_connack(struct mqtt_connection *conn)
745 {
746  DBG("MQTT - Got CONNACK\n");
747 
748  if(conn->in_packet.payload[1] != 0) {
749  PRINTF("MQTT - Connection refused with Return Code %i\n",
750  conn->in_packet.payload[1]);
751  call_event(conn,
752  MQTT_EVENT_CONNECTION_REFUSED_ERROR,
753  &conn->in_packet.payload[1]);
754  }
755 
756  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
757 
758  ctimer_set(&conn->keep_alive_timer, conn->keep_alive * CLOCK_SECOND,
759  keep_alive_callback, conn);
760 
761  /* Always reset packet before callback since it might be used directly */
762  conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER;
763  call_event(conn, MQTT_EVENT_CONNECTED, NULL);
764 }
765 /*---------------------------------------------------------------------------*/
766 static void
767 handle_pingresp(struct mqtt_connection *conn)
768 {
769  DBG("MQTT - Got RINGRESP\n");
770 }
771 /*---------------------------------------------------------------------------*/
772 static void
773 handle_suback(struct mqtt_connection *conn)
774 {
775  struct mqtt_suback_event suback_event;
776 
777  DBG("MQTT - Got SUBACK\n");
778 
779  /* Only accept SUBACKS with X topic QoS response, assume 1 */
780  if(conn->in_packet.remaining_length > MQTT_MID_SIZE +
781  MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) {
782  DBG("MQTT - Error, SUBACK with > 1 topic, not supported.\n");
783  }
784 
785  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
786 
787  suback_event.mid = (conn->in_packet.payload[0] << 8) |
788  (conn->in_packet.payload[1]);
789  suback_event.qos_level = conn->in_packet.payload[2];
790  conn->in_packet.mid = suback_event.mid;
791 
792  if(conn->in_packet.mid != conn->out_packet.mid) {
793  DBG("MQTT - Warning, got SUBACK with none matching MID. Currently there is"
794  "no support for several concurrent SUBSCRIBE messages.\n");
795  }
796 
797  /* Always reset packet before callback since it might be used directly */
798  call_event(conn, MQTT_EVENT_SUBACK, &suback_event);
799 }
800 /*---------------------------------------------------------------------------*/
801 static void
802 handle_unsuback(struct mqtt_connection *conn)
803 {
804  DBG("MQTT - Got UNSUBACK\n");
805 
806  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
807  conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
808  (conn->in_packet.payload[1]);
809 
810  if(conn->in_packet.mid != conn->out_packet.mid) {
811  DBG("MQTT - Warning, got UNSUBACK with none matching MID. Currently there is"
812  "no support for several concurrent UNSUBSCRIBE messages.\n");
813  }
814 
815  call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid);
816 }
817 /*---------------------------------------------------------------------------*/
818 static void
819 handle_puback(struct mqtt_connection *conn)
820 {
821  DBG("MQTT - Got PUBACK\n");
822 
823  conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK;
824  conn->in_packet.mid = (conn->in_packet.payload[0] << 8) |
825  (conn->in_packet.payload[1]);
826 
827  call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid);
828 }
829 /*---------------------------------------------------------------------------*/
830 static void
831 handle_publish(struct mqtt_connection *conn)
832 {
833  DBG("MQTT - Got PUBLISH, called once per manageable chunk of message.\n");
834  DBG("MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic);
835 
836  DBG("MQTT - This chunk is %i bytes\n", conn->in_packet.payload_pos);
837 
838  if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) {
839  PRINTF("MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n");
840  }
841 
842  call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg);
843 
844  if(conn->in_publish_msg.first_chunk == 1) {
845  conn->in_publish_msg.first_chunk = 0;
846  }
847 
848  /* If this is the last time handle_publish will be called, reset packet. */
849  if(conn->in_publish_msg.payload_left == 0) {
850 
851  /* Check for QoS and initiate the reply, do not rely on the data in the
852  * in_packet being untouched. */
853 
854  DBG("MQTT - (handle_publish) resetting packet.\n");
855  reset_packet(&conn->in_packet);
856  }
857 }
858 /*---------------------------------------------------------------------------*/
859 static void
860 parse_publish_vhdr(struct mqtt_connection *conn,
861  uint32_t *pos,
862  const uint8_t *input_data_ptr,
863  int input_data_len)
864 {
865  uint16_t copy_bytes;
866 
867  /* Read out topic length */
868  if(conn->in_packet.topic_len_received == 0) {
869  conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8);
870  conn->in_packet.byte_counter++;
871  if(*pos >= input_data_len) {
872  return;
873  }
874  conn->in_packet.topic_len |= input_data_ptr[(*pos)++];
875  conn->in_packet.byte_counter++;
876  conn->in_packet.topic_len_received = 1;
877 
878  DBG("MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len);
879  /* WARNING: Check here if TOPIC fits in payload area, otherwise error */
880  }
881 
882  /* Read out topic */
883  if(conn->in_packet.topic_len_received == 1 &&
884  conn->in_packet.topic_received == 0) {
885  copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos,
886  input_data_len - *pos);
887  DBG("MQTT - topic_pos: %i copy_bytes: %i", conn->in_packet.topic_pos,
888  copy_bytes);
889  memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos],
890  &input_data_ptr[*pos],
891  copy_bytes);
892  (*pos) += copy_bytes;
893  conn->in_packet.byte_counter += copy_bytes;
894  conn->in_packet.topic_pos += copy_bytes;
895 
896  if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) {
897  DBG("MQTT - Got topic '%s'", conn->in_publish_msg.topic);
898  conn->in_packet.topic_received = 1;
899  conn->in_publish_msg.topic[conn->in_packet.topic_pos] = '\0';
900  conn->in_publish_msg.payload_length =
901  conn->in_packet.remaining_length - conn->in_packet.topic_len - 2;
902  conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length;
903  }
904 
905  /* Set this once per incomming publish message */
906  conn->in_publish_msg.first_chunk = 1;
907  }
908 }
909 /*---------------------------------------------------------------------------*/
910 static int
911 tcp_input(struct tcp_socket *s,
912  void *ptr,
913  const uint8_t *input_data_ptr,
914  int input_data_len)
915 {
916  struct mqtt_connection *conn = ptr;
917  uint32_t pos = 0;
918  uint32_t copy_bytes = 0;
919  uint8_t byte;
920 
921  if(input_data_len == 0) {
922  return 0;
923  }
924 
925  if(conn->in_packet.packet_received) {
926  reset_packet(&conn->in_packet);
927  }
928 
929  DBG("tcp_input with %i bytes of data:\n", input_data_len);
930 
931  /* Read the fixed header field, if we do not have it */
932  if(!conn->in_packet.fhdr) {
933  conn->in_packet.fhdr = input_data_ptr[pos++];
934  conn->in_packet.byte_counter++;
935 
936  DBG("MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr);
937 
938  if(pos >= input_data_len) {
939  return 0;
940  }
941  }
942 
943  /* Read the Remaining Length field, if we do not have it */
944  if(!conn->in_packet.has_remaining_length) {
945  do {
946  if(pos >= input_data_len) {
947  return 0;
948  }
949 
950  byte = input_data_ptr[pos++];
951  conn->in_packet.byte_counter++;
952  conn->in_packet.remaining_length_bytes++;
953  DBG("MQTT - Read Remaining Length byte\n");
954 
955  if(conn->in_packet.byte_counter > 5) {
956  call_event(conn, MQTT_EVENT_ERROR, NULL);
957  DBG("Received more then 4 byte 'remaining lenght'.");
958  return 0;
959  }
960 
961  conn->in_packet.remaining_length +=
962  (byte & 127) * conn->in_packet.remaining_multiplier;
963  conn->in_packet.remaining_multiplier *= 128;
964  } while((byte & 128) != 0);
965 
966  DBG("MQTT - Finished reading remaining length byte\n");
967  conn->in_packet.has_remaining_length = 1;
968  }
969 
970  /*
971  * Check for unsupported payload length. Will read all incoming data from the
972  * server in any case and then reset the packet.
973  *
974  * TODO: Decide if we, for example, want to disconnect instead.
975  */
976  if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) &&
977  (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) {
978 
979  PRINTF("MQTT - Error, unsupported payload size for non-PUBLISH message\n");
980 
981  conn->in_packet.byte_counter += input_data_len;
982  if(conn->in_packet.byte_counter >=
983  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
984  conn->in_packet.packet_received = 1;
985  }
986  return 0;
987  }
988 
989  /*
990  * Supported payload, reads out both VHDR and Payload of all packets.
991  *
992  * Note: There will always be at least one byte left to read when we enter
993  * this loop.
994  */
995  while(conn->in_packet.byte_counter <
996  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) {
997 
998  if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH &&
999  conn->in_packet.topic_received == 0) {
1000  parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len);
1001  }
1002 
1003  /* Read in as much as we can into the packet payload */
1004  copy_bytes = MIN(input_data_len - pos,
1005  MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos);
1006  DBG("- Copied %lu payload bytes\n", copy_bytes);
1007  memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos],
1008  &input_data_ptr[pos],
1009  copy_bytes);
1010  conn->in_packet.byte_counter += copy_bytes;
1011  conn->in_packet.payload_pos += copy_bytes;
1012  pos += copy_bytes;
1013 
1014  uint8_t i;
1015  DBG("MQTT - Copied bytes: \n");
1016  for(i = 0; i < copy_bytes; i++) {
1017  DBG("%02X ", conn->in_packet.payload[i]);
1018  }
1019  DBG("\n");
1020 
1021  /* Full buffer, shall only happen to PUBLISH messages. */
1022  if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) {
1023  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1024  conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE;
1025  conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE;
1026 
1027  handle_publish(conn);
1028 
1029  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1030  conn->in_packet.payload_pos = 0;
1031  }
1032 
1033  if(pos >= input_data_len &&
1034  (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) {
1035  return 0;
1036  }
1037  }
1038 
1039  /* Debug information */
1040  DBG("\n");
1041  /* Take care of input */
1042  DBG("MQTT - Finished reading packet!\n");
1043  /* What to return? */
1044  DBG("MQTT - total data was %i bytes of data. \n",
1045  (MQTT_FHDR_SIZE + conn->in_packet.remaining_length));
1046 
1047  /* Handle packet here. */
1048  switch(conn->in_packet.fhdr & 0xF0) {
1049  case MQTT_FHDR_MSG_TYPE_CONNACK:
1050  handle_connack(conn);
1051  break;
1052  case MQTT_FHDR_MSG_TYPE_PUBLISH:
1053  /* This is the only or the last chunk of publish payload */
1054  conn->in_publish_msg.payload_chunk = conn->in_packet.payload;
1055  conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos;
1056  conn->in_publish_msg.payload_left = 0;
1057  handle_publish(conn);
1058  break;
1059  case MQTT_FHDR_MSG_TYPE_PUBACK:
1060  handle_puback(conn);
1061  break;
1062  case MQTT_FHDR_MSG_TYPE_SUBACK:
1063  handle_suback(conn);
1064  break;
1065  case MQTT_FHDR_MSG_TYPE_UNSUBACK:
1066  handle_unsuback(conn);
1067  break;
1068  case MQTT_FHDR_MSG_TYPE_PINGRESP:
1069  handle_pingresp(conn);
1070  break;
1071 
1072  /* QoS 2 not implemented yet */
1073  case MQTT_FHDR_MSG_TYPE_PUBREC:
1074  case MQTT_FHDR_MSG_TYPE_PUBREL:
1075  case MQTT_FHDR_MSG_TYPE_PUBCOMP:
1076  call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL);
1077  PRINTF("MQTT - Got unhandled MQTT Message Type '%i'",
1078  (conn->in_packet.fhdr & 0xF0));
1079  break;
1080 
1081  default:
1082  /* All server-only message */
1083  PRINTF("MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0));
1084  break;
1085  }
1086 
1087  conn->in_packet.packet_received = 1;
1088 
1089  return 0;
1090 }
1091 /*---------------------------------------------------------------------------*/
1092 /*
1093  * Handles TCP events from Simple TCP
1094  */
1095 static void
1096 tcp_event(struct tcp_socket *s, void *ptr, tcp_socket_event_t event)
1097 {
1098  struct mqtt_connection *conn = ptr;
1099 
1100  /* Take care of event */
1101  switch(event) {
1102 
1103  /* Fall through to manage different disconnect event the same way. */
1104  case TCP_SOCKET_CLOSED:
1105  case TCP_SOCKET_TIMEDOUT:
1106  case TCP_SOCKET_ABORTED: {
1107 
1108  DBG("MQTT - Disconnected by tcp event %d\n", event);
1109  process_post(&mqtt_process, mqtt_abort_now_event, conn);
1110  conn->state = MQTT_CONN_STATE_NOT_CONNECTED;
1111  ctimer_stop(&conn->keep_alive_timer);
1112  call_event(conn, MQTT_EVENT_DISCONNECTED, &event);
1113  abort_connection(conn);
1114 
1115  /* If connecting retry */
1116  if(conn->auto_reconnect == 1) {
1117  connect_tcp(conn);
1118  }
1119  break;
1120  }
1121  case TCP_SOCKET_CONNECTED: {
1122  conn->state = MQTT_CONN_STATE_TCP_CONNECTED;
1123  conn->out_buffer_sent = 1;
1124 
1125  process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn);
1126  break;
1127  }
1128  case TCP_SOCKET_DATA_SENT: {
1129  DBG("MQTT - Got TCP_DATA_SENT\n");
1130 
1131  if(conn->socket.output_data_len == 0) {
1132  conn->out_buffer_sent = 1;
1133  conn->out_buffer_ptr = conn->out_buffer;
1134  }
1135 
1136  ctimer_restart(&conn->keep_alive_timer);
1137  break;
1138  }
1139 
1140  default: {
1141  DBG("MQTT - TCP Event %d is currently not managed by the tcp event callback\n",
1142  event);
1143  }
1144  }
1145 }
1146 /*---------------------------------------------------------------------------*/
1147 PROCESS_THREAD(mqtt_process, ev, data)
1148 {
1149  static struct mqtt_connection *conn;
1150 
1151  PROCESS_BEGIN();
1152 
1153  while(1) {
1155 
1156  if(ev == mqtt_abort_now_event) {
1157  DBG("MQTT - Abort\n");
1158  conn = data;
1159  conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE;
1160 
1161  abort_connection(conn);
1162  }
1163  if(ev == mqtt_do_connect_tcp_event) {
1164  conn = data;
1165  DBG("MQTT - Got mqtt_do_connect_tcp_event!\n");
1166  connect_tcp(conn);
1167  }
1168  if(ev == mqtt_do_connect_mqtt_event) {
1169  conn = data;
1170  conn->socket.output_data_max_seg = conn->max_segment_size;
1171  DBG("MQTT - Got mqtt_do_connect_mqtt_event!\n");
1172 
1173  if(conn->out_buffer_sent == 1) {
1174  PT_INIT(&conn->out_proto_thread);
1175  while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1176  conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1177  PT_MQTT_WAIT_SEND();
1178  }
1179  }
1180  }
1181  if(ev == mqtt_do_disconnect_mqtt_event) {
1182  conn = data;
1183  DBG("MQTT - Got mqtt_do_disconnect_mqtt_event!\n");
1184 
1185  /* Send MQTT Disconnect if we are connected */
1186  if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
1187  if(conn->out_buffer_sent == 1) {
1188  PT_INIT(&conn->out_proto_thread);
1189  while(disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1190  conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
1191  PT_MQTT_WAIT_SEND();
1192  }
1193  abort_connection(conn);
1194  call_event(conn, MQTT_EVENT_DISCONNECTED, &ev);
1195  } else {
1196  process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1197  }
1198  }
1199  }
1200  if(ev == mqtt_do_pingreq_event) {
1201  conn = data;
1202  DBG("MQTT - Got mqtt_do_pingreq_event!\n");
1203 
1204  if(conn->out_buffer_sent == 1 &&
1205  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1206  PT_INIT(&conn->out_proto_thread);
1207  while(pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1208  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1209  PT_MQTT_WAIT_SEND();
1210  }
1211  }
1212  }
1213  if(ev == mqtt_do_subscribe_event) {
1214  conn = data;
1215  DBG("MQTT - Got mqtt_do_subscribe_mqtt_event!\n");
1216 
1217  if(conn->out_buffer_sent == 1 &&
1218  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1219  PT_INIT(&conn->out_proto_thread);
1220  while(subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1221  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1222  PT_MQTT_WAIT_SEND();
1223  }
1224  }
1225  }
1226  if(ev == mqtt_do_unsubscribe_event) {
1227  conn = data;
1228  DBG("MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n");
1229 
1230  if(conn->out_buffer_sent == 1 &&
1231  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1232  PT_INIT(&conn->out_proto_thread);
1233  while(unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1234  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1235  PT_MQTT_WAIT_SEND();
1236  }
1237  }
1238  }
1239  if(ev == mqtt_do_publish_event) {
1240  conn = data;
1241  DBG("MQTT - Got mqtt_do_publish_mqtt_event!\n");
1242 
1243  if(conn->out_buffer_sent == 1 &&
1244  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1245  PT_INIT(&conn->out_proto_thread);
1246  while(publish_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
1247  conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1248  PT_MQTT_WAIT_SEND();
1249  }
1250  }
1251  }
1252  }
1253  PROCESS_END();
1254 }
1255 /*---------------------------------------------------------------------------*/
1256 void
1257 mqtt_init(void)
1258 {
1259  static uint8_t inited = 0;
1260  if(!inited) {
1261  mqtt_do_connect_tcp_event = process_alloc_event();
1262  mqtt_event_min = mqtt_do_connect_tcp_event;
1263 
1264  mqtt_do_connect_mqtt_event = process_alloc_event();
1265  mqtt_do_disconnect_mqtt_event = process_alloc_event();
1266  mqtt_do_subscribe_event = process_alloc_event();
1267  mqtt_do_unsubscribe_event = process_alloc_event();
1268  mqtt_do_publish_event = process_alloc_event();
1269  mqtt_do_pingreq_event = process_alloc_event();
1270  mqtt_update_event = process_alloc_event();
1271  mqtt_abort_now_event = process_alloc_event();
1272  mqtt_event_max = mqtt_abort_now_event;
1273 
1274  mqtt_continue_send_event = process_alloc_event();
1275 
1276  list_init(mqtt_conn_list);
1277  process_start(&mqtt_process, NULL);
1278  inited = 1;
1279  }
1280 }
1281 /*---------------------------------------------------------------------------*/
1282 mqtt_status_t
1283 mqtt_register(struct mqtt_connection *conn, struct process *app_process,
1284  char *client_id, mqtt_event_callback_t event_callback,
1285  uint16_t max_segment_size)
1286 {
1287  if(strlen(client_id) < 1) {
1288  return MQTT_STATUS_INVALID_ARGS_ERROR;
1289  }
1290 
1291  /* Set defaults - Set all to zero to begin with */
1292  memset(conn, 0, sizeof(struct mqtt_connection));
1293  string_to_mqtt_string(&conn->client_id, client_id);
1294  conn->event_callback = event_callback;
1295  conn->app_process = app_process;
1296  conn->auto_reconnect = 1;
1297  conn->max_segment_size = max_segment_size;
1298  reset_defaults(conn);
1299 
1300  mqtt_init();
1301  list_add(mqtt_conn_list, conn);
1302 
1303  DBG("MQTT - Registered successfully\n");
1304 
1305  return MQTT_STATUS_OK;
1306 }
1307 /*---------------------------------------------------------------------------*/
1308 /*
1309  * Connect to MQTT broker.
1310  *
1311  * N.B. Non-blocking call.
1312  */
1313 mqtt_status_t
1314 mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port,
1315  uint16_t keep_alive)
1316 {
1317  uip_ip6addr_t ip6addr;
1318  uip_ipaddr_t *ipaddr;
1319  ipaddr = &ip6addr;
1320 
1321  /* Check if we are already trying to connect */
1322  if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) {
1323  return MQTT_STATUS_OK;
1324  }
1325 
1326  conn->server_host = host;
1327  conn->keep_alive = keep_alive;
1328  conn->server_port = port;
1329  conn->out_buffer_ptr = conn->out_buffer;
1330  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1331  conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG;
1332 
1333  /* convert the string IPv6 address to a numeric IPv6 address */
1334  uiplib_ip6addrconv(host, &ip6addr);
1335 
1336  uip_ipaddr_copy(&(conn->server_ip), ipaddr);
1337 
1338  /*
1339  * Initiate the connection if the IP could be resolved. Otherwise the
1340  * connection will be initiated when the DNS lookup is finished, in the main
1341  * event loop.
1342  */
1343  process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn);
1344 
1345  return MQTT_STATUS_OK;
1346 }
1347 /*----------------------------------------------------------------------------*/
1348 void
1349 mqtt_disconnect(struct mqtt_connection *conn)
1350 {
1351  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1352  return;
1353  }
1354 
1355  conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT;
1356 
1357  process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn);
1358 }
1359 /*----------------------------------------------------------------------------*/
1360 mqtt_status_t
1361 mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1362  mqtt_qos_level_t qos_level)
1363 {
1364  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1365  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1366  }
1367 
1368  DBG("MQTT - Call to mqtt_subscribe...\n");
1369 
1370  /* Currently don't have a queue, so only one item at a time */
1371  if(conn->out_queue_full) {
1372  DBG("MQTT - Not accepted!\n");
1373  return MQTT_STATUS_OUT_QUEUE_FULL;
1374  }
1375  conn->out_queue_full = 1;
1376  DBG("MQTT - Accepted!\n");
1377 
1378  conn->out_packet.mid = INCREMENT_MID(conn);
1379  conn->out_packet.topic = topic;
1380  conn->out_packet.topic_length = strlen(topic);
1381  conn->out_packet.qos = qos_level;
1382  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1383 
1384  process_post(&mqtt_process, mqtt_do_subscribe_event, conn);
1385  return MQTT_STATUS_OK;
1386 }
1387 /*----------------------------------------------------------------------------*/
1388 mqtt_status_t
1389 mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
1390 {
1391  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1392  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1393  }
1394 
1395  DBG("MQTT - Call to mqtt_unsubscribe...\n");
1396  /* Currently don't have a queue, so only one item at a time */
1397  if(conn->out_queue_full) {
1398  DBG("MQTT - Not accepted!\n");
1399  return MQTT_STATUS_OUT_QUEUE_FULL;
1400  }
1401  conn->out_queue_full = 1;
1402  DBG("MQTT - Accepted!\n");
1403 
1404  conn->out_packet.mid = INCREMENT_MID(conn);
1405  conn->out_packet.topic = topic;
1406  conn->out_packet.topic_length = strlen(topic);
1407  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1408 
1409  process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn);
1410  return MQTT_STATUS_OK;
1411 }
1412 /*----------------------------------------------------------------------------*/
1413 mqtt_status_t
1414 mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic,
1415  uint8_t *payload, uint32_t payload_size,
1416  mqtt_qos_level_t qos_level, mqtt_retain_t retain)
1417 {
1418  if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
1419  return MQTT_STATUS_NOT_CONNECTED_ERROR;
1420  }
1421 
1422  DBG("MQTT - Call to mqtt_publish...\n");
1423 
1424  /* Currently don't have a queue, so only one item at a time */
1425  if(conn->out_queue_full) {
1426  DBG("MQTT - Not accepted!\n");
1427  return MQTT_STATUS_OUT_QUEUE_FULL;
1428  }
1429  conn->out_queue_full = 1;
1430  DBG("MQTT - Accepted!\n");
1431 
1432  conn->out_packet.mid = INCREMENT_MID(conn);
1433  conn->out_packet.retain = retain;
1434  conn->out_packet.topic = topic;
1435  conn->out_packet.topic_length = strlen(topic);
1436  conn->out_packet.payload = payload;
1437  conn->out_packet.payload_size = payload_size;
1438  conn->out_packet.qos = qos_level;
1439  conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK;
1440 
1441  process_post(&mqtt_process, mqtt_do_publish_event, conn);
1442  return MQTT_STATUS_OK;
1443 }
1444 /*----------------------------------------------------------------------------*/
1445 void
1446 mqtt_set_username_password(struct mqtt_connection *conn, char *username,
1447  char *password)
1448 {
1449  /* Set strings, NULL string will simply set length to zero */
1450  string_to_mqtt_string(&conn->credentials.username, username);
1451  string_to_mqtt_string(&conn->credentials.password, password);
1452 
1453  /* Set CONNECT VHDR flags */
1454  if(username != NULL) {
1455  conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG;
1456  } else {
1457  conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG;
1458  }
1459  if(password != NULL) {
1460  conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG;
1461  } else {
1462  conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG;
1463  }
1464 }
1465 /*----------------------------------------------------------------------------*/
1466 void
1467 mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message,
1468  mqtt_qos_level_t qos)
1469 {
1470  /* Set strings, NULL string will simply set length to zero */
1471  string_to_mqtt_string(&conn->will.topic, topic);
1472  string_to_mqtt_string(&conn->will.message, message);
1473 
1474  /* Currently not used! */
1475  conn->will.qos = qos;
1476 
1477  if(topic != NULL) {
1478  conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG |
1479  MQTT_VHDR_WILL_RETAIN_FLAG;
1480  }
1481 }
1482 /*----------------------------------------------------------------------------*/
1483 /** @} */
mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive)
Connects to a MQTT broker.
Definition: mqtt.c:1314
static uip_ipaddr_t ipaddr
Pointer to prefix information option in uip_buf.
Definition: uip-nd6.c:129
mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size)
Initializes the MQTT engine.
Definition: mqtt.c:1283
Protothreads implementation.
Header file for the Contiki MQTT engine.
#define PT_INIT(pt)
Initialize a protothread.
Definition: pt.h:79
#define LIST(name)
Declare a linked list.
Definition: list.h:86
mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain)
Publish to a MQTT topic.
Definition: mqtt.c:1414
Header file for IPv6-related data structures.
void mqtt_disconnect(struct mqtt_connection *conn)
Disconnects from a MQTT broker.
Definition: mqtt.c:1349
Default definitions of C compiler quirk work-arounds.
void timer_set(struct timer *t, clock_time_t interval)
Set a timer.
Definition: timer.c:64
#define PROCESS_END()
Define the end of a process.
Definition: process.h:131
#define PROCESS(name, strname)
Declare a process.
Definition: process.h:307
#define PROCESS_THREAD(name, ev, data)
Define the body of a process.
Definition: process.h:273
#define PT_WAIT_UNTIL(pt, condition)
Block and wait until condition is true.
Definition: pt.h:147
mqtt_event_t
MQTT engine events.
Definition: mqtt.h:146
#define uip_ipaddr_copy(dest, src)
Copy an IP address from one place to another.
Definition: uip.h:1027
int process_post(struct process *p, process_event_t ev, process_data_t data)
Post an asynchronous event.
Definition: process.c:322
process_event_t process_alloc_event(void)
Allocate a global event number.
Definition: process.c:93
void list_init(list_t list)
Initialize a list.
Definition: list.c:66
#define PT_EXIT(pt)
Exit the protothread.
Definition: pt.h:245
Header file for the callback timer
void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password)
Set the user name and password for a MQTT client.
Definition: mqtt.c:1446
#define NULL
The null pointer.
#define PT_THREAD(name_args)
Declaration of a protothread.
Definition: pt.h:99
#define PT_BEGIN(pt)
Declare the start of a protothread inside the C function implementing the protothread.
Definition: pt.h:114
void list_add(list_t list, void *item)
Add an item at the end of a list.
Definition: list.c:143
void ctimer_restart(struct ctimer *c)
Restart a callback timer from the current point in time.
Definition: ctimer.c:137
#define CLOCK_SECOND
A second, measured in system clock time.
Definition: clock.h:82
Linked list manipulation routines.
Header file for the uIP TCP/IP stack.
void process_start(struct process *p, process_data_t data)
Start a process.
Definition: process.c:99
Event timer header file.
void ctimer_set(struct ctimer *c, clock_time_t t, void(*f)(void *), void *ptr)
Set a callback timer.
Definition: ctimer.c:99
#define PT_END(pt)
Declare the end of a protothread.
Definition: pt.h:126
void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos)
Set the last will topic and message for a MQTT client.
Definition: mqtt.c:1467
void ctimer_stop(struct ctimer *c)
Stop a pending callback timer.
Definition: ctimer.c:149
mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic)
Unsubscribes from a MQTT topic.
Definition: mqtt.c:1389
mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level)
Subscribes to a MQTT topic.
Definition: mqtt.c:1361
#define PROCESS_WAIT_EVENT()
Wait for an event to be posted to the process.
Definition: process.h:141
void(* mqtt_event_callback_t)(struct mqtt_connection *m, mqtt_event_t event, void *data)
MQTT event callback function.
Definition: mqtt.h:298
int timer_expired(struct timer *t)
Check if a timer has expired.
Definition: timer.c:122
#define PROCESS_BEGIN()
Define the beginning of a process.
Definition: process.h:120