MONGOC_CHANGE_STREAM_T(3)          libmongoc         MONGOC_CHANGE_STREAM_T(3)
2
3
4

SYNOPSIS

6          #include <mongoc/mongoc.h>
7
8          typedef struct _mongoc_change_stream_t mongoc_change_stream_t;
9
       mongoc_change_stream_t is a handle to a change stream. A collection
       change stream can be obtained using mongoc_collection_watch().
12
       It is recommended to use a mongoc_change_stream_t and its functions
       instead of a raw aggregation with a $changeStream stage. For more
       information, see the MongoDB Manual entry on Change Streams.
16

EXAMPLE

18       example-collection-watch.c
19
20          #include <mongoc/mongoc.h>
21
22          int
23          main (void)
24          {
25             bson_t empty = BSON_INITIALIZER;
26             const bson_t *doc;
27             bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
28             const bson_t *err_doc;
29             bson_error_t error;
30             const char *uri_string;
31             mongoc_uri_t *uri;
32             mongoc_client_t *client;
33             mongoc_collection_t *coll;
34             mongoc_change_stream_t *stream;
35             mongoc_write_concern_t *wc = mongoc_write_concern_new ();
36             bson_t opts = BSON_INITIALIZER;
37             bool r;
38
39             mongoc_init ();
40
41             uri_string = "mongodb://"
42                          "localhost:27017,localhost:27018,localhost:"
43                          "27019/db?replicaSet=rs0";
44
45             uri = mongoc_uri_new_with_error (uri_string, &error);
46             if (!uri) {
47                fprintf (stderr,
48                         "failed to parse URI: %s\n"
49                         "error message:       %s\n",
50                         uri_string,
51                         error.message);
52                return EXIT_FAILURE;
53             }
54
55             client = mongoc_client_new_from_uri (uri);
56             if (!client) {
57                return EXIT_FAILURE;
58             }
59
60             coll = mongoc_client_get_collection (client, "db", "coll");
61             stream = mongoc_collection_watch (coll, &empty, NULL);
62
63             mongoc_write_concern_set_wmajority (wc, 10000);
64             mongoc_write_concern_append (wc, &opts);
65             r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
66             if (!r) {
67                fprintf (stderr, "Error: %s\n", error.message);
68                return EXIT_FAILURE;
69             }
70
71             while (mongoc_change_stream_next (stream, &doc)) {
72                char *as_json = bson_as_relaxed_extended_json (doc, NULL);
73                fprintf (stderr, "Got document: %s\n", as_json);
74                bson_free (as_json);
75             }
76
77             if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
78                if (!bson_empty (err_doc)) {
79                   fprintf (stderr,
80                            "Server Error: %s\n",
81                            bson_as_relaxed_extended_json (err_doc, NULL));
82                } else {
83                   fprintf (stderr, "Client Error: %s\n", error.message);
84                }
85                return EXIT_FAILURE;
86             }
87
88             bson_destroy (to_insert);
89             mongoc_write_concern_destroy (wc);
90             bson_destroy (&opts);
91             mongoc_change_stream_destroy (stream);
92             mongoc_collection_destroy (coll);
93             mongoc_uri_destroy (uri);
94             mongoc_client_destroy (client);
95             mongoc_cleanup ();
96
97             return EXIT_SUCCESS;
98          }
99
100
101   Starting and Resuming
       All watch functions accept several options to indicate where a change
       stream should start returning changes from: resumeAfter, startAfter,
       and startAtOperationTime.
105
       All changes returned by mongoc_change_stream_next() include a resume
       token in the _id field. MongoDB 4.2 also includes an additional resume
       token in each "aggregate" and "getMore" command response, which points
       to the end of that response's batch. The current token is automatically
       cached by libmongoc. In the event of an error, libmongoc attempts to
       recreate the change stream starting where it left off by passing the
       cached resume token. libmongoc only attempts to resume once, but client
       applications can access the cached resume token with
       mongoc_change_stream_get_resume_token() and use it for their own resume
       logic by passing it as either the resumeAfter or startAfter option.
116
       Additionally, change streams can start returning changes at an
       operation time by using the startAtOperationTime field. This can be the
       timestamp returned in the operationTime field of a command reply.
120
       resumeAfter, startAfter, and startAtOperationTime are mutually
       exclusive options. Setting more than one will result in a server error.
123
       The following example implements custom resuming logic, persisting the
       resume token in a file.
126
127       example-resume.c
128
129          #include <mongoc/mongoc.h>
130
131          /* An example implementation of custom resume logic in a change stream.
132          * example-resume starts a client-wide change stream and persists the resume
133          * token in a file "resume-token.json". On restart, if "resume-token.json"
134          * exists, the change stream starts watching after the persisted resume token.
135          *
136          * This behavior allows a user to exit example-resume, and restart it later
137          * without missing any change events.
138          */
139          #include <unistd.h>
140
141          static const char *RESUME_TOKEN_PATH = "resume-token.json";
142
143          static bool
144          _save_resume_token (const bson_t *doc)
145          {
146             FILE *file_stream;
147             bson_iter_t iter;
148             bson_t resume_token_doc;
149             char *as_json = NULL;
150             size_t as_json_len;
151             ssize_t r, n_written;
152             const bson_value_t *resume_token;
153
154             if (!bson_iter_init_find (&iter, doc, "_id")) {
155                fprintf (stderr, "reply does not contain operationTime.");
156                return false;
157             }
158             resume_token = bson_iter_value (&iter);
159             /* store the resume token in a document, { resumeAfter: <resume token> }
160              * which we can later append easily. */
161             file_stream = fopen (RESUME_TOKEN_PATH, "w+");
162             if (!file_stream) {
163                fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
164                return false;
165             }
166             bson_init (&resume_token_doc);
167             BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
168             as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
169             bson_destroy (&resume_token_doc);
170             n_written = 0;
171             while (n_written < as_json_len) {
172                r = fwrite ((void *) (as_json + n_written),
173                            sizeof (char),
174                            as_json_len - n_written,
175                            file_stream);
176                if (r == -1) {
177                   fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
178                   bson_free (as_json);
179                   fclose (file_stream);
180                   return false;
181                }
182                n_written += r;
183             }
184
185             bson_free (as_json);
186             fclose (file_stream);
187             return true;
188          }
189
190          bool
191          _load_resume_token (bson_t *opts)
192          {
193             bson_error_t error;
194             bson_json_reader_t *reader;
195             bson_t doc;
196
197             /* if the file does not exist, skip. */
198             if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
199                return true;
200             }
201             reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
202             if (!reader) {
203                fprintf (stderr,
204                         "failed to open %s for reading: %s\n",
205                         RESUME_TOKEN_PATH,
206                         error.message);
207                return false;
208             }
209
210             bson_init (&doc);
211             if (-1 == bson_json_reader_read (reader, &doc, &error)) {
212                fprintf (stderr, "failed to read doc from %s\n", RESUME_TOKEN_PATH);
213                bson_destroy (&doc);
214                bson_json_reader_destroy (reader);
215                return false;
216             }
217
218             printf ("found cached resume token in %s, resuming change stream.\n",
219                     RESUME_TOKEN_PATH);
220
221             bson_concat (opts, &doc);
222             bson_destroy (&doc);
223             bson_json_reader_destroy (reader);
224             return true;
225          }
226
227          int
228          main (void)
229          {
230             int exit_code = EXIT_FAILURE;
231             const char *uri_string;
232             mongoc_uri_t *uri = NULL;
233             bson_error_t error;
234             mongoc_client_t *client = NULL;
235             bson_t pipeline = BSON_INITIALIZER;
236             bson_t opts = BSON_INITIALIZER;
237             mongoc_change_stream_t *stream = NULL;
238             const bson_t *doc;
239
240             const int max_time = 30; /* max amount of time, in seconds, that
241                                         mongoc_change_stream_next can block. */
242
243             mongoc_init ();
244             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
245             uri = mongoc_uri_new_with_error (uri_string, &error);
246             if (!uri) {
247                fprintf (stderr,
248                         "failed to parse URI: %s\n"
249                         "error message:       %s\n",
250                         uri_string,
251                         error.message);
252                goto cleanup;
253             }
254
255             client = mongoc_client_new_from_uri (uri);
256             if (!client) {
257                goto cleanup;
258             }
259
260             if (!_load_resume_token (&opts)) {
261                goto cleanup;
262             }
263             BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);
264
265             printf ("listening for changes on the client (max %d seconds).\n", max_time);
266             stream = mongoc_client_watch (client, &pipeline, &opts);
267
268             while (mongoc_change_stream_next (stream, &doc)) {
269                char *as_json;
270
271                as_json = bson_as_canonical_extended_json (doc, NULL);
272                printf ("change received: %s\n", as_json);
273                bson_free (as_json);
274                if (!_save_resume_token (doc)) {
275                   goto cleanup;
276                }
277             }
278
279             exit_code = EXIT_SUCCESS;
280
281          cleanup:
282             mongoc_uri_destroy (uri);
283             bson_destroy (&pipeline);
284             bson_destroy (&opts);
285             mongoc_change_stream_destroy (stream);
286             mongoc_client_destroy (client);
287             mongoc_cleanup ();
288             return exit_code;
289          }
290
291
       The following example shows using startAtOperationTime to synchronize a
       change stream with another operation.
294
295       example-start-at-optime.c
296
297          /* An example of starting a change stream with startAtOperationTime. */
298          #include <mongoc/mongoc.h>
299
300          int
301          main (void)
302          {
303             int exit_code = EXIT_FAILURE;
304             const char *uri_string;
305             mongoc_uri_t *uri = NULL;
306             bson_error_t error;
307             mongoc_client_t *client = NULL;
308             mongoc_collection_t *coll = NULL;
309             bson_t pipeline = BSON_INITIALIZER;
310             bson_t opts = BSON_INITIALIZER;
311             mongoc_change_stream_t *stream = NULL;
312             bson_iter_t iter;
313             const bson_t *doc;
314             bson_value_t cached_operation_time = {0};
315             int i;
316             bool r;
317
318             mongoc_init ();
319             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
320             uri = mongoc_uri_new_with_error (uri_string, &error);
321             if (!uri) {
322                fprintf (stderr,
323                         "failed to parse URI: %s\n"
324                         "error message:       %s\n",
325                         uri_string,
326                         error.message);
327                goto cleanup;
328             }
329
330             client = mongoc_client_new_from_uri (uri);
331             if (!client) {
332                goto cleanup;
333             }
334
335             /* insert five documents. */
336             coll = mongoc_client_get_collection (client, "db", "coll");
337             for (i = 0; i < 5; i++) {
338                bson_t reply;
339                bson_t *insert_cmd = BCON_NEW ("insert",
340                                               "coll",
341                                               "documents",
342                                               "[",
343                                               "{",
344                                               "x",
345                                               BCON_INT64 (i),
346                                               "}",
347                                               "]");
348
349                r = mongoc_collection_write_command_with_opts (
350                   coll, insert_cmd, NULL, &reply, &error);
351                bson_destroy (insert_cmd);
352                if (!r) {
353                   bson_destroy (&reply);
354                   fprintf (stderr, "failed to insert: %s\n", error.message);
355                   goto cleanup;
356                }
357                if (i == 0) {
358                   /* cache the operation time in the first reply. */
359                   if (bson_iter_init_find (&iter, &reply, "operationTime")) {
360                      bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
361                   } else {
362                      fprintf (stderr, "reply does not contain operationTime.");
363                      bson_destroy (&reply);
364                      goto cleanup;
365                   }
366                }
367                bson_destroy (&reply);
368             }
369
370             /* start a change stream at the first returned operationTime. */
371             BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
372             stream = mongoc_collection_watch (coll, &pipeline, &opts);
373
374             /* since the change stream started at the operation time of the first
375              * insert, the five inserts are returned. */
376             printf ("listening for changes on db.coll:\n");
377             while (mongoc_change_stream_next (stream, &doc)) {
378                char *as_json;
379
380                as_json = bson_as_canonical_extended_json (doc, NULL);
381                printf ("change received: %s\n", as_json);
382                bson_free (as_json);
383             }
384
385             exit_code = EXIT_SUCCESS;
386
387          cleanup:
388             mongoc_uri_destroy (uri);
389             bson_destroy (&pipeline);
390             bson_destroy (&opts);
391             if (cached_operation_time.value_type) {
392                bson_value_destroy (&cached_operation_time);
393             }
394             mongoc_change_stream_destroy (stream);
395             mongoc_collection_destroy (coll);
396             mongoc_client_destroy (client);
397             mongoc_cleanup ();
398             return exit_code;
399          }
400
401

AUTHOR

403       MongoDB, Inc
404
406       2017-present, MongoDB, Inc
407
408
409
410
1.25.1                           Nov 08, 2023        MONGOC_CHANGE_STREAM_T(3)
Impressum