Commit 4d3fbac4 by Adi Amir

support rabbitMQ in Microservice server side

parent cd14efd2
...@@ -11,108 +11,132 @@ ...@@ -11,108 +11,132 @@
#include "RMQ_Channel.h" #include "RMQ_Channel.h"
#include "RMQ_Message.h" #include "RMQ_Message.h"
#include "RMQ_Utils.h" #include "RMQ_Utils.h"
#include "RMQ_MessageRest.h"
cRMQ_Result cRMQ_Channel::OpenChannel(const char* pba_Host, int i_Port, cRMQ_IParser* pc_Parser) cRMQ_Result cRMQ_Channel::OpenChannel(const char* pba_Host, int i_Port)
{ {
mpc_Parser = pc_Parser; mpc_RestParser = new cRMQ_RestParser(E_MESSAGE_DATA_JSON);
m_amqpConnection = amqp_new_connection(); m_amqpConnection = amqp_new_connection();
if (!m_amqpConnection) if (!m_amqpConnection)
return cRMQ_Result(E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_new_connection() failed in cRMQ_Channel::OpenChannel"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_new_connection() failed in cRMQ_Channel::OpenChannel");
mp_amqpSocket = amqp_tcp_socket_new(m_amqpConnection); mp_amqpSocket = amqp_tcp_socket_new(m_amqpConnection);
if (!mp_amqpSocket) if (!mp_amqpSocket)
return cRMQ_Result(E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_tcp_socket_new() failed in cRMQ_Channel::OpenChannel"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_tcp_socket_new() failed in cRMQ_Channel::OpenChannel");
int ret = amqp_socket_open(mp_amqpSocket, pba_Host, i_Port); int ret = amqp_socket_open(mp_amqpSocket, pba_Host, i_Port);
if (ret != 0) if (ret != 0)
return cRMQ_Result(E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_socket_open() failed in cRMQ_Channel::OpenChannel"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_socket_open() failed in cRMQ_Channel::OpenChannel");
amqp_rpc_reply_t amqpRet = amqp_login(m_amqpConnection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"); amqp_rpc_reply_t amqpRet = amqp_login(m_amqpConnection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (amqpRet.reply_type != AMQP_RESPONSE_NORMAL) if (amqpRet.reply_type != AMQP_RESPONSE_NORMAL)
return cRMQ_Result(E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_login() failed in cRMQ_Channel::OpenChannel"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_login() failed in cRMQ_Channel::OpenChannel");
amqp_channel_open(m_amqpConnection, 1); amqp_channel_open(m_amqpConnection, 1);
cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenChannel"); cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenChannel");
if (rmqResult.getCode() == RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() == cRMQ_ResultCode::RMQ_SUCCESS)
mb_ChannelOpenedSuccessfully = true; mb_ChannelOpenedSuccessfully = true;
return rmqResult; return rmqResult;
} }
cRMQ_Result cRMQ_Channel::CloseChannel() cRMQ_Result cRMQ_Channel::CloseChannel()
{ {
cRMQ_Result rmqResult(RMQ_SUCCESS); cRMQ_Result rmqResult(cRMQ_ResultCode::RMQ_SUCCESS);
amqp_channel_close(m_amqpConnection, 1, AMQP_REPLY_SUCCESS); amqp_channel_close(m_amqpConnection, 1, AMQP_REPLY_SUCCESS);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
amqp_connection_close(m_amqpConnection, AMQP_REPLY_SUCCESS); amqp_connection_close(m_amqpConnection, AMQP_REPLY_SUCCESS);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
amqp_destroy_connection(m_amqpConnection); amqp_destroy_connection(m_amqpConnection);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
return cRMQ_Result(RMQ_SUCCESS); return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS);
} }
cRMQ_Result cRMQ_Channel::OpenQueue(const char* pba_Exchange, const char* bindingkey) { cRMQ_Result cRMQ_Channel::OpenQueue(const char* pba_Exchange, const char* bindingkey) {
if (mb_ChannelOpenedSuccessfully == false) if (mb_ChannelOpenedSuccessfully == false)
return cRMQ_Result(E_RMQ_CHANNEL_NOT_OPENED, "cRMQ_Channel::OpenQueue() failed"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_CHANNEL_NOT_OPENED, "cRMQ_Channel::OpenQueue() failed");
// declare queue // declare queue
amqp_queue_declare_ok_t *r = amqp_queue_declare(m_amqpConnection, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_queue_declare_ok_t *r = amqp_queue_declare(m_amqpConnection, 1, amqp_empty_bytes, 0, 0, 0, 1,
amqp_empty_table); amqp_empty_table);
cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
// bind queue // bind queue
amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue); amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue);
if (queueName.bytes == NULL) if (queueName.bytes == NULL)
return cRMQ_Result(E_RMQ_OUT_OF_MEMORY, "cRMQ_Channel::OpenQueue() failed while copying queue name"); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_OUT_OF_MEMORY, "cRMQ_Channel::OpenQueue() failed while copying queue name");
amqp_queue_bind(m_amqpConnection, 1, queueName, amqp_cstring_bytes(pba_Exchange), amqp_cstring_bytes(bindingkey), amqp_queue_bind(m_amqpConnection, 1, queueName, amqp_cstring_bytes(pba_Exchange), amqp_cstring_bytes(bindingkey),
amqp_empty_table); amqp_empty_table);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
// basic consume // basic consume
amqp_basic_consume(m_amqpConnection, 1, queueName, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); amqp_basic_consume(m_amqpConnection, 1, queueName, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue"); rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS) if (rmqResult.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return rmqResult; return rmqResult;
return cRMQ_Result(RMQ_SUCCESS); return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS);
} }
cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message) cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message)
{ {
amqp_maybe_release_buffers(m_amqpConnection); cRMQ_Result result(cRMQ_ResultCode::RMQ_SUCCESS);
amqp_rpc_reply_t res = amqp_consume_message(m_amqpConnection, &mc_envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL == res.reply_type) amqp_maybe_release_buffers(m_amqpConnection);
{ amqp_rpc_reply_t res = amqp_consume_message(m_amqpConnection, &mc_envelope, NULL, 0);
cRMQ_Result result = mpc_Parser->parse(pc_Message, &mc_envelope); if (AMQP_RESPONSE_NORMAL == res.reply_type)
amqp_destroy_envelope(&mc_envelope); {
return result; // get content type
} amqp_basic_properties_t* p_Props = &mc_envelope.message.properties;
else if (p_Props->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
{ {
amqp_destroy_envelope(&mc_envelope); std::string s_ContentType((const char *)p_Props->content_type.bytes, p_Props->content_type.len);
cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::RecieveMessage"); if (s_ContentType.compare(RMQ_APPLICATION_JSON) == 0)
return rmqResult; {
} result = mpc_RestParser->parse(pc_Message, &mc_envelope);
}
else if (s_ContentType.compare(RMQ_TEXT_PLAIN) == 0)
{
// free text
cRMQ_MessageRest* p_Msg = (cRMQ_MessageRest *)pc_Message;
int len = (int)mc_envelope.message.body.len;
char* content = new char[len];
strncpy(content, (const char *)mc_envelope.message.body.bytes, len);
p_Msg->setContent(content);
}
else
result.SetResultCode(cRMQ_ResultCode::E_INVALID_CONTENT_TYPE);
}
else
result.SetResultCode(cRMQ_ResultCode::E_MISSING_CONTENT_TYPE);
}
else
{
result = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::RecieveMessage");
}
amqp_destroy_envelope(&mc_envelope);
return result;
} }
cRMQ_Result cRMQ_Channel::SendMessage(cRMQ_Message* pc_Message) cRMQ_Result cRMQ_Channel::SendMessage(cRMQ_Message* pc_Message)
{ {
// TBD // TBD
return cRMQ_Result(RMQ_SUCCESS); return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS);
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "RMQ_Result.h" #include "RMQ_Result.h"
#include "RMQ_Message.h" #include "RMQ_Message.h"
#include "RMQ_IParser.h" #include "RMQ_RestParser.h"
class cRMQ_Channel class cRMQ_Channel
{ {
...@@ -25,20 +25,20 @@ protected: ...@@ -25,20 +25,20 @@ protected:
amqp_connection_state_t m_amqpConnection; amqp_connection_state_t m_amqpConnection;
amqp_bytes_t m_Queuename; amqp_bytes_t m_Queuename;
amqp_envelope_t mc_envelope; amqp_envelope_t mc_envelope;
cRMQ_IParser* mpc_Parser; cRMQ_RestParser* mpc_RestParser;
bool mb_ChannelOpenedSuccessfully; bool mb_ChannelOpenedSuccessfully;
public: public:
cRMQ_Channel() cRMQ_Channel()
{ {
mpc_Parser = nullptr; mpc_RestParser = nullptr;
mp_amqpSocket = nullptr; mp_amqpSocket = nullptr;
m_amqpConnection = nullptr; m_amqpConnection = nullptr;
m_Queuename.len = 0; m_Queuename.len = 0;
mb_ChannelOpenedSuccessfully = false; mb_ChannelOpenedSuccessfully = false;
} }
cRMQ_Result OpenChannel(const char* pba_Host, int i_Port, cRMQ_IParser* pc_Parser); cRMQ_Result OpenChannel(const char* pba_Host, int i_Port);
cRMQ_Result CloseChannel(); cRMQ_Result CloseChannel();
cRMQ_Result OpenQueue(const char* pba_Exchange, const char* bindingkey); cRMQ_Result OpenQueue(const char* pba_Exchange, const char* bindingkey);
cRMQ_Result RecieveMessage(cRMQ_Message* pc_Message); cRMQ_Result RecieveMessage(cRMQ_Message* pc_Message);
......
...@@ -32,6 +32,7 @@ protected: ...@@ -32,6 +32,7 @@ protected:
public: public:
cRMQ_MessageRest() cRMQ_MessageRest()
{ {
me_Type = E_MESSAGE_TYPE_REST;
} }
// protocol // protocol
const std::string& getProtocol() const { const std::string& getProtocol() const {
......
...@@ -6,71 +6,82 @@ ...@@ -6,71 +6,82 @@
//#include <SysAdmin_LogApi.h> //#include <SysAdmin_LogApi.h>
#include <string.h>
#include "RMQ_Types.h" #include "RMQ_Types.h"
#include "RMQ_RestParser.h" #include "RMQ_RestParser.h"
#include "RMQ_MessageRest.h" #include "RMQ_MessageRest.h"
cRMQ_RestParser::cRMQ_RestParser(eRMQMessageDataFormat e_RMQMessageDataFormat) cRMQ_RestParser::cRMQ_RestParser(eRMQMessageRestDataFormat e_DataFormat)
{ {
me_RMQMessageDataFormat = e_RMQMessageDataFormat; me_RMQMessageDataFormat = e_DataFormat;
} }
cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope) cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope)
{ {
cRMQ_Result result;
cRMQ_MessageRest* p_Msg = (cRMQ_MessageRest *)pc_Message; cRMQ_MessageRest* p_Msg = (cRMQ_MessageRest *)pc_Message;
rapidjson::Document rpj_Content; rapidjson::Document rpj_Content;
std::string content( reinterpret_cast<char const*>(pc_Envelope->message.body.bytes), pc_Envelope->message.body.len); int len = (int)pc_Envelope->message.body.len;
char* content = new char[len];
strncpy(content, (const char *)pc_Envelope->message.body.bytes, len);
// initial parse // initial parse
if (rpj_Content.Parse<0>(content.c_str()).HasParseError()) if (rpj_Content.Parse<0>(content).HasParseError() == false)
{ {
return cRMQ_Result(E_RMQ_PARSER_FAILED); //p_Msg->me_Type = E_MESSAGE_TYPE_REST;
// protocol
std::string s_protocol = getNodeAsString(rpj_Content, "protocol");
if (s_protocol == "")
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
p_Msg->setProtocol(s_protocol);
// method
std::string s_method = getNodeAsString(rpj_Content, "method");
if (s_method == "")
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
p_Msg->setMethod(s_method);
// domain
std::string s_domain = getNodeAsString(rpj_Content, "domain");
if (s_domain == "")
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
p_Msg->setDomain(s_domain);
// Uri
std::string s_uri = getNodeAsString(rpj_Content, "uri");
if (s_uri == "")
return cRMQ_Result(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
p_Msg->setUri(s_uri);
// requestParams (optional)
std::string s_queryParams = getNodeAsString(rpj_Content, "queryParams");
p_Msg->setQueryParams(s_queryParams);
// content (optional)
std::string s_content = getNodeAsString(rpj_Content, "content");
p_Msg->setContent(s_content);
// headers
const rapidjson::Value& rpj_headerArr = rpj_Content["headers"];
for (rapidjson::SizeType i = 0; i < rpj_headerArr.Size(); i++) // rapidjson uses SizeType instead of size_t.
{
const rapidjson::Value& rpj_Header = rpj_headerArr[i];
std::string s_header = rpj_Header.GetString();
cNameValuePair* pc_Header = parseHeader(s_header);
if (pc_Header != NULL)
p_Msg->setHeader(*pc_Header);
}
} }
else
result.SetResultCode(cRMQ_ResultCode::E_RMQ_PARSER_FAILED);
// protocol // clean up
std::string s_protocol = getNodeAsString(rpj_Content, "protocol"); delete content;
if (s_protocol == "")
return cRMQ_Result(E_RMQ_PARSER_FAILED); return result;
p_Msg->setProtocol(s_protocol);
// method
std::string s_method = getNodeAsString(rpj_Content, "method");
if (s_method == "")
return cRMQ_Result(E_RMQ_PARSER_FAILED);
p_Msg->setMethod(s_method);
// domain
std::string s_domain = getNodeAsString(rpj_Content, "domain");
if (s_domain == "")
return cRMQ_Result(E_RMQ_PARSER_FAILED);
p_Msg->setDomain(s_domain);
// Uri
std::string s_uri = getNodeAsString(rpj_Content, "uri");
if (s_uri == "")
return cRMQ_Result(E_RMQ_PARSER_FAILED);
p_Msg->setUri(s_uri);
// requestParams (optional)
std::string s_queryParams = getNodeAsString(rpj_Content, "queryParams");
p_Msg->setQueryParams(s_queryParams);
// content (optional)
std::string s_content = getNodeAsString(rpj_Content, "content");
p_Msg->setContent(s_content);
// headers
const rapidjson::Value& rpj_headerArr = rpj_Content["headers"];
for (rapidjson::SizeType i = 0; i < rpj_headerArr.Size(); i++) // rapidjson uses SizeType instead of size_t.
{
const rapidjson::Value& rpj_Header = rpj_headerArr[i];
std::string s_header = rpj_Header.GetString();
cNameValuePair* pc_Header = parseHeader(s_header);
if (pc_Header != NULL)
p_Msg->setHeader(*pc_Header);
}
return cRMQ_Result(RMQ_SUCCESS);
} }
std::string cRMQ_RestParser::getNodeAsString(rapidjson::Document& rpj_Obj, const char* pba_NodeName) std::string cRMQ_RestParser::getNodeAsString(rapidjson::Document& rpj_Obj, const char* pba_NodeName)
......
...@@ -34,10 +34,10 @@ ...@@ -34,10 +34,10 @@
class cRMQ_RestParser : public cRMQ_IParser class cRMQ_RestParser : public cRMQ_IParser
{ {
protected: protected:
eRMQMessageDataFormat me_RMQMessageDataFormat; eRMQMessageRestDataFormat me_RMQMessageDataFormat;
public: public:
cRMQ_RestParser(eRMQMessageDataFormat e_RMQMessageDataFormat); cRMQ_RestParser(eRMQMessageRestDataFormat e_RMQMessageDataFormat);
virtual ~cRMQ_RestParser() {}; virtual ~cRMQ_RestParser() {};
virtual cRMQ_Result parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope); virtual cRMQ_Result parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope);
......
...@@ -14,7 +14,12 @@ ...@@ -14,7 +14,12 @@
#include <amqp.h> #include <amqp.h>
#include <amqp_framing.h> #include <amqp_framing.h>
enum eRMQ_Code {
class cRMQ_ResultCode
{
public:
enum eCode {
RMQ_SUCCESS, RMQ_SUCCESS,
E_RMQ_FAILED_TO_OPEN_CHANNEL, E_RMQ_FAILED_TO_OPEN_CHANNEL,
E_RMQ_CHANNEL_NOT_OPENED, E_RMQ_CHANNEL_NOT_OPENED,
...@@ -24,33 +29,80 @@ enum eRMQ_Code { ...@@ -24,33 +29,80 @@ enum eRMQ_Code {
E_RMQ_UNKNOWN_SERVER_ERROR, E_RMQ_UNKNOWN_SERVER_ERROR,
E_RMQ_OUT_OF_MEMORY, E_RMQ_OUT_OF_MEMORY,
E_RMQ_PARSER_FAILED, E_RMQ_PARSER_FAILED,
E_MISSING_CONTENT_TYPE,
E_INVALID_CONTENT_TYPE,
E_RMQ_UNINITIALIZED E_RMQ_UNINITIALIZED
}; };
protected:
eCode me_Code;
public:
cRMQ_ResultCode() { me_Code = E_RMQ_UNINITIALIZED; };
cRMQ_ResultCode::eCode GetCode() { return me_Code; }
void SetCode(cRMQ_ResultCode::eCode Code) { me_Code = Code; }
const char* ToString()
{
switch (me_Code)
{
case RMQ_SUCCESS:
return "RMQ_SUCCESS";
case E_RMQ_FAILED_TO_OPEN_CHANNEL:
return "E_RMQ_FAILED_TO_OPEN_CHANNEL";
case E_RMQ_CHANNEL_NOT_OPENED:
return "E_RMQ_CHANNEL_NOT_OPENED";
case E_RMQ_NO_RESPONSE:
return "E_RMQ_NO_RESPONSE";
case E_RMQ_RESPONSE_LIBRARY_EXCEPTION:
return "E_RMQ_RESPONSE_LIBRARY_EXCEPTION";
case E_RMQ_RESPONSE_SERVER_EXCEPTION:
return "E_RMQ_RESPONSE_SERVER_EXCEPTION";
case E_RMQ_UNKNOWN_SERVER_ERROR:
return "E_RMQ_UNKNOWN_SERVER_ERROR";
case E_RMQ_OUT_OF_MEMORY:
return "E_RMQ_OUT_OF_MEMORY";
case E_RMQ_PARSER_FAILED:
return "E_RMQ_PARSER_FAILED";
case E_MISSING_CONTENT_TYPE:
return "E_MISSING_CONTENT_TYPE";
case E_INVALID_CONTENT_TYPE:
return "E_INVALID_CONTENT_TYPE";
case E_RMQ_UNINITIALIZED:
return "E_RMQ_UNINITIALIZED";
default:
return "Unknown";
}
}
};
class cRMQ_Result class cRMQ_Result
{ {
protected: protected:
eRMQ_Code me_Code; cRMQ_ResultCode mc_ResultCode;
std::string ms_Error; std::string ms_Error;
public: public:
cRMQ_Result() cRMQ_Result()
{ {
me_Code = E_RMQ_UNINITIALIZED; mc_ResultCode.SetCode(cRMQ_ResultCode::RMQ_SUCCESS);
ms_Error = ""; ms_Error = "";
} }
cRMQ_Result(eRMQ_Code e_Code) cRMQ_Result(cRMQ_ResultCode::eCode Code)
{ {
me_Code = e_Code; mc_ResultCode.SetCode(Code);
ms_Error = ""; ms_Error = "";
} }
cRMQ_Result(eRMQ_Code e_Code, const char* pba_error) cRMQ_Result(cRMQ_ResultCode::eCode Code, const char* pba_error)
{ {
me_Code = e_Code; mc_ResultCode.SetCode(Code);
ms_Error = pba_error; ms_Error = pba_error;
} }
eRMQ_Code getCode() { return me_Code; }; cRMQ_ResultCode GetResultCode() { return mc_ResultCode; };
void SetResultCode(cRMQ_ResultCode ResultCode) { mc_ResultCode = ResultCode; }
void SetResultCode(cRMQ_ResultCode::eCode eCode) { mc_ResultCode.SetCode(eCode); }
std::string getError() { return ms_Error; }; std::string getError() { return ms_Error; };
}; };
......
...@@ -15,7 +15,7 @@ cRMQ_Server::cRMQ_Server() : mc_Channel() ...@@ -15,7 +15,7 @@ cRMQ_Server::cRMQ_Server() : mc_Channel()
{ {
} }
cRMQ_Result cRMQ_Server::Init(const char* pba_Host, int i_Port, const char* pba_exchange, const char* pba_BindingKey, cRMQ_IParser* pc_MsgParser) cRMQ_Result cRMQ_Server::Init(const char* pba_Host, int i_Port, const char* pba_exchange, const char* pba_BindingKey)
{ {
cRMQ_Result result; cRMQ_Result result;
...@@ -23,19 +23,18 @@ cRMQ_Result cRMQ_Server::Init(const char* pba_Host, int i_Port, const char* pba_ ...@@ -23,19 +23,18 @@ cRMQ_Result cRMQ_Server::Init(const char* pba_Host, int i_Port, const char* pba_
mi_Port = i_Port; mi_Port = i_Port;
ms_Exchange = pba_exchange; ms_Exchange = pba_exchange;
ms_BindingKey = pba_BindingKey; ms_BindingKey = pba_BindingKey;
mpc_MsgParser = pc_MsgParser;
// open channel // open channel
result = mc_Channel.OpenChannel(pba_Host, i_Port, mpc_MsgParser); result = mc_Channel.OpenChannel(pba_Host, i_Port);
if (result.getCode() != RMQ_SUCCESS) if (result.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return result; return result;
// open queue // open queue
result = mc_Channel.OpenQueue(pba_exchange, pba_BindingKey); result = mc_Channel.OpenQueue(pba_exchange, pba_BindingKey);
if (result.getCode() != RMQ_SUCCESS) if (result.GetResultCode().GetCode() != cRMQ_ResultCode::RMQ_SUCCESS)
return result; return result;
return cRMQ_Result(RMQ_SUCCESS); return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS);
} }
void cRMQ_Server::Destroy() void cRMQ_Server::Destroy()
......
...@@ -18,18 +18,16 @@ class cRMQ_Server ...@@ -18,18 +18,16 @@ class cRMQ_Server
{ {
protected: protected:
cRMQ_Channel mc_Channel; cRMQ_Channel mc_Channel;
cRMQ_IParser* mpc_MsgParser;
// server parameters // server parameters
std::string ms_Host; std::string ms_Host;
int mi_Port; int mi_Port;
std::string ms_Exchange; std::string ms_Exchange;
std::string ms_BindingKey; std::string ms_BindingKey;
public: public:
cRMQ_Server(); cRMQ_Server();
cRMQ_Result Init(const char* pba_Host, int i_Port, const char* pba_exchange, const char* BindingKey, cRMQ_IParser* pc_MsgParser); cRMQ_Result Init(const char* pba_Host, int i_Port, const char* pba_exchange, const char* BindingKey);
std::string getQueueString(); std::string getQueueString();
void Destroy(); void Destroy();
......
...@@ -7,8 +7,52 @@ ...@@ -7,8 +7,52 @@
#ifndef _RMQ_TYPES_H #ifndef _RMQ_TYPES_H
#define _RMQ_TYPES_H #define _RMQ_TYPES_H
//#include <General_Def.h>
#include <string> #include <string>
#include <string.h>
#define RMQ_TEXT_PLAIN "text/plain"
#define RMQ_APPLICATION_JSON "application/json"
class cRMQ_MessageContentType
{
public:
enum eType {
TEXT_PLAIN,
APPLICATION_JSON,
UNDEFINED
};
protected:
eType me_Type;
public:
cRMQ_MessageContentType() { me_Type = TEXT_PLAIN; };
cRMQ_MessageContentType::eType GetType() { return me_Type; }
void SetType(cRMQ_MessageContentType::eType Type) { me_Type = Type; }
cRMQ_MessageContentType::eType Resolve(const char* pba_Type)
{
if (strcmp(pba_Type, RMQ_TEXT_PLAIN) == 0)
return cRMQ_MessageContentType::TEXT_PLAIN;
else if (strcmp(pba_Type, RMQ_APPLICATION_JSON) == 0)
return cRMQ_MessageContentType::APPLICATION_JSON;
else
return cRMQ_MessageContentType::UNDEFINED;
}
const char* ToString()
{
switch (me_Type)
{
case TEXT_PLAIN:
return RMQ_APPLICATION_JSON;
case APPLICATION_JSON:
return RMQ_APPLICATION_JSON;
default:
return "Unknown";
}
}
};
enum eRMQMessageType enum eRMQMessageType
{ {
...@@ -18,9 +62,9 @@ enum eRMQMessageType ...@@ -18,9 +62,9 @@ enum eRMQMessageType
E_MESSAGE_TYPE_BINARY E_MESSAGE_TYPE_BINARY
}; };
enum eRMQMessageDataFormat enum eRMQMessageRestDataFormat
{ {
E_MESSAGE_DATA_TEXT, E_MESSAGE_DATA_TEXT_PLAIN,
E_MESSAGE_DATA_JSON, E_MESSAGE_DATA_JSON,
E_MESSAGE_DATA_XML E_MESSAGE_DATA_XML
}; };
......
...@@ -14,15 +14,15 @@ cRMQ_Result getAmqpRpcError(amqp_rpc_reply_t x, char const *context) ...@@ -14,15 +14,15 @@ cRMQ_Result getAmqpRpcError(amqp_rpc_reply_t x, char const *context)
switch (x.reply_type) switch (x.reply_type)
{ {
case AMQP_RESPONSE_NORMAL: case AMQP_RESPONSE_NORMAL:
return cRMQ_Result(RMQ_SUCCESS); return cRMQ_Result(cRMQ_ResultCode::RMQ_SUCCESS);
case AMQP_RESPONSE_NONE: case AMQP_RESPONSE_NONE:
printf(ba_Msg, "%s: missing RPC reply type!\n", context); printf(ba_Msg, "%s: missing RPC reply type!\n", context);
return cRMQ_Result(E_RMQ_NO_RESPONSE, ba_Msg); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_NO_RESPONSE, ba_Msg);
case AMQP_RESPONSE_LIBRARY_EXCEPTION: case AMQP_RESPONSE_LIBRARY_EXCEPTION:
printf(ba_Msg, "%s: %s\n", context, amqp_error_string2(x.library_error)); printf(ba_Msg, "%s: %s\n", context, amqp_error_string2(x.library_error));
return cRMQ_Result(E_RMQ_RESPONSE_LIBRARY_EXCEPTION, ba_Msg); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_RESPONSE_LIBRARY_EXCEPTION, ba_Msg);
case AMQP_RESPONSE_SERVER_EXCEPTION: case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) switch (x.reply.id)
...@@ -48,8 +48,8 @@ cRMQ_Result getAmqpRpcError(amqp_rpc_reply_t x, char const *context) ...@@ -48,8 +48,8 @@ cRMQ_Result getAmqpRpcError(amqp_rpc_reply_t x, char const *context)
context, x.reply.id); context, x.reply.id);
break; break;
} }
return cRMQ_Result(E_RMQ_RESPONSE_SERVER_EXCEPTION, ba_Msg); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_RESPONSE_SERVER_EXCEPTION, ba_Msg);
default: default:
return cRMQ_Result(E_RMQ_UNKNOWN_SERVER_ERROR, ""); return cRMQ_Result(cRMQ_ResultCode::E_RMQ_UNKNOWN_SERVER_ERROR, "");
} }
} }
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
std::string s_msg;
char const *hostname; char const *hostname;
int port, status; int port, status;
char const *exchange; char const *exchange;
...@@ -28,7 +29,7 @@ int main(int argc, char** argv) ...@@ -28,7 +29,7 @@ int main(int argc, char** argv)
amqp_socket_t *socket = NULL; amqp_socket_t *socket = NULL;
amqp_connection_state_t conn; amqp_connection_state_t conn;
if (argc < 6) { if (argc < 5) {
fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey messagebody\n"); fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey messagebody\n");
return 1; return 1;
} }
...@@ -39,30 +40,20 @@ int main(int argc, char** argv) ...@@ -39,30 +40,20 @@ int main(int argc, char** argv)
routingkey = argv[4]; routingkey = argv[4];
messagebody = argv[5]; messagebody = argv[5];
if (messagebody == NULL) //if (messagebody == NULL)
{ {
// { s_msg = "{";
// "protocol": "http/https",
// "method": "POST/GET...",
// "domain": "ip:port",
// "uri": "a/b/c",
// "queryParams": "name1=value1&name2=value2",
// "content": "xxxxxxx",
// "headers": ["n1=v1", "n2=v2"]
// }
std::string s_msg = "{";
s_msg += "\"protocol\": \"http\","; s_msg += "\"protocol\": \"http\",";
s_msg += "\"method\": \"POST\","; s_msg += "\"method\": \"POST\",";
s_msg += "\"domain\": \"172.16.1.151:8081\","; s_msg += "\"domain\": \"172.16.1.151:8081\",";
s_msg += "\"uri\": \"publicSafety/doAction\","; s_msg += "\"uri\": \"/publicSafety/doAction\",";
s_msg += "\"queryParams\": \"name1=value1&name2=value2\","; s_msg += "\"queryParams\": \"name1=value1&name2=value2\",";
s_msg += "\"content\": \"playAudio\","; s_msg += "\"content\": \"playAudio\",";
s_msg += "\"headers\": [\"n1=v1\", \"n2=v2\"]"; s_msg += "\"headers\": [\"n1=v1\", \"n2=v2\"]";
s_msg += "}"; s_msg += "}";
messagebody = s_msg.c_str(); messagebody = s_msg.c_str();
} }
conn = amqp_new_connection(); conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn); socket = amqp_tcp_socket_new(conn);
...@@ -82,9 +73,20 @@ int main(int argc, char** argv) ...@@ -82,9 +73,20 @@ int main(int argc, char** argv)
{ {
amqp_basic_properties_t props; amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG;
props.content_type = amqp_cstring_bytes("text/plain"); //props.content_type = amqp_cstring_bytes("text/plain");
props.content_type = amqp_cstring_bytes("application/json");
props.delivery_mode = 2; /* persistent delivery mode */ props.delivery_mode = 2; /* persistent delivery mode */
// add example header
// amqp_table_t *table = &props.headers;
// props.headers.num_entries = 1;
// props.headers.entries = (amqp_table_entry_t_* )calloc(props.headers.num_entries, sizeof(amqp_table_entry_t));
// (table->entries[0]).key = amqp_cstring_bytes("key1");
// ((table->entries[0]).value).kind = AMQP_FIELD_KIND_I32;
// ((table->entries[0]).value).value.i32 = 1234;
die_on_error(amqp_basic_publish(conn, die_on_error(amqp_basic_publish(conn,
1, 1,
amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchange),
...@@ -93,7 +95,7 @@ int main(int argc, char** argv) ...@@ -93,7 +95,7 @@ int main(int argc, char** argv)
0, 0,
&props, &props,
amqp_cstring_bytes(messagebody)), amqp_cstring_bytes(messagebody)),
"Publishing"); "Publishing");
} }
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment