Step by step玩转RPC

JerryXia 发表于 , 阅读 (0)

RPC是实现SOA的基础,我在项目中多次玩过dubbo等SOA框架了,这种远程方法调用的方式不仅有趣而且有非常重要的现实意义。可以让我们把庞大的系统拆分成许多模块,每个模块又可以根据不同的压力启动不同数量的实例,模块间通过RPC透明地通信,从而将集中式的系统改造成分布式应用提高其扩展性,优化硬件资源利用率。

闲来无事,我们也来一步步手动实现RPC来感受下其中的乐趣吧。

第0步:写服务

甭管是否是远程方法调用,写两个可以调用的服务先。

GreetingService

CalculateService

很简单,一个是打招呼服务,两个方法,一个say hello,一个say byebye。另一个是计算服务,一个加,一个减。

另外,再整个简单的线程池来提高下资源利用率thread pool

第1步:socket通信实现RPC

1.服务端发布服务
bio service分别把GreetingService注册到3456端口,CalculateService注册到6543端口。

2.客户端订阅服务
bio client

3.管理发布和订阅的框架
bio rpcframework

4.服务发布线程
bio workthread服务发布的实质就是创建ServerSocket,监听服务发布的端口,当接收到请求时,根据请求中的方法名和参数动态调用方法,并把结果返回给客户端。

5.客户端代理
bio proxy用动态代理模式,使得客户端调用方法的时候,实际上是建立socket连接,把方法和参数传给服务端,并接收服务端返回的结果。

运行结果

第2步:nio改写代码

上一步我们已经实现了RPC模型,不过仍有很多不足,比如我们用的是阻塞IO的方式进行的通信,因为远程方法的调用不需要建立长连接,所以用非阻塞IO的方式可以大大提高效率。

1.客户端和服务端的Customer.java和Provider.java不变,因为我们只是改变通讯方式。

2.RpcFramework.java中的publish方法需要稍加改动
nio publish在bio的方式中,我们为每个service建立一个ServerSocket,而这里,我们创建一个selector,然后为每个service创建一个channel注册到selector中,每个channel处于OP_ACCETP状态。

3.WorkThread.java修改如下

public class WorkThread implements Runnable {    private Selector selector;    private Map<Integer, Object> map;    private final static int BLOCK = 4096;    private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);    private ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);    public WorkThread(Selector selector, Map<Integer, Object> map) {        this.selector = selector;        this.map = map;    }    @Override    public void run() {        while(true) {            try {                selector.select();                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();                while(iter.hasNext()) {                    SelectionKey sk = iter.next();                    iter.remove();                    ServerSocketChannel server = null;                    SocketChannel client = null;                    int count = 0;                    if(sk.isAcceptable()) {                        server = (ServerSocketChannel)sk.channel();                        client = server.accept();                        client.configureBlocking(false);                        client.register(selector, SelectionKey.OP_READ);                    } else if(sk.isReadable()) {                        client = (SocketChannel)sk.channel();                        receiveBuffer.clear();                        count = client.read(receiveBuffer);                        if(count > 0) {                            String s = new String(receiveBuffer.array(),0,count);                            ObjectMapper mapper = new ObjectMapper();                            JsonNode node = mapper.readTree(s);                            String methodName = node.get("method").asText();                            JsonNode ptNode = node.get("parameterType");                            JsonNode pvNode = node.get("args");                            Object result = null;                            if(ptNode.isArray() && pvNode.isArray()) {                                int length = ptNode.size();                                Class[] paramTypes = new Class[length];                                for(int i = 0; i < length; i++) {                                    paramTypes[i] = Class.forName(ptNode.get(i).asText());                                }                                Object[] args = new Object[length];                                for(int i = 0; i < length; i++) {                                    args[i] = pvNode.get(i).isInt()? Integer.valueOf(pvNode.get(i).asInt()): pvNode.get(i).asText();                                }                                int port = ((InetSocketAddress)client.getLocalAddress()).getPort();                                Object service = map.get(port);                                Method method = service.getClass().getMethod(methodName, paramTypes);                                result = method.invoke(service, args);                            }                            sendBuffer.clear();                            sendBuffer.put(result.toString().getBytes());                            sendBuffer.flip();                            client.write(sendBuffer);                            client.register(selector, SelectionKey.OP_READ);                        }                        client.close();                        sk.cancel();                    }                }            } catch (Exception e) {                e.printStackTrace();            }        }    }}

这里为了方便调试,使用了json字符串传递数据。

4.InvocationProxy.java修改如下

package com.rick.archi.soa.nio_tcp_rpc;import org.codehaus.jackson.map.ObjectMapper;  import org.codehaus.jackson.node.ArrayNode;  import org.codehaus.jackson.node.ObjectNode;import java.io.ObjectInputStream;  import java.io.ObjectOutputStream;  import java.lang.reflect.InvocationHandler;  import java.lang.reflect.Method;  import java.net.InetSocketAddress;  import java.net.Socket;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.SocketChannel;  import java.util.Iterator;  import java.util.Set;public class InvocationProxy implements InvocationHandler {    private String host;    private int port;    private final static int BLOCK = 4096;    private ByteBuffer sendBuffer = ByteBuffer.allocate(BLOCK);    private ByteBuffer receiveBuffer = ByteBuffer.allocate(BLOCK);    public InvocationProxy(String host, int port) {        this.host = host;        this.port = port;    }    @Override    public Object invoke(Object proxy, Method method, Object[] args)            throws Throwable {        SocketChannel sc = SocketChannel.open();        sc.configureBlocking(false);        Selector selector = Selector.open();        sc.register(selector, SelectionKey.OP_CONNECT);        sc.connect(new InetSocketAddress(host, port));        Iterator<SelectionKey> iterator;        SelectionKey selectionKey;        SocketChannel client;        String result = "";        int count=0;        boolean finish = false;        while(!finish) {            selector.select();            iterator = selector.selectedKeys().iterator();            while (iterator.hasNext()) {                selectionKey = iterator.next();                if (selectionKey.isConnectable()) {                    client = (SocketChannel) selectionKey.channel();                    if (client.isConnectionPending()) {                        client.finishConnect();                        sendBuffer.clear();                        ObjectMapper objectMapper = new ObjectMapper();                        ObjectNode node = objectMapper.createObjectNode();                        Class<?>[] clazz = method.getParameterTypes();                        ArrayNode array1 = objectMapper.valueToTree(clazz);                        ArrayNode array2 = objectMapper.valueToTree(args);                        node.put("method", method.getName());                        node.put("parameterType", array1);                        node.put("args", array2);                        String s = node.toString();                        sendBuffer.put(s.getBytes());                        sendBuffer.flip();                        client.write(sendBuffer);                    }                    client.register(selector, SelectionKey.OP_READ);                } else if (selectionKey.isReadable()) {                    client = (SocketChannel) selectionKey.channel();                    receiveBuffer.clear();                    count = client.read(receiveBuffer);                    if(count>0){                        result = new String( receiveBuffer.array(),0,count);                    }                    finish = true;                    client.close();                    selectionKey.cancel();                }                selector.selectedKeys().clear();            }