-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy pathindex.js
79 lines (67 loc) · 2.15 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
const { BigQuery } = require('@google-cloud/bigquery');
const bq = new BigQuery();
const datasetId = 'weather_etl';
const tableId = 'Weather_Table';
const { Storage } = require('@google-cloud/storage');
const csv = require('csv-parser');
exports.readObservation = (file, context) => {
const gcs = new Storage();
const dataFile = gcs.bucket(file.bucket).file(file.name);
dataFile.createReadStream()
.on('error', (error) => {
console.error(error);
})
.pipe(csv())
.on('data', (row) => {
printDict(row);
writeToBq(row);
})
.on('end', () => {
console.log('End!');
});
};
// HELPER FUNCTIONS
/**
 * Logs each enumerable property of a parsed row as `key: value`,
 * one console line per property.
 *
 * @param {object} row - Record produced by the CSV parser.
 */
function printDict(row) {
  // for...in (rather than Object.keys) so every enumerable key is visited.
  for (const field in row) {
    const line = `${field}: ${row[field]}`;
    console.log(line);
  }
}
// Sentinel the source data uses for "no observation"; stored as NULL instead.
const MISSING_VALUE = -9999;
// Station identifier stamped on every inserted row.
const STATION_ID = '724380-93819';
// Columns whose raw integer values are divided by 10 to obtain decimal values.
const SCALED_FIELDS = new Set([
  'airtemp',
  'dewpoint',
  'pressure',
  'windspeed',
  'precip1hour',
  'precip6hour',
]);

/**
 * Converts one raw CSV cell into the value inserted into BigQuery.
 *
 * @param {string} key - Column name; decides whether the value is scaled by 1/10.
 * @param {*} value - Raw cell value (strings when coming from the CSV parser).
 * @returns {*} `null` for blank cells and the MISSING_VALUE sentinel; a number for
 *   numeric cells (SCALED_FIELDS divided by 10, all others truncated to an integer);
 *   the original value unchanged for non-numeric text.
 */
function normalizeValue(key, value) {
  if (value === null || value === undefined) {
    return null;
  }
  const text = String(value).trim();
  // Blank cells used to turn into NaN; treat them as missing data.
  if (text === '') {
    return null;
  }
  const num = Number(text);
  if (!Number.isFinite(num)) {
    return value; // non-numeric text (e.g. identifiers) is kept verbatim
  }
  if (num === MISSING_VALUE) {
    return null;
  }
  return SCALED_FIELDS.has(key) ? num / 10 : Math.trunc(num);
}

/**
 * Returns a normalized copy of a parsed CSV row with `station` set to STATION_ID.
 * The input object is left untouched.
 *
 * @param {object} row - Parsed CSV record (column name -> raw value).
 * @returns {object} New record ready for insertion.
 */
function normalizeRow(row) {
  const record = {};
  for (const [key, value] of Object.entries(row)) {
    record[key] = normalizeValue(key, value);
  }
  record.station = STATION_ID;
  return record;
}

/**
 * Normalizes CSV rows and inserts them into table `tableId` of dataset
 * `datasetId` using the module-level BigQuery client `bq`.
 *
 * Insert failures are logged, not thrown, so callers that do not await this
 * function cannot trigger an unhandled promise rejection.
 *
 * @param {object|object[]} rows - One parsed CSV row, or an array of rows
 *   (lets callers batch several rows into a single insert request).
 * @returns {Promise<void>} Resolves once the insert attempt has finished.
 */
async function writeToBq(rows) {
  const records = (Array.isArray(rows) ? rows : [rows]).map(normalizeRow);
  records.forEach((record) => console.log(record));
  // Nothing to insert: skip the request entirely.
  if (records.length === 0) {
    return;
  }
  try {
    await bq.dataset(datasetId).table(tableId).insert(records);
    // Serialize explicitly; a template literal would only print "[object Object]".
    records.forEach((record) => console.log(JSON.stringify(record)));
  } catch (err) {
    console.error(`ERROR: ${err}`);
    // Partial insert failures (PartialFailureError) list per-row details on `errors`.
    if (err && Array.isArray(err.errors)) {
      err.errors.forEach((detail) => console.error(JSON.stringify(detail)));
    }
  }
}