FASTAVRO(1)                        fastavro                        FASTAVRO(1)

NAME
   fastavro - fastavro Documentation

   The current Python avro package is dog slow.

   On a test case of about 10K records, it takes about 14sec to iterate over
   all of them. In comparison the JAVA avro SDK does it in about 1.9sec.

   fastavro is an alternative implementation that is much faster. It iterates
   over the same 10K records in 2.9sec, and if you use it with PyPy it'll do
   it in 1.5sec (to be fair, the JAVA benchmark is doing some extra JSON
   encoding/decoding).

   If the optional C extension (generated by Cython) is available, then
   fastavro will be even faster. For the same 10K records it'll run in about
   1.7sec.

Supported Features
   • File Writer
   • File Reader (iterating via records or blocks)
   • Schemaless Writer
   • Schemaless Reader
   • JSON Writer
   • JSON Reader
   • Codecs (Snappy, Deflate, Zstandard, Bzip2, LZ4, XZ)
   • Schema resolution
   • Aliases
   • Logical Types
   • Parsing schemas into the canonical form
   • Schema fingerprinting

Missing Features
   • Anything involving Avro's RPC features

Example
   from fastavro import writer, reader, parse_schema

   schema = {
       'doc': 'A weather reading.',
       'name': 'Weather',
       'namespace': 'test',
       'type': 'record',
       'fields': [
           {'name': 'station', 'type': 'string'},
           {'name': 'time', 'type': 'long'},
           {'name': 'temp', 'type': 'int'},
       ],
   }
   parsed_schema = parse_schema(schema)

   # 'records' can be any iterable (including a generator)
   records = [
       {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
       {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
       {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
       {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
   ]

   # Writing
   with open('weather.avro', 'wb') as out:
       writer(out, parsed_schema, records)

   # Reading
   with open('weather.avro', 'rb') as fo:
       for record in reader(fo):
           print(record)

fastavro.read
   class reader(fo, reader_schema=None, return_record_name=False)
       Iterator over records in an avro file.

       Parameters
          • fo (file-like) – Input stream
          • reader_schema (dict, optional) – Reader schema
          • return_record_name (bool, optional) – If true, when reading a
            union of records, the result will be a tuple where the first
            value is the name of the record and the second value is the
            record itself

       Example:

          from fastavro import reader
          with open('some-file.avro', 'rb') as fo:
              avro_reader = reader(fo)
              for record in avro_reader:
                  process_record(record)

       The fo argument is a file-like object, so another common usage
       example would use an io.BytesIO object like so:

          from io import BytesIO
          from fastavro import writer, reader

          fo = BytesIO()
          writer(fo, schema, records)
          fo.seek(0)
          for record in reader(fo):
              process_record(record)

       metadata
          Key-value pairs in the header metadata

       codec
          The codec used when writing

       writer_schema
          The schema used when writing

       reader_schema
          The schema used when reading (if provided)

   class block_reader(fo, reader_schema=None, return_record_name=False)
       Iterator over Blocks in an avro file.

       Parameters
          • fo (file-like) – Input stream
          • reader_schema (dict, optional) – Reader schema
          • return_record_name (bool, optional) – If true, when reading a
            union of records, the result will be a tuple where the first
            value is the name of the record and the second value is the
            record itself

       Example:

          from fastavro import block_reader
          with open('some-file.avro', 'rb') as fo:
              avro_reader = block_reader(fo)
              for block in avro_reader:
                  process_block(block)

       metadata
          Key-value pairs in the header metadata

       codec
          The codec used when writing

       writer_schema
          The schema used when writing

       reader_schema
          The schema used when reading (if provided)

   class Block(bytes_, num_records, codec, reader_schema, writer_schema,
               named_schemas, offset, size, return_record_name=False)
       An avro block. Will yield records when iterated over.

       num_records
          Number of records in the block

       writer_schema
          The schema used when writing

       reader_schema
          The schema used when reading (if provided)

       offset
          Offset of the block from the beginning of the avro file

       size
          Size of the block in bytes

   schemaless_reader(fo, writer_schema, reader_schema=None,
                     return_record_name=False)
       Reads a single record written using the schemaless_writer()

       Parameters
          • fo (file-like) – Input stream
          • writer_schema (dict) – Schema used when calling
            schemaless_writer
          • reader_schema (dict, optional) – If the schema has changed
            since being written then the new schema can be given to allow
            for schema migration
          • return_record_name (bool, optional) – If true, when reading a
            union of records, the result will be a tuple where the first
            value is the name of the record and the second value is the
            record itself

       Example:

          parsed_schema = fastavro.parse_schema(schema)
          with open('file', 'rb') as fp:
              record = fastavro.schemaless_reader(fp, parsed_schema)

       Note: The schemaless_reader can only read a single record.

   is_avro(path_or_buffer)
       Return True if path (or buffer) points to an Avro file. This will
       only work for avro files that contain the normal avro schema header
       like those created from writer(). This function is not intended to
       be used with binary data created from schemaless_writer() since
       that does not include the avro header.

       Parameters
          path_or_buffer (path to file or file-like object) – Path to
          file, or a file-like object, to check

fastavro.write
   writer(fo, schema, records, codec='null', sync_interval=16000,
          metadata=None, validator=None, sync_marker=None,
          codec_compression_level=None)
       Write records to fo (stream) according to schema

       Parameters
          • fo (file-like) – Output stream
          • schema (dict) – Writer schema
          • records (iterable) – Records to write. This is commonly a list
            of the dictionary representation of the records, but it can be
            any iterable
          • codec (string, optional) – Compression codec; can be 'null',
            'deflate' or 'snappy' (if installed)
          • sync_interval (int, optional) – Size of sync interval
          • metadata (dict, optional) – Header metadata
          • validator (None, True or a function) – Validator function. If
            None (the default), no validation is done. If True,
            fastavro.validation.validate will be used. If it's a function,
            it should have the same signature as fastavro.writer.validate
            and raise an exception on error
          • sync_marker (bytes, optional) – A byte string used as the avro
            sync marker. If not provided, a random byte string will be
            used
          • codec_compression_level (int, optional) – Compression level to
            use with the specified codec (if the codec supports it)

       Example:

          from fastavro import writer, parse_schema

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }
          parsed_schema = parse_schema(schema)

          records = [
              {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
              {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
              {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
              {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
          ]

          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

       The fo argument is a file-like object, so another common usage
       example would use an io.BytesIO object like so:

          from io import BytesIO
          from fastavro import writer

          fo = BytesIO()
          writer(fo, schema, records)

       Given an existing avro file, it's possible to append to it by
       re-opening the file in a+b mode. If the file is only opened in ab
       mode, we aren't able to read some of the existing header
       information and an error will be raised. For example:

          # Write initial records
          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

          # Write some more records
          with open('weather.avro', 'a+b') as out:
              writer(out, parsed_schema, more_records)

   schemaless_writer(fo, schema, record)
       Write a single record without the schema or header information

       Parameters
          • fo (file-like) – Output file
          • schema (dict) – Schema
          • record (dict) – Record to write

       Example:

          parsed_schema = fastavro.parse_schema(schema)
          with open('file', 'wb') as fp:
              fastavro.schemaless_writer(fp, parsed_schema, record)

       Note: The schemaless_writer can only write a single record.

Using the tuple notation to specify which branch of a union to take
   Since this library uses plain dictionaries to represent a record, it
   is possible for the same dictionary to fit the definition of two
   different records.

   For example, given a dictionary like this:

      {"name": "My Name"}

   It would be valid against both of these records:

      child_schema = {
          "name": "Child",
          "type": "record",
          "fields": [
              {"name": "name", "type": "string"},
              {"name": "favorite_color", "type": ["null", "string"]},
          ]
      }

      pet_schema = {
          "name": "Pet",
          "type": "record",
          "fields": [
              {"name": "name", "type": "string"},
              {"name": "favorite_toy", "type": ["null", "string"]},
          ]
      }

   This becomes a problem when a schema contains a union of these two
   similar records, as it is not clear which record the dictionary
   represents. For example, if you used the previous dictionary with the
   following schema, it wouldn't be clear if the record should be
   serialized as a Child or a Pet:

      household_schema = {
          "name": "Household",
          "type": "record",
          "fields": [
              {"name": "address", "type": "string"},
              {
                  "name": "family_members",
                  "type": {
                      "type": "array", "items": [
                          {
                              "name": "Child",
                              "type": "record",
                              "fields": [
                                  {"name": "name", "type": "string"},
                                  {"name": "favorite_color", "type": ["null", "string"]},
                              ]
                          }, {
                              "name": "Pet",
                              "type": "record",
                              "fields": [
                                  {"name": "name", "type": "string"},
                                  {"name": "favorite_toy", "type": ["null", "string"]},
                              ]
                          }
                      ]
                  }
              },
          ]
      }

   To resolve this, you can use a tuple notation where the first value of
   the tuple is the fully namespaced record name and the second value is
   the dictionary. For example:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  ("Child", {"name": "Son"}),
                  ("Child", {"name": "Daughter"}),
                  ("Pet", {"name": "Dog"}),
              ]
          }
      ]

Using the record hint to specify which branch of a union to take
   In addition to the tuple notation for specifying the name of a record,
   you can also include a special -type attribute (note that this
   attribute is -type, not type) on a record to do the same thing. So the
   example above, which looked like this:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  ("Child", {"name": "Son"}),
                  ("Child", {"name": "Daughter"}),
                  ("Pet", {"name": "Dog"}),
              ]
          }
      ]

   Would now look like this:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  {"-type": "Child", "name": "Son"},
                  {"-type": "Child", "name": "Daughter"},
                  {"-type": "Pet", "name": "Dog"},
              ]
          }
      ]

   Unlike the tuple notation, which can be used with any avro type in a
   union, this -type hint can only be used with records. However, it can
   be useful if you want to make a single record dictionary that can be
   used both in and out of unions.

fastavro.json_read
   json_reader(fo, schema)
       Iterator over records in an avro json file.

       Parameters
          • fo (file-like) – Input stream
          • schema (dict) – Reader schema

       Example:

          from fastavro import json_reader

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ]
          }

          with open('some-file', 'r') as fo:
              avro_reader = json_reader(fo, schema)
              for record in avro_reader:
                  print(record)

fastavro.json_write
   json_writer(fo, schema, records)
       Write records to fo (stream) according to schema

       Parameters
          • fo (file-like) – Output stream
          • schema (dict) – Writer schema
          • records (iterable) – Records to write. This is commonly a list
            of the dictionary representation of the records, but it can be
            any iterable

       Example:

          from fastavro import json_writer, parse_schema

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }
          parsed_schema = parse_schema(schema)

          records = [
              {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
              {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
              {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
              {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
          ]

          with open('some-file', 'w') as out:
              json_writer(out, parsed_schema, records)

fastavro.schema
   parse_schema(schema, named_schemas=None, *, expand=False,
                _write_hint=True, _force=False)
       Returns a parsed avro schema

       It is not necessary to call parse_schema, but doing so and saving
       the parsed schema for later use will make future operations
       faster, as the schema will not need to be reparsed.

       Parameters
          • schema (dict) – Input schema
          • named_schemas (dict) – Dictionary of named schemas to their
            schema definition
          • expand (bool) – If true, named schemas will be fully expanded
            to their true schemas rather than being represented as just
            the name. This format should be considered an output only and
            not passed in to other reader/writer functions as it does not
            conform to the avro specification and will likely cause an
            exception
          • _write_hint (bool) – Internal API argument specifying whether
            or not the __fastavro_parsed marker should be added to the
            schema
          • _force (bool) – Internal API argument. If True, the schema
            will always be parsed even if it has already been parsed and
            has the __fastavro_parsed marker

       Example:

          from fastavro import parse_schema
          from fastavro import writer

          parsed_schema = parse_schema(original_schema)
          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

       Sometimes you might have two schemas where one schema references
       another. For the sake of example, let's assume you have a Parent
       schema that references a Child schema. If you were to try to parse
       the parent schema on its own, you would get an exception because
       the child schema isn't defined. To accommodate this, we can use
       the named_schemas argument to pass a shared dictionary when
       parsing both of the schemas. The dictionary will get populated
       with the necessary schema references to make parsing possible.
       For example:

          from fastavro import parse_schema

          named_schemas = {}
          parsed_child = parse_schema(child_schema, named_schemas)
          parsed_parent = parse_schema(parent_schema, named_schemas)

   fullname(schema)
       Returns the fullname of a schema

       Parameters
          schema (dict) – Input schema

       Example:

          from fastavro.schema import fullname

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          fname = fullname(schema)
          assert fname == "test.Weather"

   expand_schema(schema)
       Returns a schema where all named types are expanded to their real
       schema

       NOTE: The output of this function produces a schema that can
       include multiple definitions of the same named type (by design)
       which are not valid per the avro specification. Therefore, the
       output of this should not be passed to the normal writer/reader
       functions, as it will likely result in an error.

       Parameters
          schema (dict) – Input schema

       Example:

          from fastavro.schema import expand_schema

          original_schema = {
              "name": "MasterSchema",
              "namespace": "com.namespace.master",
              "type": "record",
              "fields": [{
                  "name": "field_1",
                  "type": {
                      "name": "Dependency",
                      "namespace": "com.namespace.dependencies",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }, {
                  "name": "field_2",
                  "type": "com.namespace.dependencies.Dependency"
              }]
          }

          expanded_schema = expand_schema(original_schema)

          assert expanded_schema == {
              "name": "com.namespace.master.MasterSchema",
              "type": "record",
              "fields": [{
                  "name": "field_1",
                  "type": {
                      "name": "com.namespace.dependencies.Dependency",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }, {
                  "name": "field_2",
                  "type": {
                      "name": "com.namespace.dependencies.Dependency",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }]
          }

   load_schema(schema_path, *, named_schemas=None, _write_hint=True,
               _injected_schemas=None)
       Returns a schema loaded from the file at schema_path.

       Will recursively load referenced schemas, assuming they can be
       found in files in the same directory and named with the convention
       <full_name>.avsc.

       Parameters
          • schema_path (str) – Path to schema file to load
          • named_schemas (dict) – Dictionary of named schemas to their
            schema definition
          • _write_hint (bool) – Internal API argument specifying whether
            or not the __fastavro_parsed marker should be added to the
            schema
          • _injected_schemas (set) – Internal API argument. Set of names
            that have been injected

       Consider the following example…

       Parent.avsc:

          {
              "type": "record",
              "name": "Parent",
              "namespace": "namespace",
              "fields": [
                  {
                      "name": "child",
                      "type": "Child"
                  }
              ]
          }

       namespace.Child.avsc:

          {
              "type": "record",
              "namespace": "namespace",
              "name": "Child",
              "fields": []
          }

       Code:

          from fastavro.schema import load_schema

          parsed_schema = load_schema("Parent.avsc")

   load_schema_ordered(ordered_schemas, *, _write_hint=True)
       Returns a schema loaded from a list of schemas.

       The list of schemas should be ordered such that any dependencies
       are listed before any other schemas that use those dependencies.
       For example, if schema A depends on schema B and schema B depends
       on schema C, then the list of schemas should be [C, B, A].

       Parameters
          • ordered_schemas (list) – List of paths to schemas
          • _write_hint (bool) – Internal API argument specifying whether
            or not the __fastavro_parsed marker should be added to the
            schema

       Consider the following example…

       Parent.avsc:

          {
              "type": "record",
              "name": "Parent",
              "namespace": "namespace",
              "fields": [
                  {
                      "name": "child",
                      "type": "Child"
                  }
              ]
          }

       namespace.Child.avsc:

          {
              "type": "record",
              "namespace": "namespace",
              "name": "Child",
              "fields": []
          }

       Code:

          from fastavro.schema import load_schema_ordered

          parsed_schema = load_schema_ordered(
              ["path/to/namespace.Child.avsc", "path/to/Parent.avsc"]
          )

   to_parsing_canonical_form(schema)
       Returns a string representing the parsing canonical form of the
       schema.

       For more details on the parsing canonical form, see here:
       https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas

   fingerprint(parsing_canonical_form, algorithm)
       Returns a string representing a fingerprint/hash of the parsing
       canonical form of a schema.

       For more details on the fingerprint, see here:
       https://avro.apache.org/docs/current/spec.html#schema_fingerprints

fastavro.validation
   validate(datum, schema, field=None, raise_errors=True)
       Determine if a python datum is an instance of a schema.

       Parameters
          • datum (Any) – Data being validated
          • schema (dict) – Schema
          • field (str, optional) – Record field being validated
          • raise_errors (bool, optional) – If true, errors are raised
            for invalid data. If false, a simple True (valid) or False
            (invalid) result is returned

       Example:

          from fastavro.validation import validate
          schema = {...}
          record = {...}
          validate(record, schema)

   validate_many(records, schema, raise_errors=True)
       Validate a list of data.

       Parameters
          • records (iterable) – List of records to validate
          • schema (dict) – Schema
          • raise_errors (bool, optional) – If true, errors are raised
            for invalid data. If false, a simple True (valid) or False
            (invalid) result is returned

       Example:

          from fastavro.validation import validate_many
          schema = {...}
          records = [{...}, {...}, ...]
          validate_many(records, schema)

fastavro.utils
   generate_one(schema: Union[str, List, Dict]) -> Any
       Returns a single instance of arbitrary data that conforms to the
       schema.

       Parameters
          schema (dict, list, string) – Schema

       Example:

          from fastavro import schemaless_writer
          from fastavro.utils import generate_one

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          with open('weather.avro', 'wb') as out:
              schemaless_writer(out, schema, generate_one(schema))

   generate_many(schema: Union[str, List, Dict], count: int) -> Iterator[Any]
       A generator that yields arbitrary data that conforms to the
       schema. It will yield a number of data structures equal to the
       given count.

       Parameters
          • schema (dict, list, string) – Schema
          • count (int) – Number of objects to generate

       Example:

          from fastavro import writer
          from fastavro.utils import generate_many

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          with open('weather.avro', 'wb') as out:
              writer(out, schema, generate_many(schema, 5))

   anonymize_schema(schema: Union[str, List, Dict]) -> Union[str, List, Dict]
       Returns an anonymized schema

       Parameters
          schema (dict, list, string) – Schema

       Example:

          from fastavro.utils import anonymize_schema

          anonymized_schema = anonymize_schema(original_schema)

Logical Types
   Fastavro supports the following official logical types:

   • decimal
   • uuid
   • date
   • time-millis
   • time-micros
   • timestamp-millis
   • timestamp-micros
   • local-timestamp-millis
   • local-timestamp-micros

   Fastavro is missing support for the following official logical types:

   • duration

How to specify logical types in your schemas
   The docs say that when you want to make something a logical type, you
   just need to add a logicalType key. Unfortunately, a common point of
   confusion is where that key goes: people will take a pre-existing
   schema like this:

      schema = {
          "type": "record",
          "name": "root",
          "fields": [
              {
                  "name": "id",
                  "type": "string",
              },
          ]
      }

   And then add the uuid logical type like this:

      schema = {
          "type": "record",
          "name": "root",
          "fields": [
              {
                  "name": "id",
                  "type": "string",
                  "logicalType": "uuid",  # This is the wrong place to add this key
              },
          ]
      }

   However, that adds the logicalType key to the field schema, which is
   not correct. Instead, we want to group it with the string type like
   so:

      schema = {
          "type": "record",
          "name": "root",
          "fields": [
              {
                  "name": "id",
                  "type": {
                      "type": "string",
                      "logicalType": "uuid",  # This is the correct place to add this key
                  },
              },
          ]
      }

Custom Logical Types
   The Avro specification defines a handful of logical types that most
   implementations support. For example, one of the defined logical
   types is a microsecond precision timestamp. The specification states
   that this value will get encoded as an avro long type.

   For the sake of an example, let's say you want to create a new
   logical type for a microsecond precision timestamp that uses a string
   as the underlying avro type.

   To do this, there are a few functions that need to be defined. First,
   we need an encoder function that will encode a datetime object as a
   string. The encoder function is called with two arguments: the data
   and the schema. So we could define this like so:

      def encode_datetime_as_string(data, schema):
          return datetime.isoformat(data)

      # or

      def encode_datetime_as_string(data, *args):
          return datetime.isoformat(data)

   Then, we need a decoder function that will transform the string back
   into a datetime object. The decoder function is called with three
   arguments: the data, the writer schema, and the reader schema. So we
   could define this like so:

      def decode_string_as_datetime(data, writer_schema, reader_schema):
          return datetime.fromisoformat(data)

      # or

      def decode_string_as_datetime(data, *args):
          return datetime.fromisoformat(data)

   Finally, we need to tell fastavro to use these functions. The schema
   for this custom logical type will use the type string and can use
   whatever name you would like as the logicalType. In this example,
   let's suppose we call the logicalType datetime2. To have the library
   actually use the custom logical type, we use the name of
   <avro_type>-<logical_type>, so in this example that name would be
   string-datetime2, and then we add those functions like so:

      fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
      fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime

   And you are done. Now, if the library comes across a schema with a
   logical type of datetime2 and an avro type of string, it will use the
   custom functions. For a complete example, see here:

      import io
      from datetime import datetime

      import fastavro
      from fastavro import writer, reader


      def encode_datetime_as_string(data, *args):
          return datetime.isoformat(data)

      def decode_string_as_datetime(data, *args):
          return datetime.fromisoformat(data)

      fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
      fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime


      writer_schema = fastavro.parse_schema({
          "type": "record",
          "name": "root",
          "fields": [
              {
                  "name": "some_date",
                  "type": [
                      "null",
                      {
                          "type": "string",
                          "logicalType": "datetime2",
                      },
                  ],
              },
          ]
      })

      records = [
          {"some_date": datetime.now()}
      ]

      bio = io.BytesIO()

      writer(bio, writer_schema, records)

      bio.seek(0)

      for record in reader(bio):
          print(record)

fastavro command line script
   A command line script is installed with the library that can be used
   to dump the contents of avro file(s) to the standard output.

   Usage:

      usage: fastavro [-h] [--schema] [--codecs] [--version] [-p] [file [file ...]]

      iter over avro file, emit records as JSON

      positional arguments:
        file           file(s) to parse

      optional arguments:
        -h, --help     show this help message and exit
        --schema       dump schema instead of records
        --codecs       print supported codecs
        --version      show program's version number and exit
        -p, --pretty   pretty print json

   Examples
      Read an avro file:

         $ fastavro weather.avro

         {"temp": 0, "station": "011990-99999", "time": -619524000000}
         {"temp": 22, "station": "011990-99999", "time": -619506000000}
         {"temp": -11, "station": "011990-99999", "time": -619484400000}
         {"temp": 111, "station": "012650-99999", "time": -655531200000}
         {"temp": 78, "station": "012650-99999", "time": -655509600000}

      Show the schema:

         $ fastavro --schema weather.avro

         {
             "type": "record",
             "namespace": "test",
             "doc": "A weather reading.",
             "fields": [
                 {
                     "type": "string",
                     "name": "station"
                 },
                 {
                     "type": "long",
                     "name": "time"
                 },
                 {
                     "type": "int",
                     "name": "temp"
                 }
             ],
             "name": "Weather"
         }

AUTHOR
   Miki Tebeka

COPYRIGHT
   2021, Miki Tebeka

1.4.1                            May 22, 2021                      FASTAVRO(1)