Skip to content

Commit

Permalink
.Net: SK Process Cloud Events - Publish Interface abstractions scaffo…
Browse files Browse the repository at this point in the history
…lding (#10222)

### Motivation and Context

Initial change need for Cloud Event support, in particular the
publishing events scenario.

- Initial changes need in abstractions layer
- min changes for Dapr Runtime
- min changes for Local Runtime
- UTs in Local and Dapr Runtime to ensure parity

Coming up changes in separate PRs:
- New abstractions to be used when creating sk ProcessBuilder
- Additional internal plumbing needed
- Samples/Demos with usage

Fixes #9721 

### Description

<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

### Contribution Checklist

<!-- Before submitting this PR, please make sure: -->

- [x] The code builds clean without any errors or warnings
- [x] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [x] All unit tests pass, and I have added new tests where possible
- [x] I didn't break anyone 😄
  • Loading branch information
esttenorio authored Jan 28, 2025
1 parent 9655254 commit 10818c5
Show file tree
Hide file tree
Showing 25 changed files with 493 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;

namespace Microsoft.SemanticKernel;

/// <summary>
/// An interface that provides a channel for emitting external messages from a step.
/// In addition provide common methods like initialization and Uninitialization
/// </summary>
public interface IExternalKernelProcessMessageChannel
{
/// <summary>
/// Initialization of the external messaging channel used
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Initialize();

/// <summary>
/// Uninitialization of the external messaging channel used
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Uninitialize();

/// <summary>
/// Emits the specified event from the step outside the SK process
/// </summary>
/// <param name="externalTopicEvent">name of the topic to be used externally as the event name</param>
/// <param name="eventData">data to be transmitted externally</param>
/// <returns></returns>
public abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ public abstract class KernelProcessContext
/// </summary>
/// <returns>A <see cref="Task{T}"/> where T is <see cref="KernelProcess"/></returns>
public abstract Task<KernelProcess> GetStateAsync();

/// <summary>
/// Gets the instance of <see cref="IExternalKernelProcessMessageChannel"/> used for external messages
/// </summary>
/// <returns></returns>
public abstract Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ namespace Microsoft.SemanticKernel;
public sealed class KernelProcessStepContext
{
private readonly IKernelProcessMessageChannel _stepMessageChannel;
private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel;

/// <summary>
/// Initializes a new instance of the <see cref="KernelProcessStepContext"/> class.
/// </summary>
/// <param name="channel">An instance of <see cref="IKernelProcessMessageChannel"/>.</param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel)
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannel"/></param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
this._stepMessageChannel = channel;
this._externalMessageChannel = externalMessageChannel;
}

/// <summary>
/// Emit an event from the current step.
/// Emit an SK process event from the current step.
/// </summary>
/// <param name="processEvent">An instance of <see cref="KernelProcessEvent"/> to be emitted from the <see cref="KernelProcessStep"/></param>
/// <returns>A <see cref="ValueTask"/></returns>
Expand All @@ -31,7 +34,7 @@ public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
}

/// <summary>
/// Emit an event from the current step with a simplified method signature.
/// Emit an SK process event from the current step with a simplified method signature.
/// </summary>
/// <param name="eventId"></param>
/// <param name="data"></param>
Expand All @@ -52,4 +55,22 @@ public ValueTask EmitEventAsync(
Visibility = visibility
});
}

/// <summary>
/// Emit an external event to through a <see cref="IExternalKernelProcessMessageChannel"/>
/// component if connected from within the SK process
/// </summary>
/// <param name="topicName"></param>
/// <param name="processEventData"></param>
/// <returns></returns>
/// <exception cref="KernelException"></exception>
public async Task EmitExternalEventAsync(string topicName, object? processEventData = null)
{
if (this._externalMessageChannel == null)
{
throw new KernelException($"External message channel not configured for step with topic {topicName}");
}

await this._externalMessageChannel.EmitExternalEventAsync(topicName, processEventData).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Process.Serialization;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

namespace SemanticKernel.Process.IntegrationTests.Controllers;

Expand Down Expand Up @@ -72,6 +73,23 @@ public async Task<IActionResult> GetProcessAsync(string processId)
return this.Ok(daprProcess);
}

/// <summary>
/// Retrieves current state of the MockCloudEventClient used in the running process
/// </summary>
/// <param name="processId">The Id of the process.</param>
/// <param name="cloudClient">Mock Cloud client ingested via dependency injection</param>
/// <returns></returns>
[HttpGet("processes/{processId}/mockCloudClient")]
public Task<IActionResult> GetMockCloudClient(string processId, MockCloudEventClient cloudClient)
{
if (!s_processes.TryGetValue(processId, out DaprKernelProcessContext? context))
{
return Task.FromResult<IActionResult>(this.NotFound());
}

return Task.FromResult<IActionResult>(this.Ok(cloudClient));
}

/// <summary>
/// Checks the health of the Dapr runtime by attempting to send a message to a health actor.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using Microsoft.SemanticKernel;
using SemanticKernel.Process.IntegrationTests;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

var builder = WebApplication.CreateBuilder(args);

Expand All @@ -15,6 +16,10 @@
// Configure the Kernel with DI. This is required for dependency injection to work with processes.
builder.Services.AddKernel();

// Configure IExternalKernelProcessMessageChannel used for testing purposes
builder.Services.AddSingleton<IExternalKernelProcessMessageChannel>(MockCloudEventClient.Instance);
builder.Services.AddSingleton(MockCloudEventClient.Instance);

// Configure Dapr
builder.Services.AddActors(static options =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Serialization;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

namespace SemanticKernel.Process.IntegrationTests;
internal sealed class DaprTestProcessContext : KernelProcessContext
Expand Down Expand Up @@ -68,4 +69,14 @@ public override Task StopAsync()
{
throw new NotImplementedException();
}

public override async Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync()
{
var response = await this._httpClient.GetFromJsonAsync<MockCloudEventClient>($"http://localhost:5200/processes/{this._processId}/mockCloudClient", options: this._serializerOptions);
return response switch
{
null => throw new InvalidOperationException("Process not found"),
_ => response
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,11 @@ private async Task WaitForHostStartupAsync()
/// <param name="process">The process to start.</param>
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
/// <param name="initialEvent">An optional initial event.</param>
/// <param name="externalMessageChannel">channel used for external messages</param>
/// <returns>A <see cref="Task{KernelProcessContext}"/></returns>
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent)
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
// Actual Kernel injection of Kernel and ExternalKernelProcessMessageChannel is in dotnet\src\Experimental\Process.IntegrationTestHost.Dapr\Program.cs
var context = new DaprTestProcessContext(process, this._httpClient!);
await context.StartWithEventAsync(initialEvent);
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ public class ProcessTestFixture
/// <param name="process">The process to start.</param>
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
/// <param name="initialEvent">An optional initial event.</param>
/// <param name="externalMessageChannel">channel used for external messages</param>
/// <returns>A <see cref="Task{KernelProcessContext}"/></returns>
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent)
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
return await process.StartAsync(kernel, initialEvent);
return await process.StartAsync(kernel, initialEvent, externalMessageChannel);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.SemanticKernel;

namespace SemanticKernel.Process.IntegrationTests.CloudEvents;
/// <summary>
/// Class used for testing purposes to mock emitting external cloud events
/// </summary>
public class MockCloudEventClient : IExternalKernelProcessMessageChannel
{
/// <summary>
/// Initialization counter for testing
/// </summary>
public int InitializationCounter { get; set; } = 0;
/// <summary>
/// Uninitialization counter for testing
/// </summary>
public int UninitializationCounter { get; set; } = 0;
/// <summary>
/// Captures cloud events emitted for testing
/// </summary>
public List<MockCloudEventData> CloudEvents { get; set; } = [];

private static MockCloudEventClient? s_instance = null;

/// <summary>
/// Instance of <see cref="MockCloudEventClient"/> when used as singleton
/// </summary>
public static MockCloudEventClient Instance
{
get
{
return s_instance ??= new MockCloudEventClient();
}
}

/// <inheritdoc/>
public Task EmitExternalEventAsync(string externalTopicEvent, object? eventData)
{
if (eventData != null)
{
this.CloudEvents.Add(new() { TopicName = externalTopicEvent, Data = (string)eventData });
}

return Task.CompletedTask;
}

/// <inheritdoc/>
public ValueTask Initialize()
{
this.InitializationCounter++;
return ValueTask.CompletedTask;
}

/// <inheritdoc/>
public ValueTask Uninitialize()
{
this.UninitializationCounter++;
return ValueTask.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.

namespace SemanticKernel.Process.IntegrationTests.CloudEvents;

/// <summary>
/// Mock cloud event data used for testing purposes only
/// </summary>
public class MockCloudEventData
{
/// <summary>
/// Name of the mock topic
/// </summary>
public required string TopicName { get; set; }

/// <summary>
/// Data emitted in the mock cloud event
/// </summary>
public string? Data { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;
using Microsoft.SemanticKernel;

namespace SemanticKernel.Process.IntegrationTests;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

/// <summary>
/// A step that emits messages externally
/// </summary>
public sealed class MockProxyStep : KernelProcessStep
{
public static class FunctionNames
{
public const string OnRepeatMessage = nameof(OnRepeatMessage);
public const string OnEchoMessage = nameof(OnEchoMessage);
}

public static class TopicNames
{
public const string RepeatExternalTopic = nameof(RepeatExternalTopic);
public const string EchoExternalTopic = nameof(EchoExternalTopic);
}

[KernelFunction(FunctionNames.OnRepeatMessage)]
public async Task OnRepeatMessageAsync(KernelProcessStepContext context, string message)
{
await context.EmitExternalEventAsync(TopicNames.RepeatExternalTopic, message);
}

[KernelFunction(FunctionNames.OnEchoMessage)]
public async Task OnEchoMessageAsync(KernelProcessStepContext context, string message)
{
await context.EmitExternalEventAsync(TopicNames.EchoExternalTopic, message);
}
}

#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
<ItemGroup>
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/TestSettings/**/*.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/**/*Tests.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/CloudEvents/*.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
</ItemGroup>
</Project>
Loading

0 comments on commit 10818c5

Please sign in to comment.