본문 바로가기
Back-end/Spring boot

코인 모의투자 프로젝트(2) - Upbit Websocket 연결

by 악어코딩 2024. 4. 17.

우선 Upbit로부터 데이터를 받아야 뭐든지 만들 수가 있다.

그런데 어떻게 유저의 요청 없이 서버가 시작되면 Upbit에 요청을 보낼 수 있을까?

 

1. main 메서드에 삽입한다.

public static void main(String[] args) {
    SpringApplication.run(CoinSimulationMvcApplication.class, args);
    new Thread(() -> System.out.println("aaaaaaaaaaa")).start();
}

그럼 스프링 시작 후에 실행시킬 수 있다.

 

2. EventListener를 사용한다.

@EventListener(ApplicationReadyEvent.class)
public void upbitConnect() {
    System.out.println("hi");
}

ApplicationReadyEvent는 어플리케이션이 준비가 됐을 때 일어나는 이벤트이다.

이렇게 하면 확실하게 준비된 후에 시작할 수 있다.

 

이제 그러면 Upbit API에 대해서 알아봐야 한다.

Trade를 받는 예제

 

Request를 만들기 위해 대충 이렇게 만들고..

@UtilityClass
public class UpbitUtils {
    private final ObjectMapper camelOM = new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategies.LOWER_CAMEL_CASE);


    public String makeBody(UpbitRequestType type) throws JsonProcessingException {
        TicketData ticketData = createTicketData();
        TickerData tickerData = createTickerData(type);
        FormatData formatData = createFormatData();

        List<Object> dataList = Arrays.asList(ticketData, tickerData, formatData);

        return convertToJson(dataList);
    }

    private TicketData createTicketData() {
        return new TicketData("test example");
    }

    private TickerData createTickerData(UpbitRequestType type) {
        TickerData tickerData = new TickerData();
        tickerData.setType(type.getValue());
        tickerData.setCodes(Collections.singletonList("KRW-BTC"));
        tickerData.setOnlySnapshot(false);
        tickerData.setOnlyRealtime(true);
        return tickerData;
    }

    private FormatData createFormatData() {
        return new FormatData("DEFAULT");
    }

    private String convertToJson(List<Object> dataList) throws JsonProcessingException {
        return camelOM.writeValueAsString(dataList);
    }
}

 

Response를 받기 위해 DTO도 만들어준다.

@Data
public class Trade {
    private String type;
    private String code;
    private Double tradePrice;
    private Double tradeVolume;
    private Gubun askBid;
    private Double prevClosingPrice;
    private String change;
    private Double changePrice;
    private String tradeDate;
    private String tradeTime;
    private Long tradeTimestamp;
    private Long timestamp;
    private Long sequentialId;
    private String streamType;
}

 

그리고 Upbit에 WebSocket으로 연결하기 위해서 WebSocketClient라는 것을 사용할 것이다.

 

WebSocketClient는 쉽게 다른 서버의 WebSocket과 연결할 수 있게 도와주는 인터페이스인데, WebSocketHandler와 같이 사용하면 된다.

 

WebSocketHandler는 무엇이냐 하면.. 설명보다는 코드로 보는게 나을 것 같다.

WebSocketHandler를 abstract하게 구현한 AbstractWebSocketHandler를 보면 이런 메서드들이 있다.

@Override
public void afterConnectionEstablished(WebSocketSession session) Ωthrows Exception {
}

protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
}

protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
}

메서드 명만 봐도 의미가 아주 쉽게 전달된다!(나도 이렇게 개발해야하는데 ㅠㅠ)

이 메서드들을 Override해서 원하는 로직으로 구현하면 WebSocket이 연결됐을 때, 데이터를 받았을 때를 원하는 대로 개발할 수 있다!

이 AbstractWebSocketHandler를 상속받아 구현한 두 개의 클래스가 있는데, BinaryWebSocketHandler와 TextWebSocketHandler이다.

각각 클래스명대로 binary 데이터를 받는 지 Text 데이터를 받는 지 확인해서 해당 클래스를 상속받아 원하는 대로 구현하면 된다.

(BinaryWebSocketHandler의 경우에는 handleBinaryMessage를 Override하고, TextWebSocketHandler의 경우에는 handleTextMessage를 Override하면 된다.)

 

Upbit에서는 Response가 Binary형식으로 오기 때문에 BinaryWebSocketHandler를 상속받아서 구현해준다.

@Slf4j
@RequiredArgsConstructor
@Component
public class UpbitTradeHandler extends BinaryWebSocketHandler {
    private final ObjectMapper snakeObjectMapper;


    /**
     * 웹소켓 연결이 설정된 직후 실행되는 메서드입니다.
     * 실시간 시세 정보를 요청합니다.
     *
     * @param session 웹소켓 세션
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws IOException {
        session.sendMessage(new TextMessage(UpbitUtils.makeBody(UpbitRequestType.TRADE)));
    }

    /**
     * 웹소켓으로부터 메시지를 받았을 때 처리하는 메서드입니다.
     *
     * @param session 웹소켓 세션
     * @param message 받은 메시지
     */

    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
        String convertedMessage = new String(message.getPayload().array(), StandardCharsets.UTF_8);
        log.info(snakeObjectMapper.readValue(convertedMessage, Trade.class).toString());
    }

}

 

나는 WebSocketClient를 구현한 StandardWebSocketClient를 사용할 예정인데, 이 클래스를 조금 뜯어보면

Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);
Callable<WebSocketSession> connectTask = () -> {
	this.webSocketContainer.connectToServer(endpoint, endpointConfig, uri);
	return session;
};
if (this.taskExecutor != null) {
	return FutureUtils.callAsync(connectTask, this.taskExecutor);
}
else {
	return FutureUtils.callAsync(connectTask);
}

execute 메서드의 파라미터로 받은 WebSocketHandler로 새로운 StandardWebSocketHandlerAdapter를 만든다음 FutureUtils.callAsync 메서드를 태워버린다!

(taskExecutor가 null이면 defaultExecutor를 사용한다!)

쉽게 설명하자면 WebSocketClient를 하나만 만들어놓고 써도 된다는 말이다. 그래서 Config에 WebSocketClient를 Bean으로 등록해서 재사용을 한다.

@Configuration
public class WebSocketClientConfig {

    @Bean
    public WebSocketClient webClient() {
        return new StandardWebSocketClient();
    }

    @Bean
    public ObjectMapper snakeObjectMapper() {
        return new ObjectMapper()
                .registerModule(new JavaTimeModule())
                .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
    }
}

(Response가 snake형식으로 오기 때문에 ObjectMapper도 Bean을 등록해준다)

 

connection이 establish되면 Request를 보내고, Response를 log로 내보낸다. 그렇게 실행을 누르면!

 

 

응답을 잘 받아오는 것을 확인할 수 있다!

 

이렇게 Trade, Orderbook, Ticker를 똑같이 만들어주면

/**
 * 애플리케이션이 준비되면 업비트 웹소켓에 연결합니다.
 */
@EventListener(ApplicationReadyEvent.class)
public void upbitConnect() {
    webSocketClient.execute(upbitOrderBookHandler, UPBIT_WEBSOCKET_URI);
    webSocketClient.execute(upbitTickerHandler, UPBIT_WEBSOCKET_URI);
    webSocketClient.execute(upbitTradeHandler, UPBIT_WEBSOCKET_URI);
}

서버가 실행되면 Upbit에 자동으로 연결해서 데이터를 받아올 수 있다.

 

자세한 코드는 여기서 확인할 수 있다!

https://github.com/dkrdj/coin-simulation-mvc