Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,101 @@ public TsFileSequenceReader(String file) throws IOException {
this(file, true, null);
}

public Map<IDeviceID, Integer> countChunksPerChunkGroup() throws IOException {
Map<IDeviceID, Integer> result = new LinkedHashMap<>();

File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
if (!checkFile.exists()) {
return result;
}

int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
if (checkFile.length() < headerLength) {
return result;
}

if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())) {
return result;
}

readVersionNumber();
checkFileVersion();

tsFileInput.position(headerLength);

IDeviceID currentDevice = null;
int currentChunkCount = 0;

try {
byte marker;
while ((marker = readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_GROUP_HEADER:
// finish last chunk group
if (currentDevice != null) {
result.put(currentDevice, currentChunkCount);
}

// start new chunk group
ChunkGroupHeader chunkGroupHeader = readChunkGroupHeader();
currentDevice = chunkGroupHeader.getDeviceID();
currentChunkCount = 0;
break;

case MetaMarker.CHUNK_HEADER:
case MetaMarker.TIME_CHUNK_HEADER:
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
// count chunk
currentChunkCount++;

// skip chunk content safely
ChunkHeader chunkHeader = readChunkHeader(marker);
skipChunkData(chunkHeader, marker);
break;

case MetaMarker.OPERATION_INDEX_RANGE:
readPlanIndex();
break;

default:
throw new IOException("Unexpected marker " + marker);
}
}

// last chunk group
if (currentDevice != null) {
result.put(currentDevice, currentChunkCount);
}
} catch (Exception e) {
}

return result;
}

private void skipChunkData(ChunkHeader chunkHeader, byte marker) throws IOException {
int dataSize = chunkHeader.getDataSize();

if (dataSize <= 0) {
return;
}

if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER) {
// multi-page chunk
while (dataSize > 0) {
PageHeader pageHeader = readPageHeader(chunkHeader.getDataType(), true);
skipPageData(pageHeader);
dataSize -= pageHeader.getSerializedPageSize();
}
} else {
// single-page chunk
PageHeader pageHeader = readPageHeader(chunkHeader.getDataType(), false);
skipPageData(pageHeader);
}
}

public TsFileSequenceReader(String file, EncryptParameter firstEncryptParam) throws IOException {
this(file, true, null);
this.firstEncryptParam = firstEncryptParam;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ public class TsFileIOWriter implements AutoCloseable {

private final List<FlushChunkMetadataListener> flushListeners = new ArrayList<>();

  // Name of the table whose chunk groups are currently being written; null before the first
  // table-model chunk group and after endFile.
  protected String currentTable;

  // File offset where the current table's first chunk group began; the span up to the next
  // table switch (or endFile) is attributed to currentTable.
  protected long currentTableStartOffset;

  // Accumulated on-disk size in bytes per table name; updated when a table's data span closes
  // and when its metadata index nodes are serialized.
  protected Map<String, Long> tableSizeMap = new HashMap<>();

/** empty construct function. */
protected TsFileIOWriter() {
setEncryptParam(
Expand Down Expand Up @@ -260,6 +266,7 @@ protected void startFile() throws IOException {
}

public int startChunkGroup(IDeviceID deviceId) throws IOException {
updateTableSize(deviceId);
this.currentChunkGroupDeviceId = deviceId;
if (logger.isDebugEnabled()) {
logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
Expand Down Expand Up @@ -427,6 +434,7 @@ public void endFile() throws IOException {
if (!canWrite) {
return;
}
updateTableSize(null);

checkInMemoryPathCount();
readChunkMetadataAndConstructIndexTree();
Expand Down Expand Up @@ -474,6 +482,8 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
TSMIterator tsmIterator = getTSMIterator();
Map<IDeviceID, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
String prevTableName = null;
long prevTableMetadataStartOffset = metaOffset;
IDeviceID currentDevice = null;
IDeviceID prevDevice = null;
Path currentPath = null;
Expand All @@ -495,6 +505,7 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
filter.add(currentPath);
// construct the index tree node for the series
currentDevice = currentPath.getIDeviceID();
boolean isTableModel = schema.getTableSchemaMap().containsKey(currentDevice.getTableName());
if (!currentDevice.equals(prevDevice)) {
if (prevDevice != null) {
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
Expand All @@ -503,6 +514,16 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
generateRootNode(
measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
String currentTableName = isTableModel ? currentDevice.getTableName() : null;
if (!Objects.equals(currentTableName, prevTableName)) {
if (prevTableName != null) {
long currentTableSize = out.getPosition() - prevTableMetadataStartOffset;
tableSizeMap.compute(
prevTableName, (k, v) -> v == null ? currentTableSize : v + currentTableSize);
}
prevTableName = currentTableName;
prevTableMetadataStartOffset = out.getPosition();
}
}
measurementMetadataIndexQueue = new ArrayDeque<>();
seriesIdxForCurrDevice = 0;
Expand Down Expand Up @@ -533,6 +554,15 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
prevDevice,
generateRootNode(
measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
prevTableName =
schema.getTableSchemaMap().containsKey(prevDevice.getTableName())
? prevDevice.getTableName()
: null;
if (prevTableName != null) {
long currentTableSize = out.getPosition() - prevTableMetadataStartOffset;
tableSizeMap.compute(
prevTableName, (k, v) -> v == null ? currentTableSize : v + currentTableSize);
}
}

Map<String, Map<IDeviceID, MetadataIndexNode>> tableDeviceNodesMap =
Expand All @@ -541,7 +571,12 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
// build an index root for each table
Map<String, MetadataIndexNode> tableNodesMap = new TreeMap<>();
for (Entry<String, Map<IDeviceID, MetadataIndexNode>> entry : tableDeviceNodesMap.entrySet()) {
long tableDeviceMetadataNodeStartOffset = out.getPosition();
tableNodesMap.put(entry.getKey(), checkAndBuildLevelIndex(entry.getValue(), out));
long tableDeviceMetadataNodeSize = out.getPosition() - tableDeviceMetadataNodeStartOffset;
tableSizeMap.compute(
prevTableName,
(k, v) -> v == null ? tableDeviceMetadataNodeSize : v + tableDeviceMetadataNodeSize);
}

TsFileMetadata tsFileMetadata = new TsFileMetadata();
Expand Down Expand Up @@ -863,4 +898,27 @@ public boolean isGenerateTableSchema() {
  /** Sets the generateTableSchema flag (see {@link #isGenerateTableSchema()}). */
  public void setGenerateTableSchema(boolean generateTableSchema) {
    this.generateTableSchema = generateTableSchema;
  }

  /**
   * Returns the accumulated per-table size in bytes. Values are only complete after {@code
   * endFile()} has run.
   *
   * <p>NOTE(review): this returns the live internal map, not a copy — callers must treat it as
   * read-only; consider wrapping in {@code Collections.unmodifiableMap} if external mutation is
   * a concern.
   */
  public Map<String, Long> getTableSizeMap() {
    return tableSizeMap;
  }

  /**
   * Closes the byte span of the table currently being written and begins tracking the next one.
   * Called with the device of the chunk group about to start, or {@code null} from endFile to
   * flush the last open table span into {@link #tableSizeMap}.
   *
   * <p>NOTE(review): a non-table-model (tree) device never satisfies the condition below, so
   * chunk groups of tree devices written between two tables are attributed to the previous
   * table's size — TODO confirm this is intended.
   *
   * @param currentStartChunkGroupDeviceId device starting a chunk group, or null at endFile
   * @throws IOException if querying the output position fails
   */
  private void updateTableSize(IDeviceID currentStartChunkGroupDeviceId) throws IOException {
    long currentPosition = out.getPosition();
    // null device means this is the endFile call: always close the open span
    boolean endFile = currentStartChunkGroupDeviceId == null;
    if (endFile
        || (currentStartChunkGroupDeviceId.isTableModel()
            && !currentStartChunkGroupDeviceId.getTableName().equals(currentTable))) {
      // credit the bytes written since the last switch to the table that just ended
      if (currentTable != null) {
        long size = currentPosition - currentTableStartOffset;
        tableSizeMap.compute(currentTable, (k, v) -> (v == null ? size : v + size));
      }
      currentTableStartOffset = currentPosition;
      currentTable =
          currentStartChunkGroupDeviceId == null
              ? null
              : currentStartChunkGroupDeviceId.getTableName();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,49 @@ public void writeAllNullValueTablet()
}
}

@Test
public void calculateTableSize() throws IOException, WriteProcessException {
TableSchema tableSchema1 =
new TableSchema(
"table1",
Arrays.asList(
new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD)));
TableSchema tableSchema2 =
new TableSchema(
"table2",
Arrays.asList(
new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD)));
Tablet tablet1 =
new Tablet(
"table1",
IMeasurementSchema.getMeasurementNameList(tableSchema1.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema1.getColumnSchemas()),
tableSchema1.getColumnTypes());
tablet1.addTimestamp(0, 0);
tablet1.addValue(0, 0, new byte[1024]);
Tablet tablet2 =
new Tablet(
"table2",
IMeasurementSchema.getMeasurementNameList(tableSchema2.getColumnSchemas()),
IMeasurementSchema.getDataTypeList(tableSchema2.getColumnSchemas()),
tableSchema2.getColumnTypes());
tablet2.addTimestamp(0, 0);
tablet2.addValue(0, 0, new byte[1024 * 1024]);
Map<String, Long> tableSizeMap = null;
try (TsFileWriter writer = new TsFileWriter(f)) {
writer.registerTableSchema(tableSchema1);
writer.registerTableSchema(tableSchema2);
writer.writeTable(tablet1);
writer.writeTable(tablet2);
tableSizeMap = writer.getIOWriter().getTableSizeMap();
}
Assert.assertTrue(tableSizeMap.get("table1") < 1024 * 1024);
Assert.assertTrue(tableSizeMap.get("table1") > 1024);
Assert.assertTrue(tableSizeMap.get("table2") >= 1024 * 1024);
}

@Test
public void writeRecord() throws IOException, WriteProcessException, ReadProcessException {
setEnv(100 * 1024 * 1024, 10 * 1024);
Expand Down
Loading