Spaces:
Sleeping
Sleeping
| import logging | |
| import re | |
| from io import BytesIO | |
| from typing import Dict, List, Mapping, Optional, Sequence, Tuple, Union, cast | |
| import numpy as np | |
| from pdf2zh import settings | |
| from pdf2zh.casting import safe_float | |
| from pdf2zh.cmapdb import CMap, CMapBase, CMapDB | |
| from pdf2zh.pdfcolor import PREDEFINED_COLORSPACE, PDFColorSpace | |
| from pdf2zh.pdfdevice import PDFDevice, PDFTextSeq | |
| from pdf2zh.pdfexceptions import PDFException, PDFValueError | |
| from pdf2zh.pdffont import ( | |
| PDFCIDFont, | |
| PDFFont, | |
| PDFFontError, | |
| PDFTrueTypeFont, | |
| PDFType1Font, | |
| PDFType3Font, | |
| ) | |
| from pdf2zh.pdfpage import PDFPage | |
| from pdf2zh.pdftypes import ( | |
| LITERALS_ASCII85_DECODE, | |
| PDFObjRef, | |
| PDFStream, | |
| dict_value, | |
| list_value, | |
| resolve1, | |
| stream_value, | |
| ) | |
| from pdf2zh.psexceptions import PSEOF, PSTypeError | |
| from pdf2zh.psparser import ( | |
| KWD, | |
| LIT, | |
| PSKeyword, | |
| PSLiteral, | |
| PSStackParser, | |
| PSStackType, | |
| keyword_name, | |
| literal_name, | |
| ) | |
| from pdf2zh.utils import ( | |
| MATRIX_IDENTITY, | |
| Matrix, | |
| PathSegment, | |
| Point, | |
| Rect, | |
| choplist, | |
| mult_matrix, | |
| apply_matrix_pt, | |
| ) | |
| log = logging.getLogger(__name__) | |
class PDFResourceError(PDFException):
    """PDFException subclass reserved for resource-related errors."""
class PDFInterpreterError(PDFException):
    """PDFException subclass raised by the page interpreter in strict mode."""
# Name literals that the code below compares by identity (``is``) against
# values read from PDF dictionaries.
LITERAL_PDF, LITERAL_TEXT, LITERAL_FONT, LITERAL_FORM, LITERAL_IMAGE = map(
    LIT,
    ("PDF", "Text", "Font", "Form", "Image"),
)
class PDFTextState:
    """Mutable container for the text parameters of the interpreter.

    ``matrix`` and ``linematrix`` are (re)initialised by :meth:`reset`; the
    remaining attributes receive their defaults in ``__init__``.
    """

    matrix: Matrix
    linematrix: Point

    def __init__(self) -> None:
        self.font: Optional[PDFFont] = None
        self.fontsize: float = 0
        self.charspace: float = 0
        self.wordspace: float = 0
        self.scaling: float = 100
        self.leading: float = 0
        self.render: int = 0
        self.rise: float = 0
        # reset() assigns self.matrix and self.linematrix.
        self.reset()

    def __repr__(self) -> str:
        shown = (
            "font",
            "fontsize",
            "charspace",
            "wordspace",
            "scaling",
            "leading",
            "render",
            "rise",
            "matrix",
            "linematrix",
        )
        body = ", ".join(f"{attr}={getattr(self, attr)!r}" for attr in shown)
        return f"<PDFTextState: {body}>"

    def copy(self) -> "PDFTextState":
        """Return a new PDFTextState carrying the same attribute values."""
        clone = PDFTextState()
        for attr in (
            "font",
            "fontsize",
            "charspace",
            "wordspace",
            "scaling",
            "leading",
            "render",
            "rise",
            "matrix",
            "linematrix",
        ):
            setattr(clone, attr, getattr(self, attr))
        return clone

    def reset(self) -> None:
        """Set the text matrix to identity and the line matrix to (0, 0)."""
        self.linematrix = (0, 0)
        self.matrix = MATRIX_IDENTITY
# A color value is one of:
#   * a single float            -> greyscale
#   * a 3-tuple of floats       -> R, G, B
#   * a 4-tuple of floats       -> C, M, Y, K
Color = Union[
    float,
    Tuple[float, float, float],
    Tuple[float, float, float, float],
]
class PDFGraphicState:
    """Mutable container for the line-drawing and colour parameters."""

    def __init__(self) -> None:
        self.linewidth: float = 0
        self.linecap: Optional[object] = None
        self.linejoin: Optional[object] = None
        self.miterlimit: Optional[object] = None
        self.dash: Optional[Tuple[object, object]] = None
        self.intent: Optional[object] = None
        self.flatness: Optional[object] = None
        # Colour used when stroking.
        self.scolor: Optional[Color] = None
        # Colour used for everything except stroking.
        self.ncolor: Optional[Color] = None

    def copy(self) -> "PDFGraphicState":
        """Return a new PDFGraphicState carrying the same attribute values."""
        clone = PDFGraphicState()
        for attr in (
            "linewidth",
            "linecap",
            "linejoin",
            "miterlimit",
            "dash",
            "intent",
            "flatness",
            "scolor",
            "ncolor",
        ):
            setattr(clone, attr, getattr(self, attr))
        return clone

    def __repr__(self) -> str:
        # The doubled blanks before "miterlimit" and "stroking color" are
        # part of the established output format and are kept on purpose.
        return (
            f"<PDFGraphicState: linewidth={self.linewidth!r}, "
            f"linecap={self.linecap!r}, linejoin={self.linejoin!r}, "
            f" miterlimit={self.miterlimit!r}, dash={self.dash!r}, "
            f"intent={self.intent!r}, flatness={self.flatness!r}, "
            f" stroking color={self.scolor!r}, "
            f"non stroking color={self.ncolor!r}>"
        )
class PDFResourceManager:
    """Repository of shared resources.

    ResourceManager facilitates reuse of shared resources
    such as fonts and images so that large objects are not
    allocated multiple times.
    """

    def __init__(self, caching: bool = True) -> None:
        self.caching = caching
        self._cached_fonts: Dict[object, PDFFont] = {}

    def get_procset(self, procs: Sequence[object]) -> None:
        """Iterate over the ProcSet entries; no entry triggers any action."""
        for _proc in procs:
            # /PDF, /Text and every other procedure set are all ignored.
            pass

    def get_cmap(self, cmapname: str, strict: bool = False) -> CMapBase:
        """Look up a CMap by name.

        When the CMap is unknown, the lookup error is re-raised if ``strict``
        is true; otherwise an empty ``CMap`` is returned.
        """
        try:
            found = CMapDB.get_cmap(cmapname)
        except CMapDB.CMapNotFound:
            if strict:
                raise
            return CMap()
        return found

    def get_font(self, objid: object, spec: Mapping[str, object]) -> PDFFont:
        """Return the font for ``spec``, reusing a cached one when possible.

        A truthy ``objid`` is the cache key; fonts built with a falsy
        ``objid`` or with caching disabled are never stored.
        """
        if objid and objid in self._cached_fonts:
            return self._cached_fonts[objid]

        # log.debug("get_font: create: objid=%r, spec=%r", objid, spec)
        if settings.STRICT and spec["Type"] is not LITERAL_FONT:
            raise PDFFontError("Type is not /Font")

        # Work out which kind of font object to create.
        if "Subtype" in spec:
            subtype = literal_name(spec["Subtype"])
        elif settings.STRICT:
            raise PDFFontError("Font Subtype is not specified.")
        else:
            subtype = "Type1"

        font: PDFFont
        if subtype in ("Type1", "MMType1"):
            font = PDFType1Font(self, spec)
        elif subtype == "TrueType":
            font = PDFTrueTypeFont(self, spec)
        elif subtype == "Type3":
            font = PDFType3Font(self, spec)
        elif subtype in ("CIDFontType0", "CIDFontType2"):
            font = PDFCIDFont(self, spec)
        elif subtype == "Type0":
            # Composite font: build the first descendant, letting the parent's
            # Encoding / ToUnicode entries override the descendant's.
            descendants = list_value(spec["DescendantFonts"])
            assert descendants
            subspec = dict_value(descendants[0]).copy()
            for key in ("Encoding", "ToUnicode"):
                if key in spec:
                    subspec[key] = resolve1(spec[key])
            font = self.get_font(None, subspec)
        else:
            if settings.STRICT:
                raise PDFFontError("Invalid Font spec: %r" % spec)
            font = PDFType1Font(self, spec)  # this is so wrong!

        if objid and self.caching:
            self._cached_fonts[objid] = font
        return font
class PDFContentParser(PSStackParser[Union[PSKeyword, PDFStream]]):
    """Stack parser that reads a sequence of content streams back to back."""

    def __init__(self, streams: Sequence[object]) -> None:
        self.streams = streams
        self.istream = 0
        # Handing fp=None to the base initializer is safe only because every
        # method that would read self.fp is overridden here to call
        # self.fillfp() beforehand.
        PSStackParser.__init__(self, None)  # type: ignore[arg-type]

    def fillfp(self) -> None:
        """Open the next stream as ``self.fp`` unless one is already open.

        Raises PSEOF once all streams have been consumed.
        """
        if self.fp:
            return
        if self.istream >= len(self.streams):
            raise PSEOF("Unexpected EOF, file truncated?")
        strm = stream_value(self.streams[self.istream])
        self.istream += 1
        self.fp = BytesIO(strm.get_data())
        # if log.isEnabledFor(logging.DEBUG):
        #     log.debug(f'STREAM DATA {strm.get_data()}')

    def seek(self, pos: int) -> None:
        self.fillfp()
        PSStackParser.seek(self, pos)

    def fillbuf(self) -> None:
        """Refill ``self.buf`` when it is exhausted, moving on to the next
        stream whenever the current one yields no more data."""
        if self.charpos < len(self.buf):
            return
        while True:
            self.fillfp()
            self.bufpos = self.fp.tell()
            self.buf = self.fp.read(self.BUFSIZ)
            if self.buf:
                break
            # Current stream is drained; let fillfp() open the following one.
            self.fp = None  # type: ignore[assignment]
        self.charpos = 0

    def get_inline_data(self, pos: int, target: bytes = b"EI") -> Tuple[int, bytes]:
        """Collect inline image bytes starting at ``pos``.

        Reading stops once ``target`` followed by one whitespace byte has
        been seen; that terminator and one trailing end-of-line are removed
        from the returned data.
        """
        self.seek(pos)
        target_len = len(target)
        matched = 0  # bytes of the terminator recognised so far
        data = b""
        while matched <= target_len:
            self.fillbuf()
            if not matched:
                # Jump straight to the next candidate for the first byte.
                hit = self.buf.find(target[0], self.charpos)
                if hit == -1:
                    data += self.buf[self.charpos :]
                    self.charpos = len(self.buf)
                else:
                    data += self.buf[self.charpos : hit + 1]
                    self.charpos = hit + 1
                    matched = 1
                continue
            byte = bytes((self.buf[self.charpos],))
            data += byte
            self.charpos += 1
            if matched < target_len:
                advances = byte == bytes((target[matched],))
            else:
                # The full target was seen; it must be followed by whitespace.
                advances = byte.isspace()
            matched = matched + 1 if advances else 0
        data = data[: -(target_len + 1)]  # strip the last part
        data = re.sub(rb"(\x0d\x0a|[\x0d\x0a])$", b"", data)
        return (pos, data)

    def flush(self) -> None:
        self.add_results(*self.popall())

    KEYWORD_BI = KWD(b"BI")
    KEYWORD_ID = KWD(b"ID")
    KEYWORD_EI = KWD(b"EI")

    def do_keyword(self, pos: int, token: PSKeyword) -> None:
        """Handle a keyword token, assembling inline images (BI ... ID ... EI)
        into a single PDFStream object."""
        if token is self.KEYWORD_BI:
            # inline image within a content stream
            self.start_type(pos, "inline")
            return
        if token is not self.KEYWORD_ID:
            self.push((pos, token))
            return
        try:
            (_, objs) = self.end_type("inline")
            if len(objs) % 2 != 0:
                error_msg = f"Invalid dictionary construct: {objs!r}"
                raise PSTypeError(error_msg)
            attrs = {literal_name(k): resolve1(v) for (k, v) in choplist(2, objs)}
            filters = attrs.get("F")
            if isinstance(filters, PSLiteral):
                filters = [filters]
            ascii85 = filters is not None and filters[0] in LITERALS_ASCII85_DECODE
            eos = b"~>" if ascii85 else b"EI"
            (pos, data) = self.get_inline_data(pos + len(b"ID "), target=eos)
            if ascii85:  # it may be necessary for decoding
                data += eos
            self.push((pos, PDFStream(attrs, data)))
            if not ascii85:  # otherwise it is still in the stream
                self.push((pos, self.KEYWORD_EI))
        except PSTypeError:
            if settings.STRICT:
                raise
# Types that may appear on the PDF argument stack.
PDFStackT = PSStackType[PDFStream]
class PDFPageInterpreter:
    """Processor for the content of a PDF page.

    While executing a content stream the interpreter drives ``device`` and
    also records most non-text operators as text. ``process_page`` and
    ``do_Do`` combine that recording with the text returned by the device
    and store the result in ``obj_patch``.

    Reference: PDF Reference, Appendix A, Operator Summary
    """

    # Operators that are executed but left out of the recorded stream, on
    # top of every operator whose name starts with "T".
    _UNRECORDED_WITH_OPERANDS = ('"', "'", "EI", "MP", "DP", "BMC", "BDC")
    _UNRECORDED_WITHOUT_OPERANDS = ("BI", "ID", "EMC")

    def __init__(
        self, rsrcmgr: PDFResourceManager, device: PDFDevice, obj_patch
    ) -> None:
        """
        :param rsrcmgr: resource manager used to build fonts
        :param device: device receiving paint/text/figure callbacks
        :param obj_patch: mapping filled with replacement content text,
            keyed by page xref or object id
        """
        self.rsrcmgr = rsrcmgr
        self.device = device
        self.obj_patch = obj_patch

    def dup(self) -> "PDFPageInterpreter":
        """Return a fresh interpreter sharing manager, device and patch map."""
        return self.__class__(self.rsrcmgr, self.device, self.obj_patch)

    def init_resources(self, resources: Dict[object, object]) -> None:
        """Prepare the fonts and XObjects listed in the Resource attribute."""
        self.resources = resources
        self.fontmap: Dict[object, PDFFont] = {}
        self.fontid: Dict[PDFFont, object] = {}
        self.xobjmap: Dict[object, object] = {}
        self.csmap: Dict[str, PDFColorSpace] = PREDEFINED_COLORSPACE.copy()
        if not resources:
            return

        def get_colorspace(spec: object) -> Optional[PDFColorSpace]:
            if isinstance(spec, list):
                name = literal_name(spec[0])
            else:
                name = literal_name(spec)
            if name == "ICCBased" and isinstance(spec, list) and len(spec) >= 2:
                return PDFColorSpace(name, stream_value(spec[1])["N"])
            elif name == "DeviceN" and isinstance(spec, list) and len(spec) >= 2:
                return PDFColorSpace(name, len(list_value(spec[1])))
            else:
                return PREDEFINED_COLORSPACE.get(name)

        for k, v in dict_value(resources).items():
            # log.debug("Resource: %r: %r", k, v)
            if k == "Font":
                for fontid, spec in dict_value(v).items():
                    objid = None
                    if isinstance(spec, PDFObjRef):
                        objid = spec.objid
                    spec = dict_value(spec)
                    self.fontmap[fontid] = self.rsrcmgr.get_font(objid, spec)
                    # Reverse map: font object -> resource name.
                    self.fontid[self.fontmap[fontid]] = fontid
            elif k == "ColorSpace":
                for csid, spec in dict_value(v).items():
                    colorspace = get_colorspace(resolve1(spec))
                    if colorspace is not None:
                        self.csmap[csid] = colorspace
            elif k == "ProcSet":
                self.rsrcmgr.get_procset(list_value(v))
            elif k == "XObject":
                for xobjid, xobjstrm in dict_value(v).items():
                    self.xobjmap[xobjid] = xobjstrm

    def init_state(self, ctm: Matrix) -> None:
        """Initialize the text and graphic states for rendering a page."""
        # gstack: stack for graphical states.
        self.gstack: List[Tuple[Matrix, PDFTextState, PDFGraphicState]] = []
        self.ctm = ctm
        self.device.set_ctm(self.ctm)
        self.textstate = PDFTextState()
        self.graphicstate = PDFGraphicState()
        self.curpath: List[PathSegment] = []
        # argstack: stack for command arguments.
        self.argstack: List[PDFStackT] = []
        # set some global states.
        self.scs: Optional[PDFColorSpace] = None
        self.ncs: Optional[PDFColorSpace] = None
        if self.csmap:
            self.scs = self.ncs = next(iter(self.csmap.values()))

    def push(self, obj: PDFStackT) -> None:
        self.argstack.append(obj)

    def pop(self, n: int) -> List[PDFStackT]:
        """Remove and return the top ``n`` operands (fewer if the stack is
        shorter)."""
        if n == 0:
            return []
        x = self.argstack[-n:]
        self.argstack = self.argstack[:-n]
        return x

    def get_current_state(self) -> Tuple[Matrix, PDFTextState, PDFGraphicState]:
        return (self.ctm, self.textstate.copy(), self.graphicstate.copy())

    def set_current_state(
        self,
        state: Tuple[Matrix, PDFTextState, PDFGraphicState],
    ) -> None:
        (self.ctm, self.textstate, self.graphicstate) = state
        self.device.set_ctm(self.ctm)

    def do_q(self) -> None:
        """Save graphics state"""
        self.gstack.append(self.get_current_state())

    def do_Q(self) -> None:
        """Restore graphics state"""
        if self.gstack:
            self.set_current_state(self.gstack.pop())

    def do_cm(
        self,
        a1: PDFStackT,
        b1: PDFStackT,
        c1: PDFStackT,
        d1: PDFStackT,
        e1: PDFStackT,
        f1: PDFStackT,
    ) -> None:
        """Concatenate matrix to current transformation matrix"""
        self.ctm = mult_matrix(cast(Matrix, (a1, b1, c1, d1, e1, f1)), self.ctm)
        self.device.set_ctm(self.ctm)

    def do_w(self, linewidth: PDFStackT) -> None:
        """Set line width"""
        self.graphicstate.linewidth = cast(float, linewidth)

    def do_J(self, linecap: PDFStackT) -> None:
        """Set line cap style"""
        self.graphicstate.linecap = linecap

    def do_j(self, linejoin: PDFStackT) -> None:
        """Set line join style"""
        self.graphicstate.linejoin = linejoin

    def do_M(self, miterlimit: PDFStackT) -> None:
        """Set miter limit"""
        self.graphicstate.miterlimit = miterlimit

    def do_d(self, dash: PDFStackT, phase: PDFStackT) -> None:
        """Set line dash pattern"""
        self.graphicstate.dash = (dash, phase)

    def do_ri(self, intent: PDFStackT) -> None:
        """Set color rendering intent"""
        self.graphicstate.intent = intent

    def do_i(self, flatness: PDFStackT) -> None:
        """Set flatness tolerance"""
        self.graphicstate.flatness = flatness

    def do_gs(self, name: PDFStackT) -> None:
        """Set parameters from graphics state parameter dictionary"""
        # TODO

    def do_m(self, x: PDFStackT, y: PDFStackT) -> None:
        """Begin new subpath"""
        self.curpath.append(("m", cast(float, x), cast(float, y)))

    def do_l(self, x: PDFStackT, y: PDFStackT) -> None:
        """Append straight line segment to path"""
        self.curpath.append(("l", cast(float, x), cast(float, y)))

    def do_c(
        self,
        x1: PDFStackT,
        y1: PDFStackT,
        x2: PDFStackT,
        y2: PDFStackT,
        x3: PDFStackT,
        y3: PDFStackT,
    ) -> None:
        """Append curved segment to path (three control points)"""
        self.curpath.append(
            (
                "c",
                cast(float, x1),
                cast(float, y1),
                cast(float, x2),
                cast(float, y2),
                cast(float, x3),
                cast(float, y3),
            ),
        )

    def do_v(self, x2: PDFStackT, y2: PDFStackT, x3: PDFStackT, y3: PDFStackT) -> None:
        """Append curved segment to path (initial point replicated)"""
        self.curpath.append(
            ("v", cast(float, x2), cast(float, y2), cast(float, x3), cast(float, y3)),
        )

    def do_y(self, x1: PDFStackT, y1: PDFStackT, x3: PDFStackT, y3: PDFStackT) -> None:
        """Append curved segment to path (final point replicated)"""
        self.curpath.append(
            ("y", cast(float, x1), cast(float, y1), cast(float, x3), cast(float, y3)),
        )

    def do_h(self) -> None:
        """Close subpath"""
        self.curpath.append(("h",))

    def do_re(self, x: PDFStackT, y: PDFStackT, w: PDFStackT, h: PDFStackT) -> None:
        """Append rectangle to path"""
        x = cast(float, x)
        y = cast(float, y)
        w = cast(float, w)
        h = cast(float, h)
        self.curpath.append(("m", x, y))
        self.curpath.append(("l", x + w, y))
        self.curpath.append(("l", x + w, y + h))
        self.curpath.append(("l", x, y + h))
        self.curpath.append(("h",))

    @staticmethod
    def _is_black(color: Optional[Color]) -> bool:
        """Tell whether ``color`` counts as black (all components zero).

        Colors set through G/RG/K are stored as a number or a tuple, while
        SC/SCN store the popped operand list; both shapes are accepted so
        the same color is classified identically however it was set.
        """
        if isinstance(color, (tuple, list)):
            return (
                bool(color)
                # SCN operands may include a pattern name; such colors are
                # never treated as black.
                and all(isinstance(c, (int, float)) for c in color)
                and sum(color) == 0
            )
        # NOTE(review): an unset stroking color (None) is not treated as
        # black here, and CMYK components are summed like the others —
        # confirm both are intended.
        return color == 0

    def do_S(self) -> Optional[str]:
        """Stroke path.

        Only a stand-alone, horizontal, black line (one "m" followed by one
        "l") is forwarded to the device. In that case ``"n"`` is returned,
        which ``execute`` writes in front of the recorded ``S`` operator.
        In every case the current path is cleared.
        """
        path = self.curpath
        if (
            len(path) == 2
            and path[0][0] == "m"
            and path[1][0] == "l"
            # Horizontal: both end points share the same device-space y.
            and apply_matrix_pt(self.ctm, path[0][-2:])[1]
            == apply_matrix_pt(self.ctm, path[1][-2:])[1]
            and self._is_black(self.graphicstate.scolor)
        ):
            self.device.paint_path(self.graphicstate, True, False, False, path)
            self.curpath = []
            return "n"
        self.curpath = []
        return None

    def do_s(self) -> None:
        """Close and stroke path"""
        self.do_h()
        self.do_S()

    def do_f(self) -> None:
        """Fill path using nonzero winding number rule"""
        # Filling is not forwarded to the device; only the path is reset.
        self.curpath = []

    def do_F(self) -> None:
        """Fill path using nonzero winding number rule (obsolete)"""
        # F is an alias of f, so it must reset the current path as well;
        # otherwise stale segments leak into the next path.
        self.do_f()

    def do_f_a(self) -> None:
        """Fill path using even-odd rule"""
        self.curpath = []

    def do_B(self) -> None:
        """Fill and stroke path using nonzero winding number rule"""
        self.curpath = []

    def do_B_a(self) -> None:
        """Fill and stroke path using even-odd rule"""
        self.curpath = []

    def do_b(self) -> None:
        """Close, fill, and stroke path using nonzero winding number rule"""
        self.do_h()
        self.do_B()

    def do_b_a(self) -> None:
        """Close, fill, and stroke path using even-odd rule"""
        self.do_h()
        self.do_B_a()

    def do_n(self) -> None:
        """End path without filling or stroking"""
        self.curpath = []

    def do_W(self) -> None:
        """Set clipping path using nonzero winding number rule"""

    def do_W_a(self) -> None:
        """Set clipping path using even-odd rule"""

    def do_CS(self, name: PDFStackT) -> None:
        """Set color space for stroking operations

        Introduced in PDF 1.1
        """
        try:
            self.scs = self.csmap[literal_name(name)]
        except KeyError:
            if settings.STRICT:
                raise PDFInterpreterError("Undefined ColorSpace: %r" % name)

    def do_cs(self, name: PDFStackT) -> None:
        """Set color space for nonstroking operations"""
        try:
            self.ncs = self.csmap[literal_name(name)]
        except KeyError:
            if settings.STRICT:
                raise PDFInterpreterError("Undefined ColorSpace: %r" % name)

    def do_G(self, gray: PDFStackT) -> None:
        """Set gray level for stroking operations"""
        self.graphicstate.scolor = cast(float, gray)
        self.scs = self.csmap["DeviceGray"]

    def do_g(self, gray: PDFStackT) -> None:
        """Set gray level for nonstroking operations"""
        self.graphicstate.ncolor = cast(float, gray)
        self.ncs = self.csmap["DeviceGray"]

    def do_RG(self, r: PDFStackT, g: PDFStackT, b: PDFStackT) -> None:
        """Set RGB color for stroking operations"""
        self.graphicstate.scolor = (cast(float, r), cast(float, g), cast(float, b))
        self.scs = self.csmap["DeviceRGB"]

    def do_rg(self, r: PDFStackT, g: PDFStackT, b: PDFStackT) -> None:
        """Set RGB color for nonstroking operations"""
        self.graphicstate.ncolor = (cast(float, r), cast(float, g), cast(float, b))
        self.ncs = self.csmap["DeviceRGB"]

    def do_K(self, c: PDFStackT, m: PDFStackT, y: PDFStackT, k: PDFStackT) -> None:
        """Set CMYK color for stroking operations"""
        self.graphicstate.scolor = (
            cast(float, c),
            cast(float, m),
            cast(float, y),
            cast(float, k),
        )
        self.scs = self.csmap["DeviceCMYK"]

    def do_k(self, c: PDFStackT, m: PDFStackT, y: PDFStackT, k: PDFStackT) -> None:
        """Set CMYK color for nonstroking operations"""
        self.graphicstate.ncolor = (
            cast(float, c),
            cast(float, m),
            cast(float, y),
            cast(float, k),
        )
        self.ncs = self.csmap["DeviceCMYK"]

    def do_SCN(self) -> List[PDFStackT]:
        """Set color for stroking operations.

        The operand count comes from the current stroking color space, so
        the operands are popped here and returned for ``execute`` to record.
        """
        if self.scs:
            n = self.scs.ncomponents
        else:
            if settings.STRICT:
                raise PDFInterpreterError("No colorspace specified!")
            n = 1
        args = self.pop(n)
        self.graphicstate.scolor = cast(Color, args)
        return args

    def do_scn(self) -> List[PDFStackT]:
        """Set color for nonstroking operations.

        The operand count comes from the current nonstroking color space,
        so the operands are popped here and returned for ``execute`` to
        record.
        """
        if self.ncs:
            n = self.ncs.ncomponents
        else:
            if settings.STRICT:
                raise PDFInterpreterError("No colorspace specified!")
            n = 1
        args = self.pop(n)
        self.graphicstate.ncolor = cast(Color, args)
        return args

    def do_SC(self) -> List[PDFStackT]:
        """Set color for stroking operations"""
        return self.do_SCN()

    def do_sc(self) -> List[PDFStackT]:
        """Set color for nonstroking operations"""
        return self.do_scn()

    def do_sh(self, name: object) -> None:
        """Paint area defined by shading pattern"""

    def do_BT(self) -> None:
        """Begin text object

        Initializing the text matrix, Tm, and the text line matrix, Tlm, to
        the identity matrix. Text objects cannot be nested; a second BT cannot
        appear before an ET.
        """
        self.textstate.reset()

    def do_ET(self) -> None:
        """End a text object"""

    def do_BX(self) -> None:
        """Begin compatibility section"""

    def do_EX(self) -> None:
        """End compatibility section"""

    def do_MP(self, tag: PDFStackT) -> None:
        """Define marked-content point"""
        self.device.do_tag(cast(PSLiteral, tag))

    def do_DP(self, tag: PDFStackT, props: PDFStackT) -> None:
        """Define marked-content point with property list"""
        self.device.do_tag(cast(PSLiteral, tag), props)

    def do_BMC(self, tag: PDFStackT) -> None:
        """Begin marked-content sequence"""
        self.device.begin_tag(cast(PSLiteral, tag))

    def do_BDC(self, tag: PDFStackT, props: PDFStackT) -> None:
        """Begin marked-content sequence with property list"""
        self.device.begin_tag(cast(PSLiteral, tag), props)

    def do_EMC(self) -> None:
        """End marked-content sequence"""
        self.device.end_tag()

    def do_Tc(self, space: PDFStackT) -> None:
        """Set character spacing.

        Character spacing is used by the Tj, TJ, and ' operators.

        :param space: a number expressed in unscaled text space units.
        """
        self.textstate.charspace = cast(float, space)

    def do_Tw(self, space: PDFStackT) -> None:
        """Set the word spacing.

        Word spacing is used by the Tj, TJ, and ' operators.

        :param space: a number expressed in unscaled text space units
        """
        self.textstate.wordspace = cast(float, space)

    def do_Tz(self, scale: PDFStackT) -> None:
        """Set the horizontal scaling.

        :param scale: is a number specifying the percentage of the normal width
        """
        self.textstate.scaling = cast(float, scale)

    def do_TL(self, leading: PDFStackT) -> None:
        """Set the text leading.

        Text leading is used only by the T*, ', and " operators.

        :param leading: a number expressed in unscaled text space units
        """
        # Stored negated; do_T_a adds it and do_TD stores ty unchanged.
        self.textstate.leading = -cast(float, leading)

    def do_Tf(self, fontid: PDFStackT, fontsize: PDFStackT) -> None:
        """Set the text font

        :param fontid: the name of a font resource in the Font subdictionary
            of the current resource dictionary
        :param fontsize: size is a number representing a scale factor.
        """
        try:
            self.textstate.font = self.fontmap[literal_name(fontid)]
        except KeyError:
            if settings.STRICT:
                raise PDFInterpreterError("Undefined Font id: %r" % fontid)
            # Unknown font name: fall back to a font built from an empty spec.
            self.textstate.font = self.rsrcmgr.get_font(None, {})
        self.textstate.fontsize = cast(float, fontsize)

    def do_Tr(self, render: PDFStackT) -> None:
        """Set the text rendering mode"""
        self.textstate.render = cast(int, render)

    def do_Ts(self, rise: PDFStackT) -> None:
        """Set the text rise

        :param rise: a number expressed in unscaled text space units
        """
        self.textstate.rise = cast(float, rise)

    def do_Td(self, tx: PDFStackT, ty: PDFStackT) -> None:
        """Move to the start of the next line

        Offset from the start of the current line by (tx , ty).
        """
        tx_ = safe_float(tx)
        ty_ = safe_float(ty)
        if tx_ is not None and ty_ is not None:
            (a, b, c, d, e, f) = self.textstate.matrix
            e_new = tx_ * a + ty_ * c + e
            f_new = tx_ * b + ty_ * d + f
            self.textstate.matrix = (a, b, c, d, e_new, f_new)
        elif settings.STRICT:
            raise PDFValueError(f"Invalid offset ({tx!r}, {ty!r}) for Td")
        self.textstate.linematrix = (0, 0)

    def do_TD(self, tx: PDFStackT, ty: PDFStackT) -> None:
        """Move to the start of the next line.

        offset from the start of the current line by (tx , ty). As a side effect, this
        operator sets the leading parameter in the text state.
        """
        tx_ = safe_float(tx)
        ty_ = safe_float(ty)
        if tx_ is not None and ty_ is not None:
            (a, b, c, d, e, f) = self.textstate.matrix
            e_new = tx_ * a + ty_ * c + e
            f_new = tx_ * b + ty_ * d + f
            self.textstate.matrix = (a, b, c, d, e_new, f_new)
        elif settings.STRICT:
            # The message was missing its f-prefix and printed the
            # placeholders literally.
            raise PDFValueError(f"Invalid offset ({tx!r}, {ty!r}) for TD")
        if ty_ is not None:
            self.textstate.leading = ty_
        self.textstate.linematrix = (0, 0)

    def do_Tm(
        self,
        a: PDFStackT,
        b: PDFStackT,
        c: PDFStackT,
        d: PDFStackT,
        e: PDFStackT,
        f: PDFStackT,
    ) -> None:
        """Set text matrix and text line matrix"""
        self.textstate.matrix = cast(Matrix, (a, b, c, d, e, f))
        self.textstate.linematrix = (0, 0)

    def do_T_a(self) -> None:
        """Move to start of next text line"""
        (a, b, c, d, e, f) = self.textstate.matrix
        self.textstate.matrix = (
            a,
            b,
            c,
            d,
            self.textstate.leading * c + e,
            self.textstate.leading * d + f,
        )
        self.textstate.linematrix = (0, 0)

    def do_TJ(self, seq: PDFStackT) -> None:
        """Show text, allowing individual glyph positioning"""
        if self.textstate.font is None:
            if settings.STRICT:
                raise PDFInterpreterError("No font specified!")
            return
        assert self.ncs is not None
        self.device.render_string(
            self.textstate,
            cast(PDFTextSeq, seq),
            self.ncs,
            self.graphicstate.copy(),
        )

    def do_Tj(self, s: PDFStackT) -> None:
        """Show text"""
        self.do_TJ([s])

    def do__q(self, s: PDFStackT) -> None:
        """Move to next line and show text

        The ' (single quote) operator.
        """
        self.do_T_a()
        self.do_TJ([s])

    def do__w(self, aw: PDFStackT, ac: PDFStackT, s: PDFStackT) -> None:
        """Set word and character spacing, move to next line, and show text

        The " (double quote) operator. It has the same effect as
        ``aw Tw ac Tc string '``.
        """
        self.do_Tw(aw)
        self.do_Tc(ac)
        # The move to the next line was previously missing.
        self.do__q(s)

    def do_BI(self) -> None:
        """Begin inline image object"""

    def do_ID(self) -> None:
        """Begin inline image data"""

    def do_EI(self, obj: PDFStackT) -> None:
        """End inline image object"""
        if isinstance(obj, PDFStream) and "W" in obj and "H" in obj:
            iobjid = str(id(obj))
            self.device.begin_figure(iobjid, (0, 0, 1, 1), MATRIX_IDENTITY)
            self.device.render_image(iobjid, obj)
            self.device.end_figure(iobjid)

    def do_Do(self, xobjid_arg: PDFStackT) -> None:
        """Invoke named XObject.

        Form XObjects are interpreted recursively with a duplicate
        interpreter and, when possible, get a replacement content stream
        stored in ``obj_patch``. Image XObjects are passed to the device.
        Anything else is ignored.
        """
        xobjid = literal_name(xobjid_arg)
        try:
            xobj = stream_value(self.xobjmap[xobjid])
        except KeyError:
            if settings.STRICT:
                raise PDFInterpreterError("Undefined xobject id: %r" % xobjid)
            return
        # log.debug("Processing xobj: %r", xobj)
        subtype = xobj.get("Subtype")
        if subtype is LITERAL_FORM and "BBox" in xobj:
            interpreter = self.dup()
            bbox = cast(Rect, list_value(xobj["BBox"]))
            matrix = cast(Matrix, list_value(xobj.get("Matrix", MATRIX_IDENTITY)))
            # According to PDF reference 1.7 section 4.9.1, XObjects in
            # earlier PDFs (prior to v1.2) use the page's Resources entry
            # instead of having their own Resources entry.
            xobjres = xobj.get("Resources")
            if xobjres:
                resources = dict_value(xobjres)
            else:
                resources = self.resources.copy()
            self.device.begin_figure(xobjid, bbox, matrix)
            ctm = mult_matrix(matrix, self.ctm)
            ops_base = interpreter.render_contents(
                resources,
                [xobj],
                ctm=ctm,
            )
            # Best effort: sometimes the form's fonts cannot be attached and
            # this section fails; the form is then simply left unpatched.
            try:
                self.device.fontid = interpreter.fontid
                self.device.fontmap = interpreter.fontmap
                ops_new = self.device.end_figure(xobjid)
                # Invert the affine ctm so that the text emitted by the
                # device is placed back in the form's own coordinate space.
                # Plain ndarrays are used because np.mat no longer exists in
                # NumPy 2.0.
                linear_inv = np.linalg.inv(np.array(ctm[:4]).reshape(2, 2))
                offset_inv = -np.array(ctm[4:]) @ linear_inv
                a, b, c, d = linear_inv.reshape(4).tolist()
                e, f = offset_inv.tolist()
                self.obj_patch[self.xobjmap[xobjid].objid] = (
                    f"q {ops_base}Q {a} {b} {c} {d} {e} {f} cm {ops_new}"
                )
            except Exception:
                log.debug(
                    "Could not patch form XObject %r; leaving it unchanged",
                    xobjid,
                    exc_info=True,
                )
        elif subtype is LITERAL_IMAGE and "Width" in xobj and "Height" in xobj:
            self.device.begin_figure(xobjid, (0, 0, 1, 1), MATRIX_IDENTITY)
            self.device.render_image(xobjid, xobj)
            self.device.end_figure(xobjid)
        else:
            # unsupported xobject type.
            pass

    def process_page(self, page: PDFPage) -> None:
        """Interpret ``page`` and store its replacement content in
        ``obj_patch``."""
        # log.debug("Processing page: %r", page)
        (x0, y0, x1, y1) = page.cropbox
        if page.rotate == 90:
            ctm = (0, -1, 1, 0, -y0, x1)
        elif page.rotate == 180:
            ctm = (-1, 0, 0, -1, x1, y1)
        elif page.rotate == 270:
            ctm = (0, 1, -1, 0, y1, -x0)
        else:
            ctm = (1, 0, 0, 1, -x0, -y0)
        self.device.begin_page(page, ctm)
        ops_base = self.render_contents(page.resources, page.contents, ctm=ctm)
        self.device.fontid = self.fontid
        self.device.fontmap = self.fontmap
        ops_new = self.device.end_page(page)
        # Rendering subtracted the cropbox offset to get real coordinates, so
        # the output adds it back with a cm operator. ops_base may contain
        # images that the text in ops_new has to cover; wrapping it in q/Q
        # resets the matrix before ops_new is drawn.
        self.obj_patch[page.page_xref] = (
            f"q {ops_base}Q 1 0 0 1 {x0} {y0} cm {ops_new}"
        )
        # The original content streams are replaced by empty ones.
        for obj in page.contents:
            self.obj_patch[obj.objid] = ""

    def render_contents(
        self,
        resources: Dict[object, object],
        streams: Sequence[object],
        ctm: Matrix = MATRIX_IDENTITY,
    ) -> str:
        """Render the content streams.

        This method may be called recursively.

        :return: the operator text recorded by :meth:`execute`
        """
        # log.debug(
        #     "render_contents: resources=%r, streams=%r, ctm=%r",
        #     resources,
        #     streams,
        #     ctm,
        # )
        self.init_resources(resources)
        self.init_state(ctm)
        return self.execute(list_value(streams))

    @staticmethod
    def _format_operand(x: object) -> str:
        """Serialise one operand for the recorded content stream.

        Floats use fixed notation, arrays use PDF syntax (space separated,
        no commas), and everything else is its ``str()`` with single quotes
        removed.
        """
        if isinstance(x, float):
            return f"{x:f}"
        if isinstance(x, list):
            inner = " ".join(PDFPageInterpreter._format_operand(i) for i in x)
            return f"[{inner}]"
        return str(x).replace("'", "")

    def execute(self, streams: Sequence[object]) -> str:
        """Run every operator in ``streams`` and return the recorded text.

        Operators are dispatched to the matching ``do_*`` method, whose
        positional parameter count decides how many operands are popped.
        Text operators (names starting with "T"), the quote operators,
        inline-image operators and marked-content operators are executed
        but not recorded.

        :raises PDFInterpreterError: for an unknown operator in strict mode
        """
        try:
            parser = PDFContentParser(streams)
        except PSEOF:
            # empty page
            return ""
        recorded: List[str] = []
        while True:
            try:
                _, (_, obj) = parser.nextobject()
            except PSEOF:
                break
            if not isinstance(obj, PSKeyword):
                self.push(obj)
                continue
            name = keyword_name(obj)
            method = "do_%s" % name.replace("*", "_a").replace('"', "_w").replace(
                "'",
                "_q",
            )
            func = getattr(self, method, None)
            if func is None:
                if settings.STRICT:
                    error_msg = "Unknown operator: %r" % name
                    raise PDFInterpreterError(error_msg)
                continue
            nargs = func.__code__.co_argcount - 1
            if nargs:
                operands = self.pop(nargs)
                # log.debug("exec: %s %r", name, operands)
                if len(operands) != nargs:
                    # Too few operands on the stack: skip the operator.
                    continue
                func(*operands)
                # EI is skipped too because its operand is a stream object.
                if name[0] == "T" or name in self._UNRECORDED_WITH_OPERANDS:
                    continue
            else:
                # log.debug("exec: %s", name)
                # Operand-less handlers may return what to write in front of
                # the operator (SC/SCN operands, "n" from do_S).
                operands = func()
                if operands is None:
                    operands = []
                if name[0] == "T" or name in self._UNRECORDED_WITHOUT_OPERANDS:
                    continue
            p = " ".join(self._format_operand(x) for x in operands)
            recorded.append(f"{p} {name} ")
        return "".join(recorded)