当前位置: 首页 > news >正文

免费建微网站南昌seo服务

免费建微网站,南昌seo服务,成都电商网站制作,sem代运营目录 1.MySQL Client 2.JDBC 3. 查询计划 4.Spark Doris Connector 5.Flink Doris Connector 1.MySQL Client Doris 采用 MySQL 协议,高度兼容 MySQL 语法,支持标准 SQL,用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后&a…

目录

1.MySQL Client

2.JDBC

3. 查询计划

4.Spark Doris Connector

5.Flink Doris Connector


1.MySQL Client

        Doris 采用 MySQL 协议,高度兼容 MySQL 语法,支持标准 SQL,用户可以通过各类客户端工具来访问 Doris。登录到doris服务器后,可使用 select语句查询数据。

mysql -uroot -P9030 -h127.0.0.1

        为了防止用户的一个查询可能因为消耗内存过大。查询进行了内存控制,一个查询任务,在单个 BE 节点上默认使用不超过 2GB 内存。用户在使用时,如果发现报 Memory limit exceeded 错误,一般是超过内存限制了。遇到内存超限时,用户应该尽量通过优化自己的 sql 语句来解决。如果确切发现2GB内存不能满足,可以手动设置内存参数。

    select 查询如果使用limit分页查询,则需要指定order by 字段,否则同一个sql返回的数据可能不一样。

2.JDBC

        由于Doris 采用 MySQL 协议,同样也支持通过JDBC方式读取数据。

package com.yichenkeji.demo.test;import lombok.extern.slf4j.Slf4j;import java.sql.*;
import java.util.Properties;@Slf4j
public class DorisJDBCDemo {public static void main(String[] args) throws SQLException {String jdbc_driver = "com.mysql.cj.jdbc.Driver";String jdbc_url = "jdbc:mysql://192.168.179.131:9030/demo?rewriteBatchedStatements=true";String username = "root";String password = "";Connection conn = getConnection(username,password,jdbc_url,jdbc_driver);log.info("{}",conn);String sql = "select * from dim_area limit 10";Statement stmt = conn.createStatement();ResultSet rs = stmt.executeQuery(sql);while (rs.next()){log.info("id={},name={}",rs.getFloat("id"),rs.getString("name"));}closeConnection(conn);}/*** 获取连接* @param username* @param password* @param jdbcUrl* @param driver* @return*/public static Connection getConnection(String username,String password,String jdbcUrl,String driver) {Properties prop = new Properties();prop.put("user", username);prop.put("password", password);try {Class.forName(driver);log.info("jdbcUrl:{}",jdbcUrl);return DriverManager.getConnection(jdbcUrl, prop);} catch (Exception e) {throw new RuntimeException(e);}}/*** 关闭连接* @param conn*/public static void closeConnection(Connection conn) {if(conn != null){try {if(!conn.isClosed()){conn.close();}} catch (SQLException e) {log.error("SQLException:{}", e.getMessage());}}}}

3. 查询计划

        由于jdbc查询暂时不支持流式读取,如果读取的数据量过大,一次性读取全部数据需要很大的资源,所有可以使用查询计划API接口,给定一个 SQL,获取该 SQL 对应的查询计划。通过返回的数据分区信息,分批读取数据。

package com.yichenkeji.demo.test;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.TDorisExternalService;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.doris.sdk.thrift.TScanCloseParams;
import org.apache.doris.sdk.thrift.TScanColumnDesc;
import org.apache.doris.sdk.thrift.TScanNextBatchParams;
import org.apache.doris.sdk.thrift.TScanOpenParams;
import org.apache.doris.sdk.thrift.TScanOpenResult;
import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class DorisReaderDemo{static String dorisUrl = "192.168.179.131:8030";static String username = "root";static String password = "";static String database = "demo";static String table = "dim_area";static String querySql = String.format("SELECT id, name from demo.dim_area");static int readRowCount = 0;static int readTotal = 0;public static void main(String[] args) throws Exception {String queryPlanUrl = String.format("http://%s/api/%s/%s/_query_plan",dorisUrl,database,table);QueryPlanResult queryPlanResult = DorisUtil.getQueryPlan(username,password,queryPlanUrl,querySql);if (queryPlanResult != null && queryPlanResult.getOpaquedQueryPlan() != null){JSONObject partitions = queryPlanResult.getPartitions();log.info("partitions:{}",partitions);for(Map.Entry<String, Object> tablet : partitions.entrySet()){Long tabletId = Long.parseLong(tablet.getKey());JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));//get first backendString routingsBackend = value.getJSONArray("routings").getString(0);String backendHost = routingsBackend.split(":")[0];String backendPort = routingsBackend.split(":")[1];//connect backendTBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort));TProtocol protocol = factory.getProtocol(transport);TDorisExternalService.Client client = new TDorisExternalService.Client(protocol);if (!transport.isOpen()) {transport.open();}//build paramsTScanOpenParams params = new TScanOpenParams();params.cluster = "default_cluster";params.database = database;params.table = table;params.tablet_ids = Arrays.asList(tabletId);params.opaqued_query_plan = queryPlanResult.getOpaquedQueryPlan();// max row number of one read batchparams.setBatchSize(50000);params.setQueryTimeout(3600);params.setMemLimit(2147483648L);params.setUser(username);params.setPasswd(password);//open scannerTScanOpenResult tScanOpenResult = client.openScanner(params);if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.",routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs()));}List<TScanColumnDesc> selectedColumns = tScanOpenResult.getSelectedColumns();TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();nextBatchParams.setContextId(tScanOpenResult.getContextId());boolean eos = false;//read dataint offset = 0;while(!eos){nextBatchParams.setOffset(offset);TScanBatchResult nextResult = client.getNext(nextBatchParams);if (!TStatusCode.OK.equals(nextResult.getStatus().getStatusCode())) {throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.",routingsBackend, nextResult.getStatus().getStatusCode(), nextResult.getStatus().getErrorMsgs()));}eos = nextResult.isEos();if(!eos){RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();List<List<Object>> results = new ArrayList<>();while (arrowStreamReader.loadNextBatch()) {List<FieldVector>  fieldVectors = root.getFieldVectors();//total data rowsint rowCountInOneBatch = root.getRowCount();for(int row = 0 ; row < rowCountInOneBatch ;row++){List<Object> record = new ArrayList<>();for (int col = 0; col < fieldVectors.size(); col++) {FieldVector fieldVector = fieldVectors.get(col);Types.MinorType minorType = fieldVector.getMinorType();Object v = DorisUtil.convertValue(row , minorType, fieldVector);record.add(v);}results.add(record);}offset += root.getRowCount();}log.info("total data rows:{}",results.size());//处理完之后要关闭,否则容易内存溢出arrowStreamReader.close();}}//closeTScanCloseParams closeParams = new TScanCloseParams();closeParams.setContextId(tScanOpenResult.getContextId());client.closeScanner(closeParams);if ((transport != null) && transport.isOpen()) {transport.close();}}}}public static String basicAuthHeader(String username, String password) {final String tobeEncode = username + ":" + password;byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));return "Basic " + new String(encoded);}/*** 获取查询计划* @param username* @param password* @param queryPlanUrl* @param sql* @return* @throws Exception*/public static QueryPlanResult getQueryPlan(String username, String password, String queryPlanUrl, String sql){try (CloseableHttpClient client = HttpClients.custom().build()) {HttpPost post = new HttpPost(queryPlanUrl);post.setHeader(HttpHeaders.EXPECT, "100-continue");post.setHeader(HttpHeaders.AUTHORIZATION,  basicAuthHeader(username,password));log.info("queryPlanUrl:{}",queryPlanUrl);log.info("sql:{}",sql);//The param is specific SQL, and the query plan is returnedMap<String,String> params = new HashMap<>();params.put("sql",sql);StringEntity entity = new StringEntity(JSON.toJSONString(params));post.setEntity(entity);try (CloseableHttpResponse response = client.execute(post)) {if (response.getEntity() != null ) {JSONObject queryPlanJSONObject = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));JSONObject dataJSONObject = queryPlanJSONObject.getJSONObject("data");if (dataJSONObject.containsKey("exception")){throw new RuntimeException(dataJSONObject.getString("exception"));}String queryPlan = dataJSONObject.getString("opaqued_query_plan");JSONObject partitions = dataJSONObject.getJSONObject("partitions");return new QueryPlanResult(queryPlan,partitions);}}}catch (Exception e){throw new RuntimeException(e);}return null;}}

4.Spark Doris Connector

        Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。

val dorisSparkDF = spark.read.format("doris").option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME").option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT").option("user", "$YOUR_DORIS_USERNAME").option("password", "$YOUR_DORIS_PASSWORD").load()dorisSparkDF.show(5)

5.Flink Doris Connector

        Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍Flink如何通过Datastream和SQL操作Doris。

DorisOptions.Builder builder = DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();

http://www.dt0577.cn/news/55367.html

相关文章:

  • wordpress更新一直下载失败南宁seo外包服务商
  • 有哪些免费做外贸网站全球网站流量排名查询
  • 做网站收入怎么样国外友链买卖平台
  • 用python做的网站多吗重庆网站
  • 做网站行业的动态seo的主要内容
  • 网站菜单分类怎么做宁德市疫情
  • 简单的网页案例seo就业指导
  • 公司有多少做网站互联网营销方式有哪些
  • 建一个网站大概多少钱seo公司赚钱吗
  • 网站建设客户需求分析调研表如何弄一个自己的网站
  • web是做网站的吗免费网站开发平台
  • 网站建设的流程电子商务拉新项目官方一手平台
  • 营销网站开发规划搜索引擎营销的特征
  • 网站建设与web前端区别网址链接查询
  • 免费微网站与公众号平台对接免费写文章的软件
  • 做的网站在ie会乱码百度竞价入门教程
  • 游戏官网做的好的网站网络营销制度课完整版
  • 做视频网站多大服务器搜索引擎优化文献
  • 怎样做彩票网站seo公司怎么样
  • 株洲做网站优化口碑营销方案
  • 网站编辑怎么做二级域名网站查询入口
  • 如何用asp.net做网站石家庄网站建设seo
  • 京山网站开发深圳营销型网站
  • 科技医疗网站建设昆明seo
  • 上海专业网站建设多少钱seo在线培训课程
  • 西安手机网站开发开发做一个网站需要多少钱
  • 兰州有做百度网站的吗无经验能做sem专员
  • 网站 页面 结构济南网站seo优化
  • 制作软件的工作叫什么北京搜索关键词优化
  • 外贸建立网站怎么做企业文化培训