Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
23 changes: 12 additions & 11 deletions chat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package proto;
service ChatService {
rpc Subscribe(Chat.Consumer) returns (stream Chat.Message); // 订阅消息
rpc Reg(Chat.Id) returns (Chat.Nil); // 注册一个EndPoint
rpc UnReg(Chat.Id) returns (Chat.Nil); // 取消注册
rpc Query(Chat.ConsumeRange) returns (Chat.List); // 返回一个范围的消息
rpc Latest(Chat.ConsumeLatest) returns (Chat.List); // 返回最新的消息
}
Expand All @@ -16,24 +17,24 @@ message Chat {
repeated Message Messages = 1;
}
message Id {
uint64 Id=1;
uint64 Id = 1; //EndPoint id
}
message Message {
uint64 Id=1;
bytes Body=2;
int64 Offset=3;
uint64 Id = 1; //EndPoint id
bytes Body = 2; //message content
int64 Offset= 3; //offset
}
message Consumer{
uint64 Id=1;
int64 From=2;
uint64 Id = 1; //EndPoint id
int64 From = 2; // form offset
}
message ConsumeRange {
uint64 Id=1;
int64 From=2;
int64 To=3;
uint64 Id = 1; //EndPoint id
int64 From = 2; //form offset
int64 To = 3; //to offset
}
message ConsumeLatest {
uint64 Id=1;
int64 Length=2;
uint64 Id = 1; //EndPoint id
int64 Length = 2; //message num
}
}
73 changes: 73 additions & 0 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package: .
import:
- package: github.com/Shopify/sarama
version: v1.11.0
- package: github.com/Sirupsen/logrus
version: v0.11.5
- package: github.com/boltdb/bolt
version: v1.3.0
- package: github.com/golang/protobuf
version: master
subpackages:
- proto
- package: golang.org/x/net
subpackages:
- context
- package: google.golang.org/grpc
version: v1.2.1
- package: gopkg.in/urfave/cli.v2
version: v2
40 changes: 32 additions & 8 deletions kafka/init.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package kafka

import (
"fmt"
"log"

cli "gopkg.in/urfave/cli.v2"

"github.com/Shopify/sarama"
"gopkg.in/urfave/cli.v2"
)

var (
kAsyncProducer sarama.AsyncProducer
kClient sarama.Client
ChatTopic string
config *sarama.Config
)

func initKafka(c *cli.Context) {
addrs := c.StringSlice("kafka-brokers")
ChatTopic = c.String("chat-topic")
config := sarama.NewConfig()
func initKafka(addrs []string, chatTopic string) {
ChatTopic = chatTopic
config = sarama.NewConfig()
config.Producer.Return.Successes = false
config.Producer.Return.Errors = false
config.Producer.Return.Errors = true
producer, err := sarama.NewAsyncProducer(addrs, config)
if err != nil {
log.Fatalln(err)
Expand All @@ -34,8 +34,32 @@ func initKafka(c *cli.Context) {
}

func Init(c *cli.Context) {
initKafka(c)
addrs := c.StringSlice("kafka-brokers")
topic := c.String("chat-topic")
initKafka(addrs, topic)
}

func InitTest(addrs []string, chatTopic string) {
initKafka(addrs, chatTopic)
}

func NewConsumer() (sarama.Consumer, error) {
return sarama.NewConsumerFromClient(kClient)
}

func SendChat(ep uint64, msg []byte) {
pm := &sarama.ProducerMessage{
Topic: ChatTopic,
Key: sarama.StringEncoder(fmt.Sprint(ep)),
Value: sarama.ByteEncoder(msg),
}
kAsyncProducer.Input() <- pm
if config.Producer.Return.Errors || config.Producer.Return.Successes {
select {
case e := <-kAsyncProducer.Errors():
log.Println("error", ChatTopic, ep, e)
case <-kAsyncProducer.Successes():
default:
}
}
}
11 changes: 3 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ import (
"os"
"time"

"github.com/xtaci/chat/kafka"

cli "gopkg.in/urfave/cli.v2"
"chat/kafka"
pb "chat/proto"

log "github.com/Sirupsen/logrus"
"github.com/xtaci/logrushooks"
"google.golang.org/grpc"

pb "github.com/xtaci/chat/proto"
"gopkg.in/urfave/cli.v2"
)

func main() {
log.AddHook(logrushooks.LineNoHook{})

go func() {
log.Info(http.ListenAndServe("0.0.0.0:6060", nil))
}()
Expand Down
Loading