Client
aerospike-py는 동일한 기능을 가진 동기(Client)와 비동기(AsyncClient) API를 제공합니다.
Creating a Client
- Sync Client
- Async Client
import aerospike_py as aerospike
client = aerospike.client({
"hosts": [("127.0.0.1", 3000)],
"cluster_name": "docker",
}).connect()
import asyncio
from aerospike_py import AsyncClient
async def main():
client = AsyncClient({
"hosts": [("127.0.0.1", 3000)],
"cluster_name": "docker",
})
await client.connect()
asyncio.run(main())
Context Manager
- Sync Client
- Async Client
__enter__() / __exit__()
with aerospike.client({
"hosts": [("127.0.0.1", 3000)],
"cluster_name": "docker",
}).connect() as client:
client.put(key, bins)
# close()가 종료 시 자동으로 호출됩니다
async __aenter__() / async __aexit__()
async with AsyncClient({
"hosts": [("127.0.0.1", 3000)],
"cluster_name": "docker",
}) as client:
await client.connect()
await client.put(key, bins)
# close()가 자동으로 호출됩니다
Connection
connect(username=None, password=None)
Aerospike 클러스터에 연결합니다.
- Sync Client
- Async Client
메서드 체이닝을 위해 self를 반환합니다.
client = aerospike.client(config).connect()
# 인증 포함
client = aerospike.client(config).connect("admin", "admin")
await client.connect()
await client.connect("admin", "admin")
is_connected()
클라이언트가 연결되어 있으면 True를 반환합니다. 두 클라이언트 모두에서 동기 메서드입니다.
if client.is_connected():
print("Connected")
close()
클러스터와의 연결을 종료합니다.
- Sync Client
- Async Client
client.close()
await client.close()
get_node_names()
클러스터 노드 이름 목록을 반환합니다.
- Sync Client
- Async Client
nodes = client.get_node_names()
nodes = await client.get_node_names()
CRUD Operations
put(key, bins, meta=None, policy=None)
레코드를 작성합니다.
| 파라미터 | 타입 | 설명 |
|---|---|---|
key | tuple[str, str, str|int|bytes] | (namespace, set, pk) |
bins | dict[str, Any] | 빈 이름-값 쌍 |
meta | WriteMeta | 선택: {"ttl": int, "gen": int} |
policy | WritePolicy | 선택: {"key", "exists", "gen", "timeout", ...} |
- Sync Client
- Async Client
key = ("test", "demo", "user1")
client.put(key, {"name": "Alice", "age": 30})
client.put(key, {"x": 1}, meta={"ttl": 300})
client.put(key, {"x": 1}, policy={"exists": aerospike.POLICY_EXISTS_CREATE_ONLY})
key = ("test", "demo", "user1")
await client.put(key, {"name": "Alice", "age": 30})
await client.put(key, {"x": 1}, meta={"ttl": 300})
await client.put(key, {"x": 1}, policy={"exists": aerospike.POLICY_EXISTS_CREATE_ONLY})
get(key, policy=None)
레코드를 읽습니다. Record NamedTuple (key, meta, bins)를 반환합니다.
- Sync Client
- Async Client
key, meta, bins = client.get(("test", "demo", "user1"))
# meta.gen == 1, meta.ttl == 2591998
# bins = {"name": "Alice", "age": 30}
key, meta, bins = await client.get(("test", "demo", "user1"))
레코드가 존재하지 않으면 RecordNotFound가 발생합니다.
select(key, bins, policy=None)
레코드에서 특정 빈만 읽습니다.
- Sync Client
- Async Client
_, meta, bins = client.select(key, ["name"])
# bins = {"name": "Alice"}
_, meta, bins = await client.select(key, ["name"])
exists(key, policy=None)
레코드 존재 여부를 확인합니다. ExistsResult NamedTuple (key, meta)를 반환하며, 레코드가 없으면 meta가 None입니다.
- Sync Client
- Async Client
_, meta = client.exists(key)
if meta is not None:
print(f"Found, gen={meta.gen}")
_, meta = await client.exists(key)
if meta is not None:
print(f"Found, gen={meta.gen}")
remove(key, meta=None, policy=None)
레코드를 삭제합니다.
- Sync Client
- Async Client
client.remove(key)
# 세대 검사 포함
client.remove(key, meta={"gen": 3}, policy={"gen": aerospike.POLICY_GEN_EQ})
await client.remove(key)
await client.remove(key, meta={"gen": 3}, policy={"gen": aerospike.POLICY_GEN_EQ})
touch(key, val=0, meta=None, policy=None)
레코드의 TTL을 리셋합니다.
- Sync Client
- Async Client
client.touch(key, val=300)
await client.touch(key, val=300)
String / Numeric Operations
append(key, bin, val, meta=None, policy=None)
빈에 문자열을 추가합니다.
- Sync Client
- Async Client
client.append(key, "name", "_suffix")
await client.append(key, "name", "_suffix")
prepend(key, bin, val, meta=None, policy=None)
빈 앞에 문자열을 삽입합니다.
- Sync Client
- Async Client
client.prepend(key, "name", "prefix_")
await client.prepend(key, "name", "prefix_")
increment(key, bin, offset, meta=None, policy=None)
정수 또는 실수 빈 값을 증가시킵니다.
- Sync Client
- Async Client
client.increment(key, "age", 1)
client.increment(key, "score", 0.5)
await client.increment(key, "age", 1)
await client.increment(key, "score", 0.5)
remove_bin(key, bin_names, meta=None, policy=None)
레코드에서 특정 빈을 제거합니다.
- Sync Client
- Async Client
client.remove_bin(key, ["temp_bin", "debug_bin"])
await client.remove_bin(key, ["temp_bin", "debug_bin"])
Multi-Operation
operate(key, ops, meta=None, policy=None)
단일 레코드에 여러 연산을 원자적으로 실행합니다.
- Sync Client
- Async Client
ops = [
{"op": aerospike.OPERATOR_INCR, "bin": "counter", "val": 1},
{"op": aerospike.OPERATOR_READ, "bin": "counter", "val": None},
]
_, meta, bins = client.operate(key, ops)
ops = [
{"op": aerospike.OPERATOR_INCR, "bin": "counter", "val": 1},
{"op": aerospike.OPERATOR_READ, "bin": "counter", "val": None},
]
_, meta, bins = await client.operate(key, ops)
operate_ordered(key, ops, meta=None, policy=None)
operate와 동일하지만 결과를 OperateOrderedResult NamedTuple의 ordered_bins 필드에 BinTuple(name, value) 리스트로 반환합니다.
- Sync Client
- Async Client
_, meta, results = client.operate_ordered(key, ops)
# results = [("counter", 2)]
_, meta, results = await client.operate_ordered(key, ops)
# results = [("counter", 2)]
Batch Operations
batch_read(keys, bins=None, policy=None)
여러 레코드를 읽습니다. BatchRecords를 반환합니다.
bins=None- 모든 bin 읽기bins=["a", "b"]- 특정 bin만 읽기bins=[]- 존재 여부만 확인
- Sync Client
- Async Client
keys = [("test", "demo", f"user_{i}") for i in range(10)]
# 모든 bin 읽기
batch = client.batch_read(keys)
for br in batch.batch_records:
if br.record:
key, meta, bins = br.record
print(bins)
# 특정 bin만 읽기
batch = client.batch_read(keys, bins=["name", "age"])
# 존재 여부만 확인
batch = client.batch_read(keys, bins=[])
for br in batch.batch_records:
print(f"{br.key}: exists={br.record is not None}")
keys = [("test", "demo", f"user_{i}") for i in range(10)]
# 모든 bin 읽기
batch = await client.batch_read(keys)
for br in batch.batch_records:
if br.record:
key, meta, bins = br.record
print(bins)
# 특정 bin만 읽기
batch = await client.batch_read(keys, bins=["name", "age"])
# 존재 여부만 확인
batch = await client.batch_read(keys, bins=[])
for br in batch.batch_records:
print(f"{br.key}: exists={br.record is not None}")
batch_operate(keys, ops, policy=None)
여러 레코드에 연산을 실행합니다.
- Sync Client
- Async Client
ops = [{"op": aerospike.OPERATOR_INCR, "bin": "views", "val": 1}]
results = client.batch_operate(keys, ops)
ops = [{"op": aerospike.OPERATOR_INCR, "bin": "views", "val": 1}]
results = await client.batch_operate(keys, ops)
batch_remove(keys, policy=None)
여러 레코드를 삭제합니다.
- Sync Client
- Async Client
results = client.batch_remove(keys)
await client.batch_remove(keys)
Query
query(namespace, set_name)
Secondary Index 쿼리를 위한 Query 객체를 생성합니다. Query API를 참조하세요.
query = client.query("test", "demo")
Index Management
index_integer_create(namespace, set_name, bin_name, index_name, policy=None)
숫자 Secondary Index를 생성합니다.
- Sync Client
- Async Client
client.index_integer_create("test", "demo", "age", "age_idx")
await client.index_integer_create("test", "demo", "age", "age_idx")
index_string_create(namespace, set_name, bin_name, index_name, policy=None)
문자열 Secondary Index를 생성합니다.
- Sync Client
- Async Client
client.index_string_create("test", "demo", "name", "name_idx")
await client.index_string_create("test", "demo", "name", "name_idx")
index_geo2dsphere_create(namespace, set_name, bin_name, index_name, policy=None)
지리공간 Secondary Index를 생성합니다.
- Sync Client
- Async Client
client.index_geo2dsphere_create("test", "demo", "location", "geo_idx")
await client.index_geo2dsphere_create("test", "demo", "location", "geo_idx")
index_remove(namespace, index_name, policy=None)
Secondary Index를 제거합니다.
- Sync Client
- Async Client
client.index_remove("test", "age_idx")
await client.index_remove("test", "age_idx")
Truncate
truncate(namespace, set_name, nanos=0, policy=None)
네임스페이스/세트의 모든 레코드를 제거합니다.
- Sync Client
- Async Client
client.truncate("test", "demo")
await client.truncate("test", "demo")
UDF
udf_put(filename, udf_type=0, policy=None)
Lua UDF 모듈을 등록합니다.
- Sync Client
- Async Client
client.udf_put("my_udf.lua")
await client.udf_put("my_udf.lua")
udf_remove(module, policy=None)
등록된 UDF 모듈을 제거합니다.
- Sync Client
- Async Client
client.udf_remove("my_udf")
await client.udf_remove("my_udf")
apply(key, module, function, args=None, policy=None)
레코드에 UDF를 실행합니다.
- Sync Client
- Async Client
result = client.apply(key, "my_udf", "my_function", [1, "hello"])
result = await client.apply(key, "my_udf", "my_function", [1, "hello"])
Concurrency Patterns (Async)
asyncio.gather를 사용한 병렬 쓰기
keys = [("test", "demo", f"item_{i}") for i in range(100)]
tasks = [client.put(k, {"idx": i}) for i, k in enumerate(keys)]
await asyncio.gather(*tasks)
병렬 읽기
keys = [("test", "demo", f"item_{i}") for i in range(100)]
tasks = [client.get(k) for k in keys]
results = await asyncio.gather(*tasks, return_exceptions=True)
혼합 연산
async def process_user(client, user_id):
key = ("test", "users", user_id)
_, _, bins = await client.get(key)
bins["visits"] = bins.get("visits", 0) + 1
await client.put(key, bins)
return bins
results = await asyncio.gather(*[
process_user(client, f"user_{i}")
for i in range(10)
])
Admin Operations
User Management
- Sync Client
- Async Client
| 메서드 | 설명 |
|---|---|
admin_create_user(username, password, roles) | 사용자 생성 |
admin_drop_user(username) | 사용자 삭제 |
admin_change_password(username, password) | 비밀번호 변경 |
admin_grant_roles(username, roles) | 역할 부여 |
admin_revoke_roles(username, roles) | 역할 회수 |
admin_query_user(username) | 사용자 정보 조회 |
admin_query_users() | 전체 사용자 목록 |
| 메서드 | 설명 |
|---|---|
async admin_create_user(username, password, roles) | 사용자 생성 |
async admin_drop_user(username) | 사용자 삭제 |
async admin_change_password(username, password) | 비밀번호 변경 |
async admin_grant_roles(username, roles) | 역할 부여 |
async admin_revoke_roles(username, roles) | 역할 회수 |
async admin_query_user(username) | 사용자 정보 조회 |
async admin_query_users() | 전체 사용자 목록 |
Role Management
- Sync Client
- Async Client
| 메서드 | 설명 |
|---|---|
admin_create_role(role, privileges, ...) | 역할 생성 |
admin_drop_role(role) | 역할 삭제 |
admin_grant_privileges(role, privileges) | 권한 부여 |
admin_revoke_privileges(role, privileges) | 권한 회수 |
admin_query_role(role) | 역할 정보 조회 |
admin_query_roles() | 전체 역할 목록 |
admin_set_whitelist(role, whitelist) | IP 화이트리스트 설정 |
admin_set_quotas(role, read_quota, write_quota) | 쿼터 설정 |
# 사용자 생성
client.admin_create_user("new_user", "password", ["read-write"])
# 권한이 포함된 역할 생성
client.admin_create_role("custom_role", [
{"code": aerospike.PRIV_READ, "ns": "test", "set": "demo"}
])
| 메서드 | 설명 |
|---|---|
async admin_create_role(role, privileges, ...) | 역할 생성 |
async admin_drop_role(role) | 역할 삭제 |
async admin_grant_privileges(role, privileges) | 권한 부여 |
async admin_revoke_privileges(role, privileges) | 권한 회수 |
async admin_query_role(role) | 역할 정보 조회 |
async admin_query_roles() | 전체 역할 목록 |
async admin_set_whitelist(role, whitelist) | IP 화이트리스트 설정 |
async admin_set_quotas(role, read_quota, write_quota) | 쿼터 설정 |
# 사용자 생성
await client.admin_create_user("new_user", "password", ["read-write"])
# 역할 부여
await client.admin_grant_roles("new_user", ["sys-admin"])
# 권한이 포함된 역할 생성
await client.admin_create_role("custom_role", [
{"code": aerospike.PRIV_READ, "ns": "test", "set": "demo"}
])
Expression Filters
policy 파라미터를 받는 모든 읽기/쓰기/배치 작업은 서버사이드 필터링을 위한 filter_expression 키를 지원합니다 (Server 5.2+ 필요):
- Sync Client
- Async Client
from aerospike_py import exp
expr = exp.ge(exp.int_bin("age"), exp.int_val(21))
# 필터와 함께 Get
_, _, bins = client.get(key, policy={"filter_expression": expr})
# 필터와 함께 Put (필터가 매칭될 때만 업데이트)
expr = exp.eq(exp.string_bin("status"), exp.string_val("active"))
client.put(key, {"visits": 1}, policy={"filter_expression": expr})
# 필터와 함께 Query
query = client.query("test", "demo")
records = query.results(policy={"filter_expression": expr})
# 필터와 함께 Batch
ops = [{"op": aerospike.OPERATOR_READ, "bin": "status", "val": None}]
records = client.batch_operate(keys, ops, policy={"filter_expression": expr})
from aerospike_py import exp
expr = exp.ge(exp.int_bin("age"), exp.int_val(21))
# 필터와 함께 Get
_, _, bins = await client.get(key, policy={"filter_expression": expr})
# 필터와 함께 Batch
ops = [{"op": aerospike.OPERATOR_READ, "bin": "age", "val": None}]
records = await client.batch_operate(keys, ops, policy={"filter_expression": expr})
레코드가 필터 expression과 매칭되지 않으면 FilteredOut이 발생합니다.
자세한 문서는 Expression 필터 가이드를 참조하세요.