S3 ke rds menggunakan python

Di postingan sebelumnya, kita membahas cara memindahkan data dari bucket S3 sumber ke target setiap kali file baru dibuat di bucket sumber dengan menggunakan fungsi AWS Lambda. Dalam posting ini, saya akan menunjukkan cara menggunakan Lambda untuk mengeksekusi penyerapan data dari S3 ke RDS setiap kali file baru dibuat di keranjang sumber. AWS Lambda mendukung beberapa bahasa pemrograman yang berbeda. Kami akan menggunakan Python 3. 6 di sini

Untuk mengakses RDS dengan fungsi lambda, fungsi lambda Anda perlu mengakses VPC tempat RDS berada dengan memberikan izin yang tepat ke fungsi tersebut. Anda juga perlu memberi tahu fungsi VPC mana yang akan diakses dan grup keamanan mana dalam VPC yang akan digunakan. Ini sebenarnya lebih sederhana daripada kedengarannya dan saya akan membahas semuanya di sini. AWS memiliki dokumentasi tentang cara mengonfigurasi fungsi lambda untuk mengakses RDS di sini, yang saya rujuk untuk menulis posting ini

Skenario

Mari pikirkan tentang skenario penyerapan waktu nyata. Anda memiliki aplikasi yang membuang data di S3 setiap 15 menit di JSON. Persyaratan Anda adalah mengambil data dari S3, mengubahnya, dan menulisnya ke Postgres RDS setiap kali ada file baru yang masuk ke bucket

Detail Penyerapan

Sebagai contoh, mari kita gunakan contoh data JSON yang digunakan di sini (Bagaimana Kueri JSON Postgres Menangani Kunci yang Hilang). Data ini juga digunakan di postingan Lambda sebelumnya (Penyerapan Data Berbasis Peristiwa dengan AWS Lambda (S3 hingga S3)). Intinya, kami akan mengubah target dari S3 ke Postgres RDS. Sebagai metode penyerapan, kami akan memuat data sebagai JSON ke dalam Postgres. Kami membahas metode penyerapan ini di sini (Strategi Penyerapan Data JSON Baru dengan Menggunakan Kekuatan Postgres)

Prasyarat

  • Mesin Mac atau Linux dengan Python 3. 6 lingkungan pengembangan
  • Lingkungan virtual untuk fungsi lambda ini dengan psycopg2-binary diinstal
  • Postgres RDS dengan skema dan tabel target (mis. g. usermanaged. transaksi)
  • Sumber S3 Bucket (mis. g. lambda. tes. sumber)

Jika Anda memiliki mesin Windows, saya kesulitan membuat psycopg2 bekerja di lambda saat saya mendorong fungsi dari sana. Saya merekomendasikan untuk menggunakan kotak virtual Linux untuk mengembangkan fungsi lambda jika Anda menggunakan Windows. Fungsi Lambda berjalan di Linux EC2 dan mengembangkan kode di lingkungan OS serupa lebih baik untuk bahasa skrip seperti Python

Untuk menyiapkan lingkungan pengembangan Python di Linux, kami memiliki instruksi

Untuk meluncurkan Postgres RDS, lihat postingan di bawah ini. Anda perlu membuat tabel dengan satu kolom dengan tipe data jsonb

Anda perlu membuat tabel dengan satu kolom dengan tipe data jsonb. Lihat posting di bawah ini untuk kumpulan data dan pembuatan tabel

Kode

Kami akan menggunakan file konfigurasi untuk detail koneksi database. Untuk skema target dan nama tabel, kami akan menetapkan variabel lingkungan saat kami menerapkan fungsi. Simpan file sebagai db_config. py

db_namapengguna = ""
db_kata sandi = ""
db_name = ""
db_endpoint = ""

Fungsi utamanya adalah handler(). File disimpan sebagai MoveS3ToPg. py, yang akan menjadi nama fungsi lambda

Nama dan kunci bucket diambil dari peristiwa. Peristiwa S3 adalah file JSON yang berisi nama bucket dan kunci objek

Kode mengambil file target dan mengubahnya menjadi file csv. Kemudian, diunggah ke Postgres dengan perintah salin. Untuk penjelasan mendetail tentang pola penyerapan ini, lihat Strategi Penyerapan Data JSON Baru dengan Menggunakan Kekuatan Postgres

Untuk parameter basis data, impor db_config. py dengan pernyataan impor dan ambil nilainya. Nama skema dan tabel ditetapkan sebagai variabel lingkungan yang dapat diambil oleh os. mengepung

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

impor boto3
impor botokor
impor psycopg2
impor os
impor json
impor db_config

dbname = db_config. db_name
pengguna = db_config. db_username
tuan rumah = db_config. db_endpoint
kata sandi = db_config. db_password
skema = os. lingkungan['targetSchema']
tabel = os. lingkungan['Tabel target']

connection_string = "dbname='{}' user='{}' host='{}' password='{}'"\
    . format(dbname, pengguna, host, kata sandi)

klien = boto3. klien('s3', 'ap-tenggara-2', \
config=botocore. config. Konfigurasi(s3={'addression_style'. 'jalur'}))

# memeriksa
cetak (dbname, pengguna, host, kata sandi, skema, tabel, connection_string)

def pg_load(connection_string, table_name, file_path)
mencoba
conn = psycopg2. terhubung(connection_string)
print("Menghubungkan ke Database")
cur = samb. kursor()
# Buka file input untuk disalin
f = buka(file_path, "r")
# Muat file csv ke dalam tabel
bajingan. copy_expert("salin {} DARI STDIN DENGAN kutipan CSV e'\x01' pembatas e'\x02'". format(nama_tabel), f)
bajingan. eksekusi("komit;")
print("Memuat data ke {}". format(nama_tabel))
samb. menutup()
print("Koneksi DB ditutup. ")

kecuali Pengecualian sebagai e
print('Kesalahan {}'. format(str(e)))

def transform_json(input_path, output_path)
# Buka file input dan muat sebagai json
masukan = buka(jalur_masukan, 'r')
json_file = json. memuat (masukan)
# Buka file keluaran dan buat file csv untuk unggahan db
keluaran = buka(jalur_keluaran, 'w')
sebagai catatan di json_file
keluaran. tulis (json. dump (catatan))
keluaran. tulis('\n')
keluaran. menutup()
print('Diubah {} menjadi {}'. format(jalur_input, jalur_keluaran))

penangan def (acara, konteks)
# Dapatkan info dari acara S3 Put
untuk rekor dalam acara['Rekaman']
bucket_name = record['s3']['bucket']['name']
kunci = rekam['s3']['objek']['kunci']
local_path = '/tmp/' + kunci. pisahkan('/')[-1]
# Unduh file dari S3
klien. unduh_file(nama_keranjang, kunci, jalur_lokal)
print("Mengunduh file s3, {}, ke {}". format(kunci, jalur_lokal))
# Ubah file
jalur_keluaran = '/tmp/keluaran. csv'
transform_json(local_path, output_path)
# Muat csv ke Postgres
pg_load(string koneksi, skema'. '+tabel, jalur_keluaran)

S3 Letakkan Contoh Acara JSON

Setiap kali file dibuat di bucket sumber, file JSON akan dikirim ke fungsi lambda. Ini digunakan untuk menguji fungsi secara manual. Anda perlu mengubah beberapa parameter (seperti bucket atau nama objek) agar sesuai dengan penyiapan Anda

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

{
"Rekaman". [
{
"Versi acara". "2. 0",
"Sumber acara". "aws. s3",
"awsWilayah". "kami-barat-2",
"waktu acara". "1970-01-01T00. 00. 00. 000Z",
"nama acara". "Objek Dibuat. Meletakkan",
"identitas pengguna". {
"Id utama". "AIDAJDPLRKLG7UEXAMPLE"
},
"permintaanParameter". {
"sumberIPAddress". "127. 0. 0. 1"
},
"responElemen". {
"x-amz-permintaan-id". "C3D13FE58DE4C810",
"x-amz-id-2". "FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
},
"s3". {
"Versi s3Schema". "1. 0",
"Id konfigurasi". "testConfigRule",
"keranjang". {
"nama". "ember sumber",
"identitas pemilik". {
"Id utama". "Contoh A3NL1KOZZK"
},
"arn". "arn. aws. s3. keranjang sumber"
},
"obyek". {
"kunci". "Wajah bahagia. jpg",
"ukuran". 1024,
"eTag". "d41d8cd98f00b204e9800998ecf8427e",
"Id versi". "096fKKXTRTtl3on89fVO. nfljtsv6qko"
}
}
}
]
}

Langkah Ringkasan

  1. Buat Titik Akhir VPC untuk Amazon S3
  2. Buat kebijakan khusus untuk fungsi (mis. g. s3_to_pg_lambda)
  3. Melampirkan kebijakan ke peran yang digunakan untuk fungsi (mis. g. s3_to_pg_lambda)
  4. Buat fungsi dan file konfigurasi
  5. Kemas kode dengan pustaka yang diperlukan dan file konfigurasi
  6. Menyebarkan fungsi
  7. Uji fungsi dengan menjalankannya secara manual
  8. Tambahkan izin untuk fungsi untuk mengakses bucket S3
  9. Konfigurasi acara di keranjang sumber
  10. Uji fungsi dengan permintaan S3 PUT

Langkah

(1) Buat Titik Akhir VPC untuk Amazon S3

Untuk mengakses S3 dari fungsi Lambda yang dijalankan dalam VPC, saya perlu mengonfigurasi VPC Endpoint untuk S3. Cara mengakses S3 dari VPC tergantung pada pengaturan Anda. VPC Anda mungkin sudah mengizinkan akses ke S3 tanpa membuat titik akhir (mis. g. menggunakan Internet Gateway atau NAT). Saya menggunakan lingkungan yang dibuat di sini (Cara Membuat Lingkungan Komputasi Ilmu Data Pribadi Anda Sendiri Di AWS). Dalam penyiapan ini, pendekatan terbaik adalah membuat titik akhir

Jika fungsi lambda habis sebelum mengunduh file dari S3, Anda memiliki masalah konfigurasi akses

(2) Buat kebijakan khusus untuk fungsi tersebut

Fungsi Lambda perlu mendapatkan data dari S3 dan akses ke RDS dalam VPC. Untuk akses RDS, Anda memerlukan tindakan EC2 untuk membuat ENI (digunakan untuk menjalankan fungsi dalam VPC yang ditentukan) dan tindakan CloudWatch Logs untuk menulis log. Untuk ini, Anda dapat menggunakan AWSLambdaVPCAccessExecutionRole yang telah dibuat sebelumnya. Kebijakannya seperti di bawah ini. Saya hanya menambahkan beberapa izin tindakan S3 ke AWSLambdaVPCAccessExecutionRole. Gunakan ini untuk membuat kebijakan khusus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

{
"Versi: kapan". "17-10-2012",
"Penyataan". [
{
"Memengaruhi". "Mengizinkan",
"Tindakan". [
"s3. DapatkanObjek",
"s3. DaftarBucket"
],
"Sumber". "*"
},
{
"Memengaruhi". "Mengizinkan",
"Tindakan". [
"log. BuatLogGroup",
"log. BuatLogStream",
"log. PutLogEvents",
"ec2. Buat Antarmuka Jaringan",
"ec2. Jelaskan Antarmuka Jaringan",
"ec2. HapusAntarmuka Jaringan"
],
"Sumber". "*"
}
]
}

(3) Buat peran eksekusi lambda baru dan lampirkan kebijakan

Setelah kebijakan dibuat, Anda perlu membuat peran eksekusi lambda baru dan melampirkan kebijakan ke dalamnya

(4) Buat fungsi dan file konfigurasi

Lihat file konfigurasi dan kode di atas. Nama file python utama harus menjadi nama fungsinya

(5) Kemas kode dengan pustaka yang diperlukan dan file konfigurasi

Buka folder paket situs dari lingkungan virtual tempat fungsi lambda dikembangkan. Zip seluruh konten. Tambahkan file konfigurasi dan fungsi utama

cd /home/user/python3/lambda_pg/lib/python3. 6/paket-situs
zip -r9 /home/user/tmp/MoveS3ToPg. zip *
cd / rumah / pengguna / tmp /
zip -g PindahkanS3ToPg. zip MoveS3ToPg. py
zip -g PindahkanS3ToPg. zip db_config. py

(6) Menyebarkan fungsi

Pastikan untuk memiliki parameter vpc-config dengan semua subnet grup subnet RDS. Grup keamanan tidak boleh sama dengan RDS. Ini karena sumber daya AWS tidak dapat terhubung dalam grup keamanan yang sama. Gunakan grup keamanan default untuk VPC. Ini biasanya berhasil

Jika Anda memiliki beberapa variabel lingkungan seperti dalam contoh ini, gunakan tanda kutip ganda untuk membungkus variabel

Handler harus disetel sebagai {function-name}. {nama-fungsi-utama-dalam-kode}. Peran adalah peran khusus yang dibuat pada langkah 3

fungsi buat aws lambda \
--wilayah ap-tenggara-2 \
--fungsi-nama MoveS3ToPg \
--file-zip fileb. // PindahkanS3ToPg. zip \
--role arn. aws. saya. role/s3_to_pg_lambda \
--environment Variables="{targetSchema=usermanaged,targetTable=transaction}" \
--vpc-config SubnetIds=subnet-xxxxxx1,subnet-xxxxxx6,SecurityGroupIds=sg-1aaaaa1 \
--handler MoveS3ToPg. penangan \
--runtime python3. 6 \
--batas waktu 10 \
--ukuran memori 1024

Jika Anda perlu memperbarui fungsi, gunakan perintah update-function-code di bawah ini

aws lambda pembaruan-fungsi-kode \
--fungsi-nama MoveS3ToPg \
--wilayah ap-tenggara-2 \
--file-zip fileb. // PindahkanS3ToPg. zip

Jika Anda perlu memperbarui konfigurasi (seperti vpc-config atau variabel lingkungan), gunakan perintah update-function-configuration

aws lambda pembaruan-fungsi-konfigurasi \
--fungsi-nama MoveS3ToPg \
--wilayah ap-tenggara-2 \
--vpc-config SubnetIds=subnet-xxxxxx1,subnet-xxxxxx6,SecurityGroupIds=sg-1aaaaa1

(7) Uji fungsi dengan menjalankannya secara manual

Gunakan perintah pemanggilan lambda. Muatannya adalah file Json acara S3 yang Anda buat secara manual dari contoh di atas

aws lambda memanggil \
--invokasi-jenis Acara \
--fungsi-nama MoveS3ToPg \
--wilayah ap-tenggara-2 \
--berkas muatan. //masukanFile. txt \
berkas keluaran. txt

(8) Tambahkan izin untuk fungsi mengakses bucket S3

Jalankan perintah add-izin lambda. Tanpa izin, Anda tidak dapat mengonfigurasi peristiwa lambda di keranjang sumber

(9) Konfigurasi acara di keranjang sumber

Setiap kali permintaan put terjadi, Anda mengirim acara ke fungsi lambda

(10) Uji fungsi dengan permintaan S3 PUT

Anda dapat menggunakan aws cli untuk mengunggah file ke keranjang target dan memeriksa apakah fungsi lambda dijalankan dengan benar

aws s3 cp. /sumber data. json s3. //lambda. tes. sumber/hari ini/file_sumber. json

Penanganan Kesalahan

Ketika izin, peran eksekusi, akses VPC ke S3, grup keamanan, atau subnet tidak dikonfigurasi dengan benar, fungsi Anda akan habis waktu. Ini berarti fungsi akan memaksimalkan batas waktu eksekusi default dan menghentikan eksekusi

Jika waktu fungsi Anda mulai habis setelah eksekusi berhasil, Anda mungkin perlu men-deploy ulang fungsi tersebut. Kemudian, itu akan diperbaiki

Bagaimana cara mentransfer data dari S3 ke RDS?

Ikuti langkah-langkah ini untuk mencapai hal yang sama. .
S3 ke RDS Langkah 1. Buat dan lampirkan Peran IAM ke RDS Cluster
S3 ke RDS Langkah 2. Buat dan Lampirkan Grup Parameter ke RDS Cluster
S3 ke RDS Langkah 3. Mulai ulang Instans RDS Anda
S3 ke RDS Langkah 4. Mengubah Kebijakan Bucket S3
S3 ke RDS Langkah 5. Tetapkan Titik Akhir VPC

Bagaimana cara menarik data dari AWS S3 menggunakan Python?

Cara Mengunggah Dan Mengunduh File Dari AWS S3 Menggunakan Python (2022) .
Langkah 1. Siapkan akun. .
Langkah 2. Buat pengguna. .
Langkah 3. Buat ember. .
Langkah 4. Buat kebijakan dan tambahkan ke pengguna Anda. .
Langkah 5. Unduh AWS CLI dan konfigurasikan pengguna Anda. .
Langkah 6. Unggah file Anda. .
Langkah 7. Periksa apakah autentikasi berfungsi

Bagaimana cara terhubung ke RDS menggunakan Python?

3) Hubungkan melalui Alat DBeaver . Tambahkan detail pemasangan RDS Anda di sini. Host server adalah Titik Akhir instance RDS Anda, Database adalah DB yang Anda buat melalui kode python. Berikan nama pengguna dan kata sandi Otentikasi yang telah Anda gunakan saat membuat instance RDS.

Bagaimana cara mengembalikan database saya dari S3 ke RDS?

Di pojok kanan atas konsol Amazon RDS, pilih Wilayah AWS untuk membuat instans DB Anda. Pilih Wilayah AWS yang sama dengan bucket Amazon S3 yang berisi cadangan database Anda. Di panel navigasi, pilih Database. Pilih Pulihkan dari S3