こんにちは、株式会社JADEでAmethystのテックリードをしている大井です。
この記事は、JADE Advent Calendarの1日目の記事です。
みなさん、ChatGPT、使ってますか?
弊社では、ChatGPTをGoogle Chatから使えるようにしています。
JADEでは、この通称Google Bard(中身は ChatGPT)を利用して、さまざまな業務に活用しています。
今回の記事では、こちらをどのように実現しているのかを解説したいと思います。
Cloud Functions経由でChatbotを作れるようにする
下記のドキュメントを参考に、Cloud FunctionsでGoogle Chatアプリを作成します。
言われるがままやっていけば相手のアバターをカードとして表示するアプリが簡単に作れると思います。
Cloud Functions内でChatGPT APIを叩く
投稿元のメッセージを受け取って、ChatGPT APIを叩き、メッセージを返すように関数を定義してみましょう。
from openai import OpenAI from openai.types.chat import ChatCompletionMessageParam client = OpenAI(api_key={api_key}) def generate_response(input_message: str) -> str: messages: list[ChatCompletionMessageParam] = [ { "role": "system", "content": {system_message}, }, { "role": "user", "content": input_message, }, ] res = client.chat.completions.create( messages=messages, model="gpt-4-1106-preview", ) return res["choices"][0]["content"]
この関数を使うことで、メッセージを受け取ってGPTの返信を生成することができると思います。
最も簡単な実装では、これで完成です。
Google Chat APIの30秒Timeoutを突破する
ただ、この実装だと正しく動作しないことがあることに気づかれるかと思います。
そうです。GoogleChatAPIは30秒以内に返答しないとタイムアウトしてしまいます。
そして、たいていの場合、ChatGPT APIの返答には30秒以上かかってしまいます。
この問題を解決する方法はいろいろありますが、Wrapper用のCloud Functionsを用意し、そこでGoogle Chat APIには速攻で返答を返しつつ、GPT API -> Google Chat API(via サービスアカウント)を担当する本体のCloud Functionsにリクエストを投げっぱなしにするのが簡単だと思いました。
実装はこんな感じです。
# Wrapper側 @functions_framework.http def wrapper(req: flask.Request): request_json: Any = req.get_json(silent=True) if request_json["type"] != "MESSAGE": return loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_in_executor(None, request, request_json) time.sleep(10) return {} def request(req): try: requests.post( {本体のURL}, json=req, ) except Exception as e: print(e)
# 本体側 credentials = service_account.Credentials.from_service_account_file( "secret.json", scopes=["https://www.googleapis.com/auth/chat.bot"] ) @functions_framework.http def hello_chat(req: flask.Request): request_json: Any = req.get_json(silent=True) msg = generate_response(request_json["message"]["text"]) post_to_chat(msg, request_json["message"]["thread"]["name"] ,request_json["space"]["name"]) return def post_to_chat(msg: str, thread: str, space: str): service = build("chat", "v1", credentials=credentials) service.spaces().messages().create( parent=space, body={ "text": msg, "thread": {"name": thread}, }, messageReplyOption="REPLY_MESSAGE_OR_FAIL", ).execute()
ここで、Google Chat APIを直接叩く形で使っているため、spaceやthreadの指定が必要になっている点に注意が必要です。
(なにげにこの仕様を調べるのにたいへん苦労しました……)
スレッドの会話ログを読み込むようにする
ここまでの実装で、送られてきたメッセージを読んでそれに対する返答を書くことができるようになりました。
ただ、GPTの返信に対してさらにユーザが返信した場合、スレッドの会話ログが送られないので、文脈を考慮しない返信しかできないという問題があります。
これを解決するのは大変です。
なぜかというと、特定のスレッドの全発言を取得するにはspace.messages.listを使う必要があるのですが、このAPIはサービスアカウントから使うことができず、ユーザ認証が必要だからです。
OAuthを使ってユーザ(たとえば、開発者自身)がログインし、そのrefresh_tokenを入手してください。
長くなるのでその方法は解説しませんが、以下のドキュメントが参考になります。
def get_thread(space: str, thread:str): service = credentials.Credentials.from_authorized_user_info( authorized_user_info, ["https://www.googleapis.com/auth/chat.messages"] ) messages = [] token: str | None = None while True: if token: chat = service.spaces().messages().list(parent=space, filter="thread.name =" + thread, pageToken=token) else: chat = service.spaces().messages().list(parent=space, filter="thread.name =" + thread) messages += chat["messages"] if "nextPageToken" in chat: token = chat["nextPageToken"] else: break return messages
このような実装で、投稿されたスレッドの投稿をすべて取得することができます。
ただ、このようにして取得した各メッセージの投稿者の情報にはidしか含まれておらず、displayNameがなくて不便です。
embedするような実装を作りましょう。
def get_members(space: str): chat_service = build("chat", "v1", credentials=credentials) members = chat_service.spaces().members().list(parent=space).execute() return members def embed_display_name_to_thread(messages, members): for message in messages: for member in members["memberships"]: if message["sender"]["name"] == member["member"]["name"]: message["sender"]["displayName"] = member["member"]["displayName"] break
この情報をいい感じにGPTに送信するようにすることで、スレッドの履歴や複数人の会話をうまく認識させることができます。
streamingでメッセージを返すようにする
ところで、WebでChatGPTを使ってる皆さんは、少しずつ(リアルタイムに打っているかのように)GPTが返信してくれる形態に慣れているのではないでしょうか。
この実装では、メッセージが完成してから投稿されるようになっており、待ち時間が長く不快です。
そのような実装にすることを考えてみましょう。
OpenAIのchat.completions apiにはstreamというパラメータがあり、これをTrueにすると少しずつデータを返してくれるようになります。
作戦はこうです。
- streamでGPT APIを叩く
- 1文字目をGoogle Chatに送る
- Google Chatの返信を待っている間にGPT APIを進める
- Google Chatの返信が来たら、最新の情報をGoogle Chatにupdateする
- 3に戻る
また、Async版のOpenAIライブラリが存在するので、それも使っていきましょう。
client = AsyncOpenAI(api_key={api_key}) async def create_message(space, thread, body) -> str: message = await asyncio.get_event_loop().run_in_executor( None, lambda: chat_service.spaces() .messages() .create( parent=space, body={ "text": body, "thread": {"name": thread}, }, messageReplyOption="REPLY_MESSAGE_OR_FAIL", ) .execute(), ) return message["name"] async def update_message(name, body): await asyncio.get_event_loop().run_in_executor( None, lambda: chat_service.spaces() .messages() .update( name=name, updateMask="text", body={"text": body}, ) .execute(), ) return def gpt(messages, space, thread): stream = await client.chat.completions.create(略) task: Task[str] | Task[None] | None = None message_name = "" body = "" async for res in stream: if task and task.done(): # createの場合、messageのnameが入る updateはNone task_result = task.result() if task_result: message_name = task_result task = None if res.choices[0].delta.content: body += res.choices[0].delta.content if res.choices[0].finish_reason == "stop": break if not task: if not message_name: task = asyncio.create_task(create_message(space, thread, body)) else: task = asyncio.create_task(update_message(message_name, body)) if task: res = await task if res: message_name = res await update_message(message_name, body)
ざっくりこのような実装で、どんどん追記していくスタイルの快適なGPT生活を手に入れることができると思います。
今後の発展要素
本記事での紹介はここまでにとどめますが、さらなる発展案として、以下のような案があります。
- Google Chatの添付ファイルを読み込むようにする
- Chat APIをservice accountから叩くことで実現可能
- Function Calling APIを用いて様々な機能を実現する
- たとえば、urlをgetしてその中身も解釈できるようにする機能が、弊社のGPTには実装されています
- 事業会社であれば本番DBを叩けるようにして様々な質問を自然言語で行うことができるかもしれません
おわりに
長々とした記事になってしまいましたが、ここまで読んでいただきありがとうございます。
週明け月曜日は、郡山さんの「 GA4の話(カリキュラム・ナレッジhttps://ja.dev/knowledge)」です!
楽しみにしていてください!