Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.Net: SK Process Cloud Events - Publish Interface abstractions scaffolding #10222

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;

namespace Microsoft.SemanticKernel;

/// <summary>
/// An interface that provides a channel for emitting external messages from a step.
/// In addition provide common methods like initialization and Uninitialization
/// </summary>
public interface IExternalKernelProcessMessageChannel
{
/// <summary>
/// Initialization of the external messaging channel used
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Initialize();

/// <summary>
/// Uninitialization of the external messaging channel used
/// </summary>
/// <returns>A <see cref="ValueTask"/></returns>
public abstract ValueTask Uninitialize();

/// <summary>
/// Emits the specified event from the step outside the SK process
/// </summary>
/// <param name="externalTopicEvent">name of the topic to be used externally as the event name</param>
/// <param name="eventData">data to be transmitted externally</param>
/// <returns></returns>
public abstract Task EmitExternalEventAsync(string externalTopicEvent, object? eventData);
esttenorio marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@
/// </summary>
/// <returns>A <see cref="Task{T}"/> where T is <see cref="KernelProcess"/></returns>
public abstract Task<KernelProcess> GetStateAsync();

public abstract Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync();

Check failure on line 31 in dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

Missing XML comment for publicly visible type or member 'KernelProcessContext.GetExternalMessageChannelAsync()'

Check failure on line 31 in dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, ubuntu-latest, Release, true, integration)

Missing XML comment for publicly visible type or member 'KernelProcessContext.GetExternalMessageChannelAsync()'

Check failure on line 31 in dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, windows-latest, Release)

Missing XML comment for publicly visible type or member 'KernelProcessContext.GetExternalMessageChannelAsync()'

Check failure on line 31 in dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, windows-latest, Debug)

Missing XML comment for publicly visible type or member 'KernelProcessContext.GetExternalMessageChannelAsync()'

Check failure on line 31 in dotnet/src/Experimental/Process.Abstractions/KernelProcessContext.cs

View workflow job for this annotation

GitHub Actions / dotnet-build-and-test (8.0, windows-latest, Debug)

Missing XML comment for publicly visible type or member 'KernelProcessContext.GetExternalMessageChannelAsync()'
esttenorio marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ namespace Microsoft.SemanticKernel;
public sealed class KernelProcessStepContext
{
private readonly IKernelProcessMessageChannel _stepMessageChannel;
private readonly IExternalKernelProcessMessageChannel? _externalMessageChannel;

/// <summary>
/// Initializes a new instance of the <see cref="KernelProcessStepContext"/> class.
/// </summary>
/// <param name="channel">An instance of <see cref="IKernelProcessMessageChannel"/>.</param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel)
/// <param name="externalMessageChannel">An instance of <see cref="IExternalKernelProcessMessageChannel"/></param>
public KernelProcessStepContext(IKernelProcessMessageChannel channel, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
this._stepMessageChannel = channel;
this._externalMessageChannel = externalMessageChannel;
}

/// <summary>
/// Emit an event from the current step.
/// Emit an SK process event from the current step.
/// </summary>
/// <param name="processEvent">An instance of <see cref="KernelProcessEvent"/> to be emitted from the <see cref="KernelProcessStep"/></param>
/// <returns>A <see cref="ValueTask"/></returns>
Expand All @@ -31,7 +34,7 @@ public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
}

/// <summary>
/// Emit an event from the current step with a simplified method signature.
/// Emit an SK process event from the current step with a simplified method signature.
/// </summary>
/// <param name="eventId"></param>
/// <param name="data"></param>
Expand All @@ -52,4 +55,22 @@ public ValueTask EmitEventAsync(
Visibility = visibility
});
}

/// <summary>
/// Emit an external event to through a <see cref="IExternalKernelProcessMessageChannel"/>
/// component if connected from within the SK process
/// </summary>
/// <param name="externalTopicName"></param>
/// <param name="processEventData"></param>
/// <returns></returns>
/// <exception cref="KernelException"></exception>
public async Task EmitExternalEventAsync(string externalTopicName, object? processEventData = null)
esttenorio marked this conversation as resolved.
Show resolved Hide resolved
{
if (this._externalMessageChannel == null)
{
throw new KernelException($"External message channel not configured for step with topic {externalTopicName}");
}

await this._externalMessageChannel.EmitExternalEventAsync(externalTopicName, processEventData).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.AspNetCore.Mvc;
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Process.Serialization;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

namespace SemanticKernel.Process.IntegrationTests.Controllers;

Expand Down Expand Up @@ -72,6 +73,23 @@ public async Task<IActionResult> GetProcessAsync(string processId)
return this.Ok(daprProcess);
}

/// <summary>
/// Retrieves information about a process.
esttenorio marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <param name="processId">The Id of the process.</param>
/// <param name="cloudClient">Mock Cloud client ingested via dependency injection</param>
/// <returns></returns>
[HttpGet("processes/{processId}/mockCloudClient")]
public async Task<IActionResult> GetMockCloudClientAsync(string processId, MockCloudEventClient cloudClient)
{
if (!s_processes.TryGetValue(processId, out DaprKernelProcessContext? context))
{
return this.NotFound();
}

return this.Ok(cloudClient);
}

/// <summary>
/// Checks the health of the Dapr runtime by attempting to send a message to a health actor.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

using Microsoft.SemanticKernel;
using SemanticKernel.Process.IntegrationTests;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

var builder = WebApplication.CreateBuilder(args);

Expand All @@ -15,6 +16,10 @@
// Configure the Kernel with DI. This is required for dependency injection to work with processes.
builder.Services.AddKernel();

// Configure IExternalKernelProcessMessageChannel used for testing purposes
builder.Services.AddSingleton<IExternalKernelProcessMessageChannel>(MockCloudEventClient.GetInstance());
builder.Services.AddSingleton(MockCloudEventClient.GetInstance());

// Configure Dapr
builder.Services.AddActors(static options =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.SemanticKernel;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Serialization;
using SemanticKernel.Process.IntegrationTests.CloudEvents;

namespace SemanticKernel.Process.IntegrationTests;
internal sealed class DaprTestProcessContext : KernelProcessContext
Expand Down Expand Up @@ -68,4 +69,14 @@ public override Task StopAsync()
{
throw new NotImplementedException();
}

public override async Task<IExternalKernelProcessMessageChannel?> GetExternalMessageChannelAsync()
{
var response = await this._httpClient.GetFromJsonAsync<MockCloudEventClient>($"http://localhost:5200/processes/{this._processId}/mockCloudClient", options: this._serializerOptions);
return response switch
{
null => throw new InvalidOperationException("Process not found"),
_ => response
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ private async Task WaitForHostStartupAsync()
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
/// <param name="initialEvent">An optional initial event.</param>
/// <returns>A <see cref="Task{KernelProcessContext}"/></returns>
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent)
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
// Actual Kernel injection of Kernel and ExternalKernelProcessMessageChannel is in dotnet\src\Experimental\Process.IntegrationTestHost.Dapr\Program.cs
var context = new DaprTestProcessContext(process, this._httpClient!);
await context.StartWithEventAsync(initialEvent);
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public class ProcessTestFixture
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
/// <param name="initialEvent">An optional initial event.</param>
/// <returns>A <see cref="Task{KernelProcessContext}"/></returns>
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent)
public async Task<KernelProcessContext> StartProcessAsync(KernelProcess process, Kernel kernel, KernelProcessEvent initialEvent, IExternalKernelProcessMessageChannel? externalMessageChannel = null)
{
return await process.StartAsync(kernel, initialEvent);
return await process.StartAsync(kernel, initialEvent, externalMessageChannel);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.SemanticKernel;

namespace SemanticKernel.Process.IntegrationTests.CloudEvents;
public class MockCloudEventClient : IExternalKernelProcessMessageChannel
{
/// <summary>
/// Initialization counter for testing
/// </summary>
public int InitializationCounter { get; set; } = 0;
/// <summary>
/// Uninitialization counter for testing
/// </summary>
public int UninitializationCounter { get; set; } = 0;
/// <summary>
/// Captures cloud events emitted for testing
/// </summary>
public List<MockCloudEventData> CloudEvents { get; set; }

private static MockCloudEventClient? s_instance = null;

public MockCloudEventClient()
{
this.CloudEvents = [];
}

public static MockCloudEventClient GetInstance()
{
if (s_instance == null)
{
s_instance = new MockCloudEventClient();
}
return s_instance;
}

/// <inheritdoc/>
public Task EmitExternalEventAsync(string externalTopicEvent, object? eventData)
{
this.CloudEvents.Add(new() { TopicName = externalTopicEvent, Data = (string)eventData });
return Task.CompletedTask;
}

/// <inheritdoc/>
public ValueTask Initialize()
{
this.InitializationCounter++;
return ValueTask.CompletedTask;
}

/// <inheritdoc/>
public ValueTask Uninitialize()
{
this.UninitializationCounter++;
return ValueTask.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.

namespace SemanticKernel.Process.IntegrationTests.CloudEvents;

/// <summary>
/// Mock cloud event data used for testing purposes only
/// </summary>
public class MockCloudEventData
{
/// <summary>
/// Name of the mock topic
/// </summary>
public string TopicName { get; set; }

/// <summary>
/// Data emitted in the mock cloud event
/// </summary>
public string? Data { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Threading.Tasks;
using Microsoft.SemanticKernel;

namespace SemanticKernel.Process.IntegrationTests;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

/// <summary>
/// A step that emits messages externally
/// </summary>
public sealed class MockProxyStep : KernelProcessStep
{
public static class FunctionNames
{
public const string OnRepeatMessage = nameof(OnRepeatMessage);
public const string OnEchoMessage = nameof(OnEchoMessage);
}

public static class TopicNames
{
public const string RepeatExternalTopic = nameof(RepeatExternalTopic);
public const string EchoExternalTopic = nameof(EchoExternalTopic);
}

[KernelFunction(FunctionNames.OnRepeatMessage)]
public async Task OnRepeatMessageAsync(KernelProcessStepContext context, string message)
{
await context.EmitExternalEventAsync(TopicNames.RepeatExternalTopic, message);
}

[KernelFunction(FunctionNames.OnEchoMessage)]
public async Task OnEchoMessageAsync(KernelProcessStepContext context, string message)
{
await context.EmitExternalEventAsync(TopicNames.EchoExternalTopic, message);
}
}

#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
<ItemGroup>
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/TestSettings/**/*.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/**/*Tests.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
<Compile Include="$(RepoRoot)/dotnet/src/Experimental/Process.IntegrationTests.Shared/CloudEvents/*.cs" Link="%(RecursiveDir)%(Filename)%(Extension)" />
</ItemGroup>
</Project>
Loading
Loading