欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

ceph 高可用分布式存储群集 14-ceph 对象存储桶通知(桶通知)实践

最编程 2024-04-04 08:50:51
...
注:后面的操作其实都可以调用ceph和rabbitmq的api完成,我们的生产环境都是python调用api的代码全自动完成的。
创建topic
操作bucket notification API可以用Postman或者直接shell的curl命令发送HTTP请求,也可以通过工具发送。awscli是用python boto3库实现的管理工具。以下操作基于awscli。安装awscli工具:
curl "https://s3.amazonaws.com/aws-cli/awscli-bundle.zip" -o "awscli-bundle.zip"
yum install unzip -y
unzip awscli-bundle.zip
yum install python36 -y
python3 awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws
注:yum安装的awscli版本太低,最好使用较新版本。
使用前需要对用户、密钥等对象存储访问基础信息进行配置。
/usr/local/bin/aws configure
配置完成后查看.aws/credentials有了值
[root@host-10-2-112-152 ~]# cat .aws/credentials
[default]
aws_access_key_id = U4UBTSG47ZBXSB3TVBK3
aws_secret_access_key = E9UZ41lfzQJOHXxOEhHlx9LhPUaG9EpsaHkEAtcd
这里的aws_access_key_id,aws_secret_access_key为hxapp这个bucket的。
注:
awscli的一些案例可以在awscli案例中找到,很有帮助。
数字签名上的设置,否则会返回签名因为不同版本差异会报有误。
/usr/local/bin/aws configure set default.sns.signature_version s3
创建topic
/usr/local/bin/aws --region=default --endpoint-url http://s3-cn-east-2.mydeamon.cn:7480 sns create-topic --name=hxapp  --attributes='{"push-endpoint": "amqp://10.2.112.192:5672", "amqp-exchange": "s3_exchange", "amqp-ack-level": "broker"}'
其中http://s3-cn-east-2.mydeamon.cn:7480是rgw对外提供的服务地址,hxapp为新创建topic的名称。amqp://10.2.112.192:5672是rabbitmq的IP和端口,这里如果不带user和passwd默认是guest/guest,vhost默认是/。s3_exchange为rabbitmq上exchange的名称。
返回结果表明创建成功
{
   "TopicArn": "arn:aws:sns:default::hxapp"
}
该结果的固定格式如下:
arn:aws:sns:<zone-group>:<tenant>:<topic>
查看集群topic方式
radosgw-admin topic list
查看topic的配置
/usr/local/bin/aws --region=default --endpoint-url http://s3-cn-east-2.mydeamon.cn:7480 sns get-topic-attributes --topic-arn="arn:aws:sns:default::hxapp"
创建通知配置
有了topic之后需要指定对应的桶通知哪些内容。需要指定一个已经存在的桶(例子中是testwxc),设定桶通知的配置,如ID、对应的topic、事件和信息过滤器。
/usr/local/bin/aws --region=default --endpoint-url http://s3-cn-east-2.mydeamon.cn:7480  s3api put-bucket-notification-configuration --bucket=hxapp --notification-configuration='{"TopicConfigurations": [{"Id": "hxapp", "TopicArn": "arn:aws:sns:default::hxapp", "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]}]}'
查看已经成功设置的内容
# /usr/local/bin/aws --region=default --endpoint-url http://s3-cn-east-2.mydeamon.cn:7480  s3api get-bucket-notification-configuration --bucket=hxapp
{
    "TopicConfigurations": [
        {
            "Id": "hxapp",
            "TopicArn": "arn:aws:sns:default::hxapp",
            "Events": [
                "s3:ObjectCreated:*",
                "s3:ObjectRemoved:*"
            ],
        }
    ]
}
三、事件获取
完成RGW服务侧的配置之后,使用python语言写一个接收程序(消费者)获取推送的通知信息。需要使用到amqp的库。python 连接操作rabbitmq主要是使用pika库.
代码完成流程:
* 1、连接rabbitmq-server;
* 2、获得Channel;
* 3、定义获取Exchange、Queue;
* 4、将队列和Routing key绑定;
* 5、消费Queue中的内容。
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import time
import pika

BUCKET_NAME = 'hxapp'
TOPIC_NAME = BUCKET_NAME
ROUTING_KEY = BUCKET_NAME

QUEUE_NAME = 's3_queue'
AMQP_EXCHANGE = 's3_exchange'
HOST = '10.2.112.192'

def callback(channel, method, properties, body):
    print(" [x] Received %r" % body.decode('utf-8'))
    time.sleep(1)
    channel.basic_ack(delivery_tag=method.delivery_tag)
    #接收一个消息就退出
    exit(1)

parameters = pika.ConnectionParameters(host=HOST)

connection = pika.BlockingConnection(parameters)
# create exchange channel = connection.channel() channel.exchange_declare( exchange=AMQP_EXCHANGE, exchange_type="topic", durable=True, )
# create queue result = channel.queue_declare(queue=QUEUE_NAME, durable=True) queue_name = result.method.queue # bind queue to exchange channel.queue_bind( exchange=AMQP_EXCHANGE, queue=queue_name, routing_key=ROUTING_KEY, ) # 声明一个队列 # channel.queue_declare(queue=QUEUE_NAME, durable=True) # 发送消息 # data = bytes('Hello World!', encoding='utf-8') # channel.basic_publish(exchange='', routing_key=TOPIC_NAME, body=data) # print(" [x] Sent 'Hello World!'") # 接收消息 channel.basic_consume(on_message_callback=callback, queue=QUEUE_NAME) channel.start_consuming() # 关闭连接 # connection.close()

运行之后上传对象,可以获取到对象创建的消息。下面是生产环境中获取的一个示范桶通知。

 [x] Received '{"Records":[{"eventVersion":"2.2","eventSource":"ceph:s3","awsRegion":"","eventTime":"2022-01-05 09:30:31.814050Z","eventName":"s3:ObjectRemoved:Delete","userIdentity":{"principalId":"hxapp"},"requestParameters":{"sourceIPAddress":""},"responseElements":{"x-amz-request-id":"c2725bb2-49cb-4098-a4d3-3dc2d167511b.24310585.2439","x-amz-id-2":"172f339-default-default"},"s3":{"s3SchemaVersion":"1.0","configurationId":"hxapp","bucket":{"name":"hxapp","ownerIdentity":{"principalId":"hxapp"},"arn":"arn:aws:s3:::hxapp","id":"c2725bb2-49cb-4098-a4d3-3dc2d167511b.18142341.3"},"object":{"key":"158_3327/273338.png","size":0,"etag":"936079baae666679d09625850d5de44a","versionId":"","sequencer":"3765D56141C68530","metadata":[{"key":"x-amz-content-sha256","val":"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"},{"key":"x-amz-date","val":"20220105T093031Z"},{"key":"x-amz-user-agent","val":"aws-sdk-php/3.208.2"}],"tags":[]}},"eventId":"1641375031.814073.936079baae666679d09625850d5de44a","opaqueData":""}]}'

我这边的生产环境消息队列的运行情况图如下

注意事项:

ceph rgw产生的消息到达exchange之后,并不知道消息应该推到哪个queue,即表示不能routable。rabbitmq要通过routingkey将exchange的信息分发到queue中。

通过rabbitmq官方教程中的图就能更为清楚的说明消息队列的结构。其中routes根据routkeying发送到队列中。
但是,routing key是什么?这也是我花了很久没有走通的地方。回过头来看官方文档已经说明了这一点。
翻译过来“create topic使用的名称是exchange向queue分发消息时的routing key”。

注:关于生产环境如何使用程序调用ceph和rabbitmq的api程序创建桶通知和消费rabbitmq的消息队列,附上开发文档如下

https://docs.aws.amazon.com/sns/latest/api/API_CreateTopic.html

https://www.rabbitmq.com/devtools.html

 作者:Dexter_Wang   工作岗位:某互联网公司资深Linux架构师  联系邮箱:993852246@qq.com