手写RPC 之 Hrpc Version 1.0

Github地址

Github: https://github.com/housirvip/hrpc

原理

客户端,通过接口调用,如 UserService,UserService.login(username, password),实际上是通过动态代理,代理执行 login 方法,在执行逻辑中,通过网络调用服务端的 UserService 实际实现类。

服务端,通过网络获取到请求的 className,method,以及各种参数 args,来反射调用 UserService 的实现类,并将执行结果,通过网络返回至客户端中。

公共代码

放在 core 包中,HrpcRequest,HrpcResponse,HrpcService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package vip.housir.hrpc.core;

import java.io.Serializable;

/**
* @author housirvip
*/
public class HrpcResponse implements Serializable {

private int statusCode;

private Object body;

private String message;

public int getStatusCode() {
return statusCode;
}

public void setStatusCode(int statusCode) {
this.statusCode = statusCode;
}

public Object getBody() {
return body;
}

public void setBody(Object body) {
this.body = body;
}

public String getMessage() {
return message;
}

public void setMessage(String message) {
this.message = message;
}

@Override
public String toString() {
return "HrpcResponse{" +
"statusCode=" + statusCode +
", body=" + body +
", message='" + message + '\'' +
'}';
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package vip.housir.hrpc.core;

import java.io.Serializable;
import java.util.Arrays;

/**
* @author housirvip
*/
public class HrpcRequest implements Serializable {

private String className;

private String methodName;

private Object[] args;

private Class<?>[] types;

public String getClassName() {
return className;
}

public void setClassName(String className) {
this.className = className;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}

public Object[] getArgs() {
return args;
}

public void setArgs(Object[] args) {
this.args = args;
}

public Class<?>[] getTypes() {
return types;
}

public void setTypes(Class<?>[] types) {
this.types = types;
}

@Override
public String toString() {
return "HrpcRequest{" +
"className='" + className + '\'' +
", methodName='" + methodName + '\'' +
", args=" + Arrays.toString(args) +
", types=" + Arrays.toString(types) +
'}';
}
}

自定义 service 均继承于 HrpcService

1
2
3
4
5
6
7
8
9
package vip.housir.hrpc.core;

import java.io.Serializable;

/**
* @author housirvip
*/
public interface HrpcService extends Serializable {
}

Client 包

动态代理的具体实现方法,实际通过网络远程调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package vip.housir.hrpc.client.proxy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import vip.housir.hrpc.core.HrpcRequest;
import vip.housir.hrpc.core.HrpcResponse;
import vip.housir.hrpc.client.transport.Transport;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
* @author housirvip
*/
public class RequestInvocationHandler implements InvocationHandler {

private static final Logger logger = LoggerFactory.getLogger(RequestInvocationHandler.class);

private final Transport transport;

public RequestInvocationHandler(Transport transport) {
this.transport = transport;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

HrpcRequest request = new HrpcRequest();
request.setMethodName(method.getName());
request.setClassName(method.getDeclaringClass().getName());
request.setArgs(args);
request.setTypes(method.getParameterTypes());

logger.debug(request.toString());
HrpcResponse response = transport.send(request);
logger.debug(response.toString());

return response.getBody();
}
}

代理工厂,创建代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package vip.housir.hrpc.client.proxy;

import vip.housir.hrpc.client.transport.SocketTransport;

import java.lang.reflect.Proxy;

/**
* @author housirvip
*/
public class RequestProxyFactory {

@SuppressWarnings("unchecked")
public static <T> T createProxy(final Class<T> clazz, SocketTransport transport) {
return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
new Class<?>[]{clazz}, new RequestInvocationHandler(transport));
}
}

网络请求的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package vip.housir.hrpc.client.transport;

import vip.housir.hrpc.core.HrpcRequest;
import vip.housir.hrpc.core.HrpcResponse;

/**
* @author housirvip
*/
public interface Transport {

/**
* 远程调用,发送请求
*
* @param request HrpcRequest
* @return HrpcResponse
*/
HrpcResponse send(HrpcRequest request);
}

网络请求接口的实现方式,通过 socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package vip.housir.hrpc.client.transport;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import vip.housir.hrpc.core.HrpcRequest;
import vip.housir.hrpc.core.HrpcResponse;

import java.io.*;
import java.net.Socket;

/**
* @author housirvip
*/
public class SocketTransport implements Transport {

private static final Logger logger = LoggerFactory.getLogger(SocketTransport.class);

private final String host;

private final int port;

public SocketTransport(String host, int port) {
this.host = host;
this.port = port;
}

@Override
public HrpcResponse send(HrpcRequest request) {

Socket socket = null;
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
HrpcResponse response = null;
try {
socket = new Socket(host, port);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(request);
outputStream.flush();
inputStream = new ObjectInputStream(socket.getInputStream());
response = (HrpcResponse) inputStream.readObject();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
if (inputStream != null) {
inputStream.close();
}
if (outputStream != null) {
outputStream.close();
}
if (socket != null) {
socket.close();
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
return response;
}
}

Server 包

服务发布接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package vip.housir.hrpc.server.publisher;

import vip.housir.hrpc.core.HrpcService;

import java.util.Map;

/**
* @author housirvip
*/
public interface Publisher {

/**
* 发布服务
*
* @param port 端口号
* @param services 发布的服务
*/
void publish(int port, Map<String, HrpcService> services);

/**
* 停止服务发布
*/
void shutdown();
}

服务发布接口的实现,通过线程池来处理多客户端的请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package vip.housir.hrpc.server.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import vip.housir.hrpc.core.HrpcService;
import vip.housir.hrpc.server.processor.ProcessorThread;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.*;

/**
* @author housirvip
*/
public class SocketPublisher implements Publisher {

private static final Logger logger = LoggerFactory.getLogger(SocketPublisher.class);

private final ExecutorService executor;

public SocketPublisher() {
executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}

@Override
public void publish(int port, Map<String, HrpcService> services) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);

while (true) {
Socket socket = serverSocket.accept();
executor.execute(new ProcessorThread(socket, services));
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
try {
if (serverSocket != null) {
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

@Override
public void shutdown() {
executor.shutdown();
}
}

socket 通讯,接到请求后的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package vip.housir.hrpc.server.processor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import vip.housir.hrpc.core.HrpcRequest;
import vip.housir.hrpc.core.HrpcResponse;
import vip.housir.hrpc.core.HrpcService;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.Map;

/**
* @author housirvip
*/
public class ProcessorThread implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(ProcessorThread.class);

private final Socket socket;

private final Map<String, HrpcService> services;

public ProcessorThread(Socket socket, Map<String, HrpcService> services) {
this.socket = socket;
this.services = services;
}

@Override
public void run() {

ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
inputStream = new ObjectInputStream(socket.getInputStream());
outputStream = new ObjectOutputStream(socket.getOutputStream());

HrpcRequest request = (HrpcRequest) inputStream.readObject();
logger.debug(request.toString());

Class<?> clazz = Class.forName(request.getClassName());
Method method = clazz.getMethod(request.getMethodName(), request.getTypes());
HrpcService service = services.get(request.getClassName());
Object body = method.invoke(service, request.getArgs());

HrpcResponse response = new HrpcResponse();
response.setBody(body);
response.setStatusCode(200);
logger.debug(response.toString());

outputStream.writeObject(response);
outputStream.flush();

} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
try {
if (inputStream != null) {
inputStream.close();
}
if (outputStream != null) {
outputStream.close();
}
socket.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}

Demo 调用示例

业务 service 的接口,UserService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package vip.housir.hrpc.demo.service;

import vip.housir.hrpc.core.HrpcService;

/**
* @author housirvip
*/
public interface UserService extends HrpcService {

/**
* 实现用户登录功能
*
* @param username 用户名
* @param password 密码
* @return 用户令牌
*/
String login(String username, String password);
}

业务 service 的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package vip.housir.hrpc.demo.service.impl;

import vip.housir.hrpc.demo.service.UserService;

/**
* @author housirvip
*/
public class UserServiceImpl implements UserService {

@Override
public String login(String username, String password) {
return "这是server端返回的结果:" + username + "@" + password;
}
}

HrpcClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package vip.housir.hrpc.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import vip.housir.hrpc.demo.service.UserService;
import vip.housir.hrpc.client.proxy.RequestProxyFactory;
import vip.housir.hrpc.client.transport.SocketTransport;

/**
* @author housirvip
*/
public class HrpcClient {

private static final Logger logger = LoggerFactory.getLogger(HrpcClient.class);

public static void main(String[] args) {
SocketTransport transport = new SocketTransport("localhost", 7788);
UserService userService = RequestProxyFactory.createProxy(UserService.class, transport);

for (int i = 0; i < 1000; i++) {
String token = userService.login("housir", "vip");
logger.info(token);
}
}
}

HrpcServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package vip.housir.hrpc.demo;

import vip.housir.hrpc.demo.service.UserService;
import vip.housir.hrpc.core.HrpcService;
import vip.housir.hrpc.server.publisher.SocketPublisher;
import vip.housir.hrpc.demo.service.impl.UserServiceImpl;

import java.util.HashMap;
import java.util.Map;

/**
* @author housirvip
*/
public class HrpcServer {

public static void main(String[] args) {

Map<String, HrpcService> services = new HashMap<>(8);
services.put(UserService.class.getName(), new UserServiceImpl());

SocketPublisher processor = new SocketPublisher();
processor.publish(7788, services);
}
}

输出结果

Server

Client

TODO

在之后的 Hrpc version 2.3.4……..n 中预计实现,可能版本号起的很随意哈哈

  • 引入 Netty 进行 io 多路复用
  • 支持服务的注册和发现
  • 服务优雅的关闭
  • 支持 bean 的容器化管理,实现通过注解注入依赖
  • 支持多种类型序列化编码方式 json,xml,protobuf
作者

housirvip

发布于

2020-09-15

更新于

2023-01-01

许可协议

评论