Go操作Elasticsearch

操作ElasticSearch

golang elasticsearch入门教程

官方文档

我们使用第三方库https://github.com/olivere/elastic 来连接ES并进行操作。

注意下载与你的ES相同版本的client,在官方的README中可以看到如下表格:

Elasticsearch version Elastic version Package URL Remarks
7.x 7.0 github.com/olivere/elastic/v7 (source doc) Use Go modules.
6.x 6.0 github.com/olivere/elastic (source doc) Use a dependency manager (see below).
5.x 5.0 gopkg.in/olivere/elastic.v5 (source doc) Actively maintained.
2.x 3.0 gopkg.in/olivere/elastic.v3 (source doc) Deprecated. Please update.
1.x 2.0 gopkg.in/olivere/elastic.v2 (source doc) Deprecated. Please update.
0.9-1.3 1.0 gopkg.in/olivere/elastic.v1 (source doc) Deprecated. Please update.

操作示例

来看两个示例。(elk版本:6.4.0

示例1 helloworld

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"context"
"fmt"
"log"

"github.com/olivere/elastic"
)

type person struct {
Name string `json:"name"`
Age int `json:"age"`
Married bool `json:"married"`
}

func main() {
client, err := elastic.NewClient(elastic.SetURL("http://ip:9200"), elastic.SetSniff(false))
if err != nil {
// Handle error
log.Fatal(err)
}
fmt.Println("连接成功!")
p1 := person{
Name: "张三",
Age: 18,
Married: false,
}
response, err := client.Index().Index("person").Type("person").BodyJson(p1).Do(context.Background()) // Index a document
if err != nil {
log.Fatal(err)
}
fmt.Println(response)
fmt.Printf("Indexed person %v to index %v, type %v\n", response.Id, response.Index, response.Type)
}

如果不加elastic.SetSniff(false),会报错no active connection found: no Elasticsearch node available,解决方案:Cannot connect to elastic search : no active connection found: no Elasticsearch node available

运行结果:

示例2 CRUD

首先看一下init函数(init函数笔记),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"context"
"fmt"
"log"
"os"

"github.com/olivere/elastic"
)

type employee struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Age int `json:"age"`
About string `json:"about"`
Interests []string `json:"interests"`
}

var client *elastic.Client
var host = "http://ip:9200/"

func main() {

}

// go init函数不能被其他函数调用,而是在main函数执行之前,自动被调用
func init() {
errorLog := log.New(os.Stdout, "APP", log.LstdFlags)
client, err := elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host), elastic.SetSniff(false))
if err != nil {
log.Fatal(err)
}
pingResult, code, err := client.Ping(host).Do(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, pingResult.Version.Number)
version, err := client.ElasticsearchVersion(host)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Elasticsearch version %s\n", version)
}

create 创建

  • 使用结构体创建:BodyJson
  • 使用字符串(通常是json字符串)创建:BodyString(也可以使用BodyJson

其实观察源码可以发现:BodyJson几乎是万能的,也就是说使用字符串也可以用BodyJson的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// BodyJson is the document as a serializable JSON interface.
func (s *IndexService) BodyJson(body interface{}) *IndexService {
s.bodyJson = body
return s
}

// BodyString is the document encoded as a string.
func (s *IndexService) BodyString(body string) *IndexService {
s.bodyString = body
return s
}

type IndexService struct {
...
bodyJson interface{}
bodyString string
}

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"context"
"fmt"
"log"
"os"

"github.com/olivere/elastic"
)

type employee struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Age int `json:"age"`
About string `json:"about"`
Interests []string `json:"interests"`
}

var client *elastic.Client
var host = "http://ip
:9200"
var ctx = context.Background() // 执行ES请求需要提供一个上下文对象

func main() {
create()
}

// go init函数不能被其他函数调用,而是在main函数执行之前,自动被调用
func init() {
errorLog := log.New(os.Stdout, "APP", log.LstdFlags)
var err error
client, err = elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host), elastic.SetSniff(false))
if err != nil {
log.Fatal(err)
}
}

func create() {
// 使用结构体
e1 := employee{
FirstName: "top",
LastName: "sen",
Age: 18,
About: "热爱学习",
Interests: []string{"法律", "看书"},
}
response, err := client.Index().
Index("hx").
Type("employee").
Id("3").
BodyJson(e1).
Do(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("id: %s index: %s, type: %s\n", response.Id, response.Index, response.Type)

// 使用字符串
e2_str := `{
"first_name":"虎",
"last_name":"哥",
"age":25,
"about":"I love to go rock climbing",
"interests":[
"sports",
"music"
]
}`
resp1, err := client.Index().Index("hx").Type("employee").Id("5").BodyString(e2_str).Do(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("id: %s to index: %s, type: %s\n", resp1.Id, resp1.Index, resp1.Type)
}

通过查询可以发现已经成功添加:

delete 删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"context"
"fmt"
"log"
"os"

"github.com/olivere/elastic"
)

type employee struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Age int `json:"age"`
About string `json:"about"`
Interests []string `json:"interests"`
}

var client *elastic.Client
var host = "http://ip:9200"
var ctx = context.Background() // 执行ES请求需要提供一个上下文对象

func main() {
deleteEs()
}

func deleteEs() {
response, err := client.Delete().Index("hx").Type("employee").Id("3").Do(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("id: %s index: %s, type: %s\n", response.Id, response.Index, response.Type)
}

// go init函数不能被其他函数调用,而是在main函数执行之前,自动被调用
func init() {
errorLog := log.New(os.Stdout, "APP", log.LstdFlags)
var err error
client, err = elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host), elastic.SetSniff(false))
if err != nil {
log.Fatal(err)
}
}

update 修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"context"
"fmt"
"log"
"os"

"github.com/olivere/elastic"
)

type employee struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Age int `json:"age"`
About string `json:"about"`
Interests []string `json:"interests"`
}

var client *elastic.Client
var host = "http://ip:9200"
var ctx = context.Background() // 执行ES请求需要提供一个上下文对象

func main() {
update()
}

func update() {
response, err := client.Update().Index("hx").Type("employee").Id("2").
Doc(map[string]interface{}{
"age": 81,
}).Do(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("id: %s index: %s, type: %s\n", response.Id, response.Index, response.Type)
}

// go init函数不能被其他函数调用,而是在main函数执行之前,自动被调用
func init() {
errorLog := log.New(os.Stdout, "APP", log.LstdFlags)
var err error
client, err = elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host), elastic.SetSniff(false))
if err != nil {
log.Fatal(err)
}
}

源码:

1
2
3
4
5
// Doc allows for updating a partial document.
func (s *UpdateService) Doc(doc interface{}) *UpdateService {
s.doc = doc
return s
}

gets 查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"context"
"fmt"
"log"
"os"

"github.com/olivere/elastic"
)

type employee struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Age int `json:"age"`
About string `json:"about"`
Interests []string `json:"interests"`
}

var client *elastic.Client
var host = "http://ip:9200"
var ctx = context.Background() // 执行ES请求需要提供一个上下文对象

func main() {
gets()
}

func gets() {
// 通过id查找
getResult, err := client.Get().Index("hx").Type("employee").Id("2").Do(ctx)
if err != nil {
log.Fatal(err)
}
if getResult.Found {
fmt.Printf("Got document %s in version %d from index %s, type %s\n", getResult.Id, getResult.Version, getResult.Index, getResult.Type)
}
}

// go init函数不能被其他函数调用,而是在main函数执行之前,自动被调用
func init() {
errorLog := log.New(os.Stdout, "APP", log.LstdFlags)
var err error
client, err = elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host), elastic.SetSniff(false))
if err != nil {
log.Fatal(err)
}
}

query 搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
"context"
"fmt"
"log"
"os"
"reflect"

"github.com/olivere/elastic"
)

type employee struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Age int `json:"age"`
About string `json:"about"`
Interests []string `json:"interests"`
}

var client *elastic.Client
var host = "http://ip:9200"
var ctx = context.Background() // 执行ES请求需要提供一个上下文对象

func main() {
query()
}

func query() {
// 查询所有
searchResult, err := client.Search("hx").Type("employee").Do(ctx)
if err != nil {
log.Fatal(err)
}
printEmployee(searchResult)

// 给定字段内容查询(字段相等)
q := elastic.NewQueryStringQuery("first_name:张")
result, err := client.Search("hx").Type("employee").Query(q).Do(ctx)
if err != nil {
log.Fatal(err)
}
printEmployee(result)

// 条件查询
// 年龄大于25的且first_name为“张”
boolQuery := elastic.NewBoolQuery()
boolQuery.Must(elastic.NewMatchQuery("first_name", "张"))
boolQuery.Filter(elastic.NewRangeQuery("age").Gt(25))
res, err := client.Search("hx").Type("employee").Query(boolQuery).Do(ctx)
if err != nil {
log.Fatal(err)
}
printEmployee(res)

// 短语搜索 搜索about字段中有“学习”
matchPhraseQuery := elastic.NewMatchPhraseQuery("about", "学习")
res1, err := client.Search("hx").Type("employee").Query(matchPhraseQuery).Do(ctx)
if err != nil {
log.Fatal(err)
}
printEmployee(res1)
}

// 打印查询到的employee
func printEmployee(result *elastic.SearchResult) {
var typ employee
for _, item := range result.Each(reflect.TypeOf(typ)) { // 从搜索结果中取数据的方法
emp, ok := item.(employee)
if ok {
fmt.Println(emp)
} else {
fmt.Println("断言失败!")
}
}
fmt.Println() // 换行
}

// go init函数不能被其他函数调用,而是在main函数执行之前,自动被调用
func init() {
errorLog := log.New(os.Stdout, "APP", log.LstdFlags)
var err error
client, err = elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host), elastic.SetSniff(false))
if err != nil {
log.Fatal(err)
}
}

list 分页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"context"
"fmt"
"log"
"os"
"reflect"

"github.com/olivere/elastic"
)

type employee struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Age int `json:"age"`
About string `json:"about"`
Interests []string `json:"interests"`
}

var client *elastic.Client
var host = "http://ip:9200"
var ctx = context.Background() // 执行ES请求需要提供一个上下文对象

func main() {
list(3, 2)
}

func list(pageSize int, pageNum int) {
// 对pageSize和pageNum非法情况的处理
if pageSize <= 0 {
pageSize = 3
}
if pageNum <= 0 {
pageNum = 1
}
res, err := client.Search("hx").Type("employee").Size(pageSize).From((pageNum - 1) * pageSize).Do(ctx)
if err != nil {
log.Fatal(err)
}
printEmployee(res)
}

// 打印查询到的employee
func printEmployee(result *elastic.SearchResult) {
var typ employee
for _, item := range result.Each(reflect.TypeOf(typ)) { // 从搜索结果中取数据的方法
emp, ok := item.(employee)
if ok {
fmt.Println(emp)
} else {
fmt.Println("断言失败!")
}
}
fmt.Println() // 换行
}

// go init函数不能被其他函数调用,而是在main函数执行之前,自动被调用
func init() {
errorLog := log.New(os.Stdout, "APP", log.LstdFlags)
var err error
client, err = elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host), elastic.SetSniff(false))
if err != nil {
log.Fatal(err)
}
}

Go操作Elasticsearch7

上面的教程中使用的是elk6.4.0。这里我们继续学习elk7的相关操作。(docker compose可以成功启动但是每次一起动服务器就卡机了。。。可能是因为我买的轻量应用服务器吧)

docker compose安装elk7

使用Docker-Compose部署单节点ELK

docker/awesome-compose

使用docker compose安装elk会方便很多。目前在整个ELK-Stack中还包括了Filebeat进行日志采集,这里我们也一并带上。

首先在同级目录下创建两个文件:.env文件和docker-compose.yml文件。

1
2
3
elk7
├── docker-compose.yml
└── .env

注意.开头的文件名代表隐藏文件,ls是看不到的,需要ls -a

.env文件内容如下:

1
2
# ELK版本号,统一elasticsearch,kibana,logstash的images版本
ELK_VERSION=7.16.1

docker-compose.yml文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
version: '3.7'

services:
elasticsearch:
image: "elasticsearch:${ELK_VERSION}"
environment:
- discovery.type=single-node # 将ES的集群发现模式配置为单节点模式
- ES_JAVA_OPTS:="-Xms512m -Xmx512m"
volumes:
- /etc/localtime:/etc/localtime # Docker容器中时间和宿主机同步
- /home/root/dockerVolumes/elk7/elasticsearch:/usr/share/elasticsearch/data # 将ES的数据映射并持久化至宿主机中
ports:
- "9200:9200"
- "9300:9300"

logstash:
depends_on:
- elasticsearch
image: "logstash:${ELK_VERSION}"
volumes:
- /home/root/dockerVolumes/elk7/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf # 将宿主机本地的logstash配置映射至logstash容器内部
ports:
- "5044:5044"
links:
- elasticsearch

kibana:
depends_on:
- elasticsearch
image: "kibana:${ELK_VERSION}"
environment:
- ELASTICSEARCH_URL=http://elasticsearch:9200 # 配置ES的地址
volumes:
- /etc/localtime:/etc/localtime # Docker容器中时间和宿主机同步
ports:
- "5601:5601"
links:
- elasticsearch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 服务器配置低,带不动
# filebeat:
# depends_on:
# - elasticsearch
# - logstash
# image: "elastic/filebeat:${ELK_VERSION}"
# user: root # 必须为root,否则会因为无权限而无法启动
# environment:
# - strict.perms=false
# volumes:
# - /home/root/dockerVolumes/elk7/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
# # 映射到容器中[作为数据源]
# - /home/root/dockerVolumes/elk7/filebeat/logs:/usr/share/filebeat/logs:rw
# - /home/root/dockerVolumes/elk7/filebeat/data:/usr/share/filebeat/data:rw
# # 将指定容器连接到当前连接,可以设置别名,避免ip方式导致的容器重启动态改变的无法连接情况
# links:
# - logstash

可以用container_name指定容器名称,如container_name: docker-web-container。docker compose默认将会使用 项目名称-服务名称-序号 这样的格式。

注意: 指定容器名称后,该服务将无法进行扩展(scale),因为 Docker 不允许多个容器具有相同的名称。

关于docker-compose.yml文件:

有多种版本的Compose文件格式:122.x3.x

相容性矩阵:下表显示了哪些Compose文件版本支持特定的Docker版本。

撰写档案格式 Docker Engine版本
3.7 18.06.0+
3.6 18.02.0+
3.5 17.12.0+
3.4 17.09.0+
3.3 17.06.0+
3.2 17.04.0+
3.1 1.13.1+
3.0 1.13.0+
2.4 17.12.0+
2.3 17.06.0+
2.2 1.13.0+
2.1 1.12.0+
2.0 1.10.0+
1.0 1.9.1。+

Compose模板文件支持动态读取主机的系统环境变量和当前目录下的 .env 文件中的变量。

更多内容可参考:Compose 模板文件

注意,因为涉及到数据卷挂载,elk的安装有点特殊,我们需要提前在宿主机中创建elasticsearch的挂载目录并授予权限:

1
chmod 777 /home/root/dockerVolumes/elk7/elasticsearch

接下来在/home/root/dockerVolumes/elk7/logstash路径下创建logstash.conf文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 5047
codec => json
}
}

output {
elasticsearch {
hosts => ["http://elasticsearch:9207"]
index => "%{[service]}-%{+YYYY.MM.dd}"
}
stdout { codec => rubydebug }
}

最后在/home/root/dockerVolumes/elk7/filebeat创建Filebeat配置文件filebeat.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
filebeat.inputs:
- type: log
enabled: true
paths:
# 容器中目录下的所有.log文件
- /usr/share/filebeat/logs/*.log
multiline.pattern: ^\[
multiline.negate: true
multiline.match: after

filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false

setup.template.settings:
index.number_of_shards: 1

setup.dashboards.enabled: false

setup.kibana:
host: "http://kibana:5601"

# 直接传输至ES
#output.elasticsearch:
# hosts: ["http://es-master:9200"]
# index: "filebeat-%{[beat.version]}-%{+yyyy.MM.dd}"

# 传输至LogStash
output.logstash:
hosts: ["logstash:5044"]

processors:
- add_host_metadata: ~
- add_cloud_metadata: ~

随后使用docker-compose命令启动:docker-compose up -d。(docker-compose down命令将会停止 up 命令所启动的容器,并移除网络)