10/26/2023

Socket.IO, HTTP 폴링, 쿠키, Redis 어댑터

발단

실시간 경매방을 구현하기 위해 SocketIO 라이브러리를 사용했는데 모르는 내용과 실수가 넘쳐나서 정말 힘들었다... 하지만 역시 결과는 항상 달콤하다! 물론 그 과정은 스트레스도 많고 좌절도 많다... 처음부터 하나씩 살펴보자~~

위코드 2차 프로젝트 당시 WebSocket을 사용했는데 앞서 말한 것처럼 리팩토링 시 SocketIO를 사용했다. 둘의 차이점은 무엇일까? 일단 SocketIO는 앞서 말한 것처럼 라이브러리다. 그럼 WebSocket은 뭐냐? WebSocket은 프로토콜이며 SocketIO에서 transports 옵션에서 이를 명시할 수 있다. SocketIO는 기본적으로 HTTP 요청을 연속적으로 보내는 HTTP Polling을 사용한다. SocketIO도 프로토콜인 줄 알았는데 SocketIO 공식 문서에 친절하게 차이점을 설명한다! 역시 모르거나 헷갈리면 시간을 내서 배우면 된다!

나는 NestJS 공식 문서에 WEBSOCKETS의 Gateway를 보면서 구현을 시작했다. 지금 생각해보면 문서에 지원하는 WebSocket 플랫폼이 SocketIO와 WS(SocketIO처럼 라이브러리)인데 사실 SocketIO의 HTTP Polling은 정확하게 말하면 WebSocket처럼 실시간은 아니다. 매우 근접하게 실시간을 모방한다고 해야 할까?

@WebsocketGateway() 데코레이터를 클래스에 붙이고 OnGatewayConnection, OnGatewayDisconnet 인터페이스를 추가해서 handleConnection()과 handleDisconnet() 메서드를 구현했고 경매방 참가, 떠나기, 메시지 기능을 담당하는 메서드를 추가했다. 여기까지는 큰 어려움이 없었다. 사실 SocketIO 공식 문서를 보면 서버 소켓/클라이언트 소켓으로 나누어서 메서드, 속성, 이벤트 등 정보가 많다. 처음 읽다보면 멍 해진다~


소켓 아이디

근데 한 가지 걸리는 점이 있었다. 클라이언트 소켓 아이디와 사용자 아이디를 저장하는 Map이었다. Map을 사용해서 연결을 관리하는 부분이 너무 허술했다. 이 방식은 위코드 2차 프로젝트에도 사용했는데 그때도 이와 같은 생각이 들었다. 연결이 많아질수록 Map의 크기는 계속 커져서 성능에 악영향을 줄 것이고 무엇보다도 만약에 서버가 중지된다면 모든 정보는 증발한다!(현재 내가 구현한 Map은 애플리케이션의 메모리를 인 메모리 캐시로 사용하기 때문에 SocketIO 공식 문서의 내용에 따라서 Redis와 같은 인 메모리 데이터 저장소로 변경해야 한다.) 그러나 더 큰제는 문제는 다음이다. SocketIO 공식 문서를 보다가 이와 관련된 부분을 읽게 되었는데 바로 소켓 아이디가 일시적(ephemeral)이기 때문에 사용하면 안된다는 것이다! 왜? 재연결 시 새로운 소켓 아이디가 생성되고 두 개의 브라우저 탭은 다른 소켓 아이디를 가지며 서버에 소켓 아이디를 저장하는 메시지 큐가 없어서 클라이언트 연결이 끊어지면 서버에서 이 소켓 아이디로 전달된 메시지는 유실된다. 따라서 쿠키 혹은 로컬 스토리지에 저장된 세션 아이디로 사용해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@WebSocketGateway({
  // 운영 환경에서는 CORS를 비활성화 하지만 개발 환경에서는 호스트는 127.0.0.1 또는 localhost만 CORS 활성화한다.
  cors: {
    origin:
      process.env.NODE_ENV === 'production'
        ? false
        : ['http://127.0.0.1:3000''http://localhost:3000'],
    // 쿠키 사용 시 필요한 속성.
    credentials: true,
  },
})
export class BidsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  // 정말 허술하기 짝이 없는 소켓 관리! 성능은 말할 것도 없고 서버 중지되면 휘발성이라서 모든 정보가 휙~
  // 소켓의 고유 식별자인 소켓 아이디는 일시적이라서 더 큰 문제!
  connections: Map<string, MongooseSchema.Types.ObjectId> = new Map();
 
  ...
}
cs

쿠키

처음에는 도대체 무슨 의미인지 잘 이해가 되지 않았는데 새로고침을 하고 새로운 브라우저 탭을 열였는데 그 의미를 바로 알았다. 클라이언트가 재연결하면 정말 새로운 소켓 아이디(즉, 새로운 클라이언트 소켓)가 생성되었다! 다행히 SocketIO 공식 문서에 구현 방법을 설명하는 두 개의 링크가 있다. Private Messaging 예제가 구체적인 설명과 코드를 제공해서 큰 도움이 되었다. 예제의 1부를 읽은 다음 문제의 2부가 등장했다. 예제는 로컬 스토리지에 세션 아이디를 저장하는 방식이다. 로컬 스토리지를 사용하면 재연결하고 페이지 새로고침 시 세션이 유지되며 이 세션은 브라우저 탭 간에 공유된다. 근데 다수의 노드(즉, 서버)와 HTTP 폴링을 사용하는 경우 스티키 세션을 활성화 하기 위해 쿠키를 사용하라고 권장해서 쿠키를 사용하기로 결정했다!(WebSocket 프로토콜을 사용하면 전체 세션 동안 하나의 TCP 연결을 사용하기 때문에 스티키 세션이 필요없다.)
새로고침 시 참가 메시지가 새로 생성되고 브라우저 탭을 열면 다음과 같은 현상 발생!


handleConnection()

예제의 2부를 읽은 다음 쿠키를 적용하기 위해 링크를 클릭했는데 핸드셰이크(즉, 세션의 첫번째 요청)에 서버가 쿠키를 보낸다고 설명한다. 이를 위해 코드에 적용하려고 했는데 여기서 큰 난관에 봉착했다... handleConnection() 메서드에서 이를 적용해야 한다고 생각해 코드를 이리저리 수정했는데 도저히 쿠키를 보낼 수 없었다! 근데 이 메서드의 매개변수를 보면 여기서 쿠키를 설정할 수 없다는 것을 바로 알 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 매개변수가 서버가 아니라 클라이언트!!! 당연히 쿠키를 보낼 수 없다~
async handleConnection(client: Socket) {
  try {
 
    const email = client.handshake.query.email;
    const cookie = client.handshake.headers.cookie;
 
    const { access_token: accessToken } = parse(cookie);
 
    if (!accessToken) {
      throw new UnauthorizedException(
        '연결하려면 유효한 토큰을 제공.',
      );
    }
 
    let itemId: unknown = null;
    itemId = client.handshake.query.id;
 
    if (!itemId) {
      throw new NotFoundException('아이디에 해당하는 상품 없음.');
    }
 
    const payload = await this.authService.verifyToken(accessToken);
    const user = payload && (await this.usersService.findById(payload.id));
 
    if (!user) {
      throw new UnauthorizedException('토큰에 해당하는 사용자 없음.');
    }
 
    const room = await this.roomsSerivce.findByItemId(
      itemId as MongooseSchema.Types.ObjectId,
    );
 
    this.connections.set(client.id, user._id);
 
    if (room) {
      await this.joinRoom(client, user, room._id);
    } else {
      throw new NotFoundException('아이디에 해당하는 경매방 없음.');
    }
  } catch (error) {
    client.emit('error', { message: error.message });
    this.connections.delete(client.id);
    client.disconnect(true);
  }
}
cs


afterInit()

NestJS 공식 문서를 다시 읽으면서 OnGatewayInit 인터페이스의 afterInit() 메서드에서 이를 수정해야 한다고 생각했다. 매개변수가 서버이고 설명에서 서버 관련 메서드라고 적혀있기 때문이다. 예제에 적힌 방식대로 use 메서드를 사용해서 쿠키가 있으면 next() 메서드를 바로 호출하고 없으면 쿠키를 생성하는 코드를 구현했는데 쿠키가 생성되지 않았다. 그리고 절망의 나락으로 빠져버렸다...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// handleConnection() 메서드와 달리 매개변수가 서버라서 여기서 쿠키를 설정할 수 있을 줄 알았는데 아니였다... 그리고 다시 절망으로...
afterInit(server: Server) {
  server.use((socket, next) => {
    const cookie = socket.handshake.headers.cookie;
    const { socket_session_id: socketSessionId } = parse(cookie);
 
    if (socketSessionId) {
      next();
    }
 
    const newCookie = serialize(`session_id`, randomUUID(), {
      path: '/',
      httpOnly: true,
      secure: false,
      sameSite: 'strict',
      expires: new Date(Date.now() + 1800000),
    });
    socket.handshake.headers['set-cookie'= [newCookie];
 
    next();
  });
}
cs

망할 쿠키가 생성이 안되네...

이 방식마저 작동하지 않아서 좌절과 절망에 빠졌다... 몇 일 동안 새벽 2시까지 코드를 수정하고 이리저리 찾아봤지만 쿠키는 생성되지 않았다. 그러다가 NestJS 공식 문서의 Adapters에서 해결책의 실마리를 발견했다!! 그 때의 기쁨이란... IoAdapter 어댑터를 상속해서 로드 밸런서에 쿠키 기반 라우팅을 활성화할 수 있다고 적혀있다. 앞서말한 HTTP 폴링 기반 쿠키를 통한 스티키 세션을 활성화하기 위한 내용이다.


IoAdapter

IoAdapter 클래스를 상속해서 어댑터 클래스를 생성하고 검색을 통해 createIOServer() 메서드에서 서버를 생성한다는 것을 알았다. OnGatewayInit 인터페이스의 afterInit() 메서드는 이름에서 알 수 있듯이 서버 생성 후에 추가적인 작업을 하는 메서드이다. 빨리 알았으면 좋았을텐데... 여하튼 createIOServer() 메서드는 포트와 옵션을 매개변수로 가지는데 이 옵션에서 여러 작업을 수행한다. 그 중 하나가 allowRequest 옵션이다. SocketIO 공식 문서의 해당 옵션의 설명에 따르면 이 옵션은 핸드셰이크 또는 업그레이드 요청을 1번 매개변수로 받아 계속 진행할지 여부를 결정할 수 있는 함수이다. 즉, 이름 그래도 요청의 허용 여부를 묻는 함수이다. 그리고 그토록 찾던 부분이 밑에 나온다... 'initial_headers 이벤트와 함께 사용하여 클라이언트에 쿠키를 전송하는 데 사용될 수 있다' 하... 드디터 찾았다! 그런데... 삽집을 다시하게 되었다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
createIOServer(port: number, options?: ServerOptions): any {
  const server: Server = super.createIOServer(port, options);
 
  // allowRequest() 메서드는 핸드셰이크 또는 업그레이드 요청을 1번 매개변수로 받고 계속 진행할지 여부를 결정할 수 있는 함수이다.
  // initial_headers 이벤트와 함께 사용하여 클라이언트에 쿠키를 보내는 데에도 사용할 수 있다.
  options.allowRequest = async (request, allowFunction) => {
    const cookie = request.headers.cookie;
    const { access_token: accessToken } = parse(cookie);
 
    const isTokenVerified =
      accessToken && (await this.authService.verifyToken(accessToken));
    const doesUserExist =
      isTokenVerified &&
      (await this.usersService.findById(isTokenVerified.id));
 
    if (!isTokenVerified || !doesUserExist) {
      return allowFunction('권한 없음.'false);
    }
 
    return allowFunction(nulltrue);
  };
 
  // initial_headers 이벤트는 세션의 첫번째 HTTP 요청 (핸드셰이크)의 응답 헤더를 쓰기 직전에 발생하며 사용자 정의할 수 있도록 허용한다.
  server.engine.on('initial_headers', (headers, request) => {
    const session = this.getCookie('session_id', request);
    if (!session) {
      headers['set-cookie'= serialize(`session_id`, randomUUID(), {
        path: '/',
        httpOnly: false,
        secure: false,
        sameSite: 'strict',
        expires: new Date(Date.now() + 1800000),
      });
    }
  });
  return server;
}
 
private getCookie(name: string, request: any) {
  const cookies = parse(request.headers.cookie || '');
 
  return cookies[name];
}
cs

이상하게 쿠키가 생성되는 경우도 있고 안되는 경우도 있었다!? 뭐지?? 코드 자체에 문제가 있다고 생각해 이리저리 수정을 해도 도저히 해결되지 않았다. 그렇게 또 새벽까지 하다가 SocketIO 공식 문서를 읽다가 유레카 순간이 찾아왔다. 순서가 잘못된 것이었다!! allowRequest 옵션을 수정한 다음 이 옵션을 전달해서 서버를 만들어야 하는데 생성하고 옵션을 설정하니 당연히 제대로 동작할 수 없지! 근데 또 쿠키가 생성된 경우도 있어서 신기할 따름이다...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
createIOServer(port: number, options?: ServerOptions): any {
 
  options.allowRequest = async (request, allowFunction) => {
    const cookie = request.headers.cookie;
 
    // 쿠키에 토큰이 존재하지 않을 경우 accessToken 자체의 값이 없어서 서버가 다운되는 것을 방지한다.
    if (!cookie) {
      return allowFunction('권한 없음.'false);
    }
 
    const { access_token: accessToken } = parse(cookie);
 
    const isTokenValidated =
      accessToken && (await this.authService.validateToken(accessToken));
 
    const filter = buildFilter('_id', isTokenValidated.id)
 
    const doesUserExist =
      isTokenValidated &&
      (await this.usersService.findOne(filter));
 
    if (!isTokenValidated || !doesUserExist) {
      return allowFunction('권한 없음'false);
    }
    return allowFunction(nulltrue);
  };
 
  // allowRequest() 메서드에 설정한 옵션을 전달하여 서버를 생성한다.
  const server: Server = super.createIOServer(port, options);
 
  server.engine.on('initial_headers', (headers, request) => {
    const session = this.getCookie('session_id', request);
    if (!session) {
      headers['set-cookie'= serialize(`session_id`, randomUUID(), {
        path: '/',
        httpOnly: false,
        secure: false,
        sameSite: 'strict',
        expires: new Date(Date.now() + 1800000),
      });
    }
  });
 
  return server;
}
 
private getCookie(name: string, request: any) {
  const cookies = parse(request.headers.cookie || '');
 
  return cookies[name];
}
cs

드디어 쿠키 생성!!!!

그런데 쿠키를 handleConnection()에서 읽으려고 하니 undefined가 나오네... 이에 대한 이유는 바로 밑에~


요청/응답

분명히 개발자 도구의 Application의 Cookies를 보면 session_id가 저장된 쿠키가 보인다. 그런데 로그를 찍으면 undefined이다. 더 이상한 건 새로고침을 하거나 다른 경매방에 들어가면 값이 출력된다! SocketIO 공식 문서의 initial_headers 이벤트의 설명은 다음과 같다. '이벤트는 세션의 첫번째 HTTP 요청(핸드셰이크)의 응답 헤더를 작성하기 바로 직전에 발생하며 사용자 정의 설정을 할 수 있다.' 여기에 힌트가 숨어있다. 즉, 첫번째 응답에 쿠키를 설정하고 클라이언트가 다음 요청을 보낼 때 해당 쿠키를 첨부할 것이라는 의미이다!! 하지만 그러면 아까 말한것처럼 새로고침을 해야하는데 이는 사용자 관점에서 너무 불편하다. 그래서 상품 목록 페이지에서 경매방이 있는 버튼을 클릭하는 순간 소켓 연결을 시작하고 종료해서 쿠키를 생성하는 방안으로 코드를 수정했다.


입찰 갱신 & 방

입찰 갱신 메서드를 만들고 테스트를 하는데 문제가 발생했다! 이상하게 입찰을 하면 다른 참가자의 페이지에는 최고 입찰가가 바로 갱신되는데 입찰을 한 참가자의 페이지는 새로고침을 해야 갱신된 최고 입찰가를 볼 수 있었다. 또한 한 사용자가 여러 개의 브라우저 탭을 통해 메시지를 전송하는 경우 자신의 나머지 브라우저 탭에 보내지는 메시지와 다른 사용자에게 보내지는 메시지를 구분하는 문제도 발생했다. SocketIO 공식 문서를 읽고 코드를 조금씩 수정하니 부분적으로 문제를 해결할 수 있었다.

일단 방이라는 개념을 이해해야 한다! SocketIO 공식 문서에 따르면 방은 여러 소켓이 참여하고 나가는 임의의 채널로 일부 클라이언트에게 이벤트를 브로드캐스트하는 데 사용된다. 참고로 방은 서버 전용 개념으로 클라이언트는 참여한 방의 목록에 접근할 수 없다. 소켓은 자동으로 자신의 아이디로 식별된 방에 참여하는데 아이디는 위에서 말한 일시적인 세션의 고유 식별자이다. Private Messaging 예제의 2부에 보면 세션 아이디와 함께 사용자 아이디를 로컬 스토리지에 저장한다. 나는 세션 아이디를 가지는 쿠키를 생성했지만 사용자 아이디를 가지는 쿠키를 따로 생성하지는 않고 접근 토큰을 통해 데이터베이스에 저장된 사용자 아이디를 사용했다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@WebSocketGateway({
  cors: {
    origin:
      process.env.NODE_ENV === 'production'
        ? false
        : ['http://127.0.0.1:3000''http://localhost:3000'],
    credentials: true,
  },
})
@UseFilters(WebsocketExceptionFilter)
@UseGuards(WsJwtAuthGuard)
export class BidsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
 
  connections: Map<string, MongooseSchema.Types.ObjectId> = new Map();
  
  ...
 
  async handleConnection(client: Socket) {
    try {
      const cookie = client.handshake.headers.cookie;
      ...
      
      const { socket_session_id: socketSessionId } = parse(cookie);
      const { access_token: accessToken } = parse(cookie);
 
      const payload = await this.authService.validateToken(accessToken);
      const filter = buildFilter('_id', payload.id);
      const user = payload && (await this.usersService.findOne(filter));
 
      if (socketSessionId) {        
        // 세션이 존재하는 경우 사용자의 아이디 방에 참가한다.
        client.join(user._id.toString());
        this.connections.set(socketSessionId, user._id);
      }
 
      ...
    }
    
    ...
  }
  
  ...
}
 
cs

다음은 나를 정말 많은 시련을 준 입찰 처리다~ 참가자가 입찰을 하면 자신의 여러 브라우저 탭에는 "본인" 형식으로 메시지를 받고 다른 참가자는 "이름(이메일)" 형식으로 메시지를 받는다. Private Messaging 예제의 2부를 보면 to() 메서드를 두 번 사용해서 본인의 다른 브라우저 탭과 상대방에게 메시지를 전달하는데 나의 경우 경매방에 본인과 참가자가 같이 있기 때문에 다른 방식을 사용해야 했다. 해당 경매방에 존재하는 모든 소켓 인스턴스를 가져오기 위해 fetchSockets() 메서드를 사용하고 for ... of문에서 각 소켓마다 socket.rooms.has() 메서드로 본인인지 확인한다(즉, 해당 소켓이 입찰을 하는 참가자의 방에 속하는지 확인한다.). 본인이면 socket.emit()을 아니면 client.to(socket.id).emit()을 호출한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@WebSocketGateway({
  cors: {
    origin:
      process.env.NODE_ENV === 'production'
        ? false
        : ['http://127.0.0.1:3000''http://localhost:3000'],
    credentials: true,
  },
})
@UseFilters(WebsocketExceptionFilter)
@UseGuards(WsJwtAuthGuard)
export class BidsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  
  connections: Map<string, MongooseSchema.Types.ObjectId> = new Map();
 
  ...
 
  @SubscribeMessage('bid')
  async sendMessage(client: Socket, makeBidDto: MakeBidDto) {
    try {
      const cookie = client.handshake.headers.cookie;
      const { socket_session_id: socketSessionId } = parse(cookie);
      const userId = this.connections.get(socketSessionId);
      const filter = buildFilter('_id', userId);
      const user = await this.usersService.findOne(filter);
 
      let bid: string;
      makeBidDto.userId = userId;      
      // 해당 경매방에 존재하는 모든 소켓 인스턴스를 반환한다.
      const sockets = await this.server
        .in(makeBidDto.roomId.toString())
        .fetchSockets();
 
      for (const socket of sockets) {
        bid = socket.rooms.has(userId.toString())
          ? `본인: ${makeBidDto.price}원`
          : `${user.name}(${user.email}): ${makeBidDto.price}원`;
 
        if (socket.rooms.has(userId.toString())) {
          // 브라우저에서 사용자가 여러 개의 탭을 사용하는 경우 전달되는 메시지.
          socket.emit('bid', bid);
        } else {
          // 다른 사용자에게 전달되는 메시지.
          client.to(socket.id).emit('bid', bid);
        }
      }
 
      await this.bidsService.create(makeBidDto);
    } catch (error) {
      console.log(error);
      error.message = '입찰 중 오류 발생.';
 
      client.emit('error', error);
      client.disconnect(true);
    }
  }
}
cs

아래의 스크린 샷에서 참가자는 Chrome, FireFox, Chrome Incognito이고 오른쪽은 두 개의 FireFox 브라우저 탭, 왼쪽 위는 Chrome Incognito의 브라우저 탭, 왼쪽 아래는 Chrome 브라우저 탭이다.

문제는 이 즉시 갱신이 어느 경우는 일어나지 않는다. 프론트에서 문제가 발생하는 것 같은데 아쉽게도 아직 해결하지 못했다..


어댑터

현재까지 구현한 소켓 코드는 오직 하나의 SocketIO 서버에서만 작동한다. 앞에서 언급한 애플리케이션 메모리를 인 메모리 캐시로 사용하는 점에서 알 수 있다. 고가용성 및 로드 밸런싱 목적으로 여러 SocketIO 서버로 확장하려는 경우 다음 두 가지를 구현해야 한다. 스티키 세션을 활성화 해야하고 어댑터를 사용해야 한다. 일단 어댑터를 먼저 구현하자!

어댑터는 모든 또는 일부 클라이언트에 이벤트를 브로드캐스트하는 서버 측 구성 요소이다. 다수의 SocketIO 서버로 확장할 때 어댑터를 사용해 이벤트가 모든 클라이언트로 올바르게 라우팅되도록 한다. 간단하게 말해 SocketIO 서버들 간에 데이터를 주고받을 수 있도록 허용하는 중간 서버로 볼 수 있다. SocketIO 공식 문서를 보면 총 5개의 공식 구현이 있는데 작품 캐싱 적용 시 Redis를 사용해서 Redis 어댑터로 구현했다. 구현 시 SocketIO 공식 문서와 NestJS 공식 문서를 참조했다. 이제 구현 코드를 보자!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// socket.adpater.ts
export class SocketIoAdapter extends IoAdapter {
  private adapterConstructor: ReturnType<typeof createAdapter>;
 
  ...
 
  async connectToRedis(): Promise<void> {
    const pubClient = new Redis();
    const subClient = pubClient.duplicate();
 
    this.adapterConstructor = createAdapter(pubClient, subClient);
  }
 
  createIOServer(port: number, options?: ServerOptions): any {
    ...
 
    const server: Server = super.createIOServer(port, options);
 
    ...
    
    server.adapter(this.adapterConstructor);
 
    return server;
  }
}
 
// main.ts
async function bootstrap() {
  const app = await NestFactory.create<NestExpressApplication>(AppModule);
 
  ...
 
  const socketIoAdapter = new SocketIoAdapter(app);
  await socketIoAdapter.connectToRedis();
 
  app.useWebSocketAdapter(socketIoAdapter);
  
  ...
}
bootstrap();
cs

Redis 어댑터는 발행/구독 패러다임을 사용하는데 이는 Redis 공식 문서에 상세하게 나와있기 때문에 생략한다! 중요한 점은 Redis 어댑터를 통해 여러 클라이언트에 전송되는 각 패킷은 현재 서버에 연결된 모든 일치하는 클라이언트에게 전송되고 Redis 채널에 게시되어 클러스터의 다른 SocketIO 서버에서 수신된다. 코드를 보면 NestJS 공식 문서의 적혀있는 await Promise.all([pubClient.connect(), subClient.connect()]) 코드가 없는데 SocketIO 공식 문서를 보면 ioredis 패키지를 사용하는 경우 이 부분이 필요하지 않다고 나와있다.

스티키 세션이 활성화되면 SocketIO 서버 사이에 소켓 세션을 공유해야 하기 때문에 인 메모리 캐시로 애플리케이션의 메모리가 아닌 Redis와 같은 인 메모리 데이터 저장소를 사용해야 한다. 현재 Redis를 사용하고 있으며 Private Messaging의 4부를 보면서 구현한 소켓 세션 저장소 코드는 다음과 같다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// socket-session-store.ts
export class SocketSessionStore {
  constructor(private redisClient: Redis) {
    this.redisClient = redisClient;
  }
 
  // 작품 캐싱과 달리 Redis의 모든 메서드를 사용할 수 있어 Hash를 사용했다.
  async save(sessionId: string, userId: MongooseSchema.Types.ObjectId) {
    await this.redisClient
      // multi() 메서드는 트랜잭션 블록의 시작을 표시하며 이후의 명령은 exec() 메서드를 사용하여 원자적으로 실행되기 위해 대기열에 들어간다.
      .multi()
      .hset(`socket:session#${sessionId}`, 'user', userId.toString())
      .expire(`socket:session#${sessionId}`, process.env.SOCKET_SESSION_TTL)
      .exec();
  }
 
  async find(sessionId: string) {
    const userId: unknown = await this.redisClient.hget(
      `socket:session#${sessionId}`,
      'user',
    );
 
    return userId as MongooseSchema.Types.ObjectId;
  }
}
 
 
// bids.gateway.ts
@WebSocketGateway({
  ...
})
@UseFilters(WebsocketExceptionFilter)
@UseGuards(WsJwtAuthGuard)
export class BidsGateway implements OnGatewayConnection, OnGatewayDisconnect {
  // 서버 인스턴스
  @WebSocketServer()
  server: Server;
 
  // 애플리케이션을 인 메모리로 사용하면 단일 Socket.IO 서버에서만 작동한다.
  //connections: Map<string, MongooseSchema.Types.ObjectId> = new Map();
  socketSessionStore: SocketSessionStore = new SocketSessionStore(new Redis());
 
  ...
 
  async handleConnection(client: Socket): Promise<void> {
    try {
      ...
 
      if (socketSessionId) {
        client.join(user._id.toString());
        //this.connections.set(socketSessionId, user._id); 
        this.socketSessionStore.save(socketSessionId, user._id);
      }
 
      ...
    } catch (error) {
      ...
    }
  }
 
  ...
 
  @SubscribeMessage('bid')
  async makeBid(client: Socket, makeBidDto: MakeBidDto): Promise<void> {
    try {
      const cookie = client.handshake.headers.cookie;
      const { socket_session_id: socketSessionId } = parse(cookie);
      //const userId = this.connections.get(socketSessionId);
      const userId = await this.socketSessionStore.find(socketSessionId);
     
      ...
 
    } catch (error) {
      ...
    }
  }
}
cs

Docker Compose에서 포트 3000과 포트 3001을 사용하는 두 개의 인스턴스를 생성한 다음 경매방에 참여해서 입찰을 하면 다음과 같이 성공적으로 메시지가 한 포트에서 다른 포트로 전송된다!
사용자1(구글, 포트 3000, 포트 3001) vs. 사용자2(구글 비밀 모드, 포트 3000, 포트 3001)


update: 2024.02.18

댓글 없음:

댓글 쓰기