2023-07-30  阅读(2)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

本章,我将对dfs-rpc这个模块进行讲解。该模块依赖gRPC,存放各个RPC服务的存根。我们的分布式文件系统的服务间的调用需要以gRPC作为通信组件,所以本章我会讲解gRPC的基本使用,但我不会对gRPC作深入介绍,gRPC的底层原理和各种高阶用法,读者可以参考其它资料。

一、dfs-rpc工程

gRPC本身支持不同语言,通过ProtoBuf编写的.proto文件,然后使用不同语言的编译器,可以生成特定语言相关的模板代码。由于我们的工程使用Java,所以需要使用grpc-java

Protobuf是一种语言中立、平台无关、可扩展的 序列化数据的格式 ,可用于通信协议,数据存储等。我们的系统中涉及的服务接口都需要通过protobuf来生成。

1.1 引入依赖

首先,新建一个Maven工程dfs-rpc,在pom中引入以下依赖。注意,我这里使用了protobuf-maven-plugin插件,用来将.proto文件转化成 Java 模板代码。protobuf-maven-plugin插件内部整合了protoc编译器,读者也可以自己下载protoc编译器,手动将.proto文件转化为Java代码:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <groupId>com.tpvlog.dfs</groupId>
          <artifactId>dfs-rpc</artifactId>
          <version>0.0.1-SNAPSHOT</version>
          <packaging>jar</packaging>
    
          <name>dfs-rpc</name>
          <url>http://maven.apache.org</url>
    
          <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          </properties>
    
        <dependencies>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-netty-shaded</artifactId>
                <version>1.35.0</version>
            </dependency>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-protobuf</artifactId>
                <version>1.35.0</version>
            </dependency>
            <dependency>
                <groupId>io.grpc</groupId>
                <artifactId>grpc-stub</artifactId>
                <version>1.35.0</version>
            </dependency>
            <dependency> <!-- necessary for Java 9+ -->
                <groupId>org.apache.tomcat</groupId>
                <artifactId>annotations-api</artifactId>
                <version>6.0.53</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
        <build>
            <extensions>
                <extension>
                    <groupId>kr.motd.maven</groupId>
                    <artifactId>os-maven-plugin</artifactId>
                    <version>1.6.2</version>
                </extension>
            </extensions>
            <plugins>
                <plugin>
                    <groupId>org.xolstice.maven.plugins</groupId>
                    <artifactId>protobuf-maven-plugin</artifactId>
                    <version>0.6.1</version>
                    <configuration>
                        <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
                        <pluginId>grpc-java</pluginId>
                        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.35.0:exe:${os.detected.classifier}</pluginArtifact>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>compile-custom</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

1.2 接口编写

服务接口的编写使用ProtoBuf的标准语法,我们编写一个NameNodeServiceProto.proto文件,里面定义 NameNode 提供的各个服务接口:

    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_outer_classname = "NameNodeServiceProto";
    
    service NameNodeService {
        rpc register(RegisterRequest) returns (RegisterResponse){}
        rpc heartbeat(HeartbeatRequest) returns (HeartbeatResponse){}
        rpc mkdir(MkDirRequest) returns (MkDirResponse){}
        rpc shutdown(ShutdownRequest) returns (ShutdownResponse){}
        rpc fetchEditsLog(FetchEditsLogRequest) returns (FetchEditsLogResponse){}
        rpc updateCheckpointTxid(UpdateCheckpointTxidRequest) returns (UpdateCheckpointTxidResponse){}
    }
    
    message RegisterRequest{
        string ip  = 1;
        string hostname  = 2;
    }
    message RegisterResponse{
        int32 status  = 1;
    }
    message HeartbeatRequest{
        string ip  = 1;
        string hostname  = 2;
    }
    message HeartbeatResponse{
        int32 status  = 1;
    }
    message MkDirRequest{
        string path  = 1;
    }
    message MkDirResponse{
        int32 status  = 1;
    }
    
    message ShutdownRequest{
        int32 code  = 1;
    }
    message ShutdownResponse{
        int32 status  = 1;
    }
    message FetchEditsLogRequest{
        int64 syncedTxid  = 1;
    }
    message FetchEditsLogResponse{
        string editsLog  = 1;
    }
    message UpdateCheckpointTxidRequest{
        int64 txid  = 1;
    }
    message UpdateCheckpointTxidResponse{
        int32 status  = 1;
    }

1.3 Java代码生成

我们把上面的.proto文件存放到项目的/src/main/proto目录下,然后 cd 到项目根目录,执行mvn clean compile命令:

    C:\Users\Ressmix\Desktop\source-code\dfs\dfs-rpc>mvn clean compile
    [INFO] Scanning for projects...
    [INFO] ------------------------------------------------------------------------
    [INFO] Detecting the operating system and CPU architecture
    [INFO] ------------------------------------------------------------------------
    [INFO] os.detected.name: windows
    [INFO] os.detected.arch: x86_64
    [INFO] os.detected.version: 10.0
    [INFO] os.detected.version.major: 10
    [INFO] os.detected.version.minor: 0
    [INFO] os.detected.classifier: windows-x86_64
    [INFO]
    [INFO] -----------------------< com.tpvlog.dfs:dfs-rpc >-----------------------
    [INFO] Building dfs-rpc 0.0.1-SNAPSHOT
    [INFO] --------------------------------[ jar ]---------------------------------
    [INFO]
    [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ dfs-rpc ---
    [INFO] Deleting C:\Users\Ressmix\Desktop\source-code\dfs\dfs-rpc\target
    [INFO]
    [INFO] --- protobuf-maven-plugin:0.6.1:compile (default) @ dfs-rpc ---
    [INFO] Compiling 1 proto file(s) to C:\Users\Ressmix\Desktop\source-code\dfs\dfs-rpc\target\generated-sources\protobuf\java
    [INFO]
    [INFO] --- protobuf-maven-plugin:0.6.1:compile-custom (default) @ dfs-rpc ---
    [INFO] Compiling 1 proto file(s) to C:\Users\Ressmix\Desktop\source-code\dfs\dfs-rpc\target\generated-sources\protobuf\grpc-java
    [INFO]
    [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ dfs-rpc ---
    [INFO] Using 'UTF-8' encoding to copy filtered resources.
    [INFO] skip non existing resourceDirectory C:\Users\Ressmix\Desktop\source-code\dfs\dfs-rpc\src\main\resources
    [INFO] Copying 1 resource
    [INFO] Copying 1 resource
    [INFO]
    [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ dfs-rpc ---
    [INFO] Changes detected - recompiling the module!
    [INFO] Compiling 48 source files to C:\Users\Ressmix\Desktop\source-code\dfs\dfs-rpc\target\classes
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  7.920 s
    [INFO] Finished at: 2021-02-19T16:29:49+08:00
    [INFO] ------------------------------------------------------------------------

完成后会在/target/generated-sources/protobuf/java/目录下看到一堆生成的Java模板代码:

202307302136263731.png

1.4 测试使用

我们把这些代码拷贝到dfs-rpc工程的com.tpvlog.dfs.rpc.service包下:

202307302136272082.png

然后来测试一下,先写一个服务端测试类:

    package com.tpvlog.dfs.rpc.service;
    
    import io.grpc.Server;
    import io.grpc.ServerBuilder;
    import io.grpc.stub.StreamObserver;
    
    import java.io.IOException;
    
    public class GrpcServerTest {
        private final int port = 19080;
        private Server server;
    
        public static void main(String[] args) throws IOException, InterruptedException {
            final GrpcServerTest grpcServer = new GrpcServerTest();
            grpcServer.start();
            grpcServer.blockUntilShutdown();
        }
    
        private void start() throws IOException {
            server = ServerBuilder.forPort(port).addService(new NameNodeServiceImpl()).build().start();
            System.out.println("------- NameNodeService Started -------");
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    System.err.println("------shutting down gRPC server since JVM is shutting down-------");
                    GrpcServerTest.this.stop();
                    System.err.println("------server shut down------");
                }
            });
        }
    
        /**
         * 服务接口实现类
         */
        private class NameNodeServiceImpl extends NameNodeServiceGrpc.NameNodeServiceImplBase {
            public void register(RegisterRequest request, StreamObserver<RegisterResponse> responseObserver) {
                RegisterResponse response = RegisterResponse.newBuilder().setStatus(200).build();
                // onNext()方法向客户端返回结果
                responseObserver.onNext(response);
                // 告诉客户端这次调用已经完成
                responseObserver.onCompleted();
            }
    
            public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
                HeartbeatResponse response=HeartbeatResponse.newBuilder().setStatus(200).build();
                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }
        }
    
        private void stop() {
            if (server != null) {
                server.shutdown();
            }
        }
    
        private void blockUntilShutdown() throws InterruptedException {
            if (server != null) {
                server.awaitTermination();
            }
        }
    }

再写一个客户端测试类:

    package com.tpvlog.dfs.rpc.service;
    
    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;
    
    import java.util.concurrent.TimeUnit;
    
    public class GrpcClientTest {
        private final ManagedChannel channel;
        private final NameNodeServiceGrpc.NameNodeServiceBlockingStub blockingStub;
    
        private static final String host = "127.0.0.1";
        private static final int ip = 19080;
    
        public GrpcClientTest(String host, int port) {
            // usePlaintext表示明文传输,否则需要配置ssl
            // channel表示通信通道
            channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
            // 存根
            blockingStub = NameNodeServiceGrpc.newBlockingStub(channel);
        }
    
        public void shutdown() throws InterruptedException {
            channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        }
    
        public void testHeartbeat(String name) {
            HeartbeatRequest request = HeartbeatRequest.newBuilder().setIp("127.0.0.1").setHostname("localhost").build();
            HeartbeatResponse response = blockingStub.heartbeat(request);
            System.out.println(name + ": " + response.getStatus());
        }
    
        public void testRegister(String name) {
            RegisterRequest request = RegisterRequest.newBuilder().setIp("127.0.0.1").setHostname("localhost").build();
            RegisterResponse response = blockingStub.register(request);
            System.out.println(response.getStatus());
        }
    
        public static void main(String[] args) {
            GrpcClientTest client = new GrpcClientTest(host, ip);
            for (int i = 0; i <= 5; i++) {
                client.testHeartbeat("<<<<<Heartbeat result>>>>>-" + i);
            }
    
            for (int i = 0; i <= 5; i++) {
                client.testHeartbeat("<<<<<Register result>>>>>-" + i);
            }
        }
    }

最后,先启动服务端,然后启动客户端,输出如下,说明RPC调用成功了:

    ------- NodeRegisterService Started -------
    
    <<<<<Heartbeat result>>>>>-0: 200
    <<<<<Heartbeat result>>>>>-1: 200
    <<<<<Heartbeat result>>>>>-2: 200
    <<<<<Heartbeat result>>>>>-3: 200
    <<<<<Heartbeat result>>>>>-4: 200
    <<<<<Heartbeat result>>>>>-5: 200
    <<<<<Register result>>>>>-0: 200
    <<<<<Register result>>>>>-1: 200
    <<<<<Register result>>>>>-2: 200
    <<<<<Register result>>>>>-3: 200
    <<<<<Register result>>>>>-4: 200
    <<<<<Register result>>>>>-5: 200

二、总结

本章,我完成了dfs-rpc工程的构建,同时我对gRPC的基本使用进行了讲解,后续这个工程会存放我们所有的gRPC模板代码和.proto文件,其它涉及gRPC服务调用的工程都需要依赖该工程。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文