Ktor 与 Server-Sent Events 驱动的异步 Prettier 格式化服务实现


团队扩大后,前端代码格式化规范的统一成了一个不大不小的麻烦。虽然有 .prettierrchusky,但总有人本地环境配置不当,导致 CI 阶段因为格式问题频繁失败。CI 流水线中执行 npm install 再运行 Prettier 既慢又占用资源。由此,一个想法浮现:构建一个集中式的、版本统一的 Prettier 格式化服务,IDE 插件或 Git 钩子直接调用这个服务,而不是依赖本地的 Node.js 环境。

初步构想是创建一个 API,接收代码片段,返回格式化后的结果。但这太过简单,无法满足一个关键需求:对于大型文件或批处理任务,格式化可能需要数秒,我们希望客户端能实时看到处理进度,例如“正在解析AST”、“正在格式化”、“写入完成”等,而不是一个漫长的等待后接收一个最终结果。这种单向、实时的状态推送场景,Server-Sent Events (SSE) 是比 WebSocket 更轻量、更合适的选择。

技术选型上,后端我选择了 Kotlin + Ktor,因为它轻量、异步优先,与 Kotlin Coroutines 深度集成,非常适合处理 I/O 密集型和长连接任务。而要在 JVM 环境中运行 Prettier 这个 JavaScript 工具,GraalVM 的 Polyglot API 是不二之选。它允许我们在 JVM 内部高效、安全地执行 JavaScript 代码。

于是,一个清晰的架构浮现了:

  1. 客户端通过一个 REST API POST 请求提交待格式化的代码,服务立即返回一个唯一的任务 ID 和一个用于接收实时事件的 SSE 连接端点。
  2. Ktor 后端接收到请求后,启动一个后台协程,使用 GraalVM Context 来执行 Prettier 的格式化逻辑。
  3. 客户端使用任务 ID 连接到 SSE 端点。
  4. 在后台协程中,每当 Prettier 完成一个关键步骤,就通过一个共享的事件通道(SharedFlow)将状态事件推送出去。
  5. Ktor 的 SSE 端点订阅这个 SharedFlow,并将接收到的事件实时地流式传输给客户端。
  6. 任务完成后,推送一个包含最终格式化代码的 done 事件,并关闭连接。

以下是这个异步格式化服务的核心实现过程。

环境与依赖配置

首先是项目的基础设施。build.gradle.kts 文件需要引入 Ktor 相关依赖、GraalVM SDK 以及日志库。

// build.gradle.kts

plugins {
    kotlin("jvm") version "1.9.20"
    id("io.ktor.plugin") version "2.3.6"
    id("org.jetbrains.kotlin.plugin.serialization") version "1.9.20"
}

group = "com.example"
version = "0.1.0"

application {
    mainClass.set("com.example.ApplicationKt")
}

repositories {
    mavenCentral()
}

dependencies {
    // Ktor Core
    implementation("io.ktor:ktor-server-core-jvm")
    implementation("io.ktor:ktor-server-cio-jvm")
    implementation("io.ktor:ktor-server-content-negotiation-jvm")
    implementation("io.ktor:ktor-serialization-kotlinx-json-jvm")
    implementation("io.ktor:ktor-server-sse-jvm") // Server-Sent Events support

    // Logging
    implementation("ch.qos.logback:logback-classic:1.4.11")

    // GraalVM Polyglot API for running JavaScript
    implementation("org.graalvm.sdk:graal-sdk:23.1.1")
    implementation("org.graalvm.js:js:23.1.1")
    implementation("org.graalvm.js:js-scriptengine:23.1.1")

    // Coroutines
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")

    // Testing
    testImplementation("io.ktor:ktor-server-tests-jvm")
    testImplementation(kotlin("test-junit5"))
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.10.0")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.10.0")
}

// Ktor Plugin Configuration
ktor {
    fatJar {
        archiveBaseName.set("prettier-service")
    }
}

这里的关键是 io.ktor:ktor-server-sse-jvm 用于支持 SSE,以及 GraalVM 的 graal-sdkjs 依赖。

核心服务:Prettier 格式化引擎

我们需要一个服务来封装与 GraalVM 的交互。直接在路由处理器中创建和销毁 Context 开销巨大且容易出错。一个更好的实践是创建一个单例的服务,管理 GraalVM Context 的生命周期。

在真实项目中,GraalVM Context 不是线程安全的。如果多个请求并发使用同一个 Context,会产生冲突。因此,一种策略是为每个请求创建一个新的 Context,或者使用一个 Context 池。在这个场景下,由于格式化是CPU密集型任务,我们会将其调度到专用的线程池中,为每个任务创建一个新的 Context 是一个安全且合理的选择。

// src/main/kotlin/com/example/service/PrettierService.kt

package com.example.service

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.graalvm.polyglot.Context
import org.graalvm.polyglot.HostAccess
import org.graalvm.polyglot.Source
import java.io.File
import java.util.function.Consumer

// A wrapper class for emitting progress events
data class ProgressEvent(val type: String, val message: String, val data: String? = null)

class PrettierService {

    // Pre-load Prettier source code to avoid reading from disk on every request.
    private val prettierStandaloneJs: String by lazy {
        // In a production environment, this file would be bundled in the JAR.
        val prettierFile = File("src/main/resources/standalone.js")
        if (!prettierFile.exists()) {
            throw IllegalStateException("Prettier standalone JS not found at ${prettierFile.absolutePath}")
        }
        prettierFile.readText()
    }

    /**
     * Formats the given code asynchronously and emits progress events.
     * @param code The source code to format.
     * @param parser The parser Prettier should use (e.g., "typescript", "babel", "json").
     * @param eventEmitter A consumer function to send progress events back to the caller.
     * @return The formatted code.
     */
    suspend fun format(code: String, parser: String, eventEmitter: Consumer<ProgressEvent>): String {
        // Formatting is a CPU-intensive task and involves blocking JS execution.
        // It should be run on a dedicated dispatcher to avoid blocking Ktor's event loop.
        return withContext(Dispatchers.IO) {
            val contextBuilder = Context.newBuilder("js")
                .allowHostAccess(HostAccess.ALL) // Allow JS to call back into Java
                .allowIO(false) // For security, disallow file system access from JS

            contextBuilder.build().use { context ->
                try {
                    // Step 1: Emit an initialization event
                    eventEmitter.accept(ProgressEvent("status", "Initializing GraalVM context..."))

                    // Load Prettier into the context
                    context.eval(Source.create("js", prettierStandaloneJs))
                    eventEmitter.accept(ProgressEvent("status", "Prettier library loaded."))

                    // Get a proxy to the `prettier.format` function
                    val prettierFormat = context.getBindings("js").getMember("prettier").getMember("format")
                    eventEmitter.accept(ProgressEvent("status", "AST parsing started."))

                    // Step 2: Prepare options for Prettier
                    val options = context.eval("js", "({})")
                    options.putMember("parser", parser)
                    options.putMember("plugins", context.getBindings("js").getMember("prettierPlugins").getMember("babylon"))

                    // Step 3: Execute the format function. This is a blocking call.
                    val formattedCode = prettierFormat.execute(code, options).asString()
                    eventEmitter.accept(ProgressEvent("status", "Formatting complete."))

                    // Step 4: Return the final result
                    formattedCode
                } catch (e: Exception) {
                    // In case of a JS error or any other exception, emit an error event.
                    eventEmitter.accept(ProgressEvent("error", "Formatting failed: ${e.message}"))
                    throw e // Re-throw to let the caller handle it
                }
            }
        }
    }
}

这里有几个关键点:

  1. withContext(Dispatchers.IO): GraalVM 的 JavaScript 执行是阻塞和计算密集型的,将其放在 Dispatchers.IO(或一个专用的固定大小线程池)上可以防止它阻塞 Ktor 的主事件循环线程。
  2. Context.newBuilder(...).build().use: Context 是一个需要被关闭的资源。使用 use 块可以确保无论成功还是异常,context.close() 都会被调用,避免资源泄漏。
  3. ProgressEventeventEmitter: 我们定义了一个简单的数据类来标准化事件结构,并通过一个回调函数 eventEmitter 将进度实时地发送回调用方。这是实现实时反馈的核心机制。
  4. 错误处理: try-catch 块捕获了格式化过程中可能出现的任何异常(例如,JavaScript 语法错误),并将其作为 error 事件发送出去。

任务管理与事件流

当一个格式化请求进来时,我们不能阻塞等待它完成。我们需要一个系统来管理这些后台任务,并将它们的进度事件广播给正确的客户端。

// src/main/kotlin/com/example/service/FormattingJobManager.kt

package com.example.service

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext

// Data class to represent a formatting job request
data class FormatRequest(val code: String, val parser: String)

class FormattingJobManager(
    private val prettierService: PrettierService
) : CoroutineScope {

    // Use a SupervisorJob so that failure of one child coroutine doesn't cancel others.
    override val coroutineContext: CoroutineContext = SupervisorJob() + Dispatchers.Default

    // In-memory store for job events. For production, this should be backed by Redis or another distributed cache.
    private val jobChannels = ConcurrentHashMap<String, MutableSharedFlow<ProgressEvent>>()

    /**
     * Submits a new formatting job.
     * @return The unique ID for the created job.
     */
    fun submitJob(request: FormatRequest): String {
        val jobId = UUID.randomUUID().toString()
        val eventFlow = MutableSharedFlow<ProgressEvent>(replay = 10) // Replay buffer for late subscribers
        jobChannels[jobId] = eventFlow

        launch { // Launch a new coroutine for this job
            try {
                // The eventEmitter lambda will push events into our SharedFlow
                val formattedCode = prettierService.format(request.code, request.parser) { event ->
                    eventFlow.tryEmit(event) // Use tryEmit as we are in a hot path
                }
                // Emit the final result
                eventFlow.emit(ProgressEvent("done", "Job completed successfully.", data = formattedCode))
            } catch (e: Exception) {
                // The service already emits an error event, but we can emit another one here if needed
                eventFlow.emit(ProgressEvent("error", "Job failed with exception: ${e.message}"))
            } finally {
                // It's good practice to have a cleanup mechanism, e.g., remove old jobs
                // For simplicity, we keep it in memory here. A TTL mechanism would be needed.
            }
        }
        return jobId
    }

    /**
     * Retrieves the event flow for a given job ID.
     * @return A SharedFlow of progress events, or null if the job ID is not found.
     */
    fun getJobFlow(jobId: String) = jobChannels[jobId]?.asSharedFlow()
}

这个管理器是整个异步流程的协调者:

  1. CoroutineScope: 它实现了 CoroutineScope,使其能够启动独立的、生命周期受控的协程。SupervisorJob 确保一个任务的失败不会影响到其他正在运行的任务。
  2. ConcurrentHashMap<String, MutableSharedFlow<ProgressEvent>>: 这是我们的核心数据结构。一个并发安全的Map,键是任务ID,值是一个 MutableSharedFlowSharedFlow 是一种热流,允许多个订阅者接收其发出的事件。我们设置了一个 replay 缓冲区,这样即使客户端在一些事件发出后才连接,也能收到最近的几条历史事件。
  3. submitJob: 此方法创建一个唯一的 jobId,设置好 SharedFlow,然后立即 launch 一个新的协程来执行耗时的 prettierService.format 调用。它本身是非阻塞的,会立刻返回 jobId
  4. getJobFlow: 此方法允许 SSE 端点根据 jobId 找到对应的事件流并进行订阅。

Ktor 路由与 SSE 端点

最后,我们将所有部分在 Ktor 的应用模块中组装起来。

// src/main/kotlin/com/example/Application.kt

package com.example

import com.example.service.FormatRequest
import com.example.service.FormattingJobManager
import com.example.service.PrettierService
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.cio.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.sse.*
import kotlinx.serialization.Serializable

fun main() {
    embeddedServer(CIO, port = 8080, host = "0.0.0.0", module = Application::module)
        .start(wait = true)
}

// Data class for the API response after submitting a job
@Serializable
data class JobSubmissionResponse(val jobId: String, val eventsUrl: String)

fun Application.module() {
    install(ContentNegotiation) {
        json()
    }

    // Initialize services as singletons
    val prettierService = PrettierService()
    val jobManager = FormattingJobManager(prettierService)

    routing {
        // Endpoint to submit a new formatting job
        post("/format") {
            try {
                val request = call.receive<FormatRequest>()
                if (request.code.isBlank()) {
                    call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Code content cannot be empty.")
                    return@post
                }

                val jobId = jobManager.submitJob(request)
                val response = JobSubmissionResponse(
                    jobId = jobId,
                    eventsUrl = "/format/events/$jobId"
                )
                call.respond(io.ktor.http.HttpStatusCode.Accepted, response)
            } catch (e: Exception) {
                call.respond(io.ktor.http.HttpStatusCode.InternalServerError, "Failed to submit job: ${e.message}")
            }
        }

        // SSE endpoint to stream job progress
        get("/format/events/{jobId}") {
            val jobId = call.parameters["jobId"]
            if (jobId == null) {
                call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Missing jobId")
                return@get
            }

            val jobFlow = jobManager.getJobFlow(jobId)
            if (jobFlow == null) {
                call.respond(io.ktor.http.HttpStatusCode.NotFound, "Job with ID $jobId not found.")
                return@get
            }

            try {
                call.respondSse(jobFlow) { event ->
                    // Map our ProgressEvent to an SSE event
                    this.event = event.type // e.g., "status", "error", "done"
                    this.data = event.message
                    // If there is additional data (like the final code), send it in another data line or as JSON
                    if (event.data != null) {
                        this.data = "${event.message}\n${event.data}"
                    }
                }
            } catch (e: kotlinx.coroutines.CancellationException) {
                // This is expected when the client disconnects. Log it for debugging.
                log.info("Client for job $jobId disconnected.")
            } catch (e: Exception) {
                log.error("Error during SSE streaming for job $jobId", e)
            }
        }
    }
}

这段代码定义了两个端点:

  1. POST /format: 接收JSON格式的 FormatRequest,调用 jobManager.submitJob,然后立即以 202 Accepted 状态码返回一个包含 jobId 和 SSE URL 的响应。
  2. GET /format/events/{jobId}: 这是一个 SSE 端点。它通过 jobIdjobManager 获取 SharedFlow。如果找到,call.respondSse(jobFlow) 会建立一个持久的连接,并将 jobFlow 中的每一个 ProgressEvent 自动转换为 SSE 格式发送给客户端。当流结束或客户端断开连接时,连接会自动关闭。

流程可视化与客户端交互

整个异步交互流程可以通过一个时序图来清晰地展示。

sequenceDiagram
    participant Client
    participant KtorServer as Ktor Server
    participant JobManager as FormattingJobManager
    participant PrettierWorker as Background Coroutine

    Client->>+KtorServer: POST /format (code, parser)
    KtorServer->>+JobManager: submitJob(request)
    JobManager->>JobManager: Create jobId & SharedFlow
    JobManager->>+PrettierWorker: launch { format(...) }
    JobManager-->>-KtorServer: return jobId
    KtorServer-->>-Client: 202 Accepted (jobId, eventsUrl)

    Client->>+KtorServer: GET /format/events/{jobId} (SSE Connection)
    KtorServer->>JobManager: getJobFlow(jobId)
    JobManager-->>KtorServer: return SharedFlow
    KtorServer-->>Client: Establishes SSE stream from Flow

    PrettierWorker->>PrettierWorker: Prettier.format() begins
    PrettierWorker->>JobManager: Emit ProgressEvent("status", "Initializing...")
    JobManager->>KtorServer: (via SharedFlow)
    KtorServer-->>Client: event: status\ndata: Initializing...

    loop Formatting Steps
        PrettierWorker->>JobManager: Emit ProgressEvent("status", "...")
        JobManager->>KtorServer: (via SharedFlow)
        KtorServer-->>Client: event: status\ndata: ...
    end

    PrettierWorker->>-JobManager: Emit ProgressEvent("done", "...", formattedCode)
    JobManager->>KtorServer: (via SharedFlow)
    KtorServer-->>-Client: event: done\ndata: ...\n[formatted code]

客户端可以使用原生的 EventSource API 来消费这些事件。

// A simple client-side example
async function formatCode(code, parser) {
    const response = await fetch('/format', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ code, parser })
    });

    if (response.status !== 202) {
        console.error("Failed to submit job:", await response.text());
        return;
    }

    const { jobId, eventsUrl } = await response.json();
    console.log(`Job submitted with ID: ${jobId}. Listening for events at ${eventsUrl}`);

    const eventSource = new EventSource(eventsUrl);

    eventSource.addEventListener('status', (event) => {
        console.log(`[STATUS] ${event.data}`);
    });

    eventSource.addEventListener('error', (event) => {
        console.error(`[ERROR] ${event.data}`);
        eventSource.close(); // Close connection on fatal error
    });

    eventSource.addEventListener('done', (event) => {
        console.log(`[DONE] Formatting complete.`);
        const [message, ...formattedCodeLines] = event.data.split('\n');
        const formattedCode = formattedCodeLines.join('\n');
        console.log("Formatted Code:\n", formattedCode);
        eventSource.close(); // Job is done, close the connection
    });

    eventSource.onerror = (err) => {
        console.error("EventSource failed:", err);
        eventSource.close();
    };
}

// Example usage:
const myCode = `const hello   =   'world'; console.log(hello)`;
formatCode(myCode, 'babel');

局限性与未来迭代路径

这套实现方案作为一个概念验证是完整且健壮的,但在生产环境中还存在一些局限性:

  1. 任务状态持久化: FormattingJobManager 使用内存中的 ConcurrentHashMap 来存储任务状态。如果服务重启,所有正在进行中的任务信息都会丢失。在生产环境中,任务队列和事件流应该由 Redis Streams 或 Kafka 等中间件来支持,以确保任务的持久性和服务的无状态化。
  2. 水平扩展: 当前设计是单节点的。如果要水平扩展服务实例,客户端的 SSE 连接可能会路由到没有执行该任务的实例上。使用 Redis Pub/Sub 来广播事件可以在多个实例间同步状态,从而解决这个问题。
  3. 资源管理: GraalVM Context 的创建和销毁是有成本的。对于高并发场景,可以考虑实现一个 Context 池来复用资源。同时,需要对并发执行的格式化任务数量进行限制,以防止耗尽 CPU 和内存资源。
  4. 安全性: 直接暴露一个可以执行任意代码的服务存在安全风险。尽管我们已经禁用了IO权限,但在生产环境中需要更严格的沙箱环境配置,并对输入代码的大小和复杂性进行限制。

  目录