Skip to content

Commit 0be5ffa

Browse files
committed
Introduce created_at column to system_distributed.compression_dictionaries
patch by Stefan Miklosovic; reviewed by Yifan Cai for CASSANDRA-21178
1 parent eee06be commit 0be5ffa

7 files changed

Lines changed: 112 additions & 41 deletions

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
5.1
2+
* Introduce created_at column to system_distributed.compression_dictionaries (CASSANDRA-21178)
23
* Be able to detect and remove orphaned compression dictionaries (CASSANDRA-21157)
34
* Fix BigTableVerifier to only read a data file during extended verification (CASSANDRA-21150)
45
* Reduce memory allocation during transformation of BatchStatement to Mutation (CASSANDRA-21141)

src/java/org/apache/cassandra/db/compression/CompressionDictionary.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.DataOutput;
2323
import java.io.EOFException;
2424
import java.io.IOException;
25+
import java.time.Instant;
2526
import java.util.Objects;
2627

2728
import javax.annotation.Nullable;
@@ -34,6 +35,7 @@
3435
import org.apache.cassandra.cql3.UntypedResultSet;
3536
import org.apache.cassandra.io.compress.ICompressor;
3637
import org.apache.cassandra.io.compress.ZstdDictionaryCompressor;
38+
import org.apache.cassandra.utils.FBUtilities;
3739
import org.apache.cassandra.utils.concurrent.Ref;
3840

3941
/**
@@ -119,6 +121,14 @@ default Kind kind()
119121
return dictId().kind;
120122
}
121123

124+
/**
125+
* Returns a date of creation of this dictionary. By creation, we mean when
126+
* the compression object was instantiated.
127+
*
128+
* @return when was this dictionary created.
129+
*/
130+
Instant createdAt();
131+
122132
/**
123133
* Returns a reference from lazily initialized reference counter.
124134
*
@@ -262,6 +272,7 @@ static LightweightCompressionDictionary createFromRowLightweight(UntypedResultSe
262272
String keyspaceName = row.getString("keyspace_name");
263273
String tableName = row.getString("table_name");
264274
String tableId = row.getString("table_id");
275+
Instant createdAt = row.getTimestamp("created_at").toInstant();
265276

266277
try
267278
{
@@ -270,7 +281,8 @@ static LightweightCompressionDictionary createFromRowLightweight(UntypedResultSe
270281
tableId,
271282
new DictId(CompressionDictionary.Kind.valueOf(kindStr), dictId),
272283
checksum,
273-
size);
284+
size,
285+
createdAt);
274286
}
275287
catch (IllegalArgumentException ex)
276288
{
@@ -285,6 +297,7 @@ static CompressionDictionary createFromRow(UntypedResultSet.Row row)
285297
byte[] dict = row.getByteArray("dict");
286298
int storedLength = row.getInt("dict_length");
287299
int storedChecksum = row.getInt("dict_checksum");
300+
Instant createdAt = row.getTimestamp("created_at").toInstant();
288301

289302
try
290303
{
@@ -305,7 +318,7 @@ static CompressionDictionary createFromRow(UntypedResultSet.Row row)
305318
kindStr, dictId, storedChecksum, calculatedChecksum));
306319
}
307320

308-
return kind.createDictionary(new DictId(kind, dictId), row.getByteArray("dict"), storedChecksum);
321+
return kind.createDictionary(new DictId(kind, dictId), row.getByteArray("dict"), storedChecksum, createdAt);
309322
}
310323
catch (IllegalArgumentException ex)
311324
{
@@ -330,9 +343,9 @@ enum Kind
330343
ZSTD
331344
{
332345
@Override
333-
public CompressionDictionary createDictionary(DictId dictId, byte[] dict, int checksum)
346+
public CompressionDictionary createDictionary(DictId dictId, byte[] dict, int checksum, Instant createdAt)
334347
{
335-
return new ZstdCompressionDictionary(dictId, dict, checksum);
348+
return new ZstdCompressionDictionary(dictId, dict, checksum, createdAt);
336349
}
337350

338351
@Override
@@ -364,7 +377,21 @@ public ICompressionDictionaryTrainer createTrainer(String keyspaceName,
364377
* @param checksum checksum of this dictionary
365378
* @return a compression dictionary instance
366379
*/
367-
public abstract CompressionDictionary createDictionary(CompressionDictionary.DictId dictId, byte[] dict, int checksum);
380+
public CompressionDictionary createDictionary(CompressionDictionary.DictId dictId, byte[] dict, int checksum)
381+
{
382+
return createDictionary(dictId, dict, checksum, FBUtilities.now());
383+
}
384+
385+
/**
386+
* Creates a compression dictionary instance for this kind
387+
*
388+
* @param dictId the dictionary identifier
389+
* @param dict the raw dictionary bytes
390+
* @param checksum checksum of this dictionary
391+
* @param createdAt creation date of to-be-constructed dictionary
392+
* @return a compression dictionary instance
393+
*/
394+
public abstract CompressionDictionary createDictionary(CompressionDictionary.DictId dictId, byte[] dict, int checksum, Instant createdAt);
368395

369396
/**
370397
* Creates a dictionary compressor for this kind
@@ -436,20 +463,23 @@ class LightweightCompressionDictionary
436463
public final DictId dictId;
437464
public final int checksum;
438465
public final int size;
466+
public final Instant createdAt;
439467

440468
public LightweightCompressionDictionary(String keyspaceName,
441469
String tableName,
442470
String tableId,
443471
DictId dictId,
444472
int checksum,
445-
int size)
473+
int size,
474+
Instant createdAt)
446475
{
447476
this.keyspaceName = keyspaceName;
448477
this.tableName = tableName;
449478
this.tableId = tableId;
450479
this.dictId = dictId;
451480
this.checksum = checksum;
452481
this.size = size;
482+
this.createdAt = createdAt;
453483
}
454484

455485
@Override
@@ -462,6 +492,7 @@ public String toString()
462492
", dictId=" + dictId +
463493
", checksum=" + checksum +
464494
", size=" + size +
495+
", createdAt=" + createdAt +
465496
'}';
466497
}
467498
}

src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.cassandra.db.compression;
2020

21+
import java.time.Instant;
2122
import java.util.Arrays;
2223

2324
import javax.management.openmbean.ArrayType;
@@ -33,7 +34,6 @@
3334
import com.fasterxml.jackson.annotation.JsonProperty;
3435

3536
import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
36-
import org.apache.cassandra.io.util.FileUtils;
3737

3838
import static java.lang.String.format;
3939

@@ -63,7 +63,7 @@ public class CompressionDictionaryDetailsTabularData
6363
public static final String KIND_NAME = "Kind";
6464
public static final String CHECKSUM_NAME = "Checksum";
6565
public static final String SIZE_NAME = "Size";
66-
66+
public static final String CREATED_AT_NAME = "CreatedAt";
6767

6868
private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME,
6969
TABLE_NAME,
@@ -72,7 +72,8 @@ public class CompressionDictionaryDetailsTabularData
7272
DICT_NAME,
7373
KIND_NAME,
7474
CHECKSUM_NAME,
75-
SIZE_NAME };
75+
SIZE_NAME,
76+
CREATED_AT_NAME };
7677

7778
private static final String[] ITEM_DESCS = new String[]{ "keyspace",
7879
"table",
@@ -81,7 +82,8 @@ public class CompressionDictionaryDetailsTabularData
8182
"dictionary_bytes",
8283
"kind",
8384
"checksum",
84-
"size" };
85+
"size",
86+
"created_at" };
8587

8688
private static final String TYPE_NAME = "DictionaryDetails";
8789
private static final String ROW_DESC = "DictionaryDetails";
@@ -100,7 +102,8 @@ public class CompressionDictionaryDetailsTabularData
100102
new ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes
101103
SimpleType.STRING, // kind
102104
SimpleType.INTEGER, // checksum
103-
SimpleType.INTEGER }; // size of dict bytes
105+
SimpleType.INTEGER, // size of dict bytes
106+
SimpleType.STRING }; // created_at
104107

105108
COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES);
106109
TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES);
@@ -133,6 +136,7 @@ public static CompositeData fromLightweightCompressionDictionary(LightweightComp
133136
dictionary.dictId.kind.name(),
134137
dictionary.checksum,
135138
dictionary.size,
139+
dictionary.createdAt.toString()
136140
});
137141
}
138142
catch (OpenDataException e)
@@ -166,6 +170,7 @@ public static CompositeData fromCompressionDictionary(String keyspace, String ta
166170
dictionary.kind().name(),
167171
dictionary.checksum(),
168172
dictionary.rawDictionary().length,
173+
dictionary.createdAt().toString(),
169174
});
170175
}
171176
catch (OpenDataException e)
@@ -196,7 +201,8 @@ public static CompositeData fromCompressionDictionaryDataObject(CompressionDicti
196201
dataObject.dict,
197202
dataObject.kind,
198203
dataObject.dictChecksum,
199-
dataObject.dictLength
204+
dataObject.dictLength,
205+
dataObject.createdAt.toString()
200206
});
201207
}
202208
catch (OpenDataException e)
@@ -221,7 +227,8 @@ public static CompressionDictionaryDataObject fromCompositeData(CompositeData co
221227
(byte[]) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME),
222228
(String) compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME),
223229
(Integer) compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME),
224-
(Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME));
230+
(Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME),
231+
Instant.parse((String) compositeData.get(CompressionDictionaryDetailsTabularData.CREATED_AT_NAME)));
225232
}
226233

227234
public static class CompressionDictionaryDataObject
@@ -234,6 +241,7 @@ public static class CompressionDictionaryDataObject
234241
public final String kind;
235242
public final int dictChecksum;
236243
public final int dictLength;
244+
public final Instant createdAt;
237245

238246
@JsonCreator
239247
public CompressionDictionaryDataObject(@JsonProperty("keyspace") String keyspace,
@@ -243,7 +251,8 @@ public CompressionDictionaryDataObject(@JsonProperty("keyspace") String keyspace
243251
@JsonProperty("dict") byte[] dict,
244252
@JsonProperty("kind") String kind,
245253
@JsonProperty("dictChecksum") int dictChecksum,
246-
@JsonProperty("dictLength") int dictLength)
254+
@JsonProperty("dictLength") int dictLength,
255+
@JsonProperty("createdAt") Instant createdAt)
247256
{
248257
this.keyspace = keyspace;
249258
this.table = table;
@@ -253,6 +262,7 @@ public CompressionDictionaryDataObject(@JsonProperty("keyspace") String keyspace
253262
this.kind = kind;
254263
this.dictChecksum = dictChecksum;
255264
this.dictLength = dictLength;
265+
this.createdAt = createdAt;
256266

257267
validate();
258268
}
@@ -269,6 +279,7 @@ public CompressionDictionaryDataObject(@JsonProperty("keyspace") String keyspace
269279
* <li>dictLength is bigger than 0</li>
270280
* <li>dictLength has to be equal to dict's length</li>
271281
* <li>dictChecksum has to be equal to checksum computed as part of this method</li>
282+
* <li>creation date is not null</li>
272283
* </ul>
273284
*/
274285
private void validate()
@@ -278,15 +289,11 @@ private void validate()
278289
if (table == null)
279290
throw new IllegalArgumentException("Table not specified.");
280291
if (tableId == null)
281-
throw new IllegalArgumentException("Table id not specified");
292+
throw new IllegalArgumentException("Table id not specified.");
282293
if (dictId <= 0)
283294
throw new IllegalArgumentException("Provided dictionary id must be positive but it is '" + dictId + "'.");
284295
if (dict == null || dict.length == 0)
285296
throw new IllegalArgumentException("Provided dictionary byte array is null or empty.");
286-
if (dict.length > FileUtils.ONE_MIB)
287-
throw new IllegalArgumentException("Imported dictionary can not be larger than " +
288-
FileUtils.ONE_MIB + " bytes, but it is " +
289-
dict.length + " bytes.");
290297
if (kind == null)
291298
throw new IllegalArgumentException("Provided kind is null.");
292299

@@ -305,6 +312,8 @@ private void validate()
305312
throw new IllegalArgumentException("Size has to be strictly positive number, it is '" + dictLength + "'.");
306313
if (dict.length != dictLength)
307314
throw new IllegalArgumentException("The length of the provided dictionary array (" + dict.length + ") is not equal to provided length value (" + dictLength + ").");
315+
if (createdAt == null)
316+
throw new IllegalArgumentException("The creation date not specified.");
308317

309318
int checksumOfDictionaryToImport = CompressionDictionary.calculateChecksum((byte) dictionaryKind.ordinal(), dictId, dict);
310319
if (checksumOfDictionaryToImport != dictChecksum)

src/java/org/apache/cassandra/db/compression/ZstdCompressionDictionary.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.cassandra.db.compression;
2020

21+
import java.time.Instant;
2122
import java.util.Objects;
2223
import java.util.concurrent.ConcurrentHashMap;
2324
import java.util.concurrent.atomic.AtomicReference;
@@ -30,6 +31,7 @@
3031
import org.slf4j.LoggerFactory;
3132

3233
import org.apache.cassandra.io.compress.ZstdCompressorBase;
34+
import org.apache.cassandra.utils.FBUtilities;
3335
import org.apache.cassandra.utils.concurrent.Ref;
3436
import org.apache.cassandra.utils.concurrent.RefCounted;
3537
import org.apache.cassandra.utils.concurrent.SelfRefCounted;
@@ -45,21 +47,24 @@ public class ZstdCompressionDictionary implements CompressionDictionary, SelfRef
4547
private final ConcurrentHashMap<Integer, ZstdDictCompress> zstdDictCompressPerLevel = new ConcurrentHashMap<>();
4648
private final AtomicReference<ZstdDictDecompress> dictDecompress = new AtomicReference<>();
4749
private volatile Ref<ZstdCompressionDictionary> selfRef;
50+
private final Instant createdAt;
4851

4952
@VisibleForTesting
5053
public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary)
5154
{
5255
this(dictId,
5356
rawDictionary,
54-
CompressionDictionary.calculateChecksum((byte) dictId.kind.ordinal(), dictId.id, rawDictionary));
57+
CompressionDictionary.calculateChecksum((byte) dictId.kind.ordinal(), dictId.id, rawDictionary),
58+
FBUtilities.now());
5559
}
5660

57-
public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary, int checksum)
61+
public ZstdCompressionDictionary(DictId dictId, byte[] rawDictionary, int checksum, Instant createdAt)
5862
{
5963
this.dictId = dictId;
6064
this.rawDictionary = rawDictionary;
6165
this.checksum = checksum;
6266
this.selfRef = null;
67+
this.createdAt = createdAt;
6368
}
6469

6570
@Override
@@ -86,6 +91,12 @@ public int checksum()
8691
return checksum;
8792
}
8893

94+
@Override
95+
public Instant createdAt()
96+
{
97+
return createdAt;
98+
}
99+
89100
@Override
90101
public int estimatedOccupiedMemoryBytes()
91102
{

0 commit comments

Comments
 (0)