|
51 | 51 | import org.apache.accumulo.core.conf.Property; |
52 | 52 | import org.apache.accumulo.core.data.NamespaceId; |
53 | 53 | import org.apache.accumulo.core.data.TableId; |
| 54 | +import org.apache.accumulo.core.fate.AdminUtil; |
54 | 55 | import org.apache.accumulo.core.fate.AgeOffStore; |
55 | 56 | import org.apache.accumulo.core.fate.Fate; |
56 | 57 | import org.apache.accumulo.core.fate.FateTxId; |
57 | 58 | import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; |
58 | 59 | import org.apache.accumulo.core.fate.Repo; |
59 | 60 | import org.apache.accumulo.core.fate.ZooStore; |
| 61 | +import org.apache.accumulo.core.fate.zookeeper.ServiceLock; |
60 | 62 | import org.apache.accumulo.core.fate.zookeeper.ZooCache; |
61 | 63 | import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; |
62 | 64 | import org.apache.accumulo.core.util.UtilWaitThread; |
|
65 | 67 | import org.apache.accumulo.manager.tableOps.TraceRepo; |
66 | 68 | import org.apache.accumulo.manager.tableOps.Utils; |
67 | 69 | import org.apache.accumulo.server.ServerContext; |
| 70 | +import org.apache.accumulo.server.util.Admin; |
68 | 71 | import org.apache.accumulo.test.util.Wait; |
69 | 72 | import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; |
70 | 73 | import org.apache.zookeeper.KeeperException; |
@@ -404,6 +407,51 @@ public void testCancelWhileInCall() throws Exception { |
404 | 407 | assertFalse(fate.cancel(txid)); |
405 | 408 | } |
406 | 409 |
|
| 410 | + /** |
| 411 | + * Test that verifies that fate table locks are created and deleted from the correct ZooKeeper |
| 412 | + * path |
| 413 | + */ |
| 414 | + @Test |
| 415 | + public void testTableLockDeleteUsesCorrectZKPath() throws Exception { |
| 416 | + |
| 417 | + ConfigurationCopy config = new ConfigurationCopy(); |
| 418 | + config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); |
| 419 | + config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); |
| 420 | + AdminUtil<Admin> admin = new AdminUtil<>(true); |
| 421 | + |
| 422 | + callStarted = new CountDownLatch(1); |
| 423 | + finishCall = new CountDownLatch(1); |
| 424 | + |
| 425 | + long txId = fate.startTransaction(); |
| 426 | + String formattedTxId = FateTxId.formatTid(txId); |
| 427 | + LOG.debug("Starting test testDeleteUsesCorrectZKPath {}", formattedTxId); |
| 428 | + assertEquals(NEW, getTxStatus(zk, txId)); |
| 429 | + fate.seedTransaction("TestOperation", txId, new TestOperation(NS, TID), false, |
| 430 | + "Test Delete Op"); |
| 431 | + assertEquals(SUBMITTED, getTxStatus(zk, txId)); |
| 432 | + fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2)); |
| 433 | + // Wait for the transaction runner to be in progress |
| 434 | + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txId)); |
| 435 | + |
| 436 | + assertFalse(fate.cancel(txId)); |
| 437 | + |
| 438 | + var tableLocksPath = ServiceLock.path(ZK_ROOT + Constants.ZTABLE_LOCKS); |
| 439 | + assertFalse(zk.getChildren(tableLocksPath.toString()).isEmpty(), |
| 440 | + "Table locks at " + tableLocksPath + "do not exist"); |
| 441 | + for (var tableName : zk.getChildren(tableLocksPath.toString())) { |
| 442 | + for (var tableLock : zk.getChildren( |
| 443 | + ServiceLock.path(ZK_ROOT + Constants.ZTABLE_LOCKS + "/" + tableName).toString())) { |
| 444 | + LOG.debug("Found table {} with fate lock {}", tableName, tableLock); |
| 445 | + } |
| 446 | + } |
| 447 | + |
| 448 | + admin.deleteLocks(zk, tableLocksPath, FateTxId.toHexString(formattedTxId)); |
| 449 | + for (var tableName : zk.getChildren(tableLocksPath.toString())) { |
| 450 | + var fateTableLocks = tableLocksPath + "/" + tableName; |
| 451 | + assertTrue(zk.getChildren(fateTableLocks).isEmpty(), " table fate locks are still present"); |
| 452 | + } |
| 453 | + } |
| 454 | + |
407 | 455 | @Test |
408 | 456 | public void testRepoFails() throws Exception { |
409 | 457 | /* |
|
0 commit comments