# coding: utf-8

# The following YAML grammar is LL(1) and is parsed by a recursive descent
# parser.
#
# stream            ::= STREAM-START implicit_document? explicit_document*
#                       STREAM-END
# implicit_document ::= block_node DOCUMENT-END*
# explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*
# block_node_or_indentless_sequence ::=
#                       ALIAS
#                       | properties (block_content | indentless_block_sequence)?
#                       | block_content
#                       | indentless_block_sequence
# block_node        ::= ALIAS
#                       | properties block_content?
#                       | block_content
# flow_node         ::= ALIAS
#                       | properties flow_content?
#                       | flow_content
# properties        ::= TAG ANCHOR? | ANCHOR TAG?
# block_content     ::= block_collection | flow_collection | SCALAR
# flow_content      ::= flow_collection | SCALAR
# block_collection  ::= block_sequence | block_mapping
# flow_collection   ::= flow_sequence | flow_mapping
# block_sequence    ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)*
#                       BLOCK-END
# indentless_sequence ::= (BLOCK-ENTRY block_node?)+
# block_mapping     ::= BLOCK-MAPPING_START
#                       ((KEY block_node_or_indentless_sequence?)?
#                       (VALUE block_node_or_indentless_sequence?)?)*
#                       BLOCK-END
# flow_sequence     ::= FLOW-SEQUENCE-START
#                       (flow_sequence_entry FLOW-ENTRY)*
#                       flow_sequence_entry?
#                       FLOW-SEQUENCE-END
# flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
# flow_mapping      ::= FLOW-MAPPING-START
#                       (flow_mapping_entry FLOW-ENTRY)*
#                       flow_mapping_entry?
#                       FLOW-MAPPING-END
# flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
#
# FIRST sets:
#
# stream: { STREAM-START }
# explicit_document: { DIRECTIVE DOCUMENT-START }
# implicit_document: FIRST(block_node)
# block_node: { ALIAS TAG ANCHOR SCALAR BLOCK-SEQUENCE-START
#               BLOCK-MAPPING-START FLOW-SEQUENCE-START FLOW-MAPPING-START }
# flow_node: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START FLOW-MAPPING-START }
# block_content: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START
#                  FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR }
# flow_content: { FLOW-SEQUENCE-START FLOW-MAPPING-START SCALAR }
# block_collection: { BLOCK-SEQUENCE-START BLOCK-MAPPING-START }
# flow_collection: { FLOW-SEQUENCE-START FLOW-MAPPING-START }
# block_sequence: { BLOCK-SEQUENCE-START }
# block_mapping: { BLOCK-MAPPING-START }
# block_node_or_indentless_sequence: { ALIAS ANCHOR TAG SCALAR
#               BLOCK-SEQUENCE-START BLOCK-MAPPING-START FLOW-SEQUENCE-START
#               FLOW-MAPPING-START BLOCK-ENTRY }
# indentless_sequence: { ENTRY }
# flow_collection: { FLOW-SEQUENCE-START FLOW-MAPPING-START }
# flow_sequence: { FLOW-SEQUENCE-START }
# flow_mapping: { FLOW-MAPPING-START }
# flow_sequence_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START
#                        FLOW-MAPPING-START KEY }
# flow_mapping_entry: { ALIAS ANCHOR TAG SCALAR FLOW-SEQUENCE-START
#                       FLOW-MAPPING-START KEY }
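
# Illustrative sketch (added commentary, not part of the original grammar
# notes): the parser turns the scanner's token stream into events.  For a
# small document such as
#
#     ---
#     invoice: 1234
#     items:
#       - widget
#
# the sequence produced via get_event() is roughly
#
#     StreamStartEvent, DocumentStartEvent, MappingStartEvent,
#     ScalarEvent('invoice'), ScalarEvent('1234'),
#     ScalarEvent('items'), SequenceStartEvent, ScalarEvent('widget'),
#     SequenceEndEvent, MappingEndEvent, DocumentEndEvent, StreamEndEvent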

# need to have full path with import, as pkg_resources tries to load parser.py in __init__.py
# only to not do anything with the package afterwards
# and for Jython too
from ruamel.yaml.error import MarkedYAMLError
from ruamel.yaml.tokens import *  # NOQA
from ruamel.yaml.events import *  # NOQA
from ruamel.yaml.scanner import Scanner, RoundTripScanner, ScannerError  # NOQA
from ruamel.yaml.scanner import BlankLineComment
from ruamel.yaml.comments import C_PRE, C_POST, C_SPLIT_ON_FIRST_BLANK
from ruamel.yaml.compat import _F, nprint, nprintf  # NOQA

if False:  # MYPY
    from typing import Any, Dict, Optional, List  # NOQA

__all__ = ['Parser', 'RoundTripParser', 'ParserError']


def xprintf(*args, **kw):
    # type: (Any, Any) -> Any
    return nprintf(*args, **kw)
    pass
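
# Usage sketch (added commentary, not authoritative): this module is normally
# driven indirectly, with the composer repeatedly calling check_event(),
# peek_event() and get_event() on the loader's parser.  Something along these
# lines exercises the same event interface through the public ruamel.yaml API;
# the names outside this file (YAML, YAML.parse) are assumptions:
#
#     from ruamel.yaml import YAML
#     yaml = YAML(typ='rt')
#     with open('doc.yaml') as fp:
#         for event in yaml.parse(fp):
#             print(event)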


class ParserError(MarkedYAMLError):
    pass


class Parser:
    # Since writing a recursive descent parser is a straightforward task, we
    # do not give many comments here.

    DEFAULT_TAGS = {'!': '!', '!!': 'tag:yaml.org,2002:'}
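
    # Note (added commentary): the parser is a state machine.  self.state holds
    # the bound method for the next production to try, self.states is a stack
    # of continuations to resume once a nested construct is finished, and
    # self.marks records the start marks of open collections for error
    # reporting.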

    def __init__(self, loader):
        # type: (Any) -> None
        self.loader = loader
        if self.loader is not None and getattr(self.loader, '_parser', None) is None:
            self.loader._parser = self
        self.reset_parser()

    def reset_parser(self):
        # type: () -> None
        # Reset the state attributes (to clear self-references)
        self.current_event = self.last_event = None
        self.tag_handles = {}  # type: Dict[Any, Any]
        self.states = []  # type: List[Any]
        self.marks = []  # type: List[Any]
        self.state = self.parse_stream_start  # type: Any

    def dispose(self):
        # type: () -> None
        self.reset_parser()

    @property
    def scanner(self):
        # type: () -> Any
        if hasattr(self.loader, 'typ'):
            return self.loader.scanner
        return self.loader._scanner

    @property
    def resolver(self):
        # type: () -> Any
        if hasattr(self.loader, 'typ'):
            return self.loader.resolver
        return self.loader._resolver
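
    # Note (added commentary): check_event(), peek_event() and get_event()
    # implement a one-event lookahead buffer: self.current_event caches the
    # event produced by the current state function until get_event() hands it
    # out and clears the cache.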

    def check_event(self, *choices):
        # type: (Any) -> bool
        # Check the type of the next event.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        if self.current_event is not None:
            if not choices:
                return True
            for choice in choices:
                if isinstance(self.current_event, choice):
                    return True
        return False

    def peek_event(self):
        # type: () -> Any
        # Get the next event.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        return self.current_event

    def get_event(self):
        # type: () -> Any
        # Get the next event and proceed further.
        if self.current_event is None:
            if self.state:
                self.current_event = self.state()
        # assert self.current_event is not None
        # if self.current_event.end_mark.line != self.peek_event().start_mark.line:
        xprintf('get_event', repr(self.current_event), self.peek_event().start_mark.line)
        self.last_event = value = self.current_event
        self.current_event = None
        return value

    # stream ::= STREAM-START implicit_document? explicit_document*
    #            STREAM-END
    # implicit_document ::= block_node DOCUMENT-END*
    # explicit_document ::= DIRECTIVE* DOCUMENT-START block_node? DOCUMENT-END*

    def parse_stream_start(self):
        # type: () -> Any
        # Parse the stream start.
        token = self.scanner.get_token()
        self.move_token_comment(token)
        event = StreamStartEvent(token.start_mark, token.end_mark, encoding=token.encoding)
        # Prepare the next state.
        self.state = self.parse_implicit_document_start
        return event

    def parse_implicit_document_start(self):
        # type: () -> Any
        # Parse an implicit document.
        if not self.scanner.check_token(DirectiveToken, DocumentStartToken, StreamEndToken):
            self.tag_handles = self.DEFAULT_TAGS
            token = self.scanner.peek_token()
            start_mark = end_mark = token.start_mark
            event = DocumentStartEvent(start_mark, end_mark, explicit=False)
            # Prepare the next state.
            self.states.append(self.parse_document_end)
            self.state = self.parse_block_node
            return event
        else:
            return self.parse_document_start()
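
    # Added note: an explicit document starts with optional directives and a
    # '---' marker and may be terminated by '...'; an implicit document has
    # neither.  For example, the stream
    #
    #     %YAML 1.2
    #     ---
    #     a: 1
    #     ...
    #
    # goes through parse_document_start() below, while a bare 'a: 1' is handled
    # by parse_implicit_document_start() above.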

    def parse_document_start(self):
        # type: () -> Any
        # Parse any extra document end indicators.
        while self.scanner.check_token(DocumentEndToken):
            self.scanner.get_token()
        # Parse an explicit document.
        if not self.scanner.check_token(StreamEndToken):
            version, tags = self.process_directives()
            if not self.scanner.check_token(DocumentStartToken):
                raise ParserError(
                    None,
                    None,
                    _F(
                        "expected '<document start>', but found {pt!r}",
                        pt=self.scanner.peek_token().id,
                    ),
                    self.scanner.peek_token().start_mark,
                )
            token = self.scanner.get_token()
            start_mark = token.start_mark
            end_mark = token.end_mark
            # if self.loader is not None and \
            #    end_mark.line != self.scanner.peek_token().start_mark.line:
            #     self.loader.scalar_after_indicator = False
            event = DocumentStartEvent(
                start_mark, end_mark, explicit=True, version=version, tags=tags,
                comment=token.comment
            )  # type: Any
            self.states.append(self.parse_document_end)
            self.state = self.parse_document_content
        else:
            # Parse the end of the stream.
            token = self.scanner.get_token()
            event = StreamEndEvent(token.start_mark, token.end_mark, comment=token.comment)
            assert not self.states
            assert not self.marks
            self.state = None
        return event

    def parse_document_end(self):
        # type: () -> Any
        # Parse the document end.
        token = self.scanner.peek_token()
        start_mark = end_mark = token.start_mark
        explicit = False
        if self.scanner.check_token(DocumentEndToken):
            token = self.scanner.get_token()
            end_mark = token.end_mark
            explicit = True
        event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
        # Prepare the next state.
        if self.resolver.processing_version == (1, 1):
            self.state = self.parse_document_start
        else:
            self.state = self.parse_implicit_document_start
        return event

    def parse_document_content(self):
        # type: () -> Any
        if self.scanner.check_token(
            DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken
        ):
            event = self.process_empty_scalar(self.scanner.peek_token().start_mark)
            self.state = self.states.pop()
            return event
        else:
            return self.parse_block_node()
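
    # Added note: process_directives() collects the '%YAML' and '%TAG' lines
    # that precede a '---' marker, e.g.
    #
    #     %YAML 1.1
    #     %TAG !e! tag:example.com,2000:app/
    #     ---
    #     !e!widget {}
    #
    # and returns (version, tag_handles) for the DocumentStartEvent; the
    # default '!' and '!!' handles are (re)added to self.tag_handles afterwards.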

    def process_directives(self):
        # type: () -> Any
        yaml_version = None
        self.tag_handles = {}
        while self.scanner.check_token(DirectiveToken):
            token = self.scanner.get_token()
            if token.name == 'YAML':
                if yaml_version is not None:
                    raise ParserError(
                        None, None, 'found duplicate YAML directive', token.start_mark
                    )
                major, minor = token.value
                if major != 1:
                    raise ParserError(
                        None,
                        None,
                        'found incompatible YAML document (version 1.* is required)',
                        token.start_mark,
                    )
                yaml_version = token.value
            elif token.name == 'TAG':
                handle, prefix = token.value
                if handle in self.tag_handles:
                    raise ParserError(
                        None,
                        None,
                        _F('duplicate tag handle {handle!r}', handle=handle),
                        token.start_mark,
                    )
                self.tag_handles[handle] = prefix
        if bool(self.tag_handles):
            value = yaml_version, self.tag_handles.copy()  # type: Any
        else:
            value = yaml_version, None
        if self.loader is not None and hasattr(self.loader, 'tags'):
            self.loader.version = yaml_version
            if self.loader.tags is None:
                self.loader.tags = {}
            for k in self.tag_handles:
                self.loader.tags[k] = self.tag_handles[k]
        for key in self.DEFAULT_TAGS:
            if key not in self.tag_handles:
                self.tag_handles[key] = self.DEFAULT_TAGS[key]
        return value

    # block_node_or_indentless_sequence ::= ALIAS
    #               | properties (block_content | indentless_block_sequence)?
    #               | block_content
    #               | indentless_block_sequence
    # block_node ::= ALIAS
    #                | properties block_content?
    #                | block_content
    # flow_node  ::= ALIAS
    #                | properties flow_content?
    #                | flow_content
    # properties ::= TAG ANCHOR? | ANCHOR TAG?
    # block_content     ::= block_collection | flow_collection | SCALAR
    # flow_content      ::= flow_collection | SCALAR
    # block_collection  ::= block_sequence | block_mapping
    # flow_collection   ::= flow_sequence | flow_mapping

    def parse_block_node(self):
        # type: () -> Any
        return self.parse_node(block=True)

    def parse_flow_node(self):
        # type: () -> Any
        return self.parse_node()

    def parse_block_node_or_indentless_sequence(self):
        # type: () -> Any
        return self.parse_node(block=True, indentless_sequence=True)

    def transform_tag(self, handle, suffix):
        # type: (Any, Any) -> Any
        return self.tag_handles[handle] + suffix
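
    # Added note: parse_node() below first handles an alias ('*name'), then
    # optional node properties (an '&anchor' and/or a '!tag', in either order),
    # and finally dispatches on the kind of content that follows: a scalar, a
    # flow collection ('[' or '{'), a block collection, or an empty scalar when
    # only properties were given.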

    def parse_node(self, block=False, indentless_sequence=False):
        # type: (bool, bool) -> Any
        if self.scanner.check_token(AliasToken):
            token = self.scanner.get_token()
            event = AliasEvent(token.value, token.start_mark, token.end_mark)  # type: Any
            self.state = self.states.pop()
            return event
        anchor = None
        tag = None
        start_mark = end_mark = tag_mark = None
        if self.scanner.check_token(AnchorToken):
            token = self.scanner.get_token()
            self.move_token_comment(token)
            start_mark = token.start_mark
            end_mark = token.end_mark
            anchor = token.value
            if self.scanner.check_token(TagToken):
                token = self.scanner.get_token()
                tag_mark = token.start_mark
                end_mark = token.end_mark
                tag = token.value
        elif self.scanner.check_token(TagToken):
            token = self.scanner.get_token()
            start_mark = tag_mark = token.start_mark
            end_mark = token.end_mark
            tag = token.value
            if self.scanner.check_token(AnchorToken):
                token = self.scanner.get_token()
                start_mark = tag_mark = token.start_mark
                end_mark = token.end_mark
                anchor = token.value
        if tag is not None:
            handle, suffix = tag
            if handle is not None:
                if handle not in self.tag_handles:
                    raise ParserError(
                        'while parsing a node',
                        start_mark,
                        _F('found undefined tag handle {handle!r}', handle=handle),
                        tag_mark,
                    )
                tag = self.transform_tag(handle, suffix)
            else:
                tag = suffix
        # if tag == '!':
        #     raise ParserError("while parsing a node", start_mark,
        #         "found non-specific tag '!'", tag_mark,
        #         "Please check 'http://pyyaml.org/wiki/YAMLNonSpecificTag'
        #         and share your opinion.")
        if start_mark is None:
            start_mark = end_mark = self.scanner.peek_token().start_mark
        event = None
        implicit = tag is None or tag == '!'
        if indentless_sequence and self.scanner.check_token(BlockEntryToken):
            comment = None
            pt = self.scanner.peek_token()
            if self.loader and self.loader.comment_handling is None:
                if pt.comment and pt.comment[0]:
                    comment = [pt.comment[0], []]
                    pt.comment[0] = None
            elif self.loader:
                if pt.comment:
                    comment = pt.comment
            end_mark = self.scanner.peek_token().end_mark
            event = SequenceStartEvent(
                anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
            )
            self.state = self.parse_indentless_sequence_entry
            return event
        if self.scanner.check_token(ScalarToken):
            token = self.scanner.get_token()
            # self.scanner.peek_token_same_line_comment(token)
            end_mark = token.end_mark
            if (token.plain and tag is None) or tag == '!':
                implicit = (True, False)
            elif tag is None:
                implicit = (False, True)
            else:
                implicit = (False, False)
            # nprint('se', token.value, token.comment)
            event = ScalarEvent(
                anchor,
                tag,
                implicit,
                token.value,
                start_mark,
                end_mark,
                style=token.style,
                comment=token.comment,
            )
            self.state = self.states.pop()
        elif self.scanner.check_token(FlowSequenceStartToken):
            pt = self.scanner.peek_token()
            end_mark = pt.end_mark
            event = SequenceStartEvent(
                anchor,
                tag,
                implicit,
                start_mark,
                end_mark,
                flow_style=True,
                comment=pt.comment,
            )
            self.state = self.parse_flow_sequence_first_entry
        elif self.scanner.check_token(FlowMappingStartToken):
            pt = self.scanner.peek_token()
            end_mark = pt.end_mark
            event = MappingStartEvent(
                anchor,
                tag,
                implicit,
                start_mark,
                end_mark,
                flow_style=True,
                comment=pt.comment,
            )
            self.state = self.parse_flow_mapping_first_key
        elif block and self.scanner.check_token(BlockSequenceStartToken):
            end_mark = self.scanner.peek_token().start_mark
            # should inserting the comment be dependent on the
            # indentation?
            pt = self.scanner.peek_token()
            comment = pt.comment
            # nprint('pt0', type(pt))
            if comment is None or comment[1] is None:
                comment = pt.split_old_comment()
            # nprint('pt1', comment)
            event = SequenceStartEvent(
                anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
            )
            self.state = self.parse_block_sequence_first_entry
        elif block and self.scanner.check_token(BlockMappingStartToken):
            end_mark = self.scanner.peek_token().start_mark
            comment = self.scanner.peek_token().comment
            event = MappingStartEvent(
                anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
            )
            self.state = self.parse_block_mapping_first_key
        elif anchor is not None or tag is not None:
            # Empty scalars are allowed even if a tag or an anchor is
            # specified.
            event = ScalarEvent(anchor, tag, (implicit, False), "", start_mark, end_mark)
            self.state = self.states.pop()
        else:
            if block:
                node = 'block'
            else:
                node = 'flow'
            token = self.scanner.peek_token()
            raise ParserError(
                _F('while parsing a {node!s} node', node=node),
                start_mark,
                _F('expected the node content, but found {token_id!r}', token_id=token.id),
                token.start_mark,
            )
        return event

    # block_sequence ::= BLOCK-SEQUENCE-START (BLOCK-ENTRY block_node?)*
    #                    BLOCK-END

    def parse_block_sequence_first_entry(self):
        # type: () -> Any
        token = self.scanner.get_token()
        # move any comment from start token
        # self.move_token_comment(token)
        self.marks.append(token.start_mark)
        return self.parse_block_sequence_entry()

    def parse_block_sequence_entry(self):
        # type: () -> Any
        if self.scanner.check_token(BlockEntryToken):
            token = self.scanner.get_token()
            self.move_token_comment(token)
            if not self.scanner.check_token(BlockEntryToken, BlockEndToken):
                self.states.append(self.parse_block_sequence_entry)
                return self.parse_block_node()
            else:
                self.state = self.parse_block_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        if not self.scanner.check_token(BlockEndToken):
            token = self.scanner.peek_token()
            raise ParserError(
                'while parsing a block collection',
                self.marks[-1],
                _F('expected <block end>, but found {token_id!r}', token_id=token.id),
                token.start_mark,
            )
        token = self.scanner.get_token()  # BlockEndToken
        event = SequenceEndEvent(token.start_mark, token.end_mark, comment=token.comment)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    # indentless_sequence ::= (BLOCK-ENTRY block_node?)+
    # indentless_sequence?
    # sequence:
    # - entry
    # - nested

    def parse_indentless_sequence_entry(self):
        # type: () -> Any
        if self.scanner.check_token(BlockEntryToken):
            token = self.scanner.get_token()
            self.move_token_comment(token)
            if not self.scanner.check_token(
                BlockEntryToken, KeyToken, ValueToken, BlockEndToken
            ):
                self.states.append(self.parse_indentless_sequence_entry)
                return self.parse_block_node()
            else:
                self.state = self.parse_indentless_sequence_entry
                return self.process_empty_scalar(token.end_mark)
        token = self.scanner.peek_token()
        c = None
        if self.loader and self.loader.comment_handling is None:
            c = token.comment
            start_mark = token.start_mark
        else:
            start_mark = self.last_event.end_mark  # type: ignore
            c = self.distribute_comment(token.comment, start_mark.line)  # type: ignore
        event = SequenceEndEvent(start_mark, start_mark, comment=c)
        self.state = self.states.pop()
        return event

    # block_mapping ::= BLOCK-MAPPING_START
    #                   ((KEY block_node_or_indentless_sequence?)?
    #                   (VALUE block_node_or_indentless_sequence?)?)*
    #                   BLOCK-END

    def parse_block_mapping_first_key(self):
        # type: () -> Any
        token = self.scanner.get_token()
        self.marks.append(token.start_mark)
        return self.parse_block_mapping_key()

    def parse_block_mapping_key(self):
        # type: () -> Any
        if self.scanner.check_token(KeyToken):
            token = self.scanner.get_token()
            self.move_token_comment(token)
            if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_value)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_value
                return self.process_empty_scalar(token.end_mark)
        if self.resolver.processing_version > (1, 1) and self.scanner.check_token(ValueToken):
            self.state = self.parse_block_mapping_value
            return self.process_empty_scalar(self.scanner.peek_token().start_mark)
        if not self.scanner.check_token(BlockEndToken):
            token = self.scanner.peek_token()
            raise ParserError(
                'while parsing a block mapping',
                self.marks[-1],
                _F('expected <block end>, but found {token_id!r}', token_id=token.id),
                token.start_mark,
            )
        token = self.scanner.get_token()
        self.move_token_comment(token)
        event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment)
        self.state = self.states.pop()
        self.marks.pop()
        return event
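
    # Added note (best-effort description): parse_block_mapping_value() below
    # also decides where a comment that trails the ':' token should end up;
    # when the value is missing, the comment is attached to the empty scalar
    # produced in its place so round-tripping can re-emit it.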

    def parse_block_mapping_value(self):
        # type: () -> Any
        if self.scanner.check_token(ValueToken):
            token = self.scanner.get_token()
            # value token might have post comment move it to e.g. block
            if self.scanner.check_token(ValueToken):
                self.move_token_comment(token)
            else:
                if not self.scanner.check_token(KeyToken):
                    self.move_token_comment(token, empty=True)
                # else: empty value for this key cannot move token.comment
            if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
                self.states.append(self.parse_block_mapping_key)
                return self.parse_block_node_or_indentless_sequence()
            else:
                self.state = self.parse_block_mapping_key
                comment = token.comment
                if comment is None:
                    token = self.scanner.peek_token()
                    comment = token.comment
                    if comment:
                        token._comment = [None, comment[1]]
                        comment = [comment[0], None]
                return self.process_empty_scalar(token.end_mark, comment=comment)
        else:
            self.state = self.parse_block_mapping_key
            token = self.scanner.peek_token()
            return self.process_empty_scalar(token.start_mark)

    # flow_sequence ::= FLOW-SEQUENCE-START
    #                   (flow_sequence_entry FLOW-ENTRY)*
    #                   flow_sequence_entry?
    #                   FLOW-SEQUENCE-END
    # flow_sequence_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?
    #
    # Note that while production rules for both flow_sequence_entry and
    # flow_mapping_entry are equal, their interpretations are different.
    # For `flow_sequence_entry`, the part `KEY flow_node? (VALUE flow_node?)?`
    # generates an inline mapping (set syntax).
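
    # Added example: in a flow sequence an entry of the form 'key: value'
    # becomes a single-pair mapping, e.g. '[a: 1, b]' is a two-element sequence
    # whose first element is the mapping {a: 1}; the KEY branch in
    # parse_flow_sequence_entry() below emits the surrounding
    # MappingStartEvent/MappingEndEvent for it.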

    def parse_flow_sequence_first_entry(self):
        # type: () -> Any
        token = self.scanner.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_sequence_entry(first=True)

    def parse_flow_sequence_entry(self, first=False):
        # type: (bool) -> Any
        if not self.scanner.check_token(FlowSequenceEndToken):
            if not first:
                if self.scanner.check_token(FlowEntryToken):
                    self.scanner.get_token()
                else:
                    token = self.scanner.peek_token()
                    raise ParserError(
                        'while parsing a flow sequence',
                        self.marks[-1],
                        _F("expected ',' or ']', but got {token_id!r}", token_id=token.id),
                        token.start_mark,
                    )
            if self.scanner.check_token(KeyToken):
                token = self.scanner.peek_token()
                event = MappingStartEvent(
                    None, None, True, token.start_mark, token.end_mark, flow_style=True
                )  # type: Any
                self.state = self.parse_flow_sequence_entry_mapping_key
                return event
            elif not self.scanner.check_token(FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry)
                return self.parse_flow_node()
        token = self.scanner.get_token()
        event = SequenceEndEvent(token.start_mark, token.end_mark, comment=token.comment)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_flow_sequence_entry_mapping_key(self):
        # type: () -> Any
        token = self.scanner.get_token()
        if not self.scanner.check_token(ValueToken, FlowEntryToken, FlowSequenceEndToken):
            self.states.append(self.parse_flow_sequence_entry_mapping_value)
            return self.parse_flow_node()
        else:
            self.state = self.parse_flow_sequence_entry_mapping_value
            return self.process_empty_scalar(token.end_mark)

    def parse_flow_sequence_entry_mapping_value(self):
        # type: () -> Any
        if self.scanner.check_token(ValueToken):
            token = self.scanner.get_token()
            if not self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken):
                self.states.append(self.parse_flow_sequence_entry_mapping_end)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_sequence_entry_mapping_end
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_sequence_entry_mapping_end
            token = self.scanner.peek_token()
            return self.process_empty_scalar(token.start_mark)

    def parse_flow_sequence_entry_mapping_end(self):
        # type: () -> Any
        self.state = self.parse_flow_sequence_entry
        token = self.scanner.peek_token()
        return MappingEndEvent(token.start_mark, token.start_mark)

    # flow_mapping ::= FLOW-MAPPING-START
    #                  (flow_mapping_entry FLOW-ENTRY)*
    #                  flow_mapping_entry?
    #                  FLOW-MAPPING-END
    # flow_mapping_entry ::= flow_node | KEY flow_node? (VALUE flow_node?)?

    def parse_flow_mapping_first_key(self):
        # type: () -> Any
        token = self.scanner.get_token()
        self.marks.append(token.start_mark)
        return self.parse_flow_mapping_key(first=True)

    def parse_flow_mapping_key(self, first=False):
        # type: (Any) -> Any
        if not self.scanner.check_token(FlowMappingEndToken):
            if not first:
                if self.scanner.check_token(FlowEntryToken):
                    self.scanner.get_token()
                else:
                    token = self.scanner.peek_token()
                    raise ParserError(
                        'while parsing a flow mapping',
                        self.marks[-1],
                        _F("expected ',' or '}}', but got {token_id!r}", token_id=token.id),
                        token.start_mark,
                    )
            if self.scanner.check_token(KeyToken):
                token = self.scanner.get_token()
                if not self.scanner.check_token(
                    ValueToken, FlowEntryToken, FlowMappingEndToken
                ):
                    self.states.append(self.parse_flow_mapping_value)
                    return self.parse_flow_node()
                else:
                    self.state = self.parse_flow_mapping_value
                    return self.process_empty_scalar(token.end_mark)
            elif self.resolver.processing_version > (1, 1) and self.scanner.check_token(
                ValueToken
            ):
                self.state = self.parse_flow_mapping_value
                return self.process_empty_scalar(self.scanner.peek_token().end_mark)
            elif not self.scanner.check_token(FlowMappingEndToken):
                self.states.append(self.parse_flow_mapping_empty_value)
                return self.parse_flow_node()
        token = self.scanner.get_token()
        event = MappingEndEvent(token.start_mark, token.end_mark, comment=token.comment)
        self.state = self.states.pop()
        self.marks.pop()
        return event

    def parse_flow_mapping_value(self):
        # type: () -> Any
        if self.scanner.check_token(ValueToken):
            token = self.scanner.get_token()
            if not self.scanner.check_token(FlowEntryToken, FlowMappingEndToken):
                self.states.append(self.parse_flow_mapping_key)
                return self.parse_flow_node()
            else:
                self.state = self.parse_flow_mapping_key
                return self.process_empty_scalar(token.end_mark)
        else:
            self.state = self.parse_flow_mapping_key
            token = self.scanner.peek_token()
            return self.process_empty_scalar(token.start_mark)

    def parse_flow_mapping_empty_value(self):
        # type: () -> Any
        self.state = self.parse_flow_mapping_key
        return self.process_empty_scalar(self.scanner.peek_token().start_mark)

    def process_empty_scalar(self, mark, comment=None):
        # type: (Any, Any) -> Any
        return ScalarEvent(None, None, (True, False), "", mark, mark, comment=comment)

    def move_token_comment(self, token, nt=None, empty=False):
        # type: (Any, Optional[Any], Optional[bool]) -> Any
        pass


class RoundTripParser(Parser):
    """roundtrip is a safe loader that wants to see the unmangled tag"""

    def transform_tag(self, handle, suffix):
        # type: (Any, Any) -> Any
        # return self.tag_handles[handle]+suffix
        if handle == '!!' and suffix in (
            'null',
            'bool',
            'int',
            'float',
            'binary',
            'timestamp',
            'omap',
            'pairs',
            'set',
            'str',
            'seq',
            'map',
        ):
            return Parser.transform_tag(self, handle, suffix)
        return handle + suffix

    def move_token_comment(self, token, nt=None, empty=False):
        # type: (Any, Optional[Any], Optional[bool]) -> Any
        token.move_old_comment(self.scanner.peek_token() if nt is None else nt, empty=empty)


class RoundTripParserSC(RoundTripParser):
    """roundtrip is a safe loader that wants to see the unmangled tag"""

    # some of the differences are based on the superclass testing
    # if self.loader.comment_handling is not None

    def move_token_comment(self, token, nt=None, empty=False):
        # type: (Any, Optional[Any], Optional[bool]) -> None
        token.move_new_comment(self.scanner.peek_token() if nt is None else nt, empty=empty)
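
    # Added note (best-effort description): distribute_comment() below uses the
    # low bits of self.loader.comment_handling (C_PRE, C_POST,
    # C_SPLIT_ON_FIRST_BLANK) to decide how much of the comment following an
    # indentless sequence is handed back with the SequenceEndEvent and how much
    # is left for the next node; with C_SPLIT_ON_FIRST_BLANK the split happens
    # at the first blank-line comment.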

    def distribute_comment(self, comment, line):
        # type: (Any, Any) -> Any
        # ToDo, look at indentation of the comment to determine attachment
        if comment is None:
            return None
        if not comment[0]:
            return None
        if comment[0][0] != line + 1:
            nprintf('>>>dcxxx', comment, line)
        assert comment[0][0] == line + 1
        # if comment[0] - line > 1:
        #     return
        typ = self.loader.comment_handling & 0b11
        # nprintf('>>>dca', comment, line, typ)
        if typ == C_POST:
            return None
        if typ == C_PRE:
            c = [None, None, comment[0]]
            comment[0] = None
            return c
        # nprintf('>>>dcb', comment[0])
        for _idx, cmntidx in enumerate(comment[0]):
            # nprintf('>>>dcb', cmntidx)
            if isinstance(self.scanner.comments[cmntidx], BlankLineComment):
                break
        else:
            return None  # no space found
        if _idx == 0:
            return None  # first line was blank
        # nprintf('>>>dcc', idx)
        if typ == C_SPLIT_ON_FIRST_BLANK:
            c = [None, None, comment[0][:_idx]]
            comment[0] = comment[0][_idx:]
            return c
        raise NotImplementedError  # reserved