BatchSize option is not working in ChangeStream WatchAsync

Hi Team,

I am using a change stream to catch deletions made in my backend and send e-mails, which is working fine.
For bulk deletions I want to send only a few e-mails, so I am using BatchSize, but somehow it is not working.
As I understand it, the BatchSize setting should determine how many changes are captured.
I set BatchSize to 2, so when I delete 5 records from the collection I expect only 2 mails to be sent; however, all 5 mails are sent.
Please help me fix this issue. Below is my code:

public async Task RealtionalCollectionCollectionChange(CancellationToken cancellationToken)
{
    var options = new ChangeStreamOptions
    {
        FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
        BatchSize = 2
    };

    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");

    // The using block disposes the cursor, so the separate cursor.Dispose() call is not needed.
    using (var cursor = await collection.WatchAsync(pipeline, options, cancellationToken))
    {
        while (await cursor.MoveNextAsync(cancellationToken))
        {
            if (cancellationToken.IsCancellationRequested) { break; }

            foreach (var change in cursor.Current)
            {
                // Sending mail code
            }
            break; // leaves the loop after the first batch
        }
    }
}

Thanks,

Lalitha.C

Answers (2)
Amit Mohanty

public async Task RealtionalCollectionCollectionChange(CancellationToken cancellationToken)
{
    var options = new ChangeStreamOptions
    {
        FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
        BatchSize = 2
    };
    var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{operationType: { $in: [ 'replace', 'insert', 'update', 'delete' ] } }");
    // The using block disposes the cursor, so no explicit cursor.Dispose() call is needed.
    using (var cursor = await collection.WatchAsync(pipeline, options, cancellationToken))
    {
        while (await cursor.MoveNextAsync(cancellationToken))
        {
            if (cancellationToken.IsCancellationRequested)
            {
                break;
            }
            int mailCounter = 0; // counter is reset for every batch returned by the cursor
            foreach (var change in cursor.Current)
            {
                if (mailCounter < options.BatchSize) // limit number of emails sent per batch
                {
                    // Sending mail code
                    mailCounter++; // increment mail count for this batch
                }
            }
        }
    }
}
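
One note on the code above: as far as I know, BatchSize only caps how many change events the server returns per cursor batch. It does not drop events, so five deletions still arrive as five events (for example as batches of 2, 2 and 1). If the goal is fewer mails, one option is to send a single summary mail per batch instead of one mail per event. Below is a minimal sketch of that idea. It uses plain strings in place of ChangeStreamDocument<BsonDocument>, and BatchMailSummary and BuildSummary are hypothetical names of my own, not part of the MongoDB driver.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchMailSummary
{
    // Hypothetical helper: collapses all events of one cursor batch into one mail body.
    // Each string stands in for the operation type of one change event.
    public static string BuildSummary(IEnumerable<string> operationTypes)
    {
        var counts = operationTypes
            .GroupBy(op => op)                             // one group per operation type
            .OrderBy(g => g.Key, StringComparer.Ordinal)   // stable, predictable order
            .Select(g => $"{g.Key}: {g.Count()}");
        return string.Join(", ", counts);
    }
}
```

For the input "delete", "delete", "insert" this returns "delete: 2, insert: 1". Inside the while loop you could pass cursor.Current.Select(c => c.OperationType.ToString()) and send one mail with the result, skipping the mail when the batch is empty.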
Lalitha Chevuru


Hi Amit,

It works only once: when I delete the first batch of 5 records, it sends only 2 mails; however, when I delete 5 records a second time, it doesn't send any mails because mailCounter is already 2.

When I set mailCounter = 0 in the else branch, it sends all 5 mails the first time as well.

When I try to dispose the cursor, it throws an error.

Not sure how to fix this issue.

Thanks,

Lalitha.C
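
A possible way around the "works only once" behaviour described above is to reset the counter by time instead of by hand: allow at most N mails per time window, and start a fresh window once the old one has elapsed. The sketch below is only an illustration under that assumption. MailThrottle and TryAcquire are hypothetical names, not part of the MongoDB driver, and the window length is something you would have to choose for your own bulk deletions.

```csharp
using System;

// Hypothetical helper: allows at most maxMails mails per time window.
public sealed class MailThrottle
{
    private readonly int _maxMails;
    private readonly TimeSpan _window;
    private DateTime _windowStart = DateTime.MinValue;
    private int _sent;

    public MailThrottle(int maxMails, TimeSpan window)
    {
        _maxMails = maxMails;
        _window = window;
    }

    // Returns true if a mail may be sent at 'now'.
    // The counter resets automatically once the current window has elapsed.
    public bool TryAcquire(DateTime now)
    {
        if (now - _windowStart >= _window)
        {
            _windowStart = now; // start a new window
            _sent = 0;
        }
        if (_sent >= _maxMails)
        {
            return false; // limit reached for this window
        }
        _sent++;
        return true;
    }
}
```

To use it, create one instance before the while loop, for example new MailThrottle(2, TimeSpan.FromSeconds(30)), and inside the foreach send the mail only when throttle.TryAcquire(DateTime.UtcNow) returns true. Because the instance lives outside the loop, the limit holds across cursor batches, and a second bulk deletion after the window has passed gets its own 2 mails.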