×

Welcome to TagMyCode

Please login or create account to add a snippet.
0
0
 
0
Language: C++
Posted by: kus bhatt
Added: Nov 3, 2016 1:15 PM
Views: 2147
Tags: no tags
  1. #include <string.h>
  2. #include <csignal>
  3. #include "MQTTAsync.h"
  4. #include"MQTTClientPersistence.h"
  5. #include"mqtt/TestTimeout.h"
  6. #include"mqtt/SenseShared.h"
  7. #include"mqtt/Sens_data.h"
  8. #include <thread>
  9. #include <cstdint>
  10. #include "mqtt/mqtt.h"
  11. #include <sys/types.h>
  12. #include <unistd.h>
  13. #include <dlfcn.h>
  14. #include <iostream>
  15. #include <algorithm>
  16. #include <memory>
  17. #include <sys/time.h>
  18. #include <stdlib.h>
  19. #include <stdio.h>
  20. #include <vector>
  21. #include <stdbool.h>
  22. using namespace std;
  23. char *client_name; // -c
  24. //const char *ip_addr = "52.29.27.181"; // -i
  25. const char *ip_addr="192.168.0.102";
  26. uint32_t port = 9876; // -p
  27. //uint32_t port=1883;
  28. //char *username = "ykhbjdvu";
  29. char *username = NULL;
  30. char *password = NULL;
  31. //void* (*create)();
  32. //   const char *topic = "mydome/temp/value";
  33. //char *topic;
  34. // const char *topic = "rpi2/temp";
  35. MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  36. volatile int finished = 0;
  37. char* topic = NULL;
  38. char* buffer = NULL;
  39. int subscribed = 0;
  40. int disconnected = 0;
  41. volatile int toStop = 0;
  42.  
  43. struct {
  44.     char* clientid;
  45.     int nodelimiter;
  46.     char delimiter;
  47.     int maxdatalen;
  48.     int qos;
  49.     int retained;
  50.     char* username;
  51.     char* password;
  52.     char* host;
  53.     char* port;
  54.     int showtopics;
  55.     int verbose;
  56.     int keepalive;
  57. } opts = {
  58.     "stdout-subscriber-async", 1, '\n', 100, 2, 0, NULL, NULL, "broker.hivemq.com", "1883", 0, 0, 10
  59. };
  60.  
  61. void personalised_usage() {
  62.     cout << "mqtt client application" << endl;
  63.     cout << "Usage::mqtt_applcation <personalised_mqtt/paho_mqtt><subscribe/publish><unique clientname><topicname>" << endl;
  64.     cout << "--qos<Quality of Service>(this feature is available for paho_mqtt) default is" << opts.qos << endl;
  65.     cout << "--host <hostname> default is " << opts.host << endl;
  66.     cout << "--port<port> default is" << opts.port << endl;
  67.     //cout << "--username default is" << opts.username << endl;
  68.     //cout << "--password default " << opts.password << endl;
  69.     cout << "--showtopics (this feature is available for paho_mqtt ;default is on if the topic has a wildcard, else off)<on/off>" << endl;
  70.     cout << "--keepalive(this feature is available for paho_mqtt;default is 10 seconds)" << endl;
  71.     sleep(10);
  72.     exit(EXIT_FAILURE);
  73. }
  74.  
  75. void cfinish(int sig) {
  76.     signal(SIGINT, NULL);
  77.     finished = 1;
  78. }
  79.  
  80. void cfinish_pub(int sig) {
  81.     signal(SIGINT, NULL);
  82.     toStop = 1;
  83. }
  84. //void paho_lib_opn(void *paho_handle) {
  85. //  paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
  86. //  if (!paho_handle) {
  87. //   cerr << "Cannot open library: " << dlerror() << '\n';
  88. // return 1;
  89. //  } else {
  90. //     cout << "you have successfully opened  the  library" << endl;
  91. //}
  92. //}
  93.  
  94. void getopts(int argc, char** argv) {
  95.     int count = 5;
  96.     while (count < argc) {
  97.         if (strcmp(argv[count], "--qos") == 0) {
  98.             if (++count < argc) {
  99.                
  100.                                 if (strcmp(argv[count], "0") == 0)
  101.                                         opts.qos = 0;
  102.                                 else if (strcmp(argv[count], "1") == 0)
  103.                                         opts.qos = 1;
  104.                                 else if (strcmp(argv[count], "2") == 0)
  105.                                         opts.qos = 2;
  106.                     } else
  107.                 personalised_usage();
  108.             } else if (strcmp(argv[count], "--host") == 0) {
  109.             if (++count < argc) {
  110.                 if (strcmp(argv[1], "personalised_mqtt") == 0) {
  111.                     ip_addr = argv[count];
  112.                 } else if (strcmp(argv[1], "paho_mqtt") == 0) {
  113.                     opts.host = argv[count];
  114.                 }
  115.             } else
  116.                 personalised_usage();
  117.             } else if (strcmp(argv[count], "--port") == 0) {
  118.             if (++count < argc) {
  119.                 if (strcmp(argv[1], "personalised_mqtt") == 0) {
  120.                     port = atoi(argv[count]);
  121.                 } else if (strcmp(argv[1], "paho_mqtt") == 0) {
  122.                     opts.port = argv[count];
  123.                 }
  124.             } else
  125.                 personalised_usage();
  126.             } else if (strcmp(argv[count], "--username") == 0) {
  127.             if (++count < argc) {
  128.                 if (strcmp(argv[1], "personalised_mqtt") == 0) {
  129.                     username = argv[count];
  130.                 } else if (strcmp(argv[1], "paho_mqtt") == 0) {
  131.                     opts.username = argv[count];
  132.                 }
  133.             } else
  134.                 personalised_usage();
  135.             } else if (strcmp(argv[count], "--password") == 0) {
  136.             if (++count < argc) {
  137.                 if (strcmp(argv[1], "personalised_mqtt") == 0) {
  138.                     password = argv[count];
  139.                 } else if (strcmp(argv[1], "paho_mqtt") == 0) {
  140.                     opts.password = argv[count];
  141.                 }
  142.             } else
  143.                 personalised_usage();
  144.             } else if (strcmp(argv[count], "--showtopics") == 0) {
  145.             if (++count < argc) {
  146.                 if (strcmp(argv[count], "on") == 0)
  147.                         opts.showtopics = 1;
  148.                 else if (strcmp(argv[count], "off") == 0)
  149.                         opts.showtopics = 0;
  150.                 else
  151.                     personalised_usage();
  152.                 } else
  153.                 personalised_usage();
  154.             } else if (strcmp(argv[count], "--keepalive") == 0) {
  155.             if (++count < argc)
  156.                     opts.keepalive = atoi(argv[count]);
  157.  
  158.             else
  159.                 personalised_usage();
  160.             }
  161.         count++;
  162.     }
  163.  
  164. }
  165.  
  166. int messageArrived_pub(void* context, char* topicName, int topicLen, MQTTAsync_message* m) {
  167.  
  168.     /* not expecting any messages */
  169.     return 1;
  170. }
  171.  
  172.  
  173. static int disconnected_pub = 0;
  174.  
  175.         void onDisconnect_pub(void* context, MQTTAsync_successData* response) {
  176.  
  177.     disconnected_pub = 1;
  178. }
  179.  
  180.  
  181. static int connected = 0;
  182.         void myconnect(MQTTAsync* client);
  183.  
  184.         void onConnectFailure_pub(void* context, MQTTAsync_failureData* response) {
  185.  
  186.     printf("Connect failed, rc %d\n", response ? response->code : -1);
  187.             connected = -1;
  188.  
  189.             MQTTAsync* client = (MQTTAsync*) context;
  190.             myconnect(client);
  191. }
  192.  
  193. void onConnect_pub(void* context, MQTTAsync_successData* response) {
  194.  
  195.     MQTTAsync* client = (MQTTAsync*) context;
  196.             int rc;
  197.  
  198.             printf("Connected\n");
  199.             connected = 1;
  200. }
  201.  
  202. void myconnect(MQTTAsync* client) {
  203.     MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
  204.             MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
  205.             int rc = 0;
  206.             printf("Connecting\n");
  207.             conn_opts.keepAliveInterval = opts.keepalive;
  208.             conn_opts.cleansession = 1;
  209.             conn_opts.username = opts.username;
  210.             conn_opts.password = opts.password;
  211.             conn_opts.onSuccess = onConnect_pub;
  212.             conn_opts.onFailure = onConnectFailure_pub;
  213.             conn_opts.context = client;
  214.             ssl_opts.enableServerCertAuth = 0;
  215.             conn_opts.ssl = &ssl_opts;
  216.             conn_opts.automaticReconnect = 1;
  217.             connected = 0;
  218.             void *paho_handle;
  219.             paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
  220.             typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*);
  221.             paho_asyn_conn app_ascon;
  222.             app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect");
  223.     if (!app_ascon) {
  224.         cout << "eror with" << dlerror() << endl;
  225.     }
  226.     if ((rc = app_ascon(*client, &conn_opts)) != MQTTASYNC_SUCCESS) {
  227.  
  228.         printf("Failed to start connect, return code %d\n", rc);
  229.                 exit(EXIT_FAILURE);
  230.     }
  231. }
  232.  
  233.  
  234. static int published = 0;
  235.  
  236.         void onPublishFailure(void* context, MQTTAsync_failureData* response) {
  237.  
  238.     printf("Publish failed, rc %d\n", response ? -1 : response->code);
  239.             published = -1;
  240. }
  241.  
  242. void onPublish(void* context, MQTTAsync_successData* response) {
  243.  
  244.     MQTTAsync client = (MQTTAsync) context;
  245.  
  246.             published = 1;
  247. }
  248.  
  249. void connectionLost_pub(void *context, char *cause) {
  250.     MQTTAsync client = (MQTTAsync) context;
  251.             int rc;
  252.             void *paho_handle;
  253.             paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
  254.             typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*);
  255.             paho_asyn_conn app_ascon;
  256.             app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect");
  257.     if (!app_ascon) {
  258.         cout << "eror with" << dlerror() << endl;
  259.     }
  260.     printf("connectionLost called\n");
  261.     if ((rc = app_ascon(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
  262.  
  263.         printf("Failed to start reconnect, return code %d\n", rc);
  264.                 finished = 1;
  265.     }
  266. }
  267. //SenseData *sense_data,**sense_reuse;
  268. bool obj_flag=false;
  269. string stored_msg="";
  270. int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
  271.     void *personalised_handle;
  272.             personalised_handle = dlopen("/home/pom/NetBeansProjects/mqtt_dummy/lib/libmqttpp.so", RTLD_NOW);
  273.     if (!personalised_handle) {
  274.         cerr << "Cannot open personalised library: " << dlerror() << '\n';
  275.         return 1;
  276.     } else {
  277.         cout << "you have successfully opened  your personalised library" << endl;
  278.     }
  279.              typedef SenseData*(*create_sense_t)();
  280.              typedef void (*destroy_sense_t)(SenseData*);
  281.              create_sense_t sense_creat=(create_sense_t)dlsym(personalised_handle,"sense_create");
  282.              destroy_sense_t sense_destry=(destroy_sense_t)dlsym(personalised_handle,"sense_destroy");
  283.          //    char *msg=new char[100];
  284.        
  285.              if(!sense_creat){
  286.                  cout<<"error with"<<dlerror()<<endl;
  287.              }
  288.              if(!sense_destry){
  289.                   cout<<"error with"<<dlerror()<<endl;
  290.              }
  291.            SenseData *sense_data=sense_creat();
  292.              
  293.     if (opts.showtopics)
  294.             printf("%s\t", topicName);
  295.         if (opts.nodelimiter) {
  296.             char *msg_buffer=new char[100];
  297.            printf("%.*s  \n", message->payloadlen, (char*) message->payload);
  298.            msg_buffer=(char*) message->payload;
  299.         //   temp_sense.check_dup_temp(message);
  300.          // temp_sense.show_unq_temp();
  301.            //  if(obj_flag==true){
  302.                //  cout<<"value of the store_temp  after  initialisation is "<<sense_data->store_temp;
  303.                 // sense_data->check_dup_temp(msg);
  304.             //    sense_data->show_unq_temp();
  305.                // sense_data->store_temp=msg;
  306.                 // sense_reuse=&sense_data;
  307.                 //  cout<<"value of the store_temp  after  initialisation is "<<(*sense_reuse)->store_temp;
  308.                 //  (*sense_reuse)->store_temp=msg_buffer;
  309.                  // delete(*sense_reuse);
  310.          //    }else{
  311.             //     sense_data=sense_creat();
  312.            //      obj_flag=true;
  313.            if(stored_msg!=""){
  314.                cout<<"the stored msg is "<<stored_msg<<endl;
  315.              sense_data->store_temp=(char*)(stored_msg.c_str());
  316.            }
  317.               sense_data->check_dup_temp(msg_buffer);
  318.                 sense_data->show_unq_temp();
  319.                 stored_msg=msg_buffer;
  320.                 sense_data->store_temp=msg_buffer;
  321.                 cout<<"the buffer is"<<sense_data->store_temp<<endl;
  322.              }
  323.           // sense_data->someone();
  324.                     // cout<<"the message printed here"<<endl;
  325.           //  cout<<"the buffer is"<<msg<<endl;
  326.            
  327.         else{
  328.             printf("%.*s%c", message->payloadlen, (char*) message->payload, opts.delimiter);
  329.         }
  330.  
  331.     fflush(stdout);
  332.             void *paho_handle;
  333.             paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
  334.     if (!paho_handle) {
  335.         cerr << "Cannot open library: " << dlerror() << '\n';
  336.                 // return 1;
  337.     } else {
  338.         cout << "you have successfully opened  the  library" << endl;
  339.     }
  340.     typedef void (*paho_asyn_frmsg)(MQTTAsync_message**);
  341.             paho_asyn_frmsg app_frmsg;
  342.             app_frmsg = (paho_asyn_frmsg) dlsym(paho_handle, "MQTTAsync_freeMessage");
  343.     if (!app_frmsg) {
  344.         cout << "The error is " << dlerror();
  345.     }
  346.     typedef void(*paho_asyn_fr)(void*);
  347.             paho_asyn_fr app_fr;
  348.             app_fr = (paho_asyn_fr) dlsym(paho_handle, "MQTTAsync_free");
  349.     if (!app_fr) {
  350.         cout << "The error is " << dlerror();
  351.     }
  352.     app_frmsg(&message);
  353.             app_fr(topicName);
  354.             //MQTTAsync_freeMessage(&message);
  355.             //MQTTAsync_free(topicName);
  356.  
  357.     return 1;
  358. }
  359.  
  360. void onDisconnect(void* context, MQTTAsync_successData* response) {
  361.  
  362.     disconnected = 1;
  363. }
  364.  
  365. void onSubscribe(void* context, MQTTAsync_successData* response) {
  366.  
  367.     subscribed = 1;
  368. }
  369.  
  370. void onSubscribeFailure(void* context, MQTTAsync_failureData* response) {
  371.  
  372.     printf("Subscribe failed, rc %d\n", response->code);
  373.             finished = 1;
  374. }
  375.  
  376. void onConnectFailure(void* context, MQTTAsync_failureData* response) {
  377.  
  378.     printf("Connect failed, rc %d\n", response->code);
  379.             finished = 1;
  380. }
  381.  
  382. void onConnect(void* context, MQTTAsync_successData* response) {
  383.     MQTTAsync client = (MQTTAsync) context;
  384.             MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
  385.             MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  386.             int rc;
  387.             void *paho_handle;
  388.             paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
  389.             typedef int(*paho_asyn_subscrb)(MQTTAsync, const char*, int, MQTTAsync_responseOptions*);
  390.             paho_asyn_subscrb app_assub;
  391.             app_assub = (paho_asyn_subscrb) dlsym(paho_handle, "MQTTAsync_subscribe");
  392.     if (!app_assub) {
  393.         cout << "The error is " << dlerror();
  394.     }
  395.     if (opts.showtopics)
  396.             printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos);
  397.  
  398.             ropts.onSuccess = onSubscribe;
  399.             ropts.onFailure = onSubscribeFailure;
  400.             ropts.context = client;
  401.         if ((rc = app_assub(client, topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS) {
  402.  
  403.             printf("Failed to start subscribe, return code %d\n", rc);
  404.                     finished = 1;
  405.         }
  406. }
  407.  
  408. void connectionLost(void *context, char *cause) {
  409.     MQTTAsync client = (MQTTAsync) context;
  410.             int rc;
  411.             void *paho_handle;
  412.             paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
  413.             typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*);
  414.             paho_asyn_conn app_ascon;
  415.             app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect");
  416.             printf("connectionLost called\n");
  417.     if ((rc = app_ascon(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
  418.  
  419.         printf("Failed to start reconnect, return code %d\n", rc);
  420.                 finished = 1;
  421.     }
  422. }
  423.  
  424. char *client_packet;
  425.         int counter = 0;
  426.         mqtt_broker_handle_t **broker_final;
  427.  
  428.         bool is_number(char *message) {
  429.     //  cout<<"the lenght of character is"<<strlen(message)<<endl;
  430.     // return false;
  431.     int itr_index;
  432.             int set_flag = 0;
  433.     for (itr_index = 0; itr_index < strlen(message); itr_index++) {
  434.         // cout<<"the character passed is->"<<message[start_index]<<endl;
  435.         if (message[itr_index] == '.') {
  436.             counter++;
  437.         }
  438.         if ((isdigit(message[itr_index]) || message[itr_index] == '.') && counter <= 1) {
  439.             set_flag = 1;
  440.         } else {
  441.             set_flag = 0;
  442.             return false;
  443.             break;
  444.         }
  445.     }
  446.     if (set_flag == 1) {
  447.  
  448.         return true;
  449.     }
  450.     // return false;
  451. }
  452.  
  453. void mqtt_discon_handlr(int handlr) {
  454.     void* lib_handle;
  455.             lib_handle = dlopen("/home/pom/NetBeansProjects/mqtt_dummy/lib/libmqttpp.so", RTLD_NOW);
  456.     if (!lib_handle) {
  457.         cerr << "Cannot open library: " << dlerror() << '\n';
  458.                 // return 1;
  459.     } else {
  460.         cout << "you have successfully opened  the  library" << endl;
  461.     }
  462.     cout << "the client which is run  now is " << client_packet << endl;
  463.             typedef void(*applctn_disscnt)(mqtt_broker_handle_t *);
  464.             applctn_disscnt app_discon;
  465.             app_discon = (applctn_disscnt) dlsym(lib_handle, "mqtt_disconnect");
  466.     if (strcmp(client_packet, "subscribe") == 0) {
  467.         cout << "you reached here" << endl;
  468.                 typedef int(*applcatn_unsbscrbe)(mqtt_broker_handle_t *, const char*, QoS);
  469.                 applcatn_unsbscrbe app_unsub;
  470.                 app_unsub = (applcatn_unsbscrbe) dlsym(lib_handle, "mqtt_unsubscribe");
  471.                 int result = app_unsub(*broker_final, topic, QoS2);
  472.         if (result != 1) {
  473.             puts("failed to UnSubscribe");
  474.                     // cout<<"the broker clientid is"<<*(broker_final)->clientid<<endl;
  475.                     exit(1);
  476.  
  477.         }
  478.         app_discon(*broker_final);
  479.     } else if (strcmp(client_packet, "publish") == 0) {
  480.  
  481.         app_discon(*broker_final);
  482.     }
  483.     exit(1);
  484. }
  485.  
  486. int main(int argc, char** argv) {
  487.     client_name = argv[3];
  488.             topic = argv[4];
  489.             signal(SIGINT, mqtt_discon_handlr);
  490.             //signal(SIG_DFL, mqtt_discon_handlr);
  491.             client_packet = argv[2];
  492.             void* lib_handle;
  493.             char *error;
  494.             //void(cosine)(void);
  495.  
  496.     if (argc < 5) {
  497.         personalised_usage();
  498.     }
  499.     //thread async_thrd;
  500.     //getopts(argc, argv);
  501.     if (strcmp(argv[1], "personalised_mqtt") == 0) {
  502.         lib_handle = dlopen("/home/pom/NetBeansProjects/mqtt_dummy/lib/libmqttpp.so", RTLD_NOW);
  503.         if (!lib_handle) {
  504.             cerr << "Cannot open library: " << dlerror() << '\n';
  505.             return 1;
  506.         } else {
  507.             cout << "you have successfully opened  the  library" << endl;
  508.         }
  509.         //mqtt_broker_handle_t (*mq_conn)(mqtt_broker_handle_t*);
  510.         // typedef mqtt_broker_handle_t* (*hello_t)();
  511.         // cosine=dlsym(lib_handle,"something");
  512.         //typedef void(*cosine)();
  513.         //cosine some;
  514.         //mqtt_broker_handle_t (*mq_conn)(mqtt_broker_handle_t*);
  515.         // typedef mqtt_broker_handle_t* (*hello_t)();
  516.         //some=(cosine)dlsym(lib_handle,"something");
  517.         // if(!some){
  518.         // cout<<dlerror()<<endl;
  519.         //}else{
  520.         //    cout<<"done finding";
  521.         // }
  522.         typedef mqtt_broker_handle_t * (*applictn_connect)(char const*, char const*, unsigned int, char const*, char const*);
  523.                 applictn_connect app_conn;
  524.                 app_conn = (applictn_connect) dlsym(lib_handle, "mqtt_connect");
  525.         if (!app_conn) {
  526.             cout << dlerror() << endl;
  527.         } else {
  528.             cout << "done finding" << endl;
  529.         }
  530.         // dlerror();
  531.         // error= dlerror();
  532.         //if (error!=NULL) {
  533.         //     cerr << "Cannot load symbol 'mqtt_connect': " << dlerror() <<endl;          
  534.         //  dlclose(lib_handle);
  535.         //     return 1;
  536.         //  }else{
  537.         //   cout<<"linked succesfull"<<endl;
  538.         //  }
  539.         mqtt_broker_handle_t *broker = app_conn(client_name, ip_addr, port, NULL, NULL);
  540.                 broker_final = &broker;
  541.         if (broker == 0) {
  542.             puts("Failed to connect");
  543.                     exit(1);
  544.         } else {
  545.             cout << "you have been successufully  connected  to  the mqtt server" << endl;
  546.         }
  547.         typedef mqtt_timer * (*create_t)();
  548.                 typedef void (*destroy_t)(mqtt_timer*);
  549.                 typedef void (*mqtt_png_req)();
  550.                 create_t creat;
  551.                 destroy_t destroy;
  552.                 creat = (create_t) dlsym(lib_handle, "create");
  553.  
  554.                 destroy = (destroy_t) dlsym(lib_handle, "destroy");
  555.                 mqtt_png_req app_png_req;
  556.                 app_png_req = (mqtt_png_req) dlsym(lib_handle, "mqtt_pin_req");
  557.         if (!creat) {
  558.             cout << "The error is %s" << dlerror();
  559.         }
  560.         if (!destroy) {
  561.             cout << "The error is %s" << dlerror();
  562.         }
  563.         mqtt_timer *timer = creat();
  564.                 // timer->someone();
  565.                 //timer->someone();
  566.         if (!app_png_req) {
  567.             cout << "The error is %s" << dlerror();
  568.         }
  569.         //  destroy(timer);
  570.         if (strcmp(argv[2], "subscribe") == 0) {
  571.             typedef int (*application_subsrcbe)(mqtt_broker_handle_t *, const char*, QoS);
  572.                     application_subsrcbe app_sub;
  573.                     app_sub = (application_subsrcbe) dlsym(lib_handle, "mqtt_subscribe");
  574.                     int result = app_sub(broker, topic, QoS2);
  575.  
  576.             if (result != 1) {
  577.                 puts("failed to Subscribe");
  578.                         exit(1);
  579.             }
  580.             typedef void (*mqtt_disp_msg)(mqtt_broker_handle_t*, int (*)(int));
  581.                     mqtt_disp_msg app_disp_msg;
  582.                     app_disp_msg = (mqtt_disp_msg) dlsym(lib_handle, "mqtt_display_message");
  583.             if (!app_disp_msg) {
  584.                 cout << "The error is %s" << dlerror();
  585.             }
  586.             //   Timer *time = creat();
  587.             //timer->someone();
  588.             while (1) {
  589.                 timer->start(chrono::milliseconds(40000), [&app_png_req] {
  590.                     //  cout << "Hello!" << endl;
  591.                     app_png_req();
  592.                 });
  593.                 app_disp_msg(broker, &putchar);
  594.                         //   this_thread::sleep_for(chrono::seconds(4));
  595.                         //printf("the socket file descriptor is  %d \n",broker->socket);
  596.                         //show_socketfd(broker->socket);
  597.                         //  mqtt_display_message(broker, &putchar);
  598.                         // std::thread start_background(mqtt_pin_req,broker);
  599.                         //mqtt_pin_req(broker);
  600.                         // int i=0;
  601.  
  602.  
  603.                         // start_background.join();
  604.                         // z[i]->join();
  605.                         // delete z[i];
  606.                         //ths.push_back(thread(&mqtt_display_message, broker, &putchar));
  607.  
  608.                         //i++;
  609.                         // t.join();
  610.                         //  the_thread.join();
  611.  
  612.             }
  613.  
  614.         } else if (strcmp(argv[2], "publish") == 0) {
  615.             typedef int(*application_publsh)(mqtt_broker_handle_t *, const char*, const char*, QoS);
  616.                     application_publsh app_pub;
  617.                     app_pub = (application_publsh) dlsym(lib_handle, "mqtt_publish");
  618.                     thread pub_th;
  619.                     pub_th = thread([ = ](){
  620.                 while (1) {
  621.                     timer->start(chrono::milliseconds(20000), [&app_png_req] {
  622.                         //  cout << "Hello!" << endl;
  623.                         app_png_req();
  624.                     });
  625.                     AGAIN :
  626.                             char msg[128];
  627.                             cin.getline(msg, 128);
  628.                             //  string message(msg);
  629.                     if (is_number(msg)) {
  630.                         if (app_pub(broker, topic, msg, QoS0) == -1) {
  631.                             printf("publish failed\n");
  632.                         } else {
  633.                             printf("the message sent is %s \n", msg);
  634.                         }
  635.                     } else {
  636.                         //cout<<"value of counter is "<<counter<<endl;
  637.                         cout << "please enter digit since u have to enter temperature  here" << endl;
  638.                                 goto AGAIN;
  639.                     }
  640.  
  641.                 }
  642.             });
  643.             pub_th.join();
  644.         } else {
  645.             cout << "sorry there has been no match for  the  option  entered" << endl;
  646.                     exit(1);
  647.         }
  648.     } else if (strcmp(argv[1], "paho_mqtt") == 0) {
  649.         // opts.clientid=argv[]
  650.         opts.clientid = client_name;
  651.                 void *paho_lb_handle;
  652.                 // paho_lib_opn(paho_lb_handle);
  653.                 paho_lb_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
  654.                 // if (!paho_lb_handle) {
  655.                 //     cerr << "Cannot open library: " << dlerror() << '\n';
  656.                 // return 1;
  657.                 //   } else {
  658.                 //       cout << "you have successfully opened  the  library" << endl;
  659.                 //   }
  660.                 //  paho_lib_opn(paho_handle);
  661.                 // typedef  void (*paho_asyn_frmsg)(MQTTAsync_message**);
  662.                 // paho_asyn_frmsg app_frmsg;
  663.                 // app_frmsg=(paho_asyn_frmsg)dlsym(paho_lb_handle,"MQTTAsync_freeMessage");
  664.                 //  if(!app_frmsg){
  665.                 //     cout << "The error is " << dlerror();
  666.                 // }
  667.                 typedef int(*paho_asyn_stclbcks)(MQTTAsync, void*, MQTTAsync_connectionLost*, MQTTAsync_messageArrived*, MQTTAsync_deliveryComplete*);
  668.                 paho_asyn_stclbcks app_asstclbcks;
  669.                 // MQTTAsync_set
  670.                 app_asstclbcks = (paho_asyn_stclbcks) dlsym(paho_lb_handle, "MQTTAsync_setCallbacks");
  671.                 typedef int(*paho_asyn_discon)(MQTTAsync, const MQTTAsync_disconnectOptions*);
  672.                 paho_asyn_discon app_asdiscon;
  673.                 app_asdiscon = (paho_asyn_discon) dlsym(paho_lb_handle, "MQTTAsync_disconnect");
  674.                 typedef void(*paho_asyn_dstry)(MQTTAsync*);
  675.                 paho_asyn_dstry app_asdstry;
  676.                 app_asdstry = (paho_asyn_dstry) dlsym(paho_lb_handle, "MQTTAsync_destroy");
  677.                 char url[100];
  678.                 sprintf(url, "%s:%s", opts.host, opts.port);
  679.                 MQTTAsync client;
  680.         if (strcmp(argv[2], "subscribe") == 0) {
  681.             //opts.clientid = client_name;
  682.             int rc = 0;
  683.  
  684.                     MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  685.                     //  url = opts.host + ":" + opts.port;
  686.                     //  strcat(url, opts.host);
  687.                     // strcat(url, ":");
  688.                     //  strcat(url, opts.port);
  689.  
  690.                     typedef int(*paho_asyn_crte)(MQTTAsync*, const char*, const char*, int, void*);
  691.                     paho_asyn_crte app_ascrte;
  692.                     app_ascrte = (paho_asyn_crte) dlsym(paho_lb_handle, "MQTTAsync_create");
  693.  
  694.                     typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*);
  695.                     paho_asyn_conn app_asconn;
  696.                     app_asconn = (paho_asyn_conn) dlsym(paho_lb_handle, "MQTTAsync_connect");
  697.  
  698.  
  699.             if (!app_ascrte) {
  700.                 cout << "The error is " << dlerror();
  701.             }
  702.             if (!app_asstclbcks) {
  703.                 cout << "The error is " << dlerror();
  704.             }
  705.             if (!app_asconn) {
  706.                 cout << "The error is " << dlerror();
  707.             }
  708.             if (!app_asdiscon) {
  709.                 cout << "The error is " << dlerror();
  710.             }
  711.             if (!app_asdstry) {
  712.                 cout << "The error is " << dlerror();
  713.             }
  714.  
  715.             rc = app_ascrte(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
  716.  
  717.                     app_asstclbcks(client, client, connectionLost, messageArrived, NULL);
  718.  
  719.                     signal(SIGINT, cfinish);
  720.                     signal(SIGTERM, cfinish);
  721.  
  722.                     conn_opts.keepAliveInterval = opts.keepalive;
  723.                     conn_opts.cleansession = 1;
  724.                     conn_opts.username = opts.username;
  725.                     conn_opts.password = opts.password;
  726.                     conn_opts.onSuccess = onConnect;
  727.                     conn_opts.onFailure = onConnectFailure;
  728.                     conn_opts.context = client;
  729.             if ((rc = app_asconn(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
  730.                 printf("Failed to start connect, return code %d\n", rc);
  731.                         exit(EXIT_FAILURE);
  732.             } else {
  733.                 cout << "you have successfully connected" << endl;
  734.             }
  735.  
  736.             while (!subscribed)
  737. #if defined(WIN32)
  738.                     Sleep(100);
  739. #else
  740.                     usleep(10000L);
  741. #endif
  742.  
  743.                 if (finished)
  744.                         goto exit;
  745.  
  746.                     while (!finished)
  747. #if defined(WIN32)
  748.                             Sleep(100);
  749. #else
  750.                             usleep(10000L);
  751. #endif
  752.  
  753.                             disc_opts.onSuccess = onDisconnect;
  754.                         if ((rc = app_asdiscon(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
  755.                             printf("Failed to start disconnect, return code %d\n", rc);
  756.                                     exit(EXIT_FAILURE);
  757.                         }
  758.  
  759.             while (!disconnected)
  760. #if defined(WIN32)
  761.                     Sleep(100);
  762. #else
  763.                     usleep(10000L);
  764. #endif
  765.  
  766.                     exit :
  767.                     app_asdstry(&client);
  768.  
  769.                 return EXIT_SUCCESS;
  770.                     exit(1);
  771.             } else if (strcmp(argv[2], "publish") == 0) {
  772.             opts.qos = 0;
  773.                     cout << "qos of  publish message is" << opts.qos << endl;
  774.                     MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
  775.                     MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
  776.                     MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
  777.                     MQTTAsync client;
  778.                     int rc = 0;
  779.                     create_opts.sendWhileDisconnected = 1;
  780.                     typedef int(*paho_asyn_crtwtoptns)(MQTTAsync*, const char*, const char*, int, void*, MQTTAsync_createOptions*);
  781.                     paho_asyn_crtwtoptns app_ascrtwtoptns;
  782.                     app_ascrtwtoptns = (paho_asyn_crtwtoptns) dlsym(paho_lb_handle, "MQTTAsync_createWithOptions");
  783.             if (!app_ascrtwtoptns) {
  784.                 cout << "The error is " << dlerror();
  785.             }
  786.             rc = app_ascrtwtoptns(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
  787.                     signal(SIGINT, cfinish_pub);
  788.                     signal(SIGTERM, cfinish_pub);
  789.                     rc = app_asstclbcks(client, client, connectionLost_pub, messageArrived_pub, NULL);
  790.                     myconnect(&client);
  791.                     buffer = (char*) malloc(opts.maxdatalen);
  792.                     typedef int(*paho_asyn_snd)(MQTTAsync, const char*, int, void*, int, int, MQTTAsync_responseOptions*);
  793.                     paho_asyn_snd app_assnd;
  794.                     app_assnd = (paho_asyn_snd) dlsym(paho_lb_handle, "MQTTAsync_send");
  795.             if (!app_assnd) {
  796.                 cout << "error with " << dlerror() << endl;
  797.             }
  798.             while (!toStop) {
  799.                 int data_len = 0;
  800.                         int delim_len = 0;
  801.                         delim_len = 1;
  802.                 do {
  803.                     buffer[data_len++] = getchar();
  804.                     if (data_len > delim_len) {
  805.                         // cout << "came here" << endl;
  806.                         //printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]);
  807.                         if (strncmp("\n", &buffer[data_len - delim_len], delim_len) == 0)
  808.                             break;
  809.                         }
  810.                     //     if(strcmp(opts.delimiter,&buffer[data_len - delim_len])==0){
  811.                     //      break;
  812.                     //     }
  813.                 } while (data_len < opts.maxdatalen);
  814.  
  815.                     if (opts.verbose)
  816.                             printf("Publishing data of length %d\n", data_len);
  817.                             pub_opts.onSuccess = onPublish;
  818.                             pub_opts.onFailure = onPublishFailure;
  819.  
  820.                         do {
  821.                             rc = app_assnd(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts);
  822.                         } while (rc != MQTTASYNC_SUCCESS);
  823.                         }
  824.             // printf("Stopping\n");
  825.             cout << "Stopping" << endl;
  826.                     free(buffer);
  827.                     disc_opts.onSuccess = onDisconnect_pub;
  828.             if ((rc = app_asdiscon(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
  829.                 printf("Failed to start disconnect, return code %d\n", rc);
  830.                         exit(EXIT_FAILURE);
  831.             }
  832.             while (!disconnected_pub)
  833.                     usleep(10000L);
  834.                     app_asdstry(&client);
  835.  
  836.                 return EXIT_SUCCESS;
  837.             }
  838.     }
  839. }
  840. // create();
  841. // mqtt_broker_handle_t     *broker = fptr(client_name, ip_addr, port,NULL, NULL);
  842. // if (broker == 0) {
  843. //      puts("Failed to connect");
  844. //      exit(1);
  845. //  }else{
  846. //   cout<<"connected  now"<<endl;
  847. //  }
  848. //  dlerror();
  849. //cout<<"your  connection symbol  is"<<<<endl;
  850. // if ((error = dlerror()) != NULL){
  851. //  cerr<<dlerror()<<endl;
  852. //} else {
  853. // cout<<"symbolic locating was sucesfull"<<endl;
  854. //}
  855.  
  856.