Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.Net: SK Process Cloud Events - Publish Interface abstraction in DAPR #10222

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;

namespace Microsoft.SemanticKernel;

/// <summary>
/// An interface that provides a channel for emitting external messages from a step.
/// In addition provide common methods like initialization and Uninitialization
/// </summary>
public interface IExternalKernelProcessMessageChannel : IExternalKernelProcessMessageChannelEmitter
{
/// <summary>
/// Initialization of the external messaging channel used
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Initialize();

/// <summary>
/// Uninitialization of the external messaging channel used
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Uninitialize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;

namespace Microsoft.SemanticKernel;

/// <summary>
/// An interface that provides a channel for emitting external messages from a step.
/// </summary>
public interface IExternalKernelProcessMessageChannelEmitter
{
/// <summary>
/// Emits the specified event from the step outside the SK process
/// </summary>
/// <param name="externalTopicEvent">name of the topic to be used externally as the event name</param>
/// <param name="eventData">data to be transmitted externally</param>
/// <returns></returns>
abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ namespace Microsoft.SemanticKernel;
public sealed class KernelProcessStepContext
{
private readonly IKernelProcessMessageChannel _stepMessageChannel;
private readonly IExternalKernelProcessMessageChannelEmitter? _externalMessageChannel;

/// <summary>
/// Initializes a new instance of the <see cref="KernelProcessStepContext"/> class.
/// </summary>
/// <param name="channel">An instance of <see cref="IKernelProcessMessageChannel"/>.</param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel)
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannelEmitter"/></param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannelEmitter? externalMessageChannel = null)
{
this._stepMessageChannel = channel;
this._externalMessageChannel = externalMessageChannel;
}

/// <summary>
/// Emit an event from the current step.
/// Emit an SK process event from the current step.
/// </summary>
/// <param name="processEvent">An instance of <see cref="KernelProcessEvent"/> to be emitted from the <see cref="KernelProcessStep"/></param>
/// <returns>A <see cref="ValueTask"/></returns>
Expand All @@ -31,7 +34,7 @@ public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
}

/// <summary>
/// Emit an event from the current step with a simplified method signature.
/// Emit an SK process event from the current step with a simplified method signature.
/// </summary>
/// <param name="eventId"></param>
/// <param name="data"></param>
Expand All @@ -52,4 +55,22 @@ public ValueTask EmitEventAsync(
Visibility = visibility
});
}

/// <summary>
/// Emit an external event to through a <see cref="IExternalKernelProcessMessageChannelEmitter"/>
/// component if connected from within the SK process
/// </summary>
/// <param name="externalTopicName"></param>
/// <param name="processEventData"></param>
/// <returns></returns>
/// <exception cref="KernelException"></exception>
public async Task EmitExternalEventAsync(string externalTopicName, object? processEventData = null)
{
if (this._externalMessageChannel == null)
{
throw new KernelException($"External message channel not configured for step with topic {externalTopicName}");
}

await this._externalMessageChannel.EmitExternalEventAsync(externalTopicName, processEventData).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;
using Dapr.Actors.Runtime;

namespace Microsoft.SemanticKernel;

/// <summary>
/// An actor that represents en external event messaging buffer.
/// </summary>
internal sealed class ExternalMessageBufferActor : Actor, IExternalMessageBuffer
{
private readonly IExternalKernelProcessMessageChannel _externalMessageChannel;

/// <summary>
/// Required constructor for Dapr Actor.
/// </summary>
/// <param name="host">The actor host.</param>
/// <param name="externalMessageChannel">Instance of <see cref="IExternalKernelProcessMessageChannel"/></param>
public ExternalMessageBufferActor(ActorHost host, IExternalKernelProcessMessageChannel externalMessageChannel) : base(host)
{
this._externalMessageChannel = externalMessageChannel;
}

public async Task EmitExternalEventAsync(string externalTopicEvent, object? eventData)
{
await this._externalMessageChannel.EmitExternalEventAsync(externalTopicEvent, eventData).ConfigureAwait(false);
}

protected override async Task OnDeactivateAsync()
{
await this._externalMessageChannel.Uninitialize().ConfigureAwait(false);
}

protected override async Task OnActivateAsync()
{
await this._externalMessageChannel.Initialize().ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;

namespace Microsoft.SemanticKernel;

/// <summary>
/// Class used to allow using <see cref="IExternalEventBuffer"/> as <see cref="IExternalKernelProcessMessageChannelEmitter"/>
/// in SK Process shared abstractions
/// </summary>
public class ExternalMessageBufferActorWrapper : IExternalKernelProcessMessageChannelEmitter
{
private readonly IExternalMessageBuffer _actor;

/// <summary>
/// Constructor to wrap <see cref="IExternalMessageBuffer"/> as <see cref="IExternalKernelProcessMessageChannelEmitter"/>
/// </summary>
/// <param name="actor">The actor host.</param>
public ExternalMessageBufferActorWrapper(IExternalMessageBuffer actor)
{
this._actor = actor;
}

/// <inheritdoc cref="IExternalMessageBuffer.EmitExternalEventAsync(string, object?)"/>
public async Task EmitExternalEventAsync(string externalTopicEvent, object? eventData)
{
await this._actor.EmitExternalEventAsync(externalTopicEvent, eventData).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,15 @@ protected virtual async ValueTask ActivateStepAsync()
this._functions.Add(f.Name, f);
}

// Creating external process channel actor to be used for external messaging by some steps
IExternalKernelProcessMessageChannelEmitter? externalMessageChannelActor = null;
var scopedExternalMessageBufferId = this.ScopedActorId(new ActorId(this.Id.GetId()));
var actor = this.ProxyFactory.CreateActorProxy<IExternalMessageBuffer>(scopedExternalMessageBufferId, nameof(ExternalMessageBufferActor));
externalMessageChannelActor = new ExternalMessageBufferActorWrapper(actor);

// Initialize the input channels
this._initialInputs = this.FindInputChannels(this._functions, this._logger);
// TODO: only need to pass conditionally external channel to specific steps - new step type?
this._initialInputs = this.FindInputChannels(this._functions, this._logger, externalMessageChannelActor);
this._inputs = this._initialInputs.ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.ToDictionary(kvp => kvp.Key, kvp => kvp.Value));

// Activate the step with user-defined state if needed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;
using Dapr.Actors;

namespace Microsoft.SemanticKernel;

// estenori-note:
// for some reason dapr doesn't like if instead public interface IExternalMessageBuffer : IActor, IExternalKernelProcessMessageChannelBase
// instead defining the interface component is necessary. To make it compatible with shared components a "casting" to IExternalKernelProcessMessageChannelEmitter
// is added in StepActor logic to make use of FindInputChannels

/// <summary>
/// An interface for <see cref="IExternalKernelProcessMessageChannelEmitter"/>
/// </summary>
public interface IExternalMessageBuffer : IActor
{
/// <summary>
/// Emits external events outside of the SK process
/// </summary>
/// <param name="externalTopicEvent"></param>
/// <param name="eventData"></param>
/// <returns></returns>
abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ public static void AddProcessActors(this ActorRuntimeOptions actorOptions)
actorOptions.Actors.RegisterActor<EventBufferActor>();
actorOptions.Actors.RegisterActor<MessageBufferActor>();
actorOptions.Actors.RegisterActor<ExternalEventBufferActor>();
actorOptions.Actors.RegisterActor<ExternalMessageBufferActor>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,14 @@ public static void InitializeUserState(this KernelProcessStepState stateObject,
/// <param name="channel">The source channel to evaluate</param>
/// <param name="functions">A dictionary of KernelFunction instances.</param>
/// <param name="logger">An instance of <see cref="ILogger"/>.</param>
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannelEmitter"/></param>
/// <returns><see cref="Dictionary{TKey, TValue}"/></returns>
/// <exception cref="InvalidOperationException"></exception>
public static Dictionary<string, Dictionary<string, object?>?> FindInputChannels(this IKernelProcessMessageChannel channel, Dictionary<string, KernelFunction> functions, ILogger? logger)
public static Dictionary<string, Dictionary<string, object?>?> FindInputChannels(
this IKernelProcessMessageChannel channel,
Dictionary<string, KernelFunction> functions,
ILogger? logger,
IExternalKernelProcessMessageChannelEmitter? externalMessageChannel = null)
{
if (functions is null)
{
Expand All @@ -126,7 +131,7 @@ public static void InitializeUserState(this KernelProcessStepState stateObject,
// and are instantiated here.
if (param.ParameterType == typeof(KernelProcessStepContext))
{
inputs[kvp.Key]![param.Name] = new KernelProcessStepContext(channel);
inputs[kvp.Key]![param.Name] = new KernelProcessStepContext(channel, externalMessageChannel);
}
else
{
Expand Down
Loading