FASTAVRO(1)                        fastavro                        FASTAVRO(1)

NAME
   fastavro - fastavro Documentation

   The current Python avro package is dog slow.

   On a test case of about 10K records, it takes about 14sec to iterate
   over all of them. In comparison the Java avro SDK does it in about
   1.9sec.

   fastavro is an alternative implementation that is much faster. It
   iterates over the same 10K records in 2.9sec, and if you use it with
   PyPy it'll do it in 1.5sec (to be fair, the Java benchmark is doing
   some extra JSON encoding/decoding).

   If the optional C extension (generated by Cython) is available, then
   fastavro will be even faster. For the same 10K records it'll run in
   about 1.7sec.
FEATURES
   • File Writer

   • File Reader (iterating via records or blocks)

   • Schemaless Writer

   • Schemaless Reader

   • JSON Writer

   • JSON Reader

   • Codecs (Snappy, Deflate, Zstandard, Bzip2, LZ4, XZ)

   • Schema resolution

   • Aliases

   • Logical Types

   • Parsing schemas into the canonical form

   • Schema fingerprinting

MISSING FEATURES
   • Anything involving Avro's RPC features
EXAMPLE
      from fastavro import writer, reader, parse_schema

      schema = {
          'doc': 'A weather reading.',
          'name': 'Weather',
          'namespace': 'test',
          'type': 'record',
          'fields': [
              {'name': 'station', 'type': 'string'},
              {'name': 'time', 'type': 'long'},
              {'name': 'temp', 'type': 'int'},
          ],
      }
      parsed_schema = parse_schema(schema)

      # 'records' can be any iterable (including a generator)
      records = [
          {'station': '011990-99999', 'temp': 0, 'time': 1433269388},
          {'station': '011990-99999', 'temp': 22, 'time': 1433270389},
          {'station': '011990-99999', 'temp': -11, 'time': 1433273379},
          {'station': '012650-99999', 'temp': 111, 'time': 1433275478},
      ]

      # Writing
      with open('weather.avro', 'wb') as out:
          writer(out, parsed_schema, records)

      # Reading
      with open('weather.avro', 'rb') as fo:
          for record in reader(fo):
              print(record)
FASTAVRO.READ
   class reader(fo: Union[IO, fastavro.io.json_decoder.AvroJSONDecoder],
   reader_schema: Optional[Union[str, List, Dict]] = None,
   return_record_name: bool = False)
       Iterator over records in an avro file.

       Parameters

          • fo -- File-like object to read from

          • reader_schema -- Reader schema

          • return_record_name -- If true, when reading a union of
            records, the result will be a tuple where the first value is
            the name of the record and the second value is the record
            itself

       Example:

          from fastavro import reader
          with open('some-file.avro', 'rb') as fo:
              avro_reader = reader(fo)
              for record in avro_reader:
                  process_record(record)

       The fo argument is a file-like object, so another common usage is
       to pass an io.BytesIO object, like so:

          from io import BytesIO
          from fastavro import writer, reader

          fo = BytesIO()
          writer(fo, schema, records)
          fo.seek(0)
          for record in reader(fo):
              process_record(record)

       metadata
          Key-value pairs in the header metadata

       codec
          The codec used when writing

       writer_schema
          The schema used when writing

       reader_schema
          The schema used when reading (if provided)

   class block_reader(fo: IO, reader_schema: Optional[Union[str, List,
   Dict]] = None, return_record_name: bool = False)
       Iterator over Blocks in an avro file.

       Parameters

          • fo -- Input stream

          • reader_schema -- Reader schema

          • return_record_name -- If true, when reading a union of
            records, the result will be a tuple where the first value is
            the name of the record and the second value is the record
            itself

       Example:

          from fastavro import block_reader
          with open('some-file.avro', 'rb') as fo:
              avro_reader = block_reader(fo)
              for block in avro_reader:
                  process_block(block)

       metadata
          Key-value pairs in the header metadata

       codec
          The codec used when writing

       writer_schema
          The schema used when writing

       reader_schema
          The schema used when reading (if provided)

   class Block(bytes_, num_records, codec, reader_schema, writer_schema,
   named_schemas, offset, size, return_record_name=False)
       An avro block. Will yield records when iterated over.

       num_records
          Number of records in the block

       writer_schema
          The schema used when writing

       reader_schema
          The schema used when reading (if provided)

       offset
          Offset of the block from the beginning of the avro file

       size
          Size of the block in bytes

   schemaless_reader(fo: IO, writer_schema: Union[str, List, Dict],
   reader_schema: Optional[Union[str, List, Dict]] = None,
   return_record_name: bool = False) -> Union[None, str, float, int,
   decimal.Decimal, bool, bytes, List, Dict]
       Reads a single record written using the schemaless_writer()

       Parameters

          • fo -- Input stream

          • writer_schema -- Schema used when calling schemaless_writer

          • reader_schema -- If the schema has changed since being
            written then the new schema can be given to allow for schema
            migration

          • return_record_name -- If true, when reading a union of
            records, the result will be a tuple where the first value is
            the name of the record and the second value is the record
            itself

       Example:

          parsed_schema = fastavro.parse_schema(schema)
          with open('file', 'rb') as fp:
              record = fastavro.schemaless_reader(fp, parsed_schema)

       Note: The schemaless_reader can only read a single record.

   is_avro(path_or_buffer: Union[str, IO]) -> bool
       Return True if path (or buffer) points to an Avro file. This will
       only work for avro files that contain the normal avro schema
       header, like those created from writer(). This function is not
       intended to be used with binary data created from
       schemaless_writer(), since that does not include the avro header.

       Parameters
          path_or_buffer -- Path to file

FASTAVRO.WRITE
   writer(fo: Union[IO, fastavro.io.json_encoder.AvroJSONEncoder],
   schema: Union[str, List, Dict], records: Iterable[Any], codec: str =
   'null', sync_interval: int = 16000, metadata: Optional[Dict[str,
   str]] = None, validator: bool = False, sync_marker: bytes = b'',
   codec_compression_level: Optional[int] = None)
       Write records to fo (stream) according to schema

       Parameters

          • fo -- Output stream

          • schema -- Writer schema

          • records -- Records to write. This is commonly a list of the
            dictionary representation of the records, but it can be any
            iterable

          • codec -- Compression codec, can be 'null', 'deflate' or
            'snappy' (if installed)

          • sync_interval -- Size of sync interval

          • metadata -- Header metadata

          • validator -- If true, validation will be done on the records

          • sync_marker -- A byte string used as the avro sync marker.
            If not provided, a random byte string will be used.

          • codec_compression_level -- Compression level to use with the
            specified codec (if the codec supports it)

       Example:

          from fastavro import writer, parse_schema

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }
          parsed_schema = parse_schema(schema)

          records = [
              {'station': '011990-99999', 'temp': 0, 'time': 1433269388},
              {'station': '011990-99999', 'temp': 22, 'time': 1433270389},
              {'station': '011990-99999', 'temp': -11, 'time': 1433273379},
              {'station': '012650-99999', 'temp': 111, 'time': 1433275478},
          ]

          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

       The fo argument is a file-like object, so another common usage is
       to pass an io.BytesIO object, like so:

          from io import BytesIO
          from fastavro import writer

          fo = BytesIO()
          writer(fo, schema, records)

       Given an existing avro file, it's possible to append to it by
       re-opening the file in a+b mode. If the file is only opened in ab
       mode, we aren't able to read some of the existing header
       information and an error will be raised. For example:

          # Write initial records
          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

          # Write some more records
          with open('weather.avro', 'a+b') as out:
              writer(out, None, more_records)

       Note: When appending, any schema provided will be ignored since
       the schema in the avro file will be re-used. Therefore it is
       convenient to just use None as the schema.

   schemaless_writer(fo: IO, schema: Union[str, List, Dict], record: Any)
       Write a single record without the schema or header information

       Parameters

          • fo -- Output file

          • schema -- Schema

          • record -- Record to write

       Example:

          parsed_schema = fastavro.parse_schema(schema)
          with open('file', 'wb') as fp:
              fastavro.schemaless_writer(fp, parsed_schema, record)

       Note: The schemaless_writer can only write a single record.

   Using the tuple notation to specify which branch of a union to take
   Since this library uses plain dictionaries to represent a record, it
   is possible for one dictionary to fit the definition of two different
   records.

   For example, given a dictionary like this:

      {"name": "My Name"}

   It would be valid against both of these records:

      child_schema = {
          "name": "Child",
          "type": "record",
          "fields": [
              {"name": "name", "type": "string"},
              {"name": "favorite_color", "type": ["null", "string"]},
          ]
      }

      pet_schema = {
          "name": "Pet",
          "type": "record",
          "fields": [
              {"name": "name", "type": "string"},
              {"name": "favorite_toy", "type": ["null", "string"]},
          ]
      }

   This becomes a problem when a schema contains a union of these two
   similar records, as it is not clear which record the dictionary
   represents. For example, if you used the previous dictionary with the
   following schema, it wouldn't be clear if the record should be
   serialized as a Child or a Pet:

      household_schema = {
          "name": "Household",
          "type": "record",
          "fields": [
              {"name": "address", "type": "string"},
              {
                  "name": "family_members",
                  "type": {
                      "type": "array", "items": [
                          {
                              "name": "Child",
                              "type": "record",
                              "fields": [
                                  {"name": "name", "type": "string"},
                                  {"name": "favorite_color", "type": ["null", "string"]},
                              ]
                          }, {
                              "name": "Pet",
                              "type": "record",
                              "fields": [
                                  {"name": "name", "type": "string"},
                                  {"name": "favorite_toy", "type": ["null", "string"]},
                              ]
                          }
                      ]
                  }
              },
          ]
      }

   To resolve this, you can use a tuple notation where the first value
   of the tuple is the fully namespaced record name and the second value
   is the dictionary. For example:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  ("Child", {"name": "Son"}),
                  ("Child", {"name": "Daughter"}),
                  ("Pet", {"name": "Dog"}),
              ]
          }
      ]

   Using the record hint to specify which branch of a union to take
   In addition to the tuple notation for specifying the name of a
   record, you can also include a special -type attribute (note that
   this attribute is -type, not type) on a record to do the same thing.
   So the example above, which looked like this:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  ("Child", {"name": "Son"}),
                  ("Child", {"name": "Daughter"}),
                  ("Pet", {"name": "Dog"}),
              ]
          }
      ]

   Would now look like this:

      records = [
          {
              "address": "123 Drive Street",
              "family_members": [
                  {"-type": "Child", "name": "Son"},
                  {"-type": "Child", "name": "Daughter"},
                  {"-type": "Pet", "name": "Dog"},
              ]
          }
      ]

   Unlike the tuple notation, which can be used with any avro type in a
   union, this -type hint can only be used with records. However, it can
   be useful if you want to make a single record dictionary that can be
   used both in and out of unions.

FASTAVRO.JSON_READ
   json_reader(fo: IO, schema: Union[str, List, Dict]) ->
   fastavro._read_py.reader
       Iterator over records in an avro json file.

       Parameters

          • fo -- File-like object to read from

          • schema -- Reader schema

       Example:

          from fastavro import json_reader

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ]
          }

          with open('some-file', 'r') as fo:
              avro_reader = json_reader(fo, schema)
              for record in avro_reader:
                  print(record)

FASTAVRO.JSON_WRITE
   json_writer(fo: IO, schema: Union[str, List, Dict], records:
   Iterable[Any], *, write_union_type: bool = True, validator: bool =
   False) -> None
       Write records to fo (stream) according to schema

       Parameters

          • fo -- File-like object to write to

          • schema -- Writer schema

          • records -- Records to write. This is commonly a list of the
            dictionary representation of the records, but it can be any
            iterable

          • write_union_type -- Determine whether to write the union
            type in the json message. If this is set to False, the
            output will be plain json. It may, however, not be decodable
            back to an avro record by json_reader.

          • validator -- If true, validation will be done on the records

       Example:

          from fastavro import json_writer, parse_schema

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }
          parsed_schema = parse_schema(schema)

          records = [
              {'station': '011990-99999', 'temp': 0, 'time': 1433269388},
              {'station': '011990-99999', 'temp': 22, 'time': 1433270389},
              {'station': '011990-99999', 'temp': -11, 'time': 1433273379},
              {'station': '012650-99999', 'temp': 111, 'time': 1433275478},
          ]

          with open('some-file', 'w') as out:
              json_writer(out, parsed_schema, records)

FASTAVRO.SCHEMA
   parse_schema(schema: Union[str, List, Dict], named_schemas:
   Optional[Dict[str, Dict]] = None, *, expand: bool = False,
   _write_hint: bool = True, _force: bool = False) -> Union[str, List,
   Dict]
       Returns a parsed avro schema

       It is not necessary to call parse_schema, but doing so and saving
       the parsed schema for later use will make future operations
       faster, as the schema will not need to be reparsed.

       Parameters

          • schema -- Input schema

          • named_schemas -- Dictionary of named schemas to their schema
            definition

          • expand -- If true, named schemas will be fully expanded to
            their true schemas rather than being represented as just the
            name. This format should be considered output only and not
            passed in to other reader/writer functions, as it does not
            conform to the avro specification and will likely cause an
            exception

          • _write_hint -- Internal API argument specifying whether or
            not the __fastavro_parsed marker should be added to the
            schema

          • _force -- Internal API argument. If True, the schema will
            always be parsed even if it has been parsed and has the
            __fastavro_parsed marker

       Example:

          from fastavro import parse_schema
          from fastavro import writer

          parsed_schema = parse_schema(original_schema)
          with open('weather.avro', 'wb') as out:
              writer(out, parsed_schema, records)

       Sometimes you might have two schemas where one schema references
       another. For the sake of example, let's assume you have a Parent
       schema that references a Child schema. If you were to try to
       parse the parent schema on its own, you would get an exception
       because the child schema isn't defined. To accommodate this, we
       can use the named_schemas argument to pass a shared dictionary
       when parsing both of the schemas. The dictionary will get
       populated with the necessary schema references to make parsing
       possible. For example:

          from fastavro import parse_schema

          named_schemas = {}
          parsed_child = parse_schema(child_schema, named_schemas)
          parsed_parent = parse_schema(parent_schema, named_schemas)
   fullname(schema: Dict) -> str
       Returns the fullname of a schema

       Parameters
          schema -- Input schema

       Example:

          from fastavro.schema import fullname

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          fname = fullname(schema)
          assert fname == "test.Weather"
   expand_schema(schema: Union[str, List, Dict]) -> Union[str, List, Dict]
       Returns a schema where all named types are expanded to their real
       schema

       NOTE: The output of this function produces a schema that can
       include multiple definitions of the same named type (as per
       design) which are not valid per the avro specification.
       Therefore, the output of this should not be passed to the normal
       writer/reader functions, as it will likely result in an error.

       Parameters
          schema (dict) -- Input schema

       Example:

          from fastavro.schema import expand_schema

          original_schema = {
              "name": "MasterSchema",
              "namespace": "com.namespace.master",
              "type": "record",
              "fields": [{
                  "name": "field_1",
                  "type": {
                      "name": "Dependency",
                      "namespace": "com.namespace.dependencies",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }, {
                  "name": "field_2",
                  "type": "com.namespace.dependencies.Dependency"
              }]
          }

          expanded_schema = expand_schema(original_schema)

          assert expanded_schema == {
              "name": "com.namespace.master.MasterSchema",
              "type": "record",
              "fields": [{
                  "name": "field_1",
                  "type": {
                      "name": "com.namespace.dependencies.Dependency",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }, {
                  "name": "field_2",
                  "type": {
                      "name": "com.namespace.dependencies.Dependency",
                      "type": "record",
                      "fields": [
                          {"name": "sub_field_1", "type": "string"}
                      ]
                  }
              }]
          }
   load_schema(schema_path: str, *, repo:
   Optional[fastavro.repository.base.AbstractSchemaRepository] = None,
   named_schemas: Optional[Dict[str, Dict]] = None, _write_hint: bool =
   True, _injected_schemas: Optional[Set[str]] = None) -> Union[str,
   List, Dict]
       Returns a schema loaded from repository.

       Will recursively load referenced schemas, attempting to load them
       from the same repository, using schema_path as the schema name.

       If repo is not provided, FlatDictRepository is used.
       FlatDictRepository will try to load schemas from the same
       directory, assuming files are named with the convention
       <full_name>.avsc.

       Parameters

          • schema_path -- Full schema name, or path to schema file if
            the default repo is used.

          • repo -- Schema repository instance.

          • named_schemas -- Dictionary of named schemas to their schema
            definition

          • _write_hint -- Internal API argument specifying whether or
            not the __fastavro_parsed marker should be added to the
            schema

          • _injected_schemas -- Internal API argument. Set of names
            that have been injected

       Consider the following example with the default
       FlatDictRepository...

       namespace.Parent.avsc:

          {
              "type": "record",
              "name": "Parent",
              "namespace": "namespace",
              "fields": [
                  {
                      "name": "child",
                      "type": "Child"
                  }
              ]
          }

       namespace.Child.avsc:

          {
              "type": "record",
              "namespace": "namespace",
              "name": "Child",
              "fields": []
          }

       Code:

          from fastavro.schema import load_schema

          parsed_schema = load_schema("namespace.Parent.avsc")
   load_schema_ordered(ordered_schemas: List[str], *, _write_hint: bool
   = True) -> Union[str, List, Dict]
       Returns a schema loaded from a list of schemas.

       The list of schemas should be ordered such that any dependencies
       are listed before any other schemas that use those dependencies.
       For example, if schema A depends on schema B and schema B depends
       on schema C, then the list of schemas should be [C, B, A].

       Parameters

          • ordered_schemas -- List of paths to schemas

          • _write_hint -- Internal API argument specifying whether or
            not the __fastavro_parsed marker should be added to the
            schema

       Consider the following example...

       Parent.avsc:

          {
              "type": "record",
              "name": "Parent",
              "namespace": "namespace",
              "fields": [
                  {
                      "name": "child",
                      "type": "Child"
                  }
              ]
          }

       namespace.Child.avsc:

          {
              "type": "record",
              "namespace": "namespace",
              "name": "Child",
              "fields": []
          }

       Code:

          from fastavro.schema import load_schema_ordered

          parsed_schema = load_schema_ordered(
              ["path/to/namespace.Child.avsc", "path/to/Parent.avsc"]
          )
   to_parsing_canonical_form(schema: Union[str, List, Dict]) -> str
       Returns a string representing the parsing canonical form of the
       schema.

       For more details on the parsing canonical form, see here:
       https://avro.apache.org/docs/current/spec.html#Parsing+Canonical+Form+for+Schemas

       Parameters
          schema -- Schema to transform

   fingerprint(parsing_canonical_form: str, algorithm: str) -> str
       Returns a string representing a fingerprint/hash of the parsing
       canonical form of a schema.

       For more details on the fingerprint, see here:
       https://avro.apache.org/docs/current/spec.html#schema_fingerprints

       Parameters

          • parsing_canonical_form -- The parsing canonical form of a
            schema

          • algorithm -- The hashing algorithm

FASTAVRO.VALIDATION
   validate(datum: Any, schema: Union[str, List, Dict], field: str = '',
   raise_errors: bool = True) -> bool
       Determine if a python datum is an instance of a schema.

       Parameters

          • datum -- Data being validated

          • schema -- Schema

          • field -- Record field being validated

          • raise_errors -- If true, errors are raised for invalid data.
            If false, a simple True (valid) or False (invalid) result is
            returned

       Example:

          from fastavro.validation import validate
          schema = {...}
          record = {...}
          validate(record, schema)

   validate_many(records: Iterable[Any], schema: Union[str, List, Dict],
   raise_errors: bool = True) -> bool
       Validate a list of data!

       Parameters

          • records -- List of records to validate

          • schema -- Schema

          • raise_errors -- If true, errors are raised for invalid data.
            If false, a simple True (valid) or False (invalid) result is
            returned

       Example:

          from fastavro.validation import validate_many
          schema = {...}
          records = [{...}, {...}, ...]
          validate_many(records, schema)

FASTAVRO.UTILS
   generate_one(schema: Union[str, List, Dict]) -> Any
       Returns a single instance of arbitrary data that conforms to the
       schema.

       Parameters
          schema -- Schema that data should conform to

       Example:

          from fastavro import schemaless_writer
          from fastavro.utils import generate_one

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          with open('weather.avro', 'wb') as out:
              schemaless_writer(out, schema, generate_one(schema))

   generate_many(schema: Union[str, List, Dict], count: int) ->
   Iterator[Any]
       A generator that yields arbitrary data that conforms to the
       schema. It will yield a number of data structures equal to what
       is given in the count

       Parameters

          • schema -- Schema that data should conform to

          • count -- Number of objects to generate

       Example:

          from fastavro import writer
          from fastavro.utils import generate_many

          schema = {
              'doc': 'A weather reading.',
              'name': 'Weather',
              'namespace': 'test',
              'type': 'record',
              'fields': [
                  {'name': 'station', 'type': 'string'},
                  {'name': 'time', 'type': 'long'},
                  {'name': 'temp', 'type': 'int'},
              ],
          }

          with open('weather.avro', 'wb') as out:
              writer(out, schema, generate_many(schema, 5))

   anonymize_schema(schema: Union[str, List, Dict]) -> Union[str, List,
   Dict]
       Returns an anonymized schema

       Parameters
          schema -- Schema to anonymize

       Example:

          from fastavro.utils import anonymize_schema

          anonymized_schema = anonymize_schema(original_schema)

LOGICAL TYPES
   Fastavro supports the following official logical types:

   • decimal

   • uuid

   • date

   • time-millis

   • time-micros

   • timestamp-millis

   • timestamp-micros

   • local-timestamp-millis

   • local-timestamp-micros

   Fastavro is missing support for the following official logical types:

   • duration

   How to specify logical types in your schemas
   The docs say that when you want to make something a logical type, you
   just need to add a logicalType key. Unfortunately, this means that a
   common confusion is that people will take a pre-existing schema like
   this:

      schema = {
          "type": "record",
          "name": "root",
          "fields": [
              {
                  "name": "id",
                  "type": "string",
              },
          ]
      }

   And then add the uuid logical type like this:

      schema = {
          "type": "record",
          "name": "root",
          "fields": [
              {
                  "name": "id",
                  "type": "string",
                  "logicalType": "uuid",  # This is the wrong place to add this key
              },
          ]
      }

   However, that adds the logicalType key to the field schema, which is
   not correct. Instead, we want to group it with the string like so:

      schema = {
          "type": "record",
          "name": "root",
          "fields": [
              {
                  "name": "id",
                  "type": {
                      "type": "string",
                      "logicalType": "uuid",  # This is the correct place to add this key
                  },
              },
          ]
      }

   Custom Logical Types
   The Avro specification defines a handful of logical types that most
   implementations support. For example, one of the defined logical
   types is a microsecond precision timestamp. The specification states
   that this value will get encoded as an avro long type.

   For the sake of an example, let's say you want to create a new
   logical type for a microsecond precision timestamp that uses a
   string as the underlying avro type.

   To do this, there are a few functions that need to be defined. First,
   we need an encoder function that will encode a datetime object as a
   string. The encoder function is called with two arguments: the data
   and the schema. So we could define this like so:

      def encode_datetime_as_string(data, schema):
          return datetime.isoformat(data)

      # or

      def encode_datetime_as_string(data, *args):
          return datetime.isoformat(data)

   Then, we need a decoder function that will transform the string back
   into a datetime object. The decoder function is called with three
   arguments: the data, the writer schema, and the reader schema. So we
   could define this like so:

      def decode_string_as_datetime(data, writer_schema, reader_schema):
          return datetime.fromisoformat(data)

      # or

      def decode_string_as_datetime(data, *args):
          return datetime.fromisoformat(data)

   Finally, we need to tell fastavro to use these functions. The schema
   for this custom logical type will use the type string and can use
   whatever name you would like as the logicalType. In this example,
   let's suppose we call the logicalType datetime2. To have the library
   actually use the custom logical type, we use the name of
   <avro_type>-<logical_type>, so in this example that name would be
   string-datetime2, and then we add those functions like so:

      fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
      fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime

   And you are done. Now if the library comes across a schema with a
   logical type of datetime2 and an avro type of string, it will use
   the custom functions. For a complete example, see here:

      import io
      from datetime import datetime

      import fastavro
      from fastavro import writer, reader


      def encode_datetime_as_string(data, *args):
          return datetime.isoformat(data)

      def decode_string_as_datetime(data, *args):
          return datetime.fromisoformat(data)

      fastavro.write.LOGICAL_WRITERS["string-datetime2"] = encode_datetime_as_string
      fastavro.read.LOGICAL_READERS["string-datetime2"] = decode_string_as_datetime


      writer_schema = fastavro.parse_schema({
          "type": "record",
          "name": "root",
          "fields": [
              {
                  "name": "some_date",
                  "type": [
                      "null",
                      {
                          "type": "string",
                          "logicalType": "datetime2",
                      },
                  ],
              },
          ]
      })

      records = [
          {"some_date": datetime.now()}
      ]

      bio = io.BytesIO()

      writer(bio, writer_schema, records)

      bio.seek(0)

      for record in reader(bio):
          print(record)

FASTAVRO COMMAND LINE SCRIPT
   A command line script is installed with the library that can be used
   to dump the contents of avro file(s) to the standard output.

   Usage:

      usage: fastavro [-h] [--schema] [--codecs] [--version] [-p] [file [file ...]]

      iter over avro file, emit records as JSON

      positional arguments:
        file           file(s) to parse

      optional arguments:
        -h, --help     show this help message and exit
        --schema       dump schema instead of records
        --codecs       print supported codecs
        --version      show program's version number and exit
        -p, --pretty   pretty print json

   Examples
   Read an avro file:

      $ fastavro weather.avro

      {"temp": 0, "station": "011990-99999", "time": -619524000000}
      {"temp": 22, "station": "011990-99999", "time": -619506000000}
      {"temp": -11, "station": "011990-99999", "time": -619484400000}
      {"temp": 111, "station": "012650-99999", "time": -655531200000}
      {"temp": 78, "station": "012650-99999", "time": -655509600000}

   Show the schema:

      $ fastavro --schema weather.avro

      {
          "type": "record",
          "namespace": "test",
          "doc": "A weather reading.",
          "fields": [
              {
                  "type": "string",
                  "name": "station"
              },
              {
                  "type": "long",
                  "name": "time"
              },
              {
                  "type": "int",
                  "name": "temp"
              }
          ],
          "name": "Weather"
      }

FASTAVRO.IO
   fastavro.io.json_decoder
   class AvroJSONDecoder(fo: IO)
       Decoder for the avro JSON format.

       NOTE: All attributes and methods on this class should be
       considered private.

       Parameters
          fo -- File-like object to read from

   fastavro.io.json_encoder
   class AvroJSONEncoder(fo: IO, *, write_union_type: bool = True)
       Encoder for the avro JSON format.

       NOTE: All attributes and methods on this class should be
       considered private.

       Parameters

          • fo -- Output stream

          • write_union_type -- Determine whether to write the union
            type in the json message.
FASTAVRO.REPOSITORY
   fastavro.repository.base
   class AbstractSchemaRepository
AUTHOR
   Miki Tebeka

COPYRIGHT
   2022, Miki Tebeka

1.5.1                            Jun 10, 2022                      FASTAVRO(1)