
Commit b60c77a

MLE-26420 Nicer error when missing index for incremental write
1 parent 0e6f0ae commit b60c77a

4 files changed: +113 -59 lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java

Lines changed: 17 additions & 11 deletions

@@ -6,6 +6,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.marklogic.client.FailedRequestException;
 import com.marklogic.client.datamovement.DocumentWriteSetFilter;
 import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.document.DocumentWriteSet;
@@ -42,16 +43,21 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
 			}
 		}
 
-		JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
-			.addVariable("fieldName", fieldName)
-			.addVariable("uris", new JacksonHandle(uris))
-			.evalAs(JsonNode.class);
-
-		return filterDocuments(context, uri -> {
-			if (response.has(uri)) {
-				return response.get(uri).asText();
-			}
-			return null;
-		});
+		try {
+			JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
+				.addVariable("fieldName", fieldName)
+				.addVariable("uris", new JacksonHandle(uris))
+				.evalAs(JsonNode.class);
+
+			return filterDocuments(context, uri -> {
+				if (response.has(uri)) {
+					return response.get(uri).asText();
+				}
+				return null;
+			});
+		} catch (FailedRequestException e) {
+			String message = "Unable to query for existing incremental write hashes; cause: " + e.getMessage();
+			throw new FailedRequestException(message, e.getFailedRequest());
+		}
 	}
 }
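
Note: with this change, a missing index surfaces through DMSDK's failure listener with the clearer message above rather than a bare server error. A minimal caller-side sketch (standard WriteBatcher wiring; the dataMovementManager, filter, and logger variables are assumed to exist elsewhere):

    WriteBatcher batcher = dataMovementManager.newWriteBatcher()
    	.withDocumentWriteSetFilter(filter)
    	// The throwable's message now begins with "Unable to query for existing
    	// incremental write hashes; cause: ..." when the field range index is missing.
    	.onBatchFailure((batch, throwable) -> logger.error(throwable.getMessage()));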

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 1 addition & 1 deletion

@@ -109,7 +109,7 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<Strin
 				continue;
 			}
 
-			final String contentHash = serializeContent(doc);
+			final String contentHash = computeHash(serializeContent(doc));
 			final String existingHash = hashRetriever.apply(doc.getUri());
 			if (logger.isTraceEnabled()) {
 				logger.trace("URI: {}, existing Hash: {}, new Hash: {}", doc.getUri(), existingHash, contentHash);
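
Note: this one-line fix ensures the serialized content is actually hashed before being compared against the stored hash. The hash algorithm itself is not part of this diff; the sketch below is only a hypothetical computeHash consistent with the new test assertion further down, which expects a value parseable via Long.parseUnsignedLong(hash, 16):

    // Hypothetical sketch, not the library's actual algorithm: a 64-bit value
    // rendered as unsigned hex, matching what the test below validates.
    private static String computeHash(String serializedContent) {
    	try {
    		byte[] digest = java.security.MessageDigest.getInstance("SHA-256")
    			.digest(serializedContent.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    		// Fold the first 8 digest bytes into a long and format it in base 16.
    		long value = java.nio.ByteBuffer.wrap(digest, 0, 8).getLong();
    		return Long.toHexString(value);
    	} catch (java.security.NoSuchAlgorithmException e) {
    		throw new IllegalStateException("SHA-256 should always be available", e);
    	}
    }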

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java

Lines changed: 28 additions & 20 deletions

@@ -3,6 +3,7 @@
  */
 package com.marklogic.client.datamovement.filter;
 
+import com.marklogic.client.FailedRequestException;
 import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.document.DocumentWriteSet;
 import com.marklogic.client.row.RowTemplate;
@@ -31,25 +32,32 @@ public DocumentWriteSet apply(Context context) {
 
 		// It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using
 		// a serialized query instead. That doesn't allow a user to override the query though.
-		Map<String, String> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
-			op.fromLexicons(Map.of(
-				"uri", op.cts.uriReference(),
-				"hash", op.cts.fieldReference(super.fieldName)
-			)).where(
-				op.cts.documentQuery(op.xs.stringSeq(uris))
-			),
-
-			rows -> {
-				Map<String, String> map = new HashMap<>();
-				rows.forEach(row -> {
-					String uri = row.getString("uri");
-					String existingHash = row.getString("hash");
-					map.put(uri, existingHash);
-				});
-				return map;
-			}
-		);
-
-		return filterDocuments(context, uri -> existingHashes.get(uri));
+		RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
+
+		try {
+			Map<String, String> existingHashes = rowTemplate.query(op ->
+				op.fromLexicons(Map.of(
+					"uri", op.cts.uriReference(),
+					"hash", op.cts.fieldReference(super.fieldName)
+				)).where(
+					op.cts.documentQuery(op.xs.stringSeq(uris))
+				),
+
+				rows -> {
+					Map<String, String> map = new HashMap<>();
+					rows.forEach(row -> {
+						String uri = row.getString("uri");
+						String existingHash = row.getString("hash");
+						map.put(uri, existingHash);
+					});
+					return map;
+				}
+			);
+
+			return filterDocuments(context, uri -> existingHashes.get(uri));
+		} catch (FailedRequestException e) {
+			String message = "Unable to query for existing incremental write hashes; cause: " + e.getMessage();
+			throw new FailedRequestException(message, e.getFailedRequest());
+		}
 	}
 }
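
Note: both query strategies are selected through the builder used in the tests below; useEvalQuery(true) produces the eval-based filter, while the default produces this Optic/lexicon filter. A sketch (the field name is illustrative, and it is assumed that build() returns a DocumentWriteSetFilter, as implied by its use with withDocumentWriteSetFilter):

    // Default: lexicon query via the Optic API (IncrementalWriteOpticFilter).
    DocumentWriteSetFilter opticFilter = IncrementalWriteFilter.newBuilder()
    	.fieldName("incrementalWriteHash")
    	.build();

    // Alternative: server-side eval query (IncrementalWriteEvalFilter).
    DocumentWriteSetFilter evalFilter = IncrementalWriteFilter.newBuilder()
    	.fieldName("incrementalWriteHash")
    	.useEvalQuery(true)
    	.build();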

marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java

Lines changed: 67 additions & 27 deletions

@@ -5,7 +5,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.document.*;
 import com.marklogic.client.impl.DocumentWriteOperationImpl;
 import com.marklogic.client.io.DocumentMetadataHandle;
 import com.marklogic.client.io.Format;
@@ -150,8 +150,40 @@ void invalidJsonWithFormat() {
 			"Expecting the server to throw an error. Actual message: " + message);
 	}
 
+	@Test
+	void noRangeIndexForField() {
+		filter = IncrementalWriteFilter.newBuilder()
+			.fieldName("non-existent-field")
+			.build();
+
+		writeTenDocuments();
+
+		assertNotNull(batchFailure.get());
+		String message = batchFailure.get().getMessage();
+		assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("XDMP-FIELDRIDXNOTFOUND"),
+			"When the user tries to use the incremental write feature without the required range index, we should " +
+				"fail with a clear error message. Actual message: " + message);
+	}
+
+	@Test
+	void noRangeIndexForFieldWithEval() {
+		filter = IncrementalWriteFilter.newBuilder()
+			.fieldName("non-existent-field")
+			.useEvalQuery(true)
+			.build();
+
+		writeTenDocuments();
+
+		assertNotNull(batchFailure.get());
+		String message = batchFailure.get().getMessage();
+		assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("XDMP-FIELDRIDXNOTFOUND"),
+			"When the user tries to use the incremental write feature without the required range index, we should " +
+				"fail with a helpful error message. Actual message: " + message);
+	}
+
 	private void verifyIncrementalWriteWorks() {
 		writeTenDocuments();
+		verifyDocumentsHasHashInMetadataKey();
 		assertEquals(10, writtenCount.get());
 		assertEquals(0, skippedCount.get(), "No docs should have been skipped on the first write.");
 
@@ -169,36 +201,44 @@ private void verifyIncrementalWriteWorks() {
 	}
 
 	private void writeTenDocuments() {
-		new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher
-			.withThreadCount(1).withBatchSize(5)
-			.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
-			.withDocumentWriteSetFilter(filter),
-
-			writeBatcher -> {
-				for (int i = 1; i <= 10; i++) {
-					// Consistent URIs are required for incremental writes to work.
-					String uri = "/incremental/test/doc-" + i + ".xml";
-					String content = "<doc>This is document number " + i + "</doc>";
-					writeBatcher.add(uri, METADATA, new StringHandle(content));
-				}
+		docs = new ArrayList<>();
+		for (int i = 1; i <= 10; i++) {
+			// Consistent URIs are required for incremental writes to work.
+			String uri = "/incremental/test/doc-" + i + ".xml";
+			String content = "<doc>This is document number " + i + "</doc>";
+			docs.add(new DocumentWriteOperationImpl(uri, METADATA, new StringHandle(content)));
+		}
+		writeDocs(docs);
+	}
+
+	private void verifyDocumentsHasHashInMetadataKey() {
+		GenericDocumentManager mgr = Common.client.newDocumentManager();
+		mgr.setMetadataCategories(DocumentManager.Metadata.METADATAVALUES);
+		DocumentPage page = mgr.search(Common.client.newQueryManager().newStructuredQueryBuilder().collection("incremental-test"), 1);
+		while (page.hasNext()) {
+			DocumentRecord doc = page.next();
+			DocumentMetadataHandle metadata = doc.getMetadata(new DocumentMetadataHandle());
+			assertTrue(metadata.getMetadataValues().containsKey("incrementalWriteHash"),
+				"Document " + doc.getUri() + " should have an incrementalWriteHash in its metadata values.");
+
+			String hash = metadata.getMetadataValues().get("incrementalWriteHash");
+			try {
+				// Can use Java's support for parsing unsigned longs in base 16 to verify the hash is valid.
+				Long.parseUnsignedLong(hash, 16);
+			} catch (NumberFormatException e) {
+				fail("Document " + doc.getUri() + " has an invalid incrementalWriteHash value: " + hash);
 			}
-		);
+		}
 	}
 
 	private void modifyFiveDocuments() {
-		new WriteBatcherTemplate(Common.client).runWriteJob(writeBatcher -> writeBatcher
-			.withThreadCount(1).withBatchSize(5)
-			.onBatchSuccess(batch -> writtenCount.addAndGet(batch.getItems().length))
-			.withDocumentWriteSetFilter(filter),
-
-			writeBatcher -> {
-				for (int i = 6; i <= 10; i++) {
-					String uri = "/incremental/test/doc-" + i + ".xml";
-					String content = "<doc>This is modified content</doc>";
-					writeBatcher.add(uri, METADATA, new StringHandle(content));
-				}
-			}
-		);
+		docs = new ArrayList<>();
+		for (int i = 6; i <= 10; i++) {
+			String uri = "/incremental/test/doc-" + i + ".xml";
+			String content = "<doc>This is modified content</doc>";
+			docs.add(new DocumentWriteOperationImpl(uri, METADATA, new StringHandle(content)));
+		}
+		writeDocs(docs);
 	}
 
 	private void writeDocs(List<DocumentWriteOperation> docs) {
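
Note: the two new tests assert that running without the required field range index fails fast with a clear message wrapping the server's XDMP-FIELDRIDXNOTFOUND error. The configuration that satisfies the feature is not part of this diff; the database-properties payload below is a rough sketch only, and the exact property names (in particular the metadata-field setup and the collation) should be verified against the MarkLogic Management API documentation:

    {
      "field": [
        { "field-name": "incrementalWriteHash", "metadata": true }
      ],
      "range-field-index": [
        {
          "scalar-type": "string",
          "field-name": "incrementalWriteHash",
          "collation": "http://marklogic.com/collation/codepoint",
          "invalid-values": "reject"
        }
      ]
    }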
