본문으로 건너뛰기
버전: In Development

NumPy 배치 쓰기 가이드

개요

batch_write_numpy()numpy 구조화 배열에서 직접 여러 레코드를 Aerospike에 기록합니다. 각 행이 별도의 쓰기 작업이 되며, dtype 필드가 Aerospike bin에 매핑됩니다.

  • 배열에서 레코드로 직접 매핑 -- 중간 Python dict나 루프 불필요
  • 키 필드 추출 -- 지정된 dtype 필드(기본값 _key)가 레코드의 사용자 키로 사용됨
  • 자동 bin 매핑 -- 밑줄(_)로 시작하지 않는 모든 필드가 bin이 됨
  • 배치 실행 -- 모든 행이 단일 배치 호출로 기록됨
사용 시기

데이터가 이미 numpy 배열에 있을 때(예: ML 피처 스토어, 센서 데이터 파이프라인, 과학 데이터셋) batch_write_numpy()를 사용하세요. 일반 Python dict의 경우 put() 또는 표준 배치 작업을 대신 사용하세요.

설치

pip install "aerospike-py[numpy]"

빠른 시작

import numpy as np
import aerospike_py as aerospike

client = aerospike.client({
"hosts": [("127.0.0.1", 3000)],
}).connect()

# 1. 키 필드와 bin 필드로 dtype 정의
dtype = np.dtype([
("_key", "i4"), # 레코드 키 (int32)
("score", "f8"), # bin: float64
("count", "i4"), # bin: int32
])

# 2. 구조화 배열 생성
data = np.array([
(1, 0.95, 10),
(2, 0.87, 20),
(3, 0.72, 15),
], dtype=dtype)

# 3. 배치 쓰기
results = client.batch_write_numpy(data, "test", "demo", dtype)

# 4. 결과 확인
for record in results:
key, meta, bins = record
print(f"Key: {key}, Gen: {meta['gen']}")

동작 원리

numpy 구조화 배열                  Aerospike
┌──────┬───────┬───────┐
│ _key │ score │ count │
├──────┼───────┼───────┤ ┌──────────────────────┐
│ 1 │ 0.95 │ 10 │ ──────▶ │ key=1 {score, count} │
│ 2 │ 0.87 │ 20 │ ──────▶ │ key=2 {score, count} │
│ 3 │ 0.72 │ 15 │ ──────▶ │ key=3 {score, count} │
└──────┴───────┴───────┘ └──────────────────────┘
▲ ▲
key_field="_key" bins = 밑줄로 시작하지 않는 필드
  1. key_field(기본값 "_key") 컬럼이 각 레코드의 사용자 키로 추출됩니다
  2. _로 시작하지 않는 모든 필드가 Aerospike bin이 됩니다
  3. _로 시작하는 필드(키 필드 제외)는 무시됩니다

키 필드

기본적으로 "_key"라는 이름의 dtype 필드가 레코드 키로 사용됩니다. key_field로 다른 필드를 지정할 수 있습니다:

dtype = np.dtype([
("user_id", "i8"), # 이 필드를 레코드 키로 사용
("score", "f8"),
])

data = np.array([(100, 1.5), (101, 2.5)], dtype=dtype)

# "_key" 대신 "user_id"를 키로 사용
results = client.batch_write_numpy(
data, "test", "demo", dtype, key_field="user_id"
)
노트

커스텀 key_field를 사용할 때, 해당 필드를 bin으로도 저장하려면 필드 이름이 _로 시작하지 않아야 합니다. _로 시작하는 필드는 키로만 사용되고 bin으로 기록되지 않습니다.

지원되는 dtype 종류

batch_read()_dtype에서 지원되는 것과 동일한 dtype 종류가 쓰기에도 지원됩니다:

numpy 종류코드예시Aerospike 값
부호 있는 정수i"i1", "i2", "i4", "i8"Int(i64)
부호 없는 정수u"u1", "u2", "u4", "u8"Int(i64)
부동 소수점f"f4", "f8"Float(f64)
고정 바이트S"S8", "S16"Blob(bytes) 또는 String
Void 바이트V"V4", "V16"Blob(bytes)
하위 배열--("f4", (128,))Blob(bytes)
지원되지 않는 dtype

유니코드 문자열(U)과 Python 객체(O)는 지원되지 않습니다. 문자열 데이터에는 S(고정 바이트)를 사용하세요.

예제

센서 데이터 적재

import numpy as np
import aerospike_py as aerospike

client = aerospike.client({"hosts": [("127.0.0.1", 3000)]}).connect()

dtype = np.dtype([
("_key", "i4"),
("temperature", "f8"),
("humidity", "f4"),
("pressure", "f4"),
("status", "u1"),
])

# 1000개의 센서 측정값 생성
n = 1000
data = np.zeros(n, dtype=dtype)
data["_key"] = np.arange(n)
data["temperature"] = np.random.normal(25.0, 5.0, n)
data["humidity"] = np.random.uniform(30.0, 90.0, n).astype(np.float32)
data["pressure"] = np.random.normal(1013.25, 10.0, n).astype(np.float32)
data["status"] = 1

results = client.batch_write_numpy(data, "test", "sensors", dtype)
print(f"Wrote {len(results)} records")

벡터 임베딩

ML 임베딩을 바이트 blob으로 저장합니다:

import numpy as np
import aerospike_py as aerospike

client = aerospike.client({"hosts": [("127.0.0.1", 3000)]}).connect()

dim = 128
dtype = np.dtype([
("_key", "i4"),
("embedding", "V" + str(dim * 4)), # 128 * 4 바이트 = 512바이트 blob
("label", "i4"),
])

n = 100
embeddings = np.random.randn(n, dim).astype(np.float32)

data = np.zeros(n, dtype=dtype)
data["_key"] = np.arange(n)
for i in range(n):
data["embedding"][i] = embeddings[i].tobytes()
data["label"] = np.random.randint(0, 10, n)

results = client.batch_write_numpy(data, "test", "vectors", dtype)

쓰기와 읽기 왕복

batch_write_numpy()batch_read()_dtype을 조합하여 완전한 numpy 왕복을 수행합니다:

import numpy as np
import aerospike_py as aerospike

client = aerospike.client({"hosts": [("127.0.0.1", 3000)]}).connect()

# dtype 정의
dtype = np.dtype([
("_key", "i4"),
("x", "f8"),
("y", "f8"),
("category", "i4"),
])

# 쓰기
data = np.array([
(1, 1.0, 2.0, 0),
(2, 3.0, 4.0, 1),
(3, 5.0, 6.0, 0),
], dtype=dtype)
client.batch_write_numpy(data, "test", "points", dtype)

# _dtype으로 읽기
read_dtype = np.dtype([("x", "f8"), ("y", "f8"), ("category", "i4")])
keys = [("test", "points", i) for i in range(1, 4)]
batch = client.batch_read(keys, _dtype=read_dtype, policy={"key": aerospike.POLICY_KEY_SEND})

# 벡터화 분석
print(batch.batch_records["x"].mean()) # 3.0
print(batch.batch_records["category"].sum()) # 1

Pandas DataFrame에서 Aerospike로

pandas DataFrame을 numpy를 통해 Aerospike에 기록합니다:

import numpy as np
import pandas as pd
import aerospike_py as aerospike

client = aerospike.client({"hosts": [("127.0.0.1", 3000)]}).connect()

# DataFrame
df = pd.DataFrame({
"user_id": [1, 2, 3],
"score": [0.95, 0.87, 0.72],
"level": [10, 20, 15],
})

# 구조화 배열로 변환
dtype = np.dtype([
("_key", "i4"),
("score", "f8"),
("level", "i4"),
])

data = np.zeros(len(df), dtype=dtype)
data["_key"] = df["user_id"].values
data["score"] = df["score"].values
data["level"] = df["level"].values

results = client.batch_write_numpy(data, "test", "users", dtype)

오류 처리

from aerospike_py.exception import AerospikeError

try:
results = client.batch_write_numpy(data, "test", "demo", dtype)
for record in results:
key, meta, bins = record
if meta is None:
print(f"Write failed for key {key}")
except AerospikeError as e:
print(f"Batch write error: {e}")

모범 사례

  • dtype을 데이터에 맞추기 -- 메모리와 네트워크 전송량을 줄이기 위해 최소한의 dtype을 사용하세요 ("f8" 대신 "f4", "i8" 대신 "i2")
  • 배치 크기 -- 최적의 성능을 위해 호출당 100-5,000행을 유지하세요
  • 키 필드 규칙 -- 일관성을 위해 기본 키 필드로 "_key"를 사용하세요
  • 밑줄 접두사 -- _로 시작하는 필드는 bin에서 제외됩니다. 메타데이터 필드에 활용하세요
  • batch_read와의 왕복 -- 효율적인 읽기를 위해 동일한 dtype 필드(_key 제외)를 batch_read(_dtype=...)에 사용하세요
  • 대용량 데이터셋 -- 큰 배열을 청크로 분할하여 배치로 기록하세요:
chunk_size = 1000
for i in range(0, len(data), chunk_size):
chunk = data[i:i + chunk_size]
client.batch_write_numpy(chunk, "test", "demo", dtype)

API 레퍼런스

# Sync
results: list[Record] = client.batch_write_numpy(
data: np.ndarray,
namespace: str,
set_name: str,
_dtype: np.dtype,
key_field: str = "_key",
policy: dict | None = None,
)

# Async
results: list[Record] = await client.batch_write_numpy(
data: np.ndarray,
namespace: str,
set_name: str,
_dtype: np.dtype,
key_field: str = "_key",
policy: dict | None = None,
)
파라미터타입기본값설명
datanp.ndarray필수레코드 데이터가 담긴 구조화 numpy 배열
namespacestr필수대상 Aerospike namespace
set_namestr필수대상 set 이름
_dtypenp.dtype필수배열 레이아웃을 설명하는 구조화 dtype
key_fieldstr"_key"레코드 사용자 키로 사용할 dtype 필드 이름
policydict | NoneNone선택적 BatchPolicy 오버라이드

반환값: list[Record] -- 쓰기 결과가 담긴 Record NamedTuple (key, meta, bins) 리스트.

참고: numpy 배열로 레코드를 읽어오려면 NumPy 배치 읽기 가이드를 참조하세요.