Intermediate
Group 11: Advanced Event Bus Patterns
Example 28: Event Bus with Codecs
By default, the event bus serializes messages as JSON. Registering a custom codec enables passing Java objects directly without serialization overhead when communicating within the same JVM.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
public class EventBusCodecDemo extends AbstractVerticle {
// Domain object to pass on the event bus between verticles.
record UserEvent(String userId, String action, long timestamp) {}
// => Java 16+ record: immutable data carrier with auto-generated equals/hashCode/toString
// Custom codec for UserEvent
static class UserEventCodec implements MessageCodec<UserEvent, UserEvent> {

    /**
     * Serializes a UserEvent into the wire buffer for clustered (network) delivery.
     * Layout: [idLen:int][idBytes][actionLen:int][actionBytes][timestamp:long].
     * Strings are explicitly encoded as UTF-8 to match Buffer.getString() in
     * decodeFromWire, which always decodes UTF-8 — a bare String.getBytes()
     * would use the platform default charset and corrupt data on non-UTF-8 hosts.
     */
    @Override
    public void encodeToWire(Buffer buffer, UserEvent event) {
        byte[] userId = event.userId().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        buffer.appendInt(userId.length); // length prefix for variable-length field
        buffer.appendBytes(userId);
        byte[] action = event.action().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        buffer.appendInt(action.length);
        buffer.appendBytes(action);
        buffer.appendLong(event.timestamp()); // fixed-length long, no prefix needed
    }

    /**
     * Reconstructs a UserEvent from the wire buffer when a message arrives
     * from the network. Must mirror the layout written by encodeToWire.
     */
    @Override
    public UserEvent decodeFromWire(int pos, Buffer buffer) {
        int idLen = buffer.getInt(pos);
        pos += 4; // advance past the int length prefix
        String userId = buffer.getString(pos, pos + idLen); // getString decodes UTF-8
        pos += idLen;
        int actionLen = buffer.getInt(pos);
        pos += 4;
        String action = buffer.getString(pos, pos + actionLen);
        pos += actionLen;
        long timestamp = buffer.getLong(pos);
        return new UserEvent(userId, action, timestamp);
    }

    /** Local (same-JVM) delivery: pass the object through without serialization. */
    @Override
    public UserEvent transform(UserEvent event) {
        return event;
    }

    /** Unique codec name; referenced by DeliveryOptions.setCodecName(). */
    @Override
    public String name() {
        return "UserEventCodec";
    }

    /** -1 marks a user-defined codec (not a Vert.x built-in). */
    @Override
    public byte systemCodecID() {
        return -1;
    }
}
/**
 * Registers the custom codec, wires a consumer, and sends a sample event
 * through the event bus using that codec.
 */
@Override
public void start(Promise<Void> startPromise) {
    // Codec registration is global to this Vertx instance.
    vertx.eventBus().registerCodec(new UserEventCodec());

    // Locally delivered messages arrive as UserEvent objects (no JSON round-trip).
    vertx.eventBus().consumer("user.events",
        (io.vertx.core.eventbus.Message<UserEvent> msg) -> {
            UserEvent received = msg.body();
            System.out.println("Received event: " + received.action()
                + " from " + received.userId());
            // => Output: Received event: LOGIN from user-42
        });

    UserEvent loginEvent = new UserEvent("user-42", "LOGIN", System.currentTimeMillis());
    io.vertx.core.eventbus.DeliveryOptions opts =
        new io.vertx.core.eventbus.DeliveryOptions().setCodecName("UserEventCodec");
    // The named codec handles encoding/decoding for this send.
    vertx.eventBus().send("user.events", loginEvent, opts);

    startPromise.complete();
}
/** Entry point: deploys the demo verticle on a fresh Vert.x instance. */
public static void main(String[] args) {
    Vertx runtime = Vertx.vertx();
    runtime.deployVerticle(new EventBusCodecDemo());
}
}Key Takeaway: Register custom MessageCodec implementations to pass Java objects on the event bus without JSON serialization. Specify the codec in DeliveryOptions.setCodecName().
Why It Matters: Custom codecs eliminate the double-serialization overhead of converting domain objects to JSON and back for every intra-JVM message. For high-throughput event-driven systems where thousands of messages per second flow between verticles, this can reduce CPU load by 10-30%. Custom codecs also preserve type safety—the consumer receives the exact domain type, eliminating JSON schema mismatch bugs that only surface at runtime.
Example 29: Event Bus Headers and Message Interceptors
Event bus messages carry headers for metadata (correlation IDs, auth tokens, priority). Message interceptors allow cross-cutting concerns like authentication and audit logging at the bus level.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
public class EventBusHeadersDemo extends AbstractVerticle {
/**
 * Demonstrates event-bus headers and interceptors: an outbound interceptor
 * stamps correlation IDs, an inbound interceptor audit-logs deliveries, and
 * a request/reply exchange carries custom headers with a 5s timeout.
 */
@Override
public void start(Promise<Void> startPromise) {
    EventBus bus = vertx.eventBus();

    // Outbound interceptor: stamp every outgoing message with a correlation ID
    // so a business transaction can be traced across verticle boundaries.
    bus.addOutboundInterceptor(ctx -> {
        String correlationId = java.util.UUID.randomUUID().toString();
        ctx.message().headers().set("x-correlation-id", correlationId);
        ctx.next(); // required: continue the interceptor chain
    });

    // Inbound interceptor: log every delivered message.
    bus.addInboundInterceptor(ctx -> {
        System.out.println("Message received. CorrelationID: "
            + ctx.message().headers().get("x-correlation-id")
            + " Address: " + ctx.message().address());
        // => Output: Message received. CorrelationID: a1b2c3d4-... Address: order.process
        ctx.next(); // hand off to the next interceptor or the consumer
    });

    // Consumer reads both sender-supplied and interceptor-injected headers.
    bus.consumer("order.process", (Message<String> msg) -> {
        String priority = msg.headers().get("priority");
        String correlationId = msg.headers().get("x-correlation-id");
        System.out.println("Processing order (priority=" + priority
            + ", correlation=" + correlationId + "): " + msg.body());
        // => Output: Processing order (priority=HIGH, correlation=a1b2c3d4-...): ORD-999
        msg.reply("OK");
    });

    // Custom headers plus a 5-second reply timeout (default is 30 seconds).
    DeliveryOptions options = new DeliveryOptions()
        .addHeader("priority", "HIGH")
        .addHeader("source", "checkout-service")
        .setSendTimeout(5000);
    bus.request("order.process", "ORD-999", options)
        .onSuccess(reply -> System.out.println("Reply: " + reply.body()));
    // => Output: Reply: OK

    startPromise.complete();
}
/** Entry point: deploys the headers/interceptors demo verticle. */
public static void main(String[] args) {
    Vertx runtime = Vertx.vertx();
    runtime.deployVerticle(new EventBusHeadersDemo());
}
}Key Takeaway: Event bus headers carry metadata like correlation IDs. Message interceptors inject cross-cutting behavior (logging, auth, tracing) at the bus layer without modifying consumers.
Why It Matters: Message interceptors enforce policies like authentication and audit logging at the infrastructure layer, preventing individual verticle developers from accidentally omitting these concerns. Correlation IDs threaded through all bus messages enable distributed tracing across the entire application, making it possible to reconstruct the execution path of a business transaction that spans multiple verticles and external systems.
Group 12: Advanced Web Router
Example 30: Sub-Routers and Modular Route Organization
Sub-routers split a large router into domain-specific modules, each managing its own routes and middleware. The main router mounts sub-routers at path prefixes.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
public class SubRouterDemo extends AbstractVerticle {
/**
 * Builds the main router, mounts the user/order sub-routers under their
 * prefixes, adds root-level routes, and starts the HTTP server on 8080.
 */
@Override
public void start(Promise<Void> startPromise) {
    Router mainRouter = Router.router(vertx);

    // Each sub-router owns the routes and middleware under its prefix.
    mainRouter.route("/api/users/*").subRouter(createUserRouter());
    mainRouter.route("/api/orders/*").subRouter(createOrderRouter());

    // Root-level routes stay on the main router.
    mainRouter.get("/health").handler(ctx -> ctx.response().end("OK"));
    mainRouter.get("/").handler(ctx ->
        ctx.json(new io.vertx.core.json.JsonObject().put("service", "api")));

    vertx.createHttpServer()
        .requestHandler(mainRouter)
        .listen(8080)
        .onSuccess(server -> startPromise.complete())
        .onFailure(startPromise::fail);
}
/**
 * Builds the /api/users/* sub-router. The BodyHandler and audit middleware
 * registered here apply only to routes mounted under this prefix.
 */
private Router createUserRouter() {
    Router userRouter = Router.router(vertx);
    userRouter.route().handler(BodyHandler.create()); // body parsing, user routes only
    userRouter.route().handler(this::auditLog);       // audit middleware, user routes only

    // Mounted at /api/users/* -> GET /api/users/ maps here.
    userRouter.get("/").handler(ctx -> ctx.json(new io.vertx.core.json.JsonArray()
        .add(new io.vertx.core.json.JsonObject().put("id", 1).put("name", "Alice"))));

    // GET /api/users/42 -> id = "42"
    userRouter.get("/:id").handler(ctx -> {
        String id = ctx.pathParam("id");
        ctx.json(new io.vertx.core.json.JsonObject().put("id", id));
    });

    // POST /api/users/
    userRouter.post("/").handler(ctx -> {
        io.vertx.core.json.JsonObject body = ctx.body().asJsonObject();
        // json() is a RoutingContext convenience, not an HttpServerResponse
        // method — the original chained call would not compile. Set the status
        // first, then let ctx.json() encode and end the response.
        ctx.response().setStatusCode(201);
        ctx.json(body);
    });
    return userRouter;
}
/** Builds the /api/orders/* sub-router with its own BodyHandler. */
private Router createOrderRouter() {
    Router orders = Router.router(vertx);
    orders.route().handler(BodyHandler.create());

    // GET /api/orders/ -> empty list in this demo
    orders.get("/").handler(ctx -> ctx.json(new io.vertx.core.json.JsonArray()));

    // GET /api/orders/ORD-1 -> order stub
    orders.get("/:id").handler(ctx -> {
        io.vertx.core.json.JsonObject order = new io.vertx.core.json.JsonObject()
            .put("orderId", ctx.pathParam("id"))
            .put("status", "PENDING");
        ctx.json(order);
    });
    return orders;
}
/** Audit middleware: logs the method and path, then continues the chain. */
private void auditLog(RoutingContext ctx) {
    String line = "[AUDIT] " + ctx.request().method() + " " + ctx.request().path();
    System.out.println(line); // e.g. [AUDIT] GET /api/users/
    ctx.next(); // pass control to the next handler in the chain
}
}Key Takeaway: Use mainRouter.route(prefix).subRouter(sub) to organize routes by domain. Each sub-router has its own middleware chain that applies only to its mounted prefix.
Why It Matters: Sub-routers enable team-based development where different teams own different API domains without coordinating on a single router file. Domain-specific middleware (e.g., admin-only auth on /api/admin/*) applies automatically to all routes in that sub-router, eliminating the risk of forgetting to add middleware to new routes. This structure also makes integration testing focused—each sub-router can be tested in isolation.
Example 31: Handler Chains and Middleware
Handler chains execute multiple handlers sequentially for a single route. Each handler calls ctx.next() to pass control to the next handler, enabling layered behavior like authentication, validation, and business logic.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
public class HandlerChainDemo extends AbstractVerticle {
/**
 * Wires a four-step handler chain on POST /transfer. Each step either calls
 * ctx.next() or short-circuits with ctx.fail()/a direct response.
 */
@Override
public void start(Promise<Void> startPromise) {
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());

    router.post("/transfer")
        .handler(this::authenticate)     // 1. verify caller identity
        .handler(this::validateTransfer) // 2. validate request data
        .handler(this::checkBalance)     // 3. enforce business rules
        .handler(this::executeTransfer); // 4. perform the transfer

    vertx.createHttpServer()
        .requestHandler(router)
        .listen(8080)
        .onSuccess(server -> startPromise.complete())
        .onFailure(startPromise::fail);
}
/**
 * Chain step 1: require a Bearer token in the Authorization header. On
 * success stores "userId" in the per-request context for later handlers;
 * otherwise fails the request with 401 and stops the chain.
 */
private void authenticate(RoutingContext ctx) {
    String token = ctx.request().getHeader("Authorization");
    boolean missingOrMalformed = token == null || !token.startsWith("Bearer ");
    if (missingOrMalformed) {
        ctx.fail(401); // short-circuit: the 401 error handler takes over
        return;
    }
    // Later handlers retrieve this via ctx.get("userId").
    ctx.put("userId", "user-42");
    ctx.next();
}
/**
 * Chain step 2: validate the JSON body. Requires "amount" (> 0) and "to";
 * stores the validated values on the context for downstream handlers.
 */
private void validateTransfer(RoutingContext ctx) {
    JsonObject body = ctx.body().asJsonObject();
    if (body == null || !body.containsKey("amount") || !body.containsKey("to")) {
        ctx.fail(400); // malformed request: stop the chain
        return;
    }
    double amount = body.getDouble("amount");
    if (amount <= 0) {
        // Respond directly (instead of ctx.fail) to control the error body.
        ctx.response().setStatusCode(400)
            .end("{\"error\":\"Amount must be positive\"}");
        return;
    }
    ctx.put("amount", amount);
    ctx.put("toAccount", body.getString("to"));
    ctx.next();
}
/**
 * Chain step 3: business-rule check against the (stubbed) balance.
 * 422 signals a well-formed request that violates a business rule.
 */
private void checkBalance(RoutingContext ctx) {
    double amount = ctx.get("amount"); // set by validateTransfer
    double currentBalance = 500.0;     // stub; production would query the DB
    if (amount > currentBalance) {
        ctx.response().setStatusCode(422)
            .end("{\"error\":\"Insufficient funds\"}");
        return;
    }
    ctx.next();
}
/**
 * Chain step 4: perform the transfer using the validated data previous
 * handlers stored on the context, then respond with a JSON summary.
 */
private void executeTransfer(RoutingContext ctx) {
    String userId = ctx.get("userId");
    double amount = ctx.get("amount");
    String toAccount = ctx.get("toAccount");
    System.out.println("Transfer: " + userId + " sends $" + amount + " to " + toAccount);
    // => Output: Transfer: user-42 sends $100.0 to acc-99
    // json() is a RoutingContext method, not an HttpServerResponse one — the
    // original chained call would not compile. Set the status, then use ctx.json().
    ctx.response().setStatusCode(200);
    ctx.json(new JsonObject().put("status", "completed").put("amount", amount));
}
}Key Takeaway: Chain multiple handlers on a route for layered behavior. Use ctx.put()/ctx.get() to pass validated data between handlers, and ctx.next() to proceed or ctx.fail() to short-circuit.
Why It Matters: Handler chains implement separation of concerns at the request-handling level—authentication, validation, authorization, and business logic each live in focused, independently testable functions. Short-circuiting with ctx.fail() ensures that later handlers only execute when all preconditions are satisfied, preventing partial execution bugs. Sharing data via ctx.put()/ctx.get() avoids parsing the request body multiple times.
Group 13: Authentication
Example 32: JWT Authentication
JSON Web Tokens provide stateless authentication. Vert.x Web’s JWTAuthHandler validates tokens and populates the routing context with the authenticated user.
graph LR
A["Client POST /auth/login"] --> B["JWTAuth.generateToken#40;#41;"]
B --> C["Returns JWT Token"]
C --> D["Client GET /api/resource<br/>Authorization: Bearer token"]
D --> E["JWTAuthHandler validates"]
E -->|"valid"| F["Handler runs"]
E -->|"invalid"| G["401 Unauthorized"]
style A fill:#0173B2,stroke:#000,color:#fff
style B fill:#DE8F05,stroke:#000,color:#fff
style C fill:#029E73,stroke:#000,color:#fff
style E fill:#CC78BC,stroke:#000,color:#fff
style F fill:#029E73,stroke:#000,color:#fff
style G fill:#CA9161,stroke:#000,color:#fff
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.JWTOptions;
import io.vertx.ext.auth.jwt.JWTAuth;
import io.vertx.ext.auth.jwt.JWTAuthOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.JWTAuthHandler;
public class JwtAuthVerticle extends AbstractVerticle {
private JWTAuth jwtAuth;
// => JWT provider: generates and validates tokens
/**
 * Configures the JWT provider from a JCEKS keystore, exposes a public login
 * endpoint, and protects the /api/* subtree with JWTAuthHandler.
 */
@Override
public void start(Promise<Void> startPromise) {
    // JWTAuthOptions configures the keystore via setKeyStore(KeyStoreOptions)
    // ("addJwkStore" is not part of the JWTAuth API). Create the keystore
    // with: keytool -genseckey ...
    jwtAuth = JWTAuth.create(vertx, new JWTAuthOptions()
        .setKeyStore(new io.vertx.ext.auth.KeyStoreOptions()
            .setType("jceks")
            .setPath("keystore.jceks")
            .setPassword(System.getenv("KEYSTORE_PASS"))));
            // => password from the environment; never hardcode secrets

    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());

    // Public endpoint: issues tokens.
    router.post("/auth/login").handler(this::login);

    // Protected subtree: JWTAuthHandler validates the Bearer token,
    // populates ctx.user() on success and returns 401 on failure.
    router.route("/api/*").handler(JWTAuthHandler.create(jwtAuth));
    router.get("/api/profile").handler(this::getProfile);
    router.get("/api/data").handler(ctx ->
        ctx.json(new JsonObject().put("data", "secret-stuff")));

    vertx.createHttpServer()
        .requestHandler(router)
        .listen(8080)
        .onSuccess(server -> startPromise.complete())
        .onFailure(startPromise::fail);
}
/**
 * POST /auth/login: checks the demo credentials and returns a signed JWT
 * carrying subject, role, and email claims, expiring in one hour.
 */
private void login(RoutingContext ctx) {
    JsonObject body = ctx.body().asJsonObject();
    String username = body.getString("username");
    String password = body.getString("password");
    // Demo-only check; production would compare a bcrypt hash from the DB.
    boolean validCredentials = "admin".equals(username) && "secret".equals(password);
    if (!validCredentials) {
        ctx.response().setStatusCode(401)
            .end("{\"error\":\"Invalid credentials\"}");
        return;
    }
    JsonObject claims = new JsonObject()
        .put("sub", username)                     // "sub" (subject): the principal
        .put("role", "admin")                     // custom claim for authorization
        .put("email", username + "@example.com");
    JWTOptions tokenOptions = new JWTOptions()
        .setExpiresInSeconds(3600)                // 1-hour lifetime; re-login after expiry
        .setAlgorithm("HS256");                   // HMAC-SHA256 signature
    String token = jwtAuth.generateToken(claims, tokenOptions);
    // Client sends this back as "Authorization: Bearer <token>".
    ctx.json(new JsonObject().put("token", token).put("expiresIn", 3600));
}
/**
 * GET /api/profile: JWTAuthHandler has already validated the token and set
 * ctx.user(); read the claims from the principal and echo them back.
 */
private void getProfile(RoutingContext ctx) {
    io.vertx.ext.auth.User user = ctx.user();
    // Read each claim once and reuse it (the original called
    // getString("sub") and discarded the result — a dead statement).
    String username = user.principal().getString("sub");
    String role = user.principal().getString("role");
    ctx.json(new JsonObject()
        .put("username", username)
        .put("role", role));
    // => Output: {"username":"admin","role":"admin"}
}
}Key Takeaway: Use JWTAuthHandler to protect routes. Tokens are validated automatically; claims are available via ctx.user().principal() in protected handlers.
Why It Matters: Stateless JWT authentication scales horizontally without shared session storage—any server instance can validate any token using the signing key. The 1-hour expiry limits the window of a stolen token’s usability. Centralizing token validation in JWTAuthHandler means individual route handlers focus on business logic rather than security concerns, reducing the risk of accidentally skipping authentication on a protected route.
Example 33: Role-Based Authorization
After authentication, authorization checks whether the authenticated user has permission to perform the requested operation. Vert.x provides authorization providers that work with user principals.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
import io.vertx.ext.auth.authorization.RoleBasedAuthorization;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
public class AuthorizationDemo extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
// Simulate authenticated user context setup
router.route().handler(ctx -> {
// In production: JWTAuthHandler populates ctx.user()
// Here we simulate it for the authorization demo
JsonObject principal = new JsonObject()
.put("sub", "alice")
.put("roles", new io.vertx.core.json.JsonArray().add("user").add("editor"));
// => Roles embedded in JWT or fetched from DB
io.vertx.ext.auth.impl.UserImpl user =
new io.vertx.ext.auth.impl.UserImpl(principal, new JsonObject());
ctx.setUser(user);
// => Set the user principal on the context
ctx.next();
});
router.get("/articles").handler(ctx -> {
// => Anyone can read articles (no authorization check)
ctx.json(new io.vertx.core.json.JsonArray()
.add(new JsonObject().put("id", 1).put("title", "Vert.x Guide")));
});
router.post("/articles").handler(this::requireRole("editor"))
.handler(ctx -> {
// => Only "editor" role reaches this handler
JsonObject body = ctx.body().asJsonObject();
ctx.response().setStatusCode(201)
.json(new JsonObject().put("created", true).put("title", body.getString("title")));
});
router.delete("/articles/:id").handler(this::requireRole("admin"))
.handler(ctx -> {
// => Only "admin" role can delete
ctx.response().setStatusCode(204).end();
});
vertx.createHttpServer()
.requestHandler(router)
.listen(8080)
.onSuccess(s -> startPromise.complete())
.onFailure(startPromise::fail);
}
/**
 * Handler factory: returns middleware that lets the request through only
 * when the authenticated user carries the given role. 401 means not
 * authenticated; 403 means authenticated but lacking the role.
 */
private io.vertx.core.Handler<RoutingContext> requireRole(String requiredRole) {
    return ctx -> {
        User user = ctx.user();
        if (user == null) {
            ctx.fail(401); // not authenticated
            return;
        }
        io.vertx.core.json.JsonArray roles = user.principal()
            .getJsonArray("roles", new io.vertx.core.json.JsonArray());
        boolean authorized = roles.stream()
            .anyMatch(role -> requiredRole.equals(role.toString()));
        if (!authorized) {
            ctx.fail(403); // authenticated but not permitted
            System.out.println("Access denied: user lacks role " + requiredRole);
            return;
        }
        ctx.next(); // authorized: continue to the business-logic handler
    };
}
}Key Takeaway: Implement authorization as a handler factory that returns a Handler<RoutingContext>. Return 401 for unauthenticated users and 403 for authenticated but unauthorized users.
Why It Matters: Separating authentication (who are you?) from authorization (what can you do?) keeps each concern focused and testable. The requireRole() factory pattern makes authorization declarations self-documenting in the route definition and ensures the check runs before business logic for every protected route. The 401/403 distinction helps API clients implement correct retry logic—401 prompts re-login while 403 signals a permanent permission denial.
Example 34: CORS Configuration
Cross-Origin Resource Sharing (CORS) controls which web origins can make API calls. Vert.x Web’s CorsHandler configures allowed origins, methods, and headers.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.CorsHandler;
import java.util.Set;
public class CorsVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
Router router = Router.router(vertx);
// CORS handler must be BEFORE all other route handlers
router.route().handler(CorsHandler.create()
.addOrigin("https://myapp.example.com")
// => Allow requests from this specific origin
// => Rejects CORS requests from all other origins (403)
.addOrigin("http://localhost:3000")
// => Allow local dev frontend
.allowedMethod(io.vertx.core.http.HttpMethod.GET)
.allowedMethod(io.vertx.core.http.HttpMethod.POST)
.allowedMethod(io.vertx.core.http.HttpMethod.PUT)
.allowedMethod(io.vertx.core.http.HttpMethod.DELETE)
.allowedMethod(io.vertx.core.http.HttpMethod.OPTIONS)
// => OPTIONS is required for preflight requests
.allowedHeaders(Set.of(
"Authorization",
// => Allow JWT token header
"Content-Type",
// => Allow JSON content type header
"X-Request-Id"
// => Custom request ID header
))
.allowCredentials(true)
// => Allow cookies and Authorization header
// => Required when frontend sends credentials
.maxAgeSeconds(86400));
// => Cache preflight response for 24 hours (86400 seconds)
// => Reduces OPTIONS preflight requests for frequent APIs
router.get("/api/data").handler(ctx -> {
ctx.response()
.putHeader("Content-Type", "application/json")
.end("{\"data\":\"accessible from allowed origins\"}");
// => Response includes CORS headers automatically:
// => Access-Control-Allow-Origin: https://myapp.example.com
});
vertx.createHttpServer()
.requestHandler(router)
.listen(8080)
.onSuccess(s -> startPromise.complete())
.onFailure(startPromise::fail);
}
}Key Takeaway: Add CorsHandler before all route handlers. Allow credentials only when necessary, and cache preflight responses with maxAgeSeconds to reduce OPTIONS requests.
Why It Matters: Misconfigured CORS is a common security vulnerability. Using a wildcard origin (*) with allowCredentials(true) creates a CSRF attack vector. Explicitly listing allowed origins ensures only your frontend applications can make credentialed API calls. Caching preflight responses for 24 hours eliminates the performance overhead of OPTIONS preflight requests on every cross-origin API call, which is significant for high-frequency SPAs.
Group 14: File Upload and WebSocket
Example 35: File Upload Handling
Vert.x Web handles multipart file uploads through BodyHandler. Uploaded files are temporarily stored and accessible as FileUpload objects on the routing context.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.FileUpload;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import java.nio.file.Path;
public class FileUploadVerticle extends AbstractVerticle {
private static final String UPLOAD_DIR = "/tmp/uploads";
// => Directory where uploaded files are temporarily stored
/**
 * Ensures the upload directory exists, then starts the HTTP server with a
 * size-limited BodyHandler for multipart uploads.
 */
@Override
public void start(Promise<Void> startPromise) {
    vertx.fileSystem().mkdirs(UPLOAD_DIR)
        // Previously unhandled: a mkdirs failure left startPromise forever
        // pending and the deployment hanging. Fail fast instead.
        .onFailure(startPromise::fail)
        .onSuccess(created -> {
            Router router = Router.router(vertx);
            router.route().handler(BodyHandler.create()
                .setUploadsDirectory(UPLOAD_DIR)  // temp storage for uploads
                .setBodyLimit(10 * 1024 * 1024)); // 10MB cap; 413 if exceeded
            router.post("/upload").handler(this::handleUpload);
            vertx.createHttpServer()
                .requestHandler(router)
                .listen(8080)
                .onSuccess(server -> startPromise.complete())
                .onFailure(startPromise::fail);
        });
}
/**
 * POST /upload: validates the first multipart file (images only) and moves
 * it into permanent storage under a collision-proof, traversal-safe name.
 */
private void handleUpload(RoutingContext ctx) {
    java.util.List<FileUpload> uploads = ctx.fileUploads();
    // Populated by BodyHandler from multipart/form-data; empty if no files.
    if (uploads.isEmpty()) {
        ctx.response().setStatusCode(400)
            .end("{\"error\":\"No files uploaded\"}");
        return;
    }
    FileUpload file = uploads.get(0);
    String filename = file.fileName();
    // => client-supplied name; NEVER use directly as a filesystem path (traversal)
    String contentType = file.contentType(); // client-supplied MIME type
    long size = file.size();
    System.out.println("Upload: " + filename + " (" + contentType + ", " + size + " bytes)");

    // Server-side MIME check: reject anything that is not an image.
    if (!contentType.startsWith("image/")) {
        ctx.response().setStatusCode(415) // Unsupported Media Type
            .end("{\"error\":\"Only images allowed\"}");
        // Future-based delete: the old delete(path, null) form passed a null
        // handler (legacy API). Log cleanup failures instead of dropping them.
        vertx.fileSystem().delete(file.uploadedFileName())
            .onFailure(err -> System.err.println("Temp file cleanup failed: " + err));
        return;
    }

    // UUID prefix prevents collisions; Path.getFileName() strips any directory
    // components a malicious client embedded in the filename.
    String safeFilename = java.util.UUID.randomUUID() + "-"
        + Path.of(filename).getFileName().toString();
    String dest = UPLOAD_DIR + "/" + safeFilename;
    vertx.fileSystem().move(file.uploadedFileName(), dest) // async, non-blocking
        .onSuccess(moved -> ctx.response().setStatusCode(201)
            .putHeader("Content-Type", "application/json")
            .end("{\"filename\":\"" + safeFilename + "\",\"size\":" + size + "}"))
        .onFailure(err -> ctx.fail(500, err));
}
}Key Takeaway: Use BodyHandler with setUploadsDirectory() for file uploads. Always validate MIME type server-side and use UUID-prefixed filenames to prevent collisions and path traversal.
Why It Matters: File uploads are a major attack surface. Client-provided filenames can contain path traversal sequences (../../etc/passwd) that overwrite sensitive files. MIME type validation server-side prevents executable files disguised as images. UUID-based storage names prevent enumeration attacks where attackers guess filenames of other users’ uploads. Setting a body size limit prevents memory exhaustion from maliciously large uploads.
Example 36: WebSocket Server
Vert.x HTTP servers support WebSocket upgrades. Upgraded connections remain open for bidirectional real-time communication without polling.
graph LR
A["Client"] -->|"HTTP Upgrade"| B["Vert.x HTTP Server"]
B -->|"101 Switching Protocols"| A
A <-->|"WebSocket Frames"| B
B --> C["WebSocket Handler"]
C --> D["Broadcast via Event Bus"]
D --> E["Other Clients"]
style A fill:#0173B2,stroke:#000,color:#fff
style B fill:#DE8F05,stroke:#000,color:#fff
style C fill:#029E73,stroke:#000,color:#fff
style D fill:#CC78BC,stroke:#000,color:#fff
style E fill:#CA9161,stroke:#000,color:#fff
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.ext.web.Router;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class WebSocketVerticle extends AbstractVerticle {
private final Map<String, ServerWebSocket> clients = new ConcurrentHashMap<>();
// => Track connected clients by ID
// => ConcurrentHashMap for thread-safe access from multiple event loops
/**
 * Starts an HTTP server that upgrades WebSocket connections, tracks each
 * client by a short ID, and relays chat messages between clients.
 */
@Override
public void start(Promise<Void> startPromise) {
    Router router = Router.router(vertx);
    router.get("/health").handler(ctx -> ctx.response().end("OK"));

    HttpServer server = vertx.createHttpServer();

    // Handles every WebSocket upgrade request.
    server.webSocketHandler(ws -> {
        // Short unique ID for this connection.
        String clientId = java.util.UUID.randomUUID().toString().substring(0, 8);
        // ws.path() is the URL path used during the upgrade request.
        System.out.println("Client connected: " + clientId + " at " + ws.path());
        clients.put(clientId, ws); // register for broadcasting

        // Incoming text frames: relay to all other clients.
        ws.textMessageHandler(msg -> {
            System.out.println("Message from " + clientId + ": " + msg);
            broadcast(clientId, msg);
        });

        // Graceful close or network drop: deregister and notify the room.
        ws.closeHandler(closed -> {
            clients.remove(clientId);
            System.out.println("Client disconnected: " + clientId);
            broadcast(clientId, clientId + " left the chat");
        });

        // Protocol errors (corrupt frame, timeout, ...): drop the client.
        ws.exceptionHandler(err -> {
            System.err.println("WS error for " + clientId + ": " + err);
            clients.remove(clientId);
        });

        ws.writeTextMessage("Welcome! Your ID: " + clientId); // greet the new client
    });

    server.requestHandler(router)
        .listen(8080)
        .onSuccess(s -> startPromise.complete())
        .onFailure(startPromise::fail);
}
/**
 * Sends a text frame to every connected client except the sender.
 * Clients whose write fails are assumed dead and removed from the registry.
 */
private void broadcast(String senderId, String message) {
    for (Map.Entry<String, ServerWebSocket> entry : clients.entrySet()) {
        String id = entry.getKey();
        if (id.equals(senderId)) {
            continue; // never echo back to the sender
        }
        entry.getValue().writeTextMessage(message)
            .onFailure(err -> {
                System.err.println("Failed to send to " + id + ": " + err);
                clients.remove(id); // write failed: connection likely dead
            });
    }
}
}Key Takeaway: Register a webSocketHandler on the HTTP server for WebSocket connections. Use ws.textMessageHandler() for incoming frames and ws.writeTextMessage() to send.
Why It Matters: WebSockets eliminate polling overhead for real-time features—a single persistent connection replaces hundreds of HTTP requests per minute for live updates. Vert.x handles thousands of simultaneous WebSocket connections on the same event loop threads that handle HTTP, with no additional thread overhead. Tracking connections in a map enables targeted messaging and broadcast patterns that power chat, live dashboards, collaborative tools, and gaming applications.
Group 15: Database with Vert.x SQL Client
Example 37: Reactive PostgreSQL Client - Connection Pool
The Vert.x reactive PostgreSQL client provides fully non-blocking database access. Queries return Future<RowSet>, enabling async composition without blocking the event loop.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.pgclient.PgBuilder;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
public class PgClientVerticle extends AbstractVerticle {
private Pool pool;
// => Connection pool; create once, reuse across requests
@Override
public void start(Promise<Void> startPromise) {
    // Connection settings come from the environment so the same artifact
    // runs unchanged in development and production. Never hardcode secrets.
    PgConnectOptions connectOptions = new PgConnectOptions()
        .setHost(System.getenv().getOrDefault("DB_HOST", "localhost"))
        .setPort(5432)
        .setDatabase("myapp")
        .setUser(System.getenv().getOrDefault("DB_USER", "postgres"))
        .setPassword(System.getenv("DB_PASSWORD"));

    // Pool sizing: at most 5 live connections (tune against PostgreSQL's
    // max_connections). Up to 100 requests may queue while the pool is
    // exhausted; beyond that callers fail fast instead of piling up memory.
    PoolOptions poolOptions = new PoolOptions()
        .setMaxSize(5)
        .setMaxWaitQueueSize(100);

    // Build the pool once for the verticle; connections open lazily on the
    // first query.
    pool = PgBuilder.pool()
        .using(vertx)
        .with(poolOptions)
        .connectingTo(connectOptions)
        .build();

    // Fail fast: verify connectivity during startup rather than discovering
    // a misconfiguration on the first real request.
    pool.getConnection()
        .onSuccess(conn -> {
            System.out.println("Database connected successfully");
            conn.close(); // return the probe connection to the pool
            startPromise.complete();
        })
        .onFailure(err -> {
            System.err.println("Database connection failed: " + err);
            startPromise.fail(err); // abort deployment when the DB is unreachable
        });
}
@Override
public void stop(Promise<Void> stopPromise) {
    // Drain and close every pooled connection during undeployment.
    // Completion is unconditional so a failed close cannot hang shutdown.
    pool.close().onComplete(result -> stopPromise.complete());
}
}Key Takeaway: Create one Pool per Vert.x instance and reuse it. Always close connections after use with conn.close() to return them to the pool.
Why It Matters: Connection pooling is essential for database-backed services. Creating a new connection per request typically takes 20-50ms and overloads the database server. A pool reuses connections, reducing connection overhead to near-zero. The setMaxWaitQueueSize limit prevents memory exhaustion during traffic spikes when all connections are busy. Failing fast at startup on DB connectivity issues surfaces configuration problems immediately rather than hours after deployment.
Example 38: Parameterized Queries and Row Mapping
Parameterized queries prevent SQL injection. The reactive client returns RowSet<Row> that you map to domain objects or JSON directly.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.pgclient.PgBuilder;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.Tuple;
public class QueryVerticle extends AbstractVerticle {
private Pool pool;
@Override
public void start(Promise<Void> startPromise) {
    // Shared connection pool: built once, reused by every route handler.
    PgConnectOptions db = new PgConnectOptions()
        .setHost("localhost")
        .setDatabase("myapp")
        .setUser("postgres")
        .setPassword("secret");
    pool = PgBuilder.pool()
        .using(vertx)
        .with(new PoolOptions().setMaxSize(5))
        .connectingTo(db)
        .build();

    // REST routes for the /users resource; BodyHandler must be installed
    // first so POST bodies are readable in handlers.
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());
    router.get("/users").handler(this::listUsers);
    router.get("/users/:id").handler(this::getUser);
    router.post("/users").handler(this::createUser);
    router.delete("/users/:id").handler(this::deleteUser);

    vertx.createHttpServer()
        .requestHandler(router)
        .listen(8080)
        .onSuccess(server -> startPromise.complete())
        .onFailure(startPromise::fail);
}
private void listUsers(io.vertx.ext.web.RoutingContext ctx) {
    // GET /users — return every user as a JSON array, ordered by id.
    pool.query("SELECT id, name, email FROM users ORDER BY id")
        .execute() // Future<RowSet<Row>>
        .onSuccess(rows -> {
            JsonArray users = new JsonArray();
            // RowSet is Iterable<Row>; map each row to a JSON object.
            rows.forEach(row -> users.add(new JsonObject()
                .put("id", row.getInteger("id"))
                .put("name", row.getString("name"))
                .put("email", row.getString("email"))));
            // e.g. [{"id":1,"name":"Alice","email":"alice@example.com"}]
            ctx.json(users);
        })
        .onFailure(err -> ctx.fail(500, err));
}
private void getUser(io.vertx.ext.web.RoutingContext ctx) {
    // GET /users/:id — fetch a single user.
    // Responds 400 for a non-numeric id, 404 when the row is absent.
    int id;
    try {
        id = Integer.parseInt(ctx.pathParam("id"));
    } catch (NumberFormatException e) {
        // BUG FIX: an unhandled NumberFormatException previously escaped the
        // handler and surfaced as a 500 for a client input error.
        ctx.fail(400);
        return;
    }
    pool.preparedQuery("SELECT id, name, email FROM users WHERE id = $1")
        // $1 positional placeholder: never concatenate user input into SQL
        .execute(Tuple.of(id))
        .onSuccess(rows -> {
            if (rows.rowCount() == 0) {
                ctx.fail(404); // no matching row
                return;
            }
            Row row = rows.iterator().next(); // first (only) row
            ctx.json(new JsonObject()
                .put("id", row.getInteger("id"))
                .put("name", row.getString("name"))
                .put("email", row.getString("email")));
        })
        .onFailure(err -> ctx.fail(500, err));
}
private void createUser(io.vertx.ext.web.RoutingContext ctx) {
    // POST /users — insert a new user and echo the stored row back (201).
    // BUG FIX: the original passed body fields straight to the database, so a
    // missing body or missing field produced NULL inserts / constraint-violation
    // 500s. Validate up front and answer 400 for malformed requests.
    JsonObject body = ctx.body().asJsonObject();
    String name = body == null ? null : body.getString("name");
    String email = body == null ? null : body.getString("email");
    if (name == null || name.isBlank() || email == null || email.isBlank()) {
        ctx.fail(400);
        return;
    }
    pool.preparedQuery(
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id, name, email")
        // RETURNING avoids a second round-trip to read the generated id
        .execute(Tuple.of(name, email))
        .onSuccess(rows -> {
            Row row = rows.iterator().next(); // the inserted row
            ctx.response().setStatusCode(201)
                .json(new JsonObject()
                    .put("id", row.getInteger("id"))
                    .put("name", row.getString("name"))
                    .put("email", row.getString("email")));
            // e.g. {"id":42,"name":"Alice","email":"alice@example.com"}
        })
        .onFailure(err -> ctx.fail(500, err));
}
private void deleteUser(io.vertx.ext.web.RoutingContext ctx) {
    // DELETE /users/:id — 400 for a malformed id, 404 when nothing was
    // deleted, 204 No Content on success.
    int id;
    try {
        id = Integer.parseInt(ctx.pathParam("id"));
    } catch (NumberFormatException e) {
        // BUG FIX: a non-numeric id previously threw and produced a 500.
        ctx.fail(400);
        return;
    }
    pool.preparedQuery("DELETE FROM users WHERE id = $1")
        .execute(Tuple.of(id))
        .onSuccess(rows -> {
            if (rows.rowCount() == 0) {
                ctx.fail(404); // no row matched the id
                return;
            }
            ctx.response().setStatusCode(204).end(); // delete OK, no body
        })
        .onFailure(err -> ctx.fail(500, err));
}
}Key Takeaway: Always use preparedQuery() with Tuple parameters. Never concatenate user input into SQL strings. Use rows.rowCount() to detect missing records.
Why It Matters: Parameterized queries are the single most effective defense against SQL injection attacks, one of the top OWASP vulnerabilities. The reactive SQL client returns futures that compose seamlessly with other Vert.x async operations, enabling database queries inside HTTP handlers without blocking the event loop. The RETURNING clause on INSERT eliminates the need for a separate SELECT query to retrieve generated IDs, halving the number of database round-trips for create operations.
Example 39: Database Transactions
Transactions group multiple SQL operations into an atomic unit. Either all operations succeed or all are rolled back, maintaining database consistency.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.pgclient.PgBuilder;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Transaction;
import io.vertx.sqlclient.Tuple;
public class TransactionVerticle extends AbstractVerticle {
private Pool pool;
@Override
public void start(Promise<Void> startPromise) {
    // One shared pool serves every transfer request.
    PgConnectOptions db = new PgConnectOptions()
        .setHost("localhost")
        .setDatabase("myapp")
        .setUser("postgres")
        .setPassword("secret");
    pool = PgBuilder.pool()
        .using(vertx)
        .with(new PoolOptions().setMaxSize(5))
        .connectingTo(db)
        .build();

    // Single endpoint: POST /transfer moves money between two accounts.
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create()); // parse JSON bodies first
    router.post("/transfer").handler(this::transfer);

    vertx.createHttpServer()
        .requestHandler(router)
        .listen(8080)
        .onSuccess(server -> startPromise.complete())
        .onFailure(startPromise::fail);
}
private void transfer(RoutingContext ctx) {
    // POST /transfer — atomically move `amount` from account `from` to `to`.
    // 400 for a malformed request, 422 when the transfer cannot be applied.
    JsonObject body = ctx.body().asJsonObject();
    Integer fromId = body == null ? null : body.getInteger("from");
    Integer toId = body == null ? null : body.getInteger("to");
    Double amount = body == null ? null : body.getDouble("amount");
    if (fromId == null || toId == null || amount == null || amount <= 0) {
        // BUG FIX: missing fields previously caused an NPE on unboxing (500),
        // and a negative amount would silently reverse the transfer direction.
        ctx.fail(400);
        return;
    }
    pool.getConnection()
        // Dedicated connection: a transaction must run on a single connection
        .compose(conn -> conn.begin()
            // conn.begin() starts the transaction; both updates run inside it
            .compose(tx -> {
                return conn.preparedQuery(
                    // Balance guard: the debit only applies when funds suffice,
                    // so accounts can never go negative.
                    "UPDATE accounts SET balance = balance - $1 "
                        + "WHERE id = $2 AND balance >= $1")
                    .execute(Tuple.of(amount, fromId))
                    .compose(rows -> {
                        if (rows.rowCount() == 0) {
                            // Missing account or insufficient funds: undo and fail
                            return tx.rollback()
                                .compose(v -> io.vertx.core.Future.failedFuture(
                                    new RuntimeException(
                                        "Source account not found or insufficient funds")));
                        }
                        return conn.preparedQuery(
                            "UPDATE accounts SET balance = balance + $1 WHERE id = $2")
                            .execute(Tuple.of(amount, toId)); // credit destination
                    })
                    .compose(rows -> {
                        if (rows.rowCount() == 0) {
                            return tx.rollback()
                                .compose(v -> io.vertx.core.Future.failedFuture(
                                    new RuntimeException("Destination account not found")));
                        }
                        return tx.commit(); // both updates succeeded; commit atomically
                    })
                    .onComplete(ar -> conn.close());
                    // ALWAYS close in onComplete (not onSuccess): the connection
                    // returns to the pool even when the transfer failed
            }))
        .onSuccess(v -> ctx.response().setStatusCode(200)
            .json(new JsonObject().put("status", "transferred").put("amount", amount)))
        .onFailure(err -> {
            System.err.println("Transfer failed: " + err.getMessage());
            // BUG FIX: build the error body with JsonObject rather than string
            // concatenation, which emitted invalid JSON whenever the message
            // contained quotes or backslashes.
            ctx.response().setStatusCode(422)
                .end(new JsonObject()
                    .put("error", String.valueOf(err.getMessage())).encode());
        });
}
}Key Takeaway: Use conn.begin() to start a transaction. Call tx.commit() on success and tx.rollback() on any failure. Always close the connection in onComplete.
Why It Matters: Database transactions are the foundation of data integrity in financial and inventory systems. Without transactions, a network failure between the debit and credit updates leaves accounts in an inconsistent state. The reactive transaction API composes naturally with futures, making it possible to write safe, atomic operations without the callback nesting that makes traditional async transaction code error-prone. Closing the connection in onComplete (not just onSuccess) ensures connections return to the pool even on failures.
Group 16: Testing Vert.x Applications
Example 40: JUnit 5 with VertxExtension
VertxExtension integrates Vert.x with JUnit 5, providing async test support and automatic verticle deployment for integration tests.
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.assertj.core.api.Assertions.assertThat;
@ExtendWith(VertxExtension.class)
// => Registers VertxExtension; manages Vert.x lifecycle for tests
// => Provides Vertx and VertxTestContext injection into test methods
class UserApiTest {
private WebClient client;
// => HTTP client for making test requests
@BeforeEach
void setUp(Vertx vertx, VertxTestContext ctx) {
    // VertxExtension injects the Vertx instance; VertxTestContext holds the
    // test open until completeNow()/failNow() so async setup is awaited.
    WebClientOptions options = new WebClientOptions()
        .setDefaultHost("localhost")
        .setDefaultPort(8080);
    client = WebClient.create(vertx, options);
    // Deploy the real verticle under test before any request is issued.
    vertx.deployVerticle(new RouterVerticle())
        .onSuccess(deploymentId -> ctx.completeNow())
        .onFailure(ctx::failNow);
}
@AfterEach
void tearDown(Vertx vertx, VertxTestContext ctx) {
    // Shut Vert.x down after every test so ports and threads are released.
    vertx.close().onComplete(result -> ctx.completeNow());
}
@Test
void testGetUsers(Vertx vertx, VertxTestContext ctx) {
    // GET /users must answer 200 with a JSON array body.
    client.get("/users")
        .send()
        .onSuccess(response -> {
            // Assertions run inside ctx.verify() so a failure is reported to
            // the test context instead of being lost on an async thread.
            ctx.verify(() -> {
                assertThat(response.statusCode()).isEqualTo(200);
                assertThat(response.bodyAsJsonArray()).isNotNull();
            });
            ctx.completeNow(); // signal test passed
        })
        .onFailure(ctx::failNow);
}
@Test
void testCreateUser(Vertx vertx, VertxTestContext ctx) {
    // POST /users must create the user and echo it back with 201.
    JsonObject payload = new JsonObject()
        .put("name", "Alice")
        .put("email", "alice@example.com");
    client.post("/users")
        .putHeader("Content-Type", "application/json")
        .sendJsonObject(payload) // serializes body; sets Content-Type too
        .onSuccess(response -> {
            ctx.verify(() -> {
                assertThat(response.statusCode()).isEqualTo(201);
                JsonObject body = response.bodyAsJsonObject();
                assertThat(body.getString("name")).isEqualTo("Alice");
            });
            ctx.completeNow();
        })
        .onFailure(ctx::failNow);
}
}
// => RouterVerticle is the verticle created in Example 6 (simplified)
// => This test deploys the real verticle and makes actual HTTP callsKey Takeaway: Use @ExtendWith(VertxExtension.class) for JUnit 5 async test support. Wrap assertions in ctx.verify() and always call ctx.completeNow() or ctx.failNow().
Why It Matters: Async testing without proper framework support leads to tests that pass even when assertions fail—the test method returns before assertions execute. VertxTestContext solves this by holding the test open until completeNow() or failNow() is called, ensuring assertions execute before the test concludes. Testing with the real verticle deployed catches integration issues like missing BodyHandler or incorrect route ordering that unit tests with mocks would miss.
Example 41: Testing with WebClient and Mocking the Event Bus
Isolate unit tests by mocking event bus consumers, allowing you to test HTTP handlers without deploying real downstream verticles.
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(VertxExtension.class)
class OrderApiTest {
@BeforeEach
void setUp(Vertx vertx, VertxTestContext ctx) {
    // Stub the downstream service: this mock consumer answers "order.create"
    // so the HTTP verticle is testable without deploying a real OrderVerticle.
    vertx.eventBus().consumer("order.create", msg -> {
        JsonObject order = (JsonObject) msg.body();
        System.out.println("[Mock] Processing order: " + order.getString("item"));
        JsonObject reply = new JsonObject()
            .put("orderId", "MOCK-001")
            .put("status", "created");
        msg.reply(reply); // canned "success" response
    });
    // Deploy the HTTP verticle under test once the mock is registered.
    vertx.deployVerticle(new OrderHttpVerticle())
        .onSuccess(deploymentId -> ctx.completeNow())
        .onFailure(ctx::failNow);
}
@Test
void testCreateOrderSuccess(Vertx vertx, VertxTestContext ctx) {
    // POST /orders should yield 201 with the orderId produced by the mock.
    JsonObject payload = new JsonObject().put("item", "laptop").put("quantity", 1);
    WebClient client = WebClient.create(vertx);
    client.post(8080, "localhost", "/orders")
        .putHeader("Content-Type", "application/json")
        .sendJson(payload)
        .onSuccess(response -> {
            ctx.verify(() -> {
                org.assertj.core.api.Assertions
                    .assertThat(response.statusCode()).isEqualTo(201);
                // The body comes straight from the mock consumer's reply.
                org.assertj.core.api.Assertions
                    .assertThat(response.bodyAsJsonObject().getString("orderId"))
                    .isEqualTo("MOCK-001");
            });
            ctx.completeNow();
        })
        .onFailure(ctx::failNow);
}
}
// Simplified OrderHttpVerticle for the test
class OrderHttpVerticle extends io.vertx.core.AbstractVerticle {
@Override
public void start(io.vertx.core.Promise<Void> sp) {
    // Minimal HTTP front-end: POST /orders forwards the body over the event
    // bus and relays the reply as 201, or the failure as 500.
    io.vertx.ext.web.Router router = io.vertx.ext.web.Router.router(vertx);
    router.route().handler(io.vertx.ext.web.handler.BodyHandler.create());
    router.post("/orders").handler(ctx ->
        vertx.eventBus()
            .request("order.create", ctx.body().asJsonObject())
            .onSuccess(reply -> ctx.response().setStatusCode(201).json(reply.body()))
            .onFailure(err -> ctx.fail(500, err)));
    vertx.createHttpServer().requestHandler(router)
        .listen(8080).onSuccess(server -> sp.complete()).onFailure(sp::fail);
}
}Key Takeaway: Register mock event bus consumers in @BeforeEach to isolate HTTP handler tests from downstream verticles. The mock consumer controls response behavior for different test scenarios.
Why It Matters: Event bus mocking enables testing HTTP handlers in isolation without requiring a full deployment of downstream services. This makes tests faster, more reliable (no flaky dependencies), and easier to set up for edge cases (e.g., timeout, error scenarios). The mock consumer approach is simpler than dependency injection frameworks and works naturally with Vert.x’s event-driven architecture.
Group 17: Resilience Patterns
Example 42: Circuit Breaker
The circuit breaker pattern prevents cascading failures by short-circuiting calls to failing services. Vert.x provides CircuitBreaker with configurable thresholds and fallback handlers.
graph TD
A["Request"] --> B{"Circuit State?"}
B -->|"CLOSED"| C["Execute Operation"]
C -->|"success"| D["Return Result"]
C -->|"failure"| E["Increment Failure Count"]
E -->|"threshold reached"| F["OPEN Circuit"]
B -->|"OPEN"| G["Execute Fallback"]
F -->|"after timeout"| H["HALF-OPEN"]
H -->|"success"| I["CLOSED Circuit"]
H -->|"failure"| F
style A fill:#0173B2,stroke:#000,color:#fff
style B fill:#DE8F05,stroke:#000,color:#fff
style D fill:#029E73,stroke:#000,color:#fff
style F fill:#CA9161,stroke:#000,color:#fff
style G fill:#CC78BC,stroke:#000,color:#fff
style H fill:#DE8F05,stroke:#000,color:#fff
import io.vertx.circuitbreaker.CircuitBreaker;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
public class CircuitBreakerDemo extends AbstractVerticle {
private CircuitBreaker breaker;
// => Circuit breaker for calls to external payment service
@Override
public void start(Promise<Void> startPromise) {
    // Circuit breaker guarding calls to the external payment service.
    breaker = CircuitBreaker.create("payment-service", vertx,
        new CircuitBreakerOptions()
            .setMaxFailures(3)          // open after 3 consecutive failures
            .setTimeout(2000)           // each attempt times out after 2s
            .setResetTimeout(10000)     // after 10s OPEN, probe via HALF-OPEN
            .setFallbackOnFailure(true)); // run the fallback on failure/open
    breaker.openHandler(v ->
        System.out.println("Circuit OPENED - payment service unavailable"));
    breaker.closeHandler(v ->
        System.out.println("Circuit CLOSED - payment service recovered"));

    Router router = Router.router(vertx);
    // BUG FIX: BodyHandler must be registered on the route chain so the body
    // is fully read before the endpoint handler runs. The previous code
    // invoked BodyHandler.create().handle(ctx) inline inside the handler,
    // which does not wait for the body and left ctx.body() unpopulated.
    router.route().handler(io.vertx.ext.web.handler.BodyHandler.create());
    router.post("/payments").handler(ctx -> {
        JsonObject body = ctx.body().asJsonObject(); // parsed payment request
        breaker.executeWithFallback(
            this::callPaymentService, // primary: call the external service
            throwable -> {
                // Fallback: circuit is OPEN or the operation failed/timed out.
                System.out.println("Fallback triggered: " + throwable.getMessage());
                // Degraded response: accept the request and queue it for retry.
                return new JsonObject()
                    .put("status", "PENDING")
                    .put("message", "Payment queued for retry");
            })
            .onSuccess(result -> ctx.json(result))
            .onFailure(err -> ctx.fail(503, err));
    });

    vertx.createHttpServer()
        .requestHandler(router)
        .listen(8080)
        .onSuccess(s -> startPromise.complete())
        .onFailure(startPromise::fail);
}
private Future<JsonObject> callPaymentService(Promise<JsonObject> promise) {
    // Simulates an external payment API call (production: real HTTP request).
    // BUG FIX: the original used Math.random() > 0.7, which is true only ~30%
    // of the time — a 70% failure rate, contradicting the documented 30%.
    boolean serviceAvailable = Math.random() >= 0.3; // ~30% simulated failures
    if (serviceAvailable) {
        promise.complete(new JsonObject()
            .put("status", "APPROVED")
            .put("transactionId", "TXN-" + System.currentTimeMillis()));
    } else {
        // Each failure increments the breaker's failure counter.
        promise.fail("Payment service timeout");
    }
    return promise.future();
}
}Key Takeaway: Wrap external service calls in a CircuitBreaker with failure thresholds and a fallback handler. The circuit opens after repeated failures, protecting both your service and the downstream dependency.
Why It Matters: Without circuit breakers, a slow or failed downstream service causes your service’s thread pool to exhaust waiting for timeouts, cascading failures across the entire system. The circuit breaker fast-fails when the service is known to be unavailable, returning degraded responses immediately. This protects user experience (queue the payment rather than timing out) and gives the downstream service time to recover without being overwhelmed by retry storms.
Example 43: Rate Limiting with Vert.x Web
Rate limiting prevents individual clients from overwhelming your API. Vert.x Web provides RateLimiterHandler based on a token bucket algorithm.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.RateLimiterHandler;
public class RateLimitingDemo extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
// Applies per-client rate limiting (10 req/s) to all /api/* routes while
// leaving /health unthrottled for load-balancer probes.
// NOTE(review): RateLimiterHandler is not part of core vertx-web in every
// release line — confirm this class exists in the project's Vert.x
// dependency set before relying on this example.
Router router = Router.router(vertx);
// Rate limit: 10 requests per second per client IP
RateLimiterHandler rateLimiter = RateLimiterHandler.create(vertx, 10)
// => 10 requests per second globally (or per identifier)
// => Uses token bucket: tokens refill at 10/second
// => Clients that exhaust tokens receive 429 Too Many Requests
.identifierProvider(ctx -> ctx.request().remoteAddress().host());
// => Identify clients by IP address
// => Could also use JWT subject: ctx.user().principal().getString("sub")
// => NOTE(review): behind a proxy the remote address is the proxy's IP;
// => confirm X-Forwarded-For handling if this runs behind a load balancer
router.route("/api/*").handler(rateLimiter);
// => Apply rate limiter to all /api/* routes
router.get("/api/data").handler(ctx -> {
ctx.json(new io.vertx.core.json.JsonObject()
.put("data", "rate-limited response"));
// => Only reached if client has remaining tokens
});
// Route outside rate limiting
router.get("/health").handler(ctx -> ctx.response().end("OK"));
// => Health checks bypass the rate limiter (intentional)
vertx.createHttpServer()
.requestHandler(router)
.listen(8080)
.onSuccess(s -> startPromise.complete())
.onFailure(startPromise::fail);
}
}Key Takeaway: Use RateLimiterHandler with an identifier provider to rate limit by client IP or authenticated user. Apply rate limiting only to paths that need protection.
Why It Matters: Rate limiting is essential defense against denial-of-service attacks, scraping bots, and API abuse. The token bucket algorithm allows short bursts above the average rate while maintaining long-term limits, providing a better user experience than hard per-second caps. Using JWT subject as the identifier limits authenticated users independently regardless of IP, preventing shared-IP users (corporate NAT) from being unfairly grouped together.
Group 18: Advanced Routing Patterns
Example 44: Request Validation with Vert.x Web Validator
Declarative request validation with ValidationHandler validates path params, query params, and request bodies before they reach your business logic, reducing boilerplate validation code.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.validation.RequestParameters;
import io.vertx.ext.web.validation.ValidationHandler;
import io.vertx.ext.web.validation.builder.Bodies;
import io.vertx.ext.web.validation.builder.Parameters;
import io.vertx.json.schema.SchemaParser;
import io.vertx.json.schema.SchemaRouter;
import io.vertx.json.schema.SchemaRouterOptions;
import io.vertx.json.schema.draft7.dsl.Keywords;
import io.vertx.json.schema.draft7.dsl.SchemaBuilder;
import static io.vertx.json.schema.draft7.dsl.Schemas.*;
public class ValidationDemo extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
// Declaratively validates the path param, query param, and JSON body of
// POST /users/:id/orders before the business handler runs; invalid
// requests are rejected by the ValidationHandler itself.
SchemaRouter schemaRouter = SchemaRouter.create(vertx, new SchemaRouterOptions());
// => Schema router manages JSON Schema definitions
SchemaParser schemaParser = SchemaParser.createDraft7SchemaParser(schemaRouter);
// => Parses JSON Schema draft 7; used for body validation
// NOTE(review): the draft-7 DSL (io.vertx.json.schema.draft7.dsl.*) is
// deprecated in newer Vert.x releases — confirm the project's version.
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
// => BodyHandler must run before validation so the body is available
// Validate path param, query param, and body in one handler
router.post("/users/:id/orders")
.handler(ValidationHandler.builder(schemaParser)
.pathParameter(Parameters.param("id", intSchema()))
// => Path param "id" must be an integer; fails if non-numeric
.queryParameter(Parameters.optionalParam("status",
stringSchema().with(Keywords.allowedValues("PENDING", "COMPLETED", "CANCELLED"))))
// => Optional query param; if present must be one of the allowed values
.body(Bodies.json(objectSchema()
.requiredProperty("item", stringSchema().with(Keywords.minLength(1)))
// => Body must be JSON object with non-empty "item" string
.requiredProperty("quantity", intSchema().with(Keywords.minimum(1)))
// => "quantity" must be integer >= 1
.optionalProperty("notes", stringSchema())))
// => "notes" is optional string
.build())
.handler(ctx -> {
// Only reached when every declared rule passed validation.
RequestParameters params = ctx.get(ValidationHandler.REQUEST_CONTEXT_KEY);
// => Retrieve validated and type-converted parameters
int userId = params.pathParameter("id").getInteger();
// => Already converted to Integer by validator; no parseInt() needed
String status = params.queryParameter("status") != null
? params.queryParameter("status").getString() : "PENDING";
// => Optional param with default
JsonObject body = params.body().getJsonObject();
// => Validated and parsed body; schema guarantees required fields present
System.out.println("Create order for user " + userId
+ " item=" + body.getString("item")
+ " qty=" + body.getInteger("quantity")
+ " status=" + status);
// => Output: Create order for user 42 item=laptop qty=2 status=PENDING
ctx.response().setStatusCode(201)
.json(new JsonObject().put("created", true).put("userId", userId));
});
vertx.createHttpServer()
.requestHandler(router)
.listen(8080)
.onSuccess(s -> startPromise.complete())
.onFailure(startPromise::fail);
}
}Key Takeaway: Use ValidationHandler.builder() to declare parameter and body schemas. The handler automatically returns 400 with error details for invalid requests before your business logic runs.
Why It Matters: Declarative validation moves request validation from imperative if-statements scattered across handlers to structured schema declarations at the route definition level. This eliminates an entire class of validation bugs (forgetting to validate a field) and makes the API contract visible in code. Automatic 400 responses with detailed error messages improve developer experience for API consumers debugging integration issues.
Example 45: Serving Compressed and Cached Static Assets
Production static file serving requires proper cache control headers to reduce server load and improve client-side performance.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.StaticHandler;
public class ProductionStaticVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
    Router router = Router.router(vertx);

    // Tier 1 — content-hashed, immutable assets (/static/versioned/app.a1b2c3.js):
    // cached a full year; the URL changes whenever the content does.
    StaticHandler versioned = StaticHandler.create("webroot/versioned")
        .setCachingEnabled(true)
        .setMaxAgeSeconds(31536000) // 365 days x 86400 s
        .setFilesReadOnly(true)     // files never change; skip mtime checks
        .setIncludeHidden(false);   // never serve dotfiles (.env, .htpasswd)
    router.route("/static/versioned/*").handler(versioned);

    // Tier 2 — regular assets: 24-hour cache plus Vary: Accept-Encoding,
    // required when compressed and uncompressed variants are both served.
    StaticHandler regular = StaticHandler.create("webroot/static")
        .setCachingEnabled(true)
        .setMaxAgeSeconds(86400)
        .setSendVaryHeader(true);
    router.route("/static/*").handler(regular);

    // Tier 3 — SPA HTML entry point: never cached, so every deployment's
    // index.html is picked up immediately by returning clients.
    StaticHandler html = StaticHandler.create("webroot")
        .setIndexPage("index.html")
        .setCachingEnabled(false)
        .setMaxAgeSeconds(0);
    router.route("/*").handler(html);

    // Gzip responses whenever the client advertises support.
    HttpServerOptions options = new HttpServerOptions()
        .setCompressionSupported(true)
        .setCompressionLevel(6);
    vertx.createHttpServer(options)
        .requestHandler(router)
        .listen(8080)
        .onSuccess(server -> startPromise.complete())
        .onFailure(startPromise::fail);
}
}Key Takeaway: Use long cache TTLs for content-hashed versioned assets and no caching for HTML entry points. Combine with setCompressionSupported(true) for maximum performance.
Why It Matters: Optimal cache headers for static assets dramatically reduce server load and improve page load times. Versioned assets with year-long caches mean returning visitors load pages from browser cache with zero server requests. The no-cache HTML policy ensures users always run the latest version of your application after deployments, avoiding the common problem of cached HTML referencing non-existent old asset URLs after a new release.
Group 19: Advanced Async Patterns
Example 46: executeBlocking and Thread Pool Management
vertx.executeBlocking() runs blocking code on the worker thread pool while keeping the result delivery on the event loop, bridging blocking and non-blocking worlds cleanly.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
public class ExecuteBlockingDemo extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
    // Two endpoints demonstrating blocking work offloaded to worker threads.
    Router router = Router.router(vertx);
    router.get("/thumbnail/:id").handler(this::generateThumbnail);
    router.get("/report/:id").handler(this::generateReport);
    vertx.createHttpServer()
        .requestHandler(router)
        .listen(8080)
        .onSuccess(server -> startPromise.complete())
        .onFailure(startPromise::fail);
}
private void generateThumbnail(RoutingContext ctx) {
    // Offload blocking image work to the worker pool; reply on the event loop.
    String id = ctx.pathParam("id");
    vertx.executeBlocking(() -> {
        // Runs on a WORKER thread: blocking APIs (ImageIO etc.) are safe here.
        System.out.println("Generating thumbnail on: "
            + Thread.currentThread().getName());
        Thread.sleep(100); // simulated blocking image processing
        return "thumbnail-" + id + ".jpg";
    })
    .onSuccess(filename -> {
        // Back on the EVENT LOOP thread; result delivered via the Future.
        System.out.println("Thumbnail ready: " + filename);
        // FIX (idiom): build the response with JsonObject instead of
        // hand-concatenated JSON, which breaks if the value ever needs
        // escaping. ctx.json() also sets the Content-Type header.
        ctx.json(new io.vertx.core.json.JsonObject().put("filename", filename));
    })
    .onFailure(err -> ctx.fail(500, err));
}
private void generateReport(RoutingContext ctx) {
  String id = ctx.pathParam("id");
  // Second argument false = unordered: independent calls may run in
  // parallel across the worker pool. The default (true) queues calls so
  // they execute serially on one worker thread.
  vertx.executeBlocking(() -> heavyComputation(id), false)
      .onSuccess(result -> ctx.json(
          new io.vertx.core.json.JsonObject().put("report", result)))
      .onFailure(err -> ctx.fail(500, err));
}
private String heavyComputation(String id) {
  // Stands in for CPU-intensive or blocking I/O work; safe on a worker thread.
  try {
    Thread.sleep(200);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can observe cancellation.
    Thread.currentThread().interrupt();
  }
  // Returned to the event loop via the Future from executeBlocking.
  return "report-data-" + id;
}
public static void main(String[] args) {
  // Local dev entry point: one verticle instance on a fresh Vertx.
  Vertx vertx = Vertx.vertx();
  vertx.deployVerticle(new ExecuteBlockingDemo());
}
}
Key Takeaway: Use vertx.executeBlocking() to run blocking code safely. Pass false as the second argument to allow parallel execution on the worker thread pool.
Why It Matters: executeBlocking provides a safe escape hatch for integrating blocking libraries without violating the event loop model. The ordered mode (default) ensures sequential execution for stateful operations, while unordered mode maximizes parallelism for independent blocking tasks. The automatic delivery of results back to the event loop thread means downstream handlers remain on the same thread model, preventing concurrency bugs at the blocking/non-blocking boundary.
Example 47: Periodic Tasks and Scheduled Jobs
Vert.x timers enable periodic background tasks without external schedulers. Worker verticles handle recurring jobs that involve blocking operations.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
public class ScheduledTasksVerticle extends AbstractVerticle {
private long periodicTimerId;
// => Timer ID returned by setPeriodic; kept so stop() can cancel the task on shutdown
@Override
public void start(Promise<Void> startPromise) {
  // Recurring job: fires every 30 seconds on the event loop thread.
  // Keep the timer callback itself lightweight and push any blocking
  // work onto a worker thread.
  periodicTimerId = vertx.setPeriodic(30_000, id -> {
    System.out.println("Periodic task at: " + java.time.Instant.now());
    vertx.executeBlocking(() -> {
      // Blocking DB-style work is safe here (worker thread).
      cleanExpiredSessions();
      return null;
    }).onFailure(err -> System.err.println("Cleanup failed: " + err));
  });

  // One-shot timer: fires a single time, 5 seconds after deployment.
  // Handy for delayed initialization such as cache warmup.
  vertx.setTimer(5_000, id -> {
    System.out.println("Delayed initialization complete");
  });

  startPromise.complete();
}
@Override
public void stop(Promise<Void> stopPromise) {
  // Without cancellation the periodic timer keeps firing after undeploy,
  // hitting resources that may already be closed.
  vertx.cancelTimer(periodicTimerId);
  System.out.println("Cancelled periodic task timer");
  stopPromise.complete();
}
private void cleanExpiredSessions() {
  // Production version would run: DELETE FROM sessions WHERE expires_at < NOW()
  System.out.println("Cleaned expired sessions (simulated)");
}
public static void main(String[] args) {
  // Deploy a single instance for local experimentation.
  Vertx vertx = Vertx.vertx();
  vertx.deployVerticle(new ScheduledTasksVerticle());
}
}
Key Takeaway: Use vertx.setPeriodic() for recurring tasks and store the timer ID in a field. Cancel timers in stop() to prevent execution after the verticle is stopped.
Why It Matters: Cancelling timers on shutdown prevents tasks from firing against already-closed resources (database connections, HTTP clients), which can cause confusing NullPointerException logs during graceful shutdown. Offloading periodic work to executeBlocking keeps periodic timers lightweight and non-blocking, ensuring they don’t accumulate delay if the previous execution runs long. This pattern handles session cleanup, cache warming, and metrics flushing without an external cron job.
Group 20: Vert.x Service Proxy
Example 48: Service Proxy Code Generation
Vert.x service proxies generate type-safe event bus clients from interface definitions, hiding the low-level message-passing details behind clean Java interfaces.
import io.vertx.codegen.annotations.ProxyGen;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.serviceproxy.ServiceBinder;
import io.vertx.serviceproxy.ServiceProxyBuilder;
// Service interface annotated for proxy generation
@ProxyGen
// => Triggers generation of the event bus proxy class (UserServiceVertxEBProxy)
// => Generated by the codegen build step (mvn generate-sources / gradle generateProxies)
@VertxGen
// => Marks the interface for Vert.x code generation (also enables polyglot bindings)
interface UserService {
// => Proxy-able service contract: per service-proxy rules, every method must
// => return Future<T> or take a Handler<AsyncResult<T>> so each call can be
// => mapped onto an event bus request/reply exchange
Future<JsonObject> getUser(String id);
// => Looks up a user by ID; resolves with the user's JSON representation
Future<JsonObject> createUser(JsonObject data);
// => Creates a new user from the given data; resolves with the stored record
Future<Void> deleteUser(String id);
// => Deletes the user with the given ID; resolves (with null) on success
}
// Service implementation
class UserServiceImpl implements UserService {

  @Override
  public Future<JsonObject> getUser(String id) {
    // A real implementation would query a database here; this demo
    // answers immediately with a canned record.
    JsonObject user = new JsonObject().put("id", id).put("name", "Alice");
    return Future.succeededFuture(user);
  }

  @Override
  public Future<JsonObject> createUser(JsonObject data) {
    // Simulated insert: copy the input and attach a generated ID.
    JsonObject created = data.copy().put("id", "new-" + System.currentTimeMillis());
    return Future.succeededFuture(created);
  }

  @Override
  public Future<Void> deleteUser(String id) {
    System.out.println("Deleting user: " + id);
    // Already-completed Future<Void> signals success.
    return Future.succeededFuture();
  }
}
// Verticle that registers the service
class ServiceRegistrationDemo extends io.vertx.core.AbstractVerticle {
@Override
public void start(io.vertx.core.Promise<Void> sp) {
  // Expose the implementation on the event bus at a well-known address;
  // ServiceBinder registers a message consumer that dispatches to it.
  new ServiceBinder(vertx)
      .setAddress("user.service")
      .register(UserService.class, new UserServiceImpl());

  // Build a type-safe proxy bound to the same address; method calls
  // transparently become event bus request/reply exchanges.
  UserService proxy = new ServiceProxyBuilder(vertx)
      .setAddress("user.service")
      .build(UserService.class);

  proxy.getUser("user-42")
      .onSuccess(user -> {
        // Output: Got user: {"id":"user-42","name":"Alice"}
        System.out.println("Got user: " + user.encode());
      })
      // Fix: the original had no failure handler, so proxy errors
      // (timeouts, missing service) were silently swallowed.
      .onFailure(err -> System.err.println("getUser failed: " + err));
  sp.complete();
}
public static void main(String[] args) {
  // Local dev entry point.
  Vertx vertx = Vertx.vertx();
  vertx.deployVerticle(new ServiceRegistrationDemo());
}
}
Key Takeaway: Define service interfaces with @ProxyGen, register implementations with ServiceBinder, and call them through generated proxies. Event bus messaging is transparent to callers.
Why It Matters: Service proxies enforce API contracts between verticles at compile time, preventing the type erasure that comes with raw event bus message passing. The generated proxy code handles serialization, deserialization, and timeout management, eliminating boilerplate that developers would otherwise write and potentially get wrong. When a service moves to a different cluster node, proxy callers require no code changes—only the event bus address needs to remain consistent.
Example 49: Session Management with Vert.x Web
Server-side sessions store user state between requests. Vert.x Web’s SessionHandler manages session lifecycle, storage, and expiry automatically.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.Session;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CookieHandler;
import io.vertx.ext.web.handler.SessionHandler;
import io.vertx.ext.web.sstore.LocalSessionStore;
public class SessionDemo extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
  Router router = Router.router(vertx);
  router.route().handler(BodyHandler.create());

  // SessionHandler must be installed before any route that reads
  // ctx.session(). LocalSessionStore keeps sessions in memory; clustered
  // deployments would use ClusteredSessionStore (or an external store).
  router.route().handler(SessionHandler.create(LocalSessionStore.create(vertx))
      .setSessionTimeout(30 * 60 * 1000) // expire after 30 minutes of inactivity
      .setNagHttps(false));              // dev only; use setNagHttps(true) in prod

  router.post("/login").handler(this::login);
  router.get("/dashboard").handler(this::dashboard);
  router.post("/logout").handler(this::logout);

  vertx.createHttpServer()
      .requestHandler(router)
      .listen(8080)
      .onSuccess(server -> startPromise.complete())
      .onFailure(startPromise::fail);
}
private void login(RoutingContext ctx) {
  JsonObject body = ctx.body().asJsonObject();
  String username = body.getString("username");

  // Simplified credential check; real code would verify a password hash.
  if (!"alice".equals(username)) {
    ctx.response().setStatusCode(401).end("Invalid credentials");
    return;
  }

  // SessionHandler created the session; values stored here persist across
  // requests and are keyed by the session cookie sent to the client.
  Session session = ctx.session();
  session.put("userId", "user-42");
  session.put("username", username);

  // 303 See Other: send the browser to the dashboard after login.
  ctx.response()
      .setStatusCode(303)
      .putHeader("Location", "/dashboard")
      .end();
}
private void dashboard(RoutingContext ctx) {
  Session session = ctx.session();
  String userId = session.get("userId");

  // No stored user ID means the caller never logged in: bounce to /login.
  if (userId == null) {
    ctx.response()
        .setStatusCode(303)
        .putHeader("Location", "/login")
        .end();
    return;
  }

  // Output: {"message":"Welcome, alice","userId":"user-42"}
  ctx.json(new JsonObject()
      .put("message", "Welcome, " + session.get("username"))
      .put("userId", userId));
}
private void logout(RoutingContext ctx) {
  // Destroying the session removes it from the store and clears the cookie,
  // revoking access immediately.
  ctx.session().destroy();
  ctx.response()
      .setStatusCode(303)
      .putHeader("Location", "/login")
      .end();
}
}
Key Takeaway: Add SessionHandler before route handlers. Use ctx.session().put()/get() to store and retrieve user data. Call session.destroy() on logout.
Why It Matters: Server-side sessions provide a secure alternative to JWT for applications where token revocation is critical. Destroying a session immediately revokes access, unlike JWTs which remain valid until expiry. LocalSessionStore is appropriate for single-instance deployments; production clustered deployments should use ClusteredSessionStore or an external Redis-backed store to ensure sessions are accessible across all server instances.
Example 50: Response Caching with Cache Headers
Proper HTTP cache headers reduce server load by allowing clients and intermediary caches to serve responses without hitting the server. This example demonstrates different caching strategies.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
public class CacheHeadersDemo extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
  Router router = Router.router(vertx);

  // Four routes, four caching strategies matched to data mutability.
  router.get("/api/cart").handler(this::getCart);               // never cache
  router.get("/api/categories").handler(this::getCategories);   // short cache
  router.get("/api/config/:version").handler(this::getConfig);  // immutable
  router.get("/api/products/:id").handler(this::getProduct);    // ETag / 304

  vertx.createHttpServer()
      .requestHandler(router)
      .listen(8080)
      .onSuccess(server -> startPromise.complete())
      .onFailure(startPromise::fail);
}
private void getCart(RoutingContext ctx) {
  // Sensitive per-user data: "private" bars shared caches (CDNs) and
  // "no-store" tells the browser never to persist it to disk.
  ctx.response()
      .putHeader("Cache-Control", "private, no-cache, no-store")
      .json(new JsonObject().put("items", 3).put("total", 99.99));
}
private void getCategories(RoutingContext ctx) {
  // Semi-static list: any cache (browser or CDN) may keep it for
  // 5 minutes (max-age=300 seconds).
  ctx.response()
      .putHeader("Cache-Control", "public, max-age=300")
      .json(new io.vertx.core.json.JsonArray()
          .add("Electronics").add("Books").add("Clothing"));
}
private void getConfig(RoutingContext ctx) {
  String version = ctx.pathParam("version");
  // A versioned URL never changes content, so caches may hold it for a
  // year; "immutable" lets browsers skip revalidation entirely.
  ctx.response()
      .putHeader("Cache-Control", "public, max-age=31536000, immutable")
      .json(new JsonObject().put("version", version).put("theme", "dark"));
}
private void getProduct(RoutingContext ctx) {
  String id = ctx.pathParam("id");
  // In production the ETag would be derived from the product's data
  // (e.g. a version column or last-modified timestamp).
  String etag = "\"product-" + id + "-v3\"";

  // Fix: If-None-Match may be "*" or a comma-separated list of ETags
  // (RFC 9110 §13.1.2); the original only matched a single exact value,
  // missing valid cache hits.
  String clientEtag = ctx.request().getHeader("If-None-Match");
  if (clientEtag != null && etagMatches(etag, clientEtag)) {
    // 304 Not Modified: client's cached copy is still valid; no body sent.
    ctx.response().setStatusCode(304).end();
    return;
  }

  ctx.response()
      .putHeader("ETag", etag) // client echoes this back via If-None-Match
      .putHeader("Cache-Control", "public, max-age=60") // revalidate after 60s
      .json(new JsonObject().put("id", id).put("name", "Widget").put("price", 9.99));
}

// Returns true when the If-None-Match header value matches the given ETag.
private boolean etagMatches(String etag, String ifNoneMatch) {
  if ("*".equals(ifNoneMatch.trim())) {
    return true; // "*" matches any current representation
  }
  for (String candidate : ifNoneMatch.split(",")) {
    if (etag.equals(candidate.trim())) {
      return true;
    }
  }
  return false;
}
}
Key Takeaway: Match caching strategy to data mutability: no-store for sensitive data, short max-age for semi-static data, immutable for versioned assets, and ETags for conditional GETs.
Why It Matters: Proper cache headers are one of the highest-ROI performance optimizations. A well-configured CDN serves category lists millions of times without hitting your server. ETags reduce bandwidth by 90% for unchanged resources, critical for mobile users with limited data plans. The immutable directive eliminates revalidation overhead for versioned assets, reducing page load HTTP roundtrips by 50-80% for returning visitors.
Group 21: Monitoring
Example 51: Micrometer Metrics Integration
Vert.x integrates with Micrometer to expose JVM and application metrics in Prometheus format, enabling real-time monitoring and alerting.
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.ext.web.Router;
import io.vertx.micrometer.MicrometerMetricsOptions;
import io.vertx.micrometer.VertxPrometheusOptions;
public class MetricsVerticle extends AbstractVerticle {
private PrometheusMeterRegistry registry;
// => Prometheus registry: collects registered meters and renders the scrape payload
private Counter requestCounter;
// => Monotonic count of all HTTP requests handled
private Timer requestTimer;
// => Request latency distribution (p50/p95/p99 percentiles published)
@Override
public void start(Promise<Void> startPromise) {
  // In-process Prometheus registry; every meter below registers into it.
  registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

  requestCounter = Counter.builder("api_requests_total")
      .description("Total number of API requests")
      .tag("service", "vertx-api") // constant tag on every observation
      .register(registry);

  requestTimer = Timer.builder("api_request_duration")
      .description("API request duration in seconds")
      .publishPercentiles(0.5, 0.95, 0.99) // p50 / p95 / p99 latency
      .register(registry);

  Router router = Router.router(vertx);

  // Instrumentation middleware: counts every request and times it from
  // arrival until the response body has been fully written.
  router.route().handler(ctx -> {
    requestCounter.increment();
    Timer.Sample sample = Timer.start(registry);
    ctx.addBodyEndHandler(v -> sample.stop(requestTimer));
    ctx.next();
  });

  router.get("/api/data").handler(ctx ->
      ctx.json(new io.vertx.core.json.JsonObject().put("data", "OK")));

  // Scrape endpoint in the Prometheus text exposition format, e.g.:
  //   # HELP api_requests_total Total number of API requests
  //   # TYPE api_requests_total counter
  //   api_requests_total{service="vertx-api"} 42.0
  router.get("/metrics").handler(ctx -> ctx.response()
      .putHeader("Content-Type", registry.contentType())
      .end(registry.scrape()));

  vertx.createHttpServer()
      .requestHandler(router)
      .listen(8080)
      .onSuccess(server -> startPromise.complete())
      .onFailure(startPromise::fail);
}
}
Key Takeaway: Register Micrometer metrics in the verticle start method. Expose a /metrics endpoint for Prometheus scraping. Instrument middleware with counters and timers.
Why It Matters: Prometheus metrics with percentile tracking expose the actual user experience—P99 latency shows what 1% of users experience, often revealing tail latency issues invisible in averages. The counter/timer middleware added once to the router instruments all routes automatically, ensuring no endpoint is forgotten. These metrics feed alerting rules that page on-call engineers before users report problems, transforming reactive incident response into proactive monitoring.
Example 52: Distributed Tracing with OpenTelemetry
OpenTelemetry distributed tracing propagates trace context across service boundaries, enabling full request path visualization in tools like Jaeger or Zipkin.
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
public class TracingVerticle extends AbstractVerticle {
private Tracer tracer;
// => OpenTelemetry tracer used to create the per-request server span and DB child spans
@Override
public void start(Promise<Void> startPromise) {
  // In production, configure an exporter (Jaeger, OTLP, ...) on the SDK at
  // startup; GlobalOpenTelemetry then hands out tracers wired to it.
  // io.vertx.sdk.trace.SdkTracerProvider...
  tracer = GlobalOpenTelemetry.getTracer("vertx-api", "1.0.0");

  Router router = Router.router(vertx);

  // Middleware: one server span per request, named "<METHOD> <path>".
  router.route().handler(ctx -> {
    Span span = tracer.spanBuilder(
            ctx.request().method() + " " + ctx.request().path())
        .startSpan();
    // HTTP attributes per the OpenTelemetry semantic conventions.
    span.setAttribute("http.method", ctx.request().method().name());
    span.setAttribute("http.url", ctx.request().absoluteURI());
    span.setAttribute("http.target", ctx.request().path());
    // Expose the span so route handlers can parent child spans to it.
    ctx.put("span", span);
    ctx.addBodyEndHandler(v -> {
      span.setAttribute("http.status_code", ctx.response().getStatusCode());
      if (ctx.response().getStatusCode() >= 500) {
        span.setStatus(StatusCode.ERROR, "Server error"); // mark 5xx as errors
      }
      span.end(); // records total request duration
    });
    ctx.next();
  });

  router.get("/users/:id").handler(ctx -> {
    Span parentSpan = ctx.get("span");
    // Child span for the (simulated) DB query, parented to the request span.
    Span childSpan = tracer.spanBuilder("db.query.getUser")
        .setParent(Context.current().with(parentSpan))
        .startSpan();
    try (Scope scope = childSpan.makeCurrent()) {
      childSpan.setAttribute("db.statement", "SELECT * FROM users WHERE id = $1");
      childSpan.setAttribute("db.system", "postgresql");
      // Simulated DB query
      String userId = ctx.pathParam("id");
      ctx.json(new io.vertx.core.json.JsonObject()
          .put("id", userId).put("name", "Alice"));
    } finally {
      // Fix: the original called end() inside the try body, so an exception
      // thrown between startSpan and end() would leak an unfinished span.
      childSpan.end();
    }
  });

  vertx.createHttpServer()
      .requestHandler(router)
      .listen(8080)
      .onSuccess(s -> startPromise.complete())
      .onFailure(startPromise::fail);
}
}
Key Takeaway: Create spans in middleware and child spans for sub-operations. Store the parent span in RoutingContext so downstream handlers can attach child spans.
Why It Matters: Distributed tracing reveals the full execution path of a request across services, databases, and caches in a timeline visualization. Without tracing, diagnosing why a specific request took 2 seconds requires correlating logs across multiple services—a process that takes hours. With Jaeger or Zipkin, you see in seconds that 1.8 seconds were spent in a database query, pointing directly at the N+1 query problem or missing index responsible.
Example 53: Health Checks with Vert.x Health
HealthCheckHandler provides a standardized health check endpoint with named procedures that check individual dependencies—ideal for Kubernetes probes.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.HealthChecks;
import io.vertx.ext.healthchecks.Status;
import io.vertx.ext.web.Router;
import io.vertx.pgclient.PgBuilder;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
public class HealthChecksVerticle extends AbstractVerticle {
// Postgres connection pool; probed by the "database" health check below.
private Pool dbPool;
@Override
public void start(Promise<Void> startPromise) {
  // Small pool is enough here: it only backs the health-check query.
  dbPool = PgBuilder.pool().using(vertx)
      .with(new PoolOptions().setMaxSize(2))
      .connectingTo(new PgConnectOptions()
          .setHost("localhost").setDatabase("myapp")
          .setUser("postgres").setPassword("secret"))
      .build();

  // Registry of named health-check procedures.
  HealthChecks healthChecks = HealthChecks.create(vertx);

  // "database": healthy when a trivial query round-trips.
  healthChecks.register("database", promise ->
      dbPool.query("SELECT 1")
          .execute()
          .onSuccess(rows -> promise.complete(Status.OK()))
          .onFailure(err -> promise.complete(Status.KO()
              .put("error", err.getMessage()))));

  // "memory": fails once heap utilization passes 90%.
  healthChecks.register("memory", promise -> {
    Runtime runtime = Runtime.getRuntime();
    long freeMemory = runtime.freeMemory();
    long totalMemory = runtime.totalMemory();
    double usedPercent = (double) (totalMemory - freeMemory) / totalMemory * 100;
    if (usedPercent > 90) {
      promise.complete(Status.KO()
          .put("used_percent", usedPercent)
          .put("message", "Memory pressure critical"));
    } else {
      // Include the current value so monitoring can trend it.
      promise.complete(Status.OK()
          .put("used_percent", Math.round(usedPercent)));
    }
  });

  Router router = Router.router(vertx);

  // Readiness: 200 only when ALL registered checks pass, 503 otherwise.
  // Body: {"status":"UP","checks":[{"id":"database","status":"UP"},...]}
  router.get("/health/ready").handler(HealthCheckHandler.createWithHealthChecks(healthChecks));

  // Liveness: the process is alive if it can answer at all.
  router.get("/health/live").handler(ctx ->
      ctx.json(new io.vertx.core.json.JsonObject().put("status", "UP")));

  vertx.createHttpServer()
      .requestHandler(router)
      .listen(8080)
      .onSuccess(server -> startPromise.complete())
      .onFailure(startPromise::fail);
}
}
Key Takeaway: Register named health check procedures with HealthChecks.register(). Use HealthCheckHandler to expose a standards-compliant health endpoint that returns 503 when any check fails.
Why It Matters: Granular named health checks tell Kubernetes and load balancers exactly which dependency is failing, accelerating incident diagnosis. When the database check fails, the readiness probe returns 503, removing the pod from the load balancer rotation before users experience errors. Memory pressure checks catch JVM heap exhaustion before it becomes an OutOfMemoryError, allowing graceful pod replacement rather than crash recovery.
Example 54: WebClient for HTTP Service Calls
WebClient provides a fluent, non-blocking HTTP client for calling external services. It returns futures, composing cleanly with other Vert.x async operations.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.client.predicate.ResponsePredicate;
public class WebClientDemo extends AbstractVerticle {
private WebClient client;
// => Reusable, thread-safe HTTP client; created once in start(), closed in stop()
@Override
public void start(Promise<Void> startPromise) {
  // One shared client: pools connections to the configured default host.
  client = WebClient.create(vertx, new WebClientOptions()
      .setDefaultHost("api.example.com")
      .setDefaultPort(443)
      .setSsl(true)             // HTTPS
      .setConnectTimeout(3000)  // connection timeout: 3 seconds
      .setIdleTimeout(10)       // idle connections closed after 10 seconds
      .setUserAgent("my-vertx-service/1.0")); // User-Agent on every request

  // GET with automatic JSON decoding of the response body.
  client.get("/users/42")
      .putHeader("Authorization", "Bearer " + getToken())
      .as(io.vertx.ext.web.codec.BodyCodec.jsonObject())
      .send()
      .onSuccess(response -> {
        int status = response.statusCode();
        JsonObject user = response.body(); // already decoded by BodyCodec
        // Output: Got user: {"id":42,"name":"Alice"}
        System.out.println("Got user: " + user.encode());
      })
      .onFailure(err -> System.err.println("Request failed: " + err));

  // POST that fails the future unless the server answers 201 Created;
  // sendJsonObject sets Content-Type: application/json automatically.
  client.post("/users")
      .expect(ResponsePredicate.SC_CREATED)
      .sendJsonObject(new JsonObject().put("name", "Bob").put("email", "bob@example.com"))
      .onSuccess(response -> System.out.println("Created: " + response.bodyAsJsonObject()))
      .onFailure(err -> System.err.println("Create failed: " + err));

  startPromise.complete();
}
@Override
public void stop(Promise<Void> stopPromise) {
  // Closing the client releases its pooled connections.
  client.close();
  stopPromise.complete();
}
private String getToken() {
  // Placeholder; production code would fetch this from config or a token store.
  return "example-token";
}
}
Key Takeaway: Use WebClient for external HTTP calls. Use expect(ResponsePredicate) to auto-fail on unexpected status codes and BodyCodec for automatic deserialization.
Why It Matters: ResponsePredicate prevents the common mistake of treating all HTTP responses as successes—a service returning 404 for a “found” resource is a bug that silent success handling misses. BodyCodec eliminates manual JSON deserialization and its associated error handling. Creating WebClient once and reusing it across requests enables connection pooling to the same upstream service, dramatically reducing connection overhead for service meshes making thousands of calls per second.
Example 55: Reactive Streams with Vert.x Streams API
Vert.x Streams API provides backpressure-aware streaming for large data, preventing memory exhaustion when producers outpace consumers.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.streams.Pipe;
import io.vertx.ext.web.Router;
public class StreamingVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
  Router router = Router.router(vertx);

  // Download: stream a file to the HTTP response with backpressure.
  router.get("/download/:filename").handler(ctx -> {
    String filename = "/data/" + ctx.pathParam("filename");
    vertx.fileSystem().open(filename,
        new OpenOptions().setRead(true),
        ar -> {
          if (ar.failed()) {
            ctx.fail(404, ar.cause());
            return;
          }
          AsyncFile file = ar.result(); // AsyncFile implements ReadStream<Buffer>
          ctx.response()
              .putHeader("Content-Type", "application/octet-stream")
              .putHeader("Transfer-Encoding", "chunked"); // send as data arrives
          // Pipe manages backpressure: reading pauses while the response's
          // write queue is full, so memory stays bounded for slow clients.
          Pipe<Buffer> pipe = file.pipe();
          pipe.to(ctx.response())
              .onSuccess(v -> System.out.println("File streamed successfully"))
              .onFailure(err -> {
                // Connection may already be closed at this point.
                System.err.println("Stream error: " + err);
              });
        });
  });

  // Upload: stream the request body straight to a file.
  router.post("/upload/stream").handler(ctx -> {
    // Pause the request so no body data is lost while the target file is
    // opened. Fixes two defects in the original: it called resume()
    // immediately after pause() (defeating the pause), and it used
    // openBlocking() on the event loop thread, which blocks it.
    ctx.request().pause();
    vertx.fileSystem().open(
            "/uploads/" + java.util.UUID.randomUUID() + ".dat",
            new OpenOptions().setWrite(true).setCreate(true))
        .onFailure(err -> ctx.fail(500, err))
        .onSuccess(file -> ctx.request()
            .pipeTo(file) // pipeTo resumes the stream once wired up
            .onSuccess(v -> ctx.response().setStatusCode(201).end("{\"stored\":true}"))
            .onFailure(err -> ctx.fail(500, err)));
  });

  vertx.createHttpServer()
      .requestHandler(router)
      .listen(8080)
      .onSuccess(s -> startPromise.complete())
      .onFailure(startPromise::fail);
}
public static void main(String[] args) {
  // Local dev entry point.
  Vertx vertx = Vertx.vertx();
  vertx.deployVerticle(new StreamingVerticle());
}
}
Key Takeaway: Use pipe().to() for streaming between ReadStream and WriteStream. Backpressure is automatic—the source pauses when the destination’s write queue fills.
Why It Matters: Streaming with backpressure is essential for serving large files (video, CSV exports, database dumps) without loading the entire file into memory. Without backpressure, a slow network client receiving a 1GB file forces your server to buffer the entire file in memory, quickly exhausting heap space under concurrent downloads. Vert.x’s Pipe API handles buffer size management automatically, keeping memory usage constant regardless of file size or client download speed.