Commit 1e3d3ab0 by Adi Amir

support rabbit client

parent 4d3fbac4
...@@ -21,6 +21,11 @@ ...@@ -21,6 +21,11 @@
}, },
{ {
"directory": "/home/adi/git/ipgallery/common/cpp/Rabbitmq", "directory": "/home/adi/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -DRabbitmq_EXPORTS -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -fPIC -I/home/adi/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/adi/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/Rabbitmq.dir/src/RMQ_Client.cpp.o -c /home/adi/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_Client.cpp",
"file": "/home/adi/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_Client.cpp"
},
{
"directory": "/home/adi/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -I/home/adi/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/adi/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/test_RabbitmqRecv.dir/test/RabbitMQRecv.cpp.o -c /home/adi/git/ipgallery/common/cpp/Rabbitmq/test/RabbitMQRecv.cpp", "command": "/usr/bin/g++ -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -I/home/adi/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/adi/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/test_RabbitmqRecv.dir/test/RabbitMQRecv.cpp.o -c /home/adi/git/ipgallery/common/cpp/Rabbitmq/test/RabbitMQRecv.cpp",
"file": "/home/adi/git/ipgallery/common/cpp/Rabbitmq/test/RabbitMQRecv.cpp" "file": "/home/adi/git/ipgallery/common/cpp/Rabbitmq/test/RabbitMQRecv.cpp"
}, },
......
...@@ -11,11 +11,11 @@ ...@@ -11,11 +11,11 @@
#include "RMQ_Channel.h" #include "RMQ_Channel.h"
#include "RMQ_Message.h" #include "RMQ_Message.h"
#include "RMQ_Utils.h" #include "RMQ_Utils.h"
#include "RMQ_MessageRest.h" #include "RMQ_Message.h"
cRMQ_Result cRMQ_Channel::OpenChannel(const char* pba_Host, int i_Port) cRMQ_Result cRMQ_Channel::OpenChannel(const char* pba_Host, int i_Port)
{ {
mpc_RestParser = new cRMQ_RestParser(E_MESSAGE_DATA_JSON); mpc_RestParser = new cRMQ_RestParser();
m_amqpConnection = amqp_new_connection(); m_amqpConnection = amqp_new_connection();
if (!m_amqpConnection) if (!m_amqpConnection)
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_new_connection() failed in cRMQ_Channel::OpenChannel"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_new_connection() failed in cRMQ_Channel::OpenChannel");
...@@ -41,55 +41,55 @@ cRMQ_Result cRMQ_Channel::OpenChannel(const char* pba_Host, int i_Port) ...@@ -41,55 +41,55 @@ cRMQ_Result cRMQ_Channel::OpenChannel(const char* pba_Host, int i_Port)
cRMQ_Result cRMQ_Channel::CloseChannel() cRMQ_Result cRMQ_Channel::CloseChannel()
{ {
cRMQ_Result rmqResult(cRMQ_ResultCode::RMQ_SUCCESS); cRMQ_Result rmqResult(cRMQ_ResultCode::RMQ_SUCCESS);
amqp_channel_close(m_amqpConnection, 1, AMQP_REPLY_SUCCESS); amqp_channel_close(m_amqpConnection, 1, AMQP_REPLY_SUCCESS);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
amqp_connection_close(m_amqpConnection, AMQP_REPLY_SUCCESS); amqp_connection_close(m_amqpConnection, AMQP_REPLY_SUCCESS);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
amqp_destroy_connection(m_amqpConnection); amqp_destroy_connection(m_amqpConnection);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS); return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS);
} }
cRMQ_Result cRMQ_Channel::OpenQueue(const char* pba_Exchange, const char* bindingkey) { cRMQ_Result cRMQ_Channel::OpenQueue(const char* pba_Exchange, const char* bindingkey) {
if (mb_ChannelOpenedSuccessfully == false) if (mb_ChannelOpenedSuccessfully == false)
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_CHANNEL_NOT_OPENED, "cRMQ_Channel::OpenQueue() failed"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_CHANNEL_NOT_OPENED, "cRMQ_Channel::OpenQueue() failed");
// declare queue // declare queue
amqp_queue_declare_ok_t *r = amqp_queue_declare(m_amqpConnection, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_queue_declare_ok_t *r = amqp_queue_declare(m_amqpConnection, 1, amqp_empty_bytes, 0, 0, 0, 1,
amqp_empty_table); amqp_empty_table);
cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
// bind queue // bind queue
amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue); amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue);
if (queueName.bytes == NULL) if (queueName.bytes == NULL)
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_OUT_OF_MEMORY, "cRMQ_Channel::OpenQueue() failed while copying queue name"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_OUT_OF_MEMORY, "cRMQ_Channel::OpenQueue() failed while copying queue name");
amqp_queue_bind(m_amqpConnection, 1, queueName, amqp_cstring_bytes(pba_Exchange), amqp_cstring_bytes(bindingkey), amqp_queue_bind(m_amqpConnection, 1, queueName, amqp_cstring_bytes(pba_Exchange), amqp_cstring_bytes(bindingkey),
amqp_empty_table); amqp_empty_table);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
// basic consume // basic consume
amqp_basic_consume(m_amqpConnection, 1, queueName, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); amqp_basic_consume(m_amqpConnection, 1, queueName, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS); return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS);
} }
cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message) cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message)
...@@ -107,16 +107,17 @@ cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message) ...@@ -107,16 +107,17 @@ cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message)
std::string s_ContentType((const char *)p_Props->content_type.bytes, p_Props->content_type.len); std::string s_ContentType((const char *)p_Props->content_type.bytes, p_Props->content_type.len);
if (s_ContentType.compare(RMQ_APPLICATION_JSON) == 0) if (s_ContentType.compare(RMQ_APPLICATION_JSON) == 0)
{ {
result = mpc_RestParser->parse(pc_Message, &mc_envelope); result = mpc_RestParser->parse(pc_Message, &mc_envelope);
pc_Message->setType(cRMQ_MessageContentType::E_APPLICATION_JSON);
} }
else if (s_ContentType.compare(RMQ_TEXT_PLAIN) == 0) else if (s_ContentType.compare(RMQ_TEXT_PLAIN) == 0)
{ {
// free text // free text
cRMQ_MessageRest* p_Msg = (cRMQ_MessageRest *)pc_Message;
int len = (int)mc_envelope.message.body.len; int len = (int)mc_envelope.message.body.len;
char* content = new char[len]; char* content = new char[len];
strncpy(content, (const char *)mc_envelope.message.body.bytes, len); strncpy(content, (const char *)mc_envelope.message.body.bytes, len);
p_Msg->setContent(content); pc_Message->setContent(content);
pc_Message->setType(cRMQ_MessageContentType::E_TEXT_PLAIN);
} }
else else
result.SetResultCode(cRMQ_ResultCode::E_INVALID_CONTENT_TYPE); result.SetResultCode(cRMQ_ResultCode::E_INVALID_CONTENT_TYPE);
...@@ -133,10 +134,54 @@ cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message) ...@@ -133,10 +134,54 @@ cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message)
return result; return result;
} }
cRMQ_Result cRMQ_Channel::SendMessage(cRMQ_Message* pc_Message) cRMQ_Result cRMQ_Channel::SendMessage(cRMQ_Message* pc_Message, std::string exchange, std::string bindingKey)
{ {
// TBD amqp_basic_properties_t props;
return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS); props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG;
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2; /* persistent delivery mode */
// add example header
// amqp_table_t *table = &props.headers;
// props.headers.num_entries = 1;
// props.headers.entries = (amqp_table_entry_t_* )calloc(props.headers.num_entries, sizeof(amqp_table_entry_t));
// (table->entries[0]).key = amqp_cstring_bytes("key1");
// ((table->entries[0]).value).kind = AMQP_FIELD_KIND_I32;
// ((table->entries[0]).value).value.i32 = 1234;
rapidjson::Document rpj_Doc; // Null
rapidjson::Document::AllocatorType& rpj_Alloc = rpj_Doc.GetAllocator();
rpj_Doc.SetObject();
rpj_Doc.AddMember("method", pc_Message->getMethod().c_str(), rpj_Alloc);
rpj_Doc.AddMember("domain", pc_Message->getDomain().c_str(), rpj_Alloc);
rpj_Doc.AddMember("path", pc_Message->getPath().c_str(), rpj_Alloc);
rpj_Doc.AddMember("queryParams", pc_Message->getQueryParams().c_str(), rpj_Alloc);
rpj_Doc.AddMember("content", pc_Message->getContent().c_str(), rpj_Alloc);
// headers
rapidjson::Value headerArr(rapidjson::kArrayType);
for (auto it= pc_Message->getHeaders().begin(); it!=pc_Message->getHeaders().end(); ++it)
{
rapidjson::Value header;
header.SetObject();
header.AddMember((*it).m_name.c_str(), (*it).m_value.c_str(), rpj_Alloc);
headerArr.PushBack(header, rpj_Alloc);
}
rpj_Doc.AddMember("headers", headerArr, rpj_Alloc);
const char* content = rpj_Doc.GetString();
// build content
amqp_basic_publish(m_amqpConnection,
1,
amqp_cstring_bytes(exchange.c_str()),
amqp_cstring_bytes(bindingKey.c_str()),
0,
0,
&props,
amqp_cstring_bytes(content));
cRMQ_Result result = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::SendMessage");
return result;
} }
......
...@@ -42,7 +42,7 @@ public: ...@@ -42,7 +42,7 @@ public:
cRMQ_Result CloseChannel(); cRMQ_Result CloseChannel();
cRMQ_Result OpenQueue(const char* pba_Exchange, const char* bindingkey); cRMQ_Result OpenQueue(const char* pba_Exchange, const char* bindingkey);
cRMQ_Result RecieveMessage(cRMQ_Message* pc_Message); cRMQ_Result RecieveMessage(cRMQ_Message* pc_Message);
cRMQ_Result SendMessage(cRMQ_Message* pc_Message); cRMQ_Result SendMessage(cRMQ_Message* pc_Message, std::string exchange, std::string bindingKey);
}; };
#endif // _RMQ_CHANNEL_H #endif // _RMQ_CHANNEL_H
......
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
#include "RMQ_Client.h"
cRMQ_Client::cRMQ_Client() : mc_Channel()
{
}
cRMQ_Result cRMQ_Client::Init(const char* pba_Host, int i_Port, const char* pba_exchange, const char* pba_BindingKey)
{
cRMQ_Result result;
ms_Host = pba_Host;
mi_Port = i_Port;
ms_Exchange = pba_exchange;
ms_BindingKey = pba_BindingKey;
// open channel
result = mc_Channel.OpenChannel(pba_Host, i_Port);
if (result.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return result;
return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS);
}
void cRMQ_Client::Destroy()
{
mc_Channel.CloseChannel();
}
cRMQ_Result cRMQ_Client::SendMessage(cRMQ_Message* pc_Message)
{
return mc_Channel.SendMessage(pc_Message, ms_Exchange, ms_BindingKey);
}
//std::string cRMQ_Client::getQueueString()
//{
// std::string result = ms_BindingKey + "." + ms_Exchange + "@" + ms_Host + ":" + std::to_string(mi_Port);
// return result;
//}
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/*
* File: RMQ_Client.h
* Author: adi
*
* Created on June 14, 2016, 3:17 PM
*/
#ifndef RMQ_CLIENT_H
#define RMQ_CLIENT_H
#include <string>
#include "RMQ_Channel.h"
//#include "RMQ_IParser.h"
class cRMQ_Client
{
protected:
cRMQ_Channel mc_Channel;
std::string ms_Host;
int mi_Port;
std::string ms_Exchange;
std::string ms_BindingKey;
public:
cRMQ_Client();
cRMQ_Result Init(const char* pba_Host, int i_Port, const char* pba_exchange, const char* BindingKey);
std::string getQueueString();
void Destroy();
cRMQ_Channel* getChannel() { return &mc_Channel; }
cRMQ_Result SendMessage(cRMQ_Message* pc_Message);
};
#endif /* RMQ_CLIENT_H */
/** /**
* @file RMQ_Message.h * @file RMQ_Message.h
* @brief This is an abstract class. you must override it. * @brief A warpper for rabbit message
* @author Adi Amir * @author Adi Amir
*/ */
...@@ -9,20 +9,96 @@ ...@@ -9,20 +9,96 @@
//#include <General_Def.h> //#include <General_Def.h>
#include <string> #include <string>
#include <vector>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "RMQ_Utils.h"
#include "RMQ_Types.h" #include "RMQ_Types.h"
class cRMQ_Message class cRMQ_Message
{ {
protected: protected:
eRMQMessageType me_Type; cRMQ_MessageContentType::eType me_Type;
std::string method;
std::string domain;
std::string path;
std::string queryParams;
std::string content;
std::vector<cNameValuePair> headers;
public: public:
cRMQ_Message() cRMQ_Message()
{ {
me_Type = E_MESSAGE_TYPE_UNKNOWN; setType(cRMQ_MessageContentType::UNDEFINED);
} }
void setType(cRMQ_MessageContentType::eType type) {
this->me_Type = type;
}
cRMQ_MessageContentType::eType getType() {
return this->me_Type;
}
// get/set method
const std::string& getMethod() const {
return method;
}
void setMethod(const std::string& method) {
this->method = method;
}
// get/set domain
const std::string& getDomain() const {
return domain;
}
void setDomain(const std::string& domain) {
this->domain = domain;
}
// get/set path
const std::string& getPath() const {
return path;
}
void setPath(const std::string& path) {
this->path = path;
}
// get/set query parameters
const std::string& getQueryParams() const {
return queryParams;
}
void setQueryParams(const std::string& queryParams) {
this->queryParams = queryParams;
}
// get/set content
const std::string& getContent() const {
return content;
}
void setContent(const std::string& content) {
this->content = content;
}
// get/set headers
const std::vector<cNameValuePair>& getHeaders() const {
return headers;
}
void setHeaders(const std::vector<cNameValuePair>& headers) {
this->headers = headers;
}
cNameValuePair& getHeader(int index)
{
return this->headers.at(index);
}
void setHeader(cNameValuePair header) {
this->headers.push_back(header);
}
}; };
#endif // _RMQ_MESSAGE_H #endif // _RMQ_MESSAGE_H
......
/**
* @file RMQ_Message.h
* @brief A warpper for rabbit message
* @author Adi Amir
*/
#ifndef _RMQ_MESSAGE_REST_H
#define _RMQ_MESSAGE_REST_H
//#include <General_Def.h>
#include <string>
#include <vector>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "RMQ_Utils.h"
#include "RMQ_Message.h"
class cRMQ_MessageRest: public cRMQ_Message
{
protected:
std::string protocol;
std::string method;
std::string domain;
std::string uri;
std::string queryParams;
std::string content;
std::vector<cNameValuePair> headers;
public:
cRMQ_MessageRest()
{
me_Type = E_MESSAGE_TYPE_REST;
}
// protocol
const std::string& getProtocol() const {
return protocol;
}
void setProtocol(const std::string& protocol) {
this->protocol = protocol;
}
// get/set method
const std::string& getMethod() const {
return method;
}
void setMethod(const std::string& method) {
this->method = method;
}
// get/set domain
const std::string& getDomain() const {
return domain;
}
void setDomain(const std::string& domain) {
this->domain = domain;
}
// get/set Uri
const std::string& getUri() const {
return uri;
}
void setUri(const std::string& uri) {
this->uri = uri;
}
// get/set query parameters
const std::string& getQueryParams() const {
return queryParams;
}
void setQueryParams(const std::string& queryParams) {
this->queryParams = queryParams;
}
// get/set content
const std::string& getContent() const {
return content;
}
void setContent(const std::string& content) {
this->content = content;
}
// get/set headers
const std::vector<cNameValuePair>& getHeaders() const {
return headers;
}
void setHeaders(const std::vector<cNameValuePair>& headers) {
this->headers = headers;
}
cNameValuePair& getHeader(int index)
{
return this->headers.at(index);
}
void setHeader(cNameValuePair header) {
this->headers.push_back(header);
}
};
#endif // _RMQ_MESSAGE_REST_H
...@@ -10,17 +10,15 @@ ...@@ -10,17 +10,15 @@
#include "RMQ_Types.h" #include "RMQ_Types.h"
#include "RMQ_RestParser.h" #include "RMQ_RestParser.h"
#include "RMQ_MessageRest.h" #include "RMQ_Message.h"
cRMQ_RestParser::cRMQ_RestParser(eRMQMessageRestDataFormat e_DataFormat) cRMQ_RestParser::cRMQ_RestParser()
{ {
me_RMQMessageDataFormat = e_DataFormat;
} }
cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope) cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope)
{ {
cRMQ_Result result; cRMQ_Result result;
cRMQ_MessageRest* p_Msg = (cRMQ_MessageRest *)pc_Message;
rapidjson::Document rpj_Content; rapidjson::Document rpj_Content;
int len = (int)pc_Envelope->message.body.len; int len = (int)pc_Envelope->message.body.len;
char* content = new char[len]; char* content = new char[len];
...@@ -28,40 +26,32 @@ cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc ...@@ -28,40 +26,32 @@ cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc
// initial parse // initial parse
if (rpj_Content.Parse<0>(content).HasParseError() == false) if (rpj_Content.Parse<0>(content).HasParseError() == false)
{ {
//p_Msg->me_Type = E_MESSAGE_TYPE_REST;
// protocol
std::string s_protocol = getNodeAsString(rpj_Content, "protocol");
if (s_protocol == "")
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
p_Msg->setProtocol(s_protocol);
// method // method
std::string s_method = getNodeAsString(rpj_Content, "method"); std::string s_method = getNodeAsString(rpj_Content, "method");
if (s_method == "") if (s_method.empty())
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
p_Msg->setMethod(s_method); pc_Message->setMethod(s_method);
// domain // domain
std::string s_domain = getNodeAsString(rpj_Content, "domain"); std::string s_domain = getNodeAsString(rpj_Content, "domain");
if (s_domain == "") if (s_domain.empty())
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
p_Msg->setDomain(s_domain); pc_Message->setDomain(s_domain);
// Uri // path
std::string s_uri = getNodeAsString(rpj_Content, "uri"); std::string s_path = getNodeAsString(rpj_Content, "path");
if (s_uri == "") if (s_path.empty())
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
p_Msg->setUri(s_uri); pc_Message->setPath(s_path);
// requestParams (optional) // requestParams (optional)
std::string s_queryParams = getNodeAsString(rpj_Content, "queryParams"); std::string s_queryParams = getNodeAsString(rpj_Content, "queryParams");
p_Msg->setQueryParams(s_queryParams); pc_Message->setQueryParams(s_queryParams);
// content (optional) // content (optional)
std::string s_content = getNodeAsString(rpj_Content, "content"); std::string s_content = getNodeAsString(rpj_Content, "content");
p_Msg->setContent(s_content); pc_Message->setContent(s_content);
// headers // headers
const rapidjson::Value& rpj_headerArr = rpj_Content["headers"]; const rapidjson::Value& rpj_headerArr = rpj_Content["headers"];
...@@ -72,7 +62,7 @@ cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc ...@@ -72,7 +62,7 @@ cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc
cNameValuePair* pc_Header = parseHeader(s_header); cNameValuePair* pc_Header = parseHeader(s_header);
if (pc_Header != NULL) if (pc_Header != NULL)
p_Msg->setHeader(*pc_Header); pc_Message->setHeader(*pc_Header);
} }
} }
else else
......
...@@ -33,11 +33,8 @@ ...@@ -33,11 +33,8 @@
class cRMQ_RestParser : public cRMQ_IParser class cRMQ_RestParser : public cRMQ_IParser
{ {
protected:
eRMQMessageRestDataFormat me_RMQMessageDataFormat;
public: public:
cRMQ_RestParser(eRMQMessageRestDataFormat e_RMQMessageDataFormat); cRMQ_RestParser();
virtual ~cRMQ_RestParser() {}; virtual ~cRMQ_RestParser() {};
virtual cRMQ_Result parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope); virtual cRMQ_Result parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope);
......
...@@ -17,8 +17,8 @@ class cRMQ_MessageContentType ...@@ -17,8 +17,8 @@ class cRMQ_MessageContentType
{ {
public: public:
enum eType { enum eType {
TEXT_PLAIN, E_TEXT_PLAIN,
APPLICATION_JSON, E_APPLICATION_JSON,
UNDEFINED UNDEFINED
}; };
...@@ -26,16 +26,16 @@ protected: ...@@ -26,16 +26,16 @@ protected:
eType me_Type; eType me_Type;
public: public:
cRMQ_MessageContentType() { me_Type = TEXT_PLAIN; }; cRMQ_MessageContentType() { me_Type = E_TEXT_PLAIN; };
cRMQ_MessageContentType::eType GetType() { return me_Type; } cRMQ_MessageContentType::eType GetType() { return me_Type; }
void SetType(cRMQ_MessageContentType::eType Type) { me_Type = Type; } void SetType(cRMQ_MessageContentType::eType Type) { me_Type = Type; }
cRMQ_MessageContentType::eType Resolve(const char* pba_Type) cRMQ_MessageContentType::eType Resolve(const char* pba_Type)
{ {
if (strcmp(pba_Type, RMQ_TEXT_PLAIN) == 0) if (strcmp(pba_Type, RMQ_TEXT_PLAIN) == 0)
return cRMQ_MessageContentType::TEXT_PLAIN; return cRMQ_MessageContentType::E_TEXT_PLAIN;
else if (strcmp(pba_Type, RMQ_APPLICATION_JSON) == 0) else if (strcmp(pba_Type, RMQ_APPLICATION_JSON) == 0)
return cRMQ_MessageContentType::APPLICATION_JSON; return cRMQ_MessageContentType::E_APPLICATION_JSON;
else else
return cRMQ_MessageContentType::UNDEFINED; return cRMQ_MessageContentType::UNDEFINED;
} }
...@@ -44,9 +44,9 @@ public: ...@@ -44,9 +44,9 @@ public:
{ {
switch (me_Type) switch (me_Type)
{ {
case TEXT_PLAIN: case E_TEXT_PLAIN:
return RMQ_APPLICATION_JSON; return RMQ_APPLICATION_JSON;
case APPLICATION_JSON: case E_APPLICATION_JSON:
return RMQ_APPLICATION_JSON; return RMQ_APPLICATION_JSON;
default: default:
return "Unknown"; return "Unknown";
...@@ -54,20 +54,7 @@ public: ...@@ -54,20 +54,7 @@ public:
} }
}; };
enum eRMQMessageType
{
E_MESSAGE_TYPE_UNKNOWN,
E_MESSAGE_TYPE_TEXT,
E_MESSAGE_TYPE_REST,
E_MESSAGE_TYPE_BINARY
};
enum eRMQMessageRestDataFormat
{
E_MESSAGE_DATA_TEXT_PLAIN,
E_MESSAGE_DATA_JSON,
E_MESSAGE_DATA_XML
};
class cNameValuePair class cNameValuePair
{ {
...@@ -80,9 +67,6 @@ public: ...@@ -80,9 +67,6 @@ public:
{ {
this->m_name = name; this->m_name = name;
this->m_value = value; this->m_value = value;
} }
}; };
......
...@@ -43,10 +43,9 @@ int main(int argc, char** argv) ...@@ -43,10 +43,9 @@ int main(int argc, char** argv)
//if (messagebody == NULL) //if (messagebody == NULL)
{ {
s_msg = "{"; s_msg = "{";
s_msg += "\"protocol\": \"http\",";
s_msg += "\"method\": \"POST\","; s_msg += "\"method\": \"POST\",";
s_msg += "\"domain\": \"172.16.1.151:8081\","; s_msg += "\"domain\": \"172.16.1.151:8081\",";
s_msg += "\"uri\": \"/publicSafety/doAction\","; s_msg += "\"path\": \"/publicSafety/doAction\",";
s_msg += "\"queryParams\": \"name1=value1&name2=value2\","; s_msg += "\"queryParams\": \"name1=value1&name2=value2\",";
s_msg += "\"content\": \"playAudio\","; s_msg += "\"content\": \"playAudio\",";
s_msg += "\"headers\": [\"n1=v1\", \"n2=v2\"]"; s_msg += "\"headers\": [\"n1=v1\", \"n2=v2\"]";
......
to start rabbitmq server to start rabbitmq server
======================== ========================
rabbitmq-server start rabbitmqctl start
./RabbitMQSend localhost 5672 test1 MyFirstQ BBB ./RabbitMQSend localhost 5672 test1 MyFirstQ BBB
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment