#include #include #include "MQTTAsync.h" #include"MQTTClientPersistence.h" #include"mqtt/TestTimeout.h" #include"mqtt/SenseShared.h" #include"mqtt/Sens_data.h" #include #include #include "mqtt/mqtt.h" #include #include #include #include #include #include #include #include #include #include #include using namespace std; char *client_name; // -c //const char *ip_addr = "52.29.27.181"; // -i const char *ip_addr="192.168.0.102"; uint32_t port = 9876; // -p //uint32_t port=1883; //char *username = "ykhbjdvu"; char *username = NULL; char *password = NULL; //void* (*create)(); // const char *topic = "mydome/temp/value"; //char *topic; // const char *topic = "rpi2/temp"; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; volatile int finished = 0; char* topic = NULL; char* buffer = NULL; int subscribed = 0; int disconnected = 0; volatile int toStop = 0; struct { char* clientid; int nodelimiter; char delimiter; int maxdatalen; int qos; int retained; char* username; char* password; char* host; char* port; int showtopics; int verbose; int keepalive; } opts = { "stdout-subscriber-async", 1, '\n', 100, 2, 0, NULL, NULL, "broker.hivemq.com", "1883", 0, 0, 10 }; void personalised_usage() { cout << "mqtt client application" << endl; cout << "Usage::mqtt_applcation " << endl; cout << "--qos(this feature is available for paho_mqtt) default is" << opts.qos << endl; cout << "--host default is " << opts.host << endl; cout << "--port default is" << opts.port << endl; //cout << "--username default is" << opts.username << endl; //cout << "--password default " << opts.password << endl; cout << "--showtopics (this feature is available for paho_mqtt ;default is on if the topic has a wildcard, else off)" << endl; cout << "--keepalive(this feature is available for paho_mqtt;default is 10 seconds)" << endl; sleep(10); exit(EXIT_FAILURE); } void cfinish(int sig) { signal(SIGINT, NULL); finished = 1; } void cfinish_pub(int sig) { signal(SIGINT, NULL); toStop = 1; } //void paho_lib_opn(void *paho_handle) { // paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY); // if (!paho_handle) { // cerr << "Cannot open library: " << dlerror() << '\n'; // return 1; // } else { // cout << "you have successfully opened the library" << endl; //} //} void getopts(int argc, char** argv) { int count = 5; while (count < argc) { if (strcmp(argv[count], "--qos") == 0) { if (++count < argc) { if (strcmp(argv[count], "0") == 0) opts.qos = 0; else if (strcmp(argv[count], "1") == 0) opts.qos = 1; else if (strcmp(argv[count], "2") == 0) opts.qos = 2; } else personalised_usage(); } else if (strcmp(argv[count], "--host") == 0) { if (++count < argc) { if (strcmp(argv[1], "personalised_mqtt") == 0) { ip_addr = argv[count]; } else if (strcmp(argv[1], "paho_mqtt") == 0) { opts.host = argv[count]; } } else personalised_usage(); } else if (strcmp(argv[count], "--port") == 0) { if (++count < argc) { if (strcmp(argv[1], "personalised_mqtt") == 0) { port = atoi(argv[count]); } else if (strcmp(argv[1], "paho_mqtt") == 0) { opts.port = argv[count]; } } else personalised_usage(); } else if (strcmp(argv[count], "--username") == 0) { if (++count < argc) { if (strcmp(argv[1], "personalised_mqtt") == 0) { username = argv[count]; } else if (strcmp(argv[1], "paho_mqtt") == 0) { opts.username = argv[count]; } } else personalised_usage(); } else if (strcmp(argv[count], "--password") == 0) { if (++count < argc) { if (strcmp(argv[1], "personalised_mqtt") == 0) { password = argv[count]; } else if (strcmp(argv[1], "paho_mqtt") == 0) { opts.password = argv[count]; } } else personalised_usage(); } else if (strcmp(argv[count], "--showtopics") == 0) { if (++count < argc) { if (strcmp(argv[count], "on") == 0) opts.showtopics = 1; else if (strcmp(argv[count], "off") == 0) opts.showtopics = 0; else personalised_usage(); } else personalised_usage(); } else if (strcmp(argv[count], "--keepalive") == 0) { if (++count < argc) opts.keepalive = atoi(argv[count]); else personalised_usage(); } count++; } } int messageArrived_pub(void* context, char* topicName, int topicLen, MQTTAsync_message* m) { /* not expecting any messages */ return 1; } static int disconnected_pub = 0; void onDisconnect_pub(void* context, MQTTAsync_successData* response) { disconnected_pub = 1; } static int connected = 0; void myconnect(MQTTAsync* client); void onConnectFailure_pub(void* context, MQTTAsync_failureData* response) { printf("Connect failed, rc %d\n", response ? response->code : -1); connected = -1; MQTTAsync* client = (MQTTAsync*) context; myconnect(client); } void onConnect_pub(void* context, MQTTAsync_successData* response) { MQTTAsync* client = (MQTTAsync*) context; int rc; printf("Connected\n"); connected = 1; } void myconnect(MQTTAsync* client) { MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; int rc = 0; printf("Connecting\n"); conn_opts.keepAliveInterval = opts.keepalive; conn_opts.cleansession = 1; conn_opts.username = opts.username; conn_opts.password = opts.password; conn_opts.onSuccess = onConnect_pub; conn_opts.onFailure = onConnectFailure_pub; conn_opts.context = client; ssl_opts.enableServerCertAuth = 0; conn_opts.ssl = &ssl_opts; conn_opts.automaticReconnect = 1; connected = 0; void *paho_handle; paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY); typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*); paho_asyn_conn app_ascon; app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect"); if (!app_ascon) { cout << "eror with" << dlerror() << endl; } if ((rc = app_ascon(*client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start connect, return code %d\n", rc); exit(EXIT_FAILURE); } } static int published = 0; void onPublishFailure(void* context, MQTTAsync_failureData* response) { printf("Publish failed, rc %d\n", response ? -1 : response->code); published = -1; } void onPublish(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync) context; published = 1; } void connectionLost_pub(void *context, char *cause) { MQTTAsync client = (MQTTAsync) context; int rc; void *paho_handle; paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY); typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*); paho_asyn_conn app_ascon; app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect"); if (!app_ascon) { cout << "eror with" << dlerror() << endl; } printf("connectionLost called\n"); if ((rc = app_ascon(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start reconnect, return code %d\n", rc); finished = 1; } } //SenseData *sense_data,**sense_reuse; bool obj_flag=false; string stored_msg=""; int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) { void *personalised_handle; personalised_handle = dlopen("/home/pom/NetBeansProjects/mqtt_dummy/lib/libmqttpp.so", RTLD_NOW); if (!personalised_handle) { cerr << "Cannot open personalised library: " << dlerror() << '\n'; return 1; } else { cout << "you have successfully opened your personalised library" << endl; } typedef SenseData*(*create_sense_t)(); typedef void (*destroy_sense_t)(SenseData*); create_sense_t sense_creat=(create_sense_t)dlsym(personalised_handle,"sense_create"); destroy_sense_t sense_destry=(destroy_sense_t)dlsym(personalised_handle,"sense_destroy"); // char *msg=new char[100]; if(!sense_creat){ cout<<"error with"<payloadlen, (char*) message->payload); msg_buffer=(char*) message->payload; // temp_sense.check_dup_temp(message); // temp_sense.show_unq_temp(); // if(obj_flag==true){ // cout<<"value of the store_temp after initialisation is "<store_temp; // sense_data->check_dup_temp(msg); // sense_data->show_unq_temp(); // sense_data->store_temp=msg; // sense_reuse=&sense_data; // cout<<"value of the store_temp after initialisation is "<<(*sense_reuse)->store_temp; // (*sense_reuse)->store_temp=msg_buffer; // delete(*sense_reuse); // }else{ // sense_data=sense_creat(); // obj_flag=true; if(stored_msg!=""){ cout<<"the stored msg is "<store_temp=(char*)(stored_msg.c_str()); } sense_data->check_dup_temp(msg_buffer); sense_data->show_unq_temp(); stored_msg=msg_buffer; sense_data->store_temp=msg_buffer; cout<<"the buffer is"<store_temp<someone(); // cout<<"the message printed here"<payloadlen, (char*) message->payload, opts.delimiter); } fflush(stdout); void *paho_handle; paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY); if (!paho_handle) { cerr << "Cannot open library: " << dlerror() << '\n'; // return 1; } else { cout << "you have successfully opened the library" << endl; } typedef void (*paho_asyn_frmsg)(MQTTAsync_message**); paho_asyn_frmsg app_frmsg; app_frmsg = (paho_asyn_frmsg) dlsym(paho_handle, "MQTTAsync_freeMessage"); if (!app_frmsg) { cout << "The error is " << dlerror(); } typedef void(*paho_asyn_fr)(void*); paho_asyn_fr app_fr; app_fr = (paho_asyn_fr) dlsym(paho_handle, "MQTTAsync_free"); if (!app_fr) { cout << "The error is " << dlerror(); } app_frmsg(&message); app_fr(topicName); //MQTTAsync_freeMessage(&message); //MQTTAsync_free(topicName); return 1; } void onDisconnect(void* context, MQTTAsync_successData* response) { disconnected = 1; } void onSubscribe(void* context, MQTTAsync_successData* response) { subscribed = 1; } void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { printf("Subscribe failed, rc %d\n", response->code); finished = 1; } void onConnectFailure(void* context, MQTTAsync_failureData* response) { printf("Connect failed, rc %d\n", response->code); finished = 1; } void onConnect(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync) context; MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; int rc; void *paho_handle; paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY); typedef int(*paho_asyn_subscrb)(MQTTAsync, const char*, int, MQTTAsync_responseOptions*); paho_asyn_subscrb app_assub; app_assub = (paho_asyn_subscrb) dlsym(paho_handle, "MQTTAsync_subscribe"); if (!app_assub) { cout << "The error is " << dlerror(); } if (opts.showtopics) printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos); ropts.onSuccess = onSubscribe; ropts.onFailure = onSubscribeFailure; ropts.context = client; if ((rc = app_assub(client, topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS) { printf("Failed to start subscribe, return code %d\n", rc); finished = 1; } } void connectionLost(void *context, char *cause) { MQTTAsync client = (MQTTAsync) context; int rc; void *paho_handle; paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY); typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*); paho_asyn_conn app_ascon; app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect"); printf("connectionLost called\n"); if ((rc = app_ascon(client, &conn_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start reconnect, return code %d\n", rc); finished = 1; } } char *client_packet; int counter = 0; mqtt_broker_handle_t **broker_final; bool is_number(char *message) { // cout<<"the lenght of character is"<"<clientid<someone(); //timer->someone(); if (!app_png_req) { cout << "The error is %s" << dlerror(); } // destroy(timer); if (strcmp(argv[2], "subscribe") == 0) { typedef int (*application_subsrcbe)(mqtt_broker_handle_t *, const char*, QoS); application_subsrcbe app_sub; app_sub = (application_subsrcbe) dlsym(lib_handle, "mqtt_subscribe"); int result = app_sub(broker, topic, QoS2); if (result != 1) { puts("failed to Subscribe"); exit(1); } typedef void (*mqtt_disp_msg)(mqtt_broker_handle_t*, int (*)(int)); mqtt_disp_msg app_disp_msg; app_disp_msg = (mqtt_disp_msg) dlsym(lib_handle, "mqtt_display_message"); if (!app_disp_msg) { cout << "The error is %s" << dlerror(); } // Timer *time = creat(); //timer->someone(); while (1) { timer->start(chrono::milliseconds(40000), [&app_png_req] { // cout << "Hello!" << endl; app_png_req(); }); app_disp_msg(broker, &putchar); // this_thread::sleep_for(chrono::seconds(4)); //printf("the socket file descriptor is %d \n",broker->socket); //show_socketfd(broker->socket); // mqtt_display_message(broker, &putchar); // std::thread start_background(mqtt_pin_req,broker); //mqtt_pin_req(broker); // int i=0; // start_background.join(); // z[i]->join(); // delete z[i]; //ths.push_back(thread(&mqtt_display_message, broker, &putchar)); //i++; // t.join(); // the_thread.join(); } } else if (strcmp(argv[2], "publish") == 0) { typedef int(*application_publsh)(mqtt_broker_handle_t *, const char*, const char*, QoS); application_publsh app_pub; app_pub = (application_publsh) dlsym(lib_handle, "mqtt_publish"); thread pub_th; pub_th = thread([ = ](){ while (1) { timer->start(chrono::milliseconds(20000), [&app_png_req] { // cout << "Hello!" << endl; app_png_req(); }); AGAIN : char msg[128]; cin.getline(msg, 128); // string message(msg); if (is_number(msg)) { if (app_pub(broker, topic, msg, QoS0) == -1) { printf("publish failed\n"); } else { printf("the message sent is %s \n", msg); } } else { //cout<<"value of counter is "< delim_len) { // cout << "came here" << endl; //printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]); if (strncmp("\n", &buffer[data_len - delim_len], delim_len) == 0) break; } // if(strcmp(opts.delimiter,&buffer[data_len - delim_len])==0){ // break; // } } while (data_len < opts.maxdatalen); if (opts.verbose) printf("Publishing data of length %d\n", data_len); pub_opts.onSuccess = onPublish; pub_opts.onFailure = onPublishFailure; do { rc = app_assnd(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts); } while (rc != MQTTASYNC_SUCCESS); } // printf("Stopping\n"); cout << "Stopping" << endl; free(buffer); disc_opts.onSuccess = onDisconnect_pub; if ((rc = app_asdiscon(client, &disc_opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start disconnect, return code %d\n", rc); exit(EXIT_FAILURE); } while (!disconnected_pub) usleep(10000L); app_asdstry(&client); return EXIT_SUCCESS; } } } // create(); // mqtt_broker_handle_t *broker = fptr(client_name, ip_addr, port,NULL, NULL); // if (broker == 0) { // puts("Failed to connect"); // exit(1); // }else{ // cout<<"connected now"<