Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import java.util.Map;
import org.apache.helix.HelixAdmin;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
Expand Down Expand Up @@ -38,6 +39,28 @@ public interface HelixAdminClient {
*/
void createVeniceStorageCluster(String clusterName, ClusterConfig clusterConfig, RESTConfig restConfig);

/**
* Legacy (non-HAAS) creation of a Venice storage cluster: creates the storage Helix cluster and
* registers it as a resource in the Venice controller cluster. No-op if the cluster already exists.
* @param clusterName of the Venice storage cluster.
*/
void createVeniceStorageClusterLegacy(String clusterName);

/**
* Enable the customized state config on the given storage cluster. The customized state config may get
* wiped or may never have been written to the ZK cluster config before, so it needs to be (re)enabled.
* @param clusterName of the Venice storage cluster.
*/
void setupCustomizedStateConfig(String clusterName);

/**
* Returns the underlying {@link HelixAdmin} used for storage-cluster operations. This is a low-level
* escape hatch for raw Helix operations not otherwise exposed by this interface (e.g. used by tests
* and maintenance tooling). Prefer the dedicated methods on this interface for normal operations.
* @return the storage-cluster {@link HelixAdmin}.
*/
HelixAdmin getHelixAdmin();

/**
* Check if the given Venice storage cluster's cluster resource is in the Venice controller cluster.
* @param clusterName of the Venice storage cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,23 +297,16 @@
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.client.ZkClient;
Expand Down Expand Up @@ -351,7 +344,6 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
private final String kafkaSSLBootstrapServers;
private final Map<String, AdminConsumerService> adminConsumerServices = new ConcurrentHashMap<>();

private static final int CONTROLLER_CLUSTER_NUMBER_OF_PARTITION = 1;
private static final long CONTROLLER_CLUSTER_RESOURCE_EV_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(5);
private static final long CONTROLLER_CLUSTER_RESOURCE_EV_CHECK_DELAY_MS = 500;
private static final long HELIX_RESOURCE_ASSIGNMENT_RETRY_INTERVAL_MS = 500;
Expand All @@ -367,7 +359,6 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner {
static final int VERSION_ID_UNSET = -1;

// TODO remove this field and all invocations once we are fully on HaaS. Use the helixAdminClient instead.
private final HelixAdmin admin;
/**
* Client/wrapper used for performing Helix operations in Venice.
*/
Expand Down Expand Up @@ -541,23 +532,9 @@ public VeniceHelixAdmin(
this.fabricControllerClientProvider =
new FabricControllerClientProvider(multiClusterConfigs, sslFactory, d2Clients);

// TODO: Consider re-using the same zkClient for the ZKHelixAdmin and TopicManager.
ZkClient zkClientForHelixAdmin = ZkClientFactory.newZkClient(multiClusterConfigs.getZkAddress());
zkClientForHelixAdmin
.subscribeStateChanges(new ZkClientStatusStats(metricsRepository, "controller-zk-client-for-helix-admin"));
/**
* N.B.: The following setup steps are necessary when using the {@link ZKHelixAdmin} constructor which takes
* in an external {@link ZkClient}.
*
* {@link ZkClient#setZkSerializer(ZkSerializer)} is necessary, otherwise Helix will throw:
*
* org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError: java.io.NotSerializableException: org.apache.helix.ZNRecord
*/
zkClientForHelixAdmin.setZkSerializer(new ZNRecordSerializer());
if (!zkClientForHelixAdmin.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
throw new VeniceException("Failed to connect to ZK within " + ZkClient.DEFAULT_CONNECTION_TIMEOUT + " ms!");
}
this.admin = new ZKHelixAdmin(zkClientForHelixAdmin);
// All Helix-admin operations (storage clusters, the controller cluster, and the HAAS grand cluster)
// go through this single client, which owns its own Helix ZK connection(s). VeniceHelixAdmin no longer
// holds a separate ZKHelixAdmin of its own.
this.helixAdminClient = new ZkHelixAdminClient(multiClusterConfigs, metricsRepository);
// There is no way to get the internal zkClient from HelixManager or HelixAdmin. So create a new one here.
this.zkClient = ZkClientFactory.newZkClient(multiClusterConfigs.getZkAddress());
Expand Down Expand Up @@ -1069,7 +1046,7 @@ public synchronized void initStorageCluster(String clusterName) {
}
// The customized state config may get wiped or have never been written to ZK cluster config before, we need to
// enable at first.
HelixUtils.setupCustomizedStateConfig(admin, clusterName);
helixAdminClient.setupCustomizedStateConfig(clusterName);
// The resource and partition may be disabled for this controller before, we need to enable again at first. Then the
// state transition will be triggered.
List<String> partitionNames =
Expand Down Expand Up @@ -1146,11 +1123,11 @@ public boolean isResourceStillAlive(String resourceName) {
*/
@Override
public boolean isClusterValid(String clusterName) {
return admin.getClusters().contains(clusterName);
return helixAdminClient.isVeniceStorageClusterCreated(clusterName);
}

protected HelixAdmin getHelixAdmin() {
return this.admin;
return helixAdminClient.getHelixAdmin();
}

/**
Expand Down Expand Up @@ -6590,7 +6567,6 @@ public void stopVeniceController() {
helixManager.disconnect();
topicManagerRepository.close();
zkClient.close();
admin.close();
helixAdminClient.close();
} catch (Exception e) {
throw new VeniceException("Can not stop controller correctly.", e);
Expand Down Expand Up @@ -6823,65 +6799,18 @@ OfflinePushStatusInfo getOfflinePushStatusInfo(
// TODO remove this method once we are fully on HaaS
// Create the controller cluster for venice cluster assignment if required.
private void createControllerClusterIfRequired() {
if (admin.getClusters().contains(controllerClusterName)) {
LOGGER.info("Cluster: {} already exists.", controllerClusterName);
return;
}

boolean isClusterCreated = admin.addCluster(controllerClusterName, false);
if (isClusterCreated == false) {
/**
* N.B.: {@link HelixAdmin#addCluster(String, boolean)} has a somewhat quirky implementation:
*
* When it returns true, it does not necessarily mean the cluster is fully created, because it
* short-circuits the rest of its work if it sees the top-level znode is present.
*
* When it returns false, it means the cluster is either not created at all or is only partially
* created.
*
* Therefore, when calling this function twice in a row, it is possible that the first invocation
* may do a portion of the setup work, and then fail on a subsequent step (thus returning false);
* and that the second invocation would short-circuit when seeing that the initial portion of the
* work is done (thus returning true). In this case, however, the cluster is actually not created.
*
* Because the function swallows any errors (though still logs them, thankfully) and only returns
* a boolean, it is impossible to catch the specific exception that prevented the first invocation
* from working, hence why our own logs instruct the operator to look for previous Helix logs for
* the details...
*
* In the main code, I (FGV) believe we don't retry the #addCluster() call, so we should not fall
* in the scenario where this returns true and we mistakenly think the cluster exists. This does
* happen in the integration test suite, however, which retries service creation several times
* if it gets any exception.
*
* In any case, if we call this function twice and progress past this code block, another Helix
* function below, {@link HelixAdmin#setConfig(HelixConfigScope, Map)}, fails with the following
* symptoms:
*
* org.apache.helix.HelixException: fail to set config. cluster: venice-controllers is NOT setup.
*
* Thus, if you see this, it is actually because {@link HelixAdmin#addCluster(String, boolean)}
* returned true even though the Helix cluster is only partially setup.
*/
throw new VeniceException(
"admin.addCluster() for '" + controllerClusterName + "' returned false. "
+ "Look for previous errors logged by Helix for more details...");
}
HelixConfigScope configScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(controllerClusterName)
.build();
Map<String, String> helixClusterProperties = new HashMap<String, String>();
helixClusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
// Topology and fault zone type fields are used by CRUSH alg. Helix would apply the constraints on CRUSH alg to
// choose proper instance to hold the replica.
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY_AWARE_ENABLED.name(), String.valueOf(false));
/**
* This {@link HelixAdmin#setConfig(HelixConfigScope, Map)} function will throw a HelixException if
* the previous {@link HelixAdmin#addCluster(String, boolean)} call failed silently (details above).
* Delegate to {@link HelixAdminClient#createVeniceControllerCluster()} so that ALL Helix-admin
* operations on the controller cluster go through the single {@link #helixAdminClient}, rather
* than the duplicate {@link #admin} {@link HelixAdmin} held directly by this class.
*
* {@link ZkHelixAdminClient#createVeniceControllerCluster()} is a strict superset of the previous
* inline logic (same ALLOW_PARTICIPANT_AUTO_JOIN + TOPOLOGY_AWARE_ENABLED=false config and
* LeaderStandby state model, plus retry logic and the same {@code persistBestPossibleAssignment}
* /capacity/cloud-config handling already used by the HAAS path). It is idempotent and a no-op if
* the controller cluster already exists.
*/
admin.setConfig(configScope, helixClusterProperties);
admin.addStateModelDef(controllerClusterName, LeaderStandbySMD.name, LeaderStandbySMD.build());
helixAdminClient.createVeniceControllerCluster();
}

private void setupStorageClusterAsNeeded(String clusterName) {
Expand Down Expand Up @@ -6919,62 +6848,15 @@ private void setupStorageClusterAsNeeded(String clusterName) {

// TODO remove this method once we are fully on HaaS
private void createClusterIfRequired(String clusterName) {
if (admin.getClusters().contains(clusterName)) {
LOGGER.info("Cluster: {} already exists.", clusterName);
return;
}

boolean isClusterCreated = admin.addCluster(clusterName, false);
if (!isClusterCreated) {
LOGGER.info("Cluster: {} creation returned false.", clusterName);
return;
}

VeniceControllerClusterConfig config = multiClusterConfigs.getControllerConfig(clusterName);
HelixConfigScope clusterConfigScope =
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(clusterName).build();
Map<String, String> helixClusterProperties = new HashMap<>();
helixClusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
long delayedTime = config.getDelayToRebalanceMS();
if (delayedTime > 0) {
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), String.valueOf(delayedTime));
}
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), String.valueOf(true));
helixClusterProperties.put(
ClusterConfig.ClusterConfigProperty.TOPOLOGY_AWARE_ENABLED.name(),
String.valueOf(config.isServerHelixClusterTopologyAware()));
if (config.isServerHelixClusterTopologyAware()) {
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), config.getServerHelixClusterTopology());
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), config.getServerHelixClusterFaultZoneType());
}

admin.setConfig(clusterConfigScope, helixClusterProperties);
LOGGER.info(
"Cluster creation: {} completed, auto join to true. Delayed rebalance time: {}ms",
clusterName,
delayedTime);
admin.addStateModelDef(clusterName, LeaderStandbySMD.name, LeaderStandbySMD.build());

admin.addResource(
controllerClusterName,
clusterName,
CONTROLLER_CLUSTER_NUMBER_OF_PARTITION,
LeaderStandbySMD.name,
IdealState.RebalanceMode.FULL_AUTO.toString(),
AutoRebalanceStrategy.class.getName());
IdealState idealState = admin.getResourceIdealState(controllerClusterName, clusterName);
// Use crush alg to allocate controller as well.
int controllerClusterReplica = config.getControllerClusterReplica();
idealState.setReplicas(String.valueOf(controllerClusterReplica));
idealState.setMinActiveReplicas(controllerClusterReplica);
idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
idealState.setRebalanceStrategy(CrushRebalanceStrategy.class.getName());
admin.setResourceIdealState(controllerClusterName, clusterName, idealState);
admin.rebalance(controllerClusterName, clusterName, controllerClusterReplica);
/**
* Delegate to {@link HelixAdminClient#createVeniceStorageClusterLegacy(String)} so that this
* legacy (non-HAAS) storage-cluster setup goes through the single {@link #helixAdminClient} rather
* than the duplicate {@link #admin} {@link HelixAdmin} held directly by this class. The delegated
* method is a verbatim relocation of the previous inline logic (cluster creation with the same
* properties + LeaderStandby state model, then controller-cluster resource registration using
* DelayedAutoRebalancer + CrushRebalanceStrategy). No ZK-address or behaviour change.
*/
helixAdminClient.createVeniceStorageClusterLegacy(clusterName);
}

public boolean updateIdealState(String clusterName, String resourceName, int minReplica) {
Expand Down
Loading