Commit 83979294 by amir

first time

parent 12c6863b
Pipeline #82 skipped in 0 seconds
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app
Makefile
.make.state
nbproject
build
bin
lib
target/*
CMakeFiles
CMakeCache.txt
cmake_install.cmake
install_manifest.txt
\ No newline at end of file
cmake_minimum_required(VERSION 2.8.12)
project(Rabbitmq)
# version stuff
set (Rabbitmq_VERSION_MAJOR 1)
set (Rabbitmq_VERSION_MINOR 0)
set (Rabbitmq_VERSION_PATCH 0)
set(Rabbitmq_VERSION_STRING ${Rabbitmq_VERSION_MAJOR}.${Rabbitmq_VERSION_MINOR}.${Rabbitmq_VERSION_PATCH})
# type build flags
set(CMAKE_BUILD_TYPE Release)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -m64 -g -Wall")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/target)
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/target)
# linked libs and their locations
set ( PROJECT_LINK_LIBS -ljson -lpthread -lrabbitmq )
link_directories( ../3party/lib )
# h files locations
include_directories(src)
include_directories(SYSTEM ../3party/rapidjson-0.11/include/rapidjson)
# recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp")
#set (3PARTY_SOURCES ../3party/mongoose/mongoose.c)
#Generate the shared library from the sources
add_library(Rabbitmq SHARED ${SOURCES} ${3PARTY_SOURCES})
target_link_libraries(Rabbitmq ${PROJECT_LINK_LIBS} )
set_target_properties(Rabbitmq PROPERTIES VERSION ${Rabbitmq_VERSION_STRING}
SOVERSION ${Rabbitmq_VERSION_MAJOR})
# Test part
#set (RabbitmqRecv_TEST_SOURCES test/RabbitMQRecv.cpp test/utils.cpp)
add_executable(test_RabbitmqRecv test/RabbitMQRecv.cpp test/utils.cpp) #EXCLUDE_FROM_ALL ${Rabbitmq_TEST_SOURCES})
target_link_libraries (test_RabbitmqRecv Rabbitmq)
add_executable(test_RabbitmqSend test/RabbitMQSend.cpp test/utils.cpp) #EXCLUDE_FROM_ALL ${Rabbitmq_TEST_SOURCES})
target_link_libraries (test_RabbitmqSend Rabbitmq)
# install part
set (CMAKE_INSTALL_PREFIX ../internals)
file (GLOB INSTALL_FILES "src/*.h")
install(TARGETS Rabbitmq DESTINATION lib)
install(FILES ${INSTALL_FILES} DESTINATION include/Rabbitmq)
[
{
"directory": "/home/amir/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -DRabbitmq_EXPORTS -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -fPIC -I/home/amir/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/amir/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/Rabbitmq.dir/src/RMQ_Utils.cpp.o -c /home/amir/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_Utils.cpp",
"file": "/home/amir/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_Utils.cpp"
},
{
"directory": "/home/amir/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -DRabbitmq_EXPORTS -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -fPIC -I/home/amir/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/amir/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/Rabbitmq.dir/src/RMQ_Server.cpp.o -c /home/amir/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_Server.cpp",
"file": "/home/amir/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_Server.cpp"
},
{
"directory": "/home/amir/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -DRabbitmq_EXPORTS -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -fPIC -I/home/amir/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/amir/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/Rabbitmq.dir/src/RMQ_RestParser.cpp.o -c /home/amir/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_RestParser.cpp",
"file": "/home/amir/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_RestParser.cpp"
},
{
"directory": "/home/amir/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -DRabbitmq_EXPORTS -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -fPIC -I/home/amir/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/amir/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/Rabbitmq.dir/src/RMQ_Channel.cpp.o -c /home/amir/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_Channel.cpp",
"file": "/home/amir/git/ipgallery/common/cpp/Rabbitmq/src/RMQ_Channel.cpp"
},
{
"directory": "/home/amir/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -I/home/amir/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/amir/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/test_RabbitmqRecv.dir/test/RabbitMQRecv.cpp.o -c /home/amir/git/ipgallery/common/cpp/Rabbitmq/test/RabbitMQRecv.cpp",
"file": "/home/amir/git/ipgallery/common/cpp/Rabbitmq/test/RabbitMQRecv.cpp"
},
{
"directory": "/home/amir/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -I/home/amir/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/amir/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/test_RabbitmqRecv.dir/test/utils.cpp.o -c /home/amir/git/ipgallery/common/cpp/Rabbitmq/test/utils.cpp",
"file": "/home/amir/git/ipgallery/common/cpp/Rabbitmq/test/utils.cpp"
},
{
"directory": "/home/amir/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -I/home/amir/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/amir/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/test_RabbitmqSend.dir/test/RabbitMQSend.cpp.o -c /home/amir/git/ipgallery/common/cpp/Rabbitmq/test/RabbitMQSend.cpp",
"file": "/home/amir/git/ipgallery/common/cpp/Rabbitmq/test/RabbitMQSend.cpp"
},
{
"directory": "/home/amir/git/ipgallery/common/cpp/Rabbitmq",
"command": "/usr/bin/g++ -std=c++11 -m64 -g -Wall -O3 -DNDEBUG -I/home/amir/git/ipgallery/common/cpp/Rabbitmq/src -isystem /home/amir/git/ipgallery/common/cpp/Rabbitmq/../3party/rapidjson-0.11/include/rapidjson -o CMakeFiles/test_RabbitmqSend.dir/test/utils.cpp.o -c /home/amir/git/ipgallery/common/cpp/Rabbitmq/test/utils.cpp",
"file": "/home/amir/git/ipgallery/common/cpp/Rabbitmq/test/utils.cpp"
}
]
\ No newline at end of file
#!/bin/sh
#
# File: install-dependencies.sh
# Author: amir
#
# Created on May 8, 2016, 9:59:18 AM
#
sudo apt-get install -y librabbitmq-dev
\ No newline at end of file
/**
* @file RMQ_Channel.cpp
* @brief contains all rabbit nq related interfaces
* @author Adi Amir
*/
//#include <SysAdmin_LogApi.h>
#include <stringbuffer.h> //rapidjson string
#include <writer.h> //rapidjson writer
#include "RMQ_Channel.h"
#include "RMQ_Message.h"
#include "RMQ_Utils.h"
cRMQ_Result cRMQ_Channel::OpenChannel(const char* pba_Host, int i_Port, cRMQ_IParser* pc_Parser)
{
mpc_Parser = pc_Parser;
m_amqpConnection = amqp_new_connection();
if (!m_amqpConnection)
return cRMQ_Result(E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_new_connection() failed in cRMQ_Channel::OpenChannel");
mp_amqpSocket = amqp_tcp_socket_new(m_amqpConnection);
if (!mp_amqpSocket)
return cRMQ_Result(E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_tcp_socket_new() failed in cRMQ_Channel::OpenChannel");
int ret = amqp_socket_open(mp_amqpSocket, pba_Host, i_Port);
if (ret != 0)
return cRMQ_Result(E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_socket_open() failed in cRMQ_Channel::OpenChannel");
amqp_rpc_reply_t amqpRet = amqp_login(m_amqpConnection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
if (amqpRet.reply_type != AMQP_RESPONSE_NORMAL)
return cRMQ_Result(E_RMQ_FAILED_TO_OPEN_CHANNEL, "amqp_login() failed in cRMQ_Channel::OpenChannel");
amqp_channel_open(m_amqpConnection, 1);
cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenChannel");
if (rmqResult.getCode() == RMQ_SUCCESS)
mb_ChannelOpenedSuccessfully = true;
return rmqResult;
}
cRMQ_Result cRMQ_Channel::CloseChannel()
{
cRMQ_Result rmqResult(RMQ_SUCCESS);
amqp_channel_close(m_amqpConnection, 1, AMQP_REPLY_SUCCESS);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS)
return rmqResult;
amqp_connection_close(m_amqpConnection, AMQP_REPLY_SUCCESS);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS)
return rmqResult;
amqp_destroy_connection(m_amqpConnection);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS)
return rmqResult;
return cRMQ_Result(RMQ_SUCCESS);
}
cRMQ_Result cRMQ_Channel::OpenQueue(const char* pba_Exchange, const char* bindingkey) {
if (mb_ChannelOpenedSuccessfully == false)
return cRMQ_Result(E_RMQ_CHANNEL_NOT_OPENED, "cRMQ_Channel::OpenQueue() failed");
// declare queue
amqp_queue_declare_ok_t *r = amqp_queue_declare(m_amqpConnection, 1, amqp_empty_bytes, 0, 0, 0, 1,
amqp_empty_table);
cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS)
return rmqResult;
// bind queue
amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue);
if (queueName.bytes == NULL)
return cRMQ_Result(E_RMQ_OUT_OF_MEMORY, "cRMQ_Channel::OpenQueue() failed while copying queue name");
amqp_queue_bind(m_amqpConnection, 1, queueName, amqp_cstring_bytes(pba_Exchange), amqp_cstring_bytes(bindingkey),
amqp_empty_table);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS)
return rmqResult;
// basic consume
amqp_basic_consume(m_amqpConnection, 1, queueName, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::OpenQueue");
if (rmqResult.getCode() != RMQ_SUCCESS)
return rmqResult;
return cRMQ_Result(RMQ_SUCCESS);
}
cRMQ_Result cRMQ_Channel::RecieveMessage(cRMQ_Message* pc_Message)
{
amqp_maybe_release_buffers(m_amqpConnection);
amqp_rpc_reply_t res = amqp_consume_message(m_amqpConnection, &mc_envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL == res.reply_type)
{
cRMQ_Result result = mpc_Parser->parse(pc_Message, &mc_envelope);
amqp_destroy_envelope(&mc_envelope);
return result;
}
else
{
amqp_destroy_envelope(&mc_envelope);
cRMQ_Result rmqResult = getAmqpRpcError(amqp_get_rpc_reply(m_amqpConnection), "cRMQ_Channel::RecieveMessage");
return rmqResult;
}
}
cRMQ_Result cRMQ_Channel::SendMessage(cRMQ_Message* pc_Message)
{
// TBD
return cRMQ_Result(RMQ_SUCCESS);
}
/**
* @file RMQ_Channel.h
* @brief A class helper for sending & receiving service requests tp/from the CZ services
* @author Adi Amir
*/
#ifndef _RMQ_CHANNEL_H
#define _RMQ_CHANNEL_H
//#include <General_Def.h>
#include <string>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "RMQ_Result.h"
#include "RMQ_Message.h"
#include "RMQ_IParser.h"
class cRMQ_Channel
{
protected:
amqp_socket_t* mp_amqpSocket = NULL;
amqp_connection_state_t m_amqpConnection;
amqp_bytes_t m_Queuename;
amqp_envelope_t mc_envelope;
cRMQ_IParser* mpc_Parser;
bool mb_ChannelOpenedSuccessfully;
public:
cRMQ_Channel()
{
mpc_Parser = nullptr;
mp_amqpSocket = nullptr;
m_amqpConnection = nullptr;
m_Queuename.len = 0;
mb_ChannelOpenedSuccessfully = false;
}
cRMQ_Result OpenChannel(const char* pba_Host, int i_Port, cRMQ_IParser* pc_Parser);
cRMQ_Result CloseChannel();
cRMQ_Result OpenQueue(const char* pba_Exchange, const char* bindingkey);
cRMQ_Result RecieveMessage(cRMQ_Message* pc_Message);
cRMQ_Result SendMessage(cRMQ_Message* pc_Message);
};
#endif // _RMQ_CHANNEL_H
/**
* @file Microservice_IParser.h
* @brief A class helper for sending & receiving service requests tp/from the CZ services
* @author Adi Amir
*/
#ifndef _MICROSERVICE_IPARSER
#define _MICROSERVICE_IPARSER
//#include <General_Def.h>
#include <string>
#include "amqp.h"
#include "RMQ_Result.h"
#include "RMQ_Message.h"
class cRMQ_IParser
{
protected:
public:
cRMQ_IParser()
{
}
virtual ~cRMQ_IParser() {};
virtual cRMQ_Result parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope) = 0;
};
#endif // _MICROSERVICE_IPARSER
/**
* @file RMQ_Message.h
* @brief This is an abstract class. you must override it.
* @author Adi Amir
*/
#ifndef _RMQ_MESSAGE_H
#define _RMQ_MESSAGE_H
//#include <General_Def.h>
#include <string>
#include "RMQ_Types.h"
class cRMQ_Message
{
protected:
eRMQMessageType me_Type;
public:
cRMQ_Message()
{
me_Type = E_MESSAGE_TYPE_UNKNOWN;
}
};
#endif // _RMQ_MESSAGE_H
/**
* @file RMQ_Message.h
* @brief A warpper for rabbit message
* @author Adi Amir
*/
#ifndef _RMQ_MESSAGE_REST_H
#define _RMQ_MESSAGE_REST_H
//#include <General_Def.h>
#include <string>
#include <vector>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "RMQ_Utils.h"
#include "RMQ_Message.h"
class cRMQ_MessageRest: public cRMQ_Message
{
protected:
std::string protocol;
std::string method;
std::string domain;
std::string uri;
std::string queryParams;
std::string content;
std::vector<cNameValuePair> headers;
public:
cRMQ_MessageRest()
{
}
// protocol
const std::string& getProtocol() const {
return protocol;
}
void setProtocol(const std::string& protocol) {
this->protocol = protocol;
}
// get/set method
const std::string& getMethod() const {
return method;
}
void setMethod(const std::string& method) {
this->method = method;
}
// get/set domain
const std::string& getDomain() const {
return domain;
}
void setDomain(const std::string& domain) {
this->domain = domain;
}
// get/set Uri
const std::string& getUri() const {
return uri;
}
void setUri(const std::string& uri) {
this->uri = uri;
}
// get/set query parameters
const std::string& getQueryParams() const {
return queryParams;
}
void setQueryParams(const std::string& queryParams) {
this->queryParams = queryParams;
}
// get/set content
const std::string& getContent() const {
return content;
}
void setContent(const std::string& content) {
this->content = content;
}
// get/set headers
const std::vector<cNameValuePair>& getHeaders() const {
return headers;
}
void setHeaders(const std::vector<cNameValuePair>& headers) {
this->headers = headers;
}
cNameValuePair& getHeader(int index)
{
return this->headers.at(index);
}
void setHeader(cNameValuePair header) {
this->headers.push_back(header);
}
};
#endif // _RMQ_MESSAGE_REST_H
/**
* @file RMQ_Message.h
* @brief A warpper for rabbit message
* @author Adi Amir
*/
#ifndef _RMQ_MESSAGE_TEXT_H
#define _RMQ_MESSAGE_TEXT_H
#include <General_Def.h>
#include <string>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "RMQ_Message.h"
#include "RMQ_Types.h"
class cRMQ_MessageText: public cRMQ_Message
{
protected:
std::string content;
std::vector<cNameValuePair> headers;
public:
cRMQ_MessageText()
{
content = "";
}
// get/set content
const std::string& getContent() const {
return content;
}
void setContent(const std::string& content) {
this->content = content;
}
};
#endif // _RMQ_MESSAGE_TEXT_H
/**
* @file RMQ_RestParser.cpp
* @brief message parser for rest based messages
* @author Adi Amir
*/
//#include <SysAdmin_LogApi.h>
#include "RMQ_Types.h"
#include "RMQ_RestParser.h"
#include "RMQ_MessageRest.h"
cRMQ_RestParser::cRMQ_RestParser(eRMQMessageDataFormat e_RMQMessageDataFormat)
{
me_RMQMessageDataFormat = e_RMQMessageDataFormat;
}
cRMQ_Result cRMQ_RestParser::parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope)
{
cRMQ_MessageRest* p_Msg = (cRMQ_MessageRest *)pc_Message;
rapidjson::Document rpj_Content;
std::string content( reinterpret_cast<char const*>(pc_Envelope->message.body.bytes), pc_Envelope->message.body.len);
// initial parse
if (rpj_Content.Parse<0>(content.c_str()).HasParseError())
{
return cRMQ_Result(E_RMQ_PARSER_FAILED);
}
// protocol
std::string s_protocol = getNodeAsString(rpj_Content, "protocol");
if (s_protocol == "")
return cRMQ_Result(E_RMQ_PARSER_FAILED);
p_Msg->setProtocol(s_protocol);
// method
std::string s_method = getNodeAsString(rpj_Content, "method");
if (s_method == "")
return cRMQ_Result(E_RMQ_PARSER_FAILED);
p_Msg->setMethod(s_method);
// domain
std::string s_domain = getNodeAsString(rpj_Content, "domain");
if (s_domain == "")
return cRMQ_Result(E_RMQ_PARSER_FAILED);
p_Msg->setDomain(s_domain);
// Uri
std::string s_uri = getNodeAsString(rpj_Content, "uri");
if (s_uri == "")
return cRMQ_Result(E_RMQ_PARSER_FAILED);
p_Msg->setUri(s_uri);
// requestParams (optional)
std::string s_queryParams = getNodeAsString(rpj_Content, "queryParams");
p_Msg->setQueryParams(s_queryParams);
// content (optional)
std::string s_content = getNodeAsString(rpj_Content, "content");
p_Msg->setContent(s_content);
// headers
const rapidjson::Value& rpj_headerArr = rpj_Content["headers"];
for (rapidjson::SizeType i = 0; i < rpj_headerArr.Size(); i++) // rapidjson uses SizeType instead of size_t.
{
const rapidjson::Value& rpj_Header = rpj_headerArr[i];
std::string s_header = rpj_Header.GetString();
cNameValuePair* pc_Header = parseHeader(s_header);
if (pc_Header != NULL)
p_Msg->setHeader(*pc_Header);
}
return cRMQ_Result(RMQ_SUCCESS);
}
std::string cRMQ_RestParser::getNodeAsString(rapidjson::Document& rpj_Obj, const char* pba_NodeName)
{
if(rpj_Obj.HasMember(pba_NodeName))
{
if(rpj_Obj[pba_NodeName].IsString())
{
return rpj_Obj[pba_NodeName].GetString();
}
}
return "";
}
cNameValuePair* cRMQ_RestParser::parseHeader(std::string s_header)
{
char *saveptr, *token;
char* pba_HeaderName = strtok_r((char *)s_header.c_str(), "=", &saveptr);
if (pba_HeaderName != NULL)
{
char* pba_HeaderValue = strtok_r(NULL, "=", &saveptr);
if (pba_HeaderValue != NULL)
return new cNameValuePair(pba_HeaderName, pba_HeaderValue);
}
return NULL;
}
/**
* @file Microservice_RestParser.h
* @brief A class helper for sending & receiving service requests tp/from the CZ services
* @author Adi Amir
*/
#ifndef _MICROSERVICE_REST_PARSER
#define _MICROSERVICE_REST_PARSER
//#include <General_Def.h>
#include <string>
#include <stringbuffer.h> //rapidjson string
#include <writer.h> //rapidjson writer
#include <document.h>
#include "RMQ_IParser.h"
#include "RMQ_Types.h"
#include "RMQ_Message.h"
/** Rest message format
{
"protocol": "http/https",
"method": "POST/GET...",
"domain": "ip:port",
"uri": "a/b/c",
"queryParams": "name1=value1&name2=value2",
"content": "xxxxxxx",
"headers": ["name1=value1", "name2=value2"],
"cookies": ["a", "b", "c"]
}
**/
class cRMQ_RestParser : public cRMQ_IParser
{
protected:
eRMQMessageDataFormat me_RMQMessageDataFormat;
public:
cRMQ_RestParser(eRMQMessageDataFormat e_RMQMessageDataFormat);
virtual ~cRMQ_RestParser() {};
virtual cRMQ_Result parse(cRMQ_Message* pc_Message, amqp_envelope_t* pc_Envelope);
protected:
std::string getNodeAsString(rapidjson::Document& rpj_Obj, const char* pba_NodeName);
cNameValuePair* parseHeader(std::string s_header);
};
#endif // _MICROSERVICE_REST_PARSER
/**
* @file RMQ_Result.h
* @brief result structure to be returned by all API(s)
* @author Adi Amir
*/
#ifndef _RMQ_RESULT_H
#define _RMQ_RESULT_H
//#include <General_Def.h>
#include <string>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
enum eRMQ_Code {
RMQ_SUCCESS,
E_RMQ_FAILED_TO_OPEN_CHANNEL,
E_RMQ_CHANNEL_NOT_OPENED,
E_RMQ_NO_RESPONSE,
E_RMQ_RESPONSE_LIBRARY_EXCEPTION,
E_RMQ_RESPONSE_SERVER_EXCEPTION,
E_RMQ_UNKNOWN_SERVER_ERROR,
E_RMQ_OUT_OF_MEMORY,
E_RMQ_PARSER_FAILED,
E_RMQ_UNINITIALIZED
};
class cRMQ_Result
{
protected:
eRMQ_Code me_Code;
std::string ms_Error;
public:
cRMQ_Result()
{
me_Code = E_RMQ_UNINITIALIZED;
ms_Error = "";
}
cRMQ_Result(eRMQ_Code e_Code)
{
me_Code = e_Code;
ms_Error = "";
}
cRMQ_Result(eRMQ_Code e_Code, char* pba_error)
{
me_Code = e_Code;
ms_Error = pba_error;
}
eRMQ_Code getCode() { return me_Code; };
std::string getError() { return ms_Error; };
};
#endif // _RMQ_RESULT_H
/**
* @file RMQ_Channel.cpp
* @brief contains all rabbit nq related interfaces
* @author Adi Amir
*/
//#include <SysAdmin_LogApi.h>
#include <stringbuffer.h> //rapidjson string
#include <writer.h> //rapidjson writer
#include "RMQ_Server.h"
#include "RMQ_Types.h"
cRMQ_Server::cRMQ_Server() : mc_Channel()
{
}
cRMQ_Result cRMQ_Server::Init(const char* pba_Host, int i_Port, const char* pba_exchange, const char* BindingKey, cRMQ_IParser* pc_MsgParser)
{
cRMQ_Result result;
mpc_MsgParser = pc_MsgParser;
// open channel
result = mc_Channel.OpenChannel(pba_Host, i_Port, mpc_MsgParser);
if (result.getCode() != RMQ_SUCCESS)
return result;
// open queue
result = mc_Channel.OpenQueue(pba_exchange, BindingKey);
if (result.getCode() != RMQ_SUCCESS)
return result;
return cRMQ_Result(RMQ_SUCCESS);
}
void cRMQ_Server::Destroy()
{
mc_Channel.CloseChannel();
}
cRMQ_Result cRMQ_Server::RecieveMessage(cRMQ_Message* pc_Message)
{
return mc_Channel.RecieveMessage(pc_Message);
}
/**
* @file RMQ_IParser.h
* @brief A class helper for sending & receiving service requests tp/from the CZ services
* @author Adi Amir
*/
#ifndef _RMQ_SERVER_H
#define _RMQ_SERVER_H
//#include <General_Def.h>
#include <string>
#include "RMQ_Channel.h"
#include "RMQ_IParser.h"
class cRMQ_Server
{
protected:
cRMQ_Channel mc_Channel;
cRMQ_IParser* mpc_MsgParser;
public:
cRMQ_Server();
cRMQ_Result Init(const char* pba_Host, int i_Port, const char* pba_exchange, const char* BindingKey, cRMQ_IParser* pc_MsgParser);
void Destroy();
cRMQ_Channel* getChannel() { return &mc_Channel; }
cRMQ_Result RecieveMessage(cRMQ_Message* pc_Message);
};
#endif // _RMQ_SERVER_H
/**
* @file RMQ_Message.h
* @brief A warpper for rabbit message
* @author Adi Amir
*/
#ifndef _RMQ_TYPES_H
#define _RMQ_TYPES_H
//#include <General_Def.h>
#include <string>
enum eRMQMessageType
{
E_MESSAGE_TYPE_UNKNOWN,
E_MESSAGE_TYPE_TEXT,
E_MESSAGE_TYPE_REST,
E_MESSAGE_TYPE_BINARY
};
enum eRMQMessageDataFormat
{
E_MESSAGE_DATA_TEXT,
E_MESSAGE_DATA_JSON,
E_MESSAGE_DATA_XML
};
class cNameValuePair
{
public:
std::string m_name;
std::string m_value;
public:
cNameValuePair(std::string name, std::string value)
{
this->m_name = name;
this->m_value = value;
}
};
#endif // _RMQ_TYPES_H
/**
* @file RMQ_Utils.cpp
* @brief Utils
* @author Adi Amir
*/
//#include <SysAdmin_LogApi.h>
#include "RMQ_Utils.h"
cRMQ_Result getAmqpRpcError(amqp_rpc_reply_t x, char const *context)
{
char ba_Msg[1024];
switch (x.reply_type)
{
case AMQP_RESPONSE_NORMAL:
return cRMQ_Result(RMQ_SUCCESS);
case AMQP_RESPONSE_NONE:
printf(ba_Msg, "%s: missing RPC reply type!\n", context);
return cRMQ_Result(E_RMQ_NO_RESPONSE, ba_Msg);
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
printf(ba_Msg, "%s: %s\n", context, amqp_error_string2(x.library_error));
return cRMQ_Result(E_RMQ_RESPONSE_LIBRARY_EXCEPTION, ba_Msg);
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id)
{
case AMQP_CONNECTION_CLOSE_METHOD:
{
amqp_connection_close_t *m =
(amqp_connection_close_t *) x.reply.decoded;
printf(ba_Msg, "%s: server connection error %d, message: %.*s\n",
context, m->reply_code, (int) m->reply_text.len,
(char *) m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
printf(ba_Msg, "%s: server channel error %d, message: %.*s\n",
context, m->reply_code, (int) m->reply_text.len,
(char *) m->reply_text.bytes);
break;
}
default:
printf(ba_Msg, "%s: unknown server error, method id 0x%08X\n",
context, x.reply.id);
break;
}
return cRMQ_Result(E_RMQ_RESPONSE_SERVER_EXCEPTION, ba_Msg);
default:
return cRMQ_Result(E_RMQ_UNKNOWN_SERVER_ERROR, "");
}
}
/**
* @file MRC.h
* @brief A class helper for sending & receiving service requests tp/from the CZ services
* @author Adi Amir
*/
#ifndef _RMQ_UTILS_H
#define _RMQ_UTILS_H
//#include <General_Def.h>
#include <string>
#include <amqp.h>
#include "RMQ_Result.h"
cRMQ_Result getAmqpRpcError(amqp_rpc_reply_t x, char const *context);
#endif // _RMQ_UTILS_H
// ShAppSim.cpp
#include <stdio.h>
//#include <General_Def.h>
//#include <SysAdmin_Manager.h>
//#include <SysAdmin_LogApi.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "utils.h"
int main(int argc, char** argv)
{
char const *hostname;
int port, status;
char const *exchange;
char const *bindingkey;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
amqp_bytes_t queuename;
if (argc < 5) {
fprintf(stderr, "Usage: amqp_listen host port exchange bindingkey\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
bindingkey = argv[4];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
fprintf(stderr, "Out of memory while copying queue name");
return 1;
}
}
amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
while (1) {
amqp_rpc_reply_t res;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(conn);
res = amqp_consume_message(conn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != res.reply_type) {
break;
}
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
(unsigned) envelope.delivery_tag,
(int) envelope.exchange.len, (char *) envelope.exchange.bytes,
(int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
(int) envelope.message.properties.content_type.len,
(char *) envelope.message.properties.content_type.bytes);
}
printf("----\n");
amqp_dump(envelope.message.body.bytes, envelope.message.body.len);
amqp_destroy_envelope(&envelope);
}
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}
// ShAppSim.cpp
#include <stdio.h>
//#include <General_Def.h>
//#include <SysAdmin_Manager.h>
//#include <SysAdmin_LogApi.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include <string>
#include "utils.h"
int main(int argc, char** argv)
{
char const *hostname;
int port, status;
char const *exchange;
char const *routingkey;
char const *messagebody;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
fprintf(stderr, "Usage: amqp_sendstring host port exchange routingkey messagebody\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
routingkey = argv[4];
messagebody = argv[5];
if (messagebody == NULL)
{
// {
// "protocol": "http/https",
// "method": "POST/GET...",
// "domain": "ip:port",
// "uri": "a/b/c",
// "queryParams": "name1=value1&name2=value2",
// "content": "xxxxxxx",
// "headers": ["n1=v1", "n2=v2"]
// }
std::string s_msg = "{";
s_msg += "\"protocol\": \"http\",";
s_msg += "\"method\": \"POST\",";
s_msg += "\"domain\": \"172.16.1.151:8081\",";
s_msg += "\"uri\": \"publicSafety/doAction\",";
s_msg += "\"queryParams\": \"name1=value1&name2=value2\",";
s_msg += "\"content\": \"playAudio\",";
s_msg += "\"headers\": [\"n1=v1\", \"n2=v2\"]";
s_msg += "}";
messagebody = s_msg.c_str();
}
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
die_on_error(amqp_basic_publish(conn,
1,
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey),
0,
0,
&props,
amqp_cstring_bytes(messagebody)),
"Publishing");
}
die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
return(EXIT_SUCCESS);
}
./RabbitMQSend localhost 5672 test1 MyFirstQ BBB
./RabbitMQRecv localhost 5672 test1 MyFirstQ
setenv LD_LIBRARY_PATH /home/adi/git/rabbitmq-c/librabbitmq/.libs
g++ -g -Wall -I/home/adi/git/rabbitmq-c/librabbitmq -L/home/adi/git/rabbitmq-c/librabbitmq/.libs -o test amqp_sendstring.c utils.c -lrabbitmq
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <stdint.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "utils.h"
void die(const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
fprintf(stderr, "\n");
exit(1);
}
void die_on_error(int x, char const *context)
{
if (x < 0) {
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x));
exit(1);
}
}
void die_on_amqp_error(amqp_rpc_reply_t x, char const *context)
{
switch (x.reply_type) {
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
fprintf(stderr, "%s: missing RPC reply type!\n", context);
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD: {
amqp_connection_close_t *m = (amqp_connection_close_t *) x.reply.decoded;
fprintf(stderr, "%s: server connection error %d, message: %.*s\n",
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
case AMQP_CHANNEL_CLOSE_METHOD: {
amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded;
fprintf(stderr, "%s: server channel error %d, message: %.*s\n",
context,
m->reply_code,
(int) m->reply_text.len, (char *) m->reply_text.bytes);
break;
}
default:
fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
break;
}
break;
}
exit(1);
}
static void dump_row(long count, int numinrow, int *chs)
{
int i;
printf("%08lX:", count - numinrow);
if (numinrow > 0) {
for (i = 0; i < numinrow; i++) {
if (i == 8) {
printf(" :");
}
printf(" %02X", chs[i]);
}
for (i = numinrow; i < 16; i++) {
if (i == 8) {
printf(" :");
}
printf(" ");
}
printf(" ");
for (i = 0; i < numinrow; i++) {
if (isprint(chs[i])) {
printf("%c", chs[i]);
} else {
printf(".");
}
}
}
printf("\n");
}
static int rows_eq(int *a, int *b)
{
int i;
for (i=0; i<16; i++)
if (a[i] != b[i]) {
return 0;
}
return 1;
}
void amqp_dump(void const *buffer, size_t len)
{
unsigned char *buf = (unsigned char *) buffer;
long count = 0;
int numinrow = 0;
int chs[16];
int oldchs[16] = {0};
int showed_dots = 0;
size_t i;
for (i = 0; i < len; i++) {
int ch = buf[i];
if (numinrow == 16) {
int i;
if (rows_eq(oldchs, chs)) {
if (!showed_dots) {
showed_dots = 1;
printf(" .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n");
}
} else {
showed_dots = 0;
dump_row(count, numinrow, chs);
}
for (i=0; i<16; i++) {
oldchs[i] = chs[i];
}
numinrow = 0;
}
count++;
chs[numinrow++] = ch;
}
dump_row(count, numinrow, chs);
if (numinrow != 0) {
printf("%08lX:\n", count);
}
}
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
#ifndef librabbitmq_examples_utils_h
#define librabbitmq_examples_utils_h
/*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
void die(const char *fmt, ...);
extern void die_on_error(int x, char const *context);
extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context);
extern void amqp_dump(void const *buffer, size_t len);
extern uint64_t now_microseconds(void);
extern void microsleep(int usec);
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment