1MONGOC_CHANGE_STREAM_T(3) MongoDB C Driver MONGOC_CHANGE_STREAM_T(3)
2
3
4
6 mongoc_change_stream_t - mongoc_change_stream_t
7
9 #include <mongoc/mongoc.h>
10
11 typedef struct _mongoc_change_stream_t mongoc_change_stream_t;
12
13 mongoc_change_stream_t is a handle to a change stream. A collection
14 change stream can be obtained using mongoc_collection_watch.
15
16 It is recommended to use a mongoc_change_stream_t and its functions
17 instead of a raw aggregation with a $changeStream stage. For more
18 information see the MongoDB Manual Entry on Change Streams.
19
21 example-collection-watch.c
22
23 #include <mongoc/mongoc.h>
24
25 int
26 main ()
27 {
28 bson_t empty = BSON_INITIALIZER;
29 const bson_t *doc;
30 bson_t *to_insert = BCON_NEW ("x", BCON_INT32 (1));
31 const bson_t *err_doc;
32 bson_error_t error;
33 const char *uri_string;
34 mongoc_uri_t *uri;
35 mongoc_client_t *client;
36 mongoc_collection_t *coll;
37 mongoc_change_stream_t *stream;
38 mongoc_write_concern_t *wc = mongoc_write_concern_new ();
39 bson_t opts = BSON_INITIALIZER;
40 bool r;
41
42 mongoc_init ();
43
44 uri_string = "mongodb://"
45 "localhost:27017,localhost:27018,localhost:"
46 "27019/db?replicaSet=rs0";
47
48 uri = mongoc_uri_new_with_error (uri_string, &error);
49 if (!uri) {
50 fprintf (stderr,
51 "failed to parse URI: %s\n"
52 "error message: %s\n",
53 uri_string,
54 error.message);
55 return EXIT_FAILURE;
56 }
57
58 client = mongoc_client_new_from_uri (uri);
59 if (!client) {
60 return EXIT_FAILURE;
61 }
62
63 coll = mongoc_client_get_collection (client, "db", "coll");
64 stream = mongoc_collection_watch (coll, &empty, NULL);
65
66 mongoc_write_concern_set_wmajority (wc, 10000);
67 mongoc_write_concern_append (wc, &opts);
68 r = mongoc_collection_insert_one (coll, to_insert, &opts, NULL, &error);
69 if (!r) {
70 fprintf (stderr, "Error: %s\n", error.message);
71 return EXIT_FAILURE;
72 }
73
74 while (mongoc_change_stream_next (stream, &doc)) {
75 char *as_json = bson_as_relaxed_extended_json (doc, NULL);
76 fprintf (stderr, "Got document: %s\n", as_json);
77 bson_free (as_json);
78 }
79
80 if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
81 if (!bson_empty (err_doc)) {
82 fprintf (stderr,
83 "Server Error: %s\n",
84 bson_as_relaxed_extended_json (err_doc, NULL));
85 } else {
86 fprintf (stderr, "Client Error: %s\n", error.message);
87 }
88 return EXIT_FAILURE;
89 }
90
91 bson_destroy (to_insert);
92 mongoc_write_concern_destroy (wc);
93 bson_destroy (&opts);
94 mongoc_change_stream_destroy (stream);
95 mongoc_collection_destroy (coll);
96 mongoc_uri_destroy (uri);
97 mongoc_client_destroy (client);
98 mongoc_cleanup ();
99
100 return EXIT_SUCCESS;
101 }
102
103
104 Starting and Resuming
105 All watch functions accept two options to indicate where a change
106 stream should start returning changes from: startAtOperationTime and
107 resumeAfter.
108
109 All changes returned by mongoc_change_stream_next include a resume
110 token in the _id field. This resume token is automatically cached in
111 libmongoc. In the event of an error, libmongoc attempts to recreate
112 the change stream starting where it left off by passing the resume
113 token. libmongoc only attempts to resume once, but client applications
114 can cache this resume token and use it for their own resume logic by
115 passing it as the option resumeAfter.
116
117 Additionally, change streams can start returning changes at an
118 operation time by using the startAtOperationTime field. This can be the
119 timestamp returned in the operationTime field of a command reply.
120
121 startAtOperationTime and resumeAfter are mutually exclusive options.
122 Setting them both will result in a server error.
123
124 The following example implements custom resuming logic, persisting the
125 resume token in a file (example-resume.c).
126
127 #include <mongoc/mongoc.h>
128
129 /* An example implementation of custom resume logic in a change stream.
130 * example-resume starts a client-wide change stream and persists the resume
131 * token in a file "resume-token.json". On restart, if "resume-token.json"
132 * exists, the change stream starts watching after the persisted resume token.
133 *
134 * This behavior allows a user to exit example-resume, and restart it later
135 * without missing any change events.
136 */
137 #include <unistd.h>
138
139 static const char *RESUME_TOKEN_PATH = "resume-token.json";
140
141 static bool
142 _save_resume_token (const bson_t *doc)
143 {
144 FILE *file_stream;
145 bson_iter_t iter;
146 bson_t resume_token_doc;
147 char *as_json = NULL;
148 size_t as_json_len;
149 ssize_t r, n_written;
150 const bson_value_t *resume_token;
151
152 if (!bson_iter_init_find (&iter, doc, "_id")) {
153 fprintf (stderr, "reply does not contain operationTime.");
154 return false;
155 }
156 resume_token = bson_iter_value (&iter);
157 /* store the resume token in a document, { resumeAfter: <resume token> }
158 * which we can later append easily. */
159 file_stream = fopen (RESUME_TOKEN_PATH, "w+");
160 if (!file_stream) {
161 fprintf (stderr, "failed to open %s for writing\n", RESUME_TOKEN_PATH);
162 return false;
163 }
164 bson_init (&resume_token_doc);
165 BSON_APPEND_VALUE (&resume_token_doc, "resumeAfter", resume_token);
166 as_json = bson_as_canonical_extended_json (&resume_token_doc, &as_json_len);
167 bson_destroy (&resume_token_doc);
168 n_written = 0;
169 while (n_written < as_json_len) {
170 r = fwrite ((void *) (as_json + n_written),
171 sizeof (char),
172 as_json_len - n_written,
173 file_stream);
174 if (r == -1) {
175 fprintf (stderr, "failed to write to %s\n", RESUME_TOKEN_PATH);
176 bson_free (as_json);
177 fclose (file_stream);
178 return false;
179 }
180 n_written += r;
181 }
182
183 bson_free (as_json);
184 fclose (file_stream);
185 return true;
186 }
187
188 bool
189 _load_resume_token (bson_t *opts)
190 {
191 bson_error_t error;
192 bson_json_reader_t *reader;
193 bson_t doc;
194
195 /* if the file does not exist, skip. */
196 if (-1 == access (RESUME_TOKEN_PATH, R_OK)) {
197 return true;
198 }
199 reader = bson_json_reader_new_from_file (RESUME_TOKEN_PATH, &error);
200 if (!reader) {
201 fprintf (stderr,
202 "failed to open %s for reading: %s\n",
203 RESUME_TOKEN_PATH,
204 error.message);
205 return false;
206 }
207
208 bson_init (&doc);
209 if (-1 == bson_json_reader_read (reader, &doc, &error)) {
210 fprintf (stderr, "failed to read doc from %s\n", RESUME_TOKEN_PATH);
211 bson_destroy (&doc);
212 bson_json_reader_destroy (reader);
213 return false;
214 }
215
216 printf ("found cached resume token in %s, resuming change stream.\n",
217 RESUME_TOKEN_PATH);
218
219 bson_concat (opts, &doc);
220 bson_destroy (&doc);
221 bson_json_reader_destroy (reader);
222 return true;
223 }
224
225 int
226 main ()
227 {
228 int exit_code = EXIT_FAILURE;
229 const char *uri_string;
230 mongoc_uri_t *uri = NULL;
231 bson_error_t error;
232 mongoc_client_t *client = NULL;
233 bson_t pipeline = BSON_INITIALIZER;
234 bson_t opts = BSON_INITIALIZER;
235 mongoc_change_stream_t *stream = NULL;
236 const bson_t *doc;
237
238 const int max_time = 30; /* max amount of time, in seconds, that
239 mongoc_change_stream_next can block. */
240
241 mongoc_init ();
242 uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
243 uri = mongoc_uri_new_with_error (uri_string, &error);
244 if (!uri) {
245 fprintf (stderr,
246 "failed to parse URI: %s\n"
247 "error message: %s\n",
248 uri_string,
249 error.message);
250 goto cleanup;
251 }
252
253 client = mongoc_client_new_from_uri (uri);
254 if (!client) {
255 goto cleanup;
256 }
257
258 if (!_load_resume_token (&opts)) {
259 goto cleanup;
260 }
261 BSON_APPEND_INT64 (&opts, "maxAwaitTimeMS", max_time * 1000);
262
263 printf ("listening for changes on the client (max %d seconds).\n", max_time);
264 stream = mongoc_client_watch (client, &pipeline, &opts);
265
266 while (mongoc_change_stream_next (stream, &doc)) {
267 char *as_json;
268
269 as_json = bson_as_canonical_extended_json (doc, NULL);
270 printf ("change received: %s\n", as_json);
271 bson_free (as_json);
272 if (!_save_resume_token (doc)) {
273 goto cleanup;
274 }
275 }
276
277 exit_code = EXIT_SUCCESS;
278
279 cleanup:
280 mongoc_uri_destroy (uri);
281 bson_destroy (&pipeline);
282 bson_destroy (&opts);
283 mongoc_change_stream_destroy (stream);
284 mongoc_client_destroy (client);
285 mongoc_cleanup ();
286 return exit_code;
287 }
288
289
290 The following example shows using startAtOperationTime to synchronize a change
291 stream with another operation (example-start-at-optime.c).
292
293 /* An example of starting a change stream with startAtOperationTime. */
294 #include <mongoc/mongoc.h>
295
296 int
297 main ()
298 {
299 int exit_code = EXIT_FAILURE;
300 const char *uri_string;
301 mongoc_uri_t *uri = NULL;
302 bson_error_t error;
303 mongoc_client_t *client = NULL;
304 mongoc_collection_t *coll = NULL;
305 bson_t pipeline = BSON_INITIALIZER;
306 bson_t opts = BSON_INITIALIZER;
307 mongoc_change_stream_t *stream = NULL;
308 bson_iter_t iter;
309 const bson_t *doc;
310 bson_value_t cached_operation_time = {0};
311 int i;
312 bool r;
313
314 mongoc_init ();
315 uri_string = "mongodb://localhost:27017/db?replicaSet=rs0";
316 uri = mongoc_uri_new_with_error (uri_string, &error);
317 if (!uri) {
318 fprintf (stderr,
319 "failed to parse URI: %s\n"
320 "error message: %s\n",
321 uri_string,
322 error.message);
323 goto cleanup;
324 }
325
326 client = mongoc_client_new_from_uri (uri);
327 if (!client) {
328 goto cleanup;
329 }
330
331 /* insert five documents. */
332 coll = mongoc_client_get_collection (client, "db", "coll");
333 for (i = 0; i < 5; i++) {
334 bson_t reply;
335 bson_t *insert_cmd = BCON_NEW ("insert",
336 "coll",
337 "documents",
338 "[",
339 "{",
340 "x",
341 BCON_INT64 (i),
342 "}",
343 "]");
344
345 r = mongoc_collection_write_command_with_opts (
346 coll, insert_cmd, NULL, &reply, &error);
347 bson_destroy (insert_cmd);
348 if (!r) {
349 bson_destroy (&reply);
350 fprintf (stderr, "failed to insert: %s\n", error.message);
351 goto cleanup;
352 }
353 if (i == 0) {
354 /* cache the operation time in the first reply. */
355 if (bson_iter_init_find (&iter, &reply, "operationTime")) {
356 bson_value_copy (bson_iter_value (&iter), &cached_operation_time);
357 } else {
358 fprintf (stderr, "reply does not contain operationTime.");
359 bson_destroy (&reply);
360 goto cleanup;
361 }
362 }
363 bson_destroy (&reply);
364 }
365
366 /* start a change stream at the first returned operationTime. */
367 BSON_APPEND_VALUE (&opts, "startAtOperationTime", &cached_operation_time);
368 stream = mongoc_collection_watch (coll, &pipeline, &opts);
369
370 /* since the change stream started at the operation time of the first
371 * insert, the five inserts are returned. */
372 printf ("listening for changes on db.coll:\n");
373 while (mongoc_change_stream_next (stream, &doc)) {
374 char *as_json;
375
376 as_json = bson_as_canonical_extended_json (doc, NULL);
377 printf ("change received: %s\n", as_json);
378 bson_free (as_json);
379 }
380
381 exit_code = EXIT_SUCCESS;
382
383 cleanup:
384 mongoc_uri_destroy (uri);
385 bson_destroy (&pipeline);
386 bson_destroy (&opts);
387 if (cached_operation_time.value_type) {
388 bson_value_destroy (&cached_operation_time);
389 }
390 mongoc_change_stream_destroy (stream);
391 mongoc_collection_destroy (coll);
392 mongoc_client_destroy (client);
393 mongoc_cleanup ();
394 return exit_code;
395 }
396
398 MongoDB, Inc
399
401 2017-present, MongoDB, Inc
402
403
404
405
4061.14.0 Feb 22, 2019 MONGOC_CHANGE_STREAM_T(3)