FASTAVRO(1)                         fastavro                        FASTAVRO(1)

NAME
       fastavro - fastavro Documentation

8 The current Python avro package is dog slow.
9
10 On a test case of about 10K records, it takes about 14sec to iterate
11 over all of them. In comparison the JAVA avro SDK does it in about
12 1.9sec.
13
14 fastavro is an alternative implementation that is much faster. It iter‐
15 ates over the same 10K records in 2.9sec, and if you use it with PyPy
16 it'll do it in 1.5sec (to be fair, the JAVA benchmark is doing some ex‐
17 tra JSON encoding/decoding).
18
19 If the optional C extension (generated by Cython) is available, then
20 fastavro will be even faster. For the same 10K records it'll run in
21 about 1.7sec.

   Supported Features
24 • File Writer
25
26 • File Reader (iterating via records or blocks)
27
28 • Schemaless Writer
29
30 • Schemaless Reader
31
32 • JSON Writer
33
34 • JSON Reader
35
36 • Codecs (Snappy, Deflate, Zstandard, Bzip2, LZ4, XZ)
37
38 • Schema resolution
39
40 • Aliases
41
42 • Logical Types
43
44 • Parsing schemas into the canonical form
45
46 • Schema fingerprinting

   Missing Features
49 • Anything involving Avro's RPC features

   Example Usage
52 from fastavro import writer, reader, parse_schema
53
54 schema = {
55 'doc': 'A weather reading.',
56 'name': 'Weather',
57 'namespace': 'test',
58 'type': 'record',
59 'fields': [
60 {'name': 'station', 'type': 'string'},
61 {'name': 'time', 'type': 'long'},
62 {'name': 'temp', 'type': 'int'},
63 ],
64 }
65 parsed_schema = parse_schema(schema)
66
67 # 'records' can be an iterable (including generator)
68 records = [
69 {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
70 {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
71 {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
72 {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
73 ]
74
75 # Writing
76 with open('weather.avro', 'wb') as out:
77 writer(out, parsed_schema, records)
78
79 # Reading
80 with open('weather.avro', 'rb') as fo:
81 for record in reader(fo):
82 print(record)
83
85 fastavro.read
86 class reader(fo, reader_schema=None, return_record_name=False)
87 Iterator over records in an avro file.
88
89 Parameters
90
91 • fo (file-like) -- Input stream
92
93 • reader_schema (dict, optional) -- Reader schema
94
95 • return_record_name (bool, optional) -- If true, when
96 reading a union of records, the result will be a tuple
97 where the first value is the name of the record and the
98 second value is the record itself
99
100 Example:
101
102 from fastavro import reader
103 with open('some-file.avro', 'rb') as fo:
104 avro_reader = reader(fo)
105 for record in avro_reader:
106 process_record(record)
107
108 The fo argument is a file-like object so another common example
109 usage would use an io.BytesIO object like so:
110
111 from io import BytesIO
112 from fastavro import writer, reader
113
114 fo = BytesIO()
115 writer(fo, schema, records)
116 fo.seek(0)
117 for record in reader(fo):
118 process_record(record)
119
120 metadata
121 Key-value pairs in the header metadata
122
123 codec The codec used when writing
124
125 writer_schema
126 The schema used when writing
127
128 reader_schema
129 The schema used when reading (if provided)
130
131 class block_reader(fo, reader_schema=None, return_record_name=False)
          Iterator over blocks in an avro file.
133
134 Parameters
135
136 • fo (file-like) -- Input stream
137
138 • reader_schema (dict, optional) -- Reader schema
139
140 • return_record_name (bool, optional) -- If true, when
141 reading a union of records, the result will be a tuple
142 where the first value is the name of the record and the
143 second value is the record itself
144
145 Example:
146
147 from fastavro import block_reader
148 with open('some-file.avro', 'rb') as fo:
149 avro_reader = block_reader(fo)
150 for block in avro_reader:
151 process_block(block)
152
153 metadata
154 Key-value pairs in the header metadata
155
156 codec The codec used when writing
157
158 writer_schema
159 The schema used when writing
160
161 reader_schema
162 The schema used when reading (if provided)
163
164 class Block(bytes_, num_records, codec, reader_schema, writer_schema,
165 named_schemas, offset, size, return_record_name=False)
          An avro block. Will yield records when iterated over.
167
168 num_records
169 Number of records in the block
170
171 writer_schema
172 The schema used when writing
173
174 reader_schema
175 The schema used when reading (if provided)
176
          offset Offset of the block from the beginning of the avro file
178
179 size Size of the block in bytes
180
181 schemaless_reader(fo, writer_schema, reader_schema=None, re‐
182 turn_record_name=False)
          Reads a single record written using the schemaless_writer()
184
185 Parameters
186
187 • fo (file-like) -- Input stream
188
189 • writer_schema (dict) -- Schema used when calling
190 schemaless_writer
191
192 • reader_schema (dict, optional) -- If the schema has
193 changed since being written then the new schema can be
194 given to allow for schema migration
195
196 • return_record_name (bool, optional) -- If true, when
197 reading a union of records, the result will be a tuple
198 where the first value is the name of the record and the
199 second value is the record itself
200
201 Example:
202
203 parsed_schema = fastavro.parse_schema(schema)
204 with open('file', 'rb') as fp:
205 record = fastavro.schemaless_reader(fp, parsed_schema)
206
207 Note: The schemaless_reader can only read a single record.
208
209 is_avro(path_or_buffer)
          Return True if path (or buffer) points to an Avro file. This will
          only work for avro files that contain the normal avro schema
          header like those created from writer(). This function is not
          intended to be used with binary data created from
          schemaless_writer() since that does not include the avro header.
215
216 Parameters
217 path_or_buffer (path to file or file-like object) -- Path
218 to file
219
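       For instance, is_avro can be pointed at an in-memory buffer as well
       as a path. The schema and records below are made up for
       illustration; a file produced by writer() carries the avro header,
       while arbitrary bytes do not:

          from io import BytesIO

          import fastavro
          from fastavro import writer

          # hypothetical schema, just for illustration
          schema = {
              "name": "Reading",
              "namespace": "test",
              "type": "record",
              "fields": [{"name": "temp", "type": "int"}],
          }

          # A buffer written by writer() starts with the avro header
          buf = BytesIO()
          writer(buf, schema, [{"temp": 22}])
          buf.seek(0)
          has_header = fastavro.is_avro(buf)

          # Arbitrary bytes (e.g. schemaless output) lack the header
          no_header = fastavro.is_avro(BytesIO(b"not an avro file"))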
220 fastavro.write
221 writer(fo, schema, records, codec='null', sync_interval=16000, meta‐
222 data=None, validator=None, sync_marker=None, codec_compres‐
223 sion_level=None)
224 Write records to fo (stream) according to schema
225
226 Parameters
227
228 • fo (file-like) -- Output stream
229
230 • schema (dict) -- Writer schema
231
232 • records (iterable) -- Records to write. This is com‐
233 monly a list of the dictionary representation of the
234 records, but it can be any iterable
235
236 • codec (string, optional) -- Compression codec, can be
237 'null', 'deflate' or 'snappy' (if installed)
238
239 • sync_interval (int, optional) -- Size of sync interval
240
241 • metadata (dict, optional) -- Header metadata
242
              • validator (None, True or a function) -- Validator function.
                If None (the default), no validation is done. If True, then
                fastavro.validation.validate will be used. If it's a
                function, it should have the same signature as
                fastavro.writer.validate and raise an exception on error.
249
250 • sync_marker (bytes, optional) -- A byte string used as
251 the avro sync marker. If not provided, a random byte
252 string will be used.
253
254 • codec_compression_level (int, optional) -- Compression
255 level to use with the specified codec (if the codec
256 supports it)
257
258 Example:
259
260 from fastavro import writer, parse_schema
261
262 schema = {
263 'doc': 'A weather reading.',
264 'name': 'Weather',
265 'namespace': 'test',
266 'type': 'record',
267 'fields': [
268 {'name': 'station', 'type': 'string'},
269 {'name': 'time', 'type': 'long'},
270 {'name': 'temp', 'type': 'int'},
271 ],
272 }
273 parsed_schema = parse_schema(schema)
274
275 records = [
276 {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
277 {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
278 {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
279 {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
280 ]
281
282 with open('weather.avro', 'wb') as out:
283 writer(out, parsed_schema, records)
284
285 The fo argument is a file-like object so another common example
286 usage would use an io.BytesIO object like so:
287
288 from io import BytesIO
289 from fastavro import writer
290
291 fo = BytesIO()
292 writer(fo, schema, records)
293
294 Given an existing avro file, it's possible to append to it by
295 re-opening the file in a+b mode. If the file is only opened in
296 ab mode, we aren't able to read some of the existing header in‐
297 formation and an error will be raised. For example:
298
299 # Write initial records
300 with open('weather.avro', 'wb') as out:
301 writer(out, parsed_schema, records)
302
303 # Write some more records
304 with open('weather.avro', 'a+b') as out:
305 writer(out, parsed_schema, more_records)
306
307 schemaless_writer(fo, schema, record)
308 Write a single record without the schema or header information
309
310 Parameters
311
312 • fo (file-like) -- Output file
313
314 • schema (dict) -- Schema
315
316 • record (dict) -- Record to write
317
318 Example:
319
              parsed_schema = fastavro.parse_schema(schema)
              with open('file', 'wb') as fp:
                  fastavro.schemaless_writer(fp, parsed_schema, record)
323
324 Note: The schemaless_writer can only write a single record.
325
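       The two schemaless functions are designed to be used together; the
       reader must be handed the same (or a compatible) schema the writer
       used, since none is stored in the output. A minimal roundtrip sketch
       with a made-up schema:

          from io import BytesIO

          from fastavro import parse_schema, schemaless_reader, schemaless_writer

          # hypothetical single-field schema for illustration
          schema = parse_schema({
              "name": "Reading",
              "namespace": "test",
              "type": "record",
              "fields": [{"name": "temp", "type": "int"}],
          })

          # Write one record with no header or embedded schema
          buf = BytesIO()
          schemaless_writer(buf, schema, {"temp": 22})

          # Read it back; the writer schema must be supplied explicitly
          buf.seek(0)
          record = schemaless_reader(buf, schema)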
   Using the tuple notation to specify which branch of a union to take
       Since this library uses plain dictionaries to represent a record, it
       is possible for that dictionary to fit the definition of two
       different records.
330
331 For example, given a dictionary like this:
332
333 {"name": "My Name"}
334
335 It would be valid against both of these records:
336
337 child_schema = {
338 "name": "Child",
339 "type": "record",
340 "fields": [
341 {"name": "name", "type": "string"},
342 {"name": "favorite_color", "type": ["null", "string"]},
343 ]
344 }
345
346 pet_schema = {
347 "name": "Pet",
348 "type": "record",
349 "fields": [
350 {"name": "name", "type": "string"},
351 {"name": "favorite_toy", "type": ["null", "string"]},
352 ]
353 }
354
       This becomes a problem when a schema contains a union of these two
       similar records, as it is not clear which record the dictionary
       represents. For example, if you used the previous dictionary with
       the following schema, it wouldn't be clear if the record should be
       serialized as a Child or a Pet:
362
363 household_schema = {
364 "name": "Household",
365 "type": "record",
366 "fields": [
367 {"name": "address", "type": "string"},
368 {
369 "name": "family_members",
370 "type": {
371 "type": "array", "items": [
372 {
373 "name": "Child",
374 "type": "record",
375 "fields": [
376 {"name": "name", "type": "string"},
377 {"name": "favorite_color", "type": ["null", "string"]},
378 ]
379 }, {
380 "name": "Pet",
381 "type": "record",
382 "fields": [
383 {"name": "name", "type": "string"},
384 {"name": "favorite_toy", "type": ["null", "string"]},
385 ]
386 }
387 ]
388 }
389 },
390 ]
391 }
392
393 To resolve this, you can use a tuple notation where the first value of
394 the tuple is the fully namespaced record name and the second value is
395 the dictionary. For example:
396
397 records = [
398 {
399 "address": "123 Drive Street",
400 "family_members": [
401 ("Child", {"name": "Son"}),
402 ("Child", {"name": "Daughter"}),
403 ("Pet", {"name": "Dog"}),
404 ]
405 }
406 ]
407
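       Putting it together, a sketch of writing tuple-notation records with
       the household_schema above and reading them back (the nullable
       fields are filled in explicitly here; the reader returns plain
       dictionaries unless return_record_name is set):

          from io import BytesIO

          from fastavro import parse_schema, reader, writer

          household_schema = parse_schema({
              "name": "Household",
              "type": "record",
              "fields": [
                  {"name": "address", "type": "string"},
                  {
                      "name": "family_members",
                      "type": {
                          "type": "array", "items": [
                              {
                                  "name": "Child",
                                  "type": "record",
                                  "fields": [
                                      {"name": "name", "type": "string"},
                                      {"name": "favorite_color", "type": ["null", "string"]},
                                  ]
                              }, {
                                  "name": "Pet",
                                  "type": "record",
                                  "fields": [
                                      {"name": "name", "type": "string"},
                                      {"name": "favorite_toy", "type": ["null", "string"]},
                                  ]
                              }
                          ]
                      }
                  },
              ]
          })

          # The tuple's first value picks the union branch by record name
          records = [
              {
                  "address": "123 Drive Street",
                  "family_members": [
                      ("Child", {"name": "Son", "favorite_color": None}),
                      ("Pet", {"name": "Dog", "favorite_toy": None}),
                  ]
              }
          ]

          buf = BytesIO()
          writer(buf, household_schema, records)
          buf.seek(0)
          round_tripped = list(reader(buf))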
   Using the record hint to specify which branch of a union to take
       In addition to the tuple notation for specifying the name of a
       record, you can also include a special -type attribute (note that
       this attribute is -type, not type) on a record to do the same thing.
       So the example above, which looked like this:
413
414 records = [
415 {
416 "address": "123 Drive Street",
417 "family_members": [
418 ("Child", {"name": "Son"}),
419 ("Child", {"name": "Daughter"}),
420 ("Pet", {"name": "Dog"}),
421 ]
422 }
423 ]
424
425 Would now look like this:
426
427 records = [
428 {
429 "address": "123 Drive Street",
430 "family_members": [
                      {"-type": "Child", "name": "Son"},
                      {"-type": "Child", "name": "Daughter"},
                      {"-type": "Pet", "name": "Dog"},
434 ]
435 }
436 ]
437
438 Unlike the tuple notation which can be used with any avro type in a
439 union, this -type hint can only be used with records. However, this can
440 be useful if you want to make a single record dictionary that can be
441 used both in and out of unions.
442
443 fastavro.json_read
444 json_reader(fo, schema)
445 Iterator over records in an avro json file.
446
447 Parameters
448
449 • fo (file-like) -- Input stream
450
              • schema (dict) -- Reader schema
452
453 Example:
454
455 from fastavro import json_reader
456
457 schema = {
458 'doc': 'A weather reading.',
459 'name': 'Weather',
460 'namespace': 'test',
461 'type': 'record',
462 'fields': [
463 {'name': 'station', 'type': 'string'},
464 {'name': 'time', 'type': 'long'},
465 {'name': 'temp', 'type': 'int'},
466 ]
467 }
468
469 with open('some-file', 'r') as fo:
470 avro_reader = json_reader(fo, schema)
471 for record in avro_reader:
472 print(record)
473
474 fastavro.json_write
475 json_writer(fo, schema, records, *, write_union_type=True)
476 Write records to fo (stream) according to schema
477
478 Parameters
479
480 • fo (file-like) -- Output stream
481
482 • schema (dict) -- Writer schema
483
484 • records (iterable) -- Records to write. This is com‐
485 monly a list of the dictionary representation of the
486 records, but it can be any iterable
487
              • write_union_type (bool (default True)) -- Determine whether
                to write the union type in the json message. If set to
                False, the output will be plain JSON; it may, however, not
                be decodable back into an avro record by json_reader.
493
494 Example:
495
496 from fastavro import json_writer, parse_schema
497
498 schema = {
499 'doc': 'A weather reading.',
500 'name': 'Weather',
501 'namespace': 'test',
502 'type': 'record',
503 'fields': [
504 {'name': 'station', 'type': 'string'},
505 {'name': 'time', 'type': 'long'},
506 {'name': 'temp', 'type': 'int'},
507 ],
508 }
509 parsed_schema = parse_schema(schema)
510
511 records = [
512 {u'station': u'011990-99999', u'temp': 0, u'time': 1433269388},
513 {u'station': u'011990-99999', u'temp': 22, u'time': 1433270389},
514 {u'station': u'011990-99999', u'temp': -11, u'time': 1433273379},
515 {u'station': u'012650-99999', u'temp': 111, u'time': 1433275478},
516 ]
517
518 with open('some-file', 'w') as out:
519 json_writer(out, parsed_schema, records)
520
521 fastavro.schema
522 parse_schema(schema, named_schemas=None, *, expand=False,
523 _write_hint=True, _force=False)
524 Returns a parsed avro schema
525
526 It is not necessary to call parse_schema but doing so and saving
527 the parsed schema for use later will make future operations
528 faster as the schema will not need to be reparsed.
529
530 Parameters
531
532 • schema (dict) -- Input schema
533
534 • named_schemas (dict) -- Dictionary of named schemas to
535 their schema definition
536
537 • expand (bool) -- If true, named schemas will be fully
538 expanded to their true schemas rather than being repre‐
539 sented as just the name. This format should be consid‐
540 ered an output only and not passed in to other
541 reader/writer functions as it does not conform to the
542 avro specification and will likely cause an exception
543
544 • _write_hint (bool) -- Internal API argument specifying
545 whether or not the __fastavro_parsed marker should be
546 added to the schema
547
548 • _force (bool) -- Internal API argument. If True, the
549 schema will always be parsed even if it has been parsed
550 and has the __fastavro_parsed marker
551
552 Example:
553
554 from fastavro import parse_schema
555 from fastavro import writer
556
557 parsed_schema = parse_schema(original_schema)
558 with open('weather.avro', 'wb') as out:
559 writer(out, parsed_schema, records)
560
       Sometimes you might have two schemas where one schema references
       another. For the sake of example, let's assume you have a Parent
       schema that references a Child schema. If you were to try to parse
       the parent schema on its own, you would get an exception because the
       child schema isn't defined. To accommodate this, we can use the
       named_schemas argument to pass a shared dictionary when parsing both
       of the schemas. The dictionary will get populated with the necessary
       schema references to make parsing possible. For example:
570
571 from fastavro import parse_schema
572
573 named_schemas = {}
574 parsed_child = parse_schema(child_schema, named_schemas)
575 parsed_parent = parse_schema(parent_schema, named_schemas)
576
577 fullname(schema)
578 Returns the fullname of a schema
579
580 Parameters
581 schema (dict) -- Input schema
582
583 Example:
584
585 from fastavro.schema import fullname
586
587 schema = {
588 'doc': 'A weather reading.',
589 'name': 'Weather',
590 'namespace': 'test',
591 'type': 'record',
592 'fields': [
593 {'name': 'station', 'type': 'string'},
594 {'name': 'time', 'type': 'long'},
595 {'name': 'temp', 'type': 'int'},
596 ],
597 }
598
599 fname = fullname(schema)
600 assert fname == "test.Weather"
601
602 expand_schema(schema)
603 Returns a schema where all named types are expanded to their
604 real schema
605
606 NOTE: The output of this function produces a schema that can in‐
607 clude multiple definitions of the same named type (as per de‐
608 sign) which are not valid per the avro specification. Therefore,
609 the output of this should not be passed to the normal
610 writer/reader functions as it will likely result in an error.
611
612 Parameters
613 schema (dict) -- Input schema
614
615 Example:
616
617 from fastavro.schema import expand_schema
618
619 original_schema = {
620 "name": "MasterSchema",
621 "namespace": "com.namespace.master",
622 "type": "record",
623 "fields": [{
624 "name": "field_1",
625 "type": {
626 "name": "Dependency",
627 "namespace": "com.namespace.dependencies",
628 "type": "record",
629 "fields": [
630 {"name": "sub_field_1", "type": "string"}
631 ]
632 }
633 }, {
634 "name": "field_2",
635 "type": "com.namespace.dependencies.Dependency"
636 }]
637 }
638
639 expanded_schema = expand_schema(original_schema)
640
641 assert expanded_schema == {
642 "name": "com.namespace.master.MasterSchema",
643 "type": "record",
644 "fields": [{
645 "name": "field_1",
646 "type": {
647 "name": "com.namespace.dependencies.Dependency",
648 "type": "record",
649 "fields": [
650 {"name": "sub_field_1", "type": "string"}
651 ]
652 }
653 }, {
654 "name": "field_2",
655 "type": {
656 "name": "com.namespace.dependencies.Dependency",
657 "type": "record",
658 "fields": [
659 {"name": "sub_field_1", "type": "string"}
660 ]
661 }
662 }]
663 }
664
665 load_schema(schema_path, *, named_schemas=None, _write_hint=True, _in‐
666 jected_schemas=None)
667 Returns a schema loaded from the file at schema_path.
668
669 Will recursively load referenced schemas assuming they can be
670 found in files in the same directory and named with the conven‐
671 tion <full_name>.avsc.
672
673 Parameters
674
              • schema_path (str) -- Path to schema file to load
676
677 • named_schemas (dict) -- Dictionary of named schemas to
678 their schema definition
679
680 • _write_hint (bool) -- Internal API argument specifying
681 whether or not the __fastavro_parsed marker should be
682 added to the schema
683
684 • _injected_schemas (set) -- Internal API argument. Set
685 of names that have been injected
686
687 Consider the following example...
688
689 Parent.avsc:
690
691 {
692 "type": "record",
693 "name": "Parent",
694 "namespace": "namespace",
695 "fields": [
696 {
697 "name": "child",
698 "type": "Child"
699 }
700 ]
701 }
702
703 namespace.Child.avsc:
704
705 {
706 "type": "record",
707 "namespace": "namespace",
708 "name": "Child",
709 "fields": []
710 }
711
712 Code:
713
714 from fastavro.schema import load_schema
715
716 parsed_schema = load_schema("Parent.avsc")
717
718 load_schema_ordered(ordered_schemas, *, _write_hint=True)
719 Returns a schema loaded from a list of schemas.
720
721 The list of schemas should be ordered such that any dependencies
722 are listed before any other schemas that use those dependencies.
723 For example, if schema A depends on schema B and schema B de‐
724 pends on schema C, then the list of schemas should be [C, B, A].
725
726 Parameters
727
728 • ordered_schemas (list) -- List of paths to schemas
729
730 • _write_hint (bool) -- Internal API argument specifying
731 whether or not the __fastavro_parsed marker should be
732 added to the schema
733
734 Consider the following example...
735
736 Parent.avsc:
737
738 {
739 "type": "record",
740 "name": "Parent",
741 "namespace": "namespace",
742 "fields": [
743 {
744 "name": "child",
745 "type": "Child"
746 }
747 ]
748 }
749
750 namespace.Child.avsc:
751
752 {
753 "type": "record",
754 "namespace": "namespace",
755 "name": "Child",
756 "fields": []
757 }
758
759 Code:
760
761 from fastavro.schema import load_schema_ordered
762
763 parsed_schema = load_schema_ordered(
764 ["path/to/namespace.Child.avsc", "path/to/Parent.avsc"]
765 )
766
   to_parsing_canonical_form(schema)
          Returns a string representing the parsing canonical form of the
          schema.
770
771 For more details on the parsing canonical form, see here:
772 https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas
773
   fingerprint(parsing_canonical_form, algorithm)
          Returns a string representing a fingerprint/hash of the parsing
          canonical form of a schema.
777
778 For more details on the fingerprint, see here:
779 https://avro.apache.org/docs/current/spec.html#schema_fingerprints
780
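       As a sketch, the two functions are typically chained: canonicalize
       first, then fingerprint the resulting string. The algorithm name
       below ("CRC-64-AVRO", from the Avro spec) is one common choice;
       the set of supported algorithms may vary by version:

          from fastavro.schema import fingerprint, to_parsing_canonical_form

          # hypothetical schema for illustration
          schema = {
              "name": "Weather",
              "namespace": "test",
              "type": "record",
              "fields": [{"name": "temp", "type": "int"}],
          }

          # Canonical form drops attributes (doc, aliases, ...) that don't
          # affect the encoding and fully qualifies names
          canonical = to_parsing_canonical_form(schema)

          # Fingerprint the canonical form, per the Avro spec
          fp = fingerprint(canonical, "CRC-64-AVRO")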
781 fastavro.validation
782 validate(datum, schema, field=None, raise_errors=True)
783 Determine if a python datum is an instance of a schema.
784
785 Parameters
786
787 • datum (Any) -- Data being validated
788
789 • schema (dict) -- Schema
790
791 • field (str, optional) -- Record field being validated
792
793 • raise_errors (bool, optional) -- If true, errors are
794 raised for invalid data. If false, a simple True
795 (valid) or False (invalid) result is returned
796
797 Example:
798
799 from fastavro.validation import validate
800 schema = {...}
801 record = {...}
802 validate(record, schema)
803
804 validate_many(records, schema, raise_errors=True)
805 Validate a list of data!
806
807 Parameters
808
809 • records (iterable) -- List of records to validate
810
811 • schema (dict) -- Schema
812
813 • raise_errors (bool, optional) -- If true, errors are
814 raised for invalid data. If false, a simple True
815 (valid) or False (invalid) result is returned
816
817 Example:
818
819 from fastavro.validation import validate_many
820 schema = {...}
821 records = [{...}, {...}, ...]
822 validate_many(records, schema)
823
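       Both helpers can either raise on invalid data or, with
       raise_errors=False, simply report validity. A small sketch with a
       made-up schema:

          from fastavro.validation import validate, validate_many

          # hypothetical schema for illustration
          schema = {
              "name": "Reading",
              "namespace": "test",
              "type": "record",
              "fields": [{"name": "temp", "type": "int"}],
          }

          # With raise_errors=False a boolean result is returned
          good = validate({"temp": 3}, schema, raise_errors=False)
          bad = validate({"temp": "not an int"}, schema, raise_errors=False)

          # validate_many checks each record in the iterable
          all_good = validate_many([{"temp": 1}, {"temp": 2}], schema,
                                   raise_errors=False)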
824 fastavro.utils
825 generate_one(schema: Union[str, List, Dict]) -> Any
826 Returns a single instance of arbitrary data that conforms to the
827 schema.
828
829 Parameters
830 schema (dict, list, string) -- Schema
831
832 Example:
833
834 from fastavro import schemaless_writer
835 from fastavro.utils import generate_one
836
837 schema = {
838 'doc': 'A weather reading.',
839 'name': 'Weather',
840 'namespace': 'test',
841 'type': 'record',
842 'fields': [
843 {'name': 'station', 'type': 'string'},
844 {'name': 'time', 'type': 'long'},
845 {'name': 'temp', 'type': 'int'},
846 ],
847 }
848
849 with open('weather.avro', 'wb') as out:
850 schemaless_writer(out, schema, generate_one(schema))
851
852 generate_many(schema: Union[str, List, Dict], count: int) -> Itera‐
853 tor[Any]
          A generator that yields arbitrary data that conforms to the
          schema. It will yield a number of data structures equal to the
          given count.
857
858 Parameters
859
860 • schema (dict, list, string) -- Schema
861
862 • count (int) -- Number of objects to generate
863
864 Example:
865
866 from fastavro import writer
              from fastavro.utils import generate_many
868
869 schema = {
870 'doc': 'A weather reading.',
871 'name': 'Weather',
872 'namespace': 'test',
873 'type': 'record',
874 'fields': [
875 {'name': 'station', 'type': 'string'},
876 {'name': 'time', 'type': 'long'},
877 {'name': 'temp', 'type': 'int'},
878 ],
879 }
880
881 with open('weather.avro', 'wb') as out:
                  writer(out, schema, generate_many(schema, 5))
883
884 anonymize_schema(schema: Union[str, List, Dict]) -> Union[str, List,
885 Dict]
886 Returns an anonymized schema
887
888 Parameters
889 schema (dict, list, string) -- Schema
890
891 Example:
892
893 from fastavro.utils import anonymize_schema
894
895 anonymized_schema = anonymize_schema(original_schema)
896
897 Logical Types
898 Fastavro supports the following official logical types:
899
900 • decimal
901
902 • uuid
903
904 • date
905
906 • time-millis
907
908 • time-micros
909
910 • timestamp-millis
911
912 • timestamp-micros
913
914 • local-timestamp-millis
915
916 • local-timestamp-micros
917
918 Fastavro is missing support for the following official logical types:
919
920 • duration
921
   How to specify logical types in your schemas
       The docs say that when you want to make something a logical type,
       you just need to add a logicalType key. Unfortunately, a common
       point of confusion is that people will take a pre-existing schema
       like this:
927
928 schema = {
929 "type": "record",
930 "name": "root",
931 "fields": [
932 {
933 "name": "id",
934 "type": "string",
935 },
936 ]
937 }
938
939 And then add the uuid logical type like this:
940
941 schema = {
942 "type": "record",
943 "name": "root",
944 "fields": [
945 {
946 "name": "id",
947 "type": "string",
948 "logicalType": "uuid", # This is the wrong place to add this key
949 },
950 ]
951 }
952
953 However, that adds the logicalType key to the field schema which is not
954 correct. Instead, we want to group it with the string like so:
955
956 schema = {
957 "type": "record",
958 "name": "root",
959 "fields": [
960 {
961 "name": "id",
962 "type": {
963 "type": "string",
964 "logicalType": "uuid", # This is the correct place to add this key
965 },
966 },
967 ]
968 }
969
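       With the logicalType grouped under the inner type as shown, the
       library recognizes the field as a uuid. A minimal roundtrip sketch
       (comparing as strings, since the exact Python type returned may
       vary by version):

          import uuid
          from io import BytesIO

          from fastavro import parse_schema, reader, writer

          schema = parse_schema({
              "type": "record",
              "name": "root",
              "fields": [
                  {
                      "name": "id",
                      # logicalType sits alongside the inner "string" type
                      "type": {"type": "string", "logicalType": "uuid"},
                  },
              ]
          })

          original = uuid.uuid4()

          buf = BytesIO()
          writer(buf, schema, [{"id": original}])
          buf.seek(0)
          record = next(reader(buf))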
970 Custom Logical Types
       The Avro specification defines a handful of logical types that most
       implementations support. For example, one of the defined logical
       types is a microsecond precision timestamp. The specification
       states that this value will get encoded as an avro long type.
975
976 For the sake of an example, let's say you want to create a new logical
977 type for a microsecond precision timestamp that uses a string as the
978 underlying avro type.
979
980 To do this, there are a few functions that need to be defined. First,
981 we need an encoder function that will encode a datetime object as a
982 string. The encoder function is called with two arguments: the data and
983 the schema. So we could define this like so:
984
985 def encode_datetime_as_string(data, schema):
986 return datetime.isoformat(data)
987
988 # or
989
990 def encode_datetime_as_string(data, *args):
991 return datetime.isoformat(data)
992
993 Then, we need a decoder function that will transform the string back
994 into a datetime object. The decoder function is called with three argu‐
995 ments: the data, the writer schema, and the reader schema. So we could
996 define this like so:
997
998 def decode_string_as_datetime(data, writer_schema, reader_schema):
999 return datetime.fromisoformat(data)
1000
1001 # or
1002
1003 def decode_string_as_datetime(data, *args):
1004 return datetime.fromisoformat(data)
1005
1006 Finally, we need to tell fastavro to use these functions. The schema
1007 for this custom logical type will use the type string and can use what‐
1008 ever name you would like as the logicalType. In this example, let's
1009 suppose we call the logicalType datetime2. To have the library actually
1010 use the custom logical type, we use the name of <avro_type>-<logi‐
1011 cal_type>, so in this example that name would be string-datetime2 and
1012 then we add those functions like so:
1013
1014 fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
1015 fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime
1016
1017 And you are done. Now if the library comes across a schema with a logi‐
1018 cal type of datetime2 and an avro type of string, it will use the cus‐
1019 tom functions. For a complete example, see here:
1020
1021 import io
1022 from datetime import datetime
1023
1024 import fastavro
1025 from fastavro import writer, reader
1026
1027
1028 def encode_datetime_as_string(data, *args):
1029 return datetime.isoformat(data)
1030
1031 def decode_string_as_datetime(data, *args):
1032 return datetime.fromisoformat(data)
1033
1034 fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
1035 fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime
1036
1037
1038 writer_schema = fastavro.parse_schema({
1039 "type": "record",
1040 "name": "root",
1041 "fields": [
1042 {
1043 "name": "some_date",
1044 "type": [
1045 "null",
1046 {
1047 "type": "string",
1048 "logicalType": "datetime2",
1049 },
1050 ],
1051 },
1052 ]
1053 })
1054
1055 records = [
1056 {"some_date": datetime.now()}
1057 ]
1058
1059 bio = io.BytesIO()
1060
1061 writer(bio, writer_schema, records)
1062
1063 bio.seek(0)
1064
1065 for record in reader(bio):
1066 print(record)
1067
1068 fastavro command line script
1069 A command line script is installed with the library that can be used to
1070 dump the contents of avro file(s) to the standard output.
1071
1072 Usage:
1073
1074 usage: fastavro [-h] [--schema] [--codecs] [--version] [-p] [file [file ...]]
1075
1076 iter over avro file, emit records as JSON
1077
1078 positional arguments:
1079 file file(s) to parse
1080
1081 optional arguments:
1082 -h, --help show this help message and exit
1083 --schema dump schema instead of records
1084 --codecs print supported codecs
1085 --version show program's version number and exit
1086 -p, --pretty pretty print json
1087
1088 Examples
1089 Read an avro file:
1090
1091 $ fastavro weather.avro
1092
1093 {"temp": 0, "station": "011990-99999", "time": -619524000000}
1094 {"temp": 22, "station": "011990-99999", "time": -619506000000}
1095 {"temp": -11, "station": "011990-99999", "time": -619484400000}
1096 {"temp": 111, "station": "012650-99999", "time": -655531200000}
1097 {"temp": 78, "station": "012650-99999", "time": -655509600000}
1098
1099 Show the schema:
1100
1101 $ fastavro --schema weather.avro
1102
1103 {
1104 "type": "record",
1105 "namespace": "test",
1106 "doc": "A weather reading.",
1107 "fields": [
1108 {
1109 "type": "string",
1110 "name": "station"
1111 },
1112 {
1113 "type": "long",
1114 "name": "time"
1115 },
1116 {
1117 "type": "int",
1118 "name": "temp"
1119 }
1120 ],
1121 "name": "Weather"
1122 }
1123
1129
AUTHOR
       Miki Tebeka

COPYRIGHT
       2021, Miki Tebeka
1135
1136
1137
1138
1.4.7                            Oct 29, 2021                      FASTAVRO(1)