团队扩大后,前端代码格式化规范的统一成了一个不大不小的麻烦。虽然有 .prettierrc
和 husky
,但总有人本地环境配置不当,导致 CI 阶段因为格式问题频繁失败。CI 流水线中执行 npm install
再运行 Prettier 既慢又占用资源。由此,一个想法浮现:构建一个集中式的、版本统一的 Prettier 格式化服务,IDE 插件或 Git 钩子直接调用这个服务,而不是依赖本地的 Node.js 环境。
初步构想是创建一个 API,接收代码片段,返回格式化后的结果。但这太过简单,无法满足一个关键需求:对于大型文件或批处理任务,格式化可能需要数秒,我们希望客户端能实时看到处理进度,例如“正在解析AST”、“正在格式化”、“写入完成”等,而不是一个漫长的等待后接收一个最终结果。这种单向、实时的状态推送场景,Server-Sent Events (SSE) 是比 WebSocket 更轻量、更合适的选择。
技术选型上,后端我选择了 Kotlin + Ktor,因为它轻量、异步优先,与 Kotlin Coroutines 深度集成,非常适合处理 I/O 密集型和长连接任务。而要在 JVM 环境中运行 Prettier 这个 JavaScript 工具,GraalVM 的 Polyglot API 是不二之选。它允许我们在 JVM 内部高效、安全地执行 JavaScript 代码。
于是,一个清晰的架构浮现了:
- 客户端通过一个 REST API
POST
请求提交待格式化的代码,服务立即返回一个唯一的任务 ID 和一个用于接收实时事件的 SSE 连接端点。 - Ktor 后端接收到请求后,启动一个后台协程,使用 GraalVM
Context
来执行 Prettier 的格式化逻辑。 - 客户端使用任务 ID 连接到 SSE 端点。
- 在后台协程中,每当 Prettier 完成一个关键步骤,就通过一个共享的事件通道(
SharedFlow
)将状态事件推送出去。 - Ktor 的 SSE 端点订阅这个
SharedFlow
,并将接收到的事件实时地流式传输给客户端。 - 任务完成后,推送一个包含最终格式化代码的
done
事件,并关闭连接。
以下是这个异步格式化服务的核心实现过程。
环境与依赖配置
首先是项目的基础设施。build.gradle.kts
文件需要引入 Ktor 相关依赖、GraalVM SDK 以及日志库。
// build.gradle.kts
plugins {
kotlin("jvm") version "1.9.20"
id("io.ktor.plugin") version "2.3.6"
id("org.jetbrains.kotlin.plugin.serialization") version "1.9.20"
}
group = "com.example"
version = "0.1.0"
application {
mainClass.set("com.example.ApplicationKt")
}
repositories {
mavenCentral()
}
dependencies {
// Ktor Core
implementation("io.ktor:ktor-server-core-jvm")
implementation("io.ktor:ktor-server-cio-jvm")
implementation("io.ktor:ktor-server-content-negotiation-jvm")
implementation("io.ktor:ktor-serialization-kotlinx-json-jvm")
implementation("io.ktor:ktor-server-sse-jvm") // Server-Sent Events support
// Logging
implementation("ch.qos.logback:logback-classic:1.4.11")
// GraalVM Polyglot API for running JavaScript
implementation("org.graalvm.sdk:graal-sdk:23.1.1")
implementation("org.graalvm.js:js:23.1.1")
implementation("org.graalvm.js:js-scriptengine:23.1.1")
// Coroutines
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
// Testing
testImplementation("io.ktor:ktor-server-tests-jvm")
testImplementation(kotlin("test-junit5"))
testImplementation("org.junit.jupiter:junit-jupiter-api:5.10.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.10.0")
}
// Ktor Plugin Configuration
ktor {
fatJar {
archiveBaseName.set("prettier-service")
}
}
这里的关键是 io.ktor:ktor-server-sse-jvm
用于支持 SSE,以及 GraalVM 的 graal-sdk
和 js
依赖。
核心服务:Prettier 格式化引擎
我们需要一个服务来封装与 GraalVM 的交互。直接在路由处理器中创建和销毁 Context
开销巨大且容易出错。一个更好的实践是创建一个单例的服务,管理 GraalVM Context
的生命周期。
在真实项目中,GraalVM Context
不是线程安全的。如果多个请求并发使用同一个 Context
,会产生冲突。因此,一种策略是为每个请求创建一个新的 Context
,或者使用一个 Context
池。在这个场景下,由于格式化是CPU密集型任务,我们会将其调度到专用的线程池中,为每个任务创建一个新的 Context
是一个安全且合理的选择。
// src/main/kotlin/com/example/service/PrettierService.kt
package com.example.service
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.graalvm.polyglot.Context
import org.graalvm.polyglot.HostAccess
import org.graalvm.polyglot.Source
import java.io.File
import java.util.function.Consumer
// A wrapper class for emitting progress events
data class ProgressEvent(val type: String, val message: String, val data: String? = null)
class PrettierService {
// Pre-load Prettier source code to avoid reading from disk on every request.
private val prettierStandaloneJs: String by lazy {
// In a production environment, this file would be bundled in the JAR.
val prettierFile = File("src/main/resources/standalone.js")
if (!prettierFile.exists()) {
throw IllegalStateException("Prettier standalone JS not found at ${prettierFile.absolutePath}")
}
prettierFile.readText()
}
/**
* Formats the given code asynchronously and emits progress events.
* @param code The source code to format.
* @param parser The parser Prettier should use (e.g., "typescript", "babel", "json").
* @param eventEmitter A consumer function to send progress events back to the caller.
* @return The formatted code.
*/
suspend fun format(code: String, parser: String, eventEmitter: Consumer<ProgressEvent>): String {
// Formatting is a CPU-intensive task and involves blocking JS execution.
// It should be run on a dedicated dispatcher to avoid blocking Ktor's event loop.
return withContext(Dispatchers.IO) {
val contextBuilder = Context.newBuilder("js")
.allowHostAccess(HostAccess.ALL) // Allow JS to call back into Java
.allowIO(false) // For security, disallow file system access from JS
contextBuilder.build().use { context ->
try {
// Step 1: Emit an initialization event
eventEmitter.accept(ProgressEvent("status", "Initializing GraalVM context..."))
// Load Prettier into the context
context.eval(Source.create("js", prettierStandaloneJs))
eventEmitter.accept(ProgressEvent("status", "Prettier library loaded."))
// Get a proxy to the `prettier.format` function
val prettierFormat = context.getBindings("js").getMember("prettier").getMember("format")
eventEmitter.accept(ProgressEvent("status", "AST parsing started."))
// Step 2: Prepare options for Prettier
val options = context.eval("js", "({})")
options.putMember("parser", parser)
options.putMember("plugins", context.getBindings("js").getMember("prettierPlugins").getMember("babylon"))
// Step 3: Execute the format function. This is a blocking call.
val formattedCode = prettierFormat.execute(code, options).asString()
eventEmitter.accept(ProgressEvent("status", "Formatting complete."))
// Step 4: Return the final result
formattedCode
} catch (e: Exception) {
// In case of a JS error or any other exception, emit an error event.
eventEmitter.accept(ProgressEvent("error", "Formatting failed: ${e.message}"))
throw e // Re-throw to let the caller handle it
}
}
}
}
}
这里有几个关键点:
-
withContext(Dispatchers.IO)
: GraalVM 的 JavaScript 执行是阻塞和计算密集型的,将其放在Dispatchers.IO
(或一个专用的固定大小线程池)上可以防止它阻塞 Ktor 的主事件循环线程。 -
Context.newBuilder(...).build().use
:Context
是一个需要被关闭的资源。使用use
块可以确保无论成功还是异常,context.close()
都会被调用,避免资源泄漏。 -
ProgressEvent
和eventEmitter
: 我们定义了一个简单的数据类来标准化事件结构,并通过一个回调函数eventEmitter
将进度实时地发送回调用方。这是实现实时反馈的核心机制。 - 错误处理:
try-catch
块捕获了格式化过程中可能出现的任何异常(例如,JavaScript 语法错误),并将其作为error
事件发送出去。
任务管理与事件流
当一个格式化请求进来时,我们不能阻塞等待它完成。我们需要一个系统来管理这些后台任务,并将它们的进度事件广播给正确的客户端。
// src/main/kotlin/com/example/service/FormattingJobManager.kt
package com.example.service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
// Data class to represent a formatting job request
data class FormatRequest(val code: String, val parser: String)
class FormattingJobManager(
private val prettierService: PrettierService
) : CoroutineScope {
// Use a SupervisorJob so that failure of one child coroutine doesn't cancel others.
override val coroutineContext: CoroutineContext = SupervisorJob() + Dispatchers.Default
// In-memory store for job events. For production, this should be backed by Redis or another distributed cache.
private val jobChannels = ConcurrentHashMap<String, MutableSharedFlow<ProgressEvent>>()
/**
* Submits a new formatting job.
* @return The unique ID for the created job.
*/
fun submitJob(request: FormatRequest): String {
val jobId = UUID.randomUUID().toString()
val eventFlow = MutableSharedFlow<ProgressEvent>(replay = 10) // Replay buffer for late subscribers
jobChannels[jobId] = eventFlow
launch { // Launch a new coroutine for this job
try {
// The eventEmitter lambda will push events into our SharedFlow
val formattedCode = prettierService.format(request.code, request.parser) { event ->
eventFlow.tryEmit(event) // Use tryEmit as we are in a hot path
}
// Emit the final result
eventFlow.emit(ProgressEvent("done", "Job completed successfully.", data = formattedCode))
} catch (e: Exception) {
// The service already emits an error event, but we can emit another one here if needed
eventFlow.emit(ProgressEvent("error", "Job failed with exception: ${e.message}"))
} finally {
// It's good practice to have a cleanup mechanism, e.g., remove old jobs
// For simplicity, we keep it in memory here. A TTL mechanism would be needed.
}
}
return jobId
}
/**
* Retrieves the event flow for a given job ID.
* @return A SharedFlow of progress events, or null if the job ID is not found.
*/
fun getJobFlow(jobId: String) = jobChannels[jobId]?.asSharedFlow()
}
这个管理器是整个异步流程的协调者:
-
CoroutineScope
: 它实现了CoroutineScope
,使其能够启动独立的、生命周期受控的协程。SupervisorJob
确保一个任务的失败不会影响到其他正在运行的任务。 -
ConcurrentHashMap<String, MutableSharedFlow<ProgressEvent>>
: 这是我们的核心数据结构。一个并发安全的Map,键是任务ID,值是一个MutableSharedFlow
。SharedFlow
是一种热流,允许多个订阅者接收其发出的事件。我们设置了一个replay
缓冲区,这样即使客户端在一些事件发出后才连接,也能收到最近的几条历史事件。 -
submitJob
: 此方法创建一个唯一的jobId
,设置好SharedFlow
,然后立即launch
一个新的协程来执行耗时的prettierService.format
调用。它本身是非阻塞的,会立刻返回jobId
。 -
getJobFlow
: 此方法允许 SSE 端点根据jobId
找到对应的事件流并进行订阅。
Ktor 路由与 SSE 端点
最后,我们将所有部分在 Ktor 的应用模块中组装起来。
// src/main/kotlin/com/example/Application.kt
package com.example
import com.example.service.FormatRequest
import com.example.service.FormattingJobManager
import com.example.service.PrettierService
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.cio.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.sse.*
import kotlinx.serialization.Serializable
fun main() {
embeddedServer(CIO, port = 8080, host = "0.0.0.0", module = Application::module)
.start(wait = true)
}
// Data class for the API response after submitting a job
@Serializable
data class JobSubmissionResponse(val jobId: String, val eventsUrl: String)
fun Application.module() {
install(ContentNegotiation) {
json()
}
// Initialize services as singletons
val prettierService = PrettierService()
val jobManager = FormattingJobManager(prettierService)
routing {
// Endpoint to submit a new formatting job
post("/format") {
try {
val request = call.receive<FormatRequest>()
if (request.code.isBlank()) {
call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Code content cannot be empty.")
return@post
}
val jobId = jobManager.submitJob(request)
val response = JobSubmissionResponse(
jobId = jobId,
eventsUrl = "/format/events/$jobId"
)
call.respond(io.ktor.http.HttpStatusCode.Accepted, response)
} catch (e: Exception) {
call.respond(io.ktor.http.HttpStatusCode.InternalServerError, "Failed to submit job: ${e.message}")
}
}
// SSE endpoint to stream job progress
get("/format/events/{jobId}") {
val jobId = call.parameters["jobId"]
if (jobId == null) {
call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Missing jobId")
return@get
}
val jobFlow = jobManager.getJobFlow(jobId)
if (jobFlow == null) {
call.respond(io.ktor.http.HttpStatusCode.NotFound, "Job with ID $jobId not found.")
return@get
}
try {
call.respondSse(jobFlow) { event ->
// Map our ProgressEvent to an SSE event
this.event = event.type // e.g., "status", "error", "done"
this.data = event.message
// If there is additional data (like the final code), send it in another data line or as JSON
if (event.data != null) {
this.data = "${event.message}\n${event.data}"
}
}
} catch (e: kotlinx.coroutines.CancellationException) {
// This is expected when the client disconnects. Log it for debugging.
log.info("Client for job $jobId disconnected.")
} catch (e: Exception) {
log.error("Error during SSE streaming for job $jobId", e)
}
}
}
}
这段代码定义了两个端点:
-
POST /format
: 接收JSON格式的FormatRequest
,调用jobManager.submitJob
,然后立即以202 Accepted
状态码返回一个包含jobId
和 SSE URL 的响应。 -
GET /format/events/{jobId}
: 这是一个 SSE 端点。它通过jobId
从jobManager
获取SharedFlow
。如果找到,call.respondSse(jobFlow)
会建立一个持久的连接,并将jobFlow
中的每一个ProgressEvent
自动转换为 SSE 格式发送给客户端。当流结束或客户端断开连接时,连接会自动关闭。
流程可视化与客户端交互
整个异步交互流程可以通过一个时序图来清晰地展示。
sequenceDiagram participant Client participant KtorServer as Ktor Server participant JobManager as FormattingJobManager participant PrettierWorker as Background Coroutine Client->>+KtorServer: POST /format (code, parser) KtorServer->>+JobManager: submitJob(request) JobManager->>JobManager: Create jobId & SharedFlow JobManager->>+PrettierWorker: launch { format(...) } JobManager-->>-KtorServer: return jobId KtorServer-->>-Client: 202 Accepted (jobId, eventsUrl) Client->>+KtorServer: GET /format/events/{jobId} (SSE Connection) KtorServer->>JobManager: getJobFlow(jobId) JobManager-->>KtorServer: return SharedFlow KtorServer-->>Client: Establishes SSE stream from Flow PrettierWorker->>PrettierWorker: Prettier.format() begins PrettierWorker->>JobManager: Emit ProgressEvent("status", "Initializing...") JobManager->>KtorServer: (via SharedFlow) KtorServer-->>Client: event: status\ndata: Initializing... loop Formatting Steps PrettierWorker->>JobManager: Emit ProgressEvent("status", "...") JobManager->>KtorServer: (via SharedFlow) KtorServer-->>Client: event: status\ndata: ... end PrettierWorker->>-JobManager: Emit ProgressEvent("done", "...", formattedCode) JobManager->>KtorServer: (via SharedFlow) KtorServer-->>-Client: event: done\ndata: ...\n[formatted code]
客户端可以使用原生的 EventSource
API 来消费这些事件。
// A simple client-side example
async function formatCode(code, parser) {
const response = await fetch('/format', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ code, parser })
});
if (response.status !== 202) {
console.error("Failed to submit job:", await response.text());
return;
}
const { jobId, eventsUrl } = await response.json();
console.log(`Job submitted with ID: ${jobId}. Listening for events at ${eventsUrl}`);
const eventSource = new EventSource(eventsUrl);
eventSource.addEventListener('status', (event) => {
console.log(`[STATUS] ${event.data}`);
});
eventSource.addEventListener('error', (event) => {
console.error(`[ERROR] ${event.data}`);
eventSource.close(); // Close connection on fatal error
});
eventSource.addEventListener('done', (event) => {
console.log(`[DONE] Formatting complete.`);
const [message, ...formattedCodeLines] = event.data.split('\n');
const formattedCode = formattedCodeLines.join('\n');
console.log("Formatted Code:\n", formattedCode);
eventSource.close(); // Job is done, close the connection
});
eventSource.onerror = (err) => {
console.error("EventSource failed:", err);
eventSource.close();
};
}
// Example usage:
const myCode = `const hello = 'world'; console.log(hello)`;
formatCode(myCode, 'babel');
局限性与未来迭代路径
这套实现方案作为一个概念验证是完整且健壮的,但在生产环境中还存在一些局限性:
- 任务状态持久化:
FormattingJobManager
使用内存中的ConcurrentHashMap
来存储任务状态。如果服务重启,所有正在进行中的任务信息都会丢失。在生产环境中,任务队列和事件流应该由 Redis Streams 或 Kafka 等中间件来支持,以确保任务的持久性和服务的无状态化。 - 水平扩展: 当前设计是单节点的。如果要水平扩展服务实例,客户端的 SSE 连接可能会路由到没有执行该任务的实例上。使用 Redis Pub/Sub 来广播事件可以在多个实例间同步状态,从而解决这个问题。
- 资源管理: GraalVM
Context
的创建和销毁是有成本的。对于高并发场景,可以考虑实现一个Context
池来复用资源。同时,需要对并发执行的格式化任务数量进行限制,以防止耗尽 CPU 和内存资源。 - 安全性: 直接暴露一个可以执行任意代码的服务存在安全风险。尽管我们已经禁用了IO权限,但在生产环境中需要更严格的沙箱环境配置,并对输入代码的大小和复杂性进行限制。