上个版本的代码看着比较生硬,与平时开发时的写法差别较大。接下来这一版加入了动态代理,写法会更接近日常开发。

动态代理-guava版

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.google.common.reflect.Reflection;

/**
 * Demonstrates a dynamic proxy created with Guava's {@code Reflection.newProxy}:
 * every call on the proxy is forwarded to a real implementation and the elapsed
 * time of the call is printed.
 */
class DemoGuava {

    /** Service interface implemented by both the target object and the proxy. */
    interface MyMath1 {
        /**
         * Adds two numbers.
         *
         * @param a first operand
         * @param b second operand
         * @return {@code a + b}
         */
        Integer sum(Integer a, Integer b);
    }

    /** Plain implementation the proxy delegates to. */
    static class MyMathImpl implements MyMath1 {
        @Override
        public Integer sum(Integer a, Integer b) {
            return a + b;
        }
    }

    /**
     * Builds a timing proxy around {@link MyMathImpl} and invokes it once.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyMath1 tg = new MyMathImpl();

        // Create the proxy object; the handler forwards each call to tg and times it.
        MyMath1 math = Reflection.newProxy(MyMath1.class, (proxy, method, args1) -> {
            long startTime = System.nanoTime();
            Object result;
            try {
                result = method.invoke(tg, args1);
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Method.invoke wraps whatever the target threw; rethrow the original
                // so callers of the proxy see the same exception as a direct call.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
            long estimatedTime = System.nanoTime() - startTime;
            System.out.println("方法耗时-" + estimatedTime);
            return result;
        });
        // Call the method through the proxy.
        System.out.println(math.sum(1, 2));
    }
}

v3版本的示例

所用到的包

1
2
3
4
5
6
7
8
9
import java.io.*;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

import com.google.common.reflect.Reflection;

序列化工具类-不变

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * Java-serialization helpers used to turn objects into byte arrays and back.
 */
class Util {
    /**
     * Serializes {@code obj} with Java serialization.
     *
     * <p>An {@link IOException} raised while writing is printed and not rethrown;
     * in that case the bytes written so far are returned.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     */
    public static byte[] obj2byte(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources closes the stream without the null-dereference risk of a
        // hand-written finally block when the ObjectOutputStream constructor fails.
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return bos.toByteArray();
    }

    /**
     * Deserializes an object previously produced by {@link #obj2byte(Object)}.
     *
     * @param bytes the serialized form; may be {@code null}
     * @return the deserialized object, or {@code null} when {@code bytes} is
     *         {@code null} or cannot be deserialized (the failure is printed)
     */
    public static Object byte2Obj(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        Object obj = null;
        // The ObjectInputStream constructor itself reads the stream header and throws on
        // empty or corrupt input, so it must be inside the guarded region.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            obj = ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        return obj;
    }
}

服务接口及实现类-不变

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
/**
 * Service contract exposed by the provider and called through the client proxy.
 */
interface HelloService {
    /**
     * Processes a message and returns the resulting text.
     *
     * @param msg the message to process
     * @return the text produced for {@code msg}
     */
    String echo(String msg);
}

/**
 * Provider-side implementation of {@link HelloService}.
 */
class HelloServiceImpl implements HelloService {
    /**
     * Wraps the message in lenticular brackets and appends a fixed suffix.
     *
     * @param msg the message to wrap
     * @return the bracketed message followed by {@code " is deal"}
     */
    @Override
    public String echo(String msg) {
        return new StringBuilder("【").append(msg).append("】 is deal").toString();
    }
}

RpcPojo-添加参数

1
2
3
4
5
6
7
8
9
/**
 * Serializable carrier for one remote call: the request fields are filled in by the
 * caller, and {@link #result} is filled in by the provider before the object is sent back.
 */
class RpcPojo implements Serializable {
    private static final long serialVersionUID = -1055524749130316395L;
    /** Identifier used to match a response with its request. */
    Integer taskId;
    /** Fully qualified name of the service interface to call. */
    String ifaceName;
    /** Name of the method to call (the field name's spelling is kept for compatibility). */
    String methondName;
    /** Arguments for the call. */
    Object[] params;
    /** Parameter types used to look the method up reflectively. */
    Class<?>[] paramsTypes;
    /** Value returned by the remote method. */
    Object result;
}

增加-通信桩

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
 * Base class for the two communication stubs. It holds the pair of queues that
 * stand in for the network: one carries serialized requests, the other serialized
 * responses. Subclasses supply the {@link Runnable#run()} loop.
 */
abstract class RpcNet implements Runnable {
    /** Queue carrying serialized requests. */
    final LinkedBlockingQueue<byte[]> reqMsgBox;
    /** Queue carrying serialized responses. */
    final LinkedBlockingQueue<byte[]> respMsgBox;

    /**
     * Stores the two message queues.
     *
     * @param reqMsgBox  queue carrying serialized requests
     * @param respMsgBox queue carrying serialized responses
     */
    protected RpcNet(LinkedBlockingQueue<byte[]> reqMsgBox, LinkedBlockingQueue<byte[]> respMsgBox) {
        this.respMsgBox = respMsgBox;
        this.reqMsgBox = reqMsgBox;
    }
}

/**
 * Provider-side stub: takes serialized requests from {@code reqMsgBox}, invokes the
 * requested method reflectively on the registered service object, and puts the
 * serialized response on {@code respMsgBox}.
 */
class ProviderRpcStub extends RpcNet {
    /** Service objects looked up by interface name. */
    final HashMap<?, ?> serviceContext;

    /**
     * @param reqMsgBox      queue the requests are taken from
     * @param respMsgBox     queue the responses are put on
     * @param serviceContext service objects keyed by interface name
     */
    public ProviderRpcStub(LinkedBlockingQueue<byte[]> reqMsgBox, LinkedBlockingQueue<byte[]> respMsgBox, HashMap<?, ?> serviceContext) {
        super(reqMsgBox, respMsgBox);
        this.serviceContext = serviceContext;
    }

    /**
     * Serves requests until the running thread is interrupted. A failure while handling
     * one request is printed and the loop moves on to the next request.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Simulated network transfer: unpack the request.
                RpcPojo task = (RpcPojo) Util.byte2Obj(reqMsgBox.take());
                if (task == null) {
                    // Util.byte2Obj yields null when the payload cannot be read back.
                    throw new IllegalStateException("request could not be deserialized");
                }

                Object serviceTarget = serviceContext.get(task.ifaceName);
                if (serviceTarget == null) {
                    throw new IllegalStateException("no service registered for " + task.ifaceName);
                }
                Class<?> cls = Class.forName(task.ifaceName);
                Method method = cls.getMethod(task.methondName, task.paramsTypes);
                task.result = method.invoke(serviceTarget, task.params);
                // put() waits for space instead of throwing and losing the response
                // when the bounded queue is full.
                respMsgBox.put(Util.obj2byte(task));
            } catch (InterruptedException e) {
                // Restore the flag and stop, so the thread can actually be shut down.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Client-side stub: sends serialized requests on {@code reqMsgBox}, blocks the caller
 * until the matching response shows up, and runs a receive loop that takes responses
 * from {@code respMsgBox} and wakes the waiting caller.
 */
class ClientRpcStub extends RpcNet {
    /** Stand-in stored for a {@code null} remote result; ConcurrentHashMap rejects null values. */
    private static final Object NULL_RESULT = new Object();
    /** How long a caller waits for its response. */
    private static final long TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(3);

    /** Waiting caller threads keyed by task id. */
    final Map<Integer, Thread> callerMap = new ConcurrentHashMap<>(2 << 4);
    /** Results that have arrived but have not yet been collected, keyed by task id. */
    final Map<Integer, Object> resultMap = new ConcurrentHashMap<>(2 << 4);
    /** Source of task ids. */
    final AtomicInteger idRepo = new AtomicInteger(1);

    /**
     * @param reqMsgBox  queue the requests are added to
     * @param respMsgBox queue the responses are taken from
     */
    public ClientRpcStub(LinkedBlockingQueue<byte[]> reqMsgBox, LinkedBlockingQueue<byte[]> respMsgBox) {
        super(reqMsgBox, respMsgBox);
    }

    /**
     * Sends {@code task} and waits up to three seconds for its result.
     *
     * @param task the call to perform; its {@code taskId} is assigned here
     * @param t    the thread to wake when the response arrives; the wait itself parks the
     *             current thread, so this should be the calling thread
     * @return the remote result, which may be {@code null}
     * @throws RuntimeException if no response arrives in time
     */
    public Object execAtRemote(RpcPojo task, Thread t) {
        task.taskId = idRepo.getAndIncrement();
        final Integer id = task.taskId;
        callerMap.put(id, t);
        try {
            // Simulated network transfer.
            reqMsgBox.add(Util.obj2byte(task));
        } catch (RuntimeException e) {
            // The request never left, so no response will come to clear the registration.
            callerMap.remove(id);
            throw e;
        }

        // Remote call timeout. parkNanos may return early for no reason, so the wait is
        // re-checked against a deadline rather than trusting a single park.
        final long deadline = System.nanoTime() + TIMEOUT_NANOS;
        Object result = resultMap.remove(id);
        while (result == null) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0 || Thread.currentThread().isInterrupted()) {
                break;
            }
            LockSupport.parkNanos(remaining);
            result = resultMap.remove(id);
        }
        if (result == null) {
            // Give up: drop the registration so it does not accumulate, then take one
            // last look in case the response landed just before the registration was removed.
            callerMap.remove(id);
            result = resultMap.remove(id);
        }
        if (result == null) {
            throw new RuntimeException("响应超时");
        }
        return result == NULL_RESULT ? null : result;
    }

    /**
     * Receives responses until the running thread is interrupted, handing each result to
     * its registered caller. Responses whose caller is no longer registered are discarded.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                RpcPojo resp = (RpcPojo) Util.byte2Obj(respMsgBox.take());
                if (resp == null) {
                    // Util.byte2Obj yields null when the payload cannot be read back.
                    throw new IllegalStateException("response could not be deserialized");
                }
                Thread t = callerMap.remove(resp.taskId);
                if (t == null) {
                    // Nobody is registered for this id any more; storing the result
                    // would only leave it behind in resultMap.
                    continue;
                }
                resultMap.put(resp.taskId, resp.result == null ? NULL_RESULT : resp.result);
                LockSupport.unpark(t);
            } catch (InterruptedException e) {
                // Restore the flag and stop, so the thread can actually be shut down.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

在进行远程调用时,需要把调用参数通过网络传送给 provider 节点,并把执行结果回传给调用方。
这类通用操作当然要交给专门的“电报员”(也就是通信桩)来完成。

运行测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * End-to-end demo: wires a provider stub and a client stub together through two
 * in-memory queues and performs one call through a dynamic proxy of {@link HelloService}.
 */
class TestMockNetRpc {
    // The three fields below are not used by main; they are kept unchanged.
    static HelloService tagert = new HelloServiceImpl();
    static HashMap ifaceMap = new HashMap();
    static byte[] netDatas = null;

    /**
     * Runs the demo and then terminates the JVM.
     *
     * @param args unused
     * @throws Exception declared for compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {
        LinkedBlockingQueue<byte[]> reqMsgBox = new LinkedBlockingQueue<>(2 << 4);
        LinkedBlockingQueue<byte[]> respMsgBox = new LinkedBlockingQueue<>(2 << 4);

        HashMap<String, Object> serviceContext = new HashMap<>();
        // jvm1 - analogous to registering a provider with Spring
        serviceContext.put(HelloService.class.getName(), new HelloServiceImpl());

        ProviderRpcStub providerStub = new ProviderRpcStub(reqMsgBox, respMsgBox, serviceContext);
        ClientRpcStub clientStub = new ClientRpcStub(reqMsgBox, respMsgBox);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(providerStub);
        pool.execute(clientStub);

        Class<HelloService> clientProxyTargetClass = HelloService.class;
        // jvm2 - analogous to the service a consumer obtains from Spring
        HelloService clientHelloService = Reflection.newProxy(clientProxyTargetClass, (proxy, method1, args1) -> {
            RpcPojo clientSide = new RpcPojo();
            clientSide.ifaceName = clientProxyTargetClass.getName();
            clientSide.methondName = method1.getName();

            clientSide.params = args1;
            // Take the declared parameter types from the method itself. Deriving them from
            // the arguments fails when args1 is null (no-arg method), when an argument is
            // null, or when an argument's runtime class differs from the declared type.
            clientSide.paramsTypes = method1.getParameterTypes();
            return clientStub.execAtRemote(clientSide, Thread.currentThread());
        });

        String result = clientHelloService.echo("v3-版本");
        System.out.println(result);

        // The two stub loops never finish on their own, so end the JVM explicitly.
        System.exit(0);
    }
}

这里没有启动两个 JVM 进行跨 JVM 调用,把跨网络调用的传输步骤给简化掉了。