Azure Cosmos DB Change Feed
In this lab, you will use the Change Feed Processor Library and Azure Functions to implement three use cases for the Azure Cosmos DB change feed.
If this is your first lab and you have not already completed the setup for the lab content see the instructions for Account Setup before starting this lab.
Build A .NET Console App to Generate Data
In order to simulate data flowing into our store, in the form of actions on an e-commerce website, we'll build a simple .NET Console App to generate and add documents to our Cosmos DB CartContainer.
- On your local machine, locate the CosmosLabs folder in your Documents folder and open the Lab08 folder that will be used to contain the content of your .NET Core project. If you are completing this lab through Microsoft Hands-on Labs, the CosmosLabs folder will be located at the path: C:\labs\CosmosLabs
- In the Lab08 folder, right-click the folder and select the Open with Code menu option. Alternatively, you can run a terminal in your current directory and execute the `code .` command.
- In the explorer pane on the left, locate the DataGenerator folder and expand it.
- Select the Program.cs link in the Explorer pane to open the file in the editor.
- For the `_endpointUrl` variable, replace the placeholder value with the URI value, and for the `_primaryKey` variable, replace the placeholder value with the PRIMARY KEY value from your Azure Cosmos DB account. Use these instructions to get these values if you do not already have them.
  - For example, if your url is `https://cosmosacct.documents.azure.com:443/`, your new variable assignment will look like this:

```csharp
private static readonly string _endpointUrl = "https://cosmosacct.documents.azure.com:443/";
```

  - For example, if your primary key is `elzirrKCnXlacvh1CRAnQdYVbVLspmYHQyYrhx0PltHi8wn5lHVHFnd1Xm3ad5cn4TUcH4U0MSeHsVykkFPHpQ==`, your new variable assignment will look like this:

```csharp
private static readonly string _primaryKey = "elzirrKCnXlacvh1CRAnQdYVbVLspmYHQyYrhx0PltHi8wn5lHVHFnd1Xm3ad5cn4TUcH4U0MSeHsVykkFPHpQ==";
```
Create Function to Add Documents to Cosmos DB
The key functionality of the console application is to add documents to our Cosmos DB to simulate activity on our e-commerce website. Here, you'll create a data definition for these documents and define a function to add them.
- Within the Program.cs file in the DataGenerator folder, locate the `AddItem()` method. The purpose of this method is to add an instance of CartAction to our Cosmos DB container. If you'd like to review how to add documents to a Cosmos DB container, refer to Lab 01. A minimal sketch of the idea is shown below.
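The starter project already contains this method, so there's nothing to add here; purely for orientation, here is a minimal sketch of the idea. The `_cartContainer` field name is an assumption for this sketch, and the use of `item.Item` as the partition key value follows from the container's `/Item` partition key.

```csharp
// Minimal sketch only — the lab's starter project provides the real AddItem().
// Assumes _cartContainer is a Container resolved from the CosmosClient.
private static async Task AddItem(CartAction item)
{
    // CartContainer is partitioned on /Item, so the item's Item value
    // doubles as the partition key for the write.
    await _cartContainer.CreateItemAsync(item, new PartitionKey(item.Item));
}
```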
Create a Function to Generate Random Shopping Data
- Within the Program.cs file in the DataGenerator folder, locate the `GenerateActions()` method. The purpose of this method is to create randomized CartAction objects that you'll consume using the Cosmos DB change feed. A rough sketch of the idea follows.
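As with `AddItem()`, the method already exists in the starter project; the sketch below only illustrates the shape of the data being generated. The CartAction property names are inferred from how they're used later in this lab (Item, Price, BuyerState, Action); the enum members other than Purchased, the sample items, and the states are made-up placeholders.

```csharp
// Illustrative members of the Program class (assumed shapes, not the lab's
// exact code); the real CartAction and ActionType live in the Shared project.
public enum ActionType { Viewed, Added, Purchased }   // only Purchased is confirmed by later steps

public class CartAction
{
    public string Item { get; set; }
    public double Price { get; set; }
    public string BuyerState { get; set; }
    public ActionType Action { get; set; }
}

private static readonly Random _random = new Random();
private static readonly string[] _items = { "Socks", "Hat", "Shirt" };    // sample data
private static readonly string[] _states = { "WA", "CA", "TX" };          // sample data

private static CartAction GenerateAction()
{
    // Pick random values so downstream consumers see varied documents.
    return new CartAction
    {
        Item = _items[_random.Next(_items.Length)],
        Price = Math.Round(_random.NextDouble() * 100, 2),
        BuyerState = _states[_random.Next(_states.Length)],
        Action = ActionType.Purchased
    };
}
```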
Run the Console App and Verify Functionality
You're ready to run the console app, and in this step you'll take a look at your Cosmos DB account to ensure test data is being written as expected.
- Open a terminal window.
- In the terminal pane, enter and execute the following commands to run your console app:

```sh
cd DataGenerator
dotnet run
```

- After a brief build process, you should begin to see the asterisks being printed as data is being generated and written to Cosmos DB.
- Let the console app run for a minute or two and then stop it by pressing any key in the console.
- Switch to the Azure Portal and your Cosmos DB account.
- From within the Azure Cosmos DB blade, select the Data Explorer tab on the left.
- Expand the StoreDatabase, then the CartContainer, and select Items. You should see something like the following screenshot.
Note: your data will be slightly different since it is random; the important thing is that there is data here at all. A representative document is sketched below.
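Since the screenshot can't be reproduced here, a generated document should look roughly like the following. The field names are taken from the CartAction model used later in this lab; the values (and the exact serialization of the Action field, which depends on the model's JSON settings) are illustrative only.

```json
{
  "id": "b6a2e1c4-7d2f-4f4e-9c3a-1f0d8e5b2a77",
  "Item": "Socks",
  "Price": 24.5,
  "BuyerState": "WA",
  "Action": "Purchased"
}
```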
Consume Cosmos DB Change Feed via the Change Feed Processor
The two main options for consuming the Cosmos DB change feed are Azure Functions and the Change Feed Processor library. We'll start with the Change Feed Processor via a simple console application.
Connect to the Cosmos DB Change Feed
The first use case we'll explore for the Cosmos DB change feed is live migration. A common concern when designing a Cosmos DB container is proper selection of a partition key. You'll recall that we created our CartContainer with a partition key of `/Item`. What if we find out later this key is wrong? Or what if writes work better with `/Item` while reads work better with `/BuyerState` as the partition key? We can avoid analysis paralysis by using the Cosmos DB change feed to migrate our data in real time to a second container with a different partition key!
- Switch back to Visual Studio Code.
- Select the Program.cs link under the ChangeFeedConsole folder in the Explorer pane to open the file in the editor.
- For the `_endpointUrl` variable, replace the placeholder value with the URI value, and for the `_primaryKey` variable, replace the placeholder value with the PRIMARY KEY value from your Azure Cosmos DB account.
- Notice the container configuration value at the top of the Program.cs file, following `_containerId`, for the name of the destination container:

```csharp
private static readonly string _destinationContainerId = "CartContainerByState";
```
In this case we are going to migrate our data to another container within the same database. The same ideas apply even if we wanted to migrate our data to another database entirely.
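As a hedged illustration (the endpoint placeholders and database name below are hypothetical), pointing the destination at a different account or database would only change how the container reference is resolved:

```csharp
// Hypothetical sketch: resolving the destination container from a second
// Cosmos DB account/database instead of the same Database instance.
CosmosClient destinationClient = new CosmosClient("<destination-endpoint>", "<destination-key>");
Container destinationContainer = destinationClient
    .GetDatabase("OtherStoreDatabase")        // hypothetical database name
    .GetContainer("CartContainerByState");
```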
- In order to consume the change feed we make use of a Lease Container. Add the following lines of code in place of `//todo: Add lab code here` to create the lease container:

```csharp
ContainerProperties leaseContainerProperties = new ContainerProperties("consoleLeases", "/id");
Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseContainerProperties, throughput: 400);
```
The Lease Container stores information to allow for parallel processing of the change feed, and acts as a bookmark for where we last processed changes from the feed.
- Now, add the following lines of code directly after the leaseContainer definition in order to get an instance of the change feed processor:

```csharp
var builder = container.GetChangeFeedProcessorBuilder("migrationProcessor",
    (IReadOnlyCollection<object> input, CancellationToken cancellationToken) =>
    {
        Console.WriteLine(input.Count + " Changes Received");
        //todo: Add processor code here
    });

var processor = builder
    .WithInstanceName("changeFeedConsole")
    .WithLeaseContainer(leaseContainer)
    .Build();
```
Each time a set of changes is received, the handler defined in `GetChangeFeedProcessorBuilder` will be called. We're skipping the handling of those changes for the moment.
- In order for our processor to run, we have to start it. Following the definition of processor, add the following line of code:

```csharp
await processor.StartAsync();
```
- Finally, when a key is pressed to terminate the processor, we need to end it. Locate the `//todo: Add stop code here` line and replace it with this code:

```csharp
await processor.StopAsync();
```
- At this point, your Program.cs file should look like this:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

namespace ChangeFeedConsole
{
    class Program
    {
        private static readonly string _endpointUrl = "<your-endpoint-url>";
        private static readonly string _primaryKey = "<your-primary-key>";
        private static readonly string _databaseId = "StoreDatabase";
        private static readonly string _containerId = "CartContainer";
        private static readonly string _destinationContainerId = "CartContainerByState";
        private static CosmosClient _client = new CosmosClient(_endpointUrl, _primaryKey);

        static async Task Main(string[] args)
        {
            Database database = _client.GetDatabase(_databaseId);
            Container container = database.GetContainer(_containerId);
            Container destinationContainer = database.GetContainer(_destinationContainerId);

            ContainerProperties leaseContainerProperties = new ContainerProperties("consoleLeases", "/id");
            Container leaseContainer = await database.CreateContainerIfNotExistsAsync(leaseContainerProperties, throughput: 400);

            var builder = container.GetChangeFeedProcessorBuilder("migrationProcessor",
                (IReadOnlyCollection<object> input, CancellationToken cancellationToken) =>
                {
                    Console.WriteLine(input.Count + " Changes Received");
                    //todo: Add processor code here
                });

            var processor = builder
                .WithInstanceName("changeFeedConsole")
                .WithLeaseContainer(leaseContainer)
                .Build();

            await processor.StartAsync();
            Console.WriteLine("Started Change Feed Processor");
            Console.WriteLine("Press any key to stop the processor...");

            Console.ReadKey();

            Console.WriteLine("Stopping Change Feed Processor");
            await processor.StopAsync();
        }
    }
}
```
Complete the Live Data Migration
- Within the Program.cs file in the ChangeFeedConsole folder, locate the todo we left ourselves: `//todo: Add processor code here`
- Modify the signature of the handler in `GetChangeFeedProcessorBuilder`, replacing `object` with `CartAction`, as follows:

```csharp
var builder = container.GetChangeFeedProcessorBuilder("migrationProcessor",
    (IReadOnlyCollection<CartAction> input, CancellationToken cancellationToken) =>
    {
        Console.WriteLine(input.Count + " Changes Received");
        //todo: Add processor code here
    });
```
- The input is a collection of CartAction documents that have changed. To migrate them, we'll simply loop through them and write them out to our destination container. Replace the `//todo: Add processor code here` with the following code:

```csharp
var tasks = new List<Task>();

foreach (var doc in input)
{
    tasks.Add(destinationContainer.CreateItemAsync(doc, new PartitionKey(doc.BuyerState)));
}

return Task.WhenAll(tasks);
```
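One design note, not part of the lab steps: the change feed processor has at-least-once delivery, so a batch can be redelivered after a failure. `CreateItemAsync` would then throw a conflict for documents that were already copied; swapping in an upsert makes the migration handler idempotent:

```csharp
// Hedged alternative to CreateItemAsync: UpsertItemAsync tolerates replays
// of the same batch, since rewriting an already-migrated document is a no-op.
tasks.Add(destinationContainer.UpsertItemAsync(doc, new PartitionKey(doc.BuyerState)));
```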
Test to Confirm the Change Feed Function Works
Now that we have our first Change Feed consumer, we're ready to run a test and confirm that it works.
- Open a second terminal window and navigate to the ChangeFeedConsole folder.
- Start up your console app by running the following commands in the second terminal window:

```sh
cd ChangeFeedConsole
dotnet run
```

- Once the processor starts running you'll see the following messages in your console:

```
Started Change Feed Processor
Press any key to stop the processor...
```

Because this is the first time we've run this consumer, there will be no data to consume. We'll start the data generator in order to start receiving changes.
- In the first terminal window, navigate to the DataGenerator folder.
- Start the DataGenerator again by running the following command in the first terminal window:

```sh
dotnet run
```

- You should see the asterisks start to appear again as the data is being written.
- Soon after data starts being written, you'll start to see the following output in the second terminal window:

```
100 Changes Received
100 Changes Received
3 Changes Received
...
```
- After a few minutes, navigate to the cosmosdblab Data Explorer and expand StoreDatabase, then CartContainerByState, and select Items. You should see items populating there, and note that the Partition Key this time is `/BuyerState`.
- Press any key in the first terminal to stop data generation.
- Let the ChangeFeedConsole finish running (it shouldn't take very long). You'll know it's done when it stops writing new log messages. Stop the processor by pressing any key in the second terminal window.
You've now written your first Cosmos DB Change Feed consumer, which writes live data to a new collection. Congrats! In the next steps we'll take a look at using Azure Functions to consume Cosmos DB change feed for two additional use cases.
Create an Azure Function to Consume Cosmos DB Change Feed
One of the interesting features of Azure Cosmos DB is its change feed. The change feed provides support for many scenarios, three of which we'll investigate further in this lab.
Create a .NET Core Azure Functions Project
In this exercise, we will implement the .NET SDK's change feed processor library to read Azure Cosmos DB's change feed in a scalable and fault-tolerant way. Azure Functions provide a quick and easy way to hook up with the Cosmos DB change feed, by implementing the change feed processor out of the box. You'll start by setting up a .NET Core Azure Functions project.
For more information, please read the doc.
- Open a terminal window and navigate to the Lab08 folder you've been using for this lab.
- To install command line support for Azure Functions, you'll need node.js.
- In your terminal pane, check your node version by running the following:

```sh
node --version
```

If you do not have node.js installed, download it here. If you are using a version older than 8.5, run the following:

```sh
npm i -g node@latest
```
- Enter and execute the following command to download the Azure Functions tooling:

```sh
npm install -g azure-functions-core-tools
```

If this command fails, refer to the previous step to set up node.js. You may need to restart your terminal window for these changes to take effect.
- In your terminal pane, enter and execute the following command. This command creates a new Azure Functions project:

```sh
func init ChangeFeedFunctions
```

- When prompted, choose the dotnet worker runtime. Use the arrow keys to scroll up and down.
- Change directory to the ChangeFeedFunctions directory created in the previous step:

```sh
cd ChangeFeedFunctions
```
- In your terminal pane, enter and execute the following command:

```sh
func new
```

- When prompted, select C# from the list of languages. Use the arrow keys to scroll up and down.
- When prompted, select CosmosDBTrigger from the list of templates. Use the arrow keys to scroll up and down.
- When prompted, enter the name MaterializedViewFunction for the function.
- Open the ChangeFeedFunctions.csproj file and update the target framework to .NET Core 3.1:

```xml
<TargetFramework>netcoreapp3.1</TargetFramework>
```
- In your terminal pane, enter and execute the following commands:

```sh
dotnet add package Microsoft.Azure.Cosmos
dotnet add package Microsoft.NET.Sdk.Functions --version 3.0.9
dotnet add package Microsoft.Azure.WebJobs.Extensions.CosmosDB --version 3.0.7
dotnet add ChangeFeedFunctions.csproj reference ..\Shared\Shared.csproj
```
- In your terminal pane, build the project:

```sh
dotnet build
```

- Your first Azure Function has now been created. In Visual Studio Code, note the new ChangeFeedFunctions folder; expand it and explore the local.settings.json and MaterializedViewFunction.cs files.
Use Cosmos DB Change Feed for the Materialized View Pattern
The Materialized View pattern is used to generate pre-populated views of data in environments where the source data format is not well suited to the application's requirements. In this example, we'll create a real-time collection of sales data aggregated by state that would allow another application to quickly retrieve summary sales data.
Create the Materialized View Azure Function
- Locate the local.settings.json file and select it to open it in the editor.
- Add a new value DBConnection using the Primary Connection String parameter from your Cosmos DB account collected earlier in this lab. The local.settings.json file should look like this:

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "DBConnection": "<your-db-connection-string>"
  }
}
```
- Select the new MaterializedViewFunction.cs file to open it in the editor. The databaseName, collectionName and ConnectionStringSetting refer to the source Cosmos DB account that the function is listening for changes on.
- Change the databaseName value to `StoreDatabase`
- Change the collectionName value to `CartContainerByState`
Cosmos DB change feeds are guaranteed to be in order within a partition, so in this case we want to use the container where the partition is already set to the State, CartContainerByState, as our source.
- Replace the ConnectionStringSetting placeholder with the new setting you added earlier, DBConnection:

```csharp
ConnectionStringSetting = "DBConnection",
```
- Between ConnectionStringSetting and LeaseCollectionName, add the following line:

```csharp
CreateLeaseCollectionIfNotExists = true,
```

- Change the LeaseCollectionName value to `materializedViewLeases`
Lease collections are a critical part of the Cosmos DB change feed. They allow multiple instances of a function to operate over a collection and serve as a virtual bookmark for where the function last left off.
- Your Run function should now look like this:

```csharp
[FunctionName("MaterializedViewFunction")]
public static void Run([CosmosDBTrigger(
    databaseName: "StoreDatabase",
    collectionName: "CartContainerByState",
    ConnectionStringSetting = "DBConnection",
    CreateLeaseCollectionIfNotExists = true,
    LeaseCollectionName = "materializedViewLeases")]IReadOnlyList<Document> input, ILogger log)
{
    if (input != null && input.Count > 0)
    {
        log.LogInformation("Documents modified " + input.Count);
        log.LogInformation("First document Id " + input[0].Id);
    }
}
```
The function works by polling your container on an interval and checking for changes since the last lease time. Each turn of the function may result in multiple documents that have changed, which is why the input is an IReadOnlyList of Documents.
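The polling interval itself is configurable. A hedged sketch, not a required lab step: the v3 Cosmos DB trigger attribute exposes a FeedPollDelay property (in milliseconds; 5000 is the documented default), which you could set explicitly if you wanted a different cadence.

```csharp
// Illustration only: FeedPollDelay controls how often each partition is
// polled for new changes when the feed is idle (milliseconds).
[FunctionName("MaterializedViewFunction")]
public static void Run([CosmosDBTrigger(
    databaseName: "StoreDatabase",
    collectionName: "CartContainerByState",
    ConnectionStringSetting = "DBConnection",
    CreateLeaseCollectionIfNotExists = true,
    LeaseCollectionName = "materializedViewLeases",
    FeedPollDelay = 5000)]IReadOnlyList<Document> input, ILogger log)
{
    // ...
}
```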
- Add the following using statements to the top of the MaterializedViewFunction.cs file:

```csharp
using System.Threading.Tasks;
using System.Linq;
using Newtonsoft.Json;
using Microsoft.Azure.Cosmos;
using Shared;
```
- Modify the signature of the Run function to be async with a Task return type. Your function should now look like the following:

```csharp
[FunctionName("MaterializedViewFunction")]
public static async Task Run([CosmosDBTrigger(
    databaseName: "StoreDatabase",
    collectionName: "CartContainerByState",
    ConnectionStringSetting = "DBConnection",
    CreateLeaseCollectionIfNotExists = true,
    LeaseCollectionName = "materializedViewLeases")]IReadOnlyList<Document> input, ILogger log)
{
    if (input != null && input.Count > 0)
    {
        log.LogInformation("Documents modified " + input.Count);
        log.LogInformation("First document Id " + input[0].Id);
    }
}
```
- Your target this time is the container called StateSales. Add the following lines to the top of the MaterializedViewFunction class to set up the destination connection. Be sure to replace the endpoint url and the key:

```csharp
private static readonly string _endpointUrl = "<your-endpoint-url>";
private static readonly string _primaryKey = "<your-primary-key>";
private static readonly string _databaseId = "StoreDatabase";
private static readonly string _containerId = "StateSales";
private static CosmosClient _client = new CosmosClient(_endpointUrl, _primaryKey);
```
Add a new Class for StateSales Data
- Open DataModel.cs within the Shared folder in the editor.
- Following the definition of the CartAction class, add a new class as follows:

```csharp
public class StateCount
{
    [JsonProperty("id")]
    public string Id { get; set; }
    public string State { get; set; }
    public int Count { get; set; }
    public double TotalSales { get; set; }

    public StateCount()
    {
        Id = Guid.NewGuid().ToString();
    }
}
```
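For orientation (not a lab step), a StateCount document written to the StateSales container would serialize roughly like this — only `id` is renamed by the JsonProperty attribute, while the remaining properties keep their Pascal-case names by default; the values are made up:

```json
{
  "id": "0b6cf7a2-5a86-4a4d-9f5e-3f1e2b1c9d10",
  "State": "WA",
  "Count": 12,
  "TotalSales": 431.5
}
```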
Update the MaterializedViewFunction to Create the Materialized View
The Azure Function receives a list of Documents that have changed. We want to organize this list into a dictionary keyed off of the state of each document, keeping track of the total price and count of items purchased. We'll use this dictionary later to write data to our materialized view collection, StateSales.
- Switch back to the MaterializedViewFunction.cs file in the editor.
- Locate the following section in the code for MaterializedViewFunction.cs:

```csharp
if (input != null && input.Count > 0)
{
    log.LogInformation("Documents modified " + input.Count);
    log.LogInformation("First document Id " + input[0].Id);
}
```
- Replace the two logging lines with the following code:

```csharp
var stateDict = new Dictionary<string, List<double>>();

foreach (var doc in input)
{
    var action = JsonConvert.DeserializeObject<CartAction>(doc.ToString());

    if (action.Action != ActionType.Purchased)
    {
        continue;
    }

    if (stateDict.ContainsKey(action.BuyerState))
    {
        stateDict[action.BuyerState].Add(action.Price);
    }
    else
    {
        stateDict.Add(action.BuyerState, new List<double> { action.Price });
    }
}
```
- Following the conclusion of this foreach loop, add this code to connect to our destination container:

```csharp
var database = _client.GetDatabase(_databaseId);
var container = database.GetContainer(_containerId);

//todo - Next steps go here
```
- Because we're dealing with an aggregate collection, we'll be either creating or updating a document for each entry in our dictionary. For starters, we need to check to see if the document we care about exists. Add the following code after the todo line above:

```csharp
var tasks = new List<Task>();

foreach (var key in stateDict.Keys)
{
    var query = new QueryDefinition("select * from StateSales s where s.State = @state")
        .WithParameter("@state", key);

    var resultSet = container.GetItemQueryIterator<StateCount>(query,
        requestOptions: new QueryRequestOptions()
        {
            PartitionKey = new Microsoft.Azure.Cosmos.PartitionKey(key),
            MaxItemCount = 1
        });

    while (resultSet.HasMoreResults)
    {
        var stateCount = (await resultSet.ReadNextAsync()).FirstOrDefault();

        if (stateCount == null)
        {
            //todo: Add new doc code here
        }
        else
        {
            //todo: Add existing doc code here
        }

        //todo: Upsert document
    }
}

await Task.WhenAll(tasks);
```

Take note of the MaxItemCount on the GetItemQueryIterator call. We're only expecting a single result at most because each state has at most one document.
- In the case that the stateCount object is null, we'll create a new one. Replace the `//todo: Add new doc code here` section with the following code:

```csharp
stateCount = new StateCount();
stateCount.State = key;
stateCount.TotalSales = stateDict[key].Sum();
stateCount.Count = stateDict[key].Count;
```
- In the case that the stateCount object exists, we'll update it. Replace the `//todo: Add existing doc code here` section with the following code:

```csharp
stateCount.TotalSales += stateDict[key].Sum();
stateCount.Count += stateDict[key].Count;
```
- Finally, we'll do an upsert (update or insert) operation on our destination Cosmos DB account. Replace the `//todo: Upsert document` section with the following code:

```csharp
log.LogInformation("Upserting materialized view document");
tasks.Add(container.UpsertItemAsync(stateCount, new Microsoft.Azure.Cosmos.PartitionKey(stateCount.State)));
```
We're using a list of tasks here because we can do our upserts in parallel.
- Your MaterializedViewFunction should now look like this:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using System.Linq;
using Newtonsoft.Json;
using Microsoft.Azure.Cosmos;
using Shared;

namespace ChangeFeedFunctions
{
    public static class MaterializedViewFunction
    {
        private static readonly string _endpointUrl = "<your-endpoint-url>";
        private static readonly string _primaryKey = "<primary-key>";
        private static readonly string _databaseId = "StoreDatabase";
        private static readonly string _containerId = "StateSales";
        private static CosmosClient _client = new CosmosClient(_endpointUrl, _primaryKey);

        [FunctionName("MaterializedViewFunction")]
        public static async Task Run([CosmosDBTrigger(
            databaseName: "StoreDatabase",
            collectionName: "CartContainerByState",
            ConnectionStringSetting = "DBConnection",
            CreateLeaseCollectionIfNotExists = true,
            LeaseCollectionName = "materializedViewLeases")]IReadOnlyList<Document> input, ILogger log)
        {
            if (input != null && input.Count > 0)
            {
                var stateDict = new Dictionary<string, List<double>>();

                foreach (var doc in input)
                {
                    var action = JsonConvert.DeserializeObject<CartAction>(doc.ToString());

                    if (action.Action != ActionType.Purchased)
                    {
                        continue;
                    }

                    if (stateDict.ContainsKey(action.BuyerState))
                    {
                        stateDict[action.BuyerState].Add(action.Price);
                    }
                    else
                    {
                        stateDict.Add(action.BuyerState, new List<double> { action.Price });
                    }
                }

                var database = _client.GetDatabase(_databaseId);
                var container = database.GetContainer(_containerId);

                var tasks = new List<Task>();

                foreach (var key in stateDict.Keys)
                {
                    var query = new QueryDefinition("select * from StateSales s where s.State = @state")
                        .WithParameter("@state", key);

                    var resultSet = container.GetItemQueryIterator<StateCount>(query,
                        requestOptions: new QueryRequestOptions()
                        {
                            PartitionKey = new Microsoft.Azure.Cosmos.PartitionKey(key),
                            MaxItemCount = 1
                        });

                    while (resultSet.HasMoreResults)
                    {
                        var stateCount = (await resultSet.ReadNextAsync()).FirstOrDefault();

                        if (stateCount == null)
                        {
                            stateCount = new StateCount();
                            stateCount.State = key;
                            stateCount.TotalSales = stateDict[key].Sum();
                            stateCount.Count = stateDict[key].Count;
                        }
                        else
                        {
                            stateCount.TotalSales += stateDict[key].Sum();
                            stateCount.Count += stateDict[key].Count;
                        }

                        log.LogInformation("Upserting materialized view document");
                        tasks.Add(container.UpsertItemAsync(stateCount, new Microsoft.Azure.Cosmos.PartitionKey(stateCount.State)));
                    }
                }

                await Task.WhenAll(tasks);
            }
        }
    }
}
```
Test to Confirm the Materialized View Function Works
- Open three terminal windows.
- In the first terminal window, navigate to the DataGenerator folder.
- Start the DataGenerator by entering and executing the following in the first terminal window:

```sh
dotnet run
```

- In a second terminal window, navigate to the ChangeFeedConsole folder.
- Start the ChangeFeedConsole consumer by entering and executing the following in the second terminal window:

```sh
dotnet run
```

- In the third terminal window, navigate to the ChangeFeedFunctions folder.
- In the third terminal window, start the Azure Functions by entering and executing the following:

```sh
func host start
```

If prompted, select Allow access.
Data will pass from DataGenerator > CartContainer > ChangeFeedConsole > CartContainerByState > MaterializedViewFunction > StateSales.
- You should see the asterisks in the first window as data is being generated, and in the second and third windows you should see console messages indicating that your processors are running.
- Open a browser window and navigate to the Cosmos DB resource Data Explorer.
- Expand StoreDatabase, then StateSales, and select Items.
- You should see data being populated in the container by state; select an item to see the contents of the data.
- In the first terminal window, press any key to stop data generation.
- In the second terminal window, press any key to stop data migration.
- In the third terminal window, let the function finish processing data by waiting for the console log messages to stop. It should only take a few seconds. Then press Ctrl+C to end execution of the functions.
Use Azure Cosmos DB Change Feed to Write Data to EventHub using Azure Functions
In the final example of a change feed use case in this lab, you'll write a simple Azure Function to write change data out to an Azure Event Hub. You'll use a stream processor to create real-time data outputs that you can consume in Power BI to build an e-commerce dashboard.
Create a Power BI Account (Optional)
This step is optional; if you do not wish to follow the lab through creating the dashboard, you can skip it.
To sign up for a Power BI account, visit the Power BI site and select Sign up free.
- Once logged in, create a new workspace called CosmosDB
Retrieve Azure Event Hub Connection Info
- Switch to the Azure Portal.
- On the left side of the portal, select the Resource groups link.
- In the Resource groups blade, locate and select the cosmoslabs resource group.
- In the cosmoslabs resource blade, select the Event Hub namespace.
- In the Event Hub blade, find Shared Access Policies under Settings and select it.
- In the Shared Access Policies blade, select the policy RootManageSharedAccessKey.
- In the panel that appears, copy the value for Connection string-primary key and save it for use later in this lab.
Create Outputs for the Azure Stream Analytics Job
This step is optional; if you do not wish to connect to Power BI to visualize your Event Hub data, you may skip it.
- Return to the cosmoslabs blade in the browser.
- In the cosmoslabs resource blade, select the stream analytics job.
- Select Outputs on the CartStreamProcessor Overview screen.
- At the top of the Outputs page, select +Add and choose Power BI.
- Select the Authorize button and follow the login prompts to authorize this output in your Power BI account.
- In the window that appears, enter the following data:
  - Set Output alias to `averagePriceOutput`
  - Set Group workspace to `CosmosDB` (or whatever name you used when you created a new workspace in Power BI)
  - Set Dataset name to `averagePrice`
  - Set Table name to `averagePrice`
  - Set Authentication mode to `User token`
  - Select Save
- Repeat the previous step to add a second output:
  - Set Output alias to `incomingRevenueOutput`
  - Set Group workspace to `cosmosdb`
  - Set Dataset name to `incomingRevenue`
  - Set Table name to `incomingRevenue`
  - Set Authentication mode to `User token`
  - Select Save
- Repeat the previous step to add a third output:
  - Set Output alias to `top5Output`
  - Set Group workspace to `cosmosdb`
  - Set Dataset name to `top5`
  - Set Table name to `top5`
  - Set Authentication mode to `User token`
  - Select Save
- Repeat the previous step to add a fourth (and final) output:
  - Set Output alias to `uniqueVisitorCountOutput`
  - Set Group workspace to `cosmosdb`
  - Set Dataset name to `uniqueVisitorCount`
  - Set Table name to `uniqueVisitorCount`
  - Set Authentication mode to `User token`
  - Select Save
- Once you've completed these steps, the Outputs blade should look like this:
Create an Azure Function to write data to the Event Hub
With all of the configuration out of the way, you'll see how simple it is to write an Azure Function that writes change data to your new Event Hub in real time.
- Open a terminal window and navigate to the ChangeFeedFunctions folder.
- Create a new function by entering and executing the following command:

```sh
func new
```

- When prompted, select CosmosDBTrigger as the template.
- When prompted, enter AnalyticsFunction as the name.
- Add the Microsoft Azure Event Hubs NuGet package by entering and executing the following:

```sh
dotnet add package Microsoft.Azure.EventHubs --version 4.3.0
```

- Select the new AnalyticsFunction.cs file to open it in the editor.
- Add the following using statements to the top of the AnalyticsFunction.cs file:

```csharp
using Microsoft.Azure.EventHubs;
using System.Threading.Tasks;
using System.Text;
```
- Modify the signature of the Run function by setting:
  - databaseName to `StoreDatabase`
  - collectionName to `CartContainer`
  - ConnectionStringSetting to `DBConnection`
  - LeaseCollectionName to `analyticsLeases`
- In between the ConnectionStringSetting and LeaseCollectionName, add the following line:

```csharp
CreateLeaseCollectionIfNotExists = true,
```
- Modify the Run function to be async. The code file should now look like this:

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.EventHubs;
using System.Threading.Tasks;
using System.Text;

namespace ChangeFeedFunctions
{
    public static class AnalyticsFunction
    {
        [FunctionName("AnalyticsFunction")]
        public static async Task Run([CosmosDBTrigger(
            databaseName: "StoreDatabase",
            collectionName: "CartContainer",
            ConnectionStringSetting = "DBConnection",
            CreateLeaseCollectionIfNotExists = true,
            LeaseCollectionName = "analyticsLeases")]IReadOnlyList<Document> input, ILogger log)
        {
            if (input != null && input.Count > 0)
            {
                log.LogInformation("Documents modified " + input.Count);
                log.LogInformation("First document Id " + input[0].Id);
            }
        }
    }
}
```
- At the top of the class, add the following configuration parameters:

```csharp
private static readonly string _eventHubConnection = "<event-hub-connection>";
private static readonly string _eventHubName = "carteventhub";
```
- Replace the placeholder in _eventHubConnection with the value of the Event Hubs Connection string-primary key you collected earlier.
- Start by creating an EventHubClient, replacing the two logging lines with the following code:

```csharp
var sbEventHubConnection = new EventHubsConnectionStringBuilder(_eventHubConnection)
{
    EntityPath = _eventHubName
};

var eventHubClient = EventHubClient.CreateFromConnectionString(sbEventHubConnection.ToString());

//todo: Next steps here
```
- For each document that changed, we want to write the data out to the Event Hub. Fortunately, we configured our Event Hub to expect JSON data, so there's very little processing to do here. Add the following code snippet:

```csharp
var tasks = new List<Task>();

foreach (var doc in input)
{
    var json = doc.ToString();
    var eventData = new EventData(Encoding.UTF8.GetBytes(json));

    log.LogInformation("Writing to Event Hub");
    tasks.Add(eventHubClient.SendAsync(eventData));
}

await Task.WhenAll(tasks);
```
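As a variation on the per-document tasks above, the Microsoft.Azure.EventHubs client can also accept a batch in a single call; a sketch, assuming the combined payload stays under the Event Hubs per-message size limit:

```csharp
// Hedged alternative: send the whole change-feed batch in one SendAsync call.
// Requires `using System.Linq;` and assumes the combined payload fits within
// the Event Hubs message-size limit (about 1 MB).
var batch = input.Select(doc => new EventData(Encoding.UTF8.GetBytes(doc.ToString())));
await eventHubClient.SendAsync(batch);
```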
- The final version of the AnalyticsFunction looks like this:

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.EventHubs;
using System.Threading.Tasks;
using System.Text;

namespace ChangeFeedFunctions
{
    public static class AnalyticsFunction
    {
        private static readonly string _eventHubConnection = "<your-connection-string>";
        private static readonly string _eventHubName = "carteventhub";

        [FunctionName("AnalyticsFunction")]
        public static async Task Run([CosmosDBTrigger(
            databaseName: "StoreDatabase",
            collectionName: "CartContainer",
            ConnectionStringSetting = "DBConnection",
            CreateLeaseCollectionIfNotExists = true,
            LeaseCollectionName = "analyticsLeases")]IReadOnlyList<Document> input, ILogger log)
        {
            if (input != null && input.Count > 0)
            {
                var sbEventHubConnection = new EventHubsConnectionStringBuilder(_eventHubConnection)
                {
                    EntityPath = _eventHubName
                };

                var eventHubClient = EventHubClient.CreateFromConnectionString(sbEventHubConnection.ToString());

                var tasks = new List<Task>();

                foreach (var doc in input)
                {
                    var json = doc.ToString();
                    var eventData = new EventData(Encoding.UTF8.GetBytes(json));

                    log.LogInformation("Writing to Event Hub");
                    tasks.Add(eventHubClient.SendAsync(eventData));
                }

                await Task.WhenAll(tasks);
            }
        }
    }
}
```
Creating a Power BI Dashboard to Test the AnalyticsFunction
- Once again, open three terminal windows.
- In the first terminal window, navigate to the DataGenerator folder.
- In the first terminal window, start the data generation process by entering and executing the following line:

```sh
dotnet run
```

- In the second terminal window, navigate to the ChangeFeedFunctions folder.
- In the second terminal window, start the Azure Functions by entering and executing the following line:

```sh
func host start
```

- In the third terminal window, navigate to the ChangeFeedConsole folder.
- In the third terminal window, start the change feed console processor by entering and executing the following line:

```sh
dotnet run
```
The remaining steps are optional; if you do not wish to visualize the Event Hub output data in Power BI, you may skip them.
- Confirm the data generator is running and that the Azure Functions and console change processor are firing before proceeding to the next steps.
- Return to the CartStreamProcessor overview screen and select the Start button at the top to start the processor. When prompted, choose to start the output now. Starting the processor may take several minutes.

[!TIP] If the Stream Analytics job fails to start, it may be due to a bad connection to Event Hubs. To correct this, go to Inputs in the Stream Analytics job, note the Service Bus namespace and Event Hub name, then delete the cartInput connection to the Event Hub and recreate it.
Wait for the processor to start before continuing.
- Open a web browser and navigate to the Power BI website.
- Sign in, and choose CosmosDB from the left hand section.
- In the top right of the screen, select Create and choose Dashboard. Give the dashboard any name.
- In the Dashboard screen, select Add tile from the top.
- Choose Custom Streaming Data and hit Next.
- Choose averagePrice from the Add custom streaming data tile window.
  - From Visualization Type, select Clustered bar chart.
  - Under Axis, select Add Value and select Action.
  - Under Value, select Add value and select AVG.
  - Select Next.
  - Give it a name like Average Price and select Apply.
- Follow these same steps to add tiles for the remaining three inputs:
  - For incomingRevenue, select a Line chart with Axis set to `Time` and Values set to `Sum`. Set Time window to display to at least 30 minutes.
  - For uniqueVisitors, select a Card with Fields set to `uniqueVisitors`.
  - For top5, select a Clustered column chart with Axis set to `Item` and Value set to `countEvents`.
- When complete, you'll have a dashboard that looks like the image below, updating in real time!
If this is your final lab, follow the steps in Removing Lab Assets to remove all lab resources.
Source: https://azurecosmosdb.github.io/labs/dotnet/labs/08-change_feed_with_azure_functions.html