1MONGOC_CHANGE_STREAM_T(3)      MongoDB C Driver      MONGOC_CHANGE_STREAM_T(3)
2
3
4

NAME

6       mongoc_change_stream_t - mongoc_change_stream_t
7

SYNOPSIS

9          #include <mongoc/mongoc.h>
10
11          typedef struct _mongoc_change_stream_t mongoc_change_stream_t;
12
13       mongoc_change_stream_t  is  a  handle  to a change stream. A collection
14       change stream can be obtained using mongoc_collection_watch.
15
16       It is recommended to use a  mongoc_change_stream_t  and  its  functions
17       instead  of  a  raw  aggregation  with  a $changeStream stage. For more
18       information see the MongoDB Manual Entry on Change Streams.
19

EXAMPLE

21       example-collection-watch.c
22
23          #include <mongoc/mongoc.h>
24
25          int
26          main ()
27          {
28             bson_t empty = BSON_INITIALIZER;
29             const bson_t *doc;
30             bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
31             const bson_t *err_doc;
32             bson_error_t error;
33             const char *uri_string;
34             mongoc_uri_t *uri;
35             mongoc_client_t *client;
36             mongoc_collection_t *coll;
37             mongoc_change_stream_t *stream;
38             mongoc_write_concern_t *wc = mongoc_write_concern_new ();
39             bson_t opts = BSON_INITIALIZER;
40             bool r;
41
42             mongoc_init ();
43
44             uri_string = "mongodb://"
45                          "localhost:27017,localhost:27018,localhost:"
46                          "27019/db?replicaSet=rs0";
47
48             uri = mongoc_uri_new_with_error (uri_string, &error);
49             if (!uri) {
50                fprintf (stderr,
51                         "failed to parse URI: %s\n"
52                         "error message:       %s\n",
53                         uri_string,
54                         error.message);
55                return EXIT_FAILURE;
56             }
57
58             client = mongoc_client_new_from_uri (uri);
59             if (!client) {
60                return EXIT_FAILURE;
61             }
62
63             coll = mongoc_client_get_collection (client, "db", "coll");
64             stream = mongoc_collection_watch (coll, &empty, NULL);
65
66             mongoc_write_concern_set_wmajority (wc, 10000);
67             mongoc_write_concern_append (wc, &opts);
68             r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
69             if (!r) {
70                fprintf (stderr, "Error: %s\n", error.message);
71                return EXIT_FAILURE;
72             }
73
74             while (mongoc_change_stream_next (stream, &doc)) {
75                char *as_json = bson_as_relaxed_extended_json (doc, NULL);
76                fprintf (stderr, "Got document: %s\n", as_json);
77                bson_free (as_json);
78             }
79
80             if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
81                if (!bson_empty (err_doc)) {
82                   fprintf (stderr,
83                            "Server Error: %s\n",
84                            bson_as_relaxed_extended_json (err_doc, NULL));
85                } else {
86                   fprintf (stderr, "Client Error: %s\n", error.message);
87                }
88                return EXIT_FAILURE;
89             }
90
91             bson_destroy (to_insert);
92             mongoc_write_concern_destroy (wc);
93             bson_destroy (&opts);
94             mongoc_change_stream_destroy (stream);
95             mongoc_collection_destroy (coll);
96             mongoc_uri_destroy (uri);
97             mongoc_client_destroy (client);
98             mongoc_cleanup ();
99
100             return EXIT_SUCCESS;
101          }
102
103
104   Starting and Resuming
105       All watch functions accept two  options  to  indicate  where  a  change
106       stream  should  start  returning changes from: startAtOperationTime and
107       resumeAfter.
108
109       All changes returned  by  mongoc_change_stream_next  include  a  resume
110       token  in  the  _id field. This resume token is automatically cached in
111       libmongoc.  In the event of an error, libmongoc  attempts  to  recreate
112       the  change  stream  starting  where  it left off by passing the resume
113       token.  libmongoc only attempts to resume once, but client applications
114       can  cache  this  resume token and use it for their own resume logic by
115       passing it as the option resumeAfter.
116
117       Additionally, change streams can start returning changes at  an  opera‐
118       tion  time  by  using  the  startAtOperationTime field. This can be the
119       timestamp returned in the operationTime field of a command reply.
120
121       startAtOperationTime and resumeAfter are  mutually  exclusive  options.
122       Setting them both will result in a server error.
123
124       The  following example implements custom resuming logic, persisting the
125       resume token in a file.  example-resume.c
126
127          #include <mongoc/mongoc.h>
128
129          /* An example implementation of custom resume logic in a change stream.
130          * example-resume starts a client-wide change stream and persists the resume
131          * token in a file "resume-token.json". On restart, if "resume-token.json"
132          * exists, the change stream starts watching after the persisted resume token.
133          *
134          * This behavior allows a user to exit example-resume, and restart it later
135          * without missing any change events.
136          */
137          #include <unistd.h>
138
139          static const char *RESUME_TOKEN_PATH = "resume-token.json";
140
141          static bool
142          _save_resume_token (const bson_t *doc)
143          {
144             FILE *file_stream;
145             bson_iter_t iter;
146             bson_t resume_token_doc;
147             char *as_json = NULL;
148             size_t as_json_len;
149             ssize_t r, n_written;
150             const bson_value_t *resume_token;
151
152             if (!bson_iter_init_find (&iter, doc, "_id")) {
153                fprintf (stderr, "reply does not contain operationTime.");
154                return false;
155             }
156             resume_token = bson_iter_value (&iter);
157             /* store the resume token in a document, { resumeAfter: <resume token> }
158              * which we can later append easily. */
159             file_stream = fopen (RESUME_TOKEN_PATH, "w+");
160             if (!file_stream) {
161                fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
162                return false;
163             }
164             bson_init (&resume_token_doc);
165             BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
166             as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
167             bson_destroy (&resume_token_doc);
168             n_written = 0;
169             while (n_written < as_json_len) {
170                r = fwrite ((void *) (as_json + n_written),
171                            sizeof (char),
172                            as_json_len - n_written,
173                            file_stream);
174                if (r == -1) {
175                   fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
176                   bson_free (as_json);
177                   fclose (file_stream);
178                   return false;
179                }
180                n_written += r;
181             }
182
183             bson_free (as_json);
184             fclose (file_stream);
185             return true;
186          }
187
188          bool
189          _load_resume_token (bson_t *opts)
190          {
191             bson_error_t error;
192             bson_json_reader_t *reader;
193             bson_t doc;
194
195             /* if the file does not exist, skip. */
196             if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
197                return true;
198             }
199             reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
200             if (!reader) {
201                fprintf (stderr,
202                         "failed to open %s for reading: %s\n",
203                         RESUME_TOKEN_PATH,
204                         error.message);
205                return false;
206             }
207
208             bson_init (&doc);
209             if (-1 == bson_json_reader_read (reader, &doc, &error)) {
210                fprintf (stderr, "failed to read doc from %s\n", RESUME_TOKEN_PATH);
211                bson_destroy (&doc);
212                bson_json_reader_destroy (reader);
213                return false;
214             }
215
216             printf ("found cached resume token in %s, resuming change stream.\n",
217                     RESUME_TOKEN_PATH);
218
219             bson_concat (opts, &doc);
220             bson_destroy (&doc);
221             bson_json_reader_destroy (reader);
222             return true;
223          }
224
225          int
226          main ()
227          {
228             int exit_code = EXIT_FAILURE;
229             const char *uri_string;
230             mongoc_uri_t *uri = NULL;
231             bson_error_t error;
232             mongoc_client_t *client = NULL;
233             bson_t pipeline = BSON_INITIALIZER;
234             bson_t opts = BSON_INITIALIZER;
235             mongoc_change_stream_t *stream = NULL;
236             const bson_t *doc;
237
238             const int max_time = 30; /* max amount of time, in seconds, that
239                                         mongoc_change_stream_next can block. */
240
241             mongoc_init ();
242             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
243             uri = mongoc_uri_new_with_error (uri_string, &error);
244             if (!uri) {
245                fprintf (stderr,
246                         "failed to parse URI: %s\n"
247                         "error message:       %s\n",
248                         uri_string,
249                         error.message);
250                goto cleanup;
251             }
252
253             client = mongoc_client_new_from_uri (uri);
254             if (!client) {
255                goto cleanup;
256             }
257
258             if (!_load_resume_token (&opts)) {
259                goto cleanup;
260             }
261             BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);
262
263             printf ("listening for changes on the client (max %d seconds).\n", max_time);
264             stream = mongoc_client_watch (client, &pipeline, &opts);
265
266             while (mongoc_change_stream_next (stream, &doc)) {
267                char *as_json;
268
269                as_json = bson_as_canonical_extended_json (doc, NULL);
270                printf ("change received: %s\n", as_json);
271                bson_free (as_json);
272                if (!_save_resume_token (doc)) {
273                   goto cleanup;
274                }
275             }
276
277             exit_code = EXIT_SUCCESS;
278
279          cleanup:
280             mongoc_uri_destroy (uri);
281             bson_destroy (&pipeline);
282             bson_destroy (&opts);
283             mongoc_change_stream_destroy (stream);
284             mongoc_client_destroy (client);
285             mongoc_cleanup ();
286             return exit_code;
287          }
288
289
290       The following example shows using startAtOperationTime to synchronize a change
291       stream with another operation.  example-start-at-optime.c
292
293          /* An example of starting a change stream with startAtOperationTime. */
294          #include <mongoc/mongoc.h>
295
296          int
297          main ()
298          {
299             int exit_code = EXIT_FAILURE;
300             const char *uri_string;
301             mongoc_uri_t *uri = NULL;
302             bson_error_t error;
303             mongoc_client_t *client = NULL;
304             mongoc_collection_t *coll = NULL;
305             bson_t pipeline = BSON_INITIALIZER;
306             bson_t opts = BSON_INITIALIZER;
307             mongoc_change_stream_t *stream = NULL;
308             bson_iter_t iter;
309             const bson_t *doc;
310             bson_value_t cached_operation_time = {0};
311             int i;
312             bool r;
313
314             mongoc_init ();
315             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
316             uri = mongoc_uri_new_with_error (uri_string, &error);
317             if (!uri) {
318                fprintf (stderr,
319                         "failed to parse URI: %s\n"
320                         "error message:       %s\n",
321                         uri_string,
322                         error.message);
323                goto cleanup;
324             }
325
326             client = mongoc_client_new_from_uri (uri);
327             if (!client) {
328                goto cleanup;
329             }
330
331             /* insert five documents. */
332             coll = mongoc_client_get_collection (client, "db", "coll");
333             for (i = 0; i < 5; i++) {
334                bson_t reply;
335                bson_t *insert_cmd = BCON_NEW ("insert",
336                                               "coll",
337                                               "documents",
338                                               "[",
339                                               "{",
340                                               "x",
341                                               BCON_INT64 (i),
342                                               "}",
343                                               "]");
344
345                r = mongoc_collection_write_command_with_opts (
346                   coll, insert_cmd, NULL, &reply, &error);
347                bson_destroy (insert_cmd);
348                if (!r) {
349                   bson_destroy (&reply);
350                   fprintf (stderr, "failed to insert: %s\n", error.message);
351                   goto cleanup;
352                }
353                if (i == 0) {
354                   /* cache the operation time in the first reply. */
355                   if (bson_iter_init_find (&iter, &reply, "operationTime")) {
356                      bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
357                   } else {
358                      fprintf (stderr, "reply does not contain operationTime.");
359                      bson_destroy (&reply);
360                      goto cleanup;
361                   }
362                }
363                bson_destroy (&reply);
364             }
365
366             /* start a change stream at the first returned operationTime. */
367             BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
368             stream = mongoc_collection_watch (coll, &pipeline, &opts);
369
370             /* since the change stream started at the operation time of the first
371              * insert, the five inserts are returned. */
372             printf ("listening for changes on db.coll:\n");
373             while (mongoc_change_stream_next (stream, &doc)) {
374                char *as_json;
375
376                as_json = bson_as_canonical_extended_json (doc, NULL);
377                printf ("change received: %s\n", as_json);
378                bson_free (as_json);
379             }
380
381             exit_code = EXIT_SUCCESS;
382
383          cleanup:
384             mongoc_uri_destroy (uri);
385             bson_destroy (&pipeline);
386             bson_destroy (&opts);
387             if (cached_operation_time.value_type) {
388                bson_value_destroy (&cached_operation_time);
389             }
390             mongoc_change_stream_destroy (stream);
391             mongoc_collection_destroy (coll);
392             mongoc_client_destroy (client);
393             mongoc_cleanup ();
394             return exit_code;
395          }
396

AUTHOR

398       MongoDB, Inc
399
401       2017-present, MongoDB, Inc
402
403
404
405
4061.13.1                           Jan 24, 2019        MONGOC_CHANGE_STREAM_T(3)
Impressum