Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 
 

556 linhas
18 KiB

  1. import contextvars
  2. import os
  3. import uuid
  4. from pathlib import Path
  5. from typing import Dict, List, Optional, Union
  6. from graphviz import Digraph
  7. # Global contexts for a diagrams and a cluster.
  8. #
  9. # These global contexts are for letting the clusters and nodes know
  10. # where context they are belong to. So the all clusters and nodes does
  11. # not need to specify the current diagrams or cluster via parameters.
  12. __diagram = contextvars.ContextVar("diagrams")
  13. __cluster = contextvars.ContextVar("cluster")
  14. def getdiagram() -> "Diagram":
  15. try:
  16. return __diagram.get()
  17. except LookupError:
  18. return None
  19. def setdiagram(diagram: "Diagram"):
  20. __diagram.set(diagram)
  21. def getcluster() -> "Cluster":
  22. try:
  23. return __cluster.get()
  24. except LookupError:
  25. return None
  26. def setcluster(cluster: "Cluster"):
  27. __cluster.set(cluster)
  28. class Diagram:
  29. __directions = ("TB", "BT", "LR", "RL")
  30. __curvestyles = ("ortho", "curved")
  31. __outformats = ("png", "jpg", "svg", "pdf", "dot")
  32. # fmt: off
  33. _default_graph_attrs = {
  34. "pad": "2.0",
  35. "splines": "ortho",
  36. "nodesep": "0.60",
  37. "ranksep": "0.75",
  38. "fontname": "Sans-Serif",
  39. "fontsize": "15",
  40. "fontcolor": "#2D3436",
  41. }
  42. _default_node_attrs = {
  43. "shape": "box",
  44. "style": "rounded",
  45. "fixedsize": "true",
  46. "width": "1.4",
  47. "height": "1.4",
  48. "labelloc": "b",
  49. # imagepos attribute is not backward compatible
  50. # TODO: check graphviz version to see if "imagepos" is available >= 2.40
  51. # https://github.com/xflr6/graphviz/blob/master/graphviz/backend.py#L248
  52. # "imagepos": "tc",
  53. "imagescale": "true",
  54. "fontname": "Sans-Serif",
  55. "fontsize": "13",
  56. "fontcolor": "#2D3436",
  57. }
  58. _default_edge_attrs = {
  59. "color": "#7B8894",
  60. }
  61. # fmt: on
  62. # TODO: Label position option
  63. # TODO: Save directory option (filename + directory?)
  64. def __init__(
  65. self,
  66. name: str = "",
  67. filename: str = "",
  68. direction: str = "LR",
  69. curvestyle: str = "ortho",
  70. outformat: str = "png",
  71. autolabel: bool = False,
  72. show: bool = True,
  73. strict: bool = False,
  74. graph_attr: Optional[dict] = None,
  75. node_attr: Optional[dict] = None,
  76. edge_attr: Optional[dict] = None,
  77. ):
  78. """Diagram represents a global diagrams context.
  79. :param name: Diagram name. It will be used for output filename if the
  80. filename isn't given.
  81. :param filename: The output filename, without the extension (.png).
  82. If not given, it will be generated from the name.
  83. :param direction: Data flow direction. Default is 'left to right'.
  84. :param curvestyle: Curve bending style. One of "ortho" or "curved".
  85. :param outformat: Output file format. Default is 'png'.
  86. :param show: Open generated image after save if true, just only save otherwise.
  87. :param graph_attr: Provide graph_attr dot config attributes.
  88. :param node_attr: Provide node_attr dot config attributes.
  89. :param edge_attr: Provide edge_attr dot config attributes.
  90. :param strict: Rendering should merge multi-edges.
  91. """
  92. if graph_attr is None:
  93. graph_attr = {}
  94. if node_attr is None:
  95. node_attr = {}
  96. if edge_attr is None:
  97. edge_attr = {}
  98. self.name = name
  99. if not name and not filename:
  100. filename = "diagrams_image"
  101. elif not filename:
  102. filename = "_".join(self.name.split()).lower()
  103. self.filename = filename
  104. self.dot = Digraph(self.name, filename=self.filename, strict=strict)
  105. # Set attributes.
  106. for k, v in self._default_graph_attrs.items():
  107. self.dot.graph_attr[k] = v
  108. self.dot.graph_attr["label"] = self.name
  109. for k, v in self._default_node_attrs.items():
  110. self.dot.node_attr[k] = v
  111. for k, v in self._default_edge_attrs.items():
  112. self.dot.edge_attr[k] = v
  113. if not self._validate_direction(direction):
  114. raise ValueError(f'"{direction}" is not a valid direction')
  115. self.dot.graph_attr["rankdir"] = direction
  116. if not self._validate_curvestyle(curvestyle):
  117. raise ValueError(f'"{curvestyle}" is not a valid curvestyle')
  118. self.dot.graph_attr["splines"] = curvestyle
  119. if isinstance(outformat, list):
  120. for one_format in outformat:
  121. if not self._validate_outformat(one_format):
  122. raise ValueError(f'"{one_format}" is not a valid output format')
  123. else:
  124. if not self._validate_outformat(outformat):
  125. raise ValueError(f'"{outformat}" is not a valid output format')
  126. self.outformat = outformat
  127. # Merge passed in attributes
  128. self.dot.graph_attr.update(graph_attr)
  129. self.dot.node_attr.update(node_attr)
  130. self.dot.edge_attr.update(edge_attr)
  131. self.show = show
  132. self.autolabel = autolabel
  133. def __str__(self) -> str:
  134. return str(self.dot)
  135. def __enter__(self):
  136. setdiagram(self)
  137. return self
  138. def __exit__(self, exc_type, exc_value, traceback):
  139. self.render()
  140. # Remove the graphviz file leaving only the image.
  141. os.remove(self.filename)
  142. setdiagram(None)
  143. def _repr_png_(self):
  144. return self.dot.pipe(format="png")
  145. def _validate_direction(self, direction: str) -> bool:
  146. return direction.upper() in self.__directions
  147. def _validate_curvestyle(self, curvestyle: str) -> bool:
  148. return curvestyle.lower() in self.__curvestyles
  149. def _validate_outformat(self, outformat: str) -> bool:
  150. return outformat.lower() in self.__outformats
  151. def node(self, nodeid: str, label: str, **attrs) -> None:
  152. """Create a new node."""
  153. self.dot.node(nodeid, label=label, **attrs)
  154. def connect(self, node: "Node", node2: "Node", edge: "Edge") -> None:
  155. """Connect the two Nodes."""
  156. self.dot.edge(node.nodeid, node2.nodeid, **edge.attrs)
  157. def subgraph(self, dot: Digraph) -> None:
  158. """Create a subgraph for clustering"""
  159. self.dot.subgraph(dot)
  160. def render(self) -> None:
  161. if isinstance(self.outformat, list):
  162. for one_format in self.outformat:
  163. self.dot.render(format=one_format, view=self.show, quiet=True)
  164. else:
  165. self.dot.render(format=self.outformat, view=self.show, quiet=True)
  166. class Cluster:
  167. __directions = ("TB", "BT", "LR", "RL")
  168. __bgcolors = ("#E5F5FD", "#EBF3E7", "#ECE8F6", "#FDF7E3")
  169. # fmt: off
  170. _default_graph_attrs = {
  171. "shape": "box",
  172. "style": "rounded",
  173. "labeljust": "l",
  174. "pencolor": "#AEB6BE",
  175. "fontname": "Sans-Serif",
  176. "fontsize": "12",
  177. }
  178. # fmt: on
  179. # FIXME:
  180. # Cluster direction does not work now. Graphviz couldn't render
  181. # correctly for a subgraph that has a different rank direction.
  182. def __init__(
  183. self,
  184. label: str = "cluster",
  185. direction: str = "LR",
  186. graph_attr: Optional[dict] = None,
  187. ):
  188. """Cluster represents a cluster context.
  189. :param label: Cluster label.
  190. :param direction: Data flow direction. Default is 'left to right'.
  191. :param graph_attr: Provide graph_attr dot config attributes.
  192. """
  193. if graph_attr is None:
  194. graph_attr = {}
  195. self.label = label
  196. self.name = "cluster_" + self.label
  197. self.dot = Digraph(self.name)
  198. # Set attributes.
  199. for k, v in self._default_graph_attrs.items():
  200. self.dot.graph_attr[k] = v
  201. self.dot.graph_attr["label"] = self.label
  202. if not self._validate_direction(direction):
  203. raise ValueError(f'"{direction}" is not a valid direction')
  204. self.dot.graph_attr["rankdir"] = direction
  205. # Node must be belong to a diagrams.
  206. self._diagram = getdiagram()
  207. if self._diagram is None:
  208. raise EnvironmentError("Global diagrams context not set up")
  209. self._parent = getcluster()
  210. # Set cluster depth for distinguishing the background color
  211. self.depth = self._parent.depth + 1 if self._parent else 0
  212. coloridx = self.depth % len(self.__bgcolors)
  213. self.dot.graph_attr["bgcolor"] = self.__bgcolors[coloridx]
  214. # Merge passed in attributes
  215. self.dot.graph_attr.update(graph_attr)
  216. def __enter__(self):
  217. setcluster(self)
  218. return self
  219. def __exit__(self, exc_type, exc_value, traceback):
  220. if self._parent:
  221. self._parent.subgraph(self.dot)
  222. else:
  223. self._diagram.subgraph(self.dot)
  224. setcluster(self._parent)
  225. def _validate_direction(self, direction: str) -> bool:
  226. return direction.upper() in self.__directions
  227. def node(self, nodeid: str, label: str, **attrs) -> None:
  228. """Create a new node in the cluster."""
  229. self.dot.node(nodeid, label=label, **attrs)
  230. def subgraph(self, dot: Digraph) -> None:
  231. self.dot.subgraph(dot)
  232. class Node:
  233. """Node represents a node for a specific backend service."""
  234. _provider = None
  235. _type = None
  236. _icon_dir = None
  237. _icon = None
  238. _height = 1.9
  239. def __init__(self, label: str = "", *, nodeid: str = None, **attrs: Dict):
  240. """Node represents a system component.
  241. :param label: Node label.
  242. """
  243. # Generates an ID for identifying a node, unless specified
  244. self._id = nodeid or self._rand_id()
  245. self.label = label
  246. # Node must be belong to a diagrams.
  247. self._diagram = getdiagram()
  248. if self._diagram is None:
  249. raise EnvironmentError("Global diagrams context not set up")
  250. if self._diagram.autolabel:
  251. prefix = self.__class__.__name__
  252. if self.label:
  253. self.label = prefix + "\n" + self.label
  254. else:
  255. self.label = prefix
  256. # fmt: off
  257. # If a node has an icon, increase the height slightly to avoid
  258. # that label being spanned between icon image and white space.
  259. # Increase the height by the number of new lines included in the label.
  260. padding = 0.4 * (self.label.count('\n'))
  261. self._attrs = {
  262. "shape": "none",
  263. "height": str(self._height + padding),
  264. "image": self._load_icon(),
  265. } if self._icon else {}
  266. # fmt: on
  267. self._attrs.update(attrs)
  268. self._cluster = getcluster()
  269. # If a node is in the cluster context, add it to cluster.
  270. if self._cluster:
  271. self._cluster.node(self._id, self.label, **self._attrs)
  272. else:
  273. self._diagram.node(self._id, self.label, **self._attrs)
  274. def __repr__(self):
  275. _name = self.__class__.__name__
  276. return f"<{self._provider}.{self._type}.{_name}>"
  277. def __sub__(self, other: Union["Node", List["Node"], "Edge"]):
  278. """Implement Self - Node, Self - [Nodes] and Self - Edge."""
  279. if isinstance(other, list):
  280. for node in other:
  281. self.connect(node, Edge(self))
  282. return other
  283. elif isinstance(other, Node):
  284. return self.connect(other, Edge(self))
  285. else:
  286. other.node = self
  287. return other
  288. def __rsub__(self, other: Union[List["Node"], List["Edge"]]):
  289. """Called for [Nodes] and [Edges] - Self because list don't have __sub__ operators."""
  290. for o in other:
  291. if isinstance(o, Edge):
  292. o.connect(self)
  293. else:
  294. o.connect(self, Edge(self))
  295. return self
  296. def __rshift__(self, other: Union["Node", List["Node"], "Edge"]):
  297. """Implements Self >> Node, Self >> [Nodes] and Self Edge."""
  298. if isinstance(other, list):
  299. for node in other:
  300. self.connect(node, Edge(self, forward=True))
  301. return other
  302. elif isinstance(other, Node):
  303. return self.connect(other, Edge(self, forward=True))
  304. else:
  305. other.forward = True
  306. other.node = self
  307. return other
  308. def __lshift__(self, other: Union["Node", List["Node"], "Edge"]):
  309. """Implements Self << Node, Self << [Nodes] and Self << Edge."""
  310. if isinstance(other, list):
  311. for node in other:
  312. self.connect(node, Edge(self, reverse=True))
  313. return other
  314. elif isinstance(other, Node):
  315. return self.connect(other, Edge(self, reverse=True))
  316. else:
  317. other.reverse = True
  318. return other.connect(self)
  319. def __rrshift__(self, other: Union[List["Node"], List["Edge"]]):
  320. """Called for [Nodes] and [Edges] >> Self because list don't have __rshift__ operators."""
  321. for o in other:
  322. if isinstance(o, Edge):
  323. o.forward = True
  324. o.connect(self)
  325. else:
  326. o.connect(self, Edge(self, forward=True))
  327. return self
  328. def __rlshift__(self, other: Union[List["Node"], List["Edge"]]):
  329. """Called for [Nodes] << Self because list of Nodes don't have __lshift__ operators."""
  330. for o in other:
  331. if isinstance(o, Edge):
  332. o.reverse = True
  333. o.connect(self)
  334. else:
  335. o.connect(self, Edge(self, reverse=True))
  336. return self
  337. @property
  338. def nodeid(self):
  339. return self._id
  340. # TODO: option for adding flow description to the connection edge
  341. def connect(self, node: "Node", edge: "Edge"):
  342. """Connect to other node.
  343. :param node: Other node instance.
  344. :param edge: Type of the edge.
  345. :return: Connected node.
  346. """
  347. if not isinstance(node, Node):
  348. ValueError(f"{node} is not a valid Node")
  349. if not isinstance(edge, Edge):
  350. ValueError(f"{edge} is not a valid Edge")
  351. # An edge must be added on the global diagrams, not a cluster.
  352. self._diagram.connect(self, node, edge)
  353. return node
  354. @staticmethod
  355. def _rand_id():
  356. return uuid.uuid4().hex
  357. def _load_icon(self):
  358. basedir = Path(os.path.abspath(os.path.dirname(__file__)))
  359. return os.path.join(basedir.parent, self._icon_dir, self._icon)
  360. class Edge:
  361. """Edge represents an edge between two nodes."""
  362. _default_edge_attrs = {
  363. "fontcolor": "#2D3436",
  364. "fontname": "Sans-Serif",
  365. "fontsize": "13",
  366. }
  367. def __init__(
  368. self,
  369. node: "Node" = None,
  370. forward: bool = False,
  371. reverse: bool = False,
  372. label: str = "",
  373. color: str = "",
  374. style: str = "",
  375. **attrs: Dict,
  376. ):
  377. """Edge represents an edge between two nodes.
  378. :param node: Parent node.
  379. :param forward: Points forward.
  380. :param reverse: Points backward.
  381. :param label: Edge label.
  382. :param color: Edge color.
  383. :param style: Edge style.
  384. :param attrs: Other edge attributes
  385. """
  386. if node is not None:
  387. assert isinstance(node, Node)
  388. self.node = node
  389. self.forward = forward
  390. self.reverse = reverse
  391. self._attrs = {}
  392. # Set attributes.
  393. for k, v in self._default_edge_attrs.items():
  394. self._attrs[k] = v
  395. if label:
  396. # Graphviz complaining about using label for edges, so replace it with xlabel.
  397. # Update: xlabel option causes the misaligned label position: https://github.com/mingrammer/diagrams/issues/83
  398. self._attrs["label"] = label
  399. if color:
  400. self._attrs["color"] = color
  401. if style:
  402. self._attrs["style"] = style
  403. self._attrs.update(attrs)
  404. def __sub__(self, other: Union["Node", "Edge", List["Node"]]):
  405. """Implement Self - Node or Edge and Self - [Nodes]"""
  406. return self.connect(other)
  407. def __rsub__(self, other: Union[List["Node"], List["Edge"]]) -> List["Edge"]:
  408. """Called for [Nodes] or [Edges] - Self because list don't have __sub__ operators."""
  409. return self.append(other)
  410. def __rshift__(self, other: Union["Node", "Edge", List["Node"]]):
  411. """Implements Self >> Node or Edge and Self >> [Nodes]."""
  412. self.forward = True
  413. return self.connect(other)
  414. def __lshift__(self, other: Union["Node", "Edge", List["Node"]]):
  415. """Implements Self << Node or Edge and Self << [Nodes]."""
  416. self.reverse = True
  417. return self.connect(other)
  418. def __rrshift__(self, other: Union[List["Node"], List["Edge"]]) -> List["Edge"]:
  419. """Called for [Nodes] or [Edges] >> Self because list of Edges don't have __rshift__ operators."""
  420. return self.append(other, forward=True)
  421. def __rlshift__(self, other: Union[List["Node"], List["Edge"]]) -> List["Edge"]:
  422. """Called for [Nodes] or [Edges] << Self because list of Edges don't have __lshift__ operators."""
  423. return self.append(other, reverse=True)
  424. def append(self, other: Union[List["Node"], List["Edge"]], forward=None, reverse=None) -> List["Edge"]:
  425. result = []
  426. for o in other:
  427. if isinstance(o, Edge):
  428. o.forward = forward if forward else o.forward
  429. o.reverse = forward if forward else o.reverse
  430. self._attrs = o.attrs.copy()
  431. result.append(o)
  432. else:
  433. result.append(Edge(o, forward=forward, reverse=reverse, **self._attrs))
  434. return result
  435. def connect(self, other: Union["Node", "Edge", List["Node"]]):
  436. if isinstance(other, list):
  437. for node in other:
  438. self.node.connect(node, self)
  439. return other
  440. elif isinstance(other, Edge):
  441. self._attrs = other._attrs.copy()
  442. return self
  443. else:
  444. if self.node is not None:
  445. return self.node.connect(other, self)
  446. else:
  447. self.node = other
  448. return self
  449. @property
  450. def attrs(self) -> Dict:
  451. if self.forward and self.reverse:
  452. direction = "both"
  453. elif self.forward:
  454. direction = "forward"
  455. elif self.reverse:
  456. direction = "back"
  457. else:
  458. direction = "none"
  459. return {**self._attrs, "dir": direction}
  460. Group = Cluster