Github地址
Github: https://github.com/housirvip/hrpc
原理
客户端,通过接口调用,如 UserService,UserService.login(username, password),实际上是通过动态代理,代理执行 login 方法,在执行逻辑中,通过网络调用服务端的 UserService 实际实现类。
服务端,通过网络获取到请求的 className,method,以及各种参数 args,来反射调用 UserService 的实现类,并将执行结果,通过网络返回至客户端中。
公共代码
放在 core 包中,HrpcRequest,HrpcResponse,HrpcService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package vip.housir.hrpc.core;
import java.io.Serializable;
public class HrpcResponse implements Serializable {
private int statusCode;
private Object body;
private String message;
public int getStatusCode() { return statusCode; }
public void setStatusCode(int statusCode) { this.statusCode = statusCode; }
public Object getBody() { return body; }
public void setBody(Object body) { this.body = body; }
public String getMessage() { return message; }
public void setMessage(String message) { this.message = message; }
@Override public String toString() { return "HrpcResponse{" + "statusCode=" + statusCode + ", body=" + body + ", message='" + message + '\'' + '}'; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package vip.housir.hrpc.core;
import java.io.Serializable; import java.util.Arrays;
public class HrpcRequest implements Serializable {
private String className;
private String methodName;
private Object[] args;
private Class<?>[] types;
public String getClassName() { return className; }
public void setClassName(String className) { this.className = className; }
public String getMethodName() { return methodName; }
public void setMethodName(String methodName) { this.methodName = methodName; }
public Object[] getArgs() { return args; }
public void setArgs(Object[] args) { this.args = args; }
public Class<?>[] getTypes() { return types; }
public void setTypes(Class<?>[] types) { this.types = types; }
@Override public String toString() { return "HrpcRequest{" + "className='" + className + '\'' + ", methodName='" + methodName + '\'' + ", args=" + Arrays.toString(args) + ", types=" + Arrays.toString(types) + '}'; } }
|
自定义 service 均继承于 HrpcService
1 2 3 4 5 6 7 8 9
| package vip.housir.hrpc.core;
import java.io.Serializable;
public interface HrpcService extends Serializable { }
|
Client 包
动态代理的具体实现方法,实际通过网络远程调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package vip.housir.hrpc.client.proxy;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import vip.housir.hrpc.core.HrpcRequest; import vip.housir.hrpc.core.HrpcResponse; import vip.housir.hrpc.client.transport.Transport;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method;
public class RequestInvocationHandler implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(RequestInvocationHandler.class);
private final Transport transport;
public RequestInvocationHandler(Transport transport) { this.transport = transport; }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
HrpcRequest request = new HrpcRequest(); request.setMethodName(method.getName()); request.setClassName(method.getDeclaringClass().getName()); request.setArgs(args); request.setTypes(method.getParameterTypes());
logger.debug(request.toString()); HrpcResponse response = transport.send(request); logger.debug(response.toString());
return response.getBody(); } }
|
代理工厂,创建代理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package vip.housir.hrpc.client.proxy;
import vip.housir.hrpc.client.transport.SocketTransport;
import java.lang.reflect.Proxy;
public class RequestProxyFactory {
@SuppressWarnings("unchecked") public static <T> T createProxy(final Class<T> clazz, SocketTransport transport) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, new RequestInvocationHandler(transport)); } }
|
网络请求的接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package vip.housir.hrpc.client.transport;
import vip.housir.hrpc.core.HrpcRequest; import vip.housir.hrpc.core.HrpcResponse;
public interface Transport {
HrpcResponse send(HrpcRequest request); }
|
网络请求接口的实现方式,通过 socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package vip.housir.hrpc.client.transport;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import vip.housir.hrpc.core.HrpcRequest; import vip.housir.hrpc.core.HrpcResponse;
import java.io.*; import java.net.Socket;
public class SocketTransport implements Transport {
private static final Logger logger = LoggerFactory.getLogger(SocketTransport.class);
private final String host;
private final int port;
public SocketTransport(String host, int port) { this.host = host; this.port = port; }
@Override public HrpcResponse send(HrpcRequest request) {
Socket socket = null; ObjectOutputStream outputStream = null; ObjectInputStream inputStream = null; HrpcResponse response = null; try { socket = new Socket(host, port); outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(request); outputStream.flush(); inputStream = new ObjectInputStream(socket.getInputStream()); response = (HrpcResponse) inputStream.readObject(); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { try { if (inputStream != null) { inputStream.close(); } if (outputStream != null) { outputStream.close(); } if (socket != null) { socket.close(); } } catch (IOException e) { logger.error(e.getMessage(), e); } } return response; } }
|
Server 包
服务发布接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package vip.housir.hrpc.server.publisher;
import vip.housir.hrpc.core.HrpcService;
import java.util.Map;
public interface Publisher {
void publish(int port, Map<String, HrpcService> services);
void shutdown(); }
|
服务发布接口的实现,通过线程池来处理多客户端的请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package vip.housir.hrpc.server.publisher;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import vip.housir.hrpc.core.HrpcService; import vip.housir.hrpc.server.processor.ProcessorThread;
import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.Map; import java.util.concurrent.*;
public class SocketPublisher implements Publisher {
private static final Logger logger = LoggerFactory.getLogger(SocketPublisher.class);
private final ExecutorService executor;
public SocketPublisher() { executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); }
@Override public void publish(int port, Map<String, HrpcService> services) { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(port);
while (true) { Socket socket = serverSocket.accept(); executor.execute(new ProcessorThread(socket, services)); } } catch (IOException e) { logger.error(e.getMessage(), e); } finally { try { if (serverSocket != null) { serverSocket.close(); } } catch (IOException e) { e.printStackTrace(); } } }
@Override public void shutdown() { executor.shutdown(); } }
|
socket 通讯,接到请求后的具体实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package vip.housir.hrpc.server.processor;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import vip.housir.hrpc.core.HrpcRequest; import vip.housir.hrpc.core.HrpcResponse; import vip.housir.hrpc.core.HrpcService;
import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.Socket; import java.util.Map;
public class ProcessorThread implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(ProcessorThread.class);
private final Socket socket;
private final Map<String, HrpcService> services;
public ProcessorThread(Socket socket, Map<String, HrpcService> services) { this.socket = socket; this.services = services; }
@Override public void run() {
ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; try { inputStream = new ObjectInputStream(socket.getInputStream()); outputStream = new ObjectOutputStream(socket.getOutputStream());
HrpcRequest request = (HrpcRequest) inputStream.readObject(); logger.debug(request.toString());
Class<?> clazz = Class.forName(request.getClassName()); Method method = clazz.getMethod(request.getMethodName(), request.getTypes()); HrpcService service = services.get(request.getClassName()); Object body = method.invoke(service, request.getArgs());
HrpcResponse response = new HrpcResponse(); response.setBody(body); response.setStatusCode(200); logger.debug(response.toString());
outputStream.writeObject(response); outputStream.flush();
} catch (Exception e) { logger.error(e.getMessage(), e); } finally { try { if (inputStream != null) { inputStream.close(); } if (outputStream != null) { outputStream.close(); } socket.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } } } }
|
Demo 调用示例
业务 service 的接口,UserService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package vip.housir.hrpc.demo.service;
import vip.housir.hrpc.core.HrpcService;
public interface UserService extends HrpcService {
String login(String username, String password); }
|
业务 service 的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| package vip.housir.hrpc.demo.service.impl;
import vip.housir.hrpc.demo.service.UserService;
public class UserServiceImpl implements UserService {
@Override public String login(String username, String password) { return "这是server端返回的结果:" + username + "@" + password; } }
|
HrpcClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package vip.housir.hrpc.demo;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import vip.housir.hrpc.demo.service.UserService; import vip.housir.hrpc.client.proxy.RequestProxyFactory; import vip.housir.hrpc.client.transport.SocketTransport;
public class HrpcClient {
private static final Logger logger = LoggerFactory.getLogger(HrpcClient.class);
public static void main(String[] args) { SocketTransport transport = new SocketTransport("localhost", 7788); UserService userService = RequestProxyFactory.createProxy(UserService.class, transport);
for (int i = 0; i < 1000; i++) { String token = userService.login("housir", "vip"); logger.info(token); } } }
|
HrpcServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package vip.housir.hrpc.demo;
import vip.housir.hrpc.demo.service.UserService; import vip.housir.hrpc.core.HrpcService; import vip.housir.hrpc.server.publisher.SocketPublisher; import vip.housir.hrpc.demo.service.impl.UserServiceImpl;
import java.util.HashMap; import java.util.Map;
public class HrpcServer {
public static void main(String[] args) {
Map<String, HrpcService> services = new HashMap<>(8); services.put(UserService.class.getName(), new UserServiceImpl());
SocketPublisher processor = new SocketPublisher(); processor.publish(7788, services); } }
|
输出结果
Server
Client
TODO
在之后的 Hrpc version 2.3.4……..n 中预计实现,可能版本号起的很随意哈哈
- 引入 Netty 进行 io 多路复用
- 支持服务的注册和发现
- 服务优雅的关闭
- 支持 bean 的容器化管理,实现通过注解注入依赖
- 支持多种类型序列化编码方式 json,xml,protobuf