#include <string.h>
#include <csignal>
#include "MQTTAsync.h"
#include"MQTTClientPersistence.h"
#include"mqtt/TestTimeout.h"
#include"mqtt/SenseShared.h"
#include"mqtt/Sens_data.h"
#include <thread>
#include <cstdint>
#include "mqtt/mqtt.h"
#include <sys/types.h>
#include <unistd.h>
#include <dlfcn.h>
#include <iostream>
#include <algorithm>
#include <memory>
#include <sys/time.h>
#include <stdlib.h>
#include <stdio.h>
#include <vector>
#include <stdbool.h>
using namespace std;
// ---- File-scope state shared between main() and the callbacks below ----

// Client identifier; main() assigns argv[3] to it.
char *client_name; // -c
//const char *ip_addr = "52.29.27.181"; // -i
// Broker address handed to mqtt_connect in the personalised_mqtt path;
// getopts() replaces it when --host is given in that mode.
const char *ip_addr="192.168.0.102";
// Broker port for the personalised_mqtt path; getopts() replaces it on --port.
uint32_t port = 9876; // -p
//uint32_t port=1883;
//char *username = "ykhbjdvu";
// Credentials for the personalised_mqtt path. Only getopts() writes them in
// the visible code; main() currently passes NULL for both to mqtt_connect.
char *username = NULL;
char *password = NULL;
//void* (*create)();
// const char *topic = "mydome/temp/value";
//char *topic;
// const char *topic = "rpi2/temp";
// Connect options for the paho subscriber: filled in by main() and reused by
// connectionLost()/connectionLost_pub() when they try to reconnect.
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
// Set to 1 by cfinish() and by the failure callbacks; polled by main().
volatile int finished = 0;
// Topic name; main() assigns argv[4] to it.
char* topic = NULL;
// Input buffer malloc'ed by main() in the paho publish branch.
char* buffer = NULL;
// Set to 1 by onSubscribe(); polled by main().
int subscribed = 0;
// Set to 1 by onDisconnect(); polled by main().
int disconnected = 0;
// Set to 1 by cfinish_pub(); ends the paho publish loop in main().
volatile int toStop = 0;
// Settings used by the paho_mqtt code paths (and printed by
// personalised_usage()). String members are `const char*` because they are
// initialised from string literals, which must not be bound to a mutable
// `char*` in C++11 and later; they are only ever read or re-pointed at argv
// entries in this file, never written through.
struct {
    const char* clientid;   // client identifier passed to MQTTAsync_create*
    int nodelimiter;        // non-zero: messageArrived() takes its first print branch
    char delimiter;         // character appended after a payload otherwise
    int maxdatalen;         // size in bytes of the publisher's input buffer
    int qos;                // QoS used for subscribe/send
    int retained;           // "retained" argument passed to MQTTAsync_send
    const char* username;   // copied into the connect options (may be NULL)
    const char* password;   // copied into the connect options (may be NULL)
    const char* host;       // broker host, combined with port into "host:port"
    const char* port;       // broker port, kept as text
    int showtopics;         // non-zero: print the topic name with each message
    int verbose;            // non-zero: print the length of each published payload
    int keepalive;          // keep-alive interval copied into the connect options
} opts = {
    "stdout-subscriber-async", 1, '\n', 100, 2, 0, NULL, NULL, "broker.hivemq.com", "1883", 0, 0, 10
};
/**
 * Print the command-line help, pause for ten seconds, then terminate the
 * process with EXIT_FAILURE. This function never returns to its caller.
 *
 * The current defaults for qos, host and port are read from `opts`.
 */
void personalised_usage() {
    // Every line ends with std::endl so the text is flushed before the pause.
    std::cout << "mqtt client application" << std::endl
              << "Usage::mqtt_applcation <personalised_mqtt/paho_mqtt><subscribe/publish><unique clientname><topicname>" << std::endl
              << "--qos<Quality of Service>(this feature is available for paho_mqtt) default is" << opts.qos << std::endl
              << "--host <hostname> default is " << opts.host << std::endl
              << "--port<port> default is" << opts.port << std::endl
              << "--showtopics (this feature is available for paho_mqtt ;default is on if the topic has a wildcard, else off)<on/off>" << std::endl
              << "--keepalive(this feature is available for paho_mqtt;default is 10 seconds)" << std::endl;

    sleep(10);
    exit(EXIT_FAILURE);
}
/**
 * Signal handler installed by the paho subscriber.
 *
 * Restores the default SIGINT disposition and raises the `finished` flag
 * that main() polls.
 *
 * @param sig signal number (unused).
 */
void cfinish(int sig) {
    (void) sig;
    // SIG_DFL is the portable spelling of "default action"; passing NULL only
    // works on platforms where SIG_DFL happens to be a null pointer.
    signal(SIGINT, SIG_DFL);
    finished = 1;
}
/**
 * Signal handler installed by the paho publisher.
 *
 * Restores the default SIGINT disposition and raises the `toStop` flag that
 * ends the publish loop in main().
 *
 * @param sig signal number (unused).
 */
void cfinish_pub(int sig) {
    (void) sig;
    // SIG_DFL is the portable spelling of "default action"; passing NULL only
    // works on platforms where SIG_DFL happens to be a null pointer.
    signal(SIGINT, SIG_DFL);
    toStop = 1;
}
//void paho_lib_opn(void *paho_handle) {
// paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
// if (!paho_handle) {
// cerr << "Cannot open library: " << dlerror() << '\n';
// return 1;
// } else {
// cout << "you have successfully opened the library" << endl;
//}
//}
/**
 * Parse the optional "--name value" arguments that follow the four
 * positional arguments (scanning starts at argv[5]).
 *
 * Recognised options: --qos, --host, --port, --username, --password,
 * --showtopics, --keepalive. For host/port/username/password the target
 * depends on argv[1]: "personalised_mqtt" updates the file-scope variables,
 * "paho_mqtt" updates `opts`. Unrecognised options are skipped.
 *
 * A missing value, an invalid --qos value or an invalid --showtopics value
 * ends the program through personalised_usage().
 *
 * @param argc argument count as received by main().
 * @param argv argument vector as received by main().
 */
void getopts(int argc, char** argv) {
    int count = 5;

    // Advance to the value that must follow the current option; a missing
    // value is a usage error (personalised_usage() does not return).
    auto next_value = [&]() -> char* {
        if (++count >= argc)
            personalised_usage();
        return argv[count];
    };
    // argv[1] names the backend; only consulted once an option has been seen.
    auto backend_is = [&](const char* name) -> bool {
        return strcmp(argv[1], name) == 0;
    };

    for (; count < argc; ++count) {
        const char* option = argv[count];

        if (strcmp(option, "--qos") == 0) {
            const char* value = next_value();
            if (strcmp(value, "0") == 0)
                opts.qos = 0;
            else if (strcmp(value, "1") == 0)
                opts.qos = 1;
            else if (strcmp(value, "2") == 0)
                opts.qos = 2;
            else
                personalised_usage(); // reject anything other than 0/1/2
        } else if (strcmp(option, "--host") == 0) {
            char* value = next_value();
            if (backend_is("personalised_mqtt"))
                ip_addr = value;
            else if (backend_is("paho_mqtt"))
                opts.host = value;
        } else if (strcmp(option, "--port") == 0) {
            char* value = next_value();
            if (backend_is("personalised_mqtt"))
                port = atoi(value);
            else if (backend_is("paho_mqtt"))
                opts.port = value;
        } else if (strcmp(option, "--username") == 0) {
            char* value = next_value();
            if (backend_is("personalised_mqtt"))
                username = value;
            else if (backend_is("paho_mqtt"))
                opts.username = value;
        } else if (strcmp(option, "--password") == 0) {
            char* value = next_value();
            if (backend_is("personalised_mqtt"))
                password = value;
            else if (backend_is("paho_mqtt"))
                opts.password = value;
        } else if (strcmp(option, "--showtopics") == 0) {
            const char* value = next_value();
            if (strcmp(value, "on") == 0)
                opts.showtopics = 1;
            else if (strcmp(value, "off") == 0)
                opts.showtopics = 0;
            else
                personalised_usage();
        } else if (strcmp(option, "--keepalive") == 0) {
            opts.keepalive = atoi(next_value());
        }
    }
}
/**
 * Message callback registered by the paho publisher, which does not expect
 * to receive anything. All arguments are ignored.
 *
 * @return always 1.
 */
int messageArrived_pub(void* context, char* topicName, int topicLen, MQTTAsync_message* m) {
    (void) context;
    (void) topicName;
    (void) topicLen;
    (void) m;
    const int handled = 1;
    return handled;
}
// Raised by onDisconnect_pub(); main() polls it after requesting a
// disconnect in the paho publish branch.
static int disconnected_pub = 0;

/**
 * Disconnect-success callback for the paho publisher: records that the
 * disconnect has completed. Both arguments are ignored.
 */
void onDisconnect_pub(void* context, MQTTAsync_successData* response) {
    (void) context;
    (void) response;
    disconnected_pub = 1;
}
static int connected = 0;
void myconnect(MQTTAsync* client);
void onConnectFailure_pub(void* context, MQTTAsync_failureData* response) {
printf("Connect failed, rc %d\n", response ? response->code : -1);
connected = -1;
MQTTAsync* client = (MQTTAsync*) context;
myconnect(client);
}
/**
 * Connect-success callback for the paho publisher: announces the connection
 * and sets `connected` to 1. Neither argument is needed, so the unused
 * locals the function used to declare have been dropped.
 */
void onConnect_pub(void* context, MQTTAsync_successData* response) {
    (void) context;
    (void) response;
    printf("Connected\n");
    connected = 1;
}
/**
 * Start an asynchronous connection attempt for the paho publisher.
 *
 * Builds a fresh set of connect options from `opts` (keep-alive, clean
 * session, credentials, automatic reconnect), registers onConnect_pub /
 * onConnectFailure_pub with `client` as their context, resets `connected`
 * to 0 and calls MQTTAsync_connect, which is resolved at run time from
 * libpaho-mqtt3a.so.
 *
 * The process exits with EXIT_FAILURE if the library or the symbol cannot
 * be loaded, or if the connect call itself is rejected.
 *
 * @param client address of the client handle to connect.
 */
void myconnect(MQTTAsync* client) {
    typedef int (*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*);

    // Resolved once and kept: onConnectFailure_pub re-enters this function on
    // every failed attempt, so reopening the library each time is wasteful.
    static paho_asyn_conn app_ascon = NULL;
    if (!app_ascon) {
        void* paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW);
        if (!paho_handle) {
            const char* reason = dlerror();
            std::cerr << "Cannot open library: " << (reason ? reason : "unknown error") << '\n';
            exit(EXIT_FAILURE);
        }
        app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect");
        if (!app_ascon) {
            // Calling through a null pointer would crash; stop here instead.
            const char* reason = dlerror();
            std::cout << "eror with" << (reason ? reason : "unknown error") << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
    int rc = 0;

    printf("Connecting\n");
    conn_opts.keepAliveInterval = opts.keepalive;
    conn_opts.cleansession = 1;
    conn_opts.username = opts.username;
    conn_opts.password = opts.password;
    conn_opts.onSuccess = onConnect_pub;
    conn_opts.onFailure = onConnectFailure_pub;
    conn_opts.context = client;
    ssl_opts.enableServerCertAuth = 0;
    conn_opts.ssl = &ssl_opts;
    conn_opts.automaticReconnect = 1;
    connected = 0;

    if ((rc = app_ascon(*client, &conn_opts)) != MQTTASYNC_SUCCESS) {
        printf("Failed to start connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
}
// Result of the most recent publish: 1 after onPublish(), -1 after
// onPublishFailure(), 0 before any result has arrived.
static int published = 0;

/**
 * Publish-failure callback: logs the failure code and sets `published`
 * to -1.
 *
 * @param context  unused.
 * @param response failure details; may be NULL, in which case -1 is logged.
 */
void onPublishFailure(void* context, MQTTAsync_failureData* response) {
    (void) context;
    // The branches used to be swapped, which dereferenced a NULL response and
    // printed -1 whenever real failure data was available.
    printf("Publish failed, rc %d\n", response ? response->code : -1);
    published = -1;
}
/**
 * Publish-success callback: sets `published` to 1. Neither argument is
 * needed, so the unused local handle has been dropped.
 */
void onPublish(void* context, MQTTAsync_successData* response) {
    (void) context;
    (void) response;
    published = 1;
}
void connectionLost_pub(void *context, char *cause) {
MQTTAsync client = (MQTTAsync) context;
int rc;
void *paho_handle;
paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*);
paho_asyn_conn app_ascon;
app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect");
if (!app_ascon) {
cout << "eror with" << dlerror() << endl;
}
printf("connectionLost called\n");
if ((rc = app_ascon(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
printf("Failed to start reconnect, return code %d\n", rc);
finished = 1;
}
}
//SenseData *sense_data,**sense_reuse;
bool obj_flag=false;
string stored_msg="";
int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
void *personalised_handle;
personalised_handle = dlopen("/home/pom/NetBeansProjects/mqtt_dummy/lib/libmqttpp.so", RTLD_NOW);
if (!personalised_handle) {
cerr << "Cannot open personalised library: " << dlerror() << '\n';
return 1;
} else {
cout << "you have successfully opened your personalised library" << endl;
}
typedef SenseData*(*create_sense_t)();
typedef void (*destroy_sense_t)(SenseData*);
create_sense_t sense_creat=(create_sense_t)dlsym(personalised_handle,"sense_create");
destroy_sense_t sense_destry=(destroy_sense_t)dlsym(personalised_handle,"sense_destroy");
// char *msg=new char[100];
if(!sense_creat){
cout<<"error with"<<dlerror()<<endl;
}
if(!sense_destry){
cout<<"error with"<<dlerror()<<endl;
}
SenseData *sense_data=sense_creat();
if (opts.showtopics)
printf("%s\t", topicName);
if (opts.nodelimiter) {
char *msg_buffer=new char[100];
printf("%.*s \n", message->payloadlen, (char*) message->payload);
msg_buffer=(char*) message->payload;
// temp_sense.check_dup_temp(message);
// temp_sense.show_unq_temp();
// if(obj_flag==true){
// cout<<"value of the store_temp after initialisation is "<<sense_data->store_temp;
// sense_data->check_dup_temp(msg);
// sense_data->show_unq_temp();
// sense_data->store_temp=msg;
// sense_reuse=&sense_data;
// cout<<"value of the store_temp after initialisation is "<<(*sense_reuse)->store_temp;
// (*sense_reuse)->store_temp=msg_buffer;
// delete(*sense_reuse);
// }else{
// sense_data=sense_creat();
// obj_flag=true;
if(stored_msg!=""){
cout<<"the stored msg is "<<stored_msg<<endl;
sense_data->store_temp=(char*)(stored_msg.c_str());
}
sense_data->check_dup_temp(msg_buffer);
sense_data->show_unq_temp();
stored_msg=msg_buffer;
sense_data->store_temp=msg_buffer;
cout<<"the buffer is"<<sense_data->store_temp<<endl;
}
// sense_data->someone();
// cout<<"the message printed here"<<endl;
// cout<<"the buffer is"<<msg<<endl;
else{
printf("%.*s%c", message->payloadlen, (char*) message->payload, opts.delimiter);
}
fflush(stdout);
void *paho_handle;
paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
if (!paho_handle) {
cerr << "Cannot open library: " << dlerror() << '\n';
// return 1;
} else {
cout << "you have successfully opened the library" << endl;
}
typedef void (*paho_asyn_frmsg)(MQTTAsync_message**);
paho_asyn_frmsg app_frmsg;
app_frmsg = (paho_asyn_frmsg) dlsym(paho_handle, "MQTTAsync_freeMessage");
if (!app_frmsg) {
cout << "The error is " << dlerror();
}
typedef void(*paho_asyn_fr)(void*);
paho_asyn_fr app_fr;
app_fr = (paho_asyn_fr) dlsym(paho_handle, "MQTTAsync_free");
if (!app_fr) {
cout << "The error is " << dlerror();
}
app_frmsg(&message);
app_fr(topicName);
//MQTTAsync_freeMessage(&message);
//MQTTAsync_free(topicName);
return 1;
}
/**
 * Disconnect-success callback for the paho subscriber: raises the
 * `disconnected` flag that main() waits on. Both arguments are ignored.
 */
void onDisconnect(void* context, MQTTAsync_successData* response) {
    (void) context;
    (void) response;
    disconnected = 1;
}
/**
 * Subscribe-success callback: raises the `subscribed` flag that main()
 * waits on. Both arguments are ignored.
 */
void onSubscribe(void* context, MQTTAsync_successData* response) {
    (void) context;
    (void) response;
    subscribed = 1;
}
/**
 * Subscribe-failure callback: logs the failure code and sets `finished`.
 *
 * @param context  unused.
 * @param response failure details; may be NULL, in which case -1 is logged
 *                 (same convention as onConnectFailure_pub).
 */
void onSubscribeFailure(void* context, MQTTAsync_failureData* response) {
    (void) context;
    printf("Subscribe failed, rc %d\n", response ? response->code : -1);
    finished = 1;
}
/**
 * Connect-failure callback for the paho subscriber: logs the failure code
 * and sets `finished`.
 *
 * @param context  unused.
 * @param response failure details; may be NULL, in which case -1 is logged
 *                 (same convention as onConnectFailure_pub).
 */
void onConnectFailure(void* context, MQTTAsync_failureData* response) {
    (void) context;
    printf("Connect failed, rc %d\n", response ? response->code : -1);
    finished = 1;
}
void onConnect(void* context, MQTTAsync_successData* response) {
MQTTAsync client = (MQTTAsync) context;
MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
void *paho_handle;
paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
typedef int(*paho_asyn_subscrb)(MQTTAsync, const char*, int, MQTTAsync_responseOptions*);
paho_asyn_subscrb app_assub;
app_assub = (paho_asyn_subscrb) dlsym(paho_handle, "MQTTAsync_subscribe");
if (!app_assub) {
cout << "The error is " << dlerror();
}
if (opts.showtopics)
printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos);
ropts.onSuccess = onSubscribe;
ropts.onFailure = onSubscribeFailure;
ropts.context = client;
if ((rc = app_assub(client, topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS) {
printf("Failed to start subscribe, return code %d\n", rc);
finished = 1;
}
}
void connectionLost(void *context, char *cause) {
MQTTAsync client = (MQTTAsync) context;
int rc;
void *paho_handle;
paho_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*);
paho_asyn_conn app_ascon;
app_ascon = (paho_asyn_conn) dlsym(paho_handle, "MQTTAsync_connect");
printf("connectionLost called\n");
if ((rc = app_ascon(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
printf("Failed to start reconnect, return code %d\n", rc);
finished = 1;
}
}
// Action name ("subscribe" / "publish"); main() assigns argv[2] to it and
// mqtt_discon_handlr() compares against it.
char *client_packet;
// File-scope integer tally, starts at 0; nothing in the visible code resets it.
int counter = 0;
// Address of main()'s broker handle, set once the personalised_mqtt
// connection has been attempted; dereferenced by mqtt_discon_handlr().
mqtt_broker_handle_t **broker_final;
/**
 * Check whether `message` is an unsigned decimal number: one or more digits
 * with at most one '.' anywhere in the text (e.g. "23", "23.5", ".5", "5.").
 *
 * The '.' count is kept locally. It used to live in a file-scope counter
 * that was never reset, so after the first decimal value every later input
 * containing '.' was rejected. Empty input and a lone "." now return false
 * (empty input previously reached the end of the function without a return
 * value).
 *
 * @param message NUL-terminated text to check; NULL is treated as "not a
 *                number".
 * @return true if the whole text is a number as described above.
 */
bool is_number(char *message) {
    if (message == NULL) {
        return false;
    }

    int dot_count = 0;
    bool saw_digit = false;
    for (const char* cursor = message; *cursor != '\0'; ++cursor) {
        if (*cursor == '.') {
            if (++dot_count > 1) {
                return false;
            }
        } else if (isdigit(static_cast<unsigned char>(*cursor))) {
            // The cast keeps isdigit() defined for bytes >= 0x80.
            saw_digit = true;
        } else {
            return false;
        }
    }
    return saw_digit;
}
/**
 * SIGINT handler installed at the start of main().
 *
 * Opens libmqttpp.so and, depending on `client_packet`, unsubscribes from
 * `topic` (subscriber) and/or disconnects the broker referenced by
 * `broker_final`. The process always ends with exit(1).
 *
 * The handler can run before a broker connection exists (for example in the
 * paho_mqtt path, or while connecting), so `client_packet`, `broker_final`
 * and the handle it points at are checked before use, as are the results of
 * dlopen/dlsym.
 *
 * NOTE(review): dlopen, iostream output and exit are not async-signal-safe;
 * consider setting a flag here and doing this work from the main loop.
 *
 * @param handlr signal number (unused).
 */
void mqtt_discon_handlr(int handlr) {
    typedef void (*applctn_disscnt)(mqtt_broker_handle_t *);
    typedef int (*applcatn_unsbscrbe)(mqtt_broker_handle_t *, const char*, QoS);
    (void) handlr;

    void* lib_handle = dlopen("/home/pom/NetBeansProjects/mqtt_dummy/lib/libmqttpp.so", RTLD_NOW);
    if (!lib_handle) {
        const char* reason = dlerror();
        std::cerr << "Cannot open library: " << (reason ? reason : "unknown error") << '\n';
        exit(1);
    }
    std::cout << "you have successfully opened the library" << std::endl;

    // Nothing to tear down if the interrupt arrived before a connection.
    if (client_packet == NULL || broker_final == NULL || *broker_final == NULL) {
        exit(1);
    }
    std::cout << "the client which is run now is " << client_packet << std::endl;

    applctn_disscnt app_discon = (applctn_disscnt) dlsym(lib_handle, "mqtt_disconnect");
    if (!app_discon) {
        const char* reason = dlerror();
        std::cerr << "Cannot open library: " << (reason ? reason : "unknown error") << '\n';
    }

    if (strcmp(client_packet, "subscribe") == 0) {
        std::cout << "you reached here" << std::endl;
        applcatn_unsbscrbe app_unsub = (applcatn_unsbscrbe) dlsym(lib_handle, "mqtt_unsubscribe");
        if (!app_unsub || app_unsub(*broker_final, topic, QoS2) != 1) {
            puts("failed to UnSubscribe");
            exit(1);
        }
        if (app_discon)
            app_discon(*broker_final);
    } else if (strcmp(client_packet, "publish") == 0) {
        if (app_discon)
            app_discon(*broker_final);
    }
    exit(1);
}
int main(int argc, char** argv) {
client_name = argv[3];
topic = argv[4];
signal(SIGINT, mqtt_discon_handlr);
//signal(SIG_DFL, mqtt_discon_handlr);
client_packet = argv[2];
void* lib_handle;
char *error;
//void(cosine)(void);
if (argc < 5) {
personalised_usage();
}
//thread async_thrd;
//getopts(argc, argv);
if (strcmp(argv[1], "personalised_mqtt") == 0) {
lib_handle = dlopen("/home/pom/NetBeansProjects/mqtt_dummy/lib/libmqttpp.so", RTLD_NOW);
if (!lib_handle) {
cerr << "Cannot open library: " << dlerror() << '\n';
return 1;
} else {
cout << "you have successfully opened the library" << endl;
}
//mqtt_broker_handle_t (*mq_conn)(mqtt_broker_handle_t*);
// typedef mqtt_broker_handle_t* (*hello_t)();
// cosine=dlsym(lib_handle,"something");
//typedef void(*cosine)();
//cosine some;
//mqtt_broker_handle_t (*mq_conn)(mqtt_broker_handle_t*);
// typedef mqtt_broker_handle_t* (*hello_t)();
//some=(cosine)dlsym(lib_handle,"something");
// if(!some){
// cout<<dlerror()<<endl;
//}else{
// cout<<"done finding";
// }
typedef mqtt_broker_handle_t * (*applictn_connect)(char const*, char const*, unsigned int, char const*, char const*);
applictn_connect app_conn;
app_conn = (applictn_connect) dlsym(lib_handle, "mqtt_connect");
if (!app_conn) {
cout << dlerror() << endl;
} else {
cout << "done finding" << endl;
}
// dlerror();
// error= dlerror();
//if (error!=NULL) {
// cerr << "Cannot load symbol 'mqtt_connect': " << dlerror() <<endl;
// dlclose(lib_handle);
// return 1;
// }else{
// cout<<"linked succesfull"<<endl;
// }
mqtt_broker_handle_t *broker = app_conn(client_name, ip_addr, port, NULL, NULL);
broker_final = &broker;
if (broker == 0) {
puts("Failed to connect");
exit(1);
} else {
cout << "you have been successufully connected to the mqtt server" << endl;
}
typedef mqtt_timer * (*create_t)();
typedef void (*destroy_t)(mqtt_timer*);
typedef void (*mqtt_png_req)();
create_t creat;
destroy_t destroy;
creat = (create_t) dlsym(lib_handle, "create");
destroy = (destroy_t) dlsym(lib_handle, "destroy");
mqtt_png_req app_png_req;
app_png_req = (mqtt_png_req) dlsym(lib_handle, "mqtt_pin_req");
if (!creat) {
cout << "The error is %s" << dlerror();
}
if (!destroy) {
cout << "The error is %s" << dlerror();
}
mqtt_timer *timer = creat();
// timer->someone();
//timer->someone();
if (!app_png_req) {
cout << "The error is %s" << dlerror();
}
// destroy(timer);
if (strcmp(argv[2], "subscribe") == 0) {
typedef int (*application_subsrcbe)(mqtt_broker_handle_t *, const char*, QoS);
application_subsrcbe app_sub;
app_sub = (application_subsrcbe) dlsym(lib_handle, "mqtt_subscribe");
int result = app_sub(broker, topic, QoS2);
if (result != 1) {
puts("failed to Subscribe");
exit(1);
}
typedef void (*mqtt_disp_msg)(mqtt_broker_handle_t*, int (*)(int));
mqtt_disp_msg app_disp_msg;
app_disp_msg = (mqtt_disp_msg) dlsym(lib_handle, "mqtt_display_message");
if (!app_disp_msg) {
cout << "The error is %s" << dlerror();
}
// Timer *time = creat();
//timer->someone();
while (1) {
timer->start(chrono::milliseconds(40000), [&app_png_req] {
// cout << "Hello!" << endl;
app_png_req();
});
app_disp_msg(broker, &putchar);
// this_thread::sleep_for(chrono::seconds(4));
//printf("the socket file descriptor is %d \n",broker->socket);
//show_socketfd(broker->socket);
// mqtt_display_message(broker, &putchar);
// std::thread start_background(mqtt_pin_req,broker);
//mqtt_pin_req(broker);
// int i=0;
// start_background.join();
// z[i]->join();
// delete z[i];
//ths.push_back(thread(&mqtt_display_message, broker, &putchar));
//i++;
// t.join();
// the_thread.join();
}
} else if (strcmp(argv[2], "publish") == 0) {
typedef int(*application_publsh)(mqtt_broker_handle_t *, const char*, const char*, QoS);
application_publsh app_pub;
app_pub = (application_publsh) dlsym(lib_handle, "mqtt_publish");
thread pub_th;
pub_th = thread([ = ](){
while (1) {
timer->start(chrono::milliseconds(20000), [&app_png_req] {
// cout << "Hello!" << endl;
app_png_req();
});
AGAIN :
char msg[128];
cin.getline(msg, 128);
// string message(msg);
if (is_number(msg)) {
if (app_pub(broker, topic, msg, QoS0) == -1) {
printf("publish failed\n");
} else {
printf("the message sent is %s \n", msg);
}
} else {
//cout<<"value of counter is "<<counter<<endl;
cout << "please enter digit since u have to enter temperature here" << endl;
goto AGAIN;
}
}
});
pub_th.join();
} else {
cout << "sorry there has been no match for the option entered" << endl;
exit(1);
}
} else if (strcmp(argv[1], "paho_mqtt") == 0) {
// opts.clientid=argv[]
opts.clientid = client_name;
void *paho_lb_handle;
// paho_lib_opn(paho_lb_handle);
paho_lb_handle = dlopen("libpaho-mqtt3a.so", RTLD_NOW | RTLD_LAZY);
// if (!paho_lb_handle) {
// cerr << "Cannot open library: " << dlerror() << '\n';
// return 1;
// } else {
// cout << "you have successfully opened the library" << endl;
// }
// paho_lib_opn(paho_handle);
// typedef void (*paho_asyn_frmsg)(MQTTAsync_message**);
// paho_asyn_frmsg app_frmsg;
// app_frmsg=(paho_asyn_frmsg)dlsym(paho_lb_handle,"MQTTAsync_freeMessage");
// if(!app_frmsg){
// cout << "The error is " << dlerror();
// }
typedef int(*paho_asyn_stclbcks)(MQTTAsync, void*, MQTTAsync_connectionLost*, MQTTAsync_messageArrived*, MQTTAsync_deliveryComplete*);
paho_asyn_stclbcks app_asstclbcks;
// MQTTAsync_set
app_asstclbcks = (paho_asyn_stclbcks) dlsym(paho_lb_handle, "MQTTAsync_setCallbacks");
typedef int(*paho_asyn_discon)(MQTTAsync, const MQTTAsync_disconnectOptions*);
paho_asyn_discon app_asdiscon;
app_asdiscon = (paho_asyn_discon) dlsym(paho_lb_handle, "MQTTAsync_disconnect");
typedef void(*paho_asyn_dstry)(MQTTAsync*);
paho_asyn_dstry app_asdstry;
app_asdstry = (paho_asyn_dstry) dlsym(paho_lb_handle, "MQTTAsync_destroy");
char url[100];
sprintf(url, "%s:%s", opts.host, opts.port);
MQTTAsync client;
if (strcmp(argv[2], "subscribe") == 0) {
//opts.clientid = client_name;
int rc = 0;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
// url = opts.host + ":" + opts.port;
// strcat(url, opts.host);
// strcat(url, ":");
// strcat(url, opts.port);
typedef int(*paho_asyn_crte)(MQTTAsync*, const char*, const char*, int, void*);
paho_asyn_crte app_ascrte;
app_ascrte = (paho_asyn_crte) dlsym(paho_lb_handle, "MQTTAsync_create");
typedef int(*paho_asyn_conn)(MQTTAsync, const MQTTAsync_connectOptions*);
paho_asyn_conn app_asconn;
app_asconn = (paho_asyn_conn) dlsym(paho_lb_handle, "MQTTAsync_connect");
if (!app_ascrte) {
cout << "The error is " << dlerror();
}
if (!app_asstclbcks) {
cout << "The error is " << dlerror();
}
if (!app_asconn) {
cout << "The error is " << dlerror();
}
if (!app_asdiscon) {
cout << "The error is " << dlerror();
}
if (!app_asdstry) {
cout << "The error is " << dlerror();
}
rc = app_ascrte(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL);
app_asstclbcks(client, client, connectionLost, messageArrived, NULL);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
conn_opts.keepAliveInterval = opts.keepalive;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if ((rc = app_asconn(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
} else {
cout << "you have successfully connected" << endl;
}
while (!subscribed)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (finished)
goto exit;
while (!finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
disc_opts.onSuccess = onDisconnect;
if ((rc = app_asdiscon(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while (!disconnected)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
exit :
app_asdstry(&client);
return EXIT_SUCCESS;
exit(1);
} else if (strcmp(argv[2], "publish") == 0) {
opts.qos = 0;
cout << "qos of publish message is" << opts.qos << endl;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
MQTTAsync client;
int rc = 0;
create_opts.sendWhileDisconnected = 1;
typedef int(*paho_asyn_crtwtoptns)(MQTTAsync*, const char*, const char*, int, void*, MQTTAsync_createOptions*);
paho_asyn_crtwtoptns app_ascrtwtoptns;
app_ascrtwtoptns = (paho_asyn_crtwtoptns) dlsym(paho_lb_handle, "MQTTAsync_createWithOptions");
if (!app_ascrtwtoptns) {
cout << "The error is " << dlerror();
}
rc = app_ascrtwtoptns(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
signal(SIGINT, cfinish_pub);
signal(SIGTERM, cfinish_pub);
rc = app_asstclbcks(client, client, connectionLost_pub, messageArrived_pub, NULL);
myconnect(&client);
buffer = (char*) malloc(opts.maxdatalen);
typedef int(*paho_asyn_snd)(MQTTAsync, const char*, int, void*, int, int, MQTTAsync_responseOptions*);
paho_asyn_snd app_assnd;
app_assnd = (paho_asyn_snd) dlsym(paho_lb_handle, "MQTTAsync_send");
if (!app_assnd) {
cout << "error with " << dlerror() << endl;
}
while (!toStop) {
int data_len = 0;
int delim_len = 0;
delim_len = 1;
do {
buffer[data_len++] = getchar();
if (data_len > delim_len) {
// cout << "came here" << endl;
//printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]);
if (strncmp("\n", &buffer[data_len - delim_len], delim_len) == 0)
break;
}
// if(strcmp(opts.delimiter,&buffer[data_len - delim_len])==0){
// break;
// }
} while (data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
pub_opts.onSuccess = onPublish;
pub_opts.onFailure = onPublishFailure;
do {
rc = app_assnd(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts);
} while (rc != MQTTASYNC_SUCCESS);
}
// printf("Stopping\n");
cout << "Stopping" << endl;
free(buffer);
disc_opts.onSuccess = onDisconnect_pub;
if ((rc = app_asdiscon(client, &disc_opts)) != MQTTASYNC_SUCCESS) {
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while (!disconnected_pub)
usleep(10000L);
app_asdstry(&client);
return EXIT_SUCCESS;
}
}
}
// create();
// mqtt_broker_handle_t *broker = fptr(client_name, ip_addr, port,NULL, NULL);
// if (broker == 0) {
// puts("Failed to connect");
// exit(1);
// }else{
// cout<<"connected now"<<endl;
// }
// dlerror();
//cout<<"your connection symbol is"<<<<endl;
// if ((error = dlerror()) != NULL){
// cerr<<dlerror()<<endl;
//} else {
// cout<<"symbolic locating was sucesfull"<<endl;
//}