RPC-手写RPC调用过程

rpc 全称remote procedure call 远程过程调用,是一种分布式服务调用协议。

需求:分布式环境下,服务A想要调用服务B的某方法,就像调用自己本地的方法一样

分析:

  • 首先,服务A需要知道服务B提供了哪些方法,并且知道这些方法的调用方式(参数列表,返回类型)
  • 其次,服务A需要将调用的方法和参数发送给服务B,B接收到之后本地调用获得结果,再将结果发送给服务A

设计:

  • 要提供一套统一的接口让调用方服务A知道有哪些方法可供调用,服务B作为接口的实现方。
  • 服务A要提供接口的代理类工厂,本地调用接口方法时,代理类能将调用请求发送出去。
  • 数据传输方面,暂不考虑性能,使用BIO以及JDK序列化方式

开始编码:

提供一套接口:创建maven项目rpc-api,项目中添加接口文件

1
2
3
public interface IHelloService {
String sayHello(String var1);
}

接口中还需要指定一个简要的rpc协议,调用方和服务方都要遵循此协议

1
2
3
4
5
public class RpcRequest implements Serializable {
private String className;
private String methodName;
private Object[] args;
}

将rpc-api install到本地仓库。

调用方服务A:创建maven项目rpc-client,pom中添加依赖rpc-api

先来写main方法:

1
2
3
4
5
6
7
8
9
public class App {
public static void main( String[] args ) {
RpcProxyFactory rpcProxyFactory = new RpcProxyFactory();
IHelloService helloService =
rpcProxyFactory.newProxyInstance(IHelloService.class,"localhost",8080);
Object o = helloService.sayHello("zzk");
System.out.println((String) o);
}
}

流程很清晰:

  1. 创建代理工厂RpcProxyFactory
  2. 使用代理工厂类生成指定接口的代理对象
  3. 调用接口方法获取结果
1
2
3
4
5
6
public class RpcProxyFactory {
public <T> T newProxyInstance(final Class<T> interfaceClass, final String host, final int port){
return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[] {interfaceClass},
new RemoteInvocationHandler(host,port));
}
}

代理方式我选用的是jdk的动态代理,创建代理对象需要传进三个参数

  • 类加载器
  • 需要代理的接口
  • 触发管理类

前两个参数都是现成的,编写触发管理类代码,实现InvocationHandler接口,通过代理对象调用接口方法都会进到invoke方法中来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RemoteInvocationHandler implements InvocationHandler {
private String host;
private int port;

public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setArgs(args);
return new RpcNetTransport(host,port).send(rpcRequest);
}
}

invoke 方法负责将请求参数序列化,并发送出去,这里使用一个类专门负责发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class RpcNetTransport {
private String host;
private int port;

public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}

public Object send(RpcRequest rpcRequest){
try(Socket socket = new Socket(host,port);
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
out.writeObject(rpcRequest);
return in.readObject();
}catch (Exception e){
e.printStackTrace();
}
return null;
}
}

至此调用发代码完成

再来看服务方B的代码编写:创建maven项目,引入依赖rpc-api、spring-context来管理对象

首先服务方应该实现接口:

1
2
3
4
5
6
7
8
@Service
public class HelloServiceImpl implements IHelloService {
@Override
public String sayHello(String content) {
System.out.println("request sayHello from : " + content);
return "Response: hello, " + content;
}
}

服务器需要接收请求,代码思想:启动监听指定端口,这里使用了spring的InitializingBean接口,创建RpcServer时,当port设置成功之后会执行afterPropertiesSet()方法启动监听。

此外,我们将对象交给spring管理后,当请求进来我们需要找到正确service去执行,我的做法是让RpcServer实现ApplicationContextAware接口,这样RpcServer可以在setApplicationContext方法中,将所有的service对象取出来缓存,请求进来直接在缓存中找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class RpcServer implements InitializingBean,ApplicationContextAware {
private ExecutorService pool = Executors.newCachedThreadPool();
private int port;
private Map<String,Object> serviceObjs = new HashMap<>();

public RpcServer(int port) {
this.port = port;
}

@Override
public void afterPropertiesSet() throws Exception {
// socket 通信 发布服务
try (ServerSocket serverSocket = new ServerSocket(port)){

while (true){
Socket socket = serverSocket.accept();
pool.execute(new ProcessorHandler(socket,serviceObjs));
}
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取容器中的服务bean 封装成map
Map<String,Object> beans =
applicationContext.getBeansWithAnnotation(RpcService.class);
for(Object service : beans.values()){
Class<?> clazz = service.getClass();
String serviceKey = rpcService.value().getName();
serviceObjs.put(serviceKey,service);
}
}
}

当获取请求之后,丢给线程池进行执行,那我继续编写线程池执行任务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ProcessorHandler implements Runnable {
private Socket socket;
private Map<String,Object> serviceObj;
public ProcessorHandler(Socket socket,Map<String,Object> serviceObj) {
this.socket = socket;
this.serviceObj = serviceObj;
}
@Override
public void run() {
try (ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())){

RpcRequest rpcRequest = (RpcRequest) in.readObject();
Object result = invoke(rpcRequest);
out.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
}
}

private Object invoke(RpcRequest rpcRequest) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Object[] args = rpcRequest.getArgs();
Class<?>[] types = new Class[args.length];
for(int i = 0; i < args.length; i++){
types[i]= args[i].getClass();
}
Class clazz = Class.forName(rpcRequest.getClassName());
Method method = clazz.getMethod(rpcRequest.getMethodName(),types);
String serviceKey = rpcRequest.getClassName() + rpcRequest.getVersion();
return method.invoke(serviceObj.get(serviceKey),args);
}
}

既然是线程池执行的任务,肯定是Runnable对象,run方法的逻辑很清晰:从socket中获取请求对象,丢给invoke返回结果,在通过socket发送出去。

重点在invoke方法:通过反射的方式对方法进行调用

还有spring的最后一步,配置和启动:

1
2
3
4
5
6
7
8
@Configuration
@ComponentScan(basePackages = "com.pd")
public class SpringConfig{
@Bean
public RpcServer rpcServer(){
return new RpcServer(8080);
}
}
1
2
3
4
5
6
7
public class App {
public static void main( String[] args ) {
AnnotationConfigApplicationContext context =
new AnnotationConfigApplicationContext(SpringConfig.class);
context.start();
}
}

运行:

先启动rpc-server 再启动rpc-client

结果:Response: hello, zzk

总结流程图示:

rpc-基础流程

参考代码