Commit 844ca258 by amir

starting with zmq and flatbuffers

parent 12371823
......@@ -32,8 +32,10 @@ dependencies {
compile 'com.github.davidb:metrics-influxdb:0.8.2'
compile 'io.dropwizard.metrics:metrics-graphite:3.1.2'
compile 'io.jsonwebtoken:jjwt:0.6.0'
compile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
testCompile group: 'junit', name: 'junit', version: '4.11'
testCompile group: 'org.zeromq', name: 'jeromq', version: '0.4.0'
}
//1. use this install task (under "other" section) to create this jar localy on your machine.
......
namespace common.context;
enum CrudMethod:byte { Create = 0, Read, Update, Delete }
table RestMsg {
rcid:ulong;
source:string;
crudMethod:CrudMethod = Read;
url:string;
queryString:string;
content:string;
}
root_type RestMsg;
namespace common.context;
table RestResponse {
rcid:ulong;
response:string;
}
root_type RestResponse;
/*
* Copyright 2014 Google Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.flatbuffers;
/// @cond FLATBUFFERS_INTERNAL
/**
* Class that holds shared constants
*/
public class Constants {
// Java doesn't seem to have these.
/** The number of bytes in an `byte`. */
static final int SIZEOF_BYTE = 1;
/** The number of bytes in a `short`. */
static final int SIZEOF_SHORT = 2;
/** The number of bytes in an `int`. */
static final int SIZEOF_INT = 4;
/** The number of bytes in an `float`. */
static final int SIZEOF_FLOAT = 4;
/** The number of bytes in an `long`. */
static final int SIZEOF_LONG = 8;
/** The number of bytes in an `double`. */
static final int SIZEOF_DOUBLE = 8;
/** The number of bytes in a file identifier. */
static final int FILE_IDENTIFIER_LENGTH = 4;
}
/// @endcond
/*
* Copyright 2014 Google Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.flatbuffers;
import java.nio.ByteBuffer;
/// @cond FLATBUFFERS_INTERNAL
/**
* All structs in the generated code derive from this class, and add their own accessors.
*/
public class Struct {
/** Used to hold the position of the `bb` buffer. */
protected int bb_pos;
/** The underlying ByteBuffer to hold the data of the Struct. */
protected ByteBuffer bb;
}
/// @endcond
/*
* Copyright 2014 Google Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.flatbuffers;
import static com.google.flatbuffers.Constants.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
/// @cond FLATBUFFERS_INTERNAL
/**
* All tables in the generated code derive from this class, and add their own accessors.
*/
public class Table {
private final static ThreadLocal<CharsetDecoder> UTF8_DECODER = new ThreadLocal<CharsetDecoder>() {
@Override
protected CharsetDecoder initialValue() {
return Charset.forName("UTF-8").newDecoder();
}
};
public final static ThreadLocal<Charset> UTF8_CHARSET = new ThreadLocal<Charset>() {
@Override
protected Charset initialValue() {
return Charset.forName("UTF-8");
}
};
private final static ThreadLocal<CharBuffer> CHAR_BUFFER = new ThreadLocal<CharBuffer>();
/** Used to hold the position of the `bb` buffer. */
protected int bb_pos;
/** The underlying ByteBuffer to hold the data of the Table. */
protected ByteBuffer bb;
/**
* Get the underlying ByteBuffer.
*
* @return Returns the Table's ByteBuffer.
*/
public ByteBuffer getByteBuffer() { return bb; }
/**
* Look up a field in the vtable.
*
* @param vtable_offset An `int` offset to the vtable in the Table's ByteBuffer.
* @return Returns an offset into the object, or `0` if the field is not present.
*/
protected int __offset(int vtable_offset) {
int vtable = bb_pos - bb.getInt(bb_pos);
return vtable_offset < bb.getShort(vtable) ? bb.getShort(vtable + vtable_offset) : 0;
}
protected static int __offset(int vtable_offset, int offset, ByteBuffer bb) {
int vtable = bb.array().length - offset;
return bb.getShort(vtable + vtable_offset - bb.getInt(vtable)) + vtable;
}
/**
* Retrieve a relative offset.
*
* @param offset An `int` index into the Table's ByteBuffer containing the relative offset.
* @return Returns the relative offset stored at `offset`.
*/
protected int __indirect(int offset) {
return offset + bb.getInt(offset);
}
protected static int __indirect(int offset, ByteBuffer bb) {
return offset + bb.getInt(offset);
}
/**
* Create a Java `String` from UTF-8 data stored inside the FlatBuffer.
*
* This allocates a new string and converts to wide chars upon each access,
* which is not very efficient. Instead, each FlatBuffer string also comes with an
* accessor based on __vector_as_bytebuffer below, which is much more efficient,
* assuming your Java program can handle UTF-8 data directly.
*
* @param offset An `int` index into the Table's ByteBuffer.
* @return Returns a `String` from the data stored inside the FlatBuffer at `offset`.
*/
protected String __string(int offset) {
CharsetDecoder decoder = UTF8_DECODER.get();
decoder.reset();
offset += bb.getInt(offset);
ByteBuffer src = bb.duplicate().order(ByteOrder.LITTLE_ENDIAN);
int length = src.getInt(offset);
src.position(offset + SIZEOF_INT);
src.limit(offset + SIZEOF_INT + length);
int required = (int)((float)length * decoder.maxCharsPerByte());
CharBuffer dst = CHAR_BUFFER.get();
if (dst == null || dst.capacity() < required) {
dst = CharBuffer.allocate(required);
CHAR_BUFFER.set(dst);
}
dst.clear();
try {
CoderResult cr = decoder.decode(src, dst, true);
if (!cr.isUnderflow()) {
cr.throwException();
}
} catch (CharacterCodingException x) {
throw new Error(x);
}
return dst.flip().toString();
}
/**
* Get the length of a vector.
*
* @param offset An `int` index into the Table's ByteBuffer.
* @return Returns the length of the vector whose offset is stored at `offset`.
*/
protected int __vector_len(int offset) {
offset += bb_pos;
offset += bb.getInt(offset);
return bb.getInt(offset);
}
/**
* Get the start data of a vector.
*
* @param offset An `int` index into the Table's ByteBuffer.
* @return Returns the start of the vector data whose offset is stored at `offset`.
*/
protected int __vector(int offset) {
offset += bb_pos;
return offset + bb.getInt(offset) + SIZEOF_INT; // data starts after the length
}
/**
* Get a whole vector as a ByteBuffer.
*
* This is efficient, since it only allocates a new {@link ByteBuffer} object,
* but does not actually copy the data, it still refers to the same bytes
* as the original ByteBuffer. Also useful with nested FlatBuffers, etc.
*
* @param vector_offset The position of the vector in the byte buffer
* @param elem_size The size of each element in the array
* @return The {@link ByteBuffer} for the array
*/
protected ByteBuffer __vector_as_bytebuffer(int vector_offset, int elem_size) {
int o = __offset(vector_offset);
if (o == 0) return null;
ByteBuffer bb = this.bb.duplicate().order(ByteOrder.LITTLE_ENDIAN);
int vectorstart = __vector(o);
bb.position(vectorstart);
bb.limit(vectorstart + __vector_len(o) * elem_size);
return bb;
}
/**
* Initialize any Table-derived type to point to the union at the given `offset`.
*
* @param t A `Table`-derived type that should point to the union at `offset`.
* @param offset An `int` index into the Table's ByteBuffer.
* @return Returns the Table that points to the union at `offset`.
*/
protected Table __union(Table t, int offset) {
offset += bb_pos;
t.bb_pos = offset + bb.getInt(offset);
t.bb = bb;
return t;
}
/**
* Check if a {@link ByteBuffer} contains a file identifier.
*
* @param bb A {@code ByteBuffer} to check if it contains the identifier
* `ident`.
* @param ident A `String` identifier of the FlatBuffer file.
* @return True if the buffer contains the file identifier
*/
protected static boolean __has_identifier(ByteBuffer bb, String ident) {
if (ident.length() != FILE_IDENTIFIER_LENGTH)
throw new AssertionError("FlatBuffers: file identifier must be length " +
FILE_IDENTIFIER_LENGTH);
for (int i = 0; i < FILE_IDENTIFIER_LENGTH; i++) {
if (ident.charAt(i) != (char)bb.get(bb.position() + SIZEOF_INT + i)) return false;
}
return true;
}
/**
* Sort tables by the key.
*
* @param offsets An 'int' indexes of the tables into the bb.
* @param bb A {@code ByteBuffer} to get the tables.
*/
protected void sortTables(int[] offsets, final ByteBuffer bb) {
Integer[] off = new Integer[offsets.length];
for (int i = 0; i < offsets.length; i++) off[i] = offsets[i];
java.util.Arrays.sort(off, new java.util.Comparator<Integer>() {
public int compare(Integer o1, Integer o2) {
return keysCompare(o1, o2, bb);
}
});
for (int i = 0; i < offsets.length; i++) offsets[i] = off[i];
}
/**
* Compare two tables by the key.
*
* @param o1 An 'Integer' index of the first key into the bb.
* @param o2 An 'Integer' index of the second key into the bb.
* @param bb A {@code ByteBuffer} to get the keys.
*/
protected int keysCompare(Integer o1, Integer o2, ByteBuffer bb) { return 0; }
/**
* Compare two strings in the buffer.
*
* @param offset_1 An 'int' index of the first string into the bb.
* @param offset_2 An 'int' index of the second string into the bb.
* @param bb A {@code ByteBuffer} to get the strings.
*/
protected static int compareStrings(int offset_1, int offset_2, ByteBuffer bb) {
offset_1 += bb.getInt(offset_1);
offset_2 += bb.getInt(offset_2);
int len_1 = bb.getInt(offset_1);
int len_2 = bb.getInt(offset_2);
int startPos_1 = offset_1 + SIZEOF_INT;
int startPos_2 = offset_2 + SIZEOF_INT;
int len = Math.min(len_1, len_2);
byte[] bbArray = bb.array();
for(int i = 0; i < len; i++) {
if (bbArray[i + startPos_1] != bbArray[i + startPos_2])
return bbArray[i + startPos_1] - bbArray[i + startPos_2];
}
return len_1 - len_2;
}
/**
* Compare string from the buffer with the 'String' object.
*
* @param offset_1 An 'int' index of the first string into the bb.
* @param key Second string as a byte array.
* @param bb A {@code ByteBuffer} to get the first string.
*/
protected static int compareStrings(int offset_1, byte[] key, ByteBuffer bb) {
offset_1 += bb.getInt(offset_1);
int len_1 = bb.getInt(offset_1);
int len_2 = key.length;
int startPos_1 = offset_1 + Constants.SIZEOF_INT;
int len = Math.min(len_1, len_2);
byte[] bbArray = bb.array();
for (int i = 0; i < len; i++) {
if (bbArray[i + startPos_1] != key[i])
return bbArray[i + startPos_1] - key[i];
}
return len_1 - len_2;
}
}
/// @endcond
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
public final class CrudMethod {
private CrudMethod() { }
public static final byte Create = 0;
public static final byte Read = 1;
public static final byte Update = 2;
public static final byte Delete = 3;
public static final String[] names = { "Create", "Read", "Update", "Delete", };
public static String name(int e) { return names[e]; }
}
package microservice;
package microservice.common.context;
import java.util.Deque;
import java.util.Map;
......
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
public final class RestMsg extends Table {
public static RestMsg getRootAsRestMsg(ByteBuffer _bb) { return getRootAsRestMsg(_bb, new RestMsg()); }
public static RestMsg getRootAsRestMsg(ByteBuffer _bb, RestMsg obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public RestMsg __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public long rcid() { int o = __offset(4); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
public String source() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer sourceAsByteBuffer() { return __vector_as_bytebuffer(6, 1); }
public byte crudMethod() { int o = __offset(8); return o != 0 ? bb.get(o + bb_pos) : 1; }
public String url() { int o = __offset(10); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer urlAsByteBuffer() { return __vector_as_bytebuffer(10, 1); }
public String queryString() { int o = __offset(12); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer queryStringAsByteBuffer() { return __vector_as_bytebuffer(12, 1); }
public String content() { int o = __offset(14); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer contentAsByteBuffer() { return __vector_as_bytebuffer(14, 1); }
public static int createRestMsg(FlatBufferBuilder builder,
long rcid,
int sourceOffset,
byte crudMethod,
int urlOffset,
int queryStringOffset,
int contentOffset) {
builder.startObject(6);
RestMsg.addRcid(builder, rcid);
RestMsg.addContent(builder, contentOffset);
RestMsg.addQueryString(builder, queryStringOffset);
RestMsg.addUrl(builder, urlOffset);
RestMsg.addSource(builder, sourceOffset);
RestMsg.addCrudMethod(builder, crudMethod);
return RestMsg.endRestMsg(builder);
}
public static void startRestMsg(FlatBufferBuilder builder) { builder.startObject(6); }
public static void addRcid(FlatBufferBuilder builder, long rcid) { builder.addLong(0, rcid, 0L); }
public static void addSource(FlatBufferBuilder builder, int sourceOffset) { builder.addOffset(1, sourceOffset, 0); }
public static void addCrudMethod(FlatBufferBuilder builder, byte crudMethod) { builder.addByte(2, crudMethod, 1); }
public static void addUrl(FlatBufferBuilder builder, int urlOffset) { builder.addOffset(3, urlOffset, 0); }
public static void addQueryString(FlatBufferBuilder builder, int queryStringOffset) { builder.addOffset(4, queryStringOffset, 0); }
public static void addContent(FlatBufferBuilder builder, int contentOffset) { builder.addOffset(5, contentOffset, 0); }
public static int endRestMsg(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
public static void finishRestMsgBuffer(FlatBufferBuilder builder, int offset) { builder.finish(offset); }
}
// automatically generated by the FlatBuffers compiler, do not modify
package microservice.common.context;
import java.nio.*;
import java.lang.*;
import java.util.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
public final class RestResponse extends Table {
public static RestResponse getRootAsRestResponse(ByteBuffer _bb) { return getRootAsRestResponse(_bb, new RestResponse()); }
public static RestResponse getRootAsRestResponse(ByteBuffer _bb, RestResponse obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public RestResponse __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public long rcid() { int o = __offset(4); return o != 0 ? bb.getLong(o + bb_pos) : 0L; }
public String response() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer responseAsByteBuffer() { return __vector_as_bytebuffer(6, 1); }
public static int createRestResponse(FlatBufferBuilder builder,
long rcid,
int responseOffset) {
builder.startObject(2);
RestResponse.addRcid(builder, rcid);
RestResponse.addResponse(builder, responseOffset);
return RestResponse.endRestResponse(builder);
}
public static void startRestResponse(FlatBufferBuilder builder) { builder.startObject(2); }
public static void addRcid(FlatBufferBuilder builder, long rcid) { builder.addLong(0, rcid, 0L); }
public static void addResponse(FlatBufferBuilder builder, int responseOffset) { builder.addOffset(1, responseOffset, 0); }
public static int endRestResponse(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
public static void finishRestResponseBuffer(FlatBufferBuilder builder, int offset) { builder.finish(offset); }
}
......@@ -6,7 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import microservice.MicroserviceApp;
import microservice.RestContext;
import microservice.common.context.RestContext;
import microservice.io.iface.IConfiguration;
import microservice.io.iface.IContainer;
import microservice.io.iface.ILogger;
......@@ -90,31 +90,33 @@ public abstract class BaseHandler
public void subscribe(RestContext reqCtx, String topic, INotifyCallback notifyHandler)
{
reqCtx.container.subscribe(topic,notifyHandler);
// reqCtx.container.subscribe(topic,notifyHandler);
}
public void unsubscribe(RestContext reqCtx, String topic)
{
reqCtx.container.unsubscribe(topic);
//reqCtx.container.unsubscribe(topic);
}
public void publish(String topic, JsonNode messageNode)
{
optContainer.ifPresent(container -> container.publish(topic,messageNode));
// optContainer.ifPresent(container -> container.publish(topic,messageNode));
}
public void subscribe(String topic, INotifyCallback notifyHandler)
{
optContainer.ifPresent(container -> container.subscribe(topic,notifyHandler));
// optContainer.ifPresent(container -> container.subscribe(topic,notifyHandler));
}
public void unsubscribe(String topic)
{
optContainer.ifPresent(container -> container.unsubscribe(topic));
// optContainer.ifPresent(container -> container.unsubscribe(topic));
}
public void publish(RestContext reqCtx, String topic, JsonNode messageNode)
{
reqCtx.container.publish(topic,messageNode);
// reqCtx.container.publish(topic,messageNode);
}
......
......@@ -5,7 +5,7 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import microservice.RestContext;
import microservice.common.context.RestContext;
import microservice.defs.Constants;
import microservice.defs.Enums.EnumHttpMethod;
import microservice.io.iface.*;
......@@ -199,25 +199,25 @@ public class MBIHandler extends RMQHandler implements IContainer
return obj;
}
@Override
public void subscribe(String topic, INotifyCallback notifyHandler)
{
// TODO Auto-generated method stub
}
@Override
public void unsubscribe(String topic)
{
// TODO Auto-generated method stub
}
@Override
public void publish(String topic, JsonNode messageNode)
{
// TODO Auto-generated method stub
}
// @Override
// public void subscribe(String topic, INotifyCallback notifyHandler)
// {
// // TODO Auto-generated method stub
//
// }
//
// @Override
// public void unsubscribe(String topic)
// {
// // TODO Auto-generated method stub
//
// }
//
// @Override
// public void publish(String topic, JsonNode messageNode)
// {
// // TODO Auto-generated method stub
//
// }
}
......@@ -12,7 +12,7 @@ import microservice.io.iface.CommonServices;
import microservice.io.iface.IConfiguration;
import microservice.io.impl.IMetricsFactoryImpl;
import microservice.types.BaseRestResponse;
import microservice.RestContext;
import microservice.common.context.RestContext;
/**
* this class is for monitoring the microservice
......
......@@ -11,10 +11,9 @@ import io.undertow.util.HeaderMap;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import microservice.MicroserviceApp;
import microservice.RestContext;
import microservice.common.context.RestContext;
import microservice.common.EncryptionUtils;
import microservice.defs.Constants;
import microservice.defs.Enums;
import microservice.defs.Enums.EnumHttpMethod;
import microservice.io.iface.*;
import microservice.io.iface.IMetricsFactory.IMeter;
......@@ -363,37 +362,37 @@ public class RestHandler implements HttpHandler , IContainer
}
@Override
public void subscribe(String topic, INotifyCallback notifyHandler)
{
if (pubSub != null && topic != null && notifyHandler != null)
{
pubSub.subscribe(topic, notifyHandler);
}
}
@Override
public void unsubscribe(String topic)
{
// TODO Auto-generated method stub
if (pubSub != null && topic != null)
{
pubSub.unsubscribe(topic);
}
}
@Override
public void publish(String topic, JsonNode messageNode)
{
if (pubSub != null && topic != null)
{
pubSub.publish(topic, messageNode.toString());
}
}
// @Override
// public void subscribe(String topic, INotifyCallback notifyHandler)
// {
// if (pubSub != null && topic != null && notifyHandler != null)
// {
// pubSub.subscribe(topic, notifyHandler);
// }
//
// }
//
//
// @Override
// public void unsubscribe(String topic)
// {
// // TODO Auto-generated method stub
// if (pubSub != null && topic != null)
// {
// pubSub.unsubscribe(topic);
// }
// }
//
//
// @Override
// public void publish(String topic, JsonNode messageNode)
// {
// if (pubSub != null && topic != null)
// {
// pubSub.publish(topic, messageNode.toString());
// }
// }
//
}
......@@ -36,21 +36,21 @@ public interface IContainer
*/
public Object readObjectFromRequest(IRequest request,Class<?> ObjClass);
/**
* subscribing to specific topic
* @param topic
* @param notifyHandler
*/
public void subscribe(String topic, INotifyCallback notifyHandler);
public void unsubscribe(String topic);
/**
* publish msg on specific topic
* @param topic
* @param messageNode
*/
public void publish(String topic, JsonNode messageNode);
// /**
// * subscribing to specific topic
// * @param topic
// * @param notifyHandler
// */
// public void subscribe(String topic, INotifyCallback notifyHandler);
//
// public void unsubscribe(String topic);
//
// /**
// * publish msg on specific topic
// * @param topic
// * @param messageNode
// */
// public void publish(String topic, JsonNode messageNode);
//
}
......@@ -8,7 +8,6 @@ import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.server.Connectors;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.PathHandler;
......@@ -18,8 +17,7 @@ import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import io.undertow.util.MimeMappings;
import microservice.MicroserviceApp;
import microservice.MicroserviceClient;
import microservice.RestContext;
import microservice.common.context.RestContext;
import microservice.common.EncryptionUtils;
import microservice.defs.Constants;
import microservice.defs.Enums;
......@@ -28,19 +26,13 @@ import microservice.handlers.Reactor;
import microservice.io.iface.*;
import microservice.io.impl.IRequestRestImpl;
import microservice.io.impl.IResponseRestImpl;
import microservice.io.impl.IRestClientRestImpl;
import microservice.params.BaseClientParams;
import microservice.params.CommandParams;
import microservice.params.RestClientParams;
import microservice.params.RestServerParams;
import microservice.types.BaseRestResponse;
import microservice.types.UserProfile;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static io.undertow.Handlers.resource;
......@@ -370,22 +362,6 @@ public class IRestServiceHttpImpl extends CommonServices.IRestService implements
return obj;
}
@Override
public void subscribe(String topic, INotifyCallback notifyHandler) {
}
@Override
public void unsubscribe(String topic) {
}
@Override
public void publish(String topic, JsonNode messageNode) {
}
/**
* validate the request: service authorization etc.
* @param restContext
......
package microservice.io.impl.service;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import microservice.io.iface.*;
import microservice.params.CommandParams;
import microservice.types.BaseRestResponse;
import java.util.function.Consumer;
/**
* Created by amir on 14/05/17.
*/
public class IRestServiceZmqImpl extends CommonServices.IRestService implements HttpHandler, IContainer {
private String appName;
private String host = null; // the local host address of the service
private int port = 0; // in case of port like in tcp
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
}
@Override
public BaseRestResponse create(CommandParams cmdParams) {
return null;
}
@Override
public BaseRestResponse read(CommandParams cmdParams) {
return null;
}
@Override
public BaseRestResponse update(CommandParams cmdParams) {
return null;
}
@Override
public BaseRestResponse delete(CommandParams cmdParams) {
return null;
}
@Override
public boolean asyncCreate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncRead(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncUpdate(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public boolean asyncDelete(CommandParams reqCtx, Consumer<BaseRestResponse> cbFunc) {
return false;
}
@Override
public void sendErrorResp(IResponse response, String error) {
}
@Override
public void writeObjectToResponse(IResponse response, Object value) {
}
@Override
public Object readObjectFromRequest(IRequest request, Class<?> ObjClass) {
return null;
}
@Override
public void startAsync(IRequest request, Runnable asyncFunc) {
}
@Override
public boolean init() {
return false;
}
@Override
public void run() {
}
@Override
public void shutdown() {
}
@Override
public void handleNotImplmented(CommonServices.IMsgContext msgContext) {
}
@Override
public void register(IServiceDiscovery serviceDiscovery, String id) {
if (serviceDiscovery != null)
serviceDiscovery.registerService(appName, id, host, port);
}
}
package microservice.params;
import jdk.nashorn.internal.ir.EmptyNode;
/**
* Created by amir on 14/05/17.
*/
public class ZMQParams {
public static class ServerParams
{
public enum EnumProtocol
{
eInproc,
eIpc,
eTcp,
ePgm,
eEpgm
}
public ServerParams(EnumProtocol protocol, int port, String host) {
this.protocol = protocol;
this.port = port;
this.host = host;
}
EnumProtocol protocol() { return protocol; }
public static String buildAddress (String host, int port, EnumProtocol protocol) {
StringBuilder bindAddr = new StringBuilder();
if(! host.isEmpty()) {
switch (protocol) {
case eInproc:
bindAddr.append("inproc://").append(host);
break;
case eIpc:
bindAddr.append("ipc://").append(host);
break;
case eTcp:
bindAddr.append("tcp://").append(host).append(":").append(String.valueOf(port));
break;
case ePgm:
bindAddr.append("pgm://").append(host).append(":").append(String.valueOf(port));
break;
case eEpgm:
bindAddr.append("epgm://").append(host).append(":").append(String.valueOf(port));
break;
}
}
return bindAddr.toString();
}
String bindAddress() { return buildAddress(host,port,protocol); }
private EnumProtocol protocol;
private int port;
String host;
};
// class Microservice_ZMQPubSubParams : public Microservice_ZMQServerParams {
//
// public:
// Microservice_ZMQPubSubParams(String host,
// int port,
// eProtocol protocol,
// String subHost,
// int subPort):
// Microservice_ZMQServerParams(host,port,protocol),
// subHost_(subHost),subPort_(subPort){
//
// }
// String publishAddress() { return bindAddress(); }
// String subscribeAddress() {
// return buildAddress(subHost_,subPort_);
// }
//
// private:
// String subHost_;
// int subPort_;
//
// };
// class Microservice_ZMQRestClientParams {
// public:
// Microservice_ZMQRestClientParams(Microservice_ZMQServerParams client, Microservice_ZMQServerParams server) :
// client_(client), server_(server){}
//
// Microservice_ZMQServerParams& GetClient() { return client_; }
// ServerParams GetServer() { return server; }
// private ServerParams server;
// Microservice_ZMQServerParams client_;
// };
}
......@@ -4,8 +4,10 @@ import microservice.io.iface.CommonServices;
import microservice.io.iface.ICommandClient;
import microservice.io.impl.IRestClientRestImpl;
import microservice.io.impl.service.IRestServiceHttpImpl;
import microservice.io.impl.service.IRestServiceZmqImpl;
import microservice.params.RestClientParams;
import microservice.params.RestServerParams;
import microservice.params.ZMQParams;
/**
* Created by amir on 09/05/17.
......@@ -17,6 +19,10 @@ public class ServiceBuilderFactory {
return new RestServiceHttpBuilder(serviceMode);
}
public static RestServiceZmqBuilder createRestServiceZmqBuilder(CommonServices.EnumRestServiceMode serviceMode){
return new RestServiceZmqBuilder(serviceMode);
}
public interface IBuilder {
CommonServices.IService build();
}
......@@ -107,4 +113,77 @@ public class ServiceBuilderFactory {
return true;
}
}
public static class RestServiceZmqBuilder implements IBuilder {
CommonServices.EnumRestServiceMode serviceMode = CommonServices.EnumRestServiceMode.E_UNKNOWN;
ZMQParams.ServerParams serverParams = null;
ZMQParams.ServerParams clientParams = null;
IRestServiceZmqImpl restServiceZmq = null;
public RestServiceZmqBuilder(CommonServices.EnumRestServiceMode serviceMode) {
this.serviceMode = serviceMode;
}
public void setServerParams(ZMQParams.ServerParams serverParams) {
this.serverParams = serverParams;
}
public void setClientParams(ZMQParams.ServerParams clientParams) {
this.clientParams = clientParams;
}
private boolean validateParams() {
switch (serviceMode){
case E_UNKNOWN:
return false;
case E_SERVER:
if (this.serverParams == null)
return false;
break;
case E_CLIENT:
if (this.clientParams == null)
return false;
break;
case E_CLIENT_SERVER:
if (this.serverParams == null || this.clientParams == null)
return false;
break;
}
return true;
}
@Override
public CommonServices.IService build() {
if (validateParams()) {
try {
restServiceZmq = new IRestServiceZmqImpl();
restServiceZmq.setServiceMode(serviceMode);
// switch (serviceMode) {
// case E_SERVER:
// restServiceZmq.setRestServerParams(restServerParams);
// break;
// case E_CLIENT:
// if (restClient == null)
// restClient = new IRestClientRestImpl(restClientParams);
// restServiceZmq.setRestClient(restClient);
// break;
// case E_CLIENT_SERVER:
// restServiceZmq.setRestServerParams(restServerParams);
// if (restClient == null)
// restClient = new IRestClientRestImpl(restClientParams);
// restServiceZmq.setRestClient(restClient);
// break;
// }
} catch (Exception exp){
System.err.println(this.getClass().getName().toString() + "Exception >> " + exp);
restServiceZmq = null;
}
} else {
System.err.println(this.getClass().getName().toString() + " >> Failed in validating params");
}
return restServiceZmq;
}
}
}
......@@ -3,6 +3,7 @@ package microservice;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import microservice.common.context.RestContext;
import microservice.defs.Enums;
import microservice.io.iface.CommonServices;
import microservice.io.iface.ICommandClient;
......
package microservice;
import com.google.flatbuffers.FlatBufferBuilder;
import microservice.common.context.CrudMethod;
import microservice.common.context.RestMsg;
import microservice.common.context.RestResponse;
import org.junit.Test;
import org.zeromq.ZMQ;
import org.zeromq.ZSocket;
import zmq.Utils;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
/**
* Created by amir on 11/05/17.
*/
public class TestZMQ {
static final String IPC_FILE1 = "/tmp/service-name1.ipc";
static final String IPC_FILE2 = "/tmp/service-name2.ipc";
static final String EXIT_MSG = "exit";
static final int EXIT_MSG_LEN = EXIT_MSG.length();
static final String JSON_CONTENT = "{\n" +
" \"success\": true,\n" +
" \"error\": null,\n" +
" \"objectNode\": {\n" +
" \"success\": true,\n" +
" \"error\": null,\n" +
" \"objectNode\": {\n" +
" \"num_results\": 6,\n" +
" \"query\": \"base\",\n" +
" \"results\": [\n" +
" {\n" +
" \"description\": null,\n" +
" \"name\": \"amir/base-server-no-db\"\n" +
" },\n" +
" {\n" +
" \"description\": null,\n" +
" \"name\": \"amir/base-server-ui\"\n" +
" },\n" +
" {\n" +
" \"description\": null,\n" +
" \"name\": \"amir/base-server-db\"\n" +
" },\n" +
" {\n" +
" \"description\": \"\",\n" +
" \"name\": \"ipgallery/base-ims\"\n" +
" },\n" +
" {\n" +
" \"description\": \"\",\n" +
" \"name\": \"ipgallery/base-resin\"\n" +
" },\n" +
" {\n" +
" \"description\": \"\",\n" +
" \"name\": \"ipgallery/base-microservice-java\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" }\n" +
"}";
static final String SOURCE_CHANNEL = "ipc:///tmp/some-file.ipc";
static final String URI = "/xxx/resource/subResource";
static final String QUERY_STRING = "a=b&c=d&abba=sabba";
@Test
public void pushPullTestTCP() throws IOException
{
int port = Utils.findOpenPort();
try (
final ZSocket pull = new ZSocket(ZMQ.PULL);
final ZSocket push = new ZSocket(ZMQ.PUSH))
{
pull.bind("tcp://*:" + port);
push.connect("tcp://127.0.0.1:" + port);
final String expected = "hello";
push.sendStringUtf8(expected);
final String actual = pull.receiveStringUtf8();
assertEquals(expected, actual);
}
}
@Test
public void pushPullTestIPC() throws IOException
{
String ipcName = "//tmp//ipc1";
//int port = Utils.findOpenPort();
try (
final ZSocket pull = new ZSocket(ZMQ.PULL);
final ZSocket push = new ZSocket(ZMQ.PUSH))
{
pull.bind("ipc://" + ipcName);
push.connect("ipc://" + ipcName);
final String expected = "hello";
push.sendStringUtf8(expected);
final String actual = pull.receiveStringUtf8();
assertEquals(expected, actual);
}
}
void testRequestResponse(int iterations) throws InterruptedException {
final String ipcFile1 = IPC_FILE1;
final String ipcFile2 = IPC_FILE2;
//zmqpp::context context;
// create and bind a serverReceive socket
String ipcAddress1 = "ipc://" + ipcFile1;
String ipcAddress2 = "ipc://" + ipcFile2;
final ZSocket clientSend = new ZSocket(ZMQ.PUSH);
final ZSocket serverReceive = new ZSocket(ZMQ.PULL);
final ZSocket clientReceive = new ZSocket(ZMQ.PULL);
final ZSocket serverReply = new ZSocket(ZMQ.PUSH);
clientSend.connect(ipcAddress1);
clientReceive.bind(ipcAddress2);
serverReceive.bind(ipcAddress1);
serverReply.connect(ipcAddress2);
// int maxSize = 10000;
// serverReceive..set(zmqpp::socket_option::receive_high_water_mark, maxSize);
// serverReply.set(zmqpp::socket_option::send_high_water_mark, maxSize);
// clientReceive.set(zmqpp::socket_option::receive_high_water_mark, maxSize);
// clientSend.set(zmqpp::socket_option::send_high_water_mark, maxSize);
Thread serverThread = new Thread(() -> {
boolean keepRunning = true;
ByteBuffer respBB = ByteBuffer.allocate(1024);
FlatBufferBuilder requestBuilder = new FlatBufferBuilder();
FlatBufferBuilder respBuilder = new FlatBufferBuilder();
while (keepRunning) {
respBuilder.init(respBB);
final byte[] response = serverReceive.receive();
if (response.length > EXIT_MSG_LEN) {
ByteBuffer bb = ByteBuffer.wrap(response);
RestMsg receiveMsg = RestMsg.getRootAsRestMsg(bb);
//respBuilder.Clear();
long rcid = receiveMsg.rcid();
final String content = receiveMsg.content();
int contentOffset = respBuilder.createString(content);
int respSize = RestResponse.createRestResponse(respBuilder,rcid,contentOffset);
RestResponse.finishRestResponseBuffer(respBuilder,respSize);
serverReply.send(respBuilder.sizedByteArray(),ZMQ.DONTWAIT);
} else {
String msg = new String(response);
//std::cout << "Server Received Msg: " << msg << std::endl;
if (msg.equals(EXIT_MSG) ) {
keepRunning = false;
serverReply.send(msg.getBytes(),ZMQ.DONTWAIT);
}
// else if (response.parts() == 2) {
// msg = response.get(1);
// // std::cout << "Server Received Second Msg: " << msg << std::endl;
// serverReply.send(msg, zmqpp::socket::dont_wait);
// }
}
}
//std::cout << "Server exit.." << std::endl;
});
serverThread.start();
Thread clientReceiveThread = new Thread(() -> {
boolean keepRunning = true;
int lastNumber;
long rcid = 0;
//flatbuffers::FlatBufferBuilder respBuilder(1024);
while (keepRunning) {
//clientReceive.receive(response);
final byte[] response = clientReceive.receive();
if (response.length > EXIT_MSG_LEN) {
ByteBuffer bb = ByteBuffer.wrap(response);
RestMsg receiveMsg = RestMsg.getRootAsRestMsg(bb);
rcid = receiveMsg.rcid();
//std::cout << "Client Received Msg: " << receiveMsg->objectNode()->c_str() << std::endl;
} else {
String msg = new String(response);
//std::cout << "Client Received Msg: " << msg << std::endl;
if (msg.equals(EXIT_MSG))
keepRunning = false;
else
lastNumber = Integer.valueOf(msg);
}
}
//std::cout << "Client exit.." << std::endl;
});
clientReceiveThread.start();
//
// Send a single message from serverReceive to clientSend
int size;
ByteBuffer reqBB = ByteBuffer.allocate(1024);
FlatBufferBuilder requestBuilder = new FlatBufferBuilder();
for (int i = 0; i < iterations; i++) {
requestBuilder.init(reqBB);
//builder.Clear();
final int sourceOffset = requestBuilder.createString(SOURCE_CHANNEL);
int reqSize = RestMsg.createRestMsg(requestBuilder,
i,
sourceOffset,
CrudMethod.Create,
requestBuilder.createString(URI),
requestBuilder.createString(QUERY_STRING),
requestBuilder.createString(JSON_CONTENT));
RestMsg.finishRestMsgBuffer(requestBuilder,reqSize);
//std::cout << builder.GetSize() << std::endl;
final byte[] bytesToSend = requestBuilder.sizedByteArray();
clientSend.send(bytesToSend,ZMQ.DONTWAIT);
}
clientSend.send(EXIT_MSG.getBytes());
serverThread.join();
// std::cout << "Server exited" << std::endl;
clientReceiveThread.join();
// std::cout << "Client exited" << std::endl;
}
@Test
public void testPerformance(){
int iterations = 1000000;
final long start = System.currentTimeMillis();
try {
testRequestResponse(iterations);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Test of: " + String.valueOf(iterations) + " took(msec) כולל הקמה ופרוק: " + String.valueOf((System.currentTimeMillis() - start)));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment