Source code for jsonseq.decode
"""RFC 7464 GeoJSON Text Sequence decoding."""
import json
from typing import Iterable, Iterator
[docs]class JSONSeqDecoder(object):
"""Decode Python objects from a stream of JSON texts."""
def __init__(self, **kwds):
"""Create a decoder.
Parameters
----------
kwds : dict, optional
Keyword arguments for JSONDecoder()
"""
self.decoder = json.JSONDecoder(**kwds)
[docs] def decode(self, seq: Iterable) -> Iterator[object]:
"""Iterate over decoded objects in the JSON text sequence.
Parameters
----------
seq : Iterable
JSON strings or pieces of strings.
Yields
------
object
"""
buffer = ""
has_rs = None
for line in seq:
if has_rs is None:
has_rs = line.startswith(u"\x1e")
if not has_rs:
yield self.decoder.decode(line)
else:
if line.startswith(u"\x1e"):
if buffer:
yield self.decoder.decode(buffer)
buffer = line.lstrip(u"\x1e")
else:
buffer += line
if buffer:
yield self.decoder.decode(buffer)
return