当前位置: 首页 > news >正文

网站建设怎么做账务处理网站seo视频狼雨seo教程

网站建设怎么做账务处理,网站seo视频狼雨seo教程,开网店的步骤和流程,参考消息官方网站阅读文章目录Pulsar IO (Connector连接器)基础定义安装Pulsar和内置连接器连接Pulsar到Cassandra安装cassandra集群配置Cassandra接收器创建Cassandra Sink验证Cassandra Sink结果删除Cassandra Sink连接Pulsar到PostgreSQL安装PostgreSQL集群配置JDBC接收器创建JDBC Sink验证JDBC …

文章目录

  • Pulsar IO (Connector连接器)
    • 基础定义
    • 安装Pulsar和内置连接器
    • 连接Pulsar到Cassandra
      • 安装cassandra集群
      • 配置Cassandra接收器
      • 创建Cassandra Sink
      • 验证Cassandra Sink结果
      • 删除Cassandra Sink
    • 连接Pulsar到PostgreSQL
      • 安装PostgreSQL集群
      • 配置JDBC接收器
      • 创建JDBC Sink
      • 验证JDBC Sink结果
  • Pulsar SQL
    • 定义
    • 简单使用

Pulsar IO (Connector连接器)

基础定义

Pulsar IO连接器能够轻松地创建、部署和管理与外部系统(如Apache Cassandra、Aerospike等)交互的连接器。IO连接器有两种类型:源连接器和接收器连接器。

image-20230212113058962

可以通过Connector Admin CLI使用源和接收器子命令管理Pulsar连接器(例如,在连接器上创建、更新、启动、停止、重新启动、重新加载、删除和执行其他操作)。有关最新和完整的信息,请参阅Pulsar管理文档。

安装Pulsar和内置连接器

在将Pulsar连接到数据库之前,需要先安装Pulsar和所需的内置连接器。要启用Pulsar连接器,您需要在下载页面上下载连接器的tarball版本。

# 下载最新版本2.11.0的pulsar-io-cassandra和pulsar-io-jdbc-postgres,需要什么连接器可以从官方查看是否支持并下载,这里举例就下载两个
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-cassandra-2.11.0.nar
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-jdbc-postgres-2.11.0.nar
# 在pulsar根目录下创建目录
mkdir connectors
# 将压缩文件移动connectors目录
mv pulsar-io-jdbc-postgres-2.11.0.nar pulsar-io-jdbc-postgres-2.11.0.nar connectors
# 重启pulsar
# 查看可用连接器列表
curl -w '\n' -s http://localhost:8080/admin/v2/functions/connectors

image-20230308101658342

连接Pulsar到Cassandra

安装cassandra集群

# 下载镜像并启动cassandra测试容器
docker run -d --rm --name=cassandra -p 9042:9042 cassandra
# 查看进程
docker ps
# 查看运行日志
docker logs cassandra
# 等待一小段时间后查看Cassandra集群状态
docker exec cassandra nodetool status
# 使用cqlsh连接到Cassandra集群

image-20230308091133556

# 使用cqlsh连接到Cassandra集群
docker exec -ti cassandra cqlsh localhost
# 创建一个密钥空间pulsar_itxs_keyspace
CREATE KEYSPACE pulsar_itxs_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
# 创建一个表pulsar_itxs_table
USE pulsar_itxs_keyspace;
CREATE TABLE pulsar_itxs_table (key text PRIMARY KEY, col text);

image-20230308092441377

配置Cassandra接收器

现在已经有一个Cassandra集群在本地运行;要运行Cassandra接收器连接器,需要准备一个配置文件,其中包括Pulsar连接器运行时需要知道的信息,例如Pulsar连接器如何找到Cassandra集群,Pulsar连接器用于写入Pulsar消息的键空间和表是什么等等;可以使用Json或者Yaml这两种格式创建配置文件。

vim examples/cassandra-sink.json

{"roots": "192.168.3.100:9042","keyspace": "pulsar_itxs_keyspace","columnFamily": "pulsar_itxs_table","keyname": "key","columnName": "col"
}

vim examples/cassandra-sink.yml

configs:roots: "192.168.3.100:9042"keyspace: "pulsar_itxs_keyspace"columnFamily: "pulsar_itxs_table"keyname: "key"columnName: "col"

创建Cassandra Sink

可以使用Connector Admin CLI创建sink连接器和操作。运行下面命令来创建一个Cassandra接收器连接器,接收器类型为Cassandra,配置文件为上一步创建的examples/cassandra-sink.yml。

bin/pulsar-admin sinks create \--tenant my-test \--namespace my-namespace \--name cassandra-itxs-sink \--sink-type cassandra \--sink-config-file examples/cassandra-sink.yml \--inputs persistent://my-test/my-namespace/itxs_cassandra    

命令执行后,Pulsar创建接收器连接器cassandra-itxs-sink。这个接收器连接器作为Pulsar函数运行,并将主题itxs_cassandra中产生的消息写入Cassandra表pulsar_itxs_table;

image-20230308103049990

可以使用Connector Admin CLI对连接器进行监控和其他操作。

  • 获取连接器的信息
bin/pulsar-admin sinks get \--tenant my-test \--namespace my-namespace \--name cassandra-itxs-sink
  • 检查连接器的状态
bin/pulsar-admin sinks status \--tenant my-test \--namespace my-namespace \--name cassandra-itxs-sink

验证Cassandra Sink结果

生成一些消息到Cassandra接收器itxs_cassandra的输入主题

for i in {0..9}; do bin/pulsar-client produce -m "itxskey-$i" -n 1 persistent://my-test/my-namespace/itxs_cassandra; done

再次查看连接器的状态,可以有10条记录处理统计信息

image-20230308103247012

查看Cassandra的pulsar_itxs_table

USE pulsar_itxs_keyspace;
select * from pulsar_itxs_table;

image-20230308105728900

删除Cassandra Sink

bin/pulsar-admin sinks delete \--tenant my-test \--namespace my-namespace \--name cassandra-itxs-sink

连接Pulsar到PostgreSQL

安装PostgreSQL集群

这里使用PostgreSQL 12 docker镜像在docker中启动一个单节点PostgreSQL集群。

# 从Docker中拉取PostgreSQL 12映像
docker pull postgres:12
# 启动postgres容器
docker run -d -it --rm \--name pulsar-postgres \-p 5432:5432 \-e POSTGRES_PASSWORD=password \-e POSTGRES_USER=postgres \postgres:12
# 查看运行日志
docker logs -f pulsar-postgres
# 进入容器
docker exec -it pulsar-postgres /bin/bash
# 使用默认用户名和密码登录PostgreSQL
psql -U postgres postgres
# 使用以下命令创建pulsar_postgres_jdbc_sink表:
create table if not exists pulsar_postgres_jdbc_sink
(
id serial PRIMARY KEY,
name VARCHAR(255) NOT NULL
);

配置JDBC接收器

现在有一个本地运行的PostgreSQ,接下来需要配置JDBC接收器连接器。

  • 创建配置文件vim connectors/pulsar-postgres-jdbc-sink.yaml
configs:userName: "postgres"password: "password"jdbcUrl: "jdbc:postgresql://192.169.3.100:5432/postgres"tableName: "pulsar_postgres_jdbc_sink"

创建JDBC Sink

执行下面命令后,Pulsar将创建接收器连接器pulse -postgres-jdbc-sink。这个sink连接器作为Pulsar函数运行,并将Topic为pulsar-postgres-jdbc-sink-topic中产生的消息写入PostgreSQL表pulsar_postgres_jdbc_sink。

bin/pulsar-admin sinks create \--tenant my-test \--namespace my-namespace \--archive ./connectors/pulsar-io-jdbc-postgres-2.11.0.nar \--inputs persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic \--name pulsar-postgres-my-jdbc-sink \--sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \--parallelism 1

列出所有的sink

bin/pulsar-admin sinks list \--tenant my-test \--namespace my-namespace

image-20230308140145820

验证JDBC Sink结果

通过JavaAPI生成一些消息到Cassandra接收器pulsar-postgres-jdbc-sink-topic这个主题,在Java项目添加maven依赖

    <properties><pulsar.version>2.11.0</pulsar.version></properties>        <dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>${pulsar.version}</version></dependency>

这里演示实体类成员变量简单就直接使用public声明了

package sn.itxs.pulsar.io;public class User{public int id;public String name;
}

新增ClientDemo.java

package sn.itxs.pulsar.io;import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.AvroSchema;public class ClientDemo {public static void main(String[] args) throws Exception {PulsarClient client = null;Producer<User> producer = null;try {client = PulsarClient.builder().serviceUrl("pulsar://192.168.5.52:6650").build();producer = client.newProducer(AvroSchema.of(User.class)).topic("persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic").create();User user = new User();int index = 10;while (index++ < 20) {try {user.id = index;user.name = "this is a test " + index;producer.newMessage().value(user).send();} catch (Exception e) {e.printStackTrace();}}System.out.println("send finish");} catch (Exception e) {e.printStackTrace();}finally {if (producer!=null){producer.close();}if (client!=null){client.close();}}}
}

运行程序后查看PostgreSQL表pulsar_postgres_jdbc_sink,已经有刚才

image-20230308163802240

上面由于在Java中创建了Schema,因此不需要手工创建,可以查看当前persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic主体已生成Schema信息如下:

image-20230308165053977

如果要从pulsar-admin命令行创建schema可以这样操作

  • 创建schema,创建一个avro-schema文件,将以下内容复制到该文件中,并将该文件放在pulsar/connectors文件夹中。vim connectors/avro-schema
{"type": "AVRO","schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}","properties": {}
}
  • 上传schema到topic,将avro-schema模式上传到pulsar-postgres-jdbc-sink-topic主题
bin/pulsar-admin schemas upload persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema
  • 检查模式是否上传成功。
bin/pulsar-admin schemas get persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic1

image-20230308145650209

如需stop停止、restart重启指定的sinks可以如下操作,当然也可以更新指定sinks,详细命令可以查阅官网

bin/pulsar-admin sinks stop \--tenant my-test \--namespace my-namespace \--name pulsar-postgres-my-jdbc-sink \

Pulsar SQL

定义

Apache Pulsar用于存储事件数据流,事件数据由预定义的字段构成。通过模式注册表的实现,可以在Pulsar中存储结构化数据,并使用Trino(以前是Presto SQL)查询数据。作为Pulsar SQL的核心,Pulsar Trino插件使Trino集群中的Trino worker能够查询来自Pulsar的数据.

image-20230308170103068

由于Pulsar采用了基于两级段的架构,因此查询性能高效且可扩展性强。Pulsar中的主题在Apache BookKeeper中存储为段。每个主题段被复制到一些BookKeeper节点上,从而支持并发读和高读吞吐量。在Pulsar Trino连接器中,数据直接从BookKeeper中读取,因此Trino worker可以同时从水平可扩展数量的BookKeeper节点中读取

image-20230308170332105

简单使用

在Pulsar中查询数据前,需要安装Pulsar和内置连接器。

# 这里演示就直接启动独立集群
PULSAR_STANDALONE_USE_ZOOKEEPER=1 ./bin/pulsar standalone
# 启动一个Pulsar SQL worker
./bin/pulsar sql-worker run
# 初始化Pulsar独立集群和SQL worker后,执行SQL CLI:
./bin/pulsar sql
show catalogs;
show schemas in pulsar;
show tables in pulsar."public/default";

image-20230308172341425

通过前面的Java示例,我们改为Json格式写入Pulsar的user-topic

package sn.itxs.pulsar.io;import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;public class ClientSqlDemo {public static void main(String[] args) throws Exception {PulsarClient client = null;Producer<User> producer = null;try {client = PulsarClient.builder().serviceUrl("pulsar://192.168.5.52:6650").build();producer = client.newProducer(Schema.JSON(User.class)).topic("user-topic").create();User user = new User();int index = 10;while (index++ < 20) {try {user.id = index;user.name = "this is a test " + index;producer.newMessage().value(user).send();} catch (Exception e) {e.printStackTrace();}}System.out.println("send finish");} catch (Exception e) {e.printStackTrace();}finally {if (producer!=null){producer.close();}if (client!=null){client.close();}}}
}

运行程序后再来查询就有刚才发送的消息数据,_开头的字段为Pulsar 自带的。

select * from pulsar."public/default"."user-topic";

image-20230308175830023

  • 本人博客网站IT小神 www.itxiaoshen.com
http://www.ds6.com.cn/news/21672.html

相关文章:

  • 大作设计网站公司百度官方下载安装
  • 网站备案接入服务单位男生技能培训班有哪些
  • 做的好的音乐网站web个人网站设计代码
  • 小说网站建立seo在线优化工具
  • 网站备案有哪些费用看到招聘游戏推广员千万别去
  • 如何在本地搭建网站sem优化技巧
  • 连云港网站建设公司seo推广是什么工作
  • 广东海外建设监理有限公司网站佐力药业股票
  • 钓鱼网站的危害搜索引擎营销的主要方法
  • 主任说到学校新网站的建设工作淘宝关键词怎么选取
  • 少儿编程学什么搜索引擎优化的主要特征
  • 深圳注册公司个人数字证书海淀seo搜索引擎优化公司
  • 电子商务网站开发常用工具长沙seo优化排名
  • 网站开发所需开发环境360手机优化大师安卓版
  • 淄博临淄信息港网站页面怎么优化
  • 海淀做网站的公司上海牛巨微seo
  • 怎么做刷业网站seo优化分析
  • 什么网站做美式软装设计理念成品视频直播软件推荐哪个好用
  • 模板网站可以做推广吗简述seo和sem的区别
  • 生物科技网站建设 中企动力北京关键词优化到首页怎么做到的
  • 建设网站需要分析什么深圳营销型网站开发
  • 网站建设丨金手指谷哥12seo网站推广推荐
  • 基层单位不能建设网站广州新闻最新消息今天
  • 棉桃剥壳机做网站如何做电商
  • 静态网站制作模板关键词seo深圳
  • 泉州台商区建设局网站互联网推广引流公司
  • 网站建设实践报告小结爱站网注册人查询
  • 南昌网站建设报价怎么做好市场宣传和推广
  • 全网vip影视网站一键搭建百度浏览器在线打开
  • 腾讯wordpress主机企业网站优化排名