본문으로 건너뛰기
버전: In Development

Client

aerospike-py는 동일한 기능을 가진 동기(Client)와 비동기(AsyncClient) API를 제공합니다.

Creating a Client

import aerospike_py as aerospike

client = aerospike.client({
"hosts": [("127.0.0.1", 3000)],
"cluster_name": "docker",
}).connect()

Context Manager

__enter__() / __exit__()

with aerospike.client({
"hosts": [("127.0.0.1", 3000)],
"cluster_name": "docker",
}).connect() as client:
client.put(key, bins)
# close()가 종료 시 자동으로 호출됩니다

Connection

connect(username=None, password=None)

Aerospike 클러스터에 연결합니다.

메서드 체이닝을 위해 self를 반환합니다.

client = aerospike.client(config).connect()
# 인증 포함
client = aerospike.client(config).connect("admin", "admin")

is_connected()

클라이언트가 연결되어 있으면 True를 반환합니다. 두 클라이언트 모두에서 동기 메서드입니다.

if client.is_connected():
print("Connected")

close()

클러스터와의 연결을 종료합니다.

client.close()

get_node_names()

클러스터 노드 이름 목록을 반환합니다.

nodes = client.get_node_names()

CRUD Operations

put(key, bins, meta=None, policy=None)

레코드를 작성합니다.

파라미터타입설명
keytuple[str, str, str|int|bytes](namespace, set, pk)
binsdict[str, Any]빈 이름-값 쌍
metaWriteMeta선택: {"ttl": int, "gen": int}
policyWritePolicy선택: {"key", "exists", "gen", "timeout", ...}
key = ("test", "demo", "user1")
client.put(key, {"name": "Alice", "age": 30})
client.put(key, {"x": 1}, meta={"ttl": 300})
client.put(key, {"x": 1}, policy={"exists": aerospike.POLICY_EXISTS_CREATE_ONLY})

get(key, policy=None)

레코드를 읽습니다. Record NamedTuple (key, meta, bins)를 반환합니다.

key, meta, bins = client.get(("test", "demo", "user1"))
# meta.gen == 1, meta.ttl == 2591998
# bins = {"name": "Alice", "age": 30}
노트

레코드가 존재하지 않으면 RecordNotFound가 발생합니다.

select(key, bins, policy=None)

레코드에서 특정 빈만 읽습니다.

_, meta, bins = client.select(key, ["name"])
# bins = {"name": "Alice"}

exists(key, policy=None)

레코드 존재 여부를 확인합니다. ExistsResult NamedTuple (key, meta)를 반환하며, 레코드가 없으면 metaNone입니다.

_, meta = client.exists(key)
if meta is not None:
print(f"Found, gen={meta.gen}")

remove(key, meta=None, policy=None)

레코드를 삭제합니다.

client.remove(key)
# 세대 검사 포함
client.remove(key, meta={"gen": 3}, policy={"gen": aerospike.POLICY_GEN_EQ})

touch(key, val=0, meta=None, policy=None)

레코드의 TTL을 리셋합니다.

client.touch(key, val=300)

String / Numeric Operations

append(key, bin, val, meta=None, policy=None)

빈에 문자열을 추가합니다.

client.append(key, "name", "_suffix")

prepend(key, bin, val, meta=None, policy=None)

빈 앞에 문자열을 삽입합니다.

client.prepend(key, "name", "prefix_")

increment(key, bin, offset, meta=None, policy=None)

정수 또는 실수 빈 값을 증가시킵니다.

client.increment(key, "age", 1)
client.increment(key, "score", 0.5)

remove_bin(key, bin_names, meta=None, policy=None)

레코드에서 특정 빈을 제거합니다.

client.remove_bin(key, ["temp_bin", "debug_bin"])

Multi-Operation

operate(key, ops, meta=None, policy=None)

단일 레코드에 여러 연산을 원자적으로 실행합니다.

ops = [
{"op": aerospike.OPERATOR_INCR, "bin": "counter", "val": 1},
{"op": aerospike.OPERATOR_READ, "bin": "counter", "val": None},
]
_, meta, bins = client.operate(key, ops)

operate_ordered(key, ops, meta=None, policy=None)

operate와 동일하지만 결과를 OperateOrderedResult NamedTuple의 ordered_bins 필드에 BinTuple(name, value) 리스트로 반환합니다.

_, meta, results = client.operate_ordered(key, ops)
# results = [("counter", 2)]

Batch Operations

batch_read(keys, bins=None, policy=None)

여러 레코드를 읽습니다. BatchRecords를 반환합니다.

  • bins=None - 모든 bin 읽기
  • bins=["a", "b"] - 특정 bin만 읽기
  • bins=[] - 존재 여부만 확인
keys = [("test", "demo", f"user_{i}") for i in range(10)]

# 모든 bin 읽기
batch = client.batch_read(keys)
for br in batch.batch_records:
if br.record:
key, meta, bins = br.record
print(bins)

# 특정 bin만 읽기
batch = client.batch_read(keys, bins=["name", "age"])

# 존재 여부만 확인
batch = client.batch_read(keys, bins=[])
for br in batch.batch_records:
print(f"{br.key}: exists={br.record is not None}")

batch_operate(keys, ops, policy=None)

여러 레코드에 연산을 실행합니다.

ops = [{"op": aerospike.OPERATOR_INCR, "bin": "views", "val": 1}]
results = client.batch_operate(keys, ops)

batch_remove(keys, policy=None)

여러 레코드를 삭제합니다.

results = client.batch_remove(keys)

Query

query(namespace, set_name)

Secondary Index 쿼리를 위한 Query 객체를 생성합니다. Query API를 참조하세요.

query = client.query("test", "demo")

Index Management

index_integer_create(namespace, set_name, bin_name, index_name, policy=None)

숫자 Secondary Index를 생성합니다.

client.index_integer_create("test", "demo", "age", "age_idx")

index_string_create(namespace, set_name, bin_name, index_name, policy=None)

문자열 Secondary Index를 생성합니다.

client.index_string_create("test", "demo", "name", "name_idx")

index_geo2dsphere_create(namespace, set_name, bin_name, index_name, policy=None)

지리공간 Secondary Index를 생성합니다.

client.index_geo2dsphere_create("test", "demo", "location", "geo_idx")

index_remove(namespace, index_name, policy=None)

Secondary Index를 제거합니다.

client.index_remove("test", "age_idx")

Truncate

truncate(namespace, set_name, nanos=0, policy=None)

네임스페이스/세트의 모든 레코드를 제거합니다.

client.truncate("test", "demo")

UDF

udf_put(filename, udf_type=0, policy=None)

Lua UDF 모듈을 등록합니다.

client.udf_put("my_udf.lua")

udf_remove(module, policy=None)

등록된 UDF 모듈을 제거합니다.

client.udf_remove("my_udf")

apply(key, module, function, args=None, policy=None)

레코드에 UDF를 실행합니다.

result = client.apply(key, "my_udf", "my_function", [1, "hello"])

Concurrency Patterns (Async)

asyncio.gather를 사용한 병렬 쓰기

keys = [("test", "demo", f"item_{i}") for i in range(100)]
tasks = [client.put(k, {"idx": i}) for i, k in enumerate(keys)]
await asyncio.gather(*tasks)

병렬 읽기

keys = [("test", "demo", f"item_{i}") for i in range(100)]
tasks = [client.get(k) for k in keys]
results = await asyncio.gather(*tasks, return_exceptions=True)

혼합 연산

async def process_user(client, user_id):
key = ("test", "users", user_id)
_, _, bins = await client.get(key)
bins["visits"] = bins.get("visits", 0) + 1
await client.put(key, bins)
return bins

results = await asyncio.gather(*[
process_user(client, f"user_{i}")
for i in range(10)
])

Admin Operations

User Management

메서드설명
admin_create_user(username, password, roles)사용자 생성
admin_drop_user(username)사용자 삭제
admin_change_password(username, password)비밀번호 변경
admin_grant_roles(username, roles)역할 부여
admin_revoke_roles(username, roles)역할 회수
admin_query_user(username)사용자 정보 조회
admin_query_users()전체 사용자 목록

Role Management

메서드설명
admin_create_role(role, privileges, ...)역할 생성
admin_drop_role(role)역할 삭제
admin_grant_privileges(role, privileges)권한 부여
admin_revoke_privileges(role, privileges)권한 회수
admin_query_role(role)역할 정보 조회
admin_query_roles()전체 역할 목록
admin_set_whitelist(role, whitelist)IP 화이트리스트 설정
admin_set_quotas(role, read_quota, write_quota)쿼터 설정
# 사용자 생성
client.admin_create_user("new_user", "password", ["read-write"])

# 권한이 포함된 역할 생성
client.admin_create_role("custom_role", [
{"code": aerospike.PRIV_READ, "ns": "test", "set": "demo"}
])

Expression Filters

policy 파라미터를 받는 모든 읽기/쓰기/배치 작업은 서버사이드 필터링을 위한 filter_expression 키를 지원합니다 (Server 5.2+ 필요):

from aerospike_py import exp

expr = exp.ge(exp.int_bin("age"), exp.int_val(21))

# 필터와 함께 Get
_, _, bins = client.get(key, policy={"filter_expression": expr})

# 필터와 함께 Put (필터가 매칭될 때만 업데이트)
expr = exp.eq(exp.string_bin("status"), exp.string_val("active"))
client.put(key, {"visits": 1}, policy={"filter_expression": expr})

# 필터와 함께 Query
query = client.query("test", "demo")
records = query.results(policy={"filter_expression": expr})

# 필터와 함께 Batch
ops = [{"op": aerospike.OPERATOR_READ, "bin": "status", "val": None}]
records = client.batch_operate(keys, ops, policy={"filter_expression": expr})

레코드가 필터 expression과 매칭되지 않으면 FilteredOut이 발생합니다. 자세한 문서는 Expression 필터 가이드를 참조하세요.