1MONGOC_CHANGE_STREAM_T(3)          libmongoc         MONGOC_CHANGE_STREAM_T(3)
2
3
4

NAME

6       mongoc_change_stream_t - handle to a MongoDB change stream
7

SYNOPSIS

9          #include <mongoc/mongoc.h>
10
11          typedef struct _mongoc_change_stream_t mongoc_change_stream_t;
12
13       mongoc_change_stream_t  is  a  handle  to a change stream. A collection
14       change stream can be obtained using mongoc_collection_watch.
15
16       It is recommended to use a  mongoc_change_stream_t  and  its  functions
17       instead  of  a  raw  aggregation  with  a $changeStream stage. For more
18       information see the MongoDB Manual Entry on Change Streams.
19

EXAMPLE

21       example-collection-watch.c
22
23          #include <mongoc/mongoc.h>
24
25          int
26          main ()
27          {
28             bson_t empty = BSON_INITIALIZER;
29             const bson_t *doc;
30             bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
31             const bson_t *err_doc;
32             bson_error_t error;
33             const char *uri_string;
34             mongoc_uri_t *uri;
35             mongoc_client_t *client;
36             mongoc_collection_t *coll;
37             mongoc_change_stream_t *stream;
38             mongoc_write_concern_t *wc = mongoc_write_concern_new ();
39             bson_t opts = BSON_INITIALIZER;
40             bool r;
41
42             mongoc_init ();
43
44             uri_string = "mongodb://"
45                          "localhost:27017,localhost:27018,localhost:"
46                          "27019/db?replicaSet=rs0";
47
48             uri = mongoc_uri_new_with_error (uri_string, &error);
49             if (!uri) {
50                fprintf (stderr,
51                         "failed to parse URI: %s\n"
52                         "error message:       %s\n",
53                         uri_string,
54                         error.message);
55                return EXIT_FAILURE;
56             }
57
58             client = mongoc_client_new_from_uri (uri);
59             if (!client) {
60                return EXIT_FAILURE;
61             }
62
63             coll = mongoc_client_get_collection (client, "db", "coll");
64             stream = mongoc_collection_watch (coll, &empty, NULL);
65
66             mongoc_write_concern_set_wmajority (wc, 10000);
67             mongoc_write_concern_append (wc, &opts);
68             r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
69             if (!r) {
70                fprintf (stderr, "Error: %s\n", error.message);
71                return EXIT_FAILURE;
72             }
73
74             while (mongoc_change_stream_next (stream, &doc)) {
75                char *as_json = bson_as_relaxed_extended_json (doc, NULL);
76                fprintf (stderr, "Got document: %s\n", as_json);
77                bson_free (as_json);
78             }
79
80             if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
81                if (!bson_empty (err_doc)) {
82                   fprintf (stderr,
83                            "Server Error: %s\n",
84                            bson_as_relaxed_extended_json (err_doc, NULL));
85                } else {
86                   fprintf (stderr, "Client Error: %s\n", error.message);
87                }
88                return EXIT_FAILURE;
89             }
90
91             bson_destroy (to_insert);
92             mongoc_write_concern_destroy (wc);
93             bson_destroy (&opts);
94             mongoc_change_stream_destroy (stream);
95             mongoc_collection_destroy (coll);
96             mongoc_uri_destroy (uri);
97             mongoc_client_destroy (client);
98             mongoc_cleanup ();
99
100             return EXIT_SUCCESS;
101          }
102
103
104   Starting and Resuming
105       All watch functions accept several options to indicate where  a  change
106       stream  should  start  returning changes from: resumeAfter, startAfter,
107       and startAtOperationTime.
108
109       All changes returned  by  mongoc_change_stream_next  include  a  resume
110       token  in the _id field. MongoDB 4.2 also includes an additional resume
111       token in each "aggregate" and "getMore" command response, which  points
112       to the end of that response's batch. The current token is automatically
113       cached by libmongoc. In the event of an error,  libmongoc  attempts  to
114       recreate  the  change  stream starting where it left off by passing the
115       cached resume token. libmongoc only attempts to resume once, but client
116       applications   can   access   the   cached   resume   token  with  mon‐
117       goc_change_stream_get_resume_token and use  it  for  their  own  resume
118       logic by passing it as either the resumeAfter or startAfter option.
119
120       Additionally,  change  streams can start returning changes at an opera‐
121       tion time by using the startAtOperationTime  field.  This  can  be  the
122       timestamp returned in the operationTime field of a command reply.
123
124       resumeAfter,  startAfter,  and startAtOperationTime are mutually exclu‐
125       sive options. Setting more than one will result in a server error.
126
127       The following example implements custom resuming logic, persisting  the
128       resume token in a file: example-resume.c
129
130          #include <mongoc/mongoc.h>
131
132          /* An example implementation of custom resume logic in a change stream.
133          * example-resume starts a client-wide change stream and persists the resume
134          * token in a file "resume-token.json". On restart, if "resume-token.json"
135          * exists, the change stream starts watching after the persisted resume token.
136          *
137          * This behavior allows a user to exit example-resume, and restart it later
138          * without missing any change events.
139          */
140          #include <unistd.h>
141
142          static const char *RESUME_TOKEN_PATH = "resume-token.json";
143
144          static bool
145          _save_resume_token (const bson_t *doc)
146          {
147             FILE *file_stream;
148             bson_iter_t iter;
149             bson_t resume_token_doc;
150             char *as_json = NULL;
151             size_t as_json_len;
152             ssize_t r, n_written;
153             const bson_value_t *resume_token;
154
155             if (!bson_iter_init_find (&iter, doc, "_id")) {
156                fprintf (stderr, "reply does not contain operationTime.");
157                return false;
158             }
159             resume_token = bson_iter_value (&iter);
160             /* store the resume token in a document, { resumeAfter: <resume token> }
161              * which we can later append easily. */
162             file_stream = fopen (RESUME_TOKEN_PATH, "w+");
163             if (!file_stream) {
164                fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
165                return false;
166             }
167             bson_init (&resume_token_doc);
168             BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
169             as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
170             bson_destroy (&resume_token_doc);
171             n_written = 0;
172             while (n_written < as_json_len) {
173                r = fwrite ((void *) (as_json + n_written),
174                            sizeof (char),
175                            as_json_len - n_written,
176                            file_stream);
177                if (r == -1) {
178                   fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
179                   bson_free (as_json);
180                   fclose (file_stream);
181                   return false;
182                }
183                n_written += r;
184             }
185
186             bson_free (as_json);
187             fclose (file_stream);
188             return true;
189          }
190
191          bool
192          _load_resume_token (bson_t *opts)
193          {
194             bson_error_t error;
195             bson_json_reader_t *reader;
196             bson_t doc;
197
198             /* if the file does not exist, skip. */
199             if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
200                return true;
201             }
202             reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
203             if (!reader) {
204                fprintf (stderr,
205                         "failed to open %s for reading: %s\n",
206                         RESUME_TOKEN_PATH,
207                         error.message);
208                return false;
209             }
210
211             bson_init (&doc);
212             if (-1 == bson_json_reader_read (reader, &doc, &error)) {
213                fprintf (stderr, "failed to read doc from %s\n", RESUME_TOKEN_PATH);
214                bson_destroy (&doc);
215                bson_json_reader_destroy (reader);
216                return false;
217             }
218
219             printf ("found cached resume token in %s, resuming change stream.\n",
220                     RESUME_TOKEN_PATH);
221
222             bson_concat (opts, &doc);
223             bson_destroy (&doc);
224             bson_json_reader_destroy (reader);
225             return true;
226          }
227
228          int
229          main ()
230          {
231             int exit_code = EXIT_FAILURE;
232             const char *uri_string;
233             mongoc_uri_t *uri = NULL;
234             bson_error_t error;
235             mongoc_client_t *client = NULL;
236             bson_t pipeline = BSON_INITIALIZER;
237             bson_t opts = BSON_INITIALIZER;
238             mongoc_change_stream_t *stream = NULL;
239             const bson_t *doc;
240
241             const int max_time = 30; /* max amount of time, in seconds, that
242                                         mongoc_change_stream_next can block. */
243
244             mongoc_init ();
245             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
246             uri = mongoc_uri_new_with_error (uri_string, &error);
247             if (!uri) {
248                fprintf (stderr,
249                         "failed to parse URI: %s\n"
250                         "error message:       %s\n",
251                         uri_string,
252                         error.message);
253                goto cleanup;
254             }
255
256             client = mongoc_client_new_from_uri (uri);
257             if (!client) {
258                goto cleanup;
259             }
260
261             if (!_load_resume_token (&opts)) {
262                goto cleanup;
263             }
264             BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);
265
266             printf ("listening for changes on the client (max %d seconds).\n", max_time);
267             stream = mongoc_client_watch (client, &pipeline, &opts);
268
269             while (mongoc_change_stream_next (stream, &doc)) {
270                char *as_json;
271
272                as_json = bson_as_canonical_extended_json (doc, NULL);
273                printf ("change received: %s\n", as_json);
274                bson_free (as_json);
275                if (!_save_resume_token (doc)) {
276                   goto cleanup;
277                }
278             }
279
280             exit_code = EXIT_SUCCESS;
281
282          cleanup:
283             mongoc_uri_destroy (uri);
284             bson_destroy (&pipeline);
285             bson_destroy (&opts);
286             mongoc_change_stream_destroy (stream);
287             mongoc_client_destroy (client);
288             mongoc_cleanup ();
289             return exit_code;
290          }
291
292
293The following example shows using startAtOperationTime to synchronize a change
294stream with another operation: example-start-at-optime.c
295
296          /* An example of starting a change stream with startAtOperationTime. */
297          #include <mongoc/mongoc.h>
298
299          int
300          main ()
301          {
302             int exit_code = EXIT_FAILURE;
303             const char *uri_string;
304             mongoc_uri_t *uri = NULL;
305             bson_error_t error;
306             mongoc_client_t *client = NULL;
307             mongoc_collection_t *coll = NULL;
308             bson_t pipeline = BSON_INITIALIZER;
309             bson_t opts = BSON_INITIALIZER;
310             mongoc_change_stream_t *stream = NULL;
311             bson_iter_t iter;
312             const bson_t *doc;
313             bson_value_t cached_operation_time = {0};
314             int i;
315             bool r;
316
317             mongoc_init ();
318             uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
319             uri = mongoc_uri_new_with_error (uri_string, &error);
320             if (!uri) {
321                fprintf (stderr,
322                         "failed to parse URI: %s\n"
323                         "error message:       %s\n",
324                         uri_string,
325                         error.message);
326                goto cleanup;
327             }
328
329             client = mongoc_client_new_from_uri (uri);
330             if (!client) {
331                goto cleanup;
332             }
333
334             /* insert five documents. */
335             coll = mongoc_client_get_collection (client, "db", "coll");
336             for (i = 0; i < 5; i++) {
337                bson_t reply;
338                bson_t *insert_cmd = BCON_NEW ("insert",
339                                               "coll",
340                                               "documents",
341                                               "[",
342                                               "{",
343                                               "x",
344                                               BCON_INT64 (i),
345                                               "}",
346                                               "]");
347
348                r = mongoc_collection_write_command_with_opts (
349                   coll, insert_cmd, NULL, &reply, &error);
350                bson_destroy (insert_cmd);
351                if (!r) {
352                   bson_destroy (&reply);
353                   fprintf (stderr, "failed to insert: %s\n", error.message);
354                   goto cleanup;
355                }
356                if (i == 0) {
357                   /* cache the operation time in the first reply. */
358                   if (bson_iter_init_find (&iter, &reply, "operationTime")) {
359                      bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
360                   } else {
361                      fprintf (stderr, "reply does not contain operationTime.");
362                      bson_destroy (&reply);
363                      goto cleanup;
364                   }
365                }
366                bson_destroy (&reply);
367             }
368
369             /* start a change stream at the first returned operationTime. */
370             BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
371             stream = mongoc_collection_watch (coll, &pipeline, &opts);
372
373             /* since the change stream started at the operation time of the first
374              * insert, the five inserts are returned. */
375             printf ("listening for changes on db.coll:\n");
376             while (mongoc_change_stream_next (stream, &doc)) {
377                char *as_json;
378
379                as_json = bson_as_canonical_extended_json (doc, NULL);
380                printf ("change received: %s\n", as_json);
381                bson_free (as_json);
382             }
383
384             exit_code = EXIT_SUCCESS;
385
386          cleanup:
387             mongoc_uri_destroy (uri);
388             bson_destroy (&pipeline);
389             bson_destroy (&opts);
390             if (cached_operation_time.value_type) {
391                bson_value_destroy (&cached_operation_time);
392             }
393             mongoc_change_stream_destroy (stream);
394             mongoc_collection_destroy (coll);
395             mongoc_client_destroy (client);
396             mongoc_cleanup ();
397             return exit_code;
398          }
399

AUTHOR

401       MongoDB, Inc
402
COPYRIGHT

404       2017-present, MongoDB, Inc
405
406
407
408
4091.16.2                           Feb 25, 2020        MONGOC_CHANGE_STREAM_T(3)
Impressum