Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api): track volumes from multichannel configs #16698

Merged
merged 5 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions api/src/opentrons/protocol_engine/commands/aspirate.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
except PipetteOverpressureError as e:
state_update.set_liquid_operated(
labware_id=labware_id,
well_name=well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
labware_id, well_name, pipette_id
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
Expand All @@ -167,8 +169,13 @@ async def execute(self, params: AspirateParams) -> _ExecuteReturn:
else:
state_update.set_liquid_operated(
labware_id=labware_id,
well_name=well_name,
volume_added=-volume_aspirated,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
labware_id, well_name, pipette_id
),
volume_added=-volume_aspirated
* self._state_view.geometry.get_nozzles_per_well(
labware_id, well_name, pipette_id
),
Copy link
Contributor

@SyntaxColoring SyntaxColoring Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this well_names/volume_added pattern happens across several commands, it might make sense to centralize it in GeometryView. Like:

class WellAndVol(NamedTuple):
    well_name: str
    volume: float

class GeometryView:
    ...
    def project_liquid_operation(
        labware_id: str, focused_well: str, pipette_id: str, volume_per_tip: float
    ) -> list[WellAndVol]:
        ...

And then StateUpdate.set_liquid_operated() would take a list[WellAndVol], instead of taking a list of well_names and a separate total volume_added.

This would probably help with command implementation testability, too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess... I do kind of want to enforce the "all the wells gain/lose the same volume" though, and I think it's good to be explicit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

edge case scenario with liquid level detection:

  • say you have 100 µL in A1 and H1 and 50 µL in B1–G1.
  • your pipette is in COLUMN nozzle configuration.
  • command a meniscus-relative aspirate of 25 µL from the column (A1–H1).
  • we (i.e., humans) know that the A1 and H1 tips will aspirate 25 µL of liquid and the other tips will aspirate 0.

do we want to account for that in software? or are we content to say that is Not a Thing You Should Do in documentation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That Is Not A Thing You Should Do

)
state_update.set_fluid_aspirated(
pipette_id=params.pipetteId,
Expand Down
19 changes: 16 additions & 3 deletions api/src/opentrons/protocol_engine/commands/aspirate_in_place.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ async def execute(self, params: AspirateInPlaceParams) -> _ExecuteReturn:
):
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_name=current_location.well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
Expand Down Expand Up @@ -150,8 +154,17 @@ async def execute(self, params: AspirateInPlaceParams) -> _ExecuteReturn:
):
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_name=current_location.well_name,
volume_added=-volume,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=-volume
* self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
)

return SuccessData(
Expand Down
13 changes: 11 additions & 2 deletions api/src/opentrons/protocol_engine/commands/dispense.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Dispense command request, result, and implementation models."""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Union
from typing_extensions import Literal
Expand Down Expand Up @@ -109,7 +110,9 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn:
except PipetteOverpressureError as e:
state_update.set_liquid_operated(
labware_id=labware_id,
well_name=well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
labware_id, well_name, params.pipetteId
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
Expand All @@ -134,9 +137,15 @@ async def execute(self, params: DispenseParams) -> _ExecuteReturn:
pipette_id=params.pipetteId, volume=volume
)
)
if volume_added is not None:
volume_added *= self._state_view.geometry.get_nozzles_per_well(
labware_id, well_name, params.pipetteId
)
state_update.set_liquid_operated(
labware_id=labware_id,
well_name=well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
labware_id, well_name, params.pipetteId
),
volume_added=volume_added if volume_added is not None else CLEAR,
)
state_update.set_fluid_ejected(pipette_id=params.pipetteId, volume=volume)
Expand Down
19 changes: 17 additions & 2 deletions api/src/opentrons/protocol_engine/commands/dispense_in_place.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Dispense-in-place command request, result, and implementation models."""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Union
from typing_extensions import Literal
Expand Down Expand Up @@ -91,7 +92,11 @@ async def execute(self, params: DispenseInPlaceParams) -> _ExecuteReturn:
):
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_name=current_location.well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=CLEAR,
)
state_update.set_fluid_unknown(pipette_id=params.pipetteId)
Expand Down Expand Up @@ -129,9 +134,19 @@ async def execute(self, params: DispenseInPlaceParams) -> _ExecuteReturn:
pipette_id=params.pipetteId, volume=volume
)
)
if volume_added is not None:
volume_added *= self._state_view.geometry.get_nozzles_per_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
)
state_update.set_liquid_operated(
labware_id=current_location.labware_id,
well_name=current_location.well_name,
well_names=self._state_view.geometry.get_wells_covered_by_pipette_focused_on_well(
current_location.labware_id,
current_location.well_name,
params.pipetteId,
),
volume_added=volume_added if volume_added is not None else CLEAR,
)
return SuccessData(
Expand Down
194 changes: 194 additions & 0 deletions api/src/opentrons/protocol_engine/state/_well_math.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Utilities for doing coverage math on wells."""

from typing import Iterator
from typing_extensions import assert_never
from opentrons_shared_data.errors.exceptions import (
InvalidStoredData,
InvalidProtocolData,
)

from opentrons.hardware_control.nozzle_manager import NozzleMap


def wells_covered_by_pipette_configuration(
nozzle_map: NozzleMap,
target_well: str,
labware_wells_by_column: list[list[str]],
) -> Iterator[str]:
"""Compute the wells covered by a pipette nozzle configuration."""
if len(labware_wells_by_column) >= 12 and len(labware_wells_by_column[0]) >= 8:
yield from wells_covered_dense(
nozzle_map,
target_well,
labware_wells_by_column,
)
elif len(labware_wells_by_column) < 12 and len(labware_wells_by_column[0]) < 8:
yield from wells_covered_sparse(
nozzle_map, target_well, labware_wells_by_column
)
else:
raise InvalidStoredData(
"Labware of non-SBS and non-reservoir format cannot be handled"
)


def row_col_ordinals_from_column_major_map(
target_well: str, column_major_wells: list[list[str]]
) -> tuple[int, int]:
"""Turn a well name into the index of its row and column (in that order) within the labware."""
for column_index, column in enumerate(column_major_wells):
if target_well in column:
return column.index(target_well), column_index
raise InvalidStoredData(f"Well name {target_well} is not present in labware")


def wells_covered_dense( # noqa: C901
nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]]
) -> Iterator[str]:
"""Get the list of wells covered by a nozzle map on an SBS format labware with a specified multiplier of 96 into the number of wells.

This will handle the offsetting of the nozzle map into higher-density well plates. For instance, a full column config target at A1 of a
96 plate would cover wells A1, B1, C1, D1, E1, F1, G1, H1, and use downsample_factor 1.0 (96*1 = 96). A full column config target on a
384 plate would cover wells A1, C1, E1, G1, I1, K1, M1, O1 and use downsample_factor 4.0 (96*4 = 384), while a full column config
targeting B1 would cover wells B1, D1, F1, H1, J1, L1, N1, P1 - still using downsample_factor 4.0, with the offset gathered from the
target well.

The function may also handle sub-96 regular labware with fractional downsample factors, but that's physically improbable and it's not
tested. If you have a regular labware with fewer than 96 wells that is still regularly-spaced and has little enough space between well
walls that it's reasonable to use with multiple channels, you probably want wells_covered_trough.
"""
target_row_index, target_column_index = row_col_ordinals_from_column_major_map(
target_well, target_wells_by_column
)
column_downsample = len(target_wells_by_column) // 12
row_downsample = len(target_wells_by_column[0]) // 8
if column_downsample < 1 or row_downsample < 1:
raise InvalidStoredData(
"This labware cannot be used wells_covered_dense because it is less dense than an SBS 96 standard"
)

for nozzle_column in range(len(nozzle_map.columns)):
target_column_offset = nozzle_column * column_downsample
for nozzle_row in range(len(nozzle_map.rows)):
target_row_offset = nozzle_row * row_downsample
if nozzle_map.starting_nozzle == "A1":
if (
target_column_index + target_column_offset
< len(target_wells_by_column)
) and (
target_row_index + target_row_offset
< len(target_wells_by_column[target_column_index])
):
yield target_wells_by_column[
target_column_index + target_column_offset
][target_row_index + target_row_offset]
elif nozzle_map.starting_nozzle == "A12":
if (target_column_index - target_column_offset >= 0) and (
target_row_index + target_row_offset
< len(target_wells_by_column[target_column_index])
):
yield target_wells_by_column[
target_column_index - target_column_offset
][target_row_index + target_row_offset]
elif nozzle_map.starting_nozzle == "H1":
if (
target_column_index + target_column_offset
< len(target_wells_by_column)
) and (target_row_index - target_row_offset >= 0):
yield target_wells_by_column[
target_column_index + target_column_offset
][target_row_index - target_row_offset]
elif nozzle_map.starting_nozzle == "H12":
if (target_column_index - target_column_offset >= 0) and (
target_row_index - target_row_offset >= 0
):
yield target_wells_by_column[
target_column_index - target_column_offset
][target_row_index - target_row_offset]
else:
raise InvalidProtocolData(
f"A pipette nozzle configuration may not having a starting nozzle of {nozzle_map.starting_nozzle}"
)


def wells_covered_sparse( # noqa: C901
nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]]
) -> Iterator[str]:
"""Get the list of wells covered by a nozzle map on a column-oriented reservoir.

This function handles reservoirs whose wells span multiple rows and columns - the most common case is something like a
12-well reservoir, whose wells are the height of an SBS column and the width of an SBS row, or a 1-well reservoir whose well
is the size of an SBS active area.
"""
target_row_index, target_column_index = row_col_ordinals_from_column_major_map(
target_well, target_wells_by_column
)
column_upsample = 12 // len(target_wells_by_column)
row_upsample = 8 // len(target_wells_by_column[0])
if column_upsample < 1 or row_upsample < 1:
raise InvalidStoredData(
"This labware cannot be uased with wells_covered_sparse because it is more dense than an SBS 96 standard."
sfoster1 marked this conversation as resolved.
Show resolved Hide resolved
)
for nozzle_column in range(max(1, len(nozzle_map.columns) // column_upsample)):
for nozzle_row in range(max(1, len(nozzle_map.rows) // row_upsample)):
if nozzle_map.starting_nozzle == "A1":
if (
target_column_index + nozzle_column < len(target_wells_by_column)
) and (
target_row_index + nozzle_row
< len(target_wells_by_column[target_column_index])
):
yield target_wells_by_column[target_column_index + nozzle_column][
target_row_index + nozzle_row
]
elif nozzle_map.starting_nozzle == "A12":
if (target_column_index - nozzle_column >= 0) and (
target_row_index + nozzle_row
< len(target_wells_by_column[target_column_index])
):
yield target_wells_by_column[target_column_index - nozzle_column][
target_row_index + nozzle_row
]
elif nozzle_map.starting_nozzle == "H1":
if (
target_column_index + nozzle_column
< len(target_wells_by_column[target_column_index])
) and (target_row_index - nozzle_row >= 0):
yield target_wells_by_column[target_column_index + nozzle_column][
target_row_index - nozzle_row
]
elif nozzle_map.starting_nozzle == "H12":
if (target_column_index - nozzle_column >= 0) and (
target_row_index - nozzle_row >= 0
):
yield target_wells_by_column[target_column_index - nozzle_column][
target_row_index - nozzle_row
]
else:
raise InvalidProtocolData(
f"A pipette nozzle configuration may not having a starting nozzle of {nozzle_map.starting_nozzle}"
)


def nozzles_per_well(
nozzle_map: NozzleMap, target_well: str, target_wells_by_column: list[list[str]]
) -> int:
"""Get the number of nozzles that will interact with each well in the labware.

For instance, if this is an SBS 96 or more dense, there is always 1 nozzle per well
that is interacted with (and some wells may not be interacted with at all). If this is
a 12-column reservoir, then all active nozzles in each column of the configuration will
interact with each well; so an 8-channel full config would have 8 nozzles per well,
and a 96 channel with a rectangle config from A1 to D12 would have 4 nozzles per well.
"""
_, target_column_index = row_col_ordinals_from_column_major_map(
target_well, target_wells_by_column
)
# labware as or more dense than a 96 plate will only ever have 1 nozzle per well (and some wells won't be touched)
if len(target_wells_by_column) >= len(nozzle_map.columns) and len(
target_wells_by_column[target_column_index]
) >= len(nozzle_map.rows):
return 1
return max(1, len(nozzle_map.columns) // len(target_wells_by_column)) * max(
1, len(nozzle_map.rows) // len(target_wells_by_column[target_column_index])
)
46 changes: 46 additions & 0 deletions api/src/opentrons/protocol_engine/state/geometry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Geometry state getters."""

import enum
from numpy import array, dot, double as npdouble
from numpy.typing import NDArray
Expand All @@ -8,6 +9,7 @@

from opentrons.types import Point, DeckSlotName, StagingSlotName, MountType

from opentrons_shared_data.errors.exceptions import InvalidStoredData
from opentrons_shared_data.labware.constants import WELL_NAME_PATTERN
from opentrons_shared_data.deck.types import CutoutFixture
from opentrons_shared_data.pipette import PIPETTE_X_SPAN
Expand Down Expand Up @@ -61,6 +63,7 @@
find_volume_at_well_height,
find_height_at_well_volume,
)
from ._well_math import wells_covered_by_pipette_configuration, nozzles_per_well


SLOT_WIDTH = 128
Expand Down Expand Up @@ -1517,3 +1520,46 @@ def validate_dispense_volume_into_well(
raise errors.InvalidDispenseVolumeError(
f"Attempting to dispense {volume}µL of liquid into a well that can only hold {well_volumetric_capacity}µL (well {well_name} in labware_id: {labware_id})"
)

def get_wells_covered_by_pipette_focused_on_well(
self, labware_id: str, focused_on_well_name: str, pipette_id: str
) -> list[str]:
"""Get a flat list of wells that are covered by a pipette when moved to a specified well.

When you move a pipette in a multichannel configuration to a specific well - here called
"focused on" the well, for lack of a better option - the pipette will operate on other wells as well.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've seen this called the "primary well" or "active well" elsewhere.


For instance, a pipette with a COLUMN configuration that is focused on well A1 of an SBS standard labware
will also "cover", under this definition, wells B1-H1. That same pipette, when focused on well C5, will "cover"
wells C5-H5.

This math only works, and may only be applied, if one of the following is true:
- The pipette is in a SINGLE configuration
- The pipette is in a non-SINGLE configuration, and the labware is an SBS-format 96 or 384 well plate (and is so
marked in its definition's parameters.format key, as 96Standard or 384Standard)

If all of the following do not apply, regardless of the nozzle configuration of the pipette this function will
return only the labware covered by the primary well.
"""
pipette_nozzle_map = self._pipettes.get_nozzle_configuration(pipette_id)
labware_columns = [
column for column in self._labware.get_definition(labware_id).ordering
]
try:
return list(
wells_covered_by_pipette_configuration(
pipette_nozzle_map, focused_on_well_name, labware_columns
)
)
except InvalidStoredData:
return [focused_on_well_name]

def get_nozzles_per_well(
self, labware_id: str, focused_on_well_name: str, pipette_id: str
) -> int:
"""Get the number of nozzles that will interact with each well."""
return nozzles_per_well(
self._pipettes.get_nozzle_configuration(pipette_id),
focused_on_well_name,
self._labware.get_definition(labware_id).ordering,
)
Loading