团队扩大后,前端代码格式化规范的统一成了一个不大不小的麻烦。虽然有 .prettierrc 和 husky,但总有人本地环境配置不当,导致 CI 阶段因为格式问题频繁失败。CI 流水线中执行 npm install 再运行 Prettier 既慢又占用资源。由此,一个想法浮现:构建一个集中式的、版本统一的 Prettier 格式化服务,IDE 插件或 Git 钩子直接调用这个服务,而不是依赖本地的 Node.js 环境。
初步构想是创建一个 API,接收代码片段,返回格式化后的结果。但这太过简单,无法满足一个关键需求:对于大型文件或批处理任务,格式化可能需要数秒,我们希望客户端能实时看到处理进度,例如“正在解析AST”、“正在格式化”、“写入完成”等,而不是一个漫长的等待后接收一个最终结果。这种单向、实时的状态推送场景,Server-Sent Events (SSE) 是比 WebSocket 更轻量、更合适的选择。
技术选型上,后端我选择了 Kotlin + Ktor,因为它轻量、异步优先,与 Kotlin Coroutines 深度集成,非常适合处理 I/O 密集型和长连接任务。而要在 JVM 环境中运行 Prettier 这个 JavaScript 工具,GraalVM 的 Polyglot API 是不二之选。它允许我们在 JVM 内部高效、安全地执行 JavaScript 代码。
于是,一个清晰的架构浮现了:
- 客户端通过一个 REST API
POST请求提交待格式化的代码,服务立即返回一个唯一的任务 ID 和一个用于接收实时事件的 SSE 连接端点。 - Ktor 后端接收到请求后,启动一个后台协程,使用 GraalVM
Context来执行 Prettier 的格式化逻辑。 - 客户端使用任务 ID 连接到 SSE 端点。
- 在后台协程中,每当 Prettier 完成一个关键步骤,就通过一个共享的事件通道(
SharedFlow)将状态事件推送出去。 - Ktor 的 SSE 端点订阅这个
SharedFlow,并将接收到的事件实时地流式传输给客户端。 - 任务完成后,推送一个包含最终格式化代码的
done事件,并关闭连接。
以下是这个异步格式化服务的核心实现过程。
环境与依赖配置
首先是项目的基础设施。build.gradle.kts 文件需要引入 Ktor 相关依赖、GraalVM SDK 以及日志库。
// build.gradle.kts
plugins {
kotlin("jvm") version "1.9.20"
id("io.ktor.plugin") version "2.3.6"
id("org.jetbrains.kotlin.plugin.serialization") version "1.9.20"
}
group = "com.example"
version = "0.1.0"
application {
mainClass.set("com.example.ApplicationKt")
}
repositories {
mavenCentral()
}
dependencies {
// Ktor Core
implementation("io.ktor:ktor-server-core-jvm")
implementation("io.ktor:ktor-server-cio-jvm")
implementation("io.ktor:ktor-server-content-negotiation-jvm")
implementation("io.ktor:ktor-serialization-kotlinx-json-jvm")
implementation("io.ktor:ktor-server-sse-jvm") // Server-Sent Events support
// Logging
implementation("ch.qos.logback:logback-classic:1.4.11")
// GraalVM Polyglot API for running JavaScript
implementation("org.graalvm.sdk:graal-sdk:23.1.1")
implementation("org.graalvm.js:js:23.1.1")
implementation("org.graalvm.js:js-scriptengine:23.1.1")
// Coroutines
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
// Testing
testImplementation("io.ktor:ktor-server-tests-jvm")
testImplementation(kotlin("test-junit5"))
testImplementation("org.junit.jupiter:junit-jupiter-api:5.10.0")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.10.0")
}
// Ktor Plugin Configuration
ktor {
fatJar {
archiveBaseName.set("prettier-service")
}
}
这里的关键是 io.ktor:ktor-server-sse-jvm 用于支持 SSE,以及 GraalVM 的 graal-sdk 和 js 依赖。
核心服务:Prettier 格式化引擎
我们需要一个服务来封装与 GraalVM 的交互。直接在路由处理器中创建和销毁 Context 开销巨大且容易出错。一个更好的实践是创建一个单例的服务,管理 GraalVM Context 的生命周期。
在真实项目中,GraalVM Context 不是线程安全的。如果多个请求并发使用同一个 Context,会产生冲突。因此,一种策略是为每个请求创建一个新的 Context,或者使用一个 Context 池。在这个场景下,由于格式化是CPU密集型任务,我们会将其调度到专用的线程池中,为每个任务创建一个新的 Context 是一个安全且合理的选择。
// src/main/kotlin/com/example/service/PrettierService.kt
package com.example.service
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.graalvm.polyglot.Context
import org.graalvm.polyglot.HostAccess
import org.graalvm.polyglot.Source
import java.io.File
import java.util.function.Consumer
// A wrapper class for emitting progress events
data class ProgressEvent(val type: String, val message: String, val data: String? = null)
class PrettierService {
// Pre-load Prettier source code to avoid reading from disk on every request.
private val prettierStandaloneJs: String by lazy {
// In a production environment, this file would be bundled in the JAR.
val prettierFile = File("src/main/resources/standalone.js")
if (!prettierFile.exists()) {
throw IllegalStateException("Prettier standalone JS not found at ${prettierFile.absolutePath}")
}
prettierFile.readText()
}
/**
* Formats the given code asynchronously and emits progress events.
* @param code The source code to format.
* @param parser The parser Prettier should use (e.g., "typescript", "babel", "json").
* @param eventEmitter A consumer function to send progress events back to the caller.
* @return The formatted code.
*/
suspend fun format(code: String, parser: String, eventEmitter: Consumer<ProgressEvent>): String {
// Formatting is a CPU-intensive task and involves blocking JS execution.
// It should be run on a dedicated dispatcher to avoid blocking Ktor's event loop.
return withContext(Dispatchers.IO) {
val contextBuilder = Context.newBuilder("js")
.allowHostAccess(HostAccess.ALL) // Allow JS to call back into Java
.allowIO(false) // For security, disallow file system access from JS
contextBuilder.build().use { context ->
try {
// Step 1: Emit an initialization event
eventEmitter.accept(ProgressEvent("status", "Initializing GraalVM context..."))
// Load Prettier into the context
context.eval(Source.create("js", prettierStandaloneJs))
eventEmitter.accept(ProgressEvent("status", "Prettier library loaded."))
// Get a proxy to the `prettier.format` function
val prettierFormat = context.getBindings("js").getMember("prettier").getMember("format")
eventEmitter.accept(ProgressEvent("status", "AST parsing started."))
// Step 2: Prepare options for Prettier
val options = context.eval("js", "({})")
options.putMember("parser", parser)
options.putMember("plugins", context.getBindings("js").getMember("prettierPlugins").getMember("babylon"))
// Step 3: Execute the format function. This is a blocking call.
val formattedCode = prettierFormat.execute(code, options).asString()
eventEmitter.accept(ProgressEvent("status", "Formatting complete."))
// Step 4: Return the final result
formattedCode
} catch (e: Exception) {
// In case of a JS error or any other exception, emit an error event.
eventEmitter.accept(ProgressEvent("error", "Formatting failed: ${e.message}"))
throw e // Re-throw to let the caller handle it
}
}
}
}
}
这里有几个关键点:
-
withContext(Dispatchers.IO): GraalVM 的 JavaScript 执行是阻塞和计算密集型的,将其放在Dispatchers.IO(或一个专用的固定大小线程池)上可以防止它阻塞 Ktor 的主事件循环线程。 -
Context.newBuilder(...).build().use:Context是一个需要被关闭的资源。使用use块可以确保无论成功还是异常,context.close()都会被调用,避免资源泄漏。 -
ProgressEvent和eventEmitter: 我们定义了一个简单的数据类来标准化事件结构,并通过一个回调函数eventEmitter将进度实时地发送回调用方。这是实现实时反馈的核心机制。 - 错误处理:
try-catch块捕获了格式化过程中可能出现的任何异常(例如,JavaScript 语法错误),并将其作为error事件发送出去。
任务管理与事件流
当一个格式化请求进来时,我们不能阻塞等待它完成。我们需要一个系统来管理这些后台任务,并将它们的进度事件广播给正确的客户端。
// src/main/kotlin/com/example/service/FormattingJobManager.kt
package com.example.service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
// Data class to represent a formatting job request
data class FormatRequest(val code: String, val parser: String)
class FormattingJobManager(
private val prettierService: PrettierService
) : CoroutineScope {
// Use a SupervisorJob so that failure of one child coroutine doesn't cancel others.
override val coroutineContext: CoroutineContext = SupervisorJob() + Dispatchers.Default
// In-memory store for job events. For production, this should be backed by Redis or another distributed cache.
private val jobChannels = ConcurrentHashMap<String, MutableSharedFlow<ProgressEvent>>()
/**
* Submits a new formatting job.
* @return The unique ID for the created job.
*/
fun submitJob(request: FormatRequest): String {
val jobId = UUID.randomUUID().toString()
val eventFlow = MutableSharedFlow<ProgressEvent>(replay = 10) // Replay buffer for late subscribers
jobChannels[jobId] = eventFlow
launch { // Launch a new coroutine for this job
try {
// The eventEmitter lambda will push events into our SharedFlow
val formattedCode = prettierService.format(request.code, request.parser) { event ->
eventFlow.tryEmit(event) // Use tryEmit as we are in a hot path
}
// Emit the final result
eventFlow.emit(ProgressEvent("done", "Job completed successfully.", data = formattedCode))
} catch (e: Exception) {
// The service already emits an error event, but we can emit another one here if needed
eventFlow.emit(ProgressEvent("error", "Job failed with exception: ${e.message}"))
} finally {
// It's good practice to have a cleanup mechanism, e.g., remove old jobs
// For simplicity, we keep it in memory here. A TTL mechanism would be needed.
}
}
return jobId
}
/**
* Retrieves the event flow for a given job ID.
* @return A SharedFlow of progress events, or null if the job ID is not found.
*/
fun getJobFlow(jobId: String) = jobChannels[jobId]?.asSharedFlow()
}
这个管理器是整个异步流程的协调者:
-
CoroutineScope: 它实现了CoroutineScope,使其能够启动独立的、生命周期受控的协程。SupervisorJob确保一个任务的失败不会影响到其他正在运行的任务。 -
ConcurrentHashMap<String, MutableSharedFlow<ProgressEvent>>: 这是我们的核心数据结构。一个并发安全的Map,键是任务ID,值是一个MutableSharedFlow。SharedFlow是一种热流,允许多个订阅者接收其发出的事件。我们设置了一个replay缓冲区,这样即使客户端在一些事件发出后才连接,也能收到最近的几条历史事件。 -
submitJob: 此方法创建一个唯一的jobId,设置好SharedFlow,然后立即launch一个新的协程来执行耗时的prettierService.format调用。它本身是非阻塞的,会立刻返回jobId。 -
getJobFlow: 此方法允许 SSE 端点根据jobId找到对应的事件流并进行订阅。
Ktor 路由与 SSE 端点
最后,我们将所有部分在 Ktor 的应用模块中组装起来。
// src/main/kotlin/com/example/Application.kt
package com.example
import com.example.service.FormatRequest
import com.example.service.FormattingJobManager
import com.example.service.PrettierService
import io.ktor.serialization.kotlinx.json.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.cio.*
import io.ktor.server.plugins.contentnegotiation.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.server.sse.*
import kotlinx.serialization.Serializable
fun main() {
embeddedServer(CIO, port = 8080, host = "0.0.0.0", module = Application::module)
.start(wait = true)
}
// Data class for the API response after submitting a job
@Serializable
data class JobSubmissionResponse(val jobId: String, val eventsUrl: String)
fun Application.module() {
install(ContentNegotiation) {
json()
}
// Initialize services as singletons
val prettierService = PrettierService()
val jobManager = FormattingJobManager(prettierService)
routing {
// Endpoint to submit a new formatting job
post("/format") {
try {
val request = call.receive<FormatRequest>()
if (request.code.isBlank()) {
call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Code content cannot be empty.")
return@post
}
val jobId = jobManager.submitJob(request)
val response = JobSubmissionResponse(
jobId = jobId,
eventsUrl = "/format/events/$jobId"
)
call.respond(io.ktor.http.HttpStatusCode.Accepted, response)
} catch (e: Exception) {
call.respond(io.ktor.http.HttpStatusCode.InternalServerError, "Failed to submit job: ${e.message}")
}
}
// SSE endpoint to stream job progress
get("/format/events/{jobId}") {
val jobId = call.parameters["jobId"]
if (jobId == null) {
call.respond(io.ktor.http.HttpStatusCode.BadRequest, "Missing jobId")
return@get
}
val jobFlow = jobManager.getJobFlow(jobId)
if (jobFlow == null) {
call.respond(io.ktor.http.HttpStatusCode.NotFound, "Job with ID $jobId not found.")
return@get
}
try {
call.respondSse(jobFlow) { event ->
// Map our ProgressEvent to an SSE event
this.event = event.type // e.g., "status", "error", "done"
this.data = event.message
// If there is additional data (like the final code), send it in another data line or as JSON
if (event.data != null) {
this.data = "${event.message}\n${event.data}"
}
}
} catch (e: kotlinx.coroutines.CancellationException) {
// This is expected when the client disconnects. Log it for debugging.
log.info("Client for job $jobId disconnected.")
} catch (e: Exception) {
log.error("Error during SSE streaming for job $jobId", e)
}
}
}
}
这段代码定义了两个端点:
-
POST /format: 接收JSON格式的FormatRequest,调用jobManager.submitJob,然后立即以202 Accepted状态码返回一个包含jobId和 SSE URL 的响应。 -
GET /format/events/{jobId}: 这是一个 SSE 端点。它通过jobId从jobManager获取SharedFlow。如果找到,call.respondSse(jobFlow)会建立一个持久的连接,并将jobFlow中的每一个ProgressEvent自动转换为 SSE 格式发送给客户端。当流结束或客户端断开连接时,连接会自动关闭。
流程可视化与客户端交互
整个异步交互流程可以通过一个时序图来清晰地展示。
sequenceDiagram
participant Client
participant KtorServer as Ktor Server
participant JobManager as FormattingJobManager
participant PrettierWorker as Background Coroutine
Client->>+KtorServer: POST /format (code, parser)
KtorServer->>+JobManager: submitJob(request)
JobManager->>JobManager: Create jobId & SharedFlow
JobManager->>+PrettierWorker: launch { format(...) }
JobManager-->>-KtorServer: return jobId
KtorServer-->>-Client: 202 Accepted (jobId, eventsUrl)
Client->>+KtorServer: GET /format/events/{jobId} (SSE Connection)
KtorServer->>JobManager: getJobFlow(jobId)
JobManager-->>KtorServer: return SharedFlow
KtorServer-->>Client: Establishes SSE stream from Flow
PrettierWorker->>PrettierWorker: Prettier.format() begins
PrettierWorker->>JobManager: Emit ProgressEvent("status", "Initializing...")
JobManager->>KtorServer: (via SharedFlow)
KtorServer-->>Client: event: status\ndata: Initializing...
loop Formatting Steps
PrettierWorker->>JobManager: Emit ProgressEvent("status", "...")
JobManager->>KtorServer: (via SharedFlow)
KtorServer-->>Client: event: status\ndata: ...
end
PrettierWorker->>-JobManager: Emit ProgressEvent("done", "...", formattedCode)
JobManager->>KtorServer: (via SharedFlow)
KtorServer-->>-Client: event: done\ndata: ...\n[formatted code]
客户端可以使用原生的 EventSource API 来消费这些事件。
// A simple client-side example
async function formatCode(code, parser) {
const response = await fetch('/format', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ code, parser })
});
if (response.status !== 202) {
console.error("Failed to submit job:", await response.text());
return;
}
const { jobId, eventsUrl } = await response.json();
console.log(`Job submitted with ID: ${jobId}. Listening for events at ${eventsUrl}`);
const eventSource = new EventSource(eventsUrl);
eventSource.addEventListener('status', (event) => {
console.log(`[STATUS] ${event.data}`);
});
eventSource.addEventListener('error', (event) => {
console.error(`[ERROR] ${event.data}`);
eventSource.close(); // Close connection on fatal error
});
eventSource.addEventListener('done', (event) => {
console.log(`[DONE] Formatting complete.`);
const [message, ...formattedCodeLines] = event.data.split('\n');
const formattedCode = formattedCodeLines.join('\n');
console.log("Formatted Code:\n", formattedCode);
eventSource.close(); // Job is done, close the connection
});
eventSource.onerror = (err) => {
console.error("EventSource failed:", err);
eventSource.close();
};
}
// Example usage:
const myCode = `const hello = 'world'; console.log(hello)`;
formatCode(myCode, 'babel');
局限性与未来迭代路径
这套实现方案作为一个概念验证是完整且健壮的,但在生产环境中还存在一些局限性:
- 任务状态持久化:
FormattingJobManager使用内存中的ConcurrentHashMap来存储任务状态。如果服务重启,所有正在进行中的任务信息都会丢失。在生产环境中,任务队列和事件流应该由 Redis Streams 或 Kafka 等中间件来支持,以确保任务的持久性和服务的无状态化。 - 水平扩展: 当前设计是单节点的。如果要水平扩展服务实例,客户端的 SSE 连接可能会路由到没有执行该任务的实例上。使用 Redis Pub/Sub 来广播事件可以在多个实例间同步状态,从而解决这个问题。
- 资源管理: GraalVM
Context的创建和销毁是有成本的。对于高并发场景,可以考虑实现一个Context池来复用资源。同时,需要对并发执行的格式化任务数量进行限制,以防止耗尽 CPU 和内存资源。 - 安全性: 直接暴露一个可以执行任意代码的服务存在安全风险。尽管我们已经禁用了IO权限,但在生产环境中需要更严格的沙箱环境配置,并对输入代码的大小和复杂性进行限制。