욱'S 노트

Blocking vs Non-Blocking 구현 예제 본문

Methdology/IO

Blocking vs Non-Blocking 구현 예제

devsun 2024. 12. 23. 17:33
반응형

에코서버 작성

간단한 에코서버를 작성해보자. 아래 코드에서는 한번에 하나의 요청만을 처리하게 된다.

import java.net.ServerSocket
import java.net.Socket

object BlockingServer {
    @JvmStatic
    fun main(args: Array<String>) {
        val server = ServerSocket(8080)
        while (true) {
            val socket = server.accept()
            handleRequest(socket)
        }
    }

    fun handleRequest(socket: Socket) {
        println(Thread.currentThread().name)
        val inputStream = socket.getInputStream()
        val outputStream = socket.getOutputStream()

        var data: Int
        while (inputStream.read().also { data = it } != -1) {
            outputStream.write(data)
        }

        outputStream.flush()
    }
}

 

 

Blocking & ThreadPool 

다음은 간단하게 소켓 처리를 별도의 스레드풀에서 처리를 하는 방식이다.

import java.net.Socket
import java.util.concurrent.Executors

object ThreadPerRequestServer {
    @JvmStatic
    fun main(args: Array<String>) {
        val server = java.net.ServerSocket(8080)
        val threadPool = Executors.newFixedThreadPool(3)

        while (true) {
            val socket = server.accept()
            threadPool.submit {
                handleRequest(socket)
            }
        }
    }

    fun handleRequest(socket: Socket) {
        println(Thread.currentThread().name)
        val inputStream = socket.getInputStream()
        val outputStream = socket.getOutputStream()

        var data: Int
        while (inputStream.read().also { data = it } != -1) {
            outputStream.write(data)
        }

        outputStream.flush()
    }
}

 

각각의 리퀘스트가 스레드풀에 할당되고, 스레드가 부족하면 스레드를 할당 받을 때까지 대기하게 된다.

 

Non-Blocking

다음은 멀티플렉싱 기반의 I/O 방식의 서버를 구현해본것이다.

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel
import java.util.concurrent.ConcurrentHashMap


object NonBlockingServer {
    val sockets = ConcurrentHashMap<SocketChannel, ByteBuffer>()

    @JvmStatic
    fun main(args: Array<String>) {
        val server = ServerSocketChannel.open()
        server.bind(InetSocketAddress(8080))
        server.configureBlocking(false)

        val selector = Selector.open()

        server.register(selector, SelectionKey.OP_ACCEPT)

        while (true) {
            selector.select()
            val selectionKeys = selector.selectedKeys()

            selectionKeys.forEach { key ->
                when {
                    key.isAcceptable ->
                        handleAccept(key)
                    key.isReadable ->
                        handleRead(key)
                    key.isWritable ->
                        handleWrite(key)
                }

                selectionKeys.remove(key)
            }
        }

    }

    private fun handleAccept(key: SelectionKey) {
        println("handleAccept: " + Thread.currentThread().name)
        val socketChannel = key.channel() as ServerSocketChannel
        val socket = socketChannel.accept()

        socket.configureBlocking(false)

        socket.register(key.selector(), SelectionKey.OP_READ)

        sockets[socket] = ByteBuffer.allocateDirect(80)
    }

    private fun handleRead(key: SelectionKey) {
        println("handleRead: " + Thread.currentThread().name)
        val socket = key.channel() as SocketChannel
        val byteBuffer = sockets[socket] ?: throw IllegalStateException("ByteBuffer not found")

        val data = socket.read(byteBuffer)

        if (data == -1) {
            socket.close()
            sockets.remove(socket)
        }

        byteBuffer.flip()

        socket.configureBlocking(false)
        key.interestOps(SelectionKey.OP_WRITE)
    }

    private fun handleWrite(key: SelectionKey) {
        println("handleWrite: " + Thread.currentThread().name)
        val socket = key.channel() as SocketChannel
        val byteBuffer = sockets[socket] ?: throw IllegalStateException("ByteBuffer not found")

        socket.write(byteBuffer)

        while (!byteBuffer.hasRemaining()) {
            byteBuffer.compact()
            key.interestOps(SelectionKey.OP_READ)
        }
    }
}

 

Selector는 시스템 콜의 이벤트 통지 API를 사용하여 하나의 스레드로 동시에 많은 IO를 담당할 수 있다.

 

셀렉터가 데이터를 읽는 과정은 아래와 같다.

 

셀렉터가 데이터를 전달하는 과정은 다음과 같다.

출처 : https://mark-kim.blog/understanding-non-blocking-io-and-nio/

반응형

'Methdology > IO' 카테고리의 다른 글

Spring MVC vs Spring WebFlux  (4) 2024.12.24
IO 모델 (Linux, Java)  (1) 2024.12.20
Comments