1MONGOC_CHANGE_STREAM_T(3)          libmongoc         MONGOC_CHANGE_STREAM_T(3)
2
3
4

NAME

6       mongoc_change_stream_t - mongoc_change_stream_t
7

SYNOPSIS

9          #include <mongoc/mongoc.h>
10
11          typedef struct _mongoc_change_stream_t mongoc_change_stream_t;
12
13       mongoc_change_stream_t  is  a  handle  to a change stream. A collection
14       change stream can be obtained using mongoc_collection_watch.
15
16       It is recommended to use a mongoc_change_stream_t and its functions in‐
17       stead  of a raw aggregation with a $changeStream stage. For more infor‐
18       mation see the MongoDB Manual Entry on Change Streams.
19

EXAMPLE

21       example-collection-watch.c
22
23          #include <mongoc/mongoc.h>
24
25          int
26          main ()
27          {
28             bson_t empty = BSON_INITIALIZER;
29             const bson_t *doc;
30             bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
31             const bson_t *err_doc;
32             bson_error_t error;
33             const char *uri_string;
34             mongoc_uri_t *uri;
35             mongoc_client_t *client;
36             mongoc_collection_t *coll;
37             mongoc_change_stream_t *stream;
38             mongoc_write_concern_t *wc = mongoc_write_concern_new ();
39             bson_t opts = BSON_INITIALIZER;
40             bool r;
41
42             mongoc_init ();
43
44             uri_string = "mongodb://"
45                          "localhost:27017,localhost:27018,localhost:"
46                          "27019/db?replicaSet=rs0";
47
48             uri = mongoc_uri_new_with_error (uri_string, &error);
49             if (!uri) {
50                fprintf (stderr,
51                         "failed to parse URI: %s\n"
52                         "error message:       %s\n",
53                         uri_string,
54                         error.message);
55                return EXIT_FAILURE;
56             }
57
58             client = mongoc_client_new_from_uri (uri);
59             if (!client) {
60                return EXIT_FAILURE;
61             }
62
63             coll = mongoc_client_get_collection (client, "db", "coll");
64             stream = mongoc_collection_watch (coll, &empty, NULL);
65
66             mongoc_write_concern_set_wmajority (wc, 10000);
67             mongoc_write_concern_append (wc, &opts);
68             r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
69             if (!r) {
70                fprintf (stderr, "Error: %s\n", error.message);
71                return EXIT_FAILURE;
72             }
73
74             while (mongoc_change_stream_next (stream, &doc)) {
75                char *as_json = bson_as_relaxed_extended_json (doc, NULL);
76                fprintf (stderr, "Got document: %s\n", as_json);
77                bson_free (as_json);
78             }
79
80             if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
81                if (!bson_empty (err_doc)) {
82                   fprintf (stderr,
83                            "Server Error: %s\n",
84                            bson_as_relaxed_extended_json (err_doc, NULL));
85                } else {
86                   fprintf (stderr, "Client Error: %s\n", error.message);
87                }
88                return EXIT_FAILURE;
89             }
90
91             bson_destroy (to_insert);
92             mongoc_write_concern_destroy (wc);
93             bson_destroy (&opts);
94             mongoc_change_stream_destroy (stream);
95             mongoc_collection_destroy (coll);
96             mongoc_uri_destroy (uri);
97             mongoc_client_destroy (client);
98             mongoc_cleanup ();
99
100             return EXIT_SUCCESS;
101          }
102
103
104   Starting and Resuming
105       All watch functions accept several options to indicate where  a  change
106       stream  should  start  returning changes from: resumeAfter, startAfter,
107       and startAtOperationTime.
108
109       All changes returned by mongoc_change_stream_next include a resume  to‐
110       ken  in  the  _id field. MongoDB 4.2 also includes an additional resume
111       token in each "aggregate" and "getMore" command response, which  points
112       to the end of that response's batch. The current token is automatically
113       cached by libmongoc. In the event of an error,  libmongoc  attempts  to
114       recreate  the  change  stream starting where it left off by passing the
115       cached resume token. libmongoc only attempts to resume once, but client
116       applications   can   access   the   cached   resume   token  with  mon‐
117       goc_change_stream_get_resume_token and use  it  for  their  own  resume
118       logic by passing it as either the resumeAfter or startAfter option.
119
120       Additionally,  change  streams can start returning changes at an opera‐
121       tion time by using the startAtOperationTime  field.  This  can  be  the
122       timestamp returned in the operationTime field of a command reply.
123
124       resumeAfter,  startAfter,  and startAtOperationTime are mutually exclu‐
125       sive options. Setting more than one will result in a server error.
126
127       The following example implements custom resuming logic, persisting  the
128       resume token in a file.
129
130       example-resume.c
131
132          #include <mongoc/mongoc.h>
133
134          /* An example implementation of custom resume logic in a change stream.
135          * example-resume starts a client-wide change stream and persists the resume
136          * token in a file "resume-token.json". On restart, if "resume-token.json"
137          * exists, the change stream starts watching after the persisted resume token.
138          *
139          * This behavior allows a user to exit example-resume, and restart it later
140          * without missing any change events.
141          */
142          #include <unistd.h>
143
144          static const char *RESUME_TOKEN_PATH = "resume-token.json";
145
146          static bool
147          _save_resume_token (const bson_t *doc)
148          {
149             FILE *file_stream;
150             bson_iter_t iter;
151             bson_t resume_token_doc;
152             char *as_json = NULL;
153             size_t as_json_len;
154             ssize_t r, n_written;
155             const bson_value_t *resume_token;
156
157             if (!bson_iter_init_find (&iter, doc, "_id")) {
158                fprintf (stderr, "reply does not contain operationTime.");
159                return false;
160             }
161             resume_token = bson_iter_value (&iter);
162             /* store the resume token in a document, { resumeAfter: <resume token> }
163              * which we can later append easily. */
164             file_stream = fopen (RESUME_TOKEN_PATH, "w+");
165             if (!file_stream) {
166                fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
167                return false;
168             }
169             bson_init (&resume_token_doc);
170             BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
171             as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
172             bson_destroy (&resume_token_doc);
173             n_written = 0;
174             while (n_written < as_json_len) {
175                r = fwrite ((void *) (as_json + n_written),
176                            sizeof (char),
177                            as_json_len - n_written,
178                            file_stream);
179                if (r == -1) {
180                   fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
181                   bson_free (as_json);
182                   fclose (file_stream);
183                   return false;
184                }
185                n_written += r;
186             }
187
188             bson_free (as_json);
189             fclose (file_stream);
190             return true;
191          }
192
193          bool
194          _load_resume_token (bson_t *opts)
195          {
196             bson_error_t error;
197             bson_json_reader_t *reader;
198             bson_t doc;
199
200             /* if the file does not exist, skip. */
201             if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
202                return true;
203             }
204             reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
205             if (!reader) {
206                fprintf (stderr,
207                         "failed to open %s for reading: %s\n",
208                         RESUME_TOKEN_PATH,
209                         error.message);
210                return false;
211             }
212
213             bson_init (&doc);
214             if (-1 == bson_json_reader_read (reader, &doc, &error)) {
215                fprintf (stderr, "failed to read doc from %s\n", RESUME_TOKEN_PATH);
216                bson_destroy (&doc);
217                bson_json_reader_destroy (reader);
218                return false;
219             }
220
221             printf ("found cached resume token in %s, resuming change stream.\n",
222                     RESUME_TOKEN_PATH);
223
224             bson_concat (opts, &doc);
225             bson_destroy (&doc);
226             bson_json_reader_destroy (reader);
227             return true;
228          }
229
230          int
231          main ()
232          {
233             int exit_code = EXIT_FAILURE;
234             const char *uri_string;
235             mongoc_uri_t *uri = NULL;
236             bson_error_t error;
237             mongoc_client_t *client = NULL;
238             bson_t pipeline = BSON_INITIALIZER;
239             bson_t opts = BSON_INITIALIZER;
240             mongoc_change_stream_t *stream = NULL;
241             const bson_t *doc;
242
243             const int max_time = 30; /* max amount of time, in seconds, that
244                                         mongoc_change_stream_next can block. */
245
246             mongoc_init ();
247             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
248             uri = mongoc_uri_new_with_error (uri_string, &error);
249             if (!uri) {
250                fprintf (stderr,
251                         "failed to parse URI: %s\n"
252                         "error message:       %s\n",
253                         uri_string,
254                         error.message);
255                goto cleanup;
256             }
257
258             client = mongoc_client_new_from_uri (uri);
259             if (!client) {
260                goto cleanup;
261             }
262
263             if (!_load_resume_token (&opts)) {
264                goto cleanup;
265             }
266             BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);
267
268             printf ("listening for changes on the client (max %d seconds).\n", max_time);
269             stream = mongoc_client_watch (client, &pipeline, &opts);
270
271             while (mongoc_change_stream_next (stream, &doc)) {
272                char *as_json;
273
274                as_json = bson_as_canonical_extended_json (doc, NULL);
275                printf ("change received: %s\n", as_json);
276                bson_free (as_json);
277                if (!_save_resume_token (doc)) {
278                   goto cleanup;
279                }
280             }
281
282             exit_code = EXIT_SUCCESS;
283
284          cleanup:
285             mongoc_uri_destroy (uri);
286             bson_destroy (&pipeline);
287             bson_destroy (&opts);
288             mongoc_change_stream_destroy (stream);
289             mongoc_client_destroy (client);
290             mongoc_cleanup ();
291             return exit_code;
292          }
293
294
295       The following example shows using startAtOperationTime to synchronize a
296       change stream with another operation.
297
298       example-start-at-optime.c
299
300          /* An example of starting a change stream with startAtOperationTime. */
301          #include <mongoc/mongoc.h>
302
303          int
304          main ()
305          {
306             int exit_code = EXIT_FAILURE;
307             const char *uri_string;
308             mongoc_uri_t *uri = NULL;
309             bson_error_t error;
310             mongoc_client_t *client = NULL;
311             mongoc_collection_t *coll = NULL;
312             bson_t pipeline = BSON_INITIALIZER;
313             bson_t opts = BSON_INITIALIZER;
314             mongoc_change_stream_t *stream = NULL;
315             bson_iter_t iter;
316             const bson_t *doc;
317             bson_value_t cached_operation_time = {0};
318             int i;
319             bool r;
320
321             mongoc_init ();
322             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
323             uri = mongoc_uri_new_with_error (uri_string, &error);
324             if (!uri) {
325                fprintf (stderr,
326                         "failed to parse URI: %s\n"
327                         "error message:       %s\n",
328                         uri_string,
329                         error.message);
330                goto cleanup;
331             }
332
333             client = mongoc_client_new_from_uri (uri);
334             if (!client) {
335                goto cleanup;
336             }
337
338             /* insert five documents. */
339             coll = mongoc_client_get_collection (client, "db", "coll");
340             for (i = 0; i < 5; i++) {
341                bson_t reply;
342                bson_t *insert_cmd = BCON_NEW ("insert",
343                                               "coll",
344                                               "documents",
345                                               "[",
346                                               "{",
347                                               "x",
348                                               BCON_INT64 (i),
349                                               "}",
350                                               "]");
351
352                r = mongoc_collection_write_command_with_opts (
353                   coll, insert_cmd, NULL, &reply, &error);
354                bson_destroy (insert_cmd);
355                if (!r) {
356                   bson_destroy (&reply);
357                   fprintf (stderr, "failed to insert: %s\n", error.message);
358                   goto cleanup;
359                }
360                if (i == 0) {
361                   /* cache the operation time in the first reply. */
362                   if (bson_iter_init_find (&iter, &reply, "operationTime")) {
363                      bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
364                   } else {
365                      fprintf (stderr, "reply does not contain operationTime.");
366                      bson_destroy (&reply);
367                      goto cleanup;
368                   }
369                }
370                bson_destroy (&reply);
371             }
372
373             /* start a change stream at the first returned operationTime. */
374             BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
375             stream = mongoc_collection_watch (coll, &pipeline, &opts);
376
377             /* since the change stream started at the operation time of the first
378              * insert, the five inserts are returned. */
379             printf ("listening for changes on db.coll:\n");
380             while (mongoc_change_stream_next (stream, &doc)) {
381                char *as_json;
382
383                as_json = bson_as_canonical_extended_json (doc, NULL);
384                printf ("change received: %s\n", as_json);
385                bson_free (as_json);
386             }
387
388             exit_code = EXIT_SUCCESS;
389
390          cleanup:
391             mongoc_uri_destroy (uri);
392             bson_destroy (&pipeline);
393             bson_destroy (&opts);
394             if (cached_operation_time.value_type) {
395                bson_value_destroy (&cached_operation_time);
396             }
397             mongoc_change_stream_destroy (stream);
398             mongoc_collection_destroy (coll);
399             mongoc_client_destroy (client);
400             mongoc_cleanup ();
401             return exit_code;
402          }
403
404

AUTHOR

406       MongoDB, Inc
407
409       2017-present, MongoDB, Inc
410
411
412
413
4141.21.1                           Mar 02, 2022        MONGOC_CHANGE_STREAM_T(3)
Impressum