Edge gateway core communication service
This is a general-purpose communication module; it is not tied to any specific use case.
- make
- go >= 1.13
make
# run
cp conf/config-dist.yml config.yml
ncp -c config.yml
This is the core of the entire application and can also be used as a library.
The model consists of IO modules and a message center: each IO module receives messages, and the message center broadcasts every message to the other IO modules.
Rules determine whether a message is accepted. The default is to reject: a message that matches no rule is not accepted.
When used as a library, messages can be sent and received directly through the api interface.
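The broadcast model described above can be sketched in a few lines of Go. This is only an illustration, not ncp's actual implementation: the `Hub` and `IO` types and the `Broadcast` method are hypothetical names, and rule filtering is left out.

```go
package main

import "fmt"

// IO is a hypothetical stand-in for an IO module: it only has a name
// and an inbox that collects the messages broadcast to it.
type IO struct {
	Name  string
	Inbox [][]byte
}

// Hub is a hypothetical message center holding every IO module.
type Hub struct {
	IOs []*IO
}

// Broadcast delivers msg to every IO except the one named sender
// and returns how many IO modules received it.
func (h *Hub) Broadcast(sender string, msg []byte) int {
	delivered := 0
	for _, io := range h.IOs {
		if io.Name == sender {
			continue // the sender never receives its own message
		}
		io.Inbox = append(io.Inbox, msg)
		delivered++
	}
	return delivered
}

func main() {
	hub := &Hub{IOs: []*IO{{Name: "0"}, {Name: "1"}, {Name: "2"}}}
	n := hub.Broadcast("0", []byte(`{"method":"test"}`))
	fmt.Println("delivered to", n, "IO modules")
}
```

Because the sender is skipped by name, two IO modules sharing a name would not see each other's messages in this sketch.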
ncpio:
  - type: tcpc
    name: airctl
    params: "localhost:8980"
    i_rules:
      - regexp: '.*"method": ?"(webrtc|history|ncp)".*'
        invert: true
      - regexp: '.*"method".*'
    o_rules:
      - regexp: '.*'
  - type: api
    name: airctl
- `o_rules`: the output rules of the IO (applied after recv)
- `i_rules`: the input rules of the IO (applied before send)
`i_rules` and `o_rules` are matched with regular expressions, because messages are expected to be plain text.
- There can be more than one rule. They are matched in order, and the message is allowed as soon as one of them is met
- The `invert` field controls whether the result of the regular expression match is inverted
- When no rule is matched, the message is disallowed
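The rule evaluation described above can be sketched as follows. This is one reading of the rules, not ncp's confirmed implementation: it assumes the first rule whose regexp matches decides the outcome, and that `invert` turns that match into a rejection. The `Rule` type and `Allowed` function are hypothetical names, and the `takeoff` method is made up for the demonstration.

```go
package main

import (
	"fmt"
	"regexp"
)

// Rule mirrors the regexp and invert fields of the configuration.
type Rule struct {
	Regexp string
	Invert bool
}

// Allowed reports whether msg passes the rule group. Assumed semantics:
// the first rule whose regexp matches decides the outcome (rejected if
// invert is set, allowed otherwise); with no match the message is rejected.
func Allowed(rules []Rule, msg []byte) bool {
	for _, r := range rules {
		re, err := regexp.Compile(r.Regexp)
		if err != nil {
			continue // skip rules that do not compile
		}
		if re.Match(msg) {
			return !r.Invert
		}
	}
	return false
}

func main() {
	iRules := []Rule{
		{Regexp: `.*"method": ?"(webrtc|history|ncp)".*`, Invert: true},
		{Regexp: `.*"method".*`},
	}
	for _, msg := range []string{
		`{"jsonrpc":"2.0","method":"webrtc","id":"1"}`,  // hits the inverted rule
		`{"jsonrpc":"2.0","method":"takeoff","id":"2"}`, // hits the second rule
		`{"jsonrpc":"2.0","result":"ok","id":"2"}`,      // hits no rule
	} {
		fmt.Println(Allowed(iRules, []byte(msg)), msg)
	}
}
```

A real implementation would compile each regexp once at startup rather than on every message.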
Regular expressions can be hard to debug, which is why a debug mode was added:
./ncp -debug
Every message that passes through an IO goes through two rule groups, and debug mode logs three stages (RECV, BROADCAST, SEND):
- RECV: an `IO` received the message
- BROADCAST: the message passed `o_rules` and is being broadcast to all `IO`
- SEND: the message passed `i_rules` and the `IO` can send it
There can be more than one IO module of each type, and each one can be given a name.
By default, a unique name is generated automatically, counting up from 0 (0, 1, 2, ...).
It is recommended not to set the name field manually. The name is attached to each broadcast message to identify its sender,
and an IO module does not receive broadcast messages that carry its own name.
As a result, IO modules that share a name will not receive each other's broadcasts.
If an IO module is not receiving broadcasts, check whether that is intentional or whether the names were accidentally set to the same value.
| Type | Description |
|---|---|
| api | Built-in interface |
| tcpc | TCP socket client |
| tcps | TCP socket server |
| exec | Execute system command |
| jsonrpc2 | JSONRPC 2.0 simulation |
| logger | Record message log |
| mqtt | Connect to an MQTT broker |
At the core are the seven IO modules listed above. Different combinations, defined in the configuration file, achieve different functions.
Why is there no udp server and udp client?
There was no need for them at the time of development. Some messages must be delivered reliably, and udp is not suitable for that.
api is a special interface.
It is used when ncpio is used as a library; the API is itself an IO module.
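package main

import (
	"context"
	"fmt"

	ncpio "sb.im/ncp/ncpio"
)

func main() {
	// ctx controls the lifetime of the IO modules started by Run.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ncpios := ncpio.NewNcpIOs([]ncpio.Config{
		{
			// The api IO lets this program exchange messages directly.
			Type: "api",
			IRules: []ncpio.Rule{
				{
					Regexp: `.*`,
					Invert: false,
				},
			},
			ORules: []ncpio.Rule{
				{
					Regexp: `.*`,
					Invert: false,
				},
			},
		},
		{
			// The jsonrpc2 IO simulates a successful reply.
			Type:   "jsonrpc2",
			Params: `{"result":"ok"}`,
			IRules: []ncpio.Rule{
				{
					Regexp: `.*`,
					Invert: false,
				},
			},
			ORules: []ncpio.Rule{
				{
					Regexp: `.*`,
					Invert: false,
				},
			},
		},
	})
	go ncpios.Run(ctx)

	// Send a request through the api IO and print the reply.
	ncpio.I <- []byte(`{"jsonrpc":"2.0","method":"test","id":"test.0"}`)
	fmt.Println(string(<-ncpio.O))
}
Note: the unit tests use this interface.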
There can be at most one API interface
tcpc connects to a server as a TCP client. This is the most common working mode.
Both tcpc and tcps split messages using \n as the separator; a trailing \r\n is also handled and chomped automatically.
tcps waits for client connections as a TCP server. This is the reverse TCP mode of operation.
Note: Only one client can be connected at a time. If multiple clients connect at the same time, new connections wait until the previous one is closed.
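The newline-based message splitting described above can be sketched with the standard library. This is an illustration rather than ncp's own code: `SplitMessages` and `splitString` are hypothetical helpers, and a string reader stands in for the TCP connection.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// SplitMessages reads newline-delimited messages from r. The default
// split function, bufio.ScanLines, splits on \n and also drops a
// trailing \r, so messages ending in \r\n are handled as well.
func SplitMessages(r io.Reader) ([]string, error) {
	var msgs []string
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		msgs = append(msgs, sc.Text())
	}
	return msgs, sc.Err()
}

// splitString feeds a string through SplitMessages; the string reader
// stands in for a TCP connection.
func splitString(s string) ([]string, error) {
	return SplitMessages(strings.NewReader(s))
}

func main() {
	msgs, err := splitString("{\"id\":1}\n{\"id\":2}\r\n")
	if err != nil {
		fmt.Println("read error:", err)
		return
	}
	for _, m := range msgs {
		fmt.Printf("%q\n", m)
	}
}
```

Note that `bufio.Scanner` rejects lines longer than its buffer limit (64 KiB by default), so very large messages would need `Scanner.Buffer` or a different reader.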
exec executes a system command. NOTE: This is very dangerous!
- type: exec
  params: "echo"
  i_rules:
    - regexp: '.*"method": ?"exec".*'
  o_rules:
    - regexp: '.*'
params is the command to run, for example params: "/bin/sh"
This module receives JSONRPC requests and ignores jsonrpc.method when running the command. jsonrpc.method is only used for rule matching.
For example, with params: "echo" and this jsonrpc2 request:
{"jsonrpc":"2.0","method":"exec","params":["-n", "xxx"],"id":"x"}
the command finally executed is: echo -n xxx
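How the request params become command arguments can be sketched as below. This is a hedged illustration, not the module's real code: `BuildCommand` and the `request` type are hypothetical, and the command is only constructed, never started.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os/exec"
)

// request holds the only JSON-RPC field this sketch needs:
// the positional params, assumed to be strings.
type request struct {
	Params []string `json:"params"`
}

// BuildCommand appends the JSON-RPC params to the configured command.
// The method field is deliberately not read. The command is only
// built here, not started.
func BuildCommand(command string, raw []byte) (*exec.Cmd, error) {
	var req request
	if err := json.Unmarshal(raw, &req); err != nil {
		return nil, err
	}
	return exec.Command(command, req.Params...), nil
}

func main() {
	raw := []byte(`{"jsonrpc":"2.0","method":"exec","params":["-n", "xxx"],"id":"x"}`)
	cmd, err := BuildCommand("echo", raw)
	if err != nil {
		fmt.Println("bad request:", err)
		return
	}
	fmt.Println(cmd.Args) // the argument vector that would be executed
}
```

Passing the params as separate arguments, rather than joining them into a shell string, avoids shell interpretation of the values. It does not make the module safe: whoever can send a matching message can still run the configured command.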
jsonrpc2 is a simulator that can produce both successful and error jsonrpc responses. It is often used for development and debugging.
The params field of this module is a bit more complicated:
- When it is json, it is inserted directly (only result and error are legal)
- When it is not json, the value is assigned directly to result
Simulation success: params: 233 is the same as this:
{"result": 233}
Simulation error:
{"error": {"code": 0, "message": "xxxxx"}}
For the logger module, params is a file URL:
params: "file:///tmp/ncp/test.log?size=128M&count=8&prefix=SB"
Why three slashes? This is the standard way to write a file URL: file:// is the scheme prefix, and the path /tmp/... starts at the root.
- size: the maximum size of each log file, beyond which it is rotated
- count: the number of most recent files to keep
- prefix: the log prefix
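A file URL like the one above can be taken apart with Go's `net/url` package. The sketch below is an assumption about how such params could be read, not the logger's actual code: `LogConfig` and `ParseLogParams` are hypothetical names, and `size` is kept as text because its unit handling is not specified here.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// LogConfig is a hypothetical holder for the values carried by the URL.
type LogConfig struct {
	Path   string
	Size   string // kept as text, e.g. "128M"
	Count  int
	Prefix string
}

// ParseLogParams splits a file:// URL into the log path and its options.
func ParseLogParams(params string) (LogConfig, error) {
	u, err := url.Parse(params)
	if err != nil {
		return LogConfig{}, err
	}
	q := u.Query()
	count, err := strconv.Atoi(q.Get("count"))
	if err != nil {
		return LogConfig{}, fmt.Errorf("invalid count: %w", err)
	}
	return LogConfig{
		Path:   u.Path, // the part after file://, starting at the root
		Size:   q.Get("size"),
		Count:  count,
		Prefix: q.Get("prefix"),
	}, nil
}

func main() {
	cfg, err := ParseLogParams("file:///tmp/ncp/test.log?size=128M&count=8&prefix=SB")
	if err != nil {
		fmt.Println("bad params:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```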
The mqtt module is a bit complicated, so it is broken down into ncpio's mqtt and mqttd.
Of course, you can also ignore this split and treat it as a normal io.
All params are defined as strings, because yaml does not support deferred parsing, so there is no equivalent of json.RawMessage.
The reason is: go-yaml/issues#13
yaml is context-sensitive, so a fragment cannot be kept and parsed later on its own the way a json fragment can.
So mqtt's params is specified as a path to a separate config file.
Messages received through the IO module arrive at mqttd.
The config file is mqtt.yaml.
config params:
mqttd:
  rpc:
    qos: 0
    lru: 128
    i: "nodes/%s/rpc/recv"
    o: "nodes/%s/rpc/send"
There are two mqtt topics, i and o, one for receiving and one for sending. An mqtt topic is full duplex, so why split them into two topics?
The split is optional: for simple use, set both values to the same topic and use it for both sending and receiving.
There are two modes of message reliability control. Let's review tcp and kcp first.
With tcp, the receiver acknowledges each message it receives, and a sliding window controls how much new data is sent before earlier data is acknowledged.
Later, in the pursuit of low latency, reliability was built on top of udp, with packets sent aggressively to improve response time. This is what kcp does.
The kcp approach has lower latency than tcp, but consumes more traffic.
mqtt also offers reliability control through its qos: set qos 1 or qos 2.
In real product testing, mqtt's own qos did not perform well when network conditions were bad, real-time requirements were high, and the data volume was large.
mqtt's qos repeatedly confirms the arrival of messages (with a broker between two clients, the confirmation happens twice), so qos 0 is used and reliability is handled at the upper layer.
This introduces two parameters, qos and lru:
- To use mqtt's own qos (similar to tcp mode), set qos to 2 and lru to 0
- To use the module's own reliability processing (kcp-like mode), set qos to 0 and lru to a number other than 0 (default is 128)
Why is there an LRU here?
The kcp-like mode sends data by brute force, so the same message can arrive more than once. The LRU is used to filter out these duplicates.
Writing your own message validation to control reliability is more complicated.
It is recommended that demos use mqtt's own qos. Production environments use the LRU with qos 0.
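The duplicate filtering described above can be sketched with a small LRU set. This is an illustration under assumptions, not mqttd's actual code: the `Dedup` type is hypothetical, and using the JSON-RPC `id` as the key that identifies a duplicate is an assumption.

```go
package main

import (
	"container/list"
	"fmt"
)

// Dedup remembers the most recently seen message IDs, up to a fixed
// capacity, and evicts the least recently used one when full.
type Dedup struct {
	capacity int
	order    *list.List               // front = most recently seen
	items    map[string]*list.Element // id -> position in order
}

// NewDedup creates a filter that remembers at most capacity IDs.
func NewDedup(capacity int) *Dedup {
	return &Dedup{capacity: capacity, order: list.New(), items: make(map[string]*list.Element)}
}

// Seen reports whether id was already recorded, and records it if not.
func (d *Dedup) Seen(id string) bool {
	if el, ok := d.items[id]; ok {
		d.order.MoveToFront(el)
		return true
	}
	d.items[id] = d.order.PushFront(id)
	if d.order.Len() > d.capacity {
		oldest := d.order.Back()
		d.order.Remove(oldest)
		delete(d.items, oldest.Value.(string))
	}
	return false
}

func main() {
	dedup := NewDedup(128) // 128 is the documented default for lru
	for _, id := range []string{"test.0", "test.0", "test.1", "test.0"} {
		if dedup.Seen(id) {
			fmt.Println("drop duplicate", id)
			continue
		}
		fmt.Println("deliver", id)
	}
}
```

The capacity bounds memory use: a duplicate that arrives after its ID has been evicted would be delivered again, so the size has to cover the window in which resends can occur.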
status and network report the node's own state.
status reports the node's own status, including the mqtt last will message and network errors:
{
  "msg":"neterror",
  "timestamp":"1619164695",
  "status":{
    "lat":"22.6876423001",
    "lng":"114.2248673001",
    "alt":"10088.0001"
  }
}
The status field is configured by mqttd.static. It is an extended field that can hold some meta information.
msg is one of: online, offline, neterror
network reports the network status between the node and the mqtt broker:
{"loss":0,"delay":5}
- loss: (1~100)%
- delay: unit is ms
Each received message is checked for whether it is jsonrpc. A jsonrpc message is exchanged with the mqtt broker in the rpc way.
A message that is not jsonrpc goes through the trans module, which carries one-way data.
trans socket recv:
{"weather":{"WD":0,"WS":0,"T":66115,"RH":426,"Pa":99780}}
mqtt send topic: nodes/:id/msg/weather
{"WD":0,"WS":0,"T":66115,"RH":426,"Pa":99780}
The outermost key of the json is automatically mapped to part of the topic. (Multiple outer keys are split into multiple messages.)
gtran.prefix configures the mqtt topic prefix
trans configures the mqtt topic qos and retain
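The outer-key mapping described above can be sketched as follows. It is an illustration, not the trans module's code: `SplitByOuterKey` is a hypothetical helper, the `nodes/<id>/msg/<key>` layout is taken from the example topic rather than from the configured prefix, and `example-node` is a made-up node id.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// SplitByOuterKey maps each outermost key of a JSON object to an MQTT
// topic and keeps the value under that key as the payload.
func SplitByOuterKey(id string, raw []byte) (map[string]string, error) {
	var outer map[string]json.RawMessage
	if err := json.Unmarshal(raw, &outer); err != nil {
		return nil, err
	}
	out := make(map[string]string, len(outer))
	for key, payload := range outer {
		out[fmt.Sprintf("nodes/%s/msg/%s", id, key)] = string(payload)
	}
	return out, nil
}

func main() {
	raw := []byte(`{"weather":{"WD":0,"WS":0},"battery":{"status":"ok"}}`)
	msgs, err := SplitByOuterKey("example-node", raw)
	if err != nil {
		fmt.Println("not a JSON object:", err)
		return
	}
	// Sort the topics so the output order is stable.
	topics := make([]string, 0, len(msgs))
	for topic := range msgs {
		topics = append(topics, topic)
	}
	sort.Strings(topics)
	for _, topic := range topics {
		fmt.Println(topic, msgs[topic])
	}
}
```

Decoding into `json.RawMessage` keeps each inner value as raw bytes, so the payload is forwarded without being re-encoded.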
For example:
{
  "weather":{"WD":0,"WS":0,"T":66115,"RH":426,"Pa":99780},
  "battery":{"status":"ok"}
}
mqtt send topic: nodes/:id/msg/weather
{"WD":0,"WS":0,"T":66115,"RH":426,"Pa":99780}
mqtt send topic: nodes/:id/msg/battery
{"status":"ok"}
Built-in history method
history returns the historical data sent by trans
params:
| Field | Type | Description |
|---|---|---|
| topic | string | Topic |
| time | string | Duration string such as "300ms", "-1.5h" or "2h45m". Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". Parsed as in golang time#ParseDuration |
Request:
{
  "jsonrpc":"2.0",
  "id":"sdwc.1-1553321035000",
  "method":"history",
  "params":{
    "topic":"msg/weather",
    "time": "10s"
  }
}
Response:
{
  "jsonrpc": "2.0",
  "id":"sdwc.1-1553321035000",
  "result":[
    {"1553321035": {"WD": 20, "WS": 5}},
    {"1553321034": {"WD": 10, "WS": 1}},
    {"1553321000": {"WD": 1, "WS": 1}}
  ]
}
In each result entry, the key is the timestamp.
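Reading the history params can be sketched with `time.ParseDuration`, which is the syntax the time field follows. This is a hedged sketch: `ParseHistoryParams` and the `historyParams` type are hypothetical, and how the server applies the duration to stored records is not covered here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// historyParams mirrors the params object of a history request.
type historyParams struct {
	Topic string `json:"topic"`
	Time  string `json:"time"`
}

// ParseHistoryParams decodes the params object and converts the time
// field with time.ParseDuration.
func ParseHistoryParams(raw []byte) (string, time.Duration, error) {
	var p historyParams
	if err := json.Unmarshal(raw, &p); err != nil {
		return "", 0, err
	}
	d, err := time.ParseDuration(p.Time)
	if err != nil {
		return "", 0, fmt.Errorf("invalid time %q: %w", p.Time, err)
	}
	return p.Topic, d, nil
}

func main() {
	topic, window, err := ParseHistoryParams([]byte(`{"topic":"msg/weather","time":"10s"}`))
	if err != nil {
		fmt.Println("bad params:", err)
		return
	}
	fmt.Println(topic, window)
}
```

A value without a unit, such as "10", is rejected by `time.ParseDuration`, so clients always need to include one of the listed units.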
ncp_online and ncp_offline control the online status.
They are jsonrpc notifications: there is no result and no params.
As an example, to use tcpc, tcps, mqtt and logger together,
use this config.yaml:
# type: tcps / tcpc / mqtt / logger / jsonrpc2 / api
ncpio:
  - type: tcpc
    params: "localhost:8980"
    i_rules:
      - regexp: '.*"method": ?"(webrtc|history|ncp)".*'
        invert: true
      - regexp: '.*"method".*'
    o_rules:
      - regexp: '.*'
  - type: tcps
    params: "localhost:1208"
    i_rules:
      - regexp: '.*"method": ?"webrtc".*'
    o_rules:
      - regexp: '.*"method".*'
        invert: true
      - regexp: '.*'
  - type: mqtt
    params: "config.yml"
    i_rules:
      - regexp: '.*'
    o_rules:
      - regexp: '.*'
  - type: logger
    params: "file:///tmp/ncp/test.log?size=128M&count=8&prefix=SB"
    i_rules:
      - regexp: '.*'
The broker sends this message to mqttd:
{"jsonrpc":"2.0","id":"test.0-1553321035000","method":"webrtc","params":[]}
- `mqtt` first runs the message through its own `o_rules`, which is `.*` (matching any message), so it broadcasts the message to `tcpc`, `tcps` and `logger`
- `logger` is a logger: its `i_rules` is usually `.*`, so it writes the message to the log after receiving it
- `tcpc`'s `i_rules` matches `.*"method": ?"(webrtc|history|ncp)".*`, but `invert` reverses the match, so the message is rejected
- The `i_rules` of `tcps` matches this command exactly, so the command is sent to `tcps`
- A message that `tcps` receives is filtered by its `o_rules` and sent to the message center, which broadcasts it to every enabled `IO` other than the sender
- `mqtt` also receives that broadcast, and then uses its own `i_rules` to decide whether the message should be sent
- `logger` also receives that broadcast, and uses its own `i_rules` to decide whether the message needs to be written to the log file
- io: a udp io may need to be added
- io: mqtt may need a configuration item to enable the use of json and jsonrpc