使用azure逻辑应用程序以字符串形式将消息发送到azure服务总线

使用azure逻辑应用程序以字符串形式将消息发送到azure服务总线

问题描述:

我正在使用逻辑应用程序操作发送消息"将消息发送到服务总线主题.在控制台应用程序中读取时,如果执行此操作:

I am sending a message to a service bus topic using the logic app action "Send Message". When reading it in a console application, if i do this:

SubscriptionClient subClient = SubscriptionClient.CreateFromConnectionString(connstr, topicName, subscriptionName);
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = true; 
options.MaxConcurrentCalls = 1;  
subClient.OnMessage((message) => {
    string sjson = null;
    try
    {
        sjson = message.GetBody<string>();
        Console.WriteLine(sjson);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}, options);

它引发以下异常:

System.Runtime.Serialization.SerializationException: There was an error deserializing the object of type System.String. The input source is not correctly formatted. 
---> System.Xml.XmlException: The input source is not correctly formatted.
at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3)
at System.Xml.XmlBufferReader.ReadValue(XmlBinaryNodeType nodeType, ValueHandle value)at System.Xml.XmlBinaryReader.ReadNode() 
at System.Xml.XmlBinaryReader.Read() 
at System.Xml.XmlBaseReader.IsStartElement() 
at System.Xml.XmlBaseReader.IsStartElement(XmlDictionaryString localName, XmlDictionaryString namespaceUri)  
at System.Runtime.Serialization.XmlReaderDelegator.IsStartElement(XmlDictionaryString localname, XmlDictionaryString ns) 
at System.Runtime.Serialization.XmlObjectSerializer.IsRootElement(XmlReaderDelegator reader, DataContract contract, XmlDictionaryString name, XmlDictionaryString ns)
at System.Runtime.Serialization.DataContractSerializer.InternalIsStartObject(XmlReaderDelegator reader)  
at System.Runtime.Serialization.DataContractSerializer.InternalReadObject(XmlReaderDelegator xmlReader, Boolean verifyObjectName, DataContractResolver dataContractResolver) 
at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)--- End of inner exception stack trace ---at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)at System.Runtime.Serialization.DataContractSerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)  
at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)  
at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlReader reader, Boolean verifyObjectName)at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName)  
at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlDictionaryReader reader)at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(Stream stream) 
at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T](XmlObjectSerializer serializer)  
at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T]() 

事实证明,Logic App以流而不是字符串的形式发送消息,这就是引发异常的原因.因为,如果执行此操作,则控制台应用程序能够读取消息:

It turns out that the Logic App sends the message as a Stream instead of a string, that's why the exception is being thrown. Because, the console application is able to read the message if I do this:

SubscriptionClient subClient = SubscriptionClient.CreateFromConnectionString(connstr, topicName, subscriptionName);
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = true; 
options.MaxConcurrentCalls = 1;  
subClient.OnMessage((message) => {
    Stream stream;
    StreamReader reader;
    string messageJson;
    try
    {
        stream = message.GetBody<Stream>();
        reader = new StreamReader(stream);
        messageJson = reader.ReadToEnd();
        Console.WriteLine(messageJson);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}, options);

所以,我的问题是,有没有办法使逻辑应用程序以字符串而不是流的形式发送消息?还是逻辑应用程序的局限性?

So, my question is, is there a way to make the logic app send the message as a string instead of a stream? Or is it a limitation of the Logic App?

在Logic App操作中,我尝试同时使用"application/json","System.String"和"text/plain"作为消息的内容类型,

I have tried with both "application/json", "System.String" and "text/plain" as content type of the message in Logic App action, it does not work.

不幸的是,无法更改尸体的递送方式.为了解决这个问题,我们使用帮助程序方法更新了侦听器,该方法试图找出主体是哪种类型并相应地进行提取:

Unfortunately, it isn't possible to change how the body gets delivered. To address this, we updated our listener with a helper method that attempts to figure out which type the body is and extract accordingly:

private static JObject GetBody(BrokeredMessage brokeredMessage)
        {
            string objAsJson;
            object objIsStream;

            brokeredMessage.Properties.TryGetValue("isStream", out objIsStream);
            bool bIsStream = objIsStream != null ? (bool)objIsStream : brokeredMessage.DeliveryCount % 2 == 0; // Default delivery method is String; Retry String as Stream if it fails

            if (bIsStream)
            {
                // Azure Functions and Logic Apps send messages as Stream
                // Service Bus Explorer defaults to Stream but can send as String
                Stream stream = brokeredMessage.GetBody<Stream>();
                StreamReader reader = new StreamReader(stream);
                objAsJson = reader.ReadToEnd();
            }
            else
            {
                // Our services send messages as String
                objAsJson = brokeredMessage.GetBody<string>();
            }

            return JObject.Parse(objAsJson);
        }