Skip to content

Commit c947055

Browse files
committed
MLE-26420 Added DocumentWriteSetFilter
Started making tests under "com.marklogic.client.datamovement" as well so that protected methods can be unit-tested.
1 parent 37954b5 commit c947055

File tree

12 files changed

+174
-88
lines changed

12 files changed

+174
-88
lines changed

CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
# Each line is a file pattern followed by one or more owners.
33

44
# These owners will be the default owners for everything in the repo.
5-
* @anu3990 @billfarber @rjrudin @stevebio
5+
* @billfarber @rjrudin @stevebio
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement;
5+
6+
import com.marklogic.client.DatabaseClient;
7+
import com.marklogic.client.document.DocumentWriteSet;
8+
9+
import java.util.function.Function;
10+
11+
/**
12+
* A filter that can modify a DocumentWriteSet before it is written to the database.
13+
*
14+
* @since 8.1.0
15+
*/
16+
public interface DocumentWriteSetFilter extends Function<DocumentWriteSetFilter.Context, DocumentWriteSet> {
17+
18+
interface Context {
19+
/**
20+
* @return the DocumentWriteSet to be written
21+
*/
22+
DocumentWriteSet getDocumentWriteSet();
23+
24+
/**
25+
* @return the batch number
26+
*/
27+
long getBatchNumber();
28+
29+
/**
30+
* @return the DatabaseClient being used for this batch
31+
*/
32+
DatabaseClient getDatabaseClient();
33+
34+
/**
35+
* @return the temporal collection name, or null if not writing to a temporal collection
36+
*/
37+
String getTemporalCollection();
38+
}
39+
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/WriteBatcher.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,4 +357,17 @@ WriteBatcher addAs(String uri, DocumentMetadataWriteHandle metadataHandle,
357357
* @param writeBatch the information about the batch that failed
358358
*/
359359
void retryWithFailureListeners(WriteBatch writeBatch);
360+
361+
/**
362+
* Sets a filter to modify or replace the DocumentWriteSet before it is written.
363+
* The filter can return either the modified DocumentWriteSet or a new one.
364+
* If the filter returns null or an empty DocumentWriteSet, no write will occur.
365+
*
366+
* @param filter the function to apply before writing
367+
* @return this instance for method chaining
368+
* @since 8.1.0
369+
*/
370+
default WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) {
371+
return this;
372+
}
360373
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriteSet.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.marklogic.client.datamovement.impl;
55

66
import com.marklogic.client.DatabaseClient;
7+
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
78
import com.marklogic.client.datamovement.WriteBatch;
89
import com.marklogic.client.datamovement.WriteBatcher;
910
import com.marklogic.client.datamovement.WriteEvent;
@@ -16,15 +17,17 @@
1617
* Mutable class that captures the documents to be written. Documents are added via calls to "getDocumentWriteSet()", where the
1718
* DocumentWriteSet is empty when this class is constructed.
1819
*/
19-
class BatchWriteSet {
20+
class BatchWriteSet implements DocumentWriteSetFilter.Context {
2021

2122
private final WriteBatcher batcher;
22-
private final DocumentWriteSet documentWriteSet;
2323
private final long batchNumber;
2424
private final DatabaseClient client;
2525
private final ServerTransform transform;
2626
private final String temporalCollection;
2727

28+
// Can be overridden after creation
29+
private DocumentWriteSet documentWriteSet;
30+
2831
private long itemsSoFar;
2932
private Runnable onSuccess;
3033
private Consumer<Throwable> onFailure;
@@ -38,10 +41,21 @@ class BatchWriteSet {
3841
this.batchNumber = batchNumber;
3942
}
4043

44+
/**
45+
* Must be called if a DocumentWriteSetFilter modified the DocumentWriteSet owned by this class.
46+
*
47+
* @since 8.1.0
48+
*/
49+
void updateWithFilteredDocumentWriteSet(DocumentWriteSet filteredDocumentWriteSet) {
50+
this.documentWriteSet = filteredDocumentWriteSet;
51+
}
52+
53+
@Override
4154
public DocumentWriteSet getDocumentWriteSet() {
4255
return documentWriteSet;
4356
}
4457

58+
@Override
4559
public long getBatchNumber() {
4660
return batchNumber;
4761
}
@@ -50,6 +64,11 @@ public void setItemsSoFar(long itemsSoFar) {
5064
this.itemsSoFar = itemsSoFar;
5165
}
5266

67+
@Override
68+
public DatabaseClient getDatabaseClient() {
69+
return client;
70+
}
71+
5372
public DatabaseClient getClient() {
5473
return client;
5574
}
@@ -58,6 +77,7 @@ public ServerTransform getTransform() {
5877
return transform;
5978
}
6079

80+
@Override
6181
public String getTemporalCollection() {
6282
return temporalCollection;
6383
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/BatchWriter.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
package com.marklogic.client.datamovement.impl;
55

6+
import com.marklogic.client.datamovement.DocumentWriteSetFilter;
67
import com.marklogic.client.document.DocumentWriteOperation;
78
import com.marklogic.client.document.DocumentWriteSet;
89
import com.marklogic.client.document.XMLDocumentManager;
@@ -13,7 +14,7 @@
1314
import java.io.Closeable;
1415
import java.util.function.Consumer;
1516

16-
record BatchWriter(BatchWriteSet batchWriteSet) implements Runnable {
17+
record BatchWriter(BatchWriteSet batchWriteSet, DocumentWriteSetFilter filter) implements Runnable {
1718

1819
private static Logger logger = LoggerFactory.getLogger(WriteBatcherImpl.class);
1920

@@ -28,6 +29,16 @@ public void run() {
2829
logger.trace("Begin write batch {} to forest on host '{}'", batchWriteSet.getBatchNumber(), batchWriteSet.getClient().getHost());
2930

3031
DocumentWriteSet documentWriteSet = batchWriteSet.getDocumentWriteSet();
32+
if (filter != null) {
33+
documentWriteSet = filter.apply(batchWriteSet);
34+
if (documentWriteSet == null || documentWriteSet.isEmpty()) {
35+
logger.debug("Filter returned empty write set for batch {}, skipping write", batchWriteSet.getBatchNumber());
36+
closeAllHandles();
37+
return;
38+
}
39+
batchWriteSet.updateWithFilteredDocumentWriteSet(documentWriteSet);
40+
}
41+
3142
writeDocuments(documentWriteSet);
3243

3344
// This seems like it should be part of a finally block - but it's able to throw an exception. Which implies

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/WriteBatcherImpl.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public class WriteBatcherImpl
126126
private boolean initialized = false;
127127
private CompletableThreadPoolExecutor threadPool = null;
128128
private DocumentMetadataHandle defaultMetadata;
129+
private DocumentWriteSetFilter documentWriteSetFilter;
129130

130131
public WriteBatcherImpl(DataMovementManager moveMgr, ForestConfiguration forestConfig) {
131132
super(moveMgr);
@@ -200,7 +201,7 @@ public WriteBatcher add(DocumentWriteOperation writeOperation) {
200201
writeSet.getDocumentWriteSet().add(doc);
201202
}
202203
if ( writeSet.getDocumentWriteSet().size() > minBatchSize ) {
203-
threadPool.submit( new BatchWriter(writeSet) );
204+
threadPool.submit( new BatchWriter(writeSet, documentWriteSetFilter) );
204205
}
205206
}
206207
return this;
@@ -308,7 +309,7 @@ private void retry(WriteBatch batch, boolean callFailListeners) {
308309
for (WriteEvent doc : batch.getItems()) {
309310
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
310311
}
311-
BatchWriter runnable = new BatchWriter(writeSet);
312+
BatchWriter runnable = new BatchWriter(writeSet, documentWriteSetFilter);
312313
runnable.run();
313314
}
314315
@Override
@@ -379,7 +380,7 @@ private void flush(boolean waitForCompletion) {
379380
DocumentWriteOperation doc = iter.next();
380381
writeSet.getDocumentWriteSet().add(doc);
381382
}
382-
threadPool.submit( new BatchWriter(writeSet) );
383+
threadPool.submit( new BatchWriter(writeSet, documentWriteSetFilter) );
383384
}
384385

385386
if (waitForCompletion) awaitCompletion();
@@ -597,7 +598,7 @@ public synchronized WriteBatcher withForestConfig(ForestConfiguration forestConf
597598
for ( WriteEvent doc : writerTask.batchWriteSet().getBatchOfWriteEvents().getItems() ) {
598599
writeSet.getDocumentWriteSet().add(doc.getTargetUri(), doc.getMetadata(), doc.getContent());
599600
}
600-
BatchWriter retryWriterTask = new BatchWriter(writeSet);
601+
BatchWriter retryWriterTask = new BatchWriter(writeSet, documentWriteSetFilter);
601602
Runnable fretryWriterTask = (Runnable) threadPool.submit(retryWriterTask);
602603
threadPool.replaceTask(writerTask, fretryWriterTask);
603604
// jump to the next task
@@ -846,4 +847,10 @@ public void addAll(Stream<? extends DocumentWriteOperation> operations) {
846847
public DocumentMetadataHandle getDocumentMetadata() {
847848
return defaultMetadata;
848849
}
850+
851+
@Override
852+
public WriteBatcher withDocumentWriteSetFilter(DocumentWriteSetFilter filter) {
853+
this.documentWriteSetFilter = filter;
854+
return this;
855+
}
849856
}

marklogic-client-api/src/main/java/com/marklogic/client/impl/okhttp/RetryIOExceptionInterceptor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
package com.marklogic.client.impl.okhttp;
55

6+
import com.marklogic.client.MarkLogicIOException;
67
import okhttp3.Interceptor;
78
import okhttp3.Request;
89
import okhttp3.Response;
@@ -47,7 +48,7 @@ public Response intercept(Chain chain) throws IOException {
4748
for (int attempt = 0; attempt <= maxRetries; attempt++) {
4849
try {
4950
return chain.proceed(request);
50-
} catch (IOException e) {
51+
} catch (MarkLogicIOException | IOException e) {
5152
if (attempt == maxRetries || !isRetryableIOException(e)) {
5253
logger.warn("Not retryable: {}; {}", e.getClass(), e.getMessage());
5354
throw e;
@@ -65,7 +66,7 @@ public Response intercept(Chain chain) throws IOException {
6566
throw new IllegalStateException("Unexpected end of retry loop");
6667
}
6768

68-
private boolean isRetryableIOException(IOException e) {
69+
private boolean isRetryableIOException(Exception e) {
6970
return e instanceof ConnectException ||
7071
e instanceof SocketTimeoutException ||
7172
e instanceof UnknownHostException ||

marklogic-client-api/src/test/java/com/marklogic/client/test/datamovement/WriteNakedPropertiesTest.java renamed to marklogic-client-api/src/test/java/com/marklogic/client/datamovement/WriteNakedPropertiesTest.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,20 @@
11
/*
22
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
33
*/
4-
package com.marklogic.client.test.datamovement;
4+
package com.marklogic.client.datamovement;
55

66
import com.marklogic.client.DatabaseClient;
7-
import com.marklogic.client.datamovement.DataMovementManager;
8-
import com.marklogic.client.datamovement.WriteBatcher;
97
import com.marklogic.client.io.DocumentMetadataHandle;
8+
import com.marklogic.client.test.AbstractClientTest;
109
import com.marklogic.client.test.Common;
11-
import org.junit.jupiter.api.BeforeEach;
1210
import org.junit.jupiter.api.Test;
1311

1412
import javax.xml.namespace.QName;
1513

1614
import static org.junit.jupiter.api.Assertions.assertEquals;
1715
import static org.junit.jupiter.api.Assertions.assertTrue;
1816

19-
public class WriteNakedPropertiesTest {
20-
21-
@BeforeEach
22-
void setup() {
23-
Common.newRestAdminClient().newXMLDocumentManager().delete("/naked.xml");
24-
}
17+
class WriteNakedPropertiesTest extends AbstractClientTest {
2518

2619
@Test
2720
void test() {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.filter;
5+
6+
import com.marklogic.client.io.DocumentMetadataHandle;
7+
import com.marklogic.client.io.StringHandle;
8+
import com.marklogic.client.test.AbstractClientTest;
9+
import com.marklogic.client.test.Common;
10+
import org.junit.jupiter.api.Test;
11+
12+
import java.util.concurrent.atomic.AtomicInteger;
13+
14+
import static org.junit.jupiter.api.Assertions.assertEquals;
15+
16+
class RemoveAllDocumentsFilterTest extends AbstractClientTest {
17+
18+
private static final DocumentMetadataHandle METADATA = new DocumentMetadataHandle()
19+
.withCollections("incremental-test")
20+
.withPermission("rest-reader", DocumentMetadataHandle.Capability.READ, DocumentMetadataHandle.Capability.UPDATE);
21+
22+
AtomicInteger writtenCount = new AtomicInteger();
23+
24+
@Test
25+
void filterRemovesAllDocuments() {
26+
new WriteBatcherTemplate(Common.newClient()).runWriteJob(
27+
writeBatcher -> writeBatcher
28+
.withDocumentWriteSetFilter(context -> context.getDatabaseClient().newDocumentManager().newWriteSet())
29+
.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length)),
30+
31+
writeBatcher -> {
32+
for (int i = 1; i <= 10; i++) {
33+
writeBatcher.add("/incremental/test/doc-" + i + ".xml", METADATA, new StringHandle("<doc/>"));
34+
}
35+
}
36+
);
37+
38+
assertEquals(0, writtenCount.get(), "No documents should have been written since the filter removed them all. " +
39+
"This test is verifying that no error will occur either when the filter doesn't return any documents.");
40+
assertCollectionSize("incremental-test", 0);
41+
}
42+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2010-2025 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
3+
*/
4+
package com.marklogic.client.datamovement.filter;
5+
6+
import com.marklogic.client.DatabaseClient;
7+
import com.marklogic.client.datamovement.DataMovementManager;
8+
import com.marklogic.client.datamovement.WriteBatcher;
9+
10+
import java.util.function.Consumer;
11+
12+
// Experimenting with a template that gets rid of some annoying DMSDK boilerplate.
13+
record WriteBatcherTemplate(DatabaseClient databaseClient) {
14+
15+
public void runWriteJob(Consumer<WriteBatcher> writeBatcherConfigurer, Consumer<WriteBatcher> writeBatcherUser) {
16+
try (DataMovementManager dmm = databaseClient.newDataMovementManager()) {
17+
WriteBatcher writeBatcher = dmm.newWriteBatcher();
18+
writeBatcherConfigurer.accept(writeBatcher);
19+
20+
dmm.startJob(writeBatcher);
21+
writeBatcherUser.accept(writeBatcher);
22+
writeBatcher.flushAndWait();
23+
writeBatcher.awaitCompletion();
24+
dmm.stopJob(writeBatcher);
25+
}
26+
}
27+
}

0 commit comments

Comments
 (0)