asyncio를 사용한 비동기 소켓 통신 예제
asyncio를 사용한 비동기 소켓 통신 예제를 다룹니다.
2024. 6. 13 최초작성
Python에서 TCP 통신을 최적화하기 위해 사용할 수 있는 대표적인 라이브러리로는 asyncio와 Twisted가 있습니다. 본 포스트에서는 asyncio를 다룹니다.
asyncio
asyncio는 Python 3.4부터 내장된 비동기 I/O 프레임워크로, async와 await 구문을 사용한 비동기 프로그래밍을 지원합니다.
비동기 I/O는 입력과 출력을 비동기적으로 처리하여, 다른 작업이 완료될 때까지 기다리지 않고 다음 작업을 수행할 수 있도록 합니다.
이벤트 루프는 작업(코루틴)들을 등록하고, 등록된 작업들이 완료될 때까지 기다리며, 작업이 완료되면 그 결과를 처리하는 역할을 합니다.
장점
- asyncio는 Python 표준 라이브러리의 일부이므로, 별도의 설치가 필요 없습니다.
- async와 await 키워드를 사용하여 비동기 코드를 쉽게 작성할 수 있습니다. 예를 들어, await 키워드를 사용하여 비동기 함수를 호출하면, 해당 함수가 완료될 때까지 기다렸다가 결과를 반환받을 수 있습니다.
- asyncio는 다른 비동기 라이브러리와 호환성이 좋습니다.
단점
- asyncio는 기본적인 비동기 처리를 지원하지만, 일부 고급 네트워크 기능이 부족할 수 있습니다.
- 특정 네트워크 프로토콜을 사용해야 하거나, 매우 복잡한 네트워크 아키텍처를 구현해야 하는 경우에는 asyncio가 적합하지 않을 수 있습니다.
동기식 소켓 코드와 asyncio를 사용한 비동기식 소켓 코드의 차이점
동기식 소켓 코드
동기식 소켓 코드는 클라이언트 또는 서버가 네트워크 작업을 수행할 때, 그 소켓 작업이 완료될 때까지 기다립니다. 이 과정에서 프로그램의 다른 부분은 실행되지 않고 멈춰 있게 됩니다. 예를 들어, 서버가 클라이언트의 메시지를 기다리는 동안 다른 작업을 할 수 없습니다. 따라서 여러 클라이언트가 연결을 시도하는 경우 성능이 떨어질 수 있습니다. 이를 해결하기 위해 멀티스레딩이나 멀티프로세싱을 도입합니다.
동기식 소켓 코드는 직관적이고 순차적으로 실행됩니다.
asyncio를 사용한 비동기식 소켓 코드
비동기식 소켓 코드는 이벤트 루프를 사용하여 여러 네트워크 작업을 동시에 처리합니다. asyncio는 이러한 비동기 작업을 관리하는 Python 표준 라이브러리로, 여러 작업이 동시에 진행되도록 도와줍니다. 이를 통해 한 작업이 멈추더라도 다른 작업은 계속 진행됩니다. 이벤트 루프를 사용하여 단일 스레드에서도 많은 클라이언트를 효율적으로 처리할 수 있습니다.
asyncio를 사용한 비동기식 소켓 코드는 async와 await 구문을 사용하여 비동기 작업을 명시적으로 나타내야 합니다. 코드 구조가 조금 더 복잡해질 수 있습니다.
비동기 프로그래밍의 동작 원리
비동기 프로그래밍에서, async 키워드를 붙여 만든 코루틴 함수는 완료될 때까지 다른 작업을 막지 않고 이벤트 루프가 계속 실행되도록 합니다. 코루틴이 대기 상태일 때, await 키워드를 사용하여 이벤트 루프에 제어권을 넘겨 다른 코루틴이 실행될 수 있도록 합니다.
1. 이벤트 루프
- 이벤트 루프는 비동기 작업(코루틴)을 관리하고 실행합니다.
- 대기 상태의 작업을 스케줄링하고, I/O 이벤트, 타이머 이벤트, 콜백 등을 처리합니다.
2. 코루틴
- 코루틴은 비동기 함수로, await 키워드를 사용하여 제어권을 이벤트 루프에 넘기고 자신은 대기할 수 있습니다. 이벤트 루프는 이 제어권을 다른 코루틴에 넘깁니다.
3. await와 비동기 작업
- await 키워드는 비동기 작업이 완료될 때까지 현재 코루틴의 실행을 일시 중단하고, 이벤트 루프에 제어권을 반환합니다.
- 이렇게 하면 이벤트 루프가 다른 비동기 작업들을 실행할 수 있습니다.
await 사용 예제
await가 어떻게 이벤트 루프와 협력하여 비동기 작업을 처리하는지 보여주는 간단한 예제입니다.
비동기 프로그래밍에서 await는 코루틴이 대기 상태에 있을 때 이벤트 루프에 제어를 반환하여 다른 작업들이 실행될 수 있도록 합니다. 이를 통해 여러 비동기 작업들이 효율적으로 실행되며, 프로그램이 블로킹되지 않고 계속해서 동작할 수 있습니다..
다음 예제는 두 개의 비동기 작업이 어떻게 번갈아 가며 실행되는지를 보여줍니다. await는 코루틴이 대기 상태에 있을 때 이벤트 루프에 제어를 반환하여 다른 작업들이 실행될 수 있게 합니다. 이로 인해 프로그램이 블로킹되지 않고 여러 작업이 동시에 실행될 수 있습니다.
print_numbers 함수와 print_letters 함수는 출력후, await asyncio.sleep를 호출하여 다음 출력까지 대기하는 시간이(1초와 1.5초) 다릅니다.
await asyncio.sleep 호출을 호출하면 코루틴을 일시 중단하고 이벤트 루프에 제어를 반환합니다. 이벤트 루프는 대기 상태에 있는 다른 코루틴들을 실행하여 CPU 시간을 효율적으로 사용합니다. 결과적으로 두 코루틴은 번갈아 가며 실행되며, 각기 다른 주기로 출력을 수행합니다.
import asyncio # 1부터 5까지의 숫자를 출력합니다. # 각 숫자를 출력한 후 await asyncio.sleep(1)를 사용하여 1초 동안 대기합니다. async def print_numbers(): for i in range(1, 6): print(f"숫자: {i}") await asyncio.sleep(1) # 1초 동안 대기 # A'부터 'E'까지의 글자를 출력합니다. # 각 글자를 출력한 후 await asyncio.sleep(1.5)를 사용하여 1.5초 동안 대기합니다. async def print_letters(): for letter in 'ABCDE': print(f"글자: {letter}") await asyncio.sleep(1.5) # 1.5초 동안 대기 async def main(): # 두 개의 비동기 작업(코루틴)을 동시에 실행합니다. await asyncio.gather( print_numbers(), print_letters() ) asyncio.run(main()) |
프로그램이 실행되면 두 개의 코루틴이 번갈아 가며 실행되어 출력됩니다. 숫자는 1초마다, 글자는 1.5초마다 출력됩니다.
숫자: 1
글자: A
숫자: 2
글자: B
숫자: 3
글자: C
숫자: 4
숫자: 5
글자: D
글자: E
소켓 코드 실행 결과
서버를 실행한 후, 클라이언트 하나를 실행한 경우입니다. 여러개의 클라이언트도 가능하니 테스트 해보세요. 서버와 각 클라이언트를 각기 다른 터미널 또는 명령 프롬프트에서 실행하면 됩니다.
서버 | 클라이언트 |
1. 서버 코드를 실행합니다. 서버 실행 중: ('127.0.0.1', 8888) |
|
클라이언트 연결됨: ('127.0.0.1', 49232) |
2. 클라이언트 코드를 실행합니다. 당신의 이름을 입력하세요: |
3. 이름을 입력하고 엔터키를 누릅니다. 당신의 이름을 입력하세요: momo |
|
4. 메시지를 입력하고 엔터키를 누릅니다. 메시지를 입력하세요: 안녕하세요 |
|
('127.0.0.1', 49232)로부터 수신됨: momo: 안녕하세요 | |
수신됨: momo: 안녕하세요 메시지를 입력하세요: |
|
5. quit를 입력하고 엔터키를 누릅니다. 메시지를 입력하세요: quit |
|
('127.0.0.1', 49232)로부터 수신됨: momo: quit 클라이언트 ('127.0.0.1', 49232)가 'quit'을 보냈습니다. 연결을 종료합니다. ('127.0.0.1', 49232)와의 연결이 종료되었습니다. |
연결을 종료합니다 |
소켓 코드 흐름
서버코드
서버 코드의 실행 흐름을 정리한 것입니다
1. 서버 초기화 및 실행
Server 클래스의 인스턴스가 생성됩니다
server = Server()
__init__ 메서드에서 서버가 바인딩할 호스트 주소와 포트 번호를 설정합니다.
asyncio.run(server.run())를 사용하여 이벤트 루프를 시작하고 메인 코루틴(server.run())을 실행합니다. 이후 모든 await가 붙은 함수 호출은 이벤트 루프에 의해 관리됩니다. await가 붙은 함수는 대기 상태에 있을 때 제어권을 이벤트 루프에 반환하며, 이벤트 루프는 다른 비동기 작업을 계속해서 처리할 수 있습니다. 이렇게 함으로써 프로그램은 블로킹 없이 여러 비동기 작업을 동시에 효율적으로 처리할 수 있습니다.
2. 서버 실행 (run 메서드)
asyncio.start_server를 사용하여 클라이언트가 연결될 때 호출되는 콜백 함수 client_connected_cb와 함께 서버가 시작됩니다.
서버가 바인딩된 주소들을 출력하고, await server.serve_forever()를 호출하여 서버가 실행됩니다.
3. 클라이언트 연결 콜백 (client_connected_cb)
클라이언트가 서버에 연결될 때마다 호출되는 콜백 함수입니다.
async def client_connected_cb(reader, writer):
# 새로운 클라이언트가 서버에 접속할 때마다 해당 클라이언트를 처리할 새로운 핸들러 객체를 생성합니다.(ClientConnectionHandler 클래스의 인스턴스) reader와 writer 스크림을 사용하여 클라이언트와의 통신을 관리하고 데이터를 주고 받는 역할을 합니다. 이를 통해 서버는 여러 클라이언트의 연결을 개별적으로 관리할 수 있습니다.
handler = ClientConnectionHandler(reader, writer)
# 생성된 핸들러 객체의 handle 메서드를 호출하여 클라이언트 연결을 처리합니다. 클라이언트로부터 데이터를 읽기 위해 `read_loop` 코루틴을 실행하고, 데이터가 도착하면 이를 처리합니다.
await handler.handle()
4. 클라이언트 연결 처리 (handle 메서드)
클라이언트 연결이 성공적으로 이루어졌음을 출력합니다.
read_loop 메서드를 호출하여 클라이언트로부터 메시지를 읽기 시작합니다.
5. 메시지 읽기 루프 (read_loop 메서드)
- 무한 루프를 시작하여 클라이언트로부터 데이터를 읽습니다.
- 데이터를 읽고, 빈 바이트열일 경우 연결이 종료된 것으로 간주하고 루프를 종료합니다.
- 수신된 데이터를 디코딩하여 출력하고, 메시지가 'quit'을 포함하는 경우 연결을 종료합니다.
- 메시지가 'quit'이 아닌 경우, 수신된 데이터를 클라이언트에게 다시 보냅니다 (에코).
- 예외가 발생한 경우 해당 예외를 출력하고 루프를 종료합니다.
- `await self.reader.read(100)`는 데이터를 읽기 위해 비동기적으로 대기합니다. 이때 제어권은 이벤트 루프에 반환되며, 이벤트 루프는 다른 준비된 코루틴이나 콜백을 실행합니다.
- 데이터가 도착하면 `read_loop`는 다시 실행되어 데이터를 처리하고, 필요한 경우 클라이언트에게 응답을 보냅니다.
6. 클라이언트 연결 종료
연결이 종료되었음을 출력합니다.
이 흐름을 통해 서버는 클라이언트 연결을 비동기적으로 처리하며, 클라이언트로부터 메시지를 받아 에코하거나 quit 메시지를 받으면 연결을 종료합니다.
클라이언트 코드
다음은 주어진 클라이언트 코드의 실행 흐름을 정리한 것입니다:
1. 클라이언트 초기화
Client` 클래스의 인스턴스가 생성합니다.
client = Client()
__init__ 메서드에서 클라이언트가 연결할 서버의 호스트 주소와 포트 번호를 설정합니다.
name 변수를 초기화하여 사용자의 이름을 저장할 준비를 합니다.
2. 클라이언트 실행 (run 메서드)
asyncio.run(client.run())을 사용하여 이벤트 루프를 시작하고 메인 코루틴(client.run())을 실행합니다. 이후 모든 await가 붙은 함수 호출은 이벤트 루프에 의해 관리됩니다. await가 붙은 함수는 대기 상태에 있을 때 제어권을 이벤트 루프에 반환하며, 이벤트 루프는 다른 비동기 작업을 계속해서 처리할 수 있습니다. 이렇게 함으로써 프로그램은 블로킹 없이 여러 비동기 작업을 동시에 효율적으로 처리할 수 있습니다.
connect 메서드를 호출하여 서버에 비동기적으로 연결합니다.
asyncio.gather를 사용하여 read_loop와 write_loop를 동시에 실행합니다.
3. 서버에 연결 (connect 메서드)
asyncio.open_connection을 사용하여 서버에 연결하고, reader와 writer 스트림을 설정합니다.
4. 메시지 쓰기 루프 (write_loop 메서드)
- 현재 실행 중인 이벤트 루프를 가져옵니다 (`loop = asyncio.get_running_loop()`).
- 사용자의 이름을 입력받고 초기 메시지 프롬프트를 출력합니다.
- 무한 루프를 시작하여 사용자 입력을 비동기적으로 읽습니다.
- 사용자의 이름과 메시지를 결합하여 서버에 전송합니다.
- 사용자가 'quit'을 입력하면 연결을 종료하고 루프를 종료합니다.
- 그렇지 않은 경우, 메시지를 서버에 전송하고 루프를 계속합니다.
5. 메시지 읽기 루프 (read_loop 메서드)
- 무한 루프를 시작하여 서버로부터 데이터를 비동기적으로 읽습니다.
- `reader.read` 메서드를 사용하여 최대 100바이트를 읽습니다.
- 읽은 데이터가 빈 바이트열일 경우, 연결이 종료된 것으로 간주하고 루프를 종료합니다.
- 데이터를 디코딩하고 양쪽 공백을 제거한 후 수신된 메시지를 출력합니다.
- 사용자 입력을 받기 위한 프롬프트를 다시 출력하고 루프를 계속합니다.
6. 클라이언트 종료
- 사용자가 'quit'을 입력하면 `write_loop`에서 마지막 메시지를 서버에 전송합니다.
- `writer` 스트림을 닫고 완전히 닫힐 때까지 대기합니다.
- `reader` 스트림에 EOF를 강제로 주입하여 `read_loop`가 종료되도록 합니다.
- 루프가 종료됩니다.
이 흐름을 통해 클라이언트는 서버에 비동기적으로 연결하여 메시지를 주고받으며, 사용자가 'quit'을 입력하면 연결을 종료하고 프로그램이 종료됩니다.
코드
서버코드
import asyncio class ClientConnectionHandler: """서버에 대한 클라이언트 연결을 나타내는 클래스.""" def __init__(self, reader, writer): """ 클라이언트 연결 핸들러를 초기화합니다. 매개변수: reader: 클라이언트로부터 데이터를 읽기 위한 스트림 writer: 클라이언트로 데이터를 쓰기 위한 스트림 """ # reader 스트림을 인스턴스 변수로 저장합니다. self.reader = reader # writer 스트림을 인스턴스 변수로 저장합니다. self.writer = writer # 클라이언트의 주소 정보를 저장합니다. self.addr = writer.get_extra_info('peername') async def handle(self): """클라이언트 연결을 처리합니다.""" # 클라이언트가 연결되었음을 출력합니다. print(f"클라이언트 연결됨: {self.addr}") try: # 클라이언트로부터 메시지를 읽는 루프를 시작합니다. read_loop 함수가 종료되기를 대기하게 됩니다. 이때 제어권을 넘기고 메시지가 수신되면 재개합니다. await self.read_loop() except Exception as e: # 예외가 발생하면 예외 정보를 출력합니다. print(f"{self.addr}에서 처리되지 않은 예외 발생: {e}") finally: # 연결이 종료되었음을 출력합니다. print(f"{self.addr}와의 연결이 종료되었습니다.") async def read_loop(self): """클라이언트로부터 메시지를 읽고 'quit' 메시지를 처리합니다.""" while True: # 무한 루프를 시작합니다. try: # 클라이언트로부터 최대 100바이트를 읽습니다. 데이터를 수신할때 까지 대기합니다. # 이벤트 루프가 데이터 수신을 감지하면 실행 재개 되도록 합니다. data = await self.reader.read(100) # data가 빈 바이트열이라면 연결이 종료된 것으로 보고 무한루프를 종료합니다. # # StreamReader.read 메서드는 다음과 같은 상황에서 빈 바이트열을 반환할 수 있습니다: # 서버가 연결을 닫은 경우: 서버가 연결을 닫으면 클라이언트에서 더 이상 읽을 데이터가 없게 됩니다. # 네트워크 오류가 발생한 경우: 네트워크 오류로 인해 연결이 끊길 수 있습니다. if not data: break # 수신된 데이터를 디코딩하고 양쪽 공백을 제거합니다. message = data.decode().strip() # 수신된 메시지를 출력합니다. print(f"{self.addr}로부터 수신됨: {message}") # 수신된 메시지가 'quit'을 포함하는지 확인합니다. if "quit" in message.lower(): # 클라이언트가 'quit'을 보냈음을 출력합니다. print(f"클라이언트 {self.addr}가 'quit'을 보냈습니다. 연결을 종료합니다.") # writer 스트림을 닫습니다. self.writer.close() # writer 스트림이 완전히 닫힐 때까지 대기합니다. await self.writer.wait_closed() break # 루프를 종료합니다. else: # 수신된 메시지를 클라이언트에게 다시 보냅니다 (에코). self.writer.write(data) # 수신된 데이터를 다시 전송합니다. await self.writer.drain() # 버퍼가 비워질 때까지 대기합니다. except ConnectionResetError: print(f"{self.addr}와의 연결이 끊어졌습니다") # 연결이 끊어졌음을 출력합니다. break # 루프를 종료합니다. async def client_connected_cb(reader, writer): """클라이언트가 연결될 때 호출되는 콜백 함수.""" # 새로운 클라이언트가 서버에 접속할 때마다 해당 클라이언트를 처리할 새로운 핸들러 객체를 생성합니다. handler = ClientConnectionHandler(reader, writer) # 클라이언트 연결을 처리합니다. await handler.handle() class Server: """비동기 서버를 나타내는 클래스.""" # 호스트를 '127.0.0.1'로 설정하여 IPv4만 사용하도록 합니다. def __init__(self, host: str='127.0.0.1', port: int=8888): """ 서버를 초기화합니다. 매개변수: host (str): 서버가 바인딩할 호스트 주소. port (int): 서버가 바인딩할 포트 번호. """ self.host = host # 호스트 주소를 저장합니다. self.port = port # 포트 번호를 저장합니다. async def run(self): """서버를 실행합니다.""" server = await asyncio.start_server( client_connected_cb, # 클라이언트가 연결될 때 호출되는 콜백 함수. host=self.host, # 서버가 바인딩할 호스트 주소. port=self.port, # 서버가 바인딩할 포트 번호. ) # 서버가 바인딩된 주소들을 문자열로 변환합니다. addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets) # 서버가 실행 중임을 출력합니다. print(f'서버 실행 중: {addrs}') # 서버가 무한히 실행되도록 합니다. await server.serve_forever() if __name__ == '__main__': # Server 인스턴스를 생성합니다. server = Server() # server.run 함수를 실행하면서 이벤트 루프를 시작합니다. asyncio.run(server.run()) |
클라이언트 코드
import asyncio # 비동기 입출력을 위한 asyncio 모듈을 가져옵니다. class Client: """비동기 소켓을 통해 서버에 연결하는 클라이언트 클래스.""" def __init__(self, host: str='localhost', port: int=8888): """ 클라이언트를 초기화합니다. 매개변수: host (str): 연결할 서버의 호스트. port (int): 연결할 서버의 포트. """ self.host = host # 호스트 주소를 저장합니다. self.port = port # 포트 번호를 저장합니다. self.name = "" # 사용자의 이름을 저장할 변수를 초기화합니다. async def connect(self): """ 서버에 비동기적으로 연결합니다. reader와 writer 스트림을 설정합니다. """ # 서버에 연결하고 reader와 writer를 설정합니다. reader, writer = await asyncio.open_connection(self.host, self.port) # reader 스트림을 인스턴스 변수로 저장합니다. self.reader = reader # writer 스트림을 인스턴스 변수로 저장합니다. self.writer = writer async def write(self, message: str | bytes): """ 서버에 메시지를 작성합니다. 매개변수: message (str | bytes): 서버에 보낼 메시지. """ # message가 문자열이면 바이트로 인코딩합니다. if isinstance(message, str): message = message.encode() # 메시지 끝에 새 줄을 추가하고 서버에 전송합니다. self.writer.write(message + b'\r\n') # 버퍼가 비워질 때까지 대기합니다. await self.writer.drain() async def write_loop(self): """ 사용자 입력을 기반으로 서버에 메시지를 지속적으로 보냅니다. 사용자가 'quit'을 입력하면 종료됩니다. """ # asyncio.get_running_loop와 loop.run_in_executor를 사용하여 # 키보드 입력을 비동기적으로 처리합니다. # 현재 실행 중인 이벤트 루프를 가져옵니다. loop = asyncio.get_running_loop() # 사용자의 이름을 입력받습니다. self.name = await loop.run_in_executor(None, input, "당신의 이름을 입력하세요: ") print("메시지를 입력하세요: ", end='', flush=True) # 초기 메시지 프롬프트를 출력합니다. # 무한 루프를 시작합니다. while True: # 사용자 입력을 비동기적으로 읽습니다. message = await loop.run_in_executor(None, input, "") # 사용자 이름과 메시지를 결합합니다. full_message = f"{self.name}: {message}" # 사용자가 'quit'을 입력했는지 확인합니다. if message.lower() == 'quit': print("연결을 종료합니다") # 마지막 메시지를 서버에 보냅니다. await self.write(full_message) # writer 스트림을 닫습니다. self.writer.close() # writer 스트림이 완전히 닫힐 때까지 대기합니다. await self.writer.wait_closed() # EOF를 강제로 주입하여 read_loop가 종료되도록 합니다. self.reader.feed_eof() # 루프를 종료합니다. break # 사용자 메시지를 서버에 전송합니다. await self.write(full_message) async def read_loop(self): """ 서버로부터 메시지를 지속적으로 읽습니다. 각 메시지는 새 줄로 구분됩니다. """ # 무한 루프를 시작합니다. while True: # 서버로부터 최대 100바이트를 읽습니다. 데이터를 수신할때 까지 대기합니다. data = await self.reader.read(100) # data가 빈 바이트열이라면 연결이 종료된 것으로 보고 무한루프를 종료합니다. # # StreamReader.read 메서드는 다음과 같은 상황에서 빈 바이트열을 반환할 수 있습니다: # 서버가 연결을 닫은 경우: 서버가 연결을 닫으면 클라이언트에서 더 이상 읽을 데이터가 없게 됩니다. # 네트워크 오류가 발생한 경우: 네트워크 오류로 인해 연결이 끊길 수 있습니다. if not data: break # 수신된 데이터를 디코딩하고 양쪽 공백을 제거합니다. message = data.decode().strip() # 수신된 메시지를 출력하고 사용자 입력을 받기 위한 프롬프트를 출력합니다. print(f"\r수신됨: {message}\n메시지를 입력하세요: ", end='', flush=True) async def run(self): """ 서버에 연결하고, 읽기 및 쓰기 루프를 동시에 실행합니다. """ await self.connect() # 서버에 연결합니다. await asyncio.gather(self.read_loop(), self.write_loop()) # 읽기 및 쓰기 루프를 동시에 시작합니다. if __name__ == "__main__": client = Client() # Client 인스턴스를 생성합니다. asyncio.run(client.run()) # 클라이언트를 실행합니다. |