From 2bcea535613474435685b967fda7601d88e5c91b Mon Sep 17 00:00:00 2001 From: AlanJager Date: Tue, 13 Oct 2020 20:38:45 +0800 Subject: [PATCH 1/2] Batch prepare dhcp server and apply dhcp info Signed-off-by: AlanJager --- .../network/service/flat/DhcpApply.java | 316 ++++++++++++------ .../network/service/flat/FlatDhcpBackend.java | 46 ++- .../BatchCreateVmFailDeadlockCase.groovy | 2 +- .../integration/kvm/vm/VmVolumeGCCase.groovy | 2 +- .../l3network/ipv6/IPv6DhcpCase.groovy | 16 +- .../l3network/ipv6/IPv6MigrateVmCase.groovy | 6 +- .../OneVxlanNetworkLifeCycleCase.groovy | 6 +- .../vxlanNetwork/VxlanLazyAttachCase.groovy | 10 +- .../flat/ChangeNetworkSerivceCase.groovy | 21 +- .../flat/dhcp/CheckFlatDhcpWorkCase.groovy | 11 +- .../GetDhcpInfoForConnectedKvmHostCase.groovy | 18 +- .../provider/flat/dhcp/OneVmDhcpCase.groovy | 23 +- ...ifyPrepareDhcpWhenReconnectHostCase.groovy | 11 +- .../provider/flat/dns/FlatAddDnsCase.groovy | 10 +- .../provider/flat/dns/FlatDnsOrderCase.groovy | 5 +- .../hostRoute/FlatAddHostRouteCase.groovy | 48 +-- .../flat/userdata/NoDhcpServiceCase.groovy | 4 +- .../VirtualrouterMultiNicCase.groovy | 7 +- .../testlib/FlatnetworkSimualtor.groovy | 8 + 19 files changed, 345 insertions(+), 225 deletions(-) diff --git a/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/DhcpApply.java b/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/DhcpApply.java index 6a809a9cb3c..78d741cbdf7 100644 --- a/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/DhcpApply.java +++ b/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/DhcpApply.java @@ -1,87 +1,166 @@ package org.zstack.network.service.flat; +import com.google.common.collect.Lists; +import edu.emory.mathcs.backport.java.util.Collections; +import org.zstack.core.asyncbatch.While; import org.zstack.core.cloudbus.CloudBus; import org.zstack.core.cloudbus.CloudBusCallBack; import org.zstack.core.workflow.FlowChainBuilder; import org.zstack.core.workflow.ShareFlow; import org.zstack.header.core.Completion; +import org.zstack.header.core.NoErrorCompletion; import org.zstack.header.core.workflow.*; import org.zstack.header.errorcode.ErrorCode; +import org.zstack.header.errorcode.ErrorCodeList; import org.zstack.header.host.HostConstant; import org.zstack.header.message.MessageReply; +import org.zstack.header.network.l2.BatchCheckNetworkPhysicalInterfaceMsg; +import org.zstack.header.network.l2.L2NetworkInventory; import org.zstack.kvm.KVMHostAsyncHttpCallMsg; import org.zstack.kvm.KVMHostAsyncHttpCallReply; +import org.zstack.utils.CollectionUtils; import org.zstack.utils.DebugUtils; import org.zstack.utils.network.IPv6Constants; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.zstack.core.Platform.err; import static org.zstack.core.Platform.operr; public class DhcpApply { CloudBus bus; - void apply(Map.Entry> e, String hostUuid, boolean rebuild, Completion completion) { - if (e == null) { + class InternalWorker { + String l3Uuid; + List info; + List info4; + List info6; + FlatDhcpAcquireDhcpServerIpReply.DhcpServerIpStruct dhcp4Server = null; + FlatDhcpAcquireDhcpServerIpReply.DhcpServerIpStruct dhcp6Server = null; + + void acquireDhcpServerIp(Map.Entry> entry, Completion completion) { + l3Uuid = entry.getKey(); + info = entry.getValue(); + info4 = info.stream().filter(i -> i.ipVersion == IPv6Constants.IPv4).collect(Collectors.toList()); + info6 = 
info.stream().filter(i -> i.ipVersion == IPv6Constants.IPv6).collect(Collectors.toList()); + DebugUtils.Assert(!info.isEmpty(), "how can info be empty???"); + + FlatDhcpAcquireDhcpServerIpMsg msg = new FlatDhcpAcquireDhcpServerIpMsg(); + msg.setL3NetworkUuid(l3Uuid); + bus.makeTargetServiceIdByResourceUuid(msg, FlatNetworkServiceConstant.SERVICE_ID, l3Uuid); + bus.send(msg, new CloudBusCallBack(completion) { + @Override + public void run(MessageReply reply) { + if (!reply.isSuccess()) { + completion.fail(reply.getError()); + return; + } + FlatDhcpAcquireDhcpServerIpReply r = reply.castReply(); + List dhcpServerIps = r.getDhcpServerList(); + if (dhcpServerIps == null || dhcpServerIps.isEmpty()) { + completion.fail(operr("could not get dhcp server ip for l3 network [uuid:%s]", msg.getL3NetworkUuid())); + return; + } + + for (FlatDhcpAcquireDhcpServerIpReply.DhcpServerIpStruct struct : dhcpServerIps) { + if (struct.getIpVersion() == IPv6Constants.IPv4) { + dhcp4Server = struct; + } else if (struct.getIpVersion() == IPv6Constants.IPv6) { + dhcp6Server = struct; + } + } + if (!info4.isEmpty() && dhcp4Server == null) { + completion.fail(operr("could not get dhcp4 server ip for l3 network [uuid:%s]", msg.getL3NetworkUuid())); + return; + } + if (!info6.isEmpty() && dhcp6Server == null) { + completion.fail(operr("could not get dhcp6 server ip for l3 network [uuid:%s]", msg.getL3NetworkUuid())); + return; + } + + completion.success(); + } + }); + } + + FlatDhcpBackend.PrepareDhcpCmd getPrepareDhcpCmd() { + FlatDhcpBackend.DhcpInfo i = info.get(0); + + FlatDhcpBackend.PrepareDhcpCmd cmd = new FlatDhcpBackend.PrepareDhcpCmd(); + cmd.bridgeName = i.bridgeName; + cmd.namespaceName = i.namespaceName; + if (dhcp4Server != null) { + cmd.dhcpServerIp = dhcp4Server.getIp(); + cmd.dhcpNetmask = dhcp4Server.getNetmask(); + } + if (dhcp6Server != null) { + cmd.dhcp6ServerIp = dhcp6Server.getIp(); + cmd.prefixLen = dhcp6Server.getIpr().getPrefixLen(); + cmd.addressMode = dhcp6Server.getIpr().getAddressMode(); + } + if (dhcp4Server != null && dhcp6Server == null) { + cmd.ipVersion = IPv6Constants.IPv4; + } else if (dhcp4Server == null && dhcp6Server != null) { + cmd.ipVersion = IPv6Constants.IPv6; + } else { + cmd.ipVersion = IPv6Constants.DUAL_STACK; + } + + return cmd; + } + + FlatDhcpBackend.ApplyDhcpCmd getApplyDhcpCmd(boolean rebuild) { + FlatDhcpBackend.ApplyDhcpCmd cmd = new FlatDhcpBackend.ApplyDhcpCmd(); + cmd.dhcp = info; + cmd.rebuild = rebuild; + cmd.l3NetworkUuid = l3Uuid; + + return cmd; + } + } + + void apply(Map> e, String hostUuid, boolean rebuild, Completion completion) { + if (e == null || e.isEmpty()) { completion.success(); return; } - final String l3Uuid = e.getKey(); - final List info = e.getValue(); - final List info4 = info.stream().filter(i -> i.ipVersion == IPv6Constants.IPv4).collect(Collectors.toList()); - final List info6 = info.stream().filter(i -> i.ipVersion == IPv6Constants.IPv6).collect(Collectors.toList()); - DebugUtils.Assert(!info.isEmpty(), "how can info be empty???"); - FlowChain chain = FlowChainBuilder.newShareFlowChain(); - chain.setName(String.format("flat-dhcp-provider-apply-dhcp-to-l3-network-%s", l3Uuid)); + chain.setName(String.format("flat-dhcp-provider-apply-dhcp-on-host-%s", hostUuid)); chain.then(new ShareFlow() { - FlatDhcpAcquireDhcpServerIpReply.DhcpServerIpStruct dhcp4Server = null; - FlatDhcpAcquireDhcpServerIpReply.DhcpServerIpStruct dhcp6Server = null; + List internalWorkers = new ArrayList<>(); @Override public void setup() { flow(new NoRollbackFlow() { 
- String __name__ = "get-dhcp-server-ip"; + String __name__ = "batch-acquire-dhcp-ip"; @Override - public void run(final FlowTrigger trigger, Map data) { - if (info.isEmpty()) { - trigger.next(); - return; - } - FlatDhcpAcquireDhcpServerIpMsg msg = new FlatDhcpAcquireDhcpServerIpMsg(); - msg.setL3NetworkUuid(l3Uuid); - bus.makeTargetServiceIdByResourceUuid(msg, FlatNetworkServiceConstant.SERVICE_ID, l3Uuid); - bus.send(msg, new CloudBusCallBack(trigger) { - @Override - public void run(MessageReply reply) { - if (!reply.isSuccess()) { - trigger.fail(reply.getError()); - return; - } - FlatDhcpAcquireDhcpServerIpReply r = reply.castReply(); - List dhcpServerIps = r.getDhcpServerList(); - if (dhcpServerIps == null || dhcpServerIps.isEmpty()) { - trigger.fail(operr("could not get dhcp server ip for l3 network [uuid:%s]", msg.getL3NetworkUuid())); - return; + public void run(FlowTrigger trigger, Map data) { + ErrorCodeList errorCodeList = new ErrorCodeList(); + new While<>(e.entrySet()).each((entry, c) -> { + InternalWorker internalWorker = new InternalWorker(); + internalWorker.acquireDhcpServerIp(entry, new Completion(c) { + @Override + public void success() { + internalWorkers.add(internalWorker); + c.done(); } - for (FlatDhcpAcquireDhcpServerIpReply.DhcpServerIpStruct struct : dhcpServerIps) { - if (struct.getIpVersion() == IPv6Constants.IPv4) { - dhcp4Server = struct; - } else if (struct.getIpVersion() == IPv6Constants.IPv6) { - dhcp6Server = struct; - } - } - if (!info4.isEmpty() && dhcp4Server == null) { - trigger.fail(operr("could not get dhcp4 server ip for l3 network [uuid:%s]", msg.getL3NetworkUuid())); - return; + @Override + public void fail(ErrorCode errorCode) { + errorCodeList.getCauses().add(errorCode); + c.allDone(); } - if (!info6.isEmpty() && dhcp6Server == null) { - trigger.fail(operr("could not get dhcp6 server ip for l3 network [uuid:%s]", msg.getL3NetworkUuid())); + }); + }).run(new NoErrorCompletion(trigger) { + @Override + public void done() { + if (!errorCodeList.getCauses().isEmpty()) { + trigger.fail(errorCodeList.getCauses().get(0)); return; } @@ -92,50 +171,55 @@ public void run(MessageReply reply) { }); flow(new NoRollbackFlow() { - String __name__ = "prepare-distributed-dhcp-server-on-host"; + String __name__ = "batch-prepare-distributed-dhcp-server-on-host"; @Override - public void run(final FlowTrigger trigger, Map data) { - FlatDhcpBackend.DhcpInfo i = info.get(0); - - FlatDhcpBackend.PrepareDhcpCmd cmd = new FlatDhcpBackend.PrepareDhcpCmd(); - cmd.bridgeName = i.bridgeName; - cmd.namespaceName = i.namespaceName; - if (dhcp4Server != null) { - cmd.dhcpServerIp = dhcp4Server.getIp(); - cmd.dhcpNetmask = dhcp4Server.getNetmask(); - } - if (dhcp6Server != null) { - cmd.dhcp6ServerIp = dhcp6Server.getIp(); - cmd.prefixLen = dhcp6Server.getIpr().getPrefixLen(); - cmd.addressMode = dhcp6Server.getIpr().getAddressMode(); - } - if (dhcp4Server != null && dhcp6Server == null) { - cmd.ipVersion = IPv6Constants.IPv4; - } else if (dhcp4Server == null && dhcp6Server != null) { - cmd.ipVersion = IPv6Constants.IPv6; - } else { - cmd.ipVersion = IPv6Constants.DUAL_STACK; + public void run(FlowTrigger trigger, Map data) { + List msgs = new ArrayList<>(); + List> subSets = Lists.partition(internalWorkers, 50); + for (List internalWorkers : subSets) { + List dhcpCmds = internalWorkers + .stream() + .map(InternalWorker::getPrepareDhcpCmd) + .collect(Collectors.toList()); + + FlatDhcpBackend.BatchPrepareDhcpCmd cmd = new FlatDhcpBackend.BatchPrepareDhcpCmd(); + cmd.dhcpInfos = 
dhcpCmds; + + KVMHostAsyncHttpCallMsg msg = new KVMHostAsyncHttpCallMsg(); + msg.setHostUuid(hostUuid); + msg.setNoStatusCheck(true); + msg.setCommand(cmd); + msg.setPath(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH); + bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, hostUuid); + msgs.add(msg); } - KVMHostAsyncHttpCallMsg msg = new KVMHostAsyncHttpCallMsg(); - msg.setHostUuid(hostUuid); - msg.setNoStatusCheck(true); - msg.setCommand(cmd); - msg.setPath(FlatDhcpBackend.PREPARE_DHCP_PATH); - bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, hostUuid); - bus.send(msg, new CloudBusCallBack(trigger) { + ErrorCodeList errorCodeList = new ErrorCodeList(); + new While<>(msgs).each((msg, c) -> bus.send(msg, new CloudBusCallBack(c) { @Override public void run(MessageReply reply) { if (!reply.isSuccess()) { - trigger.fail(reply.getError()); + errorCodeList.getCauses().add(reply.getError()); + c.allDone(); return; } KVMHostAsyncHttpCallReply ar = reply.castReply(); FlatDhcpBackend.PrepareDhcpRsp rsp = ar.toResponse(FlatDhcpBackend.PrepareDhcpRsp.class); if (!rsp.isSuccess()) { - trigger.fail(operr("operation error, because:%s", rsp.getError())); + errorCodeList.getCauses().add(operr("operation error, because:%s", rsp.getError())); + c.allDone(); + return; + } + + c.done(); + } + })).run(new NoErrorCompletion(trigger) { + @Override + public void done() { + if (!errorCodeList.getCauses().isEmpty()) { + trigger.fail(errorCodeList.getCauses().get(0)); return; } @@ -146,33 +230,55 @@ public void run(MessageReply reply) { }); flow(new NoRollbackFlow() { - String __name__ = "apply-dhcp"; + String __name__ = "batch-apply-dhcp"; @Override - public void run(final FlowTrigger trigger, Map data) { - FlatDhcpBackend.ApplyDhcpCmd cmd = new FlatDhcpBackend.ApplyDhcpCmd(); - cmd.dhcp = info; - cmd.rebuild = rebuild; - cmd.l3NetworkUuid = l3Uuid; - - KVMHostAsyncHttpCallMsg msg = new KVMHostAsyncHttpCallMsg(); - msg.setCommand(cmd); - msg.setHostUuid(hostUuid); - msg.setPath(FlatDhcpBackend.APPLY_DHCP_PATH); - msg.setNoStatusCheck(true); - bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, hostUuid); - bus.send(msg, new CloudBusCallBack(trigger) { + public void run(FlowTrigger trigger, Map data) { + List msgs = new ArrayList<>(); + List> subSets = Lists.partition(internalWorkers, 50); + for (List internalWorkers : subSets) { + List dhcpCmds = internalWorkers + .stream() + .map(worker -> worker.getApplyDhcpCmd(rebuild)) + .collect(Collectors.toList()); + + FlatDhcpBackend.BatchApplyDhcpCmd cmd = new FlatDhcpBackend.BatchApplyDhcpCmd(); + cmd.dhcpInfos = dhcpCmds; + + KVMHostAsyncHttpCallMsg msg = new KVMHostAsyncHttpCallMsg(); + msg.setHostUuid(hostUuid); + msg.setNoStatusCheck(true); + msg.setCommand(cmd); + msg.setPath(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH); + bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, hostUuid); + msgs.add(msg); + } + + ErrorCodeList errorCodeList = new ErrorCodeList(); + new While<>(msgs).each((msg, c) -> bus.send(msg, new CloudBusCallBack(c) { @Override public void run(MessageReply reply) { if (!reply.isSuccess()) { - trigger.fail(reply.getError()); + errorCodeList.getCauses().add(reply.getError()); + c.allDone(); return; } - KVMHostAsyncHttpCallReply r = reply.castReply(); - FlatDhcpBackend.ApplyDhcpRsp rsp = r.toResponse(FlatDhcpBackend.ApplyDhcpRsp.class); + KVMHostAsyncHttpCallReply ar = reply.castReply(); + FlatDhcpBackend.ApplyDhcpRsp rsp = ar.toResponse(FlatDhcpBackend.ApplyDhcpRsp.class); if (!rsp.isSuccess()) { - 
trigger.fail(operr("operation error, because:%s", rsp.getError())); + errorCodeList.getCauses().add(operr("operation error, because:%s", rsp.getError())); + c.allDone(); + return; + } + + c.done(); + } + })).run(new NoErrorCompletion(trigger) { + @Override + public void done() { + if (!errorCodeList.getCauses().isEmpty()) { + trigger.fail(errorCodeList.getCauses().get(0)); return; } @@ -181,20 +287,16 @@ public void run(MessageReply reply) { }); } }); - - done(new FlowDoneHandler(completion) { - @Override - public void handle(Map data) { - completion.success(); - } - }); - - error(new FlowErrorHandler(completion) { - @Override - public void handle(ErrorCode errCode, Map data) { - completion.fail(errCode); - } - }); + } + }).done(new FlowDoneHandler(completion) { + @Override + public void handle(Map data) { + completion.success(); + } + }).error(new FlowErrorHandler(completion) { + @Override + public void handle(ErrorCode errCode, Map data) { + completion.fail(errCode); } }).start(); } diff --git a/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java b/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java index 77a567f2415..89dcf19a93f 100755 --- a/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java +++ b/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java @@ -91,7 +91,9 @@ public class FlatDhcpBackend extends AbstractService implements NetworkServiceDh private DhcpExtension dhcpExtension; public static final String APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/apply"; + public static final String BATCH_APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/batchApply"; public static final String PREPARE_DHCP_PATH = "/flatnetworkprovider/dhcp/prepare"; + public static final String BATCH_PREPARE_DHCP_PATH = "/flatnetworkprovider/dhcp/batchPrepare"; public static final String RELEASE_DHCP_PATH = "/flatnetworkprovider/dhcp/release"; public static final String DHCP_CONNECT_PATH = "/flatnetworkprovider/dhcp/connect"; public static final String RESET_DEFAULT_GATEWAY_PATH = "/flatnetworkprovider/dhcp/resetDefaultGateway"; @@ -1297,6 +1299,10 @@ public static class ApplyDhcpCmd extends KVMAgentCommands.AgentCommand { public String l3NetworkUuid; } + public static class BatchApplyDhcpCmd extends KVMAgentCommands.AgentCommand { + public List dhcpInfos; + } + public static class ApplyDhcpRsp extends KVMAgentCommands.AgentResponse { } @@ -1318,6 +1324,10 @@ public static class PrepareDhcpCmd extends KVMAgentCommands.AgentCommand { public String addressMode; } + public static class BatchPrepareDhcpCmd extends KVMAgentCommands.AgentCommand { + public List dhcpInfos; + } + public static class PrepareDhcpRsp extends KVMAgentCommands.AgentResponse { } @@ -1474,43 +1484,29 @@ public DhcpInfo call(DhcpStruct arg) { } private void applyDhcpToHosts(List dhcpInfo, final String hostUuid, final boolean rebuild, final Completion completion) { - final Map> l3DhcpMap = new HashMap>(); + final Map> l3DhcpMap = new HashMap<>(); for (DhcpInfo d : dhcpInfo) { List lst = l3DhcpMap.get(d.l3NetworkUuid); if (lst == null) { - lst = new ArrayList(); + lst = new ArrayList<>(); l3DhcpMap.put(d.l3NetworkUuid, lst); } lst.add(d); } - ErrorCodeList errorCodeList = new ErrorCodeList(); - new While<>(l3DhcpMap.entrySet()).step((entry, c) -> { - DhcpApply dhcpApply = new DhcpApply(); - dhcpApply.bus = bus; - dhcpApply.apply(entry, hostUuid, rebuild, new Completion(c) { - @Override - 
public void success() { - c.done(); - } - - @Override - public void fail(ErrorCode errorCode) { - errorCodeList.getCauses().add(errorCode); - c.allDone(); - } - }); - }, 10).run(new NoErrorCompletion(completion) { + DhcpApply dhcpApply = new DhcpApply(); + dhcpApply.bus = bus; + dhcpApply.apply(l3DhcpMap, hostUuid, rebuild, new Completion(completion) { @Override - public void done() { - if (!errorCodeList.getCauses().isEmpty()) { - completion.fail(errorCodeList.getCauses().get(0)); - return; - } - + public void success() { completion.success(); } + + @Override + public void fail(ErrorCode errorCode) { + completion.fail(errorCode); + } }); } diff --git a/test/src/test/groovy/org/zstack/test/integration/db/deadlock/BatchCreateVmFailDeadlockCase.groovy b/test/src/test/groovy/org/zstack/test/integration/db/deadlock/BatchCreateVmFailDeadlockCase.groovy index 2d591e04c16..c3cd5d2f91e 100644 --- a/test/src/test/groovy/org/zstack/test/integration/db/deadlock/BatchCreateVmFailDeadlockCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/db/deadlock/BatchCreateVmFailDeadlockCase.groovy @@ -168,7 +168,7 @@ class BatchCreateVmFailDeadlockCase extends SubCase{ ImageInventory image = env.inventoryByName("iso") DiskOfferingInventory diskOffering = env.inventoryByName("diskOffering") - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) {FlatDhcpBackend.ApplyDhcpRsp rsp, HttpEntity e -> + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) {FlatDhcpBackend.ApplyDhcpRsp rsp, HttpEntity e -> Random r = new Random() int ran = r.nextInt(2) if (ran == 0) { diff --git a/test/src/test/groovy/org/zstack/test/integration/kvm/vm/VmVolumeGCCase.groovy b/test/src/test/groovy/org/zstack/test/integration/kvm/vm/VmVolumeGCCase.groovy index 1e445ecc713..1817076e8b2 100644 --- a/test/src/test/groovy/org/zstack/test/integration/kvm/vm/VmVolumeGCCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/kvm/vm/VmVolumeGCCase.groovy @@ -138,7 +138,7 @@ class VmVolumeGCCase extends SubCase { long expectVolumeNum = Q.New(VolumeVO.class).count() - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { FlatDhcpBackend.ApplyDhcpRsp rsp, HttpEntity e -> + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { FlatDhcpBackend.ApplyDhcpRsp rsp, HttpEntity e -> rsp.setError("case mock error") return rsp } diff --git a/test/src/test/groovy/org/zstack/test/integration/network/l3network/ipv6/IPv6DhcpCase.groovy b/test/src/test/groovy/org/zstack/test/integration/network/l3network/ipv6/IPv6DhcpCase.groovy index d6c706ea22a..31c22de8b2e 100644 --- a/test/src/test/groovy/org/zstack/test/integration/network/l3network/ipv6/IPv6DhcpCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/network/l3network/ipv6/IPv6DhcpCase.groovy @@ -65,16 +65,16 @@ class IPv6DhcpCase extends SubCase { ImageInventory image = env.inventoryByName("image1") List pcmds = new ArrayList<>() - env.afterSimulator(FlatDhcpBackend.PREPARE_DHCP_PATH) { rsp, HttpEntity e -> - FlatDhcpBackend.PrepareDhcpCmd pcmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.PrepareDhcpCmd.class) - pcmds.add(pcmd) + env.afterSimulator(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchPrepareDhcpCmd pcmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchPrepareDhcpCmd.class) + pcmds.addAll(pcmd.dhcpInfos) return rsp } List cmds = new ArrayList<>() - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> - FlatDhcpBackend.ApplyDhcpCmd cmd = JSONObjectUtil.toObject(e1.body, 
FlatDhcpBackend.ApplyDhcpCmd.class) - cmds.add(cmd) + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) + cmds.addAll(cmd.dhcpInfos) return rsp } @@ -112,6 +112,10 @@ class IPv6DhcpCase extends SubCase { assert pcmd.addressMode == IPv6Constants.Stateful_DHCP assert pcmd.dhcpServerIp == null assert pcmd.dhcpNetmask == null + assert pcmd.dhcpNetmask == null + assert pcmd.dhcpNetmask == null + assert pcmd.dhcpNetmask == null + assert pcmd.dhcpNetmask == null /* simulate an old dual stack nic */ AllocateIpMsg msg = new AllocateIpMsg() diff --git a/test/src/test/groovy/org/zstack/test/integration/network/l3network/ipv6/IPv6MigrateVmCase.groovy b/test/src/test/groovy/org/zstack/test/integration/network/l3network/ipv6/IPv6MigrateVmCase.groovy index 696b42f9d53..6e8fea4b3eb 100644 --- a/test/src/test/groovy/org/zstack/test/integration/network/l3network/ipv6/IPv6MigrateVmCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/network/l3network/ipv6/IPv6MigrateVmCase.groovy @@ -60,9 +60,9 @@ class IPv6MigrateVmCase extends SubCase { hostUuid = h1.uuid } - List cmds = new ArrayList() - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> - FlatDhcpBackend.ApplyDhcpCmd cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.ApplyDhcpCmd.class) + List cmds = new ArrayList() + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) cmds.add(cmd) return rsp } diff --git a/test/src/test/groovy/org/zstack/test/integration/network/vxlanNetwork/OneVxlanNetworkLifeCycleCase.groovy b/test/src/test/groovy/org/zstack/test/integration/network/vxlanNetwork/OneVxlanNetworkLifeCycleCase.groovy index 27e80283bc3..cdebbd216e0 100644 --- a/test/src/test/groovy/org/zstack/test/integration/network/vxlanNetwork/OneVxlanNetworkLifeCycleCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/network/vxlanNetwork/OneVxlanNetworkLifeCycleCase.groovy @@ -313,8 +313,8 @@ class OneVxlanNetworkLifeCycleCase extends SubCase { } - env.simulator(FlatDhcpBackend.PREPARE_DHCP_PATH) { HttpEntity entity, EnvSpec spec -> - record.add(FlatDhcpBackend.PREPARE_DHCP_PATH) + env.simulator(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) { HttpEntity entity, EnvSpec spec -> + record.add(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) return new FlatDhcpBackend.PrepareDhcpRsp() } @@ -344,7 +344,7 @@ class OneVxlanNetworkLifeCycleCase extends SubCase { } assert record.get(2).equals(VxlanNetworkPoolConstant.VXLAN_KVM_REALIZE_L2VXLAN_NETWORK_PATH) - assert record.get(3).equals(FlatDhcpBackend.PREPARE_DHCP_PATH) + assert record.get(3).equals(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) poolinv = queryL2VxlanNetworkPool{}[0] diff --git a/test/src/test/groovy/org/zstack/test/integration/network/vxlanNetwork/VxlanLazyAttachCase.groovy b/test/src/test/groovy/org/zstack/test/integration/network/vxlanNetwork/VxlanLazyAttachCase.groovy index 95e03acc741..c982b98bf04 100644 --- a/test/src/test/groovy/org/zstack/test/integration/network/vxlanNetwork/VxlanLazyAttachCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/network/vxlanNetwork/VxlanLazyAttachCase.groovy @@ -195,11 +195,11 @@ class VxlanLazyAttachCase extends SubCase { delegate.l3NetworkUuids = [(env.specByName("l3-novlan") as L3NetworkSpec).inventory.uuid] } - 
FlatDhcpBackend.PrepareDhcpCmd dhcpCmd - env.afterSimulator(FlatDhcpBackend.PREPARE_DHCP_PATH) { rsp, HttpEntity e -> - FlatDhcpBackend.PrepareDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.PrepareDhcpCmd.class) + FlatDhcpBackend.BatchPrepareDhcpCmd dhcpCmd + env.afterSimulator(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchPrepareDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchPrepareDhcpCmd.class) - if (cmd.namespaceName.contains(l3.uuid)) { + if (cmd.dhcpInfos.get(0).namespaceName.contains(l3.uuid)) { dhcpCmd = cmd } @@ -224,7 +224,7 @@ class VxlanLazyAttachCase extends SubCase { delegate.vmInstanceUuid = vm.uuid } - assert dhcpCmd != null && dhcpCmd.bridgeName != null + assert dhcpCmd != null && dhcpCmd.dhcpInfos.get(0).bridgeName != null deleteL2Network { delegate.uuid = poolinv.getUuid() diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/ChangeNetworkSerivceCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/ChangeNetworkSerivceCase.groovy index adccd80b1da..9de14efdaff 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/ChangeNetworkSerivceCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/ChangeNetworkSerivceCase.groovy @@ -193,9 +193,12 @@ class ChangeNetworkSerivceCase extends SubCase{ } FlatDhcpBackend.ApplyDhcpCmd acmd - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - acmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchApplyDhcpCmd batchApplyDhcpCmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) + assert batchApplyDhcpCmd.dhcpInfos.size() == 1 + + acmd = batchApplyDhcpCmd.dhcpInfos.get(0) assert !acmd.dhcp.empty assert acmd.dhcp.stream().filter({dhcp -> nic.ip == dhcp.ip}) @@ -212,17 +215,17 @@ class ChangeNetworkSerivceCase extends SubCase{ return rsp } - FlatDhcpBackend.PrepareDhcpCmd cmd - env.afterSimulator(FlatDhcpBackend.PREPARE_DHCP_PATH) { rsp, HttpEntity e -> - cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.PrepareDhcpCmd.class) + FlatDhcpBackend.BatchPrepareDhcpCmd cmd + env.afterSimulator(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) { rsp, HttpEntity e -> + cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchPrepareDhcpCmd.class) Map tokens = FlatNetworkSystemTags.L3_NETWORK_DHCP_IP.getTokensByResourceUuid(l3.getUuid()); String dhcpServerIp = tokens.get(FlatNetworkSystemTags.L3_NETWORK_DHCP_IP_TOKEN); String dhcpServerIpUuid = tokens.get(FlatNetworkSystemTags.L3_NETWORK_DHCP_IP_UUID_TOKEN) UsedIpVO ipvo = dbf.findByUuid(dhcpServerIpUuid, UsedIpVO.class) assert ipvo.getIp() == dhcpServerIp - assert dhcpServerIp == cmd.dhcpServerIp - assert ipvo.getNetmask() == cmd.dhcpNetmask + assert dhcpServerIp == cmd.dhcpInfos.get(0).dhcpServerIp + assert ipvo.getNetmask() == cmd.dhcpInfos.get(0).dhcpNetmask return rsp } @@ -244,8 +247,8 @@ class ChangeNetworkSerivceCase extends SubCase{ assert null != acmd assert null != cmd FlatDhcpBackend.DhcpInfo dhcp = acmd.dhcp[0] - assert dhcp.bridgeName == cmd.bridgeName - assert dhcp.namespaceName == cmd.namespaceName + assert dhcp.bridgeName == cmd.dhcpInfos.get(0).bridgeName + assert dhcp.namespaceName == cmd.dhcpInfos.get(0).namespaceName } @Override diff --git 
a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/CheckFlatDhcpWorkCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/CheckFlatDhcpWorkCase.groovy index e69d393320d..65bbe86d577 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/CheckFlatDhcpWorkCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/CheckFlatDhcpWorkCase.groovy @@ -162,15 +162,16 @@ class CheckFlatDhcpWorkCase extends SubCase{ assert n.getDeviceId() == 1 List dhcpInfoList = new ArrayList() - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> - FlatDhcpBackend.ApplyDhcpCmd cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.ApplyDhcpCmd.class) + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) assert null != cmd - dhcpInfoList.addAll(cmd.dhcp) + cmd.dhcpInfos.each { info -> + dhcpInfoList.addAll(info.dhcp) + } return rsp } goOn: - for (FlatDhcpBackend.ApplyDhcpCmd cmd : dhcpInfoList) { - FlatDhcpBackend.DhcpInfo info = cmd.dhcp.get(0) + for (FlatDhcpBackend.DhcpInfo info : dhcpInfoList) { if (!info.isDefaultL3Network && info.hostname != null) { Assert.fail(String.format("wrong hostname set. %s", JSONObjectUtil.toJsonString(info))) } diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/GetDhcpInfoForConnectedKvmHostCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/GetDhcpInfoForConnectedKvmHostCase.groovy index 229a237a46d..39f5a521fae 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/GetDhcpInfoForConnectedKvmHostCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/GetDhcpInfoForConnectedKvmHostCase.groovy @@ -136,9 +136,9 @@ class GetDhcpInfoForConnectedKvmHostCase extends SubCase { creator.recreate = true creator.create() - FlatDhcpBackend.ApplyDhcpCmd cmd = null - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + FlatDhcpBackend.BatchApplyDhcpCmd cmd = null + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) return rsp } @@ -146,9 +146,10 @@ class GetDhcpInfoForConnectedKvmHostCase extends SubCase { uuid = host.uuid } assert null != cmd - assert 1 == cmd.dhcp.size() - assert cmd.dhcp.get(0).hostname == VmSystemTags.HOSTNAME.getTokenByResourceUuid(vm.uuid, VmSystemTags.HOSTNAME_TOKEN) - assert cmd.dhcp.get(0).vmMultiGateway == true + assert 1 == cmd.dhcpInfos.size() + assert 1 == cmd.dhcpInfos.get(0).dhcp.size() + assert cmd.dhcpInfos.get(0).dhcp.get(0).hostname == VmSystemTags.HOSTNAME.getTokenByResourceUuid(vm.uuid, VmSystemTags.HOSTNAME_TOKEN) + assert cmd.dhcpInfos.get(0).dhcp.get(0).vmMultiGateway GetVmHostnameResult result = getVmHostname { uuid = vm.uuid @@ -166,8 +167,9 @@ class GetDhcpInfoForConnectedKvmHostCase extends SubCase { uuid = host.uuid } assert null != cmd - assert 1 == cmd.dhcp.size() - assert cmd.dhcp.get(0).hostname == vm.vmNics[0].ip.replaceAll("\\.", "-") + assert 1 == cmd.dhcpInfos.size() + assert 1 == cmd.dhcpInfos.get(0).dhcp.size() + assert 
cmd.dhcpInfos.get(0).dhcp.get(0).hostname == vm.vmNics[0].ip.replaceAll("\\.", "-") } @Override diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/OneVmDhcpCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/OneVmDhcpCase.groovy index 009246189c1..27348ad394d 100755 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/OneVmDhcpCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/OneVmDhcpCase.groovy @@ -158,10 +158,10 @@ class OneVmDhcpCase extends SubCase { } void testSetDhcpMtu() { - FlatDhcpBackend.ApplyDhcpCmd cmd = null + FlatDhcpBackend.BatchApplyDhcpCmd cmd = null - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) return rsp } @@ -199,7 +199,7 @@ class OneVmDhcpCase extends SubCase { uuid = vm.uuid } - DhcpInfo info = cmd.dhcp[0] + DhcpInfo info = cmd.dhcpInfos.get(0).dhcp[0] assert info.mtu.equals(Integer.valueOf(NetworkServiceGlobalConfig.DHCP_MTU_NO_VLAN.value())) assert info.mtu.equals(Integer.valueOf(NetworkServiceGlobalConfig.DHCP_MTU_VLAN.value())) assert info.mtu.equals(Integer.valueOf(NetworkServiceGlobalConfig.DHCP_MTU_VXLAN.value())) @@ -213,7 +213,7 @@ class OneVmDhcpCase extends SubCase { uuid = vm.uuid } - info = cmd.dhcp[0] + info = cmd.dhcpInfos.get(0).dhcp[0] assert info.mtu.equals(1450) GetL3NetworkMtuResult r = getL3NetworkMtu { @@ -228,21 +228,22 @@ class OneVmDhcpCase extends SubCase { reconnectHost { delegate.uuid = vm.getHostUuid() } - info = cmd.dhcp[0] + info = cmd.dhcpInfos.get(0).dhcp[0] assert info.mtu.equals(1400) } private void testSetDhcpWhenVmOperations(Closure vmOperation) { FlatDhcpBackend.ApplyDhcpCmd cmd = null - - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchApplyDhcpCmd batchApplyDhcpCmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) + cmd = batchApplyDhcpCmd.dhcpInfos.get(0) return rsp } FlatDhcpBackend.PrepareDhcpCmd pcmd = null - env.afterSimulator(FlatDhcpBackend.PREPARE_DHCP_PATH) { rsp, HttpEntity e -> - pcmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.PrepareDhcpCmd.class) + env.afterSimulator(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchPrepareDhcpCmd batchPrepareDhcpCmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchPrepareDhcpCmd.class) + pcmd = batchPrepareDhcpCmd.dhcpInfos.get(0) return rsp } diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy index a080470c78e..6e7d69c10eb 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy @@ -160,9 +160,9 @@ class 
VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { } def called = 0 - FlatDhcpBackend.ApplyDhcpCmd cmd = null - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> - cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.ApplyDhcpCmd.class) + FlatDhcpBackend.BatchApplyDhcpCmd cmd = null + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> + cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) called += 1 return rsp } @@ -174,13 +174,14 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { uuid = vm.uuid } assert called == 1 - assert cmd.dhcp.get(0).hostname == "test-name" + assert cmd.dhcpInfos.size() == 1 + assert cmd.dhcpInfos.get(0).dhcp.get(0).hostname == "test-name" called = 0 cmd = null reconnectHost { uuid=host.uuid } assert called == 1 - assert cmd.dhcp.get(0).hostname == "test-name" + assert cmd.dhcpInfos.get(0).dhcp.get(0).hostname == "test-name" def vr = queryVirtualRouterVm {}[0] as VirtualRouterVmInventory assert vr != null diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dns/FlatAddDnsCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dns/FlatAddDnsCase.groovy index 029f6a3b187..17a22d73f1c 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dns/FlatAddDnsCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dns/FlatAddDnsCase.groovy @@ -40,9 +40,9 @@ class FlatAddDnsCase extends SubCase { } void testAddDns() { - FlatDhcpBackend.ApplyDhcpCmd cmd = null - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + FlatDhcpBackend.BatchApplyDhcpCmd cmd = null + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) return rsp } @@ -52,8 +52,8 @@ class FlatAddDnsCase extends SubCase { } assert cmd != null - assert cmd.dhcp.get(0).dns.size() == 1 - assert cmd.dhcp.get(0).dns.get(0) == dns1 + assert cmd.dhcpInfos.get(0).dhcp.get(0).dns.size() == 1 + assert cmd.dhcpInfos.get(0).dhcp.get(0).dns.get(0) == dns1 def l2 = env.inventoryByName("l2") as L2NetworkInventory diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dns/FlatDnsOrderCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dns/FlatDnsOrderCase.groovy index a0d99b0702b..5be6e30fbfc 100755 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dns/FlatDnsOrderCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dns/FlatDnsOrderCase.groovy @@ -69,8 +69,9 @@ class FlatDnsOrderCase extends SubCase { assert l3.dns.get(2) == '1.1.1.3' FlatDhcpBackend.ApplyDhcpCmd cmd = null - env.simulator(FlatDhcpBackend.APPLY_DHCP_PATH){HttpEntity e,EnvSpec spec -> - cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + env.simulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH){HttpEntity e,EnvSpec spec -> + FlatDhcpBackend.BatchApplyDhcpCmd batchApplyDhcpCmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) + cmd = batchApplyDhcpCmd.dhcpInfos.get(0) def rsp = new FlatDhcpBackend.ApplyDhcpRsp() rsp.success = true return rsp diff --git 
a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/hostRoute/FlatAddHostRouteCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/hostRoute/FlatAddHostRouteCase.groovy index e39f8b004f6..cde34f7fbf5 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/hostRoute/FlatAddHostRouteCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/hostRoute/FlatAddHostRouteCase.groovy @@ -44,9 +44,9 @@ class FlatAddHostRouteCase extends SubCase { } void testAddDns() { - List cmds = new ArrayList<>() - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - FlatDhcpBackend.ApplyDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + List cmds = new ArrayList<>() + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) cmds.add(cmd) return rsp } @@ -65,9 +65,9 @@ class FlatAddHostRouteCase extends SubCase { } assert cmds.size() == 3 - for (FlatDhcpBackend.ApplyDhcpCmd cmd : cmds) { - assert cmd.l3NetworkUuid == l3.uuid - for (FlatDhcpBackend.DhcpInfo dinfo: cmd.dhcp) { + for (FlatDhcpBackend.BatchApplyDhcpCmd cmd : cmds) { + assert cmd.dhcpInfos.get(0).l3NetworkUuid == l3.uuid + for (FlatDhcpBackend.DhcpInfo dinfo: cmd.dhcpInfos.get(0).dhcp) { for (FlatDhcpBackend.HostRouteInfo rinfo : dinfo.hostRoutes) { assert (rinfo.prefix == "10.1.1.1/32" || rinfo.prefix == NetworkServiceConstants.METADATA_HOST_PREFIX) if (rinfo.prefix == "10.1.1.1/32") { @@ -77,9 +77,9 @@ class FlatAddHostRouteCase extends SubCase { } } - List cmds1 = new ArrayList<>() - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - FlatDhcpBackend.ApplyDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + List cmds1 = new ArrayList<>() + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) cmds1.add(cmd) return rsp } @@ -91,9 +91,9 @@ class FlatAddHostRouteCase extends SubCase { assert l3Inv.getHostRoute().size() == 3 assert cmds1.size() == 3 - for (FlatDhcpBackend.ApplyDhcpCmd cmd : cmds1) { - assert cmd.l3NetworkUuid == l3.uuid - for (FlatDhcpBackend.DhcpInfo dinfo: cmd.dhcp) { + for (FlatDhcpBackend.BatchApplyDhcpCmd cmd : cmds1) { + assert cmd.dhcpInfos.get(0).dhcp.l3NetworkUuid.get(0) == l3.uuid + for (FlatDhcpBackend.DhcpInfo dinfo: cmd.dhcpInfos.get(0).dhcp) { assert dinfo.hostRoutes.size() == 3 for (FlatDhcpBackend.HostRouteInfo rinfo : dinfo.hostRoutes) { assert (rinfo.prefix == "10.1.1.1/32" || rinfo.prefix == "10.0.1.0/24" @@ -108,9 +108,9 @@ class FlatAddHostRouteCase extends SubCase { } } - List cmds2 = new ArrayList<>() - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - FlatDhcpBackend.ApplyDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + List cmds2 = new ArrayList<>() + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) cmds2.add(cmd) return rsp } @@ -127,9 +127,9 @@ class FlatAddHostRouteCase extends SubCase { } assert cmds2.size() == 3 - for (FlatDhcpBackend.ApplyDhcpCmd cmd : cmds2) { - assert 
cmd.l3NetworkUuid == l3.uuid - for (FlatDhcpBackend.DhcpInfo dinfo: cmd.dhcp) { + for (FlatDhcpBackend.BatchApplyDhcpCmd cmd : cmds2) { + assert cmd.dhcpInfos.get(0).dhcp.l3NetworkUuid.get(0) == l3.uuid + for (FlatDhcpBackend.DhcpInfo dinfo: cmd.dhcpInfos.get(0).dhcp) { for (FlatDhcpBackend.HostRouteInfo rinfo : dinfo.hostRoutes) { assert (rinfo.prefix == "10.0.1.0/24" || rinfo.prefix == NetworkServiceConstants.METADATA_HOST_PREFIX) if (rinfo.prefix == "10.0.1.0/24") { @@ -139,9 +139,9 @@ class FlatAddHostRouteCase extends SubCase { } } - List cmds3 = new ArrayList<>() - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - FlatDhcpBackend.ApplyDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) + List cmds3 = new ArrayList<>() + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) cmds3.add(cmd) return rsp } @@ -152,9 +152,9 @@ class FlatAddHostRouteCase extends SubCase { assert l3Inv.getHostRoute().size() == 1 assert cmds3.size() == 3 - for (FlatDhcpBackend.ApplyDhcpCmd cmd : cmds3) { - assert cmd.l3NetworkUuid == l3.uuid - for (FlatDhcpBackend.DhcpInfo dinfo: cmd.dhcp) { + for (FlatDhcpBackend.BatchApplyDhcpCmd cmd : cmds3) { + assert cmd.dhcpInfos.get(0).dhcp.l3NetworkUuid.get(0) == l3.uuid + for (FlatDhcpBackend.DhcpInfo dinfo: cmd.dhcpInfos.get(0).dhcp) { assert dinfo.hostRoutes.size() == 1 assert dinfo.hostRoutes.get(0).prefix == NetworkServiceConstants.METADATA_HOST_PREFIX } diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/userdata/NoDhcpServiceCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/userdata/NoDhcpServiceCase.groovy index 2fc11fab93a..2598b387d31 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/userdata/NoDhcpServiceCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/userdata/NoDhcpServiceCase.groovy @@ -128,13 +128,13 @@ class NoDhcpServiceCase extends SubCase { FlatUserdataBackend.ApplyUserdataCmd cmd = null def prepareDhcp = false - env.afterSimulator(FlatDhcpBackend.PREPARE_DHCP_PATH) { rsp, HttpEntity e -> + env.afterSimulator(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) { rsp, HttpEntity e -> prepareDhcp = true return rsp } def applyDhcp = false - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> applyDhcp = true return rsp } diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/virtualrouter/VirtualrouterMultiNicCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/virtualrouter/VirtualrouterMultiNicCase.groovy index 912bd694d6f..40be4ed03bf 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/virtualrouter/VirtualrouterMultiNicCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/virtualrouter/VirtualrouterMultiNicCase.groovy @@ -208,9 +208,10 @@ class VirtualrouterMultiNicCase extends SubCase { } FlatDhcpBackend.ApplyDhcpCmd acmd - env.afterSimulator(FlatDhcpBackend.APPLY_DHCP_PATH) { rsp, HttpEntity e -> - acmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.ApplyDhcpCmd.class) - + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e -> + 
FlatDhcpBackend.BatchApplyDhcpCmd bcmd = JSONObjectUtil.toObject(e.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) + assert bcmd.dhcpInfos.size() == 1 + acmd = bcmd.dhcpInfos.get(0) assert !acmd.dhcp.isEmpty() if (acmd.dhcp.stream().filter({dhcp -> dhcp.ip.equals(earliestNic.ip)}).count()>0) { assert acmd.dhcp.stream().filter({ dhcp -> dhcp.isDefaultL3Network }).count() == 1 diff --git a/testlib/src/main/java/org/zstack/testlib/FlatnetworkSimualtor.groovy b/testlib/src/main/java/org/zstack/testlib/FlatnetworkSimualtor.groovy index 6db4964085f..79d2043f53b 100755 --- a/testlib/src/main/java/org/zstack/testlib/FlatnetworkSimualtor.groovy +++ b/testlib/src/main/java/org/zstack/testlib/FlatnetworkSimualtor.groovy @@ -17,10 +17,18 @@ class FlatnetworkSimualtor implements Simulator { return new FlatDhcpBackend.ApplyDhcpRsp() } + spec.simulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { + return new FlatDhcpBackend.ApplyDhcpRsp() + } + spec.simulator(FlatDhcpBackend.PREPARE_DHCP_PATH) { return new FlatDhcpBackend.PrepareDhcpRsp() } + spec.simulator(FlatDhcpBackend.BATCH_PREPARE_DHCP_PATH) { + return new FlatDhcpBackend.PrepareDhcpRsp() + } + spec.simulator(FlatDhcpBackend.RESET_DEFAULT_GATEWAY_PATH) { return new FlatDhcpBackend.ResetDefaultGatewayRsp() } From 37efe64e1740a1a29f9d0c12aaa34f3bba3871d2 Mon Sep 17 00:00:00 2001 From: J M Date: Fri, 6 Feb 2026 23:12:25 +0800 Subject: [PATCH 2/2] [thread]: support coalesce queue for batch dhcp Resolves: TIC-4930 Change-Id: I737574686f6e6b69726e79736c6279616e66706b --- .../org/zstack/compute/zone/AbstractZone.java | 1 - .../core/thread/AbstractCoalesceQueue.java | 189 +++++++ .../org/zstack/core/thread/CoalesceQueue.java | 61 +++ .../core/thread/ReturnValueCoalesceQueue.java | 43 ++ .../network/service/flat/FlatDhcpBackend.java | 56 +- .../core/chaintask/CoalesceQueueCase.groovy | 479 ++++++++++++++++++ ...ifyPrepareDhcpWhenReconnectHostCase.groovy | 143 ++++++ testlib/pom.xml | 33 ++ .../org/zstack/testlib/FailCoalesceQueue.java | 21 + 9 files changed, 1022 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java create mode 100644 core/src/main/java/org/zstack/core/thread/CoalesceQueue.java create mode 100644 core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java create mode 100644 test/src/test/groovy/org/zstack/test/integration/core/chaintask/CoalesceQueueCase.groovy create mode 100644 testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java diff --git a/compute/src/main/java/org/zstack/compute/zone/AbstractZone.java b/compute/src/main/java/org/zstack/compute/zone/AbstractZone.java index dd5e7f1c2fc..e327cd7f367 100755 --- a/compute/src/main/java/org/zstack/compute/zone/AbstractZone.java +++ b/compute/src/main/java/org/zstack/compute/zone/AbstractZone.java @@ -8,7 +8,6 @@ import org.zstack.header.zone.ZoneStateEvent; abstract class AbstractZone implements Zone { - private static DatabaseFacade dbf = Platform.getComponentLoader().getComponent(DatabaseFacade.class); private final static StateMachine stateMachine; static { diff --git a/core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java b/core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java new file mode 100644 index 00000000000..2706e7945eb --- /dev/null +++ b/core/src/main/java/org/zstack/core/thread/AbstractCoalesceQueue.java @@ -0,0 +1,189 @@ +package org.zstack.core.thread; + +import org.springframework.beans.factory.annotation.Autowire; +import 
org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Configurable; +import org.zstack.header.core.AbstractCompletion; +import org.zstack.header.core.Completion; +import org.zstack.header.core.ReturnValueCompletion; +import org.zstack.header.errorcode.ErrorCode; +import org.zstack.utils.Utils; +import org.zstack.utils.logging.CLogger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Base implementation for coalesce queues. + * + * @param Request Item Type + * @param Batch Execution Result Type + * @param Single Request Result Type + */ +@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE) +public abstract class AbstractCoalesceQueue { + private static final CLogger logger = Utils.getLogger(AbstractCoalesceQueue.class); + + @Autowired + private ThreadFacade thdf; + + private final ConcurrentHashMap signatureQueues = new ConcurrentHashMap<>(); + + protected class PendingRequest { + final T item; + final AbstractCompletion completion; + + PendingRequest(T item, AbstractCompletion completion) { + this.item = item; + this.completion = completion; + } + + @SuppressWarnings("unchecked") + void notifySuccess(V result) { + if (completion == null) { + return; + } + + if (completion instanceof ReturnValueCompletion) { + ((ReturnValueCompletion) completion).success(result); + } else if (completion instanceof Completion) { + ((Completion) completion).success(); + } + } + + void notifyFailure(ErrorCode errorCode) { + if (completion == null) { + return; + } + + if (completion instanceof ReturnValueCompletion) { + ((ReturnValueCompletion) completion).fail(errorCode); + } else if (completion instanceof Completion) { + ((Completion) completion).fail(errorCode); + } + } + } + + private class SignatureQueue { + final String syncSignature; + List pendingList = Collections.synchronizedList(new ArrayList<>()); + + SignatureQueue(String syncSignature) { + this.syncSignature = syncSignature; + } + + synchronized List takeAll() { + List toProcess = pendingList; + pendingList = Collections.synchronizedList(new ArrayList<>()); + return toProcess; + } + + synchronized void add(PendingRequest request) { + pendingList.add(request); + } + + synchronized boolean isEmpty() { + return pendingList.isEmpty(); + } + } + + protected abstract String getName(); + + // Changed to take AbstractCompletion, subclasses cast it to specific type + protected abstract void executeBatch(List items, AbstractCompletion completion); + + protected abstract AbstractCompletion createBatchCompletion(String syncSignature, List requests, SyncTaskChain chain); + + protected abstract V calculateResult(T item, R batchResult); + + protected final void handleSuccess(String syncSignature, List requests, R batchResult, SyncTaskChain chain) { + for (PendingRequest req : requests) { + try { + V singleResult = calculateResult(req.item, batchResult); + req.notifySuccess(singleResult); + } catch (Throwable t) { + logger.warn(String.format("[%s] failed to calculate result for item %s", getName(), req.item), t); + req.notifyFailure(org.zstack.core.Platform.operr("failed to calculate result: %s", t.getMessage())); + } + } + cleanup(syncSignature); + chain.next(); + } + + protected final void handleFailure(String syncSignature, List requests, ErrorCode errorCode, SyncTaskChain chain) { + for (PendingRequest req : requests) { + req.notifyFailure(errorCode); + } + 
cleanup(syncSignature); + chain.next(); + } + + void setThreadFacade(ThreadFacade thdf) { + this.thdf = thdf; + } + + protected final void submitRequest(String syncSignature, T item, AbstractCompletion completion) { + doSubmit(syncSignature, new PendingRequest(item, completion)); + } + + private void doSubmit(String syncSignature, PendingRequest request) { + SignatureQueue queue = signatureQueues.computeIfAbsent(syncSignature, SignatureQueue::new); + queue.add(request); + + thdf.chainSubmit(new ChainTask(null) { + @Override + public String getSyncSignature() { + return String.format("coalesce-queue-%s-%s", AbstractCoalesceQueue.this.getName(), syncSignature); + } + + @Override + public void run(SyncTaskChain chain) { + List requests = queue.takeAll(); + + if (requests.isEmpty()) { + chain.next(); + return; + } + + String name = getName(); + logger.debug(String.format("[%s] coalescing %d requests for signature[%s]", + name, requests.size(), syncSignature)); + + + // Create the specific completion type (Completion or ReturnValueCompletion) + AbstractCompletion batchCompletion = createBatchCompletion(syncSignature, requests, chain); + + // Execute batch with the direct completion object + List items = requests.stream().map(req -> req.item).collect(Collectors.toList()); + executeBatch(items, batchCompletion); + } + + @Override + public String getName() { + return String.format("%s-coalesced-batch-%s", AbstractCoalesceQueue.this.getName(), syncSignature); + } + + @Override + protected int getSyncLevel() { + return 1; + } + }); + } + + private void cleanup(String syncSignature) { + signatureQueues.computeIfPresent(syncSignature, (k, queue) -> { + if (queue.isEmpty()) { + return null; + } + return queue; + }); + } + + // For testing + int getActiveQueueCount() { + return signatureQueues.size(); + } +} diff --git a/core/src/main/java/org/zstack/core/thread/CoalesceQueue.java b/core/src/main/java/org/zstack/core/thread/CoalesceQueue.java new file mode 100644 index 00000000000..6fe18d189bb --- /dev/null +++ b/core/src/main/java/org/zstack/core/thread/CoalesceQueue.java @@ -0,0 +1,61 @@ +package org.zstack.core.thread; + +import org.zstack.header.core.AbstractCompletion; +import org.zstack.header.core.Completion; +import org.zstack.header.errorcode.ErrorCode; + +import java.util.List; + +/** + * A coalesce queue for requests that do NOT expect a return value. + * + * @param Request Item Type + */ +public abstract class CoalesceQueue extends AbstractCoalesceQueue { + + /** + * Submit a request. + * + * @param syncSignature the sync signature; requests with the same signature will be coalesced + * @param item the request item + * @param completion the completion callback + */ + public void submit(String syncSignature, T item, Completion completion) { + submitRequest(syncSignature, item, completion); + } + + /** + * Executes the batched requests. + *
<p>
+ * Subclasses must implement this method to process the coalesced items. + * + * @param items the list of coalesced request items + * @param completion the completion callback for the batch execution + */ + protected abstract void executeBatch(List items, Completion completion); + + @Override + protected final void executeBatch(List items, AbstractCompletion batchCompletion) { + executeBatch(items, (Completion) batchCompletion); + } + + @Override + protected final AbstractCompletion createBatchCompletion(String syncSignature, List requests, SyncTaskChain chain) { + return new Completion(chain) { + @Override + public void success() { + handleSuccess(syncSignature, requests, null, chain); + } + + @Override + public void fail(ErrorCode errorCode) { + handleFailure(syncSignature, requests, errorCode, chain); + } + }; + } + + @Override + protected final Void calculateResult(T item, Void batchResult) { + return null; + } +} diff --git a/core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java b/core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java new file mode 100644 index 00000000000..346824647fd --- /dev/null +++ b/core/src/main/java/org/zstack/core/thread/ReturnValueCoalesceQueue.java @@ -0,0 +1,43 @@ +package org.zstack.core.thread; + +import org.zstack.header.core.AbstractCompletion; +import org.zstack.header.core.ReturnValueCompletion; +import org.zstack.header.errorcode.ErrorCode; + +import java.util.List; + +/** + * A coalesce queue for requests that expect a return value. + * + * @param Request Item Type + * @param Batch Execution Result Type + * @param Single Request Result Type + */ +public abstract class ReturnValueCoalesceQueue extends AbstractCoalesceQueue { + + public void submit(String syncSignature, T item, ReturnValueCompletion completion) { + submitRequest(syncSignature, item, completion); + } + + protected abstract void executeBatch(List items, ReturnValueCompletion completion); + + @Override + protected final void executeBatch(List items, AbstractCompletion batchCompletion) { + executeBatch(items, (ReturnValueCompletion) batchCompletion); + } + + @Override + protected final AbstractCompletion createBatchCompletion(String syncSignature, List requests, SyncTaskChain chain) { + return new ReturnValueCompletion(null) { + @Override + public void success(R batchResult) { + handleSuccess(syncSignature, requests, batchResult, chain); + } + + @Override + public void fail(ErrorCode errorCode) { + handleFailure(syncSignature, requests, errorCode, chain); + } + }; + } +} diff --git a/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java b/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java index 89dcf19a93f..315ab17a6b5 100755 --- a/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java +++ b/plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java @@ -13,8 +13,7 @@ import org.zstack.core.db.*; import org.zstack.core.defer.Defer; import org.zstack.core.defer.Deferred; -import org.zstack.core.thread.SyncTask; -import org.zstack.core.thread.ThreadFacade; +import org.zstack.core.thread.*; import org.zstack.header.AbstractService; import org.zstack.header.apimediator.ApiMessageInterceptionException; import org.zstack.header.apimediator.GlobalApiMessageInterceptor; @@ -90,6 +89,54 @@ public class FlatDhcpBackend extends AbstractService implements NetworkServiceDh @Autowired private DhcpExtension 
+    /**
+     * Request wrapper for DHCP apply coalescing.
+     */
+    private static class DhcpApplyRequest {
+        final String hostUuid;
+        final List<DhcpInfo> dhcpInfos;
+        final boolean rebuild;
+
+        DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean rebuild) {
+            this.hostUuid = hostUuid;
+            this.dhcpInfos = dhcpInfos;
+            this.rebuild = rebuild;
+        }
+    }
+
+    private class DhcpApplyQueue extends CoalesceQueue<DhcpApplyRequest> {
+        @Override
+        protected String getName() {
+            return "flat-dhcp-apply";
+        }
+
+        @Override
+        protected void executeBatch(List<DhcpApplyRequest> requests, Completion completion) {
+            if (requests.isEmpty()) {
+                completion.success();
+                return;
+            }
+
+            // All requests in the same batch have the same hostUuid
+            String hostUuid = requests.get(0).hostUuid;
+
+            // Merge all DhcpInfo from all requests, grouped by L3 network
+            // TODO: unify DHCP apply logic and switch to merged/batch flow everywhere
+            boolean anyRebuild = false;
+            List<DhcpInfo> mergedInfos = new ArrayList<>();
+            for (DhcpApplyRequest req : requests) {
+                anyRebuild = anyRebuild || req.rebuild;
+                mergedInfos.addAll(req.dhcpInfos);
+            }
+
+            logger.debug(String.format("Coalesced %d DHCP apply requests for host[uuid:%s]", requests.size(), hostUuid));
+
+            applyDhcpToHosts(mergedInfos, hostUuid, anyRebuild, completion);
+        }
+    }
+
+    private final DhcpApplyQueue dhcpApplyCoalesceQueue = new DhcpApplyQueue();
+
     public static final String APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/apply";
     public static final String BATCH_APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/batchApply";
     public static final String PREPARE_DHCP_PATH = "/flatnetworkprovider/dhcp/prepare";
@@ -1517,7 +1564,10 @@ public void applyDhcpService(List<DhcpStruct> dhcpStructList, VmInstanceSpec spec, Completion completion) {
             return;
         }
 
-        applyDhcpToHosts(toDhcpInfo(dhcpStructList), spec.getDestHost().getUuid(), false, completion);
+        String hostUuid = spec.getDestHost().getUuid();
+        DhcpApplyRequest request = new DhcpApplyRequest(hostUuid, toDhcpInfo(dhcpStructList), false);
+        // Use coalesce queue: requests to the same host will be merged into a single batch
+        dhcpApplyCoalesceQueue.submit(hostUuid, request, completion);
     }
 
     private void releaseDhcpService(List<DhcpInfo> info, final String vmUuid, final String hostUuid, final NoErrorCompletion completion) {
diff --git a/test/src/test/groovy/org/zstack/test/integration/core/chaintask/CoalesceQueueCase.groovy b/test/src/test/groovy/org/zstack/test/integration/core/chaintask/CoalesceQueueCase.groovy
new file mode 100644
index 00000000000..dcb797e6d2e
--- /dev/null
+++ b/test/src/test/groovy/org/zstack/test/integration/core/chaintask/CoalesceQueueCase.groovy
@@ -0,0 +1,479 @@
+package org.zstack.test.integration.core.chaintask
+
+import org.zstack.core.thread.CoalesceQueue
+import org.zstack.core.thread.ReturnValueCoalesceQueue
+import org.zstack.header.core.Completion
+import org.zstack.header.core.ReturnValueCompletion
+import org.zstack.header.errorcode.ErrorCode
+import org.zstack.testlib.FailCoalesceQueue
+import org.zstack.testlib.SubCase
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+
+class CoalesceQueueCase extends SubCase {
+    @Override
+    void clean() {
+    }
+
+    @Override
+    void setup() {
+    }
+
+    @Override
+    void environment() {
+    }
+
+    @Override
+    void test() {
+        testCoalesceMultipleRequests()
+        testDifferentSignaturesNotCoalesced()
+        testBatchFailureNotifiesAllRequests()
+        testBatchThrowExceptionNotifiesAllRequests()
+        testReturnValueCompletion()
+        testResultCalculationFailure()
+        testSequentialBatches()
testHighVolumeNoLossAcrossBatches() + } + + void testCoalesceMultipleRequests() { + def requestCount = 10 + def completionLatch = new CountDownLatch(requestCount) + def batchExecutionCount = new AtomicInteger(0) + def processedItems = Collections.synchronizedList(new ArrayList()) + def completedTokens = Collections.synchronizedSet(new LinkedHashSet()) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-coalesce" + } + + @Override + protected void executeBatch(List items, Completion completion) { + batchExecutionCount.incrementAndGet() + processedItems.addAll(items) + + new Thread({ + try { + TimeUnit.MILLISECONDS.sleep(100) + } catch (InterruptedException ignored) { + } + completion.success() + }).start() + } + } + + def signature = "host-1" + (0.. + def token = "done-${idx}" + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + completedTokens.add(token) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + completedTokens.add(token) + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert processedItems.size() == requestCount + assert batchExecutionCount.get() < requestCount + assert completedTokens.size() == requestCount + (0.. + assert completedTokens.contains("done-${idx}") + } + } + + void testDifferentSignaturesNotCoalesced() { + def signaturesCount = 3 + def requestsPerSignature = 5 + def totalRequests = signaturesCount * requestsPerSignature + def completionLatch = new CountDownLatch(totalRequests) + def batchExecutionCount = new AtomicInteger(0) + def completedTokens = Collections.synchronizedSet(new LinkedHashSet()) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-multi-sig" + } + + @Override + protected void executeBatch(List items, Completion completion) { + batchExecutionCount.incrementAndGet() + completion.success() + } + } + + (0.. + def signature = "host-${sig}" + (0.. + def item = "${signature}-item-${idx}" + def token = "done-${item}" + queue.submit(signature, item, new Completion(null) { + @Override + void success() { + completedTokens.add(token) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + completedTokens.add(token) + completionLatch.countDown() + } + }) + } + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert batchExecutionCount.get() >= signaturesCount + assert completedTokens.size() == totalRequests + (0.. + def signature = "host-${sig}" + (0.. + assert completedTokens.contains("done-${signature}-item-${idx}") + } + } + } + + void testBatchFailureNotifiesAllRequests() { + def requestCount = 5 + def completionLatch = new CountDownLatch(requestCount) + def failureCount = new AtomicInteger(0) + def testError = org.zstack.core.Platform.operr("test error") + def completedTokens = Collections.synchronizedSet(new LinkedHashSet()) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-failure" + } + + @Override + protected void executeBatch(List items, Completion completion) { + completion.fail(testError) + } + } + + def signature = "host-fail" + (0.. 
+ def token = "fail-${idx}" + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + completedTokens.add(token) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + failureCount.incrementAndGet() + completedTokens.add(token) + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert failureCount.get() == requestCount + assert completedTokens.size() == requestCount + (0.. + assert completedTokens.contains("fail-${idx}") + } + } + + void testBatchThrowExceptionNotifiesAllRequests() { + def requestCount = 5 + def completionLatch = new CountDownLatch(requestCount) + def failureCount = new AtomicInteger(0) + def testError = org.zstack.core.Platform.operr("test error") + def completedTokens = Collections.synchronizedSet(new LinkedHashSet()) + + def queue = new FailCoalesceQueue() + + def signature = "host-throw" + (0.. + def token = "throw-${idx}" + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + completedTokens.add(token) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + failureCount.incrementAndGet() + completedTokens.add(token) + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert failureCount.get() == requestCount + assert completedTokens.size() == requestCount + (0.. + assert completedTokens.contains("throw-${idx}") + } + } + + + void testReturnValueCompletion() { + def requestCount = 5 + def completionLatch = new CountDownLatch(requestCount) + def receivedResults = Collections.synchronizedMap(new LinkedHashMap()) + def mismatches = Collections.synchronizedList(new ArrayList()) + def batchResult = "batch-success" + + def queue = new ReturnValueCoalesceQueue() { + @Override + protected String getName() { + return "test-return-value" + } + + @Override + protected void executeBatch(List items, ReturnValueCompletion completion) { + completion.success(batchResult) + } + + @Override + protected String calculateResult(Integer item, String r) { + return "${r}-item-${item}" + } + } + + def signature = "host-result" + (0.. + queue.submit(signature, idx, new ReturnValueCompletion(null) { + @Override + void success(String result) { + def expected = String.format("%s-item-%s", batchResult, idx) + if (result != expected) { + mismatches.add(String.format("item-%s=%s", idx, result)) + } + receivedResults.put(idx, result) + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert receivedResults.size() == requestCount + assert mismatches.isEmpty() + (0.. 
+ def expected = String.format("%s-item-%s", batchResult, idx) + assert receivedResults.get(idx) == expected + } + } + + void testResultCalculationFailure() { + def completionLatch = new CountDownLatch(2) + def successCount = new AtomicInteger(0) + def failCount = new AtomicInteger(0) + + def queue = new ReturnValueCoalesceQueue() { + @Override + protected String getName() { + return "test-calc-fail" + } + + @Override + protected void executeBatch(List items, ReturnValueCompletion completion) { + completion.success(null) + } + + @Override + protected String calculateResult(Integer item, Void batchResult) { + if (item == 0) { + throw new RuntimeException("Calculation failed for item 0 (on purpose)") + } + return "success" + } + } + + def signature = "host-calc" + queue.submit(signature, 0, new ReturnValueCompletion(null) { + @Override + void success(String ret) { + successCount.incrementAndGet() + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + failCount.incrementAndGet() + completionLatch.countDown() + } + }) + + queue.submit(signature, 1, new ReturnValueCompletion(null) { + @Override + void success(String ret) { + successCount.incrementAndGet() + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + failCount.incrementAndGet() + completionLatch.countDown() + } + }) + + assert completionLatch.await(10, TimeUnit.SECONDS) + assert successCount.get() == 1 + assert failCount.get() == 1 + } + + void testSequentialBatches() { + def firstBatchStart = new CountDownLatch(1) + def firstBatchContinue = new CountDownLatch(1) + def secondBatchStart = new CountDownLatch(1) + def secondBatchContinue = new CountDownLatch(1) + def allComplete = new CountDownLatch(6) + def batches = Collections.synchronizedList(new ArrayList>()) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-sequential" + } + + @Override + protected void executeBatch(List items, Completion completion) { + batches.add(new ArrayList<>(items)) + + if (batches.size() == 1) { + firstBatchStart.countDown() + try { + firstBatchContinue.await(5, TimeUnit.SECONDS) + } catch (InterruptedException ignored) { + } + } else if (batches.size() == 2) { + secondBatchStart.countDown() + try { + secondBatchContinue.await(5, TimeUnit.SECONDS) + } catch (InterruptedException ignored) { + } + } + + completion.success() + } + } + + def signature = "host-seq" + queue.submit(signature, 0, new Completion(null) { + @Override + void success() { + allComplete.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + allComplete.countDown() + } + }) + + assert firstBatchStart.await(5, TimeUnit.SECONDS) + + (1..<4).each { idx -> + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + allComplete.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + allComplete.countDown() + } + }) + } + + assert secondBatchStart.await(5, TimeUnit.SECONDS) + + (4..<6).each { idx -> + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + allComplete.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + allComplete.countDown() + } + }) + } + + firstBatchContinue.countDown() + secondBatchContinue.countDown() + assert allComplete.await(10, TimeUnit.SECONDS) + assert batches.size() == 3 + assert batches.get(0) == [0] + assert batches.get(1).containsAll([1, 2, 3]) + assert batches.get(2).containsAll([4, 5]) + } + + void testHighVolumeNoLossAcrossBatches() { + def requestCount = 300 + def 
completionLatch = new CountDownLatch(requestCount) + def processedItems = Collections.synchronizedSet(new LinkedHashSet()) + def batchCount = new AtomicInteger(0) + + def queue = new CoalesceQueue() { + @Override + protected String getName() { + return "test-high-volume" + } + + @Override + protected void executeBatch(List items, Completion completion) { + batchCount.incrementAndGet() + processedItems.addAll(items) + + new Thread({ + try { + TimeUnit.MILLISECONDS.sleep(3) + } catch (InterruptedException ignored) { + } + completion.success() + }).start() + } + } + + def signature = "host-high-volume" + (0.. + queue.submit(signature, idx, new Completion(null) { + @Override + void success() { + completionLatch.countDown() + } + + @Override + void fail(ErrorCode errorCode) { + completionLatch.countDown() + } + }) + } + + assert completionLatch.await(6, TimeUnit.SECONDS) + assert processedItems.size() == requestCount + assert batchCount.get() > 1 + } +} diff --git a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy index 6e7d69c10eb..f3b9239e879 100644 --- a/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/networkservice/provider/flat/dhcp/VerifyPrepareDhcpWhenReconnectHostCase.groovy @@ -1,6 +1,7 @@ package org.zstack.test.integration.networkservice.provider.flat.dhcp import org.springframework.http.HttpEntity +import org.zstack.core.thread.ThreadFacade import org.zstack.header.network.service.NetworkServiceType import org.zstack.network.securitygroup.SecurityGroupConstant import org.zstack.network.service.eip.EipConstant @@ -9,6 +10,9 @@ import org.zstack.network.service.flat.FlatNetworkServiceConstant import org.zstack.network.service.userdata.UserdataConstant import org.zstack.network.service.virtualrouter.vyos.VyosConstants import org.zstack.sdk.HostInventory +import org.zstack.sdk.ImageInventory +import org.zstack.sdk.InstanceOfferingInventory +import org.zstack.sdk.L3NetworkInventory import org.zstack.sdk.VirtualRouterVmInventory import org.zstack.sdk.VmInstanceInventory import org.zstack.test.integration.networkservice.provider.NetworkServiceProviderTest @@ -17,6 +21,10 @@ import org.zstack.testlib.SubCase import org.zstack.utils.data.SizeUnit import org.zstack.utils.gson.JSONObjectUtil +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger + class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { EnvSpec env @Override @@ -147,12 +155,14 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { void test() { env.create { checkDhcpWork() + testBatchStartVmApplyDhcp() } } void checkDhcpWork(){ def host = queryHost {}[0] as HostInventory def vm = env.inventoryByName("vm") as VmInstanceInventory + def vmItemTokens = new LinkedHashSet() setVmHostname { uuid = vm.uuid @@ -164,6 +174,11 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) called += 1 + cmd.dhcpInfos.each { info -> + info.dhcp.each { dhcp -> + vmItemTokens.add(String.format("%s-%s-%s", dhcp.ip, dhcp.netmask, dhcp.gateway)) + } 
+ } return rsp } @@ -176,12 +191,17 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { assert called == 1 assert cmd.dhcpInfos.size() == 1 assert cmd.dhcpInfos.get(0).dhcp.get(0).hostname == "test-name" + def vmNic = vm.vmNics.get(0) + def expectedToken = String.format("%s-%s-%s", vmNic.ip, vmNic.netmask, vmNic.gateway) + assert vmItemTokens.contains(expectedToken) called = 0 cmd = null + vmItemTokens.clear() reconnectHost { uuid=host.uuid } assert called == 1 assert cmd.dhcpInfos.get(0).dhcp.get(0).hostname == "test-name" + assert vmItemTokens.contains(expectedToken) def vr = queryVirtualRouterVm {}[0] as VirtualRouterVmInventory assert vr != null @@ -192,6 +212,129 @@ class VerifyPrepareDhcpWhenReconnectHostCase extends SubCase { assert called == 1 } + void testBatchStartVmApplyDhcp() { + L3NetworkInventory l3 = env.inventoryByName("l3-1") as L3NetworkInventory + ImageInventory image = env.inventoryByName("image") as ImageInventory + InstanceOfferingInventory offering = env.inventoryByName("instanceOffering") as InstanceOfferingInventory + + def vmCount = 4 + def vms = new ArrayList() + def hostnameByIp = new LinkedHashMap() + (0.. + def hname = "batch-${idx}" + VmInstanceInventory inv = createVmInstance { + name = "batch-vm-${idx}" + imageUuid = image.uuid + l3NetworkUuids = [l3.uuid] + instanceOfferingUuid = offering.uuid + } as VmInstanceInventory + setVmHostname { + uuid = inv.uuid + hostname = hname + } + hostnameByIp.put(inv.vmNics.get(0).ip, hname) + vms.add(inv) + } + + vms.each { vmInv -> + stopVmInstance { + uuid = vmInv.uuid + } + } + + def batchCmds = Collections.synchronizedList(new ArrayList()) + def firstBatchArrived = new CountDownLatch(1) + def releaseFirstBatch = new CountDownLatch(1) + env.afterSimulator(FlatDhcpBackend.BATCH_APPLY_DHCP_PATH) { rsp, HttpEntity e1 -> + FlatDhcpBackend.BatchApplyDhcpCmd cmd = JSONObjectUtil.toObject(e1.body, FlatDhcpBackend.BatchApplyDhcpCmd.class) + batchCmds.add(cmd) + if (batchCmds.size() == 1) { + firstBatchArrived.countDown() + releaseFirstBatch.await(10, TimeUnit.SECONDS) + } + return rsp + } + + VmInstanceInventory blocker = vms.remove(0) + new Thread({ + startVmInstance { + uuid = blocker.uuid + } + }).start() + assert firstBatchArrived.await(10, TimeUnit.SECONDS) + + CountDownLatch doneLatch = new CountDownLatch(vms.size()) + vms.each { vmInv -> + new Thread({ + try { + startVmInstance { + uuid = vmInv.uuid + } + } finally { + doneLatch.countDown() + } + }).start() + } + + ThreadFacade thdf = bean(ThreadFacade.class) + retryInSecs { + assert thdf.getChainTaskInfo(String.format("coalesce-queue-flat-dhcp-apply-%s", vms[0].hostUuid)).pendingTask.size() == 3 + } + + releaseFirstBatch.countDown() + assert doneLatch.await(2, TimeUnit.MINUTES) + + retryInSecs(5) { + assert batchCmds.size() == 2 + } + retryInSecs(2) { + assert batchCmds.size() == 2 + } + + Closure> toTokenSet = { FlatDhcpBackend.BatchApplyDhcpCmd batch -> + def tokens = new LinkedHashSet() + batch.dhcpInfos.each { info -> + info.dhcp.each { dhcp -> + tokens.add(String.format("%s-%s-%s", dhcp.ip, dhcp.netmask, dhcp.gateway)) + } + } + return tokens + } + + Closure> toHostnameMap = { FlatDhcpBackend.BatchApplyDhcpCmd batch -> + def hostnames = new LinkedHashMap() + batch.dhcpInfos.each { info -> + info.dhcp.each { dhcp -> + hostnames.put(dhcp.ip, dhcp.hostname) + } + } + return hostnames + } + + def firstBatchTokens = toTokenSet(batchCmds.get(0)) + def secondBatchTokens = toTokenSet(batchCmds.get(1)) + def firstBatchHostnames = 
toHostnameMap(batchCmds.get(0)) + def secondBatchHostnames = toHostnameMap(batchCmds.get(1)) + + def blockerNic = blocker.vmNics.get(0) + def blockerToken = String.format("%s-%s-%s", blockerNic.ip, blockerNic.netmask, blockerNic.gateway) + assert firstBatchTokens.size() == 1 + assert firstBatchTokens.contains(blockerToken) + assert firstBatchHostnames.size() == 1 + assert firstBatchHostnames.get(blockerNic.ip) == hostnameByIp.get(blockerNic.ip) + + def expectedTokens = new LinkedHashSet() + def expectedHostnames = new LinkedHashMap() + vms.each { vmInv -> + def nic = vmInv.vmNics.get(0) + expectedTokens.add(String.format("%s-%s-%s", nic.ip, nic.netmask, nic.gateway)) + expectedHostnames.put(nic.ip, hostnameByIp.get(nic.ip)) + } + assert secondBatchTokens.containsAll(expectedTokens) + assert secondBatchTokens.size() == expectedTokens.size() + assert secondBatchHostnames == expectedHostnames + } + @Override void clean() { env.delete() diff --git a/testlib/pom.xml b/testlib/pom.xml index e2e8ce96e66..55cf0bb3bc8 100755 --- a/testlib/pom.xml +++ b/testlib/pom.xml @@ -180,6 +180,39 @@ + + org.codehaus.mojo + aspectj-maven-plugin + ${aspectj.plugin.version} + + + + compile + test-compile + + + + + ${project.java.version} + ${project.java.version} + ${project.java.version} + true + + + org.springframework + spring-aspects + + + org.zstack + core + + + org.zstack + header + + + + diff --git a/testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java b/testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java new file mode 100644 index 00000000000..50c8e22d1d9 --- /dev/null +++ b/testlib/src/main/java/org/zstack/testlib/FailCoalesceQueue.java @@ -0,0 +1,21 @@ +package org.zstack.testlib; + +import org.zstack.core.thread.CoalesceQueue; +import org.zstack.header.core.Completion; +import org.zstack.header.errorcode.OperationFailureException; + +import java.util.List; + +import static org.zstack.core.Platform.operr; + +public class FailCoalesceQueue extends CoalesceQueue { + @Override + protected String getName() { + return "test-failure"; + } + + @Override + protected void executeBatch(List items, Completion completion) { + throw new OperationFailureException(operr("test error")); + } +}
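
Reviewer note (not part of the patch): a minimal sketch of how another service could adopt the new CoalesceQueue API, under the following assumptions -- the queue subclass name, the "example-report" name, the String item type, and the caller-side variables are invented for illustration; only getName(), executeBatch(...) and submit(...) come from this series, and the ThreadFacade is assumed to be wired in by the framework the same way it is for the DHCP queue.

    import java.util.List;

    import org.zstack.core.thread.CoalesceQueue;
    import org.zstack.header.core.Completion;
    import org.zstack.header.errorcode.ErrorCode;

    // Hypothetical consumer: coalesces per-host work items the same way
    // DhcpApplyQueue coalesces DHCP apply requests.
    public class ExampleReportQueue extends CoalesceQueue<String> {
        @Override
        protected String getName() {
            return "example-report";
        }

        @Override
        protected void executeBatch(List<String> items, Completion completion) {
            // The single backend call that handles the whole batch goes here;
            // it must eventually invoke completion.success() or completion.fail(...).
            completion.success();
        }
    }

    // Caller side: submissions sharing a signature (e.g. a host UUID) are merged
    // into one executeBatch() invocation, but every submitter keeps its own callback.
    exampleReportQueue.submit(hostUuid, "item-1", new Completion(null) {
        @Override
        public void success() {
            // this request is done
        }

        @Override
        public void fail(ErrorCode errorCode) {
            // the batch failed; every pending request is failed with the same error
        }
    });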