@@ -450,6 +450,123 @@ public DataResult update(ClientSession clientSession, List<? extends Bson> queri
450450 start );
451451 }
452452
453+ /**
454+ * Update documents using an aggregation pipeline.
455+ *
456+ * @param query the filter to select the documents to update
457+ * @param pipeline the aggregation pipeline to apply for the update
458+ * @param options additional options for the query and update operation
459+ * @return a DataResult containing the result of the update operation
460+ */
461+ public DataResult updateWithPipeline (Bson query , List <? extends Bson > pipeline , QueryOptions options ) {
462+ return updateWithPipeline (null , query , pipeline , options );
463+ }
464+
465+ /**
466+ * Update documents using an aggregation pipeline.
467+ *
468+ * @param clientSession the client session to use for the operation, or null if not using sessions
469+ * @param query the filter to select the documents to update
470+ * @param pipeline the aggregation pipeline to apply for the update
471+ * @param options additional options for the query and update operation
472+ * @return a DataResult containing the result of the update operation
473+ */
474+ public DataResult updateWithPipeline (ClientSession clientSession , Bson query , List <? extends Bson > pipeline , QueryOptions options ) {
475+ long start = startQuery ();
476+
477+ boolean upsert = false ;
478+ boolean multi = false ;
479+ if (options != null ) {
480+ upsert = options .getBoolean (UPSERT );
481+ multi = options .getBoolean (MULTI );
482+ }
483+
484+ UpdateResult updateResult = mongoDBNativeQuery .updateWithPipeline (clientSession , query , pipeline , upsert , multi );
485+ return endWrite (updateResult .getMatchedCount (), updateResult .getUpsertedId () != null ? 1 : 0 ,
486+ updateResult .getUpsertedId () == null ? updateResult .getModifiedCount () : 0 , 0 , 0 , start );
487+ }
488+
489+ /**
490+ * Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document.
491+ *
492+ * @param query the filter to select the document to update
493+ * @param projection the fields to return in the resulting document
494+ * @param sort the sort criteria to apply before finding the document
495+ * @param pipeline the aggregation pipeline to apply for the update
496+ * @param options additional options for the query and update operation
497+ * @return a DataResult containing the updated document, or an empty result if no document matched
498+ */
499+ public DataResult <Document > findAndUpdateWithPipeline (Bson query , Bson projection , Bson sort ,
500+ List <? extends Bson > pipeline , QueryOptions options ) {
501+ return privateFindAndUpdateWithPipeline (null , query , projection , sort , pipeline , options , null , null );
502+ }
503+
504+ /**
505+ * Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document.
506+ *
507+ * @param clientSession the client session to use for the operation, or null if not using sessions
508+ * @param query the filter to select the document to update
509+ * @param projection the fields to return in the resulting document
510+ * @param sort the sort criteria to apply before finding the document
511+ * @param pipeline the aggregation pipeline to apply for the update
512+ * @param options additional options for the query and update operation
513+ * @return a DataResult containing the updated document, or an empty result if no document matched
514+ */
515+ public DataResult <Document > findAndUpdateWithPipeline (ClientSession clientSession , Bson query , Bson projection , Bson sort ,
516+ List <? extends Bson > pipeline , QueryOptions options ) {
517+ return privateFindAndUpdateWithPipeline (clientSession , query , projection , sort , pipeline , options , null , null );
518+ }
519+
520+ /**
521+ * Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document.
522+ *
523+ * @param query the filter to select the document to update
524+ * @param projection the fields to return in the resulting document
525+ * @param sort the sort criteria to apply before finding the document
526+ * @param pipeline the aggregation pipeline to apply for the update
527+ * @param clazz the class type to convert the result to; if null or Document.class, returns a Document
528+ * @param options additional options for the query and update operation
529+ * @param <T> the type of the returned result
530+ * @return a DataResult containing the updated document, or an empty result if no document matched
531+ */
532+ public <T > DataResult <T > findAndUpdateWithPipeline (Bson query , Bson projection , Bson sort ,
533+ List <? extends Bson > pipeline , Class <T > clazz , QueryOptions options ) {
534+ return privateFindAndUpdateWithPipeline (null , query , projection , sort , pipeline , options , clazz , null );
535+ }
536+
537+ /**
538+ * Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document.
539+ *
540+ * @param clientSession the client session to use for the operation, or null if not using sessions
541+ * @param query the filter to select the document to update
542+ * @param projection the fields to return in the resulting document
543+ * @param sort the sort criteria to apply before finding the document
544+ * @param pipeline the aggregation pipeline to apply for the update
545+ * @param clazz the class type to convert the result to; if null or Document.class, returns a Document
546+ * @param options additional options for the query and update operation
547+ * @param <T> the type of the returned result
548+ * @return a DataResult containing the updated document, or an empty result if no document matched
549+ */
550+ public <T > DataResult <T > findAndUpdateWithPipeline (ClientSession clientSession , Bson query , Bson projection , Bson sort ,
551+ List <? extends Bson > pipeline , Class <T > clazz , QueryOptions options ) {
552+ return privateFindAndUpdateWithPipeline (clientSession , query , projection , sort , pipeline , options , clazz , null );
553+ }
554+
555+ private <T > DataResult <T > privateFindAndUpdateWithPipeline (ClientSession clientSession , Bson query , Bson projection , Bson sort ,
556+ List <? extends Bson > pipeline , QueryOptions options , Class <T > clazz ,
557+ ComplexTypeConverter <T , Document > converter ) {
558+ long start = startQuery ();
559+ Document result = mongoDBNativeQuery .findAndUpdateWithPipeline (clientSession , query , projection , sort , pipeline , options );
560+ if (clazz != null && !clazz .equals (Document .class )) {
561+ try {
562+ return endQuery (Collections .singletonList (objectMapper .readValue (objectWriter .writeValueAsString (result ), clazz )), start );
563+ } catch (IOException e ) {
564+ logger .error ("Error deserializing result: " + e .getMessage (), e );
565+ }
566+ }
567+ return endQuery (Collections .singletonList (result ), start );
568+ }
569+
453570 public DataResult remove (Bson query , QueryOptions options ) {
454571 return remove (null , query , options );
455572 }
0 commit comments