Commit db128b5a by amir

Add content to post , Add SchedularTimer

parent abae72be
......@@ -27,7 +27,7 @@ include_directories(SYSTEM ../3party/mongoose)
include_directories(SYSTEM /usr/include/hiredis)
# recursive search files cpp files
file(GLOB_RECURSE SOURCES "src/*.cpp")
set (3PARTY_SOURCES ../3party/mongoose/mongoose.c)
set (3PARTY_SOURCES ../3party/mongoose/mongoose.c src/utils/ScheduledTimer.cpp src/utils/ScheduledTimer.h)
#Generate the shared library from the sources
add_library(Microservice SHARED ${SOURCES} ${3PARTY_SOURCES})
......
......@@ -6,4 +6,4 @@
# Created on May 8, 2016, 9:59:18 AM
#
sudo apt-get install -y libhiredis0.10 libhiredis-dev libzmq3 libzmq3-dev liblog4cpp5 liblog4cpp5-dev \
libgoogle-glog-dev
\ No newline at end of file
libgoogle-glog-dev libev-dev
\ No newline at end of file
......@@ -12,10 +12,13 @@
#include <map>
#include <vector>
#include <document.h>
#include <functional>
#include <chrono>
#include "Microservice_Defines.h"
#include "common/MSTypes.h"
#include "params/MSCommandParams.h"
#include <boost/function.hpp>
class cMicroservice_BaseRestResponse;
class cMicroservice_BaseHandler;
......@@ -140,6 +143,12 @@ namespace nsMicroservice_Iface
virtual void stop() = 0;
};
struct IHistogram
{
virtual void update(long value) = 0;
virtual long getCount() = 0;
virtual void clear() = 0;
};
/**
* must be at the end of init , after the netrics are defined
*/
......@@ -148,8 +157,8 @@ namespace nsMicroservice_Iface
virtual IMeter* createMeter(std::string& name) = 0;
virtual ICounter* createCounter(std::string& name) = 0;
virtual ITimer* createTimer(std::string& name) = 0;
virtual IHistogram* createHistogram(std::string& name) = 0;
virtual void GetMetrics(std::map<std::string,long>& metrics_map) = 0;
};
struct IPubSub
......@@ -287,6 +296,7 @@ namespace nsMicroservice_Iface
virtual bool getByPattern(std::string& pattern,std::vector<std::pair<std::string,std::string>>& retKeyValues) = 0;
virtual bool exists(std::string& key) = 0;
};
}
......
......@@ -53,6 +53,10 @@ IMetricsFactory::ITimer* MSIMetricsFactoryDropwisardImpl::createTimer(std::strin
return new ITimerDropwisardImpl(registry_->timer(name));
}
IMetricsFactory::IHistogram* MSIMetricsFactoryDropwisardImpl::createHistogram(std::string& name) {
return new IHistogramDropwizardImpl(registry_->histogram(name));
}
void MSIMetricsFactoryDropwisardImpl::startReporting()
{
char* graphite_hostport = getenv("graphite_hostport");
......@@ -99,26 +103,26 @@ void MSIMetricsFactoryDropwisardImpl::ITimerDropwisardImpl::stop() {
}
void MSIMetricsFactoryDropwisardImpl::GetMetrics(std::map<std::string, long>& metrics_map) {
std::string str;
for (auto counter : registry_->getCounters())
{
metrics_map["counter." + counter.first] = counter.second->getCount();
}
for (auto meter : registry_->getMeters())
{
register auto meter_ptr = meter.second;
str.assign("meter." + meter.first);
metrics_map[str] = meter_ptr->getCount();
metrics_map[str + "count"] = meter_ptr->getCount();
metrics_map[str + ".mean_rate"] = meter_ptr->getMeanRate();
metrics_map[str + ".1m_rate"] = meter_ptr->getOneMinuteRate();
metrics_map[str + ".5m_rate"] = meter_ptr->getFiveMinuteRate();
metrics_map[str + ".15m_rate"] = meter_ptr->getFifteenMinuteRate();
}
for (auto timer : registry_->getTimers())
{
register auto timer_snapshot_ptr = timer.second->getSnapshot();
......@@ -127,25 +131,35 @@ void MSIMetricsFactoryDropwisardImpl::GetMetrics(std::map<std::string, long>& me
metrics_map[str + ".min(ms)"] = timer_snapshot_ptr->getMin() / NANOS_IN_MILLI;
metrics_map[str + ".max(ms)"] = timer_snapshot_ptr->getMax() / NANOS_IN_MILLI;
metrics_map[str + ".median(ms)"] = timer_snapshot_ptr->getMedian() / NANOS_IN_MILLI;
}
for (auto histogram : registry_->getHistograms())
{
register auto histogram_snapshot_ptr = histogram.second->getSnapshot();
str.assign("histogram." + histogram.first);
metrics_map[str + ".mean_rate"] = histogram_snapshot_ptr->getMean();
metrics_map[str + ".min"] = histogram_snapshot_ptr->getMin();
metrics_map[str + ".max"] = histogram_snapshot_ptr->getMax();
metrics_map[str + ".median"] = histogram_snapshot_ptr->getMedian();
}
}
void MSIMetricsFactoryDropwisardImpl::ConfigureAndStartGraphiteReporter() {
if (!graphite_reporter_)
if (!graphite_reporter_)
{
const std::string& graphite_host(graphite_options_.host_);
boost::uint32_t graphite_port(graphite_options_.port_);
cppmetrics::graphite::GraphiteSenderPtr graphite_sender(
new cppmetrics::graphite::GraphiteSenderTCP(graphite_host, graphite_port));
graphite_reporter_.reset(new cppmetrics::graphite::GraphiteReporter(registry_, graphite_sender,
graphite_options_.prefix_));
p_GraphiteReportThread_ = new std::thread(run_thread,this);
}
else
else
{
printf("Graphite reporter already configured.\n");
}
......@@ -154,7 +168,7 @@ void MSIMetricsFactoryDropwisardImpl::ConfigureAndStartGraphiteReporter() {
void MSIMetricsFactoryDropwisardImpl::ReportToGraphite() {
if(graphite_reporter_)
{
while (s_sig_num == 0)
while (s_sig_num == 0)
{
std::this_thread::sleep_for(std::chrono::seconds(graphite_options_.interval_in_secs_));
((cppmetrics::core::ScheduledReporter*)graphite_reporter_.get())->report();
......
......@@ -41,6 +41,7 @@ public:
IMetricsFactory::ICounter* createCounter(std::string& name) override;
IMetricsFactory::IMeter* createMeter(std::string& name) override;
IMetricsFactory::ITimer* createTimer(std::string& name) override;
IMetricsFactory::IHistogram* createHistogram(std::string& name) override;
void startReporting() override;
void stopReporting() override;
......@@ -112,7 +113,21 @@ public:
cppmetrics::core::TimerPtr timer_;
};
class IHistogramDropwizardImpl: public IHistogram
{
public:
IHistogramDropwizardImpl(cppmetrics::core::HistogramPtr histogram) :
histogram_(histogram) {
}
long getCount() override { return histogram_->getCount(); }
void update(long value) override { histogram_->update(value); }
void clear() override { histogram_->clear(); }
private:
cppmetrics::core::HistogramPtr histogram_;
};
};
......
......@@ -90,7 +90,13 @@ void MSICommandClientHttpImpl::HandleCommand(HandleCommandData* p_cmd_data){
{
p_cmd_data->p_response->Reset();
http_client client(url);
auto request_task = client.request(*p_cmd_data->p_mtd);
//config.set_timeout<std::chrono::seconds>(std::chrono::seconds(2));
http_request request(*p_cmd_data->p_mtd);
request.headers().add(header_names::accept,"*/*");
if(!p_cmd_data->p_cmd_params->GetContent().empty())
request.set_body(p_cmd_data->p_cmd_params->GetContent(),"application/json");
//auto request_task = client.request(*p_cmd_data->p_mtd);
auto request_task = client.request(request);
if(p_cmd_data->p_cmd_params->IsAsync_())
{
request_task.then([&](http_response resp){
......
//
// Created by amir on 10/10/16.
//
#include <mhash.h>
#include <boost/thread/mutex.hpp>
#include <boost/atomic.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/bind.hpp>
#include <Microservice_App.h>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include "ScheduledTimer.h"
#define LOG_ERROR(str) std::cerr << str << std::endl //if(p_logger_) p_logger_->error(str);
namespace {
typedef boost::asio::deadline_timer Timer;
typedef boost::shared_ptr<Timer> TimerPtr;
typedef enum {
eFixed_Rate,
eFixed_Delay,
eOnce
}eTimerType;
}
class ScheduledTimer::Impl {
public:
class TimerTask {
public:
TimerTask() :
period_(1000) {
};
TimerTask(TimerPtr timer, std::function<void()> task,
boost::posix_time::milliseconds period, eTimerType timerType) :
timer_(timer), task_(task), period_(period), timerType_(timerType) {
}
TimerPtr timer_;
std::function<void()> task_;
boost::posix_time::milliseconds period_;
eTimerType timerType_;
};
private:
boost::atomic<bool> running_;
boost::asio::io_service io_service_;
boost::scoped_ptr<boost::asio::io_service::work> work_ptr_;
boost::thread_group thread_group_;
typedef std::map<std::string,TimerTask> TimerTasks;
TimerTasks timer_tasks_;
mutable boost::mutex timer_task_mutex_;
public:
Impl(size_t pool_size):
running_(true),
work_ptr_(new boost::asio::io_service::work(io_service_)) {
for (size_t i = 0; i < pool_size; ++i) {
thread_group_.create_thread(
boost::bind(&boost::asio::io_service::run, &io_service_));
}
}
virtual ~Impl() {
shutdownNow();
}
void scheduleAtFixedRate(std::string& timerKey, TimeoutCallback& toFunc, std::chrono::milliseconds period){
scheduleTimer(timerKey,toFunc,period,eFixed_Rate);
}
void scheduleAtFixedDelay(std::string& timerKey, TimeoutCallback& toFunc, std::chrono::milliseconds period){
scheduleTimer(timerKey,toFunc,period,eFixed_Rate);
}
void scheduleOnce(std::string& timerKey, TimeoutCallback& toFunc, std::chrono::milliseconds period){
scheduleTimer(timerKey,toFunc,period,eOnce);
}
void cancel(std::string& timerKey);
/**
* Shuts down the service, may or may not return immediately depending on the pending tasks.
*/
virtual void shutdown();
/**
* Shuts down the service, will return immediately.
*/
virtual void shutdownNow();
/**
* gets the threadpool state.
* @return True if this is shutdown or shutting down, false otherwise.
*/
virtual bool isShutdown() const {
return !running_;
}
void cancelTimers();
private:
void timerHandler(const boost::system::error_code& ec, std::string timerKey);
void scheduleTimer(std::string timerKey,
TimeoutCallback& toFunc,
std::chrono::milliseconds interval,
eTimerType timerType);
};
void ScheduledTimer::Impl::shutdown() {
if (!running_) {
return;
}
running_ = false;
work_ptr_.reset();
thread_group_.interrupt_all();
thread_group_.join_all();
}
void ScheduledTimer::Impl::shutdownNow() {
if (!running_) {
return;
}
running_ = false;
cancelTimers();
io_service_.stop();
thread_group_.interrupt_all();
thread_group_.join_all();
}
void ScheduledTimer::Impl::timerHandler(const boost::system::error_code &ec, std::string timerKey) {
if (!running_) {
LOG_ERROR("Timer not started.");
return;
}
if (ec) {
LOG_ERROR(std::string("Unable to execute the timer, reason ").append(ec.message()));
return;
}
TimerTask timer_task;
try {
boost::lock_guard<boost::mutex> lock(timer_task_mutex_);
timer_task = timer_tasks_.at(timerKey);
} catch (const std::out_of_range& oor) {
LOG_ERROR(std::string("Unable to find the timer at index ").append(timerKey));
return;
}
if (!timer_task.timer_) {
LOG_ERROR(std::string("Unable to find the timer at index ").append(timerKey));
return;
}
timer_task.task_();
boost::system::error_code eec;
switch (timer_task.timerType_) {
case eFixed_Rate:
timer_task.timer_->expires_at(
timer_task.timer_->expires_at() + timer_task.period_, eec);
break;
case eFixed_Delay:
timer_task.timer_->expires_from_now(timer_task.period_, eec);
break;
case eOnce:
cancel(timerKey);
return; // don't reschedule
//break;
}
if (eec) {
LOG_ERROR(std::string("Unable to restart the time, reason ").append(eec.message()));
}
// reschedule
timer_task.timer_->async_wait(
boost::bind(&ScheduledTimer::Impl::timerHandler, this,
boost::asio::placeholders::error, timerKey));
}
void ScheduledTimer::Impl::cancelTimers() {
{
boost::lock_guard<boost::mutex> lock(timer_task_mutex_);
for (auto entry : timer_tasks_){
boost::system::error_code ec;
entry.second.timer_->cancel(ec);
}
}
}
void ScheduledTimer::Impl::scheduleTimer(std::string timerKey,
TimeoutCallback& toFunc,
std::chrono::milliseconds interval,
eTimerType timerType) {
boost::posix_time::milliseconds period(interval.count());
TimerPtr timer(new Timer(io_service_, period));
size_t timer_index = 0;
{
boost::lock_guard<boost::mutex> lock(timer_task_mutex_);
timer_tasks_[timerKey] = TimerTask(timer, toFunc, period, timerType);
timer_index = timer_tasks_.size() - 1;
}
timer->async_wait(
boost::bind(&ScheduledTimer::Impl::timerHandler, this,
boost::asio::placeholders::error, timerKey));
}
void ScheduledTimer::Impl::cancel(std::string &timerKey) {
boost::lock_guard<boost::mutex> lock(timer_task_mutex_);
auto entryItr = timer_tasks_.find(timerKey);
if (entryItr != timer_tasks_.end()) {
timer_tasks_.erase(timerKey);
boost::system::error_code ec;
entryItr->second.timer_->cancel(ec);
}
}
ScheduledTimer::ScheduledTimer() {
size_t pool_size = sysconf(_SC_NPROCESSORS_ONLN);
if (pool_size == 0)
pool_size = 1;
impl_ = std::make_shared<Impl>(pool_size);
}
void ScheduledTimer::start() {
}
/**
* stops to timers cancel all
* @param waitForCompletion - wait for comlete stop
*/
void ScheduledTimer::stop(bool waitForCompletion) {
if(waitForCompletion)
impl_->shutdownNow();
else
impl_->shutdown();
}
void ScheduledTimer::scheduleAtFixedRate(std::string &timerKey, TimeoutCallback toFunc, std::chrono::milliseconds period) {
impl_->scheduleAtFixedRate(timerKey,toFunc,period);
}
void ScheduledTimer::scheduleOnce(std::string &timerKey, TimeoutCallback toFunc, std::chrono::milliseconds period) {
impl_->scheduleOnce(timerKey,toFunc,period);
}
void ScheduledTimer::scheduleAtFixedDelay(std::string &timerKey, TimeoutCallback toFunc, std::chrono::milliseconds period) {
impl_->scheduleAtFixedDelay(timerKey,toFunc,period);
}
void ScheduledTimer::cancel(std::string &timerKey) {
impl_->cancel(timerKey);
}
void ScheduledTimer::cancelAll() {
impl_->cancelTimers();
}
//
// Created by amir on 10/10/16.
//
#ifndef MICROSERVICE_SCHEDULEDTIMER_H
#define MICROSERVICE_SCHEDULEDTIMER_H
/**
* Scheduled timer for one time or intervals
*/
#include <functional>
#include <chrono>
#include <memory>
using TimeoutCallback = std::function<void()>;
class ScheduledTimer {
public:
ScheduledTimer();
void start();
void stop(bool waitForCompletion);
void scheduleAtFixedRate(std::string& timerKey, TimeoutCallback toFunc, std::chrono::milliseconds period);
void scheduleAtFixedDelay(std::string& timerKey, TimeoutCallback toFunc, std::chrono::milliseconds period);
void scheduleOnce(std::string& timerKey, TimeoutCallback toFunc, std::chrono::milliseconds period);
void cancel(std::string& timerKey);
void cancelAll();
private:
// keeping the implementation separated from h file
class Impl;
std::shared_ptr<Impl> impl_;
};
#endif //MICROSERVICE_SCHEDULEDTIMER_H
......@@ -18,12 +18,14 @@
#include <params/MSCommandParams.h>
#include <common/MSTypes.h>
#include <string.h>
#include <iostream>
#include <utils/ScheduledTimer.h>
class cMicroserviceHandler: public cMicroservice_BaseHandler
{
char mba_GetReturnedString[1024];
cMicroservice_Client* p_client_;
nsMicroservice_Iface::IMetricsFactory::IHistogram* p_histo;
public:
cMicroserviceHandler(const char* pba_GetReturnedString)
{
......@@ -67,6 +69,9 @@ public:
else
this->SendErrorResp(pc_reqCtx,retstat.GetError());
//this->WriteObjectToResponse(pc_reqCtx,rpj_Doc);
// add metric
long value = rand() % 1000 + 1;
p_histo->update(value);
}
void DoUpdate(cMicroservice_RequestContext* pc_reqCtx)
......@@ -81,9 +86,11 @@ public:
void Init() {
std::string other_service("other-service");
auto port = this->mpc_Configuration->GetLong(std::string("server.port"),8000);
printf("port is: %u\n",(unsigned)port);
//printf("port is: %u\n",(unsigned)port);
p_client_ = GetApp()->GetMSClient(other_service);
srand (time(NULL));
std::string str("randy.random");
p_histo = this->GetApp()->GetMetricsFactory()->createHistogram(str);
}
};
......@@ -147,6 +154,37 @@ void testCache(){
pcc->delByPattern(key);
}
void to_func()
{
std::cout << " regular boring function" << std::endl;
}
class TO
{
public:
void to_member_func() {
std::cout << " member boring function" << std::endl;
}
};
void test_timers()
{
TO toA;
constexpr int NAGLOT = 1;
ScheduledTimer scheduledTimer;
for ( int i = 0 ; i < NAGLOT; i++) {
scheduledTimer.scheduleAtFixedRate(std::to_string(i).append("FixedRate"), [i] { std::cout << i << " inside exiting lambda!" << std::endl; },
std::chrono::milliseconds(1000));
scheduledTimer.scheduleAtFixedRate(std::to_string(i).append("FixedDelay"), std::bind(&to_func),
std::chrono::milliseconds(400));
scheduledTimer.scheduleOnce(std::to_string(i).append("Once"), std::bind(&TO::to_member_func,&toA),
std::chrono::milliseconds(2000));
}
std::this_thread::sleep_for(std::chrono::seconds(10));
std::cout << "stopping.....\n";
scheduledTimer.stop(true);
}
/**
* Test_Microservice app-name host port handler-prefix get-returned-string
* 1 2 3 4 5
......@@ -155,10 +193,10 @@ void testCache(){
int main(int argc, char *argv[])
{
// testCache();
runNewMS();
// testCache();
test_timers();
//runNewMS();
if (argc < 6)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment