Introduction
Firebase is Google's application development platform and integrates easily with a wide range of apps. The official documentation:
Firebase documentation by platform
Firebase documentation by product
Data in a Firebase project can be exported to many destinations. Because Firebase belongs to Google, most users export their data to BigQuery, Google's data warehouse, and use BigQuery's powerful SQL support to analyze it. Firebase + BigQuery is therefore the typical choice for most developers.
Exporting a Firebase project's data to BigQuery has to be set up by the Firebase project owner; see this article in the official Firebase documentation.
This document covers exactly one pipeline: export the Firebase project's data to BigQuery; use BigQuery SQL to query and filter it; write the query results to cloud storage (Google Cloud Storage); download the resulting files from Cloud Storage to a local machine; then parse the file data and send it to the platform of your choice.
1. Configuration the customer needs to provide
- A Service Account JSON file with read and write access to Google BigQuery and Google Cloud Storage (a sketch of how such a key file can be loaded is shown after this list). A sample configuration:
{
"type": "service_account",
"project_id": "color-painting-2f221",
"private_key_id": "3497ffb7596*****************6927e9aa3563",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvgI ********** KgwggSkAgEAAoI ********** QZQzUcpXUAdTZL ********** tR7Bn/oU\n-----END PRIVATE KEY-----\n",
"client_email": "color-pain**********221@appspot.gserviceaccount.com",
"client_id": "10450 ********** 07517",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/ro**t/v1/me***ata/x509/co******ng-2***21%40appspot.gserviceaccount.com"
}
- The Google Cloud Storage bucketName (note: the Cloud Storage bucket and the BigQuery dataset must be in the same region). A sample:
bucketName: "us.artifacts.t****h.a****pot.com"
- The Google BigQuery project name, i.e. the name under which this project's data is stored in BigQuery. A sample:
projectId: "t*******sh"
- The dataset under the BigQuery project. A dataset is the location within a project where that project's data is kept; the tables that actually hold the data live under the dataset. A sample:
dataset: "an********867"
- A table-name prefix (wildcard). If all of the project's tables in the dataset share a common prefix, the prefix can be configured as well. (The prefix is optional.)
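The code examples in section 2.2 obtain their BigQuery and Cloud Storage clients through a GoogleUtil helper that is not shown in this document. The sketch below is one plausible implementation on top of the official Google Cloud client libraries: the method names and parameters (getBigQuery(filePath, projectId), getStorage(filePath, projectId)) are taken from the example code, while the body is an assumption, with filePath pointing at the Service Account JSON key described above.

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

import java.io.FileInputStream;
import java.io.IOException;

public class GoogleUtil {

    /** Build a BigQuery client authenticated with the Service Account JSON at keyFilePath. */
    public static BigQuery getBigQuery(String keyFilePath, String projectId) {
        return BigQueryOptions.newBuilder()
                .setProjectId(projectId)
                .setCredentials(loadCredentials(keyFilePath))
                .build()
                .getService();
    }

    /** Build a Cloud Storage client with the same credentials. */
    public static Storage getStorage(String keyFilePath, String projectId) {
        return StorageOptions.newBuilder()
                .setProjectId(projectId)
                .setCredentials(loadCredentials(keyFilePath))
                .build()
                .getService();
    }

    private static GoogleCredentials loadCredentials(String keyFilePath) {
        try (FileInputStream in = new FileInputStream(keyFilePath)) {
            return GoogleCredentials.fromStream(in);
        } catch (IOException e) {
            throw new IllegalStateException("Cannot load Service Account key: " + keyFilePath, e);
        }
    }
}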
2. Sample project
The sample project has no public download location for now; please contact the R&D team.
2.1 Sample project structure
2.2 Code examples
The code is provided as an example and a starting point only; do not copy it verbatim!
This example only demonstrates how to write a program with the BigQuery SDK. Adapt the business logic to your actual requirements!
2.2.1 Query data from BigQuery and store the results in Cloud Storage
Provided the Firebase project's data has already been exported to BigQuery, the example below connects to BigQuery, queries the data, and stores the query results in Google Cloud Storage.
import com.google.api.gax.paging.Page;
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.*;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.threeten.bp.Duration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static java.time.format.DateTimeFormatter.BASIC_ISO_DATE;
/**
* @Description: Query BigQuery and store the results in Google Cloud Storage
* @Company: ThinkingData
* @Author: Spring.Wang
* @Date: 2022/1/27
* @Version: 1.0
* @Copyright: Copyright (c) 2022
*/
@Service
public class ExportToCloudStorageService {
private static final Logger logger = LoggerFactory.getLogger(ExportToCloudStorageService.class);
@Autowired
private BQParams bqParams;
@Resource(name = "StorageQueue")
private BlockingQueue<CloudStorageInfo> storageQueue;
private ThreadPoolExecutor threadPoolExecutor;
private List<String> dates;
private BigQuery bigQuery;
private Storage storage;
private String projectId;
private String dataSet;
private String region;
private String filter;
private String bucketName;
private String lower;
private String upper;
private String filePath;
private Integer export;
//SimpleDateFormat is not thread-safe, and several export threads format dates concurrently, so give each thread its own instance
private static final ThreadLocal<SimpleDateFormat> format = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMdd"));
@PostConstruct
public void init(){
projectId = bqParams.projectId;
dataSet = bqParams.dataset;
region = bqParams.region;
filter = bqParams.filter;
bucketName = bqParams.bucketName;
lower = bqParams.lowerbound;
upper = bqParams.upperbound;
filePath = bqParams.filePath;
export = bqParams.export;
bigQuery = GoogleUtil.getBigQuery(filePath,projectId);
storage = GoogleUtil.getStorage(filePath,projectId);
getDates();
start();
}
//Build the inclusive list of dates between lowerbound and upperbound
private List<String> getDates(){
LocalDate lowerbound = LocalDate.parse(lower.replace("-",""), BASIC_ISO_DATE);
LocalDate upperbound = LocalDate.parse(upper.replace("-",""), BASIC_ISO_DATE);
if(upperbound.isBefore(lowerbound)){
logger.error("=====the upper bound of the date range must not be earlier than the lower bound=====");
System.exit(-1);
}
LocalDate temp = lowerbound;
LocalDate upperboundPlus = upperbound.plusDays(1);
dates = new ArrayList<>();
while (temp.isBefore(upperboundPlus)){
dates.add(temp.format(BASIC_ISO_DATE));
temp = temp.plusDays(1);
}
return dates;
}
//Start the thread pool that queries the data and stores it to Google Cloud Storage
private void start(){
logger.info("=====starting the query/export/store processing=====");
AtomicInteger threadNum = new AtomicInteger(0);
threadPoolExecutor = new ThreadPoolExecutor(export,export, 10,TimeUnit.SECONDS,new ArrayBlockingQueue<>(180),
r -> {
Thread thread = new Thread(r,"export-threadPool-"+threadNum.incrementAndGet());
return thread;
},new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.allowCoreThreadTimeOut(true);
//Note: the latch must count dates, not export threads. With e.g. 7 days of data but only 1 worker thread, counting threads would enqueue the poison-pill marker too early
final CountDownLatch latch = new CountDownLatch(dates.size());
for (String date : dates) {
threadPoolExecutor.submit(new ExportTask(date,latch));
}
threadPoolExecutor.submit(new EndTask(latch));
}
@PreDestroy
public void destroy(){
while (true){
if(threadPoolExecutor.getActiveCount() == 0){
logger.info("=====开始关闭数据查询导出服务=====");
threadPoolExecutor.shutdown();
if(!threadPoolExecutor.isTerminated()){
try {
threadPoolExecutor.awaitTermination(60,TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("=====关闭数据查询导出服务异常!=====",e);
}
threadPoolExecutor.shutdownNow();
}
logger.info("=====数据查询导出服务成功关闭=====");
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//Export task (queries data out of BigQuery into a temporary state table, then extracts it)
private class ExportTask implements Runnable {
private final String date;
private final CountDownLatch latch;
private final String jobId;
public ExportTask(String date, CountDownLatch latch) {
this.date = date;
this.latch = latch;
this.jobId = UUID.randomUUID().toString();
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
logger.info("=====数据查询线程是: " + Thread.currentThread().getName() + " =====");
logger.info("=====开始处理当日分区表的数据=====");
dealCurrentDay();
logger.info("=====当日分区表数据处理完毕=====");
logger.info("=====开始处理 1-7 天前的增量数据=====");
for (int i = 1; i <= 7; i++) {
deal1to7DayBefore(i);
}
logger.info("===== 1-7 天前的增量数据处理完毕=====");
logger.info("=====数据查询线程: " + Thread.currentThread().getName() + "处理成功=====");
} finally {
latch.countDown();
}
}
//Process the current day's partition table
private void dealCurrentDay() {
try {
//First create a state table for the current day's partition data
String currentDayStateTable = String.format("ta_export_%s_events_%s_%s", region, date, date);
logger.info(String.format("state table for %s: %s", date, currentDayStateTable));
//Query the day's data and insert it into the state table
TableId tableId = TableId.of(projectId, dataSet, currentDayStateTable);
String sql = String.format("select * from `%s.%s.events_%s`", projectId, dataSet, date) + filter;
logger.info(String.format("query for the %s state data: %s", date, sql));
QueryJobConfiguration queryJobConfiguration = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).setDestinationTable(tableId).build();
bigQuery.query(queryJobConfiguration);
Thread.sleep(1000);
//Extract the state table's data to Google Cloud Storage
String googlePath = String.format("gs://%s/ta_export/%s-*.json.gz", bucketName, currentDayStateTable);
ExtractJobConfiguration extractJobConfiguration = ExtractJobConfiguration.newBuilder(tableId, googlePath).setCompression("gzip").setFormat(FormatOptions.json().getType()).build();
Job job = bigQuery.create(JobInfo.of(extractJobConfiguration)).waitFor(RetryOption.totalTimeout(Duration.ofMinutes(15)));
if (job.getStatus().getError() != null) {
logger.error(String.format("exporting the %s state table to Google Cloud Storage failed: %s", date, job.getStatus().getError().getMessage()));
} else {
logger.info(String.format("%s state table exported to Google Cloud Storage successfully!", date));
}
Thread.sleep(1000);
//Enqueue the exported Cloud Storage files onto the storage queue
String prefix = String.format("ta_export/%s-", currentDayStateTable);
Page<Blob> list = storage.list(bucketName, Storage.BlobListOption.currentDirectory(), Storage.BlobListOption.prefix(prefix));
Schema schema = bigQuery.getTable(dataSet, currentDayStateTable).getDefinition().getSchema();
for (Blob blob : list.iterateAll()) {
CloudStorageInfo cloudStorageInfo = new CloudStorageInfo();
cloudStorageInfo.setBlob(blob);
cloudStorageInfo.setDate(date);
cloudStorageInfo.setEnd(false);
cloudStorageInfo.setJobId(jobId);
cloudStorageInfo.setSchema(schema);
cloudStorageInfo.setMd5(blob.getMd5ToHexString());
storageQueue.put(cloudStorageInfo);
}
} catch (Exception e) {
logger.error(String.format("处理当日 %s 分区表数据失败:%s",date,e.getMessage()));
System.exit(-1);
}
}
//Process the incremental data that the partition tables of 1-7 days ago gained today (compared against yesterday's state table)
private void deal1to7DayBefore(int i) {
String partitionStr = "";
String lastStr = "";
try {
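// Note: the offsets below appear to assume that each run processes data with a
// 3-day delay, so the partition of (3+i) days ago is diffed against its state
// table from 4 days ago (i.e. "yesterday" relative to the date being processed).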
Calendar partition = Calendar.getInstance();
partition.add(Calendar.DAY_OF_MONTH, -3-i);
partitionStr = format.get().format(partition.getTime());
Calendar last = Calendar.getInstance();
last.add(Calendar.DAY_OF_MONTH, -4);
lastStr = format.get().format(last.getTime());
//First check whether yesterday's state table for this partition exists; if it does not, there is nothing to diff (this matters on the very first run)
String lastDayStateTable = String.format("ta_export_%s_events_%s_%s", region, partitionStr, lastStr);
Table table = bigQuery.getTable(TableId.of(projectId, dataSet, lastDayStateTable));
if (table != null) {
logger.info(String.format("yesterday's state table for partition %s exists, proceeding with incremental processing", partitionStr));
//Build today's state table for the source partition
String currentDayStateTable = String.format("ta_export_%s_events_%s_%s", region, partitionStr, date);
logger.info(String.format("state table of %s as of %s: %s", partitionStr, date, currentDayStateTable));
//Query the source table and write the result into today's state table
TableId tableId = TableId.of(projectId, dataSet, currentDayStateTable);
String sql_select = String.format("select * from `%s.%s.events_%s`", projectId, dataSet, partitionStr) + filter;
logger.info(String.format("query for the %s state data: %s", partitionStr, sql_select));
QueryJobConfiguration query_select = QueryJobConfiguration.newBuilder(sql_select).setUseLegacySql(false).setDestinationTable(tableId).build();
bigQuery.query(query_select);
Thread.sleep(1000);
//Truncate the result table before use (the table is reused across runs)
String dest = String.format("%s.%s.%s_result_response",projectId,dataSet,region);
String sql_truncate = String.format("truncate table `%s`",dest);
logger.info(String.format("statement truncating the result table: %s", sql_truncate));
QueryJobConfiguration query_truncate = QueryJobConfiguration.newBuilder(sql_truncate).setUseLegacySql(false).build();
bigQuery.query(query_truncate);
Thread.sleep(1000);
//Diff today's state table against yesterday's, keep only the new rows, and insert them into the result table
String latest = String.format("%s.%s.ta_export_%s_events_%s_%s",projectId,dataSet,region,partitionStr,lastStr);
String current = String.format("%s.%s.ta_export_%s_events_%s_%s",projectId,dataSet,region,partitionStr,date);
String sql_insert = String.format("insert into `%s`\n" +
"select\n" +
"b_event_date,\n" +
"b_event_timestamp,\n" +
"b_event_name,\n" +
"b_avent_params,\n" +
"b_event_previous_timestamp,\n" +
"b_event_value_in_usd,\n" +
"b_event_bundle_sequence_id,\n" +
"b_event_server_timestamp_offset,\n" +
"b_user_id,\n" +
"b_user_pseudo_id,\n" +
"b_privacy_info,\n" +
"b_user_properties,\n" +
"b_user_first_touch_timestamp,\n" +
"b_user_ltv,\n" +
"b_device,\n" +
"b_geo,\n" +
"b_app_info,\n" +
"b_traffic_source,\n" +
"b_stream_id,\n" +
"b_platform,\n" +
"b_event_dimensions,\n" +
"b_ecommerce,\n" +
"b_items\n" +
"from\n" +
"(select\n" +
"a.event_date a_event_date,\n" +
"a.event_timestamp a_event_timestamp,\n" +
"a.event_name a_event_name,\n" +
"a.event_params a_event_params,\n" +
"a.event_previous_timestamp a_event_previous_timestamp,\n" +
"a.event_value_in_usd a_event_value_in_usd,\n" +
"a.event_bundle_sequence_id a_event_bundle_sequence_id,\n" +
"a.event_server_timestamp_offset a_event_server_timestamp_offset,\n" +
"a.user_id a_user_id,\n" +
"a.user_pseudo_id a_user_pseudo_id,\n" +
"a.privacy_info a_privacy_info,\n" +
"a.user_properties a_user_properties,\n" +
"a.user_first_touch_timestamp a_user_first_touch_timestamp,\n" +
"a.user_ltv a_user_ltv,\n" +
"a.device a_device,\n" +
"a.geo a_geo,\n" +
"a.app_info a_app_info,\n" +
"a.traffic_source a_traffic_source,\n" +
"a.stream_id a_stream_id,\n" +
"a.platform a_platform,\n" +
"a.event_dimensions a_event_dimensions,\n" +
"a.ecommerce a_ecommerce,\n" +
"a.items a_items,\n" +
"b.event_date b_event_date,\n" +
"b.event_timestamp b_event_timestamp,\n" +
"b.event_name b_event_name,\n" +
"b.event_params b_event_params,\n" +
"b.event_previous_timestamp b_event_previous_timestamp,\n" +
"b.event_value_in_usd b_event_value_in_usd,\n" +
"b.event_bundle_sequence_id b_event_bundle_sequence_id,\n" +
"b.event_server_timestamp_offset b_event_server_timestamp_offset,\n" +
"b.user_id b_user_id,\n" +
"b.user_pseudo_id b_user_pseudo_id,\n" +
"b.privacy_info b_privacy_info,\n" +
"b.user_properties b_user_properties,\n" +
"b.user_first_touch_timestamp b_user_first_touch_timestamp,\n" +
"b.user_ltv b_user_ltv,\n" +
"b.device b_device,\n" +
"b.geo b_geo,\n" +
"b.app_info b_app_info,\n" +
"b.traffic_source b_traffic_source,\n" +
"b.stream_id b_stream_id,\n" +
"b.platform b_platform,\n" +
"b.event_dimensions b_event_dimensions,\n" +
"b.ecommerce b_ecommerce,\n" +
"b.items b_items\n" +
"from `%s` a\n" +
"right outer join `%s` b\n" +
"on a.event_name=b.event_name and a.event_timestamp=b.event_timestamp and a.device.advertising_id=b.device.advertising_id) t\n" +
"where t.b_event_name is not null and t.b_event_timestamp is not null and t.b_device.advertising_id is not null and\n" +
"t.a_event_name is null and t.a_event_timestamp is null and t.a_device.advertising_id is null", dest, latest, current);
logger.info(String.format("获取 %s 增量数据语句:%s", partitionStr, sql_insert));
QueryJobConfiguration query_insert = QueryJobConfiguration.newBuilder(sql_insert).setUseLegacySql(false).build();
bigQuery.query(query_insert);
Thread.sleep(1000);
//Extract the (incremental) result table data to Google Cloud Storage
String destTable = String.format("%s_result_response",region);
TableId destTableId = TableId.of(projectId, dataSet, destTable);
String googlePath = String.format("gs://%s/ta_export/%s-*.json.gz", bucketName, destTable);
ExtractJobConfiguration extractJobConfiguration = ExtractJobConfiguration.newBuilder(destTableId, googlePath).setCompression("gzip").setFormat(FormatOptions.json().getType()).build();
Job job = bigQuery.create(JobInfo.of(extractJobConfiguration)).waitFor(RetryOption.totalTimeout(Duration.ofMinutes(15)));
if (job.getStatus().getError() != null) {
logger.error(String.format("exporting the %s incremental data to Google Cloud Storage failed: %s", partitionStr, job.getStatus().getError().getMessage()));
} else {
logger.info(String.format("%s incremental data exported to Google Cloud Storage successfully!", partitionStr));
}
Thread.sleep(1000);
//Once the incremental rows are extracted, yesterday's state table can be deleted: state tables are refreshed daily, and each comparison is always between a partition's state today and its latest state as of yesterday.
//After today's comparison, yesterday's state table is no longer needed; tomorrow will compare tomorrow's state against today's.
boolean delete = bigQuery.delete(TableId.of(projectId, dataSet, lastDayStateTable));
if (delete) {
logger.info(String.format("state table of %s as of %s deleted successfully!", partitionStr, lastStr));
} else {
logger.error(String.format("failed to delete the state table of %s as of %s!!!", partitionStr, lastStr));
}
Thread.sleep(1000);
//Enqueue the exported Cloud Storage files onto the storage queue
String prefix = String.format("ta_export/%s-", destTable);
Page<Blob> list = storage.list(bucketName, Storage.BlobListOption.currentDirectory(), Storage.BlobListOption.prefix(prefix));
Schema schema = bigQuery.getTable(TableId.of(projectId,dataSet,destTable)).getDefinition().getSchema();
for (Blob blob : list.iterateAll()) {
CloudStorageInfo cloudStorageInfo = new CloudStorageInfo();
cloudStorageInfo.setBlob(blob);
cloudStorageInfo.setDate(partitionStr);
cloudStorageInfo.setEnd(false);
cloudStorageInfo.setJobId(jobId);
cloudStorageInfo.setSchema(schema);
cloudStorageInfo.setMd5(blob.getMd5ToHexString());
storageQueue.put(cloudStorageInfo);
}
} else {
logger.info(String.format("昨天不存在 %s 分区表的状态表,无增量处理", partitionStr));
}
} catch (Exception e) {
logger.error(String.format("处理 %s 那天分区数据在 %s 今天失败:%s",partitionStr,date,e.getMessage()));
System.exit(-1);
}
}
}
//Put the poison-pill marker onto the storage queue
private class EndTask implements Runnable{
private CountDownLatch latch;
public EndTask(CountDownLatch latch){
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("=====开始放入存储队列毒药标记=====");
CloudStorageInfo cloudStorageInfo = new CloudStorageInfo();
cloudStorageInfo.setEnd(true);
try {
storageQueue.put(cloudStorageInfo);
} catch (InterruptedException e) {
logger.error("=====存储队列毒药标记放入失败=====",e);
}
logger.info("=====存储队列毒药标记放入成功=====");
}
}
}
This code covers authenticating to BigQuery, fetching BigQuery tables, running SQL queries whose results go into a temporary state table, and extracting the temporary table's data to Cloud Storage as files.
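Both services also reference several supporting classes that are part of the sample project but not shown in this document: the BQParams parameter holder, the CloudStorageInfo and LocalStorageInfo queue items, the two queue beans, and RetryerUtil. The following is a minimal, hypothetical sketch of them; field and method names are inferred from their usages in the examples, and everything else (Lombok for the accessors, the queue capacities, the interpretation of the retry parameters) is assumed.

import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.storage.Blob;
import lombok.Data;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.nio.file.Path;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical parameter holder; in practice the fields would be injected from configuration. */
@Component
class BQParams {
    public String projectId;   // BigQuery project id
    public String dataset;     // BigQuery dataset holding the events_* tables
    public String region;      // region tag used in the state-table names
    public String filter;      // extra SQL appended to every query, e.g. " where ..."
    public String bucketName;  // Cloud Storage bucket receiving the extracts
    public String lowerbound;  // first date to process, yyyyMMdd
    public String upperbound;  // last date to process, yyyyMMdd
    public String filePath;    // path to the Service Account JSON key
    public Integer export;     // number of export worker threads
    public String localDir;    // local download directory
    public Integer fetch;      // number of fetch worker threads
}

/** One exported file shard sitting in Cloud Storage; fields inferred from the setters used above. */
@Data
class CloudStorageInfo {
    private Blob blob;     // the Cloud Storage object for one exported shard
    private String date;   // partition date the data belongs to
    private boolean end;   // poison-pill marker: true means "no more files"
    private String jobId;  // id of the export run that produced the file
    private Schema schema; // BigQuery schema of the source table
    private String md5;    // blob md5, kept for integrity checks
}

/** The same shard after it has been downloaded to the local machine. */
@Data
class LocalStorageInfo {
    private Path path;     // local path of the downloaded .json.gz file
    private boolean end;   // poison-pill marker
    private String jobId;
    private Schema schema;
    private String md5;
}

/** Hypothetical Spring configuration wiring the two hand-off queues. */
@Configuration
class QueueConfig {
    @Bean(name = "StorageQueue")
    public BlockingQueue<CloudStorageInfo> storageQueue() {
        return new ArrayBlockingQueue<>(1000); // bounded, so producers feel backpressure
    }

    @Bean(name = "FetchQueue")
    public BlockingQueue<LocalStorageInfo> fetchQueue() {
        return new ArrayBlockingQueue<>(1000);
    }
}

/** Hypothetical reconstruction of RetryerUtil on top of the guava-retrying library. */
class RetryerUtil {
    /** Retry on any exception, waiting a little longer after each attempt, up to maxAttempts tries. */
    static Retryer<Object> initRetryerByIncTimes(int maxAttempts, long initialWaitSec, long incrementSec) {
        return RetryerBuilder.newBuilder()
                .retryIfException()
                .withWaitStrategy(WaitStrategies.incrementingWait(
                        initialWaitSec, TimeUnit.SECONDS, incrementSec, TimeUnit.SECONDS))
                .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttempts))
                .build();
    }
}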
2.2.2 Download the files from Cloud Storage to the local machine
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.google.cloud.storage.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description: Pull the data from Google Cloud Storage down to local storage
* @Company: ThinkingData
* @Author: Spring.Wang
* @Date: 2022/1/29
* @Version: 1.0
* @Copyright: Copyright (c) 2022
*/
@Service
public class FetchDataToLocalService {
private final static Logger logger = LoggerFactory.getLogger(FetchDataToLocalService.class);
@Autowired
private BQParams bqParams;
@Resource(name = "StorageQueue")
private BlockingQueue<CloudStorageInfo> storageQueue;
@Resource(name = "FetchQueue")
private BlockingQueue<LocalStorageInfo> fetchQueue;
private ThreadPoolExecutor threadPoolExecutor;
private String localDir;
private Integer fetch;
private final Retryer retryer = RetryerUtil.initRetryerByIncTimes(5, 10L, 30L);
@PostConstruct
public void init(){
localDir = bqParams.localDir;
fetch = bqParams.fetch;
//Create the local storage directory if it does not exist
if(!Files.exists(Paths.get(localDir))){
try {
Files.createDirectories(Paths.get(localDir));
} catch (IOException e) {
logger.error("==========创建本地存储路径失败==========",e);
return;
}
}
start();
}
//Take items off the storage queue, turn them into the local storage format, then put them onto the fetch queue.
private void start(){
logger.info("==========starting to drain the storage queue==========");
AtomicInteger threadNum = new AtomicInteger(0);
threadPoolExecutor = new ThreadPoolExecutor(fetch, fetch, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(180),
r -> {
Thread thread = new Thread(r, "fetch-threadPool-" + threadNum.incrementAndGet());
return thread;
}, new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolExecutor.allowCoreThreadTimeOut(true);
final CountDownLatch latch = new CountDownLatch(fetch);
for (int i = 0; i < fetch; i++) {
threadPoolExecutor.submit(new FetchTask(latch));
}
threadPoolExecutor.submit(new EndTask(latch));
}
@PreDestroy
public void destroy(){
while (true){
if(threadPoolExecutor.getActiveCount() == 0){
logger.info("==========开始关闭数据抓取服务==========");
threadPoolExecutor.shutdown();
if(!threadPoolExecutor.isTerminated()){
try {
threadPoolExecutor.awaitTermination(60,TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("==========关闭数据抓取服务异常!==========",e);
}
threadPoolExecutor.shutdownNow();
}
logger.info("==========数据抓取服务关闭成功==========");
break;
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private class FetchTask implements Runnable{
private final CountDownLatch latch;
public FetchTask(CountDownLatch latch){
this.latch = latch;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("==========数据抓取线程是: " + Thread.currentThread().getName()+"==========");
while (true){
try {
//Take the next item off the storage queue; take() blocks until an item is available and never returns null
CloudStorageInfo cloudStorageInfo = storageQueue.take();
//Regular (non-poison-pill) item
if(!cloudStorageInfo.isEnd()){
Blob blob = cloudStorageInfo.getBlob();
String name = blob.getName().replace("/", "");
Path localDirPath = Paths.get(localDir).resolve(Paths.get(cloudStorageInfo.getDate()));
if(!Files.exists(localDirPath)){
Files.createDirectories(localDirPath);
}
Path localPath = localDirPath.resolve(Paths.get(name));
retryer.call(() -> {
try {
blob.downloadTo(localPath);
logger.info("==========" + blob.getName()+ " downloaded from Google Cloud Storage successfully==========");
} catch (Exception e) {
// A "Connection closed prematurely: bytesRead = ?, Content-Length = ?" exception can occur here,
// most likely caused by network instability on Google's side. A failed download would leave an
// incomplete file that later breaks the data-processing threads, so rethrow to let the retryer retry.
logger.error("==========" + blob.getName() +" download failed==========",e);
throw e;
}
return null;
}
);
//Build the local-storage descriptor
LocalStorageInfo localStorageInfo = new LocalStorageInfo();
localStorageInfo.setPath(localPath);
localStorageInfo.setEnd(false);
localStorageInfo.setJobId(cloudStorageInfo.getJobId());
localStorageInfo.setSchema(cloudStorageInfo.getSchema());
localStorageInfo.setMd5(cloudStorageInfo.getMd5());
fetchQueue.put(localStorageInfo);
boolean delete = blob.delete();
if(delete){
logger.info("==========" + blob.getName()+ " deleted from Google Cloud Storage successfully==========");
}else{
logger.error("==========" + blob.getName() +" could not be deleted from Google Cloud Storage==========");
}
}else{
//Poison-pill handling (always processed last):
//stop this fetch thread and put the poison pill back onto the storage queue,
//so that with multiple threads every other worker also sees it and stops.
storageQueue.put(cloudStorageInfo);
latch.countDown();
break;
}
} catch (InterruptedException | IOException | ExecutionException | RetryException e) {
logger.error("==========数据抓取线程:" + Thread.currentThread().getName()+"处理失败==========",e);
}
}//end while
logger.info("==========数据抓取线程:" + Thread.currentThread().getName()+"处理成功==========");
}
}
private class EndTask implements Runnable{
private final CountDownLatch latch;
public EndTask(CountDownLatch latch){
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("==========开始放入抓取队列毒药标记==========");
LocalStorageInfo localStorageInfo = new LocalStorageInfo();
localStorageInfo.setEnd(true);
try {
fetchQueue.put(localStorageInfo);
} catch (InterruptedException e) {
logger.error("==========抓取队列毒药标记放入失败==========",e);
}
logger.info("==========抓取队列毒药标记放入成功==========");
}
}
}
This code shows how to download the data files from Google Cloud Storage to the local machine and how to delete the temporary data files from Cloud Storage once the download completes.
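The final stage of the pipeline, parsing the downloaded files and sending the data to your platform, is not included in the examples above. Since the extract jobs use FormatOptions.json() with gzip compression, each downloaded file is gzipped newline-delimited JSON, one event record per line. Below is a minimal, hypothetical sketch of such a consumer; sendToTargetPlatform is a placeholder you would replace with your own delivery logic.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPInputStream;

/** Hypothetical consumer: drains the fetch queue and parses the downloaded .json.gz files. */
public class ParseTask implements Runnable {

    private final BlockingQueue<LocalStorageInfo> fetchQueue;

    public ParseTask(BlockingQueue<LocalStorageInfo> fetchQueue) {
        this.fetchQueue = fetchQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                LocalStorageInfo info = fetchQueue.take();
                if (info.isEnd()) {
                    // Poison pill: put it back so sibling workers also see it, then stop.
                    fetchQueue.put(info);
                    break;
                }
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                        new GZIPInputStream(Files.newInputStream(info.getPath())), StandardCharsets.UTF_8))) {
                    // Each line of the decompressed file is one JSON event record.
                    String line;
                    while ((line = reader.readLine()) != null) {
                        sendToTargetPlatform(line);
                    }
                } catch (IOException e) {
                    // An incomplete download typically surfaces here as a truncated gzip stream;
                    // keep the file around for inspection and move on to the next one.
                    e.printStackTrace();
                    continue;
                }
                info.getPath().toFile().delete(); // clean up the local file once fully processed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag and exit
        }
    }

    /** Placeholder: forward one JSON event record to the platform of your choice. */
    private void sendToTargetPlatform(String jsonLine) {
        // e.g. batch the records and POST them to your ingestion endpoint
    }
}

A pool of these consumers can be started the same way as the fetch threads above; because each worker re-queues the poison pill before exiting, a single end marker is enough to stop all of them once the last file has been processed.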