Skip to content

Commit

Permalink
node.ForEachTracked and node.ForEachTrackedWithNodeSink methods
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed May 23, 2023
1 parent a0b13f3 commit a06e1cc
Showing 1 changed file with 81 additions and 0 deletions.
81 changes: 81 additions & 0 deletions hamt.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,3 +877,84 @@ func (n *Node) ForEach(ctx context.Context, f func(k string, val *cbg.Deferred)
}
return nil
}

// ForEachTracked recursively calls function f on each k / val pair found in the HAMT.
// This performs a full traversal of the graph and for large HAMTs can cause
// a large number of loads from the underlying store.
// The values are returned as raw bytes, not decoded.
// This method also provides the trail of indices to the current node, which can be used to formulate a selector suffix
func (n *Node) ForEachTracked(ctx context.Context, trail []int, f func(k string, val *cbg.Deferred, selectorSuffix []int) error) error {
idx := 0
for _, p := range n.Pointers {
// Seek the next set bit in the bitfield to find the actual index for this pointer
for n.Bitfield.Bit(idx) == 0 {
idx++
}
trail = append(trail, idx)
if p.isShard() {
chnd, err := p.loadChild(ctx, n.store, n.bitWidth, n.hash)
if err != nil {
return err
}

if err := chnd.ForEachTracked(ctx, trail, f); err != nil {
return err
}
} else {
for _, kv := range p.KVs {
if err := f(string(kv.Key), kv.Value, trail); err != nil {
return err
}
}
}
idx++
}
return nil
}

// ForEachTrackedWithNodeSink recursively calls function f on each k / val pair found in the HAMT.
// This performs a full traversal of the graph and for large HAMTs can cause
// a large number of loads from the underlying store.
// The values are returned as raw bytes, not decoded.
// This method also provides the trail of indices to the current node, which can be used to formulate a selector suffix
// This method also provides a callback to sink the current node
func (n *Node) ForEachTrackedWithNodeSink(ctx context.Context, trail []int, b *bytes.Buffer, sink cbg.CBORUnmarshaler, f func(k string, val *cbg.Deferred, selectorSuffix []int) error) error {
if sink != nil {
if b == nil {
b = bytes.NewBuffer(nil)
}
b.Reset()
if err := n.MarshalCBOR(b); err != nil {
return err
}
if err := sink.UnmarshalCBOR(b); err != nil {
return err
}
}
idx := 0
for _, p := range n.Pointers {
// Seek the next set bit in the bitfield to find the actual index for this pointer
for n.Bitfield.Bit(idx) == 0 {
idx++
}
trail = append(trail, idx)
if p.isShard() {
chnd, err := p.loadChild(ctx, n.store, n.bitWidth, n.hash)
if err != nil {
return err
}

if err := chnd.ForEachTrackedWithNodeSink(ctx, trail, b, sink, f); err != nil {
return err
}
} else {
for _, kv := range p.KVs {
if err := f(string(kv.Key), kv.Value, trail); err != nil {
return err
}
}
}
idx++
}
return nil
}

0 comments on commit a06e1cc

Please sign in to comment.