上个版本的代码看着比较生肉,与平时开发时的写法差别较大。
接下来这版,加入了动态代理后
动态代理-guava版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
import com.google.common.reflect.Reflection;
class DemoGuava {
interface MyMath1 {
public Integer sum(Integer a, Integer b);
}
static class MyMathImpl implements MyMath1 {
@Override
public Integer sum(Integer a, Integer b) {
return a + b;
}
}
public static void main(String[] args) {
MyMath1 tg = new MyMathImpl();
// 创建对象代理
MyMath1 math = (MyMath1) Reflection.newProxy(MyMath1.class, (proxy, method, args1) -> {
long startTime = System.nanoTime();
Object result = method.invoke(tg, args1);
long estimatedTime = System.nanoTime() - startTime;
System.out.println("方法耗时-" + estimatedTime);
return result;
});
// 调用被代理后的方法
System.out.println(math.sum(1, 2));
}
}
|
v3版本的示例
所用到包
1
2
3
4
5
6
7
8
9
|
import java.io.*;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import com.google.common.reflect.Reflection;
|
序列化工具类-不变
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
class Util {
public static byte[] obj2byte(Object obj) {
byte[] bytes = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
oos.flush();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
oos.close();
bos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
bytes = bos.toByteArray();
return bytes;
}
public static Object byte2Obj(byte[] bytes) {
Object obj = null;
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = null;
try {
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
try {
ois.close();
bis.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return obj;
}
}
|
服务接口及实现类-不变
1
2
3
4
5
6
7
8
9
10
11
|
interface HelloService {
public String echo(String msg);
}
class HelloServiceImpl implements HelloService {
@Override
public String echo(String msg) {
String result = "【" + msg + "】 is deal";
return result;
}
}
|
RpcPojo-添加参数
1
2
3
4
5
6
7
8
9
|
class RpcPojo implements Serializable {
private static final long serialVersionUID = -1055524749130316395L;
Integer taskId;
String ifaceName;
String methondName;
Object[] params;
Class[] paramsTypes;
Object result;
}
|
增加-通信桩
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
abstract class RpcNet implements Runnable {
final LinkedBlockingQueue<byte[]> reqMsgBox;
final LinkedBlockingQueue<byte[]> respMsgBox;
protected RpcNet(LinkedBlockingQueue<byte[]> reqMsgBox, LinkedBlockingQueue<byte[]> respMsgBox) {
this.reqMsgBox = reqMsgBox;
this.respMsgBox = respMsgBox;
}
}
class ProviderRpcStub extends RpcNet {
final HashMap serviceContext;
public ProviderRpcStub(LinkedBlockingQueue<byte[]> reqMsgBox, LinkedBlockingQueue<byte[]> respMsgBox, HashMap serviceContext) {
super(reqMsgBox, respMsgBox);
this.serviceContext = serviceContext;
}
@Override
public void run() {
while (true) {
try {
// 模拟网络传输-数据解包
RpcPojo task = (RpcPojo) Util.byte2Obj(reqMsgBox.take());
Class<?> cls = Class.forName(task.ifaceName);
Method method = cls.getMethod(task.methondName, task.paramsTypes);
Object serviceTarget = serviceContext.get(task.ifaceName);
Object result = method.invoke(serviceTarget, task.params);
task.result = result;
respMsgBox.add(Util.obj2byte(task));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
class ClientRpcStub extends RpcNet {
final Map<Integer, Thread> callerMap = new ConcurrentHashMap(2 << 4);
final Map<Integer, Object> resultMap = new ConcurrentHashMap(2 << 4);
final AtomicInteger idRepo = new AtomicInteger(1);
public ClientRpcStub(LinkedBlockingQueue<byte[]> reqMsgBox, LinkedBlockingQueue<byte[]> respMsgBox) {
super(reqMsgBox, respMsgBox);
}
public Object execAtRemote(RpcPojo task, Thread t) {
task.taskId = idRepo.getAndIncrement();
callerMap.put(task.taskId, t);
// 模拟网络传输
reqMsgBox.add(Util.obj2byte(task));
// 远程调用-超时时间
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
Object result = resultMap.remove(task.taskId);
if (result == null) {
throw new RuntimeException("响应超时");
}
return result;
}
@Override
public void run() {
while (true) {
try {
RpcPojo resp = (RpcPojo) Util.byte2Obj(respMsgBox.take());
Thread t = callerMap.remove(resp.taskId);
resultMap.put(resp.taskId, resp.result);
LockSupport.unpark(t);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
|
在进行远程调用时,需要把调用参数通过网络传送给provider节点,以及执行结果的回传
这类通用操作当然有专门的电报员
运行测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
class TestMockNetRpc {
static HelloService tagert = new HelloServiceImpl();
static HashMap ifaceMap = new HashMap();
static byte[] netDatas = null;
public static void main(String[] args) throws Exception {
LinkedBlockingQueue<byte[]> reqMsgBox = new LinkedBlockingQueue(2 << 4);
LinkedBlockingQueue<byte[]> respMsgBox = new LinkedBlockingQueue(2 << 4);
HashMap serviceContext = new HashMap();
// jvm1-类比 向spring去注册provider
serviceContext.put(HelloService.class.getName(), new HelloServiceImpl());
ProviderRpcStub providerStub = new ProviderRpcStub(reqMsgBox, respMsgBox, serviceContext);
ClientRpcStub clientStub = new ClientRpcStub(reqMsgBox, respMsgBox);
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(providerStub);
pool.execute(clientStub);
Class<HelloService> clientProxyTargetClass = HelloService.class;
// jvm2-类比 消费者从spring中取到的service
HelloService clientHelloService = Reflection.newProxy(clientProxyTargetClass, (proxy, method1, args1) -> {
RpcPojo clientSide = new RpcPojo();
clientSide.ifaceName = clientProxyTargetClass.getName();
clientSide.methondName = method1.getName();
clientSide.params = args1;
clientSide.paramsTypes = new Class[args1.length];
for (int i = clientSide.params.length - 1; i >= 0; i--) {
clientSide.paramsTypes[i] = clientSide.params[i].getClass();
}
Object remoteResult = clientStub.execAtRemote(clientSide, Thread.currentThread());
return remoteResult;
});
String result = clientHelloService.echo("v3-版本");
System.out.println(result);
System.exit(0);
}
}
|
这里没有启动两个jvm进行跨jvm调用,把跨网络调用的的传输步骤给简化掉了。
文章作者
duansheli
上次更新
2019-12-25
(325c7b3)