
[da-vinci][server][common] Stateful CDC client direct blob transfer from Venice server#2869

Open
shresthhh wants to merge 1 commit into linkedin:main from shresthhh:shtiwary/stateful-cdc-server-blob-transfer

@shresthhh shresthhh commented Jun 16, 2026

Problem Statement

When a Stateful CDC / Da Vinci client cold-starts with no local state, it loads the store by replaying the Version Topic (VT) or by pulling a RocksDB snapshot from a client peer. With the move to Northguard (no topic compaction), the VT can grow unbounded, so VT-replay cold-start scales from minutes to days and becomes impractical. Peer-to-peer blob transfer covers the steady state but breaks when there are no peers:

  • New-store onboarding: a brand-new store has no peer clients yet to serve a blob.
  • Disaster (Sev 0-1): all peers in a region are down for an extended period, so no peer can serve a blob.

Today there is no automated recovery path for these cases — a Venice operator must manually issue a repush.

Solution

Allow Venice servers to serve blob-transfer requests from Stateful CDC clients when no client peer is available. The client falls back to a server only after peer discovery returns no usable peer. Everything is gated by independent, default-off flags, and the pre-existing server-to-server (s2s) and peer-to-peer blob-transfer paths are unchanged.

  • Server discovery: MetadataBasedServerBlobFinder (venice-common) finds servers hosting the partition via request-based metadata over D2 server-routing (the Fast Client RequestBasedMetadata pattern), with no new client ZK fan-out. The Avro response is decoded against the server-advertised writer schema (TransportClientResponse.getSchemaId() → RouterBackedSchemaReader) to tolerate MetadataResponseRecord version skew; any failure fails safe to VT replay.
  • Client fallback: NettyP2PBlobTransferManager escalates peers → one bounded server pass → VT.
  • Accept gate + access control: P2PFileTransferServerHandler honors the accept flag; BlobTransferAclHandler classifies SERVER- vs CLIENT-origin by application identity (via IdentityParser) and enforces the per-store read ACL (DynamicAccessController) on client-origin requests.
  • Prioritization: BlobTransferAdmissionController caps concurrent client-origin transfers so server-to-server traffic is never starved.
  • Format compatibility reuses the existing requestTableFormat 404 rejection.

Out of scope (per design): RocksDB format conversion, cross-region client→server transfers, and automatic client→server overload recovery.
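
The escalation order described above (peers → one bounded server pass → VT) can be sketched roughly as follows. This is a minimal illustration, not the code in NettyP2PBlobTransferManager: the class name, the `tryFetch` predicate, the `serverFinder` supplier, and the `maxServerAttempts` bound are all hypothetical stand-ins.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch of the peers -> one bounded server pass -> VT escalation. */
final class FallbackEscalationSketch {
  enum Source { PEER, SERVER, VERSION_TOPIC }

  /**
   * tryFetch returns true when a snapshot was fetched from the given host.
   * serverFinder is consulted only when fallback is enabled and no peer worked.
   * maxServerAttempts is an assumed bound on the single server pass.
   */
  static Source bootstrap(
      List<String> peers,
      Supplier<List<String>> serverFinder,
      Predicate<String> tryFetch,
      boolean serverFallbackEnabled,
      int maxServerAttempts) {
    // Step 1: peers are always tried first.
    for (String peer : peers) {
      if (tryFetch.test(peer)) {
        return Source.PEER;
      }
    }
    // Step 2: one bounded pass over discovered servers, only if the flag is on.
    if (serverFallbackEnabled) {
      List<String> servers;
      try {
        servers = serverFinder.get();
      } catch (RuntimeException e) {
        servers = Collections.emptyList(); // discovery failure fails safe to VT replay
      }
      int attempts = 0;
      for (String server : servers) {
        if (attempts++ >= maxServerAttempts) {
          break;
        }
        if (tryFetch.test(server)) {
          return Source.SERVER;
        }
      }
    }
    // Step 3: nothing served a blob, so replay the Version Topic.
    return Source.VERSION_TOPIC;
  }
}
```

With the flag off the server finder is never invoked, which matches the stated goal that flags-off behavior stays on the existing peer → VT path.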

Code changes

  • Added new code behind a config. Configs (all default off / conservative):
    • server.blob.transfer.accept.client.request.enabled — server accepts client-origin blob requests. Default false.
    • davinci.blob.transfer.server.fallback.enabled — client falls back to a server when no peer is found. Default false.
    • server.blob.transfer.client.capacity.percent — cap on the host blob-transfer budget that client-origin transfers may use (s2s uncapped). Default 25.
  • Introduced new log lines (INFO milestones + DEBUG diagnostics on the cold-start path).
    • Confirmed if logs need to be rate limited — they are on the rare cold-start/fallback path (not a hot path), so no rate limiting is needed.
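
As a rough illustration of the three configs and their defaults, the sketch below reads them from a plain `java.util.Properties`. The key strings and default values come from the list above; the holder class and its field names are hypothetical, and Venice's own config classes (VeniceServerConfig, ConfigKeys) are not reproduced here.

```java
import java.util.Properties;

/** Hypothetical holder for the three new flags, using the defaults listed above. */
final class BlobTransferFlagsSketch {
  final boolean acceptClientRequests;
  final boolean serverFallbackEnabled;
  final int clientCapacityPercent;

  BlobTransferFlagsSketch(Properties props) {
    // Server side: accept client-origin blob requests (default false).
    acceptClientRequests = Boolean.parseBoolean(
        props.getProperty("server.blob.transfer.accept.client.request.enabled", "false"));
    // Client side: fall back to a server when no peer is found (default false).
    serverFallbackEnabled = Boolean.parseBoolean(
        props.getProperty("davinci.blob.transfer.server.fallback.enabled", "false"));
    // Server side: share of the host budget usable by client-origin transfers (default 25).
    clientCapacityPercent = Integer.parseInt(
        props.getProperty("server.blob.transfer.client.capacity.percent", "25"));
  }
}
```

An empty `Properties` object therefore yields the conservative state: both flags off and a 25 percent client cap.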

Concurrency-Specific Checks

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms are used where needed (double-checked locking for the lazy schema reader; synchronized admission accounting).
  • No blocking calls inside critical sections (schema/metadata fetches occur outside locks).
  • Verified thread-safe collections are used (VeniceConcurrentHashMap).
  • Validated proper exception handling in multi-threaded code (discovery and transfer fail safe to VT replay).

How was this PR tested?

  • Local code review completed
  • New unit tests added (TestBlobTransferAclHandler, TestBlobTransferAdmissionController, MetadataBasedServerBlobFinderTest).
  • New integration tests added (DaVinciClientServerFallbackBlobTransferTest — server→client cold-start, asserts a positive blob-transfer metric so it cannot false-pass via VT; TestClientOriginServerBlobTransfer — real-mTLS CLIENT-origin accept/ACL gates).
  • Modified or extended existing tests (TestNettyP2PBlobTransferManager, TestP2PFileTransferServerHandler, TestBlobTransferManagerBuilder).
  • Verified backward compatibility — flags default off; the pre-existing s2s (BlobP2PTransferAmongServersTest) and DVC<->DVC (DaVinciClientP2PBlobTransferTest) suites remain green.

Local run: da-vinci-client *BlobTransfer*/*P2P* (169) and venice-common MetadataBasedServerBlobFinder (11) unit tests pass; the integration tests above pass. A complete e2e integration test could not be run due to certification issues.

Does this PR introduce any user-facing or breaking changes?

  • No. All new behavior is behind default-off flags; with the flags off, server-to-server and DVC<->DVC blob transfer behave exactly as before.

@shresthhh shresthhh force-pushed the shtiwary/stateful-cdc-server-blob-transfer branch from 532fa20 to 2bba4ba Compare June 17, 2026 07:43

@jingy-li jingy-li left a comment

Contributor

Congrats on landing your first PR here — nice work! 🎉

Before I go deep on the review, could you please rework the feature rollout so it's strictly behind config control, with the new logic separated from the existing path rather than fused into it? A couple of concrete spots where they're currently mixed together:

  1. ACL classification has been integrated into the existing accept decision flow. Previously, the logic was simply: "same issuer → trusted → accept." The access check should run only when the corresponding configuration flag [acceptClientRequestEnabled] is enabled, so that it does not affect or disrupt the current behavior.

  2. The new admission controller replaced the existing concurrency counter wholesale (P2PFileTransferServerHandler). The old globalConcurrentTransferRequests counter is gone, and BlobTransferAdmissionController is now in the pipeline for all origins, including pure s2s. Same total budget, but the count is now taken earlier (at admit, before getTransferMetadata) and on a different population, so the s2s throttle behaves differently even with the feature off. We intentionally put the counting after getTransferMetadata: if fetching the metadata fails, the request is not counted, because there is no real traffic to send files. I'd prefer the entire original path keep the original counter untouched and only route the new feature traffic through the controller, so the new accounting can't perturb s2s.

Those two are just examples; there are also several smaller spots related to this config control.

Let's use a simple if-else on the accept-client-request config to cleanly separate the existing logic from the new feature. The reason I'm calling this out: the existing path has been tuned over many back-and-forth runs in EI and finally reached stable performance in production. If the new behavior is interleaved with it rather than gated, it's very easy for a change to slip through and reach production unnoticed. Keeping them separated behind the config means flags-off = exactly today's behavior, and we can validate the new path on its own.

@shresthhh shresthhh force-pushed the shtiwary/stateful-cdc-server-blob-transfer branch from 2bba4ba to f1e1271 Compare June 18, 2026 21:01
@shresthhh

Author

Thank you for the feedback! You're right that the new path was fused into the existing one. I've reworked it so the feature is gated behind acceptClientRequestEnabled, with the new logic separated.

Some points to consider:

1. Concurrency counter

  • BlobTransferAdmissionController is now client-only: it tracks only client-origin transfers on a separate budget. Server-to-server never goes through it.

  • The handler branches: if (acceptClientRequestEnabled && origin == CLIENT) { client admission } else { original global counter }.

    Important Note: Budgets are now independent rather than a shared split of N. s2s keeps its full N (e.g., 15); the client cap (default floor(N·25%) = 3) sits on top, so peak concurrency can reach N + clientCap (15 → 18). This guarantees that client traffic can never delay s2s, which was already running on N (previously I had made it a shared pool). The tradeoff is up to clientCap extra concurrent transfers when clients are active; if that takes up too much disk, we could carve the client cap out of N itself.
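
The budget arithmetic above can be checked with a small sketch of a client-only admission counter. The class and method names are hypothetical and this is not the actual BlobTransferAdmissionController; it only assumes a cap of floor(N · percent / 100) and synchronized accounting, as described in this thread.

```java
/** Hypothetical client-only admission counter with a cap derived from the host budget. */
final class ClientAdmissionSketch {
  private final int hostBudget;
  private final int clientCap;
  private int activeClientTransfers; // guarded by "this"

  ClientAdmissionSketch(int hostBudget, int clientCapacityPercent) {
    this.hostBudget = hostBudget;
    // Integer division floors for non-negative inputs: 15 * 25 / 100 = 3.
    this.clientCap = hostBudget * clientCapacityPercent / 100;
  }

  int clientCap() {
    return clientCap;
  }

  /** Worst case when s2s uses its full budget and clients use their full cap. */
  int peakConcurrency() {
    return hostBudget + clientCap;
  }

  /** Admits a client-origin transfer only while the client cap has room. */
  synchronized boolean tryAdmit() {
    if (activeClientTransfers >= clientCap) {
      return false;
    }
    activeClientTransfers++;
    return true;
  }

  /** Releases one client slot; never goes below zero. */
  synchronized void release() {
    if (activeClientTransfers > 0) {
      activeClientTransfers--;
    }
  }
}
```

For N = 15 and 25 percent this gives a client cap of 3 and a peak of 18, matching the numbers in the note. Server-to-server requests never call `tryAdmit`, so their counter is untouched.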

2. ACL classification

  • The per-store access check DynamicAccessController.hasAccess now runs only when acceptClientRequestEnabled is on.

  • Flag off: s2s (same-issuer, SERVER-origin) is forwarded exactly as today; a client-origin request is rejected with 403 — per our discussion that the server should reject a client when it isn't accepting client requests.

    Important Note: The one new step that necessarily runs when off is origin classification, since that's the only way to tell a client apart from an s2s peer in order to reject the former and forward the latter. It uses the sanctioned IdentityParser, and a correctly-classified s2s peer is forwarded unchanged.
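
The gating described in this section can be summarized as a small decision function. This is a sketch under assumptions: the enum and method names are hypothetical, the `hasReadAccess` predicate stands in for DynamicAccessController.hasAccess, and rejecting an ACL failure with the same 403 as the flag-off case is an assumption rather than something stated above.

```java
import java.util.function.BiPredicate;

/** Hypothetical decision function for origin classification plus accept/ACL gating. */
final class OriginGateSketch {
  enum Origin { SERVER, CLIENT }
  enum Decision { FORWARD, REJECT_FORBIDDEN }

  static Decision decide(
      Origin origin,
      boolean acceptClientRequestEnabled,
      String principal,
      String storeName,
      BiPredicate<String, String> hasReadAccess) {
    // s2s (SERVER-origin) is forwarded exactly as before, regardless of the flag.
    if (origin == Origin.SERVER) {
      return Decision.FORWARD;
    }
    // Flag off: client-origin requests are rejected without consulting the ACL.
    if (!acceptClientRequestEnabled) {
      return Decision.REJECT_FORBIDDEN;
    }
    // Flag on: the per-store read ACL decides (403 on denial is an assumption).
    return hasReadAccess.test(principal, storeName)
        ? Decision.FORWARD
        : Decision.REJECT_FORBIDDEN;
  }
}
```

The ordering matters: because the flag is checked before the predicate is called, the access controller is never touched while the feature is off.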

Client admission timing
I intentionally kept the client admission before getTransferMetadata, unlike the server's count-after. Counting a client up front tags it as a client early, so when the connection closes, cleanup always releases the client budget and never the server one. If clients were counted after metadata (like the server), a client that gets rejected at that step could accidentally subtract from the server counter and loosen the s2s limit. Keeping the current order stops the two budgets from interfering. I can restructure this if it doesn't make sense.
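
The cleanup argument can be illustrated with a toy model in which each connection carries a tag that records which counter it incremented. Everything here is hypothetical: `ChannelState` stands in for Netty channel attributes, and the two counters stand in for the client budget and the original global s2s counter.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of per-connection tagging so cleanup releases the right budget. */
final class CleanupTaggingSketch {
  final AtomicInteger clientActive = new AtomicInteger();
  final AtomicInteger serverActive = new AtomicInteger();

  /** Stand-in for per-channel attributes. */
  static final class ChannelState {
    boolean clientAdmitted; // set at admit time, before the metadata lookup
    boolean serverCounted;  // set only after the metadata lookup succeeds
  }

  /** Client path: count and tag first, so a later metadata failure is still tagged. */
  ChannelState admitClient() {
    ChannelState state = new ChannelState();
    clientActive.incrementAndGet();
    state.clientAdmitted = true;
    return state;
  }

  /** Server path: count only when metadata was fetched, as in the original flow. */
  ChannelState handleServer(boolean metadataLookupSucceeded) {
    ChannelState state = new ChannelState();
    if (metadataLookupSucceeded) {
      serverActive.incrementAndGet();
      state.serverCounted = true;
    }
    return state;
  }

  /** Cleanup releases exactly the counter this connection incremented, at most once. */
  void onChannelClosed(ChannelState state) {
    if (state.clientAdmitted) {
      clientActive.decrementAndGet();
      state.clientAdmitted = false;
    } else if (state.serverCounted) {
      serverActive.decrementAndGet();
      state.serverCounted = false;
    }
  }
}
```

In this model a client connection that fails at the metadata step still releases only the client counter when it closes, so the s2s count cannot be loosened by client traffic.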

@shresthhh shresthhh force-pushed the shtiwary/stateful-cdc-server-blob-transfer branch from f1e1271 to 45f7262 Compare June 19, 2026 00:17
@kvargha kvargha requested a review from Copilot June 19, 2026 00:40

Copilot AI left a comment

Pull request overview

This PR adds an opt-in “server fallback” blob-transfer path for Stateful CDC / Da Vinci clients so cold-start bootstrapping can fetch RocksDB snapshots directly from Venice servers when no peer clients are available. It introduces server discovery via server-served metadata (metadata/<store>), enforces client-origin access control on the server side, and adds admission control so client-origin transfers are capacity-bounded.

Changes:

  • Add metadata-based server discovery and Da Vinci client fallback from peers → servers → VT replay.
  • Add server-side classification (SERVER vs CLIENT origin), accept gating + per-store ACL enforcement, and client-origin admission control.
  • Add unit + integration coverage for discovery, fallback, ACL/origin behavior, and admission control.

Reviewed changes

Copilot reviewed 22 out of 25 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java | Wires new blob-transfer config params and ACL handler creation into server startup. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java | Wires new blob-transfer config params, ACL handler creation, and enables server fallback for Da Vinci. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java | Adds new config knobs (accept client requests, fallback enable, client capacity percent). |
| internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java | Defines new config keys for server accept, client fallback, and client capacity percent. |
| internal/venice-common/src/main/java/com/linkedin/venice/blobtransfer/MetadataBasedServerBlobFinder.java | Implements server-host discovery via server-served metadata and schema-id tolerant decoding. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/NettyP2PBlobTransferManager.java | Adds server-fallback discovery and a bounded “one server pass” attempt before VT replay. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java | Adds builder wiring for server fallback and auto-constructs the server finder when enabled. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/P2PBlobTransferConfig.java | Extends config to carry client capacity percent and accept-client flag. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferUtils.java | Extends ACL handler creation to inject identity parser, store access controller, and accept flag; adds origin attribute + enum. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferAclHandler.java | Classifies origin and applies accept/ACL checks for client-origin requests on servers. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferAdmissionController.java | New controller to cap concurrent client-origin transfers as % of host budget. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PFileTransferServerHandler.java | Applies client-origin admission control and tracks client admission for cleanup. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/P2PBlobTransferService.java | Wires admission controller + accept-client flag into the Netty server pipeline. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/server/BlobTransferNettyChannelInitializer.java | Passes admission controller + accept-client flag to the server handler. |
| internal/venice-common/src/test/java/com/linkedin/venice/blobtransfer/MetadataBasedServerBlobFinderTest.java | Unit tests for host normalization, schema-id decoding behavior, caching, and failure paths. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientServerFallbackBlobTransferTest.java | Integration test validating cold-start server fallback mechanics and positive blob-transfer metrics. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestNettyP2PBlobTransferManager.java | Tests that server fallback is (not) consulted based on enablement and peer usability. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestP2PFileTransferServerHandler.java | Adds unit test for client-capacity 429 behavior; updates concurrency-limit test loop. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestClientOriginServerBlobTransfer.java | End-to-end Netty/mTLS test of client-origin accept flag + per-store ACL enforcement. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestBlobTransferAdmissionController.java | Unit tests for capacity calculation and admission/release semantics. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestBlobTransferAclHandler.java | Unit tests for origin classification and accept/ACL gating behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestBlobTransferManagerBuilder.java | Updates builder tests for new blob-transfer config ctor signature and wiring. |

Comment on lines +183 to 201
    try {
      transferPartitionMetadata =
          blobSnapshotManager.getTransferMetadata(blobTransferRequest, successCountedAsActiveCurrentUser);
      ctx.channel().attr(SUCCESS_COUNTED).set(successCountedAsActiveCurrentUser);
      ctx.channel().attr(BLOB_TRANSFER_REQUEST).set(blobTransferRequest);
      if (successCountedAsActiveCurrentUser.get()) {
        if (globalConcurrentTransferRequests.incrementAndGet() >= maxAllowedConcurrentSnapshotUsers) {
          String errMessage =
              "The number of concurrent snapshot users exceeds the limit of " + maxAllowedConcurrentSnapshotUsers
                  + ", wont be able to process the request for " + blobTransferRequest.getFullResourceName();
          LOGGER.error(errMessage);
          setupResponseAndFlush(HttpResponseStatus.TOO_MANY_REQUESTS, errMessage.getBytes(), false, ctx);
          return;
        }
      }
    } catch (Exception e) {
      setupResponseAndFlush(HttpResponseStatus.NOT_FOUND, e.getMessage().getBytes(), false, ctx);
      return;
    }

Author

This appears to be the existing flow, which currently sits inside an if/else block that handles both mechanisms, so I need to check this.

@shresthhh shresthhh force-pushed the shtiwary/stateful-cdc-server-blob-transfer branch from 45f7262 to abf3b78 Compare June 19, 2026 04:42
Venice server

Let a Stateful CDC / Da Vinci client cold-start by pulling a RocksDB snapshot
directly from a Venice server when no client peer is available, instead of
replaying the Version Topic (VT). This unblocks new-store onboarding and gives
a self-healing path when all peers in a region are down. Everything is behind
two independent, default-off feature flags; the existing server-to-server and
DVC<->DVC blob-transfer paths are unchanged.

## Summary
- Server discovery: MetadataBasedServerBlobFinder (venice-common) finds
  servers hosting a partition via request-based metadata over D2
  server-routing (the Fast Client RequestBasedMetadata pattern), decodes the
  response against the server-advertised writer schema to tolerate
  MetadataResponseRecord version skew, and fails safe to VT replay on any
  error.
- Client fallback: NettyP2PBlobTransferManager escalates peers -> one bounded
  server pass -> VT, gated by DAVINCI_BLOB_TRANSFER_SERVER_FALLBACK_ENABLED.
- Accept gate + access control: P2PFileTransferServerHandler honors
  SERVER_BLOB_TRANSFER_ACCEPT_CLIENT_REQUEST_ENABLED; BlobTransferAclHandler
  classifies SERVER- vs CLIENT-origin by application identity
  (IdentityParser) and enforces the per-store read ACL
  (DynamicAccessController) on client requests.
- Prioritization: BlobTransferAdmissionController caps concurrent client
  transfers so server-to-server traffic is never starved.
- Format compatibility reuses the existing requestTableFormat 404 rejection.

## Testing Done
- Unit: da-vinci-client *BlobTransfer*/*P2P* (169) and venice-common
  MetadataBasedServerBlobFinder (11) pass.
- Integration: DaVinciClientServerFallbackBlobTransferTest (server->client
  cold-start; asserts a positive blob-transfer metric so it cannot false-pass
  via VT) and TestClientOriginServerBlobTransfer (real-mTLS CLIENT-origin
  accept/ACL gates) pass. Pre-existing BlobP2PTransferAmongServersTest (s2s)
  and DaVinciClientP2PBlobTransferTest (DVC<->DVC) remain green.
- spotless clean; full module compile.
@shresthhh shresthhh force-pushed the shtiwary/stateful-cdc-server-blob-transfer branch from abf3b78 to 3eeea99 Compare June 19, 2026 05:26
3 participants