当前位置:首页 > 网站源码 > 正文内容

注入卡密验证软件(注入卡密的软件)

网站源码1年前 (2023-09-26)273

【Kafka 】| 总结/Edison Zhou

1可用的Kafka .NET客户端

目前.NET圈子主流使用的是 Confluent.Kafka

confluent-kafka-dotnet: https://github.com/confluentinc/confluent-kafka-dotnet

其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。

因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。

NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:

2基于Confluent.Kafka的Sample

要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。

安装相关组件

在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:

编写KafkaService

编写IKafkaService接口:

Task SubscribeAsync<T>(IEnumerable< string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) whereT : class; }}

编写KafkaService实现类:

publicasyncTask PublishAsync<T>( stringtopicName, T message) whereT : class{varconfig = newProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小为16KLingerMs = 20// 修改等待时间为20ms};using( varproducer = newProducerBuilder< string, string>(config).Build) {awaitproducer.ProduceAsync(topicName, newMessage< string, string> {Key = Guid.NewGuid.ToString,Value = JsonConvert.SerializeObject(message)}); ;}}

展开全文

publicasyncTask SubscribeAsync<T>(IEnumerable< string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) whereT : class{varconfig = newConsumerConfig {BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer", EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假设只需要Leader响应即可AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起};using( varconsumer = newConsumerBuilder<Ignore, string>(config).Build) {consumer.Subscribe(topics);try{while( true) {try{varconsumeResult = consumer.Consume(cancellationToken); Console.WriteLine( $"Consumed message ' {consumeResult.Message?.Value}' at: ' {consumeResult?.TopicPartitionOffset}'." ); if(consumeResult.IsPartitionEOF) {Console.WriteLine( $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}已经到底了: {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}." ); continue; }T messageResult = null; try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch(Exception ex) {varerrorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value: {consumeResult.Message.Value}】 : {ex.StackTrace?.ToString}" ; Console.WriteLine(errorMessage);messageResult = null; }if(messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) {messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch(KafkaException e) {Console.WriteLine(e.Message);}}}catch(ConsumeException e) {Console.WriteLine( $"Consume error: {e.Error.Reason}" ); }}}catch(OperationCanceledException) {Console.WriteLine( "Closing consumer."); consumer.Close;}}

awaitTask.CompletedTask; }}}

为了方便后续的演示,在此项目中再创建一个类 EventData:

publicstringMessage { get; set; }

publicDateTime EventTime { get; set; } }

编写Producer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:

编写Consumer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:

测试Pub/Sub效果

将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。

3基于CAP项目的Sample

模拟场景说明

Catalog API

新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:

在Startup中的ConfigureServices方法中注入CAP:

......services.AddCap( x=> {x.UseMongoDB( "mongodb://account:password@mongodb-server:27017/products?authSource=admin"); x.UseKafka( "kafka1:9091,kafka2:9092,kafka3:9093"); });}

新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:

privatereadonlyICapPublisher _publisher; privatereadonlyIMapper _mapper;

publicProductController( ICapPublisher publisher, IMapper mapper) {_publisher = publisher;_mapper = mapper;}

[ HttpGet] publicIList<ProductDTO> Get( ) {return_mapper.Map<IList<ProductDTO>>(Products); ; }

注入卡密验证软件(注入卡密的软件)

[ HttpPut] publicasyncTask<IActionResult> UpdatePrice( stringid, decimalnewPrice ) {// 业务代码varproduct = Products.FirstOrDefault(p => p.Id == id); product.Price = newPrice;

// 发布消息await_publisher.PublishAsync( "ProductPriceChanged", newProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});

returnNoContent; }}}

Basket API

参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。

新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。

[ HttpGet] publicIList<MyBasketDTO> Get( ) {returnBaskets; }

[ NonAction] [ CapSubscribe( "ProductPriceChanged") ] publicasyncTask RefreshBasketProductPrice( ProductDTO productDTO) {if(productDTO == null) return;

foreach( varbasket inBaskets) {foreach( varcatalog inbasket.Catalogs) {if(catalog.Product.Id == productDTO.Id) {catalog.Product.Price = productDTO.Price;break; }}}

awaitTask.CompletedTask; }}}

测试效果

同时启动Catalog API 和 Basket API两个项目。

首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。

然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。

最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。

End总结

本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。

参考资料

阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741

麦比乌斯皇,《.NET使用Kafka小结》:https://www.cnblogs.com/hsxian/p/12907542.html

Tony,《.NET Core事件总线解决方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html

极客时间,胡夕《Kafka核心技术与实战》

B站,尚硅谷《Kafka 3.x入门到精通教程》

年终总结:Edison的2020年终总结

数字化转型: 我在传统企业做数字化转型

C#刷题: C#刷剑指Offer算法题系列文章目录

.NET面试:. NET开发面试知识体系

.NET大会: 2020年中国.NET开发者大会PDF资料

扫描二维码推送至手机访问。

版权声明:本文由我的模板布,如需转载请注明出处。


本文链接:http://sdjcht.com/post/32399.html

分享给朋友:

“注入卡密验证软件(注入卡密的软件)” 的相关文章

允许小程序访问相册(允许小程序访问相册安全吗)

允许小程序访问相册(允许小程序访问相册安全吗)

本篇文章给大家谈谈允许小程序访问相册,以及允许小程序访问相册安全吗对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、我不打开某个小程序,它能否访问我的相册? 2、如何允许...

推广电影链接赚佣金(推广赚佣金的平台)

推广电影链接赚佣金(推广赚佣金的平台)

本篇文章给大家谈谈推广电影链接赚佣金,以及推广赚佣金的平台对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、横店影视抖音关注提佣金是真的吗 2、抖音怎么分享链接赚佣金?抖...

央视数字藏品平台(央视数字藏品平台官网)

央视数字藏品平台(央视数字藏品平台官网)

本篇文章给大家谈谈央视数字藏品平台,以及央视数字藏品平台官网对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、继人民日报社报道后StarArk数字文创首次登上CCTV央视新闻...

小鹅通怎么开直播(小鹅通怎么用手机直播)

小鹅通怎么开直播(小鹅通怎么用手机直播)

今天给各位分享小鹅通怎么开直播的知识,其中也会对小鹅通怎么用手机直播进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!本文目录一览: 1、小鹅通怎么同时小红书快手视频号直播 2...

国家认可的溯源码平台(中国溯源认证平台)

国家认可的溯源码平台(中国溯源认证平台)

本篇文章给大家谈谈国家认可的溯源码平台,以及中国溯源认证平台对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、做追溯防伪,哪家公司平台比较好?中国追溯链.com可以吗?...

京东灵稀数字藏品入口(京东数字藏品怎么交易)

京东灵稀数字藏品入口(京东数字藏品怎么交易)

本篇文章给大家谈谈京东灵稀数字藏品入口,以及京东数字藏品怎么交易对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。 本文目录一览: 1、在线求解稀物数字藏品怎么玩 2、怎么进入京东灵犀平台...