Skip to content

Commit

Permalink
0.6.1: move name/descr from Pipes into ExplainingVisitor methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Dec 4, 2023
1 parent 83b00f1 commit 948dfa6
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 56 deletions.
45 changes: 6 additions & 39 deletions kioss/_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ def __iter__(self) -> Iterator[T]:
raise ValueError("_pipe.ITERATOR_PRODUCING_VISITOR_CLASS is None")
return self._accept(ITERATOR_PRODUCING_VISITOR_CLASS())

def __add__(self, other: "APipe[T]") -> "APipe[T]":
return self.chain(other)

def __repr__(self) -> str:
"""
Explain the plan of the pipe
"""
if EXPLAINING_VISITOR_CLASS is None:
raise ValueError("_pipe.EXPLAINING_VISITOR_CLASS is None")
return self._accept(EXPLAINING_VISITOR_CLASS())
Expand All @@ -44,9 +50,6 @@ def __repr__(self) -> str:
def _accept(self, visitor: "AVisitor") -> Any:
raise NotImplementedError()

def __add__(self, other: "APipe[T]") -> "APipe[T]":
return self.chain(other)

@staticmethod
def sanitize_n_threads(n_threads: int):
if not isinstance(n_threads, int):
Expand Down Expand Up @@ -271,9 +274,6 @@ def __init__(self, source: Callable[[], Iterator[T]]):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_source_pipe(self)

def __str__(self) -> str:
return f"Source(of type: {type(self.source)})"


class FilterPipe(APipe[T]):
def __init__(self, upstream: APipe[T], predicate: Callable[[T], bool]):
Expand All @@ -283,9 +283,6 @@ def __init__(self, upstream: APipe[T], predicate: Callable[[T], bool]):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_filter_pipe(self)

def __str__(self) -> str:
return f"Filter(using predicate function of type {type(self.predicate)})"


class MapPipe(APipe[R]):
def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
Expand All @@ -296,9 +293,6 @@ def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_map_pipe(self)

def __str__(self) -> str:
return f"Map(function of type {type(self.func)}, using {self.n_threads} thread{'s' if self.n_threads > 1 else ''})"


class DoPipe(APipe[T]):
def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
Expand All @@ -309,9 +303,6 @@ def __init__(self, upstream: APipe[T], func: Callable[[T], R], n_threads: int):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_do_pipe(self)

def __str__(self) -> str:
return f"Do(side effects by applying a function of type {type(self.func)}, using {self.n_threads} thread{'s' if self.n_threads > 1 else ''})"


class LogPipe(APipe[T]):
def __init__(self, upstream: APipe[T], what: str = "elements"):
Expand All @@ -321,9 +312,6 @@ def __init__(self, upstream: APipe[T], what: str = "elements"):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_log_pipe(self)

def __str__(self) -> str:
return f"Log('{self.what}')"


class FlattenPipe(APipe[T]):
def __init__(self, upstream: APipe[Iterator[T]], n_threads: int):
Expand All @@ -333,11 +321,6 @@ def __init__(self, upstream: APipe[Iterator[T]], n_threads: int):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_flatten_pipe(self)

def __str__(self) -> str:
return (
f"Flatten(using {self.n_threads} thread{'s' if self.n_threads > 1 else ''})"
)


class BatchPipe(APipe[List[T]]):
def __init__(self, upstream: APipe[T], size: int, period: float):
Expand All @@ -348,9 +331,6 @@ def __init__(self, upstream: APipe[T], size: int, period: float):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_batch_pipe(self)

def __str__(self) -> str:
return f"Batch(elements by groups of {self.size} element{'s' if self.size > 1 else ''}, or over a period of {self.period} second{'s' if self.period > 1 else ''})"


class CatchPipe(APipe[T]):
def __init__(
Expand All @@ -366,9 +346,6 @@ def __init__(
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_catch_pipe(self)

def __str__(self) -> str:
return f"Catch(exception instances of class in [{', '.join(map(lambda class_: class_.__name__, self.classes))}]{', with an additional `when` condition' if self.when is not None else ''})"


class ChainPipe(APipe[T]):
def __init__(self, upstream: APipe[T], others: List[APipe]):
Expand All @@ -378,9 +355,6 @@ def __init__(self, upstream: APipe[T], others: List[APipe]):
def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_chain_pipe(self)

def __str__(self) -> str:
return f"Chain({len(self.others)+1} pipes)" # TODO itricate explains


class SlowPipe(APipe[T]):
def __init__(self, upstream: APipe[T], freq: float):
Expand All @@ -389,10 +363,3 @@ def __init__(self, upstream: APipe[T], freq: float):

def _accept(self, visitor: "AVisitor") -> Any:
return visitor.visit_slow_pipe(self)

def __str__(self) -> str:
return f"Slow(at a maximum frequancy of {self.freq} element{'s' if self.freq > 1 else ''} per second)"


# a: Iterator[str] = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(ITERATOR_PRODUCING_VISITOR())
# b: Iterator[str] = SourcePipe(range(8).__iter__).do(lambda e:e).map(str).do(print)._accept(_visitor.IteratorGeneratingVisitor())
60 changes: 44 additions & 16 deletions kioss/_visit/_explanation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,16 @@ def __init__(self, initial_margin: int = 0, add_header: bool = True):
self.margin_step = 2
self.add_header = add_header

def additional_explain_lines(self, pipe: _pipe.APipe) -> str:
name, descr = str(pipe).split("(")
return f"{' '*self.current_margin}{_util.colorize_in_grey('└' + '─'*(self.margin_step - 1))}{_util.colorize_in_red(name)}({descr}\n"
def additional_explain_lines(self, name: str, descr: str) -> str:
return f"{' '*self.current_margin}{_util.colorize_in_grey('└' + '─'*(self.margin_step - 1))}{_util.colorize_in_red(name)}({descr})\n"

def visit_any_pipe(self, pipe: _pipe.APipe) -> str:
def visit_any_pipe(self, pipe: _pipe.APipe, name: str, descr: str) -> str:
if self.add_header:
header = _util.bold(ExplainingVisitor.HEADER) + "\n"
self.add_header = False
else:
header = ""
additional_explain_lines = self.additional_explain_lines(pipe)
additional_explain_lines = self.additional_explain_lines(name, descr)
self.current_margin += self.margin_step
if pipe.upstream is not None:
upstream_repr = pipe.upstream._accept(self)
Expand All @@ -31,33 +30,62 @@ def visit_any_pipe(self, pipe: _pipe.APipe) -> str:
return f"{header}{additional_explain_lines}{upstream_repr}"

def visit_chain_pipe(self, pipe: _pipe.ChainPipe) -> Any:
additional_explain_lines = self.additional_explain_lines(pipe)
name = "Chain"
descr = f"{len(pipe.others)+1} pipes"
additional_explain_lines = self.additional_explain_lines(name, descr)
self.current_margin += self.margin_step
return f"{additional_explain_lines}{''.join(map(lambda pipe: pipe._accept(ExplainingVisitor(self.current_margin, add_header=False)), pipe.others))}{self.visit_any_pipe(pipe.upstream)}"
chained_pipes_repr = "".join(
map(
lambda pipe: pipe._accept(
ExplainingVisitor(self.current_margin, add_header=False)
),
pipe.others,
)
)
upstream_repr = pipe.upstream._accept(self)
return f"{additional_explain_lines}{chained_pipes_repr}{upstream_repr}"

def visit_source_pipe(self, pipe: _pipe.SourcePipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Source"
descr = f"of type: {type(pipe.source)}"
return self.visit_any_pipe(pipe, name, descr)

def visit_map_pipe(self, pipe: _pipe.MapPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Map"
descr = f"function of type {type(pipe.func)}, using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_do_pipe(self, pipe: _pipe.DoPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Do"
descr = f"side effects by applying a function of type {type(pipe.func)}, using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_flatten_pipe(self, pipe: _pipe.FlattenPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Flatten"
descr = f"using {pipe.n_threads} thread{'s' if pipe.n_threads > 1 else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_filter_pipe(self, pipe: _pipe.FilterPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Filter"
descr = f"using predicate function of type {type(pipe.predicate)}"
return self.visit_any_pipe(pipe, name, descr)

def visit_batch_pipe(self, pipe: _pipe.BatchPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Batch"
descr = f"elements by groups of {pipe.size} element{'s' if pipe.size > 1 else ''}, or over a period of {pipe.period} second{'s' if pipe.period > 1 else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_slow_pipe(self, pipe: _pipe.SlowPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Slow"
descr = f"at a maximum frequancy of {pipe.freq} element{'s' if pipe.freq > 1 else ''} per second"
return self.visit_any_pipe(pipe, name, descr)

def visit_catch_pipe(self, pipe: _pipe.CatchPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Catch"
descr = f"exception instances of class in [{', '.join(map(lambda class_: class_.__name__, pipe.classes))}]{', with an additional `when` condition' if pipe.when is not None else ''}"
return self.visit_any_pipe(pipe, name, descr)

def visit_log_pipe(self, pipe: _pipe.LogPipe) -> Any:
return self.visit_any_pipe(pipe)
name = "Log"
descr = f"the evolution of the ieration over {pipe.what}"
return self.visit_any_pipe(pipe, name, descr)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='kioss',
version='0.6.0',
version='0.6.1',
packages=['kioss'],
url='http://github.com/bonnal-enzo/kioss',
license='Apache 2.',
Expand Down

0 comments on commit 948dfa6

Please sign in to comment.