4. gRPC communication patterns

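gRPC supports four kinds of RPC.

    - unary (1 request, 1 response)

    - server stream (1 request, n responses)

    - client stream (n requests, 1 response)

    - bi stream (n requests, n responses)

On the client side, each of the four can be invoked through one of three stub styles.

    - blocking

    - async

    - future

 

That gives 12 combinations in principle.

However, with these generated stubs, an RPC with n requests is supported only by the async stub, and an RPC with n responses is not supported by the future stub. That leaves 7 supported combinations.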

             unary   server stream   client stream   bi stream
blocking     o       o               x               x
async        o       o               o               o
future       o       x               x               x
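
The count of 7 can be checked by enumerating all 12 combinations and applying the two rules above. The following is a minimal sketch in plain Java with no gRPC dependency; the class and enum names (`StubSupportMatrix`, `RpcType`, `CallStyle`) are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class StubSupportMatrix {

  // The four RPC kinds, described by whether each side sends many messages.
  enum RpcType {
    UNARY(false, false),
    SERVER_STREAM(false, true),
    CLIENT_STREAM(true, false),
    BI_STREAM(true, true);

    final boolean manyRequests;
    final boolean manyResponses;

    RpcType(boolean manyRequests, boolean manyResponses) {
      this.manyRequests = manyRequests;
      this.manyResponses = manyResponses;
    }
  }

  // The three client-side stub styles.
  enum CallStyle { BLOCKING, ASYNC, FUTURE }

  // Rule 1: n requests -> async only. Rule 2: n responses -> no future stub.
  static boolean isSupported(RpcType rpc, CallStyle style) {
    if (rpc.manyRequests && style != CallStyle.ASYNC) {
      return false;
    }
    if (rpc.manyResponses && style == CallStyle.FUTURE) {
      return false;
    }
    return true;
  }

  // Lists every supported (style, rpc) pair in table order.
  static List<String> supportedCombinations() {
    List<String> result = new ArrayList<>();
    for (CallStyle style : CallStyle.values()) {
      for (RpcType rpc : RpcType.values()) {
        if (isSupported(rpc, style)) {
          result.add(style + "/" + rpc);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> combos = supportedCombinations();
    combos.forEach(System.out::println);
    System.out.println("total: " + combos.size());
  }
}
```

The enumeration yields two blocking, four async, and one future combination, which matches the table.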

 

The code below exercises all seven combinations.

1. proto

syntax = "proto3";
option java_multiple_files = true;
package com.example.grpc_test.proto;

message HelloRequest {
    string firstName = 1;
    string lastName = 2;
}

message HelloResponse {
    string greeting = 1;
}

service HelloService {
    rpc hello(HelloRequest) returns (HelloResponse);
    rpc helloServerStream(HelloRequest) returns (stream HelloResponse);
    rpc helloClientStream(stream HelloRequest) returns (HelloResponse);
    rpc helloBiStream(stream HelloRequest) returns (stream HelloResponse);
}

 

2. Server code

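import com.example.grpc_test.proto.HelloRequest;
import com.example.grpc_test.proto.HelloResponse;
import com.example.grpc_test.proto.HelloServiceGrpc.HelloServiceImplBase;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
// Assumption: @GrpcService is the annotation from net.devh's grpc-spring-boot-starter.
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService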
public class HelloServiceImpl extends HelloServiceImplBase {

  // unary
  @Override
  public void hello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {

    String greeting = request.getFirstName() + "," + request.getLastName();

    HelloResponse response = HelloResponse.newBuilder()
        .setGreeting(greeting)
        .build();

    responseObserver.onNext(response);
    responseObserver.onCompleted();
  }

  // server stream
  @Override
  public void helloServerStream(HelloRequest request,
      StreamObserver<HelloResponse> responseObserver) {

    List<String> greetingList = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {
      greetingList.add(request.getFirstName() + "," + request.getLastName() + ":" + i);
    }

    for (String greeting : greetingList) {
      HelloResponse response = HelloResponse.newBuilder()
          .setGreeting(greeting)
          .build();
      responseObserver.onNext(response);
    }

    responseObserver.onCompleted();
  }

  // client stream
  @Override
  public StreamObserver<HelloRequest> helloClientStream(
      StreamObserver<HelloResponse> responseObserver) {
    return new StreamObserver<HelloRequest>() {
      @Override
      public void onNext(HelloRequest helloRequest) {
        System.out.println(helloRequest.getFirstName() + "," + helloRequest.getLastName());
        // Caution: do not call responseObserver.onNext here. A client-stream RPC allows
        // only one response; answering per request is what a bi stream does.
      }

      @Override
      public void onError(Throwable throwable) {
        System.out.println("error");
      }

      @Override
      public void onCompleted() {
        responseObserver.onNext(HelloResponse.newBuilder().setGreeting("success").build());
        responseObserver.onCompleted();
      }
    };
  }

  // bi stream
  @Override
  public StreamObserver<HelloRequest> helloBiStream(
      StreamObserver<HelloResponse> responseObserver) {
    return new StreamObserver<HelloRequest>() {
      @Override
      public void onNext(HelloRequest helloRequest) {
        String greeting = helloRequest.getFirstName() + "," + helloRequest.getLastName();
        System.out.println(greeting);

        responseObserver.onNext(HelloResponse.newBuilder().setGreeting(greeting+"1").build());
        responseObserver.onNext(HelloResponse.newBuilder().setGreeting(greeting+"2").build());
      }

      @Override
      public void onError(Throwable throwable) {
        System.out.println("error");
      }

      @Override
      public void onCompleted() {
        responseObserver.onCompleted();
      }
    };
  }
}
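
The difference between the client stream and bi stream handlers above comes down to where the response is sent: once in `onCompleted`, or on every `onNext`. The sketch below imitates that contract in plain Java without any network or gRPC dependency. `Observer` is a hypothetical stand-in for `io.grpc.stub.StreamObserver`, requests and responses are plain strings, and all names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class StreamContractSketch {

  // Hypothetical stand-in for io.grpc.stub.StreamObserver (onError omitted for brevity).
  interface Observer<T> {
    void onNext(T value);
    void onCompleted();
  }

  // Plays the role of the client's response observer: records what it receives.
  static final class Collector implements Observer<String> {
    final List<String> received = new ArrayList<>();
    boolean completed = false;

    @Override public void onNext(String value) { received.add(value); }
    @Override public void onCompleted() { completed = true; }
  }

  // Client stream: consume requests silently, answer once when the client completes.
  static Observer<String> clientStreamHandler(Observer<String> responses) {
    return new Observer<String>() {
      @Override public void onNext(String request) { /* no response here */ }
      @Override public void onCompleted() {
        responses.onNext("success");
        responses.onCompleted();
      }
    };
  }

  // Bi stream: answer twice per request, complete when the client completes.
  static Observer<String> biStreamHandler(Observer<String> responses) {
    return new Observer<String>() {
      @Override public void onNext(String request) {
        responses.onNext(request + "1");
        responses.onNext(request + "2");
      }
      @Override public void onCompleted() { responses.onCompleted(); }
    };
  }

  // Sends every request through the handler and returns what the client side collected.
  static Collector run(Function<Observer<String>, Observer<String>> handlerFactory,
      List<String> requests) {
    Collector collector = new Collector();
    Observer<String> requestObserver = handlerFactory.apply(collector);
    for (String request : requests) {
      requestObserver.onNext(request);
    }
    requestObserver.onCompleted();
    return collector;
  }

  public static void main(String[] args) {
    List<String> requests = Arrays.asList("a,aa", "b,bb", "c,cc");
    System.out.println(run(StreamContractSketch::clientStreamHandler, requests).received);
    System.out.println(run(StreamContractSketch::biStreamHandler, requests).received);
  }
}
```

With three requests, the client stream handler produces a single response and the bi stream handler produces six, two per request.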

 

3. Client code

 - grpc caller implementation

    - It connects to the server through a channel and communicates through stub objects.

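import com.example.grpc_test.proto.HelloRequest;
import com.example.grpc_test.proto.HelloResponse;
import com.example.grpc_test.proto.HelloServiceGrpc;
import com.example.grpc_test.proto.HelloServiceGrpc.HelloServiceBlockingStub;
import com.example.grpc_test.proto.HelloServiceGrpc.HelloServiceFutureStub;
import com.example.grpc_test.proto.HelloServiceGrpc.HelloServiceStub;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class GrpcCaller {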

  private ManagedChannel channel;
  private HelloServiceBlockingStub blockingStub;
  private HelloServiceStub asynStub;
  private HelloServiceFutureStub futureStub;

  public GrpcCaller(String domain, int port) {
    channel = ManagedChannelBuilder.forAddress(domain, port)
        .usePlaintext()
        .build();
    blockingStub = HelloServiceGrpc.newBlockingStub(channel);
    asynStub = HelloServiceGrpc.newStub(channel);
    futureStub = HelloServiceGrpc.newFutureStub(channel);
  }

  public void sendBlockingUnary(HelloRequest request) {
    System.out.println("step1 : client 1 server 1 blocking");
    HelloResponse helloResponse = blockingStub.hello(request);
    System.out.println("step1 결과 : " + helloResponse.getGreeting());
    System.out.println("step1 끝");
  }

  public void sendAsynUnary(HelloRequest request) {
    System.out.println("step2 : client1 server 1 asyn");
    asynStub.hello(request, new StreamObserver<HelloResponse>() {
      @Override
      public void onNext(HelloResponse helloResponse) {
        System.out.println("step2 결과 : " + helloResponse.getGreeting());
      }

      @Override
      public void onError(Throwable throwable) {
        System.out.println("error");
      }

      @Override
      public void onCompleted() {
        System.out.println("step2 통신 끝");
      }
    });
    System.out.println("step2 끝");
  }

  public void sendFutureUnary(HelloRequest request) {
    System.out.println("step3 : client 1 server 1 future");
    ListenableFuture<HelloResponse> future = futureStub.hello(request);
    try {
      // Wait up to 2 seconds; print the result only if the call succeeded,
      // so a timeout or failure cannot cause a NullPointerException.
      HelloResponse response = future.get(2, TimeUnit.SECONDS);
      System.out.println("step3 결과 : " + response.getGreeting());
    } catch (Exception e) {
      e.printStackTrace();
    }
    System.out.println("step3 끝");
  }

  public void sendBlockingServerStream(HelloRequest request) {
    System.out.println("step4 : client 1 server n blocking");
    Iterator<HelloResponse> responseIter = blockingStub.helloServerStream(request);
    responseIter.forEachRemaining(response -> {
      System.out.println("step4 결과 : " + response.getGreeting());
    });
    System.out.println("step4 끝");
  }

  public void sendAsynServerStream(HelloRequest request) {
    System.out.println("step5 : client 1 server n asyn");
    asynStub.helloServerStream(request, new StreamObserver<HelloResponse>() {
      @Override
      public void onNext(HelloResponse helloResponse) {
        System.out.println("step5 결과 : " + helloResponse.getGreeting());
      }

      @Override
      public void onError(Throwable throwable) {
        System.out.println("error");
      }

      @Override
      public void onCompleted() {
        System.out.println("step5 통신 끝");
      }
    });
    System.out.println("step5 끝");
  }

  public void sendAsynClientStream(List<HelloRequest> requestList) {
    System.out.println("step6 : client n server 1 asyn");
    StreamObserver<HelloResponse> responseObserver = new StreamObserver<HelloResponse>() {
      @Override
      public void onNext(HelloResponse helloResponse) {
        System.out.println("step6 결과 : " + helloResponse.getGreeting());
      }

      @Override
      public void onError(Throwable throwable) {
        System.out.println("error");
      }

      @Override
      public void onCompleted() {
        System.out.println("step6 통신 끝");
      }
    };
    StreamObserver<HelloRequest> requestObserver = asynStub.helloClientStream(responseObserver);
    for (HelloRequest request : requestList) {
      requestObserver.onNext(request);
    }
    requestObserver.onCompleted();
    System.out.println("step6 끝");
  }

  public void sendAsynBiStream(List<HelloRequest> requestList) {
    System.out.println("step7 client n server n asny");
    StreamObserver<HelloResponse> responseObsever = new StreamObserver<HelloResponse>() {
      @Override
      public void onNext(HelloResponse helloResponse) {
        System.out.println("step7 결과 : " + helloResponse.getGreeting());
      }

      @Override
      public void onError(Throwable throwable) {
        System.out.println("error");
      }

      @Override
      public void onCompleted() {
        System.out.println("step7 통신 끝");
      }
    };

    StreamObserver<HelloRequest> requestObsever = asynStub.helloBiStream(responseObsever);

    for (HelloRequest request : requestList) {
      requestObsever.onNext(request);
    }

    requestObsever.onCompleted();
    System.out.println("step7 끝");
  }
}

- grpc caller invocation

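import com.example.grpc_test.proto.HelloRequest;
import java.util.ArrayList;
import java.util.List;

public class CallTest {

  public static void main(String[] args) {
    // Prepare the channel and stubs
    GrpcCaller caller = new GrpcCaller("localhost", 9090);

    // Prepare the requests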
    List<HelloRequest> requestList = new ArrayList<>();
    requestList.add(HelloRequest.newBuilder().setFirstName("a").setLastName("aa").build());
    requestList.add(HelloRequest.newBuilder().setFirstName("b").setLastName("bb").build());
    requestList.add(HelloRequest.newBuilder().setFirstName("c").setLastName("cc").build());

    caller.sendBlockingUnary(requestList.get(0));
    caller.sendAsynUnary(requestList.get(0));
    caller.sendFutureUnary(requestList.get(0));
    caller.sendBlockingServerStream(requestList.get(0));
    caller.sendAsynServerStream(requestList.get(0));
    caller.sendAsynClientStream(requestList);
    caller.sendAsynBiStream(requestList);
  }
}
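
The `main` method above returns as soon as the async calls have been issued, so whether their callbacks get to print depends on the process staying alive long enough. One common way to wait explicitly is a `CountDownLatch` that each callback counts down. The sketch below shows the idea in plain Java; `fakeAsyncCall` is a hypothetical stand-in for an async stub call and is not part of gRPC.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AwaitAsyncSketch {

  // Hypothetical stand-in for an async stub call: runs the completion callback
  // on another thread, the way gRPC invokes StreamObserver callbacks.
  static void fakeAsyncCall(ExecutorService executor, Runnable onCompleted) {
    executor.submit(onCompleted);
  }

  // Starts `calls` async calls and waits until all have completed or the timeout expires.
  static boolean runAndAwait(int calls, long timeoutSeconds) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch done = new CountDownLatch(calls);
    try {
      for (int i = 0; i < calls; i++) {
        // In a real client, countDown() would go in both onCompleted and onError.
        fakeAsyncCall(executor, done::countDown);
      }
      return done.await(timeoutSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    boolean finished = runAndAwait(3, 5);
    System.out.println("all async calls finished: " + finished);
  }
}
```

Counting down in the error path as well matters, because otherwise a failed call would leave the caller waiting until the timeout.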

 

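Client output (in the log lines, 결과 = result, 끝 = end, 통신 끝 = stream completed)

Async callbacks run on separate threads, so their result lines can appear after the calling method has already printed its end line.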

step1 : client 1 server 1 blocking
step1 결과 : a,aa
step1 끝

step2 : client1 server 1 asyn
step2 끝

step3 : client 1 server 1 future
step2 결과 : a,aa
step2 통신 끝
step3 결과 : a,aa
step3 끝

step4 : client 1 server n blocking
step4 결과 : a,aa:1
step4 결과 : a,aa:2
step4 결과 : a,aa:3
step4 끝

step5 : client 1 server n asyn
step5 끝

step6 : client n server 1 asyn
step6 끝

step7 client n server n asny
step7 끝

step7 결과 : a,aa1
step7 결과 : a,aa2
step6 결과 : success
step6 통신 끝
step7 결과 : b,bb1
step7 결과 : b,bb2
step7 결과 : c,cc1
step7 결과 : c,cc2
step7 통신 끝
step5 결과 : a,aa:1
step5 결과 : a,aa:2
step5 결과 : a,aa:3
step5 통신 끝
