# daft-dev
c
```python
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
    return physical_plan.materialize(self._scheduler.to_partition_tasks(psets))
```
could you please walk through this code snippet? I'm stuck on this logic... help me @Colin Ho @jay
j
Yup, so there are two types of generators that will be produced from the physical plan:
1. `InProgressPhysicalPlan`
2. `MaterializedPhysicalPlan`
`InProgressPhysicalPlan` is a generator that can emit:
• `None`: indicating that the plan is waiting on more work to be done before it can proceed
• `PartitionTaskBuilder`: indicating that this is a pipeline of instructions that can be further appended to (allowing us to fuse operations such as Project -> Filter -> …)
• `PartitionTask`: indicating that this is a task that has been "built", and that the runner should run this task.
`MaterializedPhysicalPlan` is very similar, but only emits:
• `None`
• `PartitionTask`
• `MaterializedResult`: indicates that this is a final result that is completed, and the runner should store this in the final set of partitions
Essentially the `materialize(plan: InProgressPhysicalPlan) -> MaterializedPhysicalPlan` function will just finalize any `PartitionTaskBuilder`s that were emitted, and emit those as `PartitionTask`s. This way the runner can just run any `PartitionTask`s that are emitted from this plan.
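To make that concrete, here is a minimal sketch of what a `materialize` step could look like. The class and method names (`PartitionTaskBuilder.finalize`, etc.) are simplified stand-ins for illustration, not Daft's actual API:

```python
from typing import Iterator, Union

# Hypothetical stand-ins for Daft's classes, named for illustration only.
class PartitionTask:
    """A fully built task that the runner can execute."""

class PartitionTaskBuilder:
    """A pipeline of fused instructions that can still be appended to."""

    def finalize(self) -> PartitionTask:
        # "Builds" the accumulated pipeline into a runnable task.
        return PartitionTask()

def materialize(
    plan: Iterator[Union[None, PartitionTaskBuilder, PartitionTask]],
) -> Iterator[Union[None, PartitionTask]]:
    """Finalize any builders so the downstream runner only sees runnable tasks."""
    for step in plan:
        if isinstance(step, PartitionTaskBuilder):
            yield step.finalize()  # no more fusing possible past this point
        else:
            yield step  # None or an already-built PartitionTask passes through
```

The key idea is that `materialize` is itself just another generator wrapping the in-progress plan, converting builders into tasks as they stream past.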
c
it seems that `materialize` can also generate `MaterializedResult[PartitionT]`?
j
From the runner’s point of view, it has this black-box `MaterializedPhysicalPlan` that it can now call `next()` on. It will receive either:
• `None` (this is the plan’s way of saying "please do more work so I can proceed")
• a `PartitionTask` (this is the plan’s way of saying "please run this task")
• a `MaterializedResult`: indicates that this is a final result that is completed, and the runner should store this in the final set of partitions
• or a `StopIteration` (this is the plan’s way of saying there is no more work left)
Oh yes, sorry. `MaterializedResult[PartitionT]` is the plan’s way of saying "this is a FINAL result". Let me amend my previous messages to reflect this.
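The runner’s loop over that black-box plan can be sketched roughly like this. Again, the class names and the `run()` method are hypothetical placeholders, not Daft’s real interfaces:

```python
class MaterializedResult:
    """Hypothetical stand-in for a finished partition the runner should keep."""

    def __init__(self, partition):
        self.partition = partition

class PartitionTask:
    """Hypothetical stand-in for a runnable task."""

    def run(self):
        pass  # a real runner would dispatch this for (possibly async) execution

def run_plan(plan):
    """Drive a materialized plan to completion: dispatch tasks, collect results."""
    final_partitions = []
    while True:
        try:
            step = next(plan)
        except StopIteration:
            break  # plan says: no more work left
        if step is None:
            # Plan is waiting on in-flight tasks; a real runner would
            # poll/await its pending task queue here before calling next() again.
            continue
        if isinstance(step, MaterializedResult):
            final_partitions.append(step.partition)  # FINAL result: store it
        else:
            step.run()  # a PartitionTask: run it
    return final_partitions
```

This covers all four outcomes of `next()` described above: `None`, `PartitionTask`, `MaterializedResult`, and `StopIteration`.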
In this manner, each Python generator has very fine-grained control around how it wants to do execution. For example, our joins can say "please run these `PartitionTask`s from the left/right children", keep a buffer of the left/right tasks, and then start emitting new `PartitionTaskBuilder`s to kickstart the downstream generators as results become available.
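A toy sketch of that buffering pattern (this is not Daft’s actual join implementation, just an illustration of a generator pulling from two child generators and emitting downstream work as pairs become ready):

```python
def join_like(left, right):
    """Buffer non-None outputs from two child plans; emit a downstream
    "builder" per matched pair as soon as both sides have something ready."""
    left_buf, right_buf = [], []
    left_done = right_done = False
    while not (left_done and right_done):
        if not left_done:
            try:
                step = next(left)
                if step is not None:
                    left_buf.append(step)
            except StopIteration:
                left_done = True
        if not right_done:
            try:
                step = next(right)
                if step is not None:
                    right_buf.append(step)
            except StopIteration:
                right_done = True
        # Kick off downstream work eagerly, without waiting for either
        # child generator to be fully exhausted.
        while left_buf and right_buf:
            yield (left_buf.pop(0), right_buf.pop(0))
```

Note how a `None` from a child simply means "nothing ready yet", so the join buffers what it has and keeps pulling.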
c
I will compare the code to understand what you said, thx
`None` (this is the plan’s way of saying "please do more work so I can proceed"): how do we do more work so the plan can resume generating partition tasks?
@jay
j
The plan is waiting on previous work that was submitted, so it doesn’t have to do anything. At that point, the runner should have pending work in its queue that it is still working on (previous `PartitionTask`s that it received).
c
thx @jay, I have finally grasped the process of generating partition tasks, it took me a whole day, hh
j
Yeah that’s the worst part of our code 😅
c
the code is clean, what we need is a doc