Laravel 实现 Kafka 消息推送与接收处理 | Laravel China 社区


本站和网页 https://learnku.com/articles/15176/laravel-implementation-of-kafka-message-push-and-receive-processing 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

Laravel 实现 Kafka 消息推送与接收处理 | Laravel China 社区
Laravel
话题列表
社区 Wiki
优质外文
招聘求职
Laravel 实战教程
社区文档
登录
注册
Laravel
首页
Laravel
Go
PHP
Vue.js
Python
Java
MySQL
Rust
LK
Elasticsearch
F2E 前端
Server
程序员
Database
DevTools
Computer Science
手机开发
AdonisJS
社区
Wiki
教程
Laravel 实战教程首页
《L01 Laravel 教程 - Web 开发实战入门》
《L02 Laravel 教程 - Web 开发实战进阶》
《L03 Laravel 教程 - 实战构架 API 服务器》
《L04 Laravel 教程 - 微信小程序从零到发布》
《L05 Laravel 教程 - 电商实战》
《L06 Laravel 教程 - 电商进阶》
《LX1 Laravel / PHP 扩展包视频教程》
《LX2 PHP 扩展包实战教程 - 从入门到发布》
《L07 Laravel 教程 - Laravel TDD 测试实战》
《LX3 Laravel 性能优化入门》
《LX4 Laravel / PHP 五分钟视频》
文档
社区文档首页
《Laravel 中文文档》
《Laravel 速查表》
《PHP 代码简洁之道》
《Laravel 编码技巧》
《Dcat Admin 中文文档》
《Laravel Nova 中文文档》
《Lumen 中文文档》
《Dingo API 中文文档》
《 Laravel 项目开发规范》
《构建 Laravel 开发环境》
登录
注册
微信登录
Laravel 实现 Kafka 消息推送与接收处理
27
24
15
Explorer 的个人博客
11726
15
创建于 4年前
更新于 4年前
安装环境要求
PHP 版本大于 7.0
Kafka Server 版本大于 0.8.0
消费模块 Kafka Server 版本需要大于 0.9.0
安装
使用 Composer 安装
添加 composer 依赖 nmred/kafka-php 到项目的 composer.json 文件中即可,如:
"require": {
"php": ">=5.6.4",
"laravel/lumen-framework": "5.4.*",
"nmred/kafka-php": "dev-master"
},
创建 KafkaService
<?php
namespace App\Http\Services;
use Kafka;
class KafkaService
public function __construct()
date_default_timezone_set('PRC');
/*
* Produce
*/
public function Producer($topic, $value , $url)
$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList($url);
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(function () use($value,$topic){
return [
'topic' => $topic,
'value' => $value,
'key' => '',
],
];
});
$producer->success(function ($result){
return "success";
});
$producer->error(function ($errorCode){
var_dump($errorCode);
});
$producer->send(true);
/*
* Consumer
*/
public function consumer($group,$topics , $url){
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(500);
$config->setMetadataBrokerList($url);
$config->setGroupId($group);
$config->setBrokerVersion('1.0.0');
$config->setTopics([$topics]);
$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic, $part, $message) {
echo "receive a message...\n";
app('consumerKafka')->consumerData($message['message']['value']); //你的接收处理逻辑
var_dump($message['message']['value']);
});
执行produce方法生产消息
<?php
namespace App\Http\Services;
use App\Http\Services\KafkaService;
class ProduceService
public function produce()
$topic = env('topic_test'); //配置在env中
$url = env('kafka_url_test'); //配置在env中
$value =
'code' => 'test',
'data_type' => 'personal',
'action' => 'update',
'data' =>
'id' => 1,
'name' => 'tom',
'gender' => 2
],
'redirect_url' => '',
'operator' => 'system',
];
$value = json_encode ($value, JSON_FORCE_OBJECT );
$kafka = new KafkaService();
$kafka->Producer($topic, $value , $url);
执行php artisan consumer:kafka 消费消息
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
class ConsumerKafka extends Command
/**
* The name and signature of the console command.
* @var string
*/
protected $signature = 'consumer:kafka';
/**
* The console command description.
* @var string
*/
protected $description = '处理异步kafka消息';
/**
* Create a new command instance.
*/
public function __construct()
parent::__construct();
/**
* Execute the console command.
* @return mixed
*/
public function handle()
$this->log('开始监听消息...');
app('kafkaService')->consumer($group=env('KAFKA_GROUP'),$topics =env('KAFKA_TOPIC'), $url=env('KAFKA_URL'));
return $this;
private function log($msg = '')
if (!$msg) {
return $this;
if (php_sapi_name() == 'cli') {
echo $msg, PHP_EOL;
app('myLog')->lumenLog($msg, 'kafka_consumer');
return $this;
本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 4年前 自动加精
举报
Explorer
课程读者
163 声望
后台开发 @ 彩生活
暂无个人描述~
27 人点赞
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
推荐文章:
更多推荐...
博客
用Go实现支持多种协议的抓包工具——Shermie-Proxy
22
33
5个月前
博客
消息中间件应用的常见问题与方案
20
7个月前
博客
[踩了个坑] Laravel 访问https网址,url('/')竟然只返回 http?
14
13
8个月前
博客
基于Hyperf + Vue + Element 构建的后台管理系统(内置聊天系统)
64
21
10个月前
博客
从 0 开始打造聊天室,搞定 Laravel 实时通信 —— 发送消息
18
13
10个月前
分享
屎山警告!Pipeline竟优雅的处理了Laravel多条件查询
22
30
11个月前
讨论数量: 15
排序:
时间
投票
newbing
48 声望
首席加班码农 @ 彩豚智能
不知道是否可以适配一个Laravel Queue 的 driver 出来。另外 Kafka 2.0 发布了。
4年前
评论
评论
举报
fanyoujian
0 声望
我按照你的方式,执行 返回 1000 , 但是服务器没有打印出消息啊
4年前
评论
评论
kafkaasaxa
3年前
出现1000 是怎么解决的啊
举报
ashishnimrot
0 声望
Senior Web Developer @ DLS
Class kafkaService does not exist {"exception":"[object] (ReflectionException(code: -1): Class kafkaService does not exist at vendor/laravel/framework/src/Illuminate/Container/Container.php:752)
3年前
评论
评论
举报
likening
0 声望
请问,消费者怎么主动设置offset呢
3年前
评论
评论
举报
jamestan
0 声望
@ashishnimrot 请问这个问题解决了吗?我也遇到这个问题
3年前
评论
评论
举报
yyq0917
0 声望
@jamestan 换个写法 直接new这个类调用方法就可以了
3年前
评论
评论
举报
PeterHub
课程读者
1 声望
我未用过,但是用过人的不建议用哦 https://segmentfault.com/q/101000001091058...
3年前
评论
评论
举报
chengxuyuan_liaoli
0 声望
开发 @ 微行
这个代码没有出现两次请求吗
3年前
评论
评论
chengxuyuan_liaoli
(作者)
3年前
发送send请求,请求了两次
举报
chengxuyuan_liaoli
0 声望
开发 @ 微行
消费者这两个参数是啥 $topic, $part
3年前
评论
评论
举报
ICanFly
课程读者
1 声望
我也遇到了“输出1000”的问题, 最后排除到原因是kafka-php这个库没有详细报出具体错误信息,使用debug模式就可以看到详细的日志输出了。以下是我安装rdkafka定位到问题的
安装C写的rdkafka扩展
安装librdkafka 库
git clone https://github.com/edenhill/librdkafka.git
./configure
make
sudo make install
安装rdkafka
sudo pecl install rdkafka
配置扩展到php.ini
extension=rdkafka.so
写代码测试
$conf = new \RdKafka\Conf();
// enable debug mode
$conf->set('log_level', LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new \RdKafka\Producer($conf);
$rk->addBrokers("192.168.5.170:9092,192.168.5.99:9092");
$topic = $rk->newTopic("php_topic");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
$producer = new \RdKafka\Producer();
$producer->addBrokers("192.168.5.170:9092,192.168.5.99:9092");
$obj_topic = $producer->newTopic("php_topic");
//
$input_handler = fopen('php://stdin', 'r');
while (true) {
echo "\nPlease input messages:\n";
$payload = trim(fgets($input_handler));
// empty message will be quit
if (empty($payload)) {
break;
// send message
$obj_topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
echo "done\n";
exit(0);
检查输出日志发现无法解析服务器的错误,后来才发现是公司运维配置了本地host映射。在自己服务器上/etc/hosts配置好映射就好了。
%3|1571918912.221|FAIL|rdkafka#producer-1| [thrd:server5:9092/5]: server5:9092/5: Failed to resolve 'server5:9092': Name or service not known (after 3ms in state CONNECT)
这个时候去用kafka-php库也能正常工作了。
以上是我踩坑排除问题的思路,希望能帮助到大家:)
3年前
评论
评论
举报
bo_life
课程读者
0 声望
我创建topic 1000什么原因 求解
2年前
评论
评论
举报
mar_he
课程读者
18 声望
php @ 是蓝色生死恋
我使用的是kafka-php的包,你们有没有遇到不能主动创建主题的问题。如果生产者使用的主题,没有在zookeeper中注册的话,生产者会报1000的错误。如果使用已创建的主题就不会有问题。
2年前
评论
评论
举报
jiyongchang
0 声望
1000的问题怎么解决
9个月前
评论
评论
举报
讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
<a href="javascript:;" class="mr-2 ui popover text-mute" data-html="黏贴或拖拽图片至输入框内皆可上传图片">
<a href="javascript:;" class="mr-2 ui popover text-mute hide-on-mobile" data-html="支持除了 H1~H6 以外的GitHub 兼容 Markdown">
支持 MD
帮助
关注本文
评论
Explorer
后台开发 @ 彩生活
文章
粉丝
11
喜欢
31
收藏
29
排名:356
访问:2.5 万
关注
私信
所有博文
阅读模式
文章归档
1 篇
2019 年 9 月
1 篇
2018 年 7 月
最新文章
最受欢迎
3年前
Laravel Passport OAuth 数据库查询改缓存优化
4年前
Laravel 实现 Kafka 消息推送与接收处理
27
Laravel 实现 Kafka 消息推送与接收处理
Laravel Passport OAuth 数据库查询改缓存优化
博客标签
passport
社区赞助商
成为赞助商
社区赞助商
成为赞助商
关于 LearnKu
LearnKu 是终身编程者的修道场
做最专业、严肃的技术论坛
LearnKu 诞生的故事
资源推荐
《社区使用指南》
《文档撰写指南》
《LearnKu 社区规范》
《提问的智慧》
服务提供商
其他信息
成为版主
所有测验
联系站长(反馈建议)
粤ICP备18099781号-6
粤公网安备 44030502004330号
违法和不良信息举报
由 Summer 设计和编码 ❤
请登录
提交
忘记密码?
or
注册
第三方账号登录
微信登录
GitHub 登录
内容举报
匿名举报,为防止滥用,仅管理员可见举报者。
我要举报该,理由是:
垃圾广告:恶意灌水、广告、推广等内容
无意义内容:测试、灌水、文不对题、消极内容、文章品质太差等
违规内容:色情、暴利、血腥、敏感信息等
不友善内容:人身攻击、挑衅辱骂、恶意行为
科学上网:翻墙、VPN、Shadowsocks,政策风险,会被关站!
不懂提问:提问太随意,需要再做一遍《提问的智慧》测验
随意提问:提问没有发布在社区问答分类下
排版混乱:没有合理使用 Markdown 编写文章,未使用代码高亮
内容结构混乱:逻辑不清晰,内容混乱,难以阅读
标题随意:标题党、标题不释义
尊重版权:分享付费课程、破解软件(付费),侵犯作者劳动成果
其他理由:请补充说明
举报
取消