From f51855a8811180e6f407b4f5a87b7fc1d2740214 Mon Sep 17 00:00:00 2001
From: shuwenwei
Date: Fri, 30 Jan 2026 14:38:37 +0800
Subject: [PATCH 1/2] calculate table size map

---
 .../tsfile/read/TsFileSequenceReader.java     | 115 ++++++++++++++++++
 .../tsfile/write/writer/TsFileIOWriter.java   |  77 ++++++++++++
 2 files changed, 192 insertions(+)

diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index 35a39dc9b..481a4d6c2 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -158,6 +158,121 @@ public TsFileSequenceReader(String file) throws IOException {
     this(file, true, null);
   }
 
+  /**
+   * Walks the file marker by marker, from just after the header up to the separator marker, and
+   * counts the chunks stored for each device. Returns an empty map when the file does not exist,
+   * is shorter than the header, or does not start with the magic string. Any failure during the
+   * walk stops it: chunk groups closed before the failure are returned and the group in progress
+   * is dropped.
+   *
+   * @return chunk count per device in first-seen order; a device that owns several chunk groups
+   *     gets the sum of their counts
+   * @throws IOException if reading or checking the file header fails
+   */
+  public Map<IDeviceID, Integer> countChunksPerChunkGroup() throws IOException {
+    Map<IDeviceID, Integer> result = new LinkedHashMap<>();
+
+    File checkFile = FSFactoryProducer.getFSFactory().getFile(this.file);
+    if (!checkFile.exists()) {
+      return result;
+    }
+
+    int headerLength = TSFileConfig.MAGIC_STRING.getBytes().length + Byte.BYTES;
+    if (checkFile.length() < headerLength) {
+      return result;
+    }
+
+    if (!TSFileConfig.MAGIC_STRING.equals(readHeadMagic())) {
+      return result;
+    }
+
+    readVersionNumber();
+    checkFileVersion();
+
+    tsFileInput.position(headerLength);
+
+    IDeviceID currentDevice = null;
+    int currentChunkCount = 0;
+
+    try {
+      byte marker;
+      while ((marker = readMarker()) != MetaMarker.SEPARATOR) {
+        switch (marker) {
+          case MetaMarker.CHUNK_GROUP_HEADER:
+            // finish last chunk group; sum instead of overwrite when the device repeats
+            if (currentDevice != null) {
+              result.merge(currentDevice, currentChunkCount, Integer::sum);
+            }
+
+            // start new chunk group
+            ChunkGroupHeader chunkGroupHeader = readChunkGroupHeader();
+            currentDevice = chunkGroupHeader.getDeviceID();
+            currentChunkCount = 0;
+            break;
+
+          case MetaMarker.CHUNK_HEADER:
+          case MetaMarker.TIME_CHUNK_HEADER:
+          case MetaMarker.VALUE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
+          case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
+            // count chunk
+            currentChunkCount++;
+
+            // skip chunk content safely
+            ChunkHeader chunkHeader = readChunkHeader(marker);
+            skipChunkData(chunkHeader, marker);
+            break;
+
+          case MetaMarker.OPERATION_INDEX_RANGE:
+            readPlanIndex();
+            break;
+
+          default:
+            throw new IOException("Unexpected marker " + marker);
+        }
+      }
+
+      // last chunk group
+      if (currentDevice != null) {
+        result.merge(currentDevice, currentChunkCount, Integer::sum);
+      }
+    } catch (Exception ignored) {
+      // Deliberately swallowed: a truncated or corrupted tail ends the count early instead of
+      // failing the caller, so the chunk groups completed so far are still returned.
+    }
+
+    return result;
+  }
+
+  /**
+   * Reads every page header of the chunk and skips the corresponding page body.
+   *
+   * @param chunkHeader header of the chunk to skip
+   * @param marker marker byte that introduced the chunk; not used by the current logic
+   * @throws IOException if a page header cannot be read or a page body cannot be skipped
+   */
+  private void skipChunkData(ChunkHeader chunkHeader, byte marker) throws IOException {
+    int dataSize = chunkHeader.getDataSize();
+
+    if (dataSize <= 0) {
+      return;
+    }
+
+    if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.CHUNK_HEADER) {
+      // multi-page chunk
+      while (dataSize > 0) {
+        PageHeader pageHeader = readPageHeader(chunkHeader.getDataType(), true);
+        skipPageData(pageHeader);
+        dataSize -= pageHeader.getSerializedPageSize();
+      }
+    } else {
+      // single-page chunk
+      PageHeader pageHeader = readPageHeader(chunkHeader.getDataType(), false);
+      skipPageData(pageHeader);
+    }
+  }
+
   public TsFileSequenceReader(String file, EncryptParameter firstEncryptParam) throws IOException {
     this(file, true, null);
     this.firstEncryptParam = firstEncryptParam;
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 377ef90ff..961ee8293 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -136,6 +136,15 @@ public class TsFileIOWriter implements AutoCloseable {
 
   private final List flushListeners = new ArrayList<>();
 
+  /** Table of the chunk-group run currently being measured; null when no table run is open. */
+  protected String currentTable;
+
+  /** Output position at which the run of chunk groups for {@code currentTable} started. */
+  protected long currentTableStartOffset;
+
+  /** Accumulated size in bytes per table name, built from output-position deltas. */
+  protected Map<String, Long> tableSizeMap = new HashMap<>();
+
   /** empty construct function. */
   protected TsFileIOWriter() {
     setEncryptParam(
@@ -260,6 +269,7 @@ protected void startFile() throws IOException {
   }
 
   public int startChunkGroup(IDeviceID deviceId) throws IOException {
+    updateTableSize(deviceId);
     this.currentChunkGroupDeviceId = deviceId;
     if (logger.isDebugEnabled()) {
       logger.debug("start chunk group:{}, file position {}", deviceId, out.getPosition());
@@ -427,6 +437,7 @@ public void endFile() throws IOException {
     if (!canWrite) {
       return;
     }
+    updateTableSize(null);
     checkInMemoryPathCount();
     readChunkMetadataAndConstructIndexTree();
 
@@ -474,6 +485,8 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
     TSMIterator tsmIterator = getTSMIterator();
     Map deviceMetadataIndexMap = new TreeMap<>();
     Queue measurementMetadataIndexQueue = new ArrayDeque<>();
+    String prevTableName = null;
+    long prevTableMetadataStartOffset = metaOffset;
     IDeviceID currentDevice = null;
     IDeviceID prevDevice = null;
     Path currentPath = null;
@@ -495,6 +508,7 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
       filter.add(currentPath);
       // construct the index tree node for the series
       currentDevice = currentPath.getIDeviceID();
+      boolean isTableModel = schema.getTableSchemaMap().containsKey(currentDevice.getTableName());
       if (!currentDevice.equals(prevDevice)) {
         if (prevDevice != null) {
           addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
@@ -503,6 +517,17 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
               generateRootNode(
                   measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
           currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
         }
+        // Check for a table change on every device switch, the first device included, so that
+        // the series metadata of a table's first device is attributed to that table as well.
+        String currentTableName = isTableModel ? currentDevice.getTableName() : null;
+        if (!Objects.equals(currentTableName, prevTableName)) {
+          if (prevTableName != null) {
+            long currentTableSize = out.getPosition() - prevTableMetadataStartOffset;
+            tableSizeMap.merge(prevTableName, currentTableSize, Long::sum);
+          }
+          prevTableName = currentTableName;
+          prevTableMetadataStartOffset = out.getPosition();
+        }
         measurementMetadataIndexQueue = new ArrayDeque<>();
         seriesIdxForCurrDevice = 0;
@@ -533,6 +558,15 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
           prevDevice,
           generateRootNode(
               measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
+      // close the metadata range of the table that owns the last device
+      prevTableName =
+          schema.getTableSchemaMap().containsKey(prevDevice.getTableName())
+              ? prevDevice.getTableName()
+              : null;
+      if (prevTableName != null) {
+        long currentTableSize = out.getPosition() - prevTableMetadataStartOffset;
+        tableSizeMap.merge(prevTableName, currentTableSize, Long::sum);
+      }
     }
 
     Map> tableDeviceNodesMap =
@@ -541,7 +575,14 @@ private void readChunkMetadataAndConstructIndexTree() throws IOException {
     // build an index root for each table
     Map tableNodesMap = new TreeMap<>();
     for (Entry> entry : tableDeviceNodesMap.entrySet()) {
+      long tableDeviceMetadataNodeStartOffset = out.getPosition();
       tableNodesMap.put(entry.getKey(), checkAndBuildLevelIndex(entry.getValue(), out));
+      if (schema.getTableSchemaMap().containsKey(entry.getKey())) {
+        // Bill the index subtree to the table it was built for rather than to the table the
+        // device loop ended on; names without a registered table schema are not tracked.
+        long tableDeviceMetadataNodeSize = out.getPosition() - tableDeviceMetadataNodeStartOffset;
+        tableSizeMap.merge(entry.getKey(), tableDeviceMetadataNodeSize, Long::sum);
+      }
     }
 
     TsFileMetadata tsFileMetadata = new TsFileMetadata();
@@ -863,4 +904,40 @@ public boolean isGenerateTableSchema() {
   public void setGenerateTableSchema(boolean generateTableSchema) {
     this.generateTableSchema = generateTableSchema;
   }
+
+  /**
+   * Returns the per-table byte totals accumulated while writing.
+   *
+   * @return the live map from table name to accumulated size in bytes; callers should treat it
+   *     as read-only
+   */
+  public Map<String, Long> getTableSizeMap() {
+    return tableSizeMap;
+  }
+
+  /**
+   * Closes the byte range of the table written so far when the next chunk group belongs to a
+   * different table, to a device that is not table-model, or when the file ends.
+   *
+   * @param currentStartChunkGroupDeviceId device of the chunk group about to start, or {@code
+   *     null} when called from {@code endFile}
+   * @throws IOException if the current output position cannot be read
+   */
+  private void updateTableSize(IDeviceID currentStartChunkGroupDeviceId) throws IOException {
+    // A non-table-model device (or end of file) maps to null so that its bytes are never billed
+    // to the table that happened to precede it.
+    String nextTable =
+        currentStartChunkGroupDeviceId != null && currentStartChunkGroupDeviceId.isTableModel()
+            ? currentStartChunkGroupDeviceId.getTableName()
+            : null;
+    if (Objects.equals(nextTable, currentTable)) {
+      return;
+    }
+    long currentPosition = out.getPosition();
+    if (currentTable != null) {
+      tableSizeMap.merge(currentTable, currentPosition - currentTableStartOffset, Long::sum);
+    }
+    currentTable = nextTable;
+    currentTableStartOffset = currentPosition;
+  }
 }

From 67232bbdd8f27138d35ba547c28091eaf5bea6c1 Mon Sep 17 00:00:00 2001
From: shuwenwei
Date: Fri, 30 Jan 2026 14:38:54 +0800
Subject: [PATCH 2/2] add ut

---
 .../tsfile/write/TsFileWriteApiTest.java      | 43 +++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
index 6449b52dc..dd6e74bc9 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileWriteApiTest.java
@@ -1151,6 +1151,49 @@ public void writeAllNullValueTablet()
     }
   }
 
+  @Test
+  public void calculateTableSize() throws IOException, WriteProcessException {
+    TableSchema tableSchema1 =
+        new TableSchema(
+            "table1",
+            Arrays.asList(
+                new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+                new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD)));
+    TableSchema tableSchema2 =
+        new TableSchema(
+            "table2",
+            Arrays.asList(
+                new ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
+                new ColumnSchema("s1", TSDataType.BLOB, ColumnCategory.FIELD)));
+    Tablet tablet1 =
+        new Tablet(
+            "table1",
+            IMeasurementSchema.getMeasurementNameList(tableSchema1.getColumnSchemas()),
+            IMeasurementSchema.getDataTypeList(tableSchema1.getColumnSchemas()),
+            tableSchema1.getColumnTypes());
+    tablet1.addTimestamp(0, 0);
+    tablet1.addValue(0, 0, new byte[1024]);
+    Tablet tablet2 =
+        new Tablet(
+            "table2",
+            IMeasurementSchema.getMeasurementNameList(tableSchema2.getColumnSchemas()),
+            IMeasurementSchema.getDataTypeList(tableSchema2.getColumnSchemas()),
+            tableSchema2.getColumnTypes());
+    tablet2.addTimestamp(0, 0);
+    tablet2.addValue(0, 0, new byte[1024 * 1024]);
+    Map<String, Long> tableSizeMap = null;
+    try (TsFileWriter writer = new TsFileWriter(f)) {
+      writer.registerTableSchema(tableSchema1);
+      writer.registerTableSchema(tableSchema2);
+      writer.writeTable(tablet1);
+      writer.writeTable(tablet2);
+      tableSizeMap = writer.getIOWriter().getTableSizeMap();
+    }
+    Assert.assertTrue(tableSizeMap.get("table1") < 1024 * 1024);
+    Assert.assertTrue(tableSizeMap.get("table1") > 1024);
+    Assert.assertTrue(tableSizeMap.get("table2") >= 1024 * 1024);
+  }
+
   @Test
   public void writeRecord() throws IOException, WriteProcessException, ReadProcessException {
     setEnv(100 * 1024 * 1024, 10 * 1024);