Commit b3550ff9 by amir

first full version 1.0.0

parent 524d04f4
Pipeline #4 skipped in 0 seconds
......@@ -2,13 +2,32 @@ group 'com.ipgallery.common'
version '1.0.0'
apply plugin: 'java'
apply plugin: 'maven-publish'
sourceCompatibility = 1.8
repositories {
mavenCentral()
//mavenCentral()
maven { url "http://172.16.1.132:8081/repository/internal" }
}
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.11'
}
publishing {
publications {
repositories.maven {
url 'http://172.16.1.132:8081/repository/internal'
credentials {
username "admin"
password "giptmgr1"
}
}
mavenJava(MavenPublication) {
//artifactId 'group-service'
from components.java
}
}
}
\ No newline at end of file
package itc;
import java.io.Serializable;
/**
* this is the ITC simple messgae, it can be inherited
* @author amir
*
*/
public class ItcMessage implements Serializable
{
private static final long serialVersionUID = 1L;
int opCode = 0;
Object msgObj = null;
Object extraMsgObj = null;
boolean waitForReply = false;
String fromSender = null;
public ItcMessage(int opCode, Object msgObj, Object extraMsgObj,boolean waitForReply)
{
this.opCode = opCode;
this.msgObj = msgObj;
this.extraMsgObj = extraMsgObj;
this.waitForReply = waitForReply;
}
public ItcMessage(int opCode, Object msgObj, Object extraMsgObj,boolean waitForReply,String from)
{
this.opCode = opCode;
this.msgObj = msgObj;
this.extraMsgObj = extraMsgObj;
this.waitForReply = waitForReply;
this.fromSender = from;
}
public int getOpCode()
{
return opCode;
}
public Object getMsgObj()
{
return msgObj;
}
public Object getExtraMsgObj()
{
return extraMsgObj;
}
public boolean isWaitForReply()
{
return waitForReply;
}
public String getFromSender()
{
return fromSender;
}
public void setMsgObj(Object msgObj)
{
this.msgObj = msgObj;
}
}
package itc;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
public class ItcMessageQueue
{
private static final Logger logger = Logger.getLogger(ItcMessageQueue.class.getName());
final Lock mReplyLock = new ReentrantLock();
final Condition mReplyCond = mReplyLock.newCondition();
private BlockingQueue<ItcMessage> mMsgQueue;// = new ArrayBlockingQueue
private Object mReplyObject = null;
public ItcMessageQueue(int capacity)
{
mMsgQueue = new ArrayBlockingQueue<ItcMessage>(capacity);
}
public boolean sendMsg(int opcode, Object msgObj,Object extraMsgObj){
return this.sendMsg(opcode, msgObj, extraMsgObj, null);
}
/**
* sends the message if the queue is not full
* @param opcode
* @param msgObj
* @param extraMsgObj
* @param fromSender
* @return true on success, false otherwise.
*/
public boolean sendMsg(int opcode, Object msgObj,Object extraMsgObj,String fromSender)
{
boolean retStatus = true;
try
{
ItcMessage msg = new ItcMessage(opcode,msgObj,extraMsgObj,false,fromSender);
retStatus = mMsgQueue.offer(msg);
} catch (Exception e)
{
logger.severe(e.toString());
retStatus = false;
}
return retStatus;
}
/**
* sends the message if the queue is not full otherwise waiting the timeout to become available
* @param opcode
* @param msgObj
* @param extraMsgObj
* @param fromSender
* @param timeoutMilli
* @return
*/
public boolean sendMsg(int opcode, Object msgObj,Object extraMsgObj,String fromSender,long timeoutMilli)
{
boolean retStatus = true;
try
{
ItcMessage msg = new ItcMessage(opcode,msgObj,extraMsgObj,false,fromSender);
retStatus = mMsgQueue.offer(msg,timeoutMilli,TimeUnit.MILLISECONDS);
} catch (Exception e)
{
logger.severe(e.toString());
retStatus = false;
}
return retStatus;
}
/**
* sends the message if the queue is not full
* @param opcode
* @param msgObj
* @param extraMsgObj
* @param fromSender
* @return true on success, false otherwise.
*/
public boolean forwardMsg(ItcMessage msg)
{
boolean retStatus = true;
try
{
retStatus = mMsgQueue.offer(msg);
} catch (Exception e)
{
logger.severe(e.toString());
retStatus = false;
}
return retStatus;
}
public Object sendSessionMsgWaitReply(int opcode, Object msgObj,Object extraMsgObj, long waitTime){
return sendSessionMsgWaitReply(opcode, msgObj, extraMsgObj, null, waitTime);
}
/**
* same as {@link #sendMsg(int, Object, Object)}
* only here we can wait for a reply from the server for the waitTime received
* @param opcode
* @param msgObj
* @param extraMsgObj
* @param fromSender
* @param waitTime the time to wait for a reply in milli
* @return the reply or null
*/
public Object sendSessionMsgWaitReply(int opcode, Object msgObj,Object extraMsgObj, String fromSender, long waitTime)
{
Object reply = null;
ItcMessage msg = new ItcMessage(opcode,msgObj,extraMsgObj,true,fromSender);
mReplyLock.lock();
mReplyObject = null;
try
{
if(mMsgQueue.offer(msg,waitTime,TimeUnit.MILLISECONDS))
{
if (mReplyCond.await(waitTime,TimeUnit.MILLISECONDS) )
{
reply = mReplyObject;
}
}
} catch (Exception e)
{
logger.severe(e.toString());
}
finally {
mReplyLock.unlock();
}
return reply;
}
/**
* receive messagew
* @param wait wait or poll
* @return the message or null
*/
public ItcMessage recieveMsg(boolean wait)
{
ItcMessage msg = null;
try{
if (wait)
msg = mMsgQueue.take();
else
msg = mMsgQueue.poll();
}
catch (Exception e)
{
logger.severe(e.toString());
}
return msg;
}
public int recieveMsgs(Collection<ItcMessage> collection, int maxElements)
{
try{
ItcMessage msg = mMsgQueue.take();
collection.add(msg);
mMsgQueue.drainTo(collection, maxElements);
}
catch (Exception e)
{
logger.severe(e.toString());
}
return collection.size();
}
/**
* reply to the send msg reply
* @param replyObject
*/
public void replyMsg(Object replyObject)
{
mReplyLock.lock();
try
{
mReplyObject = replyObject;
mReplyCond.signalAll();
} catch (Exception e)
{
logger.severe(e.toString());
}
finally {
mReplyLock.unlock();
}
}
public int getCurrentWaitingMsgCount()
{
return mMsgQueue.size();
}
}
package itc;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.junit.Test;
import itc.ItcMessage;
import itc.ItcMessageQueue;
public class TestItc
{
@Test
public void testITCSimpleMsgNoWait()
{
final ItcMessageQueue msgQueue = new ItcMessageQueue(1000);
System.out.println("start testITCSimpleMsgNoWait");
Thread thr = new Thread(){
@Override
public void run()
{
ItcMessage msg = msgQueue.recieveMsg(true);
while (msg != null && msg.getOpCode() != -1)
{
Integer num = (Integer)msg.getMsgObj();
System.out.println("recieve:" + num.toString());
try
{
Thread.sleep(10);
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
msg = msgQueue.recieveMsg(true);
}
}
};
// thr.start();
Thread thrMulti = new Thread(){
@Override
public void run()
{
final int MAX_SIZE = 300;
boolean bContinue = true;
Collection<ItcMessage> msgList = new ArrayDeque<ItcMessage>(MAX_SIZE); //LinkedList<ItcMessage>();
//ItcMessage msg = msgQueue.recieveMsg(true);
int msgCount = msgQueue.recieveMsgs(msgList, MAX_SIZE);
System.out.println("recieve msg count:" + String.valueOf(msgCount));
while (bContinue && msgCount > 0 && !msgList.isEmpty() )
{
System.out.println("recieve msg count:" + String.valueOf(msgCount));
for (ItcMessage msg : msgList)
{
if (msg.getOpCode() != -1)
{
Integer num = (Integer)msg.getMsgObj();
//System.out.println("recieve:" + num.toString());
// try
// {
// Thread.sleep(10);
// } catch (InterruptedException e)
// {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
}
else
{
bContinue = false;
break;
}
}
if (bContinue)
{
msgList.clear();
msgCount = msgQueue.recieveMsgs(msgList, MAX_SIZE);
System.out.println("recieve msg count:" + String.valueOf(msgCount));
}
else
break;
}
}
};
thrMulti.start();
// Thread thr1 = new Thread(){
//
// @Override
// public void run()
// {
// int fullCount = 0;
// for (int i = 500; i < 1000; i++)
// {
// try
// {
// Thread.sleep(10);
// } catch (InterruptedException e)
// {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// System.out.println("sending: " + String.valueOf(i));
// if ( msgQueue.sendMsg(0, new Integer(i), null) == false)
// {
// System.out.println("QUEUE FULL : " + String.valueOf(i));
// fullCount++;
// }
// }
// System.out.println("FULL QUEUE COUNT : " + String.valueOf(fullCount));
// // exit
// System.out.println("exitting from thread...");
// msgQueue.sendMsg(-1, new Integer(-1), null);
//
// }
//
// };
// thr1.start();
try
{
int fullCount = 0;
for (int i = 0; i < 5000; i++)
{
//Thread.sleep(10);
System.out.println("sending: " + String.valueOf(i));
if ( msgQueue.sendMsg(0, new Integer(i), null) == false)
{
System.out.println("QUEUE FULL : " + String.valueOf(i));
fullCount++;
}
}
System.out.println("FULL QUEUE COUNT : " + String.valueOf(fullCount));
System.out.println("exitting...");
msgQueue.sendMsg(-1, new Integer(-1), null);
// thr.join();
// thr1.join();
thrMulti.join();
//thr.join();
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("finish");
}
@Test
public void testITCSimpleMsgAndWait()
{
final ItcMessageQueue msgQueue = new ItcMessageQueue(100);
Thread thr = new Thread(){
@Override
public void run()
{
Random sleepTime = new Random();
ItcMessage msg = msgQueue.recieveMsg(true);
while (msg != null && msg.getOpCode() != -1)
{
Integer num = (Integer)msg.getMsgObj();
Integer newNum = num * 10;
System.out.println("recieve:" + num.toString() + ", reply with:" + newNum.toString());
try
{
Thread.sleep(sleepTime.nextInt(500));
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
msgQueue.replyMsg(newNum);
msg = msgQueue.recieveMsg(true);
}
//super.run();
}
};
thr.start();
try
{
for (int i = 0; i < 10; i++)
{
//Thread.sleep(10);
System.out.println("sending: " + String.valueOf(i));
Integer retVal = (Integer)msgQueue.sendSessionMsgWaitReply(0, new Integer(i), null,100);
if (retVal != null)
{
System.out.println("Got Reply: " + retVal.toString());
}
else {
System.out.println("wait timeout for:" + String.valueOf(i));
}
}
System.out.println("exitting...");
msgQueue.sendMsg(-1, new Integer(-1), null);
thr.join();
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("finish");
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment